Flatten and Aggregate features in JSON array processing

Issue-ID: DCAEGEN2-1598
Change-Id: I9f563bcfa18285daf7b48878e8427bfdb1aff21f
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
index fd9b3dc..ad142dc 100644
--- a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
+++ b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
@@ -7,6 +7,8 @@
   `enabled` bit(1) DEFAULT 0,

   `login` varchar(255) DEFAULT NULL,

   `message_id_path` varchar(255) DEFAULT NULL,

+  `aggregate_array_path` varchar(2000) DEFAULT NULL,

+  `flatten_array_path` varchar(2000) DEFAULT NULL,

   `pass` varchar(255) DEFAULT NULL,

   `save_raw` bit(1) DEFAULT NULL,

   `ttl` int(11) DEFAULT NULL,

@@ -86,6 +88,12 @@
 

 -- in production, default enabled should be off

 insert into `topic`(`name`,`enabled`,`save_raw`,`ttl`,`data_format`) values ('_DL_DEFAULT_',1,0,3650,'JSON');

+insert into `topic`(`name`,correlate_cleared_message,`enabled`, message_id_path,`data_format`) values ('unauthenticated.SEC_FAULT_OUTPUT',1,1,'/event/commonEventHeader/eventName,/event/commonEventHeader/reportingEntityName,/event/faultFields/specificProblem,/event/commonEventHeader/eventId','JSON');

+insert into `topic`(`name`,`enabled`, aggregate_array_path,flatten_array_path,`data_format`) 

+values ('unauthenticated.VES_MEASUREMENT_OUTPUT',1,

+'/event/measurementsForVfScalingFields/diskUsageArray,/event/measurementsForVfScalingFields/cpuUsageArray,/event/measurementsForVfScalingFields/vNicPerformanceArray',

+'/event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface',

+'JSON');

 

 insert into `map_db_topic`(`db_name`,`topic_name`) values ('Couchbase','_DL_DEFAULT_');

 insert into `map_db_topic`(`db_name`,`topic_name`) values ('Elasticsearch','_DL_DEFAULT_');

@@ -93,14 +101,17 @@
 insert into `map_db_topic`(`db_name`,`topic_name`) values ('Druid','_DL_DEFAULT_');

 insert into `map_db_topic`(`db_name`,`topic_name`) values ('HDFS','_DL_DEFAULT_');

 

-insert into `topic`(`name`,correlate_cleared_message,`enabled`, message_id_path,`data_format`) values ('unauthenticated.SEC_FAULT_OUTPUT',1,1,'/event/commonEventHeader/eventName,/event/commonEventHeader/reportingEntityName,/event/faultFields/specificProblem,/event/commonEventHeader/eventId','JSON');

-

 insert into `map_db_topic`(`db_name`,`topic_name`) values ('Couchbase','unauthenticated.SEC_FAULT_OUTPUT');

 insert into `map_db_topic`(`db_name`,`topic_name`) values ('Elasticsearch','unauthenticated.SEC_FAULT_OUTPUT');

 insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','unauthenticated.SEC_FAULT_OUTPUT');

 insert into `map_db_topic`(`db_name`,`topic_name`) values ('Druid','unauthenticated.SEC_FAULT_OUTPUT');

 insert into `map_db_topic`(`db_name`,`topic_name`) values ('HDFS','unauthenticated.SEC_FAULT_OUTPUT');

 

+insert into `map_db_topic`(`db_name`,`topic_name`) values ('Couchbase','unauthenticated.VES_MEASUREMENT_OUTPUT');

+insert into `map_db_topic`(`db_name`,`topic_name`) values ('Elasticsearch','unauthenticated.VES_MEASUREMENT_OUTPUT');

+insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','unauthenticated.VES_MEASUREMENT_OUTPUT');

+insert into `map_db_topic`(`db_name`,`topic_name`) values ('Druid','unauthenticated.VES_MEASUREMENT_OUTPUT');

+insert into `map_db_topic`(`db_name`,`topic_name`) values ('HDFS','unauthenticated.VES_MEASUREMENT_OUTPUT');

 

 insert into portal (`name`,`related_db`, host) values ('Kibana', 'Elasticsearch', 'dl_es');

 insert into portal (`name`,`related_db`) values ('Elasticsearch', 'Elasticsearch');

diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
index 3073716..acb48ae 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
@@ -100,6 +100,15 @@
 	@Column(name = "`message_id_path`")
 	private String messageIdPath;
 
+	//paths to the arrays that need aggregation, comma separated; example: "/event/measurementsForVfScalingFields/diskUsageArray,/event/measurementsForVfScalingFields/cpuUsageArray,/event/measurementsForVfScalingFields/vNicPerformanceArray"
+	@Column(name = "`aggregate_array_path`") 
+	private String aggregateArrayPath;
+
+	//paths to the elements in arrays that need flattening; each element is used as a label, comma separated,
+	//example: "/event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface,..."
+	@Column(name = "`flatten_array_path`") 
+	private String flattenArrayPath;
+	
 	public Topic() {
 	}
 
@@ -149,6 +158,8 @@
 		tConfig.setSaveRaw(isSaveRaw());
 		tConfig.setCorrelateClearedMessage(isCorrelateClearedMessage());
 		tConfig.setMessageIdPath(getMessageIdPath());
+		tConfig.setAggregateArrayPath(getAggregateArrayPath());
+		tConfig.setFlattenArrayPath(getFlattenArrayPath());
 		tConfig.setTtl(getTtl());
 		Set<Db> topicDb = getDbs();
 		List<String> dbList = new ArrayList<>();
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
index deaa096..8dfe1b1 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
@@ -51,6 +51,8 @@
 	private int ttl;
 	private boolean correlateClearedMessage;
 	private String messageIdPath;
+	private String aggregateArrayPath;
+	private String flattenArrayPath;
 
 	public DataFormat getDataFormat2() {
 		if (dataFormat != null) {
@@ -60,7 +62,6 @@
 		}
 	}
 
-
 	public boolean supportHdfs() {
 		return containDb("HDFS");
 	}
@@ -105,6 +106,26 @@
 		return id;
 	}
 
+	public String[] getAggregateArrayPath2() {
+		String[] ret = null;
+
+		if (StringUtils.isNotBlank(aggregateArrayPath)) {
+			ret = aggregateArrayPath.split(",");
+		}
+
+		return ret;
+	}
+
+	public String[] getFlattenArrayPath2() {
+		String[] ret = null;
+
+		if (StringUtils.isNotBlank(flattenArrayPath)) {
+			ret = flattenArrayPath.split(",");
+		}
+
+		return ret;
+	}
+
 	@Override
 	public String toString() {
 		return name;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
index 03faeb8..2a2f997 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
@@ -27,12 +27,14 @@
 import javax.annotation.PostConstruct;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.json.JSONObject;
 import org.json.XML;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.dto.TopicConfig;
 import org.onap.datalake.feeder.enumeration.DataFormat;
+import org.onap.datalake.feeder.util.JsonUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -100,7 +102,7 @@
 		saveJsons(topicConfig, docs, messages);
 	}
 
-	private JSONObject messageToJson(TopicConfig topic, Pair<Long, String> pair) throws IOException {
+	private JSONObject messageToJson(TopicConfig topicConfig, Pair<Long, String> pair) throws IOException {
 
 		long timestamp = pair.getLeft();
 		String text = pair.getRight();
@@ -111,11 +113,11 @@
 		//		log.debug("{} ={}", topicStr, text);
 		//}
 
-		boolean storeRaw = topic.isSaveRaw();
+		boolean storeRaw = topicConfig.isSaveRaw();
 
 		JSONObject json = null;
 
-		DataFormat dataFormat = topic.getDataFormat2();
+		DataFormat dataFormat = topicConfig.getDataFormat2();
 
 		switch (dataFormat) {
 		case JSON:
@@ -146,6 +148,20 @@
 			json.put(config.getRawDataLabel(), text);
 		}
 
+		if (StringUtils.isNotBlank(topicConfig.getAggregateArrayPath())) {
+			String[] paths = topicConfig.getAggregateArrayPath2();
+			for (String path : paths) {
+				JsonUtil.arrayAggregate(path, json);
+			}
+		}
+
+		if (StringUtils.isNotBlank(topicConfig.getFlattenArrayPath())) {
+			String[] paths = topicConfig.getFlattenArrayPath2();
+			for (String path : paths) {
+				JsonUtil.flattenArray(path, json);
+			}
+		}
+
 		return json;
 	}
 
@@ -168,9 +184,9 @@
 	}
 
 	public void flush() { //force flush all buffer 
-		hdfsService.flush(); 
+		hdfsService.flush();
 	}
-	
+
 	public void flushStall() { //flush stall buffer
 		hdfsService.flushStall();
 	}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
index f0b000b..64e8b8b 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
@@ -123,6 +123,8 @@
 		topic.setCorrelateClearedMessage(tConfig.isCorrelateClearedMessage());
 		topic.setDataFormat(tConfig.getDataFormat());
 		topic.setMessageIdPath(tConfig.getMessageIdPath());
+		topic.setAggregateArrayPath(tConfig.getAggregateArrayPath());
+		topic.setFlattenArrayPath(tConfig.getFlattenArrayPath());
 
 		if(tConfig.getSinkdbs() != null) {
 			for (String item : tConfig.getSinkdbs()) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java
new file mode 100644
index 0000000..db4dcfa
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java
@@ -0,0 +1,158 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DCAE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.util;
+
+import java.util.HashMap;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+import lombok.Getter;
+
+/**
+ * utils for JSON
+ * 
+ * @author Guobiao Mo
+ *
+ */
+public class JsonUtil {
+
+	@Getter
+	enum AggregateType {
+		ALL("aggregate"), AVEARGE("average"), SUM("sum"), MAX("max"), MIN("min"), COUNT("count");
+		private final String name;
+
+		AggregateType(String name) {
+			this.name = name;
+		}
+
+		public String getLabel(String path) {
+			return path.substring(path.lastIndexOf('/') + 1) + "_" + name;
+		}
+	}
+
+	public static void flattenArray(String path, JSONObject json) {
+		//path = /event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface
+
+		int index1 = path.lastIndexOf('/');
+
+		String arrayPath = path.substring(0, index1);// /event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray
+
+		Object obj;
+		try {
+			obj = json.query(arrayPath);
+		} catch (org.json.JSONPointerException e) {
+			return;
+		}
+		if (obj == null || !(obj instanceof JSONArray)) {
+			return;
+		}
+		Iterable<JSONObject> subjsonaArray = (Iterable<JSONObject>) obj;
+
+		String tagName = path.substring(index1 + 1);//astriInterface
+
+		int index2 = path.lastIndexOf('/', index1 - 1);
+		String arrayName = path.substring(index2 + 1, index1);//astriDPMeasurementArray
+
+		String parentPath = path.substring(0, index2);// /event/measurementsForVfScalingFields/astriMeasurement
+		JSONObject parent = (JSONObject) json.query(parentPath);
+
+		for (JSONObject element : subjsonaArray) {
+			String tagValue = element.get(tagName).toString();
+			String label = arrayName + "_" + tagName + "_" + tagValue;
+
+			parent.put(label, element);
+		}
+	}
+
+	/**
+	 * Aggregate the numeric fields of the JSON array at the given path
+	 * (sum, average, count, max, min) and attach the results to the
+	 * array's parent object. The supplied json is modified in place.
+	 * @param path JSON Pointer path to the array to aggregate
+	 * @param json the JSON document to modify
+	 */
+	public static void arrayAggregate(String path, JSONObject json) {
+		HashMap<String, Double> sumHashMap = new HashMap<>();
+		HashMap<String, Double> maxHashMap = new HashMap<>();
+		HashMap<String, Double> minHashMap = new HashMap<>();
+
+		Object obj;
+		try {
+			obj = json.query(path);
+		} catch (org.json.JSONPointerException e) {
+			return;
+		}
+		if (obj == null || !(obj instanceof JSONArray)) {
+			return;
+		}
+		Iterable<JSONObject> subjsonaArray = (Iterable<JSONObject>) obj;
+
+		int count = 0;
+		for (JSONObject element : subjsonaArray) {
+			String[] names = JSONObject.getNames(element);
+			for (String name : names) {
+				Number value = element.optNumber(name);
+				if (value != null) {
+					double existing = sumHashMap.computeIfAbsent(name, k -> 0.0);
+					sumHashMap.put(name, existing + value.doubleValue());
+
+					existing = maxHashMap.computeIfAbsent(name, k -> Double.MIN_VALUE);
+					maxHashMap.put(name, Math.max(existing, value.doubleValue()));
+
+					existing = minHashMap.computeIfAbsent(name, k -> Double.MAX_VALUE);
+					minHashMap.put(name, Math.min(existing, value.doubleValue()));
+				}
+			}
+			count++;
+		}
+
+		if (count == 0) {
+			return;
+		}
+
+		JSONObject parentJson = (JSONObject) json.query(path.substring(0, path.lastIndexOf('/')));
+
+		//sum
+		JSONObject aggJson = new JSONObject(sumHashMap);
+		parentJson.put(AggregateType.SUM.getLabel(path), aggJson);
+
+		//AVEARGE
+		int c = count; //local copy must be effectively final to be captured by the lambda below
+		sumHashMap.replaceAll((k, v) -> v / c);
+		aggJson = new JSONObject(sumHashMap);
+		parentJson.put(AggregateType.AVEARGE.getLabel(path), aggJson);
+
+		//Count
+		parentJson.put(AggregateType.COUNT.getLabel(path), count);
+
+		//Max
+		aggJson = new JSONObject(maxHashMap);
+		parentJson.put(AggregateType.MAX.getLabel(path), aggJson);
+
+		//Min
+		aggJson = new JSONObject(minHashMap);
+		parentJson.put(AggregateType.MIN.getLabel(path), aggJson);
+
+	}
+
+}
diff --git a/components/datalake-handler/feeder/src/main/resources/application.properties b/components/datalake-handler/feeder/src/main/resources/application.properties
index a105473..7bbbac0 100644
--- a/components/datalake-handler/feeder/src/main/resources/application.properties
+++ b/components/datalake-handler/feeder/src/main/resources/application.properties
@@ -42,7 +42,7 @@
 kafkaConsumerCount=3
 
 #####################Elasticsearch
-elasticsearchType=doc
+elasticsearchType=_doc
 
 #####################HDFS
 hdfsBufferSize=4096
diff --git a/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/readme.txt b/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/readme.txt
new file mode 100644
index 0000000..8851353
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/readme.txt
@@ -0,0 +1,4 @@
+Before creating the index, send the following request:

+PUT http://dl_es:9200/unauthenticated.ves_measurement_output

+application/json

+body from unauthenticated.ves_measurement_output.json
\ No newline at end of file
diff --git a/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/unauthenticated.ves_measurement_output.json b/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/unauthenticated.ves_measurement_output.json
new file mode 100644
index 0000000..9a53b70
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/unauthenticated.ves_measurement_output.json
@@ -0,0 +1,31 @@
+{ 

+	"mappings": { 

+			"properties": {

+				"datalake_ts_": {

+					"type": "date",

+					"format": "epoch_millis"

+				},				

+				"event.commonEventHeader.internalHeaderFields.collectorTimeStamp": {

+					"type": "date",

+					"format":"EEE, MM dd yyyy HH:mm:ss z"

+				},

+				"event.commonEventHeader.startEpochMicrosec": {

+					"type": "date",

+					"format": "epoch_millis"

+				},

+				"event.commonEventHeader.lastEpochMicrosec": {

+					"type": "date",

+					"format": "epoch_millis"

+				},

+				"event.measurementsForVfScalingFields.diskUsageArray": {

+					"type": "nested"

+				},

+				"event.measurementsForVfScalingFields.cpuUsageArray": {

+					"type": "nested"

+				},

+				"event.measurementsForVfScalingFields.vNicPerformanceArray": {

+					"type": "nested"

+				}

+			} 

+	}

+}
\ No newline at end of file
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
index 617b50e..0c56d5a 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
@@ -68,6 +68,7 @@
         assertNotNull(config.getRawDataLabel());
         assertNotNull(config.getTimestampLabel());
         assertNotNull(config.getElasticsearchType());
+        assertNotNull(config.getDatalakeVersion());
         
       //HDFS
         assertTrue(config.getHdfsBatchSize()>0);
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java
index bb31cd7..f52332a 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java
@@ -77,6 +77,23 @@
     }
 
     @Test
+    public void testArrayPath() {
+        Topic topic = new Topic("testArrayPath");
+        topic.setAggregateArrayPath("/data/data2/value,/data/data3");
+        topic.setFlattenArrayPath("/data/data2/value,/data/data3");
+
+        TopicConfig topicConfig = topic.getTopicConfig();
+
+        String[] value = topicConfig.getAggregateArrayPath2();
+        assertEquals(value[0], "/data/data2/value");
+        assertEquals(value[1], "/data/data3");
+
+        value = topicConfig.getFlattenArrayPath2();
+        assertEquals(value[0], "/data/data2/value");
+        assertEquals(value[1], "/data/data3");
+    }
+
+    @Test
     public void testIs() {
         Topic testTopic = new Topic("test");
 
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/StoreServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/StoreServiceTest.java
index 44e7632..fc05d1d 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/StoreServiceTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/StoreServiceTest.java
@@ -93,6 +93,8 @@
 		testInit();
 
 		TopicConfig topicConfig = createTopicConfig("test1", "JSON");
+		topicConfig.setAggregateArrayPath("/test");
+		topicConfig.setFlattenArrayPath("/test");
 
 		topicConfig = createTopicConfig("test2", "XML");
 		topicConfig.setSaveRaw(false);
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java
index 265ec96..774cd22 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java
@@ -21,6 +21,7 @@
 package org.onap.datalake.feeder.service;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -70,6 +71,8 @@
 		String name = "a";
 		when(topicRepository.findById(name)).thenReturn(Optional.of(new Topic(name)));
 		assertEquals(topicService.getTopic(name), new Topic(name));
+		
+		assertFalse(topicService.istDefaultTopic(new Topic(name)));
 	}
 
 	@Test
diff --git a/components/datalake-handler/feeder/src/test/resources/application.properties b/components/datalake-handler/feeder/src/test/resources/application.properties
index 189adec..64ecdee 100644
--- a/components/datalake-handler/feeder/src/test/resources/application.properties
+++ b/components/datalake-handler/feeder/src/test/resources/application.properties
@@ -14,36 +14,37 @@
 
 defaultTopicName=_DL_DEFAULT_
 
-#how often do we check topic setting update, in millisecond
-topicCheckInterval=60000
 
 #####################DMaaP
 #dmaapZookeeperHostPort=127.0.0.1:2181
 #dmaapKafkaHostPort=127.0.0.1:9092
 dmaapZookeeperHostPort=message-router-zookeeper:2181
 dmaapKafkaHostPort=message-router-kafka:9092
-dmaapKafkaGroup=dlgroup19
+dmaapKafkaGroup=dlgroup44
 #in second
 dmaapKafkaTimeout=60
 dmaapKafkaExclude[0]=__consumer_offsets
 dmaapKafkaExclude[1]=__transaction_state
-dmaapKafkaExclude[2]=msgrtr.apinode.metrics.dmaap
+#dmaapKafkaExclude[2]=msgrtr.apinode.metrics.dmaap
 #check for new topics , in millisecond
-dmaapCheckNewTopicInterval=300000
+dmaapCheckNewTopicInterval=60000
 
-kafkaConsumerCount=1
+kafkaConsumerCount=3
 
 #####################Elasticsearch
-elasticsearchType=doc
+elasticsearchType=_doc
 
 #####################HDFS
 hdfsBufferSize=4096
 #how often we flush stall updates, in millisecond
-hdfsFlushInterval=10000
-hdfsBatchSize=250
+hdfsFlushInterval=30000
+hdfsBatchSize=500
 
 #####################Logging
 logging.level.org.springframework.web=ERROR
 logging.level.com.att.nsa.apiClient.http=ERROR
 logging.level.org.onap.datalake=DEBUG
+
+#####################Version
+datalakeVersion=0.0.1
  
diff --git a/components/datalake-handler/pom.xml b/components/datalake-handler/pom.xml
index 0a48af6..d8caee8 100644
--- a/components/datalake-handler/pom.xml
+++ b/components/datalake-handler/pom.xml
@@ -33,7 +33,7 @@
 		<springcouchbase.version>3.1.2.RELEASE</springcouchbase.version>
 		<jackson.version>2.9.8</jackson.version>
 		<kafka.version>2.0.0</kafka.version>
-		<elasticsearchjava.version>7.0.0</elasticsearchjava.version>
+		<elasticsearchjava.version>7.1.1</elasticsearchjava.version>
 		<hadoop.version>3.2.0</hadoop.version>
 
 	</properties>