| # ================================================================================ |
| # Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved. |
| # ================================================================================ |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # ============LICENSE_END========================================================= |
| # |
| |
| """ |
| policy-receiver communicates with policy-engine |
| thru web-socket to receive push notifications |
| on updates and removal of policies. |
| |
| on receiving the policy-notifications, the policy-receiver |
| passes the notifications to policy-updater |
| """ |
| |
| from .config import Config |
| from .service_activator import ServiceActivator |
| |
| |
| class PolicyReceiver(object): |
| """ |
| policy-receiver - static singleton wrapper around two threads |
| policy_updater - master thread for all scheduled actions |
| policy_listener - listens to policy-engine through DMaaP MR or web-socket |
| """ |
| _policy_updater = None |
| _policy_listener = None |
| |
| @staticmethod |
| def is_running(): |
| """check whether the policy-receiver runs""" |
| return (PolicyReceiver._policy_listener |
| and PolicyReceiver._policy_listener.is_alive() |
| and PolicyReceiver._policy_updater |
| and PolicyReceiver._policy_updater.is_alive()) |
| |
| @staticmethod |
| def _close_listener(audit): |
| """stop the notification-handler""" |
| if PolicyReceiver._policy_listener: |
| policy_listener = PolicyReceiver._policy_listener |
| PolicyReceiver._policy_listener = None |
| policy_listener.shutdown(audit) |
| |
| @staticmethod |
| def shutdown(audit): |
| """shutdown the notification-handler and policy-updater""" |
| PolicyReceiver._close_listener(audit) |
| PolicyReceiver._policy_updater.shutdown(audit) |
| |
| @staticmethod |
| def catch_up(audit): |
| """request to bring the latest policies to DCAE""" |
| PolicyReceiver._policy_updater.catch_up(audit) |
| |
| @staticmethod |
| def reconfigure(audit): |
| """request to reconfigure the updated config for policy-handler""" |
| PolicyReceiver._policy_updater.reconfigure(audit) |
| |
| @staticmethod |
| def _on_reconfigure(audit): |
| """act on reconfiguration event""" |
| active = ServiceActivator.is_active_mode_of_operation(audit) |
| |
| if not PolicyReceiver._policy_listener: |
| if active: |
| from . import pdp_client |
| PolicyReceiver._policy_listener = pdp_client.PolicyListener( |
| audit, PolicyReceiver._policy_updater |
| ) |
| PolicyReceiver._policy_listener.start() |
| return |
| |
| if not active: |
| PolicyReceiver._close_listener(audit) |
| return |
| |
| PolicyReceiver._policy_listener.reconfigure(audit) |
| |
| |
| @staticmethod |
| def run(audit): |
| """run policy_updater and policy_receiver""" |
| from .policy_updater import PolicyUpdater |
| PolicyReceiver._policy_updater = PolicyUpdater(PolicyReceiver._on_reconfigure) |
| |
| PolicyReceiver._on_reconfigure(audit) |
| |
| PolicyReceiver._policy_updater.start() |
| |
| if Config.is_pdp_api_default(): |
| audit.audit_done(result="will catch_up after draining the policy-update queue") |
| else: |
| PolicyReceiver.catch_up(audit) |