blob: 685d7d8f7776abdce5a61080d2bd5a7a54c86529 [file] [log] [blame]
Alex Shatov1369bea2018-01-10 11:00:50 -05001# ================================================================================
Alex Shatov78ff88f2020-02-27 12:45:54 -05002# Copyright (c) 2018-2020 AT&T Intellectual Property. All rights reserved.
Alex Shatov1369bea2018-01-10 11:00:50 -05003# ================================================================================
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15# ============LICENSE_END=========================================================
16#
Alex Shatov1369bea2018-01-10 11:00:50 -050017
Alex Shatov9a4d3c52019-04-01 11:32:06 -040018"""utils and conversions"""
Alex Shatovf53e5e72018-01-11 11:15:56 -050019
import json
import logging
import os
import re
from copy import deepcopy
from typing import Pattern
Alex Shatov1369bea2018-01-10 11:00:50 -050025
class Utils(object):
    """general purpose utils"""
    _logger = logging.getLogger("policy_handler.utils")

    @staticmethod
    def get_logger(file_path):
        """get the logger for the file_path == __file__"""
        file_path = os.path.realpath(file_path)
        # start from the module name (file name minus the ".py" suffix)
        name_parts = [os.path.basename(file_path)[:-3]]
        while file_path:
            file_path = os.path.dirname(file_path)
            folder_name = os.path.basename(file_path)
            # stop at the package root or after collecting at most 5 folders
            if folder_name == "policyhandler" or len(name_parts) > 5:
                break
            if folder_name == "tests":
                name_parts.append("unit_test")
                break
            name_parts.append(folder_name)

        name_parts.append("policy_handler")
        return logging.getLogger(".".join(reversed(name_parts)))

    @staticmethod
    def safe_json_parse(json_str):
        """try parsing json without exception - returns the json_str back if fails"""
        if not json_str:
            return json_str
        try:
            return json.loads(json_str)
        except (ValueError, TypeError) as err:
            Utils._logger.warning("unexpected json error(%s): len(%s) str[:100]: (%s)",
                                  str(err), len(json_str), str(json_str)[:100])
            return json_str

    @staticmethod
    def are_the_same(body_1, body_2, json_dumps=None):
        """check whether both objects are the same"""
        json_dumps = json_dumps or json.dumps

        # truthiness mismatch: exactly one of the two is empty
        if bool(body_1) != bool(body_2):
            Utils._logger.debug("only one is empty %s != %s", body_1, body_2)
            return False

        if body_1 is None and body_2 is None:
            return True

        if isinstance(body_1, list) and isinstance(body_2, list):
            if len(body_1) != len(body_2):
                Utils._logger.debug("len %s != %s", json_dumps(body_1), json_dumps(body_2))
                return False
            # pairwise deep comparison of the two equally sized lists
            return all(Utils.are_the_same(item_1, item_2, json_dumps)
                       for item_1, item_2 in zip(body_1, body_2))

        if isinstance(body_1, dict) and isinstance(body_2, dict):
            # symmetric difference of the key views: non-empty == key sets differ
            if body_1.keys() ^ body_2.keys():
                Utils._logger.debug("keys %s != %s", json_dumps(body_1), json_dumps(body_2))
                return False

            for key, val_1 in body_1.items():
                val_2 = body_2[key]
                if isinstance(val_1, str) or isinstance(val_2, str):
                    # strings are compared directly to avoid recursing into them
                    if val_1 == val_2:
                        continue
                    Utils._logger.debug("key-values %s != %s",
                                        json_dumps({key: val_1}), json_dumps({key: val_2}))
                    return False
                elif not Utils.are_the_same(val_1, val_2, json_dumps):
                    return False
            return True

        # ... here when primitive values or mismatched types ...
        if body_1 != body_2:
            Utils._logger.debug("values %s != %s", body_1, body_2)
            return False
        return True
Alex Shatovd7f34d42018-08-07 12:11:35 -0400106
class RegexCoarser(object):
    """
    utility to combine or coarse the collection of regex patterns
    into a single regex that is at least not narrower (wider or the same)
    than the collection regexes

    inspired by https://github.com/spadgos/regex-combiner in js
    """
    ENDER = '***'           # artificial trie leaf that marks the end of a whole pattern
    GROUPERS = {'{': '}', '[': ']', '(': ')'}   # group opener -> matching closer
    MODIFIERS = '*?+'       # quantifiers that attach to the preceding token
    CHOICE_STARTER = '('
    HIDDEN_CHOICE_STARTER = '(?:'   # non-capturing alternation group
    ANY_CHARS = '.*'
    LINE_START = '^'

    def __init__(self, regex_patterns=None):
        """regex coarser

        :regex_patterns: optional list of patterns (str or compiled regex) to preload
        """
        self.trie = {}
        self.patterns = []
        self.add_regex_patterns(regex_patterns)

    def get_combined_regex_pattern(self):
        """gets the pattern for the combined regex"""
        # work on a copy - compression is destructive and self.trie must stay reusable
        trie = deepcopy(self.trie)
        RegexCoarser._compress(trie)
        return RegexCoarser._trie_to_pattern(trie)

    def get_coarse_regex_patterns(self, max_length=100):
        """gets the patterns for the coarse regex

        :max_length: soft limit on the length of each joined pattern -
                     shorter patterns are packed into alternation groups,
                     patterns already longer than this are kept as is
        """
        trie = deepcopy(self.trie)
        RegexCoarser._compress(trie)
        patterns = RegexCoarser._trie_to_pattern(trie, True)

        root_patterns = []
        for pattern in patterns:
            left, _, choice = pattern.partition(RegexCoarser.CHOICE_STARTER)
            # widen "prefix(a|b|...)" to "prefix.*" unless the prefix is empty,
            # whitespace, or just the line-start anchor
            if choice and left and left.strip() != RegexCoarser.LINE_START and not left.isspace():
                pattern = left + RegexCoarser.ANY_CHARS
            root_patterns.append(pattern)
        root_patterns = RegexCoarser._join_patterns(root_patterns, max_length)

        if not root_patterns or root_patterns == ['']:
            return []
        return root_patterns


    def add_regex_patterns(self, new_regex_patterns):
        """adds the new_regex patterns to RegexPatternCoarser"""
        if not new_regex_patterns or not isinstance(new_regex_patterns, list):
            return
        for new_regex_pattern in new_regex_patterns:
            self.add_regex_pattern(new_regex_pattern)

    def add_regex_pattern(self, new_regex_pattern):
        """adds the new_regex to RegexPatternCoarser"""
        new_regex_pattern = RegexCoarser._regex_pattern_to_string(new_regex_pattern)
        if not new_regex_pattern:
            return

        self.patterns.append(new_regex_pattern)

        tokens = RegexCoarser._tokenize(new_regex_pattern)
        last_token_idx = len(tokens) - 1
        trie_node = self.trie
        for idx, token in enumerate(tokens):
            if token not in trie_node:
                trie_node[token] = {}
            if idx == last_token_idx:
                # mark the end of a complete pattern with the artificial ENDER leaf
                trie_node[token][RegexCoarser.ENDER] = {}
            trie_node = trie_node[token]

    @staticmethod
    def _regex_pattern_to_string(regex_pattern):
        """convert regex pattern to string

        accepts a str or a compiled regex; returns '' for empty input
        and None for unsupported types
        """
        if not regex_pattern:
            return ''

        if isinstance(regex_pattern, str):
            return regex_pattern

        # re.Pattern is the public name (3.8+) of the compiled-regex type;
        # typing.Pattern, used here before, is deprecated and removed in 3.12
        if isinstance(regex_pattern, re.Pattern):
            return regex_pattern.pattern
        return None

    @staticmethod
    def _tokenize(regex_pattern):
        """tokenize the regex pattern for trie assignment

        a token is a single char, an escaped pair ("\\x"), or a whole
        group ("(...)", "[...]"); quantifiers (*?+ and the {m,n} group)
        are glued onto the preceding token
        """
        tokens = []
        token = ''
        group_ender = None      # closer char of the group being scanned, if inside one
        use_next = False        # True right after a backslash - next char is escaped

        for char in regex_pattern:
            if use_next:
                use_next = False
                token += char
                char = None     # consumed: escaped char is already part of the token

            if char == '\\':
                use_next = True
                token += char
                continue

            if not group_ender and char in RegexCoarser.GROUPERS:
                group_ender = RegexCoarser.GROUPERS[char]
                token = char
                char = None     # consumed: group opener starts a fresh token

            if char is None:
                pass
            elif char == group_ender:
                token += char
                group_ender = None
                if char == '}': # this group is a modifier
                    tokens[-1] += token
                    token = ''
                    continue
            elif char in RegexCoarser.MODIFIERS:
                if group_ender:
                    token += char
                else:
                    # quantifier outside a group modifies the previous token
                    tokens[-1] += char
                    continue
            else:
                token += char

            if not group_ender:
                tokens.append(token)
                token = ''

        if token:
            tokens.append(token)
        return tokens

    @staticmethod
    def _compress(trie):
        """compress trie into shortest leaves

        merges every single-child chain into one concatenated key.
        iterates over a snapshot (list) of the items because the merge
        inserts and deletes keys - mutating a dict while iterating its
        live view has undefined visiting order and can corrupt the walk
        """
        for key, subtrie in list(trie.items()):
            RegexCoarser._compress(subtrie)
            subkeys = list(subtrie.keys())
            if len(subkeys) == 1:
                trie[key + subkeys[0]] = subtrie[subkeys[0]]
                del trie[key]

    @staticmethod
    def _trie_to_pattern(trie, top_keep=False):
        """convert trie to the regex pattern

        :top_keep: when True, return the list of per-branch patterns
                   instead of joining them into a single alternation
        """
        patterns = [
            key.replace(RegexCoarser.ENDER, '') + RegexCoarser._trie_to_pattern(subtrie)
            for key, subtrie in trie.items()
        ]

        if top_keep:
            return patterns

        return RegexCoarser._join_patterns(patterns)[0]

    @staticmethod
    def _join_patterns(patterns, max_length=0):
        """convert list of patterns to the segmented list of dense regex patterns

        with max_length == 0 everything goes into one non-capturing
        alternation; otherwise patterns are first-fit packed into groups
        shorter than max_length, and patterns that alone reach max_length
        are appended unjoined at the end
        """
        if not patterns:
            return ['']

        if len(patterns) == 1:
            return patterns

        if not max_length:
            return [RegexCoarser.HIDDEN_CHOICE_STARTER + '|'.join(patterns) + ')']

        long_patterns = []
        join_patterns = []      # list of (patterns_to_join, total_length) bins
        for pattern in patterns:
            len_pattern = len(pattern)
            if not len_pattern:
                continue
            if len_pattern >= max_length:
                long_patterns.append(pattern)
                continue

            # first-fit: drop the pattern into the first bin it fits into
            for idx, patterns_to_join in enumerate(join_patterns):
                patterns_to_join, len_patterns_to_join = patterns_to_join
                if len_pattern + len_patterns_to_join < max_length:
                    patterns_to_join.append(pattern)
                    len_patterns_to_join += len_pattern
                    join_patterns[idx] = (patterns_to_join, len_patterns_to_join)
                    len_pattern = 0     # mark as placed
                    break
            if len_pattern:
                join_patterns.append(([pattern], len_pattern))
            # keep bins sorted by fill so the emptiest bin is tried first
            join_patterns.sort(key=lambda x: x[1])

        if join_patterns:
            join_patterns = [
                RegexCoarser.HIDDEN_CHOICE_STARTER + '|'.join(patterns_to_join) + ')'
                for patterns_to_join, _ in join_patterns
            ]

        return join_patterns + long_patterns