| # ============LICENSE_START=================================================== |
| # Copyright (C) 2019-2020 Nordix Foundation. |
| # ============================================================================ |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| # SPDX-License-Identifier: Apache-2.0 |
| # ============LICENSE_END===================================================== |
| import json |
| import os |
| import unittest |
| from unittest import mock |
| from unittest.mock import patch |
| |
| import responses |
| from requests import Session |
| |
| from mod.pmsh_utils import AppConfig |
| from mod.subscription import Subscription |
| |
| |
| class PmshUtilsTestCase(unittest.TestCase): |
| |
| def setUp(self): |
| with open(os.path.join(os.path.dirname(__file__), 'data/cbs_data.json'), 'r') as data: |
| self.cbs_data = json.load(data) |
| self.app_conf = AppConfig(**self.cbs_data['config']) |
| self.sub = Subscription(**self.cbs_data['policy']['subscription']) |
| |
| def test_utils_get_mr_sub(self): |
| mr_policy_sub = self.app_conf.get_mr_sub('policy_pm_subscriber') |
| self.assertTrue(mr_policy_sub.aaf_id, 'dcae@dcae.onap.org') |
| |
| def test_utils_get_mr_sub_fails_with_invalid_name(self): |
| with self.assertRaises(KeyError): |
| self.app_conf.get_mr_sub('invalid_sub') |
| |
| def test_utils_get_mr_pub(self): |
| mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher') |
| self.assertTrue(mr_policy_pub.aaf_pass, 'demo123456!') |
| |
| def test_utils_get_mr_pub_fails_with_invalid_name(self): |
| with self.assertRaises(KeyError): |
| self.app_conf.get_mr_pub('invalid_pub') |
| |
| def test_utils_get_cert_data(self): |
| self.assertTrue(self.app_conf.cert_params, ('/opt/app/pm-mapper/etc/certs/cert.pem', |
| '/opt/app/pm-mapper/etc/certs/key.pem')) |
| |
| @mock.patch.object(Session, 'post') |
| def test_mr_pub_publish_to_topic_success(self, mock_session): |
| mock_session.return_value.status_code = 200 |
| mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher') |
| with patch('requests.Session.post') as session_post_call: |
| mr_policy_pub.publish_to_topic({"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"}) |
| session_post_call.assert_called_once() |
| |
| @responses.activate |
| def test_mr_pub_publish_to_topic_fail(self): |
| responses.add(responses.POST, |
| 'https://node:30226/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS', |
| json={'error': 'Client Error'}, status=400) |
| mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher') |
| with self.assertRaises(Exception): |
| mr_policy_pub.publish_to_topic({"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"}) |
| |
| def test_mr_pub_publish_sub_event_data_success(self): |
| mr_policy_pub = self.app_conf.get_mr_pub('policy_pm_publisher') |
| with patch('mod.pmsh_utils._MrPub.publish_to_topic') as pub_to_topic_call: |
| mr_policy_pub.publish_subscription_event_data(self.sub, 'pnf201') |
| pub_to_topic_call.assert_called_once() |
| |
| @responses.activate |
| def test_mr_sub_get_from_topic_success(self): |
| responses.add(responses.GET, |
| 'https://node:30226/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS/' |
| 'dcae_pmsh_cg/1?timeout=1000', |
| json={"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"}, status=200) |
| mr_policy_sub = self.app_conf.get_mr_sub('policy_pm_subscriber') |
| mr_topic_data = mr_policy_sub.get_from_topic(1) |
| self.assertIsNotNone(mr_topic_data) |
| |
| @responses.activate |
| def test_mr_sub_get_from_topic_fail(self): |
| responses.add(responses.GET, |
| 'https://node:30226/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS/' |
| 'dcae_pmsh_cg/1?timeout=1000', |
| json={"dummy_val": "43c4ee19-6b8d-4279-a80f-c507850aae47"}, status=400) |
| mr_policy_sub = self.app_conf.get_mr_sub('policy_pm_subscriber') |
| mr_topic_data = mr_policy_sub.get_from_topic(1) |
| self.assertIsNone(mr_topic_data) |