Flatten and Aggregate features in JSON array processing
Issue-ID: DCAEGEN2-1598
Change-Id: I9f563bcfa18285daf7b48878e8427bfdb1aff21f
Signed-off-by: Guobiao Mo <guobiaomo@chinamobile.com>
diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
index fd9b3dc..ad142dc 100644
--- a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
+++ b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
@@ -7,6 +7,8 @@
`enabled` bit(1) DEFAULT 0,
`login` varchar(255) DEFAULT NULL,
`message_id_path` varchar(255) DEFAULT NULL,
+ `aggregate_array_path` varchar(2000) DEFAULT NULL,
+ `flatten_array_path` varchar(2000) DEFAULT NULL,
`pass` varchar(255) DEFAULT NULL,
`save_raw` bit(1) DEFAULT NULL,
`ttl` int(11) DEFAULT NULL,
@@ -86,6 +88,12 @@
-- in production, default enabled should be off
insert into `topic`(`name`,`enabled`,`save_raw`,`ttl`,`data_format`) values ('_DL_DEFAULT_',1,0,3650,'JSON');
+insert into `topic`(`name`,correlate_cleared_message,`enabled`, message_id_path,`data_format`) values ('unauthenticated.SEC_FAULT_OUTPUT',1,1,'/event/commonEventHeader/eventName,/event/commonEventHeader/reportingEntityName,/event/faultFields/specificProblem,/event/commonEventHeader/eventId','JSON');
+insert into `topic`(`name`,`enabled`, aggregate_array_path,flatten_array_path,`data_format`)
+values ('unauthenticated.VES_MEASUREMENT_OUTPUT',1,
+'/event/measurementsForVfScalingFields/diskUsageArray,/event/measurementsForVfScalingFields/cpuUsageArray,/event/measurementsForVfScalingFields/vNicPerformanceArray',
+'/event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface',
+'JSON');
insert into `map_db_topic`(`db_name`,`topic_name`) values ('Couchbase','_DL_DEFAULT_');
insert into `map_db_topic`(`db_name`,`topic_name`) values ('Elasticsearch','_DL_DEFAULT_');
@@ -93,14 +101,17 @@
insert into `map_db_topic`(`db_name`,`topic_name`) values ('Druid','_DL_DEFAULT_');
insert into `map_db_topic`(`db_name`,`topic_name`) values ('HDFS','_DL_DEFAULT_');
-insert into `topic`(`name`,correlate_cleared_message,`enabled`, message_id_path,`data_format`) values ('unauthenticated.SEC_FAULT_OUTPUT',1,1,'/event/commonEventHeader/eventName,/event/commonEventHeader/reportingEntityName,/event/faultFields/specificProblem,/event/commonEventHeader/eventId','JSON');
-
insert into `map_db_topic`(`db_name`,`topic_name`) values ('Couchbase','unauthenticated.SEC_FAULT_OUTPUT');
insert into `map_db_topic`(`db_name`,`topic_name`) values ('Elasticsearch','unauthenticated.SEC_FAULT_OUTPUT');
insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','unauthenticated.SEC_FAULT_OUTPUT');
insert into `map_db_topic`(`db_name`,`topic_name`) values ('Druid','unauthenticated.SEC_FAULT_OUTPUT');
insert into `map_db_topic`(`db_name`,`topic_name`) values ('HDFS','unauthenticated.SEC_FAULT_OUTPUT');
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('Couchbase','unauthenticated.VES_MEASUREMENT_OUTPUT');
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('Elasticsearch','unauthenticated.VES_MEASUREMENT_OUTPUT');
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('MongoDB','unauthenticated.VES_MEASUREMENT_OUTPUT');
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('Druid','unauthenticated.VES_MEASUREMENT_OUTPUT');
+insert into `map_db_topic`(`db_name`,`topic_name`) values ('HDFS','unauthenticated.VES_MEASUREMENT_OUTPUT');
insert into portal (`name`,`related_db`, host) values ('Kibana', 'Elasticsearch', 'dl_es');
insert into portal (`name`,`related_db`) values ('Elasticsearch', 'Elasticsearch');
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
index 3073716..acb48ae 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/domain/Topic.java
@@ -100,6 +100,15 @@
@Column(name = "`message_id_path`")
private String messageIdPath;
+ //paths to the array that need aggregation, comma separated, example: "/event/measurementsForVfScalingFields/diskUsageArray,/event/measurementsForVfScalingFields/cpuUsageArray,/event/measurementsForVfScalingFields/vNicPerformanceArray"
+ @Column(name = "`aggregate_array_path`")
+ private String aggregateArrayPath;
+
+ //paths to the element in array that need flatten, this element is used as label, comma separated,
+ //example: "/event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface,..."
+ @Column(name = "`flatten_array_path`")
+ private String flattenArrayPath;
+
public Topic() {
}
@@ -149,6 +158,8 @@
tConfig.setSaveRaw(isSaveRaw());
tConfig.setCorrelateClearedMessage(isCorrelateClearedMessage());
tConfig.setMessageIdPath(getMessageIdPath());
+ tConfig.setAggregateArrayPath(getAggregateArrayPath());
+ tConfig.setFlattenArrayPath(getFlattenArrayPath());
tConfig.setTtl(getTtl());
Set<Db> topicDb = getDbs();
List<String> dbList = new ArrayList<>();
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
index deaa096..8dfe1b1 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/dto/TopicConfig.java
@@ -51,6 +51,8 @@
private int ttl;
private boolean correlateClearedMessage;
private String messageIdPath;
+ private String aggregateArrayPath;
+ private String flattenArrayPath;
public DataFormat getDataFormat2() {
if (dataFormat != null) {
@@ -60,7 +62,6 @@
}
}
-
public boolean supportHdfs() {
return containDb("HDFS");
}
@@ -105,6 +106,26 @@
return id;
}
+ public String[] getAggregateArrayPath2() {
+ String[] ret = null;
+
+ if (StringUtils.isNotBlank(aggregateArrayPath)) {
+ ret = aggregateArrayPath.split(",");
+ }
+
+ return ret;
+ }
+
+ public String[] getFlattenArrayPath2() {
+ String[] ret = null;
+
+ if (StringUtils.isNotBlank(flattenArrayPath)) {
+ ret = flattenArrayPath.split(",");
+ }
+
+ return ret;
+ }
+
@Override
public String toString() {
return name;
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
index 03faeb8..2a2f997 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/StoreService.java
@@ -27,12 +27,14 @@
import javax.annotation.PostConstruct;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.json.JSONObject;
import org.json.XML;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.dto.TopicConfig;
import org.onap.datalake.feeder.enumeration.DataFormat;
+import org.onap.datalake.feeder.util.JsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -100,7 +102,7 @@
saveJsons(topicConfig, docs, messages);
}
- private JSONObject messageToJson(TopicConfig topic, Pair<Long, String> pair) throws IOException {
+ private JSONObject messageToJson(TopicConfig topicConfig, Pair<Long, String> pair) throws IOException {
long timestamp = pair.getLeft();
String text = pair.getRight();
@@ -111,11 +113,11 @@
// log.debug("{} ={}", topicStr, text);
//}
- boolean storeRaw = topic.isSaveRaw();
+ boolean storeRaw = topicConfig.isSaveRaw();
JSONObject json = null;
- DataFormat dataFormat = topic.getDataFormat2();
+ DataFormat dataFormat = topicConfig.getDataFormat2();
switch (dataFormat) {
case JSON:
@@ -146,6 +148,20 @@
json.put(config.getRawDataLabel(), text);
}
+ if (StringUtils.isNotBlank(topicConfig.getAggregateArrayPath())) {
+ String[] paths = topicConfig.getAggregateArrayPath2();
+ for (String path : paths) {
+ JsonUtil.arrayAggregate(path, json);
+ }
+ }
+
+ if (StringUtils.isNotBlank(topicConfig.getFlattenArrayPath())) {
+ String[] paths = topicConfig.getFlattenArrayPath2();
+ for (String path : paths) {
+ JsonUtil.flattenArray(path, json);
+ }
+ }
+
return json;
}
@@ -168,9 +184,9 @@
}
public void flush() { //force flush all buffer
- hdfsService.flush();
+ hdfsService.flush();
}
-
+
public void flushStall() { //flush stall buffer
hdfsService.flushStall();
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
index f0b000b..64e8b8b 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicService.java
@@ -123,6 +123,8 @@
topic.setCorrelateClearedMessage(tConfig.isCorrelateClearedMessage());
topic.setDataFormat(tConfig.getDataFormat());
topic.setMessageIdPath(tConfig.getMessageIdPath());
+ topic.setAggregateArrayPath(tConfig.getAggregateArrayPath());
+ topic.setFlattenArrayPath(tConfig.getFlattenArrayPath());
if(tConfig.getSinkdbs() != null) {
for (String item : tConfig.getSinkdbs()) {
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java
new file mode 100644
index 0000000..db4dcfa
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/util/JsonUtil.java
@@ -0,0 +1,158 @@
+/*
+* ============LICENSE_START=======================================================
+* ONAP : DCAE
+* ================================================================================
+* Copyright 2019 China Mobile
+*=================================================================================
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* ============LICENSE_END=========================================================
+*/
+
+package org.onap.datalake.feeder.util;
+
+import java.util.HashMap;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.json.JSONArray;
+import org.json.JSONObject;
+
+import lombok.Getter;
+
+/**
+ * utils for JSON
+ *
+ * @author Guobiao Mo
+ *
+ */
+public class JsonUtil {
+
+ @Getter
+ enum AggregateType {
+ ALL("aggregate"), AVEARGE("average"), SUM("sum"), MAX("max"), MIN("min"), COUNT("count");
+ private final String name;
+
+ AggregateType(String name) {
+ this.name = name;
+ }
+
+ public String getLabel(String path) {
+ return path.substring(path.lastIndexOf('/') + 1) + "_" + name;
+ }
+ }
+
+ public static void flattenArray(String path, JSONObject json) {
+ //path = /event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray/astriInterface
+
+ int index1 = path.lastIndexOf('/');
+
+ String arrayPath = path.substring(0, index1);// /event/measurementsForVfScalingFields/astriMeasurement/astriDPMeasurementArray
+
+ Object obj;
+ try {
+ obj = json.query(arrayPath);
+ } catch (org.json.JSONPointerException e) {
+ return;
+ }
+ if (obj == null || !(obj instanceof JSONArray)) {
+ return;
+ }
+ Iterable<JSONObject> subjsonaArray = (Iterable<JSONObject>) obj;
+
+ String tagName = path.substring(index1 + 1);//astriInterface
+
+ int index2 = path.lastIndexOf('/', index1 - 1);
+ String arrayName = path.substring(index2 + 1, index1);//astriDPMeasurementArray
+
+ String parentPath = path.substring(0, index2);// /event/measurementsForVfScalingFields/astriMeasurement
+ JSONObject parent = (JSONObject) json.query(parentPath);
+
+ for (JSONObject element : subjsonaArray) {
+ String tagValue = element.get(tagName).toString();
+ String label = arrayName + "_" + tagName + "_" + tagValue;
+
+ parent.put(label, element);
+ }
+ }
+
+ /**
+ * json got modified.
+ *
+ * @param aggregateType
+ * @param path
+ * @param json
+ */
+ public static void arrayAggregate(String path, JSONObject json) {
+ HashMap<String, Double> sumHashMap = new HashMap<>();
+ HashMap<String, Double> maxHashMap = new HashMap<>();
+ HashMap<String, Double> minHashMap = new HashMap<>();
+
+ Object obj;
+ try {
+ obj = json.query(path);
+ } catch (org.json.JSONPointerException e) {
+ return;
+ }
+ if (obj == null || !(obj instanceof JSONArray)) {
+ return;
+ }
+ Iterable<JSONObject> subjsonaArray = (Iterable<JSONObject>) obj;
+
+ int count = 0;
+ for (JSONObject element : subjsonaArray) {
+ String[] names = JSONObject.getNames(element);
+ for (String name : names) {
+ Number value = element.optNumber(name);
+ if (value != null) {
+ double existing = sumHashMap.computeIfAbsent(name, k -> 0.0);
+ sumHashMap.put(name, existing + value.doubleValue());
+
+ existing = maxHashMap.computeIfAbsent(name, k -> Double.MIN_VALUE);
+ maxHashMap.put(name, Math.max(existing, value.doubleValue()));
+
+ existing = minHashMap.computeIfAbsent(name, k -> Double.MAX_VALUE);
+ minHashMap.put(name, Math.min(existing, value.doubleValue()));
+ }
+ }
+ count++;
+ }
+
+ if (count == 0) {
+ return;
+ }
+
+ JSONObject parentJson = (JSONObject) json.query(path.substring(0, path.lastIndexOf('/')));
+
+ //sum
+ JSONObject aggJson = new JSONObject(sumHashMap);
+ parentJson.put(AggregateType.SUM.getLabel(path), aggJson);
+
+ //AVEARGE
+ int c = count;//need to be Effectively Final
+ sumHashMap.replaceAll((k, v) -> v / c);
+ aggJson = new JSONObject(sumHashMap);
+ parentJson.put(AggregateType.AVEARGE.getLabel(path), aggJson);
+
+ //Count
+ parentJson.put(AggregateType.COUNT.getLabel(path), count);
+
+ //Max
+ aggJson = new JSONObject(maxHashMap);
+ parentJson.put(AggregateType.MAX.getLabel(path), aggJson);
+
+ //Min
+ aggJson = new JSONObject(minHashMap);
+ parentJson.put(AggregateType.MIN.getLabel(path), aggJson);
+
+ }
+
+}
diff --git a/components/datalake-handler/feeder/src/main/resources/application.properties b/components/datalake-handler/feeder/src/main/resources/application.properties
index a105473..7bbbac0 100644
--- a/components/datalake-handler/feeder/src/main/resources/application.properties
+++ b/components/datalake-handler/feeder/src/main/resources/application.properties
@@ -42,7 +42,7 @@
kafkaConsumerCount=3
#####################Elasticsearch
-elasticsearchType=doc
+elasticsearchType=_doc
#####################HDFS
hdfsBufferSize=4096
diff --git a/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/readme.txt b/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/readme.txt
new file mode 100644
index 0000000..8851353
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/readme.txt
@@ -0,0 +1,4 @@
+before creating index
+PUT http://dl_es:9200/unauthenticated.ves_measurement_output
+application/json
+body from unauthenticated.ves_measurement_output.json
\ No newline at end of file
diff --git a/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/unauthenticated.ves_measurement_output.json b/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/unauthenticated.ves_measurement_output.json
new file mode 100644
index 0000000..9a53b70
--- /dev/null
+++ b/components/datalake-handler/feeder/src/main/resources/elasticsearch/mappings/unauthenticated.ves_measurement_output.json
@@ -0,0 +1,31 @@
+{
+ "mappings": {
+ "properties": {
+ "datalake_ts_": {
+ "type": "date",
+ "format": "epoch_millis"
+ },
+ "event.commonEventHeader.internalHeaderFields.collectorTimeStamp": {
+ "type": "date",
+ "format":"EEE, MM dd yyyy HH:mm:ss z"
+ },
+ "event.commonEventHeader.startEpochMicrosec": {
+ "type": "date",
+ "format": "epoch_millis"
+ },
+ "event.commonEventHeader.lastEpochMicrosec": {
+ "type": "date",
+ "format": "epoch_millis"
+ },
+ "event.measurementsForVfScalingFields.diskUsageArray": {
+ "type": "nested"
+ },
+ "event.measurementsForVfScalingFields.cpuUsageArray": {
+ "type": "nested"
+ },
+ "event.measurementsForVfScalingFields.vNicPerformanceArray": {
+ "type": "nested"
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
index 617b50e..0c56d5a 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/config/ApplicationConfigurationTest.java
@@ -68,6 +68,7 @@
assertNotNull(config.getRawDataLabel());
assertNotNull(config.getTimestampLabel());
assertNotNull(config.getElasticsearchType());
+ assertNotNull(config.getDatalakeVersion());
//HDFS
assertTrue(config.getHdfsBatchSize()>0);
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java
index bb31cd7..f52332a 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/dto/TopicConfigTest.java
@@ -77,6 +77,23 @@
}
@Test
+ public void testArrayPath() {
+ Topic topic = new Topic("testArrayPath");
+ topic.setAggregateArrayPath("/data/data2/value,/data/data3");
+ topic.setFlattenArrayPath("/data/data2/value,/data/data3");
+
+ TopicConfig topicConfig = topic.getTopicConfig();
+
+ String[] value = topicConfig.getAggregateArrayPath2();
+ assertEquals(value[0], "/data/data2/value");
+ assertEquals(value[1], "/data/data3");
+
+ value = topicConfig.getFlattenArrayPath2();
+ assertEquals(value[0], "/data/data2/value");
+ assertEquals(value[1], "/data/data3");
+ }
+
+ @Test
public void testIs() {
Topic testTopic = new Topic("test");
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/StoreServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/StoreServiceTest.java
index 44e7632..fc05d1d 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/StoreServiceTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/StoreServiceTest.java
@@ -93,6 +93,8 @@
testInit();
TopicConfig topicConfig = createTopicConfig("test1", "JSON");
+ topicConfig.setAggregateArrayPath("/test");
+ topicConfig.setFlattenArrayPath("/test");
topicConfig = createTopicConfig("test2", "XML");
topicConfig.setSaveRaw(false);
diff --git a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java
index 265ec96..774cd22 100644
--- a/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java
+++ b/components/datalake-handler/feeder/src/test/java/org/onap/datalake/feeder/service/TopicServiceTest.java
@@ -21,6 +21,7 @@
package org.onap.datalake.feeder.service;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -70,6 +71,8 @@
String name = "a";
when(topicRepository.findById(name)).thenReturn(Optional.of(new Topic(name)));
assertEquals(topicService.getTopic(name), new Topic(name));
+
+ assertFalse(topicService.istDefaultTopic(new Topic(name)));
}
@Test
diff --git a/components/datalake-handler/feeder/src/test/resources/application.properties b/components/datalake-handler/feeder/src/test/resources/application.properties
index 189adec..64ecdee 100644
--- a/components/datalake-handler/feeder/src/test/resources/application.properties
+++ b/components/datalake-handler/feeder/src/test/resources/application.properties
@@ -14,36 +14,37 @@
defaultTopicName=_DL_DEFAULT_
-#how often do we check topic setting update, in millisecond
-topicCheckInterval=60000
#####################DMaaP
#dmaapZookeeperHostPort=127.0.0.1:2181
#dmaapKafkaHostPort=127.0.0.1:9092
dmaapZookeeperHostPort=message-router-zookeeper:2181
dmaapKafkaHostPort=message-router-kafka:9092
-dmaapKafkaGroup=dlgroup19
+dmaapKafkaGroup=dlgroup44
#in second
dmaapKafkaTimeout=60
dmaapKafkaExclude[0]=__consumer_offsets
dmaapKafkaExclude[1]=__transaction_state
-dmaapKafkaExclude[2]=msgrtr.apinode.metrics.dmaap
+#dmaapKafkaExclude[2]=msgrtr.apinode.metrics.dmaap
#check for new topics , in millisecond
-dmaapCheckNewTopicInterval=300000
+dmaapCheckNewTopicInterval=60000
-kafkaConsumerCount=1
+kafkaConsumerCount=3
#####################Elasticsearch
-elasticsearchType=doc
+elasticsearchType=_doc
#####################HDFS
hdfsBufferSize=4096
#how often we flush stall updates, in millisecond
-hdfsFlushInterval=10000
-hdfsBatchSize=250
+hdfsFlushInterval=30000
+hdfsBatchSize=500
#####################Logging
logging.level.org.springframework.web=ERROR
logging.level.com.att.nsa.apiClient.http=ERROR
logging.level.org.onap.datalake=DEBUG
+
+#####################Verison
+datalakeVersion=0.0.1
diff --git a/components/datalake-handler/pom.xml b/components/datalake-handler/pom.xml
index 0a48af6..d8caee8 100644
--- a/components/datalake-handler/pom.xml
+++ b/components/datalake-handler/pom.xml
@@ -33,7 +33,7 @@
<springcouchbase.version>3.1.2.RELEASE</springcouchbase.version>
<jackson.version>2.9.8</jackson.version>
<kafka.version>2.0.0</kafka.version>
- <elasticsearchjava.version>7.0.0</elasticsearchjava.version>
+ <elasticsearchjava.version>7.1.1</elasticsearchjava.version>
<hadoop.version>3.2.0</hadoop.version>
</properties>