blob: b43ee29e2902fabc01a7af1a66644d46c90067a6 [file] [log] [blame]
Gary Wu9abb61c2018-09-27 10:38:50 -07001'''
2Created on Aug 18, 2017
3
4@author: sw6830
5'''
6from robot.api import logger
7from Queue import Queue
8import uuid
9import time
10import datetime
11import json
12import threading
13import os
14import platform
15import subprocess
16import paramiko
17import DcaeVariables
18import DMaaP
19
20
class DcaeLibrary(object):
    """Helper keywords for the DCAE VES collector tests.

    Every method is a ``staticmethod``.  Status-style methods report their
    outcome as strings rather than booleans: ``"true"``/``"false"`` for the
    DMaaP helpers and ``'True'``/``'False'`` for :meth:`is_json_empty`.
    """

    def __init__(self):
        pass

    @staticmethod
    def setup_dmaap_server(port_num=3904):
        """Start the DMaaP mock HTTP server on a background thread.

        When ``DcaeVariables.HttpServerThread`` is already set, no new server
        is started; the queued events are cleaned up instead.

        :param port_num: port handed to ``DMaaP.test``.
        :return: ``"true"`` if a server thread was already registered or was
            started now, ``"false"`` if starting the thread raised.
        """
        if DcaeVariables.HttpServerThread is not None:
            DMaaP.clean_up_event()
            logger.console("Clean up event from event queue before test")
            logger.info("DMaaP Server already started")
            return "true"

        DcaeVariables.IsRobotRun = True
        # presumably this creates DMaaP.DMaaPHttpd, which the thread below
        # serves -- confirm against the DMaaP module.
        DMaaP.test(port=port_num)
        try:
            DcaeVariables.VESEventQ = Queue()
            DcaeVariables.HttpServerThread = threading.Thread(
                name='DMAAP_HTTPServer',
                target=DMaaP.DMaaPHttpd.serve_forever)
            DcaeVariables.HttpServerThread.start()
            logger.console("DMaaP Mockup Sever started")
            # Fixed pause after starting the thread, before reporting success.
            time.sleep(2)
            return "true"
        except Exception as e:
            print(str(e))
            return "false"

    @staticmethod
    def shutdown_dmaap():
        """Shut down the HTTP server stored in ``DcaeVariables.HTTPD``.

        :return: ``"true"`` after ``shutdown()`` was called (followed by a
            three second pause), ``"false"`` if no server object is set.
        """
        if DcaeVariables.HTTPD is None:
            return "false"
        DcaeVariables.HTTPD.shutdown()
        logger.console("DMaaP Server shut down")
        time.sleep(3)
        return "true"

    @staticmethod
    def cleanup_ves_events():
        """Clean up the DMaaP event queue if the server thread is registered.

        :return: ``"true"`` when ``DMaaP.clean_up_event`` was called,
            ``"false"`` when no server thread has been registered yet.
        """
        if DcaeVariables.HttpServerThread is not None:
            DMaaP.clean_up_event()
            logger.console("DMaaP event queue is cleaned up")
            return "true"
        logger.console("DMaaP server not started yet")
        return "false"

    @staticmethod
    def enable_vesc_https_auth():
        """Run the ``vesc_enable_https_auth.sh`` script.

        On Windows the script is executed remotely over SSH, using the
        ``CSIT_IP``, ``CSIT_USER`` and ``CSIT_PD`` environment variables for
        host, user and password.  On any other platform the script is run
        locally from ``$WORKSPACE``, followed by a five second pause.

        :raises KeyError: if a required environment variable is missing.
        """
        # The SSH client is stored in a module-level global, as the original
        # code did; nothing visible in this module reads it afterwards.
        global client
        if 'Windows' in platform.system():
            # Create the client before entering ``try`` so that ``finally``
            # never runs against an unbound (or stale) ``client`` and masks
            # the real error when construction fails.
            client = paramiko.SSHClient()
            try:
                client.load_system_host_keys()
                # client.set_missing_host_key_policy(paramiko.WarningPolicy)
                # NOTE: AutoAddPolicy accepts unknown host keys without
                # verification; tolerable only for a throw-away test host.
                client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

                client.connect(os.environ['CSIT_IP'], port=22,
                               username=os.environ['CSIT_USER'],
                               password=os.environ['CSIT_PD'])
                # NOTE(review): '%{WORKSPACE}' is sent to the remote host
                # literally -- Python does not expand it.  Confirm that the
                # remote side resolves it as intended.
                stdin, stdout, stderr = client.exec_command('%{WORKSPACE}/test/csit/tests/dcaegen2/testcases/resources/vesc_enable_https_auth.sh')
                logger.console(stdout.read())
            finally:
                client.close()
            return
        ws = os.environ['WORKSPACE']
        script2run = ws + "/test/csit/tests/dcaegen2/testcases/resources/vesc_enable_https_auth.sh"
        logger.info("Running script: " + script2run)
        logger.console("Running script: " + script2run)
        # Record the exit status instead of silently discarding it.
        exit_code = subprocess.call(script2run)
        logger.info("Script exit code: %s" % (exit_code,))
        time.sleep(5)
        return

    @staticmethod
    def dmaap_message_receive(evtobj, action='contain'):
        """Dequeue DMaaP events until one matches ``evtobj``.

        Events are taken with ``DMaaP.deque_event`` until it returns ``None``;
        events that do not match are not put back.

        :param evtobj: expected value.  For ``'contain'`` it must be usable
            with ``in`` against the event string, for ``'sizematch'`` it must
            support ``len``, and for ``'dictmatch'`` it is compared for
            equality with the JSON-decoded event.
        :param action: ``'contain'``, ``'sizematch'`` or ``'dictmatch'``.
            Any other value matches nothing and simply drains the events.
        :return: ``'true'`` on the first match, otherwise ``'false'``.
        :raises ValueError: from ``json.loads`` when ``action`` is
            ``'dictmatch'`` and an event is not valid JSON.
        """
        evt_str = DMaaP.deque_event()
        while evt_str is not None:
            logger.console("DMaaP receive VES Event:\n" + evt_str)
            if action == 'contain':
                if evtobj in evt_str:
                    logger.info("DMaaP Receive Expected Publish Event:\n" + evt_str)
                    return 'true'
            elif action == 'sizematch':
                if len(evtobj) == len(evt_str):
                    return 'true'
            elif action == 'dictmatch':
                # ``==`` replaces the Python 2-only ``cmp(...) == 0``.
                if evtobj == json.loads(evt_str):
                    return 'true'
            evt_str = DMaaP.deque_event()
        return 'false'

    @staticmethod
    def is_json_empty(resp):
        """Report whether a response body is missing or shorter than 2 chars.

        :param resp: object exposing a ``text`` attribute.
        :return: the string ``'True'`` when ``resp.text`` is ``None`` or has
            fewer than two characters, otherwise the string ``'False'``.
        """
        text = resp.text
        # Use %-formatting rather than ``+`` so a ``None`` body is logged
        # instead of raising TypeError before the None check below.
        logger.info("Enter is_json_empty: resp.text: %s" % (text,))
        if text is None or len(text) < 2:
            return 'True'
        return 'False'

    @staticmethod
    def generate_uuid():
        """Return a new random ``uuid.UUID`` (version 4)."""
        return uuid.uuid4()

    @staticmethod
    def get_json_value_list(jsonstr, keyval):
        """Collect ``item[keyval]`` for every item of a JSON document.

        :param jsonstr: JSON text whose decoded value is iterated.
        :param keyval: key (or index) looked up in each item.
        :return: list of the looked-up values; an empty list when ``jsonstr``
            is ``None``, shorter than two characters, or when decoding or any
            lookup fails.
        """
        logger.info("Enter Get_Json_Key_Value_List")
        if jsonstr is None or len(jsonstr) < 2:
            logger.info("No Json data found")
            return []
        try:
            return [item[keyval] for item in json.loads(jsonstr)]
        except Exception as e:
            # Best effort by design: any failure yields an empty list.
            logger.info("Json data parsing fails")
            print(str(e))
            return []

    @staticmethod
    def generate_millitimestamp_uuid():
        """Return the current local time as integer milliseconds.

        Despite the name, the result is a timestamp, not a UUID.
        """
        then = datetime.datetime.now()
        # mktime() only has whole-second resolution, so the sub-second part
        # is added back from the datetime's microsecond field.
        return int(time.mktime(then.timetuple()) * 1e3 + then.microsecond / 1e3)

    @staticmethod
    def test():
        """Load a sample VES fault event, set its version to 5.0, print it.

        Reads ``robot/assets/dcae/ves_volte_single_fault_event.json`` relative
        to the current working directory.
        """
        from pprint import pprint

        with open('robot/assets/dcae/ves_volte_single_fault_event.json') as data_file:
            data = json.load(data_file)

        data['event']['commonEventHeader']['version'] = '5.0'
        pprint(data)
156
if __name__ == '__main__':
    # Manual run: enable HTTPS auth on the VES collector, start the DMaaP
    # mock server, print the start-up status, then block for a long time.
    #
    # Disabled sample from the original file, kept for reference:
    #   dictStr = "action=getTable,Accept=application/json,Content-Type=application/json,X-FromAppId=1234908903284"
    #   cls = DcaeLibrary()
    #   #dict = cls.create_header_from_string(dictStr)
    #   #print str(dict)
    #   jsonStr = "[{'Node': 'onapfcnsl00', 'CheckID': 'serfHealth', 'Name': 'Serf Health Status', 'ServiceName': '', 'Notes': '', 'ModifyIndex': 6, 'Status': 'passing', 'ServiceID': '', 'ServiceTags': [], 'Output': 'Agent alive and reachable', 'CreateIndex': 6}]"
    #   lsObj = cls.get_json_value_list(jsonStr, 'Status')
    #   print lsObj

    library = DcaeLibrary()
    library.enable_vesc_https_auth()

    started = library.setup_dmaap_server()
    print(started)
    # Long sleep on the main thread after the server has been started;
    # presumably to leave the mock server available for manual testing.
    time.sleep(100000)