blob: 5bb09885cadfb9b5cfdebd27a2113f9aa31d8c02 [file] [log] [blame]
ecaiyanlinux67355802020-05-11 12:53:23 +02001
2# ============LICENSE_START===============================================
3# Copyright (C) 2020 Nordix Foundation. All rights reserved.
4# ========================================================================
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16# ============LICENSE_END=================================================
17#
18
19from flask import Flask, request
20from time import sleep
21import time
22import datetime
23import json
24from flask import Flask
25from flask import Response
26import traceback
27from threading import RLock
28
# Flask application and the re-entrant lock used by the handlers that touch
# the message stores below
app = Flask(__name__)
lock = RLock()

# Messages to/from Dmaap: requests queued as json strings, responses stored
# per correlation id
msg_requests = []
msg_responses = {}

# Address and port used when the file is started as a script
HOST_IP = "::"
HOST_PORT = 2222

# Metrics counters, read out via the /counter/* endpoints
cntr_msg_requests_submitted = 0
cntr_msg_requests_fetched = 0
cntr_msg_responses_submitted = 0
cntr_msg_responses_fetched = 0

# URLs served by the stub
AGENT_WRITE_URL = "/events/A1-POLICY-AGENT-WRITE"
AGENT_READ_URL = "/events/A1-POLICY-AGENT-READ/users/policy-agent"
APP_WRITE_URL = "/send-request"
APP_READ_URL = "/receive-response"

# Mime types and text fragments shared by responses and log output
MIME_TEXT = "text/plain"
MIME_JSON = "application/json"
CAUGHT_EXCEPTION = "Caught exception: "
SERVER_ERROR = "Server error :"
54
# Liveness check
@app.route('/', methods=['GET'])
def index():
    """Answer 'OK' with http 200 to show that the server is running."""
    http_status = 200
    return 'OK', http_status
60
61
# Helper function to create a Dmaap request message
# args : <GET|PUT|POST|DELETE> <correlation-id> <json-string-payload - may be None> <url>
# response: json formatted string of a complete Dmaap message
def create_message(operation, correlation_id, payload, url):
    """Build the json text of one request message.

    Args:
        operation: Value for the "operation" field.
        correlation_id: Value for the "correlationId" field.
        payload: Json text inserted verbatim as the "payload" value, or None
            for an empty json object.
        url: Value for the "url" field.

    Returns:
        The message as a json formatted string. The field order and spacing
        are fixed; only the four arguments and the timestamp vary.
    """
    if payload is None:
        payload = "{}"
    # Naive UTC time: prints exactly like the deprecated datetime.utcnow()
    time_stamp = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    # json.dumps quotes and escapes the caller supplied values so that a quote
    # or backslash in them cannot produce invalid json. For plain values the
    # output is identical to simple string concatenation.
    parts = [
        '{"apiVersion":"1.0","operation":', json.dumps(operation),
        ',"correlationId":', json.dumps(correlation_id),
        ',"originatorId": "849e6c6b420",',
        '"payload":', payload,
        ',"requestId":"23343221", "target":"policy-agent", "timestamp":"', str(time_stamp),
        '", "type":"request","url":', json.dumps(url),
        '}',
    ]
    return ''.join(parts)
72
73
74### MR-stub interface, for MR control
75
# Send a message to MR
# URI and parameters (PUT or POST): /send-request?operation=<GET|PUT|POST|DELETE>&url=<url>
# response: <correlation-id> (http 200) or 400 for parameter error or 500 for other errors
@app.route(APP_WRITE_URL, methods=['PUT', 'POST'])
def sendrequest():
    """Queue one request message for the agent and return its correlation id.

    The query parameters 'operation' and 'url' are mandatory. A json body of
    the http request is forwarded as message payload only when operation is
    PUT; otherwise the payload is an empty json object.

    Returns:
        200 with the generated correlation id as plain text, 400 when a
        parameter is missing or operation is not one of GET, PUT, POST or
        DELETE, 500 for any other error.
    """
    global msg_requests
    global cntr_msg_requests_submitted
    with lock:
        print("APP_WRITE_URL lock")
        try:
            oper = request.args.get('operation')
            if oper is None:
                print(APP_WRITE_URL+" parameter 'operation' missing")
                return Response('Parameter operation missing in request', status=400, mimetype=MIME_TEXT)

            url = request.args.get('url')
            if url is None:
                print(APP_WRITE_URL+" parameter 'url' missing")
                return Response('Parameter url missing in request', status=400, mimetype=MIME_TEXT)

            if oper not in ("GET", "PUT", "POST", "DELETE"):
                # The texts used to list DEL instead of GET
                print(APP_WRITE_URL+" parameter 'operation' need to be: GET|PUT|POST|DELETE")
                return Response('Parameter operation does not contain GET|PUT|POST|DELETE in request', status=400, mimetype=MIME_TEXT)

            print(APP_WRITE_URL+" operation="+oper+" url="+url)
            # Nanosecond timestamp used as correlation id
            correlation_id = str(time.time_ns())
            payload = None
            if oper == "PUT" and request.json is not None:
                payload = json.dumps(request.json)

            msg = create_message(oper, correlation_id, payload, url)
            print(msg)
            # json.loads also verifies that the message is valid json before it is queued
            print(APP_WRITE_URL+" MSG(correlationid = "+correlation_id+"): " + json.dumps(json.loads(msg), indent=2))
            msg_requests.append(msg)
            cntr_msg_requests_submitted += 1
            return Response(correlation_id, status=200, mimetype=MIME_TEXT)
        except Exception as e:
            print(APP_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
            return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
116
# Receive a message response for MR for the included correlation id
# URI and parameter, (GET): /receive-response?correlationid=<correlation-id>
# response: <stored response text> 200 or empty 204, 400 for parameter error or 500 for other errors
@app.route(APP_READ_URL, methods=['GET'])
def receiveresponse():
    """Hand out, once, the response stored for a correlation id.

    The stored entry is removed when it is returned, so a second call with
    the same correlation id gives 204.

    Returns:
        200 with the stored response text, 204 with an empty body when
        nothing is stored for the id, 400 when the 'correlationid' query
        parameter is missing, 500 for any other error.
    """
    global msg_responses
    global cntr_msg_responses_fetched
    with lock:
        print("APP_READ_URL lock")
        try:
            corr_id = request.args.get('correlationid')
            if corr_id is None:
                print(APP_READ_URL+" parameter 'correlationid' missing")
                # A missing parameter is a client error, reported as 400 like in /send-request
                return Response('Parameter correlationid missing in request', status=400, mimetype=MIME_TEXT)

            if corr_id in msg_responses:
                answer = msg_responses.pop(corr_id)
                print(APP_READ_URL+" response (correlationid="+corr_id+"): " + answer)
                cntr_msg_responses_fetched += 1
                return Response(answer, status=200, mimetype=MIME_JSON)

            print(APP_READ_URL+" - no messages (correlationid="+corr_id+"): ")
            return Response('', status=204, mimetype=MIME_JSON)
        except Exception as e:
            print(APP_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
            return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
145
146### Dmaap interface ###
147
# Read messages stream. URI according to agent configuration.
# URI, (GET): /events/A1-POLICY-AGENT-READ/users/policy-agent
# Optional parameters: limit=<max number of messages, 0-4096> timeout=<max wait in milliseconds, at most 60000>
# response: 200 <json array of request messages>, 400 for non-integer parameters or 500 for other errors
@app.route(AGENT_READ_URL, methods=['GET'])
def events_read():
    """Return queued request messages, waiting for them if the queue is empty.

    The queue is polled every 25 milliseconds until at least one message is
    available or the timeout (default 10000 ms) has passed. At most 'limit'
    (default 4096) messages are removed from the queue and returned.

    Returns:
        200 with a json array of messages (empty after a timeout), 400 when
        limit or timeout is not an integer, 500 for any other error.
    """
    global msg_requests
    global cntr_msg_requests_fetched

    try:
        limit = request.args.get('limit')
        # Clamp a given limit to the range 0..4096
        limit = 4096 if limit is None else min(max(int(limit), 0), 4096)

        timeout = request.args.get('timeout')
        timeout = 10000 if timeout is None else min(int(timeout), 60000)
    except ValueError as e:
        print(AGENT_READ_URL+" parameter 'limit' or 'timeout' is not an integer: "+str(e))
        return Response('Parameters limit and timeout need to be integers', status=400, mimetype=MIME_TEXT)
    print("Limiting number of returned messages to: "+str(limit))

    start_time = int(round(time.time() * 1000))
    deadline = start_time + timeout

    # The queue is checked before the deadline test so that timeout=0 still
    # returns messages that are already queued.
    while True:
        with lock:
            if msg_requests:
                try:
                    batch = msg_requests[:limit]
                    del msg_requests[:limit]
                    cntr_msg_requests_fetched += len(batch)
                    msgs = '[' + ','.join(batch) + ']'
                    print(AGENT_READ_URL+" MSGs: "+json.dumps(json.loads(msgs), indent=2))
                    return Response(msgs, status=200, mimetype=MIME_JSON)
                except Exception as e:
                    print(AGENT_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
                    return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
        current_time = int(round(time.time() * 1000))
        if current_time >= deadline:
            break
        sleep(0.025)  # sleep 25 milliseconds, with the lock released

    print("timeout: "+str(timeout)+", startTime: "+str(start_time)+", currentTime: "+str(current_time))
    return Response("[]", status=200, mimetype=MIME_JSON)
200
# Write messages stream. URI according to agent configuration.
# URI and payload, (PUT or POST): /events/A1-POLICY-AGENT-WRITE <json array of response messages>
# response: OK 200 or 400 for missing json parameters. Other errors are answered
# with http 200 and a json body carrying the error text and status 500
@app.route(AGENT_WRITE_URL, methods=['PUT', 'POST'])
def events_write():
    """Store the response messages posted by the agent.

    Each item of the posted json array needs the fields 'correlationId',
    'message' and 'status'. The stored text per correlation id is the message
    (json encoded unless it is already a string) followed by the first three
    characters of the status.

    Returns:
        200 with '{}' when all items were stored, 400 as soon as an item
        lacks one of the fields (items before it stay stored), or 200 with a
        json error body when an exception is caught.
    """
    global msg_responses
    global cntr_msg_responses_submitted
    with lock:
        print("AGENT_WRITE_URL lock")
        try:
            answer = request.json
            print(AGENT_WRITE_URL+ " json=" + json.dumps(answer, indent=2))
            for item in answer:
                # .get() so that an absent field gives the 400 below instead of a KeyError
                corr_id = item.get('correlationId')
                if corr_id is None:
                    print(AGENT_WRITE_URL+" parameter 'correlationId' missing")
                    return Response('Parameter <correlationid> missing in json', status=400, mimetype=MIME_TEXT)
                msg = item.get('message')
                if msg is None:
                    print(AGENT_WRITE_URL+" parameter 'message' missing")
                    return Response('Parameter <message> missing in json', status=400, mimetype=MIME_TEXT)
                status = item.get('status')
                if status is None:
                    print(AGENT_WRITE_URL+" parameter 'status' missing")
                    return Response('Parameter <status> missing in json', status=400, mimetype=MIME_TEXT)
                # str() so that a numeric status can be sliced as well
                status_code = str(status)[0:3]
                if isinstance(msg, str):
                    msg_str = msg + status_code
                else:
                    msg_str = json.dumps(msg) + status_code
                msg_responses[corr_id] = msg_str
                cntr_msg_responses_submitted += 1
                print(AGENT_WRITE_URL+ " msg+status (correlationid="+str(corr_id)+") :" + msg_str)
        except Exception as e:
            print(AGENT_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
            # NOTE(review): errors are answered with http 200 and status 500 in the body;
            # kept as is since clients may rely on it - confirm.
            # json.dumps keeps the body valid json when the error text contains quotes.
            error_text = json.dumps(SERVER_ERROR + ' ' + str(e))
            return Response('{"message": ' + error_text + ',"status":"500"}', status=200, mimetype=MIME_JSON)

        return Response('{}', status=200, mimetype=MIME_JSON)
239
240
241### Functions for metrics read out ###
242
@app.route('/counter/requests_submitted', methods=['GET'])
def requests_submitted():
    """Return the value of cntr_msg_requests_submitted as plain text."""
    counter_text = str(cntr_msg_requests_submitted)
    return Response(counter_text, status=200, mimetype=MIME_TEXT)
247
@app.route('/counter/requests_fetched', methods=['GET'])
def requests_fetched():
    """Return the value of cntr_msg_requests_fetched as plain text."""
    counter_text = str(cntr_msg_requests_fetched)
    return Response(counter_text, status=200, mimetype=MIME_TEXT)
252
@app.route('/counter/responses_submitted', methods=['GET'])
def responses_submitted():
    """Return the value of cntr_msg_responses_submitted as plain text."""
    counter_text = str(cntr_msg_responses_submitted)
    return Response(counter_text, status=200, mimetype=MIME_TEXT)
257
@app.route('/counter/responses_fetched', methods=['GET'])
def responses_fetched():
    """Return the value of cntr_msg_responses_fetched as plain text."""
    counter_text = str(cntr_msg_responses_fetched)
    return Response(counter_text, status=200, mimetype=MIME_TEXT)
262
@app.route('/counter/current_requests', methods=['GET'])
def current_requests():
    """Return the number of entries currently in msg_requests as plain text."""
    queued = len(msg_requests)
    return Response(str(queued), status=200, mimetype=MIME_TEXT)
267
@app.route('/counter/current_responses', methods=['GET'])
def current_responses():
    """Return the number of entries currently in msg_responses as plain text."""
    stored = len(msg_responses)
    return Response(str(stored), status=200, mimetype=MIME_TEXT)
272
273### Admin ###
274
# Reset all messages and counters
@app.route('/reset', methods=['GET', 'POST', 'PUT'])
def reset():
    """Drop all queued requests and stored responses and zero the counters.

    Returns:
        200 with the plain text 'OK'.
    """
    global cntr_msg_requests_submitted
    global cntr_msg_requests_fetched
    global cntr_msg_responses_submitted
    global cntr_msg_responses_fetched
    global msg_requests
    global msg_responses

    # Take the lock the message handlers use, so that a reset cannot interleave
    # with a handler that is in the middle of updating the stores or counters.
    with lock:
        cntr_msg_requests_submitted = 0
        cntr_msg_requests_fetched = 0
        cntr_msg_responses_submitted = 0
        cntr_msg_responses_fetched = 0
        msg_requests = []
        msg_responses = {}
    return Response('OK', status=200, mimetype=MIME_TEXT)
293
# Start the Flask server only when the file is executed as a script
if __name__ == "__main__":
    app.run(host=HOST_IP, port=HOST_PORT)