blob: 39c26a5e699abf3faed92fdad4af395d7329d61b [file] [log] [blame]
# ================================================================================
# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
"""policy-client communicates with policy-engine thru REST API"""
import copy
import json
import logging
import time
from multiprocessing.dummy import Pool as ThreadPool
import requests
from .config import Config
from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode,
AuditResponseCode, Metrics)
from .policy_consts import (ERRORED_POLICIES, LATEST_POLICIES, POLICY_BODY,
POLICY_CONFIG, POLICY_FILTER, POLICY_FILTERS,
POLICY_ID, POLICY_NAME)
from .policy_utils import PolicyUtils
class PolicyRest(object):
"""using the http API to policy-engine"""
_logger = logging.getLogger("policy_handler.policy_rest")
_lazy_inited = False
POLICY_GET_CONFIG = 'getConfig'
PDP_CONFIG_STATUS = "policyConfigStatus"
PDP_CONFIG_RETRIEVED = "CONFIG_RETRIEVED"
PDP_CONFIG_NOT_FOUND = "CONFIG_NOT_FOUND"
PDP_CONFIG_MESSAGE = "policyConfigMessage"
PDP_NO_RESPONSE_RECEIVED = "No Response Received"
PDP_STATUS_CODE_ERROR = 400
PDP_DATA_NOT_FOUND = "PE300 - Data Issue: Incorrect Params passed: Decision not a Permit."
MIN_VERSION_EXPECTED = "min_version_expected"
IGNORE_POLICY_NAMES = "ignore_policy_names"
_requests_session = None
_url_get_config = None
_headers = None
_target_entity = None
_thread_pool_size = 4
_policy_retry_count = 1
_policy_retry_sleep = 0
@staticmethod
def _lazy_init():
"""init static config"""
if PolicyRest._lazy_inited:
return
PolicyRest._lazy_inited = True
config = Config.settings[Config.FIELD_POLICY_ENGINE]
pool_size = Config.settings.get("pool_connections", 20)
PolicyRest._requests_session = requests.Session()
PolicyRest._requests_session.mount(
'https://',
requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
)
PolicyRest._requests_session.mount(
'http://',
requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
)
PolicyRest._url_get_config = (config["url"] + config["path_api"]
+ PolicyRest.POLICY_GET_CONFIG)
PolicyRest._headers = config["headers"]
PolicyRest._target_entity = config.get("target_entity", Config.FIELD_POLICY_ENGINE)
PolicyRest._thread_pool_size = Config.settings.get("thread_pool_size", 4)
if PolicyRest._thread_pool_size < 2:
PolicyRest._thread_pool_size = 2
PolicyRest._policy_retry_count = Config.settings.get("policy_retry_count", 1) or 1
PolicyRest._policy_retry_sleep = Config.settings.get("policy_retry_sleep", 0)
PolicyRest._logger.info(
"PolicyClient url(%s) headers(%s)",
PolicyRest._url_get_config, Metrics.log_json_dumps(PolicyRest._headers))
@staticmethod
def _pdp_get_config(audit, json_body):
"""Communication with the policy-engine"""
metrics = Metrics(aud_parent=audit, targetEntity=PolicyRest._target_entity,
targetServiceName=PolicyRest._url_get_config)
msg = json.dumps(json_body)
headers = copy.copy(PolicyRest._headers)
headers[REQUEST_X_ECOMP_REQUESTID] = metrics.request_id
headers_str = Metrics.log_json_dumps(headers)
log_line = "post to PDP {0} msg={1} headers={2}".format(
PolicyRest._url_get_config, msg, headers_str)
metrics.metrics_start(log_line)
PolicyRest._logger.info(log_line)
res = None
try:
res = PolicyRest._requests_session.post(
PolicyRest._url_get_config, json=json_body, headers=headers)
except Exception as ex:
error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
if isinstance(ex, requests.exceptions.RequestException)
else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
error_msg = (
"failed to post to PDP {0} {1}: {2} msg={3} headers={4}"
.format(PolicyRest._url_get_config, type(ex).__name__, str(ex), msg, headers_str))
PolicyRest._logger.exception(error_msg)
metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
metrics.metrics(error_msg)
return (error_code, None)
log_line = "response from PDP to post {0}: {1} msg={2} text={3} headers={4}".format(
PolicyRest._url_get_config, res.status_code, msg, res.text,
Metrics.log_json_dumps(dict(res.request.headers.items())))
status_code, res_data = PolicyRest._extract_pdp_res_data(audit, metrics, log_line, res)
if status_code:
return status_code, res_data
metrics.set_http_status_code(res.status_code)
metrics.metrics(log_line)
PolicyRest._logger.info(log_line)
return res.status_code, res_data
@staticmethod
def _extract_pdp_res_data(audit, metrics, log_line, res):
"""special treatment of pdp response"""
res_data = None
if res.status_code == requests.codes.ok:
res_data = res.json()
if res_data and isinstance(res_data, list) and len(res_data) == 1:
rslt = res_data[0]
if rslt and not rslt.get(POLICY_NAME):
res_data = None
if rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_NO_RESPONSE_RECEIVED:
error_code = AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
error_msg = "unexpected {0}".format(log_line)
PolicyRest._logger.error(error_msg)
metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
metrics.metrics(error_msg)
return error_code, None
return None, res_data
if res.status_code == PolicyRest.PDP_STATUS_CODE_ERROR:
try:
res_data = res.json()
except ValueError:
return None, None
if not res_data or not isinstance(res_data, list) or len(res_data) != 1:
return None, None
rslt = res_data[0]
if (rslt and not rslt.get(POLICY_NAME)
and rslt.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_NOT_FOUND
and rslt.get(PolicyRest.PDP_CONFIG_MESSAGE) == PolicyRest.PDP_DATA_NOT_FOUND):
status_code = AuditHttpCode.DATA_NOT_FOUND_ERROR.value
info_msg = "not found {0}".format(log_line)
PolicyRest._logger.info(info_msg)
metrics.set_http_status_code(status_code)
metrics.metrics(info_msg)
return status_code, None
return None, None
@staticmethod
def _validate_policy(policy):
"""Validates the config on policy"""
if not policy:
return
policy_body = policy.get(POLICY_BODY)
return bool(
policy_body
and policy_body.get(PolicyRest.PDP_CONFIG_STATUS) == PolicyRest.PDP_CONFIG_RETRIEVED
and policy_body.get(POLICY_CONFIG)
)
@staticmethod
def get_latest_policy(aud_policy_id):
"""safely try retrieving the latest policy for the policy_id from the policy-engine"""
audit, policy_id, min_version_expected, ignore_policy_names = aud_policy_id
str_metrics = "policy_id({0}), min_version_expected({1}) ignore_policy_names({2})".format(
policy_id, min_version_expected, json.dumps(ignore_policy_names))
try:
return PolicyRest._get_latest_policy(
audit, policy_id, min_version_expected, ignore_policy_names, str_metrics)
except Exception as ex:
error_msg = ("{0}: crash {1} {2} at {3}: {4}"
.format(audit.request_id, type(ex).__name__, str(ex),
"get_latest_policy", str_metrics))
PolicyRest._logger.exception(error_msg)
audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
return None
@staticmethod
def _get_latest_policy(audit, policy_id,
min_version_expected, ignore_policy_names, str_metrics):
"""retry several times getting the latest policy for the policy_id from the policy-engine"""
PolicyRest._lazy_init()
latest_policy = None
status_code = 0
retry_get_config = audit.kwargs.get("retry_get_config")
expect_policy_removed = (ignore_policy_names and not min_version_expected)
for retry in range(1, PolicyRest._policy_retry_count + 1):
PolicyRest._logger.debug(str_metrics)
done, latest_policy, status_code = PolicyRest._get_latest_policy_once(
audit, policy_id, min_version_expected, ignore_policy_names,
expect_policy_removed)
if done or not retry_get_config or not PolicyRest._policy_retry_sleep:
break
if retry == PolicyRest._policy_retry_count:
audit.warn("gave up retrying {0} from PDP after #{1} for policy_id={2}"
.format(PolicyRest._url_get_config, retry, policy_id),
error_code=AuditResponseCode.DATA_ERROR)
break
audit.warn(
"retry #{0} {1} from PDP in {2} secs for policy_id={3}".format(
retry, PolicyRest._url_get_config,
PolicyRest._policy_retry_sleep, policy_id),
error_code=AuditResponseCode.DATA_ERROR)
time.sleep(PolicyRest._policy_retry_sleep)
if (expect_policy_removed and not latest_policy
and AuditHttpCode.RESPONSE_ERROR.value == status_code):
audit.set_http_status_code(AuditHttpCode.HTTP_OK.value)
return None
audit.set_http_status_code(status_code)
if not PolicyRest._validate_policy(latest_policy):
audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
audit.error(
"received invalid policy from PDP: {0}".format(json.dumps(latest_policy)),
error_code=AuditResponseCode.DATA_ERROR)
return latest_policy
@staticmethod
def _get_latest_policy_once(audit, policy_id,
min_version_expected, ignore_policy_names,
expect_policy_removed):
"""single attempt to get the latest policy for the policy_id from the policy-engine"""
status_code, policy_bodies = PolicyRest._pdp_get_config(audit, {POLICY_NAME:policy_id})
PolicyRest._logger.debug("%s %s policy_bodies: %s",
status_code, policy_id, json.dumps(policy_bodies or []))
latest_policy = PolicyUtils.select_latest_policy(
policy_bodies, min_version_expected, ignore_policy_names
)
if not latest_policy and not expect_policy_removed:
audit.error("received unexpected policy data from PDP for policy_id={0}: {1}"
.format(policy_id, json.dumps(policy_bodies or [])),
error_code=AuditResponseCode.DATA_ERROR)
done = bool(latest_policy
or (expect_policy_removed and not policy_bodies)
or audit.is_serious_error(status_code))
return done, latest_policy, status_code
@staticmethod
def get_latest_updated_policies(aud_policy_updates):
"""safely try retrieving the latest policies for the list of policy_names"""
audit, policies_updated, policies_removed = aud_policy_updates
if not policies_updated and not policies_removed:
return None, None
str_metrics = "policies_updated[{0}]: {1} policies_removed[{2}]: {3}".format(
len(policies_updated), json.dumps(policies_updated),
len(policies_removed), json.dumps(policies_removed))
try:
return PolicyRest._get_latest_updated_policies(
audit, str_metrics, policies_updated, policies_removed)
except Exception as ex:
error_msg = ("{0}: crash {1} {2} at {3}: {4}"
.format(audit.request_id, type(ex).__name__, str(ex),
"get_latest_updated_policies", str_metrics))
PolicyRest._logger.exception(error_msg)
audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
return None, None
@staticmethod
def _get_latest_updated_policies(audit, str_metrics, policies_updated, policies_removed):
"""Get the latest policies of the list of policy_names from the policy-engine"""
PolicyRest._lazy_init()
metrics_total = Metrics(
aud_parent=audit,
targetEntity="{0} total get_latest_updated_policies".format(PolicyRest._target_entity),
targetServiceName=PolicyRest._url_get_config)
metrics_total.metrics_start("get_latest_updated_policies {0}".format(str_metrics))
PolicyRest._logger.debug(str_metrics)
policies_to_find = {}
for (policy_id, policy_version) in policies_updated:
if not policy_id or not policy_version or not policy_version.isdigit():
continue
policy = policies_to_find.get(policy_id)
if not policy:
policies_to_find[policy_id] = {
POLICY_ID: policy_id,
PolicyRest.MIN_VERSION_EXPECTED: int(policy_version),
PolicyRest.IGNORE_POLICY_NAMES: {}
}
continue
if int(policy[PolicyRest.MIN_VERSION_EXPECTED]) < int(policy_version):
policy[PolicyRest.MIN_VERSION_EXPECTED] = int(policy_version)
for (policy_id, policy_names) in policies_removed:
if not policy_id:
continue
policy = policies_to_find.get(policy_id)
if not policy:
policies_to_find[policy_id] = {
POLICY_ID: policy_id,
PolicyRest.IGNORE_POLICY_NAMES: policy_names
}
continue
policy[PolicyRest.IGNORE_POLICY_NAMES].update(policy_names)
apns = [(audit, policy_id,
policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED),
policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES))
for (policy_id, policy_to_find) in policies_to_find.items()]
policies = None
apns_length = len(apns)
if apns_length == 1:
policies = [PolicyRest.get_latest_policy(apns[0])]
else:
pool = ThreadPool(min(PolicyRest._thread_pool_size, apns_length))
policies = pool.map(PolicyRest.get_latest_policy, apns)
pool.close()
pool.join()
metrics_total.metrics("result get_latest_updated_policies {0}: {1} {2}"
.format(str_metrics, len(policies), json.dumps(policies)))
updated_policies = dict((policy[POLICY_ID], policy)
for policy in policies
if policy and policy.get(POLICY_ID))
removed_policies = dict((policy_id, True)
for (policy_id, policy_to_find) in policies_to_find.items()
if not policy_to_find.get(PolicyRest.MIN_VERSION_EXPECTED)
and policy_to_find.get(PolicyRest.IGNORE_POLICY_NAMES)
and policy_id not in updated_policies)
errored_policies = dict((policy_id, policy_to_find)
for (policy_id, policy_to_find) in policies_to_find.items()
if policy_id not in updated_policies
and policy_id not in removed_policies)
PolicyRest._logger.debug(
"result updated_policies %s, removed_policies %s, errored_policies %s",
json.dumps(updated_policies), json.dumps(removed_policies),
json.dumps(errored_policies))
if errored_policies:
audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
audit.error(
"errored_policies in PDP: {0}".format(json.dumps(errored_policies)),
error_code=AuditResponseCode.DATA_ERROR)
return updated_policies, removed_policies
@staticmethod
def _get_latest_policies(aud_policy_filter):
"""get the latest policies by policy_filter from the policy-engine"""
audit, policy_filter = aud_policy_filter
try:
str_policy_filter = json.dumps(policy_filter)
PolicyRest._logger.debug("%s", str_policy_filter)
status_code, policy_bodies = PolicyRest._pdp_get_config(audit, policy_filter)
PolicyRest._logger.debug("%s policy_bodies: %s %s", status_code,
str_policy_filter, json.dumps(policy_bodies or []))
latest_policies = PolicyUtils.select_latest_policies(policy_bodies)
if not latest_policies:
audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
audit.warn(
"received no policies from PDP for policy_filter {0}: {1}"
.format(str_policy_filter, json.dumps(policy_bodies or [])),
error_code=AuditResponseCode.DATA_ERROR)
return None, latest_policies
audit.set_http_status_code(status_code)
valid_policies = {}
errored_policies = {}
for (policy_id, policy) in latest_policies.items():
if PolicyRest._validate_policy(policy):
valid_policies[policy_id] = policy
else:
errored_policies[policy_id] = policy
return valid_policies, errored_policies
except Exception as ex:
error_msg = ("{0}: crash {1} {2} at {3}: policy_filter({4})"
.format(audit.request_id, type(ex).__name__, str(ex),
"_get_latest_policies", json.dumps(policy_filter)))
PolicyRest._logger.exception(error_msg)
audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
return None, None
@staticmethod
def get_latest_policies(audit, policy_filter=None, policy_filters=None):
"""Get the latest policies by policy-filter(s) from the policy-engine"""
result = {}
aud_policy_filters = None
str_policy_filters = None
str_metrics = None
target_entity = None
try:
PolicyRest._lazy_init()
if policy_filter:
aud_policy_filters = [(audit, policy_filter)]
str_policy_filters = json.dumps(policy_filter)
str_metrics = "get_latest_policies for policy_filter {0}".format(
str_policy_filters)
target_entity = ("{0} total get_latest_policies by policy_filter"
.format(PolicyRest._target_entity))
result[POLICY_FILTER] = copy.deepcopy(policy_filter)
elif policy_filters:
aud_policy_filters = [
(audit, policy_filter)
for policy_filter in policy_filters
]
str_policy_filters = json.dumps(policy_filters)
str_metrics = "get_latest_policies for policy_filters {0}".format(
str_policy_filters)
target_entity = ("{0} total get_latest_policies by policy_filters"
.format(PolicyRest._target_entity))
result[POLICY_FILTERS] = copy.deepcopy(policy_filters)
else:
return result
PolicyRest._logger.debug("%s", str_policy_filters)
metrics_total = Metrics(aud_parent=audit, targetEntity=target_entity,
targetServiceName=PolicyRest._url_get_config)
metrics_total.metrics_start(str_metrics)
latest_policies = None
apfs_length = len(aud_policy_filters)
if apfs_length == 1:
latest_policies = [PolicyRest._get_latest_policies(aud_policy_filters[0])]
else:
pool = ThreadPool(min(PolicyRest._thread_pool_size, apfs_length))
latest_policies = pool.map(PolicyRest._get_latest_policies, aud_policy_filters)
pool.close()
pool.join()
metrics_total.metrics("total result {0}: {1} {2}".format(
str_metrics, len(latest_policies), json.dumps(latest_policies)))
# latest_policies == [(valid_policies, errored_policies), ...]
result[LATEST_POLICIES] = dict(
pair for (vps, _) in latest_policies if vps for pair in vps.items())
result[ERRORED_POLICIES] = dict(
pair for (_, eps) in latest_policies if eps for pair in eps.items())
PolicyRest._logger.debug("got policies for policy_filters: %s. result: %s",
str_policy_filters, json.dumps(result))
return result
except Exception as ex:
error_msg = ("{0}: crash {1} {2} at {3}: {4}"
.format(audit.request_id, type(ex).__name__, str(ex),
"get_latest_policies", str_metrics))
PolicyRest._logger.exception(error_msg)
audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
return None