Merge "Introduce new dmaapClient for use with ccsdk"
diff --git a/dmaap-listener/pom.xml b/dmaap-listener/pom.xml
index 4dc1215..0d78169 100755
--- a/dmaap-listener/pom.xml
+++ b/dmaap-listener/pom.xml
@@ -30,6 +30,17 @@
         <SWM_VERSION>${project.version}-${build.number}</SWM_VERSION>
     </properties>
 
+    <dependencyManagement>
+        <dependencies>
+            <!-- dmaapClient needs JAX-RS 2.1: MessageRouterHttpClient calls ClientBuilder.connectTimeout/readTimeout, added in that version -->
+            <dependency>
+                <groupId>javax.ws.rs</groupId>
+                <artifactId>javax.ws.rs-api</artifactId>
+                <version>2.1</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
     <dependencies>
 
         <dependency>
@@ -105,8 +116,8 @@
                 <version>2.5.1</version>
                 <inherited>true</inherited>
                 <configuration>
-                    <source>1.7</source>
-                    <target>1.7</target>
+                    <source>1.8</source>
+                    <target>1.8</target>
                 </configuration>
             </plugin>
             <plugin>
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DummyDmaapConsumer.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DummyDmaapConsumer.java
index a833634..57fcd88 100644
--- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DummyDmaapConsumer.java
+++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/DummyDmaapConsumer.java
@@ -24,7 +24,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class DummyDmaapConsumer extends SdncDmaapConsumer {
+public class DummyDmaapConsumer extends SdncDmaapConsumerImpl {
 	
 	private static final Logger LOG = LoggerFactory
 			.getLogger(DummyDmaapConsumer.class);
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java
new file mode 100644
index 0000000..234a202
--- /dev/null
+++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/MessageRouterHttpClient.java
@@ -0,0 +1,268 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : SDN-C
+ * ================================================================================
+ * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights
+ * 						reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.ccsdk.sli.northbound.dmaapclient;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Invocation;
+import javax.ws.rs.client.Invocation.Builder;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JAX-RS based client used to build message router consumers.
+ *
+ * <p>{@link #init(Properties, String)} prepares a reusable GET request from the consumer properties;
+ * {@link #run()} then polls it in a loop and passes each received message to
+ * {@link #processMsg(String)}.
+ */
+public class MessageRouterHttpClient implements SdncDmaapConsumer {
+    private static final Logger LOG = LoggerFactory.getLogger(MessageRouterHttpClient.class);
+
+    protected static final String DEFAULT_CONNECT_TIMEOUT_SECONDS = "30";
+    protected static final String DEFAULT_READ_TIMEOUT_MINUTES = "3";
+    protected static final String DEFAULT_TIMEOUT_QUERY_PARAM_VALUE = "15000";
+    protected static final String DEFAULT_LIMIT = null;
+    /** Pause between fetches, in milliseconds, used when "fetchPause" is not configured. */
+    protected static final String DEFAULT_FETCH_PAUSE = "5000";
+
+    protected Boolean isReady = false;
+    protected Boolean isRunning = false;
+    protected Client client;
+    protected URI uri;
+    protected Invocation getMessages;
+    protected Integer fetchPause;
+    protected Properties properties;
+
+    public MessageRouterHttpClient() {
+
+    }
+
+    /**
+     * Polls the message router until the polling thread is interrupted. Returns immediately when
+     * {@link #init(Properties, String)} has not completed successfully. A failed fetch is logged and
+     * retried after the configured pause.
+     */
+    @Override
+    public void run() {
+        if (!isReady) {
+            return;
+        }
+        isRunning = true;
+        while (isRunning) {
+            try {
+                fetchMessages();
+            } catch (Exception e) {
+                // Keep polling: a single failed request must not stop the consumer.
+                LOG.error("GET " + uri + " failed.", e);
+            } finally {
+                pauseBeforeNextFetch();
+            }
+        }
+    }
+
+    /** Performs one GET and hands every message found in the response to {@link #processMsg(String)}. */
+    private void fetchMessages() {
+        Response response = getMessages.invoke();
+        try {
+            LOG.info("GET {} returned http status {}", uri, response.getStatus());
+            String entity = response.readEntity(String.class);
+            if (entity == null || !entity.contains("{")) {
+                LOG.info("Entity doesn't appear to contain JSON elements");
+                return;
+            }
+            // Get rid of the opening [" and the closing "]
+            String messages = entity.substring(2, entity.length() - 2);
+            for (String message : messages.split("\",\"")) {
+                try {
+                    // This replacement effectively un-escapes the JSON
+                    processMsg(message.replace("\\\"", "\""));
+                } catch (InvalidMessageException e) {
+                    LOG.error("Message could not be processed", e);
+                }
+            }
+        } finally {
+            // Release the connection even when reading or processing the entity fails.
+            response.close();
+        }
+    }
+
+    /** Sleeps for {@code fetchPause} milliseconds; an interrupt ends the polling loop. */
+    private void pauseBeforeNextFetch() {
+        LOG.info("Pausing {} milliseconds before fetching from {} again.", fetchPause, uri);
+        try {
+            Thread.sleep(fetchPause);
+        } catch (InterruptedException e) {
+            LOG.error("Could not sleep thread", e);
+            // Restore the interrupt flag and stop polling so the thread can actually be shut down.
+            Thread.currentThread().interrupt();
+            isRunning = false;
+        }
+    }
+
+    /**
+     * Loads the consumer properties on top of a copy of the base properties and prepares the GET request.
+     * Any failure is logged and leaves the client not ready.
+     *
+     * @param baseProperties properties shared between consumers; this method does not modify them
+     * @param consumerPropertiesPath path of the properties file describing this consumer
+     */
+    @Override
+    public void init(Properties baseProperties, String consumerPropertiesPath) {
+        try (FileInputStream in = new FileInputStream(new File(consumerPropertiesPath))) {
+            // Work on a copy so the settings of one consumer cannot leak into the next one.
+            Properties merged = (Properties) baseProperties.clone();
+            merged.load(in);
+            this.properties = merged;
+            String username = merged.getProperty("username");
+            String password = merged.getProperty("password");
+            String topic = merged.getProperty("topic");
+            String group = merged.getProperty("group");
+            String host = merged.getProperty("host");
+            String id = merged.getProperty("id");
+
+            String filter = merged.getProperty("filter");
+            if (filter != null) {
+                if (filter.length() > 0) {
+                    filter = URLEncoder.encode(filter, StandardCharsets.UTF_8.name());
+                } else {
+                    filter = null;
+                }
+            }
+
+            String limitString = merged.getProperty("limit", DEFAULT_LIMIT);
+            Integer limit = null;
+            if (limitString != null && limitString.length() > 0) {
+                limit = Integer.valueOf(limitString);
+            }
+
+            Integer timeoutQueryParamValue =
+                    Integer.valueOf(merged.getProperty("timeout", DEFAULT_TIMEOUT_QUERY_PARAM_VALUE));
+            Integer connectTimeoutSeconds =
+                    Integer.valueOf(merged.getProperty("connectTimeoutSeconds", DEFAULT_CONNECT_TIMEOUT_SECONDS));
+            Integer readTimeoutMinutes =
+                    Integer.valueOf(merged.getProperty("readTimeoutMinutes", DEFAULT_READ_TIMEOUT_MINUTES));
+
+            String authorizationString = buildAuthorizationString(username, password);
+            this.uri = buildUri(topic, group, id, host, timeoutQueryParamValue, limit, filter);
+            this.client = getClient(connectTimeoutSeconds, readTimeoutMinutes);
+            Builder builder =
+                    client.target(uri).request("application/json").header("Authorization", authorizationString);
+            this.getMessages = builder.buildGet();
+            this.fetchPause = Integer.valueOf(merged.getProperty("fetchPause", DEFAULT_FETCH_PAUSE));
+            this.isReady = true;
+        } catch (FileNotFoundException e) {
+            LOG.error("FileNotFoundException while reading consumer properties", e);
+        } catch (IOException e) {
+            LOG.error("IOException while reading consumer properties", e);
+        } catch (NumberFormatException e) {
+            LOG.error("Non-numeric value in consumer properties", e);
+        }
+    }
+
+    /**
+     * Default message handling: prints the message to standard output.
+     *
+     * @param msg one message received from the message router
+     * @throws InvalidMessageException declared for overriding implementations; not thrown here
+     */
+    @Override
+    public void processMsg(String msg) throws InvalidMessageException {
+        System.out.println(msg);
+    }
+
+    @Override
+    public boolean isReady() {
+        return isReady;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return isRunning;
+    }
+
+    /**
+     * Builds the value of an HTTP Basic {@code Authorization} header.
+     *
+     * @param userName user name to authenticate with
+     * @param password password of that user
+     * @return {@code "Basic "} followed by the Base64 encoding of {@code userName:password}
+     */
+    protected String buildAuthorizationString(String userName, String password) {
+        String basicAuthString = userName + ":" + password;
+        // Encode with an explicit charset so the header does not depend on the platform default.
+        basicAuthString = Base64.getEncoder().encodeToString(basicAuthString.getBytes(StandardCharsets.UTF_8));
+        return "Basic " + basicAuthString;
+    }
+
+    /**
+     * Creates the JAX-RS client used for all requests.
+     *
+     * @param connectTimeoutSeconds connect timeout, in seconds
+     * @param readTimeoutMinutes read timeout, in minutes
+     * @return a new client configured with both timeouts
+     */
+    protected Client getClient(Integer connectTimeoutSeconds, Integer readTimeoutMinutes) {
+        ClientBuilder clientBuilder = ClientBuilder.newBuilder();
+        clientBuilder.connectTimeout(connectTimeoutSeconds, TimeUnit.SECONDS);
+        clientBuilder.readTimeout(readTimeoutMinutes, TimeUnit.MINUTES);
+        return clientBuilder.build();
+    }
+
+    /**
+     * Builds the URI of the message router "events" endpoint for one consumer.
+     *
+     * @param topic topic to read from
+     * @param consumerGroup consumer group name
+     * @param consumerId consumer id within the group
+     * @param host host part of the URI, placed directly after {@code http://}
+     * @param timeout value of the {@code timeout} query parameter
+     * @param limit value of the {@code limit} query parameter, or {@code null} to omit it
+     * @param filter value of the {@code filter} query parameter, or {@code null} to omit it
+     * @return the URI to send GET requests to
+     */
+    protected URI buildUri(String topic, String consumerGroup, String consumerId, String host, Integer timeout,
+            Integer limit, String filter) {
+        UriBuilder builder = UriBuilder.fromPath("http://" + host + "/events/{topic}/{consumerGroup}/{consumerId}");
+        builder.queryParam("timeout", timeout);
+        if (limit != null) {
+            builder.queryParam("limit", limit);
+        }
+        if (filter != null) {
+            builder.queryParam("filter", filter);
+        }
+        return builder.build(topic, consumerGroup, consumerId);
+    }
+
+}
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncAaiDmaapConsumer.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncAaiDmaapConsumer.java
index 0e12dfa..2c4de71 100644
--- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncAaiDmaapConsumer.java
+++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncAaiDmaapConsumer.java
@@ -21,9 +21,6 @@
 
 package org.onap.ccsdk.sli.northbound.dmaapclient;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
@@ -34,14 +31,15 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
-
 import org.apache.velocity.VelocityContext;
 import org.apache.velocity.app.VelocityEngine;
 import org.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
-public class SdncAaiDmaapConsumer extends SdncDmaapConsumer {
+public class SdncAaiDmaapConsumer extends SdncDmaapConsumerImpl {
 
     private static final Logger LOG = LoggerFactory.getLogger(SdncAaiDmaapConsumer.class);
     private static final String SDNC_ENDPOINT = "SDNC.endpoint";
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDhcpEventConsumer.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDhcpEventConsumer.java
index 03560d3..7b68ceb 100644
--- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDhcpEventConsumer.java
+++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDhcpEventConsumer.java
@@ -5,19 +5,14 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.sql.SQLException;
-import java.util.Iterator;
-import java.util.Map;
 import java.util.Properties;
-
 import org.onap.ccsdk.sli.core.dblib.DBResourceManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 
-public class SdncDhcpEventConsumer extends SdncDmaapConsumer {
+public class SdncDhcpEventConsumer extends SdncDmaapConsumerImpl {
 	private static final Logger LOG = LoggerFactory.getLogger(SdncDhcpEventConsumer.class);
 
 	private static final String MAC_ADDR_TAG = "macaddr";
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java
index 2b416e7..3fc769d 100644
--- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java
+++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumer.java
@@ -2,8 +2,8 @@
  * ============LICENSE_START=======================================================
  * openECOMP : SDN-C
  * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights
- * 			reserved.
+ * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights
+ * 						reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,134 +21,32 @@
 
 package org.onap.ccsdk.sli.northbound.dmaapclient;
 
-import com.att.nsa.mr.client.MRClientFactory;
-import com.att.nsa.mr.client.MRConsumer;
-import com.att.nsa.mr.client.response.MRConsumerResponse;
-import java.io.File;
-import java.io.FileInputStream;
 import java.util.Properties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public abstract class SdncDmaapConsumer implements Runnable {
+/**
+ * Contract shared by the DMaaP consumers. Extends {@link Runnable} so that a consumer can be handed
+ * to a thread once it has been initialized.
+ */
+public interface SdncDmaapConsumer extends Runnable {
+    /**
+     * Prepares the consumer for {@link #run()}.
+     *
+     * @param baseProperties properties common to all consumers
+     * @param consumerPropertiesPath path of the properties file specific to this consumer
+     */
+    void init(Properties baseProperties, String consumerPropertiesPath);
 
-    private static final Logger LOG = LoggerFactory
-        .getLogger(SdncDmaapConsumer.class);
+    /**
+     * Handles one message received from DMaaP.
+     *
+     * @param msg the message
+     * @throws InvalidMessageException if the message is invalid
+     */
+    void processMsg(String msg) throws InvalidMessageException;
 
-    private Properties properties = null;
-    private MRConsumer consumer = null;
-    private MRConsumerResponse consumerResponse = null;
-    private boolean running = false;
-    private boolean ready = false;
-    private int fetchPause = 5000; // Default pause between fetch - 5 seconds
-    private int timeout = 15000; // Default timeout - 15 seconds
+    /** Returns whether initialization succeeded and the consumer can be run. */
+    boolean isReady();
 
-    public SdncDmaapConsumer() {
-
-    }
-
-    public SdncDmaapConsumer(Properties properties, String propertiesPath) {
-        init(properties, propertiesPath);
-    }
-
-    public boolean isReady() {
-        return ready;
-    }
-
-    public boolean isRunning() {
-        return running;
-    }
-
-    public String getProperty(String name) {
-        return properties.getProperty(name, "");
-    }
-
-    public void init(Properties properties, String propertiesPath) {
-
-        try (FileInputStream in = new FileInputStream(new File(propertiesPath))) {
-
-	    LOG.debug("propertiesPath: " + propertiesPath);
-            this.properties = (Properties) properties.clone();
-            this.properties.load(in);
-
-
-            String timeoutStr = this.properties.getProperty("timeout");
-	    LOG.debug("timeoutStr: " + timeoutStr);
-
-            if ((timeoutStr != null) && (timeoutStr.length() > 0)) {
-                timeout = parseTimeOutValue(timeoutStr);
-            }
-
-            String fetchPauseStr = this.properties.getProperty("fetchPause");
-	    LOG.debug("fetchPause(Str): " + fetchPauseStr);
-            if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) {
-                fetchPause = parseFetchPause(fetchPauseStr);
-            }
-	    LOG.debug("fetchPause: " + fetchPause);
-
-
-            this.consumer = MRClientFactory.createConsumer(propertiesPath);
-            ready = true;
-        } catch (Exception e) {
-            LOG.error("Error initializing DMaaP consumer from file " + propertiesPath, e);
-        }
-    }
-
-    private int parseTimeOutValue(String timeoutStr) {
-        try {
-            return Integer.parseInt(timeoutStr);
-        } catch (NumberFormatException e) {
-            LOG.error("Non-numeric value specified for timeout (" + timeoutStr + ")");
-        }
-        return timeout;
-    }
-
-    private int parseFetchPause(String fetchPauseStr) {
-        try {
-            return Integer.parseInt(fetchPauseStr);
-        } catch (NumberFormatException e) {
-            LOG.error("Non-numeric value specified for fetchPause (" + fetchPauseStr + ")");
-        }
-        return fetchPause;
-    }
-
-
-    @Override
-    public void run() {
-        if (ready) {
-
-            running = true;
-
-            while (running) {
-
-                try {
-                    boolean noData = true;
-                    consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1);
-                    for (String msg : consumerResponse.getActualMessages()) {
-                        noData = false;
-                        LOG.info("Received message from DMaaP:\n" + msg);
-                        processMsg(msg);
-                    }
-
-                    if (noData) {
-                        pauseThread();
-                    }
-                } catch (Exception e) {
-                    LOG.error("Caught exception reading from DMaaP", e);
-                    running = false;
-                }
-            }
-        }
-    }
-
-    private void pauseThread() throws InterruptedException {
-        if (fetchPause > 0) {
-            LOG.info(String.format("No data received from fetch.  Pausing %d ms before retry", fetchPause));
-            Thread.sleep(fetchPause);
-        } else {
-            LOG.info("No data received from fetch.  No fetch pause specified - retrying immediately");
-        }
-    }
-
-    abstract public void processMsg(String msg) throws InvalidMessageException;
+    /** Returns whether the consumer is running. */
+    boolean isRunning();
 }
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumerImpl.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumerImpl.java
new file mode 100644
index 0000000..ddd8713
--- /dev/null
+++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncDmaapConsumerImpl.java
@@ -0,0 +1,190 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * openECOMP : SDN-C
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights
+ * 			reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.ccsdk.sli.northbound.dmaapclient;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.att.nsa.mr.client.MRClientFactory;
+import com.att.nsa.mr.client.MRConsumer;
+import com.att.nsa.mr.client.response.MRConsumerResponse;
+
+/**
+ * Base class for DMaaP consumers built on the message router client library.
+ *
+ * <p>{@link #init(Properties, String)} creates the underlying consumer and {@link #run()} fetches
+ * messages in a loop, passing each one to {@link #processMsg(String)}.
+ */
+public abstract class SdncDmaapConsumerImpl implements SdncDmaapConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SdncDmaapConsumerImpl.class);
+
+    private final String name = this.getClass().getSimpleName();
+    private Properties properties = null;
+    private MRConsumer consumer = null;
+    private boolean running = false;
+    private boolean ready = false;
+    private int fetchPause = 5000; // Default pause between fetch - 5 seconds
+    private int timeout = 15000; // Default timeout - 15 seconds
+
+    public SdncDmaapConsumerImpl() {
+
+    }
+
+    public SdncDmaapConsumerImpl(Properties properties, String propertiesPath) {
+        init(properties, propertiesPath);
+    }
+
+    @Override
+    public boolean isReady() {
+        return ready;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return running;
+    }
+
+    /**
+     * Looks up a consumer property.
+     *
+     * @param name property name
+     * @return the property value, or an empty string when the property is not set or no properties
+     *         have been loaded yet
+     */
+    public String getProperty(String name) {
+        if (properties == null) {
+            return "";
+        }
+        return properties.getProperty(name, "");
+    }
+
+    /**
+     * Loads the consumer properties on top of a copy of the given properties and creates the DMaaP
+     * consumer. Any failure is logged and leaves the consumer not ready.
+     *
+     * @param properties base properties; copied, not modified
+     * @param propertiesPath path of the consumer properties file
+     */
+    @Override
+    public void init(Properties properties, String propertiesPath) {
+
+        try (FileInputStream in = new FileInputStream(new File(propertiesPath))) {
+
+            LOG.debug("propertiesPath: " + propertiesPath);
+            this.properties = (Properties) properties.clone();
+            this.properties.load(in);
+
+            String timeoutStr = this.properties.getProperty("timeout");
+            LOG.debug("timeoutStr: " + timeoutStr);
+
+            if ((timeoutStr != null) && (timeoutStr.length() > 0)) {
+                timeout = parseTimeOutValue(timeoutStr);
+            }
+
+            String fetchPauseStr = this.properties.getProperty("fetchPause");
+            LOG.debug("fetchPause(Str): " + fetchPauseStr);
+            if ((fetchPauseStr != null) && (fetchPauseStr.length() > 0)) {
+                fetchPause = parseFetchPause(fetchPauseStr);
+            }
+            LOG.debug("fetchPause: " + fetchPause);
+
+            this.consumer = MRClientFactory.createConsumer(propertiesPath);
+            ready = true;
+        } catch (Exception e) {
+            LOG.error("Error initializing DMaaP consumer from file " + propertiesPath, e);
+        }
+    }
+
+    /** Parses the timeout value, keeping the current timeout when the value is not numeric. */
+    private int parseTimeOutValue(String timeoutStr) {
+        try {
+            return Integer.parseInt(timeoutStr);
+        } catch (NumberFormatException e) {
+            LOG.error("Non-numeric value specified for timeout (" + timeoutStr + ")");
+        }
+        return timeout;
+    }
+
+    /** Parses the fetch pause, keeping the current pause when the value is not numeric. */
+    private int parseFetchPause(String fetchPauseStr) {
+        try {
+            return Integer.parseInt(fetchPauseStr);
+        } catch (NumberFormatException e) {
+            LOG.error("Non-numeric value specified for fetchPause (" + fetchPauseStr + ")");
+        }
+        return fetchPause;
+    }
+
+    /**
+     * Fetches messages until an error occurs or the thread is interrupted. Does nothing unless
+     * {@link #init(Properties, String)} succeeded.
+     */
+    @Override
+    public void run() {
+        if (ready) {
+
+            running = true;
+
+            while (running) {
+
+                try {
+                    boolean noData = true;
+                    MRConsumerResponse consumerResponse = consumer.fetchWithReturnConsumerResponse(timeout, -1);
+                    for (String msg : consumerResponse.getActualMessages()) {
+                        noData = false;
+                        LOG.info(name + " received ActualMessage from DMaaP:\n" + msg);
+                        processMsg(msg);
+                    }
+
+                    if (noData) {
+                        LOG.info(name + " received ResponseCode: " + consumerResponse.getResponseCode());
+                        LOG.info(name + " received ResponseMessage: " + consumerResponse.getResponseMessage());
+                        pauseThread();
+                    }
+                } catch (InterruptedException e) {
+                    LOG.error("Caught exception reading from DMaaP", e);
+                    // Restore the interrupt flag so whoever owns the thread can see the interruption.
+                    Thread.currentThread().interrupt();
+                    running = false;
+                } catch (Exception e) {
+                    LOG.error("Caught exception reading from DMaaP", e);
+                    running = false;
+                }
+            }
+        }
+    }
+
+    private void pauseThread() throws InterruptedException {
+        if (fetchPause > 0) {
+            LOG.info(String.format("No data received from fetch.  Pausing %d ms before retry", fetchPause));
+            Thread.sleep(fetchPause);
+        } else {
+            LOG.info("No data received from fetch.  No fetch pause specified - retrying immediately");
+        }
+    }
+
+    @Override
+    public abstract void processMsg(String msg) throws InvalidMessageException;
+}
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncFlatJsonDmaapConsumer.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncFlatJsonDmaapConsumer.java
index 06e8ebe..6c90c71 100644
--- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncFlatJsonDmaapConsumer.java
+++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncFlatJsonDmaapConsumer.java
@@ -21,9 +21,6 @@
 
 package org.onap.ccsdk.sli.northbound.dmaapclient;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
@@ -32,9 +29,12 @@
 import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 
-public class SdncFlatJsonDmaapConsumer extends SdncDmaapConsumer {
+public class SdncFlatJsonDmaapConsumer extends SdncDmaapConsumerImpl {
 
     private static final Logger LOG = LoggerFactory.getLogger(SdncFlatJsonDmaapConsumer.class);
 
diff --git a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncLcmDmaapConsumer.java b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncLcmDmaapConsumer.java
index 53fb6db..04f520b 100644
--- a/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncLcmDmaapConsumer.java
+++ b/dmaap-listener/src/main/java/org/onap/ccsdk/sli/northbound/dmaapclient/SdncLcmDmaapConsumer.java
@@ -27,7 +27,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SdncLcmDmaapConsumer extends SdncDmaapConsumer {
+public class SdncLcmDmaapConsumer extends SdncDmaapConsumerImpl {
 
     private static final Logger LOG = LoggerFactory.getLogger(SdncLcmDmaapConsumer.class);