# blob: 235e2b6b9e74be697f141a5051be08af55218efc [file] [log] [blame]
# ================================================================================
# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.
"""policy-updater thread"""
import json
import logging
from threading import Event, Lock, Thread
from .config import Config, Settings
from .deploy_handler import DeployHandler, PolicyUpdateMessage
from .onap.audit import Audit, AuditHttpCode, AuditResponseCode
from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, POLICY_BODY, POLICY_ID,
POLICY_NAME, POLICY_NAMES, POLICY_VERSION)
from .policy_matcher import PolicyMatcher
from .policy_rest import PolicyRest
from .policy_utils import PolicyUtils
from .step_timer import StepTimer
class _PolicyUpdate(object):
"""Keep and consolidate the policy-updates (audit, policies_updated, policies_removed)"""
_logger = logging.getLogger("policy_handler.policy_update")
def __init__(self):
"""init and reset"""
self._audit = None
self._policies_updated = {}
self._policies_removed = {}
def reset(self):
"""resets the state"""
self.__init__()
def pop_policy_update(self):
"""
Returns the consolidated (audit, policies_updated, policies_removed)
and resets the state
"""
if not self._audit:
return None, None, None
audit = self._audit
policies_updated = self._policies_updated
policies_removed = self._policies_removed
self.reset()
return audit, policies_updated, policies_removed
def push_policy_update(self, policies_updated, policies_removed):
"""consolidate the new policies_updated, policies_removed to existing ones"""
for policy_body in policies_updated:
policy_name = policy_body.get(POLICY_NAME)
policy = PolicyUtils.convert_to_policy(policy_body)
if not policy:
continue
policy_id = policy.get(POLICY_ID)
self._policies_updated[policy_id] = policy
rm_policy_names = self._policies_removed.get(policy_id, {}).get(POLICY_NAMES)
if rm_policy_names and policy_name in rm_policy_names:
del rm_policy_names[policy_name]
for policy_body in policies_removed:
policy_name = policy_body.get(POLICY_NAME)
policy = PolicyUtils.convert_to_policy(policy_body)
if not policy:
continue
policy_id = policy.get(POLICY_ID)
if policy_id in self._policies_removed:
policy = self._policies_removed[policy_id]
if POLICY_NAMES not in policy:
policy[POLICY_NAMES] = {}
policy[POLICY_NAMES][policy_name] = True
self._policies_removed[policy_id] = policy
req_message = ("policy-update notification - updated[{0}], removed[{1}]"
.format(len(self._policies_updated),
len(self._policies_removed)))
if not self._audit:
self._audit = Audit(job_name="policy_update",
req_message=req_message,
retry_get_config=True)
else:
self._audit.req_message = req_message
self._logger.info(
"pending request_id %s for %s policies_updated %s policies_removed %s",
self._audit.request_id, req_message,
json.dumps(policies_updated), json.dumps(policies_removed))
class PolicyUpdater(Thread):
    """sequentially handle the policy-updates and catch-ups in its own policy_updater thread

    The public entry points (policy_update, catch_up, shutdown) and the
    reconfigure timer only record a pending request under the lock and set
    the wake-up event.  The requests are processed one at a time inside run().
    """
    _logger = logging.getLogger("policy_handler.policy_updater")

    def __init__(self, on_reconfigure_receiver):
        """init static config of PolicyUpdater.

        :param on_reconfigure_receiver: callable without arguments that is invoked
            by _on_reconfigure() when the discovered config has changed;
            a truthy return is reported as a "web-socket" change
        """
        Thread.__init__(self, name="policy_updater", daemon=True)
        self._reconfigure_receiver = on_reconfigure_receiver
        self._lock = Lock()    # guards the pending requests kept on this object
        self._run = Event()    # set whenever run() has something to look at
        self._settings = Settings(CATCH_UP, Config.RECONFIGURE)

        self._catch_up_timer = None
        self._reconfigure_timer = None

        # pending requests: each one is the audit of the request or None
        self._aud_shutdown = None
        self._aud_catch_up = None
        self._aud_reconfigure = None
        self._policy_update = _PolicyUpdate()

        self._catch_up_interval = None
        self._reconfigure_interval = None
        self._set_timer_intervals()

    def _set_timer_intervals(self):
        """set intervals on timers

        Returns True when the settings have changed and the intervals were
        re-read, False when there was nothing to change.
        """
        self._settings.set_config(Config.discovered_config)
        if not self._settings.is_changed():
            self._settings.commit_change()
            return False

        _, catch_up = self._settings.get_by_key(CATCH_UP, {})
        # fall back to 15*60 when not configured - presumably seconds, TODO confirm
        self._catch_up_interval = catch_up.get(Config.TIMER_INTERVAL) or 15*60

        _, reconfigure = self._settings.get_by_key(Config.RECONFIGURE, {})
        # fall back to 10*60 when not configured - presumably seconds, TODO confirm
        self._reconfigure_interval = reconfigure.get(Config.TIMER_INTERVAL) or 10*60

        PolicyUpdater._logger.info(
            "intervals: catch_up(%s) reconfigure(%s): %s",
            self._catch_up_interval, self._reconfigure_interval, self._settings)
        self._settings.commit_change()
        return True

    def policy_update(self, policies_updated, policies_removed):
        """enqueue the policy-updates

        :param policies_updated: policy bodies that were updated
        :param policies_removed: policy bodies that were removed
        """
        with self._lock:
            self._policy_update.push_policy_update(policies_updated, policies_removed)
            self._run.set()

    def catch_up(self, audit=None):
        """need to bring the latest policies to DCAE-Controller

        :param audit: audit of the request; a new Audit(req_message=AUTO_CATCH_UP)
            is created when omitted.  Ignored when a catch-up is already pending.
        """
        with self._lock:
            if not self._aud_catch_up:
                self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP)
                PolicyUpdater._logger.info(
                    "catch_up %s request_id %s",
                    self._aud_catch_up.req_message, self._aud_catch_up.request_id
                )
            self._run.set()

    def _reconfigure(self):
        """job to check for and bring in the updated config for policy-handler"""
        with self._lock:
            if not self._aud_reconfigure:
                self._aud_reconfigure = Audit(req_message=Config.RECONFIGURE)
                PolicyUpdater._logger.info(
                    "reconfigure %s request_id %s",
                    self._aud_reconfigure.req_message, self._aud_reconfigure.request_id
                )
            self._run.set()

    def run(self):
        """wait and run the policy-update in thread

        Every wake-up handles a pending reconfigure first, then a pending
        catch-up, and only when no catch-up succeeded the pending policy-updates.
        The loop ends once a shutdown is requested.
        """
        self._run_reconfigure_timer()
        while True:
            PolicyUpdater._logger.info("waiting for policy-updates...")
            self._run.wait()

            with self._lock:
                self._run.clear()

            if not self._keep_running():
                break

            self._on_reconfigure()

            # the reconfigure can take a while - recheck for the shutdown
            if not self._keep_running():
                break

            if self._on_catch_up():
                continue

            self._on_policy_update()

        PolicyUpdater._logger.info("exit policy-updater")

    def _keep_running(self):
        """thread-safe check whether to continue running

        Returns False and closes the shutdown audit when a shutdown is pending.
        """
        with self._lock:
            aud_shutdown = self._aud_shutdown

        if aud_shutdown:
            aud_shutdown.audit_done()
            return False
        return True

    def _run_catch_up_timer(self):
        """create and start the catch_up timer

        Does nothing without the interval.  An existing timer is moved to
        its next step instead of being created again.
        """
        if not self._catch_up_interval:
            return

        if self._catch_up_timer:
            self._logger.info("next step catch_up_timer in %s", self._catch_up_interval)
            self._catch_up_timer.next(self._catch_up_interval)
            return

        # presumably StepTimer calls PolicyUpdater.catch_up(self) on each step - TODO confirm
        self._catch_up_timer = StepTimer(
            "catch_up_timer",
            self._catch_up_interval,
            PolicyUpdater.catch_up,
            PolicyUpdater._logger,
            self
        )
        self._logger.info("started catch_up_timer in %s", self._catch_up_interval)
        self._catch_up_timer.start()

    def _run_reconfigure_timer(self):
        """create and start the reconfigure timer

        Does nothing without the interval.  An existing timer is moved to
        its next step instead of being created again.
        """
        if not self._reconfigure_interval:
            return

        if self._reconfigure_timer:
            self._logger.info("next step reconfigure_timer in %s", self._reconfigure_interval)
            self._reconfigure_timer.next(self._reconfigure_interval)
            return

        # presumably StepTimer calls PolicyUpdater._reconfigure(self) on each step - TODO confirm
        self._reconfigure_timer = StepTimer(
            "reconfigure_timer",
            self._reconfigure_interval,
            PolicyUpdater._reconfigure,
            PolicyUpdater._logger,
            self
        )
        self._logger.info("started reconfigure_timer in %s", self._reconfigure_interval)
        self._reconfigure_timer.start()

    def _pause_catch_up_timer(self):
        """pause catch_up_timer"""
        if self._catch_up_timer:
            self._logger.info("pause catch_up_timer")
            self._catch_up_timer.pause()

    def _stop_timers(self):
        """stop and destroy the catch_up and reconfigure timers"""
        if self._catch_up_timer:
            self._logger.info("stopping catch_up_timer")
            self._catch_up_timer.stop()
            self._catch_up_timer.join()
            self._catch_up_timer = None
            self._logger.info("stopped catch_up_timer")

        if self._reconfigure_timer:
            self._logger.info("stopping reconfigure_timer")
            self._reconfigure_timer.stop()
            self._reconfigure_timer.join()
            self._reconfigure_timer = None
            self._logger.info("stopped reconfigure_timer")

    def _on_reconfigure(self):
        """bring the latest config and reconfigure

        Does nothing unless a reconfigure is pending.  Any exception is
        logged and recorded on the audit instead of being raised.
        """
        # take the pending request out under the lock
        with self._lock:
            aud_reconfigure = self._aud_reconfigure
            if self._aud_reconfigure:
                self._aud_reconfigure = None

        if not aud_reconfigure:
            return

        log_line = "{}({})".format(aud_reconfigure.req_message, aud_reconfigure.request_id)
        reconfigure_result = ""
        try:
            PolicyUpdater._logger.info(log_line)
            Config.discover(aud_reconfigure)
            if not Config.discovered_config.is_changed():
                reconfigure_result = " -- config not changed"
            else:
                # let every consumer of the config pick up its own change
                reconfigure_result = " -- config changed for:"
                if self._set_timer_intervals():
                    reconfigure_result += " timer_intervals"

                if PolicyRest.reconfigure():
                    reconfigure_result += " " + Config.FIELD_POLICY_ENGINE

                if DeployHandler.reconfigure(aud_reconfigure):
                    reconfigure_result += " " + Config.DEPLOY_HANDLER

                if self._reconfigure_receiver():
                    reconfigure_result += " web-socket"

                reconfigure_result += " -- change: {}".format(Config.discovered_config)

                Config.discovered_config.commit_change()

            aud_reconfigure.audit_done(result=reconfigure_result)
            PolicyUpdater._logger.info(log_line + reconfigure_result)

        except Exception as ex:
            error_msg = "crash {} {}{}: {}: {}".format(
                "_on_reconfigure", log_line, reconfigure_result, type(ex).__name__, str(ex))

            PolicyUpdater._logger.exception(error_msg)
            aud_reconfigure.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
            aud_reconfigure.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
            aud_reconfigure.audit_done(result=error_msg)

        # schedule the next reconfigure regardless of the outcome
        self._run_reconfigure_timer()

        PolicyUpdater._logger.info("policy_handler health: %s",
                                   json.dumps(aud_reconfigure.health(full=True)))
        PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_reconfigure.process_info()))

    def _on_catch_up(self):
        """bring all the latest policies to DCAE-Controller

        Returns False when no catch-up is pending or the catch-up crashed,
        otherwise the first item of the result of audit_done().
        """
        # take the pending request out under the lock
        with self._lock:
            aud_catch_up = self._aud_catch_up
            if self._aud_catch_up:
                self._aud_catch_up = None
                # the catch-up brings all the latest policies - drop the pending updates
                self._policy_update.reset()

        if not aud_catch_up:
            return False

        log_line = "catch_up {0} request_id {1}".format(
            aud_catch_up.req_message, aud_catch_up.request_id
        )
        catch_up_result = ""
        try:
            PolicyUpdater._logger.info(log_line)
            self._pause_catch_up_timer()

            _, catch_up_message = PolicyMatcher.get_latest_policies(aud_catch_up)

            if not catch_up_message or not aud_catch_up.is_success_or_not_found():
                catch_up_result = "- not sending catch-up to deployment-handler due to errors"
                PolicyUpdater._logger.warning(catch_up_result)
            elif catch_up_message.empty():
                catch_up_result = "- not sending empty catch-up to deployment-handler"
            else:
                aud_catch_up.reset_http_status_not_found()
                DeployHandler.policy_update(aud_catch_up, catch_up_message)
                if not aud_catch_up.is_success():
                    catch_up_result = "- failed to send catch-up to deployment-handler"
                    PolicyUpdater._logger.warning(catch_up_result)
                else:
                    catch_up_result = "- sent catch-up to deployment-handler"
            success, _, _ = aud_catch_up.audit_done(result=catch_up_result)
            PolicyUpdater._logger.info(log_line + " " + catch_up_result)

        except Exception as ex:
            error_msg = ("{0}: crash {1} {2} at {3}: {4}"
                         .format(aud_catch_up.request_id, type(ex).__name__, str(ex),
                                 "on_catch_up", log_line + " " + catch_up_result))

            PolicyUpdater._logger.exception(error_msg)
            aud_catch_up.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
            aud_catch_up.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
            aud_catch_up.audit_done(result=error_msg)
            success = False

        # the timer was paused above - schedule the next catch-up regardless of the outcome
        self._run_catch_up_timer()

        PolicyUpdater._logger.info("policy_handler health: %s",
                                   json.dumps(aud_catch_up.health(full=True)))
        PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_catch_up.process_info()))
        return success

    def _on_policy_update(self):
        """handle the event of policy-updates

        Does nothing unless policy-updates are pending.  Any exception is
        logged and recorded on the audit instead of being raised.
        """
        result = ""
        # take the consolidated pending updates out under the lock
        with self._lock:
            audit, policies_updated, policies_removed = self._policy_update.pop_policy_update()

        if not audit:
            return

        log_line = "request_id: {} policies_updated: {} policies_removed: {}".format(
            audit.request_id, json.dumps(policies_updated), json.dumps(policies_removed))
        PolicyUpdater._logger.info(log_line)

        try:
            (updated_policies, removed_policies,
             policy_filter_matches) = PolicyMatcher.match_to_deployed_policies(
                 audit, policies_updated, policies_removed)

            if updated_policies or removed_policies:
                updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
                    (audit,
                     [(policy_id, policy.get(POLICY_BODY, {}).get(POLICY_VERSION))
                      for policy_id, policy in updated_policies.items()],
                     [(policy_id, policy.get(POLICY_NAMES, {}))
                      for policy_id, policy in removed_policies.items()]
                    ))

            if not audit.is_success_or_not_found():
                result = "- not sending policy-updates to deployment-handler due to errors"
                PolicyUpdater._logger.warning(result)
            elif not updated_policies and not removed_policies:
                result = "- not sending empty policy-updates to deployment-handler"
                PolicyUpdater._logger.info(result)
            else:
                message = PolicyUpdateMessage(updated_policies, removed_policies,
                                              policy_filter_matches, False)
                log_updates = ("policies-updated[{}], removed[{}], policy_filter_matches[{}]"
                               .format(len(updated_policies),
                                       len(removed_policies),
                                       len(policy_filter_matches)))

                audit.reset_http_status_not_found()
                DeployHandler.policy_update(audit, message)

                log_line = "request_id[{}]: {}".format(audit.request_id, str(message))

                if not audit.is_success():
                    result = "- failed to send to deployment-handler {}".format(log_updates)
                    PolicyUpdater._logger.warning(result)
                else:
                    result = "- sent to deployment-handler {}".format(log_updates)

            audit.audit_done(result=result)
            PolicyUpdater._logger.info(log_line + " " + result)

        except Exception as ex:
            error_msg = ("{0}: crash {1} {2} at {3}: {4}"
                         .format(audit.request_id, type(ex).__name__, str(ex),
                                 "on_policies_update", log_line + " " + result))

            PolicyUpdater._logger.exception(error_msg)
            audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
            audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
            # close the audit on a crash as well - the same way the crash handlers
            # of _on_reconfigure and _on_catch_up do
            audit.audit_done(result=error_msg)

        # deployment-handler reported another server instance - resend all the policies
        if DeployHandler.server_instance_changed:
            DeployHandler.server_instance_changed = False
            self._pause_catch_up_timer()
            self.catch_up()

    def shutdown(self, audit):
        """Shutdown the policy-updater

        :param audit: audit of the shutdown request; it is closed by the
            policy_updater thread when it notices the shutdown

        Stops the timers and waits for the policy_updater thread to exit.
        """
        PolicyUpdater._logger.info("shutdown policy-updater")
        with self._lock:
            self._aud_shutdown = audit
            self._run.set()

        self._stop_timers()

        if self.is_alive():
            self.join()