# blob: c511995f0b8c76442956ec7ebfb189a5fbda8be5 [file] [log] [blame]
# HTTP polling input: issues a GET against three DMaaP topic URLs (event,
# notification and request). The base URL, the topic names, the consumer
# group and the consumer id all come from ${...} variable references.
input {
  http_poller {
    # One entry per topic. Every entry requests JSON, records the topic name
    # in a "topic" field and sets a distinct "type" (dmaap_event,
    # dmaap_notification, dmaap_request) that the rest of the pipeline can
    # branch on.
    urls => {
      event_queue => {
        method => get
        url => "${dmaap_base_url}/events/${event_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
        headers => {
          Accept => "application/json"
        }
        add_field => { "topic" => "${event_topic}" }
        type => "dmaap_event"
      }
      notification_queue => {
        method => get
        url => "${dmaap_base_url}/events/${notification_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
        headers => {
          Accept => "application/json"
        }
        add_field => { "topic" => "${notification_topic}" }
        type => "dmaap_notification"
      }
      request_queue => {
        method => get
        url => "${dmaap_base_url}/events/${request_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
        headers => {
          Accept => "application/json"
        }
        add_field => { "topic" => "${request_topic}" }
        type => "dmaap_request"
      }
    }

    # Timeouts and polling period shared by all three URLs.
    socket_timeout => 30
    request_timeout => 30
    interval => 60

    # The response body is kept as plain text in "message"; the filter
    # section parses it as JSON afterwards.
    codec => "plain"
  }
}
# File input: reads /log-input/dmaap_evt.log, decodes each entry with the
# json codec and tags it with type "dmaap_log".
input {
  file {
    path => [ "/log-input/dmaap_evt.log" ]
    type => "dmaap_log"
    codec => "json"
  }
}
filter {
  # Everything that is not of type "dmaap_log" carries a JSON list in
  # "message": parse the list, split it into one event per element, then
  # parse each element and lift its keys onto the event.
  if [type] != "dmaap_log" {
    # An empty list carries no data; discard it instead of indexing noise.
    if [message] == "[]" {
      drop { }
    }

    # Step 1: replace the raw text in "message" with the parsed JSON list.
    json {
      source => "[message]"
      target => "message"
    }

    # Disabled alternative that was left in place:
    # ruby {
    # code => "event.get('message').each{|m| m.set('type',event.get('type')}"
    # }

    # Step 2: emit one event per list element.
    # NOTE(review): "type" and "topic" are re-added from the event's own
    # values; confirm this does not turn them into duplicated arrays on the
    # split events.
    split {
      field => "message"
      add_field => {
        "type" => "%{type}"
        "topic" => "%{topic}"
      }
    }

    # Step 3: parse the element itself, then discard the raw "message".
    json {
      source => "message"
    }
    mutate { remove_field => [ "message" ] }
  }

  # Express timestamps in milliseconds instead of microseconds: any value
  # above 9999999999999 is divided by 1000, anything else is only coerced to
  # an integer. The result is then parsed as UNIX_MS and written back to the
  # same field.
  if [closedLoopAlarmStart] {
    ruby {
      code => "
        start_ts = event.get('closedLoopAlarmStart').to_s.to_i(10)
        start_ts /= 1000 if start_ts > 9999999999999
        event.set('closedLoopAlarmStart', start_ts)
      "
    }
    date {
      match => [ "closedLoopAlarmStart", UNIX_MS ]
      target => "closedLoopAlarmStart"
    }
  }

  # Same normalisation for the alarm end time.
  if [closedLoopAlarmEnd] {
    ruby {
      code => "
        end_ts = event.get('closedLoopAlarmEnd').to_s.to_i(10)
        end_ts /= 1000 if end_ts > 9999999999999
        event.set('closedLoopAlarmEnd', end_ts)
      "
    }
    date {
      match => [ "closedLoopAlarmEnd", UNIX_MS ]
      target => "closedLoopAlarmEnd"
    }
  }

  # notificationTime arrives as "yyyy-MM-dd HH:mm:ss"; spaces are replaced by
  # "T" so that the value can be matched as ISO8601.
  if [notificationTime] {
    mutate {
      gsub => [ "notificationTime", " ", "T" ]
    }
    date {
      match => [ "notificationTime", ISO8601 ]
      target => "notificationTime"
    }
  }
}
# Output: every event is printed to stdout for debugging and indexed into
# Elasticsearch. Events carrying an [http_request_failure] field go to a
# daily "errors-" index; all other events go to a daily "logstash-" index.
output {
  stdout {
    codec => rubydebug
  }

  # The date part of the index name uses Joda-Time pattern letters:
  # "dd" is day-of-month. The previous "DD" is day-of-year, which produced
  # names such as "logstash-2018.03.075" instead of "logstash-2018.03.16".
  if [http_request_failure] {
    elasticsearch {
      codec => "json"
      hosts => [elasticsearch]
      index => "errors-%{+YYYY.MM.dd}"
      doc_as_upsert => true
    }
  } else {
    elasticsearch {
      codec => "json"
      hosts => [elasticsearch]
      # creates daily indexes
      index => "logstash-%{+YYYY.MM.dd}"
      doc_as_upsert => true
    }
  }
}