Add support for DMaaP in SMO
SMO is planning on using Kafka bus as the message bus. DMaaP is nothing but a wrapper around the Kafka bus. Since there are legacy applications, mostly from ONAP that need support for reading from or writing to the message bus using DMaaP. This task will track the work related to that support.
Issue-Id: SMO-18
Signed-off-by: santanude <santanu.de@xoriant.com>
Change-Id: I31303440051fd5823ec2bb3199011d3920de6e13
Signed-off-by: santanude <santanu.de@xoriant.com>
diff --git a/dmaapadapter/Dockerfile b/dmaapadapter/Dockerfile
new file mode 100755
index 0000000..977c6f1
--- /dev/null
+++ b/dmaapadapter/Dockerfile
@@ -0,0 +1,31 @@
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM ubuntu:focal
+
+RUN apt-get update && apt-get -y upgrade
+RUN apt-get install -y git curl python3 python3-pip
+RUN pip3 install requests jsonschema kafka-python flask confluent-kafka
+
+
+RUN mkdir /opt/ves
+
+# Clone adapter folder
+RUN mkdir /opt/ves/adapter
+ADD adapter /opt/ves/adapter
+
+
+COPY start.sh /opt/ves/start.sh
+ENTRYPOINT ["/bin/bash", "/opt/ves/start.sh"]
diff --git a/dmaapadapter/Makefile b/dmaapadapter/Makefile
new file mode 100755
index 0000000..929f921
--- /dev/null
+++ b/dmaapadapter/Makefile
@@ -0,0 +1,20 @@
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+default: all
+
+all:
+ docker build -t ves-dmaap-adapter .
+
diff --git a/dmaapadapter/adapter/LICENSE.md b/dmaapadapter/adapter/LICENSE.md
new file mode 100755
index 0000000..9054138
--- /dev/null
+++ b/dmaapadapter/adapter/LICENSE.md
@@ -0,0 +1,14 @@
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/dmaapadapter/adapter/code/app_config.py b/dmaapadapter/adapter/code/app_config.py
new file mode 100644
index 0000000..fb5be95
--- /dev/null
+++ b/dmaapadapter/adapter/code/app_config.py
@@ -0,0 +1,76 @@
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
+import configparser
+import logging.handlers
+
+
+class AppConfig:
+ kafka_broker = ""
+ logger = logging.getLogger()
+
+ def __init__(self):
+ parser = ArgumentParser(description="",
+ formatter_class=ArgumentDefaultsHelpFormatter)
+ parser.add_argument('-c', '--config',
+ dest='config',
+ default='/opt/ves/adapter/config/adapter.conf',
+ help='Use this config file.')
+ parser.add_argument('-s', '--section',
+ dest='section',
+ default='default',
+ metavar='<section>',
+ help='section to use in the config file')
+
+ args = parser.parse_args()
+ config_file = args.config
+ config_section = args.section
+
+ overrides = {}
+ config = configparser.ConfigParser()
+ config.read(config_file)
+
+ self.kafka_broker = config.get(config_section,
+ 'kafka_broker',
+ vars=overrides)
+ log_file = config.get(config_section, 'log_file', vars=overrides)
+ log_level = config.get(config_section, 'log_level', vars=overrides)
+
+ handler = logging.handlers.RotatingFileHandler(log_file,
+ maxBytes=1000000,
+ backupCount=10)
+ formatter = logging.Formatter('%(asctime)s %(name)s - '
+ '%(levelname)s - %(message)s',
+ '%Y-%m-%d %H:%M:%S.%f %z')
+ handler.setFormatter(formatter)
+ self.logger.addHandler(handler)
+
+ # we are going to set the log level
+ if (log_level == 'DEBUG'):
+ self.logger.setLevel(logging.DEBUG)
+ elif (log_level == 'ERROR'):
+ self.logger.setLevel(logging.ERROR)
+ else:
+ self.logger.setLevel(logging.INFO)
+
+ self.logger.info('Log level {} and log file {} : '
+ .format(log_level, log_file))
+
+ def getKafkaBroker(self):
+ return self.kafka_broker
+
+ def getLogger(self):
+ return self.logger
diff --git a/dmaapadapter/adapter/code/consumer.py b/dmaapadapter/adapter/code/consumer.py
new file mode 100644
index 0000000..62d744c
--- /dev/null
+++ b/dmaapadapter/adapter/code/consumer.py
@@ -0,0 +1,140 @@
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from confluent_kafka.admin import AdminClient
+from confluent_kafka import Consumer
+from app_config import AppConfig
+import logging
+import sys
+
+
+class EventConsumer:
+ broker = ""
+ logger = logging.getLogger()
+
+ def __init__(self):
+ appConfig = AppConfig()
+ self.logger = appConfig.getLogger()
+ self.broker = appConfig.getKafkaBroker()
+
+ def consumeEvents(self, topic, consumergroup, consumerid, limit, timeout):
+ self.logger.debug("topic={}, consumergroup={}, consumerid={}, limit={}, timeout={} "
+ .format(topic, consumergroup, consumerid, limit, timeout))
+ consumer_config = {
+ 'bootstrap.servers': self.broker,
+ 'group.id': consumergroup,
+ 'group.instance.id': consumerid,
+ 'auto.offset.reset': 'earliest',
+ 'enable.auto.commit': 'false'
+ }
+
+ consumer = Consumer(consumer_config)
+ consumer.subscribe([topic])
+ event_list = []
+
+ try:
+ ctr = 0
+ content_size = 0
+ while True:
+ if (ctr == int(limit)):
+ break
+ if(content_size > 300000):
+ break
+ ctr += 1
+ # read single message at a time
+ msg = consumer.poll(timeout=int(timeout))
+ if msg is None:
+ self.logger.debug("No records ")
+ break
+ if msg.error():
+ self.logger.debug("Error reading message : {}".format(msg.error()))
+ break
+ event = msg.value().decode('utf8').replace("'", '"')
+ content_size = content_size + sys.getsizeof(event)
+ event_list.append(event)
+ consumer.commit()
+
+ except Exception as ex:
+ self.logger.debug('Failed to get event information due to unexpected reason! {0}'.format(ex))
+
+ finally:
+ self.logger.debug("closing consumer")
+ consumer.close()
+ return event_list
+
+
+class TopicConsumer:
+
+ broker = ""
+ logger = logging.getLogger()
+ timeout = 10
+
+ def __init__(self):
+ appConfig = AppConfig()
+ self.logger = appConfig.getLogger()
+ self.broker = appConfig.getKafkaBroker()
+
+ def getTopics(self):
+ try:
+ adminClient = AdminClient({"bootstrap.servers": self.broker})
+ ListTopicsResult = adminClient.list_topics(timeout=self.timeout)
+ topic_list = []
+
+ for key, value in ListTopicsResult.topics.items():
+ topic_list.append(key)
+
+ dict = {'topics': topic_list}
+ return dict
+
+ except Exception as ex:
+ self.logger.debug('Failed to get topic information due to unexpected reason! {0}'.format(ex))
+
+ def listAllTopics(self):
+ try:
+ topic_list = []
+ adminClient = AdminClient({"bootstrap.servers": self.broker})
+ ListTopicsResult = adminClient.list_topics(timeout=self.timeout)
+
+ for key, value in ListTopicsResult.topics.items():
+ dict = {'topicName': key,
+ 'owner': '',
+ 'txenabled': False
+ }
+ topic_list.append(dict)
+
+ dict2 = {'topics': topic_list}
+ return dict2
+ except Exception as ex:
+ self.logger.debug('Failed to get list of topic information due to unexpected reason! {0}'.format(ex))
+
+ def getTopicDetails(self, topic):
+ try:
+ adminClient = AdminClient({"bootstrap.servers": self.broker})
+ ListTopicsResult = adminClient.list_topics(timeout=self.timeout)
+
+ for key, value in ListTopicsResult.topics.items():
+ if (key == topic):
+ dict = {'name': key,
+ 'owner': '',
+ 'description': '',
+ 'readerAcl': {"enabled": True, "users": []},
+ 'writerAcl': {"enabled": True, "users": []}
+ }
+ return dict
+
+ self.logger.debug("Topic {} does not exists! ".format(topic))
+ return "Topic [" + topic + "] does not exists"
+ except Exception as ex:
+ self.logger.debug('Failed to get topic detail due to unexpected reason! {0}'.format(ex))
diff --git a/dmaapadapter/adapter/code/dmaap_adapter.py b/dmaapadapter/adapter/code/dmaap_adapter.py
new file mode 100644
index 0000000..4db2935
--- /dev/null
+++ b/dmaapadapter/adapter/code/dmaap_adapter.py
@@ -0,0 +1,104 @@
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import flask
+from flask import request
+from consumer import EventConsumer, TopicConsumer
+import json
+
+app = flask.Flask(__name__)
+app.config["DEBUG"] = True
+api_base_url = "/dmaapapi/v1/"
+
+
+@app.route("/")
+def index():
+ return "Welcome !!"
+
+
+@app.route(api_base_url + '/topics', methods=['GET'])
+def get_all_topics():
+ topicConsumer = TopicConsumer()
+ response = app.response_class(response=json.dumps(topicConsumer.getTopics()),
+ status=200,
+ mimetype='application/json')
+ return response
+
+
+@app.route(api_base_url + '/topics/listAll', methods=['GET'])
+def listall_topics():
+ topicConsumer = TopicConsumer()
+ response = app.response_class(response=json.dumps(topicConsumer.listAllTopics()),
+ status=200,
+ mimetype='application/json')
+ return response
+
+
+@app.route(api_base_url + '/topics/<topic>', methods=['GET'])
+def topic_details(topic):
+ assert topic == request.view_args['topic']
+ topicConsumer = TopicConsumer()
+ response = app.response_class(response=json.dumps(topicConsumer.getTopicDetails(topic)),
+ status=200,
+ mimetype='application/json')
+ return response
+
+
+@app.route(api_base_url + '/events/<topic>/<consumergroup>/<consumerid>', methods=['GET'])
+def get_events(topic, consumergroup, consumerid):
+ assert topic == request.view_args['topic']
+ assert consumergroup == request.view_args['consumergroup']
+ assert consumerid == request.view_args['consumerid']
+ limit = ""
+ timeout = ""
+
+ if 'limit' in request.args:
+ limit = request.args['limit']
+ if 'timeout' in request.args:
+ timeout = request.args['timeout']
+
+ eventConsumer = EventConsumer()
+ response = app.response_class(response=json.dumps(
+ eventConsumer.consumeEvents(
+ topic,
+ consumergroup,
+ consumerid,
+ getLimit(limit),
+ getTimeout(timeout))),
+ status=200,
+ mimetype='application/json')
+ return response
+
+
+def getLimit(limit):
+ try:
+ limit = int(limit)
+ except Exception:
+ limit = -1
+ finally:
+ return limit
+
+
+def getTimeout(timeout):
+ try:
+ timeout = int(timeout)
+ except Exception:
+ timeout = 15
+ finally:
+ return timeout
+
+
+if __name__ == '__main__':
+ app.run(debug=True, host='0.0.0.0')
diff --git a/dmaapadapter/adapter/config/adapter.conf b/dmaapadapter/adapter/config/adapter.conf
new file mode 100755
index 0000000..0f86b96
--- /dev/null
+++ b/dmaapadapter/adapter/config/adapter.conf
@@ -0,0 +1,9 @@
+#------------------------------------------------------------------------------
+# This is a config file for the dmaap-adapter.
+#
+
+[default]
+log_file = dmaap.log
+log_level =
+kafka_broker =
+
diff --git a/dmaapadapter/start.sh b/dmaapadapter/start.sh
new file mode 100755
index 0000000..e2cd683
--- /dev/null
+++ b/dmaapadapter/start.sh
@@ -0,0 +1,39 @@
+#!/bin/bash
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cd /opt/ves
+touch dmaap.log
+
+config_file="adapter/config/adapter.conf"
+
+sed -i -- "s/kafka_broker =/kafka_broker = $kafka_host:$kafka_port/g" \
+ $config_file
+sed -i -- "s/log_level =/log_level = $log_level/g" \
+ $config_file
+
+
+echo; echo $config_file
+cat $config_file
+
+
+if [ "$log_level" != "" ]; then
+ python3 /opt/ves/adapter/code/dmaap_adapter.py \
+ --config /opt/ves/adapter/config/adapter.conf \
+ --section default > /opt/ves/dmaap.log 2>&1
+else
+ python3 /opt/ves/adapter/code/dmaap_adapter.py \
+ --config /opt/ves/adapter/config/adapter.conf \
+ --section default
+fi
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 5ec3fee..6a03f2c 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -155,6 +155,20 @@
depends_on:
- ves-kafka
- ves-collector
+ ves-dmaap-adapter:
+ container_name: ves-dmaap-adapter
+ build: ./dmaapadapter
+ image: ves-dmaap-adapter
+ networks:
+ - ves-net
+ ports:
+ - 5000:5000
+ environment:
+ kafka_host: "smo-kafka"
+ kafka_port: "29092"
+ log_level: "DEBUG"
+ depends_on:
+ - smo-kafka
ves-elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.11.1
container_name: ves-elasticsearch