blob: 8804a0448abed9cd3dd783476f7b8b5830b23f6a [file] [log] [blame]
ecaiyanlinux67355802020-05-11 12:53:23 +02001
2# ============LICENSE_START===============================================
3# Copyright (C) 2020 Nordix Foundation. All rights reserved.
4# ========================================================================
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16# ============LICENSE_END=================================================
17#
18
19from flask import Flask, request
20from time import sleep
21import time
22import datetime
23import json
24from flask import Flask
25from flask import Response
26import traceback
27from threading import RLock
28
app = Flask(__name__)

# Re-entrant lock taken by the request handlers while they touch the
# message containers and counters below
lock = RLock()

# Messages to/from Dmaap
#  msg_requests  : list of request messages (json strings) waiting to be read
#  msg_responses : dict, correlation id -> response string
msg_requests = []
msg_responses = {}

# Server info
HOST_IP = "::"
HOST_PORT = 2222

# Metrics vars
cntr_msg_requests_submitted = 0
cntr_msg_requests_fetched = 0
cntr_msg_responses_submitted = 0
cntr_msg_responses_fetched = 0

# Request and response constants
AGENT_WRITE_URL = "/events/A1-POLICY-AGENT-WRITE"
AGENT_READ_URL = "/events/A1-POLICY-AGENT-READ/users/policy-agent"
APP_WRITE_URL = "/send-request"
APP_READ_URL = "/receive-response"
MIME_TEXT = "text/plain"
MIME_JSON = "application/json"
CAUGHT_EXCEPTION = "Caught exception: "
SERVER_ERROR = "Server error :"
54
# I'm alive function - answers GET on the root path with 'OK' and http 200
@app.route('/', methods=['GET'])
def index():
    alive = ('OK', 200)
    return alive
60
61
# Helper function to create a Dmaap request message
# args    : <GET|PUT|POST|DELETE> <correlation-id> <json-string-payload - may be None> <url>
# response: json formatted string of a complete Dmaap message
def create_message(operation, correlation_id, payload, url):
    """Build the json text of one Dmaap request message.

    operation, correlation_id and url are inserted as json strings (properly
    escaped). payload must already be json text and is inserted verbatim;
    None is replaced by an empty json object. The timestamp is the current
    UTC time.
    """
    if payload is None:
        payload = "{}"

    # Naive UTC time, same text format as before, without the deprecated utcnow()
    time_stamp = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)

    def quoted(value):
        # json-encode a value as a string so quotes/backslashes cannot break the message
        return json.dumps(str(value), ensure_ascii=False)

    # The fixed parts of the template are kept byte-for-byte as the original layout
    msg = '{"apiVersion":"1.0","operation":' + quoted(operation)
    msg += ',"correlationId":' + quoted(correlation_id)
    msg += ',"originatorId": "849e6c6b420",'
    msg += '"payload":' + payload
    msg += ',"requestId":"23343221", "target":"policy-agent", "timestamp":"' + str(time_stamp)
    msg += '", "type":"request","url":' + quoted(url) + '}'
    return msg
72
73
74### MR-stub interface, for MR control
75
# Send a message to MR
# URI and parameters (PUT or POST): /send-request?operation=<GET|PUT|POST|DELETE>&url=<url>
# A json body is forwarded as payload only when operation is PUT
# response: <correlation-id> (http 200) or 400 for parameter error or 500 for other errors
@app.route(APP_WRITE_URL, methods=['PUT', 'POST'])
def sendrequest():
    global msg_requests
    global cntr_msg_requests_submitted
    with lock:
        print("APP_WRITE_URL lock")
        try:
            oper = request.args.get('operation')
            if oper is None:
                print(APP_WRITE_URL+" parameter 'operation' missing")
                return Response('Parameter operation missing in request', status=400, mimetype=MIME_TEXT)

            url = request.args.get('url')
            if url is None:
                print(APP_WRITE_URL+" parameter 'url' missing")
                return Response('Parameter url missing in request', status=400, mimetype=MIME_TEXT)

            # The messages now name the operations that are actually accepted
            if oper not in ("GET", "PUT", "POST", "DELETE"):
                print(APP_WRITE_URL+" parameter 'operation' need to be: GET|PUT|POST|DELETE")
                return Response('Parameter operation does not contain GET|PUT|POST|DELETE in request', status=400, mimetype=MIME_TEXT)

            print(APP_WRITE_URL+" operation="+oper+" url="+url)
            # Nanosecond clock value doubles as the correlation id returned to the caller
            correlation_id = str(time.time_ns())
            payload = None
            # NOTE(review): a body sent with operation=POST is not forwarded - confirm this is intended
            if (oper == "PUT") and (request.json is not None):
                payload = json.dumps(request.json)

            msg = create_message(oper, correlation_id, payload, url)
            print(msg)
            # json.loads also verifies that the message is valid json before it is queued
            print(APP_WRITE_URL+" MSG(correlationid = "+correlation_id+"): " + json.dumps(json.loads(msg), indent=2))
            msg_requests.append(msg)
            cntr_msg_requests_submitted += 1
            return Response(correlation_id, status=200, mimetype=MIME_TEXT)
        except Exception as e:
            print(APP_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
            return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
116
# Receive a message response for MR for the included correlation id
# URI and parameter, (GET): /receive-response?correlationid=<correlation-id>
# response: the stored response string for the id (200) or empty 204 if none is stored,
#           500 for missing parameter or other errors
@app.route(APP_READ_URL, methods=['GET'])
def receiveresponse():
    global msg_responses
    global cntr_msg_responses_fetched
    with lock:
        print("APP_READ_URL lock")
        try:
            corr_id = request.args.get('correlationid')
            if corr_id is None:
                print(APP_READ_URL+" parameter 'correlationid' missing")
                # NOTE(review): /send-request answers 400 for a missing parameter; 500 is kept
                # here since callers may rely on it - confirm
                return Response('Parameter correlationid missing in json', status=500, mimetype=MIME_TEXT)

            if corr_id in msg_responses:
                # A response is handed out once: it is removed when fetched
                answer = msg_responses.pop(corr_id)
                print(APP_READ_URL+" response (correlationid="+corr_id+"): " + answer)
                cntr_msg_responses_fetched += 1
                return Response(answer, status=200, mimetype=MIME_JSON)

            print(APP_READ_URL+" - no messages (correlationid="+corr_id+"): ")
            return Response('', status=204, mimetype=MIME_JSON)
        except Exception as e:
            print(APP_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
            return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
145
146### Dmaap interface ###
147
# Read messages stream. URI according to agent configuration.
# URI, (GET): /events/A1-POLICY-AGENT-READ/users/policy-agent
# Optional parameters: limit   - max number of messages to return (clamped to 0..4096, default 4096)
#                      timeout - max time in milliseconds to wait for messages (max 60000, default 10000)
# response: 200 <json array of request messages> (empty array on timeout), or 500 for other errors
@app.route(AGENT_READ_URL, methods=['GET'])
def events_read():
    global msg_requests
    global cntr_msg_requests_fetched

    # NOTE: a non-numeric limit/timeout raises ValueError outside the try block below
    limit = request.args.get('limit')
    if limit is None:
        limit = 4096
    else:
        limit = max(0, min(int(limit), 4096))
    print("Limiting number of returned messages to: "+str(limit))

    timeout = request.args.get('timeout')
    if timeout is None:
        timeout = 10000
    else:
        timeout = min(int(timeout), 60000)

    startTime = int(round(time.time() * 1000))
    currentTime = startTime
    deadline = startTime + timeout

    # Poll the queue until there is something to return or the timeout expires.
    # The lock is only held while the queue is inspected, not while sleeping.
    while currentTime < deadline:
        with lock:
            if msg_requests:
                try:
                    # Take up to 'limit' messages from the front of the queue in one step
                    batch = msg_requests[:limit]
                    del msg_requests[:limit]
                    cntr_msg_requests_fetched += len(batch)
                    # Each queued message is already json text, so join them into a json array
                    msgs = '[' + ','.join(batch) + ']'
                    print(AGENT_READ_URL+" MSGs: "+json.dumps(json.loads(msgs), indent=2))
                    return Response(msgs, status=200, mimetype=MIME_JSON)
                except Exception as e:
                    print(AGENT_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
                    return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
        sleep(0.025)  # sleep 25 milliseconds
        currentTime = int(round(time.time() * 1000))

    print("timeout: "+str(timeout)+", startTime: "+str(startTime)+", currentTime: "+str(currentTime))
    return Response("[]", status=200, mimetype=MIME_JSON)
200
# Write messages stream. URI according to agent configuration.
# URI and payload, (PUT or POST): /events/A1-POLICY-AGENT-WRITE <json array of response messages>
# A single json object is accepted as well and handled as an array of one message.
# Each message needs the keys 'correlationId', 'message' and 'status'.
# response: OK 200 or 400 for missing json parameters. Other errors are reported as
#           http 200 with a json body containing "status":"500"
@app.route(AGENT_WRITE_URL, methods=['PUT', 'POST'])
def events_write():
    global msg_responses
    global cntr_msg_responses_submitted
    with lock:
        print("AGENT_WRITE_URL lock")
        try:
            answer = request.json
            print(AGENT_WRITE_URL+ " json=" + json.dumps(answer, indent=2))
            if isinstance(answer, dict):
                # Create an array if the answer is a dict (single message)
                answer = [answer]

            for item in answer:
                # .get() so that a missing key reaches the 400 answers below instead of
                # raising KeyError and ending up in the generic exception handler
                corr_id = item.get('correlationId')
                if corr_id is None:
                    print(AGENT_WRITE_URL+" parameter 'correlationId' missing")
                    return Response('Parameter <correlationid> missing in json', status=400, mimetype=MIME_TEXT)
                msg = item.get('message')
                if msg is None:
                    print(AGENT_WRITE_URL+" parameter 'message' missing")
                    return Response('Parameter <message> missing in json', status=400, mimetype=MIME_TEXT)
                status = item.get('status')
                if status is None:
                    print(AGENT_WRITE_URL+" parameter 'status' missing")
                    return Response('Parameter <status> missing in json', status=400, mimetype=MIME_TEXT)

                # Stored response = message text followed by the first 3 characters of the status.
                # str() lets a numeric status (e.g. 200) be used as well.
                status_code = str(status)[0:3]
                if isinstance(msg, (list, dict)):
                    msg_str = json.dumps(msg) + status_code
                else:
                    msg_str = msg + status_code
                msg_responses[corr_id] = msg_str
                cntr_msg_responses_submitted += 1
                print(AGENT_WRITE_URL+ " msg+status (correlationid="+corr_id+") :" + str(msg_str))
        except Exception as e:
            print(AGENT_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
            # NOTE(review): the error is reported with http 200 and "status":"500" in the body - kept as is.
            # The text is json-encoded so quotes in the exception text cannot break the json answer.
            error_text = json.dumps(SERVER_ERROR + ' ' + str(e), ensure_ascii=False)
            return Response('{"message": ' + error_text + ',"status":"500"}', status=200, mimetype=MIME_JSON)

        return Response('{}', status=200, mimetype=MIME_JSON)
245
246
247### Functions for metrics read out ###
248
# Counter: value of cntr_msg_requests_submitted, returned as plain text
@app.route('/counter/requests_submitted', methods=['GET'])
def requests_submitted():
    counter_text = str(cntr_msg_requests_submitted)
    return Response(counter_text, status=200, mimetype=MIME_TEXT)
253
# Counter: value of cntr_msg_requests_fetched, returned as plain text
@app.route('/counter/requests_fetched', methods=['GET'])
def requests_fetched():
    counter_text = str(cntr_msg_requests_fetched)
    return Response(counter_text, status=200, mimetype=MIME_TEXT)
258
# Counter: value of cntr_msg_responses_submitted, returned as plain text
@app.route('/counter/responses_submitted', methods=['GET'])
def responses_submitted():
    counter_text = str(cntr_msg_responses_submitted)
    return Response(counter_text, status=200, mimetype=MIME_TEXT)
263
# Counter: value of cntr_msg_responses_fetched, returned as plain text
@app.route('/counter/responses_fetched', methods=['GET'])
def responses_fetched():
    counter_text = str(cntr_msg_responses_fetched)
    return Response(counter_text, status=200, mimetype=MIME_TEXT)
268
# Counter: number of request messages currently held in msg_requests, returned as plain text
@app.route('/counter/current_requests', methods=['GET'])
def current_requests():
    pending = len(msg_requests)
    return Response(str(pending), status=200, mimetype=MIME_TEXT)
273
# Counter: number of responses currently held in msg_responses, returned as plain text
@app.route('/counter/current_responses', methods=['GET'])
def current_responses():
    pending = len(msg_responses)
    return Response(str(pending), status=200, mimetype=MIME_TEXT)
278
279### Admin ###
280
# Reset all messages and counters
# URI (GET, POST or PUT): /reset
# response: OK 200
@app.route('/reset', methods=['GET', 'POST', 'PUT'])
def reset():
    global cntr_msg_requests_submitted
    global cntr_msg_requests_fetched
    global cntr_msg_responses_submitted
    global cntr_msg_responses_fetched
    global msg_requests
    global msg_responses

    # Take the same lock as the other handlers so a reset cannot interleave
    # with a handler that is in the middle of updating messages or counters
    with lock:
        cntr_msg_requests_submitted = 0
        cntr_msg_requests_fetched = 0
        cntr_msg_responses_submitted = 0
        cntr_msg_responses_fetched = 0
        msg_requests = []
        msg_responses = {}
    return Response('OK', status=200, mimetype=MIME_TEXT)
299
# Start the stub on the configured address and port when run as a script
if __name__ == "__main__":
    app.run(host=HOST_IP, port=HOST_PORT)