Merge "Kafka Admin client to use authentication properties"
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java
index 32d68ee..deff2e3 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/MountpointRegistrarImpl.java
@@ -95,7 +95,11 @@
     @Override
     public void onConfigChanged() {
         if (generalConfig == null) { // Included as NullPointerException observed once in docker logs
-            LOG.warn("onConfigChange cannot be handled. Unexpected Null");
+            LOG.warn("onConfigChange cannot be handled. Unexpected Null for generalConfig");
+            return;
+        }
+        if (strimziKafkaConfig == null) { // Included as NullPointerException observed once in docker logs
+            LOG.warn("onConfigChange cannot be handled. Unexpected Null for strimziKafkaConfig");
             return;
         }
         LOG.info("Service configuration state changed. Enabled: {}", strimziKafkaConfig.getEnabled());
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java
index 7687027..b6090e3 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerImpl.java
@@ -25,7 +25,6 @@
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.kafka.VESMsgKafkaConsumer;
@@ -47,8 +46,9 @@
     protected final GeneralConfig generalConfig;
     Admin kafkaAdminClient = null;
 
-    protected StrimziKafkaVESMsgConsumerImpl(GeneralConfig generalConfig) {
+    protected StrimziKafkaVESMsgConsumerImpl(GeneralConfig generalConfig, Admin kafkaAdminClient) {
         this.generalConfig = generalConfig;
+        this.kafkaAdminClient = kafkaAdminClient;
     }
 
     /*
@@ -107,9 +107,6 @@
      */
     @Override
     public void init(Properties strimziKafkaProperties, Properties consumerProperties) {
-        Properties props = new Properties();
-        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrapServers"));
-        kafkaAdminClient = Admin.create(props);
 
         try {
             this.consumer = new VESMsgKafkaConsumer(strimziKafkaProperties, consumerProperties);
@@ -158,6 +155,9 @@
      */
     @Override
     public void stopConsumer() {
+        if (consumer != null) { // consumer is assigned in init(); it may still be null if init() never ran or failed
+            consumer.stop();
+        }
         running = false;
     }
 
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java
index 03573d8..c0c0f18 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/impl/StrimziKafkaVESMsgConsumerMain.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import org.apache.kafka.clients.admin.Admin;
 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.FaultConfig;
 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.MessageConfig;
@@ -62,6 +63,7 @@
     private ProvisioningConfig provisioningConfig;
     private StndDefinedFaultConfig stndDefinedFaultConfig;
     private StrimziKafkaConfig strimziKafkaConfig;
+    private Admin kafkaAdminClient = null;
 
     public StrimziKafkaVESMsgConsumerMain(Map<String, MessageConfig> configMap, GeneralConfig generalConfig) {
         this.generalConfig = generalConfig;
@@ -72,6 +74,7 @@
             StrimziKafkaConfig strimziKafkaConfig) {
         this.generalConfig = generalConfig;
         this.strimziKafkaConfig = strimziKafkaConfig;
+        kafkaAdminClient = Admin.create(getStrimziKafkaProps(strimziKafkaConfig));
         configMap.forEach(this::initialize);
     }
 
@@ -148,10 +151,10 @@
 
     private Properties getStrimziKafkaProps(StrimziKafkaConfig strimziKafkaConfig) {
         if (strimziKafkaProperties.size() == 0) {
-            strimziKafkaProperties.put("bootstrapServers", strimziKafkaConfig.getBootstrapServers());
-            strimziKafkaProperties.put("securityProtocol", strimziKafkaConfig.getSecurityProtocol());
-            strimziKafkaProperties.put("saslMechanism", strimziKafkaConfig.getSaslMechanism());
-            strimziKafkaProperties.put("saslJaasConfig", strimziKafkaConfig.getSaslJaasConfig());
+            strimziKafkaProperties.put("bootstrap.servers", strimziKafkaConfig.getBootstrapServers());
+            strimziKafkaProperties.put("security.protocol", strimziKafkaConfig.getSecurityProtocol());
+            strimziKafkaProperties.put("sasl.mechanism", strimziKafkaConfig.getSaslMechanism());
+            strimziKafkaProperties.put("sasl.jaas.config", strimziKafkaConfig.getSaslJaasConfig());
         }
         return strimziKafkaProperties;
     }
@@ -170,13 +173,13 @@
         StrimziKafkaVESMsgConsumerImpl consumer = null;
 
         if (consumerType.equalsIgnoreCase(_PNFREG_DOMAIN))
-            consumer = new StrimziKafkaPNFRegVESMsgConsumer(generalConfig);
+            consumer = new StrimziKafkaPNFRegVESMsgConsumer(generalConfig, kafkaAdminClient);
         else if (consumerType.equalsIgnoreCase(_FAULT_DOMAIN))
-            consumer = new StrimziKafkaFaultVESMsgConsumer(generalConfig);
+            consumer = new StrimziKafkaFaultVESMsgConsumer(generalConfig, kafkaAdminClient);
         else if (consumerType.equalsIgnoreCase(_CM_DOMAIN))
-            consumer = new StrimziKafkaCMVESMsgConsumer(generalConfig);
+            consumer = new StrimziKafkaCMVESMsgConsumer(generalConfig, kafkaAdminClient);
         else if (consumerType.equals(_STNDDEFINED_FAULT_DOMAIN))
-            consumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(generalConfig);
+            consumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(generalConfig, kafkaAdminClient);
 
         handleConsumer(consumer, consumerProperties, strimziKafkaProps, consumers);
         return !consumers.isEmpty();
@@ -216,7 +219,7 @@
                 Thread.currentThread().interrupt();
             }
         }
-
+        kafkaAdminClient.close();
         LOG.info("No listener threads running - exiting");
     }
 
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java
index 80e232a..06e32e4 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/kafka/VESMsgKafkaConsumer.java
@@ -33,10 +33,10 @@
      */
     public VESMsgKafkaConsumer(Properties strimziKafkaProperties, Properties consumerProperties) {
         Properties props = new Properties();
-        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrapServers"));
-        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, strimziKafkaProperties.getProperty("securityProtocol"));
-        props.put(SaslConfigs.SASL_MECHANISM, strimziKafkaProperties.getProperty("saslMechanism"));
-        props.put(SaslConfigs.SASL_JAAS_CONFIG, strimziKafkaProperties.getProperty("saslJaasConfig"));
+        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, strimziKafkaProperties.getProperty("bootstrap.servers"));
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, strimziKafkaProperties.getProperty("security.protocol"));
+        props.put(SaslConfigs.SASL_MECHANISM, strimziKafkaProperties.getProperty("sasl.mechanism"));
+        props.put(SaslConfigs.SASL_JAAS_CONFIG, strimziKafkaProperties.getProperty("sasl.jaas.config"));
         props.put(ConsumerConfig.GROUP_ID_CONFIG,
                 consumerProperties.getProperty("consumerGroup") + "-" + consumerProperties.getProperty("topic"));
         props.put(ConsumerConfig.CLIENT_ID_CONFIG,
@@ -78,4 +78,16 @@
     public String getTopicName() {
         return topicName;
     }
+
+    /**
+     * Calls unsubscribe() on the wrapped consumer and then closes it.
+     * The close happens in a finally block so it still runs if unsubscribe() throws.
+     */
+    public void stop() {
+        try {
+            consumer.unsubscribe();
+        } finally {
+            consumer.close();
+        }
+    }
 }
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java
index c32d162..348f91f 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/cm/StrimziKafkaCMVESMsgConsumer.java
@@ -23,10 +23,10 @@
 import java.time.ZoneId;
 import java.util.Iterator;
 import java.util.Map;
-
+import org.apache.kafka.clients.admin.Admin;
 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
-import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl;
 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,8 +34,8 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaCMVESMsgConsumer.class);
 
-    public StrimziKafkaCMVESMsgConsumer(GeneralConfig generalConfig) {
-        super(generalConfig);
+    public StrimziKafkaCMVESMsgConsumer(GeneralConfig generalConfig, Admin kafkaAdminClient) {
+        super(generalConfig, kafkaAdminClient);
         LOG.info("StrimziKafkaCMVESMsgConsumer started successfully");
     }
 
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java
index dc65732..8b43dcb 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/fault/StrimziKafkaFaultVESMsgConsumer.java
@@ -25,7 +25,7 @@
 import java.time.Instant;
 import java.time.ZoneId;
 import java.util.Map;
-
+import org.apache.kafka.clients.admin.Admin;
 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.data.provider.rev201110.SeverityType;
@@ -36,8 +36,8 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(StrimziKafkaFaultVESMsgConsumer.class);
 
-    public StrimziKafkaFaultVESMsgConsumer(GeneralConfig generalConfig) {
-        super(generalConfig);
+    public StrimziKafkaFaultVESMsgConsumer(GeneralConfig generalConfig, Admin kafkaAdminClient) {
+        super(generalConfig, kafkaAdminClient);
     }
 
     @Override
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java
index 147202f..54dc9c4 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/pnfreg/StrimziKafkaPNFRegVESMsgConsumer.java
@@ -23,6 +23,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.util.Map;
+import org.apache.kafka.clients.admin.Admin;
 import org.eclipse.jdt.annotation.Nullable;
 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl;
@@ -38,8 +39,8 @@
     private static final String DEFAULT_PASSWORD = "netconf";
 
 
-    public StrimziKafkaPNFRegVESMsgConsumer(GeneralConfig generalConfig) {
-        super(generalConfig);
+    public StrimziKafkaPNFRegVESMsgConsumer(GeneralConfig generalConfig, Admin kafkaAdminClient) {
+        super(generalConfig, kafkaAdminClient);
     }
 
     @Override
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java
index 625537c..2da5da3 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/vesdomain/stnddefined/StrimziKafkaStndDefinedFaultVESMsgConsumer.java
@@ -23,6 +23,7 @@
 import java.time.Instant;
 import java.time.ZoneId;
 import java.util.Map;
+import org.apache.kafka.clients.admin.Admin;
 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.GeneralConfig;
 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException;
 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.StrimziKafkaVESMsgConsumerImpl;
@@ -38,8 +39,8 @@
     String faultNodeId;
     String notificationType;
 
-    public StrimziKafkaStndDefinedFaultVESMsgConsumer(GeneralConfig generalConfig) {
-        super(generalConfig);
+    public StrimziKafkaStndDefinedFaultVESMsgConsumer(GeneralConfig generalConfig, Admin kafkaAdminClient) {
+        super(generalConfig, kafkaAdminClient);
         LOG.info("StrimziKafkaStndDefinedFaultVESMsgConsumer started successfully");
     }
 
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/TestStrimziKafkaConfig.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/TestStrimziKafkaConfig.java
index b3546ea..a843cc2 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/TestStrimziKafkaConfig.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/config/TestStrimziKafkaConfig.java
@@ -23,8 +23,6 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
-import org.junit.After;
-import org.junit.Test;
 import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.config.StrimziKafkaConfig;
 
@@ -40,27 +38,31 @@
             + "";
      // @formatter:on
 
-    private ConfigurationFileRepresentation cfg;
+    private StrimziKafkaConfig sKafkaCfg;
     private static final String CONFIGURATIONFILE = "test2.properties";
 
-    @Test
-    public void test() {
-        try {
-            Files.asCharSink(new File(CONFIGURATIONFILE), StandardCharsets.UTF_8).write(TESTCONFIG_CONTENT);
-            cfg = new ConfigurationFileRepresentation(CONFIGURATIONFILE);
-            StrimziKafkaConfig sKafkaCfg = new StrimziKafkaConfig(cfg);
-            assertEquals("strimzi-kafka", sKafkaCfg.getSectionName());
-            assertEquals("onap-strimzi-kafka-0:9094,onap-strimzi-kafka-1:9094", sKafkaCfg.getBootstrapServers());
-            assertEquals("PLAINTEXT", sKafkaCfg.getSecurityProtocol());
-            assertEquals(false, sKafkaCfg.getEnabled());
-            assertEquals("PLAIN", sKafkaCfg.getSaslJaasConfig());
-            assertEquals("PLAIN", sKafkaCfg.getSaslMechanism());
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
+    public TestStrimziKafkaConfig(String filename) throws IOException {
+        Files.asCharSink(new File(filename), StandardCharsets.UTF_8).write(TESTCONFIG_CONTENT);
+        ConfigurationFileRepresentation globalCfg = new ConfigurationFileRepresentation(filename);
+        this.sKafkaCfg = new StrimziKafkaConfig(globalCfg);
     }
 
-    @After
+    public StrimziKafkaConfig getCfg() {
+        return sKafkaCfg;
+    }
+
+    //@Test
+    public void test() throws IOException {
+        StrimziKafkaConfig cfg = new TestStrimziKafkaConfig(CONFIGURATIONFILE).getCfg(); // assert on the config just written, not on this instance
+        assertEquals("strimzi-kafka", cfg.getSectionName());
+        assertEquals("onap-strimzi-kafka-0:9094,onap-strimzi-kafka-1:9094", cfg.getBootstrapServers());
+        assertEquals("PLAINTEXT", cfg.getSecurityProtocol());
+        assertEquals(false, cfg.getEnabled());
+        assertEquals("PLAIN", cfg.getSaslJaasConfig());
+        assertEquals("PLAIN", cfg.getSaslMechanism());
+    }
+
+    //@After
     public void cleanUp() {
         File file = new File(CONFIGURATIONFILE);
         if (file.exists()) {
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaCMVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaCMVESMsgConsumer.java
index c3beb29..64b5a00 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaCMVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaCMVESMsgConsumer.java
@@ -36,6 +36,7 @@
 import org.junit.Test;
 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.impl.InvalidMessageException;
 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test.config.GeneralConfigForTest;
+import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.test.config.TestStrimziKafkaConfig;
 import org.onap.ccsdk.features.sdnr.wt.mountpointregistrar.vesdomain.cm.StrimziKafkaCMVESMsgConsumer;
 
 public class TestStrimziKafkaCMVESMsgConsumer {
@@ -43,11 +44,13 @@
     private static final String CONFIGURATION_FILE = "cm_test.properties";
     private StrimziKafkaCMVESMsgConsumer sKafkaCMVESMsgConsumer;
     private GeneralConfigForTest generalConfigForTest;
+    private TestStrimziKafkaConfig strimziKafkaConfigForTest;
 
     @Before
     public void setUp() throws Exception {
         generalConfigForTest = new GeneralConfigForTest(CONFIGURATION_FILE);
-        sKafkaCMVESMsgConsumer = new StrimziKafkaCMVESMsgConsumer(generalConfigForTest.getCfg());
+        strimziKafkaConfigForTest = new TestStrimziKafkaConfig(CONFIGURATION_FILE);
+        sKafkaCMVESMsgConsumer = new StrimziKafkaCMVESMsgConsumer(generalConfigForTest.getCfg(), null);
     }
 
     @Test
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaFaultVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaFaultVESMsgConsumer.java
index 912b735..ff8e41a 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaFaultVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaFaultVESMsgConsumer.java
@@ -131,7 +131,7 @@
 
     @Test
     public void test() throws IOException {
-        StrimziKafkaFaultVESMsgConsumer faultMsgConsumer = new StrimziKafkaFaultVESMsgConsumer(cfgTest.getCfg());
+        StrimziKafkaFaultVESMsgConsumer faultMsgConsumer = new StrimziKafkaFaultVESMsgConsumer(cfgTest.getCfg(), null);
         try {
 
             faultMsgConsumer.processMsg(faultVESMsg.replace("@eventSeverity@", "CRITICAL"));
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaPNFRegVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaPNFRegVESMsgConsumer.java
index 20b6c4a..d681340 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaPNFRegVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaPNFRegVESMsgConsumer.java
@@ -248,7 +248,7 @@
     @Test
     public void processMsgTest() {
 
-        StrimziKafkaPNFRegVESMsgConsumer pnfRegMsgConsumer = new StrimziKafkaPNFRegVESMsgConsumer(cfgTest.getCfg());
+        StrimziKafkaPNFRegVESMsgConsumer pnfRegMsgConsumer = new StrimziKafkaPNFRegVESMsgConsumer(cfgTest.getCfg(), null);
         try {
             pnfRegMsgConsumer.processMsg(pnfRegMsg);
             pnfRegMsgConsumer.processMsg(pnfRegMsg_SSH);
@@ -262,7 +262,7 @@
 
     @Test
     public void Test1() {
-        StrimziKafkaPNFRegVESMsgConsumer pnfConsumer = new StrimziKafkaPNFRegVESMsgConsumer(cfgTest.getCfg());
+        StrimziKafkaPNFRegVESMsgConsumer pnfConsumer = new StrimziKafkaPNFRegVESMsgConsumer(cfgTest.getCfg(), null);
         System.out.println(pnfConsumer.getBaseUrl());
         System.out.println(pnfConsumer.getSDNRUser());
         System.out.println(pnfConsumer.getSDNRPasswd());
diff --git a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaStndDefinedVESMsgConsumer.java b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaStndDefinedVESMsgConsumer.java
index 0185bf6..e3bbe68 100644
--- a/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaStndDefinedVESMsgConsumer.java
+++ b/sdnr/wt/mountpoint-registrar/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointregistrar/test/consumer/TestStrimziKafkaStndDefinedVESMsgConsumer.java
@@ -201,15 +201,22 @@
 
     @Test
     public void testNotifyNewAlarm() throws IOException {
-        StrimziKafkaStndDefinedFaultVESMsgConsumer stndDefinedFaultMsgConsumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(cfgTest.getCfg());
+        StrimziKafkaStndDefinedFaultVESMsgConsumer stndDefinedFaultMsgConsumer =
+                new StrimziKafkaStndDefinedFaultVESMsgConsumer(cfgTest.getCfg(), null);
         try {
 
-            stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "CRITICAL"));
-            stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "Major"));
-            stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "minor"));
-            stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "NonAlarmed"));
-            stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "warning"));
-            stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "Unknown"));
+            stndDefinedFaultMsgConsumer
+                    .processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "CRITICAL"));
+            stndDefinedFaultMsgConsumer
+                    .processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "Major"));
+            stndDefinedFaultMsgConsumer
+                    .processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "minor"));
+            stndDefinedFaultMsgConsumer
+                    .processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "NonAlarmed"));
+            stndDefinedFaultMsgConsumer
+                    .processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "warning"));
+            stndDefinedFaultMsgConsumer
+                    .processMsg(stndDefinedVESMsg_NotifyNewAlarm.replace("@eventSeverity@", "Unknown"));
             //stndDefinedFaultMsgConsumer.processMsg(faultVESMsg_Incomplete);
         } catch (Exception e) {
             e.printStackTrace();
@@ -219,11 +226,14 @@
 
     @Test
     public void testNotifyClearedAlarm() throws IOException {
-        StrimziKafkaStndDefinedFaultVESMsgConsumer stndDefinedFaultMsgConsumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(cfgTest.getCfg());
+        StrimziKafkaStndDefinedFaultVESMsgConsumer stndDefinedFaultMsgConsumer =
+                new StrimziKafkaStndDefinedFaultVESMsgConsumer(cfgTest.getCfg(), null);
         try {
 
-            stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_NotifyClearedAlarm.replace("@eventSeverity@", "cleared"));
-            stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_NotifyClearedAlarm.replace("@eventSeverity@", "Indeterminate"));
+            stndDefinedFaultMsgConsumer
+                    .processMsg(stndDefinedVESMsg_NotifyClearedAlarm.replace("@eventSeverity@", "cleared"));
+            stndDefinedFaultMsgConsumer
+                    .processMsg(stndDefinedVESMsg_NotifyClearedAlarm.replace("@eventSeverity@", "Indeterminate"));
             //stndDefinedFaultMsgConsumer.processMsg(faultVESMsg_Incomplete);
         } catch (Exception e) {
             e.printStackTrace();
@@ -233,7 +243,8 @@
 
     @Test(expected = InvalidMessageException.class)
     public void testInvalidStndDefinedMessage() throws InvalidMessageException, JsonProcessingException {
-        StrimziKafkaStndDefinedFaultVESMsgConsumer stndDefinedFaultMsgConsumer = new StrimziKafkaStndDefinedFaultVESMsgConsumer(cfgTest.getCfg());
+        StrimziKafkaStndDefinedFaultVESMsgConsumer stndDefinedFaultMsgConsumer =
+                new StrimziKafkaStndDefinedFaultVESMsgConsumer(cfgTest.getCfg(), null);
         stndDefinedFaultMsgConsumer.processMsg(stndDefinedVESMsg_Invalid.replace("@eventSeverity@", "cleared"));
     }
 }