# blob: 25ae3a55996a304e3d489a1206523fec78945057 [file] [log] [blame]
# ================================================================================
# Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
"""read and use the config"""
import base64
import copy
import json
import logging
import logging.config
import os
from .onap.audit import Audit
from .utils import Utils
# Folder that receives the bootstrap log file; it is created at import time.
LOGS_DIR = 'logs'

try:
    os.makedirs(LOGS_DIR, mode=0o770, exist_ok=True)
except Exception:
    # best effort only: a failure to create the folder is deliberately ignored here
    pass

# Bootstrap logging into a file under LOGS_DIR.
# init_config() may reconfigure logging later through logging.config.dictConfig
# when the loaded config file contains a "logging" section.
logging.basicConfig(
    level=logging.DEBUG,
    datefmt='%Y%m%d_%H%M%S',
    format='%(asctime)s.%(msecs)03d %(levelname)+8s '
           '%(threadName)s %(name)s.%(funcName)s: %(message)s',
    filename=os.path.join(LOGS_DIR, 'policy_handler.log'))

# module-level logger used by everything below
_LOGGER = Utils.get_logger(__file__)
class Settings(object):
    """Settings of a module or an application.

    Holds the config filtered by the collection of top-level config-keys and
    keeps track of changes versus the previous set_config unless committed.
    """

    def __init__(self, *config_keys):
        """Provide the collection of top-level keys in config to limit the config.

        When no keys are passed, set_config keeps every top-level key.
        """
        self._config_keys = config_keys
        self._changed = False
        self._config = None          # None until the first set_config/update
        self._prev_config = None     # snapshot taken by commit_change

    def __str__(self):
        """Get the str of the config; prev_config is included only when changed."""
        if not self._changed:
            return Audit.json_dumps({
                "config_keys": self._config_keys,
                "config": self._config
            })
        return Audit.json_dumps({
            "config_keys": self._config_keys,
            "changed": self._changed,
            "config": self._config,
            "prev_config": self._prev_config
        })

    def is_loaded(self):
        """Whether a non-empty config has been loaded already."""
        return bool(self._config)

    def commit_change(self):
        """Set the prev config to (a deep copy of) the latest config and reset changed."""
        self._prev_config = copy.deepcopy(self._config)
        self._changed = False

    def _set_changed(self):
        """Determine whether the config changed versus the prev config.

        An empty/missing prev config always counts as changed.
        """
        self._changed = not (self._prev_config
                             and Utils.are_the_same(self._prev_config, self._config,
                                                    Audit.json_dumps))

    def set_config(self, config, auto_commit=False):
        """Update the config.

        :param config: dict or another Settings; anything else is treated as {}
        :param auto_commit: when True, the new config is committed right away
            and the settings are reported as not changed
        """
        # the current config becomes the baseline for the change detection
        self.commit_change()

        if isinstance(config, Settings):
            config = config._config
        if not isinstance(config, dict):
            config = {}

        # keep only the configured top-level keys (all keys when none configured)
        self._config = copy.deepcopy({
            key: value for key, value in config.items()
            if not self._config_keys or key in self._config_keys
        })

        if auto_commit:
            self.commit_change()
        else:
            self._set_changed()

    def is_changed(self):
        """Whether the config has changed."""
        return self._changed

    def get_by_key(self, config_key, default=None):
        """Get the latest sub config by config_key and whether it has changed.

        :return: tuple (changed, value) where value is a deep copy of the
            sub config or the default when the key is not found.
            (False, default) is returned for an empty or non-str config_key.
        """
        if not config_key or not isinstance(config_key, str):
            return False, default

        # _config is None before the first set_config/update - treat it as empty
        # instead of raising AttributeError on None
        value = copy.deepcopy((self._config or {}).get(config_key, default))
        if not self._prev_config:
            return True, value

        prev_value = self._prev_config.get(config_key, default)
        return (self._changed
                and not Utils.are_the_same(prev_value, value, Audit.json_dumps)), value

    def update(self, config_key, value=None):
        """Set the latest sub config by config_key and determine whether the config changed.

        Nothing happens on an empty config_key.
        Note that config_key is not checked against the config-keys filter.
        """
        if not config_key:
            return

        # allow update on settings that never received set_config
        if self._config is None:
            self._config = {}

        self._config[config_key] = copy.deepcopy(value)
        self._set_changed()
class Config(object):
    """Main config of the application.

    Everything is kept in class attributes and accessed through static methods.
    The local config comes from the config file (init_config),
    the discovered config comes from the discovery service (discover).
    """
    # paths to the local config files
    CONFIG_FILE_PATH = "etc/config.json"
    LOGGER_CONFIG_FILE_PATH = "etc/common_logger.config"
    SERVICE_NAME_POLICY_HANDLER = "policy_handler"

    # names of the fields in the config
    FIELD_SYSTEM = "system"
    FIELD_CONSUL_URL = "consul_url"
    FIELD_WSERVICE_PORT = "wservice_port"
    FIELD_TLS = "tls"
    FIELD_POLICY_ENGINE = "policy_engine"
    DMAAP_MR = "dmaap_mr"
    POOL_CONNECTIONS = "pool_connections"
    DEPLOY_HANDLER = "deploy_handler"
    THREAD_POOL_SIZE = "thread_pool_size"
    POLICY_RETRY_COUNT = "policy_retry_count"
    POLICY_RETRY_SLEEP = "policy_retry_sleep"
    RECONFIGURE = "reconfigure"
    TIMER_INTERVAL = "interval"
    REQUESTS_VERIFY = "verify"
    TLS_CA_MODE = "tls_ca_mode"
    TLS_WSS_CA_MODE = "tls_wss_ca_mode"
    TLS_CA_MODE_DO_NOT_VERIFY = "do_not_verify"
    TIMEOUT_IN_SECS = "timeout_in_secs"
    CONSUL_TIMEOUT_IN_SECS = "consul_timeout_in_secs"
    WS_PING_INTERVAL_IN_SECS = "ws_ping_interval_in_secs"
    DEFAULT_TIMEOUT_IN_SECS = 60
    SERVICE_ACTIVATOR = "service_activator"
    MODE_OF_OPERATION = "mode_of_operation"
    PDP_API_VERSION = "PDP_API_VERSION"
    QUERY_TIMEOUT = "timeout"

    # names of the environment variables with the credentials
    PDP_USER = "PDP_USER"
    PDP_PWD = "PDP_PWD"
    DMAAP_MR_USER = "DMAAP_MR_USER"
    DMAAP_MR_PWD = "DMAAP_MR_PWD"

    # current values - defaults until init_config overrides them
    system_name = SERVICE_NAME_POLICY_HANDLER
    wservice_port = 25577
    _pdp_api_version = os.environ.get(PDP_API_VERSION)
    consul_url = "http://consul:8500"
    consul_timeout_in_secs = DEFAULT_TIMEOUT_IN_SECS
    tls_cacert_file = None
    tls_server_cert_file = None
    tls_private_key_file = None
    tls_server_ca_chain_file = None
    _pdp_authorization = None
    _dmaap_mr_authorization = None

    _local_config = Settings()
    discovered_config = Settings()

    @staticmethod
    def _get_tls_file_path(tls_config, cert_directory, tls_name):
        """Calc the file path and verify its existence.

        :return: path to the readable file named by tls_config[tls_name]
            inside cert_directory, or None when not configured or not readable
        """
        file_name = tls_config.get(tls_name)
        if not file_name:
            return None
        tls_file_path = os.path.join(cert_directory, file_name)
        if not os.path.isfile(tls_file_path) or not os.access(tls_file_path, os.R_OK):
            _LOGGER.error("invalid %s: %s", tls_name, tls_file_path)
            return None
        return tls_file_path

    @staticmethod
    def _set_tls_config(tls_config):
        """Verify and set the tls certs in config.

        All the tls file paths are reset to None first and stay None
        when the tls config or the cert_directory is missing or invalid.
        The resulting paths are always logged.
        """
        try:
            Config.tls_cacert_file = None
            Config.tls_server_cert_file = None
            Config.tls_private_key_file = None
            Config.tls_server_ca_chain_file = None

            if not (tls_config and isinstance(tls_config, dict)):
                _LOGGER.info("no tls in config: %s", json.dumps(tls_config))
                return

            cert_directory = tls_config.get("cert_directory")
            if not (cert_directory and isinstance(cert_directory, str)):
                _LOGGER.warning("unexpected tls.cert_directory: %r", cert_directory)
                return

            # cert_directory is resolved relative to the parent folder
            # of the folder that contains this file
            cert_directory = os.path.join(
                os.path.dirname(os.path.dirname(os.path.realpath(__file__))), cert_directory)
            if not (cert_directory and os.path.isdir(cert_directory)):
                _LOGGER.warning("ignoring invalid cert_directory: %s", cert_directory)
                return

            Config.tls_cacert_file = Config._get_tls_file_path(tls_config, cert_directory, "cacert")
            Config.tls_server_cert_file = Config._get_tls_file_path(tls_config, cert_directory,
                                                                    "server_cert")
            Config.tls_private_key_file = Config._get_tls_file_path(tls_config, cert_directory,
                                                                    "private_key")
            Config.tls_server_ca_chain_file = Config._get_tls_file_path(tls_config, cert_directory,
                                                                        "server_ca_chain")
        finally:
            _LOGGER.info("tls_cacert_file = %s", Config.tls_cacert_file)
            _LOGGER.info("tls_server_cert_file = %s", Config.tls_server_cert_file)
            _LOGGER.info("tls_private_key_file = %s", Config.tls_private_key_file)
            _LOGGER.info("tls_server_ca_chain_file = %s", Config.tls_server_ca_chain_file)

    @staticmethod
    def _get_basic_authorization(user, pwd):
        """Build the value of the Basic Authorization header from user and pwd."""
        credentials = "{}:{}".format(user, pwd).encode()
        return "Basic {}".format(base64.b64encode(credentials).decode("utf-8"))

    @staticmethod
    def init_config(file_path=None):
        """Read and store the config from the config file.

        Does nothing when the local config is already loaded.
        Values found in the environment (CONSUL_URL, PDP_API_VERSION and the
        credentials) take precedence over the values in the file.

        :param file_path: path to the json config file,
            defaults to Config.CONFIG_FILE_PATH
        """
        if Config._local_config.is_loaded():
            _LOGGER.info("config already inited: %s", Config._local_config)
            return

        if not file_path:
            file_path = Config.CONFIG_FILE_PATH

        loaded_config = None
        if os.access(file_path, os.R_OK):
            with open(file_path, 'r', encoding="utf-8") as config_json:
                loaded_config = json.load(config_json)

        # anything but a non-empty json object cannot be used as the config
        if not loaded_config or not isinstance(loaded_config, dict):
            _LOGGER.warning("config not loaded from file: %s", file_path)
            return

        _LOGGER.info("config loaded from file(%s): %s", file_path, Audit.json_dumps(loaded_config))
        logging_config = loaded_config.get("logging")
        if logging_config:
            logging.config.dictConfig(logging_config)

        Config.wservice_port = loaded_config.get(Config.FIELD_WSERVICE_PORT, Config.wservice_port)
        Config.consul_url = os.environ.get(
            "CONSUL_URL", loaded_config.get(Config.FIELD_CONSUL_URL, Config.consul_url)).rstrip("/")

        # fall back to the default on missing, non-numeric or too small timeout
        consul_timeout = loaded_config.get(Config.CONSUL_TIMEOUT_IN_SECS)
        if not isinstance(consul_timeout, (int, float)) or consul_timeout < 1:
            consul_timeout = Config.DEFAULT_TIMEOUT_IN_SECS
        Config.consul_timeout_in_secs = consul_timeout

        Config._pdp_api_version = os.environ.get(
            Config.PDP_API_VERSION, loaded_config.get(Config.PDP_API_VERSION.lower()))

        # credentials are taken from the environment only and only when both parts are set
        pdp_user = os.environ.get(Config.PDP_USER)
        pdp_pwd = os.environ.get(Config.PDP_PWD)
        if pdp_user and pdp_pwd:
            Config._pdp_authorization = Config._get_basic_authorization(pdp_user, pdp_pwd)

        dmaap_mr_user = os.environ.get(Config.DMAAP_MR_USER)
        dmaap_mr_pwd = os.environ.get(Config.DMAAP_MR_PWD)
        if dmaap_mr_user and dmaap_mr_pwd:
            Config._dmaap_mr_authorization = Config._get_basic_authorization(dmaap_mr_user,
                                                                             dmaap_mr_pwd)

        # a missing or null section is treated as an empty local config
        local_config = loaded_config.get(Config.SERVICE_NAME_POLICY_HANDLER)
        if not isinstance(local_config, dict):
            local_config = {}

        Config.system_name = local_config.get(Config.FIELD_SYSTEM, Config.system_name)
        Config._set_tls_config(local_config.get(Config.FIELD_TLS))
        Config._local_config.set_config(local_config, auto_commit=True)

    @staticmethod
    def _overwrite_discovered_config(audit, discovered_config):
        """Replace the secrets in discovered_config with data from the environment.

        The Authorization header of the policy_engine and dmaap_mr sections is
        set in place, but only for the sections with an https url and only when
        the matching credentials were found by init_config.
        A discovered_config that is not a dict is left untouched.
        """
        if not isinstance(discovered_config, dict):
            return

        overwrites = (
            (Config._pdp_authorization, Config.FIELD_POLICY_ENGINE, "pdp_authorization"),
            (Config._dmaap_mr_authorization, Config.DMAAP_MR, "dmaap_mr_authorization"),
        )
        changes = []
        for authorization, section_name, change_name in overwrites:
            if not authorization:
                continue
            section = discovered_config.get(section_name)
            if not isinstance(section, dict):
                continue
            url = section.get("url")
            if not isinstance(url, str) or not url.lower().startswith("https:"):
                continue
            # create the headers when missing, so that the overwrite
            # is stored in the config rather than in a temporary dict
            headers = section.get("headers")
            if not isinstance(headers, dict):
                headers = section["headers"] = {}
            headers["Authorization"] = authorization
            changes.append(change_name)

        if changes:
            _LOGGER.info(audit.info("overwritten discovered config: {}".format(", ".join(changes))))

    @staticmethod
    def discover(audit):
        """Bring the config settings from the discovery service.

        The value is retrieved by the system_name key and its policy_handler
        section is stored into Config.discovered_config without auto-commit.
        An empty or non-dict value from the discovery is ignored with a warning.
        """
        discovery_key = Config.system_name
        # imported here rather than at the top of the file
        # NOTE(review): presumably to avoid a circular import - confirm
        from .discovery import DiscoveryClient
        new_config = DiscoveryClient.get_value(audit, discovery_key)

        if not new_config or not isinstance(new_config, dict):
            _LOGGER.warning(audit.warn("unexpected config from discovery: {}".format(new_config)))
            return

        _LOGGER.debug(audit.debug("loaded config from discovery({}): {}".format(
            discovery_key, Audit.json_dumps(new_config))))

        discovered_config = new_config.get(Config.SERVICE_NAME_POLICY_HANDLER)
        Config._overwrite_discovered_config(audit, discovered_config)
        Config.discovered_config.set_config(discovered_config)
        _LOGGER.info(audit.info("config from discovery: {}".format(Config.discovered_config)))

    @staticmethod
    def get_tls_verify(tls_ca_mode=None):
        """Generate the verify value based on tls_ca_mode.

        tls_ca_mode can be one of:
            "cert_directory" - use the cacert.pem stored locally in cert_directory.
                               this is the default if cacert.pem file is found
            "os_ca_bundle"   - use the public ca_bundle provided by linux system.
                               this is the default if cacert.pem file not found
            "do_not_verify"  - special hack to turn off the verification by cacert and hostname

        :return: False for "do_not_verify", True for "os_ca_bundle" or when no
            cacert file is set, otherwise the path to the cacert file
        """
        if tls_ca_mode == Config.TLS_CA_MODE_DO_NOT_VERIFY:
            return False

        if tls_ca_mode == "os_ca_bundle" or not Config.tls_cacert_file:
            return True

        return Config.tls_cacert_file

    @staticmethod
    def get_requests_kwargs(tls_ca_mode=None):
        """Generate kwargs with verify for requests based on the tls_ca_mode."""
        return {Config.REQUESTS_VERIFY: Config.get_tls_verify(tls_ca_mode)}

    @staticmethod
    def is_pdp_api_default(log_status=True):
        """Whether to use the old (2018) or the default pdp API (started in 2019).

        :param log_status: when True, the version and the result are logged
        :return: True when no pdp api version is set
        """
        is_default = (Config._pdp_api_version is None)
        if log_status:
            _LOGGER.info("_pdp_api_version(%s) default(%s)", Config._pdp_api_version, is_default)
        return is_default