blob: fd45b4a27e77abe734db9d1a2796f88141cea5ed [file] [log] [blame]
Pavel Kotucek59dda062017-03-02 15:22:47 +01001#!/usr/bin/env python
2"""ACL plugin Test Case HLD:
3"""
4
5import unittest
6import random
7
8from scapy.packet import Raw
9from scapy.layers.l2 import Ether
10from scapy.layers.inet import IP, TCP, UDP, ICMP
11from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +000012from scapy.layers.inet6 import IPv6ExtHdrFragment
Pavel Kotucek59dda062017-03-02 15:22:47 +010013from framework import VppTestCase, VppTestRunner
14from util import Host, ppp
15
Andrew Yourtchenkode3682f2018-03-23 10:38:46 +010016from vpp_lo_interface import VppLoInterface
17
Pavel Kotucek59dda062017-03-02 15:22:47 +010018
19class TestACLplugin(VppTestCase):
20 """ ACL plugin Test Case """
21
    # traffic types (also the first index into `proto` below)
    IP = 0
    ICMP = 1

    # IP version
    # create_stream treats any value other than 0/1 (e.g. IPRANDOM) as
    # "pick IPv4 or IPv6 at random for every packet"
    IPRANDOM = -1
    IPV4 = 0
    IPV6 = 1

    # rule types (stored in the rule's 'is_permit' field)
    DENY = 0
    PERMIT = 1

    # supported protocols
    # proto[traffic type][index]: proto[IP] = [TCP, UDP] protocol numbers,
    # proto[ICMP] = [ICMPv4, ICMPv6] protocol numbers
    proto = [[6, 17], [1, 58]]
    # protocol number -> name of the scapy layer carrying it
    proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
    # second-level indices into `proto`
    ICMPv4 = 0
    ICMPv6 = 1
    TCP = 0
    UDP = 1
    PROTO_ALL = 0

    # port ranges
    # Selector values understood by create_rule(): PORTS_ALL matches the
    # whole range, PORTS_RANGE / PORTS_RANGE_2 pick the first / second set
    # of per-protocol ranges below, any other value is used as an exact port.
    PORTS_ALL = -1
    PORTS_RANGE = 0
    PORTS_RANGE_2 = 1
    # first set of ranges (PORTS_RANGE); create_upper_layer() draws random
    # ports from these when asked for ports=0
    udp_sport_from = 10
    udp_sport_to = udp_sport_from + 5
    udp_dport_from = 20000
    udp_dport_to = udp_dport_from + 5000
    tcp_sport_from = 30
    tcp_sport_to = tcp_sport_from + 5
    tcp_dport_from = 40000
    tcp_dport_to = tcp_dport_from + 5000

    # second set of ranges (PORTS_RANGE_2)
    udp_sport_from_2 = 90
    udp_sport_to_2 = udp_sport_from_2 + 5
    udp_dport_from_2 = 30000
    udp_dport_to_2 = udp_dport_from_2 + 5000
    tcp_sport_from_2 = 130
    tcp_sport_to_2 = tcp_sport_from_2 + 5
    tcp_dport_from_2 = 20000
    tcp_dport_to_2 = tcp_dport_from_2 + 5000

    # ICMP type/code used both for generated packets and for PORTS_RANGE
    # rules (type goes into the "source port" fields, code into the
    # "destination port" fields of a rule)
    icmp4_type = 8  # echo request
    icmp4_code = 3
    icmp6_type = 128  # echo request
    icmp6_code = 3

    # ICMP type and code range used for PORTS_RANGE_2 rules
    icmp4_type_2 = 8
    icmp4_code_from_2 = 5
    icmp4_code_to_2 = 20
    icmp6_type_2 = 128
    icmp6_code_from_2 = 8
    icmp6_code_to_2 = 42

    # Test variables
    # id of the bridge domain all pg interfaces are put into
    bd_id = 1
80
81 @classmethod
82 def setUpClass(cls):
83 """
84 Perform standard class setup (defined by class method setUpClass in
85 class VppTestCase) before running the test case, set test case related
86 variables and configure VPP.
87 """
88 super(TestACLplugin, cls).setUpClass()
89
Pavel Kotucek59dda062017-03-02 15:22:47 +010090 try:
91 # Create 2 pg interfaces
92 cls.create_pg_interfaces(range(2))
93
94 # Packet flows mapping pg0 -> pg1, pg2 etc.
95 cls.flows = dict()
96 cls.flows[cls.pg0] = [cls.pg1]
97
98 # Packet sizes
99 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
100
101 # Create BD with MAC learning and unknown unicast flooding disabled
102 # and put interfaces to this BD
103 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
104 learn=1)
105 for pg_if in cls.pg_interfaces:
106 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
107 bd_id=cls.bd_id)
108
109 # Set up all interfaces
110 for i in cls.pg_interfaces:
111 i.admin_up()
112
113 # Mapping between packet-generator index and lists of test hosts
114 cls.hosts_by_pg_idx = dict()
115 for pg_if in cls.pg_interfaces:
116 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
117
118 # Create list of deleted hosts
119 cls.deleted_hosts_by_pg_idx = dict()
120 for pg_if in cls.pg_interfaces:
121 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
122
123 # warm-up the mac address tables
124 # self.warmup_test()
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +0200125 count = 16
126 start = 0
127 n_int = len(cls.pg_interfaces)
128 macs_per_if = count / n_int
129 i = -1
130 for pg_if in cls.pg_interfaces:
131 i += 1
132 start_nr = macs_per_if * i + start
133 end_nr = count + start if i == (n_int - 1) \
134 else macs_per_if * (i + 1) + start
135 hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
136 for j in range(start_nr, end_nr):
137 host = Host(
138 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
139 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
140 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
141 hosts.append(host)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100142
143 except Exception:
144 super(TestACLplugin, cls).tearDownClass()
145 raise
146
    def setUp(self):
        """
        Run the base class setUp, then call reset_packet_infos() so every
        test starts from a fresh packet-info state.
        """
        super(TestACLplugin, self).setUp()
        self.reset_packet_infos()
150
151 def tearDown(self):
152 """
153 Show various debug prints after each test.
154 """
155 super(TestACLplugin, self).tearDown()
156 if not self.vpp_dead:
157 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
Andrew Yourtchenko7f4d5772017-05-24 13:20:47 +0200158 self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
159 self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
160 self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
Pavel Kotucek59dda062017-03-02 15:22:47 +0100161 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
162 % self.bd_id))
163
Pavel Kotucek59dda062017-03-02 15:22:47 +0100164 def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
165 s_prefix=0, s_ip='\x00\x00\x00\x00',
166 d_prefix=0, d_ip='\x00\x00\x00\x00'):
167 if proto == -1:
168 return
169 if ports == self.PORTS_ALL:
170 sport_from = 0
171 dport_from = 0
172 sport_to = 65535 if proto != 1 and proto != 58 else 255
173 dport_to = sport_to
174 elif ports == self.PORTS_RANGE:
175 if proto == 1:
176 sport_from = self.icmp4_type
177 sport_to = self.icmp4_type
178 dport_from = self.icmp4_code
179 dport_to = self.icmp4_code
180 elif proto == 58:
181 sport_from = self.icmp6_type
182 sport_to = self.icmp6_type
183 dport_from = self.icmp6_code
184 dport_to = self.icmp6_code
185 elif proto == self.proto[self.IP][self.TCP]:
186 sport_from = self.tcp_sport_from
187 sport_to = self.tcp_sport_to
188 dport_from = self.tcp_dport_from
189 dport_to = self.tcp_dport_to
190 elif proto == self.proto[self.IP][self.UDP]:
191 sport_from = self.udp_sport_from
192 sport_to = self.udp_sport_to
193 dport_from = self.udp_dport_from
194 dport_to = self.udp_dport_to
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +0200195 elif ports == self.PORTS_RANGE_2:
196 if proto == 1:
197 sport_from = self.icmp4_type_2
198 sport_to = self.icmp4_type_2
199 dport_from = self.icmp4_code_from_2
200 dport_to = self.icmp4_code_to_2
201 elif proto == 58:
202 sport_from = self.icmp6_type_2
203 sport_to = self.icmp6_type_2
204 dport_from = self.icmp6_code_from_2
205 dport_to = self.icmp6_code_to_2
206 elif proto == self.proto[self.IP][self.TCP]:
207 sport_from = self.tcp_sport_from_2
208 sport_to = self.tcp_sport_to_2
209 dport_from = self.tcp_dport_from_2
210 dport_to = self.tcp_dport_to_2
211 elif proto == self.proto[self.IP][self.UDP]:
212 sport_from = self.udp_sport_from_2
213 sport_to = self.udp_sport_to_2
214 dport_from = self.udp_dport_from_2
215 dport_to = self.udp_dport_to_2
Pavel Kotucek59dda062017-03-02 15:22:47 +0100216 else:
217 sport_from = ports
218 sport_to = ports
219 dport_from = ports
220 dport_to = ports
221
222 rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
223 'srcport_or_icmptype_first': sport_from,
224 'srcport_or_icmptype_last': sport_to,
225 'src_ip_prefix_len': s_prefix,
226 'src_ip_addr': s_ip,
227 'dstport_or_icmpcode_first': dport_from,
228 'dstport_or_icmpcode_last': dport_to,
229 'dst_ip_prefix_len': d_prefix,
230 'dst_ip_addr': d_ip})
231 return rule
232
233 def apply_rules(self, rules, tag=''):
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200234 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
235 tag=tag)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100236 self.logger.info("Dumped ACL: " + str(
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200237 self.vapi.acl_dump(reply.acl_index)))
Pavel Kotucek59dda062017-03-02 15:22:47 +0100238 # Apply a ACL on the interface as inbound
239 for i in self.pg_interfaces:
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200240 self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
241 n_input=1,
242 acls=[reply.acl_index])
Pavel Kotucek59dda062017-03-02 15:22:47 +0100243 return
244
Andrew Yourtchenkode3682f2018-03-23 10:38:46 +0100245 def apply_rules_to(self, rules, tag='', sw_if_index=0xFFFFFFFF):
246 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
247 tag=tag)
248 self.logger.info("Dumped ACL: " + str(
249 self.vapi.acl_dump(reply.acl_index)))
250 # Apply a ACL on the interface as inbound
251 self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
252 n_input=1,
253 acls=[reply.acl_index])
254 return
255
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100256 def etype_whitelist(self, whitelist, n_input):
257 # Apply whitelists on all the interfaces
258 for i in self.pg_interfaces:
259 # checkstyle can't read long names. Help them.
260 fun = self.vapi.acl_interface_set_etype_whitelist
261 fun(sw_if_index=i.sw_if_index, n_input=n_input,
262 whitelist=whitelist)
263 return
264
Pavel Kotucek59dda062017-03-02 15:22:47 +0100265 def create_upper_layer(self, packet_index, proto, ports=0):
266 p = self.proto_map[proto]
267 if p == 'UDP':
268 if ports == 0:
269 return UDP(sport=random.randint(self.udp_sport_from,
270 self.udp_sport_to),
271 dport=random.randint(self.udp_dport_from,
272 self.udp_dport_to))
273 else:
274 return UDP(sport=ports, dport=ports)
275 elif p == 'TCP':
276 if ports == 0:
277 return TCP(sport=random.randint(self.tcp_sport_from,
278 self.tcp_sport_to),
279 dport=random.randint(self.tcp_dport_from,
280 self.tcp_dport_to))
281 else:
282 return TCP(sport=ports, dport=ports)
283 return ''
284
285 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100286 proto=-1, ports=0, fragments=False,
287 pkt_raw=True, etype=-1):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100288 """
289 Create input packet stream for defined interface using hosts or
290 deleted_hosts list.
291
292 :param object src_if: Interface to create packet stream for.
293 :param list packet_sizes: List of required packet sizes.
294 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
295 :return: Stream of packets.
296 """
297 pkts = []
298 if self.flows.__contains__(src_if):
299 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
300 for dst_if in self.flows[src_if]:
301 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
302 n_int = len(dst_hosts) * len(src_hosts)
303 for i in range(0, n_int):
304 dst_host = dst_hosts[i / len(src_hosts)]
305 src_host = src_hosts[i % len(src_hosts)]
306 pkt_info = self.create_packet_info(src_if, dst_if)
307 if ipv6 == 1:
308 pkt_info.ip = 1
309 elif ipv6 == 0:
310 pkt_info.ip = 0
311 else:
312 pkt_info.ip = random.choice([0, 1])
313 if proto == -1:
314 pkt_info.proto = random.choice(self.proto[self.IP])
315 else:
316 pkt_info.proto = proto
317 payload = self.info_to_payload(pkt_info)
318 p = Ether(dst=dst_host.mac, src=src_host.mac)
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100319 if etype > 0:
320 p = Ether(dst=dst_host.mac,
321 src=src_host.mac,
322 type=etype)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100323 if pkt_info.ip:
324 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000325 if fragments:
326 p /= IPv6ExtHdrFragment(offset=64, m=1)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100327 else:
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000328 if fragments:
329 p /= IP(src=src_host.ip4, dst=dst_host.ip4,
330 flags=1, frag=64)
331 else:
332 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100333 if traffic_type == self.ICMP:
334 if pkt_info.ip:
335 p /= ICMPv6EchoRequest(type=self.icmp6_type,
336 code=self.icmp6_code)
337 else:
338 p /= ICMP(type=self.icmp4_type,
339 code=self.icmp4_code)
340 else:
341 p /= self.create_upper_layer(i, pkt_info.proto, ports)
Pavel Kotuceke7b67342017-04-18 13:12:20 +0200342 if pkt_raw:
343 p /= Raw(payload)
344 pkt_info.data = p.copy()
345 if pkt_raw:
346 size = random.choice(packet_sizes)
347 self.extend_packet(p, size)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100348 pkts.append(p)
349 return pkts
350
    def verify_capture(self, pg_if, capture,
                       traffic_type=0, ip_type=0, etype=-1):
        """
        Verify captured input packet stream for defined interface.

        Each captured packet is decoded back to its packet info and compared
        against the packet saved when the stream was created.

        :param object pg_if: Interface to verify captured packet stream for.
        :param list capture: Captured packet stream.
        :param traffic_type: self.ICMP checks ICMP type/code; any other value
            checks IP addresses and TCP/UDP ports against the saved packet.
        :param ip_type: when non-zero, every packet's recorded IP version
            must equal it.
        :param etype: when > 0, packets whose Ethernet type equals it are
            skipped without further checks.
        """
        # Last packet info seen per source interface index
        last_info = dict()
        for i in self.pg_interfaces:
            last_info[i.sw_if_index] = None
        dst_sw_if_index = pg_if.sw_if_index
        for packet in capture:
            if etype > 0:
                # NOTE(review): a mismatching ethertype is only logged, the
                # packet then goes through the normal checks below - confirm
                # this should not fail the test outright.
                if packet[Ether].type != etype:
                    self.logger.error(ppp("Unexpected ethertype in packet:",
                                          packet))
                else:
                    continue
            try:
                # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
                if traffic_type == self.ICMP and ip_type == self.IPV6:
                    payload_info = self.payload_to_info(
                        packet[ICMPv6EchoRequest].data)
                    payload = packet[ICMPv6EchoRequest]
                else:
                    payload_info = self.payload_to_info(str(packet[Raw]))
                    payload = packet[self.proto_map[payload_info.proto]]
            except:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

            # ip_type 0 (IPV4) is not asserted here
            if ip_type != 0:
                self.assertEqual(payload_info.ip, ip_type)
            if traffic_type == self.ICMP:
                try:
                    if payload_info.ip == 0:
                        self.assertEqual(payload.type, self.icmp4_type)
                        self.assertEqual(payload.code, self.icmp4_code)
                    else:
                        self.assertEqual(payload.type, self.icmp6_type)
                        self.assertEqual(payload.code, self.icmp6_code)
                except:
                    self.logger.error(ppp("Unexpected or invalid packet "
                                          "(outside network):", packet))
                    raise
            else:
                try:
                    ip_version = IPv6 if payload_info.ip == 1 else IP

                    ip = packet[ip_version]
                    packet_index = payload_info.index

                    self.assertEqual(payload_info.dst, dst_sw_if_index)
                    self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
                                      (pg_if.name, payload_info.src,
                                       packet_index))
                    # Packets from one source are expected in creation order
                    next_info = self.get_next_packet_info_for_interface2(
                        payload_info.src, dst_sw_if_index,
                        last_info[payload_info.src])
                    last_info[payload_info.src] = next_info
                    self.assertTrue(next_info is not None)
                    self.assertEqual(packet_index, next_info.index)
                    saved_packet = next_info.data
                    # Check standard fields
                    self.assertEqual(ip.src, saved_packet[ip_version].src)
                    self.assertEqual(ip.dst, saved_packet[ip_version].dst)
                    p = self.proto_map[payload_info.proto]
                    if p == 'TCP':
                        tcp = packet[TCP]
                        self.assertEqual(tcp.sport, saved_packet[
                            TCP].sport)
                        self.assertEqual(tcp.dport, saved_packet[
                            TCP].dport)
                    elif p == 'UDP':
                        udp = packet[UDP]
                        self.assertEqual(udp.sport, saved_packet[
                            UDP].sport)
                        self.assertEqual(udp.dport, saved_packet[
                            UDP].dport)
                except:
                    self.logger.error(ppp("Unexpected or invalid packet:",
                                          packet))
                    raise
        # Every expected packet must have been consumed above.
        # NOTE(review): the first argument here is the interface object,
        # whereas the call inside the loop passes an interface index
        # (payload_info.src) - confirm which one the helper expects.
        for i in self.pg_interfaces:
            remaining_packet = self.get_next_packet_info_for_interface2(
                i, dst_sw_if_index, last_info[i.sw_if_index])
            self.assertTrue(
                remaining_packet is None,
                "Port %u: Packet expected from source %u didn't arrive" %
                (dst_sw_if_index, i.sw_if_index))
444
445 def run_traffic_no_check(self):
446 # Test
447 # Create incoming packet streams for packet-generator interfaces
448 for i in self.pg_interfaces:
449 if self.flows.__contains__(i):
450 pkts = self.create_stream(i, self.pg_if_packet_sizes)
451 if len(pkts) > 0:
452 i.add_stream(pkts)
453
454 # Enable packet capture and start packet sending
455 self.pg_enable_capture(self.pg_interfaces)
456 self.pg_start()
457
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000458 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100459 frags=False, pkt_raw=True, etype=-1):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100460 # Test
461 # Create incoming packet streams for packet-generator interfaces
462 pkts_cnt = 0
463 for i in self.pg_interfaces:
464 if self.flows.__contains__(i):
465 pkts = self.create_stream(i, self.pg_if_packet_sizes,
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000466 traffic_type, ip_type, proto, ports,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100467 frags, pkt_raw, etype)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100468 if len(pkts) > 0:
469 i.add_stream(pkts)
470 pkts_cnt += len(pkts)
471
472 # Enable packet capture and start packet sendingself.IPV
473 self.pg_enable_capture(self.pg_interfaces)
474 self.pg_start()
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +0200475 self.logger.info("sent packets count: %d" % pkts_cnt)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100476
477 # Verify
478 # Verify outgoing packet streams per packet-generator interface
479 for src_if in self.pg_interfaces:
480 if self.flows.__contains__(src_if):
481 for dst_if in self.flows[src_if]:
482 capture = dst_if.get_capture(pkts_cnt)
483 self.logger.info("Verifying capture on interface %s" %
484 dst_if.name)
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100485 self.verify_capture(dst_if, capture,
486 traffic_type, ip_type, etype)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100487
488 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100489 ports=0, frags=False, etype=-1):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100490 # Test
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +0200491 pkts_cnt = 0
Pavel Kotucek59dda062017-03-02 15:22:47 +0100492 self.reset_packet_infos()
493 for i in self.pg_interfaces:
494 if self.flows.__contains__(i):
495 pkts = self.create_stream(i, self.pg_if_packet_sizes,
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000496 traffic_type, ip_type, proto, ports,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100497 frags, True, etype)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100498 if len(pkts) > 0:
499 i.add_stream(pkts)
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +0200500 pkts_cnt += len(pkts)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100501
502 # Enable packet capture and start packet sending
503 self.pg_enable_capture(self.pg_interfaces)
504 self.pg_start()
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +0200505 self.logger.info("sent packets count: %d" % pkts_cnt)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100506
507 # Verify
508 # Verify outgoing packet streams per packet-generator interface
509 for src_if in self.pg_interfaces:
510 if self.flows.__contains__(src_if):
511 for dst_if in self.flows[src_if]:
512 self.logger.info("Verifying capture on interface %s" %
513 dst_if.name)
514 capture = dst_if.get_capture(0)
515 self.assertEqual(len(capture), 0)
516
Pavel Kotucek59dda062017-03-02 15:22:47 +0100517 def test_0000_warmup_test(self):
518 """ ACL plugin version check; learn MACs
519 """
Pavel Kotucek59dda062017-03-02 15:22:47 +0100520 reply = self.vapi.papi.acl_plugin_get_version()
521 self.assertEqual(reply.major, 1)
522 self.logger.info("Working with ACL plugin version: %d.%d" % (
523 reply.major, reply.minor))
524 # minor version changes are non breaking
525 # self.assertEqual(reply.minor, 0)
526
    def test_0001_acl_create(self):
        """ ACL create/delete test
        """

        self.logger.info("ACLP_TEST_START_0001")
        # Add an ACL
        r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
              'srcport_or_icmptype_first': 1234,
              'srcport_or_icmptype_last': 1235,
              'src_ip_prefix_len': 0,
              'src_ip_addr': '\x00\x00\x00\x00',
              'dstport_or_icmpcode_first': 1234,
              'dstport_or_icmpcode_last': 1234,
              'dst_ip_addr': '\x00\x00\x00\x00',
              'dst_ip_prefix_len': 0}]
        # Test 1: add a new ACL
        # (acl_index 4294967295 == 0xFFFFFFFF is the value this file uses
        # whenever a new ACL is created)
        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
                                          tag="permit 1234")
        self.assertEqual(reply.retval, 0)
        # The very first ACL gets #0
        self.assertEqual(reply.acl_index, 0)
        first_acl = reply.acl_index
        rr = self.vapi.acl_dump(reply.acl_index)
        self.logger.info("Dumped ACL: " + str(rr))
        self.assertEqual(len(rr), 1)
        # We should have the same number of ACL entries as we had asked
        self.assertEqual(len(rr[0].r), len(r))
        # The rules should be the same. But because the submitted and returned
        # are different types, we need to iterate over rules and keys to get
        # to basic values.
        # NOTE(review): r holds a single rule, so range(0, len(r) - 1) is
        # empty and the comparison below never runs - confirm whether
        # range(len(r)) was intended.
        for i_rule in range(0, len(r) - 1):
            for rule_key in r[i_rule]:
                self.assertEqual(rr[0].r[i_rule][rule_key],
                                 r[i_rule][rule_key])

        # Add a deny-1234 ACL
        r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
                   'srcport_or_icmptype_first': 1234,
                   'srcport_or_icmptype_last': 1235,
                   'src_ip_prefix_len': 0,
                   'src_ip_addr': '\x00\x00\x00\x00',
                   'dstport_or_icmpcode_first': 1234,
                   'dstport_or_icmpcode_last': 1234,
                   'dst_ip_addr': '\x00\x00\x00\x00',
                   'dst_ip_prefix_len': 0},
                  {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
                   'srcport_or_icmptype_first': 0,
                   'srcport_or_icmptype_last': 0,
                   'src_ip_prefix_len': 0,
                   'src_ip_addr': '\x00\x00\x00\x00',
                   'dstport_or_icmpcode_first': 0,
                   'dstport_or_icmpcode_last': 0,
                   'dst_ip_addr': '\x00\x00\x00\x00',
                   'dst_ip_prefix_len': 0}]

        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
                                          tag="deny 1234;permit all")
        self.assertEqual(reply.retval, 0)
        # The second ACL gets #1
        self.assertEqual(reply.acl_index, 1)
        second_acl = reply.acl_index

        # Test 2: try to modify a nonexistent ACL
        reply = self.vapi.acl_add_replace(acl_index=432, r=r,
                                          tag="FFFF:FFFF", expected_retval=-6)
        self.assertEqual(reply.retval, -6)
        # The ACL number should pass through
        self.assertEqual(reply.acl_index, 432)
        # apply an ACL on an interface inbound, try to delete ACL, must fail
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=1,
                                             acls=[first_acl])
        reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
        # Unapply an ACL and then try to delete it - must be ok
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=0,
                                             acls=[])
        reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)

        # apply an ACL on an interface outbound, try to delete ACL, must fail
        # (n_input=0 with one ACL listed: presumably the ACLs past the first
        # n_input entries are treated as outbound - verify against the API)
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=0,
                                             acls=[second_acl])
        reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
        # Unapply the ACL and then try to delete it - must be ok
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=0,
                                             acls=[])
        reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)

        # try to apply a nonexistent ACL - must fail
        # (first_acl was deleted above)
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=1,
                                             acls=[first_acl],
                                             expected_retval=-6)

        self.logger.info("ACLP_TEST_FINISH_0001")
624
625 def test_0002_acl_permit_apply(self):
626 """ permit ACL apply test
627 """
628 self.logger.info("ACLP_TEST_START_0002")
629
630 rules = []
631 rules.append(self.create_rule(self.IPV4, self.PERMIT,
632 0, self.proto[self.IP][self.UDP]))
633 rules.append(self.create_rule(self.IPV4, self.PERMIT,
634 0, self.proto[self.IP][self.TCP]))
635
636 # Apply rules
637 self.apply_rules(rules, "permit per-flow")
638
639 # Traffic should still pass
640 self.run_verify_test(self.IP, self.IPV4, -1)
641 self.logger.info("ACLP_TEST_FINISH_0002")
642
643 def test_0003_acl_deny_apply(self):
644 """ deny ACL apply test
645 """
646 self.logger.info("ACLP_TEST_START_0003")
647 # Add a deny-flows ACL
648 rules = []
649 rules.append(self.create_rule(self.IPV4, self.DENY,
650 self.PORTS_ALL, self.proto[self.IP][self.UDP]))
651 # Permit ip any any in the end
652 rules.append(self.create_rule(self.IPV4, self.PERMIT,
653 self.PORTS_ALL, 0))
654
655 # Apply rules
656 self.apply_rules(rules, "deny per-flow;permit all")
657
658 # Traffic should not pass
659 self.run_verify_negat_test(self.IP, self.IPV4,
660 self.proto[self.IP][self.UDP])
661 self.logger.info("ACLP_TEST_FINISH_0003")
662 # self.assertEqual(1, 0)
663
664 def test_0004_vpp624_permit_icmpv4(self):
665 """ VPP_624 permit ICMPv4
666 """
667 self.logger.info("ACLP_TEST_START_0004")
668
669 # Add an ACL
670 rules = []
671 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
672 self.proto[self.ICMP][self.ICMPv4]))
673 # deny ip any any in the end
674 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
675
676 # Apply rules
677 self.apply_rules(rules, "permit icmpv4")
678
679 # Traffic should still pass
680 self.run_verify_test(self.ICMP, self.IPV4,
681 self.proto[self.ICMP][self.ICMPv4])
682
683 self.logger.info("ACLP_TEST_FINISH_0004")
684
685 def test_0005_vpp624_permit_icmpv6(self):
686 """ VPP_624 permit ICMPv6
687 """
688 self.logger.info("ACLP_TEST_START_0005")
689
690 # Add an ACL
691 rules = []
692 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
693 self.proto[self.ICMP][self.ICMPv6]))
694 # deny ip any any in the end
695 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
696
697 # Apply rules
698 self.apply_rules(rules, "permit icmpv6")
699
700 # Traffic should still pass
701 self.run_verify_test(self.ICMP, self.IPV6,
702 self.proto[self.ICMP][self.ICMPv6])
703
704 self.logger.info("ACLP_TEST_FINISH_0005")
705
706 def test_0006_vpp624_deny_icmpv4(self):
707 """ VPP_624 deny ICMPv4
708 """
709 self.logger.info("ACLP_TEST_START_0006")
710 # Add an ACL
711 rules = []
712 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
713 self.proto[self.ICMP][self.ICMPv4]))
714 # permit ip any any in the end
715 rules.append(self.create_rule(self.IPV4, self.PERMIT,
716 self.PORTS_ALL, 0))
717
718 # Apply rules
719 self.apply_rules(rules, "deny icmpv4")
720
721 # Traffic should not pass
722 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
723
724 self.logger.info("ACLP_TEST_FINISH_0006")
725
726 def test_0007_vpp624_deny_icmpv6(self):
727 """ VPP_624 deny ICMPv6
728 """
729 self.logger.info("ACLP_TEST_START_0007")
730 # Add an ACL
731 rules = []
732 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
733 self.proto[self.ICMP][self.ICMPv6]))
734 # deny ip any any in the end
735 rules.append(self.create_rule(self.IPV6, self.PERMIT,
736 self.PORTS_ALL, 0))
737
738 # Apply rules
739 self.apply_rules(rules, "deny icmpv6")
740
741 # Traffic should not pass
742 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
743
744 self.logger.info("ACLP_TEST_FINISH_0007")
745
746 def test_0008_tcp_permit_v4(self):
747 """ permit TCPv4
748 """
749 self.logger.info("ACLP_TEST_START_0008")
750
751 # Add an ACL
752 rules = []
753 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
754 self.proto[self.IP][self.TCP]))
755 # deny ip any any in the end
756 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
757
758 # Apply rules
759 self.apply_rules(rules, "permit ipv4 tcp")
760
761 # Traffic should still pass
762 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
763
764 self.logger.info("ACLP_TEST_FINISH_0008")
765
766 def test_0009_tcp_permit_v6(self):
767 """ permit TCPv6
768 """
769 self.logger.info("ACLP_TEST_START_0009")
770
771 # Add an ACL
772 rules = []
773 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
774 self.proto[self.IP][self.TCP]))
775 # deny ip any any in the end
776 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
777
778 # Apply rules
779 self.apply_rules(rules, "permit ip6 tcp")
780
781 # Traffic should still pass
782 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
783
784 self.logger.info("ACLP_TEST_FINISH_0008")
785
786 def test_0010_udp_permit_v4(self):
787 """ permit UDPv4
788 """
789 self.logger.info("ACLP_TEST_START_0010")
790
791 # Add an ACL
792 rules = []
793 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
794 self.proto[self.IP][self.UDP]))
795 # deny ip any any in the end
796 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
797
798 # Apply rules
799 self.apply_rules(rules, "permit ipv udp")
800
801 # Traffic should still pass
802 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
803
804 self.logger.info("ACLP_TEST_FINISH_0010")
805
806 def test_0011_udp_permit_v6(self):
807 """ permit UDPv6
808 """
809 self.logger.info("ACLP_TEST_START_0011")
810
811 # Add an ACL
812 rules = []
813 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
814 self.proto[self.IP][self.UDP]))
815 # deny ip any any in the end
816 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
817
818 # Apply rules
819 self.apply_rules(rules, "permit ip6 udp")
820
821 # Traffic should still pass
822 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
823
824 self.logger.info("ACLP_TEST_FINISH_0011")
825
826 def test_0012_tcp_deny(self):
827 """ deny TCPv4/v6
828 """
829 self.logger.info("ACLP_TEST_START_0012")
830
831 # Add an ACL
832 rules = []
833 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
834 self.proto[self.IP][self.TCP]))
835 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
836 self.proto[self.IP][self.TCP]))
837 # permit ip any any in the end
838 rules.append(self.create_rule(self.IPV4, self.PERMIT,
839 self.PORTS_ALL, 0))
840 rules.append(self.create_rule(self.IPV6, self.PERMIT,
841 self.PORTS_ALL, 0))
842
843 # Apply rules
844 self.apply_rules(rules, "deny ip4/ip6 tcp")
845
846 # Traffic should not pass
847 self.run_verify_negat_test(self.IP, self.IPRANDOM,
848 self.proto[self.IP][self.TCP])
849
850 self.logger.info("ACLP_TEST_FINISH_0012")
851
def test_0013_udp_deny(self):
    """ deny UDPv4/v6

    Builds an ACL that denies UDP (PORTS_RANGE selector) for both IPv4
    and IPv6, with catch-all PERMIT rules at the end, and expects UDP
    traffic of a randomly chosen IP version not to pass.
    """
    self.logger.info("ACLP_TEST_START_0013")

    udp = self.proto[self.IP][self.UDP]

    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, udp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, udp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "deny ip4/ip6 udp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp)

    self.logger.info("ACLP_TEST_FINISH_0013")
877
def test_0014_acl_dump(self):
    """ verify add/dump acls

    Adds a single ACL made of a fixed table of rules, dumps it back via
    the API and checks that the dump contains exactly as many rules as
    were added and that each dumped rule matches the table entry at the
    same position.
    """
    self.logger.info("ACLP_TEST_START_0014")

    tcp = self.proto[self.IP][self.TCP]
    udp = self.proto[self.IP][self.UDP]
    icmp4 = self.proto[self.ICMP][self.ICMPv4]
    icmp6 = self.proto[self.ICMP][self.ICMPv6]

    # Each entry: (ip version, action, port / icmp-type selector, proto).
    # The third field is passed to create_rule as-is: a positive value is
    # compared below against srcport_or_icmptype_first, PORTS_ALL (-1)
    # is expected to dump as 0..65535, and 0 (PORTS_RANGE) is expected
    # to fall inside the class-level tcp_*/udp_* port bounds.
    r = [(self.IPV4, self.PERMIT, 1234, tcp),
         (self.IPV4, self.PERMIT, 2345, udp),
         (self.IPV4, self.PERMIT, 0, tcp),
         (self.IPV4, self.PERMIT, 0, udp),
         (self.IPV4, self.PERMIT, 5, icmp4),
         (self.IPV6, self.PERMIT, 4321, tcp),
         (self.IPV6, self.PERMIT, 5432, udp),
         (self.IPV6, self.PERMIT, 0, tcp),
         (self.IPV6, self.PERMIT, 0, udp),
         (self.IPV6, self.PERMIT, 6, icmp6),
         (self.IPV4, self.DENY, self.PORTS_ALL, 0),
         (self.IPV4, self.DENY, 1234, tcp),
         (self.IPV4, self.DENY, 2345, udp),
         (self.IPV4, self.DENY, 5, icmp4),
         (self.IPV6, self.DENY, 4321, tcp),
         (self.IPV6, self.DENY, 5432, udp),
         (self.IPV6, self.DENY, 6, icmp6),
         (self.IPV6, self.DENY, self.PORTS_ALL, 0)]

    # Add and verify new ACLs
    rules = [self.create_rule(ip_ver, action, ports, proto)
             for ip_ver, action, ports, proto in r]

    # NOTE(review): acl_index 4294967295 (0xffffffff) presumably asks
    # VPP to allocate a new ACL rather than replace one - confirm
    # against the ACL plugin API.
    reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
    result = self.vapi.acl_dump(reply.acl_index)

    # Flatten the dump so the rule count can be checked up front; without
    # this a dump returning fewer rules than were added would pass.
    dumped = [dr for drules in result for dr in drules.r]
    self.assertEqual(len(dumped), len(r))

    for (ip_ver, action, ports, proto), dr in zip(r, dumped):
        self.assertEqual(dr.is_ipv6, ip_ver)
        self.assertEqual(dr.is_permit, action)
        self.assertEqual(dr.proto, proto)

        if ports > 0:
            self.assertEqual(dr.srcport_or_icmptype_first, ports)
        elif ports < 0:
            self.assertEqual(dr.srcport_or_icmptype_first, 0)
            self.assertEqual(dr.srcport_or_icmptype_last, 65535)
        elif dr.proto == tcp:
            self.assertGreater(dr.srcport_or_icmptype_first,
                               self.tcp_sport_from-1)
            self.assertLess(dr.srcport_or_icmptype_first,
                            self.tcp_sport_to+1)
            self.assertGreater(dr.dstport_or_icmpcode_last,
                               self.tcp_dport_from-1)
            self.assertLess(dr.dstport_or_icmpcode_last,
                            self.tcp_dport_to+1)
        elif dr.proto == udp:
            self.assertGreater(dr.srcport_or_icmptype_first,
                               self.udp_sport_from-1)
            self.assertLess(dr.srcport_or_icmptype_first,
                            self.udp_sport_to+1)
            self.assertGreater(dr.dstport_or_icmpcode_last,
                               self.udp_dport_from-1)
            self.assertLess(dr.dstport_or_icmpcode_last,
                            self.udp_dport_to+1)

    self.logger.info("ACLP_TEST_FINISH_0014")
946
def test_0015_tcp_permit_port_v4(self):
    """ permit single TCPv4

    Picks a random port, permits TCP over IPv4 for that port only
    (catch-all DENY last), and expects TCPv4 traffic generated for the
    same port to still pass.
    """
    self.logger.info("ACLP_TEST_START_0015")

    port = random.randint(0, 65535)
    tcp = self.proto[self.IP][self.TCP]

    rules = [
        self.create_rule(self.IPV4, self.PERMIT, port, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "permit ip4 tcp %d" % port)

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp, port)

    self.logger.info("ACLP_TEST_FINISH_0015")
968
def test_0016_udp_permit_port_v4(self):
    """ permit single UDPv4

    Picks a random port, permits UDP over IPv4 for that port only
    (catch-all DENY last), and expects UDPv4 traffic generated for the
    same port to still pass.
    """
    self.logger.info("ACLP_TEST_START_0016")

    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]

    rules = [
        self.create_rule(self.IPV4, self.PERMIT, port, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    # The tag previously read "permit ip4 tcp", copied from the TCP test;
    # it now names the protocol this ACL actually permits.
    self.apply_rules(rules, "permit ip4 udp %d" % port)

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, udp, port)

    self.logger.info("ACLP_TEST_FINISH_0016")
990
def test_0017_tcp_permit_port_v6(self):
    """ permit single TCPv6

    Picks a random port, permits TCP over IPv6 for that port only
    (catch-all DENY last), and expects TCPv6 traffic generated for the
    same port to still pass.
    """
    self.logger.info("ACLP_TEST_START_0017")

    port = random.randint(0, 65535)
    tcp = self.proto[self.IP][self.TCP]

    rules = [
        self.create_rule(self.IPV6, self.PERMIT, port, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]
    # The tag previously read "permit ip4 tcp", copied from the IPv4
    # test; it now names the address family this ACL actually covers.
    self.apply_rules(rules, "permit ip6 tcp %d" % port)

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, tcp, port)

    self.logger.info("ACLP_TEST_FINISH_0017")
1012
def test_0018_udp_permit_port_v6(self):
    """ permit single UDPv6

    Picks a random port, permits UDP over IPv6 for that port only
    (catch-all DENY last), and expects UDPv6 traffic generated for the
    same port to still pass.
    """
    self.logger.info("ACLP_TEST_START_0018")

    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]

    rules = [
        self.create_rule(self.IPV6, self.PERMIT, port, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]
    # The tag previously read "permit ip4 tcp", copied from the IPv4 TCP
    # test; it now matches the IPv6 UDP rule being installed.
    self.apply_rules(rules, "permit ip6 udp %d" % port)

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, udp, port)

    self.logger.info("ACLP_TEST_FINISH_0018")
1035
def test_0019_udp_deny_port(self):
    """ deny single TCPv4/v6

    Picks a random port, denies TCP for that port on both IPv4 and IPv6
    (catch-all PERMIT rules last), and expects TCP traffic of a randomly
    chosen IP version for that port not to pass.

    The method name says "udp" but the rules and the traffic are TCP;
    the name is left untouched because it is the test's identifier.
    """
    self.logger.info("ACLP_TEST_START_0019")

    port = random.randint(0, 65535)
    tcp = self.proto[self.IP][self.TCP]

    rules = [
        self.create_rule(self.IPV4, self.DENY, port, tcp),
        self.create_rule(self.IPV6, self.DENY, port, tcp),
        # Permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]
    # The tag previously read "deny ip4/ip6 udp" although the ACL denies
    # TCP; it now names the protocol actually denied.
    self.apply_rules(rules, "deny ip4/ip6 tcp %d" % port)

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, tcp, port)

    self.logger.info("ACLP_TEST_FINISH_0019")
1062
def test_0020_udp_deny_port(self):
    """ deny single UDPv4/v6

    Picks a random port, denies UDP for that port on both IPv4 and IPv6
    (catch-all PERMIT rules last), and expects UDP traffic of a randomly
    chosen IP version for that port not to pass.
    """
    self.logger.info("ACLP_TEST_START_0020")

    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]

    rules = [
        self.create_rule(self.IPV4, self.DENY, port, udp),
        self.create_rule(self.IPV6, self.DENY, port, udp),
        # Permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp, port)

    self.logger.info("ACLP_TEST_FINISH_0020")
1089
def test_0021_udp_deny_port_verify_fragment_deny(self):
    """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
    blocked

    Same rule set as the single-port UDP deny test (per-port DENY for
    IPv4 and IPv6, catch-all PERMIT last); the negative verification is
    invoked with an extra trailing True argument.
    """
    self.logger.info("ACLP_TEST_START_0021")

    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]

    rules = [
        self.create_rule(self.IPV4, self.DENY, port, udp),
        self.create_rule(self.IPV6, self.DENY, port, udp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)

    # Traffic should not pass.
    # NOTE(review): the trailing True presumably enables fragmented
    # traffic in run_verify_negat_test - confirm against its signature.
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp, port, True)

    self.logger.info("ACLP_TEST_FINISH_0021")
1117
def test_0022_zero_length_udp_ipv4(self):
    """ VPP-687 zero length udp ipv4 packet"""
    self.logger.info("ACLP_TEST_START_0022")

    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]

    # Permit UDPv4 on the chosen port, deny ip any any in the end
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, port, udp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "permit empty udp ip4 %d" % port)

    # Traffic should still pass.
    # Create the incoming packet stream for the pg0 interface.
    # NOTE(review): the two trailing False arguments presumably disable
    # fragmentation and the raw payload (hence "zero length") - confirm
    # against create_stream.
    pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
                              self.IP, self.IPV4, udp, port,
                              False, False)
    pkts_cnt = len(pkts)
    if pkts_cnt > 0:
        self.pg0.add_stream(pkts)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Every packet sent on pg0 is expected in the pg1 capture
    self.pg1.get_capture(pkts_cnt)

    self.logger.info("ACLP_TEST_FINISH_0022")
1152
def test_0023_zero_length_udp_ipv6(self):
    """ VPP-687 zero length udp ipv6 packet"""
    self.logger.info("ACLP_TEST_START_0023")

    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]

    # Permit UDPv6 on the chosen port, deny ip any any in the end
    rules = [
        self.create_rule(self.IPV6, self.PERMIT, port, udp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "permit empty udp ip6 %d" % port)

    # Traffic should still pass.
    # Create the incoming packet stream for the pg0 interface.
    # NOTE(review): the two trailing False arguments presumably disable
    # fragmentation and the raw payload (hence "zero length") - confirm
    # against create_stream.
    pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
                              self.IP, self.IPV6, udp, port,
                              False, False)
    pkts_cnt = len(pkts)
    if pkts_cnt > 0:
        self.pg0.add_stream(pkts)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Verify outgoing packet stream: every packet sent on pg0 is
    # expected in the pg1 capture
    self.pg1.get_capture(pkts_cnt)

    self.logger.info("ACLP_TEST_FINISH_0023")
1187
def test_0108_tcp_permit_v4(self):
    """ permit TCPv4 + non-match range

    Puts a TCPv4 DENY rule using the PORTS_RANGE_2 selector ahead of the
    TCPv4 PERMIT rule using PORTS_RANGE (catch-all DENY last) and
    expects TCPv4 traffic to still pass.
    """
    self.logger.info("ACLP_TEST_START_0108")

    tcp = self.proto[self.IP][self.TCP]

    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "permit ipv4 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp)

    self.logger.info("ACLP_TEST_FINISH_0108")
1209
def test_0109_tcp_permit_v6(self):
    """ permit TCPv6 + non-match range

    Puts a TCPv6 DENY rule using the PORTS_RANGE_2 selector ahead of the
    TCPv6 PERMIT rule using PORTS_RANGE (catch-all DENY last) and
    expects TCPv6 traffic to still pass.
    """
    self.logger.info("ACLP_TEST_START_0109")

    tcp = self.proto[self.IP][self.TCP]

    rules = [
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "permit ip6 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, tcp)

    self.logger.info("ACLP_TEST_FINISH_0109")
1231
def test_0110_udp_permit_v4(self):
    """ permit UDPv4 + non-match range

    Puts a UDPv4 DENY rule using the PORTS_RANGE_2 selector ahead of the
    UDPv4 PERMIT rule using PORTS_RANGE (catch-all DENY last) and
    expects UDPv4 traffic to still pass.
    """
    self.logger.info("ACLP_TEST_START_0110")

    udp = self.proto[self.IP][self.UDP]

    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, udp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "permit ipv4 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, udp)

    self.logger.info("ACLP_TEST_FINISH_0110")
1253
def test_0111_udp_permit_v6(self):
    """ permit UDPv6 + non-match range

    Puts a UDPv6 DENY rule using the PORTS_RANGE_2 selector ahead of the
    UDPv6 PERMIT rule using PORTS_RANGE (catch-all DENY last) and
    expects UDPv6 traffic to still pass.
    """
    self.logger.info("ACLP_TEST_START_0111")

    udp = self.proto[self.IP][self.UDP]

    rules = [
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2, udp),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "permit ip6 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, udp)

    self.logger.info("ACLP_TEST_FINISH_0111")
1275
def test_0112_tcp_deny(self):
    """ deny TCPv4/v6 + non-match range

    Puts TCP PERMIT rules using the PORTS_RANGE_2 selector ahead of the
    TCP DENY rules using PORTS_RANGE, for both IPv4 and IPv6 (catch-all
    PERMIT rules last), and expects TCP traffic of a randomly chosen IP
    version not to pass.
    """
    self.logger.info("ACLP_TEST_START_0112")

    tcp = self.proto[self.IP][self.TCP]

    rules = [
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, tcp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, tcp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "deny ip4/ip6 tcp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, tcp)

    self.logger.info("ACLP_TEST_FINISH_0112")
1307
def test_0113_udp_deny(self):
    """ deny UDPv4/v6 + non-match range

    Puts UDP PERMIT rules using the PORTS_RANGE_2 selector ahead of the
    UDP DENY rules using PORTS_RANGE, for both IPv4 and IPv6 (catch-all
    PERMIT rules last), and expects UDP traffic of a randomly chosen IP
    version not to pass.
    """
    self.logger.info("ACLP_TEST_START_0113")

    udp = self.proto[self.IP][self.UDP]

    rules = [
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE_2, udp),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE_2, udp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, udp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, udp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "deny ip4/ip6 udp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp)

    self.logger.info("ACLP_TEST_FINISH_0113")
1339
def test_0300_tcp_permit_v4_etype_aaaa(self):
    """ permit TCPv4, send 0xAAAA etype

    Applies the "permit TCPv4 + non-match range" rule set and runs the
    positive verification with 0xaaaa passed as the last argument.
    """
    self.logger.info("ACLP_TEST_START_0300")

    tcp = self.proto[self.IP][self.TCP]

    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "permit ipv4 tcp")

    # Traffic should still pass also for an odd ethertype.
    # NOTE(review): the positional tail (0, False, True, 0xaaaa) is
    # presumably ports, fragments, raw payload, ethertype - confirm
    # against run_verify_test.
    self.run_verify_test(self.IP, self.IPV4, tcp, 0, False, True, 0xaaaa)

    self.logger.info("ACLP_TEST_FINISH_0300")
1361
def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
    """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked

    Applies the "permit TCPv4 + non-match range" rule set, whitelists
    ethertype 0x0bbb, and expects traffic sent with 0xaaaa not to pass.
    The whitelist is removed again before the test returns, whether or
    not the verification succeeded.
    """
    self.logger.info("ACLP_TEST_START_0305")

    tcp = self.proto[self.IP][self.TCP]

    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "permit ipv4 tcp")

    # whitelist the 0x0bbb etype - so the 0xaaaa should be blocked
    self.etype_whitelist([0xbbb], 1)
    try:
        # The oddball ethertype should be blocked.
        # NOTE(review): the positional tail (0, False, 0xaaaa) is
        # presumably ports, fragments, ethertype - confirm against
        # run_verify_negat_test.
        self.run_verify_negat_test(self.IP, self.IPV4, tcp,
                                   0, False, 0xaaaa)
    finally:
        # remove the whitelist even if the check above failed, so it
        # does not leak into the tests that run afterwards
        self.etype_whitelist([], 0)

    self.logger.info("ACLP_TEST_FINISH_0305")
1391
def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
    """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass

    Applies the "permit TCPv4 + non-match range" rule set, whitelists
    ethertype 0x0bbb, and expects traffic sent with that same ethertype
    to pass. The whitelist is removed again before the test returns,
    whether or not the verification succeeded.
    """
    self.logger.info("ACLP_TEST_START_0306")

    tcp = self.proto[self.IP][self.TCP]

    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "permit ipv4 tcp")

    # whitelist the 0x0bbb etype
    self.etype_whitelist([0xbbb], 1)
    try:
        # The whitelisted traffic, should pass.
        # NOTE(review): the positional tail (0, False, True, 0x0bbb) is
        # presumably ports, fragments, raw payload, ethertype - confirm
        # against run_verify_test.
        self.run_verify_test(self.IP, self.IPV4, tcp,
                             0, False, True, 0x0bbb)
    finally:
        # remove the whitelist even if the check above failed, so it
        # does not leak into the tests that run afterwards
        self.etype_whitelist([], 0)

    self.logger.info("ACLP_TEST_FINISH_0306")
1420
def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
    """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass

    Applies the "permit TCPv4 + non-match range" rule set, installs an
    ethertype whitelist for 0x0bbb and removes it straight away, then
    expects traffic sent with 0xaaaa to pass.
    """
    self.logger.info("ACLP_TEST_START_0307")

    tcp = self.proto[self.IP][self.TCP]

    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "permit ipv4 tcp")

    # whitelist the 0x0bbb etype - so the 0xaaaa should be blocked
    self.etype_whitelist([0xbbb], 1)
    # remove the whitelist, the previously blocked 0xAAAA should pass now
    self.etype_whitelist([], 0)

    # With the whitelist gone, the 0xaaaa traffic should pass.
    # NOTE(review): the positional tail (0, False, True, 0xaaaa) is
    # presumably ports, fragments, raw payload, ethertype - confirm
    # against run_verify_test.
    self.run_verify_test(self.IP, self.IPV4, tcp, 0, False, True, 0xaaaa)

    # The finish marker previously logged 0306, copied from the test
    # above; it now carries this test's own number.
    self.logger.info("ACLP_TEST_FINISH_0307")
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +02001448
def test_0315_del_intf(self):
    """ apply an acl and delete the interface

    Creates a loopback interface, applies the "permit TCPv4 + non-match
    range" rule set to it and then removes the interface while the ACL
    is still applied. No traffic is sent.
    """
    self.logger.info("ACLP_TEST_START_0315")

    tcp = self.proto[self.IP][self.TCP]

    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # create an interface
    lo_intf = VppLoInterface(self)

    # Apply rules to the new interface only
    self.apply_rules_to(rules, "permit ipv4 tcp", lo_intf.sw_if_index)

    # Remove the interface
    lo_intf.remove_vpp_config()

    self.logger.info("ACLP_TEST_FINISH_0315")
1474
# Run the test cases with the VPP test runner when executed as a script.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)