| |
| # ============LICENSE_START=============================================== |
| # Copyright (C) 2020 Nordix Foundation. All rights reserved. |
| # ======================================================================== |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # ============LICENSE_END================================================= |
| # |
| |
| from flask import Flask, request, Response |
| from time import sleep |
| import time |
| from datetime import datetime |
| import json |
| import traceback |
| import logging |
| import socket |
| from threading import RLock |
| from hashlib import md5 |
| |
| # Disable all logging of GET on reading counters and db |
| class AjaxFilter(logging.Filter): |
| def filter(self, record): |
| return ("/counter/" not in record.getMessage()) and ("/db" not in record.getMessage()) |
| |
| log = logging.getLogger('werkzeug') |
| log.addFilter(AjaxFilter()) |
| |
| app = Flask(__name__) |
| |
| lock = RLock() |
| |
| # list of callback messages |
| msg_callbacks={} |
| |
| # Server info |
| HOST_IP = "::" |
| HOST_PORT = 2222 |
| |
| # Metrics vars |
| cntr_msg_callbacks=0 |
| cntr_msg_fetched=0 |
| cntr_callbacks={} |
| hosts_set=set() |
| |
| # Request and response constants |
| CALLBACK_URL="/callbacks/<string:id>" |
| CALLBACK_MR_URL="/callbacks-mr/<string:id>" #Json list with string encoded items |
| CALLBACK_TEXT_URL="/callbacks-text/<string:id>" # Callback for string of text |
| APP_READ_URL="/get-event/<string:id>" |
| APP_READ_ALL_URL="/get-all-events/<string:id>" |
| DUMP_ALL_URL="/db" |
| |
| MIME_TEXT="text/plain" |
| MIME_JSON="application/json" |
| CAUGHT_EXCEPTION="Caught exception: " |
| SERVER_ERROR="Server error :" |
| TIME_STAMP="cr-timestamp" |
| |
| forced_settings={} |
| forced_settings['delay']=None |
| |
| |
| # Remote host lookup and print host name |
| def remote_host_logging(request): |
| |
| if request.environ.get('HTTP_X_FORWARDED_FOR') is None: |
| host_ip=str(request.environ['REMOTE_ADDR']) |
| else: |
| host_ip=str(request.environ['HTTP_X_FORWARDED_FOR']) |
| prefix='::ffff:' |
| if (host_ip.startswith('::ffff:')): |
| host_ip=host_ip[len(prefix):] |
| try: |
| name, alias, addresslist = socket.gethostbyaddr(host_ip) |
| print("Calling host: "+str(name)) |
| hosts_set.add(name) |
| except Exception: |
| print("Calling host not possible to retrieve IP: "+str(host_ip)) |
| hosts_set.add(host_ip) |
| |
| |
| #I'm alive function |
| @app.route('/', |
| methods=['GET']) |
| def index(): |
| return 'OK', 200 |
| |
| ### Callback interface, for control |
| |
| # Fetch the oldest callback message for an id |
| # URI and parameter, (GET): /get-event/<id> |
| # response: message + 200 or just 204 or just 500(error) |
| @app.route(APP_READ_URL, |
| methods=['GET']) |
| def receiveresponse(id): |
| global msg_callbacks |
| global cntr_msg_fetched |
| |
| with lock: |
| try: |
| if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)): |
| cntr_msg_fetched+=1 |
| cntr_callbacks[id][1]+=1 |
| msg=msg_callbacks[id][0] |
| print("Fetching msg for id: "+id+", msg="+str(msg)) |
| |
| if (isinstance(msg,dict)): |
| del msg[TIME_STAMP] |
| if ("md5" in msg.keys()): |
| print("EXTRACTED MD5") |
| msg=msg["md5"] |
| print("MD5: "+str(msg)) |
| |
| del msg_callbacks[id][0] |
| return json.dumps(msg),200 |
| print("No messages for id: "+id) |
| except Exception as e: |
| print(CAUGHT_EXCEPTION+str(e)) |
| traceback.print_exc() |
| return "",500 |
| |
| return "",204 |
| |
| # Fetch all callback message for an id in an array |
| # URI and parameter, (GET): /get-all-events/<id> |
| # response: message + 200 or just 500(error) |
| @app.route(APP_READ_ALL_URL, |
| methods=['GET']) |
| def receiveresponse_all(id): |
| global msg_callbacks |
| global cntr_msg_fetched |
| |
| with lock: |
| try: |
| if ((id in msg_callbacks.keys()) and (len(msg_callbacks[id]) > 0)): |
| cntr_msg_fetched+=len(msg_callbacks[id]) |
| cntr_callbacks[id][1]+=len(msg_callbacks[id]) |
| msg=msg_callbacks[id] |
| print("Fetching all msgs for id: "+id+", msg="+str(msg)) |
| for sub_msg in msg: |
| if (isinstance(sub_msg, dict)): |
| del sub_msg[TIME_STAMP] |
| del msg_callbacks[id] |
| return json.dumps(msg),200 |
| print("No messages for id: "+id) |
| except Exception as e: |
| print(CAUGHT_EXCEPTION+str(e)) |
| traceback.print_exc() |
| return "",500 |
| |
| msg=[] |
| return json.dumps(msg),200 |
| |
| # Receive a callback message |
| # URI and payload, (PUT or POST): /callbacks/<id> <json messages> |
| # response: OK 200 or 500 for other errors |
| @app.route(CALLBACK_URL, |
| methods=['PUT','POST']) |
| def events_write(id): |
| global msg_callbacks |
| global cntr_msg_callbacks |
| |
| try: |
| print("Received callback for id: "+id +", content-type="+request.content_type) |
| remote_host_logging(request) |
| print("raw data: str(request.data): "+str(request.data)) |
| do_delay() |
| try: |
| if (request.content_type == MIME_JSON): |
| data = request.data |
| msg = json.loads(data) |
| print("Payload(json): "+str(msg)) |
| else: |
| msg={} |
| print("Payload(content-type="+request.content_type+"). Setting empty json as payload") |
| except Exception as e: |
| msg={} |
| print("(Exception) Payload does not contain any json, setting empty json as payload") |
| traceback.print_exc() |
| |
| with lock: |
| cntr_msg_callbacks += 1 |
| if (isinstance(msg, dict)): |
| msg[TIME_STAMP]=str(datetime.now()) |
| if (id in msg_callbacks.keys()): |
| msg_callbacks[id].append(msg) |
| else: |
| msg_callbacks[id]=[] |
| msg_callbacks[id].append(msg) |
| |
| if (id in cntr_callbacks.keys()): |
| cntr_callbacks[id][0] += 1 |
| else: |
| cntr_callbacks[id]=[] |
| cntr_callbacks[id].append(1) |
| cntr_callbacks[id].append(0) |
| |
| except Exception as e: |
| print(CAUGHT_EXCEPTION+str(e)) |
| traceback.print_exc() |
| return 'NOTOK',500 |
| |
| return 'OK',200 |
| |
| |
| # Receive a json callback message with payload formatted according to output from the message router |
| # Array of stringified json objects |
| # URI and payload, (PUT or POST): /callbacks-mr/<id> <json messages> |
| # json is a list of string encoded json items |
| # response: OK 200 or 500 for other errors |
| @app.route(CALLBACK_MR_URL, |
| methods=['PUT','POST']) |
| def events_write_mr(id): |
| global msg_callbacks |
| global cntr_msg_callbacks |
| |
| storeas=request.args.get('storeas') #If set, store payload as a md5 hascode and dont log the payload |
| #Large payloads will otherwise overload the server |
| try: |
| print("Received callback (mr) for id: "+id +", content-type="+request.content_type) |
| print("raw data: str(request.data): "+str(request.data)) |
| if (storeas is None): |
| print("raw data: str(request.data): "+str(request.data)) |
| do_delay() |
| try: |
| #if (request.content_type == MIME_JSON): |
| if (MIME_JSON in request.content_type): |
| data = request.data |
| msg_list = json.loads(data) |
| if (storeas is None): |
| print("Payload(json): "+str(msg_list)) |
| else: |
| msg_list=[] |
| print("Payload(content-type="+request.content_type+"). Setting empty json as payload") |
| except Exception as e: |
| msg_list=[] |
| print("(Exception) Payload does not contain any json, setting empty json as payload") |
| traceback.print_exc() |
| |
| with lock: |
| remote_host_logging(request) |
| for msg in msg_list: |
| if (storeas is None): |
| msg=json.loads(msg) |
| else: |
| #Convert to compact json without ws between parameter and value... |
| #It seem that ws is added somewhere along to way to this server |
| msg=json.loads(msg) |
| msg=json.dumps(msg, separators=(',', ':')) |
| |
| md5msg={} |
| md5msg["md5"]=md5(msg.encode('utf-8')).hexdigest() |
| msg=md5msg |
| print("msg (json converted to md5 hash): "+str(msg["md5"])) |
| cntr_msg_callbacks += 1 |
| if (isinstance(msg, dict)): |
| msg[TIME_STAMP]=str(datetime.now()) |
| if (id in msg_callbacks.keys()): |
| msg_callbacks[id].append(msg) |
| else: |
| msg_callbacks[id]=[] |
| msg_callbacks[id].append(msg) |
| |
| if (id in cntr_callbacks.keys()): |
| cntr_callbacks[id][0] += 1 |
| else: |
| cntr_callbacks[id]=[] |
| cntr_callbacks[id].append(1) |
| cntr_callbacks[id].append(0) |
| |
| except Exception as e: |
| print(CAUGHT_EXCEPTION+str(e)) |
| traceback.print_exc() |
| return 'NOTOK',500 |
| |
| return 'OK',200 |
| |
| # Receive a callback message of a single text message (content type ignored) |
| # or a json array of strings (content type json) |
| # URI and payload, (PUT or POST): /callbacks-text/<id> <text message> |
| # response: OK 200 or 500 for other errors |
| @app.route(CALLBACK_TEXT_URL, |
| methods=['PUT','POST']) |
| def events_write_text(id): |
| global msg_callbacks |
| global cntr_msg_callbacks |
| |
| storeas=request.args.get('storeas') #If set, store payload as a md5 hascode and dont log the payload |
| #Large payloads will otherwise overload the server |
| try: |
| print("Received callback for id: "+id +", content-type="+request.content_type) |
| remote_host_logging(request) |
| if (storeas is None): |
| print("raw data: str(request.data): "+str(request.data)) |
| do_delay() |
| |
| try: |
| msg_list=None |
| if (MIME_JSON in request.content_type): #Json array of strings |
| msg_list=json.loads(request.data) |
| else: |
| data=request.data.decode("utf-8") #Assuming string |
| msg_list=[] |
| msg_list.append(data) |
| |
| for msg in msg_list: |
| if (storeas == "md5"): |
| md5msg={} |
| print("msg: "+str(msg)) |
| print("msg (endcode str): "+str(msg.encode('utf-8'))) |
| md5msg["md5"]=md5(msg.encode('utf-8')).hexdigest() |
| msg=md5msg |
| print("msg (data converted to md5 hash): "+str(msg["md5"])) |
| |
| if (isinstance(msg, dict)): |
| msg[TIME_STAMP]=str(datetime.now()) |
| |
| with lock: |
| cntr_msg_callbacks += 1 |
| if (id in msg_callbacks.keys()): |
| msg_callbacks[id].append(msg) |
| else: |
| msg_callbacks[id]=[] |
| msg_callbacks[id].append(msg) |
| |
| if (id in cntr_callbacks.keys()): |
| cntr_callbacks[id][0] += 1 |
| else: |
| cntr_callbacks[id]=[] |
| cntr_callbacks[id].append(1) |
| cntr_callbacks[id].append(0) |
| except Exception as e: |
| print(CAUGHT_EXCEPTION+str(e)) |
| traceback.print_exc() |
| return 'NOTOK',500 |
| |
| |
| except Exception as e: |
| print(CAUGHT_EXCEPTION+str(e)) |
| traceback.print_exc() |
| return 'NOTOK',500 |
| |
| return 'OK',200 |
| |
| ### Functions for test ### |
| |
| # Dump the whole db of current callbacks |
| # URI and parameter, (GET): /db |
| # response: message + 200 |
| @app.route(DUMP_ALL_URL, |
| methods=['GET']) |
| def dump_db(): |
| return json.dumps(msg_callbacks),200 |
| |
| ### Functions for metrics read out ### |
| |
| @app.route('/counter/received_callbacks', |
| methods=['GET']) |
| def requests_submitted(): |
| req_id = request.args.get('id') |
| if (req_id is None): |
| return Response(str(cntr_msg_callbacks), status=200, mimetype=MIME_TEXT) |
| |
| if (req_id in cntr_callbacks.keys()): |
| return Response(str(cntr_callbacks[req_id][0]), status=200, mimetype=MIME_TEXT) |
| else: |
| return Response(str("0"), status=200, mimetype=MIME_TEXT) |
| |
| @app.route('/counter/fetched_callbacks', |
| methods=['GET']) |
| def requests_fetched(): |
| req_id = request.args.get('id') |
| if (req_id is None): |
| return Response(str(cntr_msg_fetched), status=200, mimetype=MIME_TEXT) |
| |
| if (req_id in cntr_callbacks.keys()): |
| return Response(str(cntr_callbacks[req_id][1]), status=200, mimetype=MIME_TEXT) |
| else: |
| return Response(str("0"), status=200, mimetype=MIME_TEXT) |
| |
| @app.route('/counter/current_messages', |
| methods=['GET']) |
| def current_messages(): |
| req_id = request.args.get('id') |
| if (req_id is None): |
| return Response(str(cntr_msg_callbacks-cntr_msg_fetched), status=200, mimetype=MIME_TEXT) |
| |
| if (req_id in cntr_callbacks.keys()): |
| return Response(str(cntr_callbacks[req_id][0]-cntr_callbacks[req_id][1]), status=200, mimetype=MIME_TEXT) |
| else: |
| return Response(str("0"), status=200, mimetype=MIME_TEXT) |
| |
| @app.route('/counter/remote_hosts', |
| methods=['GET']) |
| def remote_hosts(): |
| global hosts_set |
| |
| hosts=",".join(hosts_set) |
| return Response(str(hosts), status=200, mimetype=MIME_TEXT) |
| |
| |
| #Set force delay response, in seconds, for all callbacks |
| #/froceesponse?delay=<seconds> |
| @app.route('/forcedelay', methods=['POST']) |
| def forcedelay(): |
| |
| try: |
| forced_settings['delay']=int(request.args.get('delay')) |
| except Exception: |
| forced_settings['delay']=None |
| return Response("Force delay: " + str(forced_settings['delay']) + " sec set for all callback responses", 200, mimetype=MIME_TEXT) |
| |
| # Helper: Delay if delayed response code is set |
| def do_delay(): |
| if (forced_settings['delay'] is not None): |
| try: |
| val=int(forced_settings['delay']) |
| if (val < 1): |
| return Response("Force delay too short: " + str(forced_settings['delay']) + " sec", 500, mimetype=MIME_TEXT) |
| print("Delaying "+str(val)+ " sec.") |
| time.sleep(val) |
| except Exception: |
| return Response("Force delay : " + str(forced_settings['delay']) + " sec failed", 500, mimetype=MIME_TEXT) |
| ### Admin ### |
| |
| # Reset all messsages and counters |
| @app.route('/reset', |
| methods=['GET', 'POST', 'PUT']) |
| def reset(): |
| global msg_callbacks |
| global cntr_msg_fetched |
| global cntr_msg_callbacks |
| global cntr_callbacks |
| global forced_settings |
| |
| with lock: |
| msg_callbacks={} |
| cntr_msg_fetched=0 |
| cntr_msg_callbacks=0 |
| cntr_callbacks={} |
| forced_settings['delay']=None |
| |
| return Response('OK', status=200, mimetype=MIME_TEXT) |
| |
| ### Main function ### |
| |
| if __name__ == "__main__": |
| app.run(port=HOST_PORT, host=HOST_IP) |