‘correlateClearedMessage’
Change-Id: I32ea4043d32f29c920b370de3c84341c218ed5c2
Issue-ID: DCAEGEN2-1190
Signed-off-by: ZhangZihao <zhangzihao@chinamobile.com>
diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
index d674ead..44f4ef1 100644
--- a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
+++ b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
@@ -52,6 +52,7 @@
-- in production, default enabled should be off
insert into `topic`(`name`,`enabled`,`save_raw`,`ttl`,`data_format`) values ('_DL_DEFAULT_',1,0,3650,'JSON');
insert into `topic`(`name`,`enabled`) values ('__consumer_offsets',0);
+insert into `topic`(`name`,correlate_cleared_message,`enabled`,default_topic, message_id_path) values ('unauthenticated.SEC_FAULT_OUTPUT',1,0,'_DL_DEFAULT_', '/event/commonEventHeader/eventName,/event/commonEventHeader/reportingEntityName,/event/faultFields/specificProblem');
insert into `map_db_topic`(`db_name`,`topic_name`) values ('Couchbase','_DL_DEFAULT_');
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
index 21ca1b7..a3add0e 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
@@ -57,4 +57,6 @@
private String rawDataLabel;
private String defaultTopicName;
+
+ private String elasticsearchType;
}
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
index 2c16b2b..30aa733 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
@@ -26,8 +26,13 @@
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
+import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
+import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
@@ -38,6 +43,7 @@
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.rest.RestStatus;
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
@@ -118,15 +124,15 @@
for (JSONObject json : jsons) {
if(topic.isCorrelateClearedMessage()) {
- boolean found = correlateClearedMessage(json);
+ boolean found = correlateClearedMessage(topic, json);
if(found) {
continue;
}
}
String id = topic.getMessageId(json); //id can be null
-
- request.add(new IndexRequest(topic.getName().toLowerCase(), "doc", id).source(json.toString(), XContentType.JSON));
+
+ request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON));
}
if(config.isAsync()) {
client.bulkAsync(request, RequestOptions.DEFAULT, listener);
@@ -138,21 +144,96 @@
}
}
}
-
- private boolean correlateClearedMessage(JSONObject json) {
- boolean found = true;
-
- /*TODO
- * 1. check if this is a alarm cleared message
- * 2. search previous alarm message
- * 3. update previous message, if success, set found=true
- */
- //for Sonar test, remove the following
- if(json.isNull("kkkkk")) {
- found = false;
+
+ /**
+ *
+ * @param topic
+ * @param json
+ * @return boolean
+ *
+ * Because of query by id, The search API cannot be used for query.
+ * The search API can only query all data or based on the fields in the source.
+ * So use the get API, three parameters: index, type, document id
+ */
+ private boolean correlateClearedMessage(Topic topic, JSONObject json) {
+ boolean found = false;
+ String eName = null;
+
+ try {
+ eName = json.query("/event/commonEventHeader/eventName").toString();
+
+ if (StringUtils.isNotBlank(eName)) {
+
+ if (eName.endsWith("Cleared")) {
+
+ String name = eName.substring(0, eName.length() - 7);
+ String reportingEntityName = json.query("/event/commonEventHeader/reportingEntityName").toString();
+ String specificProblem = json.query("/event/faultFields/specificProblem").toString();
+
+ String id = null;
+ StringBuilder stringBuilder = new StringBuilder();
+ stringBuilder = stringBuilder.append(name).append('^').append(reportingEntityName).append('^').append(specificProblem);
+
+ id = stringBuilder.toString();//example: id = "aaaa^cccc^bbbbb"
+ String index = topic.getName().toLowerCase();
+
+ //get
+ GetRequest getRequest = new GetRequest(index, config.getElasticsearchType(), id);
+
+ GetResponse getResponse = null;
+ try {
+ getResponse = client.get(getRequest, RequestOptions.DEFAULT);
+ if (getResponse != null) {
+
+ if (getResponse.isExists()) {
+ String sourceAsString = getResponse.getSourceAsString();
+ JSONObject jsonObject = new JSONObject(sourceAsString);
+ jsonObject.getJSONObject("event").getJSONObject("faultFields").put("vfStatus", "closed");
+ String jsonString = jsonObject.toString();
+
+ //update
+ IndexRequest request = new IndexRequest(index, config.getElasticsearchType(), id);
+ request.source(jsonString, XContentType.JSON);
+ IndexResponse indexResponse = null;
+ try {
+ indexResponse = client.index(request, RequestOptions.DEFAULT);
+ found = true;
+ } catch (IOException e) {
+ log.error("save failure");
+ }
+ } else {
+ log.error("The getResponse was not exists" );
+ }
+
+ } else {
+ log.error("The document for this id was not found" );
+ }
+
+ } catch (ElasticsearchException e) {
+ if (e.status() == RestStatus.NOT_FOUND) {
+ log.error("The document for this id was not found" );
+ }
+ if (e.status() == RestStatus.CONFLICT) {
+ log.error("Version conflict" );
+ }
+ log.error("Get document exception", e);
+ }catch (IOException e) {
+ log.error(topic.getName() , e);
+ }
+
+ } else {
+ log.info("The data is normal");
+ }
+
+ } else {
+ log.debug("event id null");
+ }
+
+ } catch (Exception e) {
+ log.error("error",e);
}
-
- return found;
+
+ return found;
}
}
diff --git a/components/datalake-handler/feeder/src/main/resources/application.properties b/components/datalake-handler/feeder/src/main/resources/application.properties
index 1bd5339..dfe48a2 100644
--- a/components/datalake-handler/feeder/src/main/resources/application.properties
+++ b/components/datalake-handler/feeder/src/main/resources/application.properties
@@ -44,6 +44,7 @@
defaultTopicName=_DL_DEFAULT_
+elasticsearchType=doc
#Logging
logging.level.org.springframework.web=ERROR