Merge "Leverage DMaaP adapter to send error code and error message"
diff --git a/dmaapadapter/adapter/code/app_config.py b/dmaapadapter/adapter/code/app_config.py
index fb5be95..7eb2029 100644
--- a/dmaapadapter/adapter/code/app_config.py
+++ b/dmaapadapter/adapter/code/app_config.py
@@ -43,34 +43,43 @@
config = configparser.ConfigParser()
config.read(config_file)
- self.kafka_broker = config.get(config_section,
- 'kafka_broker',
- vars=overrides)
+ self.kafka_broker = config.get(config_section, 'kafka_broker', vars=overrides)
log_file = config.get(config_section, 'log_file', vars=overrides)
log_level = config.get(config_section, 'log_level', vars=overrides)
- handler = logging.handlers.RotatingFileHandler(log_file,
- maxBytes=1000000,
- backupCount=10)
- formatter = logging.Formatter('%(asctime)s %(name)s - '
- '%(levelname)s - %(message)s',
- '%Y-%m-%d %H:%M:%S.%f %z')
- handler.setFormatter(formatter)
- self.logger.addHandler(handler)
-
- # we are going to set the log level
- if (log_level == 'DEBUG'):
- self.logger.setLevel(logging.DEBUG)
- elif (log_level == 'ERROR'):
- self.logger.setLevel(logging.ERROR)
- else:
- self.logger.setLevel(logging.INFO)
-
- self.logger.info('Log level {} and log file {} : '
- .format(log_level, log_file))
+ self.setLogger(log_file, log_level)
def getKafkaBroker(self):
return self.kafka_broker
def getLogger(self):
return self.logger
+
+ def setLogger(self, log_file, log_level):
+ rfh = logging.handlers.RotatingFileHandler(
+ filename=log_file,
+ mode='w',
+ maxBytes=1000000,
+ backupCount=10,
+ encoding=None,
+ delay=0
+ )
+
+ logging.basicConfig(
+ format="%(asctime)s %(name)-8s %(levelname)-8s %(message)s",
+ datefmt="%Y-%m-%d %H:%M:%S.%f %z",
+ handlers=[rfh]
+ )
+
+ logger = logging.getLogger("DMaaP")
+
+ # we are going to set the log level
+ if (log_level == 'DEBUG'):
+ logger.setLevel(logging.DEBUG)
+ elif (log_level == 'ERROR'):
+ logger.setLevel(logging.ERROR)
+ else:
+ logger.setLevel(logging.INFO)
+
+ logger.info('Log level {} and log file {} : '.format(log_level, log_file))
+ self.logger = logger
diff --git a/dmaapadapter/adapter/code/consumer.py b/dmaapadapter/adapter/code/consumer.py
index 62d744c..06f0ba6 100644
--- a/dmaapadapter/adapter/code/consumer.py
+++ b/dmaapadapter/adapter/code/consumer.py
@@ -14,10 +14,10 @@
#
from confluent_kafka.admin import AdminClient
-from confluent_kafka import Consumer
+from confluent_kafka import Consumer, KafkaError
from app_config import AppConfig
-import logging
import sys
+import logging.handlers
class EventConsumer:
@@ -29,9 +29,9 @@
self.logger = appConfig.getLogger()
self.broker = appConfig.getKafkaBroker()
- def consumeEvents(self, topic, consumergroup, consumerid, limit, timeout):
+ def consumeEvents(self, prepareResponse, topic, consumergroup, consumerid, limit, timeout):
self.logger.debug("topic={}, consumergroup={}, consumerid={}, limit={}, timeout={} "
- .format(topic, consumergroup, consumerid, limit, timeout))
+ .format(topic, consumergroup, consumerid, limit, timeout))
consumer_config = {
'bootstrap.servers': self.broker,
'group.id': consumergroup,
@@ -47,6 +47,7 @@
try:
ctr = 0
content_size = 0
+ response_code = 200
while True:
if (ctr == int(limit)):
break
@@ -56,85 +57,103 @@
# read single message at a time
msg = consumer.poll(timeout=int(timeout))
if msg is None:
- self.logger.debug("No records ")
+ self.logger.debug("No new records exists in topic {} of broker {}".format(topic, self.broker))
break
if msg.error():
+ if (msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART):
+ response_code = 409
self.logger.debug("Error reading message : {}".format(msg.error()))
break
- event = msg.value().decode('utf8').replace("'", '"')
- content_size = content_size + sys.getsizeof(event)
- event_list.append(event)
+
+ content_size = content_size + sys.getsizeof(msg.value().decode('utf8').replace("'", '"'))
+ event_list.append(msg.value().decode('utf8').replace("'", '"'))
consumer.commit()
+ prepareResponse.setResponseCode(response_code)
+ if (response_code == 409):
+ prepareResponse.setResponseMsg("Unable to read the messages from the topic")
+ else:
+ prepareResponse.setResponseMsg(event_list)
+
except Exception as ex:
- self.logger.debug('Failed to get event information due to unexpected reason! {0}'.format(ex))
+ self.logger.error("Failed to get event information due to unexpected reason! {0}".format(ex))
+ prepareResponse.setResponseCode(500)
+ prepareResponse.setResponseMsg("Failed to return the events")
finally:
self.logger.debug("closing consumer")
consumer.close()
- return event_list
class TopicConsumer:
broker = ""
- logger = logging.getLogger()
timeout = 10
+ logger = logging.getLogger()
def __init__(self):
appConfig = AppConfig()
self.logger = appConfig.getLogger()
self.broker = appConfig.getKafkaBroker()
- def getTopics(self):
+ def getTopics(self, prepareResponse):
try:
+ topic_list = []
adminClient = AdminClient({"bootstrap.servers": self.broker})
ListTopicsResult = adminClient.list_topics(timeout=self.timeout)
- topic_list = []
for key, value in ListTopicsResult.topics.items():
topic_list.append(key)
- dict = {'topics': topic_list}
- return dict
+ dict = {"topics": topic_list}
+ prepareResponse.setResponseCode(200)
+ prepareResponse.setResponseMsg(dict)
except Exception as ex:
- self.logger.debug('Failed to get topic information due to unexpected reason! {0}'.format(ex))
+ self.logger.error('Failed to get topic information due to unexpected reason! {0}'.format(ex))
+ prepareResponse.setResponseCode(500)
+ prepareResponse.setResponseMsg("Failed to return the topics")
- def listAllTopics(self):
+ def listAllTopics(self, prepareResponse):
try:
topic_list = []
adminClient = AdminClient({"bootstrap.servers": self.broker})
ListTopicsResult = adminClient.list_topics(timeout=self.timeout)
for key, value in ListTopicsResult.topics.items():
- dict = {'topicName': key,
- 'owner': '',
- 'txenabled': False
- }
+ dict = {"topicName": key, "owner": "", "txenabled": False}
topic_list.append(dict)
- dict2 = {'topics': topic_list}
- return dict2
+ dict2 = {"topics": topic_list}
+ prepareResponse.setResponseCode(200)
+ prepareResponse.setResponseMsg(dict2)
except Exception as ex:
- self.logger.debug('Failed to get list of topic information due to unexpected reason! {0}'.format(ex))
+ self.logger.error('Failed to get list of topic information due to unexpected reason! {0}'.format(ex))
+ prepareResponse.setResponseCode(500)
+ prepareResponse.setResponseMsg("Failed to return the topics")
- def getTopicDetails(self, topic):
+ def getTopicDetails(self, prepareResponse, topic):
try:
adminClient = AdminClient({"bootstrap.servers": self.broker})
ListTopicsResult = adminClient.list_topics(timeout=self.timeout)
+ topic_exists = False
for key, value in ListTopicsResult.topics.items():
if (key == topic):
- dict = {'name': key,
- 'owner': '',
- 'description': '',
- 'readerAcl': {"enabled": True, "users": []},
- 'writerAcl': {"enabled": True, "users": []}
- }
- return dict
+ topic_exists = True
+ dict = {"name": key,
+ "owner": "",
+ "description": "",
+ "readerAcl": {"enabled": True, "users": []},
+ "writerAcl": {"enabled": True, "users": []}}
+ prepareResponse.setResponseCode(200)
+ prepareResponse.setResponseMsg(dict)
- self.logger.debug("Topic {} does not exists! ".format(topic))
- return "Topic [" + topic + "] does not exists"
+ if (topic_exists is False):
+ self.logger.debug("Topic '{}' does not exists! ".format(topic))
+ prepareResponse.setResponseCode(404)
+ prepareResponse.setResponseMsg("Topic [" + topic + "] not found")
except Exception as ex:
- self.logger.debug('Failed to get topic detail due to unexpected reason! {0}'.format(ex))
+ self.logger.error('Failed to get topic detail due to unexpected reason! {0}'.format(ex))
+ prepareResponse.setResponseCode(500)
+ prepareResponse.setResponseMsg("Failed to return the topics")
diff --git a/dmaapadapter/adapter/code/dmaap_adapter.py b/dmaapadapter/adapter/code/dmaap_adapter.py
index 4db2935..e3042e2 100644
--- a/dmaapadapter/adapter/code/dmaap_adapter.py
+++ b/dmaapadapter/adapter/code/dmaap_adapter.py
@@ -16,7 +16,7 @@
import flask
from flask import request
from consumer import EventConsumer, TopicConsumer
-import json
+from prepare_response import PrepareResponse
app = flask.Flask(__name__)
app.config["DEBUG"] = True
@@ -30,18 +30,22 @@
@app.route(api_base_url + '/topics', methods=['GET'])
def get_all_topics():
+ prepareResponse = PrepareResponse()
topicConsumer = TopicConsumer()
- response = app.response_class(response=json.dumps(topicConsumer.getTopics()),
- status=200,
+ topicConsumer.getTopics(prepareResponse)
+ response = app.response_class(response=prepareResponse.getResponseMsg(),
+ status=prepareResponse.getResponseCode(),
mimetype='application/json')
return response
@app.route(api_base_url + '/topics/listAll', methods=['GET'])
def listall_topics():
+ prepareResponse = PrepareResponse()
topicConsumer = TopicConsumer()
- response = app.response_class(response=json.dumps(topicConsumer.listAllTopics()),
- status=200,
+ topicConsumer.listAllTopics(prepareResponse)
+ response = app.response_class(response=prepareResponse.getResponseMsg(),
+ status=prepareResponse.getResponseCode(),
mimetype='application/json')
return response
@@ -49,9 +53,11 @@
@app.route(api_base_url + '/topics/<topic>', methods=['GET'])
def topic_details(topic):
assert topic == request.view_args['topic']
+ prepareResponse = PrepareResponse()
topicConsumer = TopicConsumer()
- response = app.response_class(response=json.dumps(topicConsumer.getTopicDetails(topic)),
- status=200,
+ topicConsumer.getTopicDetails(prepareResponse, topic)
+ response = app.response_class(response=prepareResponse.getResponseMsg(),
+ status=prepareResponse.getResponseCode(),
mimetype='application/json')
return response
@@ -69,15 +75,11 @@
if 'timeout' in request.args:
timeout = request.args['timeout']
+ prepareResponse = PrepareResponse()
eventConsumer = EventConsumer()
- response = app.response_class(response=json.dumps(
- eventConsumer.consumeEvents(
- topic,
- consumergroup,
- consumerid,
- getLimit(limit),
- getTimeout(timeout))),
- status=200,
+ eventConsumer.consumeEvents(prepareResponse, topic, consumergroup, consumerid, getLimit(limit), getTimeout(timeout))
+ response = app.response_class(response=prepareResponse.getResponseMsg(),
+ status=prepareResponse.getResponseCode(),
mimetype='application/json')
return response
@@ -94,6 +96,8 @@
def getTimeout(timeout):
try:
timeout = int(timeout)
+ if (timeout < 0):
+ timeout = 15
except Exception:
timeout = 15
finally:
diff --git a/dmaapadapter/adapter/code/prepare_response.py b/dmaapadapter/adapter/code/prepare_response.py
new file mode 100644
index 0000000..6cce394
--- /dev/null
+++ b/dmaapadapter/adapter/code/prepare_response.py
@@ -0,0 +1,34 @@
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import json
+
+
+class PrepareResponse:
+
+ responseCode = None
+ responseMsg = ""
+
+ def setResponseCode(self, responseCode):
+ self.responseCode = responseCode
+
+ def getResponseCode(self):
+ return self.responseCode
+
+ def setResponseMsg(self, responseMsg):
+ self.responseMsg = json.dumps(responseMsg)
+
+ def getResponseMsg(self):
+ return self.responseMsg