Added tests and improvements
Real message router and Kafka now work in Docker and in Kubernetes
Added test of dmaap adapter kafka jobs
Added possibility to collect runtime stats of pods/containers
Improved callback receiver to handle large payloads
Various simplifications, improvements and corrections
Issue-ID: NONRTRIC-618
Signed-off-by: BjornMagnussonXA <bjorn.magnusson@est.tech>
Change-Id: I397b4842bf860a3126cc57ddcef61bd8db3aa76b
diff --git a/test/mrstub/app/main.py b/test/mrstub/app/main.py
index fb6d674..4b1913f 100644
--- a/test/mrstub/app/main.py
+++ b/test/mrstub/app/main.py
@@ -69,11 +69,13 @@
topic_write=""
topic_read=""
+generic_topics_upload_baseurl=""
uploader_thread=None
downloader_thread=None
+generic_uploader_thread=None
-# Function to download messages from dmaap
+# Function to upload PMS messages to dmaap
def dmaap_uploader():
global msg_requests
global cntr_msg_requests_fetched
@@ -107,7 +109,7 @@
sleep(0.01)
-# Function to upload messages to dmaap
+# Function to download PMS messages from dmaap
def dmaap_downloader():
global msg_responses
global cntr_msg_responses_submitted
@@ -150,6 +152,48 @@
except Exception as e:
sleep(1)
+# Function to upload generic messages to dmaap
+def dmaap_generic_uploader():
+    """Forward queued generic topic messages to dmaap, forever.
+
+    Polls generic_messages, where each key is the url path of a topic
+    (eg. "/events/<topic>") and each value is a list used as a queue.
+    The first message of every non-empty queue is posted to
+    generic_topics_upload_baseurl + <topic path>. Topics ending with
+    ".text" are sent as text/plain, all others are json encoded.
+
+    A message is removed from its queue, and cntr_msg_requests_fetched is
+    incremented, only after a 2xx response. On any other response code or
+    on an exception the message stays first in its queue and is retried
+    on a later pass, after a one second pause.
+
+    The function never returns.
+    """
+    global cntr_msg_requests_fetched
+
+    print("Starting generic uploader")
+
+    headers_json = {'Content-type': 'application/json', 'Accept': '*/*'}
+    headers_text = {'Content-type': 'text/plain', 'Accept': '*/*'}
+
+    while True:
+        # Iterate over a snapshot of the topic names, taken under the lock.
+        # Topics may be added to generic_messages while this loop runs, and
+        # a dict that changes size during iteration raises RuntimeError.
+        with lock:
+            topicnames = list(generic_messages.keys())
+        for topicname in topicnames:
+            topic_queue = generic_messages.get(topicname)
+            if not topic_queue:
+                continue
+            msg = topic_queue[0]
+            if topicname.endswith(".text"):
+                headers = headers_text
+            else:
+                msg = json.dumps(msg)
+                headers = headers_json
+            url = generic_topics_upload_baseurl+topicname
+            print("Sending to dmaap : "+ url)
+            print("Sending to dmaap : "+ msg)
+            print("Sending to dmaap : "+ str(headers))
+            try:
+                resp = requests.post(url, data=msg, headers=headers, timeout=10)
+                # Everything outside 200-299 is a failure. The former test
+                # "code<199 & code > 299" was parsed as the chained
+                # comparison "code < (199 & code) > 299", which is never
+                # true, so failed messages were dropped as if delivered.
+                if resp.status_code < 200 or resp.status_code > 299:
+                    print("Failed, response code: " + str(resp.status_code))
+                    sleep(1)
+                else:
+                    print("Dmaap response code: " + str(resp.status_code))
+                    print("Dmaap response text: " + str(resp.text))
+                    with lock:
+                        topic_queue.pop(0)
+                        cntr_msg_requests_fetched += 1
+            except Exception as e:
+                # Best effort: report the error and retry on the next pass.
+                print("Failed, exception: "+ str(e))
+                sleep(1)
+        sleep(0.01)
+
#I'm alive function
@app.route('/',
methods=['GET'])
@@ -157,7 +201,7 @@
return 'OK', 200
-# Helper function to create a Dmaap request message
+# Helper function to create a Dmaap PMS request message
# args : <GET|PUT|DELETE> <correlation-id> <json-string-payload - may be None> <url>
# response: json formatted string of a complete Dmaap message
def create_message(operation, correlation_id, payload, url):
@@ -171,7 +215,7 @@
### MR-stub interface, for MR control
-# Send a message to MR
+# Send a PMS message to MR
# URI and parameters (PUT or POST): /send-request?operation=<GET|PUT|POST|DELETE>&url=<url>
# response: <correlation-id> (http 200) o4 400 for parameter error or 500 for other errors
@app.route(APP_WRITE_URL,
@@ -212,7 +256,7 @@
print(APP_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
-# Receive a message response for MR for the included correlation id
+# Receive a PMS message response for MR for the included correlation id
# URI and parameter, (GET): /receive-response?correlationid=<correlation-id>
# response: <json-array of 1 response> 200 or empty 204 or other errors 500
@app.route(APP_READ_URL,
@@ -243,7 +287,7 @@
### Dmaap interface ###
-# Read messages stream. URI according to agent configuration.
+# Read PMS messages stream. URI according to agent configuration.
# URI, (GET): /events/A1-POLICY-AGENT-READ/users/policy-agent
# response: 200 <json array of request messages>, or 500 for other errors
@app.route(AGENT_READ_URL,
@@ -299,7 +343,7 @@
print("timeout: "+str(timeout)+", start_time: "+str(start_time)+", current_time: "+str(current_time))
return Response("[]", status=200, mimetype=MIME_JSON)
-# Write messages stream. URI according to agent configuration.
+# Write PMS messages stream. URI according to agent configuration.
# URI and payload, (PUT or POST): /events/A1-POLICY-AGENT-WRITE <json array of response messages>
# response: OK 200 or 400 for missing json parameters, 500 for other errors
@app.route(AGENT_WRITE_URL,
@@ -367,10 +411,10 @@
return Response(json.dumps(res), status=200, mimetype=MIME_JSON)
return Response("[]", status=200, mimetype=MIME_JSON)
-# Generic POST/PUT catching all urls starting with /events/<topic>.
+# Generic POST catching all urls starting with /events/<topic>.
# Writes the message in a que for that topic
@app.route("/events/<path>",
- methods=['PUT','POST'])
+ methods=['POST'])
def generic_write(path):
global generic_messages
global cntr_msg_responses_submitted
@@ -378,8 +422,12 @@
write_method=str(request.method)
with lock:
try:
- payload=request.json
- print(write_method+" on "+urlkey+" json=" + json.dumps(payload))
+ if (urlkey.endswith(".text")):
+ payload=str(request.data.decode('UTF-8'))
+ print(write_method+" on "+urlkey+" text=" + payload)
+ else:
+ payload=request.json
+ print(write_method+" on "+urlkey+" json=" + json.dumps(payload))
topicmsgs=[]
if (urlkey in generic_messages.keys()):
topicmsgs=generic_messages[urlkey]
@@ -407,6 +455,9 @@
global generic_messages
global cntr_msg_requests_fetched
+ if generic_topics_upload_baseurl:
+ return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT)
+
urlpath="/events/"+str(path)
urlkey="/events/"+str(path).split("/")[0] #Extract topic
print("GET on topic"+urlkey)
@@ -530,7 +581,14 @@
uploader_thread=Thread(target=dmaap_uploader)
uploader_thread.start()
-else:
+# Start the generic uploader thread only when an upload base url is configured.
+# The presence checks use os.getenv, since os.environ[...] raises KeyError for
+# an unset variable instead of returning None.
+if os.getenv("GENERIC_TOPICS_UPLOAD_BASEURL") is not None:
+    print("GENERIC_TOPICS_UPLOAD_BASEURL:"+os.environ['GENERIC_TOPICS_UPLOAD_BASEURL'])
+    generic_topics_upload_baseurl=os.environ['GENERIC_TOPICS_UPLOAD_BASEURL']
+    if generic_topics_upload_baseurl and generic_uploader_thread is None:
+        generic_uploader_thread=Thread(target=dmaap_generic_uploader)
+        generic_uploader_thread.start()
+
+if os.getenv("TOPIC_READ") is None or os.getenv("GENERIC_TOPICS_UPLOAD_BASEURL") is None:
     print("No env variables - OK")
if __name__ == "__main__":