blob: 4db2935e826b0cb7096a996a30e2e348593222ed [file] [log] [blame]
# Copyright 2021 Xoriant Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import flask
from flask import request
from consumer import EventConsumer, TopicConsumer
import json
app = flask.Flask(__name__)
app.config["DEBUG"] = True
api_base_url = "/dmaapapi/v1/"
@app.route("/")
def index():
return "Welcome !!"
@app.route(api_base_url + '/topics', methods=['GET'])
def get_all_topics():
topicConsumer = TopicConsumer()
response = app.response_class(response=json.dumps(topicConsumer.getTopics()),
status=200,
mimetype='application/json')
return response
@app.route(api_base_url + '/topics/listAll', methods=['GET'])
def listall_topics():
topicConsumer = TopicConsumer()
response = app.response_class(response=json.dumps(topicConsumer.listAllTopics()),
status=200,
mimetype='application/json')
return response
@app.route(api_base_url + '/topics/<topic>', methods=['GET'])
def topic_details(topic):
assert topic == request.view_args['topic']
topicConsumer = TopicConsumer()
response = app.response_class(response=json.dumps(topicConsumer.getTopicDetails(topic)),
status=200,
mimetype='application/json')
return response
@app.route(api_base_url + '/events/<topic>/<consumergroup>/<consumerid>', methods=['GET'])
def get_events(topic, consumergroup, consumerid):
assert topic == request.view_args['topic']
assert consumergroup == request.view_args['consumergroup']
assert consumerid == request.view_args['consumerid']
limit = ""
timeout = ""
if 'limit' in request.args:
limit = request.args['limit']
if 'timeout' in request.args:
timeout = request.args['timeout']
eventConsumer = EventConsumer()
response = app.response_class(response=json.dumps(
eventConsumer.consumeEvents(
topic,
consumergroup,
consumerid,
getLimit(limit),
getTimeout(timeout))),
status=200,
mimetype='application/json')
return response
def getLimit(limit):
try:
limit = int(limit)
except Exception:
limit = -1
finally:
return limit
def getTimeout(timeout):
try:
timeout = int(timeout)
except Exception:
timeout = 15
finally:
return timeout
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0')