blob: eca02316bf64ba1d29ddf8235165ea609d12fc24 [file] [log] [blame]
Pavel Kotucek59dda062017-03-02 15:22:47 +01001#!/usr/bin/env python
2"""ACL plugin Test Case HLD:
3"""
4
5import unittest
6import random
7
8from scapy.packet import Raw
9from scapy.layers.l2 import Ether
10from scapy.layers.inet import IP, TCP, UDP, ICMP
11from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +000012from scapy.layers.inet6 import IPv6ExtHdrFragment
Pavel Kotucek59dda062017-03-02 15:22:47 +010013from framework import VppTestCase, VppTestRunner
14from util import Host, ppp
15
Andrew Yourtchenkode3682f2018-03-23 10:38:46 +010016from vpp_lo_interface import VppLoInterface
17
Pavel Kotucek59dda062017-03-02 15:22:47 +010018
19class TestACLplugin(VppTestCase):
20 """ ACL plugin Test Case """
21
22 # traffic types
23 IP = 0
24 ICMP = 1
25
26 # IP version
27 IPRANDOM = -1
28 IPV4 = 0
29 IPV6 = 1
30
31 # rule types
32 DENY = 0
33 PERMIT = 1
34
35 # supported protocols
36 proto = [[6, 17], [1, 58]]
37 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
38 ICMPv4 = 0
39 ICMPv6 = 1
40 TCP = 0
41 UDP = 1
42 PROTO_ALL = 0
43
44 # port ranges
45 PORTS_ALL = -1
46 PORTS_RANGE = 0
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +020047 PORTS_RANGE_2 = 1
Pavel Kotucek59dda062017-03-02 15:22:47 +010048 udp_sport_from = 10
49 udp_sport_to = udp_sport_from + 5
50 udp_dport_from = 20000
51 udp_dport_to = udp_dport_from + 5000
52 tcp_sport_from = 30
53 tcp_sport_to = tcp_sport_from + 5
54 tcp_dport_from = 40000
55 tcp_dport_to = tcp_dport_from + 5000
56
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +020057 udp_sport_from_2 = 90
58 udp_sport_to_2 = udp_sport_from_2 + 5
59 udp_dport_from_2 = 30000
60 udp_dport_to_2 = udp_dport_from_2 + 5000
61 tcp_sport_from_2 = 130
62 tcp_sport_to_2 = tcp_sport_from_2 + 5
63 tcp_dport_from_2 = 20000
64 tcp_dport_to_2 = tcp_dport_from_2 + 5000
65
Pavel Kotucek59dda062017-03-02 15:22:47 +010066 icmp4_type = 8 # echo request
67 icmp4_code = 3
68 icmp6_type = 128 # echo request
69 icmp6_code = 3
70
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +020071 icmp4_type_2 = 8
72 icmp4_code_from_2 = 5
73 icmp4_code_to_2 = 20
74 icmp6_type_2 = 128
75 icmp6_code_from_2 = 8
76 icmp6_code_to_2 = 42
77
Pavel Kotucek59dda062017-03-02 15:22:47 +010078 # Test variables
79 bd_id = 1
80
81 @classmethod
82 def setUpClass(cls):
83 """
84 Perform standard class setup (defined by class method setUpClass in
85 class VppTestCase) before running the test case, set test case related
86 variables and configure VPP.
87 """
88 super(TestACLplugin, cls).setUpClass()
89
Pavel Kotucek59dda062017-03-02 15:22:47 +010090 try:
91 # Create 2 pg interfaces
92 cls.create_pg_interfaces(range(2))
93
94 # Packet flows mapping pg0 -> pg1, pg2 etc.
95 cls.flows = dict()
96 cls.flows[cls.pg0] = [cls.pg1]
97
98 # Packet sizes
99 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
100
101 # Create BD with MAC learning and unknown unicast flooding disabled
102 # and put interfaces to this BD
103 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
104 learn=1)
105 for pg_if in cls.pg_interfaces:
Ole Troana5b2eec2019-03-11 19:23:25 +0100106 cls.vapi.sw_interface_set_l2_bridge(
107 rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100108
109 # Set up all interfaces
110 for i in cls.pg_interfaces:
111 i.admin_up()
112
113 # Mapping between packet-generator index and lists of test hosts
114 cls.hosts_by_pg_idx = dict()
115 for pg_if in cls.pg_interfaces:
116 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
117
118 # Create list of deleted hosts
119 cls.deleted_hosts_by_pg_idx = dict()
120 for pg_if in cls.pg_interfaces:
121 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
122
123 # warm-up the mac address tables
124 # self.warmup_test()
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +0200125 count = 16
126 start = 0
127 n_int = len(cls.pg_interfaces)
128 macs_per_if = count / n_int
129 i = -1
130 for pg_if in cls.pg_interfaces:
131 i += 1
132 start_nr = macs_per_if * i + start
133 end_nr = count + start if i == (n_int - 1) \
134 else macs_per_if * (i + 1) + start
135 hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
136 for j in range(start_nr, end_nr):
137 host = Host(
138 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
139 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
140 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
141 hosts.append(host)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100142
143 except Exception:
144 super(TestACLplugin, cls).tearDownClass()
145 raise
146
Paul Vinciguerra7f9b7f92019-03-12 19:23:27 -0700147 @classmethod
148 def tearDownClass(cls):
149 super(TestACLplugin, cls).tearDownClass()
150
Pavel Kotucek59dda062017-03-02 15:22:47 +0100151 def setUp(self):
152 super(TestACLplugin, self).setUp()
153 self.reset_packet_infos()
154
155 def tearDown(self):
156 """
157 Show various debug prints after each test.
158 """
159 super(TestACLplugin, self).tearDown()
Paul Vinciguerra90cf21b2019-03-13 09:23:05 -0700160
161 def show_commands_at_teardown(self):
162 cli = "show vlib graph l2-input-feat-arc"
163 self.logger.info(self.vapi.ppcli(cli))
164 cli = "show vlib graph l2-input-feat-arc-end"
165 self.logger.info(self.vapi.ppcli(cli))
166 cli = "show vlib graph l2-output-feat-arc"
167 self.logger.info(self.vapi.ppcli(cli))
168 cli = "show vlib graph l2-output-feat-arc-end"
169 self.logger.info(self.vapi.ppcli(cli))
170 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
171 self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
172 self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
173 self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
174 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
175 % self.bd_id))
Pavel Kotucek59dda062017-03-02 15:22:47 +0100176
Pavel Kotucek59dda062017-03-02 15:22:47 +0100177 def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
178 s_prefix=0, s_ip='\x00\x00\x00\x00',
179 d_prefix=0, d_ip='\x00\x00\x00\x00'):
180 if proto == -1:
181 return
182 if ports == self.PORTS_ALL:
183 sport_from = 0
184 dport_from = 0
185 sport_to = 65535 if proto != 1 and proto != 58 else 255
186 dport_to = sport_to
187 elif ports == self.PORTS_RANGE:
188 if proto == 1:
189 sport_from = self.icmp4_type
190 sport_to = self.icmp4_type
191 dport_from = self.icmp4_code
192 dport_to = self.icmp4_code
193 elif proto == 58:
194 sport_from = self.icmp6_type
195 sport_to = self.icmp6_type
196 dport_from = self.icmp6_code
197 dport_to = self.icmp6_code
198 elif proto == self.proto[self.IP][self.TCP]:
199 sport_from = self.tcp_sport_from
200 sport_to = self.tcp_sport_to
201 dport_from = self.tcp_dport_from
202 dport_to = self.tcp_dport_to
203 elif proto == self.proto[self.IP][self.UDP]:
204 sport_from = self.udp_sport_from
205 sport_to = self.udp_sport_to
206 dport_from = self.udp_dport_from
207 dport_to = self.udp_dport_to
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +0200208 elif ports == self.PORTS_RANGE_2:
209 if proto == 1:
210 sport_from = self.icmp4_type_2
211 sport_to = self.icmp4_type_2
212 dport_from = self.icmp4_code_from_2
213 dport_to = self.icmp4_code_to_2
214 elif proto == 58:
215 sport_from = self.icmp6_type_2
216 sport_to = self.icmp6_type_2
217 dport_from = self.icmp6_code_from_2
218 dport_to = self.icmp6_code_to_2
219 elif proto == self.proto[self.IP][self.TCP]:
220 sport_from = self.tcp_sport_from_2
221 sport_to = self.tcp_sport_to_2
222 dport_from = self.tcp_dport_from_2
223 dport_to = self.tcp_dport_to_2
224 elif proto == self.proto[self.IP][self.UDP]:
225 sport_from = self.udp_sport_from_2
226 sport_to = self.udp_sport_to_2
227 dport_from = self.udp_dport_from_2
228 dport_to = self.udp_dport_to_2
Pavel Kotucek59dda062017-03-02 15:22:47 +0100229 else:
230 sport_from = ports
231 sport_to = ports
232 dport_from = ports
233 dport_to = ports
234
235 rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
236 'srcport_or_icmptype_first': sport_from,
237 'srcport_or_icmptype_last': sport_to,
238 'src_ip_prefix_len': s_prefix,
239 'src_ip_addr': s_ip,
240 'dstport_or_icmpcode_first': dport_from,
241 'dstport_or_icmpcode_last': dport_to,
242 'dst_ip_prefix_len': d_prefix,
243 'dst_ip_addr': d_ip})
244 return rule
245
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800246 def apply_rules(self, rules, tag=b''):
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200247 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
248 tag=tag)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100249 self.logger.info("Dumped ACL: " + str(
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200250 self.vapi.acl_dump(reply.acl_index)))
Pavel Kotucek59dda062017-03-02 15:22:47 +0100251 # Apply a ACL on the interface as inbound
252 for i in self.pg_interfaces:
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200253 self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
254 n_input=1,
255 acls=[reply.acl_index])
Andrew Yourtchenkof995c712019-06-13 15:23:21 +0000256 return reply.acl_index
Pavel Kotucek59dda062017-03-02 15:22:47 +0100257
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800258 def apply_rules_to(self, rules, tag=b'', sw_if_index=0xFFFFFFFF):
Andrew Yourtchenkode3682f2018-03-23 10:38:46 +0100259 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
260 tag=tag)
261 self.logger.info("Dumped ACL: " + str(
262 self.vapi.acl_dump(reply.acl_index)))
263 # Apply a ACL on the interface as inbound
264 self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
265 n_input=1,
266 acls=[reply.acl_index])
Andrew Yourtchenkof995c712019-06-13 15:23:21 +0000267 return reply.acl_index
Andrew Yourtchenkode3682f2018-03-23 10:38:46 +0100268
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100269 def etype_whitelist(self, whitelist, n_input):
270 # Apply whitelists on all the interfaces
271 for i in self.pg_interfaces:
272 # checkstyle can't read long names. Help them.
273 fun = self.vapi.acl_interface_set_etype_whitelist
274 fun(sw_if_index=i.sw_if_index, n_input=n_input,
275 whitelist=whitelist)
276 return
277
Pavel Kotucek59dda062017-03-02 15:22:47 +0100278 def create_upper_layer(self, packet_index, proto, ports=0):
279 p = self.proto_map[proto]
280 if p == 'UDP':
281 if ports == 0:
282 return UDP(sport=random.randint(self.udp_sport_from,
283 self.udp_sport_to),
284 dport=random.randint(self.udp_dport_from,
285 self.udp_dport_to))
286 else:
287 return UDP(sport=ports, dport=ports)
288 elif p == 'TCP':
289 if ports == 0:
290 return TCP(sport=random.randint(self.tcp_sport_from,
291 self.tcp_sport_to),
292 dport=random.randint(self.tcp_dport_from,
293 self.tcp_dport_to))
294 else:
295 return TCP(sport=ports, dport=ports)
296 return ''
297
298 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100299 proto=-1, ports=0, fragments=False,
300 pkt_raw=True, etype=-1):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100301 """
302 Create input packet stream for defined interface using hosts or
303 deleted_hosts list.
304
305 :param object src_if: Interface to create packet stream for.
306 :param list packet_sizes: List of required packet sizes.
307 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
308 :return: Stream of packets.
309 """
310 pkts = []
311 if self.flows.__contains__(src_if):
312 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
313 for dst_if in self.flows[src_if]:
314 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
315 n_int = len(dst_hosts) * len(src_hosts)
316 for i in range(0, n_int):
317 dst_host = dst_hosts[i / len(src_hosts)]
318 src_host = src_hosts[i % len(src_hosts)]
319 pkt_info = self.create_packet_info(src_if, dst_if)
320 if ipv6 == 1:
321 pkt_info.ip = 1
322 elif ipv6 == 0:
323 pkt_info.ip = 0
324 else:
325 pkt_info.ip = random.choice([0, 1])
326 if proto == -1:
327 pkt_info.proto = random.choice(self.proto[self.IP])
328 else:
329 pkt_info.proto = proto
330 payload = self.info_to_payload(pkt_info)
331 p = Ether(dst=dst_host.mac, src=src_host.mac)
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100332 if etype > 0:
333 p = Ether(dst=dst_host.mac,
334 src=src_host.mac,
335 type=etype)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100336 if pkt_info.ip:
337 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000338 if fragments:
339 p /= IPv6ExtHdrFragment(offset=64, m=1)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100340 else:
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000341 if fragments:
342 p /= IP(src=src_host.ip4, dst=dst_host.ip4,
343 flags=1, frag=64)
344 else:
345 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100346 if traffic_type == self.ICMP:
347 if pkt_info.ip:
348 p /= ICMPv6EchoRequest(type=self.icmp6_type,
349 code=self.icmp6_code)
350 else:
351 p /= ICMP(type=self.icmp4_type,
352 code=self.icmp4_code)
353 else:
354 p /= self.create_upper_layer(i, pkt_info.proto, ports)
Pavel Kotuceke7b67342017-04-18 13:12:20 +0200355 if pkt_raw:
356 p /= Raw(payload)
357 pkt_info.data = p.copy()
358 if pkt_raw:
359 size = random.choice(packet_sizes)
360 self.extend_packet(p, size)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100361 pkts.append(p)
362 return pkts
363
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100364 def verify_capture(self, pg_if, capture,
365 traffic_type=0, ip_type=0, etype=-1):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100366 """
367 Verify captured input packet stream for defined interface.
368
369 :param object pg_if: Interface to verify captured packet stream for.
370 :param list capture: Captured packet stream.
371 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
372 """
373 last_info = dict()
374 for i in self.pg_interfaces:
375 last_info[i.sw_if_index] = None
376 dst_sw_if_index = pg_if.sw_if_index
377 for packet in capture:
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100378 if etype > 0:
379 if packet[Ether].type != etype:
380 self.logger.error(ppp("Unexpected ethertype in packet:",
381 packet))
382 else:
383 continue
Pavel Kotucek59dda062017-03-02 15:22:47 +0100384 try:
385 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
386 if traffic_type == self.ICMP and ip_type == self.IPV6:
387 payload_info = self.payload_to_info(
Paul Vinciguerraeaea4212019-03-06 11:58:06 -0800388 packet[ICMPv6EchoRequest], 'data')
Pavel Kotucek59dda062017-03-02 15:22:47 +0100389 payload = packet[ICMPv6EchoRequest]
390 else:
Paul Vinciguerraeaea4212019-03-06 11:58:06 -0800391 payload_info = self.payload_to_info(packet[Raw])
Pavel Kotucek59dda062017-03-02 15:22:47 +0100392 payload = packet[self.proto_map[payload_info.proto]]
393 except:
394 self.logger.error(ppp("Unexpected or invalid packet "
395 "(outside network):", packet))
396 raise
397
398 if ip_type != 0:
399 self.assertEqual(payload_info.ip, ip_type)
400 if traffic_type == self.ICMP:
401 try:
402 if payload_info.ip == 0:
403 self.assertEqual(payload.type, self.icmp4_type)
404 self.assertEqual(payload.code, self.icmp4_code)
405 else:
406 self.assertEqual(payload.type, self.icmp6_type)
407 self.assertEqual(payload.code, self.icmp6_code)
408 except:
409 self.logger.error(ppp("Unexpected or invalid packet "
410 "(outside network):", packet))
411 raise
412 else:
413 try:
414 ip_version = IPv6 if payload_info.ip == 1 else IP
415
416 ip = packet[ip_version]
417 packet_index = payload_info.index
418
419 self.assertEqual(payload_info.dst, dst_sw_if_index)
420 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
421 (pg_if.name, payload_info.src,
422 packet_index))
423 next_info = self.get_next_packet_info_for_interface2(
424 payload_info.src, dst_sw_if_index,
425 last_info[payload_info.src])
426 last_info[payload_info.src] = next_info
427 self.assertTrue(next_info is not None)
428 self.assertEqual(packet_index, next_info.index)
429 saved_packet = next_info.data
430 # Check standard fields
431 self.assertEqual(ip.src, saved_packet[ip_version].src)
432 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
433 p = self.proto_map[payload_info.proto]
434 if p == 'TCP':
435 tcp = packet[TCP]
436 self.assertEqual(tcp.sport, saved_packet[
437 TCP].sport)
438 self.assertEqual(tcp.dport, saved_packet[
439 TCP].dport)
440 elif p == 'UDP':
441 udp = packet[UDP]
442 self.assertEqual(udp.sport, saved_packet[
443 UDP].sport)
444 self.assertEqual(udp.dport, saved_packet[
445 UDP].dport)
446 except:
447 self.logger.error(ppp("Unexpected or invalid packet:",
448 packet))
449 raise
450 for i in self.pg_interfaces:
451 remaining_packet = self.get_next_packet_info_for_interface2(
452 i, dst_sw_if_index, last_info[i.sw_if_index])
453 self.assertTrue(
454 remaining_packet is None,
455 "Port %u: Packet expected from source %u didn't arrive" %
456 (dst_sw_if_index, i.sw_if_index))
457
458 def run_traffic_no_check(self):
459 # Test
460 # Create incoming packet streams for packet-generator interfaces
461 for i in self.pg_interfaces:
462 if self.flows.__contains__(i):
463 pkts = self.create_stream(i, self.pg_if_packet_sizes)
464 if len(pkts) > 0:
465 i.add_stream(pkts)
466
467 # Enable packet capture and start packet sending
468 self.pg_enable_capture(self.pg_interfaces)
469 self.pg_start()
470
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000471 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100472 frags=False, pkt_raw=True, etype=-1):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100473 # Test
474 # Create incoming packet streams for packet-generator interfaces
475 pkts_cnt = 0
476 for i in self.pg_interfaces:
477 if self.flows.__contains__(i):
478 pkts = self.create_stream(i, self.pg_if_packet_sizes,
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000479 traffic_type, ip_type, proto, ports,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100480 frags, pkt_raw, etype)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100481 if len(pkts) > 0:
482 i.add_stream(pkts)
483 pkts_cnt += len(pkts)
484
485 # Enable packet capture and start packet sendingself.IPV
486 self.pg_enable_capture(self.pg_interfaces)
487 self.pg_start()
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +0200488 self.logger.info("sent packets count: %d" % pkts_cnt)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100489
490 # Verify
491 # Verify outgoing packet streams per packet-generator interface
492 for src_if in self.pg_interfaces:
493 if self.flows.__contains__(src_if):
494 for dst_if in self.flows[src_if]:
495 capture = dst_if.get_capture(pkts_cnt)
496 self.logger.info("Verifying capture on interface %s" %
497 dst_if.name)
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100498 self.verify_capture(dst_if, capture,
499 traffic_type, ip_type, etype)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100500
501 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100502 ports=0, frags=False, etype=-1):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100503 # Test
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +0200504 pkts_cnt = 0
Pavel Kotucek59dda062017-03-02 15:22:47 +0100505 self.reset_packet_infos()
506 for i in self.pg_interfaces:
507 if self.flows.__contains__(i):
508 pkts = self.create_stream(i, self.pg_if_packet_sizes,
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000509 traffic_type, ip_type, proto, ports,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100510 frags, True, etype)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100511 if len(pkts) > 0:
512 i.add_stream(pkts)
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +0200513 pkts_cnt += len(pkts)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100514
515 # Enable packet capture and start packet sending
516 self.pg_enable_capture(self.pg_interfaces)
517 self.pg_start()
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +0200518 self.logger.info("sent packets count: %d" % pkts_cnt)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100519
520 # Verify
521 # Verify outgoing packet streams per packet-generator interface
522 for src_if in self.pg_interfaces:
523 if self.flows.__contains__(src_if):
524 for dst_if in self.flows[src_if]:
525 self.logger.info("Verifying capture on interface %s" %
526 dst_if.name)
527 capture = dst_if.get_capture(0)
528 self.assertEqual(len(capture), 0)
529
Pavel Kotucek59dda062017-03-02 15:22:47 +0100530 def test_0000_warmup_test(self):
531 """ ACL plugin version check; learn MACs
532 """
Pavel Kotucek59dda062017-03-02 15:22:47 +0100533 reply = self.vapi.papi.acl_plugin_get_version()
534 self.assertEqual(reply.major, 1)
535 self.logger.info("Working with ACL plugin version: %d.%d" % (
536 reply.major, reply.minor))
537 # minor version changes are non breaking
538 # self.assertEqual(reply.minor, 0)
539
540 def test_0001_acl_create(self):
Andrew Yourtchenko987abe92017-09-27 13:50:31 +0200541 """ ACL create/delete test
Pavel Kotucek59dda062017-03-02 15:22:47 +0100542 """
543
544 self.logger.info("ACLP_TEST_START_0001")
545 # Add an ACL
546 r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
547 'srcport_or_icmptype_first': 1234,
548 'srcport_or_icmptype_last': 1235,
549 'src_ip_prefix_len': 0,
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800550 'src_ip_addr': b'\x00\x00\x00\x00',
Pavel Kotucek59dda062017-03-02 15:22:47 +0100551 'dstport_or_icmpcode_first': 1234,
552 'dstport_or_icmpcode_last': 1234,
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800553 'dst_ip_addr': b'\x00\x00\x00\x00',
Pavel Kotucek59dda062017-03-02 15:22:47 +0100554 'dst_ip_prefix_len': 0}]
555 # Test 1: add a new ACL
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200556 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800557 tag=b"permit 1234")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100558 self.assertEqual(reply.retval, 0)
559 # The very first ACL gets #0
560 self.assertEqual(reply.acl_index, 0)
Andrew Yourtchenko987abe92017-09-27 13:50:31 +0200561 first_acl = reply.acl_index
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200562 rr = self.vapi.acl_dump(reply.acl_index)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100563 self.logger.info("Dumped ACL: " + str(rr))
564 self.assertEqual(len(rr), 1)
565 # We should have the same number of ACL entries as we had asked
566 self.assertEqual(len(rr[0].r), len(r))
567 # The rules should be the same. But because the submitted and returned
568 # are different types, we need to iterate over rules and keys to get
569 # to basic values.
570 for i_rule in range(0, len(r) - 1):
571 for rule_key in r[i_rule]:
572 self.assertEqual(rr[0].r[i_rule][rule_key],
573 r[i_rule][rule_key])
574
575 # Add a deny-1234 ACL
Ole Troan895b6e82017-10-20 13:28:20 +0200576 r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
Pavel Kotucek59dda062017-03-02 15:22:47 +0100577 'srcport_or_icmptype_first': 1234,
578 'srcport_or_icmptype_last': 1235,
579 'src_ip_prefix_len': 0,
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800580 'src_ip_addr': b'\x00\x00\x00\x00',
Pavel Kotucek59dda062017-03-02 15:22:47 +0100581 'dstport_or_icmpcode_first': 1234,
582 'dstport_or_icmpcode_last': 1234,
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800583 'dst_ip_addr': b'\x00\x00\x00\x00',
Pavel Kotucek59dda062017-03-02 15:22:47 +0100584 'dst_ip_prefix_len': 0},
585 {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
586 'srcport_or_icmptype_first': 0,
587 'srcport_or_icmptype_last': 0,
588 'src_ip_prefix_len': 0,
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800589 'src_ip_addr': b'\x00\x00\x00\x00',
Pavel Kotucek59dda062017-03-02 15:22:47 +0100590 'dstport_or_icmpcode_first': 0,
591 'dstport_or_icmpcode_last': 0,
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800592 'dst_ip_addr': b'\x00\x00\x00\x00',
Ole Troan895b6e82017-10-20 13:28:20 +0200593 'dst_ip_prefix_len': 0}]
Pavel Kotucek59dda062017-03-02 15:22:47 +0100594
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200595 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800596 tag=b"deny 1234;permit all")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100597 self.assertEqual(reply.retval, 0)
598 # The second ACL gets #1
599 self.assertEqual(reply.acl_index, 1)
Andrew Yourtchenko987abe92017-09-27 13:50:31 +0200600 second_acl = reply.acl_index
Pavel Kotucek59dda062017-03-02 15:22:47 +0100601
602 # Test 2: try to modify a nonexistent ACL
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200603 reply = self.vapi.acl_add_replace(acl_index=432, r=r,
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800604 tag=b"FFFF:FFFF", expected_retval=-6)
Jon Loeliger27cadd22017-11-10 13:15:12 -0600605 self.assertEqual(reply.retval, -6)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100606 # The ACL number should pass through
607 self.assertEqual(reply.acl_index, 432)
Andrew Yourtchenko987abe92017-09-27 13:50:31 +0200608 # apply an ACL on an interface inbound, try to delete ACL, must fail
609 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
610 n_input=1,
611 acls=[first_acl])
Jon Loeliger27cadd22017-11-10 13:15:12 -0600612 reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
Andrew Yourtchenko987abe92017-09-27 13:50:31 +0200613 # Unapply an ACL and then try to delete it - must be ok
614 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
615 n_input=0,
616 acls=[])
617 reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
618
619 # apply an ACL on an interface outbound, try to delete ACL, must fail
620 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
621 n_input=0,
622 acls=[second_acl])
Jon Loeliger27cadd22017-11-10 13:15:12 -0600623 reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
Andrew Yourtchenko987abe92017-09-27 13:50:31 +0200624 # Unapply the ACL and then try to delete it - must be ok
625 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
626 n_input=0,
627 acls=[])
628 reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
629
630 # try to apply a nonexistent ACL - must fail
631 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
632 n_input=1,
633 acls=[first_acl],
Jon Loeliger27cadd22017-11-10 13:15:12 -0600634 expected_retval=-6)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100635
636 self.logger.info("ACLP_TEST_FINISH_0001")
637
638 def test_0002_acl_permit_apply(self):
639 """ permit ACL apply test
640 """
641 self.logger.info("ACLP_TEST_START_0002")
642
643 rules = []
644 rules.append(self.create_rule(self.IPV4, self.PERMIT,
645 0, self.proto[self.IP][self.UDP]))
646 rules.append(self.create_rule(self.IPV4, self.PERMIT,
647 0, self.proto[self.IP][self.TCP]))
648
649 # Apply rules
Andrew Yourtchenkof995c712019-06-13 15:23:21 +0000650 acl_idx = self.apply_rules(rules, b"permit per-flow")
651
652 # enable counters
653 reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100654
655 # Traffic should still pass
656 self.run_verify_test(self.IP, self.IPV4, -1)
Andrew Yourtchenkof995c712019-06-13 15:23:21 +0000657
658 matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
659 self.logger.info("stat segment counters: %s" % repr(matches))
660 cli = "show acl-plugin acl"
661 self.logger.info(self.vapi.ppcli(cli))
662 cli = "show acl-plugin tables"
663 self.logger.info(self.vapi.ppcli(cli))
664
665 total_hits = matches[0][0]['packets'] + matches[0][1]['packets']
666 self.assertEqual(total_hits, 64)
667
668 # disable counters
669 reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
670
Pavel Kotucek59dda062017-03-02 15:22:47 +0100671 self.logger.info("ACLP_TEST_FINISH_0002")
672
673 def test_0003_acl_deny_apply(self):
674 """ deny ACL apply test
675 """
676 self.logger.info("ACLP_TEST_START_0003")
677 # Add a deny-flows ACL
678 rules = []
679 rules.append(self.create_rule(self.IPV4, self.DENY,
680 self.PORTS_ALL, self.proto[self.IP][self.UDP]))
681 # Permit ip any any in the end
682 rules.append(self.create_rule(self.IPV4, self.PERMIT,
683 self.PORTS_ALL, 0))
684
685 # Apply rules
Andrew Yourtchenkof995c712019-06-13 15:23:21 +0000686 acl_idx = self.apply_rules(rules, b"deny per-flow;permit all")
687
688 # enable counters
689 reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100690
691 # Traffic should not pass
692 self.run_verify_negat_test(self.IP, self.IPV4,
693 self.proto[self.IP][self.UDP])
Andrew Yourtchenkof995c712019-06-13 15:23:21 +0000694
695 matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
696 self.logger.info("stat segment counters: %s" % repr(matches))
697 cli = "show acl-plugin acl"
698 self.logger.info(self.vapi.ppcli(cli))
699 cli = "show acl-plugin tables"
700 self.logger.info(self.vapi.ppcli(cli))
701 self.assertEqual(matches[0][0]['packets'], 64)
702 # disable counters
703 reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100704 self.logger.info("ACLP_TEST_FINISH_0003")
Andrew Yourtchenkof995c712019-06-13 15:23:21 +0000705 # self.assertEqual(, 0)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100706
707 def test_0004_vpp624_permit_icmpv4(self):
708 """ VPP_624 permit ICMPv4
709 """
710 self.logger.info("ACLP_TEST_START_0004")
711
712 # Add an ACL
713 rules = []
714 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
715 self.proto[self.ICMP][self.ICMPv4]))
716 # deny ip any any in the end
717 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
718
719 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800720 self.apply_rules(rules, b"permit icmpv4")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100721
722 # Traffic should still pass
723 self.run_verify_test(self.ICMP, self.IPV4,
724 self.proto[self.ICMP][self.ICMPv4])
725
726 self.logger.info("ACLP_TEST_FINISH_0004")
727
728 def test_0005_vpp624_permit_icmpv6(self):
729 """ VPP_624 permit ICMPv6
730 """
731 self.logger.info("ACLP_TEST_START_0005")
732
733 # Add an ACL
734 rules = []
735 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
736 self.proto[self.ICMP][self.ICMPv6]))
737 # deny ip any any in the end
738 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
739
740 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800741 self.apply_rules(rules, b"permit icmpv6")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100742
743 # Traffic should still pass
744 self.run_verify_test(self.ICMP, self.IPV6,
745 self.proto[self.ICMP][self.ICMPv6])
746
747 self.logger.info("ACLP_TEST_FINISH_0005")
748
749 def test_0006_vpp624_deny_icmpv4(self):
750 """ VPP_624 deny ICMPv4
751 """
752 self.logger.info("ACLP_TEST_START_0006")
753 # Add an ACL
754 rules = []
755 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
756 self.proto[self.ICMP][self.ICMPv4]))
757 # permit ip any any in the end
758 rules.append(self.create_rule(self.IPV4, self.PERMIT,
759 self.PORTS_ALL, 0))
760
761 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800762 self.apply_rules(rules, b"deny icmpv4")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100763
764 # Traffic should not pass
765 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
766
767 self.logger.info("ACLP_TEST_FINISH_0006")
768
769 def test_0007_vpp624_deny_icmpv6(self):
770 """ VPP_624 deny ICMPv6
771 """
772 self.logger.info("ACLP_TEST_START_0007")
773 # Add an ACL
774 rules = []
775 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
776 self.proto[self.ICMP][self.ICMPv6]))
777 # deny ip any any in the end
778 rules.append(self.create_rule(self.IPV6, self.PERMIT,
779 self.PORTS_ALL, 0))
780
781 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800782 self.apply_rules(rules, b"deny icmpv6")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100783
784 # Traffic should not pass
785 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
786
787 self.logger.info("ACLP_TEST_FINISH_0007")
788
def test_0008_tcp_permit_v4(self):
    """ permit TCPv4

    Installs a PERMIT rule for TCP over IPv4 (PORTS_RANGE selector)
    followed by a catch-all IPv4 DENY, then verifies TCPv4 traffic.
    """
    self.logger.info("ACLP_TEST_START_0008")

    tcp = self.proto[self.IP][self.TCP]
    acl_rules = [
        # rule under test: permit TCPv4
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(acl_rules, b"permit ipv4 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp)

    self.logger.info("ACLP_TEST_FINISH_0008")
808
def test_0009_tcp_permit_v6(self):
    """ permit TCPv6

    Installs a PERMIT rule for TCP over IPv6 (PORTS_RANGE selector)
    followed by a catch-all IPv6 DENY, then verifies TCPv6 traffic.
    """
    self.logger.info("ACLP_TEST_START_0009")

    tcp = self.proto[self.IP][self.TCP]

    # Add an ACL
    rules = [
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, b"permit ip6 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, tcp)

    # The finish marker must carry this test's own number (it used to
    # log 0008), otherwise log scraping pairs it with the wrong test.
    self.logger.info("ACLP_TEST_FINISH_0009")
828
def test_0010_udp_permit_v4(self):
    """ permit UDPv4

    Installs a PERMIT rule for UDP over IPv4 (PORTS_RANGE selector)
    followed by a catch-all IPv4 DENY, then verifies UDPv4 traffic.
    """
    self.logger.info("ACLP_TEST_START_0010")

    udp = self.proto[self.IP][self.UDP]

    # Add an ACL
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules; the label previously read "permit ipv udp" (missing
    # the "4"), unlike the sibling UDPv4 test's "permit ipv4 udp".
    self.apply_rules(rules, b"permit ipv4 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, udp)

    self.logger.info("ACLP_TEST_FINISH_0010")
848
def test_0011_udp_permit_v6(self):
    """ permit UDPv6

    Installs a PERMIT rule for UDP over IPv6 (PORTS_RANGE selector)
    followed by a catch-all IPv6 DENY, then verifies UDPv6 traffic.
    """
    self.logger.info("ACLP_TEST_START_0011")

    udp = self.proto[self.IP][self.UDP]
    acl_rules = [
        # rule under test: permit UDPv6
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(acl_rules, b"permit ip6 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, udp)

    self.logger.info("ACLP_TEST_FINISH_0011")
868
def test_0012_tcp_deny(self):
    """ deny TCPv4/v6

    Installs DENY rules for TCP on both IP versions, then catch-all
    PERMIT rules for both, and runs the negative verification with a
    random IP version.
    """
    self.logger.info("ACLP_TEST_START_0012")

    tcp = self.proto[self.IP][self.TCP]
    ip_versions = (self.IPV4, self.IPV6)

    # Rule order matters: v4 deny, v6 deny, then the trailing
    # "permit ip any any" for v4 and v6.
    acl_rules = [
        self.create_rule(version, self.DENY, self.PORTS_RANGE, tcp)
        for version in ip_versions
    ]
    acl_rules += [
        self.create_rule(version, self.PERMIT, self.PORTS_ALL, 0)
        for version in ip_versions
    ]
    self.apply_rules(acl_rules, b"deny ip4/ip6 tcp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, tcp)

    self.logger.info("ACLP_TEST_FINISH_0012")
894
def test_0013_udp_deny(self):
    """ deny UDPv4/v6

    Installs DENY rules for UDP on both IP versions, then catch-all
    PERMIT rules for both, and runs the negative verification with a
    random IP version.
    """
    self.logger.info("ACLP_TEST_START_0013")

    udp = self.proto[self.IP][self.UDP]
    ip_versions = (self.IPV4, self.IPV6)

    # Rule order matters: v4 deny, v6 deny, then the trailing
    # "permit ip any any" for v4 and v6.
    acl_rules = [
        self.create_rule(version, self.DENY, self.PORTS_RANGE, udp)
        for version in ip_versions
    ]
    acl_rules += [
        self.create_rule(version, self.PERMIT, self.PORTS_ALL, 0)
        for version in ip_versions
    ]
    self.apply_rules(acl_rules, b"deny ip4/ip6 udp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp)

    self.logger.info("ACLP_TEST_FINISH_0013")
920
def test_0014_acl_dump(self):
    """ verify add/dump acls

    Builds one ACL from a table of rule specifications, adds it through
    the API, dumps it back and checks every dumped rule against the
    specification at the same position. Also asserts that the dump
    returned exactly as many rules as were installed, so an empty or
    truncated dump cannot pass silently.
    """
    self.logger.info("ACLP_TEST_START_0014")

    tcp = self.proto[self.IP][self.TCP]
    udp = self.proto[self.IP][self.UDP]
    icmp4 = self.proto[self.ICMP][self.ICMPv4]
    icmp6 = self.proto[self.ICMP][self.ICMPv6]

    # Each entry is the argument list for create_rule():
    # [ip version, permit/deny, ports selector, protocol]
    r = [[self.IPV4, self.PERMIT, 1234, tcp],
         [self.IPV4, self.PERMIT, 2345, udp],
         [self.IPV4, self.PERMIT, 0, tcp],
         [self.IPV4, self.PERMIT, 0, udp],
         [self.IPV4, self.PERMIT, 5, icmp4],
         [self.IPV6, self.PERMIT, 4321, tcp],
         [self.IPV6, self.PERMIT, 5432, udp],
         [self.IPV6, self.PERMIT, 0, tcp],
         [self.IPV6, self.PERMIT, 0, udp],
         [self.IPV6, self.PERMIT, 6, icmp6],
         [self.IPV4, self.DENY, self.PORTS_ALL, 0],
         [self.IPV4, self.DENY, 1234, tcp],
         [self.IPV4, self.DENY, 2345, udp],
         [self.IPV4, self.DENY, 5, icmp4],
         [self.IPV6, self.DENY, 4321, tcp],
         [self.IPV6, self.DENY, 5432, udp],
         [self.IPV6, self.DENY, 6, icmp6],
         [self.IPV6, self.DENY, self.PORTS_ALL, 0]
         ]

    # Add and verify new ACLs
    rules = [self.create_rule(*spec) for spec in r]

    # NOTE(review): 4294967295 (0xffffffff) is presumably the "create a
    # new ACL" index - confirm against the ACL plugin API.
    reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
    result = self.vapi.acl_dump(reply.acl_index)

    # i runs across all dumped rules, whichever reply carries them
    i = 0
    for drules in result:
        for dr in drules.r:
            ip_version, action, ports, proto = r[i]
            self.assertEqual(dr.is_ipv6, ip_version)
            self.assertEqual(dr.is_permit, action)
            self.assertEqual(dr.proto, proto)

            if ports > 0:
                # explicit single port / icmp type
                self.assertEqual(dr.srcport_or_icmptype_first, ports)
            elif ports < 0:
                # PORTS_ALL: whole 16-bit range
                self.assertEqual(dr.srcport_or_icmptype_first, 0)
                self.assertEqual(dr.srcport_or_icmptype_last, 65535)
            elif dr.proto == tcp:
                # PORTS_RANGE: values must fall inside the TCP ranges
                self.assertGreaterEqual(dr.srcport_or_icmptype_first,
                                        self.tcp_sport_from)
                self.assertLessEqual(dr.srcport_or_icmptype_first,
                                     self.tcp_sport_to)
                self.assertGreaterEqual(dr.dstport_or_icmpcode_last,
                                        self.tcp_dport_from)
                self.assertLessEqual(dr.dstport_or_icmpcode_last,
                                     self.tcp_dport_to)
            elif dr.proto == udp:
                # PORTS_RANGE: values must fall inside the UDP ranges
                self.assertGreaterEqual(dr.srcport_or_icmptype_first,
                                        self.udp_sport_from)
                self.assertLessEqual(dr.srcport_or_icmptype_first,
                                     self.udp_sport_to)
                self.assertGreaterEqual(dr.dstport_or_icmpcode_last,
                                        self.udp_dport_from)
                self.assertLessEqual(dr.dstport_or_icmpcode_last,
                                     self.udp_dport_to)
            i += 1

    # Without this, a dump that returns no rules would pass vacuously.
    self.assertEqual(i, len(r))

    self.logger.info("ACLP_TEST_FINISH_0014")
989
def test_0015_tcp_permit_port_v4(self):
    """ permit single TCPv4

    Picks a random port, installs a PERMIT rule for TCP over IPv4 on
    that port followed by a catch-all IPv4 DENY, then verifies TCPv4
    traffic for that port.
    """
    self.logger.info("ACLP_TEST_START_0015")

    # the port is drawn before any rule is built
    port = random.randint(0, 65535)
    tcp = self.proto[self.IP][self.TCP]

    acl_rules = [
        # rule under test: permit TCPv4 on the chosen port
        self.create_rule(self.IPV4, self.PERMIT, port, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(acl_rules, b"permit ip4 tcp %d" % port)

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp, port)

    self.logger.info("ACLP_TEST_FINISH_0015")
1011
def test_0016_udp_permit_port_v4(self):
    """ permit single UDPv4

    Picks a random port, installs a PERMIT rule for UDP over IPv4 on
    that port followed by a catch-all IPv4 DENY, then verifies UDPv4
    traffic for that port.
    """
    self.logger.info("ACLP_TEST_START_0016")

    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]

    # Add an ACL
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, port, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules; the label now names UDP (it was copy-pasted as
    # "permit ip4 tcp") so it matches the rules it is attached to.
    self.apply_rules(rules, b"permit ip4 udp %d" % port)

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, udp, port)

    self.logger.info("ACLP_TEST_FINISH_0016")
1033
def test_0017_tcp_permit_port_v6(self):
    """ permit single TCPv6

    Picks a random port, installs a PERMIT rule for TCP over IPv6 on
    that port followed by a catch-all IPv6 DENY, then verifies TCPv6
    traffic for that port.
    """
    self.logger.info("ACLP_TEST_START_0017")

    port = random.randint(0, 65535)
    tcp = self.proto[self.IP][self.TCP]

    # Add an ACL
    rules = [
        self.create_rule(self.IPV6, self.PERMIT, port, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules; the label now names IPv6 (it was copy-pasted as
    # "permit ip4 tcp") so it matches the rules it is attached to.
    self.apply_rules(rules, b"permit ip6 tcp %d" % port)

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, tcp, port)

    self.logger.info("ACLP_TEST_FINISH_0017")
1055
def test_0018_udp_permit_port_v6(self):
    """ permit single UDPv6

    Picks a random port, installs a PERMIT rule for UDP over IPv6 on
    that port followed by a catch-all IPv6 DENY, then verifies UDPv6
    traffic for that port.
    """
    self.logger.info("ACLP_TEST_START_0018")

    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]

    # Add an ACL
    rules = [
        self.create_rule(self.IPV6, self.PERMIT, port, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules; the label now names IPv6/UDP (it was copy-pasted as
    # "permit ip4 tcp") so it matches the rules it is attached to.
    self.apply_rules(rules, b"permit ip6 udp %d" % port)

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, udp, port)

    self.logger.info("ACLP_TEST_FINISH_0018")
1078
def test_0019_udp_deny_port(self):
    """ deny single TCPv4/v6

    Picks a random port, installs DENY rules for TCP on that port for
    both IP versions followed by catch-all PERMIT rules, then runs the
    negative verification for TCP with a random IP version.

    NOTE: despite "udp" in the method name, this test exercises TCP;
    the name is kept so existing test selections keep working.
    """
    self.logger.info("ACLP_TEST_START_0019")

    port = random.randint(0, 65535)
    tcp = self.proto[self.IP][self.TCP]

    # Add an ACL
    rules = [
        self.create_rule(self.IPV4, self.DENY, port, tcp),
        self.create_rule(self.IPV6, self.DENY, port, tcp),
        # Permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]

    # Apply rules; the label now names TCP (it previously said "udp")
    # so it matches the rules it is attached to.
    self.apply_rules(rules, b"deny ip4/ip6 tcp %d" % port)

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, tcp, port)

    self.logger.info("ACLP_TEST_FINISH_0019")
1105
def test_0020_udp_deny_port(self):
    """ deny single UDPv4/v6

    Picks a random port, installs DENY rules for UDP on that port for
    both IP versions followed by catch-all PERMIT rules, then runs the
    negative verification for UDP with a random IP version.
    """
    self.logger.info("ACLP_TEST_START_0020")

    # the port is drawn before any rule is built
    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]
    ip_versions = (self.IPV4, self.IPV6)

    # v4 deny, v6 deny, then "permit ip any any" for v4 and v6
    acl_rules = [
        self.create_rule(version, self.DENY, port, udp)
        for version in ip_versions
    ]
    acl_rules += [
        self.create_rule(version, self.PERMIT, self.PORTS_ALL, 0)
        for version in ip_versions
    ]
    self.apply_rules(acl_rules, b"deny ip4/ip6 udp %d" % port)

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp, port)

    self.logger.info("ACLP_TEST_FINISH_0020")
1132
def test_0021_udp_deny_port_verify_fragment_deny(self):
    """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
    blocked

    Same rule set as the single-port UDP deny test, but the negative
    verification is invoked with an extra True argument.
    NOTE(review): that flag presumably makes the helper send
    non-initial fragments - confirm against run_verify_negat_test.
    """
    self.logger.info("ACLP_TEST_START_0021")

    # the port is drawn before any rule is built
    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]
    ip_versions = (self.IPV4, self.IPV6)

    # v4 deny, v6 deny, then "permit ip any any" for v4 and v6
    acl_rules = [
        self.create_rule(version, self.DENY, port, udp)
        for version in ip_versions
    ]
    acl_rules += [
        self.create_rule(version, self.PERMIT, self.PORTS_ALL, 0)
        for version in ip_versions
    ]
    self.apply_rules(acl_rules, b"deny ip4/ip6 udp %d" % port)

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp, port, True)

    self.logger.info("ACLP_TEST_FINISH_0021")
1160
def test_0022_zero_length_udp_ipv4(self):
    """ VPP-687 zero length udp ipv4 packet"""
    self.logger.info("ACLP_TEST_START_0022")

    # the port is drawn before any rule is built
    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]

    acl_rules = [
        # rule under test: permit UDPv4 on the chosen port
        self.create_rule(self.IPV4, self.PERMIT, port, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(acl_rules, b"permit empty udp ip4 %d" % port)

    # Traffic should still pass.
    # Build the input stream for pg0; only queue it when it is non-empty.
    stream = self.create_stream(self.pg0, self.pg_if_packet_sizes,
                                self.IP, self.IPV4, udp, port,
                                False, False)
    expected = 0
    if len(stream) > 0:
        self.pg0.add_stream(stream)
        expected += len(stream)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # pg1 is asked for as many packets as were queued on pg0
    self.pg1.get_capture(expected)

    self.logger.info("ACLP_TEST_FINISH_0022")
1195
def test_0023_zero_length_udp_ipv6(self):
    """ VPP-687 zero length udp ipv6 packet"""
    self.logger.info("ACLP_TEST_START_0023")

    # the port is drawn before any rule is built
    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]

    acl_rules = [
        # rule under test: permit UDPv6 on the chosen port
        self.create_rule(self.IPV6, self.PERMIT, port, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(acl_rules, b"permit empty udp ip6 %d" % port)

    # Traffic should still pass.
    # Build the input stream for pg0; only queue it when it is non-empty.
    stream = self.create_stream(self.pg0, self.pg_if_packet_sizes,
                                self.IP, self.IPV6, udp, port,
                                False, False)
    expected = 0
    if len(stream) > 0:
        self.pg0.add_stream(stream)
        expected += len(stream)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Verify outgoing packet stream: pg1 is asked for as many packets
    # as were queued on pg0
    self.pg1.get_capture(expected)

    self.logger.info("ACLP_TEST_FINISH_0023")
1230
def test_0108_tcp_permit_v4(self):
    """ permit TCPv4 + non-match range

    Like the plain TCPv4 permit test, but a DENY rule using the second
    port range (PORTS_RANGE_2) is placed ahead of the PERMIT rule.
    """
    self.logger.info("ACLP_TEST_START_0108")

    tcp = self.proto[self.IP][self.TCP]
    acl_rules = [
        # leading deny on the second port range
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        # rule under test: permit TCPv4
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(acl_rules, b"permit ipv4 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp)

    self.logger.info("ACLP_TEST_FINISH_0108")
1252
def test_0109_tcp_permit_v6(self):
    """ permit TCPv6 + non-match range

    Like the plain TCPv6 permit test, but a DENY rule using the second
    port range (PORTS_RANGE_2) is placed ahead of the PERMIT rule.
    """
    self.logger.info("ACLP_TEST_START_0109")

    tcp = self.proto[self.IP][self.TCP]
    acl_rules = [
        # leading deny on the second port range
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2, tcp),
        # rule under test: permit TCPv6
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(acl_rules, b"permit ip6 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, tcp)

    self.logger.info("ACLP_TEST_FINISH_0109")
1274
def test_0110_udp_permit_v4(self):
    """ permit UDPv4 + non-match range

    Like the plain UDPv4 permit test, but a DENY rule using the second
    port range (PORTS_RANGE_2) is placed ahead of the PERMIT rule.
    """
    self.logger.info("ACLP_TEST_START_0110")

    udp = self.proto[self.IP][self.UDP]
    acl_rules = [
        # leading deny on the second port range
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, udp),
        # rule under test: permit UDPv4
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(acl_rules, b"permit ipv4 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, udp)

    self.logger.info("ACLP_TEST_FINISH_0110")
1296
def test_0111_udp_permit_v6(self):
    """ permit UDPv6 + non-match range

    Like the plain UDPv6 permit test, but a DENY rule using the second
    port range (PORTS_RANGE_2) is placed ahead of the PERMIT rule.
    """
    self.logger.info("ACLP_TEST_START_0111")

    udp = self.proto[self.IP][self.UDP]
    acl_rules = [
        # leading deny on the second port range
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2, udp),
        # rule under test: permit UDPv6
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(acl_rules, b"permit ip6 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, udp)

    self.logger.info("ACLP_TEST_FINISH_0111")
1318
def test_0112_tcp_deny(self):
    """ deny TCPv4/v6 + non-match range

    Like the plain TCP deny test, but PERMIT rules using the second
    port range (PORTS_RANGE_2) are placed ahead of the DENY rules.
    """
    self.logger.info("ACLP_TEST_START_0112")

    tcp = self.proto[self.IP][self.TCP]
    ip_versions = (self.IPV4, self.IPV6)

    # Three v4/v6 pairs, in this order: permit on the second range,
    # deny on the first range, then "permit ip any any" in the end.
    acl_rules = []
    for action, ports, proto in ((self.PERMIT, self.PORTS_RANGE_2, tcp),
                                 (self.DENY, self.PORTS_RANGE, tcp),
                                 (self.PERMIT, self.PORTS_ALL, 0)):
        for version in ip_versions:
            acl_rules.append(
                self.create_rule(version, action, ports, proto))

    self.apply_rules(acl_rules, b"deny ip4/ip6 tcp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, tcp)

    self.logger.info("ACLP_TEST_FINISH_0112")
1350
def test_0113_udp_deny(self):
    """ deny UDPv4/v6 + non-match range

    Like the plain UDP deny test, but PERMIT rules using the second
    port range (PORTS_RANGE_2) are placed ahead of the DENY rules.
    """
    self.logger.info("ACLP_TEST_START_0113")

    udp = self.proto[self.IP][self.UDP]
    ip_versions = (self.IPV4, self.IPV6)

    # Three v4/v6 pairs, in this order: permit on the second range,
    # deny on the first range, then "permit ip any any" in the end.
    acl_rules = []
    for action, ports, proto in ((self.PERMIT, self.PORTS_RANGE_2, udp),
                                 (self.DENY, self.PORTS_RANGE, udp),
                                 (self.PERMIT, self.PORTS_ALL, 0)):
        for version in ip_versions:
            acl_rules.append(
                self.create_rule(version, action, ports, proto))

    self.apply_rules(acl_rules, b"deny ip4/ip6 udp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp)

    self.logger.info("ACLP_TEST_FINISH_0113")
1382
def test_0300_tcp_permit_v4_etype_aaaa(self):
    """ permit TCPv4, send 0xAAAA etype

    Applies the TCPv4 permit rule set and runs the positive
    verification with 0xaaaa passed as the last argument.
    """
    self.logger.info("ACLP_TEST_START_0300")

    tcp = self.proto[self.IP][self.TCP]
    acl_rules = [
        # leading deny on the second port range
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        # permit TCPv4
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(acl_rules, b"permit ipv4 tcp")

    # Traffic should still pass also for an odd ethertype
    self.run_verify_test(self.IP, self.IPV4, tcp,
                         0, False, True, 0xaaaa)
    self.logger.info("ACLP_TEST_FINISH_0300")
1404
def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
    """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked

    Applies the TCPv4 permit rule set, whitelists ethertype 0x0BBB and
    checks that traffic sent with ethertype 0xAAAA does not pass. The
    whitelist is removed even when the verification fails, so a failure
    here cannot leak the whitelist into later tests.
    """
    self.logger.info("ACLP_TEST_START_0305")

    tcp = self.proto[self.IP][self.TCP]

    # Add an ACL
    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, b"permit ipv4 tcp")

    # whitelist the 0x0bbb etype - so the 0xaaaa should be blocked
    self.etype_whitelist([0xbbb], 1)
    try:
        # The oddball ethertype should be blocked
        self.run_verify_negat_test(self.IP, self.IPV4, tcp,
                                   0, False, 0xaaaa)
    finally:
        # remove the whitelist
        self.etype_whitelist([], 0)

    self.logger.info("ACLP_TEST_FINISH_0305")
1434
def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
    """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass

    Applies the TCPv4 permit rule set, whitelists ethertype 0x0BBB and
    checks that traffic sent with that same ethertype passes. The
    whitelist is removed even when the verification fails, so a failure
    here cannot leak the whitelist into later tests.
    """
    self.logger.info("ACLP_TEST_START_0306")

    tcp = self.proto[self.IP][self.TCP]

    # Add an ACL
    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, b"permit ipv4 tcp")

    # whitelist the 0x0bbb etype
    self.etype_whitelist([0xbbb], 1)
    try:
        # The whitelisted traffic, should pass
        self.run_verify_test(self.IP, self.IPV4, tcp,
                             0, False, True, 0x0bbb)
    finally:
        # remove the whitelist
        self.etype_whitelist([], 0)

    self.logger.info("ACLP_TEST_FINISH_0306")
1463
def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
    """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass

    Applies the TCPv4 permit rule set, whitelists ethertype 0x0BBB,
    removes the whitelist again and checks that traffic sent with
    ethertype 0xAAAA passes.
    """
    self.logger.info("ACLP_TEST_START_0307")

    tcp = self.proto[self.IP][self.TCP]

    # Add an ACL
    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, b"permit ipv4 tcp")

    # whitelist the 0x0bbb etype - so the 0xaaaa should be blocked
    self.etype_whitelist([0xbbb], 1)
    # remove the whitelist, the previously blocked 0xAAAA should pass now
    self.etype_whitelist([], 0)

    # With the whitelist gone, the 0xAAAA traffic should pass
    self.run_verify_test(self.IP, self.IPV4, tcp,
                         0, False, True, 0xaaaa)

    # The finish marker must carry this test's own number (it used to
    # log 0306), otherwise log scraping pairs it with the wrong test.
    self.logger.info("ACLP_TEST_FINISH_0307")
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +02001491
def test_0315_del_intf(self):
    """ apply an acl and delete the interface

    Builds the TCPv4 permit rule set, creates a loopback interface,
    applies the rules to it and then removes the interface.
    """
    self.logger.info("ACLP_TEST_START_0315")

    tcp = self.proto[self.IP][self.TCP]
    acl_rules = [
        # leading deny on the second port range
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        # permit TCPv4
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # create an interface (after the rules, before they are applied)
    loopback = VppLoInterface(self)

    # Apply rules to the new interface only
    self.apply_rules_to(acl_rules, b"permit ipv4 tcp",
                        loopback.sw_if_index)

    # Remove the interface while the ACL is still applied to it
    loopback.remove_vpp_config()

    self.logger.info("ACLP_TEST_FINISH_0315")
1517
# Script entry point: run this module's tests with the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)