blob: cd806ac1a651a9ce26fa3d5e1be544e667a61e4d [file] [log] [blame]
# ==================================================================================
# Copyright (c) 2020 Nokia
# Copyright (c) 2020 AT&T Intellectual Property.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
17import json
18import time
19from contextlib import suppress
Tommy Carpenter09894e32020-04-02 19:45:19 -040020from ricxappframe.xapp_frame import Xapp, RMRXapp, RIC_HEALTH_CHECK_REQ, RIC_HEALTH_CHECK_RESP
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -040021
# Module-level handles to the xapps started by the tests below. They start out
# as None and are kept here so that teardown_module can stop them.
rmr_xapp = rmr_xapp_health = gen_xapp = None
25
26
def test_rmr_init():
    """
    Start an RMRXapp and a general Xapp and check message dispatch between them.

    The RMRXapp is created on port 4564 with a default handler plus a callback
    registered for message type 60000. The general Xapp's entrypoint exercises a
    few SDL calls and a healthcheck, then sends one message of type 60000 and one
    of type 60001. The test expects the 60000 payload to be seen by the registered
    callback and the 60001 payload to be seen by the default handler.
    """
    global rmr_xapp, gen_xapp

    # payloads captured by the handlers; they stay None until a message is handled
    def_pay = None
    sixty_pay = None

    # create rmr app

    def default_handler(self, summary, sbuf):
        nonlocal def_pay
        def_pay = json.loads(summary["payload"])
        self.rmr_free(sbuf)

    def sixtythou_handler(self, summary, sbuf):
        nonlocal sixty_pay
        sixty_pay = json.loads(summary["payload"])
        self.rmr_free(sbuf)

    rmr_xapp = RMRXapp(default_handler, rmr_port=4564, use_fake_sdl=True)
    rmr_xapp.register_callback(sixtythou_handler, 60000)
    rmr_xapp.run(thread=True)  # in unit tests we need to thread here or else execution is not returned!

    time.sleep(1)

    # create a general xapp that will demonstrate some SDL functionality and launch some requests against the rmr xapp

    def entry(self):

        time.sleep(1)

        self.sdl_set("testns", "mykey", 6)
        assert self.sdl_get("testns", "mykey") == 6
        assert self.sdl_find_and_get("testns", "myk") == {"mykey": 6}
        assert self.healthcheck()

        val = json.dumps({"test send 60000": 1}).encode()
        self.rmr_send(val, 60000)

        val = json.dumps({"test send 60001": 2}).encode()
        self.rmr_send(val, 60001)

    gen_xapp = Xapp(entrypoint=entry, use_fake_sdl=True)
    gen_xapp.run()

    # The handlers run on the rmr xapp's thread, so wait for both payloads to
    # show up instead of relying on a single fixed sleep; give up after 5 seconds
    # and let the assertions below report what is missing.
    deadline = time.monotonic() + 5
    while (def_pay is None or sixty_pay is None) and time.monotonic() < deadline:
        time.sleep(0.1)

    assert def_pay == {"test send 60001": 2}
    assert sixty_pay == {"test send 60000": 1}
77
78
def test_rmr_healthcheck():
    """
    Check that an RMRXapp answers an RMR healthcheck request it sends itself.

    The xapp's post_init hook sends an empty RIC_HEALTH_CHECK_REQ message, and a
    callback registered for RIC_HEALTH_CHECK_RESP records the response payload,
    which is expected to be b"OK\\n".
    """
    # thanos uses the rmr xapp to healthcheck the rmr xapp
    global rmr_xapp_health

    # payload captured by health_handler; stays None until a response is handled
    health_pay = None

    def post_init(self):
        self.rmr_send(b"", RIC_HEALTH_CHECK_REQ)

    def default_handler(self, summary, sbuf):
        # nothing to inspect here, but free the buffer like every other handler
        # in this file does so an unexpected message is not leaked
        self.rmr_free(sbuf)

    def health_handler(self, summary, sbuf):
        nonlocal health_pay
        health_pay = summary["payload"]
        self.rmr_free(sbuf)

    rmr_xapp_health = RMRXapp(default_handler, post_init=post_init, rmr_port=4666, use_fake_sdl=True)
    rmr_xapp_health.register_callback(health_handler, RIC_HEALTH_CHECK_RESP)
    rmr_xapp_health.run(thread=True)  # in unit tests we need to thread here or else execution is not returned!

    # The response is handled on the xapp's thread, so wait for it to show up
    # instead of relying on a single fixed sleep; give up after 5 seconds and let
    # the assertion below report the failure.
    deadline = time.monotonic() + 5
    while health_pay is None and time.monotonic() < deadline:
        time.sleep(0.1)

    assert health_pay == b"OK\n"
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400105
106
def teardown_module():
    """
    Stop every xapp the tests may have started; pytest calls this by name once
    the module's tests are done, so it acts like a "finally".

    Stopping here rather than inside the tests is safer: if an exception were
    raised in a test before stop was called, pytest would hang forever.
    """
    # Same order as the handles were originally stopped in. Each stop is
    # best-effort: a handle that is still None (its test never created the xapp)
    # or a stop() that raises must not prevent the remaining xapps from stopping.
    for handle_name in ("gen_xapp", "rmr_xapp", "rmr_xapp_health"):
        with suppress(Exception):
            globals()[handle_name].stop()