blob: 0a2caf2f1ee670c51faf483954d65cb4d74f543f [file] [log] [blame]
input {
http_poller {
urls => {
event_queue => {
method => get
url => "${dmaap_base_url}/events/${event_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
headers => {
Accept => "application/json"
add_field => { "topic" => "${event_topic}" }
notification_queue => {
method => get
url => "${dmaap_base_url}/events/${notification_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
headers => {
Accept => "application/json"
add_field => { "topic" => "${notification_topic}" }
request_queue => {
method => get
url => "${dmaap_base_url}/events/${request_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
headers => {
Accept => "application/json"
add_field => { "topic" => "${request_topic}" }
socket_timeout => 30
request_timeout => 30
interval => 60
codec => "plain"
filter {
# avoid noise if no entry in the list
if [message] == "[]" {
drop { }
# parse json, split the list into multiple events, and parse each event
json {
source => "[message]"
target => "message"
split {
field => "message"
json {
source => "message"
mutate { remove_field => [ "message" ] }
# express timestamps in milliseconds instead of microseconds
if [closedLoopAlarmStart] {
ruby {
code => "event.set('closedLoopAlarmStart', Integer(event.get('closedLoopAlarmStart')) / 1000)"
date {
match => [ "closedLoopAlarmStart", UNIX_MS ]
target => "closedLoopAlarmStart"
if [closedLoopAlarmEnd] {
ruby {
code => "event.set('closedLoopAlarmEnd', Integer(event.get('closedLoopAlarmEnd')) / 1000)"
date {
match => [ "closedLoopAlarmEnd", UNIX_MS ]
target => "closedLoopAlarmEnd"
#"yyyy-MM-dd HH:mm:ss"
if [notificationTime] {
mutate {
gsub => [
"notificationTime", " ", "T"
date {
match => [ "notificationTime", ISO8601 ]
target => "notificationTime"
output {
stdout {
codec => rubydebug
if [http_request_failure] {
elasticsearch {
codec => "json"
hosts => [elasticsearch]
index => "errors-%{+YYYY.MM.DD}"
doc_as_upsert => true
} else {
elasticsearch {
codec => "json"
hosts => [elasticsearch]
index => "logstash-%{+YYYY.MM.DD}" # creates daily indexes
doc_as_upsert => true