blob: 2d2f7ca257bfaff880ad8875a9876ea3e7d1af29 [file] [log] [blame]
Jakub Grajciar2f8cd912020-03-27 06:55:06 +01001from ipaddress import IPv4Network
2
3from vpp_object import VppObject
4from vpp_papi import VppEnum
5from vpp_ip import INVALID_INDEX
6from vpp_papi_provider import UnexpectedApiReturnValueError
7
8
9class VppAclPlugin(VppObject):
10
11 def __init__(self, test, enable_intf_counters=False):
12 self._test = test
13 self.enable_intf_counters = enable_intf_counters
14
15 @property
16 def enable_intf_counters(self):
17 return self._enable_intf_counters
18
19 @enable_intf_counters.setter
20 def enable_intf_counters(self, enable):
21 self.vapi.acl_stats_intf_counters_enable(enable=enable)
22
23 def add_vpp_config(self):
24 pass
25
26 def remove_vpp_config(self):
27 pass
28
29 def query_vpp_config(self):
30 pass
31
32 def object_id(self):
33 return ("acl-plugin-%d" % (self._sw_if_index))
34
35
36class AclRule():
37 """ ACL Rule """
38
39 # port ranges
40 PORTS_ALL = -1
41 PORTS_RANGE = 0
42 PORTS_RANGE_2 = 1
43 udp_sport_from = 10
44 udp_sport_to = udp_sport_from + 5
45 udp_dport_from = 20000
46 udp_dport_to = udp_dport_from + 5000
47 tcp_sport_from = 30
48 tcp_sport_to = tcp_sport_from + 5
49 tcp_dport_from = 40000
50 tcp_dport_to = tcp_dport_from + 5000
51
52 udp_sport_from_2 = 90
53 udp_sport_to_2 = udp_sport_from_2 + 5
54 udp_dport_from_2 = 30000
55 udp_dport_to_2 = udp_dport_from_2 + 5000
56 tcp_sport_from_2 = 130
57 tcp_sport_to_2 = tcp_sport_from_2 + 5
58 tcp_dport_from_2 = 20000
59 tcp_dport_to_2 = tcp_dport_from_2 + 5000
60
61 icmp4_type = 8 # echo request
62 icmp4_code = 3
63 icmp6_type = 128 # echo request
64 icmp6_code = 3
65
66 icmp4_type_2 = 8
67 icmp4_code_from_2 = 5
68 icmp4_code_to_2 = 20
69 icmp6_type_2 = 128
70 icmp6_code_from_2 = 8
71 icmp6_code_to_2 = 42
72
73 def __init__(self, is_permit, src_prefix=IPv4Network('0.0.0.0/0'),
74 dst_prefix=IPv4Network('0.0.0.0/0'),
75 proto=0, ports=PORTS_ALL, sport_from=None, sport_to=None,
76 dport_from=None, dport_to=None):
77 self.is_permit = is_permit
78 self.src_prefix = src_prefix
79 self.dst_prefix = dst_prefix
80 self._proto = proto
81 self._ports = ports
82 # assign ports by range
83 self.update_ports()
84 # assign specified ports
85 if sport_from:
86 self.sport_from = sport_from
87 if sport_to:
88 self.sport_to = sport_to
89 if dport_from:
90 self.dport_from = dport_from
91 if dport_to:
92 self.dport_to = dport_to
93
94 def __copy__(self):
95 new_rule = AclRule(self.is_permit, self.src_prefix, self.dst_prefix,
96 self._proto, self._ports, self.sport_from,
97 self.sport_to, self.dport_from, self.dport_to)
98 return new_rule
99
100 def update_ports(self):
101 if self._ports == self.PORTS_ALL:
102 self.sport_from = 0
103 self.dport_from = 0
104 self.sport_to = 65535
105 if self._proto == 1 or self._proto == 58:
106 self.sport_to = 255
107 self.dport_to = self.sport_to
108 elif self._ports == self.PORTS_RANGE:
109 if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP:
110 self.sport_from = self.icmp4_type
111 self.sport_to = self.icmp4_type
112 self.dport_from = self.icmp4_code
113 self.dport_to = self.icmp4_code
114 elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6:
115 self.sport_from = self.icmp6_type
116 self.sport_to = self.icmp6_type
117 self.dport_from = self.icmp6_code
118 self.dport_to = self.icmp6_code
119 elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP:
120 self.sport_from = self.tcp_sport_from
121 self.sport_to = self.tcp_sport_to
122 self.dport_from = self.tcp_dport_from
123 self.dport_to = self.tcp_dport_to
124 elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP:
125 self.sport_from = self.udp_sport_from
126 self.sport_to = self.udp_sport_to
127 self.dport_from = self.udp_dport_from
128 self.dport_to = self.udp_dport_to
129 elif self._ports == self.PORTS_RANGE_2:
130 if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP:
131 self.sport_from = self.icmp4_type_2
132 self.sport_to = self.icmp4_type_2
133 self.dport_from = self.icmp4_code_from_2
134 self.dport_to = self.icmp4_code_to_2
135 elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6:
136 self.sport_from = self.icmp6_type_2
137 self.sport_to = self.icmp6_type_2
138 self.dport_from = self.icmp6_code_from_2
139 self.dport_to = self.icmp6_code_to_2
140 elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP:
141 self.sport_from = self.tcp_sport_from_2
142 self.sport_to = self.tcp_sport_to_2
143 self.dport_from = self.tcp_dport_from_2
144 self.dport_to = self.tcp_dport_to_2
145 elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP:
146 self.sport_from = self.udp_sport_from_2
147 self.sport_to = self.udp_sport_to_2
148 self.dport_from = self.udp_dport_from_2
149 self.dport_to = self.udp_dport_to_2
150 else:
151 self.sport_from = self._ports
152 self.sport_to = self._ports
153 self.dport_from = self._ports
154 self.dport_to = self._ports
155
156 @property
157 def proto(self):
158 return self._proto
159
160 @proto.setter
161 def proto(self, proto):
162 self._proto = proto
163 self.update_ports()
164
165 @property
166 def ports(self):
167 return self._ports
168
169 @ports.setter
170 def ports(self, ports):
171 self._ports = ports
172 self.update_ports()
173
174 def encode(self):
175 return {'is_permit': self.is_permit, 'proto': self.proto,
176 'srcport_or_icmptype_first': self.sport_from,
177 'srcport_or_icmptype_last': self.sport_to,
178 'src_prefix': self.src_prefix,
179 'dstport_or_icmpcode_first': self.dport_from,
180 'dstport_or_icmpcode_last': self.dport_to,
181 'dst_prefix': self.dst_prefix}
182
183
184class VppAcl(VppObject):
185 """ VPP ACL """
186
187 def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
188 self._test = test
189 self._acl_index = acl_index
190 self.tag = tag
191 self._rules = rules
192
193 @property
194 def rules(self):
195 return self._rules
196
197 @property
198 def acl_index(self):
199 return self._acl_index
200
201 @property
202 def count(self):
203 return len(self._rules)
204
205 def encode_rules(self):
206 rules = []
207 for rule in self._rules:
208 rules.append(rule.encode())
209 return rules
210
211 def add_vpp_config(self, expect_error=False):
212 try:
213 reply = self._test.vapi.acl_add_replace(
214 acl_index=self._acl_index, tag=self.tag, count=self.count,
215 r=self.encode_rules())
216 self._acl_index = reply.acl_index
217 self._test.registry.register(self, self._test.logger)
218 if expect_error:
219 self._test.fail("Unexpected api reply")
220 return self
221 except UnexpectedApiReturnValueError:
222 if not expect_error:
223 self._test.fail("Unexpected api reply")
224 return None
225
226 def modify_vpp_config(self, rules):
227 self._rules = rules
228 self.add_vpp_config()
229
230 def remove_vpp_config(self, expect_error=False):
231 try:
232 self._test.vapi.acl_del(acl_index=self._acl_index)
233 if expect_error:
234 self._test.fail("Unexpected api reply")
235 except UnexpectedApiReturnValueError:
236 if not expect_error:
237 self._test.fail("Unexpected api reply")
238
239 def dump(self):
240 return self._test.vapi.acl_dump(acl_index=self._acl_index)
241
242 def query_vpp_config(self):
243 dump = self.dump()
244 for rule in dump:
245 if rule.acl_index == self._acl_index:
246 return True
247 return False
248
249 def object_id(self):
250 return ("acl-%s-%d" % (self.tag, self._acl_index))
251
252
253class VppEtypeWhitelist(VppObject):
254 """ VPP Etype Whitelist """
255
256 def __init__(self, test, sw_if_index, whitelist, n_input=0):
257 self._test = test
258 self.whitelist = whitelist
259 self.n_input = n_input
260 self._sw_if_index = sw_if_index
261
262 @property
263 def sw_if_index(self):
264 return self._sw_if_index
265
266 @property
267 def count(self):
268 return len(self.whitelist)
269
270 def add_vpp_config(self):
271 self._test.vapi.acl_interface_set_etype_whitelist(
272 sw_if_index=self._sw_if_index, count=self.count,
273 n_input=self.n_input, whitelist=self.whitelist)
274 self._test.registry.register(self, self._test.logger)
275 return self
276
277 def remove_vpp_config(self):
278 self._test.vapi.acl_interface_set_etype_whitelist(
279 sw_if_index=self._sw_if_index, count=0, n_input=0, whitelist=[])
280
281 def query_vpp_config(self):
282 self._test.vapi.acl_interface_etype_whitelist_dump(
283 sw_if_index=self._sw_if_index)
284 return False
285
286 def object_id(self):
287 return ("acl-etype_wl-%d" % (self._sw_if_index))
288
289
290class VppAclInterface(VppObject):
291 """ VPP ACL Interface """
292
293 def __init__(self, test, sw_if_index, acls, n_input=0):
294 self._test = test
295 self._sw_if_index = sw_if_index
296 self.n_input = n_input
297 self.acls = acls
298
299 @property
300 def sw_if_index(self):
301 return self._sw_if_index
302
303 @property
304 def count(self):
305 return len(self.acls)
306
307 def encode_acls(self):
308 acls = []
309 for acl in self.acls:
310 acls.append(acl.acl_index)
311 return acls
312
313 def add_vpp_config(self, expect_error=False):
314 try:
315 reply = self._test.vapi.acl_interface_set_acl_list(
316 sw_if_index=self._sw_if_index, n_input=self.n_input,
317 count=self.count, acls=self.encode_acls())
318 self._test.registry.register(self, self._test.logger)
319 if expect_error:
320 self._test.fail("Unexpected api reply")
321 return self
322 except UnexpectedApiReturnValueError:
323 if not expect_error:
324 self._test.fail("Unexpected api reply")
325 return None
326
327 def remove_vpp_config(self, expect_error=False):
328 try:
329 reply = self._test.vapi.acl_interface_set_acl_list(
330 sw_if_index=self._sw_if_index, n_input=0, count=0, acls=[])
331 if expect_error:
332 self._test.fail("Unexpected api reply")
333 except UnexpectedApiReturnValueError:
334 if not expect_error:
335 self._test.fail("Unexpected api reply")
336
337 def query_vpp_config(self):
338 dump = self._test.vapi.acl_interface_list_dump(
339 sw_if_index=self._sw_if_index)
340 for acl_list in dump:
341 if acl_list.count > 0:
342 return True
343 return False
344
345 def object_id(self):
346 return ("acl-if-list-%d" % (self._sw_if_index))
347
348
349class MacipRule():
350 """ Mac Ip rule """
351
352 def __init__(self, is_permit, src_mac=0, src_mac_mask=0,
353 src_prefix=IPv4Network('0.0.0.0/0')):
354 self.is_permit = is_permit
355 self.src_mac = src_mac
356 self.src_mac_mask = src_mac_mask
357 self.src_prefix = src_prefix
358
359 def encode(self):
360 return {'is_permit': self.is_permit, 'src_mac': self.src_mac,
361 'src_mac_mask': self.src_mac_mask,
362 'src_prefix': self.src_prefix}
363
364
365class VppMacipAcl(VppObject):
366 """ Vpp Mac Ip ACL """
367
368 def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
369 self._test = test
370 self._acl_index = acl_index
371 self.tag = tag
372 self._rules = rules
373
374 @property
375 def acl_index(self):
376 return self._acl_index
377
378 @property
379 def rules(self):
380 return self._rules
381
382 @property
383 def count(self):
384 return len(self._rules)
385
386 def encode_rules(self):
387 rules = []
388 for rule in self._rules:
389 rules.append(rule.encode())
390 return rules
391
392 def add_vpp_config(self, expect_error=False):
393 try:
394 reply = self._test.vapi.macip_acl_add_replace(
395 acl_index=self._acl_index, tag=self.tag, count=self.count,
396 r=self.encode_rules())
397 self._acl_index = reply.acl_index
398 self._test.registry.register(self, self._test.logger)
399 if expect_error:
400 self._test.fail("Unexpected api reply")
401 return self
402 except UnexpectedApiReturnValueError:
403 if not expect_error:
404 self._test.fail("Unexpected api reply")
405 return None
406
407 def modify_vpp_config(self, rules):
408 self._rules = rules
409 self.add_vpp_config()
410
411 def remove_vpp_config(self, expect_error=False):
412 try:
413 self._test.vapi.macip_acl_del(acl_index=self._acl_index)
414 if expect_error:
415 self._test.fail("Unexpected api reply")
416 except UnexpectedApiReturnValueError:
417 if not expect_error:
418 self._test.fail("Unexpected api reply")
419
420 def dump(self):
421 return self._test.vapi.macip_acl_dump(acl_index=self._acl_index)
422
423 def query_vpp_config(self):
424 dump = self.dump()
425 for rule in dump:
426 if rule.acl_index == self._acl_index:
427 return True
428 return False
429
430 def object_id(self):
431 return ("macip-acl-%s-%d" % (self.tag, self._acl_index))
432
433
434class VppMacipAclInterface(VppObject):
435 """ VPP Mac Ip ACL Interface """
436
437 def __init__(self, test, sw_if_index, acls):
438 self._test = test
439 self._sw_if_index = sw_if_index
440 self.acls = acls
441
442 @property
443 def sw_if_index(self):
444 return self._sw_if_index
445
446 @property
447 def count(self):
448 return len(self.acls)
449
450 def add_vpp_config(self):
451 for acl in self.acls:
452 self._test.vapi.macip_acl_interface_add_del(
453 is_add=True, sw_if_index=self._sw_if_index,
454 acl_index=acl.acl_index)
455 self._test.registry.register(self, self._test.logger)
456
457 def remove_vpp_config(self):
458 for acl in self.acls:
459 self._test.vapi.macip_acl_interface_add_del(
460 is_add=False, sw_if_index=self._sw_if_index,
461 acl_index=acl.acl_index)
462
463 def dump(self):
464 return self._test.vapi.macip_acl_interface_list_dump(
465 sw_if_index=self._sw_if_index)
466
467 def query_vpp_config(self):
468 dump = self.dump()
469 for acl_list in dump:
470 for acl_index in acl_list.acls:
471 if acl_index != INVALID_INDEX:
472 return True
473 return False
474
475 def object_id(self):
476 return ("macip-acl-if-list-%d" % (self._sw_if_index))