Improve Logstash parsing

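Improve parsing of the closedLoopAlarmStart/closedLoopAlarmEnd epoch
timestamps: values given in microseconds are converted to milliseconds,
and values already in milliseconds are kept as-is.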

Issue-ID: CLAMP-218
Change-Id: If87ba818caaba783ef667e149c4c0824daa7dc2c
Signed-off-by: osgn422w <gn422w@intl.att.com>
diff --git a/kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf b/kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf
index 5d92de6..f88e40d 100644
--- a/kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf
+++ b/kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf
@@ -21,6 +21,7 @@
                     Accept => "application/json"
                 }
                 add_field => { "topic" => "${event_topic}" }
+                type => "dmaap_event"
             }
             notification_queue => {
                 method => get
@@ -29,6 +30,7 @@
                     Accept => "application/json"
                 }
                 add_field => { "topic" => "${notification_topic}" }
+                type => "dmaap_notification"
             }
             request_queue => {
                 method => get
@@ -37,6 +39,7 @@
                     Accept => "application/json"
                 }
                 add_field => { "topic" => "${request_topic}" }
+                type => "dmaap_request"
             }
         }
         socket_timeout => 30
@@ -47,26 +50,39 @@
 }
 
 filter {
-    # avoid noise if no entry in the list
-    if [message] == "[]" {
-       drop { }
-    }
+	if [type] != "dmaap_log" {
+        # avoid noise if no entry in the list
+        if [message] == "[]" {
+           drop { }
+        }
 
-    # parse json, split  the list into multiple events, and parse each event
-    json {
-         source => "[message]"
-         target => "message"
+        # parse json, split the list into multiple events, and parse each event
+        json {
+             source => "[message]"
+             target => "message"
+        }
+        split {
+              field => "message"
+	          add_field => {
+      			"type" => "%{type}"
+      			"topic" => "%{topic}"
+    		  }
+        }
+        json {
+             source => "message"
+        }
+        mutate { remove_field => [ "message" ] }
     }
-    split {
-          field => "message"
-    }
-    json {
-         source => "message"
-    }
-    mutate { remove_field => [ "message" ] }
+    
     # express timestamps in milliseconds instead of microseconds
     ruby {
-        code => "event.set('closedLoopAlarmStart', Integer(event.get('closedLoopAlarmStart')))"
+            code => "
+                     if event.get('closedLoopAlarmStart').to_s.to_i(10) > 9999999999999
+                       event.set('closedLoopAlarmStart', event.get('closedLoopAlarmStart').to_s.to_i(10) / 1000)
+                     else
+                       event.set('closedLoopAlarmStart', event.get('closedLoopAlarmStart').to_s.to_i(10))
+                     end
+                    "
     }
     date {
         match => [ "closedLoopAlarmStart", UNIX_MS ]
@@ -75,7 +91,13 @@
 
     if [closedLoopAlarmEnd] {
         ruby {
-            code => "event.set('closedLoopAlarmEnd', Integer(event.get('closedLoopAlarmEnd')))"
+            code => "
+                    if event.get('closedLoopAlarmEnd').to_s.to_i(10) > 9999999999999  
+                      event.set('closedLoopAlarmEnd', event.get('closedLoopAlarmEnd').to_s.to_i(10) / 1000)
+                    else
+                      event.set('closedLoopAlarmEnd', event.get('closedLoopAlarmEnd').to_s.to_i(10))
+                    end
+                    "
         }
         date {
             match => [ "closedLoopAlarmEnd", UNIX_MS ]