blob: 5fcf09c5ee75b3b915d558196397f43c81f20c18 [file] [log] [blame]
Pavel Kotucek59dda062017-03-02 15:22:47 +01001#!/usr/bin/env python
2"""ACL plugin Test Case HLD:
3"""
4
5import unittest
6import random
7
8from scapy.packet import Raw
9from scapy.layers.l2 import Ether
10from scapy.layers.inet import IP, TCP, UDP, ICMP
11from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +000012from scapy.layers.inet6 import IPv6ExtHdrFragment
Pavel Kotucek59dda062017-03-02 15:22:47 +010013from framework import VppTestCase, VppTestRunner
14from util import Host, ppp
15
16
17class TestACLplugin(VppTestCase):
18 """ ACL plugin Test Case """
19
20 # traffic types
21 IP = 0
22 ICMP = 1
23
24 # IP version
25 IPRANDOM = -1
26 IPV4 = 0
27 IPV6 = 1
28
29 # rule types
30 DENY = 0
31 PERMIT = 1
32
33 # supported protocols
34 proto = [[6, 17], [1, 58]]
35 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
36 ICMPv4 = 0
37 ICMPv6 = 1
38 TCP = 0
39 UDP = 1
40 PROTO_ALL = 0
41
42 # port ranges
43 PORTS_ALL = -1
44 PORTS_RANGE = 0
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +020045 PORTS_RANGE_2 = 1
Pavel Kotucek59dda062017-03-02 15:22:47 +010046 udp_sport_from = 10
47 udp_sport_to = udp_sport_from + 5
48 udp_dport_from = 20000
49 udp_dport_to = udp_dport_from + 5000
50 tcp_sport_from = 30
51 tcp_sport_to = tcp_sport_from + 5
52 tcp_dport_from = 40000
53 tcp_dport_to = tcp_dport_from + 5000
54
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +020055 udp_sport_from_2 = 90
56 udp_sport_to_2 = udp_sport_from_2 + 5
57 udp_dport_from_2 = 30000
58 udp_dport_to_2 = udp_dport_from_2 + 5000
59 tcp_sport_from_2 = 130
60 tcp_sport_to_2 = tcp_sport_from_2 + 5
61 tcp_dport_from_2 = 20000
62 tcp_dport_to_2 = tcp_dport_from_2 + 5000
63
Pavel Kotucek59dda062017-03-02 15:22:47 +010064 icmp4_type = 8 # echo request
65 icmp4_code = 3
66 icmp6_type = 128 # echo request
67 icmp6_code = 3
68
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +020069 icmp4_type_2 = 8
70 icmp4_code_from_2 = 5
71 icmp4_code_to_2 = 20
72 icmp6_type_2 = 128
73 icmp6_code_from_2 = 8
74 icmp6_code_to_2 = 42
75
Pavel Kotucek59dda062017-03-02 15:22:47 +010076 # Test variables
77 bd_id = 1
78
79 @classmethod
80 def setUpClass(cls):
81 """
82 Perform standard class setup (defined by class method setUpClass in
83 class VppTestCase) before running the test case, set test case related
84 variables and configure VPP.
85 """
86 super(TestACLplugin, cls).setUpClass()
87
Pavel Kotucek59dda062017-03-02 15:22:47 +010088 try:
89 # Create 2 pg interfaces
90 cls.create_pg_interfaces(range(2))
91
92 # Packet flows mapping pg0 -> pg1, pg2 etc.
93 cls.flows = dict()
94 cls.flows[cls.pg0] = [cls.pg1]
95
96 # Packet sizes
97 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
98
99 # Create BD with MAC learning and unknown unicast flooding disabled
100 # and put interfaces to this BD
101 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
102 learn=1)
103 for pg_if in cls.pg_interfaces:
104 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
105 bd_id=cls.bd_id)
106
107 # Set up all interfaces
108 for i in cls.pg_interfaces:
109 i.admin_up()
110
111 # Mapping between packet-generator index and lists of test hosts
112 cls.hosts_by_pg_idx = dict()
113 for pg_if in cls.pg_interfaces:
114 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
115
116 # Create list of deleted hosts
117 cls.deleted_hosts_by_pg_idx = dict()
118 for pg_if in cls.pg_interfaces:
119 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
120
121 # warm-up the mac address tables
122 # self.warmup_test()
123
124 except Exception:
125 super(TestACLplugin, cls).tearDownClass()
126 raise
127
128 def setUp(self):
129 super(TestACLplugin, self).setUp()
130 self.reset_packet_infos()
131
132 def tearDown(self):
133 """
134 Show various debug prints after each test.
135 """
136 super(TestACLplugin, self).tearDown()
137 if not self.vpp_dead:
138 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
Andrew Yourtchenko7f4d5772017-05-24 13:20:47 +0200139 self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
140 self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
141 self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
Pavel Kotucek59dda062017-03-02 15:22:47 +0100142 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
143 % self.bd_id))
144
145 def create_hosts(self, count, start=0):
146 """
147 Create required number of host MAC addresses and distribute them among
148 interfaces. Create host IPv4 address for every host MAC address.
149
150 :param int count: Number of hosts to create MAC/IPv4 addresses for.
151 :param int start: Number to start numbering from.
152 """
153 n_int = len(self.pg_interfaces)
154 macs_per_if = count / n_int
155 i = -1
156 for pg_if in self.pg_interfaces:
157 i += 1
158 start_nr = macs_per_if * i + start
159 end_nr = count + start if i == (n_int - 1) \
160 else macs_per_if * (i + 1) + start
161 hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
162 for j in range(start_nr, end_nr):
163 host = Host(
164 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
165 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
166 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
167 hosts.append(host)
168
169 def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
170 s_prefix=0, s_ip='\x00\x00\x00\x00',
171 d_prefix=0, d_ip='\x00\x00\x00\x00'):
172 if proto == -1:
173 return
174 if ports == self.PORTS_ALL:
175 sport_from = 0
176 dport_from = 0
177 sport_to = 65535 if proto != 1 and proto != 58 else 255
178 dport_to = sport_to
179 elif ports == self.PORTS_RANGE:
180 if proto == 1:
181 sport_from = self.icmp4_type
182 sport_to = self.icmp4_type
183 dport_from = self.icmp4_code
184 dport_to = self.icmp4_code
185 elif proto == 58:
186 sport_from = self.icmp6_type
187 sport_to = self.icmp6_type
188 dport_from = self.icmp6_code
189 dport_to = self.icmp6_code
190 elif proto == self.proto[self.IP][self.TCP]:
191 sport_from = self.tcp_sport_from
192 sport_to = self.tcp_sport_to
193 dport_from = self.tcp_dport_from
194 dport_to = self.tcp_dport_to
195 elif proto == self.proto[self.IP][self.UDP]:
196 sport_from = self.udp_sport_from
197 sport_to = self.udp_sport_to
198 dport_from = self.udp_dport_from
199 dport_to = self.udp_dport_to
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +0200200 elif ports == self.PORTS_RANGE_2:
201 if proto == 1:
202 sport_from = self.icmp4_type_2
203 sport_to = self.icmp4_type_2
204 dport_from = self.icmp4_code_from_2
205 dport_to = self.icmp4_code_to_2
206 elif proto == 58:
207 sport_from = self.icmp6_type_2
208 sport_to = self.icmp6_type_2
209 dport_from = self.icmp6_code_from_2
210 dport_to = self.icmp6_code_to_2
211 elif proto == self.proto[self.IP][self.TCP]:
212 sport_from = self.tcp_sport_from_2
213 sport_to = self.tcp_sport_to_2
214 dport_from = self.tcp_dport_from_2
215 dport_to = self.tcp_dport_to_2
216 elif proto == self.proto[self.IP][self.UDP]:
217 sport_from = self.udp_sport_from_2
218 sport_to = self.udp_sport_to_2
219 dport_from = self.udp_dport_from_2
220 dport_to = self.udp_dport_to_2
Pavel Kotucek59dda062017-03-02 15:22:47 +0100221 else:
222 sport_from = ports
223 sport_to = ports
224 dport_from = ports
225 dport_to = ports
226
227 rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
228 'srcport_or_icmptype_first': sport_from,
229 'srcport_or_icmptype_last': sport_to,
230 'src_ip_prefix_len': s_prefix,
231 'src_ip_addr': s_ip,
232 'dstport_or_icmpcode_first': dport_from,
233 'dstport_or_icmpcode_last': dport_to,
234 'dst_ip_prefix_len': d_prefix,
235 'dst_ip_addr': d_ip})
236 return rule
237
238 def apply_rules(self, rules, tag=''):
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200239 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
240 tag=tag)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100241 self.logger.info("Dumped ACL: " + str(
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200242 self.vapi.acl_dump(reply.acl_index)))
Pavel Kotucek59dda062017-03-02 15:22:47 +0100243 # Apply a ACL on the interface as inbound
244 for i in self.pg_interfaces:
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200245 self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
246 n_input=1,
247 acls=[reply.acl_index])
Pavel Kotucek59dda062017-03-02 15:22:47 +0100248 return
249
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100250 def etype_whitelist(self, whitelist, n_input):
251 # Apply whitelists on all the interfaces
252 for i in self.pg_interfaces:
253 # checkstyle can't read long names. Help them.
254 fun = self.vapi.acl_interface_set_etype_whitelist
255 fun(sw_if_index=i.sw_if_index, n_input=n_input,
256 whitelist=whitelist)
257 return
258
Pavel Kotucek59dda062017-03-02 15:22:47 +0100259 def create_upper_layer(self, packet_index, proto, ports=0):
260 p = self.proto_map[proto]
261 if p == 'UDP':
262 if ports == 0:
263 return UDP(sport=random.randint(self.udp_sport_from,
264 self.udp_sport_to),
265 dport=random.randint(self.udp_dport_from,
266 self.udp_dport_to))
267 else:
268 return UDP(sport=ports, dport=ports)
269 elif p == 'TCP':
270 if ports == 0:
271 return TCP(sport=random.randint(self.tcp_sport_from,
272 self.tcp_sport_to),
273 dport=random.randint(self.tcp_dport_from,
274 self.tcp_dport_to))
275 else:
276 return TCP(sport=ports, dport=ports)
277 return ''
278
279 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100280 proto=-1, ports=0, fragments=False,
281 pkt_raw=True, etype=-1):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100282 """
283 Create input packet stream for defined interface using hosts or
284 deleted_hosts list.
285
286 :param object src_if: Interface to create packet stream for.
287 :param list packet_sizes: List of required packet sizes.
288 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
289 :return: Stream of packets.
290 """
291 pkts = []
292 if self.flows.__contains__(src_if):
293 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
294 for dst_if in self.flows[src_if]:
295 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
296 n_int = len(dst_hosts) * len(src_hosts)
297 for i in range(0, n_int):
298 dst_host = dst_hosts[i / len(src_hosts)]
299 src_host = src_hosts[i % len(src_hosts)]
300 pkt_info = self.create_packet_info(src_if, dst_if)
301 if ipv6 == 1:
302 pkt_info.ip = 1
303 elif ipv6 == 0:
304 pkt_info.ip = 0
305 else:
306 pkt_info.ip = random.choice([0, 1])
307 if proto == -1:
308 pkt_info.proto = random.choice(self.proto[self.IP])
309 else:
310 pkt_info.proto = proto
311 payload = self.info_to_payload(pkt_info)
312 p = Ether(dst=dst_host.mac, src=src_host.mac)
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100313 if etype > 0:
314 p = Ether(dst=dst_host.mac,
315 src=src_host.mac,
316 type=etype)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100317 if pkt_info.ip:
318 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000319 if fragments:
320 p /= IPv6ExtHdrFragment(offset=64, m=1)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100321 else:
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000322 if fragments:
323 p /= IP(src=src_host.ip4, dst=dst_host.ip4,
324 flags=1, frag=64)
325 else:
326 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100327 if traffic_type == self.ICMP:
328 if pkt_info.ip:
329 p /= ICMPv6EchoRequest(type=self.icmp6_type,
330 code=self.icmp6_code)
331 else:
332 p /= ICMP(type=self.icmp4_type,
333 code=self.icmp4_code)
334 else:
335 p /= self.create_upper_layer(i, pkt_info.proto, ports)
Pavel Kotuceke7b67342017-04-18 13:12:20 +0200336 if pkt_raw:
337 p /= Raw(payload)
338 pkt_info.data = p.copy()
339 if pkt_raw:
340 size = random.choice(packet_sizes)
341 self.extend_packet(p, size)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100342 pkts.append(p)
343 return pkts
344
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100345 def verify_capture(self, pg_if, capture,
346 traffic_type=0, ip_type=0, etype=-1):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100347 """
348 Verify captured input packet stream for defined interface.
349
350 :param object pg_if: Interface to verify captured packet stream for.
351 :param list capture: Captured packet stream.
352 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
353 """
354 last_info = dict()
355 for i in self.pg_interfaces:
356 last_info[i.sw_if_index] = None
357 dst_sw_if_index = pg_if.sw_if_index
358 for packet in capture:
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100359 if etype > 0:
360 if packet[Ether].type != etype:
361 self.logger.error(ppp("Unexpected ethertype in packet:",
362 packet))
363 else:
364 continue
Pavel Kotucek59dda062017-03-02 15:22:47 +0100365 try:
366 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
367 if traffic_type == self.ICMP and ip_type == self.IPV6:
368 payload_info = self.payload_to_info(
369 packet[ICMPv6EchoRequest].data)
370 payload = packet[ICMPv6EchoRequest]
371 else:
372 payload_info = self.payload_to_info(str(packet[Raw]))
373 payload = packet[self.proto_map[payload_info.proto]]
374 except:
375 self.logger.error(ppp("Unexpected or invalid packet "
376 "(outside network):", packet))
377 raise
378
379 if ip_type != 0:
380 self.assertEqual(payload_info.ip, ip_type)
381 if traffic_type == self.ICMP:
382 try:
383 if payload_info.ip == 0:
384 self.assertEqual(payload.type, self.icmp4_type)
385 self.assertEqual(payload.code, self.icmp4_code)
386 else:
387 self.assertEqual(payload.type, self.icmp6_type)
388 self.assertEqual(payload.code, self.icmp6_code)
389 except:
390 self.logger.error(ppp("Unexpected or invalid packet "
391 "(outside network):", packet))
392 raise
393 else:
394 try:
395 ip_version = IPv6 if payload_info.ip == 1 else IP
396
397 ip = packet[ip_version]
398 packet_index = payload_info.index
399
400 self.assertEqual(payload_info.dst, dst_sw_if_index)
401 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
402 (pg_if.name, payload_info.src,
403 packet_index))
404 next_info = self.get_next_packet_info_for_interface2(
405 payload_info.src, dst_sw_if_index,
406 last_info[payload_info.src])
407 last_info[payload_info.src] = next_info
408 self.assertTrue(next_info is not None)
409 self.assertEqual(packet_index, next_info.index)
410 saved_packet = next_info.data
411 # Check standard fields
412 self.assertEqual(ip.src, saved_packet[ip_version].src)
413 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
414 p = self.proto_map[payload_info.proto]
415 if p == 'TCP':
416 tcp = packet[TCP]
417 self.assertEqual(tcp.sport, saved_packet[
418 TCP].sport)
419 self.assertEqual(tcp.dport, saved_packet[
420 TCP].dport)
421 elif p == 'UDP':
422 udp = packet[UDP]
423 self.assertEqual(udp.sport, saved_packet[
424 UDP].sport)
425 self.assertEqual(udp.dport, saved_packet[
426 UDP].dport)
427 except:
428 self.logger.error(ppp("Unexpected or invalid packet:",
429 packet))
430 raise
431 for i in self.pg_interfaces:
432 remaining_packet = self.get_next_packet_info_for_interface2(
433 i, dst_sw_if_index, last_info[i.sw_if_index])
434 self.assertTrue(
435 remaining_packet is None,
436 "Port %u: Packet expected from source %u didn't arrive" %
437 (dst_sw_if_index, i.sw_if_index))
438
439 def run_traffic_no_check(self):
440 # Test
441 # Create incoming packet streams for packet-generator interfaces
442 for i in self.pg_interfaces:
443 if self.flows.__contains__(i):
444 pkts = self.create_stream(i, self.pg_if_packet_sizes)
445 if len(pkts) > 0:
446 i.add_stream(pkts)
447
448 # Enable packet capture and start packet sending
449 self.pg_enable_capture(self.pg_interfaces)
450 self.pg_start()
451
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000452 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100453 frags=False, pkt_raw=True, etype=-1):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100454 # Test
455 # Create incoming packet streams for packet-generator interfaces
456 pkts_cnt = 0
457 for i in self.pg_interfaces:
458 if self.flows.__contains__(i):
459 pkts = self.create_stream(i, self.pg_if_packet_sizes,
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000460 traffic_type, ip_type, proto, ports,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100461 frags, pkt_raw, etype)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100462 if len(pkts) > 0:
463 i.add_stream(pkts)
464 pkts_cnt += len(pkts)
465
466 # Enable packet capture and start packet sendingself.IPV
467 self.pg_enable_capture(self.pg_interfaces)
468 self.pg_start()
469
470 # Verify
471 # Verify outgoing packet streams per packet-generator interface
472 for src_if in self.pg_interfaces:
473 if self.flows.__contains__(src_if):
474 for dst_if in self.flows[src_if]:
475 capture = dst_if.get_capture(pkts_cnt)
476 self.logger.info("Verifying capture on interface %s" %
477 dst_if.name)
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100478 self.verify_capture(dst_if, capture,
479 traffic_type, ip_type, etype)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100480
481 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100482 ports=0, frags=False, etype=-1):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100483 # Test
484 self.reset_packet_infos()
485 for i in self.pg_interfaces:
486 if self.flows.__contains__(i):
487 pkts = self.create_stream(i, self.pg_if_packet_sizes,
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000488 traffic_type, ip_type, proto, ports,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100489 frags, True, etype)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100490 if len(pkts) > 0:
491 i.add_stream(pkts)
492
493 # Enable packet capture and start packet sending
494 self.pg_enable_capture(self.pg_interfaces)
495 self.pg_start()
496
497 # Verify
498 # Verify outgoing packet streams per packet-generator interface
499 for src_if in self.pg_interfaces:
500 if self.flows.__contains__(src_if):
501 for dst_if in self.flows[src_if]:
502 self.logger.info("Verifying capture on interface %s" %
503 dst_if.name)
504 capture = dst_if.get_capture(0)
505 self.assertEqual(len(capture), 0)
506
Pavel Kotucek59dda062017-03-02 15:22:47 +0100507 def test_0000_warmup_test(self):
508 """ ACL plugin version check; learn MACs
509 """
510 self.create_hosts(16)
511 self.run_traffic_no_check()
512 reply = self.vapi.papi.acl_plugin_get_version()
513 self.assertEqual(reply.major, 1)
514 self.logger.info("Working with ACL plugin version: %d.%d" % (
515 reply.major, reply.minor))
516 # minor version changes are non breaking
517 # self.assertEqual(reply.minor, 0)
518
519 def test_0001_acl_create(self):
Andrew Yourtchenko987abe92017-09-27 13:50:31 +0200520 """ ACL create/delete test
Pavel Kotucek59dda062017-03-02 15:22:47 +0100521 """
522
523 self.logger.info("ACLP_TEST_START_0001")
524 # Add an ACL
525 r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
526 'srcport_or_icmptype_first': 1234,
527 'srcport_or_icmptype_last': 1235,
528 'src_ip_prefix_len': 0,
529 'src_ip_addr': '\x00\x00\x00\x00',
530 'dstport_or_icmpcode_first': 1234,
531 'dstport_or_icmpcode_last': 1234,
532 'dst_ip_addr': '\x00\x00\x00\x00',
533 'dst_ip_prefix_len': 0}]
534 # Test 1: add a new ACL
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200535 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
536 tag="permit 1234")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100537 self.assertEqual(reply.retval, 0)
538 # The very first ACL gets #0
539 self.assertEqual(reply.acl_index, 0)
Andrew Yourtchenko987abe92017-09-27 13:50:31 +0200540 first_acl = reply.acl_index
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200541 rr = self.vapi.acl_dump(reply.acl_index)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100542 self.logger.info("Dumped ACL: " + str(rr))
543 self.assertEqual(len(rr), 1)
544 # We should have the same number of ACL entries as we had asked
545 self.assertEqual(len(rr[0].r), len(r))
546 # The rules should be the same. But because the submitted and returned
547 # are different types, we need to iterate over rules and keys to get
548 # to basic values.
549 for i_rule in range(0, len(r) - 1):
550 for rule_key in r[i_rule]:
551 self.assertEqual(rr[0].r[i_rule][rule_key],
552 r[i_rule][rule_key])
553
554 # Add a deny-1234 ACL
Ole Troan895b6e82017-10-20 13:28:20 +0200555 r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
Pavel Kotucek59dda062017-03-02 15:22:47 +0100556 'srcport_or_icmptype_first': 1234,
557 'srcport_or_icmptype_last': 1235,
558 'src_ip_prefix_len': 0,
559 'src_ip_addr': '\x00\x00\x00\x00',
560 'dstport_or_icmpcode_first': 1234,
561 'dstport_or_icmpcode_last': 1234,
562 'dst_ip_addr': '\x00\x00\x00\x00',
563 'dst_ip_prefix_len': 0},
564 {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
565 'srcport_or_icmptype_first': 0,
566 'srcport_or_icmptype_last': 0,
567 'src_ip_prefix_len': 0,
568 'src_ip_addr': '\x00\x00\x00\x00',
569 'dstport_or_icmpcode_first': 0,
570 'dstport_or_icmpcode_last': 0,
571 'dst_ip_addr': '\x00\x00\x00\x00',
Ole Troan895b6e82017-10-20 13:28:20 +0200572 'dst_ip_prefix_len': 0}]
Pavel Kotucek59dda062017-03-02 15:22:47 +0100573
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200574 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
575 tag="deny 1234;permit all")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100576 self.assertEqual(reply.retval, 0)
577 # The second ACL gets #1
578 self.assertEqual(reply.acl_index, 1)
Andrew Yourtchenko987abe92017-09-27 13:50:31 +0200579 second_acl = reply.acl_index
Pavel Kotucek59dda062017-03-02 15:22:47 +0100580
581 # Test 2: try to modify a nonexistent ACL
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200582 reply = self.vapi.acl_add_replace(acl_index=432, r=r,
Jon Loeliger27cadd22017-11-10 13:15:12 -0600583 tag="FFFF:FFFF", expected_retval=-6)
584 self.assertEqual(reply.retval, -6)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100585 # The ACL number should pass through
586 self.assertEqual(reply.acl_index, 432)
Andrew Yourtchenko987abe92017-09-27 13:50:31 +0200587 # apply an ACL on an interface inbound, try to delete ACL, must fail
588 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
589 n_input=1,
590 acls=[first_acl])
Jon Loeliger27cadd22017-11-10 13:15:12 -0600591 reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
Andrew Yourtchenko987abe92017-09-27 13:50:31 +0200592 # Unapply an ACL and then try to delete it - must be ok
593 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
594 n_input=0,
595 acls=[])
596 reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
597
598 # apply an ACL on an interface outbound, try to delete ACL, must fail
599 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
600 n_input=0,
601 acls=[second_acl])
Jon Loeliger27cadd22017-11-10 13:15:12 -0600602 reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
Andrew Yourtchenko987abe92017-09-27 13:50:31 +0200603 # Unapply the ACL and then try to delete it - must be ok
604 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
605 n_input=0,
606 acls=[])
607 reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
608
609 # try to apply a nonexistent ACL - must fail
610 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
611 n_input=1,
612 acls=[first_acl],
Jon Loeliger27cadd22017-11-10 13:15:12 -0600613 expected_retval=-6)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100614
615 self.logger.info("ACLP_TEST_FINISH_0001")
616
617 def test_0002_acl_permit_apply(self):
618 """ permit ACL apply test
619 """
620 self.logger.info("ACLP_TEST_START_0002")
621
622 rules = []
623 rules.append(self.create_rule(self.IPV4, self.PERMIT,
624 0, self.proto[self.IP][self.UDP]))
625 rules.append(self.create_rule(self.IPV4, self.PERMIT,
626 0, self.proto[self.IP][self.TCP]))
627
628 # Apply rules
629 self.apply_rules(rules, "permit per-flow")
630
631 # Traffic should still pass
632 self.run_verify_test(self.IP, self.IPV4, -1)
633 self.logger.info("ACLP_TEST_FINISH_0002")
634
635 def test_0003_acl_deny_apply(self):
636 """ deny ACL apply test
637 """
638 self.logger.info("ACLP_TEST_START_0003")
639 # Add a deny-flows ACL
640 rules = []
641 rules.append(self.create_rule(self.IPV4, self.DENY,
642 self.PORTS_ALL, self.proto[self.IP][self.UDP]))
643 # Permit ip any any in the end
644 rules.append(self.create_rule(self.IPV4, self.PERMIT,
645 self.PORTS_ALL, 0))
646
647 # Apply rules
648 self.apply_rules(rules, "deny per-flow;permit all")
649
650 # Traffic should not pass
651 self.run_verify_negat_test(self.IP, self.IPV4,
652 self.proto[self.IP][self.UDP])
653 self.logger.info("ACLP_TEST_FINISH_0003")
654 # self.assertEqual(1, 0)
655
656 def test_0004_vpp624_permit_icmpv4(self):
657 """ VPP_624 permit ICMPv4
658 """
659 self.logger.info("ACLP_TEST_START_0004")
660
661 # Add an ACL
662 rules = []
663 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
664 self.proto[self.ICMP][self.ICMPv4]))
665 # deny ip any any in the end
666 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
667
668 # Apply rules
669 self.apply_rules(rules, "permit icmpv4")
670
671 # Traffic should still pass
672 self.run_verify_test(self.ICMP, self.IPV4,
673 self.proto[self.ICMP][self.ICMPv4])
674
675 self.logger.info("ACLP_TEST_FINISH_0004")
676
677 def test_0005_vpp624_permit_icmpv6(self):
678 """ VPP_624 permit ICMPv6
679 """
680 self.logger.info("ACLP_TEST_START_0005")
681
682 # Add an ACL
683 rules = []
684 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
685 self.proto[self.ICMP][self.ICMPv6]))
686 # deny ip any any in the end
687 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
688
689 # Apply rules
690 self.apply_rules(rules, "permit icmpv6")
691
692 # Traffic should still pass
693 self.run_verify_test(self.ICMP, self.IPV6,
694 self.proto[self.ICMP][self.ICMPv6])
695
696 self.logger.info("ACLP_TEST_FINISH_0005")
697
698 def test_0006_vpp624_deny_icmpv4(self):
699 """ VPP_624 deny ICMPv4
700 """
701 self.logger.info("ACLP_TEST_START_0006")
702 # Add an ACL
703 rules = []
704 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
705 self.proto[self.ICMP][self.ICMPv4]))
706 # permit ip any any in the end
707 rules.append(self.create_rule(self.IPV4, self.PERMIT,
708 self.PORTS_ALL, 0))
709
710 # Apply rules
711 self.apply_rules(rules, "deny icmpv4")
712
713 # Traffic should not pass
714 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
715
716 self.logger.info("ACLP_TEST_FINISH_0006")
717
718 def test_0007_vpp624_deny_icmpv6(self):
719 """ VPP_624 deny ICMPv6
720 """
721 self.logger.info("ACLP_TEST_START_0007")
722 # Add an ACL
723 rules = []
724 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
725 self.proto[self.ICMP][self.ICMPv6]))
726 # deny ip any any in the end
727 rules.append(self.create_rule(self.IPV6, self.PERMIT,
728 self.PORTS_ALL, 0))
729
730 # Apply rules
731 self.apply_rules(rules, "deny icmpv6")
732
733 # Traffic should not pass
734 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
735
736 self.logger.info("ACLP_TEST_FINISH_0007")
737
738 def test_0008_tcp_permit_v4(self):
739 """ permit TCPv4
740 """
741 self.logger.info("ACLP_TEST_START_0008")
742
743 # Add an ACL
744 rules = []
745 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
746 self.proto[self.IP][self.TCP]))
747 # deny ip any any in the end
748 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
749
750 # Apply rules
751 self.apply_rules(rules, "permit ipv4 tcp")
752
753 # Traffic should still pass
754 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
755
756 self.logger.info("ACLP_TEST_FINISH_0008")
757
758 def test_0009_tcp_permit_v6(self):
759 """ permit TCPv6
760 """
761 self.logger.info("ACLP_TEST_START_0009")
762
763 # Add an ACL
764 rules = []
765 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
766 self.proto[self.IP][self.TCP]))
767 # deny ip any any in the end
768 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
769
770 # Apply rules
771 self.apply_rules(rules, "permit ip6 tcp")
772
773 # Traffic should still pass
774 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
775
776 self.logger.info("ACLP_TEST_FINISH_0008")
777
778 def test_0010_udp_permit_v4(self):
779 """ permit UDPv4
780 """
781 self.logger.info("ACLP_TEST_START_0010")
782
783 # Add an ACL
784 rules = []
785 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
786 self.proto[self.IP][self.UDP]))
787 # deny ip any any in the end
788 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
789
790 # Apply rules
791 self.apply_rules(rules, "permit ipv udp")
792
793 # Traffic should still pass
794 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
795
796 self.logger.info("ACLP_TEST_FINISH_0010")
797
798 def test_0011_udp_permit_v6(self):
799 """ permit UDPv6
800 """
801 self.logger.info("ACLP_TEST_START_0011")
802
803 # Add an ACL
804 rules = []
805 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
806 self.proto[self.IP][self.UDP]))
807 # deny ip any any in the end
808 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
809
810 # Apply rules
811 self.apply_rules(rules, "permit ip6 udp")
812
813 # Traffic should still pass
814 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
815
816 self.logger.info("ACLP_TEST_FINISH_0011")
817
818 def test_0012_tcp_deny(self):
819 """ deny TCPv4/v6
820 """
821 self.logger.info("ACLP_TEST_START_0012")
822
823 # Add an ACL
824 rules = []
825 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
826 self.proto[self.IP][self.TCP]))
827 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
828 self.proto[self.IP][self.TCP]))
829 # permit ip any any in the end
830 rules.append(self.create_rule(self.IPV4, self.PERMIT,
831 self.PORTS_ALL, 0))
832 rules.append(self.create_rule(self.IPV6, self.PERMIT,
833 self.PORTS_ALL, 0))
834
835 # Apply rules
836 self.apply_rules(rules, "deny ip4/ip6 tcp")
837
838 # Traffic should not pass
839 self.run_verify_negat_test(self.IP, self.IPRANDOM,
840 self.proto[self.IP][self.TCP])
841
842 self.logger.info("ACLP_TEST_FINISH_0012")
843
844 def test_0013_udp_deny(self):
845 """ deny UDPv4/v6
846 """
847 self.logger.info("ACLP_TEST_START_0013")
848
849 # Add an ACL
850 rules = []
851 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
852 self.proto[self.IP][self.UDP]))
853 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
854 self.proto[self.IP][self.UDP]))
855 # permit ip any any in the end
856 rules.append(self.create_rule(self.IPV4, self.PERMIT,
857 self.PORTS_ALL, 0))
858 rules.append(self.create_rule(self.IPV6, self.PERMIT,
859 self.PORTS_ALL, 0))
860
861 # Apply rules
862 self.apply_rules(rules, "deny ip4/ip6 udp")
863
864 # Traffic should not pass
865 self.run_verify_negat_test(self.IP, self.IPRANDOM,
866 self.proto[self.IP][self.UDP])
867
868 self.logger.info("ACLP_TEST_FINISH_0013")
869
870 def test_0014_acl_dump(self):
871 """ verify add/dump acls
872 """
873 self.logger.info("ACLP_TEST_START_0014")
874
875 r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
876 [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
877 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
878 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
879 [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
880 [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
881 [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
882 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
883 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
884 [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
885 [self.IPV4, self.DENY, self.PORTS_ALL, 0],
886 [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
887 [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
888 [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
889 [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
890 [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
891 [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
892 [self.IPV6, self.DENY, self.PORTS_ALL, 0]
893 ]
894
895 # Add and verify new ACLs
896 rules = []
897 for i in range(len(r)):
898 rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
899
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200900 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
901 result = self.vapi.acl_dump(reply.acl_index)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100902
903 i = 0
904 for drules in result:
905 for dr in drules.r:
906 self.assertEqual(dr.is_ipv6, r[i][0])
907 self.assertEqual(dr.is_permit, r[i][1])
908 self.assertEqual(dr.proto, r[i][3])
909
910 if r[i][2] > 0:
911 self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
912 else:
913 if r[i][2] < 0:
914 self.assertEqual(dr.srcport_or_icmptype_first, 0)
915 self.assertEqual(dr.srcport_or_icmptype_last, 65535)
916 else:
917 if dr.proto == self.proto[self.IP][self.TCP]:
918 self.assertGreater(dr.srcport_or_icmptype_first,
919 self.tcp_sport_from-1)
920 self.assertLess(dr.srcport_or_icmptype_first,
921 self.tcp_sport_to+1)
922 self.assertGreater(dr.dstport_or_icmpcode_last,
923 self.tcp_dport_from-1)
924 self.assertLess(dr.dstport_or_icmpcode_last,
925 self.tcp_dport_to+1)
926 elif dr.proto == self.proto[self.IP][self.UDP]:
927 self.assertGreater(dr.srcport_or_icmptype_first,
928 self.udp_sport_from-1)
929 self.assertLess(dr.srcport_or_icmptype_first,
930 self.udp_sport_to+1)
931 self.assertGreater(dr.dstport_or_icmpcode_last,
932 self.udp_dport_from-1)
933 self.assertLess(dr.dstport_or_icmpcode_last,
934 self.udp_dport_to+1)
935 i += 1
936
937 self.logger.info("ACLP_TEST_FINISH_0014")
938
939 def test_0015_tcp_permit_port_v4(self):
940 """ permit single TCPv4
941 """
942 self.logger.info("ACLP_TEST_START_0015")
943
944 port = random.randint(0, 65535)
945 # Add an ACL
946 rules = []
947 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
948 self.proto[self.IP][self.TCP]))
949 # deny ip any any in the end
950 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
951
952 # Apply rules
953 self.apply_rules(rules, "permit ip4 tcp "+str(port))
954
955 # Traffic should still pass
956 self.run_verify_test(self.IP, self.IPV4,
957 self.proto[self.IP][self.TCP], port)
958
959 self.logger.info("ACLP_TEST_FINISH_0015")
960
961 def test_0016_udp_permit_port_v4(self):
962 """ permit single UDPv4
963 """
964 self.logger.info("ACLP_TEST_START_0016")
965
966 port = random.randint(0, 65535)
967 # Add an ACL
968 rules = []
969 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
970 self.proto[self.IP][self.UDP]))
971 # deny ip any any in the end
972 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
973
974 # Apply rules
975 self.apply_rules(rules, "permit ip4 tcp "+str(port))
976
977 # Traffic should still pass
978 self.run_verify_test(self.IP, self.IPV4,
979 self.proto[self.IP][self.UDP], port)
980
981 self.logger.info("ACLP_TEST_FINISH_0016")
982
983 def test_0017_tcp_permit_port_v6(self):
984 """ permit single TCPv6
985 """
986 self.logger.info("ACLP_TEST_START_0017")
987
988 port = random.randint(0, 65535)
989 # Add an ACL
990 rules = []
991 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
992 self.proto[self.IP][self.TCP]))
993 # deny ip any any in the end
994 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
995
996 # Apply rules
997 self.apply_rules(rules, "permit ip4 tcp "+str(port))
998
999 # Traffic should still pass
1000 self.run_verify_test(self.IP, self.IPV6,
1001 self.proto[self.IP][self.TCP], port)
1002
1003 self.logger.info("ACLP_TEST_FINISH_0017")
1004
1005 def test_0018_udp_permit_port_v6(self):
1006 """ permit single UPPv6
1007 """
1008 self.logger.info("ACLP_TEST_START_0018")
1009
1010 port = random.randint(0, 65535)
1011 # Add an ACL
1012 rules = []
1013 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1014 self.proto[self.IP][self.UDP]))
1015 # deny ip any any in the end
1016 rules.append(self.create_rule(self.IPV6, self.DENY,
1017 self.PORTS_ALL, 0))
1018
1019 # Apply rules
1020 self.apply_rules(rules, "permit ip4 tcp "+str(port))
1021
1022 # Traffic should still pass
1023 self.run_verify_test(self.IP, self.IPV6,
1024 self.proto[self.IP][self.UDP], port)
1025
1026 self.logger.info("ACLP_TEST_FINISH_0018")
1027
1028 def test_0019_udp_deny_port(self):
1029 """ deny single TCPv4/v6
1030 """
1031 self.logger.info("ACLP_TEST_START_0019")
1032
1033 port = random.randint(0, 65535)
1034 # Add an ACL
1035 rules = []
1036 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1037 self.proto[self.IP][self.TCP]))
1038 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1039 self.proto[self.IP][self.TCP]))
1040 # Permit ip any any in the end
1041 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1042 self.PORTS_ALL, 0))
1043 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1044 self.PORTS_ALL, 0))
1045
1046 # Apply rules
1047 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1048
1049 # Traffic should not pass
1050 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1051 self.proto[self.IP][self.TCP], port)
1052
1053 self.logger.info("ACLP_TEST_FINISH_0019")
1054
1055 def test_0020_udp_deny_port(self):
1056 """ deny single UDPv4/v6
1057 """
1058 self.logger.info("ACLP_TEST_START_0020")
1059
1060 port = random.randint(0, 65535)
1061 # Add an ACL
1062 rules = []
1063 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1064 self.proto[self.IP][self.UDP]))
1065 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1066 self.proto[self.IP][self.UDP]))
1067 # Permit ip any any in the end
1068 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1069 self.PORTS_ALL, 0))
1070 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1071 self.PORTS_ALL, 0))
1072
1073 # Apply rules
1074 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1075
1076 # Traffic should not pass
1077 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1078 self.proto[self.IP][self.UDP], port)
1079
1080 self.logger.info("ACLP_TEST_FINISH_0020")
1081
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +00001082 def test_0021_udp_deny_port_verify_fragment_deny(self):
1083 """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
1084 """
1085 self.logger.info("ACLP_TEST_START_0021")
1086
1087 port = random.randint(0, 65535)
1088 # Add an ACL
1089 rules = []
1090 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1091 self.proto[self.IP][self.UDP]))
1092 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1093 self.proto[self.IP][self.UDP]))
1094 # deny ip any any in the end
1095 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1096 self.PORTS_ALL, 0))
1097 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1098 self.PORTS_ALL, 0))
1099
1100 # Apply rules
1101 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1102
1103 # Traffic should not pass
1104 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1105 self.proto[self.IP][self.UDP], port, True)
1106
1107 self.logger.info("ACLP_TEST_FINISH_0021")
1108
Pavel Kotuceke7b67342017-04-18 13:12:20 +02001109 def test_0022_zero_length_udp_ipv4(self):
1110 """ VPP-687 zero length udp ipv4 packet"""
1111 self.logger.info("ACLP_TEST_START_0022")
1112
1113 port = random.randint(0, 65535)
1114 # Add an ACL
1115 rules = []
1116 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1117 self.proto[self.IP][self.UDP]))
1118 # deny ip any any in the end
1119 rules.append(
1120 self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1121
1122 # Apply rules
1123 self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1124
1125 # Traffic should still pass
1126 # Create incoming packet streams for packet-generator interfaces
1127 pkts_cnt = 0
1128 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1129 self.IP, self.IPV4,
1130 self.proto[self.IP][self.UDP], port,
1131 False, False)
1132 if len(pkts) > 0:
1133 self.pg0.add_stream(pkts)
1134 pkts_cnt += len(pkts)
1135
1136 # Enable packet capture and start packet sendingself.IPV
1137 self.pg_enable_capture(self.pg_interfaces)
1138 self.pg_start()
1139
1140 self.pg1.get_capture(pkts_cnt)
1141
1142 self.logger.info("ACLP_TEST_FINISH_0022")
1143
1144 def test_0023_zero_length_udp_ipv6(self):
1145 """ VPP-687 zero length udp ipv6 packet"""
1146 self.logger.info("ACLP_TEST_START_0023")
1147
1148 port = random.randint(0, 65535)
1149 # Add an ACL
1150 rules = []
1151 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1152 self.proto[self.IP][self.UDP]))
1153 # deny ip any any in the end
1154 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1155
1156 # Apply rules
1157 self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1158
1159 # Traffic should still pass
1160 # Create incoming packet streams for packet-generator interfaces
1161 pkts_cnt = 0
1162 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1163 self.IP, self.IPV6,
1164 self.proto[self.IP][self.UDP], port,
1165 False, False)
1166 if len(pkts) > 0:
1167 self.pg0.add_stream(pkts)
1168 pkts_cnt += len(pkts)
1169
1170 # Enable packet capture and start packet sendingself.IPV
1171 self.pg_enable_capture(self.pg_interfaces)
1172 self.pg_start()
1173
1174 # Verify outgoing packet streams per packet-generator interface
1175 self.pg1.get_capture(pkts_cnt)
1176
1177 self.logger.info("ACLP_TEST_FINISH_0023")
1178
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +02001179 def test_0108_tcp_permit_v4(self):
1180 """ permit TCPv4 + non-match range
1181 """
1182 self.logger.info("ACLP_TEST_START_0108")
1183
1184 # Add an ACL
1185 rules = []
1186 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1187 self.proto[self.IP][self.TCP]))
1188 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1189 self.proto[self.IP][self.TCP]))
1190 # deny ip any any in the end
1191 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1192
1193 # Apply rules
1194 self.apply_rules(rules, "permit ipv4 tcp")
1195
1196 # Traffic should still pass
1197 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1198
1199 self.logger.info("ACLP_TEST_FINISH_0108")
1200
1201 def test_0109_tcp_permit_v6(self):
1202 """ permit TCPv6 + non-match range
1203 """
1204 self.logger.info("ACLP_TEST_START_0109")
1205
1206 # Add an ACL
1207 rules = []
1208 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1209 self.proto[self.IP][self.TCP]))
1210 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1211 self.proto[self.IP][self.TCP]))
1212 # deny ip any any in the end
1213 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1214
1215 # Apply rules
1216 self.apply_rules(rules, "permit ip6 tcp")
1217
1218 # Traffic should still pass
1219 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1220
1221 self.logger.info("ACLP_TEST_FINISH_0109")
1222
1223 def test_0110_udp_permit_v4(self):
1224 """ permit UDPv4 + non-match range
1225 """
1226 self.logger.info("ACLP_TEST_START_0110")
1227
1228 # Add an ACL
1229 rules = []
1230 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1231 self.proto[self.IP][self.UDP]))
1232 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1233 self.proto[self.IP][self.UDP]))
1234 # deny ip any any in the end
1235 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1236
1237 # Apply rules
1238 self.apply_rules(rules, "permit ipv4 udp")
1239
1240 # Traffic should still pass
1241 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1242
1243 self.logger.info("ACLP_TEST_FINISH_0110")
1244
1245 def test_0111_udp_permit_v6(self):
1246 """ permit UDPv6 + non-match range
1247 """
1248 self.logger.info("ACLP_TEST_START_0111")
1249
1250 # Add an ACL
1251 rules = []
1252 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1253 self.proto[self.IP][self.UDP]))
1254 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1255 self.proto[self.IP][self.UDP]))
1256 # deny ip any any in the end
1257 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1258
1259 # Apply rules
1260 self.apply_rules(rules, "permit ip6 udp")
1261
1262 # Traffic should still pass
1263 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1264
1265 self.logger.info("ACLP_TEST_FINISH_0111")
1266
1267 def test_0112_tcp_deny(self):
1268 """ deny TCPv4/v6 + non-match range
1269 """
1270 self.logger.info("ACLP_TEST_START_0112")
1271
1272 # Add an ACL
1273 rules = []
1274 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1275 self.PORTS_RANGE_2,
1276 self.proto[self.IP][self.TCP]))
1277 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1278 self.PORTS_RANGE_2,
1279 self.proto[self.IP][self.TCP]))
1280 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1281 self.proto[self.IP][self.TCP]))
1282 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1283 self.proto[self.IP][self.TCP]))
1284 # permit ip any any in the end
1285 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1286 self.PORTS_ALL, 0))
1287 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1288 self.PORTS_ALL, 0))
1289
1290 # Apply rules
1291 self.apply_rules(rules, "deny ip4/ip6 tcp")
1292
1293 # Traffic should not pass
1294 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1295 self.proto[self.IP][self.TCP])
1296
1297 self.logger.info("ACLP_TEST_FINISH_0112")
1298
1299 def test_0113_udp_deny(self):
1300 """ deny UDPv4/v6 + non-match range
1301 """
1302 self.logger.info("ACLP_TEST_START_0113")
1303
1304 # Add an ACL
1305 rules = []
1306 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1307 self.PORTS_RANGE_2,
1308 self.proto[self.IP][self.UDP]))
1309 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1310 self.PORTS_RANGE_2,
1311 self.proto[self.IP][self.UDP]))
1312 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1313 self.proto[self.IP][self.UDP]))
1314 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1315 self.proto[self.IP][self.UDP]))
1316 # permit ip any any in the end
1317 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1318 self.PORTS_ALL, 0))
1319 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1320 self.PORTS_ALL, 0))
1321
1322 # Apply rules
1323 self.apply_rules(rules, "deny ip4/ip6 udp")
1324
1325 # Traffic should not pass
1326 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1327 self.proto[self.IP][self.UDP])
1328
1329 self.logger.info("ACLP_TEST_FINISH_0113")
1330
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +01001331 def test_0300_tcp_permit_v4_etype_aaaa(self):
1332 """ permit TCPv4, send 0xAAAA etype
1333 """
1334 self.logger.info("ACLP_TEST_START_0300")
1335
1336 # Add an ACL
1337 rules = []
1338 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1339 self.proto[self.IP][self.TCP]))
1340 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1341 self.proto[self.IP][self.TCP]))
1342 # deny ip any any in the end
1343 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1344
1345 # Apply rules
1346 self.apply_rules(rules, "permit ipv4 tcp")
1347
1348 # Traffic should still pass
1349 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1350
1351 # Traffic should still pass also for an odd ethertype
1352 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1353 0, False, True, 0xaaaa)
1354
1355 self.logger.info("ACLP_TEST_FINISH_0300")
1356
1357 def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
1358 """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA, 0x0BBB
1359 """
1360 self.logger.info("ACLP_TEST_START_0305")
1361
1362 # Add an ACL
1363 rules = []
1364 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1365 self.proto[self.IP][self.TCP]))
1366 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1367 self.proto[self.IP][self.TCP]))
1368 # deny ip any any in the end
1369 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1370
1371 # Apply rules
1372 self.apply_rules(rules, "permit ipv4 tcp")
1373
1374 # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1375 self.etype_whitelist([0xbbb], 1)
1376
1377 # The IPv4 traffic should still pass
1378 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1379
1380 # The oddball ethertype should be blocked
1381 self.run_verify_negat_test(self.IP, self.IPV4,
1382 self.proto[self.IP][self.TCP],
1383 0, False, 0xaaaa)
1384
1385 # The whitelisted traffic, on the other hand, should pass
1386 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1387 0, False, True, 0x0bbb)
1388
1389 # remove the whitelist, the previously blocked 0xAAAA should pass now
1390 self.etype_whitelist([], 0)
1391 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1392 0, False, True, 0xaaaa)
1393
1394 self.logger.info("ACLP_TEST_FINISH_0305")
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +02001395
Pavel Kotucek59dda062017-03-02 15:22:47 +01001396if __name__ == '__main__':
1397 unittest.main(testRunner=VppTestRunner)