blob: 94ef606d533115a74e304193c82b9df6de12beb2 [file] [log] [blame]
BjornMagnussonXA80a92002020-03-19 14:31:06 +01001
2# ============LICENSE_START===============================================
3# Copyright (C) 2020 Nordix Foundation. All rights reserved.
4# ========================================================================
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16# ============LICENSE_END=================================================
17#
18
BjornMagnussonXA49f0e5a2020-11-08 22:41:39 +010019from flask import Flask, request, Response
BjornMagnussonXA80a92002020-03-19 14:31:06 +010020from time import sleep
21import time
BjornMagnussonXA7b36db62020-11-23 10:57:57 +010022from datetime import datetime
BjornMagnussonXA80a92002020-03-19 14:31:06 +010023import json
BjornMagnussonXA80a92002020-03-19 14:31:06 +010024import traceback
BjornMagnussonXA7b36db62020-11-23 10:57:57 +010025import logging
BjornMagnussonXAc963b732021-01-20 14:24:13 +010026import socket
BjornMagnussonXA663566c2021-11-08 10:25:07 +010027from threading import RLock
BjornMagnussonXA6fc58fd2021-11-18 08:19:45 +010028from hashlib import md5
BjornMagnussonXA7b36db62020-11-23 10:57:57 +010029
30# Disable all logging of GET on reading counters and db
class AjaxFilter(logging.Filter):
    """Log filter that drops records produced by counter and db polling."""

    def filter(self, record):
        """Return False when the record's message mentions /counter/ or /db."""
        text = record.getMessage()
        is_polling = ("/counter/" in text) or ("/db" in text)
        return not is_polling
34
# Attach the polling filter to the werkzeug logger
log = logging.getLogger('werkzeug')
log.addFilter(AjaxFilter())

app = Flask(__name__)

# Lock taken by the handlers while they touch the message store and counters
lock = RLock()

# Callback messages, keyed by id; each value is a list of messages
msg_callbacks = {}

# Server info
HOST_IP = "::"
HOST_PORT = 2222

# Metrics vars
cntr_msg_callbacks = 0
cntr_msg_fetched = 0
cntr_callbacks = {}  # per id: [received count, fetched count]
hosts_set = set()

# Request and response constants
CALLBACK_URL = "/callbacks/<string:id>"
CALLBACK_MR_URL = "/callbacks-mr/<string:id>"  # Json list with string encoded items
CALLBACK_TEXT_URL = "/callbacks-text/<string:id>"  # Callback for string of text
APP_READ_URL = "/get-event/<string:id>"
APP_READ_ALL_URL = "/get-all-events/<string:id>"
DUMP_ALL_URL = "/db"

MIME_TEXT = "text/plain"
MIME_JSON = "application/json"
CAUGHT_EXCEPTION = "Caught exception: "
SERVER_ERROR = "Server error :"
TIME_STAMP = "cr-timestamp"

# Settings that can be forced on the server at runtime; 'delay' is in seconds
forced_settings = {'delay': None}
71
72
BjornMagnussonXAc963b732021-01-20 14:24:13 +010073# Remote host lookup and print host name
def remote_host_logging(request):
    """Print the calling host of the request and remember it in hosts_set.

    The address is taken from HTTP_X_FORWARDED_FOR when that key is present
    in the request environ, otherwise from REMOTE_ADDR. A leading '::ffff:'
    is stripped before the reverse lookup. The resolved host name is stored
    when the lookup succeeds; otherwise the address itself is stored.
    """
    forwarded_for = request.environ.get('HTTP_X_FORWARDED_FOR')
    if forwarded_for is not None:
        host_ip = str(forwarded_for)
    else:
        host_ip = str(request.environ['REMOTE_ADDR'])

    mapped_prefix = '::ffff:'
    if host_ip.startswith(mapped_prefix):
        host_ip = host_ip[len(mapped_prefix):]

    try:
        # gethostbyaddr returns (name, aliases, addresses); only the name is used
        host_name = socket.gethostbyaddr(host_ip)[0]
        print("Calling host: "+str(host_name))
        hosts_set.add(host_name)
    except Exception:
        print("Calling host not possible to retrieve IP: "+str(host_ip))
        hosts_set.add(host_ip)
BjornMagnussonXAc963b732021-01-20 14:24:13 +010090
91
BjornMagnussonXA80a92002020-03-19 14:31:06 +010092#I'm alive function
@app.route('/', methods=['GET'])
def index():
    """Alive check: always answers 'OK' with status 200."""
    return 'OK', 200
97
98### Callback interface, for control
99
100# Fetch the oldest callback message for an id
101# URI and parameter, (GET): /get-event/<id>
BjornMagnussonXA49f0e5a2020-11-08 22:41:39 +0100102# response: message + 200 or just 204 or just 500(error)
@app.route(APP_READ_URL, methods=['GET'])
def receiveresponse(id):
    """Return and remove the oldest stored callback message for an id.

    Responds with the json-encoded message and 200, an empty body and 204
    when nothing is stored for the id, or an empty body and 500 on error.
    For dict messages the timestamp entry is removed first, and when the
    dict holds an "md5" entry only that value is returned.
    """
    global cntr_msg_fetched

    with lock:
        try:
            pending = msg_callbacks.get(id)
            if not pending:
                print("No messages for id: "+id)
                return "", 204

            cntr_msg_fetched += 1
            cntr_callbacks[id][1] += 1
            msg = pending[0]
            print("Fetching msg for id: "+id+", msg="+str(msg))

            if isinstance(msg, dict):
                del msg[TIME_STAMP]
                if "md5" in msg:
                    print("EXTRACTED MD5")
                    msg = msg["md5"]
                    print("MD5: "+str(msg))

            # Drop the message from the queue only after it has been prepared
            del pending[0]
            return json.dumps(msg), 200
        except Exception as e:
            print(CAUGHT_EXCEPTION+str(e))
            traceback.print_exc()
            return "", 500
BjornMagnussonXA80a92002020-03-19 14:31:06 +0100133
BjornMagnussonXA49f0e5a2020-11-08 22:41:39 +0100134# Fetch all callback message for an id in an array
135# URI and parameter, (GET): /get-all-events/<id>
136# response: message + 200 or just 500(error)
@app.route(APP_READ_ALL_URL, methods=['GET'])
def receiveresponse_all(id):
    """Return and remove every stored callback message for an id.

    Responds with a json array and 200 (an empty array when nothing is
    stored for the id), or an empty body and 500 on error. The timestamp
    entry is removed from each dict message before it is returned.
    """
    global cntr_msg_fetched

    with lock:
        try:
            batch = msg_callbacks.get(id)
            if not batch:
                print("No messages for id: "+id)
                return json.dumps([]), 200

            fetched = len(batch)
            cntr_msg_fetched += fetched
            cntr_callbacks[id][1] += fetched
            print("Fetching all msgs for id: "+id+", msg="+str(batch))
            for item in batch:
                if isinstance(item, dict):
                    del item[TIME_STAMP]
            del msg_callbacks[id]
            return json.dumps(batch), 200
        except Exception as e:
            print(CAUGHT_EXCEPTION+str(e))
            traceback.print_exc()
            return "", 500
BjornMagnussonXA80a92002020-03-19 14:31:06 +0100163
164# Receive a callback message
BjornMagnussonXA49f0e5a2020-11-08 22:41:39 +0100165# URI and payload, (PUT or POST): /callbacks/<id> <json messages>
BjornMagnussonXA80a92002020-03-19 14:31:06 +0100166# response: OK 200 or 500 for other errors
@app.route(CALLBACK_URL, methods=['PUT', 'POST'])
def events_write(id):
    """Store one callback message for the given id.

    The body is parsed as json when the content type is json. Any other or
    missing content type, or a body that cannot be parsed, is stored as an
    empty json object. Dict payloads get a TIME_STAMP entry before storing.

    Returns 'OK' with 200, or 'NOTOK' with 500 on unexpected errors.
    """
    global cntr_msg_callbacks

    try:
        # content_type is None when the header is absent; stringify it once so
        # the log concatenations cannot fail the whole request with a 500.
        content_type = str(request.content_type)
        print("Received callback for id: "+id + ", content-type="+content_type)
        remote_host_logging(request)
        print("raw data: str(request.data): "+str(request.data))
        do_delay()
        try:
            # Substring match, as in the mr/text handlers, so that a parameter
            # such as "; charset=utf-8" does not discard a json payload.
            if MIME_JSON in content_type:
                msg = json.loads(request.data)
                print("Payload(json): "+str(msg))
            else:
                msg = {}
                print("Payload(content-type="+content_type+"). Setting empty json as payload")
        except Exception:
            msg = {}
            print("(Exception) Payload does not contain any json, setting empty json as payload")
            traceback.print_exc()

        with lock:
            cntr_msg_callbacks += 1
            if isinstance(msg, dict):
                msg[TIME_STAMP] = str(datetime.now())
            msg_callbacks.setdefault(id, []).append(msg)

            # Per-id counters are [received, fetched]
            cntr_callbacks.setdefault(id, [0, 0])[0] += 1

    except Exception as e:
        print(CAUGHT_EXCEPTION+str(e))
        traceback.print_exc()
        return 'NOTOK', 500

    return 'OK', 200
214
215
BjornMagnussonXA6fc58fd2021-11-18 08:19:45 +0100216# Receive a json callback message with payload formatted according to output from the message router
217# Array of stringified json objects
218# URI and payload, (PUT or POST): /callbacks-mr/<id> <json messages>
BjornMagnussonXA663566c2021-11-08 10:25:07 +0100219# json is a list of string encoded json items
220# response: OK 200 or 500 for other errors
@app.route(CALLBACK_MR_URL, methods=['PUT', 'POST'])
def events_write_mr(id):
    """Store callback messages delivered in message-router format.

    The body is expected to be a json list whose items are string-encoded
    json objects. Each item is decoded and stored for the id. When the
    'storeas' query parameter is set, each item is instead stored as
    {"md5": <hash of its compact json form>} and the payload is not logged.
    A non-json or missing content type, or an unparsable body, stores nothing.

    Returns 'OK' with 200, or 'NOTOK' with 500 on unexpected errors.
    """
    global cntr_msg_callbacks

    # If set, store payload as a md5 hash code and don't log the payload.
    # Large payloads will otherwise overload the server.
    storeas = request.args.get('storeas')
    try:
        # content_type is None when the header is absent; stringify it once so
        # the log concatenations cannot fail the whole request with a 500.
        content_type = str(request.content_type)
        print("Received callback (mr) for id: "+id + ", content-type="+content_type)
        # Host lookup is done before taking the lock, as in the other
        # callback handlers, so a slow reverse lookup does not block them.
        remote_host_logging(request)
        if storeas is None:
            print("raw data: str(request.data): "+str(request.data))
        do_delay()
        try:
            if MIME_JSON in content_type:
                msg_list = json.loads(request.data)
                if storeas is None:
                    print("Payload(json): "+str(msg_list))
            else:
                msg_list = []
                print("Payload(content-type="+content_type+"). Setting empty json as payload")
        except Exception:
            msg_list = []
            print("(Exception) Payload does not contain any json, setting empty json as payload")
            traceback.print_exc()

        with lock:
            for msg in msg_list:
                msg = json.loads(msg)
                if storeas is not None:
                    # Convert to compact json without whitespace between
                    # parameter and value; it seems that whitespace is added
                    # somewhere along the way to this server.
                    compact = json.dumps(msg, separators=(',', ':'))
                    msg = {"md5": md5(compact.encode('utf-8')).hexdigest()}
                    print("msg (json converted to md5 hash): "+str(msg["md5"]))
                cntr_msg_callbacks += 1
                if isinstance(msg, dict):
                    msg[TIME_STAMP] = str(datetime.now())
                msg_callbacks.setdefault(id, []).append(msg)

                # Per-id counters are [received, fetched]
                cntr_callbacks.setdefault(id, [0, 0])[0] += 1

    except Exception as e:
        print(CAUGHT_EXCEPTION+str(e))
        traceback.print_exc()
        return 'NOTOK', 500

    return 'OK', 200
287
BjornMagnussonXA6fc58fd2021-11-18 08:19:45 +0100288# Receive a callback message of a single text message (content type ignored)
289# or a json array of strings (content type json)
290# URI and payload, (PUT or POST): /callbacks-text/<id> <text message>
291# response: OK 200 or 500 for other errors
@app.route(CALLBACK_TEXT_URL, methods=['PUT', 'POST'])
def events_write_text(id):
    """Store text callback messages for the given id.

    With a json content type the body is parsed as a json array and each
    item is stored as its own message. Any other or missing content type
    stores the whole body, decoded as utf-8, as a single message. When the
    query parameter storeas=md5 is given, each message is stored as
    {"md5": <hash of the text>} plus a TIME_STAMP entry.

    Returns 'OK' with 200, or 'NOTOK' with 500 on unexpected errors.
    """
    global cntr_msg_callbacks

    # If set, store payload as a md5 hash code and don't log the payload.
    # Large payloads will otherwise overload the server.
    storeas = request.args.get('storeas')
    try:
        # content_type is None when the header is absent; stringify it once so
        # a plain-text callback without the header is stored, not answered 500.
        content_type = str(request.content_type)
        print("Received callback for id: "+id + ", content-type="+content_type)
        remote_host_logging(request)
        if storeas is None:
            print("raw data: str(request.data): "+str(request.data))
        do_delay()

        if MIME_JSON in content_type:  # Json array of strings
            msg_list = json.loads(request.data)
        else:
            msg_list = [request.data.decode("utf-8")]  # Assuming string

        for msg in msg_list:
            if storeas == "md5":
                # Only the hash is logged; the payload itself may be large
                msg = {"md5": md5(msg.encode('utf-8')).hexdigest()}
                print("msg (data converted to md5 hash): "+str(msg["md5"]))

            if isinstance(msg, dict):
                msg[TIME_STAMP] = str(datetime.now())

            with lock:
                cntr_msg_callbacks += 1
                msg_callbacks.setdefault(id, []).append(msg)

                # Per-id counters are [received, fetched]
                cntr_callbacks.setdefault(id, [0, 0])[0] += 1

    except Exception as e:
        print(CAUGHT_EXCEPTION+str(e))
        traceback.print_exc()
        return 'NOTOK', 500

    return 'OK', 200
354
BjornMagnussonXA49f0e5a2020-11-08 22:41:39 +0100355### Functions for test ###
356
357# Dump the whole db of current callbacks
358# URI and parameter, (GET): /db
359# response: message + 200
@app.route(DUMP_ALL_URL, methods=['GET'])
def dump_db():
    """Return the whole store of current callback messages as json, with 200."""
    dump = json.dumps(msg_callbacks)
    return dump, 200
BjornMagnussonXA80a92002020-03-19 14:31:06 +0100364
365### Functions for metrics read out ###
366
@app.route('/counter/received_callbacks', methods=['GET'])
def requests_submitted():
    """Return the number of received callbacks as plain text.

    Without the 'id' query parameter the total count is returned; with it,
    the count for that id, or "0" when the id has no counters.
    """
    req_id = request.args.get('id')
    if req_id is None:
        value = cntr_msg_callbacks
    elif req_id in cntr_callbacks:
        value = cntr_callbacks[req_id][0]
    else:
        value = "0"
    return Response(str(value), status=200, mimetype=MIME_TEXT)
BjornMagnussonXA80a92002020-03-19 14:31:06 +0100378
@app.route('/counter/fetched_callbacks', methods=['GET'])
def requests_fetched():
    """Return the number of fetched callbacks as plain text.

    Without the 'id' query parameter the total count is returned; with it,
    the count for that id, or "0" when the id has no counters.
    """
    req_id = request.args.get('id')
    if req_id is None:
        value = cntr_msg_fetched
    elif req_id in cntr_callbacks:
        value = cntr_callbacks[req_id][1]
    else:
        value = "0"
    return Response(str(value), status=200, mimetype=MIME_TEXT)
BjornMagnussonXA80a92002020-03-19 14:31:06 +0100390
@app.route('/counter/current_messages', methods=['GET'])
def current_messages():
    """Return received minus fetched callbacks as plain text.

    Without the 'id' query parameter the difference of the total counters
    is returned; with it, the difference for that id, or "0" when the id
    has no counters.
    """
    req_id = request.args.get('id')
    if req_id is None:
        value = cntr_msg_callbacks - cntr_msg_fetched
    elif req_id in cntr_callbacks:
        received, fetched = cntr_callbacks[req_id][0], cntr_callbacks[req_id][1]
        value = received - fetched
    else:
        value = "0"
    return Response(str(value), status=200, mimetype=MIME_TEXT)
BjornMagnussonXA80a92002020-03-19 14:31:06 +0100402
@app.route('/counter/remote_hosts', methods=['GET'])
def remote_hosts():
    """Return the recorded calling hosts as a comma separated plain text list."""
    joined = ",".join(hosts_set)
    return Response(str(joined), status=200, mimetype=MIME_TEXT)
410
411
# Set forced delay of the response, in seconds, for all callbacks
# /forcedelay?delay=<seconds>
@app.route('/forcedelay', methods=['POST'])
def forcedelay():
    """Set the forced delay from the 'delay' query parameter.

    A missing or non-integer value clears the delay (stored as None).
    Responds with a plain text confirmation and status 200.
    """
    try:
        delay = int(request.args.get('delay'))
    except Exception:
        delay = None
    forced_settings['delay'] = delay

    text = "Force delay: " + str(forced_settings['delay']) + " sec set for all callback responses"
    return Response(text, 200, mimetype=MIME_TEXT)
422
423# Helper: Delay if delayed response code is set
def do_delay():
    """Sleep for the forced delay, if one is set.

    Returns None when no delay is set or after sleeping. Returns a 500
    Response when the delay is below one second or when delaying fails.
    The callback handlers in this file call this without using the result.
    """
    delay = forced_settings['delay']
    if delay is None:
        return None

    try:
        val = int(delay)
        if val < 1:
            return Response("Force delay too short: " + str(delay) + " sec", 500, mimetype=MIME_TEXT)
        print("Delaying "+str(val) + " sec.")
        time.sleep(val)
    except Exception:
        return Response("Force delay : " + str(delay) + " sec failed", 500, mimetype=MIME_TEXT)
    return None
BjornMagnussonXA80a92002020-03-19 14:31:06 +0100434### Admin ###
435
# Reset all messages and counters
@app.route('/reset', methods=['GET', 'POST', 'PUT'])
def reset():
    """Clear all stored messages, all counters and the forced delay.

    hosts_set is not touched. Responds with 'OK' as plain text and 200.
    """
    global msg_callbacks, cntr_msg_fetched, cntr_msg_callbacks, cntr_callbacks

    with lock:
        msg_callbacks, cntr_callbacks = {}, {}
        cntr_msg_fetched = cntr_msg_callbacks = 0
        forced_settings['delay'] = None

    return Response('OK', status=200, mimetype=MIME_TEXT)
BjornMagnussonXA80a92002020-03-19 14:31:06 +0100454
455### Main function ###
456
if __name__ == "__main__":
    # Serve the app on the configured address and port when run as a script
    app.run(host=HOST_IP, port=HOST_PORT)