# blob: 8eef73b47e425fe8acbaf311c10ba526b155b606 [file] [log] [blame]
# ==================================================================================
# Copyright (c) 2020 Nokia
# Copyright (c) 2020 AT&T Intellectual Property.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
import time
import os
from contextlib import suppress
from mdclogpy import Logger
from ricxappframe.xapp_frame import RMRXapp, CONFIG_FILE_ENV
# Logger used by the config handler in test_custom_config_handler.
mdc_logger = Logger(name=__name__)
# Module-level handles to the xapps created by the tests below; they are kept
# here so that teardown_module can still stop them if a test fails part-way.
rmr_xapp_config = None
rmr_xapp_defconfig = None
rmr_xapp_noconfig = None
# Path of the JSON file that the tests write and point CONFIG_FILE_ENV at.
config_file_path = "/tmp/file.json"
def init_config_file():
    """Overwrite the file at config_file_path with the initial "start" JSON document."""
    initial_document = '{ "start" : "value" }'
    with open(config_file_path, "w") as cfg:
        cfg.write(initial_document)
def write_config_file():
    """Overwrite the file at config_file_path with the "change" JSON document."""
    changed_document = '{ "change" : "value2" }'
    # rewriting the file is what generates the inotify/config event
    with open(config_file_path, "w") as cfg:
        cfg.write(changed_document)
def test_config_no_env(monkeypatch):
    """
    With CONFIG_FILE_ENV removed from the environment, rewriting the config
    file must never invoke the custom config handler.
    """
    global rmr_xapp_noconfig

    init_config_file()
    monkeypatch.delenv(CONFIG_FILE_ENV, raising=False)

    def default_rmr_handler(self, summary, sbuf):
        pass

    config_event_seen = False

    def config_handler(self, json):
        nonlocal config_event_seen
        config_event_seen = True

    rmr_xapp_noconfig = RMRXapp(default_rmr_handler, config_handler=config_handler, rmr_port=4652, use_fake_sdl=True)
    try:
        # in unit tests we need to thread here or else execution is not returned!
        rmr_xapp_noconfig.run(thread=True, rmr_timeout=1)
        write_config_file()
        # give the work loop a chance to timeout on RMR and process the config event
        time.sleep(3)
        assert not config_event_seen
    finally:
        # stop the xapp even when the assertion fails, so the worker thread
        # is not left running for the remainder of the module
        rmr_xapp_noconfig.stop()
def test_default_config_handler(monkeypatch):
    """
    Just for coverage: run an xapp created without a config_handler argument
    while the watched config file is rewritten. Nothing is asserted.
    """
    global rmr_xapp_defconfig

    init_config_file()
    monkeypatch.setenv(CONFIG_FILE_ENV, config_file_path)

    def default_rmr_handler(self, summary, sbuf):
        pass

    # listen port is irrelevant, no messages arrive
    rmr_xapp_defconfig = RMRXapp(default_rmr_handler, rmr_port=4567, use_fake_sdl=True)
    try:
        # in unit tests we need to thread here or else execution is not returned!
        rmr_xapp_defconfig.run(thread=True, rmr_timeout=1)
        write_config_file()
        # give the work loop a chance to timeout on RMR and process the config event
        time.sleep(3)
    finally:
        # always stop: test_custom_config_handler uses the same rmr_port, so
        # an xapp left running here would still be up when that test starts
        rmr_xapp_defconfig.stop()
def test_custom_config_handler(monkeypatch):
    """
    With CONFIG_FILE_ENV pointing at the config file, the custom config handler
    must see the "start" document by the time the RMRXapp constructor returns,
    and the "change" document after the file is rewritten while the xapp runs.
    """
    global rmr_xapp_config

    # point watcher at the file
    init_config_file()
    monkeypatch.setenv(CONFIG_FILE_ENV, config_file_path)

    def default_handler(self, summary, sbuf):
        pass

    startup_config_event = False
    change_config_event = False

    def config_handler(self, json):
        mdc_logger.info("config_handler: json {}".format(json))
        nonlocal startup_config_event
        nonlocal change_config_event
        if "start" in json:
            startup_config_event = True
        if "change" in json:
            change_config_event = True

    # listen port is irrelevant, no messages arrive
    rmr_xapp_config = RMRXapp(default_handler, config_handler=config_handler, rmr_port=4567, use_fake_sdl=True)
    try:
        # the handler has to have been called with the initial file content already
        assert startup_config_event
        # in unit tests we need to thread here or else execution is not returned!
        rmr_xapp_config.run(thread=True, rmr_timeout=1)
        write_config_file()
        # give the work loop a chance to timeout on RMR and process the config event
        time.sleep(3)
        assert change_config_event
    finally:
        # stop the xapp even when one of the assertions above fails
        rmr_xapp_config.stop()
def teardown_module():
    """
    this is like a "finally"; the name of this function is pytest magic
    safer to put down here since certain failures above can lead to pytest never returning
    for example if an exception gets raised before stop is called in any test function above,
    pytest will hang forever

    The xapps are stopped before the config file is removed, and a missing
    file is tolerated, so a failing cleanup of the file can never prevent
    the stop calls from running.
    """
    # best effort on purpose: a handle is still None when its test never ran,
    # and stop may already have been called by the test itself
    for xapp in (rmr_xapp_config, rmr_xapp_defconfig, rmr_xapp_noconfig):
        with suppress(Exception):
            xapp.stop()
    # the file does not exist if no test got as far as init_config_file
    with suppress(FileNotFoundError):
        os.remove(config_file_path)