| # ================================================================================ |
| # Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved. |
| # ================================================================================ |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # ============LICENSE_END========================================================= |
| # |
| |
| """read and use the config""" |
| |
| import base64 |
| import copy |
| import json |
| import logging |
| import logging.config |
| import os |
| |
| from .onap.audit import Audit |
| from .utils import Utils |
| |
| LOGS_DIR = 'logs' |
| |
| try: |
| os.makedirs(LOGS_DIR, mode=0o770, exist_ok=True) |
| except Exception: |
| pass |
| |
| logging.basicConfig( |
| filename=os.path.join(LOGS_DIR, 'policy_handler.log'), |
| format=('%(asctime)s.%(msecs)03d %(levelname)+8s ' + |
| '%(threadName)s %(name)s.%(funcName)s: %(message)s'), |
| datefmt='%Y%m%d_%H%M%S', level=logging.DEBUG) |
| |
| _LOGGER = Utils.get_logger(__file__) |
| |
| class Settings(object): |
| """settings of module or an application |
| that is the config filtered by the collection of config-keys. |
| |
| keeps track of changes versus the previous set_config unless committed |
| """ |
| def __init__(self, *config_keys): |
| """provide the collection of top level keys in config to limit the config""" |
| self._config_keys = config_keys |
| self._changed = False |
| self._config = None |
| self._prev_config = None |
| |
| def __str__(self): |
| """get str of the config""" |
| if not self._changed: |
| return Audit.json_dumps({ |
| "config_keys": self._config_keys, |
| "config": self._config |
| }) |
| |
| return Audit.json_dumps({ |
| "config_keys": self._config_keys, |
| "changed": self._changed, |
| "config": self._config, |
| "prev_config": self._prev_config |
| }) |
| |
| def is_loaded(self): |
| """whether loaded already""" |
| return bool(self._config) |
| |
| def commit_change(self): |
| """set the prev config to the latest config""" |
| self._prev_config = copy.deepcopy(self._config) |
| self._changed = False |
| |
| def _set_changed(self): |
| """determine whether the config changed""" |
| self._changed = not (self._prev_config |
| and Utils.are_the_same(self._prev_config, self._config, |
| Audit.json_dumps)) |
| |
| def set_config(self, config, auto_commit=False): |
| """update the config""" |
| self.commit_change() |
| |
| if isinstance(config, Settings): |
| config = config._config |
| |
| if not isinstance(config, dict): |
| config = {} |
| |
| self._config = copy.deepcopy(dict((k, v) for (k, v) in config.items() |
| if not self._config_keys or k in self._config_keys)) |
| |
| if auto_commit: |
| self.commit_change() |
| else: |
| self._set_changed() |
| |
| def is_changed(self): |
| """whether the config has changed""" |
| return self._changed |
| |
| def get_by_key(self, config_key, default=None): |
| """get the latest sub config by config_key and whether it has changed""" |
| if not config_key or not isinstance(config_key, str): |
| return False, default |
| |
| value = copy.deepcopy(self._config.get(config_key, default)) |
| if not self._prev_config: |
| return True, value |
| prev_value = self._prev_config.get(config_key, default) |
| return self._changed and not Utils.are_the_same(prev_value, value, Audit.json_dumps), value |
| |
| def update(self, config_key, value=None): |
| """set the latest sub config by config_key and determine whether the config has changed""" |
| if not config_key: |
| return |
| |
| self._config[config_key] = copy.deepcopy(value) |
| self._set_changed() |
| |
| |
| class Config(object): |
| """main config of the application""" |
| CONFIG_FILE_PATH = "etc/config.json" |
| LOGGER_CONFIG_FILE_PATH = "etc/common_logger.config" |
| SERVICE_NAME_POLICY_HANDLER = "policy_handler" |
| |
| FIELD_SYSTEM = "system" |
| FIELD_CONSUL_URL = "consul_url" |
| FIELD_WSERVICE_PORT = "wservice_port" |
| FIELD_TLS = "tls" |
| FIELD_POLICY_ENGINE = "policy_engine" |
| DMAAP_MR = "dmaap_mr" |
| POOL_CONNECTIONS = "pool_connections" |
| DEPLOY_HANDLER = "deploy_handler" |
| THREAD_POOL_SIZE = "thread_pool_size" |
| POLICY_RETRY_COUNT = "policy_retry_count" |
| POLICY_RETRY_SLEEP = "policy_retry_sleep" |
| RECONFIGURE = "reconfigure" |
| TIMER_INTERVAL = "interval" |
| REQUESTS_VERIFY = "verify" |
| TLS_CA_MODE = "tls_ca_mode" |
| TLS_WSS_CA_MODE = "tls_wss_ca_mode" |
| TLS_CA_MODE_DO_NOT_VERIFY = "do_not_verify" |
| TIMEOUT_IN_SECS = "timeout_in_secs" |
| CONSUL_TIMEOUT_IN_SECS = "consul_timeout_in_secs" |
| WS_PING_INTERVAL_IN_SECS = "ws_ping_interval_in_secs" |
| DEFAULT_TIMEOUT_IN_SECS = 60 |
| SERVICE_ACTIVATOR = "service_activator" |
| MODE_OF_OPERATION = "mode_of_operation" |
| PDP_API_VERSION = "PDP_API_VERSION" |
| QUERY_TIMEOUT = "timeout" |
| PDP_USER = "PDP_USER" |
| PDP_PWD = "PDP_PWD" |
| DMAAP_MR_USER = "DMAAP_MR_USER" |
| DMAAP_MR_PWD = "DMAAP_MR_PWD" |
| |
| system_name = SERVICE_NAME_POLICY_HANDLER |
| wservice_port = 25577 |
| _pdp_api_version = os.environ.get(PDP_API_VERSION) |
| consul_url = "http://consul:8500" |
| consul_timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS |
| tls_cacert_file = None |
| tls_server_cert_file = None |
| tls_private_key_file = None |
| tls_server_ca_chain_file = None |
| _pdp_authorization = None |
| _dmaap_mr_authorization = None |
| |
| _local_config = Settings() |
| discovered_config = Settings() |
| |
| @staticmethod |
| def _get_tls_file_path(tls_config, cert_directory, tls_name): |
| """calc file path and verify its existance""" |
| file_name = tls_config.get(tls_name) |
| if not file_name: |
| return None |
| tls_file_path = os.path.join(cert_directory, file_name) |
| if not os.path.isfile(tls_file_path) or not os.access(tls_file_path, os.R_OK): |
| _LOGGER.error("invalid %s: %s", tls_name, tls_file_path) |
| return None |
| return tls_file_path |
| |
| @staticmethod |
| def _set_tls_config(tls_config): |
| """verify and set tls certs in config""" |
| try: |
| Config.tls_cacert_file = None |
| Config.tls_server_cert_file = None |
| Config.tls_private_key_file = None |
| Config.tls_server_ca_chain_file = None |
| |
| if not (tls_config and isinstance(tls_config, dict)): |
| _LOGGER.info("no tls in config: %s", json.dumps(tls_config)) |
| return |
| |
| cert_directory = tls_config.get("cert_directory") |
| |
| if not (cert_directory and isinstance(cert_directory, str)): |
| _LOGGER.warning("unexpected tls.cert_directory: %r", cert_directory) |
| return |
| |
| cert_directory = os.path.join( |
| os.path.dirname(os.path.dirname(os.path.realpath(__file__))), cert_directory) |
| if not (cert_directory and os.path.isdir(cert_directory)): |
| _LOGGER.warning("ignoring invalid cert_directory: %s", cert_directory) |
| return |
| |
| Config.tls_cacert_file = Config._get_tls_file_path(tls_config, cert_directory, "cacert") |
| Config.tls_server_cert_file = Config._get_tls_file_path(tls_config, cert_directory, |
| "server_cert") |
| Config.tls_private_key_file = Config._get_tls_file_path(tls_config, cert_directory, |
| "private_key") |
| Config.tls_server_ca_chain_file = Config._get_tls_file_path(tls_config, cert_directory, |
| "server_ca_chain") |
| |
| finally: |
| _LOGGER.info("tls_cacert_file = %s", Config.tls_cacert_file) |
| _LOGGER.info("tls_server_cert_file = %s", Config.tls_server_cert_file) |
| _LOGGER.info("tls_private_key_file = %s", Config.tls_private_key_file) |
| _LOGGER.info("tls_server_ca_chain_file = %s", Config.tls_server_ca_chain_file) |
| |
| @staticmethod |
| def init_config(file_path=None): |
| """read and store the config from config file""" |
| if Config._local_config.is_loaded(): |
| _LOGGER.info("config already inited: %s", Config._local_config) |
| return |
| |
| if not file_path: |
| file_path = Config.CONFIG_FILE_PATH |
| |
| loaded_config = None |
| if os.access(file_path, os.R_OK): |
| with open(file_path, 'r') as config_json: |
| loaded_config = json.load(config_json) |
| |
| if not loaded_config: |
| _LOGGER.warning("config not loaded from file: %s", file_path) |
| return |
| |
| _LOGGER.info("config loaded from file(%s): %s", file_path, Audit.json_dumps(loaded_config)) |
| logging_config = loaded_config.get("logging") |
| if logging_config: |
| logging.config.dictConfig(logging_config) |
| |
| Config.wservice_port = loaded_config.get(Config.FIELD_WSERVICE_PORT, Config.wservice_port) |
| Config.consul_url = os.environ.get( |
| "CONSUL_URL", loaded_config.get(Config.FIELD_CONSUL_URL, Config.consul_url)).rstrip("/") |
| Config.consul_timeout_in_secs = loaded_config.get(Config.CONSUL_TIMEOUT_IN_SECS) |
| if not Config.consul_timeout_in_secs or Config.consul_timeout_in_secs < 1: |
| Config.consul_timeout_in_secs = Config.DEFAULT_TIMEOUT_IN_SECS |
| |
| Config._pdp_api_version = os.environ.get( |
| Config.PDP_API_VERSION, loaded_config.get(Config.PDP_API_VERSION.lower())) |
| |
| pdp_user = os.environ.get(Config.PDP_USER) |
| pdp_pwd = os.environ.get(Config.PDP_PWD) |
| if pdp_user and pdp_pwd: |
| Config._pdp_authorization = "Basic {}".format(base64.b64encode( |
| ("{}:{}".format(pdp_user, pdp_pwd)).encode()).decode("utf-8")) |
| |
| dmaap_mr_user = os.environ.get(Config.DMAAP_MR_USER) |
| dmaap_mr_pwd = os.environ.get(Config.DMAAP_MR_PWD) |
| if dmaap_mr_user and dmaap_mr_pwd: |
| Config._dmaap_mr_authorization = "Basic {}".format(base64.b64encode( |
| ("{}:{}".format(dmaap_mr_user, dmaap_mr_pwd)).encode()).decode("utf-8")) |
| |
| local_config = loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER, {}) |
| Config.system_name = local_config.get(Config.FIELD_SYSTEM, Config.system_name) |
| |
| Config._set_tls_config(local_config.get(Config.FIELD_TLS)) |
| |
| Config._local_config.set_config(local_config, auto_commit=True) |
| |
| @staticmethod |
| def _overwrite_discovered_config(audit, discovered_config): |
| """replace the secrets in discovered_config with data from environment""" |
| changes = [] |
| if Config._pdp_authorization: |
| pdp_cfg = discovered_config.get("policy_engine", {}) |
| if pdp_cfg.get("url", "").lower().startswith("https:"): |
| pdp_cfg.get("headers", {})["Authorization"] = Config._pdp_authorization |
| changes.append("pdp_authorization") |
| |
| if Config._dmaap_mr_authorization: |
| dmaap_mr_cfg = discovered_config.get("dmaap_mr", {}) |
| if dmaap_mr_cfg.get("url", "").lower().startswith("https:"): |
| dmaap_mr_cfg.get("headers", {})["Authorization"] = Config._dmaap_mr_authorization |
| changes.append("dmaap_mr_authorization") |
| |
| if changes: |
| _LOGGER.info(audit.info("overwritten discovered config: {}".format(", ".join(changes)))) |
| |
| |
| @staticmethod |
| def discover(audit): |
| """bring the config settings from the discovery service""" |
| discovery_key = Config.system_name |
| from .discovery import DiscoveryClient |
| new_config = DiscoveryClient.get_value(audit, discovery_key) |
| |
| if not new_config or not isinstance(new_config, dict): |
| _LOGGER.warning(audit.warn("unexpected config from discovery: {}".format(new_config))) |
| return |
| |
| _LOGGER.debug(audit.debug("loaded config from discovery({}): {}".format( |
| discovery_key, Audit.json_dumps(new_config)))) |
| discovered_config = new_config.get(Config.SERVICE_NAME_POLICY_HANDLER) |
| |
| Config._overwrite_discovered_config(audit, discovered_config) |
| |
| Config.discovered_config.set_config(discovered_config) |
| _LOGGER.info(audit.info("config from discovery: {}".format(Config.discovered_config))) |
| |
| |
| @staticmethod |
| def get_tls_verify(tls_ca_mode=None): |
| """ |
| generate verify value based on tls_ca_mode |
| |
| tls_ca_mode can be one of: |
| |
| "cert_directory" - use the cacert.pem stored locally in cert_directory. |
| this is the default if cacert.pem file is found |
| |
| "os_ca_bundle" - use the public ca_bundle provided by linux system. |
| this is the default if cacert.pem file not found |
| |
| "do_not_verify" - special hack to turn off the verification by cacert and hostname |
| """ |
| if tls_ca_mode == Config.TLS_CA_MODE_DO_NOT_VERIFY: |
| return False |
| |
| if tls_ca_mode == "os_ca_bundle" or not Config.tls_cacert_file: |
| return True |
| |
| return Config.tls_cacert_file |
| |
| @staticmethod |
| def get_requests_kwargs(tls_ca_mode=None): |
| """generate kwargs with verify for requests based on the tls_ca_mode""" |
| return {Config.REQUESTS_VERIFY: Config.get_tls_verify(tls_ca_mode)} |
| |
| @staticmethod |
| def is_pdp_api_default(log_status=True): |
| """whether to use the old (2018) or the default pdp API (started in 2019)""" |
| is_default = (Config._pdp_api_version is None) |
| if log_status: |
| _LOGGER.info("_pdp_api_version(%s) default(%s)", Config._pdp_api_version, is_default) |
| return is_default |