Merge "Add connector between Kafka bus and InfluxdB to sync VES events"
diff --git a/collector/evel-test-collector/code/collector/monitor.py b/collector/evel-test-collector/code/collector/monitor.py
index 784139a..60a04f0 100755
--- a/collector/evel-test-collector/code/collector/monitor.py
+++ b/collector/evel-test-collector/code/collector/monitor.py
@@ -33,7 +33,6 @@
import json
import jsonschema
from functools import partial
-import requests
from datetime import timezone
from elasticsearch import Elasticsearch
from kafka import KafkaProducer
@@ -222,9 +221,6 @@
# if word 'elasticsearch' exists in config file then save data in elasticsearch
if('elasticsearch' in data_storageArr):
save_event_in_elasticsearch(body)
- # if word 'influxdb' exists in config file then save data in influxdb
- if('influxdb' in data_storageArr):
- save_event_in_db(body)
except jsonschema.SchemaError as e:
logger.error('Schema is not valid! {0}'.format(e))
@@ -255,18 +251,6 @@
# --------------------------------------------------------------------------
-# Send event to influxdb
-# --------------------------------------------------------------------------
-
-def send_to_influxdb(event, pdata):
- url = 'http://{}/write?db=veseventsdb'.format(influxdb)
- logger.debug('Send {} to influxdb at {}: {}'.format(event, influxdb, pdata))
- r = requests.post(url, data=pdata, headers={'Content-Type': 'text/plain'})
- logger.info('influxdb return code {}'.format(r.status_code))
- if r.status_code != 204:
- logger.debug('*** Influxdb save failed, return code {} ***'.format(r.status_code))
-
-# --------------------------------------------------------------------------
# Save event data in Kafka
# --------------------------------------------------------------------------
def save_event_in_kafka(body):
@@ -299,6 +283,7 @@
# Save event data in Elasticsearch
# --------------------------------------------------------------------------
+
def format_timestamp(EpochMicrosec):
if isinstance(EpochMicrosec, int):
return datetime.datetime.fromtimestamp(int(str(EpochMicrosec)[:10]), tz=timezone.utc)
@@ -401,252 +386,6 @@
es.index(index=domain, body=jobj['event'])
-def process_additional_measurements(val, domain, eventId, startEpochMicrosec, lastEpochMicrosec):
- for additionalMeasurements in val:
- pdata = domain + ",eventId={},system={}".format(eventId, source)
- nonstringpdata = " startEpochMicrosec={},lastEpochMicrosec={},".format(startEpochMicrosec, lastEpochMicrosec)
- for key, val in additionalMeasurements.items():
- if isinstance(val, str):
- pdata = pdata + ',{}={}'.format(key, process_special_char(val))
- elif isinstance(val, dict):
- for key2, val2 in val.items():
- if isinstance(val2, str):
- pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
- else:
- nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
- else:
- nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
- send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
-
-
-def process_nonadditional_measurements(val, domain, eventId, startEpochMicrosec, lastEpochMicrosec):
- for disk in val:
- pdata = domain + ",eventId={},system={}".format(eventId, source)
- nonstringpdata = " startEpochMicrosec={},lastEpochMicrosec={},".format(startEpochMicrosec, lastEpochMicrosec)
- for key, val in disk.items():
- if isinstance(val, str):
- pdata = pdata + ',{}={}'.format(key, process_special_char(val))
- else:
- nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
-
- send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
-
-
-def process_pnfRegistration_event(domain, jobj, pdata, nonstringpdata):
- pdata = pdata + ",system={}".format(source)
- for key, val in jobj.items():
- if key != 'additionalFields' and val != "":
- if isinstance(val, str):
- pdata = pdata + ',{}={}'.format(key, process_special_char(val))
- else:
- nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
- elif key == 'additionalFields':
- for key2, val2 in val.items():
- if val2 != "" and isinstance(val2, str):
- pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
- elif val2 != "":
- nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
-
- send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
-
-
-def process_thresholdCrossingAlert_event(domain, jobj, pdata, nonstringpdata):
- pdata = pdata + ",system={}".format(source)
- for key, val in jobj.items():
- if (key != 'additionalFields' and key != 'additionalParameters' and key != 'associatedAlertIdList') and val != "":
- if isinstance(val, str):
- pdata = pdata + ',{}={}'.format(key, process_special_char(val))
- else:
- nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
- elif key == 'additionalFields':
- for key2, val2 in val.items():
- if val2 != "" and isinstance(val2, str):
- pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
- elif val2 != "":
- nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
- elif key == 'additionalParameters':
- for addParameter in val:
- for key2, val2 in addParameter.items():
- if key2 != "hashMap":
- if val2 != "" and isinstance(val2, str):
- pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
- elif val2 != "":
- nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
- elif key2 == "hashMap":
- for key3, val3 in val2.items():
- if val3 != "" and isinstance(val3, str):
- pdata = pdata + ',{}={}'.format(key3, process_special_char(val3))
- elif val3 != "":
- nonstringpdata = nonstringpdata + '{}={},'.format(key3, val3)
- elif key == 'associatedAlertIdList':
- associatedAlertIdList = ""
- for associatedAlertId in val:
- associatedAlertIdList = associatedAlertIdList + associatedAlertId + "|"
- if(associatedAlertIdList != ""):
- pdata = pdata + ',{}={}'.format("associatedAlertIdList",
- process_special_char(associatedAlertIdList)[:-1])
-
- send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
-
-
-def process_fault_event(domain, jobj, pdata, nonstringpdata):
- pdata = pdata + ",system={}".format(source)
- for key, val in jobj.items():
- if key != 'alarmAdditionalInformation' and val != "":
- if isinstance(val, str):
- pdata = pdata + ',{}={}'.format(key, process_special_char(val))
- else:
- nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
- elif key == 'alarmAdditionalInformation':
- for key2, val2 in val.items():
- if val2 != "" and isinstance(val2, str):
- pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
- elif val2 != "":
- nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
-
- send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
-
-
-def process_heartbeat_events(domain, jobj, pdata, nonstringpdata):
- pdata = pdata + ",system={}".format(source)
- for key, val in jobj.items():
- if key != 'additionalFields' and val != "":
- if isinstance(val, str):
- pdata = pdata + ',{}={}'.format(key, process_special_char(val))
- else:
- nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
- elif key == 'additionalFields':
- for key2, val2 in val.items():
- if val2 != "" and isinstance(val2, str):
- pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
- elif val2 != "":
- nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
-
- send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
-
-
-def process_measurement_events(domain, jobj, pdata, nonstringpdata, eventId, startEpochMicrosec, lastEpochMicrosec):
- pdata = pdata + ",system={}".format(source)
- for key, val in jobj.items():
- if val != "":
- if isinstance(val, str):
- pdata = pdata + ',{}={}'.format(key, process_special_char(val))
- elif isinstance(val, list):
- if key == 'additionalMeasurements':
- process_additional_measurements(val, domain + "additionalmeasurements", eventId, startEpochMicrosec, lastEpochMicrosec)
- elif key == 'cpuUsageArray':
- process_nonadditional_measurements(val, domain + "cpuusage", eventId, startEpochMicrosec, lastEpochMicrosec)
- elif key == 'diskUsageArray':
- process_nonadditional_measurements(val, domain + "diskusage", eventId, startEpochMicrosec, lastEpochMicrosec)
- elif key == 'memoryUsageArray':
- process_nonadditional_measurements(val, domain + "memoryusage", eventId, startEpochMicrosec, lastEpochMicrosec)
- elif key == 'nicPerformanceArray':
- process_nonadditional_measurements(val, domain + "nicperformance", eventId, startEpochMicrosec, lastEpochMicrosec)
- elif key == 'loadArray':
- process_nonadditional_measurements(val, domain + "load", eventId, startEpochMicrosec, lastEpochMicrosec)
- elif key == 'networkSliceArray':
- process_nonadditional_measurements(val, domain + "networkslice", eventId, startEpochMicrosec, lastEpochMicrosec)
- elif isinstance(val, dict):
- for key2, val2 in val.items():
- if isinstance(val2, str):
- pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
- else:
- nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
- else:
- nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
-
- send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
-
-
-def process_special_char(str):
- for search_char, replace_char in {" ": "\ ", ",": "\,"}.items():
- str = str.replace(search_char, replace_char)
- return str
-
-
-def process_time(eventTimestamp):
- eventTimestamp = str(eventTimestamp).replace(".", "")
- while len(eventTimestamp) < 19:
- eventTimestamp = eventTimestamp + "0"
- return format(int(eventTimestamp))
-
-
-# --------------------------------------------------------------------------
-# Save event data
-# --------------------------------------------------------------------------
-def save_event_in_db(body):
- global source
- global eventTimestamp
-
- jobj = json.loads(body)
- source = "unknown"
- domain = jobj['event']['commonEventHeader']['domain']
- eventTimestamp = jobj['event']['commonEventHeader']['startEpochMicrosec']
- agent = jobj['event']['commonEventHeader']['reportingEntityName'].upper()
- if "LOCALHOST" in agent:
- agent = "computehost"
- source = jobj['event']['commonEventHeader']['sourceId'].upper()
-
- # processing common header part
- pdata = domain
- nonstringpdata = " "
- commonHeaderObj = jobj['event']['commonEventHeader'].items()
- for key, val in commonHeaderObj:
- if val != "":
- if (key != 'internalHeaderFields'):
- if isinstance(val, str):
- pdata = pdata + ',{}={}'.format(key, process_special_char(val))
- else:
- nonstringpdata = nonstringpdata + '{}={}'.format(key, val) + ','
- if (key == 'internalHeaderFields'):
- for key2, val2 in val.items():
- if val2 != "":
- if isinstance(val2, str):
- pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
- else:
- nonstringpdata = nonstringpdata + '{}={}'.format(key2, val2) + ','
-
- # processing pnfRegistration events
- if 'pnfRegistrationFields' in jobj['event']:
- logger.debug('Found pnfRegistrationFields')
- process_pnfRegistration_event(domain,
- jobj['event']['pnfRegistrationFields'],
- pdata,
- nonstringpdata)
-
- # processing thresholdCrossingAlert events
- if 'thresholdCrossingAlertFields' in jobj['event']:
- logger.debug('Found thresholdCrossingAlertFields')
- process_thresholdCrossingAlert_event(domain,
- jobj['event']['thresholdCrossingAlertFields'],
- pdata,
- nonstringpdata)
-
- # processing fault events
- if 'faultFields' in jobj['event']:
- logger.debug('Found faultFields')
- process_fault_event(domain, jobj['event']['faultFields'], pdata, nonstringpdata)
-
- # process heartbeat events
- if 'heartbeatFields' in jobj['event']:
- logger.debug('Found Heartbeat')
- process_heartbeat_events(domain,
- jobj['event']['heartbeatFields'],
- pdata,
- nonstringpdata)
-
- # processing measurement events
- if 'measurementFields' in jobj['event']:
- logger.debug('Found measurementFields')
- process_measurement_events(domain,
- jobj['event']['measurementFields'],
- pdata,
- nonstringpdata,
- jobj['event']['commonEventHeader']['eventId'],
- jobj['event']['commonEventHeader']['startEpochMicrosec'],
- jobj['event']['commonEventHeader']['lastEpochMicrosec'])
-
-
def test_listener(environ, start_response, schema):
'''
Handler for the Test Collector Test Control API.
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 4f5bbaf..4aeb25d 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -29,6 +29,9 @@
- 8880:3000
networks:
- ves-net
+ depends_on:
+ - ves-influxdb
+ - kafka-connector-influxdb
ves-zookeeper:
container_name: ves-zookeeper
image: confluentinc/cp-zookeeper:5.5.6
@@ -114,7 +117,7 @@
ves_influxdb_port: "8086"
ves_grafana_host: "ves-grafana"
ves_grafana_port: "3000"
- data_storage: "influxdb|elasticsearch"
+ data_storage: "elasticsearch"
elasticsearch_domain: "ves-elasticsearch"
kafka_host_2: "smo-kafka"
kafka_port_2: "29092"
@@ -132,6 +135,23 @@
- smo-kafka
- ves-influxdb
- ves-grafana
+ kafka-connector-influxdb:
+ container_name: kafka-connector-influxdb
+ build: ./kafka-connector-influxdb
+ image: influxdb-connector
+ networks:
+ - ves-net
+ ports:
+ - 9990:9990
+ environment:
+ ves_influxdb_host: "ves-influxdb"
+ ves_influxdb_port: "8086"
+ ves_loglevel: "ERROR"
+ kafka_host_2: "smo-kafka"
+ kafka_port_2: "29092"
+ depends_on:
+ - smo-kafka
+ - ves-influxdb
ves-agent:
container_name: ves-agent
build: ./agent
diff --git a/kafka-connector-influxdb/Dockerfile b/kafka-connector-influxdb/Dockerfile
new file mode 100755
index 0000000..89b3ac7
--- /dev/null
+++ b/kafka-connector-influxdb/Dockerfile
@@ -0,0 +1,27 @@
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM ubuntu:focal
+
+RUN apt-get update && apt-get -y upgrade
+RUN apt-get install -y git curl python3 python3-pip
+RUN pip3 install requests confluent-kafka
+
+# Clone influxdb-connector
+RUN mkdir -p /opt/ves/influxdb-connector
+ADD influxdb-connector /opt/ves/influxdb-connector
+
+COPY start.sh /opt/ves/start.sh
+ENTRYPOINT ["/bin/bash", "/opt/ves/start.sh"]
diff --git a/kafka-connector-influxdb/Makefile b/kafka-connector-influxdb/Makefile
new file mode 100755
index 0000000..637376f
--- /dev/null
+++ b/kafka-connector-influxdb/Makefile
@@ -0,0 +1,5 @@
+default: all
+
+all:
+ docker build -t influxdb-connector .
+
diff --git a/kafka-connector-influxdb/influxdb-connector/LICENSE.md b/kafka-connector-influxdb/influxdb-connector/LICENSE.md
new file mode 100755
index 0000000..9054138
--- /dev/null
+++ b/kafka-connector-influxdb/influxdb-connector/LICENSE.md
@@ -0,0 +1,14 @@
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/kafka-connector-influxdb/influxdb-connector/code/influxdb_connector.py b/kafka-connector-influxdb/influxdb-connector/code/influxdb_connector.py
new file mode 100644
index 0000000..34df978
--- /dev/null
+++ b/kafka-connector-influxdb/influxdb-connector/code/influxdb_connector.py
@@ -0,0 +1,413 @@
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import platform
+import json
+import logging
+from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
+import configparser
+import logging.handlers
+import requests
+from confluent_kafka import Consumer, KafkaError
+
+# ------------------------------------------------------------------------------
+# Address of influxdb server.
+# ------------------------------------------------------------------------------
+
+influxdb = '127.0.0.1'
+
+logger = None
+
+
+def send_to_influxdb(event, pdata):
+ url = 'http://{}/write?db=veseventsdb'.format(influxdb)
+ logger.debug('Send {} to influxdb at {}: {}'.format(event, influxdb, pdata))
+ r = requests.post(url, data=pdata, headers={'Content-Type': 'text/plain'})
+ logger.info('influxdb return code {}'.format(r.status_code))
+ if r.status_code != 204:
+ logger.debug('*** Influxdb save failed, return code {} ***'.format(r.status_code))
+
+
+def process_additional_measurements(val, domain, eventId, startEpochMicrosec, lastEpochMicrosec):
+ for additionalMeasurements in val:
+ pdata = domain + ",eventId={},system={}".format(eventId, source)
+ nonstringpdata = " startEpochMicrosec={},lastEpochMicrosec={},".format(startEpochMicrosec, lastEpochMicrosec)
+ for key, val in additionalMeasurements.items():
+ if isinstance(val, str):
+ pdata = pdata + ',{}={}'.format(key, process_special_char(val))
+ elif isinstance(val, dict):
+ for key2, val2 in val.items():
+ if isinstance(val2, str):
+ pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
+ else:
+ nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
+ else:
+ nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
+ send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
+
+
+def process_nonadditional_measurements(val, domain, eventId, startEpochMicrosec, lastEpochMicrosec):
+ for disk in val:
+ pdata = domain + ",eventId={},system={}".format(eventId, source)
+ nonstringpdata = " startEpochMicrosec={},lastEpochMicrosec={},".format(startEpochMicrosec, lastEpochMicrosec)
+ for key, val in disk.items():
+ if isinstance(val, str):
+ pdata = pdata + ',{}={}'.format(key, process_special_char(val))
+ else:
+ nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
+
+ send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
+
+
+def process_pnfRegistration_event(domain, jobj, pdata, nonstringpdata):
+ pdata = pdata + ",system={}".format(source)
+ for key, val in jobj.items():
+ if key != 'additionalFields' and val != "":
+ if isinstance(val, str):
+ pdata = pdata + ',{}={}'.format(key, process_special_char(val))
+ else:
+ nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
+ elif key == 'additionalFields':
+ for key2, val2 in val.items():
+ if val2 != "" and isinstance(val2, str):
+ pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
+ elif val2 != "":
+ nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
+
+ send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
+
+
+def process_thresholdCrossingAlert_event(domain, jobj, pdata, nonstringpdata):
+ pdata = pdata + ",system={}".format(source)
+ for key, val in jobj.items():
+ if (key != 'additionalFields' and key != 'additionalParameters' and key != 'associatedAlertIdList') and val != "":
+ if isinstance(val, str):
+ pdata = pdata + ',{}={}'.format(key, process_special_char(val))
+ else:
+ nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
+ elif key == 'additionalFields':
+ for key2, val2 in val.items():
+ if val2 != "" and isinstance(val2, str):
+ pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
+ elif val2 != "":
+ nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
+ elif key == 'additionalParameters':
+ for addParameter in val:
+ for key2, val2 in addParameter.items():
+ if key2 != "hashMap":
+ if val2 != "" and isinstance(val2, str):
+ pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
+ elif val2 != "":
+ nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
+ elif key2 == "hashMap":
+ for key3, val3 in val2.items():
+ if val3 != "" and isinstance(val3, str):
+ pdata = pdata + ',{}={}'.format(key3, process_special_char(val3))
+ elif val3 != "":
+ nonstringpdata = nonstringpdata + '{}={},'.format(key3, val3)
+ elif key == 'associatedAlertIdList':
+ associatedAlertIdList = ""
+ for associatedAlertId in val:
+ associatedAlertIdList = associatedAlertIdList + associatedAlertId + "|"
+ if(associatedAlertIdList != ""):
+ pdata = pdata + ',{}={}'.format("associatedAlertIdList",
+ process_special_char(associatedAlertIdList)[:-1])
+
+ send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
+
+
+def process_fault_event(domain, jobj, pdata, nonstringpdata):
+ pdata = pdata + ",system={}".format(source)
+ for key, val in jobj.items():
+ if key != 'alarmAdditionalInformation' and val != "":
+ if isinstance(val, str):
+ pdata = pdata + ',{}={}'.format(key, process_special_char(val))
+ else:
+ nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
+ elif key == 'alarmAdditionalInformation':
+ for key2, val2 in val.items():
+ if val2 != "" and isinstance(val2, str):
+ pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
+ elif val2 != "":
+ nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
+
+ send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
+
+
+def process_heartbeat_events(domain, jobj, pdata, nonstringpdata):
+ pdata = pdata + ",system={}".format(source)
+ for key, val in jobj.items():
+ if key != 'additionalFields' and val != "":
+ if isinstance(val, str):
+ pdata = pdata + ',{}={}'.format(key, process_special_char(val))
+ else:
+ nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
+ elif key == 'additionalFields':
+ for key2, val2 in val.items():
+ if val2 != "" and isinstance(val2, str):
+ pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
+ elif val2 != "":
+ nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
+
+ send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
+
+
+def process_measurement_events(domain, jobj, pdata, nonstringpdata, eventId, startEpochMicrosec, lastEpochMicrosec):
+ pdata = pdata + ",system={}".format(source)
+ for key, val in jobj.items():
+ if val != "":
+ if isinstance(val, str):
+ pdata = pdata + ',{}={}'.format(key, process_special_char(val))
+ elif isinstance(val, list):
+ if key == 'additionalMeasurements':
+ process_additional_measurements(val, domain + "additionalmeasurements", eventId, startEpochMicrosec, lastEpochMicrosec)
+ elif key == 'cpuUsageArray':
+ process_nonadditional_measurements(val, domain + "cpuusage", eventId, startEpochMicrosec, lastEpochMicrosec)
+ elif key == 'diskUsageArray':
+ process_nonadditional_measurements(val, domain + "diskusage", eventId, startEpochMicrosec, lastEpochMicrosec)
+ elif key == 'memoryUsageArray':
+ process_nonadditional_measurements(val, domain + "memoryusage", eventId, startEpochMicrosec, lastEpochMicrosec)
+ elif key == 'nicPerformanceArray':
+ process_nonadditional_measurements(val, domain + "nicperformance", eventId, startEpochMicrosec, lastEpochMicrosec)
+ elif key == 'loadArray':
+ process_nonadditional_measurements(val, domain + "load", eventId, startEpochMicrosec, lastEpochMicrosec)
+ elif key == 'networkSliceArray':
+ process_nonadditional_measurements(val, domain + "networkslice", eventId, startEpochMicrosec, lastEpochMicrosec)
+ elif isinstance(val, dict):
+ for key2, val2 in val.items():
+ if isinstance(val2, str):
+ pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
+ else:
+ nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
+ else:
+ nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
+
+ send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
+
+
+def process_special_char(str):
+ for search_char, replace_char in {" ": "\ ", ",": "\,"}.items():
+ str = str.replace(search_char, replace_char)
+ return str
+
+
+def process_time(eventTimestamp):
+ eventTimestamp = str(eventTimestamp).replace(".", "")
+ while len(eventTimestamp) < 19:
+ eventTimestamp = eventTimestamp + "0"
+ return format(int(eventTimestamp))
+
+
+def save_event_in_db(body):
+ jobj = json.loads(body)
+ global source
+ global eventTimestamp
+ source = "unknown"
+
+ domain = jobj['event']['commonEventHeader']['domain']
+ eventTimestamp = jobj['event']['commonEventHeader']['startEpochMicrosec']
+ agent = jobj['event']['commonEventHeader']['reportingEntityName'].upper()
+ if "LOCALHOST" in agent:
+ agent = "computehost"
+ source = jobj['event']['commonEventHeader']['sourceId'].upper()
+
+ # processing common header part
+ pdata = domain
+
+ nonstringpdata = " "
+ commonHeaderObj = jobj['event']['commonEventHeader'].items()
+ for key, val in commonHeaderObj:
+ if val != "":
+ if (key != 'internalHeaderFields'):
+ if isinstance(val, str):
+ pdata = pdata + ',{}={}'.format(key, process_special_char(val))
+ else:
+ nonstringpdata = nonstringpdata + '{}={}'.format(key, val) + ','
+ if (key == 'internalHeaderFields'):
+ for key2, val2 in val.items():
+ if val2 != "":
+ if isinstance(val2, str):
+ pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
+ else:
+ nonstringpdata = nonstringpdata + '{}={}'.format(key2, val2) + ','
+
+ # processing pnfRegistration events
+ if 'pnfRegistrationFields' in jobj['event']:
+ logger.debug('Found pnfRegistrationFields')
+ process_pnfRegistration_event(domain,
+ jobj['event']['pnfRegistrationFields'],
+ pdata,
+ nonstringpdata)
+
+ # processing thresholdCrossingAlert events
+ if 'thresholdCrossingAlertFields' in jobj['event']:
+ logger.debug('Found thresholdCrossingAlertFields')
+ process_thresholdCrossingAlert_event(domain,
+ jobj['event']['thresholdCrossingAlertFields'],
+ pdata,
+ nonstringpdata)
+
+ # processing fault events
+ if 'faultFields' in jobj['event']:
+ logger.debug('Found faultFields')
+ process_fault_event(domain, jobj['event']['faultFields'], pdata, nonstringpdata)
+
+ # process heartbeat events
+ if 'heartbeatFields' in jobj['event']:
+ logger.debug('Found Heartbeat')
+ process_heartbeat_events(domain,
+ jobj['event']['heartbeatFields'],
+ pdata,
+ nonstringpdata)
+
+ # processing measurement events
+ if 'measurementFields' in jobj['event']:
+ logger.debug('Found measurementFields')
+ process_measurement_events(domain,
+ jobj['event']['measurementFields'],
+ pdata,
+ nonstringpdata,
+ jobj['event']['commonEventHeader']['eventId'],
+ jobj['event']['commonEventHeader']['startEpochMicrosec'],
+ jobj['event']['commonEventHeader']['lastEpochMicrosec'])
+
+
+def main():
+
+ # ----------------------------------------------------------------------
+ # Setup argument parser so we can parse the command-line.
+ # ----------------------------------------------------------------------
+ parser = ArgumentParser(description='',
+ formatter_class=ArgumentDefaultsHelpFormatter)
+ parser.add_argument('-i', '--influxdb',
+ dest='influxdb',
+ default='localhost',
+ help='InfluxDB server addresss')
+ parser.add_argument('-v', '--verbose',
+ dest='verbose',
+ action='count',
+ help='set verbosity level')
+ parser.add_argument('-c', '--config',
+ dest='config',
+ default='/opt/ves/connector/config/consumer.conf',
+ help='Use this config file.',
+ metavar='<file>')
+ parser.add_argument('-s', '--section',
+ dest='section',
+ default='default',
+ metavar='<section>',
+ help='section to use in the config file')
+
+ # ----------------------------------------------------------------------
+ # Process arguments received.
+ # ----------------------------------------------------------------------
+ args = parser.parse_args()
+ config_file = args.config
+ verbose = args.verbose
+ config_section = args.section
+
+ # ----------------------------------------------------------------------
+ # Now read the config file, using command-line supplied values as
+ # overrides.
+ # ----------------------------------------------------------------------
+ overrides = {}
+ config = configparser.ConfigParser()
+ config['defaults'] = {'log_file': 'influxdbConnector.log'
+ }
+ config.read(config_file)
+
+ # ----------------------------------------------------------------------
+ # extract the values we want.
+ # ----------------------------------------------------------------------
+
+ global influxdb
+ global kafka_server
+
+ influxdb = config.get(config_section, 'influxdb', vars=overrides)
+ log_file = config.get(config_section, 'log_file', vars=overrides)
+ kafka_server = config.get(config_section, 'kafka_server', vars=overrides)
+
+ # ----------------------------------------------------------------------
+ # Finally we have enough info to start a proper flow trace.
+ # ----------------------------------------------------------------------
+ global logger
+ logger = logging.getLogger('monitor')
+
+ if ((verbose is not None) and (verbose > 0)):
+ logger.info('Verbose mode on')
+ logger.setLevel(logging.DEBUG)
+ else:
+ logger.setLevel(logging.DEBUG)
+ handler = logging.handlers.RotatingFileHandler(log_file,
+ maxBytes=1000000,
+ backupCount=10)
+ if (platform.system() == 'Windows'):
+ date_format = '%Y-%m-%d %H:%M:%S'
+ else:
+ date_format = '%Y-%m-%d %H:%M:%S.%f %z'
+ formatter = logging.Formatter('%(asctime)s %(name)s - '
+ '%(levelname)s - %(message)s',
+ date_format)
+ handler.setFormatter(formatter)
+ logger.addHandler(handler)
+ logger.info('Started')
+ # ----------------------------------------------------------------------
+ # Log the details of the configuration.
+ # ----------------------------------------------------------------------
+ logger.debug('Log file = {0}'.format(log_file))
+ logger.debug('Influxdb server = {0}'.format(influxdb))
+ logger.debug('kafka server = {0}'.format(kafka_server))
+
+ # ----------------------------------------------------------------------
+ # kafka Consumer code .
+ # ----------------------------------------------------------------------
+
+ settings = {
+ 'bootstrap.servers': kafka_server,
+ 'group.id': 'mygroup',
+ 'client.id': 'client-1',
+ 'enable.auto.commit': True,
+ 'session.timeout.ms': 6000,
+ 'default.topic.config': {'auto.offset.reset': 'earliest'}
+ }
+
+ c = Consumer(settings)
+
+ c.subscribe(['measurement', 'pnfregistration',
+ 'fault', 'thresholdcrossingalert', 'heartbeat'])
+
+ try:
+ while True:
+ msg = c.poll(0.1)
+ if msg is None:
+ continue
+ elif not msg.error():
+ logger.debug('Recived message from topic name {} and offset number {}'.format(msg.topic(), msg.offset()))
+ # saving data in influxdb
+ save_event_in_db(msg.value())
+ elif msg.error().code() == KafkaError._PARTITION_EOF:
+ logger.error('End of partition reached {0}/{1}'
+ .format(msg.topic(), msg.partition()))
+ else:
+ logger.error('Error occured: {0}'.format(msg.error().str()))
+
+ except KeyboardInterrupt:
+ pass
+
+ finally:
+ c.close()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/kafka-connector-influxdb/influxdb-connector/config/influxdb_connector.conf b/kafka-connector-influxdb/influxdb-connector/config/influxdb_connector.conf
new file mode 100755
index 0000000..dc0fc5d
--- /dev/null
+++ b/kafka-connector-influxdb/influxdb-connector/config/influxdb_connector.conf
@@ -0,0 +1,4 @@
+[default]
+log_file = /opt/ves/influxdbconnector.log
+kafka_server =
+influxdb =
diff --git a/kafka-connector-influxdb/start.sh b/kafka-connector-influxdb/start.sh
new file mode 100755
index 0000000..301d785
--- /dev/null
+++ b/kafka-connector-influxdb/start.sh
@@ -0,0 +1,57 @@
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+cd /opt/ves
+touch monitor.log
+
+config_file="influxdb-connector/config/influxdb_connector.conf"
+
+sed -i -- "s/influxdb =/influxdb = $ves_influxdb_host:$ves_influxdb_port/g" \
+ $config_file
+sed -i -- "s/kafka_server =/kafka_server = $kafka_host_2:$kafka_port_2/g" \
+ $config_file
+
+echo; echo $config_file
+cat $config_file
+
+echo; echo "wait for InfluxDB API at $ves_influxdb_host:$ves_influxdb_port"
+STARTTIME=$(date +%s)
+max_time=60
+while ! curl http://$ves_influxdb_host:$ves_influxdb_port/ping ;
+ do
+ ELAPSED_TIME=$(($(date +%s) - $STARTTIME))
+ if [ $ELAPSED_TIME -ge $max_time ]; then
+ echo "InfluxDB API is not yet up after several attempts! Exiting from script."
+ exit 1
+ fi
+ echo "InfluxDB API is not yet responding... waiting 10 seconds"
+ sleep 10
+ done
+ echo "Done."
+echo; echo "setup veseventsdb in InfluxDB"
+# TODO: check if pre-existing and skip
+curl -X POST http://$ves_influxdb_host:$ves_influxdb_port/query \
+ --data-urlencode "q=CREATE DATABASE veseventsdb"
+
+if [ "$ves_loglevel" != "" ]; then
+ python3 /opt/ves/influxdb-connector/code/influxdb_connector.py \
+ --config /opt/ves/influxdb-connector/config/influxdb_connector.conf \
+ --influxdb $ves_influxdb_host:$ves_influxdb_port \
+ --section default > /opt/ves/monitor.log 2>&1
+else
+ python3 /opt/ves/influxdb-connector/code/influxdb_connector.py \
+ --config /opt/ves/influxdb-connector/config/influxdb_connector.conf \
+ --influxdb $ves_influxdb_host:$ves_influxdb_port \
+ --section default
+fi