Merge "Leverage DMaaP adapter to send error code and error message"
diff --git a/dmaapadapter/adapter/code/app_config.py b/dmaapadapter/adapter/code/app_config.py
index fb5be95..7eb2029 100644
--- a/dmaapadapter/adapter/code/app_config.py
+++ b/dmaapadapter/adapter/code/app_config.py
@@ -43,34 +43,43 @@
         config = configparser.ConfigParser()
         config.read(config_file)
 
-        self.kafka_broker = config.get(config_section,
-                                       'kafka_broker',
-                                       vars=overrides)
+        self.kafka_broker = config.get(config_section, 'kafka_broker', vars=overrides)
         log_file = config.get(config_section, 'log_file', vars=overrides)
         log_level = config.get(config_section, 'log_level', vars=overrides)
 
-        handler = logging.handlers.RotatingFileHandler(log_file,
-                                                       maxBytes=1000000,
-                                                       backupCount=10)
-        formatter = logging.Formatter('%(asctime)s %(name)s - '
-                                      '%(levelname)s - %(message)s',
-                                      '%Y-%m-%d %H:%M:%S.%f %z')
-        handler.setFormatter(formatter)
-        self.logger.addHandler(handler)
-
-        # we are going to set the log level
-        if (log_level == 'DEBUG'):
-            self.logger.setLevel(logging.DEBUG)
-        elif (log_level == 'ERROR'):
-            self.logger.setLevel(logging.ERROR)
-        else:
-            self.logger.setLevel(logging.INFO)
-
-        self.logger.info('Log level {} and log file {} : '
-                         .format(log_level, log_file))
+        self.setLogger(log_file, log_level)
 
     def getKafkaBroker(self):
         return self.kafka_broker
 
     def getLogger(self):
         return self.logger
+
+    def setLogger(self, log_file, log_level):
+        """Configure file logging once and set the 'DMaaP' logger's level.
+
+        log_file is the path handed to the rotating handler; log_level is
+        'DEBUG' or 'ERROR', and any other value selects INFO.
+        """
+        # basicConfig() does nothing once the root logger has handlers, so
+        # build the handler only on first use rather than reopening the log
+        # file on every call just to have the handler discarded.
+        if not logging.getLogger().handlers:
+            rfh = logging.handlers.RotatingFileHandler(
+                filename=log_file,
+                maxBytes=1000000,
+                backupCount=10
+            )
+            logging.basicConfig(
+                format="%(asctime)s %(name)-8s %(levelname)-8s %(message)s",
+                # No %f here: time.strftime(), used for datefmt, lacks it.
+                datefmt="%Y-%m-%d %H:%M:%S %z",
+                handlers=[rfh]
+            )
+
+        logger = logging.getLogger("DMaaP")
+        levels = {'DEBUG': logging.DEBUG, 'ERROR': logging.ERROR}
+        logger.setLevel(levels.get(log_level, logging.INFO))
+
+        logger.info('Log level {} and log file {} : '.format(log_level, log_file))
+        self.logger = logger
diff --git a/dmaapadapter/adapter/code/consumer.py b/dmaapadapter/adapter/code/consumer.py
index 62d744c..06f0ba6 100644
--- a/dmaapadapter/adapter/code/consumer.py
+++ b/dmaapadapter/adapter/code/consumer.py
@@ -14,10 +14,10 @@
 #
 
 from confluent_kafka.admin import AdminClient
-from confluent_kafka import Consumer
+from confluent_kafka import Consumer, KafkaError
 from app_config import AppConfig
-import logging
 import sys
+import logging.handlers
 
 
 class EventConsumer:
@@ -29,9 +29,9 @@
         self.logger = appConfig.getLogger()
         self.broker = appConfig.getKafkaBroker()
 
-    def consumeEvents(self, topic, consumergroup, consumerid, limit, timeout):
+    def consumeEvents(self, prepareResponse, topic, consumergroup, consumerid, limit, timeout):
         self.logger.debug("topic={}, consumergroup={}, consumerid={}, limit={}, timeout={} "
-                          .format(topic, consumergroup, consumerid, limit, timeout))
+                     .format(topic, consumergroup, consumerid, limit, timeout))
         consumer_config = {
             'bootstrap.servers': self.broker,
             'group.id': consumergroup,
@@ -47,6 +47,7 @@
         try:
             ctr = 0
             content_size = 0
+            response_code = 200
             while True:
                 if (ctr == int(limit)):
                     break
@@ -56,85 +57,103 @@
                 # read single message at a time
                 msg = consumer.poll(timeout=int(timeout))
                 if msg is None:
-                    self.logger.debug("No records ")
+                    self.logger.debug("No new records exists in topic {} of broker {}".format(topic, self.broker))
                     break
                 if msg.error():
+                    if (msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART):
+                        response_code = 409
                     self.logger.debug("Error reading message : {}".format(msg.error()))
                     break
-                event = msg.value().decode('utf8').replace("'", '"')
-                content_size = content_size + sys.getsizeof(event)
-                event_list.append(event)
+
+                content_size = content_size + sys.getsizeof(msg.value().decode('utf8').replace("'", '"'))
+                event_list.append(msg.value().decode('utf8').replace("'", '"'))
                 consumer.commit()
 
+            prepareResponse.setResponseCode(response_code)
+            if (response_code == 409):
+                prepareResponse.setResponseMsg("Unable to read the messages from the topic")
+            else:
+                prepareResponse.setResponseMsg(event_list)
+
         except Exception as ex:
-            self.logger.debug('Failed to get event information due to unexpected reason! {0}'.format(ex))
+            self.logger.error("Failed to get event information due to unexpected reason! {0}".format(ex))
+            prepareResponse.setResponseCode(500)
+            prepareResponse.setResponseMsg("Failed to return the events")
 
         finally:
             self.logger.debug("closing consumer")
             consumer.close()
-            return event_list
 
 
 class TopicConsumer:
 
     broker = ""
-    logger = logging.getLogger()
     timeout = 10
+    logger = logging.getLogger()  # root logger until __init__ replaces it per instance
 
     def __init__(self):
         appConfig = AppConfig()
         self.logger = appConfig.getLogger()
         self.broker = appConfig.getKafkaBroker()
 
-    def getTopics(self):
+    def getTopics(self, prepareResponse):  # reports {"topics": [names]} via prepareResponse; returns nothing
         try:
+            topic_list = []
             adminClient = AdminClient({"bootstrap.servers": self.broker})
             ListTopicsResult = adminClient.list_topics(timeout=self.timeout)
-            topic_list = []
 
             for key, value in ListTopicsResult.topics.items():
                 topic_list.append(key)
 
-            dict = {'topics': topic_list}
-            return dict
+            dict = {"topics": topic_list}
+            prepareResponse.setResponseCode(200)
+            prepareResponse.setResponseMsg(dict)  # setResponseMsg stores this JSON-encoded
 
         except Exception as ex:
-            self.logger.debug('Failed to get topic information due to unexpected reason! {0}'.format(ex))
+            self.logger.error('Failed to get topic information due to unexpected reason! {0}'.format(ex))
+            prepareResponse.setResponseCode(500)  # any exception is reported as 500 rather than raised
+            prepareResponse.setResponseMsg("Failed to return the topics")
 
-    def listAllTopics(self):
+    def listAllTopics(self, prepareResponse):  # reports one topicName/owner/txenabled entry per topic
         try:
             topic_list = []
             adminClient = AdminClient({"bootstrap.servers": self.broker})
             ListTopicsResult = adminClient.list_topics(timeout=self.timeout)
 
             for key, value in ListTopicsResult.topics.items():
-                dict = {'topicName': key,
-                        'owner': '',
-                        'txenabled': False
-                        }
+                dict = {"topicName": key, "owner": "", "txenabled": False}  # owner/txenabled are fixed values here
                 topic_list.append(dict)
 
-            dict2 = {'topics': topic_list}
-            return dict2
+            dict2 = {"topics": topic_list}
+            prepareResponse.setResponseCode(200)
+            prepareResponse.setResponseMsg(dict2)  # setResponseMsg stores this JSON-encoded
         except Exception as ex:
-            self.logger.debug('Failed to get list of topic information due to unexpected reason! {0}'.format(ex))
+            self.logger.error('Failed to get list of topic information due to unexpected reason! {0}'.format(ex))
+            prepareResponse.setResponseCode(500)  # any exception is reported as 500 rather than raised
+            prepareResponse.setResponseMsg("Failed to return the topics")
 
-    def getTopicDetails(self, topic):
+    def getTopicDetails(self, prepareResponse, topic):  # reports one topic's details, or 404 if it is not listed
         try:
             adminClient = AdminClient({"bootstrap.servers": self.broker})
             ListTopicsResult = adminClient.list_topics(timeout=self.timeout)
 
+            topic_exists = False  # becomes True only when the loop below finds the topic
             for key, value in ListTopicsResult.topics.items():
                 if (key == topic):
-                    dict = {'name': key,
-                            'owner': '',
-                            'description': '',
-                            'readerAcl': {"enabled": True, "users": []},
-                            'writerAcl': {"enabled": True, "users": []}
-                            }
-                    return dict
+                    topic_exists = True
+                    dict = {"name": key,
+                            "owner": "",  # owner and description are always empty strings here
+                            "description": "",
+                            "readerAcl": {"enabled": True, "users": []},
+                            "writerAcl": {"enabled": True, "users": []}}
+                    prepareResponse.setResponseCode(200)
+                    prepareResponse.setResponseMsg(dict)  # setResponseMsg stores this JSON-encoded
 
-            self.logger.debug("Topic {} does not exists! ".format(topic))
-            return "Topic [" + topic + "] does not exists"
+            if (topic_exists is False):  # the loop finished without a match
+                self.logger.debug("Topic '{}' does not exists! ".format(topic))
+                prepareResponse.setResponseCode(404)
+                prepareResponse.setResponseMsg("Topic [" + topic + "] not found")
         except Exception as ex:
-            self.logger.debug('Failed to get topic detail due to unexpected reason! {0}'.format(ex))
+            self.logger.error('Failed to get topic detail due to unexpected reason! {0}'.format(ex))
+            prepareResponse.setResponseCode(500)  # any exception is reported as 500 rather than raised
+            prepareResponse.setResponseMsg("Failed to return the topics")  # NOTE(review): says "topics" for a single-topic lookup; confirm wording
diff --git a/dmaapadapter/adapter/code/dmaap_adapter.py b/dmaapadapter/adapter/code/dmaap_adapter.py
index 4db2935..e3042e2 100644
--- a/dmaapadapter/adapter/code/dmaap_adapter.py
+++ b/dmaapadapter/adapter/code/dmaap_adapter.py
@@ -16,7 +16,7 @@
 import flask
 from flask import request
 from consumer import EventConsumer, TopicConsumer
-import json
+from prepare_response import PrepareResponse
 
 app = flask.Flask(__name__)
 app.config["DEBUG"] = True
@@ -30,18 +30,22 @@
 
 @app.route(api_base_url + '/topics', methods=['GET'])
 def get_all_topics():
+    prepareResponse = PrepareResponse()  # collects the status code and JSON body
     topicConsumer = TopicConsumer()
-    response = app.response_class(response=json.dumps(topicConsumer.getTopics()),
-                                  status=200,
+    topicConsumer.getTopics(prepareResponse)  # fills prepareResponse; returns nothing
+    response = app.response_class(response=prepareResponse.getResponseMsg(),  # already JSON-encoded by setResponseMsg
+                                  status=prepareResponse.getResponseCode(),  # getTopics sets 200 or 500
                                   mimetype='application/json')
     return response
 
 
 @app.route(api_base_url + '/topics/listAll', methods=['GET'])
 def listall_topics():
+    prepareResponse = PrepareResponse()  # collects the status code and JSON body
     topicConsumer = TopicConsumer()
-    response = app.response_class(response=json.dumps(topicConsumer.listAllTopics()),
-                                  status=200,
+    topicConsumer.listAllTopics(prepareResponse)  # fills prepareResponse; returns nothing
+    response = app.response_class(response=prepareResponse.getResponseMsg(),  # already JSON-encoded by setResponseMsg
+                                  status=prepareResponse.getResponseCode(),  # listAllTopics sets 200 or 500
                                   mimetype='application/json')
     return response
 
@@ -49,9 +53,11 @@
 @app.route(api_base_url + '/topics/<topic>', methods=['GET'])
 def topic_details(topic):
     assert topic == request.view_args['topic']
+    prepareResponse = PrepareResponse()  # collects the status code and JSON body
     topicConsumer = TopicConsumer()
-    response = app.response_class(response=json.dumps(topicConsumer.getTopicDetails(topic)),
-                                  status=200,
+    topicConsumer.getTopicDetails(prepareResponse, topic)  # fills prepareResponse; returns nothing
+    response = app.response_class(response=prepareResponse.getResponseMsg(),  # already JSON-encoded by setResponseMsg
+                                  status=prepareResponse.getResponseCode(),  # getTopicDetails sets 200, 404 or 500
                                   mimetype='application/json')
     return response
 
@@ -69,15 +75,11 @@
     if 'timeout' in request.args:
         timeout = request.args['timeout']
 
+    prepareResponse = PrepareResponse()
     eventConsumer = EventConsumer()
-    response = app.response_class(response=json.dumps(
-                                    eventConsumer.consumeEvents(
-                                        topic,
-                                        consumergroup,
-                                        consumerid,
-                                        getLimit(limit),
-                                        getTimeout(timeout))),
-                                  status=200,
+    eventConsumer.consumeEvents(prepareResponse, topic, consumergroup, consumerid, getLimit(limit), getTimeout(timeout))
+    response = app.response_class(response=prepareResponse.getResponseMsg(),
+                                  status=prepareResponse.getResponseCode(),
                                   mimetype='application/json')
     return response
 
@@ -94,6 +96,8 @@
 def getTimeout(timeout):
     try:
         timeout = int(timeout)
+        if (timeout < 0):
+            timeout = 15
     except Exception:
         timeout = 15
     finally:
diff --git a/dmaapadapter/adapter/code/prepare_response.py b/dmaapadapter/adapter/code/prepare_response.py
new file mode 100644
index 0000000..6cce394
--- /dev/null
+++ b/dmaapadapter/adapter/code/prepare_response.py
@@ -0,0 +1,34 @@
+# Copyright 2021 Xoriant Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import json
+
+
+class PrepareResponse:
+    """Carries an HTTP status code and a JSON-encoded body for one response."""
+    responseCode = None  # default until setResponseCode() is called
+    responseMsg = ""  # default until setResponseMsg() is called
+
+    def setResponseCode(self, responseCode):
+        self.responseCode = responseCode
+
+    def getResponseCode(self):
+        return self.responseCode
+
+    def setResponseMsg(self, responseMsg):
+        self.responseMsg = json.dumps(responseMsg)  # stored as JSON text, not the raw object
+
+    def getResponseMsg(self):
+        return self.responseMsg