Merge "DataLake DB module interface Function modification"
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
index 2bcd0a3..fa9f7d9 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
@@ -54,6 +54,10 @@
 	private String dmaapZookeeperHostPort;
 	private String dmaapKafkaHostPort;
 	private String dmaapKafkaGroup;
+	private String dmaapKafkaLogin;
+	private String dmaapKafkaPass;
+	private String dmaapKafkaSecurityProtocol;
+	
 	private long dmaapKafkaTimeout;
 	private String[] dmaapKafkaExclude;
 
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
index 9e4ab45..e7121dd 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/Puller.java
@@ -26,11 +26,12 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.annotation.PostConstruct;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -42,8 +43,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.config.ConfigurableBeanFactory;
-import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Service;
 
 /**
@@ -90,9 +89,12 @@
 		consumerConfig.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
 		consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 
-		//		consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
-		//	consumerConfig.put("sasl.mechanism", "PLAIN");
-
+		if (StringUtils.isNotBlank(config.getDmaapKafkaLogin())) {
+			String jaas = "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + config.getDmaapKafkaLogin() + " password=" + config.getDmaapKafkaPass() + " serviceName=kafka;";
+			consumerConfig.put("sasl.jaas.config", jaas);
+			consumerConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, config.getDmaapKafkaSecurityProtocol());
+			consumerConfig.put("sasl.mechanism", "PLAIN");
+		}
 		return consumerConfig;
 	}
 
diff --git a/components/datalake-handler/feeder/src/main/resources/application.properties b/components/datalake-handler/feeder/src/main/resources/application.properties
index faf2758..60fcb1a 100644
--- a/components/datalake-handler/feeder/src/main/resources/application.properties
+++ b/components/datalake-handler/feeder/src/main/resources/application.properties
@@ -26,18 +26,20 @@
 
 
 #####################DMaaP
-#dmaapZookeeperHostPort=127.0.0.1:2181
-#dmaapKafkaHostPort=127.0.0.1:9092
 dmaapZookeeperHostPort=message-router-zookeeper:2181
 dmaapKafkaHostPort=message-router-kafka:9092
 dmaapKafkaGroup=dlgroup44
+#dmaapKafkaLogin=admin
+#dmaapKafkaPass=admin-secret
+#dmaapKafkaSecurityProtocol=SASL_PLAINTEXT
+
 #in second
-dmaapKafkaTimeout=60
+dmaapKafkaTimeout=10
 dmaapKafkaExclude[0]=__consumer_offsets
 dmaapKafkaExclude[1]=__transaction_state
 #dmaapKafkaExclude[2]=msgrtr.apinode.metrics.dmaap
 #check for new topics , in millisecond
-dmaapCheckNewTopicInterval=60000
+dmaapCheckNewTopicInterval=10000
 
 kafkaConsumerCount=3
 
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/PullerTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/PullerTest.java
index fab5d4c..4a5553f 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/PullerTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/PullerTest.java
@@ -73,6 +73,9 @@
 
 		when(config.getDmaapKafkaHostPort()).thenReturn("test:1000");
 		when(config.getDmaapKafkaGroup()).thenReturn("test");
+		when(config.getDmaapKafkaLogin()).thenReturn("login");
+		when(config.getDmaapKafkaPass()).thenReturn("pass");
+		when(config.getDmaapKafkaSecurityProtocol()).thenReturn("TEXT");
 
 		Thread thread = new Thread(puller);
 		thread.start();