blob: f3ab38127166273f38c3da2e17ae429253637cd3 [file] [log] [blame]
Pavel Kotucek59dda062017-03-02 15:22:47 +01001#!/usr/bin/env python
2"""ACL plugin Test Case HLD:
3"""
4
5import unittest
6import random
7
8from scapy.packet import Raw
9from scapy.layers.l2 import Ether
10from scapy.layers.inet import IP, TCP, UDP, ICMP
11from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +000012from scapy.layers.inet6 import IPv6ExtHdrFragment
Pavel Kotucek59dda062017-03-02 15:22:47 +010013from framework import VppTestCase, VppTestRunner
14from util import Host, ppp
15
Andrew Yourtchenkode3682f2018-03-23 10:38:46 +010016from vpp_lo_interface import VppLoInterface
17
Pavel Kotucek59dda062017-03-02 15:22:47 +010018
19class TestACLplugin(VppTestCase):
20 """ ACL plugin Test Case """
21
# traffic types
IP = 0
ICMP = 1

# IP version
# IPRANDOM is neither 0 nor 1, which makes create_stream pick the IP
# version at random for every packet.
IPRANDOM = -1
IPV4 = 0
IPV6 = 1

# rule types
DENY = 0
PERMIT = 1

# supported protocols
# proto is indexed as proto[traffic type][selector]:
#   proto[IP]   = [6 (TCP), 17 (UDP)]
#   proto[ICMP] = [1 (ICMP), 58 (ICMPv6 echo request)]
proto = [[6, 17], [1, 58]]
# protocol number -> layer name, used to look the layer up in a packet
proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
# selectors into proto[ICMP]
ICMPv4 = 0
ICMPv6 = 1
# selectors into proto[IP]
TCP = 0
UDP = 1
# protocol value the tests pass for "ip any any" rules
PROTO_ALL = 0

# port ranges
# Selectors understood by create_rule; any other value is used there as a
# literal port number (or ICMP type/code).
PORTS_ALL = -1
PORTS_RANGE = 0
PORTS_RANGE_2 = 1
# first range set, selected by PORTS_RANGE
udp_sport_from = 10
udp_sport_to = udp_sport_from + 5
udp_dport_from = 20000
udp_dport_to = udp_dport_from + 5000
tcp_sport_from = 30
tcp_sport_to = tcp_sport_from + 5
tcp_dport_from = 40000
tcp_dport_to = tcp_dport_from + 5000

# second range set, selected by PORTS_RANGE_2
udp_sport_from_2 = 90
udp_sport_to_2 = udp_sport_from_2 + 5
udp_dport_from_2 = 30000
udp_dport_to_2 = udp_dport_from_2 + 5000
tcp_sport_from_2 = 130
tcp_sport_to_2 = tcp_sport_from_2 + 5
tcp_dport_from_2 = 20000
tcp_dport_to_2 = tcp_dport_from_2 + 5000

# ICMP type/code used both in PORTS_RANGE rules and in generated packets
icmp4_type = 8  # echo request
icmp4_code = 3
icmp6_type = 128  # echo request
icmp6_code = 3

# ICMP type and code range used by PORTS_RANGE_2 rules
icmp4_type_2 = 8
icmp4_code_from_2 = 5
icmp4_code_to_2 = 20
icmp6_type_2 = 128
icmp6_code_from_2 = 8
icmp6_code_to_2 = 42

# Test variables
bd_id = 1
80
@classmethod
def setUpClass(cls):
    """
    Run the base-class setUpClass, then build the fixture shared by all
    tests: two pg interfaces joined in one bridge domain, the pg0 -> pg1
    flow map, the packet size list and empty per-interface host tables.

    If anything after the base-class setup raises, the base-class
    tearDownClass is called and the exception is re-raised.
    """
    super(TestACLplugin, cls).setUpClass()

    try:
        # Two packet-generator interfaces: pg0 and pg1
        cls.create_pg_interfaces(range(2))

        # Flow map: traffic created on pg0 is expected on pg1
        cls.flows = {}
        cls.flows[cls.pg0] = [cls.pg1]

        # Sizes the generated packets are extended to
        cls.pg_if_packet_sizes = [64, 512, 1518, 9018]

        # Bridge domain created with learn=1 and uu_flood=1; every pg
        # interface becomes a member of it
        cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
                                       learn=1)
        for member in cls.pg_interfaces:
            cls.vapi.sw_interface_set_l2_bridge(member.sw_if_index,
                                                bd_id=cls.bd_id)

        # Bring all interfaces up
        for member in cls.pg_interfaces:
            member.admin_up()

        # sw_if_index -> list of test hosts behind that interface
        cls.hosts_by_pg_idx = dict(
            (member.sw_if_index, []) for member in cls.pg_interfaces)

        # sw_if_index -> list of deleted hosts
        cls.deleted_hosts_by_pg_idx = dict(
            (member.sw_if_index, []) for member in cls.pg_interfaces)

    except Exception:
        super(TestACLplugin, cls).tearDownClass()
        raise
129
def setUp(self):
    """Run the base-class setUp, then call reset_packet_infos()."""
    super(TestACLplugin, self).setUp()
    self.reset_packet_infos()
133
def tearDown(self):
    """
    Run the base-class tearDown and, unless VPP is dead, log the output
    of a fixed list of CLI show commands for debugging.
    """
    super(TestACLplugin, self).tearDown()
    if self.vpp_dead:
        return

    show_commands = (
        "show l2fib verbose",
        "show acl-plugin acl",
        "show acl-plugin interface",
        "show acl-plugin tables",
        "show bridge-domain %s detail" % self.bd_id,
    )
    for command in show_commands:
        self.logger.info(self.vapi.ppcli(command))
146
def create_hosts(self, count, start=0):
    """
    Create ``count`` test hosts and distribute them among the pg
    interfaces. Every host gets a MAC, an IPv4 and an IPv6 address that
    are derived from the interface's sw_if_index and the host number.

    The hosts are appended to ``self.hosts_by_pg_idx[sw_if_index]``.
    Each interface receives ``count // n_interfaces`` hosts; the last
    interface additionally receives the remainder.

    :param int count: Number of hosts to create.
    :param int start: Number to start numbering from.
    """
    n_int = len(self.pg_interfaces)
    # Floor division keeps the result an int on Python 3 as well; a float
    # here would make range() below raise TypeError.
    macs_per_if = count // n_int
    for i, pg_if in enumerate(self.pg_interfaces):
        start_nr = macs_per_if * i + start
        if i == n_int - 1:
            # last interface: take everything that is left
            end_nr = count + start
        else:
            end_nr = macs_per_if * (i + 1) + start
        hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
        for j in range(start_nr, end_nr):
            hosts.append(Host(
                "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
                "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
                "2017:dead:%02x::%u" % (pg_if.sw_if_index, j)))
170
def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
                s_prefix=0, s_ip='\x00\x00\x00\x00',
                d_prefix=0, d_ip='\x00\x00\x00\x00'):
    """
    Build one ACL rule dictionary with the keys used by the rule lists
    passed to acl_add_replace in this file.

    :param int ip: Stored as 'is_ipv6' (IPV4 or IPV6).
    :param int permit_deny: Stored as 'is_permit' (DENY or PERMIT).
    :param int ports: PORTS_ALL for the full range (0-65535, or 0-255
        for ICMP/ICMPv6), PORTS_RANGE or PORTS_RANGE_2 for the class'
        predefined ranges of the given protocol; any other value is
        used verbatim as both source and destination port (or ICMP
        type and code).
    :param int proto: IP protocol number; -1 means "build nothing".
    :param int s_prefix: Source prefix length.
    :param s_ip: Source address, stored as given.
    :param int d_prefix: Destination prefix length.
    :param d_ip: Destination address, stored as given.
    :return: The rule dictionary, or None when proto is -1.
    :raises ValueError: If PORTS_RANGE or PORTS_RANGE_2 is requested for
        a protocol that has no predefined range (previously this
        surfaced as an UnboundLocalError).
    """
    if proto == -1:
        return None

    tcp, udp = self.proto[self.IP]
    icmp4, icmp6 = self.proto[self.ICMP]

    if ports == self.PORTS_ALL:
        sport_from = dport_from = 0
        # ICMP type and code are 8-bit values
        sport_to = dport_to = 255 if proto in (icmp4, icmp6) else 65535
    elif ports in (self.PORTS_RANGE, self.PORTS_RANGE_2):
        # proto -> (sport_from, sport_to, dport_from, dport_to); for ICMP
        # the "source port" is the type and the "destination port" the code
        if ports == self.PORTS_RANGE:
            ranges = {
                icmp4: (self.icmp4_type, self.icmp4_type,
                        self.icmp4_code, self.icmp4_code),
                icmp6: (self.icmp6_type, self.icmp6_type,
                        self.icmp6_code, self.icmp6_code),
                tcp: (self.tcp_sport_from, self.tcp_sport_to,
                      self.tcp_dport_from, self.tcp_dport_to),
                udp: (self.udp_sport_from, self.udp_sport_to,
                      self.udp_dport_from, self.udp_dport_to),
            }
        else:
            ranges = {
                icmp4: (self.icmp4_type_2, self.icmp4_type_2,
                        self.icmp4_code_from_2, self.icmp4_code_to_2),
                icmp6: (self.icmp6_type_2, self.icmp6_type_2,
                        self.icmp6_code_from_2, self.icmp6_code_to_2),
                tcp: (self.tcp_sport_from_2, self.tcp_sport_to_2,
                      self.tcp_dport_from_2, self.tcp_dport_to_2),
                udp: (self.udp_sport_from_2, self.udp_sport_to_2,
                      self.udp_dport_from_2, self.udp_dport_to_2),
            }
        if proto not in ranges:
            raise ValueError(
                "no predefined port range for protocol %r" % (proto,))
        sport_from, sport_to, dport_from, dport_to = ranges[proto]
    else:
        sport_from = sport_to = dport_from = dport_to = ports

    return {'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
            'srcport_or_icmptype_first': sport_from,
            'srcport_or_icmptype_last': sport_to,
            'src_ip_prefix_len': s_prefix,
            'src_ip_addr': s_ip,
            'dstport_or_icmpcode_first': dport_from,
            'dstport_or_icmpcode_last': dport_to,
            'dst_ip_prefix_len': d_prefix,
            'dst_ip_addr': d_ip}
239
def apply_rules(self, rules, tag=''):
    """
    Add a new ACL built from ``rules``, log its dump and set it as the
    single input ACL on every pg interface.

    :param list rules: Rule dictionaries (see create_rule).
    :param str tag: Tag passed along with the ACL.
    """
    # acl_index 0xFFFFFFFF (4294967295) is the value this file uses when
    # adding a new ACL
    new_acl = self.vapi.acl_add_replace(acl_index=0xFFFFFFFF, r=rules,
                                        tag=tag)
    dumped = self.vapi.acl_dump(new_acl.acl_index)
    self.logger.info("Dumped ACL: " + str(dumped))

    # Apply the ACL on each interface as inbound
    for pg_if in self.pg_interfaces:
        self.vapi.acl_interface_set_acl_list(
            sw_if_index=pg_if.sw_if_index,
            n_input=1,
            acls=[new_acl.acl_index])
251
def apply_rules_to(self, rules, tag='', sw_if_index=0xFFFFFFFF):
    """
    Add a new ACL built from ``rules``, log its dump and set it as the
    single input ACL on the one interface given by ``sw_if_index``.

    :param list rules: Rule dictionaries (see create_rule).
    :param str tag: Tag passed along with the ACL.
    :param int sw_if_index: Interface index to apply the ACL to.
    """
    # acl_index 0xFFFFFFFF (4294967295) is the value this file uses when
    # adding a new ACL
    new_acl = self.vapi.acl_add_replace(acl_index=0xFFFFFFFF, r=rules,
                                        tag=tag)
    dumped = self.vapi.acl_dump(new_acl.acl_index)
    self.logger.info("Dumped ACL: " + str(dumped))

    # Apply the ACL on the interface as inbound
    self.vapi.acl_interface_set_acl_list(
        sw_if_index=sw_if_index,
        n_input=1,
        acls=[new_acl.acl_index])
262
def etype_whitelist(self, whitelist, n_input):
    """
    Set the same ethertype whitelist on every pg interface.

    :param list whitelist: Passed through as ``whitelist``.
    :param int n_input: Passed through as ``n_input``.
    """
    for pg_if in self.pg_interfaces:
        self.vapi.acl_interface_set_etype_whitelist(
            sw_if_index=pg_if.sw_if_index,
            n_input=n_input,
            whitelist=whitelist)
271
def create_upper_layer(self, packet_index, proto, ports=0):
    """
    Build the UDP or TCP layer for a generated packet.

    :param packet_index: Not used by this method.
    :param int proto: Protocol number, looked up in proto_map.
    :param int ports: 0 to draw source and destination port at random
        from the class' first port range for the protocol; any other
        value is used as both source and destination port.
    :return: A UDP or TCP layer, or '' for any other protocol.
    """
    layer_name = self.proto_map[proto]
    if layer_name == 'UDP':
        layer_cls = UDP
        sport_bounds = (self.udp_sport_from, self.udp_sport_to)
        dport_bounds = (self.udp_dport_from, self.udp_dport_to)
    elif layer_name == 'TCP':
        layer_cls = TCP
        sport_bounds = (self.tcp_sport_from, self.tcp_sport_to)
        dport_bounds = (self.tcp_dport_from, self.tcp_dport_to)
    else:
        return ''

    if ports != 0:
        return layer_cls(sport=ports, dport=ports)

    # source port is drawn before destination port
    sport = random.randint(*sport_bounds)
    dport = random.randint(*dport_bounds)
    return layer_cls(sport=sport, dport=dport)
291
def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
                  proto=-1, ports=0, fragments=False,
                  pkt_raw=True, etype=-1):
    """
    Create the input packet stream for ``src_if``: one packet for every
    (source host, destination host) pair of every flow that starts at
    ``src_if``.

    :param object src_if: Interface to create packet stream for.
    :param list packet_sizes: Sizes to choose from when extending a
        packet (only used when pkt_raw is true).
    :param int traffic_type: self.ICMP to send ICMP/ICMPv6 echo requests;
        any other value sends the layer built by create_upper_layer.
    :param int ipv6: 1 for IPv6, 0 for IPv4, anything else (for example
        IPRANDOM) to choose per packet at random.
    :param int proto: Protocol number recorded in the packet info; -1
        chooses TCP or UDP at random.
    :param int ports: Passed to create_upper_layer.
    :param bool fragments: If true, build the packet as a fragment
        (IPv6 fragment header, or IPv4 with flags=1 and frag=64).
    :param bool pkt_raw: If true, append the info payload as Raw data
        and extend the packet to a random size from packet_sizes.
    :param int etype: If greater than 0, used as the Ethernet type.
    :return: List of packets; empty if src_if has no flow.
    """
    pkts = []
    if src_if not in self.flows:
        return pkts

    src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
    for dst_if in self.flows[src_if]:
        dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
        n_int = len(dst_hosts) * len(src_hosts)
        for i in range(n_int):
            # Floor division: a float index (Python 3 "/") would raise
            # TypeError when indexing the list.
            dst_host = dst_hosts[i // len(src_hosts)]
            src_host = src_hosts[i % len(src_hosts)]
            pkt_info = self.create_packet_info(src_if, dst_if)
            if ipv6 == 1:
                pkt_info.ip = 1
            elif ipv6 == 0:
                pkt_info.ip = 0
            else:
                pkt_info.ip = random.choice([0, 1])
            if proto == -1:
                pkt_info.proto = random.choice(self.proto[self.IP])
            else:
                pkt_info.proto = proto
            payload = self.info_to_payload(pkt_info)

            # L2: only set the ethertype explicitly when one was requested
            if etype > 0:
                p = Ether(dst=dst_host.mac, src=src_host.mac, type=etype)
            else:
                p = Ether(dst=dst_host.mac, src=src_host.mac)

            # L3
            if pkt_info.ip:
                p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
                if fragments:
                    p /= IPv6ExtHdrFragment(offset=64, m=1)
            elif fragments:
                p /= IP(src=src_host.ip4, dst=dst_host.ip4,
                        flags=1, frag=64)
            else:
                p /= IP(src=src_host.ip4, dst=dst_host.ip4)

            # L4
            if traffic_type == self.ICMP:
                if pkt_info.ip:
                    p /= ICMPv6EchoRequest(type=self.icmp6_type,
                                           code=self.icmp6_code)
                else:
                    p /= ICMP(type=self.icmp4_type,
                              code=self.icmp4_code)
            else:
                p /= self.create_upper_layer(i, pkt_info.proto, ports)

            if pkt_raw:
                p /= Raw(payload)
            # The copy is stored before the packet is extended
            pkt_info.data = p.copy()
            if pkt_raw:
                size = random.choice(packet_sizes)
                self.extend_packet(p, size)
            pkts.append(p)
    return pkts
357
def verify_capture(self, pg_if, capture,
                   traffic_type=0, ip_type=0, etype=-1):
    """
    Verify captured input packet stream for defined interface.

    Each captured packet is decoded back into its packet info and
    checked either against the class' ICMP type/code (ICMP traffic) or
    against the packet saved when the stream was created (addresses and
    TCP/UDP ports). Any failure is logged together with the packet and
    re-raised.

    :param object pg_if: Interface to verify captured packet stream for.
    :param list capture: Captured packet stream.
    :param traffic_type: self.ICMP for ICMP traffic; any other value is
                         verified as TCP/UDP traffic.
    :param ip_type: Expected IP version (only asserted when non-zero).
    :param etype: If greater than 0, the expected Ethernet type.
    """
    # Last packet info seen so far, per source interface index
    last_info = dict()
    for i in self.pg_interfaces:
        last_info[i.sw_if_index] = None
    dst_sw_if_index = pg_if.sw_if_index
    for packet in capture:
        if etype > 0:
            # A packet with the expected ethertype is accepted without any
            # further checks.
            # NOTE(review): an unexpected ethertype is only logged, not
            # failed; the packet then goes through the checks below.
            # Confirm that this is intended.
            if packet[Ether].type != etype:
                self.logger.error(ppp("Unexpected ethertype in packet:",
                                      packet))
            else:
                continue
        try:
            # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
            if traffic_type == self.ICMP and ip_type == self.IPV6:
                payload_info = self.payload_to_info(
                    packet[ICMPv6EchoRequest].data)
                payload = packet[ICMPv6EchoRequest]
            else:
                payload_info = self.payload_to_info(str(packet[Raw]))
                payload = packet[self.proto_map[payload_info.proto]]
        except:
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise

        # ip_type 0 (IPV4) is never asserted here
        if ip_type != 0:
            self.assertEqual(payload_info.ip, ip_type)
        if traffic_type == self.ICMP:
            # ICMP: only type and code are checked
            try:
                if payload_info.ip == 0:
                    self.assertEqual(payload.type, self.icmp4_type)
                    self.assertEqual(payload.code, self.icmp4_code)
                else:
                    self.assertEqual(payload.type, self.icmp6_type)
                    self.assertEqual(payload.code, self.icmp6_code)
            except:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise
        else:
            try:
                ip_version = IPv6 if payload_info.ip == 1 else IP

                ip = packet[ip_version]
                packet_index = payload_info.index

                self.assertEqual(payload_info.dst, dst_sw_if_index)
                self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
                                  (pg_if.name, payload_info.src,
                                   packet_index))
                # The packet must be the next one expected from its source
                next_info = self.get_next_packet_info_for_interface2(
                    payload_info.src, dst_sw_if_index,
                    last_info[payload_info.src])
                last_info[payload_info.src] = next_info
                self.assertTrue(next_info is not None)
                self.assertEqual(packet_index, next_info.index)
                saved_packet = next_info.data
                # Check standard fields
                self.assertEqual(ip.src, saved_packet[ip_version].src)
                self.assertEqual(ip.dst, saved_packet[ip_version].dst)
                p = self.proto_map[payload_info.proto]
                if p == 'TCP':
                    tcp = packet[TCP]
                    self.assertEqual(tcp.sport, saved_packet[
                        TCP].sport)
                    self.assertEqual(tcp.dport, saved_packet[
                        TCP].dport)
                elif p == 'UDP':
                    udp = packet[UDP]
                    self.assertEqual(udp.sport, saved_packet[
                        UDP].sport)
                    self.assertEqual(udp.dport, saved_packet[
                        UDP].dport)
            except:
                self.logger.error(ppp("Unexpected or invalid packet:",
                                      packet))
                raise
    # No expected packet may be left over for any source interface.
    # NOTE(review): the first argument here is the interface object, while
    # the call above passes a source index (payload_info.src). Verify
    # against the helper's signature whether i.sw_if_index was meant;
    # as written this check may never find a remaining packet.
    for i in self.pg_interfaces:
        remaining_packet = self.get_next_packet_info_for_interface2(
            i, dst_sw_if_index, last_info[i.sw_if_index])
        self.assertTrue(
            remaining_packet is None,
            "Port %u: Packet expected from source %u didn't arrive" %
            (dst_sw_if_index, i.sw_if_index))
451
def run_traffic_no_check(self):
    """
    Send a default stream from every interface that has a flow, without
    verifying what arrives.
    """
    # Queue a stream on each source interface of the flow map
    for pg_if in self.pg_interfaces:
        if pg_if not in self.flows:
            continue
        stream = self.create_stream(pg_if, self.pg_if_packet_sizes)
        if stream:
            pg_if.add_stream(stream)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
464
def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
                    frags=False, pkt_raw=True, etype=-1):
    """
    Send traffic for every flow and verify that all of it is captured
    on the flow's destination interfaces.

    The stream-related arguments are passed to create_stream;
    traffic_type, ip_type and etype are also passed to verify_capture.
    """
    # Queue a stream on each source interface and count the packets
    sent_total = 0
    for pg_if in self.pg_interfaces:
        if pg_if not in self.flows:
            continue
        stream = self.create_stream(pg_if, self.pg_if_packet_sizes,
                                    traffic_type, ip_type, proto, ports,
                                    frags, pkt_raw, etype)
        if stream:
            pg_if.add_stream(stream)
            sent_total += len(stream)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Verify outgoing packet streams per packet-generator interface
    for src_if in self.pg_interfaces:
        if src_if not in self.flows:
            continue
        for dst_if in self.flows[src_if]:
            captured = dst_if.get_capture(sent_total)
            self.logger.info("Verifying capture on interface %s" %
                             dst_if.name)
            self.verify_capture(dst_if, captured,
                                traffic_type, ip_type, etype)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100493
def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
                          ports=0, frags=False, etype=-1):
    """
    Send traffic for every flow and verify that nothing is captured on
    the flow's destination interfaces.

    The arguments are passed to create_stream; packets are always built
    with pkt_raw=True.
    """
    self.reset_packet_infos()

    # Queue a stream on each source interface of the flow map
    for pg_if in self.pg_interfaces:
        if pg_if not in self.flows:
            continue
        stream = self.create_stream(pg_if, self.pg_if_packet_sizes,
                                    traffic_type, ip_type, proto, ports,
                                    frags, True, etype)
        if stream:
            pg_if.add_stream(stream)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Every destination interface must have captured nothing
    for src_if in self.pg_interfaces:
        if src_if not in self.flows:
            continue
        for dst_if in self.flows[src_if]:
            self.logger.info("Verifying capture on interface %s" %
                             dst_if.name)
            captured = dst_if.get_capture(0)
            self.assertEqual(len(captured), 0)
519
def test_0000_warmup_test(self):
    """ ACL plugin version check; learn MACs
    """
    # Create the hosts and push unchecked traffic through the bridge
    self.create_hosts(16)
    self.run_traffic_no_check()

    # Only the major version is pinned: minor version changes are
    # non breaking
    version = self.vapi.papi.acl_plugin_get_version()
    self.assertEqual(version.major, 1)
    self.logger.info("Working with ACL plugin version: %d.%d" % (
        version.major, version.minor))
531
def test_0001_acl_create(self):
    """ ACL create/delete test
    """

    self.logger.info("ACLP_TEST_START_0001")
    # Add an ACL
    # Single rule: permit UDP (proto 17), source ports 1234-1235,
    # destination port 1234, any address (prefix length 0)
    r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
          'srcport_or_icmptype_first': 1234,
          'srcport_or_icmptype_last': 1235,
          'src_ip_prefix_len': 0,
          'src_ip_addr': '\x00\x00\x00\x00',
          'dstport_or_icmpcode_first': 1234,
          'dstport_or_icmpcode_last': 1234,
          'dst_ip_addr': '\x00\x00\x00\x00',
          'dst_ip_prefix_len': 0}]
    # Test 1: add a new ACL
    reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
                                      tag="permit 1234")
    self.assertEqual(reply.retval, 0)
    # The very first ACL gets #0
    self.assertEqual(reply.acl_index, 0)
    first_acl = reply.acl_index
    rr = self.vapi.acl_dump(reply.acl_index)
    self.logger.info("Dumped ACL: " + str(rr))
    self.assertEqual(len(rr), 1)
    # We should have the same number of ACL entries as we had asked
    self.assertEqual(len(rr[0].r), len(r))
    # The rules should be the same. But because the submitted and returned
    # are different types, we need to iterate over rules and keys to get
    # to basic values.
    # NOTE(review): range(0, len(r) - 1) is empty for this one-rule ACL,
    # so the comparison below never executes. Confirm whether
    # range(len(r)) was intended and whether the dumped rule supports
    # this kind of indexing before changing it.
    for i_rule in range(0, len(r) - 1):
        for rule_key in r[i_rule]:
            self.assertEqual(rr[0].r[i_rule][rule_key],
                             r[i_rule][rule_key])

    # Add a deny-1234 ACL
    # First rule denies what the ACL above permits; the second rule is a
    # UDP permit with all ports set to 0
    r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
               'srcport_or_icmptype_first': 1234,
               'srcport_or_icmptype_last': 1235,
               'src_ip_prefix_len': 0,
               'src_ip_addr': '\x00\x00\x00\x00',
               'dstport_or_icmpcode_first': 1234,
               'dstport_or_icmpcode_last': 1234,
               'dst_ip_addr': '\x00\x00\x00\x00',
               'dst_ip_prefix_len': 0},
              {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
               'srcport_or_icmptype_first': 0,
               'srcport_or_icmptype_last': 0,
               'src_ip_prefix_len': 0,
               'src_ip_addr': '\x00\x00\x00\x00',
               'dstport_or_icmpcode_first': 0,
               'dstport_or_icmpcode_last': 0,
               'dst_ip_addr': '\x00\x00\x00\x00',
               'dst_ip_prefix_len': 0}]

    reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
                                      tag="deny 1234;permit all")
    self.assertEqual(reply.retval, 0)
    # The second ACL gets #1
    self.assertEqual(reply.acl_index, 1)
    second_acl = reply.acl_index

    # Test 2: try to modify a nonexistent ACL
    # The call is expected to return -6
    reply = self.vapi.acl_add_replace(acl_index=432, r=r,
                                      tag="FFFF:FFFF", expected_retval=-6)
    self.assertEqual(reply.retval, -6)
    # The ACL number should pass through
    self.assertEqual(reply.acl_index, 432)
    # apply an ACL on an interface inbound, try to delete ACL, must fail
    self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                         n_input=1,
                                         acls=[first_acl])
    reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
    # Unapply an ACL and then try to delete it - must be ok
    self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                         n_input=0,
                                         acls=[])
    reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)

    # apply an ACL on an interface outbound, try to delete ACL, must fail
    # (n_input=0 with one ACL in the list: none of the listed ACLs is
    # counted as input)
    self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                         n_input=0,
                                         acls=[second_acl])
    reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
    # Unapply the ACL and then try to delete it - must be ok
    self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                         n_input=0,
                                         acls=[])
    reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)

    # try to apply a nonexistent ACL - must fail
    # (first_acl was deleted above)
    self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                         n_input=1,
                                         acls=[first_acl],
                                         expected_retval=-6)

    self.logger.info("ACLP_TEST_FINISH_0001")
629
def test_0002_acl_permit_apply(self):
    """ permit ACL apply test
    """
    self.logger.info("ACLP_TEST_START_0002")
    tcp, udp = self.proto[self.IP]

    # ports=0 is the same value as PORTS_RANGE
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, 0, udp),
        self.create_rule(self.IPV4, self.PERMIT, 0, tcp),
    ]
    self.apply_rules(rules, "permit per-flow")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, -1)
    self.logger.info("ACLP_TEST_FINISH_0002")
647
def test_0003_acl_deny_apply(self):
    """ deny ACL apply test
    """
    self.logger.info("ACLP_TEST_START_0003")
    udp = self.proto[self.IP][self.UDP]

    rules = [
        # Deny UDP on all ports
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, udp),
        # Permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "deny per-flow;permit all")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPV4, udp)
    self.logger.info("ACLP_TEST_FINISH_0003")
668
def test_0004_vpp624_permit_icmpv4(self):
    """ VPP_624 permit ICMPv4
    """
    self.logger.info("ACLP_TEST_START_0004")
    icmp4 = self.proto[self.ICMP][self.ICMPv4]

    rules = [
        # Permit the ICMPv4 type/code the generated packets use
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, icmp4),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "permit icmpv4")

    # Traffic should still pass
    self.run_verify_test(self.ICMP, self.IPV4, icmp4)

    self.logger.info("ACLP_TEST_FINISH_0004")
689
def test_0005_vpp624_permit_icmpv6(self):
    """ VPP_624 permit ICMPv6
    """
    self.logger.info("ACLP_TEST_START_0005")
    icmp6 = self.proto[self.ICMP][self.ICMPv6]

    rules = [
        # Permit the ICMPv6 type/code the generated packets use
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, icmp6),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "permit icmpv6")

    # Traffic should still pass
    self.run_verify_test(self.ICMP, self.IPV6, icmp6)

    self.logger.info("ACLP_TEST_FINISH_0005")
710
def test_0006_vpp624_deny_icmpv4(self):
    """ VPP_624 deny ICMPv4
    """
    self.logger.info("ACLP_TEST_START_0006")
    icmp4 = self.proto[self.ICMP][self.ICMPv4]

    rules = [
        # Deny the ICMPv4 type/code the generated packets use
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, icmp4),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "deny icmpv4")

    # Traffic should not pass
    self.run_verify_negat_test(self.ICMP, self.IPV4, 0)

    self.logger.info("ACLP_TEST_FINISH_0006")
730
def test_0007_vpp624_deny_icmpv6(self):
    """ VPP_624 deny ICMPv6
    """
    self.logger.info("ACLP_TEST_START_0007")
    icmp6 = self.proto[self.ICMP][self.ICMPv6]

    rules = [
        # Deny the ICMPv6 type/code the generated packets use
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, icmp6),
        # permit ip any any in the end
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "deny icmpv6")

    # Traffic should not pass
    self.run_verify_negat_test(self.ICMP, self.IPV6, 0)

    self.logger.info("ACLP_TEST_FINISH_0007")
750
def test_0008_tcp_permit_v4(self):
    """ permit TCPv4
    """
    self.logger.info("ACLP_TEST_START_0008")
    tcp = self.proto[self.IP][self.TCP]

    rules = [
        # Permit TCP in the first predefined port range
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "permit ipv4 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp)

    self.logger.info("ACLP_TEST_FINISH_0008")
770
def test_0009_tcp_permit_v6(self):
    """ permit TCPv6
    """
    self.logger.info("ACLP_TEST_START_0009")

    # Add an ACL
    rules = []
    rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
                                  self.proto[self.IP][self.TCP]))
    # deny ip any any in the end
    rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))

    # Apply rules
    self.apply_rules(rules, "permit ip6 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])

    # The finish marker must carry this test's own number (it used to
    # log 0008, which made the log look like test 0008 finished twice)
    self.logger.info("ACLP_TEST_FINISH_0009")
790
def test_0010_udp_permit_v4(self):
    """ permit UDPv4
    """
    self.logger.info("ACLP_TEST_START_0010")

    # Add an ACL
    rules = []
    rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
                                  self.proto[self.IP][self.UDP]))
    # deny ip any any in the end
    rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))

    # Apply rules
    # Tag spelled like its TCP counterpart ("permit ipv4 tcp"); it used
    # to read "permit ipv udp"
    self.apply_rules(rules, "permit ipv4 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])

    self.logger.info("ACLP_TEST_FINISH_0010")
810
def test_0011_udp_permit_v6(self):
    """ permit UDPv6
    """
    self.logger.info("ACLP_TEST_START_0011")
    udp = self.proto[self.IP][self.UDP]

    rules = [
        # Permit UDP in the first predefined port range
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "permit ip6 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, udp)

    self.logger.info("ACLP_TEST_FINISH_0011")
830
def test_0012_tcp_deny(self):
    """ deny TCPv4/v6
    """
    self.logger.info("ACLP_TEST_START_0012")
    tcp = self.proto[self.IP][self.TCP]

    rules = [
        # Deny TCP in the first predefined port range, for both versions
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, tcp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, tcp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "deny ip4/ip6 tcp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, tcp)

    self.logger.info("ACLP_TEST_FINISH_0012")
856
def test_0013_udp_deny(self):
    """ deny UDPv4/v6
    """
    self.logger.info("ACLP_TEST_START_0013")
    udp = self.proto[self.IP][self.UDP]

    rules = [
        # Deny UDP in the first predefined port range, for both versions
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, udp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, udp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "deny ip4/ip6 udp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp)

    self.logger.info("ACLP_TEST_FINISH_0013")
882
883 def test_0014_acl_dump(self):
884 """ verify add/dump acls
885 """
886 self.logger.info("ACLP_TEST_START_0014")
887
888 r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
889 [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
890 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
891 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
892 [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
893 [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
894 [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
895 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
896 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
897 [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
898 [self.IPV4, self.DENY, self.PORTS_ALL, 0],
899 [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
900 [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
901 [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
902 [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
903 [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
904 [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
905 [self.IPV6, self.DENY, self.PORTS_ALL, 0]
906 ]
907
908 # Add and verify new ACLs
909 rules = []
910 for i in range(len(r)):
911 rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
912
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200913 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
914 result = self.vapi.acl_dump(reply.acl_index)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100915
916 i = 0
917 for drules in result:
918 for dr in drules.r:
919 self.assertEqual(dr.is_ipv6, r[i][0])
920 self.assertEqual(dr.is_permit, r[i][1])
921 self.assertEqual(dr.proto, r[i][3])
922
923 if r[i][2] > 0:
924 self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
925 else:
926 if r[i][2] < 0:
927 self.assertEqual(dr.srcport_or_icmptype_first, 0)
928 self.assertEqual(dr.srcport_or_icmptype_last, 65535)
929 else:
930 if dr.proto == self.proto[self.IP][self.TCP]:
931 self.assertGreater(dr.srcport_or_icmptype_first,
932 self.tcp_sport_from-1)
933 self.assertLess(dr.srcport_or_icmptype_first,
934 self.tcp_sport_to+1)
935 self.assertGreater(dr.dstport_or_icmpcode_last,
936 self.tcp_dport_from-1)
937 self.assertLess(dr.dstport_or_icmpcode_last,
938 self.tcp_dport_to+1)
939 elif dr.proto == self.proto[self.IP][self.UDP]:
940 self.assertGreater(dr.srcport_or_icmptype_first,
941 self.udp_sport_from-1)
942 self.assertLess(dr.srcport_or_icmptype_first,
943 self.udp_sport_to+1)
944 self.assertGreater(dr.dstport_or_icmpcode_last,
945 self.udp_dport_from-1)
946 self.assertLess(dr.dstport_or_icmpcode_last,
947 self.udp_dport_to+1)
948 i += 1
949
950 self.logger.info("ACLP_TEST_FINISH_0014")
951
952 def test_0015_tcp_permit_port_v4(self):
953 """ permit single TCPv4
954 """
955 self.logger.info("ACLP_TEST_START_0015")
956
957 port = random.randint(0, 65535)
958 # Add an ACL
959 rules = []
960 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
961 self.proto[self.IP][self.TCP]))
962 # deny ip any any in the end
963 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
964
965 # Apply rules
966 self.apply_rules(rules, "permit ip4 tcp "+str(port))
967
968 # Traffic should still pass
969 self.run_verify_test(self.IP, self.IPV4,
970 self.proto[self.IP][self.TCP], port)
971
972 self.logger.info("ACLP_TEST_FINISH_0015")
973
974 def test_0016_udp_permit_port_v4(self):
975 """ permit single UDPv4
976 """
977 self.logger.info("ACLP_TEST_START_0016")
978
979 port = random.randint(0, 65535)
980 # Add an ACL
981 rules = []
982 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
983 self.proto[self.IP][self.UDP]))
984 # deny ip any any in the end
985 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
986
987 # Apply rules
988 self.apply_rules(rules, "permit ip4 tcp "+str(port))
989
990 # Traffic should still pass
991 self.run_verify_test(self.IP, self.IPV4,
992 self.proto[self.IP][self.UDP], port)
993
994 self.logger.info("ACLP_TEST_FINISH_0016")
995
996 def test_0017_tcp_permit_port_v6(self):
997 """ permit single TCPv6
998 """
999 self.logger.info("ACLP_TEST_START_0017")
1000
1001 port = random.randint(0, 65535)
1002 # Add an ACL
1003 rules = []
1004 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1005 self.proto[self.IP][self.TCP]))
1006 # deny ip any any in the end
1007 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1008
1009 # Apply rules
1010 self.apply_rules(rules, "permit ip4 tcp "+str(port))
1011
1012 # Traffic should still pass
1013 self.run_verify_test(self.IP, self.IPV6,
1014 self.proto[self.IP][self.TCP], port)
1015
1016 self.logger.info("ACLP_TEST_FINISH_0017")
1017
1018 def test_0018_udp_permit_port_v6(self):
1019 """ permit single UPPv6
1020 """
1021 self.logger.info("ACLP_TEST_START_0018")
1022
1023 port = random.randint(0, 65535)
1024 # Add an ACL
1025 rules = []
1026 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1027 self.proto[self.IP][self.UDP]))
1028 # deny ip any any in the end
1029 rules.append(self.create_rule(self.IPV6, self.DENY,
1030 self.PORTS_ALL, 0))
1031
1032 # Apply rules
1033 self.apply_rules(rules, "permit ip4 tcp "+str(port))
1034
1035 # Traffic should still pass
1036 self.run_verify_test(self.IP, self.IPV6,
1037 self.proto[self.IP][self.UDP], port)
1038
1039 self.logger.info("ACLP_TEST_FINISH_0018")
1040
1041 def test_0019_udp_deny_port(self):
1042 """ deny single TCPv4/v6
1043 """
1044 self.logger.info("ACLP_TEST_START_0019")
1045
1046 port = random.randint(0, 65535)
1047 # Add an ACL
1048 rules = []
1049 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1050 self.proto[self.IP][self.TCP]))
1051 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1052 self.proto[self.IP][self.TCP]))
1053 # Permit ip any any in the end
1054 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1055 self.PORTS_ALL, 0))
1056 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1057 self.PORTS_ALL, 0))
1058
1059 # Apply rules
1060 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1061
1062 # Traffic should not pass
1063 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1064 self.proto[self.IP][self.TCP], port)
1065
1066 self.logger.info("ACLP_TEST_FINISH_0019")
1067
1068 def test_0020_udp_deny_port(self):
1069 """ deny single UDPv4/v6
1070 """
1071 self.logger.info("ACLP_TEST_START_0020")
1072
1073 port = random.randint(0, 65535)
1074 # Add an ACL
1075 rules = []
1076 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1077 self.proto[self.IP][self.UDP]))
1078 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1079 self.proto[self.IP][self.UDP]))
1080 # Permit ip any any in the end
1081 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1082 self.PORTS_ALL, 0))
1083 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1084 self.PORTS_ALL, 0))
1085
1086 # Apply rules
1087 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1088
1089 # Traffic should not pass
1090 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1091 self.proto[self.IP][self.UDP], port)
1092
1093 self.logger.info("ACLP_TEST_FINISH_0020")
1094
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +00001095 def test_0021_udp_deny_port_verify_fragment_deny(self):
Chris Luked0421942018-04-10 15:19:54 -04001096 """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
1097 blocked
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +00001098 """
1099 self.logger.info("ACLP_TEST_START_0021")
1100
1101 port = random.randint(0, 65535)
1102 # Add an ACL
1103 rules = []
1104 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1105 self.proto[self.IP][self.UDP]))
1106 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1107 self.proto[self.IP][self.UDP]))
1108 # deny ip any any in the end
1109 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1110 self.PORTS_ALL, 0))
1111 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1112 self.PORTS_ALL, 0))
1113
1114 # Apply rules
1115 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1116
1117 # Traffic should not pass
1118 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1119 self.proto[self.IP][self.UDP], port, True)
1120
1121 self.logger.info("ACLP_TEST_FINISH_0021")
1122
Pavel Kotuceke7b67342017-04-18 13:12:20 +02001123 def test_0022_zero_length_udp_ipv4(self):
1124 """ VPP-687 zero length udp ipv4 packet"""
1125 self.logger.info("ACLP_TEST_START_0022")
1126
1127 port = random.randint(0, 65535)
1128 # Add an ACL
1129 rules = []
1130 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1131 self.proto[self.IP][self.UDP]))
1132 # deny ip any any in the end
1133 rules.append(
1134 self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1135
1136 # Apply rules
1137 self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1138
1139 # Traffic should still pass
1140 # Create incoming packet streams for packet-generator interfaces
1141 pkts_cnt = 0
1142 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1143 self.IP, self.IPV4,
1144 self.proto[self.IP][self.UDP], port,
1145 False, False)
1146 if len(pkts) > 0:
1147 self.pg0.add_stream(pkts)
1148 pkts_cnt += len(pkts)
1149
1150 # Enable packet capture and start packet sendingself.IPV
1151 self.pg_enable_capture(self.pg_interfaces)
1152 self.pg_start()
1153
1154 self.pg1.get_capture(pkts_cnt)
1155
1156 self.logger.info("ACLP_TEST_FINISH_0022")
1157
1158 def test_0023_zero_length_udp_ipv6(self):
1159 """ VPP-687 zero length udp ipv6 packet"""
1160 self.logger.info("ACLP_TEST_START_0023")
1161
1162 port = random.randint(0, 65535)
1163 # Add an ACL
1164 rules = []
1165 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1166 self.proto[self.IP][self.UDP]))
1167 # deny ip any any in the end
1168 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1169
1170 # Apply rules
1171 self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1172
1173 # Traffic should still pass
1174 # Create incoming packet streams for packet-generator interfaces
1175 pkts_cnt = 0
1176 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1177 self.IP, self.IPV6,
1178 self.proto[self.IP][self.UDP], port,
1179 False, False)
1180 if len(pkts) > 0:
1181 self.pg0.add_stream(pkts)
1182 pkts_cnt += len(pkts)
1183
1184 # Enable packet capture and start packet sendingself.IPV
1185 self.pg_enable_capture(self.pg_interfaces)
1186 self.pg_start()
1187
1188 # Verify outgoing packet streams per packet-generator interface
1189 self.pg1.get_capture(pkts_cnt)
1190
1191 self.logger.info("ACLP_TEST_FINISH_0023")
1192
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +02001193 def test_0108_tcp_permit_v4(self):
1194 """ permit TCPv4 + non-match range
1195 """
1196 self.logger.info("ACLP_TEST_START_0108")
1197
1198 # Add an ACL
1199 rules = []
1200 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1201 self.proto[self.IP][self.TCP]))
1202 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1203 self.proto[self.IP][self.TCP]))
1204 # deny ip any any in the end
1205 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1206
1207 # Apply rules
1208 self.apply_rules(rules, "permit ipv4 tcp")
1209
1210 # Traffic should still pass
1211 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1212
1213 self.logger.info("ACLP_TEST_FINISH_0108")
1214
1215 def test_0109_tcp_permit_v6(self):
1216 """ permit TCPv6 + non-match range
1217 """
1218 self.logger.info("ACLP_TEST_START_0109")
1219
1220 # Add an ACL
1221 rules = []
1222 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1223 self.proto[self.IP][self.TCP]))
1224 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1225 self.proto[self.IP][self.TCP]))
1226 # deny ip any any in the end
1227 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1228
1229 # Apply rules
1230 self.apply_rules(rules, "permit ip6 tcp")
1231
1232 # Traffic should still pass
1233 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1234
1235 self.logger.info("ACLP_TEST_FINISH_0109")
1236
1237 def test_0110_udp_permit_v4(self):
1238 """ permit UDPv4 + non-match range
1239 """
1240 self.logger.info("ACLP_TEST_START_0110")
1241
1242 # Add an ACL
1243 rules = []
1244 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1245 self.proto[self.IP][self.UDP]))
1246 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1247 self.proto[self.IP][self.UDP]))
1248 # deny ip any any in the end
1249 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1250
1251 # Apply rules
1252 self.apply_rules(rules, "permit ipv4 udp")
1253
1254 # Traffic should still pass
1255 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1256
1257 self.logger.info("ACLP_TEST_FINISH_0110")
1258
1259 def test_0111_udp_permit_v6(self):
1260 """ permit UDPv6 + non-match range
1261 """
1262 self.logger.info("ACLP_TEST_START_0111")
1263
1264 # Add an ACL
1265 rules = []
1266 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1267 self.proto[self.IP][self.UDP]))
1268 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1269 self.proto[self.IP][self.UDP]))
1270 # deny ip any any in the end
1271 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1272
1273 # Apply rules
1274 self.apply_rules(rules, "permit ip6 udp")
1275
1276 # Traffic should still pass
1277 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1278
1279 self.logger.info("ACLP_TEST_FINISH_0111")
1280
1281 def test_0112_tcp_deny(self):
1282 """ deny TCPv4/v6 + non-match range
1283 """
1284 self.logger.info("ACLP_TEST_START_0112")
1285
1286 # Add an ACL
1287 rules = []
1288 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1289 self.PORTS_RANGE_2,
1290 self.proto[self.IP][self.TCP]))
1291 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1292 self.PORTS_RANGE_2,
1293 self.proto[self.IP][self.TCP]))
1294 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1295 self.proto[self.IP][self.TCP]))
1296 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1297 self.proto[self.IP][self.TCP]))
1298 # permit ip any any in the end
1299 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1300 self.PORTS_ALL, 0))
1301 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1302 self.PORTS_ALL, 0))
1303
1304 # Apply rules
1305 self.apply_rules(rules, "deny ip4/ip6 tcp")
1306
1307 # Traffic should not pass
1308 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1309 self.proto[self.IP][self.TCP])
1310
1311 self.logger.info("ACLP_TEST_FINISH_0112")
1312
1313 def test_0113_udp_deny(self):
1314 """ deny UDPv4/v6 + non-match range
1315 """
1316 self.logger.info("ACLP_TEST_START_0113")
1317
1318 # Add an ACL
1319 rules = []
1320 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1321 self.PORTS_RANGE_2,
1322 self.proto[self.IP][self.UDP]))
1323 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1324 self.PORTS_RANGE_2,
1325 self.proto[self.IP][self.UDP]))
1326 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1327 self.proto[self.IP][self.UDP]))
1328 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1329 self.proto[self.IP][self.UDP]))
1330 # permit ip any any in the end
1331 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1332 self.PORTS_ALL, 0))
1333 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1334 self.PORTS_ALL, 0))
1335
1336 # Apply rules
1337 self.apply_rules(rules, "deny ip4/ip6 udp")
1338
1339 # Traffic should not pass
1340 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1341 self.proto[self.IP][self.UDP])
1342
1343 self.logger.info("ACLP_TEST_FINISH_0113")
1344
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +01001345 def test_0300_tcp_permit_v4_etype_aaaa(self):
1346 """ permit TCPv4, send 0xAAAA etype
1347 """
1348 self.logger.info("ACLP_TEST_START_0300")
1349
1350 # Add an ACL
1351 rules = []
1352 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1353 self.proto[self.IP][self.TCP]))
1354 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1355 self.proto[self.IP][self.TCP]))
1356 # deny ip any any in the end
1357 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1358
1359 # Apply rules
1360 self.apply_rules(rules, "permit ipv4 tcp")
1361
1362 # Traffic should still pass
1363 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1364
1365 # Traffic should still pass also for an odd ethertype
1366 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1367 0, False, True, 0xaaaa)
1368
1369 self.logger.info("ACLP_TEST_FINISH_0300")
1370
1371 def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
1372 """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA, 0x0BBB
1373 """
1374 self.logger.info("ACLP_TEST_START_0305")
1375
1376 # Add an ACL
1377 rules = []
1378 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1379 self.proto[self.IP][self.TCP]))
1380 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1381 self.proto[self.IP][self.TCP]))
1382 # deny ip any any in the end
1383 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1384
1385 # Apply rules
1386 self.apply_rules(rules, "permit ipv4 tcp")
1387
1388 # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1389 self.etype_whitelist([0xbbb], 1)
1390
1391 # The IPv4 traffic should still pass
1392 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1393
1394 # The oddball ethertype should be blocked
1395 self.run_verify_negat_test(self.IP, self.IPV4,
1396 self.proto[self.IP][self.TCP],
1397 0, False, 0xaaaa)
1398
1399 # The whitelisted traffic, on the other hand, should pass
1400 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1401 0, False, True, 0x0bbb)
1402
1403 # remove the whitelist, the previously blocked 0xAAAA should pass now
1404 self.etype_whitelist([], 0)
1405 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1406 0, False, True, 0xaaaa)
1407
1408 self.logger.info("ACLP_TEST_FINISH_0305")
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +02001409
def test_0315_del_intf(self):
    """ apply an acl and delete the interface
    """
    self.logger.info("ACLP_TEST_START_0315")

    tcp = self.proto[self.IP][self.TCP]

    rules = [
        # deny on the second port range
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        # permit on the first port range
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # create an interface to attach the ACL to
    new_intf = VppLoInterface(self)

    # Apply rules to that interface only
    self.apply_rules_to(rules, "permit ipv4 tcp", new_intf.sw_if_index)

    # Remove the interface while the ACL is still applied to it
    new_intf.remove_vpp_config()

    self.logger.info("ACLP_TEST_FINISH_0315")
1435
# Script entry point: run this module's tests with the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)