blob: 650f8fef84b78ee698979d344d1fb28fa5c6adf1 [file] [log] [blame]
'''
Created on Aug 18, 2017
@author: sw6830
'''
from robot.api import logger
from Queue import Queue
import uuid, time, datetime,json, threading
import DcaeVariables
import DMaaP
class DcaeLibrary(object):
def __init__(self):
pass
def setup_dmaap_server(self, portNum=3904):
if DcaeVariables.HttpServerThread != None:
DMaaP.cleanUpEvent()
logger.console("Clean up event from event queue before test")
logger.info("DMaaP Server already started")
return "true"
DcaeVariables.IsRobotRun = True
DMaaP.test(port=portNum)
try:
DcaeVariables.VESEventQ = Queue()
DcaeVariables.HttpServerThread = threading.Thread(name='DMAAP_HTTPServer', target=DMaaP.DMaaPHttpd.serve_forever)
DcaeVariables.HttpServerThread.start()
logger.console("DMaaP Mockup Sever started")
time.sleep(2)
return "true"
except Exception as e:
print (str(e))
return "false"
def shutdown_dmaap(self):
if DcaeVariables.HTTPD != None:
DcaeVariables.HTTPD.shutdown()
logger.console("DMaaP Server shut down")
time.sleep(3)
return "true"
else:
return "false"
def cleanup_ves_events(self):
if DcaeVariables.HttpServerThread != None:
DMaaP.cleanUpEvent()
logger.console("DMaaP event queue is cleaned up")
return "true"
logger.console("DMaaP server not started yet")
return "false"
def dmaap_message_receive(self, evtobj, action='contain'):
evtStr = DMaaP.dequeEvent()
while evtStr != None:
logger.console("DMaaP receive VES Event:\n" + evtStr)
if action == 'contain':
if evtobj in evtStr:
logger.info("DMaaP Receive Expected Publish Event:\n" + evtStr)
return 'true'
if action == 'sizematch':
if len(evtobj) == len(evtStr):
return 'true'
if action == 'dictmatch':
evtDict = json.loads(evtStr)
if cmp(evtobj, evtDict) == 0:
return 'true'
evtStr = DMaaP.dequeEvent()
return 'false'
def create_header_from_string(self, dictStr):
logger.info("Enter create_header_from_string: dictStr")
return dict(u.split("=") for u in dictStr.split(","))
def is_json_empty(self, resp):
logger.info("Enter is_json_empty: resp.text: " + resp.text)
if resp.text == None or len(resp.text) < 2:
return 'True'
return 'False'
def Generate_UUID(self):
"""generate a uuid"""
return uuid.uuid4()
def get_json_value_list(self, jsonstr, keyval):
logger.info("Enter Get_Json_Key_Value_List")
if jsonstr == None or len(jsonstr) < 2:
logger.info("No Json data found")
return []
try:
data = json.loads(jsonstr)
nodelist = []
for item in data:
nodelist.append(item[keyval])
return nodelist
except Exception as e:
logger.info("Json data parsing fails")
print str(e)
return []
def generate_MilliTimestamp_UUID(self):
"""generate a millisecond timestamp uuid"""
then = datetime.datetime.now()
return int(time.mktime(then.timetuple())*1e3 + then.microsecond/1e3)
def test (self):
import json
from pprint import pprint
with open('robot/assets/dcae/ves_volte_single_fault_event.json') as data_file:
data = json.load(data_file)
data['event']['commonEventHeader']['version'] = '5.0'
pprint(data)
if __name__ == '__main__':
'''
dictStr = "action=getTable,Accept=application/json,Content-Type=application/json,X-FromAppId=1234908903284"
cls = DcaeLibrary()
#dict = cls.create_header_from_string(dictStr)
#print str(dict)
jsonStr = "[{'Node': 'onapfcnsl00', 'CheckID': 'serfHealth', 'Name': 'Serf Health Status', 'ServiceName': '', 'Notes': '', 'ModifyIndex': 6, 'Status': 'passing', 'ServiceID': '', 'ServiceTags': [], 'Output': 'Agent alive and reachable', 'CreateIndex': 6}]"
lsObj = cls.get_json_value_list(jsonStr, 'Status')
print lsObj
'''
lib = DcaeLibrary()
ret = lib.setup_dmaap_server()
print ret
time.sleep(10000000000)