Upgraded test env with Kubernetes support
Issue-ID: NONRTRIC-356
Signed-off-by: BjornMagnussonXA <bjorn.magnusson@est.tech>
Change-Id: I942b37c05077b3ba753b3327455d6babed8f6061
diff --git a/test/mrstub/app/main.py b/test/mrstub/app/main.py
index 75b23f1..db13fc0 100644
--- a/test/mrstub/app/main.py
+++ b/test/mrstub/app/main.py
@@ -24,8 +24,10 @@
from flask import Flask
from flask import Response
import traceback
-from threading import RLock
+from threading import RLock, Thread
import logging
+import os
+import requests
# Disable all logging of GET on reading counters
class AjaxFilter(logging.Filter):
@@ -61,6 +63,89 @@
CAUGHT_EXCEPTION="Caught exception: "
SERVER_ERROR="Server error :"
+# Url that dmaap_downloader polls (GET) for responses; empty string when not configured
+topic_write=""
+# Url that dmaap_uploader posts queued requests to; empty string when not configured
+topic_read=""
+
+# Handles to the background worker threads; None until a thread has been started
+uploader_thread=None
+downloader_thread=None
+
+# Function to upload messages to dmaap
+def dmaap_uploader():
+    """Post queued requests to dmaap, oldest first, in an endless loop.
+
+    The head of msg_requests is posted to the url held in topic_read. The
+    message is removed from the queue, and cntr_msg_requests_fetched is
+    incremented, only when dmaap answers with a 2xx status code. On any other
+    status code, or on an exception, the same message is retried after one
+    second. When the queue is empty the loop polls it every 10 ms.
+
+    This function never returns; it is intended to be the target of a thread.
+    """
+    global msg_requests
+    global cntr_msg_requests_fetched
+
+    print("Starting uploader")
+
+    headers = {'Content-type': 'application/json', 'Accept': '*/*'}
+    #url="http://"+topic_host+"/events/"+topic_read
+    url=topic_read
+
+    while True:
+        while (len(msg_requests)>0):
+            msg=msg_requests[0]
+            if msg is None:
+                # Nothing to send for this entry. Drop it, otherwise this inner
+                # loop would spin on the same head element forever without sleeping.
+                with lock:
+                    if msg_requests and msg_requests[0] is None:
+                        msg_requests.pop(0)
+                continue
+            try:
+                print("Sending to dmaap : "+ url)
+                print("Sending to dmaap : "+ msg)
+                resp=requests.post(url, data=msg, headers=headers, timeout=10)
+                # Anything outside 200-299 is a failure. (The bitwise '&' binds tighter
+                # than the comparisons, so it must not be used to combine them.)
+                if not (200 <= resp.status_code <= 299):
+                    print("Failed, response code: " + str(resp.status_code))
+                    sleep(1)
+                else:
+                    print("Dmaap response code: " + str(resp.status_code))
+                    print("Dmaap response text: " + str(resp.text))
+                    with lock:
+                        msg_requests.pop(0)
+                        cntr_msg_requests_fetched += 1
+            except Exception as e:
+                # Keep the message in the queue and retry after a short pause
+                print("Failed, exception: "+ str(e))
+                sleep(1)
+        sleep(0.01)
+
+
+# Function to download messages from dmaap
+def dmaap_downloader():
+    """Poll dmaap for responses and store them in msg_responses, forever.
+
+    Each iteration issues a GET to the url held in topic_write. A 2xx answer
+    is expected to be a json array where every element is a json encoded
+    string holding an object with the keys "correlationId", "status" and
+    "message". For each element the message text followed by the first three
+    characters of the status is stored in msg_responses under the correlation
+    id, and cntr_msg_responses_submitted is incremented.
+
+    Non-2xx answers, non-array payloads, corrupt data and request errors are
+    logged and followed by a one second pause before the next poll.
+
+    This function never returns; it is intended to be the target of a thread.
+    """
+    global msg_responses
+    global cntr_msg_responses_submitted
+
+    print("Starting downloader")
+
+    while True:
+
+        try:
+            #url="http://"+topic_host+"/events/"+topic_write+"/users/mr-stub?timeout=15000&limit=100"
+            url=topic_write
+            headers = {'Accept': 'application/json'}
+            print("Reading from dmaap: " + url)
+            resp=requests.get(url, headers=headers)
+            # Anything outside 200-299 is a failure. (The bitwise '&' binds tighter
+            # than the comparisons, so it must not be used to combine them.)
+            if not (200 <= resp.status_code <= 299):
+                print("Failed, response code: " + str(resp.status_code))
+                sleep(1)
+            else:
+                print("Recieved data from dmaap mr")
+                try:
+                    data=resp.json()
+                    print("Recieved data (raw): " + str(resp.text))
+                    if isinstance(data, list):
+                        for item in data:
+                            # Each array element is itself a json encoded string
+                            item=json.loads(item)
+                            corrid=str(item["correlationId"])
+                            status=str(item["status"])
+                            msg=str(item["message"])
+                            item_str=msg+status[0:3]
+                            with lock:
+                                msg_responses[corrid]=item_str
+                                cntr_msg_responses_submitted += 1
+                    else:
+                        print("Data from dmaap is not json array: " + str(resp.text))
+                        sleep(1)
+                except Exception as e:
+                    # Log the raw text: the parsed data does not exist if resp.json() failed
+                    print("Corrupt data from dmaap mr - dropping " + str(resp.text))
+                    print(CAUGHT_EXCEPTION + str(e) + " "+traceback.format_exc())
+                    sleep(1)
+        except Exception as e:
+            # Typically a connection problem - log it instead of failing silently, then retry
+            print("Failed, exception: "+ str(e))
+            sleep(1)
+
#I'm alive function
@app.route('/',
methods=['GET'])
@@ -134,19 +219,19 @@
with lock:
print("APP_READ_URL lock")
try:
- id=request.args.get('correlationid')
- if (id is None):
+ cid=request.args.get('correlationid')
+ if (cid is None):
print(APP_READ_URL+" parameter 'correclationid' missing")
return Response('Parameter correlationid missing in json', status=500, mimetype=MIME_TEXT)
- if (id in msg_responses):
- answer=msg_responses[id]
- del msg_responses[id]
- print(APP_READ_URL+" response (correlationid="+id+"): " + answer)
+ if (cid in msg_responses):
+ answer=msg_responses[cid]
+ del msg_responses[cid]
+ print(APP_READ_URL+" response (correlationid="+cid+"): " + answer)
cntr_msg_responses_fetched += 1
return Response(answer, status=200, mimetype=MIME_JSON)
- print(APP_READ_URL+" - no messages (correlationid="+id+"): ")
+ print(APP_READ_URL+" - no messages (correlationid="+cid+"): ")
return Response('', status=204, mimetype=MIME_JSON)
except Exception as e:
print(APP_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
@@ -163,6 +248,9 @@
global msg_requests
global cntr_msg_requests_fetched
+ if topic_write or topic_read:
+ return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT)
+
limit=request.args.get('limit')
if (limit is None):
limit=4096
@@ -180,10 +268,10 @@
else:
timeout=min(int(timeout),60000)
- startTime=int(round(time.time() * 1000))
- currentTime=int(round(time.time() * 1000))
+ start_time=int(round(time.time() * 1000))
+ current_time=int(round(time.time() * 1000))
- while(currentTime<startTime+int(timeout)):
+ while(current_time<start_time+int(timeout)):
with lock:
if(len(msg_requests)>0):
try:
@@ -202,9 +290,9 @@
print(AGENT_READ_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
return Response(SERVER_ERROR+" "+str(e), status=500, mimetype=MIME_TEXT)
sleep(0.025) # sleep 25 milliseconds
- currentTime=int(round(time.time() * 1000))
+ current_time=int(round(time.time() * 1000))
- print("timeout: "+str(timeout)+", startTime: "+str(startTime)+", currentTime: "+str(currentTime))
+ print("timeout: "+str(timeout)+", start_time: "+str(start_time)+", current_time: "+str(current_time))
return Response("[]", status=200, mimetype=MIME_JSON)
# Write messages stream. URI according to agent configuration.
@@ -215,6 +303,10 @@
def events_write():
global msg_responses
global cntr_msg_responses_submitted
+
+ if topic_write or topic_read:
+ return Response('Url not available when running as mrstub frontend', status=404, mimetype=MIME_TEXT)
+
with lock:
print("AGENT_WRITE_URL lock")
try:
@@ -227,8 +319,8 @@
answer=answer_list
for item in answer:
- id=item['correlationId']
- if (id is None):
+ cid=item['correlationId']
+ if (cid is None):
print(AGENT_WRITE_URL+" parameter 'correlatonid' missing")
return Response('Parameter <correlationid> missing in json', status=400, mimetype=MIME_TEXT)
msg=item['message']
@@ -243,9 +335,9 @@
msg_str=json.dumps(msg)+status[0:3]
else:
msg_str=msg+status[0:3]
- msg_responses[id]=msg_str
+ msg_responses[cid]=msg_str
cntr_msg_responses_submitted += 1
- print(AGENT_WRITE_URL+ " msg+status (correlationid="+id+") :" + str(msg_str))
+ print(AGENT_WRITE_URL+ " msg+status (correlationid="+cid+") :" + str(msg_str))
except Exception as e:
print(AGENT_WRITE_URL+"-"+CAUGHT_EXCEPTION+" "+str(e) + " "+traceback.format_exc())
return Response('{"message": "' + SERVER_ERROR + ' ' + str(e) + '","status":"500"}', status=200, mimetype=MIME_JSON)
@@ -306,5 +398,27 @@
msg_responses={}
return Response('OK', status=200, mimetype=MIME_TEXT)
+# Get env vars, if present.
+# TOPIC_READ is the url the uploader thread posts queued requests to and
+# TOPIC_WRITE is the url the downloader thread polls for responses.
+if os.getenv("TOPIC_READ") is not None:
+
+    topic_read=os.environ['TOPIC_READ']
+    # TOPIC_WRITE may be unset - fall back to an empty string instead of
+    # failing with a KeyError while the module is being loaded
+    topic_write=os.getenv('TOPIC_WRITE', "")
+
+    print("Env variables:")
+    print("TOPIC_READ:"+topic_read)
+    print("TOPIC_WRITE:"+topic_write)
+
+    # Start each worker only when the url it actually uses is configured:
+    # the downloader reads from topic_write, the uploader posts to topic_read
+    if topic_write and downloader_thread is None:
+        downloader_thread=Thread(target=dmaap_downloader)
+        downloader_thread.start()
+
+    if topic_read and uploader_thread is None:
+        uploader_thread=Thread(target=dmaap_uploader)
+        uploader_thread.start()
+
+else:
+    print("No env variables - OK")
+
if __name__ == "__main__":
app.run(port=HOST_PORT, host=HOST_IP)
\ No newline at end of file
diff --git a/test/mrstub/app/nginx.conf b/test/mrstub/app/nginx.conf
index 60b1dd9..c548e56 100644
--- a/test/mrstub/app/nginx.conf
+++ b/test/mrstub/app/nginx.conf
@@ -28,10 +28,10 @@
default_type application/octet-stream;
server { # simple reverse-proxy
- listen 3905;
- listen [::]:3905;
- listen 3906 ssl;
- listen [::]:3906 ssl;
+ listen 3904;
+ listen [::]:3904;
+ listen 3905 ssl;
+ listen [::]:3905 ssl;
server_name localhost;
ssl_certificate /usr/src/app/cert/cert.crt;
ssl_certificate_key /usr/src/app/cert/key.crt;
diff --git a/test/mrstub/app/requirements.txt b/test/mrstub/app/requirements.txt
index 8fd414f..8c3d61f 100644
--- a/test/mrstub/app/requirements.txt
+++ b/test/mrstub/app/requirements.txt
@@ -1,2 +1,3 @@
pip==20.1
-Flask==1.1.2
\ No newline at end of file
+Flask==1.1.2
+requests==2.25.1
\ No newline at end of file