blob: 40707d62ff5935385ecc8052c6d859f53b31b425 [file] [log] [blame]
ecaiyanlinux67355802020-05-11 12:53:23 +02001
2# ============LICENSE_START===============================================
3# Copyright (C) 2020 Nordix Foundation. All rights reserved.
4# ========================================================================
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16# ============LICENSE_END=================================================
17#
18
19from flask import Flask, request
20from time import sleep
21import time
22import datetime
23import json
24from flask import Flask
25from flask import Response
26import traceback
BjornMagnussonXAe0b665e2021-01-08 22:19:18 +010027from threading import RLock, Thread
BjornMagnussonXA7b36db62020-11-23 10:57:57 +010028import logging
BjornMagnussonXAe0b665e2021-01-08 22:19:18 +010029import os
30import requests
BjornMagnussonXA7b36db62020-11-23 10:57:57 +010031
32# Disable all logging of GET on reading counters
class AjaxFilter(logging.Filter):
    """Logging filter that rejects every record whose message contains "/counter/"."""

    def filter(self, record):
        """Return False for records mentioning "/counter/", True for all others."""
        message = record.getMessage()
        return message.find("/counter/") == -1


# Install the filter on the 'werkzeug' logger
log = logging.getLogger('werkzeug')
log.addFilter(AjaxFilter())
ecaiyanlinux67355802020-05-11 12:53:23 +020039
# Re-entrant lock taken by the request handlers and worker threads around the shared message state
lock = RLock()
# The Flask application all routes below are registered on
app = Flask(__name__)
# Messages to (requests) and from (responses, keyed by correlation id) Dmaap
msg_requests = []
msg_responses = {}

# Server info
HOST_IP = "::"
HOST_PORT = 2222

# Metrics vars
cntr_msg_requests_submitted = 0
cntr_msg_requests_fetched = 0
cntr_msg_responses_submitted = 0
cntr_msg_responses_fetched = 0

# Request and response constants
AGENT_WRITE_URL = "/events/A1-POLICY-AGENT-WRITE"
AGENT_READ_URL = "/events/A1-POLICY-AGENT-READ/users/policy-agent"
APP_WRITE_URL = "/send-request"
APP_READ_URL = "/receive-response"
MIME_TEXT = "text/plain"
MIME_JSON = "application/json"
CAUGHT_EXCEPTION = "Caught exception: "
SERVER_ERROR = "Server error :"

# Topic urls; both stay empty unless overridden from the environment
topic_write = ""
topic_read = ""

# Handles of the background worker threads, None until started
uploader_thread = None
downloader_thread = None
71
# Function to upload queued request messages to dmaap
def dmaap_uploader():
    """Forward queued request messages to Dmaap, forever (thread target).

    The message at the head of ``msg_requests`` is POSTed to the url held in
    ``topic_read``. It is removed from the queue and counted in
    ``cntr_msg_requests_fetched`` only after a 2xx response; on any other
    status or on an exception the same message is retried after one second.
    """
    global msg_requests
    global cntr_msg_requests_fetched

    print("Starting uploader")

    headers = {'Content-type': 'application/json', 'Accept': '*/*'}
    # topic_read holds the complete url to post to
    url = topic_read

    while True:
        while True:
            with lock:
                # Peek at the head under the lock so a concurrent reset cannot
                # empty the list between the length check and the read
                if not msg_requests:
                    break
                msg = msg_requests[0]
                if msg is None:
                    # Nothing to send for an empty slot; drop it, otherwise
                    # the loop would spin on it forever
                    msg_requests.pop(0)
                    continue
            try:
                print("Sending to dmaap : " + url)
                print("Sending to dmaap : " + msg)
                resp = requests.post(url, data=msg, headers=headers, timeout=10)
                # The former test 'code<199 & code>299' could never be true
                # ('&' binds tighter than the comparisons), so failures were
                # treated as success and the message was lost
                if not 200 <= resp.status_code <= 299:
                    print("Failed, response code: " + str(resp.status_code))
                    sleep(1)
                else:
                    print("Dmaap response code: " + str(resp.status_code))
                    print("Dmaap response text: " + str(resp.text))
                    with lock:
                        # Only dequeue if the head is still the message just
                        # sent (the queue may have been reset meanwhile)
                        if msg_requests and msg_requests[0] is msg:
                            msg_requests.pop(0)
                            cntr_msg_requests_fetched += 1
            except Exception as e:
                print("Failed, exception: " + str(e))
                sleep(1)
        # Queue drained; idle briefly before checking again
        sleep(0.01)
104
105
# Function to download messages from dmaap
def dmaap_downloader():
    """Poll Dmaap for response messages, forever (thread target).

    Each iteration GETs the url held in ``topic_write``. A 2xx answer is
    expected to be a json array whose items each carry "correlationId",
    "status" and "message"; for every item the message text followed by the
    first three characters of the status is stored in ``msg_responses`` under
    the correlation id and ``cntr_msg_responses_submitted`` is incremented.
    Any failure is printed and followed by a one second pause.
    """
    global msg_responses
    global cntr_msg_responses_submitted

    print("Starting downloader")

    headers = {'Accept': 'application/json'}

    while True:
        try:
            # topic_write holds the complete url to read from
            url = topic_write
            print("Reading from dmaap: " + url)
            # NOTE(review): no client side timeout is set, presumably because
            # the url long-polls - confirm; a hung connection blocks this thread
            resp = requests.get(url, headers=headers)
            # The former test 'code<199 & code>299' could never be true
            # ('&' binds tighter than the comparisons)
            if not 200 <= resp.status_code <= 299:
                print("Failed, response code: " + str(resp.status_code))
                sleep(1)
                continue

            print("Recieved data from dmaap mr")
            data = None
            try:
                data = resp.json()
                print("Recieved data (raw): " + str(resp.text))
                if isinstance(data, list):
                    for item in data:
                        # Items normally arrive as json encoded strings;
                        # accept already decoded objects as well
                        if isinstance(item, (str, bytes, bytearray)):
                            item = json.loads(item)
                        corrid = str(item["correlationId"])
                        status = str(item["status"])
                        msg = item["message"]
                        # Structured messages are kept as json, same as in
                        # the AGENT_WRITE_URL handler
                        if isinstance(msg, (list, dict)):
                            msg = json.dumps(msg)
                        else:
                            msg = str(msg)
                        item_str = msg + status[0:3]
                        with lock:
                            msg_responses[corrid] = item_str
                            cntr_msg_responses_submitted += 1
                else:
                    print("Data from dmaap is not json array: " + str(resp.text))
                    sleep(1)
            except Exception as e:
                # data is still None when the body itself was not valid json
                dropped = resp.text if data is None else data
                print("Corrupt data from dmaap mr - dropping " + str(dropped))
                print(CAUGHT_EXCEPTION + str(e) + " " + traceback.format_exc())
                sleep(1)
        except Exception as e:
            # E.g. connection problems; report instead of failing silently
            print("Failed, exception: " + str(e))
            sleep(1)
148
ecaiyanlinux67355802020-05-11 12:53:23 +0200149#I'm alive function
@app.route('/', methods=['GET'])
def index():
    """Alive check: always answers with the text 'OK' and http 200."""
    body, status = 'OK', 200
    return body, status
154
155
156# Helper function to create a Dmaap request message
157# args : <GET|PUT|DELETE> <correlation-id> <json-string-payload - may be None> <url>
158# response: json formatted string of a complete Dmaap message
def create_message(operation, correlation_id, payload, url):
    """Build a complete Dmaap request message as a json formatted string.

    Args:
        operation: value of the "operation" field.
        correlation_id: value of the "correlationId" field.
        payload: json text embedded verbatim as the "payload" value; None
            gives an empty object.
        url: value of the "url" field.

    Returns:
        The message as a string. The "timestamp" field holds the current
        UTC time.
    """
    if payload is None:
        payload = "{}"
    # Renders exactly like str(datetime.utcnow()) but avoids the deprecated call
    time_stamp = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    # json.dumps adds the surrounding quotes and escapes quotes/backslashes, so
    # caller supplied text (e.g. the url) cannot break the json document.
    # For plain values the output is byte-identical to simple concatenation.
    operation_json = json.dumps(operation, ensure_ascii=False)
    correlation_json = json.dumps(correlation_id, ensure_ascii=False)
    url_json = json.dumps(url, ensure_ascii=False)

    msg = '{"apiVersion":"1.0","operation":' + operation_json + ',"correlationId":' + correlation_json + ',"originatorId": "849e6c6b420",'
    msg = msg + '"payload":' + payload + ',"requestId":"23343221", "target":"policy-agent", "timestamp":"' + str(time_stamp) + '", "type":"request","url":' + url_json + '}'
    return msg
166
167
168### MR-stub interface, for MR control
169
# Send a message to MR
# URI and parameters (PUT or POST): /send-request?operation=<GET|PUT|POST|DELETE>&url=<url>
# response: <correlation-id> (http 200) or 400 for parameter error or 500 for other errors
@app.route(APP_WRITE_URL,
    methods=['PUT','POST'])
def sendrequest():
    """Queue a new request message and answer with its correlation id.

    Reads the query parameters 'operation' and 'url', builds a Dmaap request
    message with a correlation id taken from time.time_ns(), appends it to
    ``msg_requests`` and increments ``cntr_msg_requests_submitted``.

    Returns:
        200 with the correlation id as plain text, 400 when a parameter is
        missing or 'operation' is not one of GET|PUT|POST|DELETE, 500 with
        the exception text for any other error.
    """
    global msg_requests
    global cntr_msg_requests_submitted
    with lock:
        print("APP_WRITE_URL lock")
        try:
            oper = request.args.get('operation')
            if oper is None:
                print(APP_WRITE_URL + " parameter 'operation' missing")
                return Response('Parameter operation missing in request', status=400, mimetype=MIME_TEXT)

            url = request.args.get('url')
            if url is None:
                print(APP_WRITE_URL + " parameter 'url' missing")
                return Response('Parameter url missing in request', status=400, mimetype=MIME_TEXT)

            if oper not in ("GET", "PUT", "POST", "DELETE"):
                # The messages used to list 'DEL' where 'GET' is the accepted value
                print(APP_WRITE_URL + " parameter 'operation' need to be: GET|PUT|POST|DELETE")
                return Response('Parameter operation does not contain GET|PUT|POST|DELETE in request', status=400, mimetype=MIME_TEXT)

            print(APP_WRITE_URL + " operation=" + oper + " url=" + url)
            correlation_id = str(time.time_ns())
            payload = None
            # NOTE(review): the body is only forwarded for PUT although POST is
            # an accepted operation - confirm whether POST bodies should be too
            if (oper == "PUT") and (request.json is not None):
                payload = json.dumps(request.json)

            msg = create_message(oper, correlation_id, payload, url)
            print(msg)
            # json.loads doubles as a sanity check: a malformed message raises
            # here and is reported as 500 instead of being queued
            print(APP_WRITE_URL + " MSG(correlationid = " + correlation_id + "): " + json.dumps(json.loads(msg), indent=2))
            msg_requests.append(msg)
            cntr_msg_requests_submitted += 1
            return Response(correlation_id, status=200, mimetype=MIME_TEXT)
        except Exception as e:
            print(APP_WRITE_URL + "-" + CAUGHT_EXCEPTION + " " + str(e) + " " + traceback.format_exc())
            return Response(SERVER_ERROR + " " + str(e), status=500, mimetype=MIME_TEXT)
210
211# Receive a message response for MR for the included correlation id
212# URI and parameter, (GET): /receive-response?correlationid=<correlation-id>
213# response: <json-array of 1 response> 200 or empty 204 or other errors 500
@app.route(APP_READ_URL, methods=['GET'])
def receiveresponse():
    """Hand out, and remove, the stored response for a correlation id.

    The id is read from the query parameter 'correlationid'.

    Returns:
        200 with the stored response text when one exists (the entry is
        deleted and ``cntr_msg_responses_fetched`` incremented), an empty 204
        when nothing is stored for the id, 500 when the parameter is missing
        or an exception occurs.
    """
    global msg_responses
    global cntr_msg_responses_fetched
    with lock:
        print("APP_READ_URL lock")
        try:
            corr_id = request.args.get('correlationid')
            if corr_id is None:
                print(APP_READ_URL+" parameter 'correclationid' missing")
                return Response('Parameter correlationid missing in json', status=500, mimetype=MIME_TEXT)

            # Guard clause: nothing stored for this id yet
            if corr_id not in msg_responses:
                print(APP_READ_URL+" - no messages (correlationid="+corr_id+"): ")
                return Response('', status=204, mimetype=MIME_JSON)

            # A response is handed out once only, so take it out of the dict
            stored = msg_responses.pop(corr_id)
            print(APP_READ_URL+" response (correlationid="+corr_id+"): " + stored)
            cntr_msg_responses_fetched += 1
            return Response(stored, status=200, mimetype=MIME_JSON)
        except Exception as err:
            print(APP_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(err) + " "+traceback.format_exc())
            return Response(SERVER_ERROR+" "+str(err), status=500, mimetype=MIME_TEXT)
239
240### Dmaap interface ###
241
242# Read messages stream. URI according to agent configuration.
243# URI, (GET): /events/A1-POLICY-AGENT-READ/users/policy-agent
244# response: 200 <json array of request messages>, or 500 for other errors
@app.route(AGENT_READ_URL, methods=['GET'])
def events_read():
    """Deliver queued request messages as a json array.

    Query parameters: 'limit' (max number of messages, clamped to 0..4096,
    default 4096) and 'timeout' (milliseconds to wait for messages, default
    10000, at most 60000). The queue is checked every 25 ms until messages
    exist or the timeout has passed.

    Returns:
        404 when a topic url is configured, 200 with a json array of the
        dequeued messages (an empty array after a timeout), or 500 with the
        exception text if assembling the answer fails.
    """
    global msg_requests
    global cntr_msg_requests_fetched

    if topic_write or topic_read:
        return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT)

    raw_limit = request.args.get('limit')
    limit = 4096 if raw_limit is None else int(raw_limit)
    # Clamp to the supported range 0..4096
    limit = max(limit, 0)
    limit = min(limit, 4096)
    print("Limting number of returned messages to: "+str(limit))

    raw_timeout = request.args.get('timeout')
    timeout = 10000 if raw_timeout is None else min(int(raw_timeout), 60000)

    # All times in whole milliseconds
    start_time = int(round(time.time() * 1000))
    current_time = int(round(time.time() * 1000))
    deadline = start_time + int(timeout)

    while current_time < deadline:
        with lock:
            if msg_requests:
                try:
                    joined = ''
                    for _ in range(limit):
                        if len(msg_requests) == 0:
                            break
                        if len(joined) > 1:
                            joined = joined + ','
                        joined = joined + msg_requests.pop(0)
                        cntr_msg_requests_fetched += 1
                    joined = '[' + joined + ']'
                    print(AGENT_READ_URL+" MSGs: "+json.dumps(json.loads(joined), indent=2))
                    return Response(joined, status=200, mimetype=MIME_JSON)
                except Exception as err:
                    print(AGENT_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(err) + " "+traceback.format_exc())
                    return Response(SERVER_ERROR+" "+str(err), status=500, mimetype=MIME_TEXT)
        # Nothing queued yet: wait 25 milliseconds outside the lock and retry
        sleep(0.025)
        current_time = int(round(time.time() * 1000))

    print("timeout: "+str(timeout)+", start_time: "+str(start_time)+", current_time: "+str(current_time))
    return Response("[]", status=200, mimetype=MIME_JSON)
297
298# Write messages stream. URI according to agent configuration.
299# URI and payload, (PUT or POST): /events/A1-POLICY-AGENT-WRITE <json array of response messages>
300# response: OK 200 or 400 for missing json parameters, 500 for other errors
@app.route(AGENT_WRITE_URL,
    methods=['PUT','POST'])
def events_write():
    """Store response messages posted by the agent.

    The json body is either one message object or an array of them. For each
    message the text of "message" (json encoded when it is a list or dict)
    followed by the first three characters of "status" is stored in
    ``msg_responses`` under "correlationId", and
    ``cntr_msg_responses_submitted`` is incremented.

    Returns:
        404 when a topic url is configured, 400 when a message lacks
        "correlationId", "message" or "status", otherwise http 200 with '{}'.
        Any other error is reported as http 200 with a json body carrying
        the error text and "status":"500".
    """
    global msg_responses
    global cntr_msg_responses_submitted

    if topic_write or topic_read:
        return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT)

    with lock:
        print("AGENT_WRITE_URL lock")
        try:
            answer = request.json
            print(AGENT_WRITE_URL + " json=" + json.dumps(answer, indent=2))
            if isinstance(answer, dict):
                # Create an array if the answer is a dict (single message)
                answer = [answer]

            for item in answer:
                # .get() instead of [] so that an absent key reaches the 400
                # answers below rather than raising KeyError
                cid = item.get('correlationId')
                if cid is None:
                    print(AGENT_WRITE_URL + " parameter 'correlatonid' missing")
                    return Response('Parameter <correlationid> missing in json', status=400, mimetype=MIME_TEXT)
                msg = item.get('message')
                if msg is None:
                    print(AGENT_WRITE_URL + " parameter 'msgs' missing")
                    return Response('Parameter <message> missing in json', status=400, mimetype=MIME_TEXT)
                status = item.get('status')
                if status is None:
                    print(AGENT_WRITE_URL + " parameter 'status' missing")
                    return Response('Parameter <status> missing in json', status=400, mimetype=MIME_TEXT)

                # Responses are looked up by the string taken from the query
                # parameter, and json may deliver numbers for id and status
                cid = str(cid)
                status = str(status)
                if isinstance(msg, (list, dict)):
                    msg_str = json.dumps(msg) + status[0:3]
                else:
                    msg_str = str(msg) + status[0:3]
                msg_responses[cid] = msg_str
                cntr_msg_responses_submitted += 1
                print(AGENT_WRITE_URL + " msg+status (correlationid=" + cid + ") :" + str(msg_str))
        except Exception as e:
            print(AGENT_WRITE_URL + "-" + CAUGHT_EXCEPTION + " " + str(e) + " " + traceback.format_exc())
            # json.dumps escapes the exception text so the body stays valid json
            error_text = json.dumps(SERVER_ERROR + ' ' + str(e))
            return Response('{"message": ' + error_text + ',"status":"500"}', status=200, mimetype=MIME_JSON)

        return Response('{}', status=200, mimetype=MIME_JSON)
345 return Response('{}', status=200, mimetype=MIME_JSON)
346
347
348### Functions for metrics read out ###
349
@app.route('/counter/requests_submitted', methods=['GET'])
def requests_submitted():
    """Report the current value of cntr_msg_requests_submitted as plain text."""
    value_text = str(cntr_msg_requests_submitted)
    return Response(value_text, status=200, mimetype=MIME_TEXT)
354
@app.route('/counter/requests_fetched', methods=['GET'])
def requests_fetched():
    """Report the current value of cntr_msg_requests_fetched as plain text."""
    value_text = str(cntr_msg_requests_fetched)
    return Response(value_text, status=200, mimetype=MIME_TEXT)
359
@app.route('/counter/responses_submitted', methods=['GET'])
def responses_submitted():
    """Report the current value of cntr_msg_responses_submitted as plain text."""
    value_text = str(cntr_msg_responses_submitted)
    return Response(value_text, status=200, mimetype=MIME_TEXT)
364
@app.route('/counter/responses_fetched', methods=['GET'])
def responses_fetched():
    """Report the current value of cntr_msg_responses_fetched as plain text."""
    value_text = str(cntr_msg_responses_fetched)
    return Response(value_text, status=200, mimetype=MIME_TEXT)
369
@app.route('/counter/current_requests', methods=['GET'])
def current_requests():
    """Report how many request messages are currently queued, as plain text."""
    queued = len(msg_requests)
    return Response(str(queued), status=200, mimetype=MIME_TEXT)
374
@app.route('/counter/current_responses', methods=['GET'])
def current_responses():
    """Report how many response messages are currently stored, as plain text."""
    stored = len(msg_responses)
    return Response(str(stored), status=200, mimetype=MIME_TEXT)
379
380### Admin ###
381
# Reset all messages and counters
@app.route('/reset',
    methods=['GET', 'POST', 'PUT'])
def reset():
    """Zero all four counters and discard every queued request and stored response.

    Returns:
        200 with the text 'OK'.
    """
    global cntr_msg_requests_submitted
    global cntr_msg_requests_fetched
    global cntr_msg_responses_submitted
    global cntr_msg_responses_fetched
    global msg_requests
    global msg_responses

    # Take the same lock as every other writer of this state, so a handler in
    # the middle of dequeuing/counting cannot interleave with the reset
    with lock:
        cntr_msg_requests_submitted = 0
        cntr_msg_requests_fetched = 0
        cntr_msg_responses_submitted = 0
        cntr_msg_responses_fetched = 0
        msg_requests = []
        msg_responses = {}
    return Response('OK', status=200, mimetype=MIME_TEXT)
400
# Get env vars, if present.
# When TOPIC_READ is set the topic urls are taken from the environment and the
# background worker threads are started.
if os.getenv("TOPIC_READ") is not None:

    topic_read = os.environ['TOPIC_READ']
    # TOPIC_WRITE may be absent even though TOPIC_READ is set; use "" rather
    # than failing the whole module load with a KeyError
    topic_write = os.getenv("TOPIC_WRITE", "")

    print("Env variables:")
    print("TOPIC_READ:" + topic_read)
    print("TOPIC_WRITE:" + topic_write)

    # The downloader reads from topic_write, so that is the url it depends on
    if topic_write and downloader_thread is None:
        downloader_thread = Thread(target=dmaap_downloader)
        downloader_thread.start()

    # The uploader posts to topic_read
    if topic_read and uploader_thread is None:
        uploader_thread = Thread(target=dmaap_uploader)
        uploader_thread.start()

else:
    print("No env variables - OK")
422
if __name__ == "__main__":
    # Run the Flask server on the configured address when executed as a script
    app.run(host=HOST_IP, port=HOST_PORT)