blob: aa087e344549f102d10c0b5931bb6fc83ce0d428 [file] [log] [blame]
# Poll three DMaaP topics (event, notification, request) over HTTP.
# All endpoints, topics and consumer identifiers are resolved from
# ${...} references when the pipeline is loaded.
input {
    http_poller {
        # Keep the response body as a raw string in [message]; the filter
        # section is responsible for decoding the JSON it contains.
        codec => "plain"

        # Polling cadence and per-request limits.
        interval => 60
        request_timeout => 30
        socket_timeout => 30

        # One named request per topic. Each request also carries an
        # add_field entry naming the topic it was issued for.
        urls => {
            event_queue => {
                method => "get"
                url => "${dmaap_base_url}/events/${event_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
                headers => { "Accept" => "application/json" }
                add_field => { "topic" => "${event_topic}" }
            }
            notification_queue => {
                method => "get"
                url => "${dmaap_base_url}/events/${notification_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
                headers => { "Accept" => "application/json" }
                add_field => { "topic" => "${notification_topic}" }
            }
            request_queue => {
                method => "get"
                url => "${dmaap_base_url}/events/${request_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
                headers => { "Accept" => "application/json" }
                add_field => { "topic" => "${request_topic}" }
            }
        }
    }
}
35
# Turn each polled response (a JSON list held in [message]) into one event
# per list element and normalise its time fields into real timestamps.
filter {
    # An empty JSON list means the poll returned nothing; drop it to avoid noise.
    if [message] == "[]" {
        drop { }
    }

    # Decode the JSON list in place, split it into one event per element,
    # then decode each element so its keys become top-level fields.
    json {
        source => "[message]"
        target => "message"
    }
    split {
        field => "message"
    }
    json {
        source => "message"
    }
    # The raw payload is no longer needed once its fields are extracted.
    mutate { remove_field => [ "message" ] }

    # Coerce closedLoopAlarmStart to an integer and parse it as epoch
    # milliseconds (UNIX_MS). No unit conversion is performed here.
    # Guarded in the same way as closedLoopAlarmEnd: Integer(nil) raises, so
    # without the guard every event lacking the field would make the ruby
    # filter fail and tag the event with its exception tag.
    if [closedLoopAlarmStart] {
        ruby {
            code => "event.set('closedLoopAlarmStart', Integer(event.get('closedLoopAlarmStart')))"
        }
        date {
            match => [ "closedLoopAlarmStart", UNIX_MS ]
            target => "closedLoopAlarmStart"
        }
    }

    # Same treatment for the optional alarm end time.
    if [closedLoopAlarmEnd] {
        ruby {
            code => "event.set('closedLoopAlarmEnd', Integer(event.get('closedLoopAlarmEnd')))"
        }
        date {
            match => [ "closedLoopAlarmEnd", UNIX_MS ]
            target => "closedLoopAlarmEnd"
        }
    }

    # notificationTime arrives as "yyyy-MM-dd HH:mm:ss"; replace the space
    # with "T" so the value can be parsed with the ISO8601 matcher.
    if [notificationTime] {
        mutate {
            gsub => [
                "notificationTime", " ", "T"
            ]
        }
        date {
            match => [ "notificationTime", ISO8601 ]
            target => "notificationTime"
        }
    }
}
# Print every event for debugging, then index it into Elasticsearch:
# poller failures go to a daily "errors-" index, everything else to a
# daily "logstash-" index.
output {
    stdout {
        codec => rubydebug
    }

    # NOTE(review): doc_as_upsert is documented as applying to the "update"
    # action; no action is set in either block below, so confirm whether it
    # is intended to have any effect.
    if [http_request_failure] {
        elasticsearch {
            codec => "json"
            hosts => ["${elasticsearch_base_url}"]
            # "dd" is day-of-month. The previous "DD" is day-of-year in the
            # date pattern syntax and produced names such as errors-2018.04.108.
            index => "errors-%{+YYYY.MM.dd}"
            doc_as_upsert => true
        }
    } else {
        elasticsearch {
            codec => "json"
            hosts => ["${elasticsearch_base_url}"]
            # Creates daily indexes ("dd" = day-of-month, not "DD" = day-of-year).
            index => "logstash-%{+YYYY.MM.dd}"
            doc_as_upsert => true
        }
    }
}