blob: 95db6b24b481154ea45a0581c7bf9fcfcfe41ff1 [file] [log] [blame]
# ============LICENSE_START===================================================
# Copyright (C) 2019-2020 Nordix Foundation.
# ============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
import json
import os
from test.support import EnvironmentVarGuard
from unittest import TestCase
from unittest.mock import patch
from requests import Session
from tenacity import stop_after_attempt
import mod.aai_client as aai_client
from mod import db, create_app
from mod.api.db_models import NetworkFunctionModel
from mod.network_function import NetworkFunction, OrchestrationStatus
from mod.pmsh_utils import AppConfig
from mod.subscription import Subscription
class SubscriptionTest(TestCase):
@patch('mod.update_logging_config')
@patch('mod.pmsh_utils._MrPub')
@patch('mod.pmsh_utils._MrSub')
@patch('mod.get_db_connection_url')
@patch.object(Session, 'put')
@patch('mod.pmsh_utils.AppConfig._get_pmsh_config')
def setUp(self, mock_get_pmsh_config, mock_session, mock_get_db_url,
mock_mr_sub, mock_mr_pub, mock_update_config):
mock_get_db_url.return_value = 'sqlite://'
with open(os.path.join(os.path.dirname(__file__), 'data/aai_xnfs.json'), 'r') as data:
self.aai_response_data = data.read()
mock_session.return_value.status_code = 200
mock_session.return_value.text = self.aai_response_data
self.env = EnvironmentVarGuard()
self.env.set('AAI_SERVICE_HOST', '1.2.3.4')
self.env.set('AAI_SERVICE_PORT', '8443')
self.env.set('LOGGER_CONFIG', os.path.join(os.path.dirname(__file__), 'log_config.yaml'))
with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data_1.json'), 'r') as data:
self.cbs_data = json.load(data)
mock_get_pmsh_config.return_value = self.cbs_data
self.app_conf = AppConfig()
self.xnfs = aai_client.get_pmsh_nfs_from_aai(self.app_conf)
self.mock_mr_sub = mock_mr_sub
self.mock_mr_pub = mock_mr_pub
self.app = create_app()
self.app_context = self.app.app_context()
self.app_context.push()
db.create_all()
def tearDown(self):
db.session.remove()
db.drop_all()
self.app_context.pop()
def test_xnf_filter_true(self):
self.assertTrue(self.app_conf.nf_filter.is_nf_in_filter('pnf1',
OrchestrationStatus.ACTIVE.value))
def test_xnf_filter_false(self):
self.assertFalse(self.app_conf.nf_filter.is_nf_in_filter('PNF-33',
OrchestrationStatus.ACTIVE.value))
def test_sub_measurement_group(self):
self.assertEqual(len(self.app_conf.subscription.measurementGroups), 2)
def test_sub_file_location(self):
self.assertEqual(self.app_conf.subscription.fileLocation, '/pm/pm.xml')
def test_get_subscription(self):
sub_name = 'ExtraPM-All-gNB-R2B'
self.app_conf.subscription.create()
new_sub = Subscription.get(sub_name)
self.assertEqual(sub_name, new_sub.subscription_name)
def test_get_subscription_no_match(self):
sub_name = 'sub2_does_not_exist'
sub = Subscription.get(sub_name)
self.assertEqual(sub, None)
def test_get_nf_names_per_sub(self):
self.app_conf.subscription.create()
self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0])
self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[1])
nfs = Subscription.get_nf_names_per_sub(self.app_conf.subscription.subscriptionName)
self.assertEqual(2, len(nfs))
def test_create_existing_subscription(self):
sub1 = self.app_conf.subscription.create()
same_sub1 = self.app_conf.subscription.create()
self.assertEqual(sub1, same_sub1)
self.assertEqual(1, len(self.app_conf.subscription.get_all()))
def test_add_network_functions_per_subscription(self):
for nf in self.xnfs:
self.app_conf.subscription.add_network_function_to_subscription(nf)
nfs_for_sub_1 = Subscription.get_all_nfs_subscription_relations()
self.assertEqual(3, len(nfs_for_sub_1))
new_nf = NetworkFunction(nf_name='vnf_3', orchestration_status='Active')
self.app_conf.subscription.add_network_function_to_subscription(new_nf)
nf_subs = Subscription.get_all_nfs_subscription_relations()
self.assertEqual(4, len(nf_subs))
def test_add_duplicate_network_functions_per_subscription(self):
self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0])
nf_subs = Subscription.get_all_nfs_subscription_relations()
self.assertEqual(1, len(nf_subs))
self.app_conf.subscription.add_network_function_to_subscription(list(self.xnfs)[0])
nf_subs = Subscription.get_all_nfs_subscription_relations()
self.assertEqual(1, len(nf_subs))
def test_update_subscription_status(self):
sub_name = 'ExtraPM-All-gNB-R2B'
self.app_conf.subscription.create()
self.app_conf.subscription.administrativeState = 'new_status'
self.app_conf.subscription.update_subscription_status()
sub = Subscription.get(sub_name)
self.assertEqual('new_status', sub.status)
def test_delete_subscription(self):
for nf in self.xnfs:
self.app_conf.subscription.add_network_function_to_subscription(nf)
self.app_conf.subscription.delete_subscription()
self.assertEqual(0, len(Subscription.get_all()))
self.assertEqual(None, Subscription.get(self.app_conf.subscription.subscriptionName))
self.assertEqual(0, len(Subscription.get_all_nfs_subscription_relations()))
self.assertEqual(0, len(NetworkFunction.get_all()))
self.assertEqual(None, NetworkFunction.get(nf_name=list(self.xnfs)[0].nf_name))
def test_update_sub_nf_status(self):
sub_name = 'ExtraPM-All-gNB-R2B'
for nf in self.xnfs:
self.app_conf.subscription.add_network_function_to_subscription(nf)
sub_nfs = Subscription.get_all_nfs_subscription_relations()
self.assertEqual('PENDING_CREATE', sub_nfs[0].nf_sub_status)
Subscription.update_sub_nf_status(sub_name, 'Active', 'pnf_23')
sub_nfs = Subscription.get_all_nfs_subscription_relations()
self.assertEqual('PENDING_CREATE', sub_nfs[0].nf_sub_status)
self.assertEqual('PENDING_CREATE', sub_nfs[1].nf_sub_status)
@patch('mod.subscription.Subscription.add_network_function_to_subscription')
@patch('mod.subscription.Subscription.update_sub_nf_status')
@patch('mod.subscription.Subscription.update_subscription_status')
def test_process_activate_subscription(self, mock_update_sub_status,
mock_update_sub_nf, mock_add_nfs):
self.app_conf.subscription.process_subscription.retry.stop = stop_after_attempt(1)
self.app_conf.subscription.process_subscription([list(self.xnfs)[0]], self.mock_mr_pub,
self.app_conf)
mock_update_sub_status.assert_called()
mock_add_nfs.assert_called()
self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
mock_update_sub_nf.assert_called_with(self.app_conf.subscription.subscriptionName,
'PENDING_CREATE', list(self.xnfs)[0].nf_name)
@patch('mod.subscription.Subscription.update_sub_nf_status')
@patch('mod.subscription.Subscription.update_subscription_status')
def test_process_deactivate_subscription(self, mock_update_sub_status,
mock_update_sub_nf):
self.app_conf.subscription.administrativeState = 'LOCKED'
self.app_conf.subscription.process_subscription.retry.stop = stop_after_attempt(1)
self.app_conf.subscription.process_subscription([list(self.xnfs)[0]], self.mock_mr_pub,
self.app_conf)
self.assertTrue(self.mock_mr_pub.publish_subscription_event_data.called)
mock_update_sub_nf.assert_called_with(self.app_conf.subscription.subscriptionName,
'PENDING_DELETE', list(self.xnfs)[0].nf_name)
mock_update_sub_status.assert_called()
def test_process_subscription_exception(self):
self.app_conf.subscription.process_subscription.retry.stop = stop_after_attempt(1)
self.assertRaises(Exception, self.app_conf.subscription.process_subscription,
[list(self.xnfs)[0]], 'not_mr_pub', 'app_config')
def test_prepare_subscription_event(self):
with open(os.path.join(os.path.dirname(__file__),
'data/pm_subscription_event.json'), 'r') as data:
expected_sub_event = json.load(data)
actual_sub_event = self.app_conf.subscription.prepare_subscription_event('pnf_1',
self.app_conf)
self.assertEqual(expected_sub_event, actual_sub_event)
def test_get_nf_models(self):
for nf in self.xnfs:
self.app_conf.subscription.add_network_function_to_subscription(nf)
nf_models = self.app_conf.subscription._get_nf_models()
self.assertEqual(3, len(nf_models))
self.assertIsInstance(nf_models[0], NetworkFunctionModel)
def test_get_network_functions(self):
for nf in self.xnfs:
self.app_conf.subscription.add_network_function_to_subscription(nf)
nfs = self.app_conf.subscription.get_network_functions()
self.assertEqual(3, len(nfs))
self.assertIsInstance(nfs[0], NetworkFunction)