blob: e3042e25d49cc411b81e79457d8a2972c501f6fd [file] [log] [blame]
# Copyright 2021 Xoriant Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import flask
from flask import request
from consumer import EventConsumer, TopicConsumer
from prepare_response import PrepareResponse
# Flask application object that every route in this module is registered on.
app = flask.Flask(__name__)
app.config.update(DEBUG=True)

# Common prefix for the API routes.
# NOTE(review): the prefix ends with "/" and the route suffixes appended to it
# start with "/", so the registered rules contain a double slash
# ("/dmaapapi/v1//topics"). Confirm this is intended before changing it.
api_base_url = "/dmaapapi/v1/"
@app.route("/")
def index():
    """Serve the root URL with a fixed greeting string."""
    greeting = "Welcome !!"
    return greeting
@app.route(api_base_url + '/topics', methods=['GET'])
def get_all_topics():
    """Handle GET on the ``/topics`` route.

    A fresh ``PrepareResponse`` is handed to ``TopicConsumer.getTopics``;
    the message and status code it holds afterwards are returned with the
    ``application/json`` mimetype.
    """
    result = PrepareResponse()
    consumer = TopicConsumer()
    consumer.getTopics(result)

    body = result.getResponseMsg()
    code = result.getResponseCode()
    return app.response_class(response=body,
                              status=code,
                              mimetype='application/json')
@app.route(api_base_url + '/topics/listAll', methods=['GET'])
def listall_topics():
    """Handle GET on the ``/topics/listAll`` route.

    A fresh ``PrepareResponse`` is handed to ``TopicConsumer.listAllTopics``;
    the message and status code it holds afterwards are returned with the
    ``application/json`` mimetype.
    """
    result = PrepareResponse()
    consumer = TopicConsumer()
    consumer.listAllTopics(result)

    body = result.getResponseMsg()
    code = result.getResponseCode()
    return app.response_class(response=body,
                              status=code,
                              mimetype='application/json')
@app.route(api_base_url + '/topics/<topic>', methods=['GET'])
def topic_details(topic):
    """Handle GET on the ``/topics/<topic>`` route.

    Args:
        topic: Topic name captured from the URL path.

    Returns:
        A response built from the message and status code that
        ``TopicConsumer.getTopicDetails`` stored in the ``PrepareResponse``
        object, with the ``application/json`` mimetype.
    """
    # Flask passes the matched URL variables as the handler's arguments, so
    # re-checking ``topic`` against ``request.view_args`` with an ``assert``
    # (which is stripped under ``python -O``) added nothing and was removed.
    prepareResponse = PrepareResponse()
    topicConsumer = TopicConsumer()
    topicConsumer.getTopicDetails(prepareResponse, topic)
    response = app.response_class(response=prepareResponse.getResponseMsg(),
                                  status=prepareResponse.getResponseCode(),
                                  mimetype='application/json')
    return response
@app.route(api_base_url + '/events/<topic>/<consumergroup>/<consumerid>', methods=['GET'])
def get_events(topic, consumergroup, consumerid):
    """Handle GET on the ``/events/<topic>/<consumergroup>/<consumerid>`` route.

    Args:
        topic: Topic name captured from the URL path.
        consumergroup: Consumer group captured from the URL path.
        consumerid: Consumer id captured from the URL path.

    Query parameters:
        limit: Optional; converted with ``getLimit`` (missing or
            non-numeric values become ``-1``).
        timeout: Optional; converted with ``getTimeout`` (missing,
            non-numeric or negative values become ``15``).

    Returns:
        A response built from the message and status code that
        ``EventConsumer.consumeEvents`` stored in the ``PrepareResponse``
        object, with the ``application/json`` mimetype.
    """
    # Flask passes the matched URL variables as the handler's arguments, so
    # the former ``assert x == request.view_args[...]`` checks were
    # tautological (and stripped under ``python -O``); they were removed.

    # A missing query parameter falls back to "" so that the converters
    # apply their own defaults.
    limit = getLimit(request.args.get('limit', ""))
    timeout = getTimeout(request.args.get('timeout', ""))

    prepareResponse = PrepareResponse()
    eventConsumer = EventConsumer()
    eventConsumer.consumeEvents(prepareResponse, topic, consumergroup,
                                consumerid, limit, timeout)
    response = app.response_class(response=prepareResponse.getResponseMsg(),
                                  status=prepareResponse.getResponseCode(),
                                  mimetype='application/json')
    return response
def getLimit(limit):
    """Convert a raw ``limit`` value to an integer.

    Args:
        limit: Value to convert, typically the query-string text (may be
            an empty string or ``None``).

    Returns:
        ``int(limit)`` when the conversion succeeds, otherwise ``-1``.
    """
    # The previous implementation returned from a ``finally`` clause, which
    # silently discards *any* in-flight exception (even KeyboardInterrupt or
    # SystemExit). Only conversion failures are handled here.
    try:
        return int(limit)
    except (TypeError, ValueError, OverflowError):
        return -1
def getTimeout(timeout, default=15):
    """Convert a raw ``timeout`` value to a non-negative integer.

    Args:
        timeout: Value to convert, typically the query-string text (may be
            an empty string or ``None``).
        default: Value returned when ``timeout`` cannot be converted or is
            negative. Defaults to ``15``, the original hard-coded fallback.

    Returns:
        ``int(timeout)`` when the conversion succeeds and the result is not
        negative, otherwise ``default``.
    """
    # The previous implementation returned from a ``finally`` clause, which
    # silently discards *any* in-flight exception (even KeyboardInterrupt or
    # SystemExit). Only conversion failures are handled here.
    try:
        value = int(timeout)
    except (TypeError, ValueError, OverflowError):
        return default
    return default if value < 0 else value
if __name__ == '__main__':
    import os

    # The server listens on every interface (0.0.0.0). Flask's debug mode
    # enables the interactive Werkzeug debugger, which can execute arbitrary
    # code, so it must not be switched on unconditionally for a
    # network-reachable server. Debug mode is now opt-in: set FLASK_DEBUG to
    # 1/true/yes/on for local development.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in (
        "1", "true", "yes", "on")
    app.run(debug=debug_enabled, host='0.0.0.0')