Merge "Remove use of the DMaaP client from ONAP"
diff --git a/policy-agent/dpo/blueprints/k8s-policy-agent.yaml b/policy-agent/dpo/blueprints/k8s-policy-agent.yaml
index 0347385..aa09ff3 100644
--- a/policy-agent/dpo/blueprints/k8s-policy-agent.yaml
+++ b/policy-agent/dpo/blueprints/k8s-policy-agent.yaml
@@ -67,7 +67,7 @@
         streams_subscribes:
           dmaap_subscriber:
             dmaap_info:
-              topic_url: { concat: ['https://message-router:3905/events/',{ get_input: subscribe_topic_name }, '/', { get_input: consumer_group }, "/", { get_input: consumer_id }] }
+              topic_url: { concat: ['https://message-router:3905/events/',{ get_input: subscribe_topic_name }, '/', { get_input: consumer_group }, "/", { get_input: consumer_id }, "?timeout=15000&limit=100"] }
             type: message_router
         ric:
           - name: ric1
diff --git a/policy-agent/pom.xml b/policy-agent/pom.xml
index 9bf15b2..8f56cd6 100644
--- a/policy-agent/pom.xml
+++ b/policy-agent/pom.xml
@@ -78,8 +78,8 @@
             <artifactId>spring-boot-starter-webflux</artifactId>
         </dependency>
         <dependency>
-         <groupId>org.springframework.boot</groupId>
-         <artifactId>spring-boot-starter-aop</artifactId>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-aop</artifactId>
         </dependency>
         <dependency>
             <groupId>org.springframework.boot</groupId>
@@ -136,11 +136,6 @@
             <version>${sdk.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
-            <artifactId>dmaap-client</artifactId>
-            <version>${sdk.version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
             <scope>provided</scope>
@@ -159,13 +154,11 @@
             <groupId>org.glassfish.jersey.inject</groupId>
             <artifactId>jersey-hk2</artifactId>
         </dependency>
-
         <!-- Actuator dependencies -->
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-actuator</artifactId>
         </dependency>
-
         <!--REQUIRED TO GENERATE DOCUMENTATION -->
         <dependency>
             <groupId>io.springfox</groupId>
@@ -177,7 +170,6 @@
             <artifactId>springfox-swagger-ui</artifactId>
             <version>${springfox.version}</version>
         </dependency>
-
         <!-- TEST -->
         <dependency>
             <groupId>org.springframework.boot</groupId>
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java
index 6c2c91b..a253373 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java
@@ -24,7 +24,6 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Properties;
 
 import javax.validation.constraints.NotEmpty;
 
@@ -54,10 +53,12 @@
     private String sslTrustStore = "";
 
     private Map<String, RicConfig> ricConfigs = new HashMap<>();
+
     @Getter
-    private Properties dmaapPublisherConfig;
+    private String dmaapConsumerTopicUrl = ""; // empty until a configuration with streams_subscribes is applied
+
     @Getter
-    private Properties dmaapConsumerConfig;
+    private String dmaapProducerTopicUrl = ""; // empty until a configuration with streams_publishes is applied
 
     private Map<String, ControllerConfig> controllerConfigs = new HashMap<>();
 
@@ -109,10 +110,11 @@
         ApplicationConfigParser.ConfigParserResult parserResult) {
 
         Collection<RicConfigUpdate> modifications = new ArrayList<>();
-        this.dmaapPublisherConfig = parserResult.dmaapPublisherConfig();
-        this.dmaapConsumerConfig = parserResult.dmaapConsumerConfig();
         this.controllerConfigs = parserResult.controllerConfigs();
 
+        this.dmaapConsumerTopicUrl = parserResult.dmaapConsumerTopicUrl();
+        this.dmaapProducerTopicUrl = parserResult.dmaapProducerTopicUrl();
+
         Map<String, RicConfig> newRicConfigs = new HashMap<>();
         for (RicConfig newConfig : parserResult.ricConfigs()) {
             RicConfig oldConfig = this.ricConfigs.get(newConfig.name());
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java
index a76f964..14e836b 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java
@@ -24,8 +24,6 @@
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 
-import java.net.MalformedURLException;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -33,16 +31,13 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Properties;
 import java.util.Set;
 
 import javax.validation.constraints.NotNull;
 
 import org.immutables.gson.Gson;
 import org.immutables.value.Value;
-import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
 import org.oransc.policyagent.exceptions.ServiceException;
-import org.springframework.http.MediaType;
 
 /**
  * Parser for the Json representing of the component configuration.
@@ -57,17 +52,18 @@
     public interface ConfigParserResult {
         List<RicConfig> ricConfigs();
 
-        Properties dmaapPublisherConfig();
-
-        Properties dmaapConsumerConfig();
-
         Map<String, ControllerConfig> controllerConfigs();
+
+        String dmaapConsumerTopicUrl();
+
+        String dmaapProducerTopicUrl();
+
     }
 
     public ConfigParserResult parse(JsonObject root) throws ServiceException {
 
-        Properties dmaapPublisherConfig = new Properties();
-        Properties dmaapConsumerConfig = new Properties();
+        String dmaapProducerTopicUrl = "";
+        String dmaapConsumerTopicUrl = "";
 
         JsonObject agentConfigJson = root.getAsJsonObject(CONFIG);
 
@@ -77,12 +73,12 @@
 
         JsonObject json = agentConfigJson.getAsJsonObject("streams_publishes");
         if (json != null) {
-            dmaapPublisherConfig = parseDmaapConfig(json);
+            dmaapProducerTopicUrl = parseDmaapConfig(json);
         }
 
         json = agentConfigJson.getAsJsonObject("streams_subscribes");
         if (json != null) {
-            dmaapConsumerConfig = parseDmaapConfig(json);
+            dmaapConsumerTopicUrl = parseDmaapConfig(json);
         }
 
         List<RicConfig> ricConfigs = parseRics(agentConfigJson);
@@ -90,8 +86,8 @@
         checkConfigurationConsistency(ricConfigs, controllerConfigs);
 
         return ImmutableConfigParserResult.builder() //
-            .dmaapConsumerConfig(dmaapConsumerConfig) //
-            .dmaapPublisherConfig(dmaapPublisherConfig) //
+            .dmaapConsumerTopicUrl(dmaapConsumerTopicUrl) //
+            .dmaapProducerTopicUrl(dmaapProducerTopicUrl) //
             .ricConfigs(ricConfigs) //
             .controllerConfigs(controllerConfigs) //
             .build();
@@ -114,7 +110,6 @@
             }
 
         }
-
     }
 
     private List<RicConfig> parseRics(JsonObject config) throws ServiceException {
@@ -177,7 +172,7 @@
         return get(obj, memberName).getAsJsonArray();
     }
 
-    private Properties parseDmaapConfig(JsonObject streamCfg) throws ServiceException {
+    private String parseDmaapConfig(JsonObject streamCfg) throws ServiceException {
         Set<Entry<String, JsonElement>> streamConfigEntries = streamCfg.entrySet();
         if (streamConfigEntries.size() != 1) {
             throw new ServiceException(
@@ -185,71 +180,10 @@
         }
         JsonObject streamConfigEntry = streamConfigEntries.iterator().next().getValue().getAsJsonObject();
         JsonObject dmaapInfo = get(streamConfigEntry, "dmaap_info").getAsJsonObject();
-        String topicUrl = getAsString(dmaapInfo, "topic_url");
-
-        try {
-            Properties dmaapProps = new Properties();
-            URL url = new URL(topicUrl);
-            String passwd = "";
-            String userName = "";
-            if (url.getUserInfo() != null) {
-                String[] userInfo = url.getUserInfo().split(":");
-                userName = userInfo[0];
-                passwd = userInfo[1];
-            }
-            String urlPath = url.getPath();
-            DmaapUrlPath path = parseDmaapUrlPath(urlPath);
-
-            dmaapProps.put("ServiceName", url.getHost() + ":" + url.getPort() + "/events");
-            dmaapProps.put("topic", path.dmaapTopicName);
-            dmaapProps.put("host", url.getHost() + ":" + url.getPort());
-            dmaapProps.put("contenttype", MediaType.APPLICATION_JSON.toString());
-            dmaapProps.put("userName", userName);
-            dmaapProps.put("password", passwd);
-            dmaapProps.put("group", path.consumerGroup);
-            dmaapProps.put("id", path.consumerId);
-            dmaapProps.put("TransportType", ProtocolTypeConstants.HTTPNOAUTH.toString());
-            dmaapProps.put("timeout", "15000");
-            dmaapProps.put("limit", "100");
-            dmaapProps.put("maxBatchSize", "10");
-            dmaapProps.put("maxAgeMs", "10000");
-            dmaapProps.put("compress", true);
-            dmaapProps.put("MessageSentThreadOccurance", "2");
-            return dmaapProps;
-        } catch (MalformedURLException e) {
-            throw new ServiceException("Could not parse the URL", e);
-        }
+        return getAsString(dmaapInfo, "topic_url");
     }
 
     private static @NotNull String getAsString(JsonObject obj, String memberName) throws ServiceException {
         return get(obj, memberName).getAsString();
     }
-
-    private class DmaapUrlPath {
-        final String dmaapTopicName;
-        final String consumerGroup;
-        final String consumerId;
-
-        DmaapUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) {
-            this.dmaapTopicName = dmaapTopicName;
-            this.consumerGroup = consumerGroup;
-            this.consumerId = consumerId;
-        }
-    }
-
-    private DmaapUrlPath parseDmaapUrlPath(String urlPath) throws ServiceException {
-        String[] tokens = urlPath.split("/"); // /events/A1-P/users/sdnc1
-        if (!(tokens.length == 3 ^ tokens.length == 5)) {
-            throw new ServiceException("The path has incorrect syntax: " + urlPath);
-        }
-
-        final String dmaapTopicName = tokens[2]; // /events/A1-P
-        String consumerGroup = ""; // users
-        String consumerId = ""; // sdnc1
-        if (tokens.length == 5) {
-            consumerGroup = tokens[3];
-            consumerId = tokens[4];
-        }
-        return new DmaapUrlPath(dmaapTopicName, consumerGroup, consumerId);
-    }
 }
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java
index 011b977..e141bab 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java
@@ -21,15 +21,15 @@
 package org.oransc.policyagent.dmaap;
 
 import com.google.common.collect.Iterables;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
 
 import java.io.IOException;
 import java.time.Duration;
-import java.util.Properties;
+import java.util.ArrayList;
+import java.util.List;
 
-import org.onap.dmaap.mr.client.MRBatchingPublisher;
-import org.onap.dmaap.mr.client.MRClientFactory;
-import org.onap.dmaap.mr.client.MRConsumer;
-import org.onap.dmaap.mr.client.response.MRConsumerResponse;
 import org.oransc.policyagent.clients.AsyncRestClient;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.exceptions.ServiceException;
@@ -37,6 +37,7 @@
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Component;
 
 /**
@@ -62,7 +63,6 @@
     private final ApplicationConfig applicationConfig;
 
     private DmaapMessageHandler dmaapMessageHandler = null;
-    private MRConsumer messageRouterConsumer = null;
 
     @Value("${server.http-port}")
     private int localServerHttpPort;
@@ -99,7 +99,7 @@
                     sleep(TIME_BETWEEN_DMAAP_RETRIES); // wait for configuration
                 }
             } catch (Exception e) {
-                logger.warn("Cannot fetch because of {}", e.getMessage());
+                logger.warn("{}", e.getMessage());
                 sleep(TIME_BETWEEN_DMAAP_RETRIES);
             }
         }
@@ -110,25 +110,35 @@
     }
 
     protected boolean isDmaapConfigured() {
-        Properties consumerCfg = applicationConfig.getDmaapConsumerConfig();
-        Properties producerCfg = applicationConfig.getDmaapPublisherConfig();
-        return (consumerCfg != null && consumerCfg.size() > 0 && producerCfg != null && producerCfg.size() > 0);
+        String producerTopicUrl = applicationConfig.getDmaapProducerTopicUrl();
+        String consumerTopicUrl = applicationConfig.getDmaapConsumerTopicUrl();
+        return (!producerTopicUrl.isEmpty() && !consumerTopicUrl.isEmpty());
+    }
+
+    /**
+     * Splits the body of a DMaaP poll response, a JSON array, into one string per array entry.
+     * Primitive entries are returned by value; all other entries are returned as their JSON text.
+     */
+    private static List<String> parseMessages(String jsonString) {
+        JsonArray receivedEntries = JsonParser.parseString(jsonString).getAsJsonArray();
+        List<String> messages = new ArrayList<>();
+        for (JsonElement entry : receivedEntries) {
+            // getAsString() yields the primitive's value, toString() the JSON representation.
+            messages.add(entry.isJsonPrimitive() ? entry.getAsString() : entry.toString());
+        }
+        return messages;
     }
 
     protected Iterable<String> fetchAllMessages() throws ServiceException, IOException {
-        Properties dmaapConsumerProperties = this.applicationConfig.getDmaapConsumerConfig();
-        MRConsumer consumer = getMessageRouterConsumer(dmaapConsumerProperties);
-        MRConsumerResponse response = consumer.fetchWithReturnConsumerResponse();
-        if (response == null || !"200".equals(response.getResponseCode())) {
-            String errorMessage = "DMaaP NULL response received";
-            if (response != null) {
-                errorMessage = "Error respons " + response.getResponseCode() + " " + response.getResponseMessage()
-                    + " from DMaaP.";
-            }
-            throw new ServiceException(errorMessage);
+        String topicUrl = this.applicationConfig.getDmaapConsumerTopicUrl();
+        AsyncRestClient consumer = getMessageRouterConsumer();
+        ResponseEntity<String> response = consumer.getForEntity(topicUrl).block();
+        logger.debug("DMaaP consumer received {} : {}", response.getStatusCode(), response.getBody());
+        if (response.getStatusCode().is2xxSuccessful()) {
+            return parseMessages(response.getBody());
         } else {
-            logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage());
-            return response.getActualMessages();
+            throw new ServiceException("Cannot fetch because of Error respons: " + response.getStatusCode().toString()
+                + " " + response.getBody());
         }
     }
 
@@ -141,8 +151,8 @@
         if (this.dmaapMessageHandler == null) {
             String agentBaseUrl = "http://localhost:" + this.localServerHttpPort;
             AsyncRestClient agentClient = new AsyncRestClient(agentBaseUrl);
-            Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
-            MRBatchingPublisher producer = MRClientFactory.createBatchingPublisher(dmaapPublisherProperties);
+            AsyncRestClient producer = new AsyncRestClient(this.applicationConfig.getDmaapProducerTopicUrl(),
+                this.applicationConfig.getWebClientConfig());
             this.dmaapMessageHandler = new DmaapMessageHandler(producer, agentClient);
         }
         return this.dmaapMessageHandler;
@@ -156,11 +166,8 @@
         }
     }
 
-    protected MRConsumer getMessageRouterConsumer(Properties dmaapConsumerProperties) throws IOException {
-        if (this.messageRouterConsumer == null) {
-            this.messageRouterConsumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
-        }
-        return this.messageRouterConsumer;
+    protected AsyncRestClient getMessageRouterConsumer() {
+        return new AsyncRestClient("", this.applicationConfig.getWebClientConfig());
     }
 
 }
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java
index 19d1564..efdccd8 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java
@@ -24,10 +24,8 @@
 import com.google.gson.GsonBuilder;
 import com.google.gson.JsonObject;
 
-import java.io.IOException;
 import java.util.Optional;
 
-import org.onap.dmaap.mr.client.MRBatchingPublisher;
 import org.oransc.policyagent.clients.AsyncRestClient;
 import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation;
 import org.oransc.policyagent.exceptions.ServiceException;
@@ -49,10 +47,10 @@
     private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class);
     private static Gson gson = new GsonBuilder() //
         .create(); //
-    private final MRBatchingPublisher dmaapClient;
+    private final AsyncRestClient dmaapClient;
     private final AsyncRestClient agentClient;
 
-    public DmaapMessageHandler(MRBatchingPublisher dmaapClient, AsyncRestClient agentClient) {
+    public DmaapMessageHandler(AsyncRestClient dmaapClient, AsyncRestClient agentClient) {
         this.agentClient = agentClient;
         this.dmaapClient = dmaapClient;
     }
@@ -99,11 +97,7 @@
     }
 
     private String prepareBadOperationErrorMessage(Throwable t, String originalMessage) {
-        String operationParameterStart = "operation\":\"";
-        int indexOfOperationStart = originalMessage.indexOf(operationParameterStart) + operationParameterStart.length();
-        int indexOfOperationEnd = originalMessage.indexOf("\",\"", indexOfOperationStart);
-        String badOperation = originalMessage.substring(indexOfOperationStart, indexOfOperationEnd);
-        return t.getMessage().replace("null", badOperation);
+        return t.getMessage();
     }
 
     private Mono<ResponseEntity<String>> invokePolicyAgent(DmaapRequestMessage dmaapRequestMessage) {
@@ -141,14 +135,8 @@
     }
 
     private Mono<String> sendToDmaap(String body) {
-        try {
-            logger.debug("sendToDmaap: {} ", body);
-            dmaapClient.send(body);
-            dmaapClient.sendBatchWithResponse();
-            return Mono.just("OK");
-        } catch (IOException e) {
-            return Mono.error(e);
-        }
+        logger.debug("sendToDmaap: {} ", body);
+        return dmaapClient.post("", "[" + body + "]");
     }
 
     private Mono<String> handleResponseCallError(Throwable t) {
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java
index 5d78207..83c64e8 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java
@@ -134,7 +134,7 @@
     private void notifyAllServices(String body) {
         for (Service service : services.getAll()) {
             String url = service.getCallbackUrl();
-            if (service.getCallbackUrl().length() > 0) {
+            if (url.length() > 0) {
                 createNotificationClient(url) //
                     .put("", body) //
                     .subscribe( //
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigParserTest.java b/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigParserTest.java
index 6b106c9..5a6b023 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigParserTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigParserTest.java
@@ -20,7 +20,6 @@
 
 package org.oransc.policyagent.configuration;
 
-import static org.junit.jupiter.api.Assertions.assertAll;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -36,16 +35,12 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.net.MalformedURLException;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
-import java.util.Properties;
 
 import org.junit.jupiter.api.Test;
-import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
 import org.oransc.policyagent.exceptions.ServiceException;
-import org.springframework.http.MediaType;
 
 class ApplicationConfigParserTest {
 
@@ -57,35 +52,13 @@
 
         ApplicationConfigParser.ConfigParserResult result = parserUnderTest.parse(jsonRootObject);
 
-        Properties actualPublisherConfig = result.dmaapPublisherConfig();
-        assertAll("publisherConfig",
-            () -> assertEquals("localhost:6845/events", actualPublisherConfig.get("ServiceName"), "Wrong ServiceName"),
-            () -> assertEquals("A1-POLICY-AGENT-WRITE", actualPublisherConfig.get("topic"), "Wrong topic"),
-            () -> assertEquals("localhost:6845", actualPublisherConfig.get("host"), "Wrong host"),
-            () -> assertEquals(MediaType.APPLICATION_JSON.toString(), actualPublisherConfig.get("contenttype"),
-                "Wrong contenttype"),
-            () -> assertEquals("admin", actualPublisherConfig.get("userName"), "Wrong userName"),
-            () -> assertEquals("admin", actualPublisherConfig.get("password"), "Wrong password"),
-            () -> assertEquals(ProtocolTypeConstants.HTTPNOAUTH.toString(), actualPublisherConfig.get("TransportType"),
-                "Wrong TransportType"),
-            () -> assertEquals("15000", actualPublisherConfig.get("timeout"), "Wrong timeout"),
-            () -> assertEquals("100", actualPublisherConfig.get("limit"), "Wrong limit"));
+        String topicUrl = result.dmaapProducerTopicUrl();
+        assertEquals("http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE", topicUrl, "producer topic URL");
 
-        Properties actualConsumerConfig = result.dmaapConsumerConfig();
-        assertAll("consumerConfig",
-            () -> assertEquals("localhost:6845/events", actualConsumerConfig.get("ServiceName"), "Wrong ServiceName"),
-            () -> assertEquals("A1-POLICY-AGENT-READ", actualConsumerConfig.get("topic"), "Wrong topic"),
-            () -> assertEquals("localhost:6845", actualConsumerConfig.get("host"), "Wrong host"),
-            () -> assertEquals(MediaType.APPLICATION_JSON.toString(), actualConsumerConfig.get("contenttype"),
-                "Wrong contenttype"),
-            () -> assertEquals("admin", actualConsumerConfig.get("userName"), "Wrong userName"),
-            () -> assertEquals("admin", actualConsumerConfig.get("password"), "Wrong password"),
-            () -> assertEquals("users", actualConsumerConfig.get("group"), "Wrong group"),
-            () -> assertEquals("policy-agent", actualConsumerConfig.get("id"), "Wrong id"),
-            () -> assertEquals(ProtocolTypeConstants.HTTPNOAUTH.toString(), actualConsumerConfig.get("TransportType"),
-                "Wrong TransportType"),
-            () -> assertEquals("15000", actualConsumerConfig.get("timeout"), "Wrong timeout"),
-            () -> assertEquals("100", actualConsumerConfig.get("limit"), "Wrong limit"));
+        topicUrl = result.dmaapConsumerTopicUrl();
+        assertEquals(
+            "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent?timeout=15000&limit=100",
+            topicUrl, "consumer topic URL");
 
         Map<String, ControllerConfig> controllers = result.controllerConfigs();
         assertEquals(1, controllers.size(), "size");
@@ -167,38 +140,6 @@
     }
 
     @Test
-    void whenMalformedUrlStreamsSubscribing() throws Exception {
-        JsonObject jsonRootObject = getJsonRootObject();
-        final String wrongTopicUrl = "WrongTopicUrl";
-        JsonObject json = getDmaapInfo(jsonRootObject, "streams_subscribes", "dmaap_subscriber");
-        json.addProperty("topic_url", wrongTopicUrl);
-        final String expectedMessage = "Could not parse the URL";
-
-        Exception actualException = assertThrows(ServiceException.class, () -> parserUnderTest.parse(jsonRootObject));
-
-        assertEquals(expectedMessage, actualException.getMessage().replace("\"", ""),
-            "Wrong error message when the streams subscribes' URL is malformed");
-        assertEquals(MalformedURLException.class, actualException.getCause().getClass(),
-            "The exception is not a MalformedURLException");
-    }
-
-    @Test
-    void whenMalformedUrlStreamsPublishing() throws Exception {
-        JsonObject jsonRootObject = getJsonRootObject();
-        final String wrongTopicUrl = "WrongTopicUrl";
-        JsonObject json = getDmaapInfo(jsonRootObject, "streams_publishes", "dmaap_publisher");
-        json.addProperty("topic_url", wrongTopicUrl);
-        final String expectedMessage = "Could not parse the URL";
-
-        Exception actualException = assertThrows(ServiceException.class, () -> parserUnderTest.parse(jsonRootObject));
-
-        assertEquals(expectedMessage, actualException.getMessage().replace("\"", ""),
-            "Wrong error message when the streams publishes' URL is malformed");
-        assertEquals(MalformedURLException.class, actualException.getCause().getClass(),
-            "The exception is not a MalformedURLException");
-    }
-
-    @Test
     void whenWrongMemberNameInObject() throws Exception {
         JsonObject jsonRootObject = getJsonRootObject();
         JsonObject json = jsonRootObject.getAsJsonObject("config");
@@ -210,38 +151,6 @@
         assertEquals(message, actualException.getMessage(), "Wrong error message when wrong member name in object");
     }
 
-    @Test
-    void whenWrongUrlPathStreamsSubscribing() throws Exception {
-        JsonObject jsonRootObject = getJsonRootObject();
-        final String wrongTopicUrlString =
-            "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent/wrong-topic-url";
-        final URL wrongTopicUrl = new URL(wrongTopicUrlString);
-        JsonObject json = getDmaapInfo(jsonRootObject, "streams_subscribes", "dmaap_subscriber");
-        json.addProperty("topic_url", wrongTopicUrlString);
-        final String expectedMessage = "The path has incorrect syntax: " + wrongTopicUrl.getPath();
-
-        Exception actualException = assertThrows(ServiceException.class, () -> parserUnderTest.parse(jsonRootObject));
-
-        assertEquals(expectedMessage, actualException.getMessage(),
-            "Wrong error message when the streams subscribes' URL has incorrect syntax");
-    }
-
-    @Test
-    void whenWrongUrlPathStreamsPublishing() throws Exception {
-        JsonObject jsonRootObject = getJsonRootObject();
-        final String wrongTopicUrlString =
-            "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE/wrong-topic-url";
-        final URL wrongTopicUrl = new URL(wrongTopicUrlString);
-        JsonObject json = getDmaapInfo(jsonRootObject, "streams_publishes", "dmaap_publisher");
-        json.addProperty("topic_url", wrongTopicUrlString);
-        final String expectedMessage = "The path has incorrect syntax: " + wrongTopicUrl.getPath();
-
-        Exception actualException = assertThrows(ServiceException.class, () -> parserUnderTest.parse(jsonRootObject));
-
-        assertEquals(expectedMessage, actualException.getMessage(),
-            "Wrong error message when the streams publishes' URL has incorrect syntax");
-    }
-
     JsonObject getDmaapInfo(JsonObject jsonRootObject, String streamsPublishesOrSubscribes,
         String dmaapPublisherOrSubscriber) throws Exception {
         return jsonRootObject.getAsJsonObject("config").getAsJsonObject(streamsPublishesOrSubscribes)
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java b/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java
index 6a6f447..5667fd2 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java
@@ -27,7 +27,6 @@
 
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Properties;
 import java.util.Vector;
 
 import org.junit.jupiter.api.Test;
@@ -50,8 +49,8 @@
     ConfigParserResult configParserResult(RicConfig... rics) {
         return ImmutableConfigParserResult.builder() //
             .ricConfigs(Arrays.asList(rics)) //
-            .dmaapConsumerConfig(new Properties()) //
-            .dmaapPublisherConfig(new Properties()) //
+            .dmaapConsumerTopicUrl("dmaapConsumerTopicUrl") //
+            .dmaapProducerTopicUrl("dmaapProducerTopicUrl") //
             .controllerConfigs(new HashMap<>()) //
             .build();
     }
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java
index 6e78656..a78fde0 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java
@@ -35,30 +35,28 @@
 import ch.qos.logback.core.read.ListAppender;
 
 import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
 import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
-import org.onap.dmaap.mr.client.MRConsumer;
-import org.onap.dmaap.mr.client.response.MRConsumerResponse;
+import org.oransc.policyagent.clients.AsyncRestClient;
 import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.utils.LoggingUtils;
 import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import reactor.core.publisher.Mono;
 
 @ExtendWith(MockitoExtension.class)
 class DmaapMessageConsumerTest {
     @Mock
     private ApplicationConfig applicationConfigMock;
     @Mock
-    private MRConsumer messageRouterConsumerMock;
+    private AsyncRestClient messageRouterConsumerMock;
     @Mock
     private DmaapMessageHandler messageHandlerMock;
 
@@ -107,18 +105,15 @@
 
         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
 
-        MRConsumerResponse response = new MRConsumerResponse();
-        response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
-        response.setActualMessages(Collections.emptyList());
+        Mono<ResponseEntity<String>> response = Mono.empty();
 
         doReturn(false, true).when(messageConsumerUnderTest).isStopped();
-        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
-            .getMessageRouterConsumer(any(Properties.class));
-        when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
+        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
+        doReturn(response).when(messageRouterConsumerMock).getForEntity(any());
 
         messageConsumerUnderTest.start().join();
 
-        verify(messageRouterConsumerMock).fetchWithReturnConsumerResponse();
+        verify(messageRouterConsumerMock).getForEntity(any());
         verifyNoMoreInteractions(messageRouterConsumerMock);
     }
 
@@ -130,56 +125,73 @@
 
         doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
         doReturn(false, true).when(messageConsumerUnderTest).isStopped();
-        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
-            .getMessageRouterConsumer(any(Properties.class));
+        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
 
-        MRConsumerResponse response = new MRConsumerResponse();
-        int responseCode = HttpStatus.BAD_REQUEST.value();
-        response.setResponseCode(Integer.toString(responseCode));
-        String responseMessage = "Error";
-        response.setResponseMessage(responseMessage);
-        when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
+        Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>("Error", HttpStatus.BAD_REQUEST));
+        when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
 
         final ListAppender<ILoggingEvent> logAppender =
             LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN);
 
         messageConsumerUnderTest.start().join();
 
-        assertThat(logAppender.list.get(0).getFormattedMessage()).isEqualTo(
-            "Cannot fetch because of Error respons " + responseCode + " " + responseMessage + " from DMaaP.");
+        assertThat(logAppender.list.get(0).getFormattedMessage())
+            .isEqualTo("Cannot fetch because of Error respons: 400 BAD_REQUEST Error");
 
         verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
     }
 
     @Test
     void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception {
+        // MR's reply body here is a JSON array whose single element is a JSON object
         setUpMrConfig();
         messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
 
-        doReturn(false, true).when(messageConsumerUnderTest).isStopped();
-        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
-            .getMessageRouterConsumer(any(Properties.class));
+        String message =
+            "{\"apiVersion\":\"1.0\",\"operation\":\"GET\",\"correlationId\":\"1592341013115594000\",\"originatorId\":\"849e6c6b420\",\"payload\":{},\"requestId\":\"23343221\", \"target\":\"policy-agent\",\"timestamp\":\"2020-06-16 20:56:53.115665\",\"type\":\"request\",\"url\":\"/rics\"}";
+        String messages = "[" + message + "]";
 
-        MRConsumerResponse response = new MRConsumerResponse();
-        response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
-        String responseMessage = "message";
-        List<String> messages = Arrays.asList(responseMessage);
-        response.setActualMessages(messages);
-        when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
+        doReturn(false, true).when(messageConsumerUnderTest).isStopped();
+        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
+
+        Mono<ResponseEntity<String>> response = Mono.just(new ResponseEntity<>(messages, HttpStatus.OK));
+        when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
 
         doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
 
         messageConsumerUnderTest.start().join();
 
-        verify(messageHandlerMock).handleDmaapMsg(responseMessage);
+        ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+        verify(messageHandlerMock).handleDmaapMsg(captor.capture());
+        String messageAfterJsonParsing = captor.getValue();
+        assertThat(messageAfterJsonParsing.contains("apiVersion")).isTrue();
+
         verifyNoMoreInteractions(messageHandlerMock);
     }
 
-    private Properties setUpMrConfig() {
-        Properties properties = new Properties();
-        properties.put("key", "value");
-        when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties);
-        when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties);
-        return properties;
+    @Test
+    void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage2() throws Exception {
+        // Here MR answers with a JSON array of plain strings rather than JSON objects,
+        // which is what the MR simulator delivers.
+        setUpMrConfig();
+        messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
+
+        doReturn(false, true).when(messageConsumerUnderTest).isStopped();
+        doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler();
+        doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer();
+
+        ResponseEntity<String> mrReply = new ResponseEntity<>("[\"aMessage\"]", HttpStatus.OK);
+        when(messageRouterConsumerMock.getForEntity(any())).thenReturn(Mono.just(mrReply));
+
+        messageConsumerUnderTest.start().join();
+
+        // The handler is expected to receive the bare array element, exactly once.
+        verify(messageHandlerMock).handleDmaapMsg("aMessage");
+        verifyNoMoreInteractions(messageHandlerMock);
+    }
+
+    private void setUpMrConfig() {
+        doReturn("url").when(applicationConfigMock).getDmaapProducerTopicUrl();
+        doReturn("url").when(applicationConfigMock).getDmaapConsumerTopicUrl();
     }
 }
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java
index 3cbe28b..7deef6e 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java
@@ -25,7 +25,6 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -46,8 +45,6 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
-import org.onap.dmaap.mr.client.MRBatchingPublisher;
-import org.onap.dmaap.mr.client.response.MRPublisherResponse;
 import org.oransc.policyagent.clients.AsyncRestClient;
 import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation;
 import org.oransc.policyagent.repository.ImmutablePolicyType;
@@ -57,7 +54,6 @@
 import org.slf4j.LoggerFactory;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
-import org.springframework.web.reactive.function.client.WebClientResponseException;
 
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
@@ -66,7 +62,7 @@
     private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandlerTest.class);
     private static final String URL = "url";
 
-    private final MRBatchingPublisher dmaapClient = mock(MRBatchingPublisher.class);
+    private final AsyncRestClient dmaapClient = mock(AsyncRestClient.class);
     private final AsyncRestClient agentClient = mock(AsyncRestClient.class);
     private DmaapMessageHandler testedObject;
     private static Gson gson = new GsonBuilder() //
@@ -112,6 +108,11 @@
         return Mono.just(entity);
     }
 
+    private Mono<ResponseEntity<String>> notOkResponse() {
+        // Canned failure reply: body "NOK" with status 502 BAD_GATEWAY, emitted as a single-value Mono.
+        return Mono.just(new ResponseEntity<>("NOK", HttpStatus.BAD_GATEWAY));
+    }
+
     @Test
     void testMessageParsing() {
         String message = dmaapInputMessage(Operation.DELETE);
@@ -143,8 +144,7 @@
     @Test
     void successfulDelete() throws IOException {
         doReturn(okResponse()).when(agentClient).deleteForEntity(anyString());
-        doReturn(1).when(dmaapClient).send(anyString());
-        doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
+        doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
 
         String message = dmaapInputMessage(Operation.DELETE);
 
@@ -157,16 +157,15 @@
         verify(agentClient).deleteForEntity(URL);
         verifyNoMoreInteractions(agentClient);
 
-        verify(dmaapClient).send(anyString());
-        verify(dmaapClient).sendBatchWithResponse();
+        verify(dmaapClient).post(anyString(), anyString());
+
         verifyNoMoreInteractions(dmaapClient);
     }
 
     @Test
     void successfulGet() throws IOException {
         doReturn(okResponse()).when(agentClient).getForEntity(anyString());
-        doReturn(1).when(dmaapClient).send(anyString());
-        doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
+        doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
 
         StepVerifier //
             .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
@@ -177,16 +176,14 @@
         verify(agentClient).getForEntity(URL);
         verifyNoMoreInteractions(agentClient);
 
-        verify(dmaapClient).send(anyString());
-        verify(dmaapClient).sendBatchWithResponse();
+        verify(dmaapClient).post(anyString(), anyString());
         verifyNoMoreInteractions(dmaapClient);
     }
 
     @Test
     void successfulPut() throws IOException {
         doReturn(okResponse()).when(agentClient).putForEntity(anyString(), anyString());
-        doReturn(1).when(dmaapClient).send(anyString());
-        doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
+        doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
 
         StepVerifier //
             .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
@@ -197,16 +194,14 @@
         verify(agentClient).putForEntity(URL, payloadAsString());
         verifyNoMoreInteractions(agentClient);
 
-        verify(dmaapClient).send(anyString());
-        verify(dmaapClient).sendBatchWithResponse();
+        verify(dmaapClient).post(anyString(), anyString());
         verifyNoMoreInteractions(dmaapClient);
     }
 
     @Test
     void successfulPost() throws IOException {
         doReturn(okResponse()).when(agentClient).postForEntity(anyString(), anyString());
-        doReturn(1).when(dmaapClient).send(anyString());
-        doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
+        doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
 
         StepVerifier //
             .create(testedObject.createTask(dmaapInputMessage(Operation.POST))) //
@@ -217,34 +212,28 @@
         verify(agentClient).postForEntity(URL, payloadAsString());
         verifyNoMoreInteractions(agentClient);
 
-        verify(dmaapClient).send(anyString());
-        verify(dmaapClient).sendBatchWithResponse();
+        verify(dmaapClient).post(anyString(), anyString());
         verifyNoMoreInteractions(dmaapClient);
     }
 
     @Test
     void exceptionWhenCallingPolicyAgent_thenNotFoundResponse() throws IOException {
-        WebClientResponseException except = new WebClientResponseException(400, "Refused", null, null, null, null);
-        doReturn(Mono.error(except)).when(agentClient).putForEntity(anyString(), any());
-        doReturn(1).when(dmaapClient).send(anyString());
-        doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
 
-        StepVerifier //
-            .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
-            .expectSubscription() //
-            .verifyComplete(); //
+        doReturn(notOkResponse()).when(agentClient).putForEntity(anyString(), anyString());
+        doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
+        // NOTE(review): failure is now a returned 502 entity, not a thrown exception - test name looks stale.
+        testedObject.createTask(dmaapInputMessage(Operation.PUT)).block();
 
         verify(agentClient).putForEntity(anyString(), anyString());
         verifyNoMoreInteractions(agentClient);
 
         ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
-        verify(dmaapClient).send(captor.capture());
+        verify(dmaapClient).post(anyString(), captor.capture());
         String actualMessage = captor.getValue();
-        assertThat(actualMessage.contains(HttpStatus.BAD_REQUEST.toString()))
-            .as("Message \"%s\" sent to DMaaP contains %s", actualMessage, HttpStatus.BAD_REQUEST) //
+        assertThat(actualMessage.contains(HttpStatus.BAD_GATEWAY.toString()))
+            .as("Message \"%s\" sent to DMaaP contains %s", actualMessage, HttpStatus.BAD_GATEWAY) //
             .isTrue();
 
-        verify(dmaapClient).sendBatchWithResponse();
         verifyNoMoreInteractions(dmaapClient);
     }
 
@@ -257,15 +246,10 @@
         testedObject.handleDmaapMsg(message);
 
         ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
-        verify(dmaapClient).send(captor.capture());
+        verify(dmaapClient).post(anyString(), captor.capture());
         String actualMessage = captor.getValue();
-        assertThat(actualMessage
-            .contains(HttpStatus.BAD_REQUEST + "\",\"message\":\"Not implemented operation: " + badOperation)) //
-                .as("Message \"%s\" sent to DMaaP contains %s", actualMessage, HttpStatus.BAD_REQUEST) //
-                .isTrue();
-
-        verify(dmaapClient).sendBatchWithResponse();
-        verifyNoMoreInteractions(dmaapClient);
+        assertThat(actualMessage.contains("Not implemented operation")).isTrue();
+        assertThat(actualMessage.contains("BAD_REQUEST")).isTrue();
     }
 
     @Test
diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java
index 2b1d2a7..9be6c6f 100644
--- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java
+++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java
@@ -355,8 +355,8 @@
     ConfigParserResult configParserResult(RicConfig... rics) {
         return ImmutableConfigParserResult.builder() //
             .ricConfigs(Arrays.asList(rics)) //
-            .dmaapConsumerConfig(new Properties()) //
-            .dmaapPublisherConfig(new Properties()) //
+            .dmaapConsumerTopicUrl("") //
+            .dmaapProducerTopicUrl("") //
             .controllerConfigs(new HashMap<>()) //
             .build();
     }
diff --git a/policy-agent/src/test/resources/test_application_configuration.json b/policy-agent/src/test/resources/test_application_configuration.json
index 0122fb9..3cbc371 100644
--- a/policy-agent/src/test/resources/test_application_configuration.json
+++ b/policy-agent/src/test/resources/test_application_configuration.json
@@ -19,21 +19,21 @@
             ]
          }
       ],
-      "streams_publishes":{
-         "dmaap_publisher":{
-            "type":"message_router",
-            "dmaap_info":{
-               "topic_url":"http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE"
+      "streams_publishes": {
+         "dmaap_publisher": {
+            "type": "message_router",
+            "dmaap_info": {
+               "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE"
             }
          }
       },
-      "streams_subscribes":{
-         "dmaap_subscriber":{
-            "type":"message_router",
-            "dmaap_info":{
-               "topic_url":"http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent"
+      "streams_subscribes": {
+         "dmaap_subscriber": {
+            "type": "message_router",
+            "dmaap_info": {
+               "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent?timeout=15000&limit=100"
             }
          }
       }
    }
-}
+}
\ No newline at end of file
diff --git a/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json b/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json
index 6d6cdce..61ab31e 100644
--- a/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json
+++ b/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json
@@ -1,47 +1,47 @@
 {
-   "config":{
-      "controller":[
+   "config": {
+      "controller": [
          {
-          "name":"controller1",
-          "baseUrl":"http://localhost:8083/",
-          "userName":"user",
-          "password":"password"
-        }
+            "name": "controller1",
+            "baseUrl": "http://localhost:8083/",
+            "userName": "user",
+            "password": "password"
+         }
       ],
-      "ric":[
+      "ric": [
          {
-            "name":"ric1",
+            "name": "ric1",
             "controller": "controller1",
-            "baseUrl":"http://localhost:8083/",
-            "managedElementIds":[
+            "baseUrl": "http://localhost:8083/",
+            "managedElementIds": [
                "kista_1",
                "kista_2"
             ]
          },
          {
-            "name":"ric2",
-            "baseUrl":"http://localhost:8085/",
-            "managedElementIds":[
+            "name": "ric2",
+            "baseUrl": "http://localhost:8085/",
+            "managedElementIds": [
                "kista_3",
                "kista_4"
             ]
          }
       ],
-      "streams_publishes":{
-         "dmaap_publisher":{
-            "type":"message_router",
-            "dmaap_info":{
-               "topic_url":"http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE"
+      "streams_publishes": {
+         "dmaap_publisher": {
+            "type": "message_router",
+            "dmaap_info": {
+               "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE"
             }
          }
       },
-      "streams_subscribes":{
-         "dmaap_subscriber":{
-            "type":"message_router",
-            "dmaap_info":{
-               "topic_url":"http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent"
+      "streams_subscribes": {
+         "dmaap_subscriber": {
+            "type": "message_router",
+            "dmaap_info": {
+               "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent?timeout=15000&limit=100"
             }
          }
       }
    }
-}
+}
\ No newline at end of file