Remove using of DMAAP client from ONAP
The DMAAP client did not work with https, which
is a fundametal need.
Instead, the AsynchRestClient is used.
Change-Id: I347ddd5a140e854da858ad87386227b1da8c437a
Issue-ID: NONRTRIC-195
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
diff --git a/policy-agent/dpo/blueprints/k8s-policy-agent.yaml b/policy-agent/dpo/blueprints/k8s-policy-agent.yaml
index 0347385..aa09ff3 100644
--- a/policy-agent/dpo/blueprints/k8s-policy-agent.yaml
+++ b/policy-agent/dpo/blueprints/k8s-policy-agent.yaml
@@ -67,7 +67,7 @@
streams_subscribes:
dmaap_subscriber:
dmaap_info:
- topic_url: { concat: ['https://message-router:3905/events/',{ get_input: subscribe_topic_name }, '/', { get_input: consumer_group }, "/", { get_input: consumer_id }] }
+ topic_url: { concat: ['https://message-router:3905/events/',{ get_input: subscribe_topic_name }, '/', { get_input: consumer_group }, "/", { get_input: consumer_id }, "?timeout=15000&limit=100"] }
type: message_router
ric:
- name: ric1
diff --git a/policy-agent/pom.xml b/policy-agent/pom.xml
index 76f3c33..3725af6 100644
--- a/policy-agent/pom.xml
+++ b/policy-agent/pom.xml
@@ -77,8 +77,8 @@
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-aop</artifactId>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
@@ -135,11 +135,6 @@
<version>${sdk.version}</version>
</dependency>
<dependency>
- <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
- <artifactId>dmaap-client</artifactId>
- <version>${sdk.version}</version>
- </dependency>
- <dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
@@ -158,13 +153,11 @@
<groupId>org.glassfish.jersey.inject</groupId>
<artifactId>jersey-hk2</artifactId>
</dependency>
-
<!-- Actuator dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
-
<!--REQUIRED TO GENERATE DOCUMENTATION -->
<dependency>
<groupId>io.springfox</groupId>
@@ -176,7 +169,6 @@
<artifactId>springfox-swagger-ui</artifactId>
<version>${springfox.version}</version>
</dependency>
-
<!-- TEST -->
<dependency>
<groupId>org.springframework.boot</groupId>
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java
index 6c2c91b..a253373 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java
@@ -24,7 +24,6 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import java.util.Properties;
import javax.validation.constraints.NotEmpty;
@@ -54,10 +53,12 @@
private String sslTrustStore = "";
private Map<String, RicConfig> ricConfigs = new HashMap<>();
+
@Getter
- private Properties dmaapPublisherConfig;
+ private String dmaapConsumerTopicUrl;
+
@Getter
- private Properties dmaapConsumerConfig;
+ private String dmaapProducerTopicUrl;
private Map<String, ControllerConfig> controllerConfigs = new HashMap<>();
@@ -109,10 +110,11 @@
ApplicationConfigParser.ConfigParserResult parserResult) {
Collection<RicConfigUpdate> modifications = new ArrayList<>();
- this.dmaapPublisherConfig = parserResult.dmaapPublisherConfig();
- this.dmaapConsumerConfig = parserResult.dmaapConsumerConfig();
this.controllerConfigs = parserResult.controllerConfigs();
+ this.dmaapConsumerTopicUrl = parserResult.dmaapConsumerTopicUrl();
+ this.dmaapProducerTopicUrl = parserResult.dmaapProducerTopicUrl();
+
Map<String, RicConfig> newRicConfigs = new HashMap<>();
for (RicConfig newConfig : parserResult.ricConfigs()) {
RicConfig oldConfig = this.ricConfigs.get(newConfig.name());
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java
index a76f964..14e836b 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java
@@ -24,8 +24,6 @@
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
-import java.net.MalformedURLException;
-import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -33,16 +31,13 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Properties;
import java.util.Set;
import javax.validation.constraints.NotNull;
import org.immutables.gson.Gson;
import org.immutables.value.Value;
-import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
import org.oransc.policyagent.exceptions.ServiceException;
-import org.springframework.http.MediaType;
/**
* Parser for the Json representing of the component configuration.
@@ -57,17 +52,18 @@
public interface ConfigParserResult {
List<RicConfig> ricConfigs();
- Properties dmaapPublisherConfig();
-
- Properties dmaapConsumerConfig();
-
Map<String, ControllerConfig> controllerConfigs();
+
+ String dmaapConsumerTopicUrl();
+
+ String dmaapProducerTopicUrl();
+
}
public ConfigParserResult parse(JsonObject root) throws ServiceException {
- Properties dmaapPublisherConfig = new Properties();
- Properties dmaapConsumerConfig = new Properties();
+ String dmaapProducerTopicUrl = "";
+ String dmaapConsumerTopicUrl = "";
JsonObject agentConfigJson = root.getAsJsonObject(CONFIG);
@@ -77,12 +73,12 @@
JsonObject json = agentConfigJson.getAsJsonObject("streams_publishes");
if (json != null) {
- dmaapPublisherConfig = parseDmaapConfig(json);
+ dmaapProducerTopicUrl = parseDmaapConfig(json);
}
json = agentConfigJson.getAsJsonObject("streams_subscribes");
if (json != null) {
- dmaapConsumerConfig = parseDmaapConfig(json);
+ dmaapConsumerTopicUrl = parseDmaapConfig(json);
}
List<RicConfig> ricConfigs = parseRics(agentConfigJson);
@@ -90,8 +86,8 @@
checkConfigurationConsistency(ricConfigs, controllerConfigs);
return ImmutableConfigParserResult.builder() //
- .dmaapConsumerConfig(dmaapConsumerConfig) //
- .dmaapPublisherConfig(dmaapPublisherConfig) //
+ .dmaapConsumerTopicUrl(dmaapConsumerTopicUrl) //
+ .dmaapProducerTopicUrl(dmaapProducerTopicUrl) //
.ricConfigs(ricConfigs) //
.controllerConfigs(controllerConfigs) //
.build();
@@ -114,7 +110,6 @@
}
}
-
}
private List<RicConfig> parseRics(JsonObject config) throws ServiceException {
@@ -177,7 +172,7 @@
return get(obj, memberName).getAsJsonArray();
}
- private Properties parseDmaapConfig(JsonObject streamCfg) throws ServiceException {
+ private String parseDmaapConfig(JsonObject streamCfg) throws ServiceException {
Set<Entry<String, JsonElement>> streamConfigEntries = streamCfg.entrySet();
if (streamConfigEntries.size() != 1) {
throw new ServiceException(
@@ -185,71 +180,10 @@
}
JsonObject streamConfigEntry = streamConfigEntries.iterator().next().getValue().getAsJsonObject();
JsonObject dmaapInfo = get(streamConfigEntry, "dmaap_info").getAsJsonObject();
- String topicUrl = getAsString(dmaapInfo, "topic_url");
-
- try {
- Properties dmaapProps = new Properties();
- URL url = new URL(topicUrl);
- String passwd = "";
- String userName = "";
- if (url.getUserInfo() != null) {
- String[] userInfo = url.getUserInfo().split(":");
- userName = userInfo[0];
- passwd = userInfo[1];
- }
- String urlPath = url.getPath();
- DmaapUrlPath path = parseDmaapUrlPath(urlPath);
-
- dmaapProps.put("ServiceName", url.getHost() + ":" + url.getPort() + "/events");
- dmaapProps.put("topic", path.dmaapTopicName);
- dmaapProps.put("host", url.getHost() + ":" + url.getPort());
- dmaapProps.put("contenttype", MediaType.APPLICATION_JSON.toString());
- dmaapProps.put("userName", userName);
- dmaapProps.put("password", passwd);
- dmaapProps.put("group", path.consumerGroup);
- dmaapProps.put("id", path.consumerId);
- dmaapProps.put("TransportType", ProtocolTypeConstants.HTTPNOAUTH.toString());
- dmaapProps.put("timeout", "15000");
- dmaapProps.put("limit", "100");
- dmaapProps.put("maxBatchSize", "10");
- dmaapProps.put("maxAgeMs", "10000");
- dmaapProps.put("compress", true);
- dmaapProps.put("MessageSentThreadOccurance", "2");
- return dmaapProps;
- } catch (MalformedURLException e) {
- throw new ServiceException("Could not parse the URL", e);
- }
+ return getAsString(dmaapInfo, "topic_url");
}
private static @NotNull String getAsString(JsonObject obj, String memberName) throws ServiceException {
return get(obj, memberName).getAsString();
}
-
- private class DmaapUrlPath {
- final String dmaapTopicName;
- final String consumerGroup;
- final String consumerId;
-
- DmaapUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) {
- this.dmaapTopicName = dmaapTopicName;
- this.consumerGroup = consumerGroup;
- this.consumerId = consumerId;
- }
- }
-
- private DmaapUrlPath parseDmaapUrlPath(String urlPath) throws ServiceException {
- String[] tokens = urlPath.split("/"); // /events/A1-P/users/sdnc1
- if (!(tokens.length == 3 ^ tokens.length == 5)) {
- throw new ServiceException("The path has incorrect syntax: " + urlPath);
- }
-
- final String dmaapTopicName = tokens[2]; // /events/A1-P
- String consumerGroup = ""; // users
- String consumerId = ""; // sdnc1
- if (tokens.length == 5) {
- consumerGroup = tokens[3];
- consumerId = tokens[4];
- }
- return new DmaapUrlPath(dmaapTopicName, consumerGroup, consumerId);
- }
}
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java
index 011b977..e141bab 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java
@@ -21,15 +21,15 @@
package org.oransc.policyagent.dmaap;
import com.google.common.collect.Iterables;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
import java.io.IOException;
import java.time.Duration;
-import java.util.Properties;
+import java.util.ArrayList;
+import java.util.List;
-import org.onap.dmaap.mr.client.MRBatchingPublisher;
-import org.onap.dmaap.mr.client.MRClientFactory;
-import org.onap.dmaap.mr.client.MRConsumer;
-import org.onap.dmaap.mr.client.response.MRConsumerResponse;
import org.oransc.policyagent.clients.AsyncRestClient;
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.exceptions.ServiceException;
@@ -37,6 +37,7 @@
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
/**
@@ -62,7 +63,6 @@
private final ApplicationConfig applicationConfig;
private DmaapMessageHandler dmaapMessageHandler = null;
- private MRConsumer messageRouterConsumer = null;
@Value("${server.http-port}")
private int localServerHttpPort;
@@ -99,7 +99,7 @@
sleep(TIME_BETWEEN_DMAAP_RETRIES); // wait for configuration
}
} catch (Exception e) {
- logger.warn("Cannot fetch because of {}", e.getMessage());
+ logger.warn("{}", e.getMessage());
sleep(TIME_BETWEEN_DMAAP_RETRIES);
}
}
@@ -110,25 +110,35 @@
}
protected boolean isDmaapConfigured() {
- Properties consumerCfg = applicationConfig.getDmaapConsumerConfig();
- Properties producerCfg = applicationConfig.getDmaapPublisherConfig();
- return (consumerCfg != null && consumerCfg.size() > 0 && producerCfg != null && producerCfg.size() > 0);
+ String producerTopicUrl = applicationConfig.getDmaapProducerTopicUrl();
+ String consumerTopicUrl = applicationConfig.getDmaapConsumerTopicUrl();
+ return (!producerTopicUrl.isEmpty() && !consumerTopicUrl.isEmpty());
+ }
+
+ private static List<String> parseMessages(String jsonString) {
+ JsonArray arrayOfMessages = JsonParser.parseString(jsonString).getAsJsonArray();
+ List<String> result = new ArrayList<>();
+ for (JsonElement element : arrayOfMessages) {
+ if (element.isJsonPrimitive()) {
+ result.add(element.getAsString());
+ } else {
+ String messageAsString = element.toString();
+ result.add(messageAsString);
+ }
+ }
+ return result;
}
protected Iterable<String> fetchAllMessages() throws ServiceException, IOException {
- Properties dmaapConsumerProperties = this.applicationConfig.getDmaapConsumerConfig();
- MRConsumer consumer = getMessageRouterConsumer(dmaapConsumerProperties);
- MRConsumerResponse response = consumer.fetchWithReturnConsumerResponse();
- if (response == null || !"200".equals(response.getResponseCode())) {
- String errorMessage = "DMaaP NULL response received";
- if (response != null) {
- errorMessage = "Error respons " + response.getResponseCode() + " " + response.getResponseMessage()
- + " from DMaaP.";
- }
- throw new ServiceException(errorMessage);
+ String topicUrl = this.applicationConfig.getDmaapConsumerTopicUrl();
+ AsyncRestClient consumer = getMessageRouterConsumer();
+ ResponseEntity<String> response = consumer.getForEntity(topicUrl).block();
+ logger.debug("DMaaP consumer received {} : {}", response.getStatusCode(), response.getBody());
+ if (response.getStatusCode().is2xxSuccessful()) {
+ return parseMessages(response.getBody());
} else {
- logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage());
- return response.getActualMessages();
+ throw new ServiceException("Cannot fetch because of Error respons: " + response.getStatusCode().toString()
+ + " " + response.getBody());
}
}
@@ -141,8 +151,8 @@
if (this.dmaapMessageHandler == null) {
String agentBaseUrl = "http://localhost:" + this.localServerHttpPort;
AsyncRestClient agentClient = new AsyncRestClient(agentBaseUrl);
- Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
- MRBatchingPublisher producer = MRClientFactory.createBatchingPublisher(dmaapPublisherProperties);
+ AsyncRestClient producer = new AsyncRestClient(this.applicationConfig.getDmaapProducerTopicUrl(),
+ this.applicationConfig.getWebClientConfig());
this.dmaapMessageHandler = new DmaapMessageHandler(producer, agentClient);
}
return this.dmaapMessageHandler;
@@ -156,11 +166,8 @@
}
}
- protected MRConsumer getMessageRouterConsumer(Properties dmaapConsumerProperties) throws IOException {
- if (this.messageRouterConsumer == null) {
- this.messageRouterConsumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
- }
- return this.messageRouterConsumer;
+ protected AsyncRestClient getMessageRouterConsumer() {
+ return new AsyncRestClient("", this.applicationConfig.getWebClientConfig());
}
}
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java
index 19d1564..efdccd8 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java
@@ -24,10 +24,8 @@
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
-import java.io.IOException;
import java.util.Optional;
-import org.onap.dmaap.mr.client.MRBatchingPublisher;
import org.oransc.policyagent.clients.AsyncRestClient;
import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation;
import org.oransc.policyagent.exceptions.ServiceException;
@@ -49,10 +47,10 @@
private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class);
private static Gson gson = new GsonBuilder() //
.create(); //
- private final MRBatchingPublisher dmaapClient;
+ private final AsyncRestClient dmaapClient;
private final AsyncRestClient agentClient;
- public DmaapMessageHandler(MRBatchingPublisher dmaapClient, AsyncRestClient agentClient) {
+ public DmaapMessageHandler(AsyncRestClient dmaapClient, AsyncRestClient agentClient) {
this.agentClient = agentClient;
this.dmaapClient = dmaapClient;
}
@@ -99,11 +97,7 @@
}
private String prepareBadOperationErrorMessage(Throwable t, String originalMessage) {
- String operationParameterStart = "operation\":\"";
- int indexOfOperationStart = originalMessage.indexOf(operationParameterStart) + operationParameterStart.length();
- int indexOfOperationEnd = originalMessage.indexOf("\",\"", indexOfOperationStart);
- String badOperation = originalMessage.substring(indexOfOperationStart, indexOfOperationEnd);
- return t.getMessage().replace("null", badOperation);
+ return t.getMessage();
}
private Mono<ResponseEntity<String>> invokePolicyAgent(DmaapRequestMessage dmaapRequestMessage) {
@@ -141,14 +135,8 @@
}
private Mono<String> sendToDmaap(String body) {
- try {
- logger.debug("sendToDmaap: {} ", body);
- dmaapClient.send(body);
- dmaapClient.sendBatchWithResponse();
- return Mono.just("OK");
- } catch (IOException e) {
- return Mono.error(e);
- }
+ logger.debug("sendToDmaap: {} ", body);
+ return dmaapClient.post("", "[" + body + "]");
}
private Mono<String> handleResponseCallError(Throwable t) {
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java
index 5d78207..83c64e8 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java
@@ -134,7 +134,7 @@
private void notifyAllServices(String body) {
for (Service service : services.getAll()) {
String url = service.getCallbackUrl();
- if (service.getCallbackUrl().length() > 0) {
+ if (url.length() > 0) {
createNotificationClient(url) //
.put("", body) //
.subscribe( //
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigParserTest.java b/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigParserTest.java
index 6b106c9..5a6b023 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigParserTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigParserTest.java
@@ -20,7 +20,6 @@
package org.oransc.policyagent.configuration;
-import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -36,16 +35,12 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Map;
-import java.util.Properties;
import org.junit.jupiter.api.Test;
-import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
import org.oransc.policyagent.exceptions.ServiceException;
-import org.springframework.http.MediaType;
class ApplicationConfigParserTest {
@@ -57,35 +52,13 @@
ApplicationConfigParser.ConfigParserResult result = parserUnderTest.parse(jsonRootObject);
- Properties actualPublisherConfig = result.dmaapPublisherConfig();
- assertAll("publisherConfig",
- () -> assertEquals("localhost:6845/events", actualPublisherConfig.get("ServiceName"), "Wrong ServiceName"),
- () -> assertEquals("A1-POLICY-AGENT-WRITE", actualPublisherConfig.get("topic"), "Wrong topic"),
- () -> assertEquals("localhost:6845", actualPublisherConfig.get("host"), "Wrong host"),
- () -> assertEquals(MediaType.APPLICATION_JSON.toString(), actualPublisherConfig.get("contenttype"),
- "Wrong contenttype"),
- () -> assertEquals("admin", actualPublisherConfig.get("userName"), "Wrong userName"),
- () -> assertEquals("admin", actualPublisherConfig.get("password"), "Wrong password"),
- () -> assertEquals(ProtocolTypeConstants.HTTPNOAUTH.toString(), actualPublisherConfig.get("TransportType"),
- "Wrong TransportType"),
- () -> assertEquals("15000", actualPublisherConfig.get("timeout"), "Wrong timeout"),
- () -> assertEquals("100", actualPublisherConfig.get("limit"), "Wrong limit"));
+ String topicUrl = result.dmaapProducerTopicUrl();
+ assertEquals("http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE", topicUrl, "controller contents");
- Properties actualConsumerConfig = result.dmaapConsumerConfig();
- assertAll("consumerConfig",
- () -> assertEquals("localhost:6845/events", actualConsumerConfig.get("ServiceName"), "Wrong ServiceName"),
- () -> assertEquals("A1-POLICY-AGENT-READ", actualConsumerConfig.get("topic"), "Wrong topic"),
- () -> assertEquals("localhost:6845", actualConsumerConfig.get("host"), "Wrong host"),
- () -> assertEquals(MediaType.APPLICATION_JSON.toString(), actualConsumerConfig.get("contenttype"),
- "Wrong contenttype"),
- () -> assertEquals("admin", actualConsumerConfig.get("userName"), "Wrong userName"),
- () -> assertEquals("admin", actualConsumerConfig.get("password"), "Wrong password"),
- () -> assertEquals("users", actualConsumerConfig.get("group"), "Wrong group"),
- () -> assertEquals("policy-agent", actualConsumerConfig.get("id"), "Wrong id"),
- () -> assertEquals(ProtocolTypeConstants.HTTPNOAUTH.toString(), actualConsumerConfig.get("TransportType"),
- "Wrong TransportType"),
- () -> assertEquals("15000", actualConsumerConfig.get("timeout"), "Wrong timeout"),
- () -> assertEquals("100", actualConsumerConfig.get("limit"), "Wrong limit"));
+ topicUrl = result.dmaapConsumerTopicUrl();
+ assertEquals(
+ "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent?timeout=15000&limit=100",
+ topicUrl, "controller contents");
Map<String, ControllerConfig> controllers = result.controllerConfigs();
assertEquals(1, controllers.size(), "size");
@@ -167,38 +140,6 @@
}
@Test
- void whenMalformedUrlStreamsSubscribing() throws Exception {
- JsonObject jsonRootObject = getJsonRootObject();
- final String wrongTopicUrl = "WrongTopicUrl";
- JsonObject json = getDmaapInfo(jsonRootObject, "streams_subscribes", "dmaap_subscriber");
- json.addProperty("topic_url", wrongTopicUrl);
- final String expectedMessage = "Could not parse the URL";
-
- Exception actualException = assertThrows(ServiceException.class, () -> parserUnderTest.parse(jsonRootObject));
-
- assertEquals(expectedMessage, actualException.getMessage().replace("\"", ""),
- "Wrong error message when the streams subscribes' URL is malformed");
- assertEquals(MalformedURLException.class, actualException.getCause().getClass(),
- "The exception is not a MalformedURLException");
- }
-
- @Test
- void whenMalformedUrlStreamsPublishing() throws Exception {
- JsonObject jsonRootObject = getJsonRootObject();
- final String wrongTopicUrl = "WrongTopicUrl";
- JsonObject json = getDmaapInfo(jsonRootObject, "streams_publishes", "dmaap_publisher");
- json.addProperty("topic_url", wrongTopicUrl);
- final String expectedMessage = "Could not parse the URL";
-
- Exception actualException = assertThrows(ServiceException.class, () -> parserUnderTest.parse(jsonRootObject));
-
- assertEquals(expectedMessage, actualException.getMessage().replace("\"", ""),
- "Wrong error message when the streams publishes' URL is malformed");
- assertEquals(MalformedURLException.class, actualException.getCause().getClass(),
- "The exception is not a MalformedURLException");
- }
-
- @Test
void whenWrongMemberNameInObject() throws Exception {
JsonObject jsonRootObject = getJsonRootObject();
JsonObject json = jsonRootObject.getAsJsonObject("config");
@@ -210,38 +151,6 @@
assertEquals(message, actualException.getMessage(), "Wrong error message when wrong member name in object");
}
- @Test
- void whenWrongUrlPathStreamsSubscribing() throws Exception {
- JsonObject jsonRootObject = getJsonRootObject();
- final String wrongTopicUrlString =
- "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent/wrong-topic-url";
- final URL wrongTopicUrl = new URL(wrongTopicUrlString);
- JsonObject json = getDmaapInfo(jsonRootObject, "streams_subscribes", "dmaap_subscriber");
- json.addProperty("topic_url", wrongTopicUrlString);
- final String expectedMessage = "The path has incorrect syntax: " + wrongTopicUrl.getPath();
-
- Exception actualException = assertThrows(ServiceException.class, () -> parserUnderTest.parse(jsonRootObject));
-
- assertEquals(expectedMessage, actualException.getMessage(),
- "Wrong error message when the streams subscribes' URL has incorrect syntax");
- }
-
- @Test
- void whenWrongUrlPathStreamsPublishing() throws Exception {
- JsonObject jsonRootObject = getJsonRootObject();
- final String wrongTopicUrlString =
- "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE/wrong-topic-url";
- final URL wrongTopicUrl = new URL(wrongTopicUrlString);
- JsonObject json = getDmaapInfo(jsonRootObject, "streams_publishes", "dmaap_publisher");
- json.addProperty("topic_url", wrongTopicUrlString);
- final String expectedMessage = "The path has incorrect syntax: " + wrongTopicUrl.getPath();
-
- Exception actualException = assertThrows(ServiceException.class, () -> parserUnderTest.parse(jsonRootObject));
-
- assertEquals(expectedMessage, actualException.getMessage(),
- "Wrong error message when the streams publishes' URL has incorrect syntax");
- }
-
JsonObject getDmaapInfo(JsonObject jsonRootObject, String streamsPublishesOrSubscribes,
String dmaapPublisherOrSubscriber) throws Exception {
return jsonRootObject.getAsJsonObject("config").getAsJsonObject(streamsPublishesOrSubscribes)
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java b/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java
index 6a6f447..5667fd2 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java
@@ -27,7 +27,6 @@
import java.util.Arrays;
import java.util.HashMap;
-import java.util.Properties;
import java.util.Vector;
import org.junit.jupiter.api.Test;
@@ -50,8 +49,8 @@
ConfigParserResult configParserResult(RicConfig... rics) {
return ImmutableConfigParserResult.builder() //
.ricConfigs(Arrays.asList(rics)) //
- .dmaapConsumerConfig(new Properties()) //
- .dmaapPublisherConfig(new Properties()) //
+ .dmaapConsumerTopicUrl("dmaapConsumerTopicUrl") //
+ .dmaapProducerTopicUrl("dmaapProducerTopicUrl") //
.controllerConfigs(new HashMap<>()) //
.build();
}
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java
index 6e78656..a78fde0 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java
@@ -35,30 +35,28 @@
import ch.qos.logback.core.read.ListAppender;
import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.onap.dmaap.mr.client.MRConsumer;
-import org.onap.dmaap.mr.client.response.MRConsumerResponse;
+import org.oransc.policyagent.clients.AsyncRestClient;
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.utils.LoggingUtils;
import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import reactor.core.publisher.Mono;
@ExtendWith(MockitoExtension.class)
class DmaapMessageConsumerTest {
@Mock
private ApplicationConfig applicationConfigMock;
@Mock
- private MRConsumer messageRouterConsumerMock;
+ private AsyncRestClient messageRouterConsumerMock;
@Mock
private DmaapMessageHandler messageHandlerMock;
@@ -107,18 +105,15 @@
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
- MRConsumerResponse response = new MRConsumerResponse();
- response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
- response.setActualMessages(Collections.emptyList());
+ Mono<ResponseEntity<String>> response = Mono.empty();
doReturn(false, true).when(messageConsumerUnderTest).isStopped();
- doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
- .getMessageRouterConsumer(any(Properties.class));
- when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
+ doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
+ doReturn(response).when(messageRouterConsumerMock).getForEntity(any());
messageConsumerUnderTest.start().join();
- verify(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
+ verify(messageRouterConsumerMock).getForEntity(any());
verifyNoMoreInteractions(messageRouterConsumerMock);
}
@@ -130,56 +125,73 @@
doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
doReturn(false, true).when(messageConsumerUnderTest).isStopped();
- doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
- .getMessageRouterConsumer(any(Properties.class));
+ doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
- MRConsumerResponse response = new MRConsumerResponse();
- int responseCode = HttpStatus.BAD_REQUEST.value();
- response.setResponseCode(Integer.toString(responseCode));
- String responseMessage = "Error";
- response.setResponseMessage(responseMessage);
- when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
+ Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>("Error", HttpStatus.BAD_REQUEST));
+ when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
final ListAppender<ILoggingEvent> logAppender =
LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN);
messageConsumerUnderTest.start().join();
- assertThat(logAppender.list.get(0).getFormattedMessage()).isEqualTo(
- "Cannot fetch because of Error respons " + responseCode + " " + responseMessage + " from DMaaP.");
+ assertThat(logAppender.list.get(0).getFormattedMessage())
+ .isEqualTo("Cannot fetch because of Error respons: 400 BAD_REQUEST Error");
verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
}
@Test
void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception {
+ // The message from MR is here an array of Json objects
setUpMrConfig();
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
- doReturn(false, true).when(messageConsumerUnderTest).isStopped();
- doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
- .getMessageRouterConsumer(any(Properties.class));
+ String message =
+ "{\"apiVersion\":\"1.0\",\"operation\":\"GET\",\"correlationId\":\"1592341013115594000\",\"originatorId\":\"849e6c6b420\",\"payload\":{},\"requestId\":\"23343221\", \"target\":\"policy-agent\",\"timestamp\":\"2020-06-16 20:56:53.115665\",\"type\":\"request\",\"url\":\"/rics\"}";
+ String messages = "[" + message + "]";
- MRConsumerResponse response = new MRConsumerResponse();
- response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
- String responseMessage = "message";
- List<String> messages = Arrays.asList(responseMessage);
- response.setActualMessages(messages);
- when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
+ doReturn(false, true).when(messageConsumerUnderTest).isStopped();
+ doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
+
+ Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(messages, HttpStatus.OK));
+ when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
messageConsumerUnderTest.start().join();
- verify(messageHandlerMock).handleDmaapMsg(responseMessage);
+ ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+ verify(messageHandlerMock).handleDmaapMsg(captor.capture());
+ String messageAfterJsonParsing = captor.getValue();
+ assertThat(messageAfterJsonParsing.contains("apiVersion")).isTrue();
+
verifyNoMoreInteractions(messageHandlerMock);
}
- private Properties setUpMrConfig() {
- Properties properties = new Properties();
- properties.put("key", "value");
- when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
- when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
- return properties;
+ @Test
+ void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage2() throws Exception {
+ // The message from MR is here an array of String (which is the case when the MR
+ // simulator is used)
+ setUpMrConfig();
+ messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
+
+ doReturn(false, true).when(messageConsumerUnderTest).isStopped();
+ doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
+
+ Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>("[\"aMessage\"]", HttpStatus.OK));
+ when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
+
+ doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
+
+ messageConsumerUnderTest.start().join();
+
+ verify(messageHandlerMock).handleDmaapMsg("aMessage");
+ verifyNoMoreInteractions(messageHandlerMock);
+ }
+
+ private void setUpMrConfig() {
+ when(applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("url");
+ when(applicationConfigMock.getDmaapProducerTopicUrl()).thenReturn("url");
}
}
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java
index 3cbe28b..7deef6e 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java
@@ -25,7 +25,6 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -46,8 +45,6 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
-import org.onap.dmaap.mr.client.MRBatchingPublisher;
-import org.onap.dmaap.mr.client.response.MRPublisherResponse;
import org.oransc.policyagent.clients.AsyncRestClient;
import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation;
import org.oransc.policyagent.repository.ImmutablePolicyType;
@@ -57,7 +54,6 @@
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
-import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@@ -66,7 +62,7 @@
private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandlerTest.class);
private static final String URL = "url";
- private final MRBatchingPublisher dmaapClient = mock(MRBatchingPublisher.class);
+ private final AsyncRestClient dmaapClient = mock(AsyncRestClient.class);
private final AsyncRestClient agentClient = mock(AsyncRestClient.class);
private DmaapMessageHandler testedObject;
private static Gson gson = new GsonBuilder() //
@@ -112,6 +108,11 @@
return Mono.just(entity);
}
+ private Mono<ResponseEntity<String>> notOkResponse() {
+ ResponseEntity<String> entity = new ResponseEntity<>("NOK", HttpStatus.BAD_GATEWAY);
+ return Mono.just(entity);
+ }
+
@Test
void testMessageParsing() {
String message = dmaapInputMessage(Operation.DELETE);
@@ -143,8 +144,7 @@
@Test
void successfulDelete() throws IOException {
doReturn(okResponse()).when(agentClient).deleteForEntity(anyString());
- doReturn(1).when(dmaapClient).send(anyString());
- doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
+ doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
String message = dmaapInputMessage(Operation.DELETE);
@@ -157,16 +157,15 @@
verify(agentClient).deleteForEntity(URL);
verifyNoMoreInteractions(agentClient);
- verify(dmaapClient).send(anyString());
- verify(dmaapClient).sendBatchWithResponse();
+ verify(dmaapClient).post(anyString(), anyString());
+
verifyNoMoreInteractions(dmaapClient);
}
@Test
void successfulGet() throws IOException {
doReturn(okResponse()).when(agentClient).getForEntity(anyString());
- doReturn(1).when(dmaapClient).send(anyString());
- doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
+ doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
StepVerifier //
.create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
@@ -177,16 +176,14 @@
verify(agentClient).getForEntity(URL);
verifyNoMoreInteractions(agentClient);
- verify(dmaapClient).send(anyString());
- verify(dmaapClient).sendBatchWithResponse();
+ verify(dmaapClient).post(anyString(), anyString());
verifyNoMoreInteractions(dmaapClient);
}
@Test
void successfulPut() throws IOException {
doReturn(okResponse()).when(agentClient).putForEntity(anyString(), anyString());
- doReturn(1).when(dmaapClient).send(anyString());
- doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
+ doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
StepVerifier //
.create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
@@ -197,16 +194,14 @@
verify(agentClient).putForEntity(URL, payloadAsString());
verifyNoMoreInteractions(agentClient);
- verify(dmaapClient).send(anyString());
- verify(dmaapClient).sendBatchWithResponse();
+ verify(dmaapClient).post(anyString(), anyString());
verifyNoMoreInteractions(dmaapClient);
}
@Test
void successfulPost() throws IOException {
doReturn(okResponse()).when(agentClient).postForEntity(anyString(), anyString());
- doReturn(1).when(dmaapClient).send(anyString());
- doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
+ doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
StepVerifier //
.create(testedObject.createTask(dmaapInputMessage(Operation.POST))) //
@@ -217,34 +212,28 @@
verify(agentClient).postForEntity(URL, payloadAsString());
verifyNoMoreInteractions(agentClient);
- verify(dmaapClient).send(anyString());
- verify(dmaapClient).sendBatchWithResponse();
+ verify(dmaapClient).post(anyString(), anyString());
verifyNoMoreInteractions(dmaapClient);
}
@Test
void exceptionWhenCallingPolicyAgent_thenNotFoundResponse() throws IOException {
- WebClientResponseException except = new WebClientResponseException(400, "Refused", null, null, null, null);
- doReturn(Mono.error(except)).when(agentClient).putForEntity(anyString(), any());
- doReturn(1).when(dmaapClient).send(anyString());
- doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
- StepVerifier //
- .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
- .expectSubscription() //
- .verifyComplete(); //
+ doReturn(notOkResponse()).when(agentClient).putForEntity(anyString(), anyString());
+ doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
+
+ testedObject.createTask(dmaapInputMessage(Operation.PUT)).block();
verify(agentClient).putForEntity(anyString(), anyString());
verifyNoMoreInteractions(agentClient);
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
- verify(dmaapClient).send(captor.capture());
+ verify(dmaapClient).post(anyString(), captor.capture());
String actualMessage = captor.getValue();
- assertThat(actualMessage.contains(HttpStatus.BAD_REQUEST.toString()))
- .as("Message \"%s\" sent to DMaaP contains %s", actualMessage, HttpStatus.BAD_REQUEST) //
+ assertThat(actualMessage.contains(HttpStatus.BAD_GATEWAY.toString()))
+ .as("Message \"%s\" sent to DMaaP contains %s", actualMessage, HttpStatus.BAD_GATEWAY) //
.isTrue();
- verify(dmaapClient).sendBatchWithResponse();
verifyNoMoreInteractions(dmaapClient);
}
@@ -257,15 +246,10 @@
testedObject.handleDmaapMsg(message);
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
- verify(dmaapClient).send(captor.capture());
+ verify(dmaapClient).post(anyString(), captor.capture());
String actualMessage = captor.getValue();
- assertThat(actualMessage
- .contains(HttpStatus.BAD_REQUEST + "\",\"message\":\"Not implemented operation: " + badOperation)) //
- .as("Message \"%s\" sent to DMaaP contains %s", actualMessage, HttpStatus.BAD_REQUEST) //
- .isTrue();
-
- verify(dmaapClient).sendBatchWithResponse();
- verifyNoMoreInteractions(dmaapClient);
+ assertThat(actualMessage.contains("Not implemented operation")).isTrue();
+ assertThat(actualMessage.contains("BAD_REQUEST")).isTrue();
}
@Test
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java
index 2b1d2a7..9be6c6f 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java
@@ -355,8 +355,8 @@
ConfigParserResult configParserResult(RicConfig... rics) {
return ImmutableConfigParserResult.builder() //
.ricConfigs(Arrays.asList(rics)) //
- .dmaapConsumerConfig(new Properties()) //
- .dmaapPublisherConfig(new Properties()) //
+ .dmaapConsumerTopicUrl("") //
+ .dmaapProducerTopicUrl("") //
.controllerConfigs(new HashMap<>()) //
.build();
}
diff --git a/policy-agent/src/test/resources/test_application_configuration.json b/policy-agent/src/test/resources/test_application_configuration.json
index 0122fb9..3cbc371 100644
--- a/policy-agent/src/test/resources/test_application_configuration.json
+++ b/policy-agent/src/test/resources/test_application_configuration.json
@@ -19,21 +19,21 @@
]
}
],
- "streams_publishes":{
- "dmaap_publisher":{
- "type":"message_router",
- "dmaap_info":{
- "topic_url":"http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE"
+ "streams_publishes": {
+ "dmaap_publisher": {
+ "type": "message_router",
+ "dmaap_info": {
+ "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE"
}
}
},
- "streams_subscribes":{
- "dmaap_subscriber":{
- "type":"message_router",
- "dmaap_info":{
- "topic_url":"http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent"
+ "streams_subscribes": {
+ "dmaap_subscriber": {
+ "type": "message_router",
+ "dmaap_info": {
+ "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent?timeout=15000&limit=100"
}
}
}
}
-}
+}
\ No newline at end of file
diff --git a/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json b/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json
index 6d6cdce..61ab31e 100644
--- a/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json
+++ b/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json
@@ -1,47 +1,47 @@
{
- "config":{
- "controller":[
+ "config": {
+ "controller": [
{
- "name":"controller1",
- "baseUrl":"http://localhost:8083/",
- "userName":"user",
- "password":"password"
- }
+ "name": "controller1",
+ "baseUrl": "http://localhost:8083/",
+ "userName": "user",
+ "password": "password"
+ }
],
- "ric":[
+ "ric": [
{
- "name":"ric1",
+ "name": "ric1",
"controller": "controller1",
- "baseUrl":"http://localhost:8083/",
- "managedElementIds":[
+ "baseUrl": "http://localhost:8083/",
+ "managedElementIds": [
"kista_1",
"kista_2"
]
},
{
- "name":"ric2",
- "baseUrl":"http://localhost:8085/",
- "managedElementIds":[
+ "name": "ric2",
+ "baseUrl": "http://localhost:8085/",
+ "managedElementIds": [
"kista_3",
"kista_4"
]
}
],
- "streams_publishes":{
- "dmaap_publisher":{
- "type":"message_router",
- "dmaap_info":{
- "topic_url":"http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE"
+ "streams_publishes": {
+ "dmaap_publisher": {
+ "type": "message_router",
+ "dmaap_info": {
+ "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE"
}
}
},
- "streams_subscribes":{
- "dmaap_subscriber":{
- "type":"message_router",
- "dmaap_info":{
- "topic_url":"http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent"
+ "streams_subscribes": {
+ "dmaap_subscriber": {
+ "type": "message_router",
+ "dmaap_info": {
+ "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent?timeout=15000&limit=100"
}
}
}
}
-}
+}
\ No newline at end of file