blob: 93a8fdbc289dd131fe2f2df8bdda82ac22b9b5bd [file] [log] [blame]
ehietala12486342022-06-15 12:21:41 +03001import os
2import requests
3from zipfile import ZipFile
4from requests import Response as response
5from ricxappframe.xapp_symptomdata import Symptomdata
6
7
def new_loaddata(*args, **kwargs):
    """Testing override that ignores its arguments and returns an empty string.

    Accepts any positional and keyword arguments so it can stand in for a
    loader with an arbitrary signature.
    """
    # Your custom testing override
    return ""
11
12
class MockResponse(object):
    """Minimal stand-in for the response object returned by the mocked ``requests`` calls.

    Args:
        reponse: status code to expose as ``status_code`` (the parameter name
            keeps its original spelling so existing callers are unaffected).
        jsonout: object returned unchanged by :meth:`json`.
    """

    def __init__(self, reponse, jsonout):
        # Use the constructor argument. The previous code assigned the
        # module-level name ``response`` (an alias of ``requests.Response``),
        # so ``status_code`` was a class instead of the requested code.
        self.status_code = reponse
        self.url = 'http://lwsd.ricplt:8089/ric/v1/lwsd'
        self.headers = {'Content-type': 'application/json'}
        self.jsonout = jsonout

    def json(self):
        """Return the canned JSON payload given at construction."""
        return self.jsonout

    def raise_for_status(self):
        """Never raise; simply hand back the stored status code."""
        return self.status_code
25
26
class MockStat(object):
    """Stand-in for a stat result that carries only the ``st_ctime`` attribute.

    Args:
        st_ctime: value to expose as ``st_ctime``; defaults to ``None``.
    """

    def __init__(self, st_ctime=None):
        self.st_ctime = st_ctime
30
31
class MockOs(object):
    """Canned replacements for ``os.walk`` and ``os.stat`` used by the tests.

    Args:
        walk: object returned by :meth:`walk` regardless of the path asked for.
        st_ctime: creation time reported by every :meth:`stat` result.
    """

    def __init__(self, walk=None, st_ctime=None):
        self.walkresp = walk
        self.st_ctime = st_ctime

    def walk(self, path):
        """Return the preconfigured walk response; ``path`` is ignored."""
        return self.walkresp

    def stat(self, filename):
        """Return a ``MockStat`` with the configured ``st_ctime``; ``filename`` is ignored."""
        return MockStat(self.st_ctime)
43
44
def test_symptomdata_subscribe(monkeypatch):
    """Check that ``subscribe`` leaves ``lwsdok`` set when no subscription is listed.

    ``requests.get`` is patched to return an empty list and ``requests.post``
    to return a canned 200 response, so the test performs no real HTTP calls.
    """

    def mock_requests_post(uri, data, headers, proxies):
        print("%s %s" % (uri, data))
        return MockResponse(200, [{'service': 'xapp.service'}])

    def mock_requests_get(uri, headers, proxies):
        print("%s" % (uri))
        return MockResponse(200, [])

    # mock the http get and post
    monkeypatch.setattr(requests, 'post', mock_requests_post)
    monkeypatch.setattr(requests, 'get', mock_requests_get)

    # the mocked GET returns an empty list, i.e. no existing subscription is found
    s = Symptomdata("xapp", "xapp.ricxapp.service", "tmp", "http://lwsd.ricplt:8089/ric/v1/lwsd")
    # stop timer loop
    s.stop()
    # make subscription
    s.subscribe(None)
    assert s.lwsdok is True
65
66
def test_symptomdata_subscribe_exists(monkeypatch):
    """Check that ``lwsdok`` is set when the mocked GET already lists the service.

    Only ``requests.get`` is patched; its canned reply contains an entry for
    the ``xapp`` service that the ``Symptomdata`` instance is created with.
    """

    def mock_requests_get(uri, headers, proxies):
        print("%s" % (uri))
        return MockResponse(200, [{'service': 'xapp_other'}, {'service': 'xapp'}])

    # mock the http get
    monkeypatch.setattr(requests, 'get', mock_requests_get)

    # the mocked GET lists the 'xapp' service, so the subscription is reported as existing
    s = Symptomdata("xapp", "xapp.ricxapp.service", "tmp", "http://lwsd.ricplt:8089/ric/v1/lwsd")
    # stop timer loop
    s.stop()
    assert s.lwsdok is True
80
81
def test_symptomdata_collect_time(monkeypatch):
    """Check that ``collect`` returns a result when given explicit timestamps.

    ``os.walk``, ``os.stat`` and ``ZipFile.write`` are patched with canned
    replacements. The mocked ``st_ctime`` (1647502471) lies between the two
    timestamps passed to ``collect`` (1647502470 and 1647502570).
    """
    myos = MockOs(walk=[('mydir', (), ('file1.csv', 'file2.csv', 'file3.txt', 'file.json'))], st_ctime=1647502471)

    def mock_requests_get(uri, headers, proxies):
        return MockResponse(200, [{'service': 'xapp_other'}, {'service': 'xapp'}])

    def mock_os_walk(path):
        return myos.walk(path)

    def mock_os_stat(filename):
        return myos.stat(filename)

    def mock_zipfile_write(me, fromfile, tofile):
        # no-op replacement: nothing is added to the archive
        return

    # mock the http get
    monkeypatch.setattr(requests, 'get', mock_requests_get)
    # mock the os walk
    monkeypatch.setattr(os, 'walk', mock_os_walk)
    # mock the os stat
    monkeypatch.setattr(os, 'stat', mock_os_stat)

    # mock the zipfile write
    monkeypatch.setattr(ZipFile, 'write', mock_zipfile_write)

    # the mocked GET lists the 'xapp' service, so the subscription is reported as existing
    s = Symptomdata("xapp", "xapp.ricxapp.service", "tmp", "http://lwsd.ricplt:8089/ric/v1/lwsd")
    # stop timer loop
    s.stop()
    assert s.lwsdok is True

    # NOTE(review): the last two arguments are presumably the start/end of the
    # collection time window - confirm against Symptomdata.collect
    zipfile = s.collect("zipfile.zip", (r'/tmp/csv/.*\.csv', r'/tmp/json/.*\.json'), 1647502470, 1647502570)
    assert zipfile is not None
115
116
def test_symptomdata_collect(monkeypatch):
    """Check that ``collect`` returns a result when both timestamps are zero.

    ``os.walk``, ``os.stat`` and ``ZipFile.write`` are patched with canned
    replacements, and ``requests.get`` returns a list containing the ``xapp``
    service.
    """
    myos = MockOs(walk=[('mydir', (), ('file1.csv', 'file2.csv', 'file3.txt', 'file.json'))], st_ctime=1647502471)

    def mock_requests_get(uri, headers, proxies):
        return MockResponse(200, [{'service': 'xapp_other'}, {'service': 'xapp'}])

    def mock_os_walk(path):
        return myos.walk(path)

    def mock_os_stat(filename):
        return myos.stat(filename)

    def mock_zipfile_write(me, fromfile, tofile):
        # no-op replacement: nothing is added to the archive
        return

    # mock the http get
    monkeypatch.setattr(requests, 'get', mock_requests_get)
    # mock the os walk
    monkeypatch.setattr(os, 'walk', mock_os_walk)
    # mock the os stat
    monkeypatch.setattr(os, 'stat', mock_os_stat)

    # mock the zipfile write
    monkeypatch.setattr(ZipFile, 'write', mock_zipfile_write)

    # the mocked GET lists the 'xapp' service, so the subscription is reported as existing
    s = Symptomdata("xapp", "xapp.ricxapp.service", "tmp", "http://lwsd.ricplt:8089/ric/v1/lwsd")
    # stop timer loop
    s.stop()
    assert s.lwsdok is True

    # NOTE(review): 0, 0 presumably means "no time filtering" - confirm against
    # Symptomdata.collect
    zipfile = s.collect("zipfile.zip", ('/tmp/csv/.*.csv', '/tmp/json/.*.json'), 0, 0)
    assert zipfile is not None