# blob: 2b5a24e0436e75187220d7dda2d716fec193284d [file] [log] [blame]
# Input stage: poll three topics over HTTP GET, one URL entry per topic.
# All ${...} placeholders are substituted by Logstash from the environment.
input {
  http_poller {
    # The polled body is kept as raw text in [message]; JSON decoding is
    # done later in the filter stage.
    codec => "plain"

    # Poll every 15 (seconds, per the http_poller documentation). Each URL
    # below carries "timeout=15000", presumably a server-side long-poll
    # timeout in milliseconds -- TODO confirm against the DMaaP API.
    interval => 15

    # Client-side timeouts (seconds, per the http_poller documentation).
    request_timeout => 30
    socket_timeout => 30

    urls => {
      # NOTE(review): add_field is nested inside each URL spec here rather
      # than set at plugin level; confirm the plugin honours it per URL.
      event_queue => {
        method => "get"
        url => "${dmaap_base_url}/events/${event_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
        headers => {
          "Accept" => "application/json"
        }
        add_field => { "topic" => "${event_topic}" }
      }

      notification_queue => {
        method => "get"
        url => "${dmaap_base_url}/events/${notification_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
        headers => {
          "Accept" => "application/json"
        }
        add_field => { "topic" => "${notification_topic}" }
      }

      request_queue => {
        method => "get"
        url => "${dmaap_base_url}/events/${request_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
        headers => {
          "Accept" => "application/json"
        }
        add_field => { "topic" => "${request_topic}" }
      }
    }
  }
}
# Filter stage: turn one polled HTTP response (a JSON list) into one
# Logstash event per list entry, then normalise the timestamp fields.
filter {
  # Avoid noise: an empty list means the poll returned no entries.
  if [message] == "[]" {
    drop { }
  }

  # Step 1: parse the response body (a JSON list) in place.
  json {
    source => "[message]"
    target => "message"
  }

  # Step 2: emit one event per list element.
  split {
    field => "message"
  }

  # Step 3: parse each element and merge its keys into the event root.
  json {
    source => "message"
  }

  # The raw payload is no longer needed once its keys are on the event.
  mutate { remove_field => [ "message" ] }

  # Coerce closedLoopAlarmStart to an Integer, then let the date filter
  # interpret it as epoch milliseconds (UNIX_MS).
  # Guarded like the sibling fields below: without the guard, an event that
  # lacks this field makes Integer(nil) raise inside the ruby filter.
  # NOTE(review): no unit scaling happens here; an earlier comment mentioned
  # a microsecond-to-millisecond conversion -- confirm the upstream unit.
  if [closedLoopAlarmStart] {
    ruby {
      code => "event.set('closedLoopAlarmStart', Integer(event.get('closedLoopAlarmStart')))"
    }
    date {
      match => [ "closedLoopAlarmStart", UNIX_MS ]
      target => "closedLoopAlarmStart"
    }
  }

  # Same treatment for the optional end-of-alarm timestamp.
  if [closedLoopAlarmEnd] {
    ruby {
      code => "event.set('closedLoopAlarmEnd', Integer(event.get('closedLoopAlarmEnd')))"
    }
    date {
      match => [ "closedLoopAlarmEnd", UNIX_MS ]
      target => "closedLoopAlarmEnd"
    }
  }

  # notificationTime is expected as "yyyy-MM-dd HH:mm:ss"; replacing the
  # space with "T" lets the date filter match it as ISO8601.
  if [notificationTime] {
    mutate {
      gsub => [
        "notificationTime", " ", "T"
      ]
    }
    date {
      match => [ "notificationTime", ISO8601 ]
      target => "notificationTime"
    }
  }
}
# Output stage: print every event for debugging and index it into
# Elasticsearch.
output {
  # Human-readable dump of each event on standard output.
  stdout {
    codec => rubydebug
  }

  elasticsearch {
    codec => "json"
    hosts => ["elasticsearch"]
    # One index per calendar day. The date suffix uses "dd" (day of month);
    # the previous "DD" is day-of-year in Joda-Time pattern syntax and
    # produced names such as logstash-2018.03.074.
    index => "logstash-%{+YYYY.MM.dd}"
    # NOTE(review): per the elasticsearch output documentation,
    # doc_as_upsert applies to the "update" action; no action or
    # document_id is set here, so confirm whether this option is intended.
    doc_as_upsert => true
  }
}