blob: 53d9621594965ad4576940b7b88be6e7b300af58 [file] [log] [blame]
Renato Botelho do Coutoead1e532019-10-31 13:31:07 -05001#!/usr/bin/env python3
Pavel Kotucek59dda062017-03-02 15:22:47 +01002"""ACL plugin Test Case HLD:
3"""
4
5import unittest
6import random
7
8from scapy.packet import Raw
9from scapy.layers.l2 import Ether
10from scapy.layers.inet import IP, TCP, UDP, ICMP
11from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +000012from scapy.layers.inet6 import IPv6ExtHdrFragment
Pavel Kotucek59dda062017-03-02 15:22:47 +010013from framework import VppTestCase, VppTestRunner
Andrew Yourtchenkoc8eae8d2021-01-20 20:30:36 +000014from framework import tag_fixme_vpp_workers
Pavel Kotucek59dda062017-03-02 15:22:47 +010015from util import Host, ppp
Jakub Grajciar2f8cd912020-03-27 06:55:06 +010016from ipaddress import IPv4Network, IPv6Network
Pavel Kotucek59dda062017-03-02 15:22:47 +010017
Andrew Yourtchenkode3682f2018-03-23 10:38:46 +010018from vpp_lo_interface import VppLoInterface
Jakub Grajciar2f8cd912020-03-27 06:55:06 +010019from vpp_acl import AclRule, VppAcl, VppAclInterface, VppEtypeWhitelist
20from vpp_ip import INVALID_INDEX
Andrew Yourtchenkode3682f2018-03-23 10:38:46 +010021
Pavel Kotucek59dda062017-03-02 15:22:47 +010022
Andrew Yourtchenkoc8eae8d2021-01-20 20:30:36 +000023@tag_fixme_vpp_workers
Pavel Kotucek59dda062017-03-02 15:22:47 +010024class TestACLplugin(VppTestCase):
25 """ ACL plugin Test Case """
26
# Traffic types; also used as the first index into ``proto``
IP = 0
ICMP = 1

# IP version selectors (IPRANDOM is deliberately neither 0 nor 1)
IPRANDOM = -1
IPV4 = 0
IPV6 = 1

# Rule actions
DENY = 0
PERMIT = 1

# Supported IP protocol numbers:
#   proto[IP]   -> [TCP, UDP]
#   proto[ICMP] -> [ICMPv4, ICMPv6]
proto = [[6, 17], [1, 58]]
# Protocol number -> layer name used to look the layer up in a packet
proto_map = {
    1: 'ICMP',
    58: 'ICMPv6EchoRequest',
    6: 'TCP',
    17: 'UDP',
}
# Second index into ``proto``
ICMPv4 = 0
ICMPv6 = 1
TCP = 0
UDP = 1
PROTO_ALL = 0

# Port selectors
PORTS_ALL = -1
PORTS_RANGE = 0
PORTS_RANGE_2 = 1

# First set of port ranges (inclusive bounds)
udp_sport_from = 10
udp_sport_to = udp_sport_from + 5
udp_dport_from = 20000
udp_dport_to = udp_dport_from + 5000
tcp_sport_from = 30
tcp_sport_to = tcp_sport_from + 5
tcp_dport_from = 40000
tcp_dport_to = tcp_dport_from + 5000

# Second set of port ranges
udp_sport_from_2 = 90
udp_sport_to_2 = udp_sport_from_2 + 5
udp_dport_from_2 = 30000
udp_dport_to_2 = udp_dport_from_2 + 5000
tcp_sport_from_2 = 130
tcp_sport_to_2 = tcp_sport_from_2 + 5
tcp_dport_from_2 = 20000
tcp_dport_to_2 = tcp_dport_from_2 + 5000

# ICMP type/code put into generated echo requests
icmp4_type = 8  # echo request
icmp4_code = 3
icmp6_type = 128  # echo request
icmp6_code = 3

# Second set of ICMP type / code-range values
icmp4_type_2 = 8
icmp4_code_from_2 = 5
icmp4_code_to_2 = 20
icmp6_type_2 = 128
icmp6_code_from_2 = 8
icmp6_code_to_2 = 42

# Bridge domain used by the test topology
bd_id = 1
85
@classmethod
def setUpClass(cls):
    """
    Run the base-class setup, then build the L2 test topology.

    Creates two pg interfaces, puts them into bridge domain ``bd_id``,
    brings them up and pre-generates 16 test hosts split between the
    interfaces.  If anything fails, the base-class tearDownClass is
    called before the exception is re-raised.
    """
    super(TestACLplugin, cls).setUpClass()

    try:
        # Two packet-generator interfaces: pg0 and pg1
        cls.create_pg_interfaces(range(2))

        # Packet flows: traffic injected on pg0 is expected on pg1
        cls.flows = {cls.pg0: [cls.pg1]}

        # Packet sizes
        cls.pg_if_packet_sizes = [64, 512, 1518, 9018]

        # Create the bridge domain (uu_flood=1, learn=1) and attach
        # every pg interface to it
        cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
                                       learn=1)
        for pg_if in cls.pg_interfaces:
            cls.vapi.sw_interface_set_l2_bridge(
                rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id)

        # Set up all interfaces
        for pg_if in cls.pg_interfaces:
            pg_if.admin_up()

        # sw_if_index -> list of test hosts behind that interface
        cls.hosts_by_pg_idx = {
            pg_if.sw_if_index: [] for pg_if in cls.pg_interfaces}

        # sw_if_index -> list of deleted hosts
        cls.deleted_hosts_by_pg_idx = {
            pg_if.sw_if_index: [] for pg_if in cls.pg_interfaces}

        # Generate the hosts: each interface gets an equal share, the
        # last interface also takes whatever remains of the total
        host_count = 16
        first_host = 0
        if_count = len(cls.pg_interfaces)
        hosts_per_if = host_count // if_count
        for idx, pg_if in enumerate(cls.pg_interfaces):
            lo = hosts_per_if * idx + first_host
            if idx == if_count - 1:
                hi = host_count + first_host
            else:
                hi = hosts_per_if * (idx + 1) + first_host
            if_hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
            for j in range(lo, hi):
                if_hosts.append(Host(
                    "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
                    "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
                    "2017:dead:%02x::%u" % (pg_if.sw_if_index, j)))

    except Exception:
        super(TestACLplugin, cls).tearDownClass()
        raise
151
@classmethod
def tearDownClass(cls):
    """Delegate class-level cleanup to the base test case."""
    super(TestACLplugin, cls).tearDownClass()
155
def setUp(self):
    """Run the base per-test setup, then clear recorded packet infos."""
    super(TestACLplugin, self).setUp()
    self.reset_packet_infos()
159
def tearDown(self):
    """Run the base-class per-test teardown."""
    super(TestACLplugin, self).tearDown()
Paul Vinciguerra90cf21b2019-03-13 09:23:05 -0700165
def show_commands_at_teardown(self):
    """
    Log the output of the L2 feature-arc, l2fib, ACL plugin and
    bridge-domain CLI show commands, in that order.
    """
    cli_commands = (
        "show vlib graph l2-input-feat-arc",
        "show vlib graph l2-input-feat-arc-end",
        "show vlib graph l2-output-feat-arc",
        "show vlib graph l2-output-feat-arc-end",
        "show l2fib verbose",
        "show acl-plugin acl",
        "show acl-plugin interface",
        "show acl-plugin tables",
        "show bridge-domain %s detail" % self.bd_id,
    )
    for cli in cli_commands:
        self.logger.info(self.vapi.ppcli(cli))
Pavel Kotucek59dda062017-03-02 15:22:47 +0100181
def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
                s_prefix=0, s_ip=0,
                d_prefix=0, d_ip=0):
    """
    Build an AclRule for the given address family and match criteria.

    :param ip: Falsy for IPv4 prefixes, truthy for IPv6 prefixes.
    :param permit_deny: Passed to AclRule as ``is_permit``.
    :param ports: Passed to AclRule unchanged.
    :param proto: Passed to AclRule unchanged.
    :param s_prefix: Source prefix length.
    :param s_ip: Source network address.
    :param d_prefix: Destination prefix length.
    :param d_ip: Destination network address.
    :return: The constructed AclRule.
    """
    network_cls = IPv6Network if ip else IPv4Network
    return AclRule(is_permit=permit_deny, ports=ports, proto=proto,
                   src_prefix=network_cls((s_ip, s_prefix)),
                   dst_prefix=network_cls((d_ip, d_prefix)))
Pavel Kotucek59dda062017-03-02 15:22:47 +0100193
def apply_rules(self, rules, tag=None):
    """
    Create an ACL from ``rules`` and attach it to every pg interface.

    :param rules: Rules for the new ACL.
    :param tag: Tag passed to VppAcl.
    :return: Index of the created ACL.
    """
    acl = VppAcl(self, rules, tag=tag)
    acl.add_vpp_config()
    self.logger.info("Dumped ACL: " + str(acl.dump()))
    # Attach the ACL to each interface with n_input=1
    for pg_if in self.pg_interfaces:
        VppAclInterface(
            self, sw_if_index=pg_if.sw_if_index, n_input=1,
            acls=[acl]).add_vpp_config()
    return acl.acl_index
Pavel Kotucek59dda062017-03-02 15:22:47 +0100204
def apply_rules_to(self, rules, tag=None, sw_if_index=INVALID_INDEX):
    """
    Create an ACL from ``rules`` and attach it to a single interface.

    :param rules: Rules for the new ACL.
    :param tag: Tag passed to VppAcl.
    :param sw_if_index: Interface to attach the ACL to.
    :return: Index of the created ACL.
    """
    acl = VppAcl(self, rules, tag=tag)
    acl.add_vpp_config()
    self.logger.info("Dumped ACL: " + str(acl.dump()))
    # Attach the ACL to the interface with n_input=1.  The interface
    # binding object must be pushed to VPP explicitly, as apply_rules
    # does; constructing it alone configures nothing.
    acl_if = VppAclInterface(self, sw_if_index=sw_if_index, n_input=1,
                             acls=[acl])
    acl_if.add_vpp_config()
    return acl.acl_index
Andrew Yourtchenkode3682f2018-03-23 10:38:46 +0100213
def etype_whitelist(self, whitelist, n_input, add=True):
    """
    Add or remove an ethertype whitelist on all pg interfaces.

    :param whitelist: Ethertypes passed to VppEtypeWhitelist.
    :param n_input: Passed to VppEtypeWhitelist unchanged.
    :param add: When true, create a whitelist per interface and keep the
        results in ``self._wl``; when false, remove whatever a previous
        add stored there (nothing happens if there was no previous add).
    """
    if not add:
        for wl in getattr(self, "_wl", []):
            wl.remove_vpp_config()
        return

    self._wl = []
    for pg_if in self.pg_interfaces:
        etype_wl = VppEtypeWhitelist(
            self, sw_if_index=pg_if.sw_if_index, whitelist=whitelist,
            n_input=n_input)
        self._wl.append(etype_wl.add_vpp_config())
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100226
def create_upper_layer(self, packet_index, proto, ports=0):
    """
    Build the L4 header for a generated packet.

    :param packet_index: Not used by this method.
    :param proto: IP protocol number; must be a key of ``proto_map``.
    :param ports: 0 picks random source/destination ports from the class
        port ranges; any other value is used for both ports.
    :return: A UDP or TCP layer, or '' for any other protocol.
    """
    layer_name = self.proto_map[proto]
    if layer_name == 'UDP':
        if ports != 0:
            return UDP(sport=ports, dport=ports)
        return UDP(sport=random.randint(self.udp_sport_from,
                                        self.udp_sport_to),
                   dport=random.randint(self.udp_dport_from,
                                        self.udp_dport_to))
    if layer_name == 'TCP':
        if ports != 0:
            return TCP(sport=ports, dport=ports)
        return TCP(sport=random.randint(self.tcp_sport_from,
                                        self.tcp_sport_to),
                   dport=random.randint(self.tcp_dport_from,
                                        self.tcp_dport_to))
    return ''
246
def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
                  proto=-1, ports=0, fragments=False,
                  pkt_raw=True, etype=-1):
    """
    Build the packets to inject on ``src_if``.

    One packet is generated for every (source host, destination host)
    pair, for every destination interface in ``self.flows[src_if]``.

    :param object src_if: Interface to create packet stream for.
    :param list packet_sizes: Sizes to choose from at random when
        extending a packet (only used when ``pkt_raw`` is true).
    :param traffic_type: ``self.ICMP`` for ICMP/ICMPv6 echo requests,
        anything else for an upper layer chosen via ``proto``.
    :param ipv6: 1 for IPv6, 0 for IPv4, any other value picks the
        version at random for each packet.
    :param proto: IP protocol number; -1 picks one of
        ``self.proto[self.IP]`` at random for each packet.
    :param ports: Forwarded to create_upper_layer.
    :param fragments: Emit fragments (IPv6 fragment extension header, or
        IPv4 with flags=1 and frag=64).
    :param pkt_raw: Append the packet-info payload and extend the packet
        to a random size.
    :param etype: Ethertype to set explicitly when > 0.
    :return: List of packets (empty if ``src_if`` has no flows).
    """
    pkts = []
    if src_if not in self.flows:
        return pkts

    src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
    n_src = len(src_hosts)
    for dst_if in self.flows[src_if]:
        dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
        for i in range(len(dst_hosts) * n_src):
            dst_host = dst_hosts[i // n_src]
            src_host = src_hosts[i % n_src]
            pkt_info = self.create_packet_info(src_if, dst_if)

            # Record the IP version and protocol in the packet info
            if ipv6 == 1:
                pkt_info.ip = 1
            elif ipv6 == 0:
                pkt_info.ip = 0
            else:
                pkt_info.ip = random.choice([0, 1])
            pkt_info.proto = (random.choice(self.proto[self.IP])
                              if proto == -1 else proto)
            payload = self.info_to_payload(pkt_info)

            # L2 header, with an explicit ethertype only when requested
            ether_fields = {'dst': dst_host.mac, 'src': src_host.mac}
            if etype > 0:
                ether_fields['type'] = etype
            p = Ether(**ether_fields)

            # L3 header
            if pkt_info.ip:
                p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
                if fragments:
                    p /= IPv6ExtHdrFragment(offset=64, m=1)
            elif fragments:
                p /= IP(src=src_host.ip4, dst=dst_host.ip4,
                        flags=1, frag=64)
            else:
                p /= IP(src=src_host.ip4, dst=dst_host.ip4)

            # L4 / ICMP header
            if traffic_type != self.ICMP:
                p /= self.create_upper_layer(i, pkt_info.proto, ports)
            elif pkt_info.ip:
                p /= ICMPv6EchoRequest(type=self.icmp6_type,
                                       code=self.icmp6_code)
            else:
                p /= ICMP(type=self.icmp4_type,
                          code=self.icmp4_code)

            if pkt_raw:
                p /= Raw(payload)
            # Keep a copy taken before the packet is extended
            pkt_info.data = p.copy()
            if pkt_raw:
                self.extend_packet(p, random.choice(packet_sizes))
            pkts.append(p)
    return pkts
312
def verify_capture(self, pg_if, capture,
                   traffic_type=0, ip_type=0, etype=-1):
    """
    Verify captured input packet stream for defined interface.

    :param object pg_if: Interface to verify captured packet stream for.
    :param list capture: Captured packet stream.
    :param traffic_type: ``self.ICMP`` to check ICMP type/code; any other
        value checks addresses and TCP/UDP ports against the sent packet.
    :param ip_type: When non-zero, the IP version recorded in every
        packet's payload info must equal this value.
    :param etype: When > 0, packets carrying this ethertype are skipped
        without further checks; packets with any other ethertype are
        logged as unexpected and then verified as usual.
    """
    last_info = dict()
    for i in self.pg_interfaces:
        last_info[i.sw_if_index] = None
    dst_sw_if_index = pg_if.sw_if_index
    for packet in capture:
        if etype > 0:
            if packet[Ether].type != etype:
                self.logger.error(ppp("Unexpected ethertype in packet:",
                                      packet))
            else:
                continue
        try:
            # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
            if traffic_type == self.ICMP and ip_type == self.IPV6:
                payload_info = self.payload_to_info(
                    packet[ICMPv6EchoRequest], 'data')
                payload = packet[ICMPv6EchoRequest]
            else:
                payload_info = self.payload_to_info(packet[Raw])
                payload = packet[self.proto_map[payload_info.proto]]
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise

        if ip_type != 0:
            self.assertEqual(payload_info.ip, ip_type)
        if traffic_type == self.ICMP:
            try:
                if payload_info.ip == 0:
                    self.assertEqual(payload.type, self.icmp4_type)
                    self.assertEqual(payload.code, self.icmp4_code)
                else:
                    self.assertEqual(payload.type, self.icmp6_type)
                    self.assertEqual(payload.code, self.icmp6_code)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise
        else:
            try:
                ip_version = IPv6 if payload_info.ip == 1 else IP

                ip = packet[ip_version]
                packet_index = payload_info.index

                self.assertEqual(payload_info.dst, dst_sw_if_index)
                self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
                                  (pg_if.name, payload_info.src,
                                   packet_index))
                # Walk the per-source list of sent packets in order
                next_info = self.get_next_packet_info_for_interface2(
                    payload_info.src, dst_sw_if_index,
                    last_info[payload_info.src])
                last_info[payload_info.src] = next_info
                self.assertTrue(next_info is not None)
                self.assertEqual(packet_index, next_info.index)
                saved_packet = next_info.data
                # Check standard fields
                self.assertEqual(ip.src, saved_packet[ip_version].src)
                self.assertEqual(ip.dst, saved_packet[ip_version].dst)
                p = self.proto_map[payload_info.proto]
                if p == 'TCP':
                    tcp = packet[TCP]
                    self.assertEqual(tcp.sport, saved_packet[
                        TCP].sport)
                    self.assertEqual(tcp.dport, saved_packet[
                        TCP].dport)
                elif p == 'UDP':
                    udp = packet[UDP]
                    self.assertEqual(udp.sport, saved_packet[
                        UDP].sport)
                    self.assertEqual(udp.dport, saved_packet[
                        UDP].dport)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:",
                                      packet))
                raise
    # Every packet that was sent must have been consumed above.
    # NOTE(review): the interface object ``i`` is passed as the source
    # here, whereas the lookup inside the loop passes payload_info.src
    # (which also keys last_info, i.e. a sw_if_index) -- confirm which
    # one get_next_packet_info_for_interface2 expects.
    for i in self.pg_interfaces:
        remaining_packet = self.get_next_packet_info_for_interface2(
            i, dst_sw_if_index, last_info[i.sw_if_index])
        self.assertTrue(
            remaining_packet is None,
            "Port %u: Packet expected from source %u didn't arrive" %
            (dst_sw_if_index, i.sw_if_index))
406
def run_traffic_no_check(self):
    """
    Send a default packet stream on every interface that has flows
    defined, without verifying what is received.
    """
    # Queue the generated streams on the packet-generator interfaces
    for pg_if in self.pg_interfaces:
        if pg_if not in self.flows:
            continue
        pkts = self.create_stream(pg_if, self.pg_if_packet_sizes)
        if pkts:
            pg_if.add_stream(pkts)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
419
def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
                    frags=False, pkt_raw=True, etype=-1):
    """
    Send traffic and verify that all of it arrives on the destination
    interfaces.

    All parameters are forwarded to create_stream; ``traffic_type``,
    ``ip_type`` and ``etype`` are also forwarded to verify_capture.
    """
    # Create and queue the incoming packet streams
    sent = 0
    for pg_if in self.pg_interfaces:
        if pg_if not in self.flows:
            continue
        pkts = self.create_stream(pg_if, self.pg_if_packet_sizes,
                                  traffic_type, ip_type, proto, ports,
                                  frags, pkt_raw, etype)
        if pkts:
            pg_if.add_stream(pkts)
            sent += len(pkts)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.logger.info("sent packets count: %d" % sent)

    # Verify the outgoing stream on each destination interface
    for src_if in self.pg_interfaces:
        for dst_if in self.flows.get(src_if, ()):
            capture = dst_if.get_capture(sent)
            self.logger.info("Verifying capture on interface %s" %
                             dst_if.name)
            self.verify_capture(dst_if, capture,
                                traffic_type, ip_type, etype)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100449
def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
                          ports=0, frags=False, etype=-1):
    """
    Send traffic and verify that none of it arrives on the destination
    interfaces.

    The parameters are forwarded to create_stream (with ``pkt_raw``
    fixed to True).
    """
    sent = 0
    self.reset_packet_infos()
    # Create and queue the incoming packet streams
    for pg_if in self.pg_interfaces:
        if pg_if not in self.flows:
            continue
        pkts = self.create_stream(pg_if, self.pg_if_packet_sizes,
                                  traffic_type, ip_type, proto, ports,
                                  frags, True, etype)
        if pkts:
            pg_if.add_stream(pkts)
            sent += len(pkts)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.logger.info("sent packets count: %d" % sent)

    # Each destination interface must have captured nothing
    for src_if in self.pg_interfaces:
        for dst_if in self.flows.get(src_if, ()):
            self.logger.info("Verifying capture on interface %s" %
                             dst_if.name)
            capture = dst_if.get_capture(0)
            self.assertEqual(len(capture), 0)
478
def test_0000_warmup_test(self):
    """ ACL plugin version check; learn MACs

    Only the major version is asserted; minor version changes are
    non-breaking and are merely logged.
    """
    version = self.vapi.papi.acl_plugin_get_version()
    self.assertEqual(version.major, 1)
    self.logger.info("Working with ACL plugin version: %d.%d" % (
        version.major, version.minor))
488
def test_0001_acl_create(self):
    """ ACL create/delete test
    """

    def check_delete_blocked_while_applied(acl, n_input):
        # Apply the ACL on pg0, verify deleting it fails while applied,
        # then unapply it and verify the delete goes through.
        acl_if = VppAclInterface(
            self, sw_if_index=self.pg0.sw_if_index, n_input=n_input,
            acls=[acl])
        acl_if.add_vpp_config()
        acl.remove_vpp_config(expect_error=True)
        acl_if.remove_vpp_config()
        acl.remove_vpp_config()

    self.logger.info("ACLP_TEST_START_0001")

    # Test 1: add a new "permit 1234" ACL
    permit_rules = [
        AclRule(is_permit=1, proto=17, ports=1234, sport_to=1235)]
    first_acl = VppAcl(self, rules=permit_rules, tag="permit 1234")
    first_acl.add_vpp_config()
    self.assertTrue(first_acl.query_vpp_config())
    # The very first ACL gets #0
    self.assertEqual(first_acl.acl_index, 0)
    dumped = first_acl.dump()
    self.logger.info("Dumped ACL: " + str(dumped))
    self.assertEqual(len(dumped), 1)
    # We should have the same number of ACL entries as we had asked
    self.assertEqual(len(dumped[0].r), len(permit_rules))
    # The submitted and returned rules are different types, so compare
    # them key by key on the encoded form.
    # NOTE(review): the last rule is excluded from this comparison, so
    # with the single rule above the loop body never runs -- confirm
    # whether all rules were meant to be compared.
    for i_rule, rule in enumerate(permit_rules[:-1]):
        encoded_rule = rule.encode()
        for rule_key in encoded_rule:
            self.assertEqual(dumped[0].r[i_rule][rule_key],
                             encoded_rule[rule_key])

    # Add a second "deny 1234; permit all" ACL
    deny_rules = [
        AclRule(is_permit=0, proto=17, ports=1234, sport_to=1235),
        AclRule(is_permit=1, proto=17, ports=0)]
    second_acl = VppAcl(self, rules=deny_rules,
                        tag="deny 1234;permit all")
    second_acl.add_vpp_config()
    self.assertTrue(second_acl.query_vpp_config())
    # The second ACL gets #1
    self.assertEqual(second_acl.acl_index, 1)

    # Test 2: try to modify a nonexistent ACL
    invalid_acl = VppAcl(self, acl_index=432, rules=permit_rules,
                         tag="FFFF:FFFF")
    invalid_acl.add_vpp_config(expect_error=True)

    # An applied ACL cannot be deleted; once unapplied it can.
    # First ACL is applied with n_input=1, second with n_input=0.
    check_delete_blocked_while_applied(first_acl, 1)
    check_delete_blocked_while_applied(second_acl, 0)

    # try to apply a nonexistent ACL - must fail
    acl_if_list = VppAclInterface(
        self, sw_if_index=self.pg0.sw_if_index, n_input=0,
        acls=[invalid_acl])
    acl_if_list.add_vpp_config(expect_error=True)

    self.logger.info("ACLP_TEST_FINISH_0001")
556
def test_0002_acl_permit_apply(self):
    """ permit ACL apply test

    Permits UDP and TCP over IPv4, sends traffic with interface
    counters enabled and checks the total match count of both rules.
    """
    self.logger.info("ACLP_TEST_START_0002")

    rules = [
        self.create_rule(self.IPV4, self.PERMIT,
                         0, self.proto[self.IP][self.UDP]),
        self.create_rule(self.IPV4, self.PERMIT,
                         0, self.proto[self.IP][self.TCP]),
    ]

    # Apply rules
    acl_idx = self.apply_rules(rules, "permit per-flow")

    # enable counters
    self.vapi.papi.acl_stats_intf_counters_enable(enable=1)

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, -1)

    matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
    self.logger.info("stat segment counters: %s" % repr(matches))
    for cli in ("show acl-plugin acl", "show acl-plugin tables"):
        self.logger.info(self.vapi.ppcli(cli))

    # Hits of the two rules combined must equal the 64 packets expected
    per_rule = matches[0]
    total_hits = per_rule[0]['packets'] + per_rule[1]['packets']
    self.assertEqual(total_hits, 64)

    # disable counters
    self.vapi.papi.acl_stats_intf_counters_enable(enable=0)

    self.logger.info("ACLP_TEST_FINISH_0002")
591
def test_0003_acl_deny_apply(self):
    """ deny ACL apply test

    Denies UDP over IPv4 (permitting everything else), sends UDP traffic
    with interface counters enabled and checks the deny rule's match
    count.
    """
    self.logger.info("ACLP_TEST_START_0003")
    udp = self.proto[self.IP][self.UDP]
    rules = [
        # Deny the UDP flows
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, udp),
        # Permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
    ]

    # Apply rules
    acl_idx = self.apply_rules(rules, "deny per-flow;permit all")

    # enable counters
    self.vapi.papi.acl_stats_intf_counters_enable(enable=1)

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPV4, udp)

    matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
    self.logger.info("stat segment counters: %s" % repr(matches))
    for cli in ("show acl-plugin acl", "show acl-plugin tables"):
        self.logger.info(self.vapi.ppcli(cli))
    self.assertEqual(matches[0][0]['packets'], 64)

    # disable counters
    self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
    self.logger.info("ACLP_TEST_FINISH_0003")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100626
def test_0004_vpp624_permit_icmpv4(self):
    """ VPP_624 permit ICMPv4
    """
    self.logger.info("ACLP_TEST_START_0004")

    icmpv4 = self.proto[self.ICMP][self.ICMPv4]
    rules = [
        # Permit ICMPv4
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
                         icmpv4),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit icmpv4")

    # Traffic should still pass
    self.run_verify_test(self.ICMP, self.IPV4, icmpv4)

    self.logger.info("ACLP_TEST_FINISH_0004")
647
def test_0005_vpp624_permit_icmpv6(self):
    """ VPP_624 permit ICMPv6
    """
    self.logger.info("ACLP_TEST_START_0005")

    icmpv6 = self.proto[self.ICMP][self.ICMPv6]
    rules = [
        # Permit ICMPv6
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
                         icmpv6),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit icmpv6")

    # Traffic should still pass
    self.run_verify_test(self.ICMP, self.IPV6, icmpv6)

    self.logger.info("ACLP_TEST_FINISH_0005")
668
def test_0006_vpp624_deny_icmpv4(self):
    """ VPP_624 deny ICMPv4
    """
    self.logger.info("ACLP_TEST_START_0006")
    rules = [
        # Deny ICMPv4
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
                         self.proto[self.ICMP][self.ICMPv4]),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "deny icmpv4")

    # Traffic should not pass
    self.run_verify_negat_test(self.ICMP, self.IPV4, 0)

    self.logger.info("ACLP_TEST_FINISH_0006")
688
def test_0007_vpp624_deny_icmpv6(self):
    """ VPP_624 deny ICMPv6
    """
    self.logger.info("ACLP_TEST_START_0007")
    rules = [
        # Deny ICMPv6
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
                         self.proto[self.ICMP][self.ICMPv6]),
        # permit ip any any in the end
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "deny icmpv6")

    # Traffic should not pass
    self.run_verify_negat_test(self.ICMP, self.IPV6, 0)

    self.logger.info("ACLP_TEST_FINISH_0007")
708
def test_0008_tcp_permit_v4(self):
    """ permit TCPv4
    """
    self.logger.info("ACLP_TEST_START_0008")

    tcp = self.proto[self.IP][self.TCP]
    rules = [
        # Permit TCP over IPv4
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ipv4 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp)

    self.logger.info("ACLP_TEST_FINISH_0008")
728
def test_0009_tcp_permit_v6(self):
    """ permit TCPv6

    Permits TCP over IPv6, denies everything else, and checks that
    generated TCP/IPv6 traffic is forwarded.
    """
    self.logger.info("ACLP_TEST_START_0009")

    # Add an ACL
    rules = []
    rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
                                  self.proto[self.IP][self.TCP]))
    # deny ip any any in the end
    rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))

    # Apply rules
    self.apply_rules(rules, "permit ip6 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])

    # The finish marker must carry this test's own number so that the
    # START/FINISH pair can be matched in the log
    self.logger.info("ACLP_TEST_FINISH_0009")
748
def test_0010_udp_permit_v4(self):
    """ permit UDPv4

    Permits UDP over IPv4, denies everything else, and checks that
    generated UDP/IPv4 traffic is forwarded.
    """
    self.logger.info("ACLP_TEST_START_0010")

    # Add an ACL
    rules = []
    rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
                                  self.proto[self.IP][self.UDP]))
    # deny ip any any in the end
    rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))

    # Apply rules (tag follows the "permit ipv4 tcp" naming used by the
    # TCPv4 test)
    self.apply_rules(rules, "permit ipv4 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])

    self.logger.info("ACLP_TEST_FINISH_0010")
768
def test_0011_udp_permit_v6(self):
    """ permit UDPv6
    """
    self.logger.info("ACLP_TEST_START_0011")

    udp = self.proto[self.IP][self.UDP]
    rules = [
        # Permit UDP over IPv6
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ip6 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, udp)

    self.logger.info("ACLP_TEST_FINISH_0011")
788
def test_0012_tcp_deny(self):
    """ deny TCPv4/v6

    Builds an ACL that denies TCP on PORTS_RANGE for IPv4 and IPv6 and
    permits everything else, then runs the negative traffic verification
    for TCP with the IPRANDOM address-family selector.
    """
    self.logger.info("ACLP_TEST_START_0012")

    tcp = self.proto[self.IP][self.TCP]
    ip_versions = (self.IPV4, self.IPV6)

    # Deny rules first (IPv4, then IPv6) ...
    rules = [self.create_rule(ip_ver, self.DENY, self.PORTS_RANGE, tcp)
             for ip_ver in ip_versions]
    # ... followed by "permit ip any any" in the end
    rules += [self.create_rule(ip_ver, self.PERMIT, self.PORTS_ALL, 0)
              for ip_ver in ip_versions]

    # Apply rules
    self.apply_rules(rules, "deny ip4/ip6 tcp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, tcp)

    self.logger.info("ACLP_TEST_FINISH_0012")
814
def test_0013_udp_deny(self):
    """ deny UDPv4/v6

    Builds an ACL that denies UDP on PORTS_RANGE for IPv4 and IPv6 and
    permits everything else, then runs the negative traffic verification
    for UDP with the IPRANDOM address-family selector.
    """
    self.logger.info("ACLP_TEST_START_0013")

    udp = self.proto[self.IP][self.UDP]
    ip_versions = (self.IPV4, self.IPV6)

    # Deny rules first (IPv4, then IPv6) ...
    rules = [self.create_rule(ip_ver, self.DENY, self.PORTS_RANGE, udp)
             for ip_ver in ip_versions]
    # ... followed by "permit ip any any" in the end
    rules += [self.create_rule(ip_ver, self.PERMIT, self.PORTS_ALL, 0)
              for ip_ver in ip_versions]

    # Apply rules
    self.apply_rules(rules, "deny ip4/ip6 udp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp)

    self.logger.info("ACLP_TEST_FINISH_0013")
840
def test_0014_acl_dump(self):
    """ verify add/dump acls

    Creates a single ACL from a table of
    [ip version, action, port selector, protocol] entries, dumps it back
    and compares every dumped rule with the table entry at the same
    position.

    Port selector handling in the comparison:
      * > 0  - the dumped first source port / ICMP type must equal it;
      * < 0  - (PORTS_ALL) the dumped source range must be 0..65535;
      * == 0 - for TCP/UDP the dumped ports must lie inside the
               class-level tcp_*/udp_* port bounds.
    """
    self.logger.info("ACLP_TEST_START_0014")

    tcp = self.proto[self.IP][self.TCP]
    udp = self.proto[self.IP][self.UDP]
    icmp4 = self.proto[self.ICMP][self.ICMPv4]
    icmp6 = self.proto[self.ICMP][self.ICMPv6]

    r = [[self.IPV4, self.PERMIT, 1234, tcp],
         [self.IPV4, self.PERMIT, 2345, udp],
         [self.IPV4, self.PERMIT, 0, tcp],
         [self.IPV4, self.PERMIT, 0, udp],
         [self.IPV4, self.PERMIT, 5, icmp4],
         [self.IPV6, self.PERMIT, 4321, tcp],
         [self.IPV6, self.PERMIT, 5432, udp],
         [self.IPV6, self.PERMIT, 0, tcp],
         [self.IPV6, self.PERMIT, 0, udp],
         [self.IPV6, self.PERMIT, 6, icmp6],
         [self.IPV4, self.DENY, self.PORTS_ALL, 0],
         [self.IPV4, self.DENY, 1234, tcp],
         [self.IPV4, self.DENY, 2345, udp],
         [self.IPV4, self.DENY, 5, icmp4],
         [self.IPV6, self.DENY, 4321, tcp],
         [self.IPV6, self.DENY, 5432, udp],
         [self.IPV6, self.DENY, 6, icmp6],
         [self.IPV6, self.DENY, self.PORTS_ALL, 0]
         ]

    # Add and verify new ACLs
    rules = [self.create_rule(ip_ver, action, ports, proto)
             for ip_ver, action, ports, proto in r]

    acl = VppAcl(self, rules=rules)
    acl.add_vpp_config()
    result = acl.dump()

    # Flatten the dump reply into one list of rules, in dump order
    dumped = [dr for drules in result for dr in drules.r]

    # Without this check an empty or truncated dump would let the test
    # pass without comparing anything.
    self.assertEqual(len(dumped), len(r))

    for dr, (_, action, ports, proto) in zip(dumped, r):
        self.assertEqual(dr.is_permit, action)
        self.assertEqual(dr.proto, proto)

        if ports > 0:
            self.assertEqual(dr.srcport_or_icmptype_first, ports)
        elif ports < 0:
            self.assertEqual(dr.srcport_or_icmptype_first, 0)
            self.assertEqual(dr.srcport_or_icmptype_last, 65535)
        elif dr.proto == tcp:
            self.assertGreater(dr.srcport_or_icmptype_first,
                               self.tcp_sport_from-1)
            self.assertLess(dr.srcport_or_icmptype_first,
                            self.tcp_sport_to+1)
            self.assertGreater(dr.dstport_or_icmpcode_last,
                               self.tcp_dport_from-1)
            self.assertLess(dr.dstport_or_icmpcode_last,
                            self.tcp_dport_to+1)
        elif dr.proto == udp:
            self.assertGreater(dr.srcport_or_icmptype_first,
                               self.udp_sport_from-1)
            self.assertLess(dr.srcport_or_icmptype_first,
                            self.udp_sport_to+1)
            self.assertGreater(dr.dstport_or_icmpcode_last,
                               self.udp_dport_from-1)
            self.assertLess(dr.dstport_or_icmpcode_last,
                            self.udp_dport_to+1)

    self.logger.info("ACLP_TEST_FINISH_0014")
909
def test_0015_tcp_permit_port_v4(self):
    """ permit single TCPv4

    Picks a random port in 16384..65535, builds an IPv4 ACL that permits
    TCP for that port and denies everything else, then runs the positive
    traffic verification for TCP over IPv4 with that port.
    """
    self.logger.info("ACLP_TEST_START_0015")

    port = random.randint(16384, 65535)
    tcp = self.proto[self.IP][self.TCP]

    # ACL content: the permit rule, then "deny ip any any" in the end
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, port, tcp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ip4 tcp %d" % port)

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp, port)

    self.logger.info("ACLP_TEST_FINISH_0015")
931
def test_0016_udp_permit_port_v4(self):
    """ permit single UDPv4

    Picks a random port in 16384..65535, builds an IPv4 ACL that permits
    UDP for that port and denies everything else, then runs the positive
    traffic verification for UDP over IPv4 with that port.
    """
    self.logger.info("ACLP_TEST_START_0016")

    port = random.randint(16384, 65535)
    udp = self.proto[self.IP][self.UDP]

    # Add an ACL: permit rule first, "deny ip any any" in the end
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, port, udp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules. The tag used to say "tcp" (copy-paste from test 0015)
    # although the ACL permits UDP.
    self.apply_rules(rules, "permit ip4 udp %d" % port)

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, udp, port)

    self.logger.info("ACLP_TEST_FINISH_0016")
953
def test_0017_tcp_permit_port_v6(self):
    """ permit single TCPv6

    Picks a random port in 16384..65535, builds an IPv6 ACL that permits
    TCP for that port and denies everything else, then runs the positive
    traffic verification for TCP over IPv6 with that port.
    """
    self.logger.info("ACLP_TEST_START_0017")

    port = random.randint(16384, 65535)
    tcp = self.proto[self.IP][self.TCP]

    # Add an ACL: permit rule first, "deny ip any any" in the end
    rules = [
        self.create_rule(self.IPV6, self.PERMIT, port, tcp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules. The tag used to say "ip4" (copy-paste from test 0015)
    # although the ACL is built from IPv6 rules.
    self.apply_rules(rules, "permit ip6 tcp %d" % port)

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, tcp, port)

    self.logger.info("ACLP_TEST_FINISH_0017")
975
def test_0018_udp_permit_port_v6(self):
    """ permit single UDPv6

    Picks a random port in 16384..65535, builds an IPv6 ACL that permits
    UDP for that port and denies everything else, then runs the positive
    traffic verification for UDP over IPv6 with that port.
    """
    self.logger.info("ACLP_TEST_START_0018")

    port = random.randint(16384, 65535)
    udp = self.proto[self.IP][self.UDP]

    # Add an ACL: permit rule first, "deny ip any any" in the end
    rules = [
        self.create_rule(self.IPV6, self.PERMIT, port, udp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules. The tag used to say "ip4 tcp" (copy-paste from test
    # 0015) although the ACL permits UDP over IPv6.
    self.apply_rules(rules, "permit ip6 udp %d" % port)

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, udp, port)

    self.logger.info("ACLP_TEST_FINISH_0018")
998
def test_0019_udp_deny_port(self):
    """ deny single TCPv4/v6

    Picks a random port in 16384..65535, builds an ACL that denies TCP
    for that port on IPv4 and IPv6 and permits everything else, then
    runs the negative traffic verification for TCP with that port.

    NOTE: despite "udp" in the method name, this test exercises TCP; the
    name is left untouched so existing test selections keep working.
    """
    self.logger.info("ACLP_TEST_START_0019")

    port = random.randint(16384, 65535)
    tcp = self.proto[self.IP][self.TCP]
    ip_versions = (self.IPV4, self.IPV6)

    # Add an ACL: deny rules first (IPv4, then IPv6) ...
    rules = [self.create_rule(ip_ver, self.DENY, port, tcp)
             for ip_ver in ip_versions]
    # ... followed by "permit ip any any" in the end
    rules += [self.create_rule(ip_ver, self.PERMIT, self.PORTS_ALL, 0)
              for ip_ver in ip_versions]

    # Apply rules. The tag used to say "udp" although the rules and the
    # verified traffic are TCP.
    self.apply_rules(rules, "deny ip4/ip6 tcp %d" % port)

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, tcp, port)

    self.logger.info("ACLP_TEST_FINISH_0019")
1025
def test_0020_udp_deny_port(self):
    """ deny single UDPv4/v6

    Picks a random port in 16384..65535, builds an ACL that denies UDP
    for that port on IPv4 and IPv6 and permits everything else, then
    runs the negative traffic verification for UDP with that port.
    """
    self.logger.info("ACLP_TEST_START_0020")

    port = random.randint(16384, 65535)
    udp = self.proto[self.IP][self.UDP]
    ip_versions = (self.IPV4, self.IPV6)

    # Deny rules first (IPv4, then IPv6) ...
    rules = [self.create_rule(ip_ver, self.DENY, port, udp)
             for ip_ver in ip_versions]
    # ... followed by "permit ip any any" in the end
    rules += [self.create_rule(ip_ver, self.PERMIT, self.PORTS_ALL, 0)
              for ip_ver in ip_versions]

    # Apply rules
    self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp, port)

    self.logger.info("ACLP_TEST_FINISH_0020")
1052
def test_0021_udp_deny_port_verify_fragment_deny(self):
    """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
    blocked

    Same rule set as the single-port UDP deny test, but the negative
    traffic verification is invoked with an extra trailing True
    argument (per the summary above, presumably the switch that makes
    the helper send fragments - confirm against run_verify_negat_test).
    """
    self.logger.info("ACLP_TEST_START_0021")

    port = random.randint(16384, 65535)
    udp = self.proto[self.IP][self.UDP]

    rules = []
    # Deny the chosen UDP port for both address families
    for ip_ver in (self.IPV4, self.IPV6):
        rules.append(self.create_rule(ip_ver, self.DENY, port, udp))
    # permit ip any any in the end
    for ip_ver in (self.IPV4, self.IPV6):
        rules.append(self.create_rule(ip_ver, self.PERMIT,
                                      self.PORTS_ALL, 0))

    # Apply rules
    self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp, port, True)

    self.logger.info("ACLP_TEST_FINISH_0021")
1080
def test_0022_zero_length_udp_ipv4(self):
    """ VPP-687 zero length udp ipv4 packet

    Permits a single random UDP port on IPv4 (deny everything else),
    sends a UDP/IPv4 stream for that port from pg0 and expects the same
    number of packets to be captured on pg1.
    """
    self.logger.info("ACLP_TEST_START_0022")

    port = random.randint(16384, 65535)
    udp = self.proto[self.IP][self.UDP]

    # ACL content: the permit rule, then "deny ip any any" in the end
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, port, udp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit empty udp ip4 %d" % port)

    # Traffic should still pass
    # Create incoming packet streams for packet-generator interfaces
    # NOTE(review): the two trailing False flags presumably select the
    # zero-length variant named in the docstring - confirm against
    # create_stream.
    pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
                              self.IP, self.IPV4, udp, port,
                              False, False)
    pkts_cnt = len(pkts)
    if pkts_cnt > 0:
        self.pg0.add_stream(pkts)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Verify outgoing packet streams per packet-generator interface
    self.pg1.get_capture(pkts_cnt)

    self.logger.info("ACLP_TEST_FINISH_0022")
1115
def test_0023_zero_length_udp_ipv6(self):
    """ VPP-687 zero length udp ipv6 packet

    Permits a single random UDP port on IPv6 (deny everything else),
    sends a UDP/IPv6 stream for that port from pg0 and expects the same
    number of packets to be captured on pg1.
    """
    self.logger.info("ACLP_TEST_START_0023")

    port = random.randint(16384, 65535)
    udp = self.proto[self.IP][self.UDP]

    # ACL content: the permit rule, then "deny ip any any" in the end
    rules = [
        self.create_rule(self.IPV6, self.PERMIT, port, udp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit empty udp ip6 %d" % port)

    # Traffic should still pass
    # Create incoming packet streams for packet-generator interfaces
    # NOTE(review): the two trailing False flags presumably select the
    # zero-length variant named in the docstring - confirm against
    # create_stream.
    pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
                              self.IP, self.IPV6, udp, port,
                              False, False)
    pkts_cnt = len(pkts)
    if pkts_cnt > 0:
        self.pg0.add_stream(pkts)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Verify outgoing packet streams per packet-generator interface
    self.pg1.get_capture(pkts_cnt)

    self.logger.info("ACLP_TEST_FINISH_0023")
1150
def test_0108_tcp_permit_v4(self):
    """ permit TCPv4 + non-match range

    Like the plain TCPv4 permit test, but the ACL starts with a TCP DENY
    rule on PORTS_RANGE_2 (the "non-match range" of the summary) placed
    before the PERMIT rule on PORTS_RANGE.
    """
    self.logger.info("ACLP_TEST_START_0108")

    tcp = self.proto[self.IP][self.TCP]

    # ACL content, in evaluation order; "deny ip any any" in the end
    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ipv4 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp)

    self.logger.info("ACLP_TEST_FINISH_0108")
1172
def test_0109_tcp_permit_v6(self):
    """ permit TCPv6 + non-match range

    Like the plain TCPv6 permit test, but the ACL starts with a TCP DENY
    rule on PORTS_RANGE_2 (the "non-match range" of the summary) placed
    before the PERMIT rule on PORTS_RANGE.
    """
    self.logger.info("ACLP_TEST_START_0109")

    tcp = self.proto[self.IP][self.TCP]

    # ACL content, in evaluation order; "deny ip any any" in the end
    rules = [
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, tcp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ip6 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, tcp)

    self.logger.info("ACLP_TEST_FINISH_0109")
1194
def test_0110_udp_permit_v4(self):
    """ permit UDPv4 + non-match range

    Like the plain UDPv4 permit test, but the ACL starts with a UDP DENY
    rule on PORTS_RANGE_2 (the "non-match range" of the summary) placed
    before the PERMIT rule on PORTS_RANGE.
    """
    self.logger.info("ACLP_TEST_START_0110")

    udp = self.proto[self.IP][self.UDP]

    # ACL content, in evaluation order; "deny ip any any" in the end
    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, udp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, udp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ipv4 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, udp)

    self.logger.info("ACLP_TEST_FINISH_0110")
1216
def test_0111_udp_permit_v6(self):
    """ permit UDPv6 + non-match range

    Like the plain UDPv6 permit test, but the ACL starts with a UDP DENY
    rule on PORTS_RANGE_2 (the "non-match range" of the summary) placed
    before the PERMIT rule on PORTS_RANGE.
    """
    self.logger.info("ACLP_TEST_START_0111")

    udp = self.proto[self.IP][self.UDP]

    # ACL content, in evaluation order; "deny ip any any" in the end
    rules = [
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2, udp),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, udp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ip6 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, udp)

    self.logger.info("ACLP_TEST_FINISH_0111")
1238
def test_0112_tcp_deny(self):
    """ deny TCPv4/v6 + non-match range

    Like the plain TCP deny test, but the ACL starts with TCP PERMIT
    rules on PORTS_RANGE_2 (the "non-match range" of the summary) placed
    before the DENY rules on PORTS_RANGE.
    """
    self.logger.info("ACLP_TEST_START_0112")

    tcp = self.proto[self.IP][self.TCP]

    # (action, port selector, protocol) in evaluation order; each entry
    # is instantiated for IPv4 and then IPv6.
    layout = (
        (self.PERMIT, self.PORTS_RANGE_2, tcp),
        (self.DENY, self.PORTS_RANGE, tcp),
        # permit ip any any in the end
        (self.PERMIT, self.PORTS_ALL, 0),
    )
    rules = []
    for action, ports, proto in layout:
        for ip_ver in (self.IPV4, self.IPV6):
            rules.append(self.create_rule(ip_ver, action, ports, proto))

    # Apply rules
    self.apply_rules(rules, "deny ip4/ip6 tcp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, tcp)

    self.logger.info("ACLP_TEST_FINISH_0112")
1270
def test_0113_udp_deny(self):
    """ deny UDPv4/v6 + non-match range

    Like the plain UDP deny test, but the ACL starts with UDP PERMIT
    rules on PORTS_RANGE_2 (the "non-match range" of the summary) placed
    before the DENY rules on PORTS_RANGE.
    """
    self.logger.info("ACLP_TEST_START_0113")

    udp = self.proto[self.IP][self.UDP]

    # (action, port selector, protocol) in evaluation order; each entry
    # is instantiated for IPv4 and then IPv6.
    layout = (
        (self.PERMIT, self.PORTS_RANGE_2, udp),
        (self.DENY, self.PORTS_RANGE, udp),
        # permit ip any any in the end
        (self.PERMIT, self.PORTS_ALL, 0),
    )
    rules = []
    for action, ports, proto in layout:
        for ip_ver in (self.IPV4, self.IPV6):
            rules.append(self.create_rule(ip_ver, action, ports, proto))

    # Apply rules
    self.apply_rules(rules, "deny ip4/ip6 udp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp)

    self.logger.info("ACLP_TEST_FINISH_0113")
1302
def test_0300_tcp_permit_v4_etype_aaaa(self):
    """ permit TCPv4, send 0xAAAA etype

    Applies the TCPv4 "permit + non-match range" rule set and runs the
    positive traffic verification with the extra arguments
    (0, False, True, 0xaaaa); the last one is the ethertype named in the
    summary.
    """
    self.logger.info("ACLP_TEST_START_0300")

    tcp = self.proto[self.IP][self.TCP]

    # ACL content, in evaluation order; "deny ip any any" in the end
    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ipv4 tcp")

    # Traffic should still pass also for an odd ethertype
    self.run_verify_test(self.IP, self.IPV4, tcp,
                         0, False, True, 0xaaaa)
    self.logger.info("ACLP_TEST_FINISH_0300")
1324
def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
    """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked

    Applies the TCPv4 permit rule set, installs an ethertype whitelist
    containing only 0x0BBB, runs the negative traffic verification with
    ethertype 0xAAAA and finally removes the whitelist again.
    """
    self.logger.info("ACLP_TEST_START_0305")

    tcp = self.proto[self.IP][self.TCP]

    # ACL content, in evaluation order; "deny ip any any" in the end
    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ipv4 tcp")
    # whitelist the 0x0bbb etype - so the 0xaaaa should be blocked
    self.etype_whitelist([0xbbb], 1)

    # The oddball ethertype should be blocked
    self.run_verify_negat_test(self.IP, self.IPV4, tcp,
                               0, False, 0xaaaa)

    # remove the whitelist
    self.etype_whitelist([], 0, add=False)

    self.logger.info("ACLP_TEST_FINISH_0305")
1353
def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
    """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass

    Applies the TCPv4 permit rule set, installs an ethertype whitelist
    containing only 0x0BBB, runs the positive traffic verification with
    that same ethertype and finally removes the whitelist again.
    """
    self.logger.info("ACLP_TEST_START_0306")

    tcp = self.proto[self.IP][self.TCP]

    # ACL content, in evaluation order; "deny ip any any" in the end
    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ipv4 tcp")
    # whitelist the 0x0bbb etype
    self.etype_whitelist([0xbbb], 1)

    # The whitelisted traffic, should pass
    self.run_verify_test(self.IP, self.IPV4, tcp,
                         0, False, True, 0x0bbb)

    # remove the whitelist
    self.etype_whitelist([], 0, add=False)

    self.logger.info("ACLP_TEST_FINISH_0306")
1381
def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
    """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass

    Applies the TCPv4 permit rule set, installs an ethertype whitelist
    containing only 0x0BBB and removes it straight away, then runs the
    positive traffic verification with ethertype 0xAAAA.
    """
    self.logger.info("ACLP_TEST_START_0307")

    tcp = self.proto[self.IP][self.TCP]

    # Add an ACL, in evaluation order; "deny ip any any" in the end
    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ipv4 tcp")

    # whitelist the 0x0bbb etype - so the 0xaaaa should be blocked
    self.etype_whitelist([0xbbb], 1)
    # remove the whitelist, the previously blocked 0xAAAA should pass now
    self.etype_whitelist([], 0, add=False)

    # With the whitelist gone, the 0xAAAA traffic should pass
    self.run_verify_test(self.IP, self.IPV4, tcp,
                         0, False, True, 0xaaaa)

    # The finish marker carries this test's own number; it used to be
    # logged as 0306 (copy-paste from the previous test).
    self.logger.info("ACLP_TEST_FINISH_0307")
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +02001409
def test_0315_del_intf(self):
    """ apply an acl and delete the interface

    Creates a loopback interface object, applies the TCPv4 permit rule
    set to its sw_if_index and then removes the interface configuration.
    No traffic is sent and nothing is asserted explicitly; the test only
    fails if one of these calls raises.
    """
    self.logger.info("ACLP_TEST_START_0315")

    tcp = self.proto[self.IP][self.TCP]

    # ACL content, in evaluation order; "deny ip any any" in the end
    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # create an interface
    loopback = VppLoInterface(self)

    # Apply rules
    self.apply_rules_to(rules, "permit ipv4 tcp", loopback.sw_if_index)

    # Remove the interface
    loopback.remove_vpp_config()

    self.logger.info("ACLP_TEST_FINISH_0315")
1435
Jakub Grajciar2f8cd912020-03-27 06:55:06 +01001436
# Script entry point: hand the test cases of this module to unittest,
# using VppTestRunner as the test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)