blob: 2b5a24e0436e75187220d7dda2d716fec193284d [file] [log] [blame]
ac25505082fd72018-03-20 12:35:48 +01001input {
2 http_poller {
3 urls => {
4 event_queue => {
5 method => get
6 url => "${dmaap_base_url}/events/${event_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
7 headers => {
8 Accept => "application/json"
9 }
10 add_field => { "topic" => "${event_topic}" }
11 }
12 notification_queue => {
13 method => get
14 url => "${dmaap_base_url}/events/${notification_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
15 headers => {
16 Accept => "application/json"
17 }
18 add_field => { "topic" => "${notification_topic}" }
19 }
20 request_queue => {
21 method => get
22 url => "${dmaap_base_url}/events/${request_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
23 headers => {
24 Accept => "application/json"
25 }
26 add_field => { "topic" => "${request_topic}" }
27 }
28 }
29 socket_timeout => 30
30 request_timeout => 30
31 interval => 15
32 codec => "plain"
33 }
34}
35
36filter {
37 # avoid noise if no entry in the list
38 if [message] == "[]" {
39 drop { }
40 }
41
42 # parse json, split the list into multiple events, and parse each event
43 json {
44 source => "[message]"
45 target => "message"
46 }
47 split {
48 field => "message"
49 }
50 json {
51 source => "message"
52 }
53 mutate { remove_field => [ "message" ] }
54 # express timestamps in milliseconds instead of microseconds
55 ruby {
56 code => "event.set('closedLoopAlarmStart', Integer(event.get('closedLoopAlarmStart')))"
57 }
58 date {
59 match => [ "closedLoopAlarmStart", UNIX_MS ]
60 target => "closedLoopAlarmStart"
61 }
62
63 if [closedLoopAlarmEnd] {
64 ruby {
65 code => "event.set('closedLoopAlarmEnd', Integer(event.get('closedLoopAlarmEnd')))"
66 }
67 date {
68 match => [ "closedLoopAlarmEnd", UNIX_MS ]
69 target => "closedLoopAlarmEnd"
70 }
71
72 }
73 #"yyyy-MM-dd HH:mm:ss"
74 if [notificationTime] {
75 mutate {
76 gsub => [
77 "notificationTime", " ", "T"
78 ]
79 }
80 date {
81 match => [ "notificationTime", ISO8601 ]
82 target => "notificationTime"
83 }
84 }
85}
86output {
87 stdout {
88 codec => rubydebug
89 }
90
91 elasticsearch {
92 codec => "json"
93 hosts => [elasticsearch]
94 index => "logstash-%{+YYYY.MM.DD}" # creates daily indexes
95 doc_as_upsert => true
96
97 }
98
99}