blob: 6b7788c438bf68690fca7cad20e549d69020aff5 [file] [log] [blame]
# ================================================================================
# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
"""send policy-update notification to deployment-handler"""
import json
import logging
from copy import copy, deepcopy
import requests
from .config import Config
from .customize import CustomizerUser
from .discovery import DiscoveryClient
from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode,
AuditResponseCode, Metrics)
from .policy_consts import (CATCH_UP, LATEST_POLICIES, POLICIES,
POLICY_FILTER_MATCHES, POLICY_FILTERS,
REMOVED_POLICIES)
class PolicyUpdateMessage(object):
"""class for messages to deployment-handler on policy-update"""
BYTES_IN_MB = 1 << 2 * 10
def __init__(self, latest_policies=None,
removed_policies=None, policy_filter_matches=None, catch_up=True):
"""init"""
self._catch_up = catch_up
self._latest_policies = deepcopy(latest_policies or {})
self._removed_policies = copy(removed_policies or {})
self._policy_filter_matches = deepcopy(policy_filter_matches or {})
self._message = {
CATCH_UP: self._catch_up,
LATEST_POLICIES: self._latest_policies,
REMOVED_POLICIES: self._removed_policies,
POLICY_FILTER_MATCHES: self._policy_filter_matches
}
self.msg_length = 0
self._calc_stats()
def _calc_stats(self):
"""generate the message and calc stats"""
self.msg_length = len(json.dumps(self._message))
def empty(self):
"""checks whether have any data"""
return (not self._latest_policies
and not self._removed_policies
and not self._policy_filter_matches)
def add(self, policy_id, latest_policy=None, policy_filter_ids=None, removed_policy=None):
"""add the parts from the other message to the current message"""
if not policy_id or not (latest_policy or policy_filter_ids or removed_policy):
return
if latest_policy:
self._latest_policies[policy_id] = deepcopy(latest_policy)
if policy_filter_ids:
if policy_id not in self._policy_filter_matches:
self._policy_filter_matches[policy_id] = {}
self._policy_filter_matches[policy_id].update(policy_filter_ids)
if removed_policy is not None:
self._removed_policies[policy_id] = removed_policy
self._calc_stats()
def get_message(self):
"""expose the copy of the message"""
return deepcopy(self._message)
def __str__(self):
"""to string"""
return json.dumps(self._message)
def _iter_over_removed_policies(self):
"""generator of iterator over removed_policies"""
for (policy_id, value) in self._removed_policies.items():
yield (policy_id, value)
def _iter_over_latest_policies(self):
"""generator of iterator over latest_policies and policy_filter_matches"""
for (policy_id, policy) in self._latest_policies.items():
yield (policy_id, policy, self._policy_filter_matches.get(policy_id))
def gen_segmented_messages(self, max_msg_length_mb):
"""
Break the policy-update message into a list of segmented messages.
Each segmented message should not exceed the max_msg_length_mb from config.
"""
max_msg_length_mb = (max_msg_length_mb or 10) * PolicyUpdateMessage.BYTES_IN_MB
messages = []
curr_message = PolicyUpdateMessage(catch_up=self._catch_up)
for (policy_id, value) in self._iter_over_removed_policies():
if (not curr_message.empty()
and (len(policy_id) + len(str(value)) + curr_message.msg_length
> max_msg_length_mb)):
messages.append(curr_message.get_message())
curr_message = PolicyUpdateMessage(catch_up=self._catch_up)
curr_message.add(policy_id, removed_policy=value)
for (policy_id, policy, policy_filter_ids) in self._iter_over_latest_policies():
if (not curr_message.empty()
and (2 * len(policy_id) + len(json.dumps(policy))
+ len(json.dumps(policy_filter_ids))
+ curr_message.msg_length > max_msg_length_mb)):
messages.append(curr_message.get_message())
curr_message = PolicyUpdateMessage(catch_up=self._catch_up)
curr_message.add(policy_id, latest_policy=policy, policy_filter_ids=policy_filter_ids)
if not curr_message.empty():
messages.append(curr_message.get_message())
msg_count = len(messages)
if msg_count > 1:
msg_count = "/" + str(msg_count)
for idx, msg in enumerate(messages):
msg["data_segment"] = str((idx+1)) + msg_count
return messages
class DeployHandler(object):
"""calling the deployment-handler web apis"""
_logger = logging.getLogger("policy_handler.deploy_handler")
_lazy_inited = False
_requests_session = None
_url = None
_url_policy = None
_max_msg_length_mb = 10
_target_entity = None
_custom_kwargs = None
_server_instance_uuid = None
server_instance_changed = False
@staticmethod
def _lazy_init(audit, rediscover=False):
""" set static properties """
if DeployHandler._lazy_inited and not rediscover:
return
DeployHandler._custom_kwargs = (CustomizerUser.get_customizer()
.get_deploy_handler_kwargs(audit))
if (not DeployHandler._custom_kwargs
or not isinstance(DeployHandler._custom_kwargs, dict)):
DeployHandler._custom_kwargs = {}
if not DeployHandler._requests_session:
pool_size = Config.settings.get("pool_connections", 20)
DeployHandler._requests_session = requests.Session()
DeployHandler._requests_session.mount(
'https://',
requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
)
DeployHandler._requests_session.mount(
'http://',
requests.adapters.HTTPAdapter(pool_connections=pool_size, pool_maxsize=pool_size)
)
config_dh = Config.settings.get("deploy_handler")
if config_dh and isinstance(config_dh, dict):
# dns based routing to deployment-handler
# config for policy-handler >= 2.4.0
# "deploy_handler" : {
# "target_entity" : "deployment_handler",
# "url" : "http://deployment_handler:8188",
# "max_msg_length_mb" : 100
# }
DeployHandler._target_entity = config_dh.get("target_entity", "deployment_handler")
DeployHandler._url = config_dh.get("url")
DeployHandler._max_msg_length_mb = config_dh.get("max_msg_length_mb",
DeployHandler._max_msg_length_mb)
DeployHandler._logger.info("dns based routing to %s: url(%s)",
DeployHandler._target_entity, DeployHandler._url)
if not DeployHandler._url:
# discover routing to deployment-handler at consul-services
if not isinstance(config_dh, dict):
# config for policy-handler <= 2.3.1
# "deploy_handler" : "deployment_handler"
DeployHandler._target_entity = str(config_dh or "deployment_handler")
DeployHandler._url = DiscoveryClient.get_service_url(audit,
DeployHandler._target_entity)
DeployHandler._url_policy = str(DeployHandler._url or "") + '/policy'
DeployHandler._logger.info(
"got %s policy url(%s)", DeployHandler._target_entity, DeployHandler._url_policy)
DeployHandler._lazy_inited = bool(DeployHandler._url)
@staticmethod
def policy_update(audit, policy_update_message, rediscover=False):
"""
segments the big policy_update_message limited by size
and sequatially sends each segment as put to deployment-handler at /policy.
param policy_update_message is of PolicyUpdateMessage type
"""
if not policy_update_message or policy_update_message.empty():
return
DeployHandler._lazy_init(audit, rediscover)
str_metrics = "policy_update {0}".format(str(policy_update_message))
metrics_total = Metrics(
aud_parent=audit,
targetEntity="{0} total policy_update".format(DeployHandler._target_entity),
targetServiceName=DeployHandler._url_policy)
metrics_total.metrics_start("started {}".format(str_metrics))
messages = policy_update_message.gen_segmented_messages(DeployHandler._max_msg_length_mb)
for message in messages:
DeployHandler._policy_update(audit, message)
if not audit.is_success():
break
metrics_total.metrics("done {}".format(str_metrics))
@staticmethod
def _policy_update(audit, message):
"""
sends the put message to deployment-handler at /policy
detects whether server_instance_changed condition on deployment-handler
that is the cause to catch_up
"""
if not message:
return
metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity,
targetServiceName=DeployHandler._url_policy)
headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id}
log_action = "put to {0} at {1}".format(
DeployHandler._target_entity, DeployHandler._url_policy)
log_data = " msg={0} headers={1}".format(json.dumps(message),
json.dumps(headers))
log_line = log_action + log_data
DeployHandler._logger.info(log_line)
metrics.metrics_start(log_line)
if not DeployHandler._url:
error_msg = "no url found to {0}".format(log_line)
DeployHandler._logger.error(error_msg)
metrics.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
metrics.metrics(error_msg)
return
res = None
try:
res = DeployHandler._requests_session.put(
DeployHandler._url_policy, json=message, headers=headers,
**DeployHandler._custom_kwargs
)
except Exception as ex:
error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
if isinstance(ex, requests.exceptions.RequestException)
else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
error_msg = ("failed to {0} {1}: {2}{3}"
.format(log_action, type(ex).__name__, str(ex), log_data))
DeployHandler._logger.exception(error_msg)
metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
metrics.metrics(error_msg)
return
metrics.set_http_status_code(res.status_code)
audit.set_http_status_code(res.status_code)
log_line = "response {0} from {1}: text={2}{3}".format(res.status_code, log_action,
res.text, log_data)
metrics.metrics(log_line)
if res.status_code != requests.codes.ok:
DeployHandler._logger.error(log_line)
return
DeployHandler._logger.info(log_line)
result = res.json() or {}
DeployHandler._server_instance_changed(result, metrics)
@staticmethod
def get_deployed_policies(audit, rediscover=False):
"""
Retrieves policies and policy-filters from components
that were deployed by deployment-handler
"""
DeployHandler._lazy_init(audit, rediscover)
metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity,
targetServiceName=DeployHandler._url_policy)
headers = {REQUEST_X_ECOMP_REQUESTID : metrics.request_id}
log_action = "get {0}: {1}".format(DeployHandler._target_entity, DeployHandler._url_policy)
log_data = " headers={}".format(json.dumps(headers))
log_line = log_action + log_data
DeployHandler._logger.info(log_line)
metrics.metrics_start(log_line)
if not DeployHandler._url:
error_msg = "no url found to {0}".format(log_line)
DeployHandler._logger.error(error_msg)
metrics.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
metrics.metrics(error_msg)
return None, None
res = None
try:
res = DeployHandler._requests_session.get(
DeployHandler._url_policy, headers=headers,
**DeployHandler._custom_kwargs
)
except Exception as ex:
error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
if isinstance(ex, requests.exceptions.RequestException)
else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
error_msg = ("failed to {0} {1}: {2}{3}"
.format(log_action, type(ex).__name__, str(ex), log_data))
DeployHandler._logger.exception(error_msg)
metrics.set_http_status_code(error_code)
audit.set_http_status_code(error_code)
metrics.metrics(error_msg)
return None, None
metrics.set_http_status_code(res.status_code)
audit.set_http_status_code(res.status_code)
log_line = ("response {0} from {1}: text={2}{3}"
.format(res.status_code, log_action, res.text, log_data))
metrics.metrics(log_line)
if res.status_code != requests.codes.ok:
DeployHandler._logger.error(log_line)
return None, None
result = res.json() or {}
DeployHandler._server_instance_changed(result, metrics)
policies = result.get(POLICIES, {})
policy_filters = result.get(POLICY_FILTERS, {})
if not policies and not policy_filters:
audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
DeployHandler._logger.warning(audit.warn(
"found no deployed policies or policy-filters: {}".format(log_line),
error_code=AuditResponseCode.DATA_ERROR))
return policies, policy_filters
DeployHandler._logger.info(log_line)
return policies, policy_filters
@staticmethod
def _server_instance_changed(result, metrics):
"""Checks whether the deployment-handler instance changed since last call."""
prev_server_instance_uuid = DeployHandler._server_instance_uuid
DeployHandler._server_instance_uuid = result.get("server_instance_uuid")
if (prev_server_instance_uuid
and prev_server_instance_uuid != DeployHandler._server_instance_uuid):
DeployHandler.server_instance_changed = True
log_line = ("deployment_handler_changed: {1} != {0}"
.format(prev_server_instance_uuid, DeployHandler._server_instance_uuid))
metrics.info(log_line)
DeployHandler._logger.info(log_line)