| # ================================================================================ |
| # Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved. |
| # ================================================================================ |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # ============LICENSE_END========================================================= |
| # |
| # ECOMP is a trademark and service mark of AT&T Intellectual Property. |
| |
| """policy-updater thread""" |
| |
| import json |
| import logging |
| from threading import Event, Lock, Thread |
| |
| from .config import Config, Settings |
| from .deploy_handler import DeployHandler, PolicyUpdateMessage |
| from .onap.audit import Audit, AuditHttpCode, AuditResponseCode |
| from .policy_consts import (AUTO_CATCH_UP, CATCH_UP, POLICY_BODY, POLICY_ID, |
| POLICY_NAME, POLICY_NAMES, POLICY_VERSION) |
| from .policy_matcher import PolicyMatcher |
| from .policy_rest import PolicyRest |
| from .policy_utils import PolicyUtils |
| from .step_timer import StepTimer |
| |
| |
class _PolicyUpdate(object):
    """
    Accumulator for pending policy-update notifications.

    Holds one audit plus two dicts keyed by policy_id (policies_updated and
    policies_removed) and merges every new notification into them until the
    consolidated result is popped.
    """
    _logger = logging.getLogger("policy_handler.policy_update")

    def __init__(self):
        """start with no audit and nothing pending"""
        self._audit = None
        self._policies_updated = {}
        self._policies_removed = {}

    def reset(self):
        """drop the audit and everything that was accumulated so far"""
        self._audit = None
        self._policies_updated = {}
        self._policies_removed = {}

    def pop_policy_update(self):
        """
        Hand over the consolidated (audit, policies_updated, policies_removed)
        and reset the state.

        Returns (None, None, None) when no audit is pending.
        """
        if not self._audit:
            return None, None, None

        # snapshot the current objects before reset() rebinds the attributes
        pending = (self._audit, self._policies_updated, self._policies_removed)
        self.reset()
        return pending

    def push_policy_update(self, policies_updated, policies_removed):
        """
        Merge the new policies_updated and policies_removed into the pending ones.

        Both arguments are iterables of policy bodies; bodies that
        PolicyUtils.convert_to_policy turns into a falsy value are skipped.
        """
        for updated_body in policies_updated:
            updated_name = updated_body.get(POLICY_NAME)
            updated_policy = PolicyUtils.convert_to_policy(updated_body)
            if not updated_policy:
                continue
            updated_id = updated_policy.get(POLICY_ID)

            self._policies_updated[updated_id] = updated_policy

            # an update of a policy_name cancels the pending removal of that same name
            pending_names = self._policies_removed.get(updated_id, {}).get(POLICY_NAMES)
            if pending_names and updated_name in pending_names:
                del pending_names[updated_name]

        for removed_body in policies_removed:
            removed_name = removed_body.get(POLICY_NAME)
            removed_policy = PolicyUtils.convert_to_policy(removed_body)
            if not removed_policy:
                continue
            removed_id = removed_policy.get(POLICY_ID)

            # keep accumulating into the record that is already pending for this policy_id
            removed_policy = self._policies_removed.get(removed_id, removed_policy)
            removed_policy.setdefault(POLICY_NAMES, {})[removed_name] = True
            self._policies_removed[removed_id] = removed_policy

        req_message = "policy-update notification - updated[{0}], removed[{1}]".format(
            len(self._policies_updated), len(self._policies_removed))

        if self._audit:
            self._audit.req_message = req_message
        else:
            self._audit = Audit(job_name="policy_update",
                                req_message=req_message,
                                retry_get_config=True)

        self._logger.info(
            "pending request_id %s for %s policies_updated %s policies_removed %s",
            self._audit.request_id, req_message,
            json.dumps(policies_updated), json.dumps(policies_removed))
| |
| |
class PolicyUpdater(Thread):
    """
    Sequentially handle the policy-updates, catch-ups and reconfigure requests
    in its own daemon thread named "policy_updater".

    The public methods (and the timers) only record a request under the lock and
    set the ``_run`` event; the actual work is done by the loop in :meth:`run`.
    """
    _logger = logging.getLogger("policy_handler.policy_updater")

    def __init__(self, on_reconfigure_receiver):
        """
        init static config of PolicyUpdater.

        :param on_reconfigure_receiver: callable without arguments that is invoked
            on reconfigure; a truthy result is reported as a "web-socket" change
        """
        Thread.__init__(self, name="policy_updater", daemon=True)
        self._reconfigure_receiver = on_reconfigure_receiver

        self._lock = Lock()     # guards the pending requests below
        self._run = Event()     # set whenever there is something for run() to do
        self._settings = Settings(CATCH_UP, Config.RECONFIGURE)

        self._catch_up_timer = None
        self._reconfigure_timer = None

        # pending requests - each is consumed by the thread loop
        self._aud_shutdown = None
        self._aud_catch_up = None
        self._aud_reconfigure = None
        self._policy_update = _PolicyUpdate()

        self._catch_up_interval = None
        self._reconfigure_interval = None
        self._set_timer_intervals()

    def _set_timer_intervals(self):
        """
        set intervals on timers from the discovered config

        :return: True when the settings changed and the intervals were re-read,
            False when the settings did not change
        """
        self._settings.set_config(Config.discovered_config)
        if not self._settings.is_changed():
            self._settings.commit_change()
            return False

        _, catch_up = self._settings.get_by_key(CATCH_UP, {})
        # falls back to 15*60 when the interval is missing or falsy
        self._catch_up_interval = catch_up.get(Config.TIMER_INTERVAL) or 15*60

        _, reconfigure = self._settings.get_by_key(Config.RECONFIGURE, {})
        # falls back to 10*60 when the interval is missing or falsy
        self._reconfigure_interval = reconfigure.get(Config.TIMER_INTERVAL) or 10*60

        PolicyUpdater._logger.info(
            "intervals: catch_up(%s) reconfigure(%s): %s",
            self._catch_up_interval, self._reconfigure_interval, self._settings)
        self._settings.commit_change()
        return True

    def policy_update(self, policies_updated, policies_removed):
        """enqueue the policy-updates and wake up the thread loop"""
        with self._lock:
            self._policy_update.push_policy_update(policies_updated, policies_removed)
        self._run.set()

    def catch_up(self, audit=None):
        """
        need to bring the latest policies to DCAE-Controller

        Records a catch-up request unless one is already pending.

        :param audit: audit of the request; a new Audit with AUTO_CATCH_UP
            as req_message is created when omitted
        """
        with self._lock:
            if not self._aud_catch_up:
                self._aud_catch_up = audit or Audit(req_message=AUTO_CATCH_UP)
                PolicyUpdater._logger.info(
                    "catch_up %s request_id %s",
                    self._aud_catch_up.req_message, self._aud_catch_up.request_id
                )
        self._run.set()

    def _reconfigure(self):
        """job to check for and bring in the updated config for policy-handler"""
        with self._lock:
            if not self._aud_reconfigure:
                self._aud_reconfigure = Audit(req_message=Config.RECONFIGURE)
                PolicyUpdater._logger.info(
                    "reconfigure %s request_id %s",
                    self._aud_reconfigure.req_message, self._aud_reconfigure.request_id
                )
        self._run.set()

    def run(self):
        """
        wait and run the policy-update in thread

        Per wake-up: reconfigure first, then catch-up, then policy-update.
        The policy-update step is skipped when the catch-up reported success.
        The loop ends once a shutdown was requested.
        """
        self._run_reconfigure_timer()
        while True:
            PolicyUpdater._logger.info("waiting for policy-updates...")
            self._run.wait()

            with self._lock:
                self._run.clear()

            if not self._keep_running():
                break

            self._on_reconfigure()

            # reconfigure can take a while - check again for the shutdown request
            if not self._keep_running():
                break

            if self._on_catch_up():
                continue

            self._on_policy_update()

        PolicyUpdater._logger.info("exit policy-updater")

    def _keep_running(self):
        """
        thread-safe check whether to continue running

        Completes the shutdown audit when a shutdown was requested.

        :return: False when a shutdown was requested, True otherwise
        """
        # snapshot once under the lock and only use the snapshot afterwards
        with self._lock:
            aud_shutdown = self._aud_shutdown

        if aud_shutdown:
            aud_shutdown.audit_done()
        return not aud_shutdown

    def _run_catch_up_timer(self):
        """create and start the catch_up timer or step the existing one"""
        if not self._catch_up_interval:
            return

        if self._catch_up_timer:
            self._logger.info("next step catch_up_timer in %s", self._catch_up_interval)
            self._catch_up_timer.next(self._catch_up_interval)
            return

        self._catch_up_timer = StepTimer(
            "catch_up_timer",
            self._catch_up_interval,
            PolicyUpdater.catch_up,
            PolicyUpdater._logger,
            self
        )
        self._logger.info("started catch_up_timer in %s", self._catch_up_interval)
        self._catch_up_timer.start()

    def _run_reconfigure_timer(self):
        """create and start the reconfigure timer or step the existing one"""
        if not self._reconfigure_interval:
            return

        if self._reconfigure_timer:
            self._logger.info("next step reconfigure_timer in %s", self._reconfigure_interval)
            self._reconfigure_timer.next(self._reconfigure_interval)
            return

        self._reconfigure_timer = StepTimer(
            "reconfigure_timer",
            self._reconfigure_interval,
            PolicyUpdater._reconfigure,
            PolicyUpdater._logger,
            self
        )
        self._logger.info("started reconfigure_timer in %s", self._reconfigure_interval)
        self._reconfigure_timer.start()

    def _pause_catch_up_timer(self):
        """pause catch_up_timer if it exists"""
        if self._catch_up_timer:
            self._logger.info("pause catch_up_timer")
            self._catch_up_timer.pause()

    def _stop_timers(self):
        """stop and destroy the catch_up and reconfigure timers"""
        if self._catch_up_timer:
            self._logger.info("stopping catch_up_timer")
            self._catch_up_timer.stop()
            self._catch_up_timer.join()
            self._catch_up_timer = None
            self._logger.info("stopped catch_up_timer")

        if self._reconfigure_timer:
            self._logger.info("stopping reconfigure_timer")
            self._reconfigure_timer.stop()
            self._reconfigure_timer.join()
            self._reconfigure_timer = None
            self._logger.info("stopped reconfigure_timer")

    def _on_reconfigure(self):
        """
        bring the latest config and reconfigure

        Does nothing when no reconfigure request is pending. Any exception is
        logged and recorded on the audit instead of being propagated.
        """
        # take ownership of the pending request
        with self._lock:
            aud_reconfigure = self._aud_reconfigure
            if self._aud_reconfigure:
                self._aud_reconfigure = None

        if not aud_reconfigure:
            return

        log_line = "{}({})".format(aud_reconfigure.req_message, aud_reconfigure.request_id)
        reconfigure_result = ""
        try:
            PolicyUpdater._logger.info(log_line)
            Config.discover(aud_reconfigure)
            if not Config.discovered_config.is_changed():
                reconfigure_result = " -- config not changed"
            else:
                reconfigure_result = " -- config changed for:"
                if self._set_timer_intervals():
                    reconfigure_result += " timer_intervals"

                if PolicyRest.reconfigure():
                    reconfigure_result += " " + Config.FIELD_POLICY_ENGINE

                if DeployHandler.reconfigure(aud_reconfigure):
                    reconfigure_result += " " + Config.DEPLOY_HANDLER

                if self._reconfigure_receiver():
                    reconfigure_result += " web-socket"

                reconfigure_result += " -- change: {}".format(Config.discovered_config)

                Config.discovered_config.commit_change()
            aud_reconfigure.audit_done(result=reconfigure_result)
            PolicyUpdater._logger.info(log_line + reconfigure_result)

        except Exception as ex:
            error_msg = "crash {} {}{}: {}: {}".format(
                "_on_reconfigure", log_line, reconfigure_result, type(ex).__name__, str(ex))

            PolicyUpdater._logger.exception(error_msg)
            aud_reconfigure.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
            aud_reconfigure.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
            aud_reconfigure.audit_done(result=error_msg)

        # schedule the next reconfigure regardless of the outcome
        self._run_reconfigure_timer()

        PolicyUpdater._logger.info("policy_handler health: %s",
                                   json.dumps(aud_reconfigure.health(full=True)))
        PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_reconfigure.process_info()))

    def _on_catch_up(self):
        """
        bring all the latest policies to DCAE-Controller

        Taking a pending catch-up request also discards the pending policy-updates.

        :return: False when no catch-up is pending or on crash, otherwise the
            first item of the result of audit_done() on the catch-up audit
        """
        # take ownership of the pending request
        with self._lock:
            aud_catch_up = self._aud_catch_up
            if self._aud_catch_up:
                self._aud_catch_up = None
                self._policy_update.reset()

        if not aud_catch_up:
            return False

        log_line = "catch_up {0} request_id {1}".format(
            aud_catch_up.req_message, aud_catch_up.request_id
        )
        catch_up_result = ""
        try:
            PolicyUpdater._logger.info(log_line)
            self._pause_catch_up_timer()

            _, catch_up_message = PolicyMatcher.get_latest_policies(aud_catch_up)

            if not catch_up_message or not aud_catch_up.is_success_or_not_found():
                catch_up_result = "- not sending catch-up to deployment-handler due to errors"
                PolicyUpdater._logger.warning(catch_up_result)
            elif catch_up_message.empty():
                catch_up_result = "- not sending empty catch-up to deployment-handler"
            else:
                aud_catch_up.reset_http_status_not_found()
                DeployHandler.policy_update(aud_catch_up, catch_up_message)
                if not aud_catch_up.is_success():
                    catch_up_result = "- failed to send catch-up to deployment-handler"
                    PolicyUpdater._logger.warning(catch_up_result)
                else:
                    catch_up_result = "- sent catch-up to deployment-handler"
            success, _, _ = aud_catch_up.audit_done(result=catch_up_result)
            PolicyUpdater._logger.info(log_line + " " + catch_up_result)

        except Exception as ex:
            error_msg = ("{0}: crash {1} {2} at {3}: {4}"
                         .format(aud_catch_up.request_id, type(ex).__name__, str(ex),
                                 "on_catch_up", log_line + " " + catch_up_result))

            PolicyUpdater._logger.exception(error_msg)
            aud_catch_up.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
            aud_catch_up.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
            aud_catch_up.audit_done(result=error_msg)
            success = False

        # schedule the next catch-up regardless of the outcome
        self._run_catch_up_timer()

        PolicyUpdater._logger.info("policy_handler health: %s",
                                   json.dumps(aud_catch_up.health(full=True)))
        PolicyUpdater._logger.info("process_info: %s", json.dumps(aud_catch_up.process_info()))
        return success

    def _on_policy_update(self):
        """
        handle the event of policy-updates

        Does nothing when no policy-update is pending. Any exception is logged
        and recorded on the audit instead of being propagated. Requests a
        catch-up when DeployHandler.server_instance_changed is set.
        """
        result = ""
        with self._lock:
            audit, policies_updated, policies_removed = self._policy_update.pop_policy_update()

        if not audit:
            return

        log_line = "request_id: {} policies_updated: {} policies_removed: {}".format(
            audit.request_id, json.dumps(policies_updated), json.dumps(policies_removed))
        PolicyUpdater._logger.info(log_line)

        try:
            (updated_policies, removed_policies,
             policy_filter_matches) = PolicyMatcher.match_to_deployed_policies(
                 audit, policies_updated, policies_removed)

            if updated_policies or removed_policies:
                updated_policies, removed_policies = PolicyRest.get_latest_updated_policies(
                    (audit,
                     [(policy_id, policy.get(POLICY_BODY, {}).get(POLICY_VERSION))
                      for policy_id, policy in updated_policies.items()],
                     [(policy_id, policy.get(POLICY_NAMES, {}))
                      for policy_id, policy in removed_policies.items()]
                    ))

            if not audit.is_success_or_not_found():
                result = "- not sending policy-updates to deployment-handler due to errors"
                PolicyUpdater._logger.warning(result)
            elif not updated_policies and not removed_policies:
                result = "- not sending empty policy-updates to deployment-handler"
                PolicyUpdater._logger.info(result)
            else:
                message = PolicyUpdateMessage(updated_policies, removed_policies,
                                              policy_filter_matches, False)
                log_updates = ("policies-updated[{}], removed[{}], policy_filter_matches[{}]"
                               .format(len(updated_policies),
                                       len(removed_policies),
                                       len(policy_filter_matches)))

                audit.reset_http_status_not_found()
                DeployHandler.policy_update(audit, message)

                log_line = "request_id[{}]: {}".format(audit.request_id, str(message))
                if not audit.is_success():
                    result = "- failed to send to deployment-handler {}".format(log_updates)
                    PolicyUpdater._logger.warning(result)
                else:
                    result = "- sent to deployment-handler {}".format(log_updates)

            audit.audit_done(result=result)
            PolicyUpdater._logger.info(log_line + " " + result)

        except Exception as ex:
            error_msg = ("{0}: crash {1} {2} at {3}: {4}"
                         .format(audit.request_id, type(ex).__name__, str(ex),
                                 "on_policies_update", log_line + " " + result))

            PolicyUpdater._logger.exception(error_msg)
            audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
            audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
            # complete the audit on crash the same way the reconfigure and
            # catch-up handlers do - otherwise the audit is never finished
            audit.audit_done(result=error_msg)

        if DeployHandler.server_instance_changed:
            DeployHandler.server_instance_changed = False
            self._pause_catch_up_timer()
            self.catch_up()

    def shutdown(self, audit):
        """
        Shutdown the policy-updater

        Records the shutdown request, stops the timers and waits for the thread
        to exit when it is alive.

        :param audit: audit of the shutdown request - completed by the thread loop
        """
        PolicyUpdater._logger.info("shutdown policy-updater")
        with self._lock:
            self._aud_shutdown = audit
        self._run.set()

        self._stop_timers()

        if self.is_alive():
            self.join()