blob: 6b7788c438bf68690fca7cad20e549d69020aff5 [file] [log] [blame]
# ================================================================================
# Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
#
# ECOMP is a trademark and service mark of AT&T Intellectual Property.

"""send policy-update notification to deployment-handler"""
Alex Shatovf53e5e72018-01-11 11:15:56 -050020
alex_sh9d980ce2017-08-23 17:30:56 -040021import json
Alex Shatovb9b955c2018-03-08 13:12:23 -050022import logging
Alex Shatovd7f34d42018-08-07 12:11:35 -040023from copy import copy, deepcopy
Alex Shatovb9b955c2018-03-08 13:12:23 -050024
alex_sh9d980ce2017-08-23 17:30:56 -040025import requests
26
27from .config import Config
Alex Shatov14411ac2018-03-20 10:55:03 -040028from .customize import CustomizerUser
alex_sh9d980ce2017-08-23 17:30:56 -040029from .discovery import DiscoveryClient
Alex Shatovd7f34d42018-08-07 12:11:35 -040030from .onap.audit import (REQUEST_X_ECOMP_REQUESTID, AuditHttpCode,
31 AuditResponseCode, Metrics)
32from .policy_consts import (CATCH_UP, LATEST_POLICIES, POLICIES,
33 POLICY_FILTER_MATCHES, POLICY_FILTERS,
34 REMOVED_POLICIES)
alex_sh9d980ce2017-08-23 17:30:56 -040035
Alex Shatovd7f34d42018-08-07 12:11:35 -040036
class PolicyUpdateMessage(object):
    """
    Policy-update message for deployment-handler.

    Holds four parts keyed by the policy_consts names: the catch_up flag,
    latest_policies, removed_policies and policy_filter_matches.
    The message can be split into several size-limited segments
    by gen_segmented_messages().

    msg_length is the length of the json dump of the whole message and is
    refreshed on construction and after every add().
    """
    # '*' binds tighter than '<<', so this is 1 << 20 == 1048576
    BYTES_IN_MB = 1 << 2 * 10

    def __init__(self, latest_policies=None,
                 removed_policies=None, policy_filter_matches=None, catch_up=True):
        """
        Store private copies of the passed parts.

        latest_policies and policy_filter_matches are deep-copied,
        removed_policies is shallow-copied. None is treated as an empty dict.
        """
        self._catch_up = catch_up
        self._latest_policies = deepcopy(latest_policies or {})
        self._removed_policies = copy(removed_policies or {})
        self._policy_filter_matches = deepcopy(policy_filter_matches or {})

        # the message references the very same dicts, so add() is reflected in it
        self._message = {
            CATCH_UP: self._catch_up,
            LATEST_POLICIES: self._latest_policies,
            REMOVED_POLICIES: self._removed_policies,
            POLICY_FILTER_MATCHES: self._policy_filter_matches
        }
        self.msg_length = 0
        self._calc_stats()

    def _calc_stats(self):
        """dump the message to json and remember the length of the dump"""
        self.msg_length = len(json.dumps(self._message))

    def empty(self):
        """returns True when there are no latest, removed or filter-matched policies"""
        return not (self._latest_policies
                    or self._removed_policies
                    or self._policy_filter_matches)

    def add(self, policy_id, latest_policy=None, policy_filter_ids=None, removed_policy=None):
        """
        Add the parts of a single policy to the current message.

        Nothing is added when policy_id is empty or when no part is provided.
        A removed_policy is considered provided when it is not None, so a falsy
        value like False is still recorded - the same way it is counted by empty().
        """
        if not policy_id:
            return

        has_removed_policy = removed_policy is not None
        if not (latest_policy or policy_filter_ids or has_removed_policy):
            return

        if latest_policy:
            self._latest_policies[policy_id] = deepcopy(latest_policy)

        if policy_filter_ids:
            self._policy_filter_matches.setdefault(policy_id, {}).update(policy_filter_ids)

        if has_removed_policy:
            self._removed_policies[policy_id] = removed_policy

        self._calc_stats()

    def get_message(self):
        """returns a deep copy of the message dict"""
        return deepcopy(self._message)

    def __str__(self):
        """json dump of the message"""
        return json.dumps(self._message)

    def _iter_over_removed_policies(self):
        """yields (policy_id, value) for each of the removed_policies"""
        for removed_item in self._removed_policies.items():
            yield removed_item

    def _iter_over_latest_policies(self):
        """
        yields (policy_id, policy, policy_filter_ids) for each of the latest_policies

        policy_filter_ids is None when there are no filter matches for the policy.
        Filter matches on a policy_id that is missing in latest_policies are not yielded.
        """
        for (policy_id, policy) in self._latest_policies.items():
            yield (policy_id, policy, self._policy_filter_matches.get(policy_id))

    def gen_segmented_messages(self, max_msg_length_mb):
        """
        Break the policy-update message into a list of segmented messages.

        Each segmented message should not exceed the max_msg_length_mb from config.
        A falsy max_msg_length_mb falls back to 10 MB.

        The size of the next item is only estimated, and an item is always added
        to an empty segment, so a single oversized policy still gets its own segment.

        Returns the list of message dicts. When there is more than one segment,
        each dict also carries "data_segment" like "1/3", "2/3", "3/3".
        """
        max_msg_length = (max_msg_length_mb or 10) * PolicyUpdateMessage.BYTES_IN_MB

        messages = []
        curr_message = PolicyUpdateMessage(catch_up=self._catch_up)

        for (policy_id, value) in self._iter_over_removed_policies():
            if (not curr_message.empty()
                    and (len(policy_id) + len(str(value)) + curr_message.msg_length
                         > max_msg_length)):
                messages.append(curr_message.get_message())
                curr_message = PolicyUpdateMessage(catch_up=self._catch_up)
            curr_message.add(policy_id, removed_policy=value)

        for (policy_id, policy, policy_filter_ids) in self._iter_over_latest_policies():
            # policy_id is counted twice: once per latest_policies and policy_filter_matches
            if (not curr_message.empty()
                    and (2 * len(policy_id) + len(json.dumps(policy))
                         + len(json.dumps(policy_filter_ids))
                         + curr_message.msg_length > max_msg_length)):
                messages.append(curr_message.get_message())
                curr_message = PolicyUpdateMessage(catch_up=self._catch_up)
            curr_message.add(policy_id, latest_policy=policy, policy_filter_ids=policy_filter_ids)

        if not curr_message.empty():
            messages.append(curr_message.get_message())

        segment_count = len(messages)
        if segment_count > 1:
            segment_suffix = "/" + str(segment_count)
            for segment_number, segment in enumerate(messages, 1):
                segment["data_segment"] = str(segment_number) + segment_suffix

        return messages
142
Alex Shatov541b5bc2017-09-13 16:22:17 -0400143
class DeployHandler(object):
    """
    calling the deployment-handler web apis

    All the state is kept on the class and all the methods are static.
    """
    _logger = logging.getLogger("policy_handler.deploy_handler")
    # True once the url of the deployment-handler was resolved by _lazy_init
    _lazy_inited = False

    _requests_session = None
    _url = None
    _url_policy = None
    _max_msg_length_mb = 10
    _target_entity = None
    _custom_kwargs = None
    # server_instance_uuid received in the last successful response
    _server_instance_uuid = None
    # set to True by _server_instance_changed; never reset inside this class
    server_instance_changed = False

    @staticmethod
    def _lazy_init(audit, rediscover=False):
        """
        set static properties

        Does nothing when already initialized unless rediscover is requested.
        The url of the deployment-handler is resolved from scratch on every pass:
        first from the "deploy_handler" dict in config, then through DiscoveryClient.
        The previously resolved url is never reused, so rediscover also works with
        the legacy string config that has no url of its own.
        """
        if DeployHandler._lazy_inited and not rediscover:
            return

        custom_kwargs = CustomizerUser.get_customizer().get_deploy_handler_kwargs(audit)
        if not custom_kwargs or not isinstance(custom_kwargs, dict):
            custom_kwargs = {}
        DeployHandler._custom_kwargs = custom_kwargs

        if not DeployHandler._requests_session:
            pool_size = Config.settings.get("pool_connections", 20)
            DeployHandler._requests_session = requests.Session()
            for url_prefix in ('https://', 'http://'):
                DeployHandler._requests_session.mount(
                    url_prefix,
                    requests.adapters.HTTPAdapter(pool_connections=pool_size,
                                                  pool_maxsize=pool_size)
                )

        # resolved into a local first - a stale DeployHandler._url must not block the rediscovery
        url = None
        config_dh = Config.settings.get("deploy_handler")
        if config_dh and isinstance(config_dh, dict):
            # dns based routing to deployment-handler
            # config for policy-handler >= 2.4.0
            # "deploy_handler" : {
            #     "target_entity" : "deployment_handler",
            #     "url" : "http://deployment_handler:8188",
            #     "max_msg_length_mb" : 100
            # }
            DeployHandler._target_entity = config_dh.get("target_entity", "deployment_handler")
            url = config_dh.get("url")
            DeployHandler._max_msg_length_mb = config_dh.get("max_msg_length_mb",
                                                             DeployHandler._max_msg_length_mb)
            DeployHandler._logger.info("dns based routing to %s: url(%s)",
                                       DeployHandler._target_entity, url)

        if not url:
            # discover routing to deployment-handler at consul-services
            if not isinstance(config_dh, dict):
                # config for policy-handler <= 2.3.1
                # "deploy_handler" : "deployment_handler"
                DeployHandler._target_entity = str(config_dh or "deployment_handler")
            url = DiscoveryClient.get_service_url(audit, DeployHandler._target_entity)

        DeployHandler._url = url
        DeployHandler._url_policy = str(DeployHandler._url or "") + '/policy'
        DeployHandler._logger.info(
            "got %s policy url(%s)", DeployHandler._target_entity, DeployHandler._url_policy)

        # stay not inited without the url, so that the next call retries the discovery
        DeployHandler._lazy_inited = bool(DeployHandler._url)

    @staticmethod
    def _report_no_url(audit, metrics, log_line):
        """records the service-unavailable error when the deployment-handler url is unknown"""
        error_msg = "no url found to {0}".format(log_line)
        DeployHandler._logger.error(error_msg)
        metrics.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
        audit.set_http_status_code(AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value)
        metrics.metrics(error_msg)

    @staticmethod
    def _report_exception(audit, metrics, ex, log_action, log_data):
        """
        records the failed http call - expected to be called from inside the except block

        Exceptions of requests are reported as service-unavailable,
        anything else as server-internal-error.
        """
        error_code = (AuditHttpCode.SERVICE_UNAVAILABLE_ERROR.value
                      if isinstance(ex, requests.exceptions.RequestException)
                      else AuditHttpCode.SERVER_INTERNAL_ERROR.value)
        error_msg = ("failed to {0} {1}: {2}{3}"
                     .format(log_action, type(ex).__name__, str(ex), log_data))
        DeployHandler._logger.exception(error_msg)
        metrics.set_http_status_code(error_code)
        audit.set_http_status_code(error_code)
        metrics.metrics(error_msg)

    @staticmethod
    def policy_update(audit, policy_update_message, rediscover=False):
        """
        segments the big policy_update_message limited by size
        and sequentially sends each segment as put to deployment-handler at /policy.

        Stops sending on the first segment that leaves the audit not successful.
        Does nothing on a missing or empty message.

        param policy_update_message is of PolicyUpdateMessage type
        """
        if not policy_update_message or policy_update_message.empty():
            return

        DeployHandler._lazy_init(audit, rediscover)

        str_metrics = "policy_update {0}".format(str(policy_update_message))

        metrics_total = Metrics(
            aud_parent=audit,
            targetEntity="{0} total policy_update".format(DeployHandler._target_entity),
            targetServiceName=DeployHandler._url_policy)

        metrics_total.metrics_start("started {}".format(str_metrics))
        messages = policy_update_message.gen_segmented_messages(DeployHandler._max_msg_length_mb)
        for message in messages:
            DeployHandler._policy_update(audit, message)
            if not audit.is_success():
                break
        metrics_total.metrics("done {}".format(str_metrics))

    @staticmethod
    def _policy_update(audit, message):
        """
        sends the put message to deployment-handler at /policy

        detects whether server_instance_changed condition on deployment-handler
        that is the cause to catch_up

        The outcome is reported through the http status code set on the audit.
        """
        if not message:
            return

        metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity,
                          targetServiceName=DeployHandler._url_policy)
        headers = {REQUEST_X_ECOMP_REQUESTID: metrics.request_id}

        log_action = "put to {0} at {1}".format(
            DeployHandler._target_entity, DeployHandler._url_policy)
        log_data = " msg={0} headers={1}".format(json.dumps(message),
                                                 json.dumps(headers))
        log_line = log_action + log_data
        DeployHandler._logger.info(log_line)
        metrics.metrics_start(log_line)

        if not DeployHandler._url:
            DeployHandler._report_no_url(audit, metrics, log_line)
            return

        try:
            res = DeployHandler._requests_session.put(
                DeployHandler._url_policy, json=message, headers=headers,
                **DeployHandler._custom_kwargs
            )
        except Exception as ex:
            DeployHandler._report_exception(audit, metrics, ex, log_action, log_data)
            return

        metrics.set_http_status_code(res.status_code)
        audit.set_http_status_code(res.status_code)

        log_line = "response {0} from {1}: text={2}{3}".format(res.status_code, log_action,
                                                               res.text, log_data)
        metrics.metrics(log_line)

        if res.status_code != requests.codes.ok:
            DeployHandler._logger.error(log_line)
            return

        DeployHandler._logger.info(log_line)
        # NOTE(review): res.json() raises on a non-json body - that is not handled here
        result = res.json() or {}
        DeployHandler._server_instance_changed(result, metrics)

    @staticmethod
    def get_deployed_policies(audit, rediscover=False):
        """
        Retrieves policies and policy-filters from components
        that were deployed by deployment-handler

        Returns the tuple (policies, policy_filters) taken from the response.
        Returns (None, None) when the url is unknown, the http call failed
        or the response status is not ok.
        When both parts are empty, the audit is marked with data-not-found
        and the empty parts are still returned.
        """
        DeployHandler._lazy_init(audit, rediscover)
        metrics = Metrics(aud_parent=audit, targetEntity=DeployHandler._target_entity,
                          targetServiceName=DeployHandler._url_policy)
        headers = {REQUEST_X_ECOMP_REQUESTID: metrics.request_id}

        log_action = "get {0}: {1}".format(DeployHandler._target_entity, DeployHandler._url_policy)
        log_data = " headers={}".format(json.dumps(headers))
        log_line = log_action + log_data
        DeployHandler._logger.info(log_line)
        metrics.metrics_start(log_line)

        if not DeployHandler._url:
            DeployHandler._report_no_url(audit, metrics, log_line)
            return None, None

        try:
            res = DeployHandler._requests_session.get(
                DeployHandler._url_policy, headers=headers,
                **DeployHandler._custom_kwargs
            )
        except Exception as ex:
            DeployHandler._report_exception(audit, metrics, ex, log_action, log_data)
            return None, None

        metrics.set_http_status_code(res.status_code)
        audit.set_http_status_code(res.status_code)

        log_line = ("response {0} from {1}: text={2}{3}"
                    .format(res.status_code, log_action, res.text, log_data))
        metrics.metrics(log_line)

        if res.status_code != requests.codes.ok:
            DeployHandler._logger.error(log_line)
            return None, None

        # NOTE(review): res.json() raises on a non-json body - that is not handled here
        result = res.json() or {}
        DeployHandler._server_instance_changed(result, metrics)

        policies = result.get(POLICIES, {})
        policy_filters = result.get(POLICY_FILTERS, {})
        if not policies and not policy_filters:
            audit.set_http_status_code(AuditHttpCode.DATA_NOT_FOUND_ERROR.value)
            DeployHandler._logger.warning(audit.warn(
                "found no deployed policies or policy-filters: {}".format(log_line),
                error_code=AuditResponseCode.DATA_ERROR))
            return policies, policy_filters

        DeployHandler._logger.info(log_line)
        return policies, policy_filters

    @staticmethod
    def _server_instance_changed(result, metrics):
        """
        Checks whether the deployment-handler instance changed since last call.

        Remembers the server_instance_uuid found in the result and raises the
        server_instance_changed flag when it differs from the previously known one.
        The very first uuid is only remembered - there is nothing to compare it to.
        """
        prev_server_instance_uuid = DeployHandler._server_instance_uuid
        DeployHandler._server_instance_uuid = result.get("server_instance_uuid")

        if (not prev_server_instance_uuid
                or prev_server_instance_uuid == DeployHandler._server_instance_uuid):
            return

        DeployHandler.server_instance_changed = True

        log_line = ("deployment_handler_changed: {1} != {0}"
                    .format(prev_server_instance_uuid, DeployHandler._server_instance_uuid))
        metrics.info(log_line)
        DeployHandler._logger.info(log_line)