blob: 1ca74d186bcd253669cc2eaa433079f986a406ab [file] [log] [blame]
Pavel Kotucek59dda062017-03-02 15:22:47 +01001#!/usr/bin/env python
2"""ACL plugin Test Case HLD:
3"""
4
5import unittest
6import random
7
8from scapy.packet import Raw
9from scapy.layers.l2 import Ether
10from scapy.layers.inet import IP, TCP, UDP, ICMP
11from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +000012from scapy.layers.inet6 import IPv6ExtHdrFragment
Pavel Kotucek59dda062017-03-02 15:22:47 +010013from framework import VppTestCase, VppTestRunner
14from util import Host, ppp
15
Andrew Yourtchenkode3682f2018-03-23 10:38:46 +010016from vpp_lo_interface import VppLoInterface
17
Pavel Kotucek59dda062017-03-02 15:22:47 +010018
19class TestACLplugin(VppTestCase):
20 """ ACL plugin Test Case """
21
22 # traffic types
23 IP = 0
24 ICMP = 1
25
26 # IP version
27 IPRANDOM = -1
28 IPV4 = 0
29 IPV6 = 1
30
31 # rule types
32 DENY = 0
33 PERMIT = 1
34
35 # supported protocols
36 proto = [[6, 17], [1, 58]]
37 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
38 ICMPv4 = 0
39 ICMPv6 = 1
40 TCP = 0
41 UDP = 1
42 PROTO_ALL = 0
43
44 # port ranges
45 PORTS_ALL = -1
46 PORTS_RANGE = 0
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +020047 PORTS_RANGE_2 = 1
Pavel Kotucek59dda062017-03-02 15:22:47 +010048 udp_sport_from = 10
49 udp_sport_to = udp_sport_from + 5
50 udp_dport_from = 20000
51 udp_dport_to = udp_dport_from + 5000
52 tcp_sport_from = 30
53 tcp_sport_to = tcp_sport_from + 5
54 tcp_dport_from = 40000
55 tcp_dport_to = tcp_dport_from + 5000
56
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +020057 udp_sport_from_2 = 90
58 udp_sport_to_2 = udp_sport_from_2 + 5
59 udp_dport_from_2 = 30000
60 udp_dport_to_2 = udp_dport_from_2 + 5000
61 tcp_sport_from_2 = 130
62 tcp_sport_to_2 = tcp_sport_from_2 + 5
63 tcp_dport_from_2 = 20000
64 tcp_dport_to_2 = tcp_dport_from_2 + 5000
65
Pavel Kotucek59dda062017-03-02 15:22:47 +010066 icmp4_type = 8 # echo request
67 icmp4_code = 3
68 icmp6_type = 128 # echo request
69 icmp6_code = 3
70
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +020071 icmp4_type_2 = 8
72 icmp4_code_from_2 = 5
73 icmp4_code_to_2 = 20
74 icmp6_type_2 = 128
75 icmp6_code_from_2 = 8
76 icmp6_code_to_2 = 42
77
Pavel Kotucek59dda062017-03-02 15:22:47 +010078 # Test variables
79 bd_id = 1
80
81 @classmethod
82 def setUpClass(cls):
83 """
84 Perform standard class setup (defined by class method setUpClass in
85 class VppTestCase) before running the test case, set test case related
86 variables and configure VPP.
87 """
88 super(TestACLplugin, cls).setUpClass()
89
Pavel Kotucek59dda062017-03-02 15:22:47 +010090 try:
91 # Create 2 pg interfaces
92 cls.create_pg_interfaces(range(2))
93
94 # Packet flows mapping pg0 -> pg1, pg2 etc.
95 cls.flows = dict()
96 cls.flows[cls.pg0] = [cls.pg1]
97
98 # Packet sizes
99 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
100
101 # Create BD with MAC learning and unknown unicast flooding disabled
102 # and put interfaces to this BD
103 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
104 learn=1)
105 for pg_if in cls.pg_interfaces:
Ole Troana5b2eec2019-03-11 19:23:25 +0100106 cls.vapi.sw_interface_set_l2_bridge(
107 rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100108
109 # Set up all interfaces
110 for i in cls.pg_interfaces:
111 i.admin_up()
112
113 # Mapping between packet-generator index and lists of test hosts
114 cls.hosts_by_pg_idx = dict()
115 for pg_if in cls.pg_interfaces:
116 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
117
118 # Create list of deleted hosts
119 cls.deleted_hosts_by_pg_idx = dict()
120 for pg_if in cls.pg_interfaces:
121 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
122
123 # warm-up the mac address tables
124 # self.warmup_test()
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +0200125 count = 16
126 start = 0
127 n_int = len(cls.pg_interfaces)
128 macs_per_if = count / n_int
129 i = -1
130 for pg_if in cls.pg_interfaces:
131 i += 1
132 start_nr = macs_per_if * i + start
133 end_nr = count + start if i == (n_int - 1) \
134 else macs_per_if * (i + 1) + start
135 hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
136 for j in range(start_nr, end_nr):
137 host = Host(
138 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
139 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
140 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
141 hosts.append(host)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100142
143 except Exception:
144 super(TestACLplugin, cls).tearDownClass()
145 raise
146
Paul Vinciguerra7f9b7f92019-03-12 19:23:27 -0700147 @classmethod
148 def tearDownClass(cls):
149 super(TestACLplugin, cls).tearDownClass()
150
Pavel Kotucek59dda062017-03-02 15:22:47 +0100151 def setUp(self):
152 super(TestACLplugin, self).setUp()
153 self.reset_packet_infos()
154
155 def tearDown(self):
156 """
157 Show various debug prints after each test.
158 """
159 super(TestACLplugin, self).tearDown()
Paul Vinciguerra90cf21b2019-03-13 09:23:05 -0700160
161 def show_commands_at_teardown(self):
162 cli = "show vlib graph l2-input-feat-arc"
163 self.logger.info(self.vapi.ppcli(cli))
164 cli = "show vlib graph l2-input-feat-arc-end"
165 self.logger.info(self.vapi.ppcli(cli))
166 cli = "show vlib graph l2-output-feat-arc"
167 self.logger.info(self.vapi.ppcli(cli))
168 cli = "show vlib graph l2-output-feat-arc-end"
169 self.logger.info(self.vapi.ppcli(cli))
170 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
171 self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
172 self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
173 self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
174 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
175 % self.bd_id))
Pavel Kotucek59dda062017-03-02 15:22:47 +0100176
Pavel Kotucek59dda062017-03-02 15:22:47 +0100177 def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
178 s_prefix=0, s_ip='\x00\x00\x00\x00',
179 d_prefix=0, d_ip='\x00\x00\x00\x00'):
180 if proto == -1:
181 return
182 if ports == self.PORTS_ALL:
183 sport_from = 0
184 dport_from = 0
185 sport_to = 65535 if proto != 1 and proto != 58 else 255
186 dport_to = sport_to
187 elif ports == self.PORTS_RANGE:
188 if proto == 1:
189 sport_from = self.icmp4_type
190 sport_to = self.icmp4_type
191 dport_from = self.icmp4_code
192 dport_to = self.icmp4_code
193 elif proto == 58:
194 sport_from = self.icmp6_type
195 sport_to = self.icmp6_type
196 dport_from = self.icmp6_code
197 dport_to = self.icmp6_code
198 elif proto == self.proto[self.IP][self.TCP]:
199 sport_from = self.tcp_sport_from
200 sport_to = self.tcp_sport_to
201 dport_from = self.tcp_dport_from
202 dport_to = self.tcp_dport_to
203 elif proto == self.proto[self.IP][self.UDP]:
204 sport_from = self.udp_sport_from
205 sport_to = self.udp_sport_to
206 dport_from = self.udp_dport_from
207 dport_to = self.udp_dport_to
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +0200208 elif ports == self.PORTS_RANGE_2:
209 if proto == 1:
210 sport_from = self.icmp4_type_2
211 sport_to = self.icmp4_type_2
212 dport_from = self.icmp4_code_from_2
213 dport_to = self.icmp4_code_to_2
214 elif proto == 58:
215 sport_from = self.icmp6_type_2
216 sport_to = self.icmp6_type_2
217 dport_from = self.icmp6_code_from_2
218 dport_to = self.icmp6_code_to_2
219 elif proto == self.proto[self.IP][self.TCP]:
220 sport_from = self.tcp_sport_from_2
221 sport_to = self.tcp_sport_to_2
222 dport_from = self.tcp_dport_from_2
223 dport_to = self.tcp_dport_to_2
224 elif proto == self.proto[self.IP][self.UDP]:
225 sport_from = self.udp_sport_from_2
226 sport_to = self.udp_sport_to_2
227 dport_from = self.udp_dport_from_2
228 dport_to = self.udp_dport_to_2
Pavel Kotucek59dda062017-03-02 15:22:47 +0100229 else:
230 sport_from = ports
231 sport_to = ports
232 dport_from = ports
233 dport_to = ports
234
235 rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
236 'srcport_or_icmptype_first': sport_from,
237 'srcport_or_icmptype_last': sport_to,
238 'src_ip_prefix_len': s_prefix,
239 'src_ip_addr': s_ip,
240 'dstport_or_icmpcode_first': dport_from,
241 'dstport_or_icmpcode_last': dport_to,
242 'dst_ip_prefix_len': d_prefix,
243 'dst_ip_addr': d_ip})
244 return rule
245
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800246 def apply_rules(self, rules, tag=b''):
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200247 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
248 tag=tag)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100249 self.logger.info("Dumped ACL: " + str(
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200250 self.vapi.acl_dump(reply.acl_index)))
Pavel Kotucek59dda062017-03-02 15:22:47 +0100251 # Apply a ACL on the interface as inbound
252 for i in self.pg_interfaces:
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200253 self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
254 n_input=1,
255 acls=[reply.acl_index])
Pavel Kotucek59dda062017-03-02 15:22:47 +0100256 return
257
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800258 def apply_rules_to(self, rules, tag=b'', sw_if_index=0xFFFFFFFF):
Andrew Yourtchenkode3682f2018-03-23 10:38:46 +0100259 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
260 tag=tag)
261 self.logger.info("Dumped ACL: " + str(
262 self.vapi.acl_dump(reply.acl_index)))
263 # Apply a ACL on the interface as inbound
264 self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
265 n_input=1,
266 acls=[reply.acl_index])
267 return
268
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100269 def etype_whitelist(self, whitelist, n_input):
270 # Apply whitelists on all the interfaces
271 for i in self.pg_interfaces:
272 # checkstyle can't read long names. Help them.
273 fun = self.vapi.acl_interface_set_etype_whitelist
274 fun(sw_if_index=i.sw_if_index, n_input=n_input,
275 whitelist=whitelist)
276 return
277
Pavel Kotucek59dda062017-03-02 15:22:47 +0100278 def create_upper_layer(self, packet_index, proto, ports=0):
279 p = self.proto_map[proto]
280 if p == 'UDP':
281 if ports == 0:
282 return UDP(sport=random.randint(self.udp_sport_from,
283 self.udp_sport_to),
284 dport=random.randint(self.udp_dport_from,
285 self.udp_dport_to))
286 else:
287 return UDP(sport=ports, dport=ports)
288 elif p == 'TCP':
289 if ports == 0:
290 return TCP(sport=random.randint(self.tcp_sport_from,
291 self.tcp_sport_to),
292 dport=random.randint(self.tcp_dport_from,
293 self.tcp_dport_to))
294 else:
295 return TCP(sport=ports, dport=ports)
296 return ''
297
298 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100299 proto=-1, ports=0, fragments=False,
300 pkt_raw=True, etype=-1):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100301 """
302 Create input packet stream for defined interface using hosts or
303 deleted_hosts list.
304
305 :param object src_if: Interface to create packet stream for.
306 :param list packet_sizes: List of required packet sizes.
307 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
308 :return: Stream of packets.
309 """
310 pkts = []
311 if self.flows.__contains__(src_if):
312 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
313 for dst_if in self.flows[src_if]:
314 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
315 n_int = len(dst_hosts) * len(src_hosts)
316 for i in range(0, n_int):
317 dst_host = dst_hosts[i / len(src_hosts)]
318 src_host = src_hosts[i % len(src_hosts)]
319 pkt_info = self.create_packet_info(src_if, dst_if)
320 if ipv6 == 1:
321 pkt_info.ip = 1
322 elif ipv6 == 0:
323 pkt_info.ip = 0
324 else:
325 pkt_info.ip = random.choice([0, 1])
326 if proto == -1:
327 pkt_info.proto = random.choice(self.proto[self.IP])
328 else:
329 pkt_info.proto = proto
330 payload = self.info_to_payload(pkt_info)
331 p = Ether(dst=dst_host.mac, src=src_host.mac)
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100332 if etype > 0:
333 p = Ether(dst=dst_host.mac,
334 src=src_host.mac,
335 type=etype)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100336 if pkt_info.ip:
337 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000338 if fragments:
339 p /= IPv6ExtHdrFragment(offset=64, m=1)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100340 else:
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000341 if fragments:
342 p /= IP(src=src_host.ip4, dst=dst_host.ip4,
343 flags=1, frag=64)
344 else:
345 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100346 if traffic_type == self.ICMP:
347 if pkt_info.ip:
348 p /= ICMPv6EchoRequest(type=self.icmp6_type,
349 code=self.icmp6_code)
350 else:
351 p /= ICMP(type=self.icmp4_type,
352 code=self.icmp4_code)
353 else:
354 p /= self.create_upper_layer(i, pkt_info.proto, ports)
Pavel Kotuceke7b67342017-04-18 13:12:20 +0200355 if pkt_raw:
356 p /= Raw(payload)
357 pkt_info.data = p.copy()
358 if pkt_raw:
359 size = random.choice(packet_sizes)
360 self.extend_packet(p, size)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100361 pkts.append(p)
362 return pkts
363
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100364 def verify_capture(self, pg_if, capture,
365 traffic_type=0, ip_type=0, etype=-1):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100366 """
367 Verify captured input packet stream for defined interface.
368
369 :param object pg_if: Interface to verify captured packet stream for.
370 :param list capture: Captured packet stream.
371 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
372 """
373 last_info = dict()
374 for i in self.pg_interfaces:
375 last_info[i.sw_if_index] = None
376 dst_sw_if_index = pg_if.sw_if_index
377 for packet in capture:
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100378 if etype > 0:
379 if packet[Ether].type != etype:
380 self.logger.error(ppp("Unexpected ethertype in packet:",
381 packet))
382 else:
383 continue
Pavel Kotucek59dda062017-03-02 15:22:47 +0100384 try:
385 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
386 if traffic_type == self.ICMP and ip_type == self.IPV6:
387 payload_info = self.payload_to_info(
Paul Vinciguerraeaea4212019-03-06 11:58:06 -0800388 packet[ICMPv6EchoRequest], 'data')
Pavel Kotucek59dda062017-03-02 15:22:47 +0100389 payload = packet[ICMPv6EchoRequest]
390 else:
Paul Vinciguerraeaea4212019-03-06 11:58:06 -0800391 payload_info = self.payload_to_info(packet[Raw])
Pavel Kotucek59dda062017-03-02 15:22:47 +0100392 payload = packet[self.proto_map[payload_info.proto]]
393 except:
394 self.logger.error(ppp("Unexpected or invalid packet "
395 "(outside network):", packet))
396 raise
397
398 if ip_type != 0:
399 self.assertEqual(payload_info.ip, ip_type)
400 if traffic_type == self.ICMP:
401 try:
402 if payload_info.ip == 0:
403 self.assertEqual(payload.type, self.icmp4_type)
404 self.assertEqual(payload.code, self.icmp4_code)
405 else:
406 self.assertEqual(payload.type, self.icmp6_type)
407 self.assertEqual(payload.code, self.icmp6_code)
408 except:
409 self.logger.error(ppp("Unexpected or invalid packet "
410 "(outside network):", packet))
411 raise
412 else:
413 try:
414 ip_version = IPv6 if payload_info.ip == 1 else IP
415
416 ip = packet[ip_version]
417 packet_index = payload_info.index
418
419 self.assertEqual(payload_info.dst, dst_sw_if_index)
420 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
421 (pg_if.name, payload_info.src,
422 packet_index))
423 next_info = self.get_next_packet_info_for_interface2(
424 payload_info.src, dst_sw_if_index,
425 last_info[payload_info.src])
426 last_info[payload_info.src] = next_info
427 self.assertTrue(next_info is not None)
428 self.assertEqual(packet_index, next_info.index)
429 saved_packet = next_info.data
430 # Check standard fields
431 self.assertEqual(ip.src, saved_packet[ip_version].src)
432 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
433 p = self.proto_map[payload_info.proto]
434 if p == 'TCP':
435 tcp = packet[TCP]
436 self.assertEqual(tcp.sport, saved_packet[
437 TCP].sport)
438 self.assertEqual(tcp.dport, saved_packet[
439 TCP].dport)
440 elif p == 'UDP':
441 udp = packet[UDP]
442 self.assertEqual(udp.sport, saved_packet[
443 UDP].sport)
444 self.assertEqual(udp.dport, saved_packet[
445 UDP].dport)
446 except:
447 self.logger.error(ppp("Unexpected or invalid packet:",
448 packet))
449 raise
450 for i in self.pg_interfaces:
451 remaining_packet = self.get_next_packet_info_for_interface2(
452 i, dst_sw_if_index, last_info[i.sw_if_index])
453 self.assertTrue(
454 remaining_packet is None,
455 "Port %u: Packet expected from source %u didn't arrive" %
456 (dst_sw_if_index, i.sw_if_index))
457
458 def run_traffic_no_check(self):
459 # Test
460 # Create incoming packet streams for packet-generator interfaces
461 for i in self.pg_interfaces:
462 if self.flows.__contains__(i):
463 pkts = self.create_stream(i, self.pg_if_packet_sizes)
464 if len(pkts) > 0:
465 i.add_stream(pkts)
466
467 # Enable packet capture and start packet sending
468 self.pg_enable_capture(self.pg_interfaces)
469 self.pg_start()
470
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000471 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100472 frags=False, pkt_raw=True, etype=-1):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100473 # Test
474 # Create incoming packet streams for packet-generator interfaces
475 pkts_cnt = 0
476 for i in self.pg_interfaces:
477 if self.flows.__contains__(i):
478 pkts = self.create_stream(i, self.pg_if_packet_sizes,
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000479 traffic_type, ip_type, proto, ports,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100480 frags, pkt_raw, etype)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100481 if len(pkts) > 0:
482 i.add_stream(pkts)
483 pkts_cnt += len(pkts)
484
485 # Enable packet capture and start packet sendingself.IPV
486 self.pg_enable_capture(self.pg_interfaces)
487 self.pg_start()
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +0200488 self.logger.info("sent packets count: %d" % pkts_cnt)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100489
490 # Verify
491 # Verify outgoing packet streams per packet-generator interface
492 for src_if in self.pg_interfaces:
493 if self.flows.__contains__(src_if):
494 for dst_if in self.flows[src_if]:
495 capture = dst_if.get_capture(pkts_cnt)
496 self.logger.info("Verifying capture on interface %s" %
497 dst_if.name)
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100498 self.verify_capture(dst_if, capture,
499 traffic_type, ip_type, etype)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100500
501 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100502 ports=0, frags=False, etype=-1):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100503 # Test
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +0200504 pkts_cnt = 0
Pavel Kotucek59dda062017-03-02 15:22:47 +0100505 self.reset_packet_infos()
506 for i in self.pg_interfaces:
507 if self.flows.__contains__(i):
508 pkts = self.create_stream(i, self.pg_if_packet_sizes,
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000509 traffic_type, ip_type, proto, ports,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100510 frags, True, etype)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100511 if len(pkts) > 0:
512 i.add_stream(pkts)
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +0200513 pkts_cnt += len(pkts)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100514
515 # Enable packet capture and start packet sending
516 self.pg_enable_capture(self.pg_interfaces)
517 self.pg_start()
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +0200518 self.logger.info("sent packets count: %d" % pkts_cnt)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100519
520 # Verify
521 # Verify outgoing packet streams per packet-generator interface
522 for src_if in self.pg_interfaces:
523 if self.flows.__contains__(src_if):
524 for dst_if in self.flows[src_if]:
525 self.logger.info("Verifying capture on interface %s" %
526 dst_if.name)
527 capture = dst_if.get_capture(0)
528 self.assertEqual(len(capture), 0)
529
Pavel Kotucek59dda062017-03-02 15:22:47 +0100530 def test_0000_warmup_test(self):
531 """ ACL plugin version check; learn MACs
532 """
Pavel Kotucek59dda062017-03-02 15:22:47 +0100533 reply = self.vapi.papi.acl_plugin_get_version()
534 self.assertEqual(reply.major, 1)
535 self.logger.info("Working with ACL plugin version: %d.%d" % (
536 reply.major, reply.minor))
537 # minor version changes are non breaking
538 # self.assertEqual(reply.minor, 0)
539
540 def test_0001_acl_create(self):
Andrew Yourtchenko987abe92017-09-27 13:50:31 +0200541 """ ACL create/delete test
Pavel Kotucek59dda062017-03-02 15:22:47 +0100542 """
543
544 self.logger.info("ACLP_TEST_START_0001")
545 # Add an ACL
546 r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
547 'srcport_or_icmptype_first': 1234,
548 'srcport_or_icmptype_last': 1235,
549 'src_ip_prefix_len': 0,
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800550 'src_ip_addr': b'\x00\x00\x00\x00',
Pavel Kotucek59dda062017-03-02 15:22:47 +0100551 'dstport_or_icmpcode_first': 1234,
552 'dstport_or_icmpcode_last': 1234,
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800553 'dst_ip_addr': b'\x00\x00\x00\x00',
Pavel Kotucek59dda062017-03-02 15:22:47 +0100554 'dst_ip_prefix_len': 0}]
555 # Test 1: add a new ACL
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200556 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800557 tag=b"permit 1234")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100558 self.assertEqual(reply.retval, 0)
559 # The very first ACL gets #0
560 self.assertEqual(reply.acl_index, 0)
Andrew Yourtchenko987abe92017-09-27 13:50:31 +0200561 first_acl = reply.acl_index
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200562 rr = self.vapi.acl_dump(reply.acl_index)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100563 self.logger.info("Dumped ACL: " + str(rr))
564 self.assertEqual(len(rr), 1)
565 # We should have the same number of ACL entries as we had asked
566 self.assertEqual(len(rr[0].r), len(r))
567 # The rules should be the same. But because the submitted and returned
568 # are different types, we need to iterate over rules and keys to get
569 # to basic values.
570 for i_rule in range(0, len(r) - 1):
571 for rule_key in r[i_rule]:
572 self.assertEqual(rr[0].r[i_rule][rule_key],
573 r[i_rule][rule_key])
574
575 # Add a deny-1234 ACL
Ole Troan895b6e82017-10-20 13:28:20 +0200576 r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
Pavel Kotucek59dda062017-03-02 15:22:47 +0100577 'srcport_or_icmptype_first': 1234,
578 'srcport_or_icmptype_last': 1235,
579 'src_ip_prefix_len': 0,
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800580 'src_ip_addr': b'\x00\x00\x00\x00',
Pavel Kotucek59dda062017-03-02 15:22:47 +0100581 'dstport_or_icmpcode_first': 1234,
582 'dstport_or_icmpcode_last': 1234,
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800583 'dst_ip_addr': b'\x00\x00\x00\x00',
Pavel Kotucek59dda062017-03-02 15:22:47 +0100584 'dst_ip_prefix_len': 0},
585 {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
586 'srcport_or_icmptype_first': 0,
587 'srcport_or_icmptype_last': 0,
588 'src_ip_prefix_len': 0,
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800589 'src_ip_addr': b'\x00\x00\x00\x00',
Pavel Kotucek59dda062017-03-02 15:22:47 +0100590 'dstport_or_icmpcode_first': 0,
591 'dstport_or_icmpcode_last': 0,
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800592 'dst_ip_addr': b'\x00\x00\x00\x00',
Ole Troan895b6e82017-10-20 13:28:20 +0200593 'dst_ip_prefix_len': 0}]
Pavel Kotucek59dda062017-03-02 15:22:47 +0100594
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200595 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800596 tag=b"deny 1234;permit all")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100597 self.assertEqual(reply.retval, 0)
598 # The second ACL gets #1
599 self.assertEqual(reply.acl_index, 1)
Andrew Yourtchenko987abe92017-09-27 13:50:31 +0200600 second_acl = reply.acl_index
Pavel Kotucek59dda062017-03-02 15:22:47 +0100601
602 # Test 2: try to modify a nonexistent ACL
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200603 reply = self.vapi.acl_add_replace(acl_index=432, r=r,
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800604 tag=b"FFFF:FFFF", expected_retval=-6)
Jon Loeliger27cadd22017-11-10 13:15:12 -0600605 self.assertEqual(reply.retval, -6)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100606 # The ACL number should pass through
607 self.assertEqual(reply.acl_index, 432)
Andrew Yourtchenko987abe92017-09-27 13:50:31 +0200608 # apply an ACL on an interface inbound, try to delete ACL, must fail
609 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
610 n_input=1,
611 acls=[first_acl])
Jon Loeliger27cadd22017-11-10 13:15:12 -0600612 reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
Andrew Yourtchenko987abe92017-09-27 13:50:31 +0200613 # Unapply an ACL and then try to delete it - must be ok
614 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
615 n_input=0,
616 acls=[])
617 reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
618
619 # apply an ACL on an interface outbound, try to delete ACL, must fail
620 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
621 n_input=0,
622 acls=[second_acl])
Jon Loeliger27cadd22017-11-10 13:15:12 -0600623 reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
Andrew Yourtchenko987abe92017-09-27 13:50:31 +0200624 # Unapply the ACL and then try to delete it - must be ok
625 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
626 n_input=0,
627 acls=[])
628 reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
629
630 # try to apply a nonexistent ACL - must fail
631 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
632 n_input=1,
633 acls=[first_acl],
Jon Loeliger27cadd22017-11-10 13:15:12 -0600634 expected_retval=-6)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100635
636 self.logger.info("ACLP_TEST_FINISH_0001")
637
638 def test_0002_acl_permit_apply(self):
639 """ permit ACL apply test
640 """
641 self.logger.info("ACLP_TEST_START_0002")
642
643 rules = []
644 rules.append(self.create_rule(self.IPV4, self.PERMIT,
645 0, self.proto[self.IP][self.UDP]))
646 rules.append(self.create_rule(self.IPV4, self.PERMIT,
647 0, self.proto[self.IP][self.TCP]))
648
649 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800650 self.apply_rules(rules, b"permit per-flow")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100651
652 # Traffic should still pass
653 self.run_verify_test(self.IP, self.IPV4, -1)
654 self.logger.info("ACLP_TEST_FINISH_0002")
655
656 def test_0003_acl_deny_apply(self):
657 """ deny ACL apply test
658 """
659 self.logger.info("ACLP_TEST_START_0003")
660 # Add a deny-flows ACL
661 rules = []
662 rules.append(self.create_rule(self.IPV4, self.DENY,
663 self.PORTS_ALL, self.proto[self.IP][self.UDP]))
664 # Permit ip any any in the end
665 rules.append(self.create_rule(self.IPV4, self.PERMIT,
666 self.PORTS_ALL, 0))
667
668 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800669 self.apply_rules(rules, b"deny per-flow;permit all")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100670
671 # Traffic should not pass
672 self.run_verify_negat_test(self.IP, self.IPV4,
673 self.proto[self.IP][self.UDP])
674 self.logger.info("ACLP_TEST_FINISH_0003")
675 # self.assertEqual(1, 0)
676
677 def test_0004_vpp624_permit_icmpv4(self):
678 """ VPP_624 permit ICMPv4
679 """
680 self.logger.info("ACLP_TEST_START_0004")
681
682 # Add an ACL
683 rules = []
684 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
685 self.proto[self.ICMP][self.ICMPv4]))
686 # deny ip any any in the end
687 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
688
689 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800690 self.apply_rules(rules, b"permit icmpv4")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100691
692 # Traffic should still pass
693 self.run_verify_test(self.ICMP, self.IPV4,
694 self.proto[self.ICMP][self.ICMPv4])
695
696 self.logger.info("ACLP_TEST_FINISH_0004")
697
698 def test_0005_vpp624_permit_icmpv6(self):
699 """ VPP_624 permit ICMPv6
700 """
701 self.logger.info("ACLP_TEST_START_0005")
702
703 # Add an ACL
704 rules = []
705 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
706 self.proto[self.ICMP][self.ICMPv6]))
707 # deny ip any any in the end
708 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
709
710 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800711 self.apply_rules(rules, b"permit icmpv6")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100712
713 # Traffic should still pass
714 self.run_verify_test(self.ICMP, self.IPV6,
715 self.proto[self.ICMP][self.ICMPv6])
716
717 self.logger.info("ACLP_TEST_FINISH_0005")
718
719 def test_0006_vpp624_deny_icmpv4(self):
720 """ VPP_624 deny ICMPv4
721 """
722 self.logger.info("ACLP_TEST_START_0006")
723 # Add an ACL
724 rules = []
725 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
726 self.proto[self.ICMP][self.ICMPv4]))
727 # permit ip any any in the end
728 rules.append(self.create_rule(self.IPV4, self.PERMIT,
729 self.PORTS_ALL, 0))
730
731 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800732 self.apply_rules(rules, b"deny icmpv4")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100733
734 # Traffic should not pass
735 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
736
737 self.logger.info("ACLP_TEST_FINISH_0006")
738
739 def test_0007_vpp624_deny_icmpv6(self):
740 """ VPP_624 deny ICMPv6
741 """
742 self.logger.info("ACLP_TEST_START_0007")
743 # Add an ACL
744 rules = []
745 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
746 self.proto[self.ICMP][self.ICMPv6]))
747 # deny ip any any in the end
748 rules.append(self.create_rule(self.IPV6, self.PERMIT,
749 self.PORTS_ALL, 0))
750
751 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800752 self.apply_rules(rules, b"deny icmpv6")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100753
754 # Traffic should not pass
755 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
756
757 self.logger.info("ACLP_TEST_FINISH_0007")
758
759 def test_0008_tcp_permit_v4(self):
760 """ permit TCPv4
761 """
762 self.logger.info("ACLP_TEST_START_0008")
763
764 # Add an ACL
765 rules = []
766 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
767 self.proto[self.IP][self.TCP]))
768 # deny ip any any in the end
769 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
770
771 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800772 self.apply_rules(rules, b"permit ipv4 tcp")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100773
774 # Traffic should still pass
775 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
776
777 self.logger.info("ACLP_TEST_FINISH_0008")
778
779 def test_0009_tcp_permit_v6(self):
780 """ permit TCPv6
781 """
782 self.logger.info("ACLP_TEST_START_0009")
783
784 # Add an ACL
785 rules = []
786 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
787 self.proto[self.IP][self.TCP]))
788 # deny ip any any in the end
789 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
790
791 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800792 self.apply_rules(rules, b"permit ip6 tcp")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100793
794 # Traffic should still pass
795 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
796
797 self.logger.info("ACLP_TEST_FINISH_0008")
798
799 def test_0010_udp_permit_v4(self):
800 """ permit UDPv4
801 """
802 self.logger.info("ACLP_TEST_START_0010")
803
804 # Add an ACL
805 rules = []
806 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
807 self.proto[self.IP][self.UDP]))
808 # deny ip any any in the end
809 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
810
811 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800812 self.apply_rules(rules, b"permit ipv udp")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100813
814 # Traffic should still pass
815 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
816
817 self.logger.info("ACLP_TEST_FINISH_0010")
818
819 def test_0011_udp_permit_v6(self):
820 """ permit UDPv6
821 """
822 self.logger.info("ACLP_TEST_START_0011")
823
824 # Add an ACL
825 rules = []
826 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
827 self.proto[self.IP][self.UDP]))
828 # deny ip any any in the end
829 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
830
831 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800832 self.apply_rules(rules, b"permit ip6 udp")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100833
834 # Traffic should still pass
835 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
836
837 self.logger.info("ACLP_TEST_FINISH_0011")
838
839 def test_0012_tcp_deny(self):
840 """ deny TCPv4/v6
841 """
842 self.logger.info("ACLP_TEST_START_0012")
843
844 # Add an ACL
845 rules = []
846 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
847 self.proto[self.IP][self.TCP]))
848 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
849 self.proto[self.IP][self.TCP]))
850 # permit ip any any in the end
851 rules.append(self.create_rule(self.IPV4, self.PERMIT,
852 self.PORTS_ALL, 0))
853 rules.append(self.create_rule(self.IPV6, self.PERMIT,
854 self.PORTS_ALL, 0))
855
856 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800857 self.apply_rules(rules, b"deny ip4/ip6 tcp")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100858
859 # Traffic should not pass
860 self.run_verify_negat_test(self.IP, self.IPRANDOM,
861 self.proto[self.IP][self.TCP])
862
863 self.logger.info("ACLP_TEST_FINISH_0012")
864
865 def test_0013_udp_deny(self):
866 """ deny UDPv4/v6
867 """
868 self.logger.info("ACLP_TEST_START_0013")
869
870 # Add an ACL
871 rules = []
872 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
873 self.proto[self.IP][self.UDP]))
874 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
875 self.proto[self.IP][self.UDP]))
876 # permit ip any any in the end
877 rules.append(self.create_rule(self.IPV4, self.PERMIT,
878 self.PORTS_ALL, 0))
879 rules.append(self.create_rule(self.IPV6, self.PERMIT,
880 self.PORTS_ALL, 0))
881
882 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800883 self.apply_rules(rules, b"deny ip4/ip6 udp")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100884
885 # Traffic should not pass
886 self.run_verify_negat_test(self.IP, self.IPRANDOM,
887 self.proto[self.IP][self.UDP])
888
889 self.logger.info("ACLP_TEST_FINISH_0013")
890
891 def test_0014_acl_dump(self):
892 """ verify add/dump acls
893 """
894 self.logger.info("ACLP_TEST_START_0014")
895
896 r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
897 [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
898 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
899 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
900 [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
901 [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
902 [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
903 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
904 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
905 [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
906 [self.IPV4, self.DENY, self.PORTS_ALL, 0],
907 [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
908 [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
909 [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
910 [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
911 [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
912 [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
913 [self.IPV6, self.DENY, self.PORTS_ALL, 0]
914 ]
915
916 # Add and verify new ACLs
917 rules = []
918 for i in range(len(r)):
919 rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
920
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200921 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
922 result = self.vapi.acl_dump(reply.acl_index)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100923
924 i = 0
925 for drules in result:
926 for dr in drules.r:
927 self.assertEqual(dr.is_ipv6, r[i][0])
928 self.assertEqual(dr.is_permit, r[i][1])
929 self.assertEqual(dr.proto, r[i][3])
930
931 if r[i][2] > 0:
932 self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
933 else:
934 if r[i][2] < 0:
935 self.assertEqual(dr.srcport_or_icmptype_first, 0)
936 self.assertEqual(dr.srcport_or_icmptype_last, 65535)
937 else:
938 if dr.proto == self.proto[self.IP][self.TCP]:
939 self.assertGreater(dr.srcport_or_icmptype_first,
940 self.tcp_sport_from-1)
941 self.assertLess(dr.srcport_or_icmptype_first,
942 self.tcp_sport_to+1)
943 self.assertGreater(dr.dstport_or_icmpcode_last,
944 self.tcp_dport_from-1)
945 self.assertLess(dr.dstport_or_icmpcode_last,
946 self.tcp_dport_to+1)
947 elif dr.proto == self.proto[self.IP][self.UDP]:
948 self.assertGreater(dr.srcport_or_icmptype_first,
949 self.udp_sport_from-1)
950 self.assertLess(dr.srcport_or_icmptype_first,
951 self.udp_sport_to+1)
952 self.assertGreater(dr.dstport_or_icmpcode_last,
953 self.udp_dport_from-1)
954 self.assertLess(dr.dstport_or_icmpcode_last,
955 self.udp_dport_to+1)
956 i += 1
957
958 self.logger.info("ACLP_TEST_FINISH_0014")
959
960 def test_0015_tcp_permit_port_v4(self):
961 """ permit single TCPv4
962 """
963 self.logger.info("ACLP_TEST_START_0015")
964
965 port = random.randint(0, 65535)
966 # Add an ACL
967 rules = []
968 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
969 self.proto[self.IP][self.TCP]))
970 # deny ip any any in the end
971 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
972
973 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800974 self.apply_rules(rules, b"permit ip4 tcp %d" % port)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100975
976 # Traffic should still pass
977 self.run_verify_test(self.IP, self.IPV4,
978 self.proto[self.IP][self.TCP], port)
979
980 self.logger.info("ACLP_TEST_FINISH_0015")
981
982 def test_0016_udp_permit_port_v4(self):
983 """ permit single UDPv4
984 """
985 self.logger.info("ACLP_TEST_START_0016")
986
987 port = random.randint(0, 65535)
988 # Add an ACL
989 rules = []
990 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
991 self.proto[self.IP][self.UDP]))
992 # deny ip any any in the end
993 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
994
995 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -0800996 self.apply_rules(rules, b"permit ip4 tcp %d" % port)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100997
998 # Traffic should still pass
999 self.run_verify_test(self.IP, self.IPV4,
1000 self.proto[self.IP][self.UDP], port)
1001
1002 self.logger.info("ACLP_TEST_FINISH_0016")
1003
1004 def test_0017_tcp_permit_port_v6(self):
1005 """ permit single TCPv6
1006 """
1007 self.logger.info("ACLP_TEST_START_0017")
1008
1009 port = random.randint(0, 65535)
1010 # Add an ACL
1011 rules = []
1012 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1013 self.proto[self.IP][self.TCP]))
1014 # deny ip any any in the end
1015 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1016
1017 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -08001018 self.apply_rules(rules, b"permit ip4 tcp %d" % port)
Pavel Kotucek59dda062017-03-02 15:22:47 +01001019
1020 # Traffic should still pass
1021 self.run_verify_test(self.IP, self.IPV6,
1022 self.proto[self.IP][self.TCP], port)
1023
1024 self.logger.info("ACLP_TEST_FINISH_0017")
1025
1026 def test_0018_udp_permit_port_v6(self):
1027 """ permit single UPPv6
1028 """
1029 self.logger.info("ACLP_TEST_START_0018")
1030
1031 port = random.randint(0, 65535)
1032 # Add an ACL
1033 rules = []
1034 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1035 self.proto[self.IP][self.UDP]))
1036 # deny ip any any in the end
1037 rules.append(self.create_rule(self.IPV6, self.DENY,
1038 self.PORTS_ALL, 0))
1039
1040 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -08001041 self.apply_rules(rules, b"permit ip4 tcp %d" % port)
Pavel Kotucek59dda062017-03-02 15:22:47 +01001042
1043 # Traffic should still pass
1044 self.run_verify_test(self.IP, self.IPV6,
1045 self.proto[self.IP][self.UDP], port)
1046
1047 self.logger.info("ACLP_TEST_FINISH_0018")
1048
1049 def test_0019_udp_deny_port(self):
1050 """ deny single TCPv4/v6
1051 """
1052 self.logger.info("ACLP_TEST_START_0019")
1053
1054 port = random.randint(0, 65535)
1055 # Add an ACL
1056 rules = []
1057 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1058 self.proto[self.IP][self.TCP]))
1059 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1060 self.proto[self.IP][self.TCP]))
1061 # Permit ip any any in the end
1062 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1063 self.PORTS_ALL, 0))
1064 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1065 self.PORTS_ALL, 0))
1066
1067 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -08001068 self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
Pavel Kotucek59dda062017-03-02 15:22:47 +01001069
1070 # Traffic should not pass
1071 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1072 self.proto[self.IP][self.TCP], port)
1073
1074 self.logger.info("ACLP_TEST_FINISH_0019")
1075
1076 def test_0020_udp_deny_port(self):
1077 """ deny single UDPv4/v6
1078 """
1079 self.logger.info("ACLP_TEST_START_0020")
1080
1081 port = random.randint(0, 65535)
1082 # Add an ACL
1083 rules = []
1084 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1085 self.proto[self.IP][self.UDP]))
1086 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1087 self.proto[self.IP][self.UDP]))
1088 # Permit ip any any in the end
1089 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1090 self.PORTS_ALL, 0))
1091 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1092 self.PORTS_ALL, 0))
1093
1094 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -08001095 self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
Pavel Kotucek59dda062017-03-02 15:22:47 +01001096
1097 # Traffic should not pass
1098 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1099 self.proto[self.IP][self.UDP], port)
1100
1101 self.logger.info("ACLP_TEST_FINISH_0020")
1102
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +00001103 def test_0021_udp_deny_port_verify_fragment_deny(self):
Chris Luked0421942018-04-10 15:19:54 -04001104 """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
1105 blocked
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +00001106 """
1107 self.logger.info("ACLP_TEST_START_0021")
1108
1109 port = random.randint(0, 65535)
1110 # Add an ACL
1111 rules = []
1112 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1113 self.proto[self.IP][self.UDP]))
1114 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1115 self.proto[self.IP][self.UDP]))
1116 # deny ip any any in the end
1117 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1118 self.PORTS_ALL, 0))
1119 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1120 self.PORTS_ALL, 0))
1121
1122 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -08001123 self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +00001124
1125 # Traffic should not pass
1126 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1127 self.proto[self.IP][self.UDP], port, True)
1128
1129 self.logger.info("ACLP_TEST_FINISH_0021")
1130
Pavel Kotuceke7b67342017-04-18 13:12:20 +02001131 def test_0022_zero_length_udp_ipv4(self):
1132 """ VPP-687 zero length udp ipv4 packet"""
1133 self.logger.info("ACLP_TEST_START_0022")
1134
1135 port = random.randint(0, 65535)
1136 # Add an ACL
1137 rules = []
1138 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1139 self.proto[self.IP][self.UDP]))
1140 # deny ip any any in the end
1141 rules.append(
1142 self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1143
1144 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -08001145 self.apply_rules(rules, b"permit empty udp ip4 %d" % port)
Pavel Kotuceke7b67342017-04-18 13:12:20 +02001146
1147 # Traffic should still pass
1148 # Create incoming packet streams for packet-generator interfaces
1149 pkts_cnt = 0
1150 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1151 self.IP, self.IPV4,
1152 self.proto[self.IP][self.UDP], port,
1153 False, False)
1154 if len(pkts) > 0:
1155 self.pg0.add_stream(pkts)
1156 pkts_cnt += len(pkts)
1157
1158 # Enable packet capture and start packet sendingself.IPV
1159 self.pg_enable_capture(self.pg_interfaces)
1160 self.pg_start()
1161
1162 self.pg1.get_capture(pkts_cnt)
1163
1164 self.logger.info("ACLP_TEST_FINISH_0022")
1165
1166 def test_0023_zero_length_udp_ipv6(self):
1167 """ VPP-687 zero length udp ipv6 packet"""
1168 self.logger.info("ACLP_TEST_START_0023")
1169
1170 port = random.randint(0, 65535)
1171 # Add an ACL
1172 rules = []
1173 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1174 self.proto[self.IP][self.UDP]))
1175 # deny ip any any in the end
1176 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1177
1178 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -08001179 self.apply_rules(rules, b"permit empty udp ip6 %d" % port)
Pavel Kotuceke7b67342017-04-18 13:12:20 +02001180
1181 # Traffic should still pass
1182 # Create incoming packet streams for packet-generator interfaces
1183 pkts_cnt = 0
1184 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1185 self.IP, self.IPV6,
1186 self.proto[self.IP][self.UDP], port,
1187 False, False)
1188 if len(pkts) > 0:
1189 self.pg0.add_stream(pkts)
1190 pkts_cnt += len(pkts)
1191
1192 # Enable packet capture and start packet sendingself.IPV
1193 self.pg_enable_capture(self.pg_interfaces)
1194 self.pg_start()
1195
1196 # Verify outgoing packet streams per packet-generator interface
1197 self.pg1.get_capture(pkts_cnt)
1198
1199 self.logger.info("ACLP_TEST_FINISH_0023")
1200
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +02001201 def test_0108_tcp_permit_v4(self):
1202 """ permit TCPv4 + non-match range
1203 """
1204 self.logger.info("ACLP_TEST_START_0108")
1205
1206 # Add an ACL
1207 rules = []
1208 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1209 self.proto[self.IP][self.TCP]))
1210 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1211 self.proto[self.IP][self.TCP]))
1212 # deny ip any any in the end
1213 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1214
1215 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -08001216 self.apply_rules(rules, b"permit ipv4 tcp")
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +02001217
1218 # Traffic should still pass
1219 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1220
1221 self.logger.info("ACLP_TEST_FINISH_0108")
1222
1223 def test_0109_tcp_permit_v6(self):
1224 """ permit TCPv6 + non-match range
1225 """
1226 self.logger.info("ACLP_TEST_START_0109")
1227
1228 # Add an ACL
1229 rules = []
1230 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1231 self.proto[self.IP][self.TCP]))
1232 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1233 self.proto[self.IP][self.TCP]))
1234 # deny ip any any in the end
1235 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1236
1237 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -08001238 self.apply_rules(rules, b"permit ip6 tcp")
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +02001239
1240 # Traffic should still pass
1241 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1242
1243 self.logger.info("ACLP_TEST_FINISH_0109")
1244
1245 def test_0110_udp_permit_v4(self):
1246 """ permit UDPv4 + non-match range
1247 """
1248 self.logger.info("ACLP_TEST_START_0110")
1249
1250 # Add an ACL
1251 rules = []
1252 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1253 self.proto[self.IP][self.UDP]))
1254 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1255 self.proto[self.IP][self.UDP]))
1256 # deny ip any any in the end
1257 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1258
1259 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -08001260 self.apply_rules(rules, b"permit ipv4 udp")
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +02001261
1262 # Traffic should still pass
1263 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1264
1265 self.logger.info("ACLP_TEST_FINISH_0110")
1266
1267 def test_0111_udp_permit_v6(self):
1268 """ permit UDPv6 + non-match range
1269 """
1270 self.logger.info("ACLP_TEST_START_0111")
1271
1272 # Add an ACL
1273 rules = []
1274 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1275 self.proto[self.IP][self.UDP]))
1276 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1277 self.proto[self.IP][self.UDP]))
1278 # deny ip any any in the end
1279 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1280
1281 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -08001282 self.apply_rules(rules, b"permit ip6 udp")
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +02001283
1284 # Traffic should still pass
1285 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1286
1287 self.logger.info("ACLP_TEST_FINISH_0111")
1288
1289 def test_0112_tcp_deny(self):
1290 """ deny TCPv4/v6 + non-match range
1291 """
1292 self.logger.info("ACLP_TEST_START_0112")
1293
1294 # Add an ACL
1295 rules = []
1296 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1297 self.PORTS_RANGE_2,
1298 self.proto[self.IP][self.TCP]))
1299 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1300 self.PORTS_RANGE_2,
1301 self.proto[self.IP][self.TCP]))
1302 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1303 self.proto[self.IP][self.TCP]))
1304 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1305 self.proto[self.IP][self.TCP]))
1306 # permit ip any any in the end
1307 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1308 self.PORTS_ALL, 0))
1309 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1310 self.PORTS_ALL, 0))
1311
1312 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -08001313 self.apply_rules(rules, b"deny ip4/ip6 tcp")
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +02001314
1315 # Traffic should not pass
1316 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1317 self.proto[self.IP][self.TCP])
1318
1319 self.logger.info("ACLP_TEST_FINISH_0112")
1320
1321 def test_0113_udp_deny(self):
1322 """ deny UDPv4/v6 + non-match range
1323 """
1324 self.logger.info("ACLP_TEST_START_0113")
1325
1326 # Add an ACL
1327 rules = []
1328 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1329 self.PORTS_RANGE_2,
1330 self.proto[self.IP][self.UDP]))
1331 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1332 self.PORTS_RANGE_2,
1333 self.proto[self.IP][self.UDP]))
1334 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1335 self.proto[self.IP][self.UDP]))
1336 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1337 self.proto[self.IP][self.UDP]))
1338 # permit ip any any in the end
1339 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1340 self.PORTS_ALL, 0))
1341 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1342 self.PORTS_ALL, 0))
1343
1344 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -08001345 self.apply_rules(rules, b"deny ip4/ip6 udp")
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +02001346
1347 # Traffic should not pass
1348 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1349 self.proto[self.IP][self.UDP])
1350
1351 self.logger.info("ACLP_TEST_FINISH_0113")
1352
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +01001353 def test_0300_tcp_permit_v4_etype_aaaa(self):
1354 """ permit TCPv4, send 0xAAAA etype
1355 """
1356 self.logger.info("ACLP_TEST_START_0300")
1357
1358 # Add an ACL
1359 rules = []
1360 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1361 self.proto[self.IP][self.TCP]))
1362 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1363 self.proto[self.IP][self.TCP]))
1364 # deny ip any any in the end
1365 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1366
1367 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -08001368 self.apply_rules(rules, b"permit ipv4 tcp")
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +01001369
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +01001370 # Traffic should still pass also for an odd ethertype
1371 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1372 0, False, True, 0xaaaa)
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +01001373 self.logger.info("ACLP_TEST_FINISH_0300")
1374
1375 def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +02001376 """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +01001377 """
1378 self.logger.info("ACLP_TEST_START_0305")
1379
1380 # Add an ACL
1381 rules = []
1382 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1383 self.proto[self.IP][self.TCP]))
1384 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1385 self.proto[self.IP][self.TCP]))
1386 # deny ip any any in the end
1387 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1388
1389 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -08001390 self.apply_rules(rules, b"permit ipv4 tcp")
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +01001391
1392 # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1393 self.etype_whitelist([0xbbb], 1)
1394
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +01001395 # The oddball ethertype should be blocked
1396 self.run_verify_negat_test(self.IP, self.IPV4,
1397 self.proto[self.IP][self.TCP],
1398 0, False, 0xaaaa)
1399
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +02001400 # remove the whitelist
1401 self.etype_whitelist([], 0)
1402
1403 self.logger.info("ACLP_TEST_FINISH_0305")
1404
1405 def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
1406 """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass
1407 """
1408 self.logger.info("ACLP_TEST_START_0306")
1409
1410 # Add an ACL
1411 rules = []
1412 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1413 self.proto[self.IP][self.TCP]))
1414 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1415 self.proto[self.IP][self.TCP]))
1416 # deny ip any any in the end
1417 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1418
1419 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -08001420 self.apply_rules(rules, b"permit ipv4 tcp")
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +02001421
1422 # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1423 self.etype_whitelist([0xbbb], 1)
1424
1425 # The whitelisted traffic, should pass
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +01001426 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1427 0, False, True, 0x0bbb)
1428
1429 # remove the whitelist, the previously blocked 0xAAAA should pass now
1430 self.etype_whitelist([], 0)
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +02001431
1432 self.logger.info("ACLP_TEST_FINISH_0306")
1433
1434 def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
1435 """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass
1436 """
1437 self.logger.info("ACLP_TEST_START_0307")
1438
1439 # Add an ACL
1440 rules = []
1441 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1442 self.proto[self.IP][self.TCP]))
1443 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1444 self.proto[self.IP][self.TCP]))
1445 # deny ip any any in the end
1446 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1447
1448 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -08001449 self.apply_rules(rules, b"permit ipv4 tcp")
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +02001450
1451 # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1452 self.etype_whitelist([0xbbb], 1)
1453 # remove the whitelist, the previously blocked 0xAAAA should pass now
1454 self.etype_whitelist([], 0)
1455
1456 # The whitelisted traffic, should pass
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +01001457 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1458 0, False, True, 0xaaaa)
1459
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +02001460 self.logger.info("ACLP_TEST_FINISH_0306")
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +02001461
Andrew Yourtchenkode3682f2018-03-23 10:38:46 +01001462 def test_0315_del_intf(self):
1463 """ apply an acl and delete the interface
1464 """
1465 self.logger.info("ACLP_TEST_START_0315")
1466
1467 # Add an ACL
1468 rules = []
1469 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1470 self.proto[self.IP][self.TCP]))
1471 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1472 self.proto[self.IP][self.TCP]))
1473 # deny ip any any in the end
1474 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1475
1476 # create an interface
1477 intf = []
Klement Sekerabeaded52018-06-24 10:30:37 +02001478 intf.append(VppLoInterface(self))
Andrew Yourtchenkode3682f2018-03-23 10:38:46 +01001479
1480 # Apply rules
Paul Vinciguerra4a4cea02019-03-06 19:06:19 -08001481 self.apply_rules_to(rules, b"permit ipv4 tcp", intf[0].sw_if_index)
Andrew Yourtchenkode3682f2018-03-23 10:38:46 +01001482
1483 # Remove the interface
1484 intf[0].remove_vpp_config()
1485
1486 self.logger.info("ACLP_TEST_FINISH_0315")
1487
Pavel Kotucek59dda062017-03-02 15:22:47 +01001488if __name__ == '__main__':
1489 unittest.main(testRunner=VppTestRunner)