blob: ee79d52340354ab81ac0c5e62b99960b8962bb7b [file] [log] [blame]
# ============LICENSE_START===================================================
# Copyright (C) 2019-2020 Nordix Foundation.
# ============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=====================================================
import json
import os
import unittest
from unittest import mock
from unittest.mock import patch
import responses
from requests import Session
from mod.pmsh_utils import AppConfig
from mod.subscription import Subscription
class PmshUtilsTestCase(unittest.TestCase):
def setUp(self):
with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data.json'), 'r') as data:
self.cbs_data = json.load(data)
self.app_conf = AppConfig(**self.cbs_data['config'])
self.sub = Subscription(**self.cbs_data['policy']['subscription'])
def test_utils_get_mr_sub(self):
mr_policy_sub = self.app_conf.get_mr_sub('policy_pm_subscriber')
self.assertTrue(mr_policy_sub.aaf_id, 'dcae@dcae.onap.org')
def test_utils_get_mr_sub_fails_with_invalid_name(self):
with self.assertRaises(KeyError):
self.app_conf.get_mr_sub('invalid_sub')
def test_utils_get_mr_pub(self):
mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher')
self.assertTrue(mr_policy_pub.aaf_pass, 'demo123456!')
def test_utils_get_mr_pub_fails_with_invalid_name(self):
with self.assertRaises(KeyError):
self.app_conf.get_mr_pub('invalid_pub')
def test_utils_get_cert_data(self):
self.assertTrue(self.app_conf.cert_params, ('/opt/app/pm-mapper/etc/certs/cert.pem',
'/opt/app/pm-mapper/etc/certs/key.pem'))
@mock.patch.object(Session, 'post')
def test_mr_pub_publish_to_topic_success(self, mock_session):
mock_session.return_value.status_code = 200
mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher')
with patch('requests.Session.post') as session_post_call:
mr_policy_pub.publish_to_topic({"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"})
session_post_call.assert_called_once()
@responses.activate
def test_mr_pub_publish_to_topic_fail(self):
responses.add(responses.POST,
'https://node:30226/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS',
json={'error': 'Client Error'}, status=400)
mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher')
with self.assertRaises(Exception):
mr_policy_pub.publish_to_topic({"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"})
def test_mr_pub_publish_sub_event_data_success(self):
mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher')
with patch('mod.pmsh_utils._MrPub.publish_to_topic') as pub_to_topic_call:
mr_policy_pub.publish_subscription_event_data(self.sub, 'pnf201')
pub_to_topic_call.assert_called_once()
@responses.activate
def test_mr_sub_get_from_topic_success(self):
responses.add(responses.GET,
'https://node:30226/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS/'
'dcae_pmsh_cg/1?timeout=1000',
json={"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"}, status=200)
mr_policy_sub = self.app_conf.get_mr_sub('policy_pm_subscriber')
mr_topic_data = mr_policy_sub.get_from_topic(1)
self.assertIsNotNone(mr_topic_data)
@responses.activate
def test_mr_sub_get_from_topic_fail(self):
responses.add(responses.GET,
'https://node:30226/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS/'
'dcae_pmsh_cg/1?timeout=1000',
json={"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"}, status=400)
mr_policy_sub = self.app_conf.get_mr_sub('policy_pm_subscriber')
mr_topic_data = mr_policy_sub.get_from_topic(1)
self.assertIsNone(mr_topic_data)