‘correlateClearedMessage’

Change-Id: I32ea4043d32f29c920b370de3c84341c218ed5c2
Issue-ID: DCAEGEN2-1190
Signed-off-by: ZhangZihao <zhangzihao@chinamobile.com>
diff --git a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
index d674ead..44f4ef1 100644
--- a/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
+++ b/components/datalake-handler/feeder/src/assembly/scripts/init_db.sql
@@ -52,6 +52,7 @@
 -- in production, default enabled should be off

 insert into `topic`(`name`,`enabled`,`save_raw`,`ttl`,`data_format`) values ('_DL_DEFAULT_',1,0,3650,'JSON');

 insert into `topic`(`name`,`enabled`) values ('__consumer_offsets',0);

+insert into `topic`(`name`,correlate_cleared_message,`enabled`,default_topic, message_id_path) values ('unauthenticated.SEC_FAULT_OUTPUT',1,0,'_DL_DEFAULT_', '/event/commonEventHeader/eventName,/event/commonEventHeader/reportingEntityName,/event/faultFields/specificProblem');

 

 

 insert into `map_db_topic`(`db_name`,`topic_name`) values ('Couchbase','_DL_DEFAULT_');

diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
index 21ca1b7..a3add0e 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/config/ApplicationConfiguration.java
@@ -57,4 +57,6 @@
 	private String rawDataLabel;
 	
 	private String defaultTopicName;
+
+	private String elasticsearchType;
 }
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
index 2c16b2b..30aa733 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/ElasticsearchService.java
@@ -26,8 +26,13 @@
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.http.HttpHost;
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.client.indices.CreateIndexRequest;
 import org.elasticsearch.client.indices.CreateIndexResponse;
 import org.elasticsearch.client.indices.GetIndexRequest; 
@@ -38,6 +43,7 @@
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.rest.RestStatus;
 import org.json.JSONObject;
 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 import org.onap.datalake.feeder.domain.Db;
@@ -118,15 +124,15 @@
 
 		for (JSONObject json : jsons) {
 			if(topic.isCorrelateClearedMessage()) {
-				boolean found = correlateClearedMessage(json);
+				boolean found = correlateClearedMessage(topic, json);
 				if(found) {
 					continue;
 				}
 			}
 			
 			String id = topic.getMessageId(json); //id can be null
-			
-			request.add(new IndexRequest(topic.getName().toLowerCase(), "doc", id).source(json.toString(), XContentType.JSON));
+
+			request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON));
 		}
 		if(config.isAsync()) {
 			client.bulkAsync(request, RequestOptions.DEFAULT, listener);			
@@ -138,21 +144,96 @@
 			}
 		}
 	}
-	
-	private boolean correlateClearedMessage(JSONObject json) {
-		boolean found = true;
-				
-		/*TODO
-		 * 1. check if this is a alarm cleared message
-		 * 2. search previous alarm message
-		 * 3. update previous message, if success, set found=true
-		 */
-		//for Sonar test, remove the following
-		if(json.isNull("kkkkk")) {
-			found = false;
+
+	/**
+	 *
+	 * @param topic
+	 * @param json
+	 * @return boolean
+	 *
+	 * Because of query by id, The search API cannot be used for query.
+	 * The search API can only query all data or based on the fields in the source.
+	 * So use the get API, three parameters: index, type, document id
+	 */
+	private boolean correlateClearedMessage(Topic topic, JSONObject json) {
+		boolean found = false;
+		String eName = null;
+
+		try {
+			eName = json.query("/event/commonEventHeader/eventName").toString();
+
+			if (StringUtils.isNotBlank(eName)) {
+
+				if (eName.endsWith("Cleared")) {
+
+					String name = eName.substring(0, eName.length() - 7);
+					String reportingEntityName = json.query("/event/commonEventHeader/reportingEntityName").toString();
+					String specificProblem = json.query("/event/faultFields/specificProblem").toString();
+
+					String id = null;
+					StringBuilder stringBuilder = new StringBuilder();
+					stringBuilder = stringBuilder.append(name).append('^').append(reportingEntityName).append('^').append(specificProblem);
+
+					id = stringBuilder.toString();//example: id = "aaaa^cccc^bbbbb"
+					String index = topic.getName().toLowerCase();
+
+					//get
+					GetRequest getRequest = new GetRequest(index, config.getElasticsearchType(), id);
+
+					GetResponse getResponse = null;
+					try {
+						getResponse = client.get(getRequest, RequestOptions.DEFAULT);
+						if (getResponse != null) {
+
+							if (getResponse.isExists()) {
+								String sourceAsString = getResponse.getSourceAsString();
+								JSONObject jsonObject = new JSONObject(sourceAsString);
+								jsonObject.getJSONObject("event").getJSONObject("faultFields").put("vfStatus", "closed");
+								String jsonString = jsonObject.toString();
+
+								//update
+								IndexRequest request = new IndexRequest(index, config.getElasticsearchType(), id);
+								request.source(jsonString, XContentType.JSON);
+								IndexResponse indexResponse = null;
+								try {
+									indexResponse = client.index(request, RequestOptions.DEFAULT);
+									found = true;
+								} catch (IOException e) {
+									log.error("save failure");
+								}
+							} else {
+								log.error("The getResponse was not exists" );
+							}
+
+						} else {
+							log.error("The document for this id was not found" );
+						}
+
+					} catch (ElasticsearchException e) {
+						if (e.status() == RestStatus.NOT_FOUND) {
+							log.error("The document for this id was not found" );
+						}
+						if (e.status() == RestStatus.CONFLICT) {
+							log.error("Version conflict" );
+						}
+						log.error("Get document exception", e);
+					}catch (IOException e) {
+						log.error(topic.getName() , e);
+					}
+
+				} else {
+					log.info("The data is normal");
+				}
+
+			} else {
+				log.debug("event id null");
+			}
+
+		} catch (Exception e) {
+			log.error("error",e);
 		}
-		
-		return found; 
+
+		return found;
 	}
 
 }
diff --git a/components/datalake-handler/feeder/src/main/resources/application.properties b/components/datalake-handler/feeder/src/main/resources/application.properties
index 1bd5339..dfe48a2 100644
--- a/components/datalake-handler/feeder/src/main/resources/application.properties
+++ b/components/datalake-handler/feeder/src/main/resources/application.properties
@@ -44,6 +44,7 @@
 
 defaultTopicName=_DL_DEFAULT_
 
+elasticsearchType=doc
 	
 #Logging
 logging.level.org.springframework.web=ERROR