blob: 376c4d81b8729a46dc02ba4faefdfddbddec2ac0 [file] [log] [blame]
Pavel Kotucek59dda062017-03-02 15:22:47 +01001#!/usr/bin/env python
2"""ACL plugin Test Case HLD:
3"""
4
5import unittest
6import random
7
8from scapy.packet import Raw
9from scapy.layers.l2 import Ether
10from scapy.layers.inet import IP, TCP, UDP, ICMP
11from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +000012from scapy.layers.inet6 import IPv6ExtHdrFragment
Pavel Kotucek59dda062017-03-02 15:22:47 +010013from framework import VppTestCase, VppTestRunner
14from util import Host, ppp
15
Andrew Yourtchenkode3682f2018-03-23 10:38:46 +010016from vpp_lo_interface import VppLoInterface
17
Pavel Kotucek59dda062017-03-02 15:22:47 +010018
class TestACLplugin(VppTestCase):
    """ ACL plugin Test Case """

    # traffic types
    # These double as the first index into ``proto`` below:
    # proto[IP] is the TCP/UDP pair, proto[ICMP] the ICMPv4/ICMPv6 pair.
    IP = 0
    ICMP = 1

    # IP version
    # IPRANDOM makes create_stream() pick IPv4 or IPv6 per packet.
    IPRANDOM = -1
    IPV4 = 0
    IPV6 = 1

    # rule types (value of the 'is_permit' field built by create_rule())
    DENY = 0
    PERMIT = 1

    # supported protocols
    # proto[traffic type][index] -> IP protocol number; proto_map maps a
    # protocol number to the name of the scapy layer used for it.
    proto = [[6, 17], [1, 58]]
    proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
    # second index into ``proto``
    ICMPv4 = 0
    ICMPv6 = 1
    TCP = 0
    UDP = 1
    # The tests in this class pass a literal protocol 0 for their
    # "ip any any" catch-all rules.
    PROTO_ALL = 0

    # port ranges
    # Selector values for the ``ports`` argument of create_rule(); any
    # other value is used by create_rule() as a single literal port.
    PORTS_ALL = -1
    PORTS_RANGE = 0
    PORTS_RANGE_2 = 1
    # First predefined range set (PORTS_RANGE); create_upper_layer()
    # draws random ports from these same bounds.
    udp_sport_from = 10
    udp_sport_to = udp_sport_from + 5
    udp_dport_from = 20000
    udp_dport_to = udp_dport_from + 5000
    tcp_sport_from = 30
    tcp_sport_to = tcp_sport_from + 5
    tcp_dport_from = 40000
    tcp_dport_to = tcp_dport_from + 5000

    # Second predefined range set (PORTS_RANGE_2)
    udp_sport_from_2 = 90
    udp_sport_to_2 = udp_sport_from_2 + 5
    udp_dport_from_2 = 30000
    udp_dport_to_2 = udp_dport_from_2 + 5000
    tcp_sport_from_2 = 130
    tcp_sport_to_2 = tcp_sport_from_2 + 5
    tcp_dport_from_2 = 20000
    tcp_dport_to_2 = tcp_dport_from_2 + 5000

    # ICMP type/code used both in generated packets and in PORTS_RANGE
    # rules (type goes in the "source port" fields, code in the
    # "destination port" fields of a rule)
    icmp4_type = 8  # echo request
    icmp4_code = 3
    icmp6_type = 128  # echo request
    icmp6_code = 3

    # ICMP type and code range used by PORTS_RANGE_2 rules
    icmp4_type_2 = 8
    icmp4_code_from_2 = 5
    icmp4_code_to_2 = 20
    icmp6_type_2 = 128
    icmp6_code_from_2 = 8
    icmp6_code_to_2 = 42

    # Test variables
    # Bridge domain that all pg interfaces are placed into
    bd_id = 1
80
@classmethod
def setUpClass(cls):
    """
    Run the standard VppTestCase class setup, then build the topology
    shared by all tests in this class.

    Two packet-generator interfaces are created, put into bridge domain
    ``bd_id`` and brought up, and the per-interface host bookkeeping is
    initialised.  If any of these steps fails, the class is torn down
    before the exception is re-raised.
    """
    super(TestACLplugin, cls).setUpClass()

    try:
        # Two pg interfaces: pg0 and pg1
        cls.create_pg_interfaces(range(2))

        # Packet flows: traffic is generated from pg0 towards pg1 only
        cls.flows = {cls.pg0: [cls.pg1]}

        # Sizes that generated packets are padded to
        cls.pg_if_packet_sizes = [64, 512, 1518, 9018]

        # Bridge domain created with learn=1 and uu_flood=1 (presumably
        # MAC learning and unknown-unicast flooding enabled - TODO
        # confirm); every pg interface is then attached to it
        cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
                                       learn=1)
        for pg_if in cls.pg_interfaces:
            cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
                                                bd_id=cls.bd_id)

        # Bring all interfaces up
        for pg_if in cls.pg_interfaces:
            pg_if.admin_up()

        # sw_if_index -> list of test hosts behind that interface
        cls.hosts_by_pg_idx = {
            pg_if.sw_if_index: [] for pg_if in cls.pg_interfaces}

        # sw_if_index -> list of deleted hosts
        cls.deleted_hosts_by_pg_idx = {
            pg_if.sw_if_index: [] for pg_if in cls.pg_interfaces}

    except Exception:
        super(TestACLplugin, cls).tearDownClass()
        raise
129
def setUp(self):
    """
    Run the base-class per-test setup, then reset the packet infos so
    each test starts without records left over from a previous one.
    """
    super(TestACLplugin, self).setUp()
    self.reset_packet_infos()
133
def tearDown(self):
    """
    Run the base-class teardown, then log debug output after each test.

    The CLI dumps (L2 FIB, ACL plugin state, bridge domain) are skipped
    when VPP is flagged as dead.
    """
    super(TestACLplugin, self).tearDown()
    if self.vpp_dead:
        return

    debug_commands = (
        "show l2fib verbose",
        "show acl-plugin acl",
        "show acl-plugin interface",
        "show acl-plugin tables",
        "show bridge-domain %s detail" % self.bd_id,
    )
    for command in debug_commands:
        self.logger.info(self.vapi.ppcli(command))
146
def create_hosts(self, count, start=0):
    """
    Create the required number of hosts and distribute them among the
    pg interfaces.  Every host gets a MAC, an IPv4 and an IPv6 address
    derived from the interface's sw_if_index and the host number.

    Each interface receives ``count // n_interfaces`` hosts; the last
    interface additionally receives whatever remains, so that exactly
    ``count`` hosts are created in total.

    :param int count: Number of hosts to create addresses for.
    :param int start: Number to start numbering from.
    """
    n_int = len(self.pg_interfaces)
    # Floor division: the result is used as a range() bound, which must
    # be an int on Python 3 as well as Python 2.
    macs_per_if = count // n_int
    for i, pg_if in enumerate(self.pg_interfaces):
        start_nr = macs_per_if * i + start
        if i == n_int - 1:
            # last interface takes the remainder too
            end_nr = count + start
        else:
            end_nr = macs_per_if * (i + 1) + start
        hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
        for j in range(start_nr, end_nr):
            host = Host(
                "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
                "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
                "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
            hosts.append(host)
170
def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
                s_prefix=0, s_ip='\x00\x00\x00\x00',
                d_prefix=0, d_ip='\x00\x00\x00\x00'):
    """
    Build one ACL rule dictionary.

    :param ip: Value for the rule's 'is_ipv6' field (IPV4 / IPV6).
    :param permit_deny: Value for the rule's 'is_permit' field
        (DENY / PERMIT).
    :param ports: PORTS_ALL for the full range (0-65535, or 0-255 for
        ICMP/ICMPv6 type and code), PORTS_RANGE or PORTS_RANGE_2 for the
        predefined per-protocol ranges of this class, or any other
        number to use that single value for both source and destination.
    :param proto: IP protocol number; with the default -1 no rule is
        built.
    :param s_prefix: Source prefix length.
    :param s_ip: Source address.
    :param d_prefix: Destination prefix length.
    :param d_ip: Destination address.
    :return: The rule as a dict, or None when ``proto`` is -1.
    :raises ValueError: if a predefined range is requested for a
        protocol that has none (anything but ICMP, ICMPv6, TCP, UDP).
    """
    if proto == -1:
        return None

    tcp = self.proto[self.IP][self.TCP]
    udp = self.proto[self.IP][self.UDP]

    if ports == self.PORTS_ALL:
        sport_from = 0
        dport_from = 0
        # ICMP type/code are limited to 0-255
        sport_to = 255 if proto in (1, 58) else 65535
        dport_to = sport_to
    elif ports in (self.PORTS_RANGE, self.PORTS_RANGE_2):
        # proto -> (sport_from, sport_to, dport_from, dport_to); for the
        # ICMP protocols the "source port" carries the type and the
        # "destination port" the code.
        if ports == self.PORTS_RANGE:
            predefined = {
                1: (self.icmp4_type, self.icmp4_type,
                    self.icmp4_code, self.icmp4_code),
                58: (self.icmp6_type, self.icmp6_type,
                     self.icmp6_code, self.icmp6_code),
                tcp: (self.tcp_sport_from, self.tcp_sport_to,
                      self.tcp_dport_from, self.tcp_dport_to),
                udp: (self.udp_sport_from, self.udp_sport_to,
                      self.udp_dport_from, self.udp_dport_to),
            }
        else:
            predefined = {
                1: (self.icmp4_type_2, self.icmp4_type_2,
                    self.icmp4_code_from_2, self.icmp4_code_to_2),
                58: (self.icmp6_type_2, self.icmp6_type_2,
                     self.icmp6_code_from_2, self.icmp6_code_to_2),
                tcp: (self.tcp_sport_from_2, self.tcp_sport_to_2,
                      self.tcp_dport_from_2, self.tcp_dport_to_2),
                udp: (self.udp_sport_from_2, self.udp_sport_to_2,
                      self.udp_dport_from_2, self.udp_dport_to_2),
            }
        if proto not in predefined:
            # Fail with a clear message instead of running into an
            # unassigned local further down.
            raise ValueError(
                "no predefined port range for protocol %r" % (proto,))
        sport_from, sport_to, dport_from, dport_to = predefined[proto]
    else:
        # a single explicit port (or ICMP type/code) value
        sport_from = ports
        sport_to = ports
        dport_from = ports
        dport_to = ports

    rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
             'srcport_or_icmptype_first': sport_from,
             'srcport_or_icmptype_last': sport_to,
             'src_ip_prefix_len': s_prefix,
             'src_ip_addr': s_ip,
             'dstport_or_icmpcode_first': dport_from,
             'dstport_or_icmpcode_last': dport_to,
             'dst_ip_prefix_len': d_prefix,
             'dst_ip_addr': d_ip})
    return rule
239
def apply_rules(self, rules, tag=''):
    """
    Create one ACL from ``rules`` and set it as the single input ACL on
    every pg interface.

    :param list rules: Rule dictionaries, e.g. built by create_rule().
    :param str tag: Tag passed along with the ACL.
    """
    # acl_index 0xFFFFFFFF presumably asks for a new ACL to be
    # allocated - TODO confirm against the acl_add_replace API
    reply = self.vapi.acl_add_replace(acl_index=0xFFFFFFFF, r=rules,
                                      tag=tag)
    acl_index = reply.acl_index
    dumped = self.vapi.acl_dump(acl_index)
    self.logger.info("Dumped ACL: " + str(dumped))

    # Apply the ACL on each interface as inbound
    for pg_if in self.pg_interfaces:
        self.vapi.acl_interface_set_acl_list(
            sw_if_index=pg_if.sw_if_index,
            n_input=1,
            acls=[acl_index])
251
def apply_rules_to(self, rules, tag='', sw_if_index=0xFFFFFFFF):
    """
    Create one ACL from ``rules`` and set it as the single input ACL on
    one interface.

    :param list rules: Rule dictionaries, e.g. built by create_rule().
    :param str tag: Tag passed along with the ACL.
    :param int sw_if_index: Index of the interface to apply the ACL on.
    """
    # acl_index 0xFFFFFFFF presumably asks for a new ACL to be
    # allocated - TODO confirm against the acl_add_replace API
    reply = self.vapi.acl_add_replace(acl_index=0xFFFFFFFF, r=rules,
                                      tag=tag)
    acl_index = reply.acl_index
    dumped = self.vapi.acl_dump(acl_index)
    self.logger.info("Dumped ACL: " + str(dumped))

    # Apply the ACL on the interface as inbound
    self.vapi.acl_interface_set_acl_list(
        sw_if_index=sw_if_index,
        n_input=1,
        acls=[acl_index])
262
def etype_whitelist(self, whitelist, n_input):
    """
    Set the same ethertype whitelist on every pg interface.

    :param whitelist: Whitelist passed to the API unchanged.
    :param n_input: ``n_input`` value passed to the API unchanged.
    """
    for pg_if in self.pg_interfaces:
        self.vapi.acl_interface_set_etype_whitelist(
            sw_if_index=pg_if.sw_if_index,
            n_input=n_input,
            whitelist=whitelist)
271
def create_upper_layer(self, packet_index, proto, ports=0):
    """
    Build the TCP or UDP layer for a generated packet.

    :param packet_index: Not used by this method.
    :param proto: IP protocol number; must be a key of ``proto_map``.
    :param ports: 0 to draw source and destination port at random from
        the first predefined range of the protocol, otherwise the value
        used for both ports.
    :return: A scapy TCP or UDP layer, or '' for any other protocol.
    """
    l4_name = self.proto_map[proto]
    if l4_name == 'UDP':
        layer_cls = UDP
        sport_bounds = (self.udp_sport_from, self.udp_sport_to)
        dport_bounds = (self.udp_dport_from, self.udp_dport_to)
    elif l4_name == 'TCP':
        layer_cls = TCP
        sport_bounds = (self.tcp_sport_from, self.tcp_sport_to)
        dport_bounds = (self.tcp_dport_from, self.tcp_dport_to)
    else:
        return ''

    if ports != 0:
        return layer_cls(sport=ports, dport=ports)

    # source port is drawn before destination port
    src_port = random.randint(*sport_bounds)
    dst_port = random.randint(*dport_bounds)
    return layer_cls(sport=src_port, dport=dst_port)
291
def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
                  proto=-1, ports=0, fragments=False,
                  pkt_raw=True, etype=-1):
    """
    Create the input packet stream for an interface.

    One packet is built for every (destination host, source host) pair,
    for each destination interface that ``self.flows`` lists for
    ``src_if``.  Hosts are taken from ``self.hosts_by_pg_idx``.

    :param object src_if: Interface to create packet stream for.
    :param list packet_sizes: Sizes to choose from (at random) when
        extending a packet; only used when ``pkt_raw`` is true.
    :param traffic_type: ``self.ICMP`` builds an ICMP / ICMPv6 echo
        request; any other value uses create_upper_layer().
    :param ipv6: 1 for IPv6, 0 for IPv4, any other value picks one of
        the two at random for each packet.
    :param proto: Protocol number stored in the packet info; -1 picks
        TCP or UDP at random for each packet.
    :param ports: Passed through to create_upper_layer().
    :param bool fragments: Build fragments: IPv4 with flags=1, frag=64,
        or IPv6 followed by a fragment header with offset=64, m=1.
    :param bool pkt_raw: Append the packet-info payload and extend the
        packet to a size from ``packet_sizes``.
    :param etype: Ethertype to set on the Ethernet header when > 0.
    :return: List of packets; empty when ``src_if`` has no flows.
    """
    pkts = []
    if src_if not in self.flows:
        return pkts

    src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
    for dst_if in self.flows[src_if]:
        dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
        n_int = len(dst_hosts) * len(src_hosts)
        for i in range(n_int):
            # divmod keeps both indices integral on Python 2 and 3;
            # destination hosts vary slowest, source hosts fastest
            dst_idx, src_idx = divmod(i, len(src_hosts))
            dst_host = dst_hosts[dst_idx]
            src_host = src_hosts[src_idx]

            pkt_info = self.create_packet_info(src_if, dst_if)
            if ipv6 == 1:
                pkt_info.ip = 1
            elif ipv6 == 0:
                pkt_info.ip = 0
            else:
                pkt_info.ip = random.choice([0, 1])
            if proto == -1:
                pkt_info.proto = random.choice(self.proto[self.IP])
            else:
                pkt_info.proto = proto
            payload = self.info_to_payload(pkt_info)

            if etype > 0:
                p = Ether(dst=dst_host.mac,
                          src=src_host.mac,
                          type=etype)
            else:
                p = Ether(dst=dst_host.mac, src=src_host.mac)

            if pkt_info.ip:
                p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
                if fragments:
                    p /= IPv6ExtHdrFragment(offset=64, m=1)
            elif fragments:
                p /= IP(src=src_host.ip4, dst=dst_host.ip4,
                        flags=1, frag=64)
            else:
                p /= IP(src=src_host.ip4, dst=dst_host.ip4)

            if traffic_type == self.ICMP:
                if pkt_info.ip:
                    p /= ICMPv6EchoRequest(type=self.icmp6_type,
                                           code=self.icmp6_code)
                else:
                    p /= ICMP(type=self.icmp4_type,
                              code=self.icmp4_code)
            else:
                p /= self.create_upper_layer(i, pkt_info.proto, ports)

            if pkt_raw:
                p /= Raw(payload)
            # the copy stored for later verification is taken before
            # the packet is extended to its final size
            pkt_info.data = p.copy()
            if pkt_raw:
                size = random.choice(packet_sizes)
                self.extend_packet(p, size)
            pkts.append(p)
    return pkts
357
def verify_capture(self, pg_if, capture,
                   traffic_type=0, ip_type=0, etype=-1):
    """
    Verify captured input packet stream for defined interface.

    Every captured packet is matched against the packet infos recorded
    when the stream was created; any mismatch is logged together with a
    dump of the packet and the assertion error is re-raised.

    :param object pg_if: Interface to verify captured packet stream for.
    :param list capture: Captured packet stream.
    :param traffic_type: ``self.ICMP`` to check ICMP type/code; any other
        value checks IP addresses and TCP/UDP ports against the saved
        packet.
    :param ip_type: Expected IP version flag of the packets; 0 (IPV4)
        skips the explicit check.
    :param etype: When > 0, packets carrying exactly this ethertype are
        skipped without further checks.
    """
    # sw_if_index -> packet info last matched for that source interface
    last_info = dict()
    for i in self.pg_interfaces:
        last_info[i.sw_if_index] = None
    dst_sw_if_index = pg_if.sw_if_index
    for packet in capture:
        if etype > 0:
            if packet[Ether].type != etype:
                # Only logged: a packet with an unexpected ethertype
                # still goes through the payload checks below.
                self.logger.error(ppp("Unexpected ethertype in packet:",
                                      packet))
            else:
                continue
        try:
            # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
            if traffic_type == self.ICMP and ip_type == self.IPV6:
                payload_info = self.payload_to_info(
                    packet[ICMPv6EchoRequest].data)
                payload = packet[ICMPv6EchoRequest]
            else:
                payload_info = self.payload_to_info(str(packet[Raw]))
                payload = packet[self.proto_map[payload_info.proto]]
        except:
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise

        if ip_type != 0:
            self.assertEqual(payload_info.ip, ip_type)
        if traffic_type == self.ICMP:
            # ICMP: only type and code are compared
            try:
                if payload_info.ip == 0:
                    self.assertEqual(payload.type, self.icmp4_type)
                    self.assertEqual(payload.code, self.icmp4_code)
                else:
                    self.assertEqual(payload.type, self.icmp6_type)
                    self.assertEqual(payload.code, self.icmp6_code)
            except:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise
        else:
            try:
                ip_version = IPv6 if payload_info.ip == 1 else IP

                ip = packet[ip_version]
                packet_index = payload_info.index

                self.assertEqual(payload_info.dst, dst_sw_if_index)
                self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
                                  (pg_if.name, payload_info.src,
                                   packet_index))
                # Packets from one source are expected in the order
                # their infos were recorded.
                next_info = self.get_next_packet_info_for_interface2(
                    payload_info.src, dst_sw_if_index,
                    last_info[payload_info.src])
                last_info[payload_info.src] = next_info
                self.assertTrue(next_info is not None)
                self.assertEqual(packet_index, next_info.index)
                saved_packet = next_info.data
                # Check standard fields
                self.assertEqual(ip.src, saved_packet[ip_version].src)
                self.assertEqual(ip.dst, saved_packet[ip_version].dst)
                p = self.proto_map[payload_info.proto]
                if p == 'TCP':
                    tcp = packet[TCP]
                    self.assertEqual(tcp.sport, saved_packet[
                        TCP].sport)
                    self.assertEqual(tcp.dport, saved_packet[
                        TCP].dport)
                elif p == 'UDP':
                    udp = packet[UDP]
                    self.assertEqual(udp.sport, saved_packet[
                        UDP].sport)
                    self.assertEqual(udp.dport, saved_packet[
                        UDP].dport)
            except:
                self.logger.error(ppp("Unexpected or invalid packet:",
                                      packet))
                raise
    # After the whole capture has been consumed no expected packet may
    # be left over for any source interface.
    # NOTE(review): the first argument here is the interface object,
    # while the per-packet call above passes payload_info.src (an
    # index) - confirm which one the helper expects.  Also note the
    # ICMP branch never advances last_info.
    for i in self.pg_interfaces:
        remaining_packet = self.get_next_packet_info_for_interface2(
            i, dst_sw_if_index, last_info[i.sw_if_index])
        self.assertTrue(
            remaining_packet is None,
            "Port %u: Packet expected from source %u didn't arrive" %
            (dst_sw_if_index, i.sw_if_index))
451
def run_traffic_no_check(self):
    """
    Send a default stream from every interface that has flows defined,
    without verifying what comes out.
    """
    # Queue the incoming packet streams on the pg interfaces
    for pg_if in self.pg_interfaces:
        if pg_if not in self.flows:
            continue
        pkts = self.create_stream(pg_if, self.pg_if_packet_sizes)
        if pkts:
            pg_if.add_stream(pkts)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
464
def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
                    frags=False, pkt_raw=True, etype=-1):
    """
    Send traffic and verify that all of it arrives.

    Streams are created with create_stream() for every interface that
    has flows defined; each destination interface must then capture the
    total number of packets sent, and the capture is checked with
    verify_capture().

    :param traffic_type: Passed to create_stream() and verify_capture().
    :param ip_type: Passed to create_stream() (as ``ipv6``) and to
        verify_capture().
    :param proto: Passed to create_stream().
    :param ports: Passed to create_stream().
    :param frags: Passed to create_stream() (as ``fragments``).
    :param pkt_raw: Passed to create_stream().
    :param etype: Passed to create_stream() and verify_capture().
    """
    # Queue the incoming packet streams on the pg interfaces
    pkts_cnt = 0
    for pg_if in self.pg_interfaces:
        if pg_if not in self.flows:
            continue
        pkts = self.create_stream(pg_if, self.pg_if_packet_sizes,
                                  traffic_type, ip_type, proto, ports,
                                  frags, pkt_raw, etype)
        if pkts:
            pg_if.add_stream(pkts)
            pkts_cnt += len(pkts)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Verify outgoing packet streams per packet-generator interface
    for src_if in self.pg_interfaces:
        if src_if not in self.flows:
            continue
        for dst_if in self.flows[src_if]:
            capture = dst_if.get_capture(pkts_cnt)
            self.logger.info("Verifying capture on interface %s" %
                             dst_if.name)
            self.verify_capture(dst_if, capture,
                                traffic_type, ip_type, etype)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100493
def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
                          ports=0, frags=False, etype=-1):
    """
    Send traffic and verify that none of it arrives.

    The packet infos are reset first; streams are then created with
    create_stream() (always with the raw payload appended) and every
    destination interface must capture zero packets.

    :param traffic_type: Passed to create_stream().
    :param ip_type: Passed to create_stream() (as ``ipv6``).
    :param proto: Passed to create_stream().
    :param ports: Passed to create_stream().
    :param frags: Passed to create_stream() (as ``fragments``).
    :param etype: Passed to create_stream().
    """
    self.reset_packet_infos()

    # Queue the incoming packet streams on the pg interfaces
    for pg_if in self.pg_interfaces:
        if pg_if not in self.flows:
            continue
        pkts = self.create_stream(pg_if, self.pg_if_packet_sizes,
                                  traffic_type, ip_type, proto, ports,
                                  frags, True, etype)
        if pkts:
            pg_if.add_stream(pkts)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Nothing may come out on any destination interface
    for src_if in self.pg_interfaces:
        if src_if not in self.flows:
            continue
        for dst_if in self.flows[src_if]:
            self.logger.info("Verifying capture on interface %s" %
                             dst_if.name)
            capture = dst_if.get_capture(0)
            self.assertEqual(len(capture), 0)
519
def test_0000_warmup_test(self):
    """ ACL plugin version check; learn MACs
    """
    # Create the 16 test hosts and push one round of unchecked traffic
    self.create_hosts(16)
    self.run_traffic_no_check()

    version = self.vapi.papi.acl_plugin_get_version()
    # Only the major version is pinned: minor version changes are
    # non breaking
    self.assertEqual(version.major, 1)
    self.logger.info("Working with ACL plugin version: %d.%d" % (
        version.major, version.minor))
531
def test_0001_acl_create(self):
    """ ACL create/delete test
    """

    def udp_rule(is_permit, sport_first, sport_last,
                 dport_first, dport_last):
        # IPv4 UDP (proto 17) rule with zero-length source and
        # destination prefixes
        return {'is_permit': is_permit, 'is_ipv6': 0, 'proto': 17,
                'srcport_or_icmptype_first': sport_first,
                'srcport_or_icmptype_last': sport_last,
                'src_ip_prefix_len': 0,
                'src_ip_addr': '\x00\x00\x00\x00',
                'dstport_or_icmpcode_first': dport_first,
                'dstport_or_icmpcode_last': dport_last,
                'dst_ip_addr': '\x00\x00\x00\x00',
                'dst_ip_prefix_len': 0}

    self.logger.info("ACLP_TEST_START_0001")
    pg0_index = self.pg0.sw_if_index

    # Test 1: add a new ACL with a single permit rule
    r = [udp_rule(1, 1234, 1235, 1234, 1234)]
    reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
                                      tag="permit 1234")
    self.assertEqual(reply.retval, 0)
    # The very first ACL gets #0
    self.assertEqual(reply.acl_index, 0)
    first_acl = reply.acl_index
    rr = self.vapi.acl_dump(first_acl)
    self.logger.info("Dumped ACL: " + str(rr))
    self.assertEqual(len(rr), 1)
    # We should have the same number of ACL entries as we had asked
    self.assertEqual(len(rr[0].r), len(r))
    # The rules should be the same. But because the submitted and
    # returned are different types, we need to iterate over rules and
    # keys to get to basic values.
    # NOTE(review): with the single rule in ``r`` this range is empty,
    # so no field is actually compared.
    for i_rule in range(0, len(r) - 1):
        for rule_key in r[i_rule]:
            self.assertEqual(rr[0].r[i_rule][rule_key],
                             r[i_rule][rule_key])

    # Add a second ACL: deny 1234, then a permit rule
    r_deny = [udp_rule(0, 1234, 1235, 1234, 1234),
              udp_rule(1, 0, 0, 0, 0)]
    reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
                                      tag="deny 1234;permit all")
    self.assertEqual(reply.retval, 0)
    # The second ACL gets #1
    self.assertEqual(reply.acl_index, 1)
    second_acl = reply.acl_index

    # Test 2: try to modify a nonexistent ACL
    reply = self.vapi.acl_add_replace(acl_index=432, r=r,
                                      tag="FFFF:FFFF", expected_retval=-6)
    self.assertEqual(reply.retval, -6)
    # The ACL number should pass through
    self.assertEqual(reply.acl_index, 432)

    # apply an ACL on an interface inbound, try to delete ACL, must fail
    self.vapi.acl_interface_set_acl_list(sw_if_index=pg0_index,
                                         n_input=1,
                                         acls=[first_acl])
    self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
    # Unapply an ACL and then try to delete it - must be ok
    self.vapi.acl_interface_set_acl_list(sw_if_index=pg0_index,
                                         n_input=0,
                                         acls=[])
    self.vapi.acl_del(acl_index=first_acl, expected_retval=0)

    # apply an ACL on an interface outbound, try to delete ACL, must fail
    self.vapi.acl_interface_set_acl_list(sw_if_index=pg0_index,
                                         n_input=0,
                                         acls=[second_acl])
    self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
    # Unapply the ACL and then try to delete it - must be ok
    self.vapi.acl_interface_set_acl_list(sw_if_index=pg0_index,
                                         n_input=0,
                                         acls=[])
    self.vapi.acl_del(acl_index=second_acl, expected_retval=0)

    # try to apply a nonexistent ACL - must fail
    self.vapi.acl_interface_set_acl_list(sw_if_index=pg0_index,
                                         n_input=1,
                                         acls=[first_acl],
                                         expected_retval=-6)

    self.logger.info("ACLP_TEST_FINISH_0001")
629
def test_0002_acl_permit_apply(self):
    """ permit ACL apply test
    """
    self.logger.info("ACLP_TEST_START_0002")

    udp = self.proto[self.IP][self.UDP]
    tcp = self.proto[self.IP][self.TCP]
    # ports value 0 equals PORTS_RANGE, so create_rule() uses the
    # predefined UDP / TCP port ranges
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, 0, udp),
        self.create_rule(self.IPV4, self.PERMIT, 0, tcp),
    ]

    # Apply rules
    self.apply_rules(rules, "permit per-flow")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, -1)
    self.logger.info("ACLP_TEST_FINISH_0002")
647
def test_0003_acl_deny_apply(self):
    """ deny ACL apply test
    """
    self.logger.info("ACLP_TEST_START_0003")

    udp = self.proto[self.IP][self.UDP]
    rules = [
        # deny UDP on all ports
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, udp),
        # Permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "deny per-flow;permit all")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPV4, udp)
    self.logger.info("ACLP_TEST_FINISH_0003")
668
def test_0004_vpp624_permit_icmpv4(self):
    """ VPP_624 permit ICMPv4
    """
    self.logger.info("ACLP_TEST_START_0004")

    icmpv4 = self.proto[self.ICMP][self.ICMPv4]
    rules = [
        # permit the predefined ICMPv4 type/code
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
                         icmpv4),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit icmpv4")

    # Traffic should still pass
    self.run_verify_test(self.ICMP, self.IPV4, icmpv4)

    self.logger.info("ACLP_TEST_FINISH_0004")
689
def test_0005_vpp624_permit_icmpv6(self):
    """ VPP_624 permit ICMPv6
    """
    self.logger.info("ACLP_TEST_START_0005")

    icmpv6 = self.proto[self.ICMP][self.ICMPv6]
    rules = [
        # permit the predefined ICMPv6 type/code
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
                         icmpv6),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit icmpv6")

    # Traffic should still pass
    self.run_verify_test(self.ICMP, self.IPV6, icmpv6)

    self.logger.info("ACLP_TEST_FINISH_0005")
710
def test_0006_vpp624_deny_icmpv4(self):
    """ VPP_624 deny ICMPv4
    """
    self.logger.info("ACLP_TEST_START_0006")

    icmpv4 = self.proto[self.ICMP][self.ICMPv4]
    rules = [
        # deny the predefined ICMPv4 type/code
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, icmpv4),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "deny icmpv4")

    # Traffic should not pass
    self.run_verify_negat_test(self.ICMP, self.IPV4, 0)

    self.logger.info("ACLP_TEST_FINISH_0006")
730
def test_0007_vpp624_deny_icmpv6(self):
    """ VPP_624 deny ICMPv6
    """
    self.logger.info("ACLP_TEST_START_0007")

    icmpv6 = self.proto[self.ICMP][self.ICMPv6]
    rules = [
        # deny the predefined ICMPv6 type/code
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, icmpv6),
        # permit ip any any in the end
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "deny icmpv6")

    # Traffic should not pass
    self.run_verify_negat_test(self.ICMP, self.IPV6, 0)

    self.logger.info("ACLP_TEST_FINISH_0007")
750
def test_0008_tcp_permit_v4(self):
    """ permit TCPv4
    """
    self.logger.info("ACLP_TEST_START_0008")

    tcp = self.proto[self.IP][self.TCP]
    rules = [
        # permit the predefined TCP port range
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ipv4 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp)

    self.logger.info("ACLP_TEST_FINISH_0008")
770
def test_0009_tcp_permit_v6(self):
    """ permit TCPv6
    """
    self.logger.info("ACLP_TEST_START_0009")

    tcp = self.proto[self.IP][self.TCP]
    rules = [
        # permit the predefined TCP port range
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ip6 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, tcp)

    # The finish marker carries this test's own number so that it pairs
    # with the ACLP_TEST_START_0009 marker logged above.
    self.logger.info("ACLP_TEST_FINISH_0009")
790
def test_0010_udp_permit_v4(self):
    """ permit UDPv4
    """
    self.logger.info("ACLP_TEST_START_0010")

    udp = self.proto[self.IP][self.UDP]
    rules = [
        # permit the predefined UDP port range
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    # NOTE(review): the tag reads "ipv udp"; presumably "ipv4 udp" was
    # meant - left unchanged here
    self.apply_rules(rules, "permit ipv udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, udp)

    self.logger.info("ACLP_TEST_FINISH_0010")
810
def test_0011_udp_permit_v6(self):
    """ permit UDPv6
    """
    self.logger.info("ACLP_TEST_START_0011")

    udp = self.proto[self.IP][self.UDP]
    rules = [
        # permit the predefined UDP port range
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ip6 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, udp)

    self.logger.info("ACLP_TEST_FINISH_0011")
830
def test_0012_tcp_deny(self):
    """ deny TCPv4/v6
    """
    self.logger.info("ACLP_TEST_START_0012")

    tcp = self.proto[self.IP][self.TCP]
    rules = [
        # deny the predefined TCP port range for both IP versions
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, tcp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, tcp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "deny ip4/ip6 tcp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, tcp)

    self.logger.info("ACLP_TEST_FINISH_0012")
856
def test_0013_udp_deny(self):
    """ deny UDPv4/v6
    """
    self.logger.info("ACLP_TEST_START_0013")

    udp = self.proto[self.IP][self.UDP]
    rules = [
        # deny the predefined UDP port range for both IP versions
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, udp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, udp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "deny ip4/ip6 udp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp)

    self.logger.info("ACLP_TEST_FINISH_0013")
882
def test_0014_acl_dump(self):
    """ verify add/dump acls

    Creates one ACL from a table of rule specifications, reads it back
    with acl_dump() and compares every dumped rule with the
    specification it was built from.  The test also asserts that the
    dump returned exactly as many rules as were submitted, so an empty
    or truncated dump can no longer pass unnoticed.
    """
    self.logger.info("ACLP_TEST_START_0014")

    tcp = self.proto[self.IP][self.TCP]
    udp = self.proto[self.IP][self.UDP]
    icmp4 = self.proto[self.ICMP][self.ICMPv4]
    icmp6 = self.proto[self.ICMP][self.ICMPv6]

    # Each entry is [ip version, action, ports selector, protocol],
    # i.e. the positional arguments of create_rule().
    r = [[self.IPV4, self.PERMIT, 1234, tcp],
         [self.IPV4, self.PERMIT, 2345, udp],
         [self.IPV4, self.PERMIT, 0, tcp],
         [self.IPV4, self.PERMIT, 0, udp],
         [self.IPV4, self.PERMIT, 5, icmp4],
         [self.IPV6, self.PERMIT, 4321, tcp],
         [self.IPV6, self.PERMIT, 5432, udp],
         [self.IPV6, self.PERMIT, 0, tcp],
         [self.IPV6, self.PERMIT, 0, udp],
         [self.IPV6, self.PERMIT, 6, icmp6],
         [self.IPV4, self.DENY, self.PORTS_ALL, 0],
         [self.IPV4, self.DENY, 1234, tcp],
         [self.IPV4, self.DENY, 2345, udp],
         [self.IPV4, self.DENY, 5, icmp4],
         [self.IPV6, self.DENY, 4321, tcp],
         [self.IPV6, self.DENY, 5432, udp],
         [self.IPV6, self.DENY, 6, icmp6],
         [self.IPV6, self.DENY, self.PORTS_ALL, 0]
         ]

    # Add and verify new ACLs
    rules = [self.create_rule(ip_ver, action, ports, proto)
             for ip_ver, action, ports, proto in r]

    # NOTE(review): 4294967295 is 0xFFFFFFFF; presumably the
    # "create a new ACL" sentinel of the API - confirm.
    reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
    result = self.vapi.acl_dump(reply.acl_index)

    checked = 0
    for drules in result:
        for dr in drules.r:
            # Fail with a clear message instead of an IndexError when
            # the dump holds more rules than were submitted.
            self.assertLess(checked, len(r),
                            "acl_dump returned more rules than added")
            ip_ver, action, ports, proto = r[checked]

            self.assertEqual(dr.is_ipv6, ip_ver)
            self.assertEqual(dr.is_permit, action)
            self.assertEqual(dr.proto, proto)

            if ports > 0:
                # A positive selector is compared with the first
                # source port / ICMP type of the dumped rule.
                self.assertEqual(dr.srcport_or_icmptype_first, ports)
            elif ports < 0:
                # Negative selector (PORTS_ALL): full 0..65535 range.
                self.assertEqual(dr.srcport_or_icmptype_first, 0)
                self.assertEqual(dr.srcport_or_icmptype_last, 65535)
            elif dr.proto == tcp:
                # Selector 0 (PORTS_RANGE): values must fall inside
                # the class-level TCP port range constants.
                self.assertGreater(dr.srcport_or_icmptype_first,
                                   self.tcp_sport_from-1)
                self.assertLess(dr.srcport_or_icmptype_first,
                                self.tcp_sport_to+1)
                self.assertGreater(dr.dstport_or_icmpcode_last,
                                   self.tcp_dport_from-1)
                self.assertLess(dr.dstport_or_icmpcode_last,
                                self.tcp_dport_to+1)
            elif dr.proto == udp:
                # Same check against the UDP port range constants.
                self.assertGreater(dr.srcport_or_icmptype_first,
                                   self.udp_sport_from-1)
                self.assertLess(dr.srcport_or_icmptype_first,
                                self.udp_sport_to+1)
                self.assertGreater(dr.dstport_or_icmpcode_last,
                                   self.udp_dport_from-1)
                self.assertLess(dr.dstport_or_icmpcode_last,
                                self.udp_dport_to+1)
            checked += 1

    # Without this an empty dump would make the loop a no-op and the
    # test would pass without verifying anything.
    self.assertEqual(checked, len(r),
                     "acl_dump returned fewer rules than added")

    self.logger.info("ACLP_TEST_FINISH_0014")
951
def test_0015_tcp_permit_port_v4(self):
    """ permit single TCPv4

    Picks one random port, permits IPv4 TCP for that port only, denies
    every other IPv4 packet and runs the positive verification for the
    same port.
    """
    self.logger.info("ACLP_TEST_START_0015")

    port = random.randint(0, 65535)
    tcp = self.proto[self.IP][self.TCP]

    # Add an ACL: the single-port permit, then deny ip any any
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, port, tcp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ip4 tcp " + str(port))

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp, port)

    self.logger.info("ACLP_TEST_FINISH_0015")
973
def test_0016_udp_permit_port_v4(self):
    """ permit single UDPv4

    Picks one random port, permits IPv4 UDP for that port only, denies
    every other IPv4 packet and runs the positive verification for the
    same port.
    """
    self.logger.info("ACLP_TEST_START_0016")

    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]

    # Add an ACL
    rules = []
    rules.append(self.create_rule(self.IPV4, self.PERMIT, port, udp))
    # deny ip any any in the end
    rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))

    # Apply rules.  The tag used to read "permit ip4 tcp" (copied from
    # the TCP variant); it now names the protocol actually permitted.
    self.apply_rules(rules, "permit ip4 udp " + str(port))

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, udp, port)

    self.logger.info("ACLP_TEST_FINISH_0016")
995
def test_0017_tcp_permit_port_v6(self):
    """ permit single TCPv6

    Picks one random port, permits IPv6 TCP for that port only, denies
    every other IPv6 packet and runs the positive verification for the
    same port.
    """
    self.logger.info("ACLP_TEST_START_0017")

    port = random.randint(0, 65535)
    tcp = self.proto[self.IP][self.TCP]

    # Add an ACL
    rules = []
    rules.append(self.create_rule(self.IPV6, self.PERMIT, port, tcp))
    # deny ip any any in the end
    rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))

    # Apply rules.  The tag used to read "permit ip4 tcp" although the
    # rules are IPv6; it now names the address family really used.
    self.apply_rules(rules, "permit ip6 tcp " + str(port))

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, tcp, port)

    self.logger.info("ACLP_TEST_FINISH_0017")
1017
def test_0018_udp_permit_port_v6(self):
    """ permit single UDPv6

    Picks one random port, permits IPv6 UDP for that port only, denies
    every other IPv6 packet and runs the positive verification for the
    same port.
    """
    self.logger.info("ACLP_TEST_START_0018")

    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]

    # Add an ACL
    rules = []
    rules.append(self.create_rule(self.IPV6, self.PERMIT, port, udp))
    # deny ip any any in the end
    rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))

    # Apply rules.  The tag used to read "permit ip4 tcp" (copied from
    # the IPv4 TCP variant); it now matches the IPv6 UDP rule above.
    self.apply_rules(rules, "permit ip6 udp " + str(port))

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, udp, port)

    self.logger.info("ACLP_TEST_FINISH_0018")
1040
def test_0019_udp_deny_port(self):
    """ deny single TCPv4/v6

    Despite the "udp" in the method name (kept so the test id does not
    change), this case exercises TCP: one random port is denied for
    IPv4 and IPv6 TCP, everything else is permitted, and the negative
    verification is run for TCP on that port.
    """
    self.logger.info("ACLP_TEST_START_0019")

    port = random.randint(0, 65535)
    tcp = self.proto[self.IP][self.TCP]

    # Add an ACL
    rules = []
    rules.append(self.create_rule(self.IPV4, self.DENY, port, tcp))
    rules.append(self.create_rule(self.IPV6, self.DENY, port, tcp))
    # Permit ip any any in the end
    rules.append(self.create_rule(self.IPV4, self.PERMIT,
                                  self.PORTS_ALL, 0))
    rules.append(self.create_rule(self.IPV6, self.PERMIT,
                                  self.PORTS_ALL, 0))

    # Apply rules.  The tag used to say "udp" while the rules deny TCP;
    # it now names the protocol actually denied.
    self.apply_rules(rules, "deny ip4/ip6 tcp " + str(port))

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, tcp, port)

    self.logger.info("ACLP_TEST_FINISH_0019")
1067
def test_0020_udp_deny_port(self):
    """ deny single UDPv4/v6

    One random port is denied for IPv4 and IPv6 UDP, everything else
    is permitted, and the negative verification is run for UDP on
    that port with IPRANDOM.
    """
    self.logger.info("ACLP_TEST_START_0020")

    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]
    ip_versions = (self.IPV4, self.IPV6)

    # Add an ACL: per-family deny for the chosen port ...
    rules = [self.create_rule(ver, self.DENY, port, udp)
             for ver in ip_versions]
    # ... then permit ip any any in the end
    rules.extend(self.create_rule(ver, self.PERMIT, self.PORTS_ALL, 0)
                 for ver in ip_versions)

    # Apply rules
    self.apply_rules(rules, "deny ip4/ip6 udp " + str(port))

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp, port)

    self.logger.info("ACLP_TEST_FINISH_0020")
1094
def test_0021_udp_deny_port_verify_fragment_deny(self):
    """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked

    Same rule set as the single-port UDP deny test; the difference is
    the extra True passed as the last argument of
    run_verify_negat_test().
    NOTE(review): presumably that flag requests fragmented traffic -
    confirm against run_verify_negat_test().
    """
    self.logger.info("ACLP_TEST_START_0021")

    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]

    # Add an ACL
    rules = [
        self.create_rule(self.IPV4, self.DENY, port, udp),
        self.create_rule(self.IPV6, self.DENY, port, udp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "deny ip4/ip6 udp " + str(port))

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp, port, True)

    self.logger.info("ACLP_TEST_FINISH_0021")
1121
def test_0022_zero_length_udp_ipv4(self):
    """ VPP-687 zero length udp ipv4 packet

    Permits IPv4 UDP on one random port, denies the rest, then sends a
    stream built by create_stream() from pg0 and requires pg1 to
    capture exactly as many packets as were queued.
    """
    self.logger.info("ACLP_TEST_START_0022")

    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]

    # Add an ACL: the single-port permit, then deny ip any any
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, port, udp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit empty udp ip4 " + str(port))

    # Traffic should still pass
    # Create incoming packet streams for packet-generator interfaces
    # NOTE(review): the two trailing False arguments are positional;
    # presumably one of them yields the zero-length payload - confirm
    # against create_stream().
    stream = self.create_stream(self.pg0, self.pg_if_packet_sizes,
                                self.IP, self.IPV4, udp, port,
                                False, False)
    expected = len(stream)
    if expected > 0:
        self.pg0.add_stream(stream)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Verify outgoing packet streams per packet-generator interface
    self.pg1.get_capture(expected)

    self.logger.info("ACLP_TEST_FINISH_0022")
1156
def test_0023_zero_length_udp_ipv6(self):
    """ VPP-687 zero length udp ipv6 packet

    Permits IPv6 UDP on one random port, denies the rest, then sends a
    stream built by create_stream() from pg0 and requires pg1 to
    capture exactly as many packets as were queued.
    """
    self.logger.info("ACLP_TEST_START_0023")

    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]

    # Add an ACL
    permit_port = self.create_rule(self.IPV6, self.PERMIT, port, udp)
    # deny ip any any in the end
    deny_rest = self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)
    rules = [permit_port, deny_rest]

    # Apply rules
    self.apply_rules(rules, "permit empty udp ip6 " + str(port))

    # Traffic should still pass
    # Create incoming packet streams for packet-generator interfaces
    # NOTE(review): the two trailing False arguments are positional;
    # presumably one of them yields the zero-length payload - confirm
    # against create_stream().
    stream = self.create_stream(self.pg0, self.pg_if_packet_sizes,
                                self.IP, self.IPV6, udp, port,
                                False, False)
    expected = len(stream)
    if expected > 0:
        self.pg0.add_stream(stream)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Verify outgoing packet streams per packet-generator interface
    self.pg1.get_capture(expected)

    self.logger.info("ACLP_TEST_FINISH_0023")
1191
def test_0108_tcp_permit_v4(self):
    """ permit TCPv4 + non-match range

    The ACL starts with a deny on PORTS_RANGE_2 (the "non-match range"
    of the title), then permits PORTS_RANGE and finally denies all
    IPv4.  The positive verification is run for IPv4 TCP.
    """
    self.logger.info("ACLP_TEST_START_0108")

    tcp = self.proto[self.IP][self.TCP]

    # Add an ACL
    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ipv4 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp)

    self.logger.info("ACLP_TEST_FINISH_0108")
1213
def test_0109_tcp_permit_v6(self):
    """ permit TCPv6 + non-match range

    IPv6 variant: deny on PORTS_RANGE_2 first, permit on PORTS_RANGE
    second, deny-all last; the positive verification is run for IPv6
    TCP.
    """
    self.logger.info("ACLP_TEST_START_0109")

    tcp = self.proto[self.IP][self.TCP]
    ip_ver = self.IPV6

    # Add an ACL
    rules = [
        self.create_rule(ip_ver, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(ip_ver, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(ip_ver, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ip6 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, ip_ver, tcp)

    self.logger.info("ACLP_TEST_FINISH_0109")
1235
def test_0110_udp_permit_v4(self):
    """ permit UDPv4 + non-match range

    Deny on PORTS_RANGE_2 first, permit on PORTS_RANGE second,
    deny-all last; the positive verification is run for IPv4 UDP.
    """
    self.logger.info("ACLP_TEST_START_0110")

    udp = self.proto[self.IP][self.UDP]

    # Add an ACL
    non_match_deny = self.create_rule(self.IPV4, self.DENY,
                                      self.PORTS_RANGE_2, udp)
    range_permit = self.create_rule(self.IPV4, self.PERMIT,
                                    self.PORTS_RANGE, udp)
    # deny ip any any in the end
    deny_rest = self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)
    rules = [non_match_deny, range_permit, deny_rest]

    # Apply rules
    self.apply_rules(rules, "permit ipv4 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, udp)

    self.logger.info("ACLP_TEST_FINISH_0110")
1257
def test_0111_udp_permit_v6(self):
    """ permit UDPv6 + non-match range

    IPv6 variant: deny on PORTS_RANGE_2 first, permit on PORTS_RANGE
    second, deny-all last; the positive verification is run for IPv6
    UDP.
    """
    self.logger.info("ACLP_TEST_START_0111")

    udp = self.proto[self.IP][self.UDP]
    ip_ver = self.IPV6

    # Add an ACL: (action, ports selector, protocol) in match order,
    # finishing with deny ip any any
    layout = (
        (self.DENY, self.PORTS_RANGE_2, udp),
        (self.PERMIT, self.PORTS_RANGE, udp),
        (self.DENY, self.PORTS_ALL, 0),
    )
    rules = [self.create_rule(ip_ver, action, ports, proto)
             for action, ports, proto in layout]

    # Apply rules
    self.apply_rules(rules, "permit ip6 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, ip_ver, udp)

    self.logger.info("ACLP_TEST_FINISH_0111")
1279
def test_0112_tcp_deny(self):
    """ deny TCPv4/v6 + non-match range

    Permits on PORTS_RANGE_2 are placed ahead of the PORTS_RANGE
    denies, with catch-all permits at the end; the negative
    verification is run for TCP with IPRANDOM.
    """
    self.logger.info("ACLP_TEST_START_0112")

    tcp = self.proto[self.IP][self.TCP]
    ip_versions = (self.IPV4, self.IPV6)

    # Add an ACL: (action, ports selector, protocol) tiers in match
    # order, each tier emitted for IPv4 then IPv6; the last tier is the
    # "permit ip any any" pair.
    tiers = (
        (self.PERMIT, self.PORTS_RANGE_2, tcp),
        (self.DENY, self.PORTS_RANGE, tcp),
        (self.PERMIT, self.PORTS_ALL, 0),
    )
    rules = []
    for action, ports, proto in tiers:
        for ver in ip_versions:
            rules.append(self.create_rule(ver, action, ports, proto))

    # Apply rules
    self.apply_rules(rules, "deny ip4/ip6 tcp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, tcp)

    self.logger.info("ACLP_TEST_FINISH_0112")
1311
def test_0113_udp_deny(self):
    """ deny UDPv4/v6 + non-match range

    Permits on PORTS_RANGE_2 are placed ahead of the PORTS_RANGE
    denies, with catch-all permits at the end; the negative
    verification is run for UDP with IPRANDOM.
    """
    self.logger.info("ACLP_TEST_START_0113")

    udp = self.proto[self.IP][self.UDP]

    # Add an ACL
    rules = [
        # permits on the non-matching range
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE_2, udp),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE_2, udp),
        # the denies under test
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, udp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, udp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "deny ip4/ip6 udp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp)

    self.logger.info("ACLP_TEST_FINISH_0113")
1343
def test_0300_tcp_permit_v4_etype_aaaa(self):
    """ permit TCPv4, send 0xAAAA etype

    Applies the IPv4 TCP permit ACL (with the PORTS_RANGE_2 deny in
    front and deny-all at the end) and runs the positive verification
    twice: once with default arguments and once passing 0xaaaa as the
    trailing argument of run_verify_test().
    """
    self.logger.info("ACLP_TEST_START_0300")

    tcp = self.proto[self.IP][self.TCP]

    # Add an ACL
    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ipv4 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp)

    # Traffic should still pass also for an odd ethertype
    odd_etype = 0xaaaa
    self.run_verify_test(self.IP, self.IPV4, tcp,
                         0, False, True, odd_etype)

    self.logger.info("ACLP_TEST_FINISH_0300")
1369
def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
    """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA, 0x0BBB

    With the IPv4 TCP permit ACL applied, installs an ethertype
    whitelist containing 0x0bbb and checks, in order: regular IPv4
    traffic passes, 0xaaaa is blocked, 0x0bbb passes, and after the
    whitelist is cleared 0xaaaa passes again.
    """
    self.logger.info("ACLP_TEST_START_0305")

    tcp = self.proto[self.IP][self.TCP]
    blocked_etype = 0xaaaa
    allowed_etype = 0x0bbb

    # Add an ACL
    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ipv4 tcp")

    # whitelist the 0x0bbb etype - so the 0xaaaa should be blocked
    self.etype_whitelist([allowed_etype], 1)

    # The IPv4 traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp)

    # The oddball ethertype should be blocked
    self.run_verify_negat_test(self.IP, self.IPV4, tcp,
                               0, False, blocked_etype)

    # The whitelisted traffic, on the other hand, should pass
    self.run_verify_test(self.IP, self.IPV4, tcp,
                         0, False, True, allowed_etype)

    # remove the whitelist, the previously blocked 0xAAAA should pass now
    self.etype_whitelist([], 0)
    self.run_verify_test(self.IP, self.IPV4, tcp,
                         0, False, True, blocked_etype)

    self.logger.info("ACLP_TEST_FINISH_0305")
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +02001408
def test_0315_del_intf(self):
    """ apply an acl and delete the interface

    Creates a VppLoInterface, applies the IPv4 TCP permit ACL to its
    sw_if_index through apply_rules_to(), and then removes the
    interface while the ACL is still applied.
    """
    self.logger.info("ACLP_TEST_START_0315")

    tcp = self.proto[self.IP][self.TCP]

    # Add an ACL
    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # create an interface
    lo_if = VppLoInterface(self, 0)

    # Apply rules
    self.apply_rules_to(rules, "permit ipv4 tcp", lo_if.sw_if_index)

    # Remove the interface
    lo_if.remove_vpp_config()

    self.logger.info("ACLP_TEST_FINISH_0315")
1434
# Running this module as a script hands control to unittest, using the
# VppTestRunner imported from the framework module.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)