blob: 0a2caf2f1ee670c51faf483954d65cb4d74f543f [file] [log] [blame]
ac25505082fd72018-03-20 12:35:48 +01001input {
2 http_poller {
3 urls => {
4 event_queue => {
5 method => get
6 url => "${dmaap_base_url}/events/${event_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
7 headers => {
8 Accept => "application/json"
9 }
10 add_field => { "topic" => "${event_topic}" }
11 }
12 notification_queue => {
13 method => get
14 url => "${dmaap_base_url}/events/${notification_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
15 headers => {
16 Accept => "application/json"
17 }
18 add_field => { "topic" => "${notification_topic}" }
19 }
20 request_queue => {
21 method => get
22 url => "${dmaap_base_url}/events/${request_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
23 headers => {
24 Accept => "application/json"
25 }
26 add_field => { "topic" => "${request_topic}" }
27 }
28 }
29 socket_timeout => 30
30 request_timeout => 30
ac2550853cd662018-03-28 14:43:56 +020031 interval => 60
ac25505082fd72018-03-20 12:35:48 +010032 codec => "plain"
33 }
34}
35
36filter {
37 # avoid noise if no entry in the list
38 if [message] == "[]" {
39 drop { }
40 }
41
42 # parse json, split the list into multiple events, and parse each event
43 json {
44 source => "[message]"
45 target => "message"
46 }
47 split {
48 field => "message"
49 }
50 json {
51 source => "message"
52 }
53 mutate { remove_field => [ "message" ] }
54 # express timestamps in milliseconds instead of microseconds
ac25502dc64172018-05-08 11:10:51 +020055 if [closedLoopAlarmStart] {
56 ruby {
57 code => "event.set('closedLoopAlarmStart', Integer(event.get('closedLoopAlarmStart')) / 1000)"
58 }
59 date {
60 match => [ "closedLoopAlarmStart", UNIX_MS ]
61 target => "closedLoopAlarmStart"
62 }
ac25505082fd72018-03-20 12:35:48 +010063 }
64
65 if [closedLoopAlarmEnd] {
66 ruby {
ac25502dc64172018-05-08 11:10:51 +020067 code => "event.set('closedLoopAlarmEnd', Integer(event.get('closedLoopAlarmEnd')) / 1000)"
ac25505082fd72018-03-20 12:35:48 +010068 }
69 date {
70 match => [ "closedLoopAlarmEnd", UNIX_MS ]
71 target => "closedLoopAlarmEnd"
72 }
73
74 }
75 #"yyyy-MM-dd HH:mm:ss"
76 if [notificationTime] {
77 mutate {
78 gsub => [
79 "notificationTime", " ", "T"
80 ]
81 }
82 date {
83 match => [ "notificationTime", ISO8601 ]
84 target => "notificationTime"
85 }
86 }
87}
88output {
89 stdout {
90 codec => rubydebug
91 }
92
ac2550ba939622018-03-28 14:41:44 +020093 if [http_request_failure] {
94 elasticsearch {
95 codec => "json"
96 hosts => [elasticsearch]
97 index => "errors-%{+YYYY.MM.DD}"
98 doc_as_upsert => true
99 }
100 } else {
101 elasticsearch {
102 codec => "json"
103 hosts => [elasticsearch]
104 index => "logstash-%{+YYYY.MM.DD}" # creates daily indexes
105 doc_as_upsert => true
ac25505082fd72018-03-20 12:35:48 +0100106
ac2550ba939622018-03-28 14:41:44 +0200107 }
ac25505082fd72018-03-20 12:35:48 +0100108 }
109
110}