blob: e1584a34163a456d0d4f4498df51ff42e23e3df6 [file] [log] [blame]
Alex Shatov1369bea2018-01-10 11:00:50 -05001# ================================================================================
Lusheng Ji967cc9a2018-02-12 10:45:42 -05002# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved.
Alex Shatov1369bea2018-01-10 11:00:50 -05003# ================================================================================
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15# ============LICENSE_END=========================================================
16#
17# ECOMP is a trademark and service mark of AT&T Intellectual Property.
18
Alex Shatovf53e5e72018-01-11 11:15:56 -050019"""
20policy-receiver communicates with policy-engine
21thru web-socket to receive push notifications
22on updates and removal of policies.
23
24on receiving the policy-notifications, the policy-receiver
25filters them out by the policy scope(s) provided in policy-handler config
26and passes the notifications to policy-updater
27"""
28
Alex Shatov1369bea2018-01-10 11:00:50 -050029import json
30import logging
31import re
32import time
33from threading import Lock, Thread
34
35import websocket
36
37from .config import Config
Alex Shatovf2d7bef2018-05-10 09:23:16 -040038from .onap.audit import Audit, AuditHttpCode, AuditResponseCode
Alex Shatov1369bea2018-01-10 11:00:50 -050039from .policy_updater import PolicyUpdater
40
41LOADED_POLICIES = 'loadedPolicies'
42REMOVED_POLICIES = 'removedPolicies'
43POLICY_NAME = 'policyName'
44POLICY_VER = 'versionNo'
45
46class _PolicyReceiver(Thread):
47 """web-socket to PolicyEngine"""
48 _logger = logging.getLogger("policy_handler.policy_receiver")
49
50 def __init__(self):
51 """web-socket inside the thread to receive policy notifications from PolicyEngine"""
52 Thread.__init__(self, name="policy_receiver")
53 self.daemon = True
54
55 self._lock = Lock()
56 self._keep_running = True
57
Alex Shatovd444b322018-06-21 09:19:07 -040058 config = Config.settings[Config.FIELD_POLICY_ENGINE]
Alex Shatov1369bea2018-01-10 11:00:50 -050059 self.web_socket_url = resturl = config["url"] + config["path_pdp"]
60
61 if resturl.startswith("https:"):
62 self.web_socket_url = resturl.replace("https:", "wss:") + "notifications"
63 else:
64 self.web_socket_url = resturl.replace("http:", "ws:") + "notifications"
65
66 self._web_socket = None
67
68 scope_prefixes = [scope_prefix.replace(".", "[.]")
Alex Shatovd444b322018-06-21 09:19:07 -040069 for scope_prefix in Config.settings["scope_prefixes"]]
Alex Shatov1369bea2018-01-10 11:00:50 -050070 self._policy_scopes = re.compile("(" + "|".join(scope_prefixes) + ")")
71 _PolicyReceiver._logger.info("_policy_scopes %s", self._policy_scopes.pattern)
72 self._policy_updater = PolicyUpdater()
73 self._policy_updater.start()
74
75 def run(self):
76 """listen on web-socket and pass the policy notifications to policy-updater"""
77 websocket.enableTrace(True)
78 restarting = False
79 while True:
80 if not self._get_keep_running():
81 break
82
83 self._stop_notifications()
84
85 if restarting:
86 time.sleep(5)
87
88 _PolicyReceiver._logger.info(
89 "connecting to policy-notifications at: %s", self.web_socket_url)
90 self._web_socket = websocket.WebSocketApp(
91 self.web_socket_url,
92 on_message=self._on_pdp_message,
93 on_close=self._on_ws_close,
94 on_error=self._on_ws_error
95 )
96
97 _PolicyReceiver._logger.info("waiting for policy-notifications...")
98 self._web_socket.run_forever()
99 restarting = True
100
101 _PolicyReceiver._logger.info("exit policy-receiver")
102
103 def _get_keep_running(self):
104 """thread-safe check whether to continue running"""
105 with self._lock:
106 keep_running = self._keep_running
107 return keep_running
108
109 def _stop_notifications(self):
110 """Shuts down the AutoNotification service if running."""
111 with self._lock:
112 if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected:
113 self._web_socket.close()
114 _PolicyReceiver._logger.info("Stopped receiving notifications from PDP")
115
116 def _on_pdp_message(self, _, message):
117 """received the notification from PDP"""
Alex Shatovf2d7bef2018-05-10 09:23:16 -0400118 try:
119 _PolicyReceiver._logger.info("Received notification message: %s", message)
120 if not message:
121 return
122 message = json.loads(message)
Alex Shatov1369bea2018-01-10 11:00:50 -0500123
Alex Shatovf2d7bef2018-05-10 09:23:16 -0400124 if not message or not isinstance(message, dict):
Alex Shatovc9ec2312018-06-14 12:06:42 -0400125 _PolicyReceiver._logger.warning("unexpected message from PDP: %s",
126 json.dumps(message))
Alex Shatovf2d7bef2018-05-10 09:23:16 -0400127 return
Alex Shatov1369bea2018-01-10 11:00:50 -0500128
Alex Shatovf2d7bef2018-05-10 09:23:16 -0400129 policies_updated = [(policy.get(POLICY_NAME), policy.get(POLICY_VER))
130 for policy in message.get(LOADED_POLICIES, [])
131 if self._policy_scopes.match(policy.get(POLICY_NAME))]
132 policies_removed = [(policy.get(POLICY_NAME), policy.get(POLICY_VER))
133 for policy in message.get(REMOVED_POLICIES, [])
134 if self._policy_scopes.match(policy.get(POLICY_NAME))]
Alex Shatov1369bea2018-01-10 11:00:50 -0500135
Alex Shatovf2d7bef2018-05-10 09:23:16 -0400136 if not policies_updated and not policies_removed:
137 _PolicyReceiver._logger.info("no policy updated or removed for scopes %s",
138 self._policy_scopes.pattern)
139 return
Alex Shatov1369bea2018-01-10 11:00:50 -0500140
Alex Shatovf2d7bef2018-05-10 09:23:16 -0400141 audit = Audit(job_name="policy_update",
142 req_message="policy-notification - updated[{0}], removed[{1}]"
143 .format(len(policies_updated), len(policies_removed)),
144 retry_get_config=True)
145 self._policy_updater.enqueue(audit, policies_updated, policies_removed)
146 except Exception as ex:
147 error_msg = "crash {} {} at {}: {}".format(type(ex).__name__, str(ex),
148 "on_pdp_message", json.dumps(message))
149
150 _PolicyReceiver._logger.exception(error_msg)
151 audit.fatal(error_msg, error_code=AuditResponseCode.BUSINESS_PROCESS_ERROR)
152 audit.set_http_status_code(AuditHttpCode.SERVER_INTERNAL_ERROR.value)
153
Alex Shatov1369bea2018-01-10 11:00:50 -0500154
155 def _on_ws_error(self, _, error):
156 """report an error"""
157 _PolicyReceiver._logger.error("policy-notification error: %s", error)
158
159 def _on_ws_close(self, _):
160 """restart web-socket on close"""
161 _PolicyReceiver._logger.info("lost connection to PDP - restarting...")
162
163 def shutdown(self, audit):
164 """Shutdown the policy-receiver"""
165 _PolicyReceiver._logger.info("shutdown policy-receiver")
166 with self._lock:
167 self._keep_running = False
168
169 self._stop_notifications()
170
171 if self.is_alive():
172 self.join()
173
174 self._policy_updater.shutdown(audit)
175
176 def catch_up(self, audit):
177 """need to bring the latest policies to DCAE-Controller"""
178 self._policy_updater.catch_up(audit)
179
180class PolicyReceiver(object):
181 """policy-receiver - static singleton wrapper"""
182 _policy_receiver = None
183
184 @staticmethod
185 def shutdown(audit):
186 """Shutdown the notification-handler"""
187 PolicyReceiver._policy_receiver.shutdown(audit)
188
189 @staticmethod
190 def catch_up(audit):
191 """bring the latest policies from policy-engine"""
192 PolicyReceiver._policy_receiver.catch_up(audit)
193
194 @staticmethod
195 def run(audit):
196 """Using policy-engine client to talk to policy engine"""
Alex Shatov1369bea2018-01-10 11:00:50 -0500197 PolicyReceiver._policy_receiver = _PolicyReceiver()
198 PolicyReceiver._policy_receiver.start()
199
Alex Shatov1369bea2018-01-10 11:00:50 -0500200 PolicyReceiver.catch_up(audit)