[DCAEGEN2] PMSH AppConfig Update
- Simplified existing PMSH Appconfig
- Major version bump for J release
Issue-ID: DCAEGEN2-2814
Signed-off-by: ajay_dp001 <ajay.deep.singh@est.tech>
Change-Id: I8ed572ccc7385cfdf91e51a126622821c113c53d
diff --git a/components/pm-subscription-handler/Changelog.md b/components/pm-subscription-handler/Changelog.md
index bd5929c..3112ac5 100755
--- a/components/pm-subscription-handler/Changelog.md
+++ b/components/pm-subscription-handler/Changelog.md
@@ -5,6 +5,9 @@
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).
+## [2.0.0]
+### Changed
+* Updated PMSH app configuration, simplified existing config (DCAEGEN2-2814)
## [1.3.2]
### Changed
diff --git a/components/pm-subscription-handler/pmsh_service/mod/__init__.py b/components/pm-subscription-handler/pmsh_service/mod/__init__.py
index 5f78ca1..1024417 100644
--- a/components/pm-subscription-handler/pmsh_service/mod/__init__.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/__init__.py
@@ -17,8 +17,8 @@
# ============LICENSE_END=====================================================
import logging as logging
import os
-import ssl
import pathlib
+import ssl
from urllib.parse import quote
from connexion import App
@@ -46,9 +46,9 @@
if app_config.enable_tls:
logger.info('Launching secure http API server')
ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
- ssl_ctx.load_cert_chain(app_config.cert_params[0], app_config.cert_params[1])
- connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443'),
- ssl_options=ssl_ctx, server="tornado")
+ ssl_ctx.load_cert_chain(app_config.cert_path, app_config.key_path)
+ connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443'), ssl_options=ssl_ctx,
+ server="tornado")
else:
logger.info('Launching unsecure http API server')
connex_app.run(port=os.environ.get('PMSH_API_PORT', '8443'), server="tornado")
diff --git a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py
index e53ea3a..39adba4 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/aai_client.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/aai_client.py
@@ -1,5 +1,5 @@
# ============LICENSE_START===================================================
-# Copyright (C) 2019-2020 Nordix Foundation.
+# Copyright (C) 2019-2021 Nordix Foundation.
# ============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -74,7 +74,8 @@
app_conf.aaf_creds.get('aaf_pass')),
data=data, params=params,
verify=(app_conf.ca_cert_path if app_conf.enable_tls else False),
- cert=(app_conf.cert_params if app_conf.enable_tls else None))
+ cert=((app_conf.cert_path,
+ app_conf.key_path) if app_conf.enable_tls else None))
response.raise_for_status()
if response.ok:
nf_data = json.loads(response.text)
@@ -177,7 +178,8 @@
auth=HTTPBasicAuth(app_conf.aaf_creds.get('aaf_id'),
app_conf.aaf_creds.get('aaf_pass')),
verify=(app_conf.ca_cert_path if app_conf.enable_tls else False),
- cert=(app_conf.cert_params if app_conf.enable_tls else None))
+ cert=((app_conf.cert_path,
+ app_conf.key_path) if app_conf.enable_tls else None))
response.raise_for_status()
if response.ok:
data = json.loads(response.text)
diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py
new file mode 100644
index 0000000..9c282ab
--- /dev/null
+++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_config.py
@@ -0,0 +1,158 @@
+# ============LICENSE_START===================================================
+# Copyright (C) 2021 Nordix Foundation.
+# ============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END=====================================================
+
+"""This module represents PMSH application configuration
+ Singleton instance of configuration is created and stored,
+ Enum representation is used for Message Router topics.
+"""
+
+from enum import Enum, unique
+
+import requests
+from onap_dcae_cbs_docker_client.client import get_all
+from requests.auth import HTTPBasicAuth
+from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type
+
+from mod import logger
+from mod.pmsh_utils import mdc_handler
+
+
+@unique
+class MRTopic(Enum):
+ """ Enum used to represent Message Router Topic"""
+ AAI_SUBSCRIBER = 'aai_subscriber'
+ POLICY_PM_PUBLISHER = 'policy_pm_publisher'
+ POLICY_PM_SUBSCRIBER = 'policy_pm_subscriber'
+
+
+class MetaSingleton(type):
+ """ Metaclass used to create singleton object by overriding __call__() method """
+ _instances = {}
+
+ def __call__(cls, *args, **kwargs):
+ if cls not in cls._instances:
+ cls._instances[cls] = super().__call__(*args, **kwargs)
+ return cls._instances[cls]
+
+ @classmethod
+ def get_cls_instance(mcs, cls_name):
+ return mcs._instances[cls_name]
+
+
+class AppConfig(metaclass=MetaSingleton):
+ """ Object representation of the PMSH Application config. """
+
+ def __init__(self):
+ app_config = self._get_config()
+ self.key_path = app_config['config'].get('key_path')
+ self.cert_path = app_config['config'].get('cert_path')
+ self.ca_cert_path = app_config['config'].get('ca_cert_path')
+ self.enable_tls = app_config['config'].get('enable_tls')
+ self.aaf_id = app_config['config'].get('aaf_identity')
+ self.aaf_pass = app_config['config'].get('aaf_password')
+ self.streams_publishes = app_config['config'].get('streams_publishes')
+ self.streams_subscribes = app_config['config'].get('streams_subscribes')
+ # TODO: aaf_creds variable should be removed on code cleanup
+ self.aaf_creds = {'aaf_id': self.aaf_id, 'aaf_pass': self.aaf_pass}
+
+ @staticmethod
+ def get_instance():
+ return AppConfig.get_cls_instance(AppConfig)
+
+ @retry(wait=wait_fixed(5), stop=stop_after_attempt(5),
+ retry=retry_if_exception_type(ValueError))
+ def _get_config(self):
+
+ """ Retrieves PMSH's configuration from Config binding service. If a non-2xx response
+ is received, it retries after 2 seconds for 5 times before raising an exception.
+
+ Returns:
+ dict: Dictionary representation of the the service configuration
+
+ Raises:
+ Exception: If any error occurred pulling configuration from Config binding service.
+ """
+ try:
+ logger.info('Attempting to fetch PMSH Configuration from CBS.')
+ config = get_all()
+ logger.info(f'Successfully fetched PMSH config from CBS: {config}')
+ return config
+ except Exception as e:
+ logger.error(f'Failed to get config from CBS: {e}', exc_info=True)
+ raise ValueError(e)
+
+ @mdc_handler
+ def publish_to_topic(self, mr_topic, event_json, **kwargs):
+ """
+ Publish the event to the DMaaP Message Router topic.
+
+ Args:
+ mr_topic (enum) : Message Router topic to publish.
+ event_json (dict): the json data to be published.
+
+ Raises:
+ Exception: if post request fails.
+ """
+ try:
+ session = requests.Session()
+ topic_url = self.streams_publishes[mr_topic].get('dmaap_info').get('topic_url')
+ headers = {'content-type': 'application/json', 'x-transactionid': kwargs['request_id'],
+ 'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']}
+ logger.info(f'Publishing event to MR topic: {topic_url}')
+ response = session.post(topic_url, headers=headers,
+ auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), json=event_json,
+ verify=(self.ca_cert_path if self.enable_tls else False))
+ response.raise_for_status()
+ except Exception as e:
+ raise e
+
+ @mdc_handler
+ def get_from_topic(self, mr_topic, consumer_id, consumer_group='dcae_pmsh_cg', timeout=5000,
+ **kwargs):
+ """
+ Returns the json data from the MrTopic.
+
+ Args:
+ mr_topic (enum) : Message Router topic to subscribe.
+ consumer_id (str): Within your subscribers group, a name that uniquely
+ identifies your subscribers process.
+ consumer_group (str): A name that uniquely identifies your subscribers.
+ timeout (int): The request timeout value in mSec.
+
+ Returns:
+ list[str]: the json response from DMaaP Message Router topic.
+ """
+ try:
+ session = requests.Session()
+ topic_url = self.streams_subscribes[mr_topic].get('dmaap_info').get('topic_url')
+ headers = {'accept': 'application/json', 'content-type': 'application/json',
+ 'InvocationID': kwargs['invocation_id'], 'RequestID': kwargs['request_id']}
+ logger.info(f'Fetching messages from MR topic: {topic_url}')
+ response = session.get(f'{topic_url}/{consumer_group}/{consumer_id}'
+ f'?timeout={timeout}',
+ auth=HTTPBasicAuth(self.aaf_id, self.aaf_pass), headers=headers,
+ verify=(self.ca_cert_path if self.enable_tls else False))
+ if response.status_code == 503:
+ logger.error(f'MR Service is unavailable at present: {response.content}')
+ pass
+ response.raise_for_status()
+ if response.ok:
+ return response.json()
+ except Exception as e:
+ logger.error(f'Failed to fetch message from MR: {e}', exc_info=True)
+ raise
diff --git a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
index 9ddff31..26ada11 100755
--- a/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
+++ b/components/pm-subscription-handler/pmsh_service/mod/pmsh_utils.py
@@ -18,23 +18,25 @@
import json
import os
import uuid
+from functools import wraps
from json import JSONDecodeError
from os import getenv
from threading import Timer
import requests
+from jsonschema import validate, ValidationError
from onap_dcae_cbs_docker_client.client import get_all
from onaplogging.mdcContext import MDC
from requests.auth import HTTPBasicAuth
from tenacity import wait_fixed, stop_after_attempt, retry, retry_if_exception_type
-from jsonschema import validate, ValidationError
from mod import logger
from mod.subscription import Subscription
-def mdc_handler(function):
- def decorator(*args, **kwargs):
+def mdc_handler(func):
+ @wraps(func)
+ def wrapper(*args, **kwargs):
request_id = str(uuid.uuid1())
invocation_id = str(uuid.uuid1())
MDC.put('ServiceName', getenv('HOSTNAME'))
@@ -43,23 +45,9 @@
kwargs['request_id'] = request_id
kwargs['invocation_id'] = invocation_id
- return function(*args, **kwargs)
+ return func(*args, **kwargs)
- return decorator
-
-
-class MySingleton(object):
- instances = {}
-
- def __new__(cls, clz=None):
- if clz is None:
- if cls.__name__ not in MySingleton.instances:
- MySingleton.instances[cls.__name__] = \
- object.__new__(cls)
- return MySingleton.instances[cls.__name__]
- MySingleton.instances[clz.__name__] = clz()
- MySingleton.first = clz
- return type(clz.__name__, (MySingleton,), dict(clz.__dict__))
+ return wrapper
def _load_sub_schema_from_file():
diff --git a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
index 4f2ca4a..0b6544b 100755
--- a/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
+++ b/components/pm-subscription-handler/pmsh_service/pmsh_service_main.py
@@ -20,6 +20,7 @@
from mod import db, create_app, launch_api_server, logger
from mod.exit_handler import ExitHandler
+from mod.pmsh_config import AppConfig as NewAppConfig
from mod.pmsh_utils import AppConfig, PeriodicTask
from mod.policy_response_handler import PolicyResponseHandler
from mod.subscription_handler import SubscriptionHandler
@@ -32,6 +33,7 @@
app.app_context().push()
db.create_all(app=app)
app_conf = AppConfig()
+ pmsh_app_conf = NewAppConfig()
policy_mr_pub = app_conf.get_mr_pub('policy_pm_publisher')
policy_mr_sub = app_conf.get_mr_sub('policy_pm_subscriber')
aai_event_mr_sub = app_conf.get_mr_sub('aai_subscriber')
@@ -54,7 +56,7 @@
signal(SIGTERM, ExitHandler(periodic_tasks=periodic_tasks,
app_conf=app_conf, subscription_handler=subscription_handler))
- launch_api_server(app_conf)
+ launch_api_server(pmsh_app_conf)
except Exception as e:
logger.error(f'Failed to initialise PMSH: {e}', exc_info=True)
diff --git a/components/pm-subscription-handler/pom.xml b/components/pm-subscription-handler/pom.xml
index a0b58f4..89baa28 100644
--- a/components/pm-subscription-handler/pom.xml
+++ b/components/pm-subscription-handler/pom.xml
@@ -32,7 +32,7 @@
<groupId>org.onap.dcaegen2.services</groupId>
<artifactId>pmsh</artifactId>
<name>dcaegen2-services-pm-subscription-handler</name>
- <version>1.3.2-SNAPSHOT</version>
+ <version>2.0.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<sonar.sources>.</sonar.sources>
diff --git a/components/pm-subscription-handler/setup.py b/components/pm-subscription-handler/setup.py
index 2b8d24a..d5d6040 100644
--- a/components/pm-subscription-handler/setup.py
+++ b/components/pm-subscription-handler/setup.py
@@ -22,7 +22,7 @@
setup(
name="pm_subscription_handler",
- version="1.3.2",
+ version="2.0.0",
packages=find_packages(),
author="lego@est.tech",
author_email="lego@est.tech",
diff --git a/components/pm-subscription-handler/tests/base_setup.py b/components/pm-subscription-handler/tests/base_setup.py
index 9e12f96..e422cea 100755
--- a/components/pm-subscription-handler/tests/base_setup.py
+++ b/components/pm-subscription-handler/tests/base_setup.py
@@ -1,5 +1,5 @@
# ============LICENSE_START===================================================
-# Copyright (C) 2020 Nordix Foundation.
+# Copyright (C) 2020-2021 Nordix Foundation.
# ============================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
from mod import create_app, db
from mod.network_function import NetworkFunctionFilter
from mod.pmsh_utils import AppConfig
+from mod.pmsh_config import AppConfig as NewAppConfig
def get_pmsh_config(file_path='data/cbs_data_1.json'):
@@ -51,6 +52,10 @@
self.app_conf = AppConfig()
self.app_conf.nf_filter = NetworkFunctionFilter(**self.app_conf.subscription.nfFilter)
+ @patch('mod.pmsh_config.AppConfig._get_config', MagicMock(return_value=get_pmsh_config()))
+ def setUpAppConf(self):
+ self.pmsh_app_conf = NewAppConfig()
+
def tearDown(self):
db.drop_all()
diff --git a/components/pm-subscription-handler/tests/test_pmsh_config.py b/components/pm-subscription-handler/tests/test_pmsh_config.py
new file mode 100644
index 0000000..deb867b
--- /dev/null
+++ b/components/pm-subscription-handler/tests/test_pmsh_config.py
@@ -0,0 +1,92 @@
+# ============LICENSE_START===================================================
+# Copyright (C) 2021 Nordix Foundation.
+# ============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END=====================================================
+from unittest.mock import Mock, patch
+
+import responses
+from requests import Session
+
+from mod.pmsh_config import MRTopic, AppConfig
+from tests.base_setup import BaseClassSetup
+
+
+class PmshConfigTestCase(BaseClassSetup):
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+
+ def setUp(self):
+ super().setUpAppConf()
+ self.mock_app = Mock()
+
+ def tearDown(self):
+ super().tearDown()
+
+ @classmethod
+ def tearDownClass(cls):
+ super().tearDownClass()
+
+ def test_config_get_aaf_creds(self):
+ self.assertEqual(self.pmsh_app_conf.enable_tls, 'true')
+ self.assertEqual(self.pmsh_app_conf.aaf_id, 'dcae@dcae.onap.org')
+ self.assertEqual(self.pmsh_app_conf.aaf_pass, 'demo123456!')
+
+ def test_config_get_cert_data(self):
+ self.assertEqual(self.pmsh_app_conf.key_path, '/opt/app/pmsh/etc/certs/key.pem')
+ self.assertEqual(self.pmsh_app_conf.cert_path, '/opt/app/pmsh/etc/certs/cert.pem')
+ self.assertEqual(self.pmsh_app_conf.ca_cert_path, '/opt/app/pmsh/etc/certs/cacert.pem')
+
+ def test_singleton_instance_is_accessible_using_class_method(self):
+ my_singleton_instance = AppConfig.get_instance()
+ self.assertIsNotNone(my_singleton_instance)
+ self.assertIsInstance(my_singleton_instance, AppConfig)
+
+ @patch.object(Session, 'post')
+ def test_mr_pub_publish_to_topic_success(self, mock_session):
+ mock_session.return_value.status_code = 200
+ with patch('requests.Session.post') as session_post_call:
+ self.pmsh_app_conf.publish_to_topic(MRTopic.POLICY_PM_PUBLISHER.value,
+ {"key": "43c4ee19-6b8d-4279-a80f-c507850aae47"})
+ session_post_call.assert_called_once()
+
+ @responses.activate
+ def test_mr_pub_publish_to_topic_fail(self):
+ responses.add(responses.POST,
+ 'https://message-router:3905/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS',
+ json={"error": "Client Error"}, status=400)
+ with self.assertRaises(Exception):
+ self.pmsh_app_conf.publish_to_topic(MRTopic.POLICY_PM_PUBLISHER.value,
+ {"key": "43c4ee19-6b8d-4279-a80f-c507850aae47"})
+
+ @responses.activate
+ def test_mr_sub_get_from_topic_success(self):
+ responses.add(responses.GET,
+ 'https://message-router:3905/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS/'
+ 'dcae_pmsh_cg/1?timeout=5000',
+ json={"key": "43c4ee19-6b8d-4279-a80f-c507850aae47"}, status=200)
+ mr_topic_data = self.pmsh_app_conf.get_from_topic(MRTopic.POLICY_PM_SUBSCRIBER.value, 1)
+ self.assertIsNotNone(mr_topic_data)
+
+ @responses.activate
+ def test_mr_sub_get_from_topic_fail(self):
+ responses.add(responses.GET,
+ 'https://message-router:3905/events/org.onap.dmaap.mr.PM_SUBSCRIPTIONS/'
+ 'dcae_pmsh_cg/1?timeout=5000',
+ json={"key": "43c4ee19-6b8d-4279-a80f-c507850aae47"}, status=400)
+ with self.assertRaises(Exception):
+ self.pmsh_app_conf.get_from_topic(MRTopic.POLICY_PM_SUBSCRIBER.value, 1)
diff --git a/components/pm-subscription-handler/version.properties b/components/pm-subscription-handler/version.properties
index ef20baa..358e99c 100644
--- a/components/pm-subscription-handler/version.properties
+++ b/components/pm-subscription-handler/version.properties
@@ -1,6 +1,6 @@
-major=1
-minor=3
-patch=2
+major=2
+minor=0
+patch=0
base_version=${major}.${minor}.${patch}
release_version=${base_version}
snapshot_version=${base_version}-SNAPSHOT