blob: 080f544de65a9095bbd12ead3a59530e11590781 [file] [log] [blame]
Alex Shatov1369bea2018-01-10 11:00:50 -05001# org.onap.dcae
2# ================================================================================
3# Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
4# ================================================================================
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16# ============LICENSE_END=========================================================
17#
18# ECOMP is a trademark and service mark of AT&T Intellectual Property.
19
Alex Shatovf53e5e72018-01-11 11:15:56 -050020"""
policy-receiver communicates with policy-engine
through a web-socket to receive push notifications
on updates and removals of policies.

On receiving the policy-notifications, the policy-receiver
keeps only the policies that match the policy scope(s) provided in policy-handler config
and passes them to policy-updater.
28"""
29
Alex Shatov1369bea2018-01-10 11:00:50 -050030import json
31import logging
32import re
33import time
34from threading import Lock, Thread
35
36import websocket
37
38from .config import Config
39from .onap.audit import Audit
40from .policy_updater import PolicyUpdater
41
# keys read from the policy-notification message received over the web-socket:
# each one is looked up on the parsed message and iterated as a collection of policy entries
LOADED_POLICIES = 'loadedPolicies'
REMOVED_POLICIES = 'removedPolicies'
# keys read from each policy entry of those collections:
# the name is matched against the configured scope prefixes, the version is passed along with it
POLICY_NAME = 'policyName'
POLICY_VER = 'versionNo'
46
class _PolicyReceiver(Thread):
    """web-socket to PolicyEngine

    Daemon thread that keeps a web-socket connection to the policy-engine,
    selects the notified policies whose names match the configured scope prefixes
    and enqueues them on its own PolicyUpdater.
    """
    _logger = logging.getLogger("policy_handler.policy_receiver")

    def __init__(self):
        """web-socket inside the thread to receive policy notifications from PolicyEngine

        Reads the policy-engine url and the scope prefixes from Config.config,
        then creates and starts the PolicyUpdater.
        """
        Thread.__init__(self, name="policy_receiver")
        self.daemon = True

        # the lock guards _keep_running and the closing of the web-socket
        self._lock = Lock()
        self._keep_running = True

        config = Config.config[Config.FIELD_POLICY_ENGINE]
        resturl = config["url"] + config["path_pdp"]

        # the notifications endpoint uses the web-socket flavor of the REST url scheme
        if resturl.startswith("https:"):
            self.web_socket_url = resturl.replace("https:", "wss:") + "notifications"
        else:
            self.web_socket_url = resturl.replace("http:", "ws:") + "notifications"

        self._web_socket = None

        # only the dots of the scope prefixes are turned into literals for the regex
        scope_prefixes = [scope_prefix.replace(".", "[.]")
                          for scope_prefix in Config.config["scope_prefixes"]]
        self._policy_scopes = re.compile("(" + "|".join(scope_prefixes) + ")")
        _PolicyReceiver._logger.info("_policy_scopes %s", self._policy_scopes.pattern)
        self._policy_updater = PolicyUpdater()
        self._policy_updater.start()

    def run(self):
        """listen on web-socket and pass the policy notifications to policy-updater

        Reconnects after a 5 second pause whenever run_forever() returns,
        until shutdown() clears the keep-running flag.
        """
        websocket.enableTrace(True)
        restarting = False
        while self._get_keep_running():
            self._stop_notifications()

            if restarting:
                time.sleep(5)
                # shutdown may have been requested during the pause -
                # do not open a new connection that nobody is going to close
                if not self._get_keep_running():
                    break

            _PolicyReceiver._logger.info(
                "connecting to policy-notifications at: %s", self.web_socket_url)
            self._web_socket = websocket.WebSocketApp(
                self.web_socket_url,
                on_message=self._on_pdp_message,
                on_close=self._on_ws_close,
                on_error=self._on_ws_error
            )

            _PolicyReceiver._logger.info("waiting for policy-notifications...")
            self._web_socket.run_forever()
            restarting = True

        _PolicyReceiver._logger.info("exit policy-receiver")

    def _get_keep_running(self):
        """thread-safe check whether to continue running"""
        with self._lock:
            keep_running = self._keep_running
        return keep_running

    def _stop_notifications(self):
        """Shuts down the AutoNotification service if running."""
        with self._lock:
            if self._web_socket and self._web_socket.sock and self._web_socket.sock.connected:
                self._web_socket.close()
                _PolicyReceiver._logger.info("Stopped receiving notifications from PDP")

    def _filter_by_scope(self, policies):
        """returns the list of (policy_name, policy_version) for the policies in scope

        :policies: collection of policy entries (dicts) taken from the notification,
            None is treated as empty.
        Entries that are not dicts or that have no policy name are skipped
        instead of failing the whole notification.
        """
        selected = []
        for policy in policies or []:
            if not isinstance(policy, dict):
                continue
            policy_name = policy.get(POLICY_NAME)
            if not policy_name or not self._policy_scopes.match(policy_name):
                continue
            selected.append((policy_name, policy.get(POLICY_VER)))
        return selected

    def _on_pdp_message(self, _, message):
        """received the notification from PDP

        :message: json text of the notification.
        Empty, unparsable and non-object messages are logged and ignored.
        The updated and removed policies that match the policy scopes
        are enqueued on the policy-updater under a new Audit.
        """
        _PolicyReceiver._logger.info("Received notification message: %s", message)
        if not message:
            return

        try:
            message = json.loads(message)
        except (TypeError, ValueError) as ex:
            _PolicyReceiver._logger.error(
                "failed to parse policy-notification message: %s", ex)
            return

        if not message:
            return

        if not isinstance(message, dict):
            _PolicyReceiver._logger.error(
                "unexpected type of policy-notification message: %s", type(message).__name__)
            return

        policies_updated = self._filter_by_scope(message.get(LOADED_POLICIES))
        policies_removed = self._filter_by_scope(message.get(REMOVED_POLICIES))

        if not policies_updated and not policies_removed:
            _PolicyReceiver._logger.info(
                "no policy updated or removed for scopes %s", self._policy_scopes.pattern
            )
            return

        audit = Audit(req_message="policy-notification - updated[{0}], removed[{1}]"
                      .format(len(policies_updated), len(policies_removed)))
        audit.retry_get_config = True
        self._policy_updater.enqueue(audit, policies_updated, policies_removed)

    def _on_ws_error(self, _, error):
        """report an error"""
        _PolicyReceiver._logger.error("policy-notification error: %s", error)

    def _on_ws_close(self, _, *_close_info):
        """restart web-socket on close

        Only logs the event - the loop in run() does the reconnect.
        :_close_info: absorbs the optional close status code and close message
            that newer websocket-client releases pass to on_close.
        """
        _PolicyReceiver._logger.info("lost connection to PDP - restarting...")

    def shutdown(self, audit):
        """Shutdown the policy-receiver

        Clears the keep-running flag, closes the web-socket, waits for the thread to exit
        and then shuts down the policy-updater with the same audit.
        """
        _PolicyReceiver._logger.info("shutdown policy-receiver")
        with self._lock:
            self._keep_running = False

        self._stop_notifications()

        if self.is_alive():
            self.join()

        self._policy_updater.shutdown(audit)

    def catch_up(self, audit):
        """need to bring the latest policies to DCAE-Controller"""
        self._policy_updater.catch_up(audit)
169
class PolicyReceiver(object):
    """policy-receiver - static singleton wrapper

    Keeps the single _PolicyReceiver instance created by run()
    and forwards shutdown() and catch_up() to it.
    """
    _policy_receiver = None

    @staticmethod
    def shutdown(audit):
        """Shutdown the notification-handler

        Logs a warning and returns when the receiver has not been created by run().
        """
        receiver = PolicyReceiver._policy_receiver
        if receiver is None:
            logging.getLogger("policy_handler.policy_receiver").warning(
                "shutdown skipped - policy-receiver is not running")
            return
        receiver.shutdown(audit)

    @staticmethod
    def catch_up(audit):
        """bring the latest policies from policy-engine

        Logs a warning and returns when the receiver has not been created by run().
        """
        receiver = PolicyReceiver._policy_receiver
        if receiver is None:
            logging.getLogger("policy_handler.policy_receiver").warning(
                "catch_up skipped - policy-receiver is not running")
            return
        receiver.catch_up(audit)

    @staticmethod
    def run(audit):
        """Using policy-engine client to talk to policy engine

        Creates and starts the receiver thread, reports the start through a sub-audit
        of the passed audit, and then requests the catch-up with the passed audit.
        """
        sub_aud = Audit(aud_parent=audit)
        sub_aud.metrics_start("start policy receiver")

        PolicyReceiver._policy_receiver = _PolicyReceiver()
        PolicyReceiver._policy_receiver.start()

        sub_aud.metrics("started policy receiver")

        PolicyReceiver.catch_up(audit)
195 PolicyReceiver.catch_up(audit)