blob: 772b5bbb4156aa7cbaa75606d4e48b51f7b85488 [file] [log] [blame]
Pavel Kotucek59dda062017-03-02 15:22:47 +01001#!/usr/bin/env python
2"""ACL plugin Test Case HLD:
3"""
4
5import unittest
6import random
7
8from scapy.packet import Raw
9from scapy.layers.l2 import Ether
10from scapy.layers.inet import IP, TCP, UDP, ICMP
11from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +000012from scapy.layers.inet6 import IPv6ExtHdrFragment
Pavel Kotucek59dda062017-03-02 15:22:47 +010013from framework import VppTestCase, VppTestRunner
14from util import Host, ppp
15
Andrew Yourtchenkode3682f2018-03-23 10:38:46 +010016from vpp_lo_interface import VppLoInterface
17
Pavel Kotucek59dda062017-03-02 15:22:47 +010018
19class TestACLplugin(VppTestCase):
20 """ ACL plugin Test Case """
21
22 # traffic types
23 IP = 0
24 ICMP = 1
25
26 # IP version
27 IPRANDOM = -1
28 IPV4 = 0
29 IPV6 = 1
30
31 # rule types
32 DENY = 0
33 PERMIT = 1
34
35 # supported protocols
36 proto = [[6, 17], [1, 58]]
37 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
38 ICMPv4 = 0
39 ICMPv6 = 1
40 TCP = 0
41 UDP = 1
42 PROTO_ALL = 0
43
44 # port ranges
45 PORTS_ALL = -1
46 PORTS_RANGE = 0
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +020047 PORTS_RANGE_2 = 1
Pavel Kotucek59dda062017-03-02 15:22:47 +010048 udp_sport_from = 10
49 udp_sport_to = udp_sport_from + 5
50 udp_dport_from = 20000
51 udp_dport_to = udp_dport_from + 5000
52 tcp_sport_from = 30
53 tcp_sport_to = tcp_sport_from + 5
54 tcp_dport_from = 40000
55 tcp_dport_to = tcp_dport_from + 5000
56
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +020057 udp_sport_from_2 = 90
58 udp_sport_to_2 = udp_sport_from_2 + 5
59 udp_dport_from_2 = 30000
60 udp_dport_to_2 = udp_dport_from_2 + 5000
61 tcp_sport_from_2 = 130
62 tcp_sport_to_2 = tcp_sport_from_2 + 5
63 tcp_dport_from_2 = 20000
64 tcp_dport_to_2 = tcp_dport_from_2 + 5000
65
Pavel Kotucek59dda062017-03-02 15:22:47 +010066 icmp4_type = 8 # echo request
67 icmp4_code = 3
68 icmp6_type = 128 # echo request
69 icmp6_code = 3
70
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +020071 icmp4_type_2 = 8
72 icmp4_code_from_2 = 5
73 icmp4_code_to_2 = 20
74 icmp6_type_2 = 128
75 icmp6_code_from_2 = 8
76 icmp6_code_to_2 = 42
77
Pavel Kotucek59dda062017-03-02 15:22:47 +010078 # Test variables
79 bd_id = 1
80
@classmethod
def setUpClass(cls):
    """
    Perform standard class setup (defined by class method setUpClass in
    class VppTestCase) before running the test case, set test case related
    variables and configure VPP.

    Two pg interfaces are created, attached to bridge domain ``cls.bd_id``
    and brought up.  Sixteen test hosts are then pre-computed and split
    across the interfaces in ``cls.hosts_by_pg_idx``.  If anything fails,
    the parent class teardown is invoked before the exception is re-raised.
    """
    super(TestACLplugin, cls).setUpClass()

    try:
        # Create 2 pg interfaces
        cls.create_pg_interfaces(range(2))

        # Packet flows mapping pg0 -> pg1, pg2 etc.
        cls.flows = dict()
        cls.flows[cls.pg0] = [cls.pg1]

        # Packet sizes
        cls.pg_if_packet_sizes = [64, 512, 1518, 9018]

        # Create the BD (the call passes uu_flood=1 and learn=1)
        # and put interfaces to this BD
        cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
                                       learn=1)
        for pg_if in cls.pg_interfaces:
            cls.vapi.sw_interface_set_l2_bridge(
                rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id)

        # Set up all interfaces
        for i in cls.pg_interfaces:
            i.admin_up()

        # Mapping between packet-generator index and lists of test hosts
        cls.hosts_by_pg_idx = dict()
        for pg_if in cls.pg_interfaces:
            cls.hosts_by_pg_idx[pg_if.sw_if_index] = []

        # Create list of deleted hosts
        cls.deleted_hosts_by_pg_idx = dict()
        for pg_if in cls.pg_interfaces:
            cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []

        # Pre-compute the hosts used to build traffic
        count = 16
        start = 0
        n_int = len(cls.pg_interfaces)
        # Floor division: the result feeds range(), which only accepts
        # integers (true division would yield a float on Python 3).
        macs_per_if = count // n_int
        for i, pg_if in enumerate(cls.pg_interfaces):
            start_nr = macs_per_if * i + start
            # The last interface takes whatever is left of the split.
            end_nr = count + start if i == (n_int - 1) \
                else macs_per_if * (i + 1) + start
            hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
            for j in range(start_nr, end_nr):
                host = Host(
                    "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
                    "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
                    "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
                hosts.append(host)

    except Exception:
        super(TestACLplugin, cls).tearDownClass()
        raise
146
@classmethod
def tearDownClass(cls):
    """Delegate class-level teardown to the parent test case class."""
    parent = super(TestACLplugin, cls)
    parent.tearDownClass()
150
def setUp(self):
    """Run the parent per-test setup, then reset the packet infos."""
    parent = super(TestACLplugin, self)
    parent.setUp()
    # Start every test from a clean set of recorded packet infos.
    self.reset_packet_infos()
154
def tearDown(self):
    """
    Show various debug prints after each test.

    The parent teardown runs first.  Unless ``self.vpp_dead`` is set, the
    output of a fixed list of CLI commands is then written to the test log.
    """
    super(TestACLplugin, self).tearDown()
    if self.vpp_dead:
        return

    debug_clis = (
        "show vlib graph l2-input-feat-arc",
        "show vlib graph l2-input-feat-arc-end",
        "show vlib graph l2-output-feat-arc",
        "show vlib graph l2-output-feat-arc-end",
        "show l2fib verbose",
        "show acl-plugin acl",
        "show acl-plugin interface",
        "show acl-plugin tables",
        "show bridge-domain %s detail" % self.bd_id,
    )
    for command in debug_clis:
        self.logger.info(self.vapi.ppcli(command))
175
def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
                s_prefix=0, s_ip=b'\x00\x00\x00\x00',
                d_prefix=0, d_ip=b'\x00\x00\x00\x00'):
    """
    Build a single ACL rule dictionary.

    :param ip: value stored as ``is_ipv6`` (IPV4 = 0, IPV6 = 1).
    :param permit_deny: value stored as ``is_permit`` (DENY = 0,
        PERMIT = 1).
    :param ports: PORTS_ALL for the full range, PORTS_RANGE or
        PORTS_RANGE_2 for the per-protocol ranges defined on the class,
        any other value is used as a single port for both source and
        destination.
    :param proto: protocol number; -1 means "no rule".
    :param s_prefix: source prefix length.
    :param s_ip: source address as bytes.
    :param d_prefix: destination prefix length.
    :param d_ip: destination address as bytes.
    :return: the rule dictionary, or None when proto is -1.
    :raises ValueError: when a port range mode is requested for a protocol
        that has no range defined (only 1, 58, TCP and UDP do).
    """
    if proto == -1:
        return None

    tcp = self.proto[self.IP][self.TCP]
    udp = self.proto[self.IP][self.UDP]

    if ports == self.PORTS_ALL:
        sport_from = 0
        dport_from = 0
        # ICMP (1) and ICMPv6 (58) carry type/code, capped at 255
        sport_to = 65535 if proto != 1 and proto != 58 else 255
        dport_to = sport_to
    elif ports in (self.PORTS_RANGE, self.PORTS_RANGE_2):
        # proto -> (sport_from, sport_to, dport_from, dport_to); for the
        # ICMP protocols the "ports" are the ICMP type and code.
        if ports == self.PORTS_RANGE:
            ranges = {
                1: (self.icmp4_type, self.icmp4_type,
                    self.icmp4_code, self.icmp4_code),
                58: (self.icmp6_type, self.icmp6_type,
                     self.icmp6_code, self.icmp6_code),
                tcp: (self.tcp_sport_from, self.tcp_sport_to,
                      self.tcp_dport_from, self.tcp_dport_to),
                udp: (self.udp_sport_from, self.udp_sport_to,
                      self.udp_dport_from, self.udp_dport_to),
            }
        else:
            ranges = {
                1: (self.icmp4_type_2, self.icmp4_type_2,
                    self.icmp4_code_from_2, self.icmp4_code_to_2),
                58: (self.icmp6_type_2, self.icmp6_type_2,
                     self.icmp6_code_from_2, self.icmp6_code_to_2),
                tcp: (self.tcp_sport_from_2, self.tcp_sport_to_2,
                      self.tcp_dport_from_2, self.tcp_dport_to_2),
                udp: (self.udp_sport_from_2, self.udp_sport_to_2,
                      self.udp_dport_from_2, self.udp_dport_to_2),
            }
        if proto not in ranges:
            # Previously this fell through with the port variables unset.
            raise ValueError(
                "no port range defined for protocol %r" % (proto,))
        sport_from, sport_to, dport_from, dport_to = ranges[proto]
    else:
        sport_from = ports
        sport_to = ports
        dport_from = ports
        dport_to = ports

    rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
             'srcport_or_icmptype_first': sport_from,
             'srcport_or_icmptype_last': sport_to,
             'src_ip_prefix_len': s_prefix,
             'src_ip_addr': s_ip,
             'dstport_or_icmpcode_first': dport_from,
             'dstport_or_icmpcode_last': dport_to,
             'dst_ip_prefix_len': d_prefix,
             'dst_ip_addr': d_ip})
    return rule
244
def apply_rules(self, rules, tag=b''):
    """
    Create an ACL from ``rules`` and apply it to every pg interface.

    :param rules: list of rule dictionaries passed to acl_add_replace.
    :param tag: tag passed along with the ACL.
    """
    # acl_index=4294967295 requests a new ACL rather than a replacement
    acl_reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
                                          tag=tag)
    dumped = self.vapi.acl_dump(acl_reply.acl_index)
    self.logger.info("Dumped ACL: " + str(dumped))
    # Apply a ACL on the interface as inbound
    for pg_if in self.pg_interfaces:
        self.vapi.acl_interface_set_acl_list(
            sw_if_index=pg_if.sw_if_index,
            n_input=1,
            acls=[acl_reply.acl_index])
256
def apply_rules_to(self, rules, tag=b'', sw_if_index=0xFFFFFFFF):
    """
    Create an ACL from ``rules`` and apply it to one interface.

    :param rules: list of rule dictionaries passed to acl_add_replace.
    :param tag: tag passed along with the ACL.
    :param sw_if_index: index of the interface the ACL is applied to.
    """
    # acl_index=4294967295 requests a new ACL rather than a replacement
    acl_reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
                                          tag=tag)
    dumped = self.vapi.acl_dump(acl_reply.acl_index)
    self.logger.info("Dumped ACL: " + str(dumped))
    # Apply a ACL on the interface as inbound
    self.vapi.acl_interface_set_acl_list(
        sw_if_index=sw_if_index,
        n_input=1,
        acls=[acl_reply.acl_index])
267
def etype_whitelist(self, whitelist, n_input):
    """
    Apply the same ethertype whitelist on all the pg interfaces.

    :param whitelist: whitelist passed through to the API call.
    :param n_input: n_input value passed through to the API call.
    """
    # Short alias keeps the call below within the line-length limit
    set_whitelist = self.vapi.acl_interface_set_etype_whitelist
    for pg_if in self.pg_interfaces:
        set_whitelist(sw_if_index=pg_if.sw_if_index, n_input=n_input,
                      whitelist=whitelist)
276
def create_upper_layer(self, packet_index, proto, ports=0):
    """
    Build the L4 header for a generated packet.

    :param packet_index: not used by this method.
    :param proto: protocol number, looked up in ``self.proto_map``.
    :param ports: 0 picks random source/destination ports from the
        class-level ranges; any other value is used for both ports.
    :return: a UDP or TCP layer, or '' for any other mapped protocol.
    """
    l4_name = self.proto_map[proto]
    if l4_name == 'UDP':
        layer = UDP
        sport_range = (self.udp_sport_from, self.udp_sport_to)
        dport_range = (self.udp_dport_from, self.udp_dport_to)
    elif l4_name == 'TCP':
        layer = TCP
        sport_range = (self.tcp_sport_from, self.tcp_sport_to)
        dport_range = (self.tcp_dport_from, self.tcp_dport_to)
    else:
        return ''

    if ports != 0:
        return layer(sport=ports, dport=ports)
    # Source port is drawn before the destination port
    src_port = random.randint(sport_range[0], sport_range[1])
    dst_port = random.randint(dport_range[0], dport_range[1])
    return layer(sport=src_port, dport=dst_port)
296
def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
                  proto=-1, ports=0, fragments=False,
                  pkt_raw=True, etype=-1):
    """
    Create input packet stream for defined interface using the hosts
    list.

    One packet is built for every (destination host, source host) pair of
    every flow configured for ``src_if`` in ``self.flows``.

    :param object src_if: Interface to create packet stream for.
    :param list packet_sizes: List of required packet sizes; one is
        picked at random per packet when ``pkt_raw`` is set.
    :param traffic_type: self.ICMP (1) builds an ICMP/ICMPv6 echo request,
        any other value builds the layer from create_upper_layer().
    :param ipv6: 1 for IPv6, 0 for IPv4, any other value picks one of the
        two at random per packet.
    :param proto: protocol number; -1 picks one of ``self.proto[self.IP]``
        at random per packet.
    :param ports: passed to create_upper_layer().
    :param fragments: build fragment packets (IPv6 fragment extension
        header, or IPv4 with flags=1 and frag=64).
    :param pkt_raw: append the packet info payload and pad the packet to
        the chosen size.
    :param etype: when greater than 0, used as the Ethernet type.
    :return: Stream of packets (empty when ``src_if`` has no flows).
    """
    pkts = []
    if src_if not in self.flows:
        return pkts

    src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
    for dst_if in self.flows[src_if]:
        dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
        n_int = len(dst_hosts) * len(src_hosts)
        for i in range(n_int):
            # Floor division: list indices must be integers
            dst_host = dst_hosts[i // len(src_hosts)]
            src_host = src_hosts[i % len(src_hosts)]
            pkt_info = self.create_packet_info(src_if, dst_if)
            if ipv6 == 1:
                pkt_info.ip = 1
            elif ipv6 == 0:
                pkt_info.ip = 0
            else:
                pkt_info.ip = random.choice([0, 1])
            if proto == -1:
                pkt_info.proto = random.choice(self.proto[self.IP])
            else:
                pkt_info.proto = proto
            payload = self.info_to_payload(pkt_info)

            if etype > 0:
                p = Ether(dst=dst_host.mac,
                          src=src_host.mac,
                          type=etype)
            else:
                p = Ether(dst=dst_host.mac, src=src_host.mac)

            if pkt_info.ip:
                p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
                if fragments:
                    p /= IPv6ExtHdrFragment(offset=64, m=1)
            elif fragments:
                p /= IP(src=src_host.ip4, dst=dst_host.ip4,
                        flags=1, frag=64)
            else:
                p /= IP(src=src_host.ip4, dst=dst_host.ip4)

            if traffic_type == self.ICMP:
                if pkt_info.ip:
                    p /= ICMPv6EchoRequest(type=self.icmp6_type,
                                           code=self.icmp6_code)
                else:
                    p /= ICMP(type=self.icmp4_type,
                              code=self.icmp4_code)
            else:
                p /= self.create_upper_layer(i, pkt_info.proto, ports)

            if pkt_raw:
                p /= Raw(payload)
            # Saved before padding so verification compares unpadded data
            pkt_info.data = p.copy()
            if pkt_raw:
                size = random.choice(packet_sizes)
                self.extend_packet(p, size)
            pkts.append(p)
    return pkts
362
def verify_capture(self, pg_if, capture,
                   traffic_type=0, ip_type=0, etype=-1):
    """
    Verify captured input packet stream for defined interface.

    :param object pg_if: Interface to verify captured packet stream for.
    :param list capture: Captured packet stream.
    :param traffic_type: self.ICMP (1) checks the ICMP type and code; any
        other value checks addresses and TCP/UDP ports against the packet
        saved when the stream was created.
    :param ip_type: when non-zero, the IP version flag stored in the
        payload must equal this value.
    :param etype: when greater than 0, the expected Ethernet type; packets
        carrying it are not inspected any further.
    """
    last_info = dict()
    for i in self.pg_interfaces:
        last_info[i.sw_if_index] = None
    dst_sw_if_index = pg_if.sw_if_index
    for packet in capture:
        if etype > 0:
            if packet[Ether].type != etype:
                # NOTE(review): a mismatch is only logged, not asserted;
                # confirm whether this should fail the test.
                self.logger.error(ppp("Unexpected ethertype in packet:",
                                      packet))
            else:
                continue
        try:
            # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
            if traffic_type == self.ICMP and ip_type == self.IPV6:
                payload_info = self.payload_to_info(
                    packet[ICMPv6EchoRequest], 'data')
                payload = packet[ICMPv6EchoRequest]
            else:
                payload_info = self.payload_to_info(packet[Raw])
                payload = packet[self.proto_map[payload_info.proto]]
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise

        if ip_type != 0:
            self.assertEqual(payload_info.ip, ip_type)
        if traffic_type == self.ICMP:
            try:
                if payload_info.ip == 0:
                    self.assertEqual(payload.type, self.icmp4_type)
                    self.assertEqual(payload.code, self.icmp4_code)
                else:
                    self.assertEqual(payload.type, self.icmp6_type)
                    self.assertEqual(payload.code, self.icmp6_code)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise
        else:
            try:
                ip_version = IPv6 if payload_info.ip == 1 else IP

                ip = packet[ip_version]
                packet_index = payload_info.index

                self.assertEqual(payload_info.dst, dst_sw_if_index)
                self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
                                  (pg_if.name, payload_info.src,
                                   packet_index))
                next_info = self.get_next_packet_info_for_interface2(
                    payload_info.src, dst_sw_if_index,
                    last_info[payload_info.src])
                last_info[payload_info.src] = next_info
                self.assertTrue(next_info is not None)
                self.assertEqual(packet_index, next_info.index)
                saved_packet = next_info.data
                # Check standard fields
                self.assertEqual(ip.src, saved_packet[ip_version].src)
                self.assertEqual(ip.dst, saved_packet[ip_version].dst)
                p = self.proto_map[payload_info.proto]
                if p == 'TCP':
                    tcp = packet[TCP]
                    self.assertEqual(tcp.sport, saved_packet[
                        TCP].sport)
                    self.assertEqual(tcp.dport, saved_packet[
                        TCP].dport)
                elif p == 'UDP':
                    udp = packet[UDP]
                    self.assertEqual(udp.sport, saved_packet[
                        UDP].sport)
                    self.assertEqual(udp.dport, saved_packet[
                        UDP].dport)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:",
                                      packet))
                raise
    for i in self.pg_interfaces:
        # NOTE(review): the interface object is passed here, while the
        # call above passes an integer source index; left as-is because
        # changing it interacts with the etype "continue" path above.
        remaining_packet = self.get_next_packet_info_for_interface2(
            i, dst_sw_if_index, last_info[i.sw_if_index])
        self.assertTrue(
            remaining_packet is None,
            "Port %u: Packet expected from source %u didn't arrive" %
            (dst_sw_if_index, i.sw_if_index))
456
def run_traffic_no_check(self):
    """Send default traffic on every flow without verifying captures."""
    # Queue a stream on each packet-generator interface that has a flow
    for pg_if in self.pg_interfaces:
        if pg_if not in self.flows:
            continue
        stream = self.create_stream(pg_if, self.pg_if_packet_sizes)
        if stream:
            pg_if.add_stream(stream)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
469
def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
                    frags=False, pkt_raw=True, etype=-1):
    """
    Send traffic on every flow and verify it arrives at the destination.

    All arguments are passed through to create_stream();
    ``traffic_type``, ``ip_type`` and ``etype`` are also passed to
    verify_capture().
    """
    # Create incoming packet streams for packet-generator interfaces
    sent = 0
    for pg_if in self.pg_interfaces:
        if pg_if not in self.flows:
            continue
        stream = self.create_stream(pg_if, self.pg_if_packet_sizes,
                                    traffic_type, ip_type, proto, ports,
                                    frags, pkt_raw, etype)
        if stream:
            pg_if.add_stream(stream)
            sent += len(stream)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.logger.info("sent packets count: %d" % sent)

    # Verify outgoing packet streams per packet-generator interface
    for src_if in self.pg_interfaces:
        if src_if not in self.flows:
            continue
        for dst_if in self.flows[src_if]:
            capture = dst_if.get_capture(sent)
            self.logger.info("Verifying capture on interface %s" %
                             dst_if.name)
            self.verify_capture(dst_if, capture,
                                traffic_type, ip_type, etype)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100499
def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
                          ports=0, frags=False, etype=-1):
    """
    Send traffic on every flow and verify that nothing is captured.

    The arguments are passed through to create_stream(); the payload is
    always included (pkt_raw=True).
    """
    sent = 0
    self.reset_packet_infos()
    for pg_if in self.pg_interfaces:
        if pg_if not in self.flows:
            continue
        stream = self.create_stream(pg_if, self.pg_if_packet_sizes,
                                    traffic_type, ip_type, proto, ports,
                                    frags, True, etype)
        if stream:
            pg_if.add_stream(stream)
            sent += len(stream)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.logger.info("sent packets count: %d" % sent)

    # Every destination interface must have an empty capture
    for src_if in self.pg_interfaces:
        if src_if not in self.flows:
            continue
        for dst_if in self.flows[src_if]:
            self.logger.info("Verifying capture on interface %s" %
                             dst_if.name)
            capture = dst_if.get_capture(0)
            self.assertEqual(len(capture), 0)
528
def test_0000_warmup_test(self):
    """ ACL plugin version check; learn MACs
    """
    version = self.vapi.papi.acl_plugin_get_version()
    self.assertEqual(version.major, 1)
    # Only the major version is asserted: minor version changes are
    # non breaking.
    self.logger.info("Working with ACL plugin version: %d.%d" % (
        version.major, version.minor))
538
def test_0001_acl_create(self):
    """ ACL create/delete test
    """

    self.logger.info("ACLP_TEST_START_0001")

    any_addr = b'\x00\x00\x00\x00'
    new_acl = 4294967295
    pg0_index = self.pg0.sw_if_index

    def udp_rule(is_permit, sport_first, sport_last, dport):
        # IPv4 UDP rule matching any address and a single dest port
        return {'is_permit': is_permit, 'is_ipv6': 0, 'proto': 17,
                'srcport_or_icmptype_first': sport_first,
                'srcport_or_icmptype_last': sport_last,
                'src_ip_prefix_len': 0,
                'src_ip_addr': any_addr,
                'dstport_or_icmpcode_first': dport,
                'dstport_or_icmpcode_last': dport,
                'dst_ip_addr': any_addr,
                'dst_ip_prefix_len': 0}

    # Add an ACL
    r = [udp_rule(1, 1234, 1235, 1234)]
    # Test 1: add a new ACL
    reply = self.vapi.acl_add_replace(acl_index=new_acl, r=r,
                                      tag=b"permit 1234")
    self.assertEqual(reply.retval, 0)
    # The very first ACL gets #0
    self.assertEqual(reply.acl_index, 0)
    first_acl = reply.acl_index
    rr = self.vapi.acl_dump(reply.acl_index)
    self.logger.info("Dumped ACL: " + str(rr))
    self.assertEqual(len(rr), 1)
    # We should have the same number of ACL entries as we had asked
    self.assertEqual(len(rr[0].r), len(r))
    # The rules should be the same. But because the submitted and returned
    # are different types, we need to iterate over rules and keys to get
    # to basic values.
    # NOTE(review): the range stops at len(r) - 1, so with a single rule
    # this loop body never runs; confirm whether that is intended.
    for i_rule in range(len(r) - 1):
        for rule_key in r[i_rule]:
            self.assertEqual(rr[0].r[i_rule][rule_key],
                             r[i_rule][rule_key])

    # Add a deny-1234 ACL
    r_deny = [udp_rule(0, 1234, 1235, 1234),
              udp_rule(1, 0, 0, 0)]

    reply = self.vapi.acl_add_replace(acl_index=new_acl, r=r_deny,
                                      tag=b"deny 1234;permit all")
    self.assertEqual(reply.retval, 0)
    # The second ACL gets #1
    self.assertEqual(reply.acl_index, 1)
    second_acl = reply.acl_index

    # Test 2: try to modify a nonexistent ACL
    reply = self.vapi.acl_add_replace(acl_index=432, r=r,
                                      tag=b"FFFF:FFFF", expected_retval=-6)
    self.assertEqual(reply.retval, -6)
    # The ACL number should pass through
    self.assertEqual(reply.acl_index, 432)

    # apply an ACL on an interface inbound, try to delete ACL, must fail
    self.vapi.acl_interface_set_acl_list(sw_if_index=pg0_index,
                                         n_input=1,
                                         acls=[first_acl])
    self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
    # Unapply an ACL and then try to delete it - must be ok
    self.vapi.acl_interface_set_acl_list(sw_if_index=pg0_index,
                                         n_input=0,
                                         acls=[])
    self.vapi.acl_del(acl_index=first_acl, expected_retval=0)

    # apply an ACL on an interface outbound, try to delete ACL, must fail
    self.vapi.acl_interface_set_acl_list(sw_if_index=pg0_index,
                                         n_input=0,
                                         acls=[second_acl])
    self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
    # Unapply the ACL and then try to delete it - must be ok
    self.vapi.acl_interface_set_acl_list(sw_if_index=pg0_index,
                                         n_input=0,
                                         acls=[])
    self.vapi.acl_del(acl_index=second_acl, expected_retval=0)

    # try to apply a nonexistent ACL - must fail
    self.vapi.acl_interface_set_acl_list(sw_if_index=pg0_index,
                                         n_input=1,
                                         acls=[first_acl],
                                         expected_retval=-6)

    self.logger.info("ACLP_TEST_FINISH_0001")
636
def test_0002_acl_permit_apply(self):
    """ permit ACL apply test
    """
    self.logger.info("ACLP_TEST_START_0002")

    # One IPv4 permit rule per L4 protocol: UDP first, then TCP
    rules = [self.create_rule(self.IPV4, self.PERMIT,
                              0, self.proto[self.IP][l4])
             for l4 in (self.UDP, self.TCP)]

    self.apply_rules(rules, b"permit per-flow")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, -1)
    self.logger.info("ACLP_TEST_FINISH_0002")
654
def test_0003_acl_deny_apply(self):
    """ deny ACL apply test
    """
    self.logger.info("ACLP_TEST_START_0003")
    udp = self.proto[self.IP][self.UDP]
    rules = [
        # Add a deny-flows ACL
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, udp),
        # Permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
    ]

    self.apply_rules(rules, b"deny per-flow;permit all")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPV4, udp)
    self.logger.info("ACLP_TEST_FINISH_0003")
675
def test_0004_vpp624_permit_icmpv4(self):
    """ VPP_624 permit ICMPv4
    """
    self.logger.info("ACLP_TEST_START_0004")

    icmp4 = self.proto[self.ICMP][self.ICMPv4]
    rules = [
        # permit the configured ICMPv4 type/code
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, icmp4),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    self.apply_rules(rules, b"permit icmpv4")

    # Traffic should still pass
    self.run_verify_test(self.ICMP, self.IPV4, icmp4)

    self.logger.info("ACLP_TEST_FINISH_0004")
696
def test_0005_vpp624_permit_icmpv6(self):
    """ VPP_624 permit ICMPv6
    """
    self.logger.info("ACLP_TEST_START_0005")

    icmp6 = self.proto[self.ICMP][self.ICMPv6]
    rules = [
        # permit the configured ICMPv6 type/code
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, icmp6),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    self.apply_rules(rules, b"permit icmpv6")

    # Traffic should still pass
    self.run_verify_test(self.ICMP, self.IPV6, icmp6)

    self.logger.info("ACLP_TEST_FINISH_0005")
717
def test_0006_vpp624_deny_icmpv4(self):
    """ VPP_624 deny ICMPv4
    """
    self.logger.info("ACLP_TEST_START_0006")
    icmp4 = self.proto[self.ICMP][self.ICMPv4]
    rules = [
        # deny the configured ICMPv4 type/code
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, icmp4),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
    ]

    self.apply_rules(rules, b"deny icmpv4")

    # Traffic should not pass
    self.run_verify_negat_test(self.ICMP, self.IPV4, 0)

    self.logger.info("ACLP_TEST_FINISH_0006")
737
def test_0007_vpp624_deny_icmpv6(self):
    """ VPP_624 deny ICMPv6
    """
    self.logger.info("ACLP_TEST_START_0007")
    icmp6 = self.proto[self.ICMP][self.ICMPv6]
    rules = [
        # deny the configured ICMPv6 type/code
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, icmp6),
        # permit ip any any in the end
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]

    self.apply_rules(rules, b"deny icmpv6")

    # Traffic should not pass
    self.run_verify_negat_test(self.ICMP, self.IPV6, 0)

    self.logger.info("ACLP_TEST_FINISH_0007")
757
def test_0008_tcp_permit_v4(self):
    """ permit TCPv4
    """
    self.logger.info("ACLP_TEST_START_0008")

    tcp = self.proto[self.IP][self.TCP]
    rules = [
        # permit TCP within the configured port ranges
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    self.apply_rules(rules, b"permit ipv4 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp)

    self.logger.info("ACLP_TEST_FINISH_0008")
777
def test_0009_tcp_permit_v6(self):
    """ permit TCPv6
    """
    self.logger.info("ACLP_TEST_START_0009")

    # Add an ACL
    rules = []
    rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
                                  self.proto[self.IP][self.TCP]))
    # deny ip any any in the end
    rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))

    # Apply rules
    self.apply_rules(rules, b"permit ip6 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])

    # The finish marker carries this test's own number (it used to log
    # 0008, which made the log look like test 0008 finished twice).
    self.logger.info("ACLP_TEST_FINISH_0009")
797
def test_0010_udp_permit_v4(self):
    """ permit UDPv4
    """
    self.logger.info("ACLP_TEST_START_0010")

    udp = self.proto[self.IP][self.UDP]
    rules = [
        # permit UDP within the configured port ranges
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    self.apply_rules(rules, b"permit ipv udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, udp)

    self.logger.info("ACLP_TEST_FINISH_0010")
817
def test_0011_udp_permit_v6(self):
    """ permit UDPv6

    Permit UDP over IPv6 for the PORTS_RANGE port selection, deny every
    other IPv6 packet, and expect the UDPv6 test traffic to pass.
    """
    self.logger.info("ACLP_TEST_START_0011")

    udp = self.proto[self.IP][self.UDP]

    # ACL: the specific permit first, the catch-all deny last
    rules = [
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, udp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, b"permit ip6 udp")

    # Matching traffic is expected to get through
    self.run_verify_test(self.IP, self.IPV6, udp)

    self.logger.info("ACLP_TEST_FINISH_0011")
837
def test_0012_tcp_deny(self):
    """ deny TCPv4/v6

    Deny TCP for the PORTS_RANGE port selection on both IP versions,
    permit everything else, and expect the TCP test traffic (random IP
    version) to be dropped.
    """
    self.logger.info("ACLP_TEST_START_0012")

    tcp = self.proto[self.IP][self.TCP]
    versions = (self.IPV4, self.IPV6)

    # TCP deny rules come first ...
    rules = [self.create_rule(ver, self.DENY, self.PORTS_RANGE, tcp)
             for ver in versions]
    # ... followed by a permit-any rule per IP version
    rules += [self.create_rule(ver, self.PERMIT, self.PORTS_ALL, 0)
              for ver in versions]

    self.apply_rules(rules, b"deny ip4/ip6 tcp")

    # The denied traffic is expected not to get through
    self.run_verify_negat_test(self.IP, self.IPRANDOM, tcp)

    self.logger.info("ACLP_TEST_FINISH_0012")
863
def test_0013_udp_deny(self):
    """ deny UDPv4/v6

    Deny UDP for the PORTS_RANGE port selection on both IP versions,
    permit everything else, and expect the UDP test traffic (random IP
    version) to be dropped.
    """
    self.logger.info("ACLP_TEST_START_0013")

    udp = self.proto[self.IP][self.UDP]
    versions = (self.IPV4, self.IPV6)

    # UDP deny rules come first ...
    rules = [self.create_rule(ver, self.DENY, self.PORTS_RANGE, udp)
             for ver in versions]
    # ... followed by a permit-any rule per IP version
    rules += [self.create_rule(ver, self.PERMIT, self.PORTS_ALL, 0)
              for ver in versions]

    self.apply_rules(rules, b"deny ip4/ip6 udp")

    # The denied traffic is expected not to get through
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp)

    self.logger.info("ACLP_TEST_FINISH_0013")
889
def test_0014_acl_dump(self):
    """ verify add/dump acls

    Builds one ACL from a table of rule specifications, adds it through
    the API and compares every rule returned by acl_dump against the
    table, in order.
    """
    self.logger.info("ACLP_TEST_START_0014")

    tcp = self.proto[self.IP][self.TCP]
    udp = self.proto[self.IP][self.UDP]
    icmp4 = self.proto[self.ICMP][self.ICMPv4]
    icmp6 = self.proto[self.ICMP][self.ICMPv6]

    # One row per rule: [IP version, action, ports argument, protocol].
    # The rows are handed to create_rule() unchanged.
    r = [
        [self.IPV4, self.PERMIT, 1234, tcp],
        [self.IPV4, self.PERMIT, 2345, udp],
        [self.IPV4, self.PERMIT, 0, tcp],
        [self.IPV4, self.PERMIT, 0, udp],
        [self.IPV4, self.PERMIT, 5, icmp4],
        [self.IPV6, self.PERMIT, 4321, tcp],
        [self.IPV6, self.PERMIT, 5432, udp],
        [self.IPV6, self.PERMIT, 0, tcp],
        [self.IPV6, self.PERMIT, 0, udp],
        [self.IPV6, self.PERMIT, 6, icmp6],
        [self.IPV4, self.DENY, self.PORTS_ALL, 0],
        [self.IPV4, self.DENY, 1234, tcp],
        [self.IPV4, self.DENY, 2345, udp],
        [self.IPV4, self.DENY, 5, icmp4],
        [self.IPV6, self.DENY, 4321, tcp],
        [self.IPV6, self.DENY, 5432, udp],
        [self.IPV6, self.DENY, 6, icmp6],
        [self.IPV6, self.DENY, self.PORTS_ALL, 0],
    ]

    # Add the ACL (4294967295 is passed as the index of the add request)
    # and read it back
    rules = [self.create_rule(ver, action, ports, proto)
             for ver, action, ports, proto in r]
    reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
    result = self.vapi.acl_dump(reply.acl_index)

    # Walk the dumped rules in lock-step with the specification table
    idx = 0
    for acl in result:
        for dr in acl.r:
            ver, action, ports, proto = r[idx]
            self.assertEqual(dr.is_ipv6, ver)
            self.assertEqual(dr.is_permit, action)
            self.assertEqual(dr.proto, proto)

            if ports > 0:
                # positive value: must be echoed back as the first
                # source port / ICMP type
                self.assertEqual(dr.srcport_or_icmptype_first, ports)
            elif ports < 0:
                # negative value (PORTS_ALL): the full 0..65535 range
                self.assertEqual(dr.srcport_or_icmptype_first, 0)
                self.assertEqual(dr.srcport_or_icmptype_last, 65535)
            elif dr.proto == tcp:
                # zero (PORTS_RANGE): values must lie within the TCP
                # ranges configured on the class
                self.assertGreater(dr.srcport_or_icmptype_first,
                                   self.tcp_sport_from-1)
                self.assertLess(dr.srcport_or_icmptype_first,
                                self.tcp_sport_to+1)
                self.assertGreater(dr.dstport_or_icmpcode_last,
                                   self.tcp_dport_from-1)
                self.assertLess(dr.dstport_or_icmpcode_last,
                                self.tcp_dport_to+1)
            elif dr.proto == udp:
                # zero (PORTS_RANGE): same check against the UDP ranges
                self.assertGreater(dr.srcport_or_icmptype_first,
                                   self.udp_sport_from-1)
                self.assertLess(dr.srcport_or_icmptype_first,
                                self.udp_sport_to+1)
                self.assertGreater(dr.dstport_or_icmpcode_last,
                                   self.udp_dport_from-1)
                self.assertLess(dr.dstport_or_icmpcode_last,
                                self.udp_dport_to+1)
            idx += 1

    self.logger.info("ACLP_TEST_FINISH_0014")
958
def test_0015_tcp_permit_port_v4(self):
    """ permit single TCPv4

    Permit TCP over IPv4 for one randomly chosen port value, deny every
    other IPv4 packet, and expect TCPv4 traffic for that port to pass.
    """
    self.logger.info("ACLP_TEST_START_0015")

    tcp = self.proto[self.IP][self.TCP]
    port = random.randint(0, 65535)

    # ACL: the single-port permit first, the catch-all deny last
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, port, tcp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, b"permit ip4 tcp %d" % port)

    # Matching traffic is expected to get through
    self.run_verify_test(self.IP, self.IPV4, tcp, port)

    self.logger.info("ACLP_TEST_FINISH_0015")
980
def test_0016_udp_permit_port_v4(self):
    """ permit single UDPv4

    Permit UDP over IPv4 for one randomly chosen port value, deny every
    other IPv4 packet, and expect UDPv4 traffic for that port to pass.
    """
    self.logger.info("ACLP_TEST_START_0016")

    udp = self.proto[self.IP][self.UDP]
    port = random.randint(0, 65535)

    # ACL: the single-port permit first, the catch-all deny last
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, port, udp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    # The tag names the protocol the rules actually match (UDP)
    self.apply_rules(rules, b"permit ip4 udp %d" % port)

    # Matching traffic is expected to get through
    self.run_verify_test(self.IP, self.IPV4, udp, port)

    self.logger.info("ACLP_TEST_FINISH_0016")
1002
def test_0017_tcp_permit_port_v6(self):
    """ permit single TCPv6

    Permit TCP over IPv6 for one randomly chosen port value, deny every
    other IPv6 packet, and expect TCPv6 traffic for that port to pass.
    """
    self.logger.info("ACLP_TEST_START_0017")

    tcp = self.proto[self.IP][self.TCP]
    port = random.randint(0, 65535)

    # ACL: the single-port permit first, the catch-all deny last
    rules = [
        self.create_rule(self.IPV6, self.PERMIT, port, tcp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]
    # The tag names the IP version the rules actually match (IPv6)
    self.apply_rules(rules, b"permit ip6 tcp %d" % port)

    # Matching traffic is expected to get through
    self.run_verify_test(self.IP, self.IPV6, tcp, port)

    self.logger.info("ACLP_TEST_FINISH_0017")
1024
def test_0018_udp_permit_port_v6(self):
    """ permit single UDPv6

    Permit UDP over IPv6 for one randomly chosen port value, deny every
    other IPv6 packet, and expect UDPv6 traffic for that port to pass.
    """
    self.logger.info("ACLP_TEST_START_0018")

    udp = self.proto[self.IP][self.UDP]
    port = random.randint(0, 65535)

    # ACL: the single-port permit first, the catch-all deny last
    rules = [
        self.create_rule(self.IPV6, self.PERMIT, port, udp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]
    # The tag names what the rules actually match (IPv6, UDP)
    self.apply_rules(rules, b"permit ip6 udp %d" % port)

    # Matching traffic is expected to get through
    self.run_verify_test(self.IP, self.IPV6, udp, port)

    self.logger.info("ACLP_TEST_FINISH_0018")
1047
def test_0019_udp_deny_port(self):
    """ deny single TCPv4/v6

    Deny TCP for one randomly chosen port value on both IP versions,
    permit everything else, and expect TCP traffic for that port
    (random IP version) to be dropped.

    NOTE: despite "udp" in the method name, the rules and the traffic
    used here are TCP; the name is kept so the test id stays stable.
    """
    self.logger.info("ACLP_TEST_START_0019")

    tcp = self.proto[self.IP][self.TCP]
    port = random.randint(0, 65535)

    rules = [
        # single-port TCP deny for each IP version
        self.create_rule(self.IPV4, self.DENY, port, tcp),
        self.create_rule(self.IPV6, self.DENY, port, tcp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]
    # The tag names the protocol the rules actually match (TCP)
    self.apply_rules(rules, b"deny ip4/ip6 tcp %d" % port)

    # The denied traffic is expected not to get through
    self.run_verify_negat_test(self.IP, self.IPRANDOM, tcp, port)

    self.logger.info("ACLP_TEST_FINISH_0019")
1074
def test_0020_udp_deny_port(self):
    """ deny single UDPv4/v6

    Deny UDP for one randomly chosen port value on both IP versions,
    permit everything else, and expect UDP traffic for that port
    (random IP version) to be dropped.
    """
    self.logger.info("ACLP_TEST_START_0020")

    udp = self.proto[self.IP][self.UDP]
    port = random.randint(0, 65535)

    rules = [
        # single-port UDP deny for each IP version
        self.create_rule(self.IPV4, self.DENY, port, udp),
        self.create_rule(self.IPV6, self.DENY, port, udp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)

    # The denied traffic is expected not to get through
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp, port)

    self.logger.info("ACLP_TEST_FINISH_0020")
1101
def test_0021_udp_deny_port_verify_fragment_deny(self):
    """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
    blocked

    Uses the same rule set as the single-port UDP deny test, but runs
    the negative check with its extra flag set to True.
    """
    self.logger.info("ACLP_TEST_START_0021")

    udp = self.proto[self.IP][self.UDP]
    port = random.randint(0, 65535)

    rules = [
        # single-port UDP deny for each IP version
        self.create_rule(self.IPV4, self.DENY, port, udp),
        self.create_rule(self.IPV6, self.DENY, port, udp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)

    # The traffic is expected not to get through.
    # NOTE(review): the trailing True presumably selects fragmented
    # traffic (per the test description) - confirm in the helper.
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp, port, True)

    self.logger.info("ACLP_TEST_FINISH_0021")
1129
def test_0022_zero_length_udp_ipv4(self):
    """ VPP-687 zero length udp ipv4 packet

    Permits UDPv4 for one random port value, denies all other IPv4
    traffic, sends a stream from pg0 and expects every sent packet to
    be captured on pg1.
    """
    self.logger.info("ACLP_TEST_START_0022")

    udp = self.proto[self.IP][self.UDP]
    port = random.randint(0, 65535)

    # ACL: the single-port permit first, the catch-all deny last
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, port, udp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, b"permit empty udp ip4 %d" % port)

    # Build the input stream for pg0.
    # NOTE(review): the two trailing False flags presumably produce the
    # zero-length packets this test is named after - confirm in
    # create_stream.
    pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
                              self.IP, self.IPV4, udp, port,
                              False, False)
    expected = 0
    sent = len(pkts)
    if sent > 0:
        self.pg0.add_stream(pkts)
        expected += sent

    # Enable packet capture and start sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Every packet sent is expected on the output interface
    self.pg1.get_capture(expected)

    self.logger.info("ACLP_TEST_FINISH_0022")
1164
def test_0023_zero_length_udp_ipv6(self):
    """ VPP-687 zero length udp ipv6 packet

    Permits UDPv6 for one random port value, denies all other IPv6
    traffic, sends a stream from pg0 and expects every sent packet to
    be captured on pg1.
    """
    self.logger.info("ACLP_TEST_START_0023")

    udp = self.proto[self.IP][self.UDP]
    port = random.randint(0, 65535)

    # ACL: the single-port permit first, the catch-all deny last
    rules = [
        self.create_rule(self.IPV6, self.PERMIT, port, udp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, b"permit empty udp ip6 %d" % port)

    # Build the input stream for pg0.
    # NOTE(review): the two trailing False flags presumably produce the
    # zero-length packets this test is named after - confirm in
    # create_stream.
    pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
                              self.IP, self.IPV6, udp, port,
                              False, False)
    expected = 0
    sent = len(pkts)
    if sent > 0:
        self.pg0.add_stream(pkts)
        expected += sent

    # Enable packet capture and start sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Every packet sent is expected on the output interface
    self.pg1.get_capture(expected)

    self.logger.info("ACLP_TEST_FINISH_0023")
1199
def test_0108_tcp_permit_v4(self):
    """ permit TCPv4 + non-match range

    Same as the plain TCPv4 permit test, with an extra deny rule for
    the PORTS_RANGE_2 selection placed in front of the permit rule.
    """
    self.logger.info("ACLP_TEST_START_0108")

    tcp = self.proto[self.IP][self.TCP]

    rules = [
        # deny for the second port selection
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        # the permit the test traffic is expected to hit
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, b"permit ipv4 tcp")

    # Matching traffic is expected to get through
    self.run_verify_test(self.IP, self.IPV4, tcp)

    self.logger.info("ACLP_TEST_FINISH_0108")
1221
def test_0109_tcp_permit_v6(self):
    """ permit TCPv6 + non-match range

    Same as the plain TCPv6 permit test, with an extra deny rule for
    the PORTS_RANGE_2 selection placed in front of the permit rule.
    """
    self.logger.info("ACLP_TEST_START_0109")

    tcp = self.proto[self.IP][self.TCP]

    rules = [
        # deny for the second port selection
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2, tcp),
        # the permit the test traffic is expected to hit
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, b"permit ip6 tcp")

    # Matching traffic is expected to get through
    self.run_verify_test(self.IP, self.IPV6, tcp)

    self.logger.info("ACLP_TEST_FINISH_0109")
1243
def test_0110_udp_permit_v4(self):
    """ permit UDPv4 + non-match range

    Same as the plain UDPv4 permit test, with an extra deny rule for
    the PORTS_RANGE_2 selection placed in front of the permit rule.
    """
    self.logger.info("ACLP_TEST_START_0110")

    udp = self.proto[self.IP][self.UDP]

    rules = [
        # deny for the second port selection
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, udp),
        # the permit the test traffic is expected to hit
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, b"permit ipv4 udp")

    # Matching traffic is expected to get through
    self.run_verify_test(self.IP, self.IPV4, udp)

    self.logger.info("ACLP_TEST_FINISH_0110")
1265
def test_0111_udp_permit_v6(self):
    """ permit UDPv6 + non-match range

    Same as the plain UDPv6 permit test, with an extra deny rule for
    the PORTS_RANGE_2 selection placed in front of the permit rule.
    """
    self.logger.info("ACLP_TEST_START_0111")

    udp = self.proto[self.IP][self.UDP]

    rules = [
        # deny for the second port selection
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2, udp),
        # the permit the test traffic is expected to hit
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, b"permit ip6 udp")

    # Matching traffic is expected to get through
    self.run_verify_test(self.IP, self.IPV6, udp)

    self.logger.info("ACLP_TEST_FINISH_0111")
1287
def test_0112_tcp_deny(self):
    """ deny TCPv4/v6 + non-match range

    Same as the plain TCP deny test, with extra permit rules for the
    PORTS_RANGE_2 selection placed in front of the deny rules.
    """
    self.logger.info("ACLP_TEST_START_0112")

    tcp = self.proto[self.IP][self.TCP]
    versions = (self.IPV4, self.IPV6)

    # permits for the second port selection
    rules = [self.create_rule(ver, self.PERMIT, self.PORTS_RANGE_2, tcp)
             for ver in versions]
    # the denies the test traffic is expected to hit
    rules += [self.create_rule(ver, self.DENY, self.PORTS_RANGE, tcp)
              for ver in versions]
    # permit ip any any in the end
    rules += [self.create_rule(ver, self.PERMIT, self.PORTS_ALL, 0)
              for ver in versions]

    self.apply_rules(rules, b"deny ip4/ip6 tcp")

    # The denied traffic is expected not to get through
    self.run_verify_negat_test(self.IP, self.IPRANDOM, tcp)

    self.logger.info("ACLP_TEST_FINISH_0112")
1319
def test_0113_udp_deny(self):
    """ deny UDPv4/v6 + non-match range

    Same as the plain UDP deny test, with extra permit rules for the
    PORTS_RANGE_2 selection placed in front of the deny rules.
    """
    self.logger.info("ACLP_TEST_START_0113")

    udp = self.proto[self.IP][self.UDP]
    versions = (self.IPV4, self.IPV6)

    # permits for the second port selection
    rules = [self.create_rule(ver, self.PERMIT, self.PORTS_RANGE_2, udp)
             for ver in versions]
    # the denies the test traffic is expected to hit
    rules += [self.create_rule(ver, self.DENY, self.PORTS_RANGE, udp)
              for ver in versions]
    # permit ip any any in the end
    rules += [self.create_rule(ver, self.PERMIT, self.PORTS_ALL, 0)
              for ver in versions]

    self.apply_rules(rules, b"deny ip4/ip6 udp")

    # The denied traffic is expected not to get through
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp)

    self.logger.info("ACLP_TEST_FINISH_0113")
1351
def test_0300_tcp_permit_v4_etype_aaaa(self):
    """ permit TCPv4, send 0xAAAA etype

    Applies the TCPv4 permit rule set and runs the positive check with
    0xaaaa passed as the final argument.
    """
    self.logger.info("ACLP_TEST_START_0300")

    tcp = self.proto[self.IP][self.TCP]

    rules = [
        # deny for the second port selection
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        # the permit the test traffic is expected to hit
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, b"permit ipv4 tcp")

    # Traffic should pass for the odd ethertype as well.
    # NOTE(review): the positional flags (0, False, True) are defined
    # by run_verify_test - confirm their meaning there.
    self.run_verify_test(self.IP, self.IPV4, tcp,
                         0, False, True, 0xaaaa)
    self.logger.info("ACLP_TEST_FINISH_0300")
1373
def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
    """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked

    Applies the TCPv4 permit rule set, whitelists ethertype 0x0bbb and
    expects traffic sent with ethertype 0xaaaa to be dropped.
    """
    self.logger.info("ACLP_TEST_START_0305")

    tcp = self.proto[self.IP][self.TCP]

    rules = [
        # deny for the second port selection
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        # the permit the test traffic is expected to hit
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, b"permit ipv4 tcp")

    # Whitelist only 0x0bbb, which leaves 0xaaaa outside the whitelist.
    # NOTE(review): the second argument (1) is defined by
    # etype_whitelist - confirm its meaning there.
    self.etype_whitelist([0xbbb], 1)

    # The oddball ethertype should be blocked
    self.run_verify_negat_test(self.IP, self.IPV4, tcp,
                               0, False, 0xaaaa)

    # Remove the whitelist again
    self.etype_whitelist([], 0)

    self.logger.info("ACLP_TEST_FINISH_0305")
1403
def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
    """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass

    Applies the TCPv4 permit rule set, whitelists ethertype 0x0bbb and
    expects traffic sent with that same ethertype to pass.
    """
    self.logger.info("ACLP_TEST_START_0306")

    tcp = self.proto[self.IP][self.TCP]

    rules = [
        # deny for the second port selection
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        # the permit the test traffic is expected to hit
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, b"permit ipv4 tcp")

    # Whitelist only the 0x0bbb ethertype
    self.etype_whitelist([0xbbb], 1)

    # The whitelisted ethertype should pass
    self.run_verify_test(self.IP, self.IPV4, tcp,
                         0, False, True, 0x0bbb)

    # Remove the whitelist again
    self.etype_whitelist([], 0)

    self.logger.info("ACLP_TEST_FINISH_0306")
1432
def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
    """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass

    Applies the TCPv4 permit rule set, installs and then removes an
    ethertype whitelist for 0x0bbb, and expects traffic sent with
    ethertype 0xaaaa to pass afterwards.
    """
    self.logger.info("ACLP_TEST_START_0307")

    tcp = self.proto[self.IP][self.TCP]

    rules = [
        # deny for the second port selection
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        # the permit the test traffic is expected to hit
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, b"permit ipv4 tcp")

    # Whitelist only 0x0bbb, then drop the whitelist again right away
    self.etype_whitelist([0xbbb], 1)
    self.etype_whitelist([], 0)

    # With no whitelist left, the 0xaaaa ethertype should pass
    self.run_verify_test(self.IP, self.IPV4, tcp,
                         0, False, True, 0xaaaa)

    # The finish marker must carry this test's own number so that the
    # START/FINISH pair can be matched up in the log.
    self.logger.info("ACLP_TEST_FINISH_0307")
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +02001460
def test_0315_del_intf(self):
    """ apply an acl and delete the interface

    Creates a loopback interface, applies the TCPv4 permit rule set to
    it and then removes the interface again.
    """
    self.logger.info("ACLP_TEST_START_0315")

    tcp = self.proto[self.IP][self.TCP]

    rules = [
        # deny for the second port selection
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        # permit for the first port selection
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # A fresh loopback interface to attach the ACL to
    loopback = VppLoInterface(self)

    # Apply the rules to that interface only
    self.apply_rules_to(rules, b"permit ipv4 tcp", loopback.sw_if_index)

    # Remove the interface while the ACL is still applied to it
    loopback.remove_vpp_config()

    self.logger.info("ACLP_TEST_FINISH_0315")
1486
# When executed as a script, run this module's tests with VppTestRunner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)