blob: 59ed7b86d15c1ec9ee7ecdd164b11d65c5015e30 [file] [log] [blame]
# Copyright (c) 2019 AT&T Intellectual Property. #
# #
# Licensed under the Apache License, Version 2.0 (the "License"); #
# you may not use this file except in compliance with the License. #
# You may obtain a copy of the License at #
# #
# http://www.apache.org/licenses/LICENSE-2.0 #
# #
# Unless required by applicable law or agreed to in writing, software #
# distributed under the License is distributed on an "AS IS" BASIS, #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
# See the License for the specific language governing permissions and #
# limitations under the License. #
################################################################################
# File name: a1-mediator-vth.py #
# Description: vth for a1-mediator-vth service #
# Date created: 08/22/2019 #
# Last modified: 04/02/2020 #
# Python Version: 3.7.4 #
# Author: Jackie Chen (jv246a) #
# Email: jv246a@att.com #
################################################################################
import datetime
import json
import logging
from logging import FileHandler
import os
import requests
from flask import Flask, request, jsonify
# redirect http to https
app = Flask(__name__)
# Prevents print statement every time an endpoint is triggered.
logging.getLogger("werkzeug").setLevel(logging.WARNING)
def sendCallback(url, data):
try:
if type(data) is not dict:
data = {"msg": data}
app.logger.info("sending callback")
requests.post(url, json=data)
except Exception as e:
app.logger.info(e)
return
def unix_time_millis(dt):
epoch = datetime.datetime.utcfromtimestamp(0)
return (dt - epoch).total_seconds() * 1000.0
@app.route("/otf/vth/oran/a1/v1/health", methods=['GET'])
def getHealth():
return "UP"
@app.route("/otf/vth/oran/a1/v1", methods=['POST'])
def executeRicRequest():
response_data = {
'vthResponse': {
'testDuration': '',
'dateTimeUTC': str(datetime.datetime.now()),
'abstractMessage': '',
'resultData': {}
}
}
startTime = unix_time_millis(datetime.datetime.now())
ret_url = request.args.get('retURL')
try:
if not request.is_json:
raise ValueError("request must be json")
requestData = request.get_json()
app.logger.info("A1 requestData:" + str(requestData))
action = requestData['action'].lower()
_check_incoming_request(requestData)
os.environ['NO_PROXY'] = '*'
with open('config.json') as configFile:
config = json.load(configFile)
baseAddress = config['base_address']
if action == 'health_check' or action == 'list_policy':
res = requests.get(baseAddress + config['actions_path'][action])
response_data['vthResponse']['resultData']['statusCode'] = res.status_code
if action == 'health_check':
response_data['vthResponse']['resultData']['resultOutput'] = res.text
else:
response_data['vthResponse']['resultData']['resultOutput'] = res.json()
elif action == 'list_policy_instance':
res = requests.get(baseAddress + config['actions_path'][action]
.format(policy_type_id=requestData['policy_type_id']))
response_data['vthResponse']['resultData']['statusCode'] = res.status_code
response_data['vthResponse']['resultData']['resultOutput'] = res.json()
elif action == 'get_policy_instance_status':
res = requests.get(baseAddress + config['actions_path'][action]
.format(policy_type_id=requestData['policy_type_id'],
policy_instance_id=requestData['policy_instance_id']))
response_data['vthResponse']['resultData']['statusCode'] = res.status_code
response_data['vthResponse']['resultData']['resultOutput'] = res.json()
elif action == 'edit_policy':
res = _send_edit_request(requestData, config)
response_data['vthResponse']['resultData']['statusCode'] = res.status_code
if requestData['request_type'].lower() == 'get' and res.status_code == 200:
response_data['vthResponse']['resultData']['resultOutput'] = res.json()
else:
response_data['vthResponse']['resultData']['resultOutput'] = res.text
elif action == 'edit_policy_instance':
res = _send_edit_request(requestData, config)
response_data['vthResponse']['resultData']['statusCode'] = res.status_code
if requestData['request_type'].lower() == 'get' and res.status_code == 200:
response_data['vthResponse']['resultData']['resultOutput'] = res.json()
else:
response_data['vthResponse']['resultData']['resultOutput'] = res.text
except Exception as ex:
endTime = unix_time_millis(datetime.datetime.now())
totalTime = endTime - startTime
response_data['vthResponse']['testDuration'] = totalTime
response_data['vthResponse']['abstractMessage'] = str(ex)
return jsonify(response_data)
endTime = unix_time_millis(datetime.datetime.now())
totalTime = endTime - startTime
response_data['vthResponse']['testDuration'] = totalTime
if ret_url is not None:
sendCallback(ret_url, response_data)
return '', 200
return jsonify(response_data), 200
def _send_edit_request(request_data, config):
baseAddress = config['base_address']
path = ''
action = request_data['action']
policy_type_id = request_data['policy_type_id']
request_type = request_data['request_type']
if action == "edit_policy":
path = baseAddress + config['actions_path'][action].format(policy_type_id=policy_type_id)
if action == 'edit_policy_instance':
instance_id = request_data['policy_instance_id']
path = baseAddress + config['actions_path'][action].format(policy_type_id=policy_type_id,
policy_instance_id=instance_id)
if request_type == 'get':
return requests.get(path)
if request_type == 'put':
payload = request_data['payload']
return requests.put(path, json=payload)
if request_type == 'delete':
return requests.delete(path)
def _check_incoming_request(requestData): # check if the request is valid
if 'action' not in requestData:
raise KeyError('no action was specify')
action = requestData['action'].lower()
edit_actions = ['edit_policy', 'edit_policy_instance']
requires_policy_id = ['edit_policy', 'list_policy_instance'
, 'edit_policy_instance', 'get_policy_instance_status']
requires_policy_instance_id = ['edit_policy_instance', 'get_policy_instance_status']
possible_actions = ['health_check', 'list_policy', 'edit_policy', 'list_policy_instance'
, 'edit_policy_instance', 'get_policy_instance_status']
possible_request_type = ['get', 'put', 'delete']
if action not in possible_actions:
raise KeyError("invalid action")
if action in edit_actions: # request type is required
if 'request_type' not in requestData:
raise KeyError('this action: ' + action + ' requires a request type')
if requestData['request_type'] not in possible_request_type:
raise KeyError('this request_type: ' + requestData['request_type'] + ' is not valid')
if requestData['request_type'] == 'put' and 'payload' not in requestData:
raise KeyError('put request requires a payload')
if action in requires_policy_id:
if 'policy_type_id' not in requestData:
raise KeyError('this action: ' + action + ' requires a policy_type_id')
if action in requires_policy_instance_id:
if 'policy_instance_id' not in requestData:
raise KeyError('this action: ' + action + ' requires a policy_instance_id')
if __name__ == '__main__':
logHandler = FileHandler('a1-mediator-vth.log', mode='a')
logHandler.setLevel(logging.INFO)
app.logger.setLevel(logging.INFO)
app.logger.addHandler(logHandler)
# context = ('opt/cert/otf.pem', 'opt/cert/privateKey.pem')
# app.run(debug = False, host = '0.0.0.0', port = 5000, ssl_context = context)
app.run(debug=False, host='0.0.0.0', port=5000)