Merge "Introduce new changes to UI"
diff --git a/extra/docker/elk/docker-compose.yml b/extra/docker/elk/docker-compose.yml
index 3b7284d..3b5571e 100644
--- a/extra/docker/elk/docker-compose.yml
+++ b/extra/docker/elk/docker-compose.yml
@@ -2,18 +2,23 @@
 
 services:
   elasticsearch:
-    image: elasticsearch
+    image: docker.elastic.co/elasticsearch/elasticsearch:6.1.3
     ports:
       - 9200:9200
     networks:
       es_net:
         aliases:
           - elasticsearch
+    environment:
+      - cluster.name=docker-cluster
+      - bootstrap.memory_lock=false
+      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
 
   logstash:
-    image: logstash
+    image: docker.elastic.co/logstash/logstash:6.1.3
     volumes:
       - ./logstash-conf:/config-dir
+      - ./logstash-input:/log-input
     command: logstash -f /config-dir/logstash.conf
     depends_on:
       - elasticsearch
@@ -28,9 +33,10 @@
       - event_topic=EVENT_TOPIC
       - notification_topic=NOTIFICATION_TOPIC
       - request_topic=REQUEST_TOPIC
+      - elasticsearch_hosts=elasticsearch
 
   kibana:
-    image: kibana
+    image: docker.elastic.co/kibana/kibana:6.1.3
     ports:
       - 5601:5601
     depends_on:
diff --git a/extra/docker/elk/logstash-conf/logstash.conf b/extra/docker/elk/logstash-conf/logstash.conf
index 0a2caf2..2b71686 100644
--- a/extra/docker/elk/logstash-conf/logstash.conf
+++ b/extra/docker/elk/logstash-conf/logstash.conf
@@ -8,6 +8,7 @@
                     Accept => "application/json"
                 }
                 add_field => { "topic" => "${event_topic}" }
+                type => "dmaap_event"
             }
             notification_queue => {
                 method => get
@@ -16,6 +17,7 @@
                     Accept => "application/json"
                 }
                 add_field => { "topic" => "${notification_topic}" }
+                type => "dmaap_notification"
             }
             request_queue => {
                 method => get
@@ -24,37 +26,67 @@
                     Accept => "application/json"
                 }
                 add_field => { "topic" => "${request_topic}" }
+                type => "dmaap_request"
             }
         }
         socket_timeout => 30
         request_timeout => 30
-        interval => 60
         codec => "plain"
+        schedule => { "every" => "1m"  }
+  }
+}
+
+input {
+  file {
+    path => [
+      "/log-input/dmaap_evt.log"
+    ]
+    type => "dmaap_log"
+    codec => "json"
   }
 }
 
 filter {
-    # avoid noise if no entry in the list
-    if [message] == "[]" {
-       drop { }
-    }
 
     # parse json, split  the list into multiple events, and parse each event
-    json {
-         source => "[message]"
-         target => "message"
+    if [type] != "dmaap_log" {
+	    # avoid noise if no entry in the list
+	    if [message] == "[]" {
+	       drop { }
+	    }
+	    
+	    json {
+	         source => "[message]"
+	         target => "message"
+	    }
+#	    ruby {
+#	    	code => "event.get('message').each{|m| m.set('type',event.get('type')}"
+#	    }
+	    split {
+	          field => "message"
+	          add_field => {
+      			"type" => "%{type}"
+      			"topic" => "%{topic}"
+    		  }
+	    }
+	    
+	    json {
+	         source => "message"
+	    }
+	    
+	    mutate { remove_field => [ "message" ] }
     }
-    split {
-          field => "message"
-    }
-    json {
-         source => "message"
-    }
-    mutate { remove_field => [ "message" ] }
+    
     # express timestamps in milliseconds instead of microseconds
     if [closedLoopAlarmStart] {
         ruby {
-            code => "event.set('closedLoopAlarmStart', Integer(event.get('closedLoopAlarmStart')) / 1000)"
+            code => "
+                     if event.get('closedLoopAlarmStart').to_s.to_i(10) > 9999999999999
+                       event.set('closedLoopAlarmStart', event.get('closedLoopAlarmStart').to_s.to_i(10) / 1000)
+                     else
+                       event.set('closedLoopAlarmStart', event.get('closedLoopAlarmStart').to_s.to_i(10))
+                     end
+                    "
         }
         date {
             match => [ "closedLoopAlarmStart", UNIX_MS ]
@@ -64,7 +96,13 @@
 
     if [closedLoopAlarmEnd] {
         ruby {
-            code => "event.set('closedLoopAlarmEnd', Integer(event.get('closedLoopAlarmEnd')) / 1000)"
+            code => "
+                    if event.get('closedLoopAlarmEnd').to_s.to_i(10) > 9999999999999  
+                      event.set('closedLoopAlarmEnd', event.get('closedLoopAlarmEnd').to_s.to_i(10) / 1000)
+                    else
+                      event.set('closedLoopAlarmEnd', event.get('closedLoopAlarmEnd').to_s.to_i(10))
+                    end
+                    "
         }
         date {
             match => [ "closedLoopAlarmEnd", UNIX_MS ]
@@ -93,15 +131,15 @@
     if [http_request_failure] {
         elasticsearch {
             codec => "json"
-            hosts => [elasticsearch]
+            hosts => ["${elasticsearch_hosts}"]
             index => "errors-%{+YYYY.MM.DD}"
             doc_as_upsert => true
         }
     } else {
         elasticsearch {
             codec => "json"
-            hosts => [elasticsearch]
-            index => "logstash-%{+YYYY.MM.DD}" # creates daily indexes
+            hosts => ["${elasticsearch_hosts}"]
+            index => "events-%{+YYYY.MM.DD}" # creates daily indexes
             doc_as_upsert => true
 
         }