Add DMaaP publisher configuration to DmaapClient
Change-Id: I59f366bb5045e344e026f8d51e8f8b611b631d73
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
diff --git a/policy-agent/pom.xml b/policy-agent/pom.xml
index 52c69d5..52761cc 100644
--- a/policy-agent/pom.xml
+++ b/policy-agent/pom.xml
@@ -41,6 +41,7 @@
<sdk.version>1.1.6</sdk.version>
<swagger.version>2.0.0</swagger.version>
<json.version>20180130</json.version>
+ <commons-net.version>3.3</commons-net.version>
<awaitility.version>4.0.1</awaitility.version>
<maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
<formatter-maven-plugin.version>2.8.1</formatter-maven-plugin.version>
@@ -101,6 +102,11 @@
<artifactId>json</artifactId>
<version>${json.version}</version>
</dependency>
+ <dependency>
+ <groupId>commons-net</groupId>
+ <artifactId>commons-net</artifactId>
+ <version>${commons-net.version}</version>
+ </dependency>
<!--TEST -->
<dependency>
<groupId>org.springframework.boot</groupId>
@@ -115,6 +121,11 @@
<dependency>
<groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
<artifactId>cbs-client</artifactId>
+ <version>${sdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+ <artifactId>dmaap-client</artifactId>
<version>${sdk.version}</version>
</dependency>
<!--REQUIRED TO GENERATE DOCUMENTATION -->
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java
index d4f7261..673fe1d 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java
@@ -23,6 +23,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import java.util.Vector;
import javax.validation.constraints.NotEmpty;
@@ -41,6 +42,7 @@
private Collection<Observer> observers = new Vector<>();
private Map<String, RicConfig> ricConfigs = new HashMap<>();
+ private Properties dmaapConsumerConfig;
@Autowired
public ApplicationConfig() {
@@ -50,6 +52,13 @@
return this.filepath;
}
+ /*
+ * Do not remove, used by framework!
+ */
+ public synchronized void setFilepath(String filepath) {
+ this.filepath = filepath;
+ }
+
public synchronized Collection<RicConfig> getRicConfigs() {
return this.ricConfigs.values();
}
@@ -63,6 +72,10 @@
throw new ServiceException("Could not find ric: " + ricName);
}
+ public Properties getDmaapConsumerConfig() {
+ return dmaapConsumerConfig;
+ }
+
public static enum RicConfigUpdate {
ADDED, CHANGED, REMOVED
}
@@ -85,7 +98,7 @@
}
}
- public void setConfiguration(@NotNull Collection<RicConfig> ricConfigs) {
+ public void setConfiguration(@NotNull Collection<RicConfig> ricConfigs, Properties dmaapConsumerConfig) {
Collection<Notification> notifications = new Vector<>();
synchronized (this) {
Map<String, RicConfig> newRicConfigs = new HashMap<>();
@@ -109,6 +122,8 @@
this.ricConfigs = newRicConfigs;
}
notifyObservers(notifications);
+
+ this.dmaapConsumerConfig = dmaapConsumerConfig;
}
private void notifyObservers(Collection<Notification> notifications) {
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java
index 75ee135..9dc41f2 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java
@@ -25,32 +25,44 @@
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
-
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
import java.util.Vector;
-
+import javax.validation.constraints.NotNull;
import org.oransc.policyagent.exceptions.ServiceException;
public class ApplicationConfigParser {
private static final String CONFIG = "config";
+
private static Gson gson = new GsonBuilder() //
.serializeNulls() //
.create(); //
private Vector<RicConfig> ricConfig;
+ private Properties dmaapConsumerConfig;
public ApplicationConfigParser() {
}
public void parse(JsonObject root) throws ServiceException {
- JsonObject config = root.getAsJsonObject(CONFIG);
- ricConfig = parseRics(config);
+ JsonObject ricConfigJson = root.getAsJsonObject(CONFIG);
+ ricConfig = parseRics(ricConfigJson);
+ JsonObject dmaapConfigJson = root.getAsJsonObject("streams_subscribes");
+ dmaapConsumerConfig = parseDmaapConsumerConfig(dmaapConfigJson);
}
public Vector<RicConfig> getRicConfigs() {
return this.ricConfig;
}
+ public Properties getDmaapConsumerConfig() {
+ return dmaapConsumerConfig;
+ }
+
private Vector<RicConfig> parseRics(JsonObject config) throws ServiceException {
Vector<RicConfig> result = new Vector<RicConfig>();
for (JsonElement ricElem : getAsJsonArray(config, "ric")) {
@@ -71,4 +83,69 @@
return get(obj, memberName).getAsJsonArray();
}
+ private Properties parseDmaapConsumerConfig(JsonObject consumerCfg) throws ServiceException {
+ Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet();
+ if (topics.size() != 1) {
+ throw new ServiceException("Invalid configuration, number of topic must be one, config: " + topics);
+ }
+ JsonObject topic = topics.iterator().next().getValue().getAsJsonObject();
+ JsonObject dmaapInfo = get(topic, "dmaap_info").getAsJsonObject();
+ String topicUrl = getAsString(dmaapInfo, "topic_url");
+
+ Properties dmaapProps = new Properties();
+ try {
+ URL url = new URL(topicUrl);
+ String passwd = "";
+ String userName = "";
+ if (url.getUserInfo() != null) {
+ String[] userInfo = url.getUserInfo().split(":");
+ userName = userInfo[0];
+ passwd = userInfo[1];
+ }
+ String urlPath = url.getPath();
+ DmaapConsumerUrlPath path = parseDmaapUrlPath(urlPath);
+
+ dmaapProps.put("port", url.getPort());
+ dmaapProps.put("server", url.getHost());
+ dmaapProps.put("topic", path.dmaapTopicName);
+ dmaapProps.put("consumerGroup", path.consumerGroup);
+ dmaapProps.put("consumerInstance", path.consumerId);
+ dmaapProps.put("fetchTimeout", 15000);
+ dmaapProps.put("fetchLimit", 1000);
+ dmaapProps.put("userName", userName);
+ dmaapProps.put("password", passwd);
+ } catch (MalformedURLException e) {
+ throw new ServiceException("Could not parse the URL", e);
+ }
+
+ return dmaapProps;
+ }
+
+ private static @NotNull String getAsString(JsonObject obj, String memberName) throws ServiceException {
+ return get(obj, memberName).getAsString();
+ }
+
+ private class DmaapConsumerUrlPath {
+ final String dmaapTopicName;
+ final String consumerGroup;
+ final String consumerId;
+
+ DmaapConsumerUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) {
+ this.dmaapTopicName = dmaapTopicName;
+ this.consumerGroup = consumerGroup;
+ this.consumerId = consumerId;
+ }
+ }
+
+ private DmaapConsumerUrlPath parseDmaapUrlPath(String urlPath) throws ServiceException {
+ String[] tokens = urlPath.split("/"); // /events/A1-P/users/sdnc1
+ if (tokens.length != 5) {
+ throw new ServiceException("The path has incorrect syntax: " + urlPath);
+ }
+
+ final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // /events/A1-P
+ final String consumerGroup = tokens[3]; // users
+ final String consumerId = tokens[4]; // sdnc1
+ return new DmaapConsumerUrlPath(dmaapTopicName, consumerGroup, consumerId);
+ }
}
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/BusTopicParams.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/BusTopicParams.java
deleted file mode 100644
index 47ab592..0000000
--- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/BusTopicParams.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.oransc.policyagent.dmaap;
-
-import org.springframework.context.annotation.Configuration;
-
-@Configuration("dmaap")
-public class BusTopicParams {
-
- private int port;
- private String server;
- private String topic;
- private String consumerGroup;
- private String consumerInstance;
- private int fetchTimeout;
- private int fetchLimit;
- private String userName;
- private String password;
-}
\ No newline at end of file
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java
index b64a822..30444e2 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java
@@ -3,13 +3,20 @@
import java.util.Properties;
import org.onap.dmaap.mr.client.impl.MRConsumerImpl;
+import org.oransc.policyagent.configuration.ApplicationConfig;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+@Component
public class DmaapMessageConsumerImpl implements DmaapMessageConsumer {
+ private final ApplicationConfig applicationConfig;
+
protected MRConsumerImpl consumer;
-
- public DmaapMessageConsumerImpl() {
- // TODO Auto-generated constructor stub
+
+ @Autowired
+ public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) {
+ this.applicationConfig = applicationConfig;
}
@Override
@@ -20,6 +27,7 @@
@Override
public void init(Properties baseProperties) {
+ Properties dmaapConsumerConfig = applicationConfig.getDmaapConsumerConfig();
// Initialize the DMAAP with the properties
// TODO Auto-generated method stub
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java
index 9a8dc34..bc43eda 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RefreshConfigTask.java
@@ -125,7 +125,7 @@
try {
ApplicationConfigParser parser = new ApplicationConfigParser();
parser.parse(jsonObject);
- this.appConfig.setConfiguration(parser.getRicConfigs());
+ this.appConfig.setConfiguration(parser.getRicConfigs(), parser.getDmaapConsumerConfig());
} catch (ServiceException e) {
logger.error("Could not parse configuration {}", e.toString(), e);
}
@@ -152,7 +152,7 @@
}
ApplicationConfigParser appParser = new ApplicationConfigParser();
appParser.parse(rootObject);
- appConfig.setConfiguration(appParser.getRicConfigs());
+ appConfig.setConfiguration(appParser.getRicConfigs(), appParser.getDmaapConsumerConfig());
logger.info("Local configuration file loaded: {}", filepath);
} catch (JsonSyntaxException | ServiceException | IOException e) {
logger.trace("Local configuration file not loaded: {}", filepath, e);
diff --git a/policy-agent/src/test/resources/test_application_configuration.json b/policy-agent/src/test/resources/test_application_configuration.json
index 446c061..c63b710 100644
--- a/policy-agent/src/test/resources/test_application_configuration.json
+++ b/policy-agent/src/test/resources/test_application_configuration.json
@@ -19,5 +19,13 @@
]
}
]
+ },
+ "streams_subscribes": {
+ "dmaap_subscriber": {
+ "dmaap_info": {
+ "topic_url": "http://dradmin:dradmin@localhost:2222/events/A1-P/users/sdnc1"
+ },
+ "type": "message_router"
+ }
}
}
\ No newline at end of file