Merge "Add connector between Kafka bus and InfluxdB to sync VES events"
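
The collector no longer writes events to InfluxDB itself. A new
kafka-connector-influxdb service consumes the VES domain topics
(measurement, pnfregistration, fault, thresholdcrossingalert, heartbeat)
from the SMO Kafka bus and stores each event in the veseventsdb database
in InfluxDB using the line protocol, one point per event of the form
"measurement,tags fields timestamp". For example (illustrative values
only, not taken from this change), a heartbeat event is posted to
http://ves-influxdb:8086/write?db=veseventsdb as a text/plain body like:

    heartbeat,eventId=heartbeat0001,system=unknown sequence=1 1639985333218840000
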
diff --git a/collector/evel-test-collector/code/collector/monitor.py b/collector/evel-test-collector/code/collector/monitor.py
index 784139a..60a04f0 100755
--- a/collector/evel-test-collector/code/collector/monitor.py
+++ b/collector/evel-test-collector/code/collector/monitor.py
@@ -33,7 +33,6 @@
 import json
 import jsonschema
 from functools import partial
-import requests
 from datetime import timezone
 from elasticsearch import Elasticsearch
 from kafka import KafkaProducer
@@ -222,9 +221,6 @@
             # if word 'elasticsearch' exists in config file then save data in elasticsearch
             if('elasticsearch' in data_storageArr):
                 save_event_in_elasticsearch(body)
-            # if word 'influxdb' exists in config file then save data in influxdb
-            if('influxdb' in data_storageArr):
-                save_event_in_db(body)
 
         except jsonschema.SchemaError as e:
             logger.error('Schema is not valid! {0}'.format(e))
@@ -255,18 +251,6 @@
 
 
 # --------------------------------------------------------------------------
-# Send event to influxdb
-# --------------------------------------------------------------------------
-
-def send_to_influxdb(event, pdata):
-    url = 'http://{}/write?db=veseventsdb'.format(influxdb)
-    logger.debug('Send {} to influxdb at {}: {}'.format(event, influxdb, pdata))
-    r = requests.post(url, data=pdata, headers={'Content-Type': 'text/plain'})
-    logger.info('influxdb return code {}'.format(r.status_code))
-    if r.status_code != 204:
-        logger.debug('*** Influxdb save failed, return code {} ***'.format(r.status_code))
-
-# --------------------------------------------------------------------------
 # Save event data in Kafka
 # --------------------------------------------------------------------------
 def save_event_in_kafka(body):
@@ -299,6 +283,7 @@
 # Save event data in Elasticsearch
 # --------------------------------------------------------------------------
 
+
 def format_timestamp(EpochMicrosec):
     if isinstance(EpochMicrosec, int):
         return datetime.datetime.fromtimestamp(int(str(EpochMicrosec)[:10]), tz=timezone.utc)
@@ -401,252 +386,6 @@
         es.index(index=domain, body=jobj['event'])
 
 
-def process_additional_measurements(val, domain, eventId, startEpochMicrosec, lastEpochMicrosec):
-    for additionalMeasurements in val:
-        pdata = domain + ",eventId={},system={}".format(eventId, source)
-        nonstringpdata = " startEpochMicrosec={},lastEpochMicrosec={},".format(startEpochMicrosec, lastEpochMicrosec)
-        for key, val in additionalMeasurements.items():
-            if isinstance(val, str):
-                pdata = pdata + ',{}={}'.format(key, process_special_char(val))
-            elif isinstance(val, dict):
-                for key2, val2 in val.items():
-                    if isinstance(val2, str):
-                        pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
-                    else:
-                        nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
-            else:
-                nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
-    send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
-
-
-def process_nonadditional_measurements(val, domain, eventId, startEpochMicrosec, lastEpochMicrosec):
-    for disk in val:
-        pdata = domain + ",eventId={},system={}".format(eventId, source)
-        nonstringpdata = " startEpochMicrosec={},lastEpochMicrosec={},".format(startEpochMicrosec, lastEpochMicrosec)
-        for key, val in disk.items():
-            if isinstance(val, str):
-                pdata = pdata + ',{}={}'.format(key, process_special_char(val))
-            else:
-                nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
-
-        send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
-
-
-def process_pnfRegistration_event(domain, jobj, pdata, nonstringpdata):
-    pdata = pdata + ",system={}".format(source)
-    for key, val in jobj.items():
-        if key != 'additionalFields' and val != "":
-            if isinstance(val, str):
-                pdata = pdata + ',{}={}'.format(key, process_special_char(val))
-            else:
-                nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
-        elif key == 'additionalFields':
-            for key2, val2 in val.items():
-                if val2 != "" and isinstance(val2, str):
-                    pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
-                elif val2 != "":
-                    nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
-
-    send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
-
-
-def process_thresholdCrossingAlert_event(domain, jobj, pdata, nonstringpdata):
-    pdata = pdata + ",system={}".format(source)
-    for key, val in jobj.items():
-        if (key != 'additionalFields' and key != 'additionalParameters' and key != 'associatedAlertIdList') and val != "":
-            if isinstance(val, str):
-                pdata = pdata + ',{}={}'.format(key, process_special_char(val))
-            else:
-                nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
-        elif key == 'additionalFields':
-            for key2, val2 in val.items():
-                if val2 != "" and isinstance(val2, str):
-                    pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
-                elif val2 != "":
-                    nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
-        elif key == 'additionalParameters':
-            for addParameter in val:
-                for key2, val2 in addParameter.items():
-                    if key2 != "hashMap":
-                        if val2 != "" and isinstance(val2, str):
-                            pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
-                        elif val2 != "":
-                            nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
-                    elif key2 == "hashMap":
-                        for key3, val3 in val2.items():
-                            if val3 != "" and isinstance(val3, str):
-                                pdata = pdata + ',{}={}'.format(key3, process_special_char(val3))
-                            elif val3 != "":
-                                nonstringpdata = nonstringpdata + '{}={},'.format(key3, val3)
-        elif key == 'associatedAlertIdList':
-            associatedAlertIdList = ""
-            for associatedAlertId in val:
-                associatedAlertIdList = associatedAlertIdList + associatedAlertId + "|"
-                if(associatedAlertIdList != ""):
-                    pdata = pdata + ',{}={}'.format("associatedAlertIdList",
-                                                    process_special_char(associatedAlertIdList)[:-1])
-
-    send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
-
-
-def process_fault_event(domain, jobj, pdata, nonstringpdata):
-    pdata = pdata + ",system={}".format(source)
-    for key, val in jobj.items():
-        if key != 'alarmAdditionalInformation' and val != "":
-            if isinstance(val, str):
-                pdata = pdata + ',{}={}'.format(key, process_special_char(val))
-            else:
-                nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
-        elif key == 'alarmAdditionalInformation':
-            for key2, val2 in val.items():
-                if val2 != "" and isinstance(val2, str):
-                    pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
-                elif val2 != "":
-                    nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
-
-    send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
-
-
-def process_heartbeat_events(domain, jobj, pdata, nonstringpdata):
-    pdata = pdata + ",system={}".format(source)
-    for key, val in jobj.items():
-        if key != 'additionalFields' and val != "":
-            if isinstance(val, str):
-                pdata = pdata + ',{}={}'.format(key, process_special_char(val))
-            else:
-                nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
-        elif key == 'additionalFields':
-            for key2, val2 in val.items():
-                if val2 != "" and isinstance(val2, str):
-                    pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
-                elif val2 != "":
-                    nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
-
-    send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
-
-
-def process_measurement_events(domain, jobj, pdata, nonstringpdata, eventId, startEpochMicrosec, lastEpochMicrosec):
-    pdata = pdata + ",system={}".format(source)
-    for key, val in jobj.items():
-        if val != "":
-            if isinstance(val, str):
-                pdata = pdata + ',{}={}'.format(key, process_special_char(val))
-            elif isinstance(val, list):
-                if key == 'additionalMeasurements':
-                    process_additional_measurements(val, domain + "additionalmeasurements", eventId, startEpochMicrosec, lastEpochMicrosec)
-                elif key == 'cpuUsageArray':
-                    process_nonadditional_measurements(val, domain + "cpuusage", eventId, startEpochMicrosec, lastEpochMicrosec)
-                elif key == 'diskUsageArray':
-                    process_nonadditional_measurements(val, domain + "diskusage", eventId, startEpochMicrosec, lastEpochMicrosec)
-                elif key == 'memoryUsageArray':
-                    process_nonadditional_measurements(val, domain + "memoryusage", eventId, startEpochMicrosec, lastEpochMicrosec)
-                elif key == 'nicPerformanceArray':
-                    process_nonadditional_measurements(val, domain + "nicperformance", eventId, startEpochMicrosec, lastEpochMicrosec)
-                elif key == 'loadArray':
-                    process_nonadditional_measurements(val, domain + "load", eventId, startEpochMicrosec, lastEpochMicrosec)
-                elif key == 'networkSliceArray':
-                    process_nonadditional_measurements(val, domain + "networkslice", eventId, startEpochMicrosec, lastEpochMicrosec)
-            elif isinstance(val, dict):
-                for key2, val2 in val.items():
-                    if isinstance(val2, str):
-                        pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
-                    else:
-                        nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
-            else:
-                nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
-
-    send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
-
-
-def process_special_char(str):
-    for search_char, replace_char in {" ": "\ ", ",": "\,"}.items():
-        str = str.replace(search_char, replace_char)
-    return str
-
-
-def process_time(eventTimestamp):
-    eventTimestamp = str(eventTimestamp).replace(".", "")
-    while len(eventTimestamp) < 19:
-        eventTimestamp = eventTimestamp + "0"
-    return format(int(eventTimestamp))
-
-
-# --------------------------------------------------------------------------
-# Save event data
-# --------------------------------------------------------------------------
-def save_event_in_db(body):
-    global source
-    global eventTimestamp
-
-    jobj = json.loads(body)
-    source = "unknown"
-    domain = jobj['event']['commonEventHeader']['domain']
-    eventTimestamp = jobj['event']['commonEventHeader']['startEpochMicrosec']
-    agent = jobj['event']['commonEventHeader']['reportingEntityName'].upper()
-    if "LOCALHOST" in agent:
-        agent = "computehost"
-        source = jobj['event']['commonEventHeader']['sourceId'].upper()
-
-    # processing common header part
-    pdata = domain
-    nonstringpdata = " "
-    commonHeaderObj = jobj['event']['commonEventHeader'].items()
-    for key, val in commonHeaderObj:
-        if val != "":
-            if (key != 'internalHeaderFields'):
-                if isinstance(val, str):
-                    pdata = pdata + ',{}={}'.format(key, process_special_char(val))
-                else:
-                    nonstringpdata = nonstringpdata + '{}={}'.format(key, val) + ','
-            if (key == 'internalHeaderFields'):
-                for key2, val2 in val.items():
-                    if val2 != "":
-                        if isinstance(val2, str):
-                            pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
-                        else:
-                            nonstringpdata = nonstringpdata + '{}={}'.format(key2, val2) + ','
-
-    # processing pnfRegistration events
-    if 'pnfRegistrationFields' in jobj['event']:
-        logger.debug('Found pnfRegistrationFields')
-        process_pnfRegistration_event(domain,
-                                      jobj['event']['pnfRegistrationFields'],
-                                      pdata,
-                                      nonstringpdata)
-
-    # processing thresholdCrossingAlert events
-    if 'thresholdCrossingAlertFields' in jobj['event']:
-        logger.debug('Found thresholdCrossingAlertFields')
-        process_thresholdCrossingAlert_event(domain,
-                                             jobj['event']['thresholdCrossingAlertFields'],
-                                             pdata,
-                                             nonstringpdata)
-
-    # processing fault events
-    if 'faultFields' in jobj['event']:
-        logger.debug('Found faultFields')
-        process_fault_event(domain, jobj['event']['faultFields'], pdata, nonstringpdata)
-
-    # process heartbeat events
-    if 'heartbeatFields' in jobj['event']:
-        logger.debug('Found Heartbeat')
-        process_heartbeat_events(domain,
-                                 jobj['event']['heartbeatFields'],
-                                 pdata,
-                                 nonstringpdata)
-
-    # processing measurement events
-    if 'measurementFields' in jobj['event']:
-        logger.debug('Found measurementFields')
-        process_measurement_events(domain,
-                                   jobj['event']['measurementFields'],
-                                   pdata,
-                                   nonstringpdata,
-                                   jobj['event']['commonEventHeader']['eventId'],
-                                   jobj['event']['commonEventHeader']['startEpochMicrosec'],
-                                   jobj['event']['commonEventHeader']['lastEpochMicrosec'])
-
-
 def test_listener(environ, start_response, schema):
     '''
     Handler for the Test Collector Test Control API.
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 4f5bbaf..4aeb25d 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -29,6 +29,9 @@
              - 8880:3000
         networks:
              - ves-net
+        depends_on:
+             - ves-influxdb
+             - kafka-connector-influxdb
   ves-zookeeper:
         container_name: ves-zookeeper
         image: confluentinc/cp-zookeeper:5.5.6
@@ -114,7 +117,7 @@
                  ves_influxdb_port: "8086"
                  ves_grafana_host: "ves-grafana"
                  ves_grafana_port: "3000"
-                 data_storage: "influxdb|elasticsearch"
+                 data_storage: "elasticsearch"
                  elasticsearch_domain: "ves-elasticsearch"
                  kafka_host_2: "smo-kafka"
                  kafka_port_2: "29092"
@@ -132,6 +135,23 @@
              - smo-kafka
              - ves-influxdb
              - ves-grafana
+  kafka-connector-influxdb:
+         container_name: kafka-connector-influxdb
+         build: ./kafka-connector-influxdb
+         image: influxdb-connector
+         networks:
+             - ves-net
+         ports:
+             - 9990:9990
+         environment:
+                 ves_influxdb_host: "ves-influxdb"
+                 ves_influxdb_port: "8086"
+                 ves_loglevel: "ERROR"
+                 kafka_host_2: "smo-kafka"
+                 kafka_port_2: "29092"
+         depends_on:
+             - smo-kafka
+             - ves-influxdb
   ves-agent:
           container_name: ves-agent
           build: ./agent
diff --git a/kafka-connector-influxdb/Dockerfile b/kafka-connector-influxdb/Dockerfile
new file mode 100755
index 0000000..89b3ac7
--- /dev/null
+++ b/kafka-connector-influxdb/Dockerfile
@@ -0,0 +1,27 @@
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM ubuntu:focal
+
+RUN apt-get update && apt-get -y upgrade
+RUN apt-get install -y git curl python3 python3-pip
+RUN pip3 install requests confluent-kafka
+
+# Clone influxdb-connector
+RUN mkdir -p /opt/ves/influxdb-connector
+ADD influxdb-connector /opt/ves/influxdb-connector
+
+COPY start.sh /opt/ves/start.sh
+ENTRYPOINT ["/bin/bash", "/opt/ves/start.sh"]
diff --git a/kafka-connector-influxdb/Makefile b/kafka-connector-influxdb/Makefile
new file mode 100755
index 0000000..637376f
--- /dev/null
+++ b/kafka-connector-influxdb/Makefile
@@ -0,0 +1,5 @@
+default: all
+
+all:
+	docker build -t influxdb-connector .
+
diff --git a/kafka-connector-influxdb/influxdb-connector/LICENSE.md b/kafka-connector-influxdb/influxdb-connector/LICENSE.md
new file mode 100755
index 0000000..9054138
--- /dev/null
+++ b/kafka-connector-influxdb/influxdb-connector/LICENSE.md
@@ -0,0 +1,14 @@
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/kafka-connector-influxdb/influxdb-connector/code/influxdb_connector.py b/kafka-connector-influxdb/influxdb-connector/code/influxdb_connector.py
new file mode 100644
index 0000000..34df978
--- /dev/null
+++ b/kafka-connector-influxdb/influxdb-connector/code/influxdb_connector.py
@@ -0,0 +1,413 @@
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import platform
+import json
+import logging
+from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
+import configparser
+import logging.handlers
+import requests
+from confluent_kafka import Consumer, KafkaError
+
+# ------------------------------------------------------------------------------
+# Address of influxdb server.
+# ------------------------------------------------------------------------------
+
+influxdb = '127.0.0.1'
+
+logger = None
+
+
+def send_to_influxdb(event, pdata):
+    url = 'http://{}/write?db=veseventsdb'.format(influxdb)
+    logger.debug('Send {} to influxdb at {}: {}'.format(event, influxdb, pdata))
+    r = requests.post(url, data=pdata, headers={'Content-Type': 'text/plain'})
+    logger.info('influxdb return code {}'.format(r.status_code))
+    if r.status_code != 204:
+        logger.debug('*** Influxdb save failed, return code {} ***'.format(r.status_code))
+
+
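+# Each process_* function below assembles one InfluxDB line-protocol point:
+# 'pdata' collects the measurement name and comma-separated tag pairs (string
+# values, with spaces and commas escaped), 'nonstringpdata' collects the
+# numeric field pairs, and the event timestamp is appended before the point
+# is posted via send_to_influxdb().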
+def process_additional_measurements(val, domain, eventId, startEpochMicrosec, lastEpochMicrosec):
+    for additionalMeasurements in val:
+        pdata = domain + ",eventId={},system={}".format(eventId, source)
+        nonstringpdata = " startEpochMicrosec={},lastEpochMicrosec={},".format(startEpochMicrosec, lastEpochMicrosec)
+        for key, val in additionalMeasurements.items():
+            if isinstance(val, str):
+                pdata = pdata + ',{}={}'.format(key, process_special_char(val))
+            elif isinstance(val, dict):
+                for key2, val2 in val.items():
+                    if isinstance(val2, str):
+                        pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
+                    else:
+                        nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
+            else:
+                nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
+    send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
+
+
+def process_nonadditional_measurements(val, domain, eventId, startEpochMicrosec, lastEpochMicrosec):
+    for disk in val:
+        pdata = domain + ",eventId={},system={}".format(eventId, source)
+        nonstringpdata = " startEpochMicrosec={},lastEpochMicrosec={},".format(startEpochMicrosec, lastEpochMicrosec)
+        for key, val in disk.items():
+            if isinstance(val, str):
+                pdata = pdata + ',{}={}'.format(key, process_special_char(val))
+            else:
+                nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
+
+        send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
+
+
+def process_pnfRegistration_event(domain, jobj, pdata, nonstringpdata):
+    pdata = pdata + ",system={}".format(source)
+    for key, val in jobj.items():
+        if key != 'additionalFields' and val != "":
+            if isinstance(val, str):
+                pdata = pdata + ',{}={}'.format(key, process_special_char(val))
+            else:
+                nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
+        elif key == 'additionalFields':
+            for key2, val2 in val.items():
+                if val2 != "" and isinstance(val2, str):
+                    pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
+                elif val2 != "":
+                    nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
+
+    send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
+
+
+def process_thresholdCrossingAlert_event(domain, jobj, pdata, nonstringpdata):
+    pdata = pdata + ",system={}".format(source)
+    for key, val in jobj.items():
+        if (key != 'additionalFields' and key != 'additionalParameters' and key != 'associatedAlertIdList') and val != "":
+            if isinstance(val, str):
+                pdata = pdata + ',{}={}'.format(key, process_special_char(val))
+            else:
+                nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
+        elif key == 'additionalFields':
+            for key2, val2 in val.items():
+                if val2 != "" and isinstance(val2, str):
+                    pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
+                elif val2 != "":
+                    nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
+        elif key == 'additionalParameters':
+            for addParameter in val:
+                for key2, val2 in addParameter.items():
+                    if key2 != "hashMap":
+                        if val2 != "" and isinstance(val2, str):
+                            pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
+                        elif val2 != "":
+                            nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
+                    elif key2 == "hashMap":
+                        for key3, val3 in val2.items():
+                            if val3 != "" and isinstance(val3, str):
+                                pdata = pdata + ',{}={}'.format(key3, process_special_char(val3))
+                            elif val3 != "":
+                                nonstringpdata = nonstringpdata + '{}={},'.format(key3, val3)
+        elif key == 'associatedAlertIdList':
+            associatedAlertIdList = ""
+            for associatedAlertId in val:
+                associatedAlertIdList = associatedAlertIdList + associatedAlertId + "|"
+                if(associatedAlertIdList != ""):
+                    pdata = pdata + ',{}={}'.format("associatedAlertIdList",
+                                                    process_special_char(associatedAlertIdList)[:-1])
+
+    send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
+
+
+def process_fault_event(domain, jobj, pdata, nonstringpdata):
+    pdata = pdata + ",system={}".format(source)
+    for key, val in jobj.items():
+        if key != 'alarmAdditionalInformation' and val != "":
+            if isinstance(val, str):
+                pdata = pdata + ',{}={}'.format(key, process_special_char(val))
+            else:
+                nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
+        elif key == 'alarmAdditionalInformation':
+            for key2, val2 in val.items():
+                if val2 != "" and isinstance(val2, str):
+                    pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
+                elif val2 != "":
+                    nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
+
+    send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
+
+
+def process_heartbeat_events(domain, jobj, pdata, nonstringpdata):
+    pdata = pdata + ",system={}".format(source)
+    for key, val in jobj.items():
+        if key != 'additionalFields' and val != "":
+            if isinstance(val, str):
+                pdata = pdata + ',{}={}'.format(key, process_special_char(val))
+            else:
+                nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
+        elif key == 'additionalFields':
+            for key2, val2 in val.items():
+                if val2 != "" and isinstance(val2, str):
+                    pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
+                elif val2 != "":
+                    nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
+
+    send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
+
+
+def process_measurement_events(domain, jobj, pdata, nonstringpdata, eventId, startEpochMicrosec, lastEpochMicrosec):
+    pdata = pdata + ",system={}".format(source)
+    for key, val in jobj.items():
+        if val != "":
+            if isinstance(val, str):
+                pdata = pdata + ',{}={}'.format(key, process_special_char(val))
+            elif isinstance(val, list):
+                if key == 'additionalMeasurements':
+                    process_additional_measurements(val, domain + "additionalmeasurements", eventId, startEpochMicrosec, lastEpochMicrosec)
+                elif key == 'cpuUsageArray':
+                    process_nonadditional_measurements(val, domain + "cpuusage", eventId, startEpochMicrosec, lastEpochMicrosec)
+                elif key == 'diskUsageArray':
+                    process_nonadditional_measurements(val, domain + "diskusage", eventId, startEpochMicrosec, lastEpochMicrosec)
+                elif key == 'memoryUsageArray':
+                    process_nonadditional_measurements(val, domain + "memoryusage", eventId, startEpochMicrosec, lastEpochMicrosec)
+                elif key == 'nicPerformanceArray':
+                    process_nonadditional_measurements(val, domain + "nicperformance", eventId, startEpochMicrosec, lastEpochMicrosec)
+                elif key == 'loadArray':
+                    process_nonadditional_measurements(val, domain + "load", eventId, startEpochMicrosec, lastEpochMicrosec)
+                elif key == 'networkSliceArray':
+                    process_nonadditional_measurements(val, domain + "networkslice", eventId, startEpochMicrosec, lastEpochMicrosec)
+            elif isinstance(val, dict):
+                for key2, val2 in val.items():
+                    if isinstance(val2, str):
+                        pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
+                    else:
+                        nonstringpdata = nonstringpdata + '{}={},'.format(key2, val2)
+            else:
+                nonstringpdata = nonstringpdata + '{}={},'.format(key, val)
+
+    send_to_influxdb(domain, pdata + nonstringpdata[:-1] + ' ' + process_time(eventTimestamp))
+
+
+def process_special_char(value):
+    # escape spaces and commas so tag values are valid in InfluxDB line protocol
+    for search_char, replace_char in {" ": "\\ ", ",": "\\,"}.items():
+        value = value.replace(search_char, replace_char)
+    return value
+
+
+def process_time(eventTimestamp):
+    eventTimestamp = str(eventTimestamp).replace(".", "")
+    while len(eventTimestamp) < 19:
+        eventTimestamp = eventTimestamp + "0"
+    return format(int(eventTimestamp))
+
+
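+# --------------------------------------------------------------------------
+# Save event data: parse the VES event and dispatch it to the per-domain
+# processing functions above.
+# --------------------------------------------------------------------------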
+def save_event_in_db(body):
+    jobj = json.loads(body)
+    global source
+    global eventTimestamp
+    source = "unknown"
+
+    domain = jobj['event']['commonEventHeader']['domain']
+    eventTimestamp = jobj['event']['commonEventHeader']['startEpochMicrosec']
+    agent = jobj['event']['commonEventHeader']['reportingEntityName'].upper()
+    if "LOCALHOST" in agent:
+        agent = "computehost"
+        source = jobj['event']['commonEventHeader']['sourceId'].upper()
+
+    # processing common header part
+    pdata = domain
+
+    nonstringpdata = " "
+    commonHeaderObj = jobj['event']['commonEventHeader'].items()
+    for key, val in commonHeaderObj:
+        if val != "":
+            if (key != 'internalHeaderFields'):
+                if isinstance(val, str):
+                    pdata = pdata + ',{}={}'.format(key, process_special_char(val))
+                else:
+                    nonstringpdata = nonstringpdata + '{}={}'.format(key, val) + ','
+            if (key == 'internalHeaderFields'):
+                for key2, val2 in val.items():
+                    if val2 != "":
+                        if isinstance(val2, str):
+                            pdata = pdata + ',{}={}'.format(key2, process_special_char(val2))
+                        else:
+                            nonstringpdata = nonstringpdata + '{}={}'.format(key2, val2) + ','
+
+    # processing pnfRegistration events
+    if 'pnfRegistrationFields' in jobj['event']:
+        logger.debug('Found pnfRegistrationFields')
+        process_pnfRegistration_event(domain,
+                                      jobj['event']['pnfRegistrationFields'],
+                                      pdata,
+                                      nonstringpdata)
+
+    # processing thresholdCrossingAlert events
+    if 'thresholdCrossingAlertFields' in jobj['event']:
+        logger.debug('Found thresholdCrossingAlertFields')
+        process_thresholdCrossingAlert_event(domain,
+                                             jobj['event']['thresholdCrossingAlertFields'],
+                                             pdata,
+                                             nonstringpdata)
+
+    # processing fault events
+    if 'faultFields' in jobj['event']:
+        logger.debug('Found faultFields')
+        process_fault_event(domain, jobj['event']['faultFields'], pdata, nonstringpdata)
+
+    # process heartbeat events
+    if 'heartbeatFields' in jobj['event']:
+        logger.debug('Found Heartbeat')
+        process_heartbeat_events(domain,
+                                 jobj['event']['heartbeatFields'],
+                                 pdata,
+                                 nonstringpdata)
+
+    # processing measurement events
+    if 'measurementFields' in jobj['event']:
+        logger.debug('Found measurementFields')
+        process_measurement_events(domain,
+                                   jobj['event']['measurementFields'],
+                                   pdata,
+                                   nonstringpdata,
+                                   jobj['event']['commonEventHeader']['eventId'],
+                                   jobj['event']['commonEventHeader']['startEpochMicrosec'],
+                                   jobj['event']['commonEventHeader']['lastEpochMicrosec'])
+
+
+def main():
+
+    # ----------------------------------------------------------------------
+    # Setup argument parser so we can parse the command-line.
+    # ----------------------------------------------------------------------
+    parser = ArgumentParser(description='',
+                            formatter_class=ArgumentDefaultsHelpFormatter)
+    parser.add_argument('-i', '--influxdb',
+                        dest='influxdb',
+                        default='localhost',
+                        help='InfluxDB server address')
+    parser.add_argument('-v', '--verbose',
+                        dest='verbose',
+                        action='count',
+                        help='set verbosity level')
+    parser.add_argument('-c', '--config',
+                        dest='config',
+                        default='/opt/ves/influxdb-connector/config/influxdb_connector.conf',
+                        help='Use this config file.',
+                        metavar='<file>')
+    parser.add_argument('-s', '--section',
+                        dest='section',
+                        default='default',
+                        metavar='<section>',
+                        help='section to use in the config file')
+
+    # ----------------------------------------------------------------------
+    # Process arguments received.
+    # ----------------------------------------------------------------------
+    args = parser.parse_args()
+    config_file = args.config
+    verbose = args.verbose
+    config_section = args.section
+
+    # ----------------------------------------------------------------------
+    # Now read the config file, using command-line supplied values as
+    # overrides.
+    # ----------------------------------------------------------------------
+    overrides = {}
+    config = configparser.ConfigParser()
+    config['defaults'] = {'log_file': 'influxdbConnector.log'
+                          }
+    config.read(config_file)
+
+    # ----------------------------------------------------------------------
+    # extract the values we want.
+    # ----------------------------------------------------------------------
+
+    global influxdb
+    global kafka_server
+
+    influxdb = config.get(config_section, 'influxdb', vars=overrides)
+    log_file = config.get(config_section, 'log_file', vars=overrides)
+    kafka_server = config.get(config_section, 'kafka_server', vars=overrides)
+
+    # ----------------------------------------------------------------------
+    # Finally we have enough info to start a proper flow trace.
+    # ----------------------------------------------------------------------
+    global logger
+    logger = logging.getLogger('monitor')
+
+    if ((verbose is not None) and (verbose > 0)):
+        logger.info('Verbose mode on')
+        logger.setLevel(logging.DEBUG)
+    else:
+        logger.setLevel(logging.INFO)
+    handler = logging.handlers.RotatingFileHandler(log_file,
+                                                   maxBytes=1000000,
+                                                   backupCount=10)
+    if (platform.system() == 'Windows'):
+        date_format = '%Y-%m-%d %H:%M:%S'
+    else:
+        date_format = '%Y-%m-%d %H:%M:%S.%f %z'
+    formatter = logging.Formatter('%(asctime)s %(name)s - '
+                                  '%(levelname)s - %(message)s',
+                                  date_format)
+    handler.setFormatter(formatter)
+    logger.addHandler(handler)
+    logger.info('Started')
+    # ----------------------------------------------------------------------
+    # Log the details of the configuration.
+    # ----------------------------------------------------------------------
+    logger.debug('Log file = {0}'.format(log_file))
+    logger.debug('Influxdb server = {0}'.format(influxdb))
+    logger.debug('kafka server = {0}'.format(kafka_server))
+
+    # ----------------------------------------------------------------------
+    # Kafka consumer: read VES events from the domain topics and store them in InfluxDB.
+    # ----------------------------------------------------------------------
+
+    settings = {
+        'bootstrap.servers': kafka_server,
+        'group.id': 'mygroup',
+        'client.id': 'client-1',
+        'enable.auto.commit': True,
+        'session.timeout.ms': 6000,
+        'default.topic.config': {'auto.offset.reset': 'earliest'}
+    }
+
+    c = Consumer(settings)
+
+    c.subscribe(['measurement', 'pnfregistration',
+                 'fault', 'thresholdcrossingalert', 'heartbeat'])
+
+    try:
+        while True:
+            msg = c.poll(0.1)
+            if msg is None:
+                continue
+            elif not msg.error():
+                logger.debug('Received message from topic {} at offset {}'.format(msg.topic(), msg.offset()))
+                # saving data in influxdb
+                save_event_in_db(msg.value())
+            elif msg.error().code() == KafkaError._PARTITION_EOF:
+                logger.error('End of partition reached {0}/{1}'
+                             .format(msg.topic(), msg.partition()))
+            else:
+                logger.error('Error occurred: {0}'.format(msg.error().str()))
+
+    except KeyboardInterrupt:
+        pass
+
+    finally:
+        c.close()
+
+
+if __name__ == '__main__':
+    main()
diff --git a/kafka-connector-influxdb/influxdb-connector/config/influxdb_connector.conf b/kafka-connector-influxdb/influxdb-connector/config/influxdb_connector.conf
new file mode 100755
index 0000000..dc0fc5d
--- /dev/null
+++ b/kafka-connector-influxdb/influxdb-connector/config/influxdb_connector.conf
@@ -0,0 +1,4 @@
+[default]
+log_file = /opt/ves/influxdbconnector.log
+kafka_server =
+influxdb =
diff --git a/kafka-connector-influxdb/start.sh b/kafka-connector-influxdb/start.sh
new file mode 100755
index 0000000..301d785
--- /dev/null
+++ b/kafka-connector-influxdb/start.sh
@@ -0,0 +1,57 @@
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+cd /opt/ves
+touch monitor.log
+
+config_file="influxdb-connector/config/influxdb_connector.conf"
+
+sed -i -- "s/influxdb =/influxdb = $ves_influxdb_host:$ves_influxdb_port/g" \
+  $config_file
+sed -i -- "s/kafka_server =/kafka_server = $kafka_host_2:$kafka_port_2/g" \
+  $config_file
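+# After substitution the config typically looks like (values come from the
+# docker-compose environment), e.g.:
+#   [default]
+#   log_file = /opt/ves/influxdbconnector.log
+#   kafka_server = smo-kafka:29092
+#   influxdb = ves-influxdb:8086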
+
+echo; echo $config_file
+cat $config_file
+
+echo; echo "wait for InfluxDB API at $ves_influxdb_host:$ves_influxdb_port"
+STARTTIME=$(date +%s)
+max_time=60
+while ! curl http://$ves_influxdb_host:$ves_influxdb_port/ping ;
+   do
+     ELAPSED_TIME=$(($(date +%s) - $STARTTIME))
+     if [ $ELAPSED_TIME -ge $max_time ]; then
+        echo "InfluxDB API did not come up within $max_time seconds. Exiting."
+        exit 1
+     fi
+     echo "InfluxDB API is not yet responding... waiting 10 seconds"
+     sleep 10
+   done
+   echo "Done."
+echo; echo "setup veseventsdb in InfluxDB"
+# TODO: check if pre-existing and skip
+curl -X POST http://$ves_influxdb_host:$ves_influxdb_port/query \
+  --data-urlencode "q=CREATE DATABASE veseventsdb"
+
+if [ "$ves_loglevel" != "" ]; then
+  python3 /opt/ves/influxdb-connector/code/influxdb_connector.py \
+    --config /opt/ves/influxdb-connector/config/influxdb_connector.conf \
+    --influxdb $ves_influxdb_host:$ves_influxdb_port \
+    --section default > /opt/ves/monitor.log 2>&1
+else
+  python3 /opt/ves/influxdb-connector/code/influxdb_connector.py  \
+    --config /opt/ves/influxdb-connector/config/influxdb_connector.conf \
+    --influxdb $ves_influxdb_host:$ves_influxdb_port \
+    --section default
+fi