blob: f88e40da14469a4b84d941c38592b8a42fa4009a [file] [log] [blame]
Mukul379e2522018-09-05 12:26:02 +00001# Copyright © 2018 AT&T, Amdocs, Bell Canada Intellectual Property. All rights reserved.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
input {
  # Poll three DMaaP topics over HTTP. Each entry under `urls` is an
  # independent polling target; base URL, topic names, consumer group and
  # consumer id are all substituted from ${...} variables.
  http_poller {
    # Responses are read as plain text; JSON decoding happens in the filter
    # stage.
    codec => "plain"

    # Polling cadence and client timeouts.
    # NOTE(review): units are presumably seconds per the http_poller docs,
    # while the `timeout=15000` query parameter below is sent to the server
    # as-is — confirm both against the plugin / DMaaP documentation.
    interval => 60
    request_timeout => 30
    socket_timeout => 30

    urls => {
      # Topic named by ${event_topic}; events are typed "dmaap_event".
      event_queue => {
        method => "get"
        url => "${dmaap_base_url}/events/${event_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
        headers => { "Accept" => "application/json" }
        add_field => { "topic" => "${event_topic}" }
        type => "dmaap_event"
      }

      # Topic named by ${notification_topic}; events are typed
      # "dmaap_notification".
      notification_queue => {
        method => "get"
        url => "${dmaap_base_url}/events/${notification_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
        headers => { "Accept" => "application/json" }
        add_field => { "topic" => "${notification_topic}" }
        type => "dmaap_notification"
      }

      # Topic named by ${request_topic}; events are typed "dmaap_request".
      request_queue => {
        method => "get"
        url => "${dmaap_base_url}/events/${request_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
        headers => { "Accept" => "application/json" }
        add_field => { "topic" => "${request_topic}" }
        type => "dmaap_request"
      }
    }
  }
}
51
filter {
  # Unpack polled payloads: for every event whose type is not "dmaap_log",
  # [message] is parsed as JSON, split into one event per list element, and
  # each element is parsed as JSON again into top-level fields.
  if [type] != "dmaap_log" {
    # avoid noise if no entry in the list
    if [message] == "[]" {
      drop { }
    }

    # parse json, split the list into multiple events, and parse each event
    json {
      source => "[message]"
      target => "message"
    }
    split {
      field => "message"
      add_field => {
        "type" => "%{type}"
        "topic" => "%{topic}"
      }
    }
    json {
      source => "message"
    }
    # The raw payload is no longer needed once its fields are extracted.
    mutate { remove_field => [ "message" ] }
  }

  # Normalise closedLoopAlarmStart only when the event carries it. Without
  # this guard a missing field becomes nil.to_s.to_i == 0, which the date
  # filter would store as 1970-01-01 (same guard as closedLoopAlarmEnd below).
  if [closedLoopAlarmStart] {
    # express timestamps in milliseconds instead of microseconds:
    # anything above 13 digits is divided by 1000
    ruby {
      code => "
        start_ts = event.get('closedLoopAlarmStart').to_s.to_i(10)
        event.set('closedLoopAlarmStart', start_ts > 9999999999999 ? start_ts / 1000 : start_ts)
      "
    }
    date {
      match => [ "closedLoopAlarmStart", "UNIX_MS" ]
      target => "closedLoopAlarmStart"
    }
  }

  if [closedLoopAlarmEnd] {
    # same microsecond -> millisecond normalisation as for the start time
    ruby {
      code => "
        end_ts = event.get('closedLoopAlarmEnd').to_s.to_i(10)
        event.set('closedLoopAlarmEnd', end_ts > 9999999999999 ? end_ts / 1000 : end_ts)
      "
    }
    date {
      match => [ "closedLoopAlarmEnd", "UNIX_MS" ]
      target => "closedLoopAlarmEnd"
    }
  }

  # notificationTime arrives as "yyyy-MM-dd HH:mm:ss"; replace the space with
  # "T" so that it can be parsed as ISO8601.
  if [notificationTime] {
    mutate {
      gsub => [
        "notificationTime", " ", "T"
      ]
    }
    date {
      match => [ "notificationTime", "ISO8601" ]
      target => "notificationTime"
    }
  }
}
output {
  # Echo every event to stdout in rubydebug format.
  stdout {
    codec => rubydebug
  }

  # Events carrying an [http_request_failure] field go to a separate error
  # index; everything else goes to the regular logstash index.
  # NOTE(review): that field is presumably set by http_poller when a poll
  # fails — confirm against the plugin docs.
  #
  # The date pattern uses `dd` (day of month). The previous `DD` is the
  # Joda-Time day-of-year token and produced names like "logstash-2018.09.249".
  # NOTE(review): per the elasticsearch output docs doc_as_upsert only takes
  # effect with action => "update" — confirm whether it is needed here.
  if [http_request_failure] {
    elasticsearch {
      codec => "json"
      hosts => ["${elasticsearch_base_url}"]
      index => "errors-%{+YYYY.MM.dd}" # creates daily indexes
      doc_as_upsert => true
    }
  } else {
    elasticsearch {
      codec => "json"
      hosts => ["${elasticsearch_base_url}"]
      index => "logstash-%{+YYYY.MM.dd}" # creates daily indexes
      doc_as_upsert => true
    }
  }
}
143}