blob: 2b71686faa3aa311757493de78dfcd93540c3e95 [file] [log] [blame]
# Poll the three DMaaP topics (event, notification, request) over HTTP.
# All ${...} placeholders are substituted by Logstash from the environment
# (or keystore) when the pipeline is loaded.
input {
  http_poller {
    # Every URL below is requested once per minute.
    schedule => { "every" => "1m" }
    codec => "plain"
    request_timeout => 30
    socket_timeout => 30

    # NOTE(review): "add_field" and "type" are set inside each URL spec rather
    # than at plugin level - confirm http_poller applies them per URL, since
    # the filter section relies on [type] and [topic].
    urls => {
      event_queue => {
        url => "${dmaap_base_url}/events/${event_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
        method => get
        headers => {
          Accept => "application/json"
        }
        type => "dmaap_event"
        add_field => { "topic" => "${event_topic}" }
      }
      notification_queue => {
        url => "${dmaap_base_url}/events/${notification_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
        method => get
        headers => {
          Accept => "application/json"
        }
        type => "dmaap_notification"
        add_field => { "topic" => "${notification_topic}" }
      }
      request_queue => {
        url => "${dmaap_base_url}/events/${request_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
        method => get
        headers => {
          Accept => "application/json"
        }
        type => "dmaap_request"
        add_field => { "topic" => "${request_topic}" }
      }
    }
  }
}
38
# Read DMaaP events from a local log file. Each entry is decoded as JSON and
# tagged with type "dmaap_log", which the filter section uses to skip the
# list-splitting steps applied to polled payloads.
input {
  file {
    type => "dmaap_log"
    codec => "json"
    path => [ "/log-input/dmaap_evt.log" ]
  }
}
48
# Normalise incoming events:
#   1. for anything that is not a "dmaap_log" event, parse [message] as a JSON
#      list, split it into one event per entry and parse each entry;
#   2. convert closedLoopAlarmStart / closedLoopAlarmEnd to real timestamps;
#   3. convert notificationTime to a real timestamp.
filter {

  # parse json, split the list into multiple events, and parse each event
  if [type] != "dmaap_log" {
    # avoid noise if no entry in the list
    if [message] == "[]" {
      drop { }
    }

    # Replace the raw text in [message] with the parsed JSON list.
    json {
      source => "[message]"
      target => "message"
    }

    # One event per list entry; type and topic are re-stamped on each event.
    # NOTE(review): presumably the split events already carry type/topic from
    # the parent event - confirm these add_field entries do not leave
    # duplicated values on the resulting events.
    split {
      field => "message"
      add_field => {
        "type" => "%{type}"
        "topic" => "%{topic}"
      }
    }

    # Parse the individual entry; no target is given, so the parsed keys are
    # not nested under [message].
    json {
      source => "message"
    }

    mutate { remove_field => [ "message" ] }
  }

  # express timestamps in milliseconds instead of microseconds
  if [closedLoopAlarmStart] {
    ruby {
      # Values with more than 13 digits are treated as microseconds and
      # scaled down to milliseconds; anything else is kept as an integer.
      code => "
        ts = event.get('closedLoopAlarmStart').to_s.to_i(10)
        ts /= 1000 if ts > 9_999_999_999_999
        event.set('closedLoopAlarmStart', ts)
      "
    }
    date {
      match => [ "closedLoopAlarmStart", UNIX_MS ]
      target => "closedLoopAlarmStart"
    }
  }

  if [closedLoopAlarmEnd] {
    ruby {
      # Same microsecond-to-millisecond normalisation as closedLoopAlarmStart.
      code => "
        ts = event.get('closedLoopAlarmEnd').to_s.to_i(10)
        ts /= 1000 if ts > 9_999_999_999_999
        event.set('closedLoopAlarmEnd', ts)
      "
    }
    date {
      match => [ "closedLoopAlarmEnd", UNIX_MS ]
      target => "closedLoopAlarmEnd"
    }
  }

  # notificationTime arrives as "yyyy-MM-dd HH:mm:ss"; replace the space with
  # "T" so the value can be matched as ISO8601.
  if [notificationTime] {
    mutate {
      gsub => [
        "notificationTime", " ", "T"
      ]
    }
    date {
      match => [ "notificationTime", ISO8601 ]
      target => "notificationTime"
    }
  }
}
# Print every event to stdout, then index it into Elasticsearch: events that
# carry [http_request_failure] go to the "errors-*" indices, everything else
# to the "events-*" indices.
output {
  stdout {
    codec => rubydebug
  }

  # The date suffix uses "dd" (day of month). The previous "DD" is day-of-year
  # in Joda-Time pattern syntax and produced names such as events-2018.09.247.
  # NOTE(review): doc_as_upsert is documented for update mode and no action is
  # set here - confirm whether it is needed.
  if [http_request_failure] {
    elasticsearch {
      codec => "json"
      hosts => ["${elasticsearch_hosts}"]
      index => "errors-%{+YYYY.MM.dd}" # creates daily indexes
      doc_as_upsert => true
    }
  } else {
    elasticsearch {
      codec => "json"
      hosts => ["${elasticsearch_hosts}"]
      index => "events-%{+YYYY.MM.dd}" # creates daily indexes
      doc_as_upsert => true
    }
  }

}