blob: 4b4d8dafe3f2afdb1e30358dd4b605ccc88884c1 [file] [log] [blame]
# ============LICENSE_START===============================================
# Copyright (C) 2020 Nordix Foundation. All rights reserved.
# ========================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=================================================
#
from flask import Flask, request, Response
from time import sleep
import time
from datetime import datetime
import json
import traceback
import logging
import socket
from threading import RLock
# Disable all logging of GET on reading counters and db
class AjaxFilter(logging.Filter):
    """Logging filter that drops records for counter and db read requests."""

    def filter(self, record):
        """Return False for records whose message mentions "/counter/" or "/db".

        The message is formatted once and reused for both checks.
        """
        message = record.getMessage()
        return ("/counter/" not in message) and ("/db" not in message)
# Flask application object and the re-entrant lock guarding the shared state
app = Flask(__name__)
lock = RLock()

# Attach the filter to the 'werkzeug' logger so counter and db reads are not logged
log = logging.getLogger('werkzeug')
log.addFilter(AjaxFilter())
# Received callback messages: id -> list of message dicts
msg_callbacks = {}

# Server info
HOST_IP = "::"
HOST_PORT = 2222

# Metrics vars
cntr_msg_callbacks = 0   # total number of received callback messages
cntr_msg_fetched = 0     # total number of fetched callback messages
cntr_callbacks = {}      # id -> [received, fetched]
hosts_set = set()        # names (or addresses) of the hosts that have called

# Request and response constants
CALLBACK_URL = "/callbacks/<string:id>"
CALLBACK_MR_URL = "/callbacks-mr/<string:id>"  # Json list with string encoded items
APP_READ_URL = "/get-event/<string:id>"
APP_READ_ALL_URL = "/get-all-events/<string:id>"
DUMP_ALL_URL = "/db"

MIME_TEXT = "text/plain"
MIME_JSON = "application/json"
CAUGHT_EXCEPTION = "Caught exception: "
SERVER_ERROR = "Server error :"
TIME_STAMP = "cr-timestamp"

# Forced behaviour for callback handling; 'delay' is a number of seconds or None
forced_settings = {'delay': None}
# Remote host lookup and print host name
def remote_host_logging(request):
    """Print the calling host and record it in hosts_set.

    The address is taken from the X-Forwarded-For header when present,
    otherwise from REMOTE_ADDR. If the reverse lookup fails, the address
    itself is recorded instead of a host name.
    """
    forwarded = request.environ.get('HTTP_X_FORWARDED_FOR')
    if forwarded is None:
        host_ip = str(request.environ['REMOTE_ADDR'])
    else:
        # The header may carry several comma-separated hops; the first entry
        # is the originating client. Using the whole value would never
        # resolve and would put a comma-containing item into hosts_set.
        host_ip = str(forwarded).split(',')[0].strip()

    # Strip the IPv4-mapped IPv6 prefix so a plain IPv4 address is looked up
    prefix = '::ffff:'
    if host_ip.startswith(prefix):
        host_ip = host_ip[len(prefix):]

    try:
        name, _alias, _addresslist = socket.gethostbyaddr(host_ip)
    except Exception:
        # Best effort only: fall back to the raw address
        print("Calling host not possible to retrieve IP: "+str(host_ip))
        hosts_set.add(host_ip)
    else:
        print("Calling host: "+str(name))
        hosts_set.add(name)
#I'm alive function
@app.route('/', methods=['GET'])
def index():
    """Alive check: answer 'OK' with status 200."""
    return ('OK', 200)
### Callback interface, for control
# Fetch the oldest callback message for an id
# URI and parameter, (GET): /get-event/<id>
# response: message + 200 or just 204 or just 500(error)
@app.route(APP_READ_URL, methods=['GET'])
def receiveresponse(id):
    """Return and remove the oldest stored callback message for the id.

    Responds with the message as json and 200, an empty body and 204 when
    nothing is stored for the id, or an empty body and 500 on error.
    """
    global cntr_msg_fetched

    with lock:
        try:
            pending = msg_callbacks.get(id)
            if not pending:
                print("No messages for id: "+id)
                return "", 204

            cntr_msg_fetched += 1
            cntr_callbacks[id][1] += 1
            oldest = pending[0]
            print("Fetching msg for id: "+id+", msg="+str(oldest))
            # The timestamp is internal bookkeeping and is not returned
            del oldest[TIME_STAMP]
            del pending[0]
            return json.dumps(oldest), 200
        except Exception as err:
            print(CAUGHT_EXCEPTION+str(err))
            traceback.print_exc()
            return "", 500
# Fetch all callback messages for an id as an array
# URI and parameter, (GET): /get-all-events/<id>
# response: message + 200 or just 500(error)
@app.route(APP_READ_ALL_URL, methods=['GET'])
def receiveresponse_all(id):
    """Return and remove every stored callback message for the id.

    Responds with a json array and 200 (an empty array when nothing is
    stored for the id), or an empty body and 500 on error.
    """
    global cntr_msg_fetched

    with lock:
        try:
            pending = msg_callbacks.get(id)
            if pending:
                fetched = len(pending)
                cntr_msg_fetched += fetched
                cntr_callbacks[id][1] += fetched
                print("Fetching all msgs for id: "+id+", msg="+str(pending))
                # The timestamps are internal bookkeeping and are not returned
                for entry in pending:
                    del entry[TIME_STAMP]
                del msg_callbacks[id]
                return json.dumps(pending), 200
            print("No messages for id: "+id)
        except Exception as err:
            print(CAUGHT_EXCEPTION+str(err))
            traceback.print_exc()
            return "", 500

        return json.dumps([]), 200
# Receive a callback message
# URI and payload, (PUT or POST): /callbacks/<id> <json messages>
# response: OK 200 or 500 for other errors
@app.route(CALLBACK_URL, methods=['PUT','POST'])
def events_write(id):
    """Store one received callback message for the id.

    The body is parsed as json when the content type contains
    application/json; otherwise, or when parsing fails, an empty json
    object is stored. The stored message is stamped with the receive time.

    Responds 'OK' and 200, or 'NOTOK' and 500 on error.
    """
    global cntr_msg_callbacks

    try:
        # content_type is None when the header is absent; str() keeps the
        # log lines below from raising, so such a request is stored as an
        # empty payload instead of failing with 500.
        content_type = str(request.content_type)
        print("Received callback for id: "+id +", content-type="+content_type)
        remote_host_logging(request)
        print("raw data: str(request.data): "+str(request.data))
        do_delay()
        try:
            # Substring match, as in the message-router variant, so that
            # parameters such as "; charset=utf-8" are accepted.
            if MIME_JSON in content_type:
                msg = json.loads(request.data)
                print("Payload(json): "+str(msg))
            else:
                msg = {}
                print("Payload(content-type="+content_type+"). Setting empty json as payload")
        except Exception:
            msg = {}
            print("(Exception) Payload does not contain any json, setting empty json as payload")
            traceback.print_exc()

        with lock:
            # Stamp first: a payload that is not a json object fails here,
            # before any counter has been changed.
            msg[TIME_STAMP] = str(datetime.now())
            cntr_msg_callbacks += 1
            msg_callbacks.setdefault(id, []).append(msg)
            # Per-id counters are [received, fetched]
            cntr_callbacks.setdefault(id, [0, 0])[0] += 1
    except Exception as e:
        print(CAUGHT_EXCEPTION+str(e))
        traceback.print_exc()
        return 'NOTOK', 500

    return 'OK', 200
# Receive a json callback message with payload formatted according to output from the message router
# URI and payload, (PUT or POST): /callbacks-mr/<id> <json messages>
# json is a list of string encoded json items
# response: OK 200 or 500 for other errors
@app.route(CALLBACK_MR_URL, methods=['PUT','POST'])
def events_write_mr(id):
    """Store the callback messages of a message-router style payload for the id.

    The body is parsed as a json list when the content type contains
    application/json; otherwise, or when parsing fails, an empty list is
    used. Each list item is a string holding one json message; it is
    decoded, stamped with the receive time and stored.

    Responds 'OK' and 200, or 'NOTOK' and 500 on error. Items stored before
    a failing item remain stored.
    """
    global cntr_msg_callbacks

    try:
        # content_type is None when the header is absent; str() keeps the
        # log line and the membership test below from raising.
        content_type = str(request.content_type)
        print("Received callback (mr) for id: "+id +", content-type="+content_type)
        # Logged once, before the lock is taken: the reverse lookup may block
        remote_host_logging(request)
        print("raw data: str(request.data): "+str(request.data))
        do_delay()
        try:
            if MIME_JSON in content_type:
                msg_list = json.loads(request.data)
                print("Payload(json): "+str(msg_list))
            else:
                msg_list = []
                print("Payload(content-type="+content_type+"). Setting empty json as payload")
        except Exception:
            msg_list = []
            print("(Exception) Payload does not contain any json, setting empty json as payload")
            traceback.print_exc()

        with lock:
            for item in msg_list:
                print("msg (str): "+str(item))
                msg = json.loads(item)
                print("msg (json): "+str(msg))
                # Stamp first: an item that is not a json object fails
                # here, before any counter has been changed.
                msg[TIME_STAMP] = str(datetime.now())
                cntr_msg_callbacks += 1
                msg_callbacks.setdefault(id, []).append(msg)
                # Per-id counters are [received, fetched]
                cntr_callbacks.setdefault(id, [0, 0])[0] += 1
    except Exception as e:
        print(CAUGHT_EXCEPTION+str(e))
        traceback.print_exc()
        return 'NOTOK', 500

    return 'OK', 200
### Functions for test ###
# Dump the whole db of current callbacks
# URI and parameter, (GET): /db
# response: message + 200
@app.route(DUMP_ALL_URL, methods=['GET'])
def dump_db():
    """Return all currently stored callback messages as json with status 200."""
    # Serialize under the same lock the other handlers hold while they
    # change the store.
    with lock:
        return json.dumps(msg_callbacks), 200
### Functions for metrics read out ###
@app.route('/counter/received_callbacks', methods=['GET'])
def requests_submitted():
    """Return the number of received callbacks as plain text.

    Without the 'id' query parameter the total is returned; with it, the
    count for that id ("0" when the id is unknown).
    """
    req_id = request.args.get('id')
    if req_id is None:
        body = str(cntr_msg_callbacks)
    else:
        per_id = cntr_callbacks.get(req_id)
        body = "0" if per_id is None else str(per_id[0])
    return Response(body, status=200, mimetype=MIME_TEXT)
@app.route('/counter/fetched_callbacks', methods=['GET'])
def requests_fetched():
    """Return the number of fetched callbacks as plain text.

    Without the 'id' query parameter the total is returned; with it, the
    count for that id ("0" when the id is unknown).
    """
    req_id = request.args.get('id')
    if req_id is None:
        body = str(cntr_msg_fetched)
    else:
        per_id = cntr_callbacks.get(req_id)
        body = "0" if per_id is None else str(per_id[1])
    return Response(body, status=200, mimetype=MIME_TEXT)
@app.route('/counter/current_messages', methods=['GET'])
def current_messages():
    """Return the number of received but not yet fetched callbacks as plain text.

    Without the 'id' query parameter the total is returned; with it, the
    count for that id ("0" when the id is unknown).
    """
    req_id = request.args.get('id')
    if req_id is None:
        body = str(cntr_msg_callbacks - cntr_msg_fetched)
    else:
        per_id = cntr_callbacks.get(req_id)
        if per_id is None:
            body = "0"
        else:
            received, fetched = per_id[0], per_id[1]
            body = str(received - fetched)
    return Response(body, status=200, mimetype=MIME_TEXT)
@app.route('/counter/remote_hosts', methods=['GET'])
def remote_hosts():
    """Return the recorded calling hosts as a comma separated plain text list."""
    return Response(",".join(hosts_set), status=200, mimetype=MIME_TEXT)
#Set forced response delay, in seconds, for all callbacks
#/forcedelay?delay=<seconds>
@app.route('/forcedelay', methods=['POST'])
def forcedelay():
    """Set the forced delay from the 'delay' query parameter.

    A missing or non-integer value clears the delay (None). The response
    text reports the value that was set.
    """
    try:
        delay = int(request.args.get('delay'))
    except Exception:
        delay = None
    forced_settings['delay'] = delay
    text = "Force delay: " + str(delay) + " sec set for all callback responses"
    return Response(text, 200, mimetype=MIME_TEXT)
# Helper: Delay if delayed response code is set
def do_delay():
    """Sleep for the forced delay, if one is set.

    Returns None when no delay is set or after sleeping. Returns a 500
    Response when the delay is below 1 second or when handling it fails.
    """
    configured = forced_settings['delay']
    if configured is None:
        return None

    try:
        seconds = int(configured)
        if seconds < 1:
            return Response("Force delay too short: " + str(configured) + " sec", 500, mimetype=MIME_TEXT)
        print("Delaying "+str(seconds)+ " sec.")
        time.sleep(seconds)
    except Exception:
        return Response("Force delay : " + str(configured) + " sec failed", 500, mimetype=MIME_TEXT)
    return None
### Admin ###
# Reset all messages and counters
@app.route('/reset', methods=['GET', 'POST', 'PUT'])
def reset():
    """Clear stored messages, all counters and the forced delay; respond 'OK' with 200."""
    global msg_callbacks, cntr_msg_fetched, cntr_msg_callbacks, cntr_callbacks

    with lock:
        cntr_msg_callbacks = 0
        cntr_msg_fetched = 0
        msg_callbacks, cntr_callbacks = {}, {}
        forced_settings['delay'] = None

    return Response('OK', status=200, mimetype=MIME_TEXT)
### Main function ###
if __name__ == "__main__":
    # Start the Flask server on the configured address and port
    app.run(host=HOST_IP, port=HOST_PORT)