Merge "logstash input"
diff --git a/extra/docker/elk/docker-compose.yml b/extra/docker/elk/docker-compose.yml
index 3b7284d..388c318 100644
--- a/extra/docker/elk/docker-compose.yml
+++ b/extra/docker/elk/docker-compose.yml
@@ -14,6 +14,7 @@
     image: logstash
     volumes:
       - ./logstash-conf:/config-dir
+      - ./logstash-input:/log-input  # host dir with the log file read by logstash.conf (/log-input/dmaap_evt.log)
     command: logstash -f /config-dir/logstash.conf
     depends_on:
       - elasticsearch
diff --git a/extra/docker/elk/logstash-conf/logstash.conf b/extra/docker/elk/logstash-conf/logstash.conf
index 0a2caf2..c511995 100644
--- a/extra/docker/elk/logstash-conf/logstash.conf
+++ b/extra/docker/elk/logstash-conf/logstash.conf
@@ -8,6 +8,7 @@
                     Accept => "application/json"
                 }
                 add_field => { "topic" => "${event_topic}" }
+                type => "dmaap_event"
             }
             notification_queue => {
                 method => get
@@ -16,6 +17,7 @@
                     Accept => "application/json"
                 }
                 add_field => { "topic" => "${notification_topic}" }
+                type => "dmaap_notification"
             }
             request_queue => {
                 method => get
@@ -24,6 +26,7 @@
                     Accept => "application/json"
                 }
                 add_field => { "topic" => "${request_topic}" }
+                type => "dmaap_request"
             }
         }
         socket_timeout => 30
@@ -33,28 +36,57 @@
   }
 }
 
+# File input for the DMaaP event log found under /log-input.
+input {
+  file {
+    # Every entry is decoded as JSON and tagged with type dmaap_log.
+    type => "dmaap_log"
+    codec => "json"
+    path => [ "/log-input/dmaap_evt.log" ]
+  }
+}
+
 filter {
-    # avoid noise if no entry in the list
-    if [message] == "[]" {
-       drop { }
-    }
 
     # parse json, split  the list into multiple events, and parse each event
-    json {
-         source => "[message]"
-         target => "message"
+    if [type] != "dmaap_log" {
+        # Events of type dmaap_log come from the file input, whose json codec has
+        # already decoded them, so only the other events take the steps below.
+        # avoid noise if no entry in the list
+        if [message] == "[]" {
+            drop { }
+        }
+
+        # Decode the raw JSON text held in message back into the same field.
+        json {
+            source => "[message]"
+            target => "message"
+        }
+
+        # Emit one event per element of message. Each clone keeps the fields of
+        # the original event (type, topic, ...), so they are not re-added here:
+        # add_field on an existing field appends and turns it into an array.
+        split {
+            field => "message"
+        }
+
+        # Parse the element itself; its keys land at the top level of the event.
+        json {
+            source => "message"
+        }
+        mutate { remove_field => [ "message" ] }
     }
-    split {
-          field => "message"
-    }
-    json {
-         source => "message"
-    }
-    mutate { remove_field => [ "message" ] }
+    
     # express timestamps in milliseconds instead of microseconds
     if [closedLoopAlarmStart] {
         ruby {
-            code => "event.set('closedLoopAlarmStart', Integer(event.get('closedLoopAlarmStart')) / 1000)"
+            # Values with more than 13 digits are taken to be microseconds and are
+            # scaled down to milliseconds; anything shorter is stored unchanged.
+            code => "
+                     ts = event.get('closedLoopAlarmStart').to_s.to_i(10)
+                     ts /= 1000 if ts > 9999999999999
+                     event.set('closedLoopAlarmStart', ts)
+                    "
         }
         date {
             match => [ "closedLoopAlarmStart", UNIX_MS ]
@@ -64,7 +96,13 @@
 
     if [closedLoopAlarmEnd] {
         ruby {
-            code => "event.set('closedLoopAlarmEnd', Integer(event.get('closedLoopAlarmEnd')) / 1000)"
+            # Values with more than 13 digits are taken to be microseconds and are
+            # scaled down to milliseconds; anything shorter is stored unchanged.
+            code => "
+                    ts = event.get('closedLoopAlarmEnd').to_s.to_i(10)
+                    ts /= 1000 if ts > 9999999999999
+                    event.set('closedLoopAlarmEnd', ts)
+                    "
         }
         date {
             match => [ "closedLoopAlarmEnd", UNIX_MS ]