blob: 5ccfa39e5253540212ae0ac6fc10851fa4d04523 [file] [log] [blame]
Pavel Kotucek59dda062017-03-02 15:22:47 +01001#!/usr/bin/env python
2"""ACL plugin Test Case HLD:
3"""
4
5import unittest
6import random
7
8from scapy.packet import Raw
9from scapy.layers.l2 import Ether
10from scapy.layers.inet import IP, TCP, UDP, ICMP
11from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +000012from scapy.layers.inet6 import IPv6ExtHdrFragment
Pavel Kotucek59dda062017-03-02 15:22:47 +010013from framework import VppTestCase, VppTestRunner
14from util import Host, ppp
15
Andrew Yourtchenkode3682f2018-03-23 10:38:46 +010016from vpp_lo_interface import VppLoInterface
17
Pavel Kotucek59dda062017-03-02 15:22:47 +010018
19class TestACLplugin(VppTestCase):
20 """ ACL plugin Test Case """
21
22 # traffic types
23 IP = 0
24 ICMP = 1
25
26 # IP version
27 IPRANDOM = -1
28 IPV4 = 0
29 IPV6 = 1
30
31 # rule types
32 DENY = 0
33 PERMIT = 1
34
35 # supported protocols
36 proto = [[6, 17], [1, 58]]
37 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
38 ICMPv4 = 0
39 ICMPv6 = 1
40 TCP = 0
41 UDP = 1
42 PROTO_ALL = 0
43
44 # port ranges
45 PORTS_ALL = -1
46 PORTS_RANGE = 0
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +020047 PORTS_RANGE_2 = 1
Pavel Kotucek59dda062017-03-02 15:22:47 +010048 udp_sport_from = 10
49 udp_sport_to = udp_sport_from + 5
50 udp_dport_from = 20000
51 udp_dport_to = udp_dport_from + 5000
52 tcp_sport_from = 30
53 tcp_sport_to = tcp_sport_from + 5
54 tcp_dport_from = 40000
55 tcp_dport_to = tcp_dport_from + 5000
56
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +020057 udp_sport_from_2 = 90
58 udp_sport_to_2 = udp_sport_from_2 + 5
59 udp_dport_from_2 = 30000
60 udp_dport_to_2 = udp_dport_from_2 + 5000
61 tcp_sport_from_2 = 130
62 tcp_sport_to_2 = tcp_sport_from_2 + 5
63 tcp_dport_from_2 = 20000
64 tcp_dport_to_2 = tcp_dport_from_2 + 5000
65
Pavel Kotucek59dda062017-03-02 15:22:47 +010066 icmp4_type = 8 # echo request
67 icmp4_code = 3
68 icmp6_type = 128 # echo request
69 icmp6_code = 3
70
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +020071 icmp4_type_2 = 8
72 icmp4_code_from_2 = 5
73 icmp4_code_to_2 = 20
74 icmp6_type_2 = 128
75 icmp6_code_from_2 = 8
76 icmp6_code_to_2 = 42
77
Pavel Kotucek59dda062017-03-02 15:22:47 +010078 # Test variables
79 bd_id = 1
80
81 @classmethod
82 def setUpClass(cls):
83 """
84 Perform standard class setup (defined by class method setUpClass in
85 class VppTestCase) before running the test case, set test case related
86 variables and configure VPP.
87 """
88 super(TestACLplugin, cls).setUpClass()
89
Pavel Kotucek59dda062017-03-02 15:22:47 +010090 try:
91 # Create 2 pg interfaces
92 cls.create_pg_interfaces(range(2))
93
94 # Packet flows mapping pg0 -> pg1, pg2 etc.
95 cls.flows = dict()
96 cls.flows[cls.pg0] = [cls.pg1]
97
98 # Packet sizes
99 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
100
101 # Create BD with MAC learning and unknown unicast flooding disabled
102 # and put interfaces to this BD
103 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
104 learn=1)
105 for pg_if in cls.pg_interfaces:
106 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
107 bd_id=cls.bd_id)
108
109 # Set up all interfaces
110 for i in cls.pg_interfaces:
111 i.admin_up()
112
113 # Mapping between packet-generator index and lists of test hosts
114 cls.hosts_by_pg_idx = dict()
115 for pg_if in cls.pg_interfaces:
116 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
117
118 # Create list of deleted hosts
119 cls.deleted_hosts_by_pg_idx = dict()
120 for pg_if in cls.pg_interfaces:
121 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
122
123 # warm-up the mac address tables
124 # self.warmup_test()
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +0200125 count = 16
126 start = 0
127 n_int = len(cls.pg_interfaces)
128 macs_per_if = count / n_int
129 i = -1
130 for pg_if in cls.pg_interfaces:
131 i += 1
132 start_nr = macs_per_if * i + start
133 end_nr = count + start if i == (n_int - 1) \
134 else macs_per_if * (i + 1) + start
135 hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
136 for j in range(start_nr, end_nr):
137 host = Host(
138 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
139 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
140 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
141 hosts.append(host)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100142
143 except Exception:
144 super(TestACLplugin, cls).tearDownClass()
145 raise
146
147 def setUp(self):
148 super(TestACLplugin, self).setUp()
149 self.reset_packet_infos()
150
151 def tearDown(self):
152 """
153 Show various debug prints after each test.
154 """
155 super(TestACLplugin, self).tearDown()
156 if not self.vpp_dead:
Andrew Yourtchenkoc1f87942018-10-06 09:26:02 +0200157 cli = "show vlib graph l2-input-feat-arc"
158 self.logger.info(self.vapi.ppcli(cli))
159 cli = "show vlib graph l2-input-feat-arc-end"
160 self.logger.info(self.vapi.ppcli(cli))
161 cli = "show vlib graph l2-output-feat-arc"
162 self.logger.info(self.vapi.ppcli(cli))
163 cli = "show vlib graph l2-output-feat-arc-end"
164 self.logger.info(self.vapi.ppcli(cli))
Pavel Kotucek59dda062017-03-02 15:22:47 +0100165 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
Andrew Yourtchenko7f4d5772017-05-24 13:20:47 +0200166 self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
167 self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
168 self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
Pavel Kotucek59dda062017-03-02 15:22:47 +0100169 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
170 % self.bd_id))
171
Pavel Kotucek59dda062017-03-02 15:22:47 +0100172 def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
173 s_prefix=0, s_ip='\x00\x00\x00\x00',
174 d_prefix=0, d_ip='\x00\x00\x00\x00'):
175 if proto == -1:
176 return
177 if ports == self.PORTS_ALL:
178 sport_from = 0
179 dport_from = 0
180 sport_to = 65535 if proto != 1 and proto != 58 else 255
181 dport_to = sport_to
182 elif ports == self.PORTS_RANGE:
183 if proto == 1:
184 sport_from = self.icmp4_type
185 sport_to = self.icmp4_type
186 dport_from = self.icmp4_code
187 dport_to = self.icmp4_code
188 elif proto == 58:
189 sport_from = self.icmp6_type
190 sport_to = self.icmp6_type
191 dport_from = self.icmp6_code
192 dport_to = self.icmp6_code
193 elif proto == self.proto[self.IP][self.TCP]:
194 sport_from = self.tcp_sport_from
195 sport_to = self.tcp_sport_to
196 dport_from = self.tcp_dport_from
197 dport_to = self.tcp_dport_to
198 elif proto == self.proto[self.IP][self.UDP]:
199 sport_from = self.udp_sport_from
200 sport_to = self.udp_sport_to
201 dport_from = self.udp_dport_from
202 dport_to = self.udp_dport_to
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +0200203 elif ports == self.PORTS_RANGE_2:
204 if proto == 1:
205 sport_from = self.icmp4_type_2
206 sport_to = self.icmp4_type_2
207 dport_from = self.icmp4_code_from_2
208 dport_to = self.icmp4_code_to_2
209 elif proto == 58:
210 sport_from = self.icmp6_type_2
211 sport_to = self.icmp6_type_2
212 dport_from = self.icmp6_code_from_2
213 dport_to = self.icmp6_code_to_2
214 elif proto == self.proto[self.IP][self.TCP]:
215 sport_from = self.tcp_sport_from_2
216 sport_to = self.tcp_sport_to_2
217 dport_from = self.tcp_dport_from_2
218 dport_to = self.tcp_dport_to_2
219 elif proto == self.proto[self.IP][self.UDP]:
220 sport_from = self.udp_sport_from_2
221 sport_to = self.udp_sport_to_2
222 dport_from = self.udp_dport_from_2
223 dport_to = self.udp_dport_to_2
Pavel Kotucek59dda062017-03-02 15:22:47 +0100224 else:
225 sport_from = ports
226 sport_to = ports
227 dport_from = ports
228 dport_to = ports
229
230 rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
231 'srcport_or_icmptype_first': sport_from,
232 'srcport_or_icmptype_last': sport_to,
233 'src_ip_prefix_len': s_prefix,
234 'src_ip_addr': s_ip,
235 'dstport_or_icmpcode_first': dport_from,
236 'dstport_or_icmpcode_last': dport_to,
237 'dst_ip_prefix_len': d_prefix,
238 'dst_ip_addr': d_ip})
239 return rule
240
241 def apply_rules(self, rules, tag=''):
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200242 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
243 tag=tag)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100244 self.logger.info("Dumped ACL: " + str(
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200245 self.vapi.acl_dump(reply.acl_index)))
Pavel Kotucek59dda062017-03-02 15:22:47 +0100246 # Apply a ACL on the interface as inbound
247 for i in self.pg_interfaces:
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200248 self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
249 n_input=1,
250 acls=[reply.acl_index])
Pavel Kotucek59dda062017-03-02 15:22:47 +0100251 return
252
Andrew Yourtchenkode3682f2018-03-23 10:38:46 +0100253 def apply_rules_to(self, rules, tag='', sw_if_index=0xFFFFFFFF):
254 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
255 tag=tag)
256 self.logger.info("Dumped ACL: " + str(
257 self.vapi.acl_dump(reply.acl_index)))
258 # Apply a ACL on the interface as inbound
259 self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
260 n_input=1,
261 acls=[reply.acl_index])
262 return
263
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100264 def etype_whitelist(self, whitelist, n_input):
265 # Apply whitelists on all the interfaces
266 for i in self.pg_interfaces:
267 # checkstyle can't read long names. Help them.
268 fun = self.vapi.acl_interface_set_etype_whitelist
269 fun(sw_if_index=i.sw_if_index, n_input=n_input,
270 whitelist=whitelist)
271 return
272
Pavel Kotucek59dda062017-03-02 15:22:47 +0100273 def create_upper_layer(self, packet_index, proto, ports=0):
274 p = self.proto_map[proto]
275 if p == 'UDP':
276 if ports == 0:
277 return UDP(sport=random.randint(self.udp_sport_from,
278 self.udp_sport_to),
279 dport=random.randint(self.udp_dport_from,
280 self.udp_dport_to))
281 else:
282 return UDP(sport=ports, dport=ports)
283 elif p == 'TCP':
284 if ports == 0:
285 return TCP(sport=random.randint(self.tcp_sport_from,
286 self.tcp_sport_to),
287 dport=random.randint(self.tcp_dport_from,
288 self.tcp_dport_to))
289 else:
290 return TCP(sport=ports, dport=ports)
291 return ''
292
293 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100294 proto=-1, ports=0, fragments=False,
295 pkt_raw=True, etype=-1):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100296 """
297 Create input packet stream for defined interface using hosts or
298 deleted_hosts list.
299
300 :param object src_if: Interface to create packet stream for.
301 :param list packet_sizes: List of required packet sizes.
302 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
303 :return: Stream of packets.
304 """
305 pkts = []
306 if self.flows.__contains__(src_if):
307 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
308 for dst_if in self.flows[src_if]:
309 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
310 n_int = len(dst_hosts) * len(src_hosts)
311 for i in range(0, n_int):
312 dst_host = dst_hosts[i / len(src_hosts)]
313 src_host = src_hosts[i % len(src_hosts)]
314 pkt_info = self.create_packet_info(src_if, dst_if)
315 if ipv6 == 1:
316 pkt_info.ip = 1
317 elif ipv6 == 0:
318 pkt_info.ip = 0
319 else:
320 pkt_info.ip = random.choice([0, 1])
321 if proto == -1:
322 pkt_info.proto = random.choice(self.proto[self.IP])
323 else:
324 pkt_info.proto = proto
325 payload = self.info_to_payload(pkt_info)
326 p = Ether(dst=dst_host.mac, src=src_host.mac)
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100327 if etype > 0:
328 p = Ether(dst=dst_host.mac,
329 src=src_host.mac,
330 type=etype)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100331 if pkt_info.ip:
332 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000333 if fragments:
334 p /= IPv6ExtHdrFragment(offset=64, m=1)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100335 else:
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000336 if fragments:
337 p /= IP(src=src_host.ip4, dst=dst_host.ip4,
338 flags=1, frag=64)
339 else:
340 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100341 if traffic_type == self.ICMP:
342 if pkt_info.ip:
343 p /= ICMPv6EchoRequest(type=self.icmp6_type,
344 code=self.icmp6_code)
345 else:
346 p /= ICMP(type=self.icmp4_type,
347 code=self.icmp4_code)
348 else:
349 p /= self.create_upper_layer(i, pkt_info.proto, ports)
Pavel Kotuceke7b67342017-04-18 13:12:20 +0200350 if pkt_raw:
351 p /= Raw(payload)
352 pkt_info.data = p.copy()
353 if pkt_raw:
354 size = random.choice(packet_sizes)
355 self.extend_packet(p, size)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100356 pkts.append(p)
357 return pkts
358
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100359 def verify_capture(self, pg_if, capture,
360 traffic_type=0, ip_type=0, etype=-1):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100361 """
362 Verify captured input packet stream for defined interface.
363
364 :param object pg_if: Interface to verify captured packet stream for.
365 :param list capture: Captured packet stream.
366 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
367 """
368 last_info = dict()
369 for i in self.pg_interfaces:
370 last_info[i.sw_if_index] = None
371 dst_sw_if_index = pg_if.sw_if_index
372 for packet in capture:
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100373 if etype > 0:
374 if packet[Ether].type != etype:
375 self.logger.error(ppp("Unexpected ethertype in packet:",
376 packet))
377 else:
378 continue
Pavel Kotucek59dda062017-03-02 15:22:47 +0100379 try:
380 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
381 if traffic_type == self.ICMP and ip_type == self.IPV6:
382 payload_info = self.payload_to_info(
383 packet[ICMPv6EchoRequest].data)
384 payload = packet[ICMPv6EchoRequest]
385 else:
386 payload_info = self.payload_to_info(str(packet[Raw]))
387 payload = packet[self.proto_map[payload_info.proto]]
388 except:
389 self.logger.error(ppp("Unexpected or invalid packet "
390 "(outside network):", packet))
391 raise
392
393 if ip_type != 0:
394 self.assertEqual(payload_info.ip, ip_type)
395 if traffic_type == self.ICMP:
396 try:
397 if payload_info.ip == 0:
398 self.assertEqual(payload.type, self.icmp4_type)
399 self.assertEqual(payload.code, self.icmp4_code)
400 else:
401 self.assertEqual(payload.type, self.icmp6_type)
402 self.assertEqual(payload.code, self.icmp6_code)
403 except:
404 self.logger.error(ppp("Unexpected or invalid packet "
405 "(outside network):", packet))
406 raise
407 else:
408 try:
409 ip_version = IPv6 if payload_info.ip == 1 else IP
410
411 ip = packet[ip_version]
412 packet_index = payload_info.index
413
414 self.assertEqual(payload_info.dst, dst_sw_if_index)
415 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
416 (pg_if.name, payload_info.src,
417 packet_index))
418 next_info = self.get_next_packet_info_for_interface2(
419 payload_info.src, dst_sw_if_index,
420 last_info[payload_info.src])
421 last_info[payload_info.src] = next_info
422 self.assertTrue(next_info is not None)
423 self.assertEqual(packet_index, next_info.index)
424 saved_packet = next_info.data
425 # Check standard fields
426 self.assertEqual(ip.src, saved_packet[ip_version].src)
427 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
428 p = self.proto_map[payload_info.proto]
429 if p == 'TCP':
430 tcp = packet[TCP]
431 self.assertEqual(tcp.sport, saved_packet[
432 TCP].sport)
433 self.assertEqual(tcp.dport, saved_packet[
434 TCP].dport)
435 elif p == 'UDP':
436 udp = packet[UDP]
437 self.assertEqual(udp.sport, saved_packet[
438 UDP].sport)
439 self.assertEqual(udp.dport, saved_packet[
440 UDP].dport)
441 except:
442 self.logger.error(ppp("Unexpected or invalid packet:",
443 packet))
444 raise
445 for i in self.pg_interfaces:
446 remaining_packet = self.get_next_packet_info_for_interface2(
447 i, dst_sw_if_index, last_info[i.sw_if_index])
448 self.assertTrue(
449 remaining_packet is None,
450 "Port %u: Packet expected from source %u didn't arrive" %
451 (dst_sw_if_index, i.sw_if_index))
452
453 def run_traffic_no_check(self):
454 # Test
455 # Create incoming packet streams for packet-generator interfaces
456 for i in self.pg_interfaces:
457 if self.flows.__contains__(i):
458 pkts = self.create_stream(i, self.pg_if_packet_sizes)
459 if len(pkts) > 0:
460 i.add_stream(pkts)
461
462 # Enable packet capture and start packet sending
463 self.pg_enable_capture(self.pg_interfaces)
464 self.pg_start()
465
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000466 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100467 frags=False, pkt_raw=True, etype=-1):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100468 # Test
469 # Create incoming packet streams for packet-generator interfaces
470 pkts_cnt = 0
471 for i in self.pg_interfaces:
472 if self.flows.__contains__(i):
473 pkts = self.create_stream(i, self.pg_if_packet_sizes,
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000474 traffic_type, ip_type, proto, ports,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100475 frags, pkt_raw, etype)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100476 if len(pkts) > 0:
477 i.add_stream(pkts)
478 pkts_cnt += len(pkts)
479
480 # Enable packet capture and start packet sendingself.IPV
481 self.pg_enable_capture(self.pg_interfaces)
482 self.pg_start()
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +0200483 self.logger.info("sent packets count: %d" % pkts_cnt)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100484
485 # Verify
486 # Verify outgoing packet streams per packet-generator interface
487 for src_if in self.pg_interfaces:
488 if self.flows.__contains__(src_if):
489 for dst_if in self.flows[src_if]:
490 capture = dst_if.get_capture(pkts_cnt)
491 self.logger.info("Verifying capture on interface %s" %
492 dst_if.name)
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100493 self.verify_capture(dst_if, capture,
494 traffic_type, ip_type, etype)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100495
496 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100497 ports=0, frags=False, etype=-1):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100498 # Test
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +0200499 pkts_cnt = 0
Pavel Kotucek59dda062017-03-02 15:22:47 +0100500 self.reset_packet_infos()
501 for i in self.pg_interfaces:
502 if self.flows.__contains__(i):
503 pkts = self.create_stream(i, self.pg_if_packet_sizes,
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000504 traffic_type, ip_type, proto, ports,
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +0100505 frags, True, etype)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100506 if len(pkts) > 0:
507 i.add_stream(pkts)
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +0200508 pkts_cnt += len(pkts)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100509
510 # Enable packet capture and start packet sending
511 self.pg_enable_capture(self.pg_interfaces)
512 self.pg_start()
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +0200513 self.logger.info("sent packets count: %d" % pkts_cnt)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100514
515 # Verify
516 # Verify outgoing packet streams per packet-generator interface
517 for src_if in self.pg_interfaces:
518 if self.flows.__contains__(src_if):
519 for dst_if in self.flows[src_if]:
520 self.logger.info("Verifying capture on interface %s" %
521 dst_if.name)
522 capture = dst_if.get_capture(0)
523 self.assertEqual(len(capture), 0)
524
Pavel Kotucek59dda062017-03-02 15:22:47 +0100525 def test_0000_warmup_test(self):
526 """ ACL plugin version check; learn MACs
527 """
Pavel Kotucek59dda062017-03-02 15:22:47 +0100528 reply = self.vapi.papi.acl_plugin_get_version()
529 self.assertEqual(reply.major, 1)
530 self.logger.info("Working with ACL plugin version: %d.%d" % (
531 reply.major, reply.minor))
532 # minor version changes are non breaking
533 # self.assertEqual(reply.minor, 0)
534
535 def test_0001_acl_create(self):
Andrew Yourtchenko987abe92017-09-27 13:50:31 +0200536 """ ACL create/delete test
Pavel Kotucek59dda062017-03-02 15:22:47 +0100537 """
538
539 self.logger.info("ACLP_TEST_START_0001")
540 # Add an ACL
541 r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
542 'srcport_or_icmptype_first': 1234,
543 'srcport_or_icmptype_last': 1235,
544 'src_ip_prefix_len': 0,
545 'src_ip_addr': '\x00\x00\x00\x00',
546 'dstport_or_icmpcode_first': 1234,
547 'dstport_or_icmpcode_last': 1234,
548 'dst_ip_addr': '\x00\x00\x00\x00',
549 'dst_ip_prefix_len': 0}]
550 # Test 1: add a new ACL
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200551 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
552 tag="permit 1234")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100553 self.assertEqual(reply.retval, 0)
554 # The very first ACL gets #0
555 self.assertEqual(reply.acl_index, 0)
Andrew Yourtchenko987abe92017-09-27 13:50:31 +0200556 first_acl = reply.acl_index
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200557 rr = self.vapi.acl_dump(reply.acl_index)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100558 self.logger.info("Dumped ACL: " + str(rr))
559 self.assertEqual(len(rr), 1)
560 # We should have the same number of ACL entries as we had asked
561 self.assertEqual(len(rr[0].r), len(r))
562 # The rules should be the same. But because the submitted and returned
563 # are different types, we need to iterate over rules and keys to get
564 # to basic values.
565 for i_rule in range(0, len(r) - 1):
566 for rule_key in r[i_rule]:
567 self.assertEqual(rr[0].r[i_rule][rule_key],
568 r[i_rule][rule_key])
569
570 # Add a deny-1234 ACL
Ole Troan895b6e82017-10-20 13:28:20 +0200571 r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
Pavel Kotucek59dda062017-03-02 15:22:47 +0100572 'srcport_or_icmptype_first': 1234,
573 'srcport_or_icmptype_last': 1235,
574 'src_ip_prefix_len': 0,
575 'src_ip_addr': '\x00\x00\x00\x00',
576 'dstport_or_icmpcode_first': 1234,
577 'dstport_or_icmpcode_last': 1234,
578 'dst_ip_addr': '\x00\x00\x00\x00',
579 'dst_ip_prefix_len': 0},
580 {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
581 'srcport_or_icmptype_first': 0,
582 'srcport_or_icmptype_last': 0,
583 'src_ip_prefix_len': 0,
584 'src_ip_addr': '\x00\x00\x00\x00',
585 'dstport_or_icmpcode_first': 0,
586 'dstport_or_icmpcode_last': 0,
587 'dst_ip_addr': '\x00\x00\x00\x00',
Ole Troan895b6e82017-10-20 13:28:20 +0200588 'dst_ip_prefix_len': 0}]
Pavel Kotucek59dda062017-03-02 15:22:47 +0100589
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200590 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
591 tag="deny 1234;permit all")
Pavel Kotucek59dda062017-03-02 15:22:47 +0100592 self.assertEqual(reply.retval, 0)
593 # The second ACL gets #1
594 self.assertEqual(reply.acl_index, 1)
Andrew Yourtchenko987abe92017-09-27 13:50:31 +0200595 second_acl = reply.acl_index
Pavel Kotucek59dda062017-03-02 15:22:47 +0100596
597 # Test 2: try to modify a nonexistent ACL
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200598 reply = self.vapi.acl_add_replace(acl_index=432, r=r,
Jon Loeliger27cadd22017-11-10 13:15:12 -0600599 tag="FFFF:FFFF", expected_retval=-6)
600 self.assertEqual(reply.retval, -6)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100601 # The ACL number should pass through
602 self.assertEqual(reply.acl_index, 432)
Andrew Yourtchenko987abe92017-09-27 13:50:31 +0200603 # apply an ACL on an interface inbound, try to delete ACL, must fail
604 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
605 n_input=1,
606 acls=[first_acl])
Jon Loeliger27cadd22017-11-10 13:15:12 -0600607 reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
Andrew Yourtchenko987abe92017-09-27 13:50:31 +0200608 # Unapply an ACL and then try to delete it - must be ok
609 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
610 n_input=0,
611 acls=[])
612 reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
613
614 # apply an ACL on an interface outbound, try to delete ACL, must fail
615 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
616 n_input=0,
617 acls=[second_acl])
Jon Loeliger27cadd22017-11-10 13:15:12 -0600618 reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
Andrew Yourtchenko987abe92017-09-27 13:50:31 +0200619 # Unapply the ACL and then try to delete it - must be ok
620 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
621 n_input=0,
622 acls=[])
623 reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
624
625 # try to apply a nonexistent ACL - must fail
626 self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
627 n_input=1,
628 acls=[first_acl],
Jon Loeliger27cadd22017-11-10 13:15:12 -0600629 expected_retval=-6)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100630
631 self.logger.info("ACLP_TEST_FINISH_0001")
632
633 def test_0002_acl_permit_apply(self):
634 """ permit ACL apply test
635 """
636 self.logger.info("ACLP_TEST_START_0002")
637
638 rules = []
639 rules.append(self.create_rule(self.IPV4, self.PERMIT,
640 0, self.proto[self.IP][self.UDP]))
641 rules.append(self.create_rule(self.IPV4, self.PERMIT,
642 0, self.proto[self.IP][self.TCP]))
643
644 # Apply rules
645 self.apply_rules(rules, "permit per-flow")
646
647 # Traffic should still pass
648 self.run_verify_test(self.IP, self.IPV4, -1)
649 self.logger.info("ACLP_TEST_FINISH_0002")
650
651 def test_0003_acl_deny_apply(self):
652 """ deny ACL apply test
653 """
654 self.logger.info("ACLP_TEST_START_0003")
655 # Add a deny-flows ACL
656 rules = []
657 rules.append(self.create_rule(self.IPV4, self.DENY,
658 self.PORTS_ALL, self.proto[self.IP][self.UDP]))
659 # Permit ip any any in the end
660 rules.append(self.create_rule(self.IPV4, self.PERMIT,
661 self.PORTS_ALL, 0))
662
663 # Apply rules
664 self.apply_rules(rules, "deny per-flow;permit all")
665
666 # Traffic should not pass
667 self.run_verify_negat_test(self.IP, self.IPV4,
668 self.proto[self.IP][self.UDP])
669 self.logger.info("ACLP_TEST_FINISH_0003")
670 # self.assertEqual(1, 0)
671
672 def test_0004_vpp624_permit_icmpv4(self):
673 """ VPP_624 permit ICMPv4
674 """
675 self.logger.info("ACLP_TEST_START_0004")
676
677 # Add an ACL
678 rules = []
679 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
680 self.proto[self.ICMP][self.ICMPv4]))
681 # deny ip any any in the end
682 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
683
684 # Apply rules
685 self.apply_rules(rules, "permit icmpv4")
686
687 # Traffic should still pass
688 self.run_verify_test(self.ICMP, self.IPV4,
689 self.proto[self.ICMP][self.ICMPv4])
690
691 self.logger.info("ACLP_TEST_FINISH_0004")
692
693 def test_0005_vpp624_permit_icmpv6(self):
694 """ VPP_624 permit ICMPv6
695 """
696 self.logger.info("ACLP_TEST_START_0005")
697
698 # Add an ACL
699 rules = []
700 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
701 self.proto[self.ICMP][self.ICMPv6]))
702 # deny ip any any in the end
703 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
704
705 # Apply rules
706 self.apply_rules(rules, "permit icmpv6")
707
708 # Traffic should still pass
709 self.run_verify_test(self.ICMP, self.IPV6,
710 self.proto[self.ICMP][self.ICMPv6])
711
712 self.logger.info("ACLP_TEST_FINISH_0005")
713
714 def test_0006_vpp624_deny_icmpv4(self):
715 """ VPP_624 deny ICMPv4
716 """
717 self.logger.info("ACLP_TEST_START_0006")
718 # Add an ACL
719 rules = []
720 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
721 self.proto[self.ICMP][self.ICMPv4]))
722 # permit ip any any in the end
723 rules.append(self.create_rule(self.IPV4, self.PERMIT,
724 self.PORTS_ALL, 0))
725
726 # Apply rules
727 self.apply_rules(rules, "deny icmpv4")
728
729 # Traffic should not pass
730 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
731
732 self.logger.info("ACLP_TEST_FINISH_0006")
733
734 def test_0007_vpp624_deny_icmpv6(self):
735 """ VPP_624 deny ICMPv6
736 """
737 self.logger.info("ACLP_TEST_START_0007")
738 # Add an ACL
739 rules = []
740 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
741 self.proto[self.ICMP][self.ICMPv6]))
742 # deny ip any any in the end
743 rules.append(self.create_rule(self.IPV6, self.PERMIT,
744 self.PORTS_ALL, 0))
745
746 # Apply rules
747 self.apply_rules(rules, "deny icmpv6")
748
749 # Traffic should not pass
750 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
751
752 self.logger.info("ACLP_TEST_FINISH_0007")
753
754 def test_0008_tcp_permit_v4(self):
755 """ permit TCPv4
756 """
757 self.logger.info("ACLP_TEST_START_0008")
758
759 # Add an ACL
760 rules = []
761 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
762 self.proto[self.IP][self.TCP]))
763 # deny ip any any in the end
764 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
765
766 # Apply rules
767 self.apply_rules(rules, "permit ipv4 tcp")
768
769 # Traffic should still pass
770 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
771
772 self.logger.info("ACLP_TEST_FINISH_0008")
773
774 def test_0009_tcp_permit_v6(self):
775 """ permit TCPv6
776 """
777 self.logger.info("ACLP_TEST_START_0009")
778
779 # Add an ACL
780 rules = []
781 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
782 self.proto[self.IP][self.TCP]))
783 # deny ip any any in the end
784 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
785
786 # Apply rules
787 self.apply_rules(rules, "permit ip6 tcp")
788
789 # Traffic should still pass
790 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
791
792 self.logger.info("ACLP_TEST_FINISH_0008")
793
794 def test_0010_udp_permit_v4(self):
795 """ permit UDPv4
796 """
797 self.logger.info("ACLP_TEST_START_0010")
798
799 # Add an ACL
800 rules = []
801 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
802 self.proto[self.IP][self.UDP]))
803 # deny ip any any in the end
804 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
805
806 # Apply rules
807 self.apply_rules(rules, "permit ipv udp")
808
809 # Traffic should still pass
810 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
811
812 self.logger.info("ACLP_TEST_FINISH_0010")
813
814 def test_0011_udp_permit_v6(self):
815 """ permit UDPv6
816 """
817 self.logger.info("ACLP_TEST_START_0011")
818
819 # Add an ACL
820 rules = []
821 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
822 self.proto[self.IP][self.UDP]))
823 # deny ip any any in the end
824 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
825
826 # Apply rules
827 self.apply_rules(rules, "permit ip6 udp")
828
829 # Traffic should still pass
830 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
831
832 self.logger.info("ACLP_TEST_FINISH_0011")
833
834 def test_0012_tcp_deny(self):
835 """ deny TCPv4/v6
836 """
837 self.logger.info("ACLP_TEST_START_0012")
838
839 # Add an ACL
840 rules = []
841 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
842 self.proto[self.IP][self.TCP]))
843 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
844 self.proto[self.IP][self.TCP]))
845 # permit ip any any in the end
846 rules.append(self.create_rule(self.IPV4, self.PERMIT,
847 self.PORTS_ALL, 0))
848 rules.append(self.create_rule(self.IPV6, self.PERMIT,
849 self.PORTS_ALL, 0))
850
851 # Apply rules
852 self.apply_rules(rules, "deny ip4/ip6 tcp")
853
854 # Traffic should not pass
855 self.run_verify_negat_test(self.IP, self.IPRANDOM,
856 self.proto[self.IP][self.TCP])
857
858 self.logger.info("ACLP_TEST_FINISH_0012")
859
860 def test_0013_udp_deny(self):
861 """ deny UDPv4/v6
862 """
863 self.logger.info("ACLP_TEST_START_0013")
864
865 # Add an ACL
866 rules = []
867 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
868 self.proto[self.IP][self.UDP]))
869 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
870 self.proto[self.IP][self.UDP]))
871 # permit ip any any in the end
872 rules.append(self.create_rule(self.IPV4, self.PERMIT,
873 self.PORTS_ALL, 0))
874 rules.append(self.create_rule(self.IPV6, self.PERMIT,
875 self.PORTS_ALL, 0))
876
877 # Apply rules
878 self.apply_rules(rules, "deny ip4/ip6 udp")
879
880 # Traffic should not pass
881 self.run_verify_negat_test(self.IP, self.IPRANDOM,
882 self.proto[self.IP][self.UDP])
883
884 self.logger.info("ACLP_TEST_FINISH_0013")
885
886 def test_0014_acl_dump(self):
887 """ verify add/dump acls
888 """
889 self.logger.info("ACLP_TEST_START_0014")
890
891 r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
892 [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
893 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
894 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
895 [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
896 [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
897 [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
898 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
899 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
900 [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
901 [self.IPV4, self.DENY, self.PORTS_ALL, 0],
902 [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
903 [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
904 [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
905 [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
906 [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
907 [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
908 [self.IPV6, self.DENY, self.PORTS_ALL, 0]
909 ]
910
911 # Add and verify new ACLs
912 rules = []
913 for i in range(len(r)):
914 rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
915
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200916 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
917 result = self.vapi.acl_dump(reply.acl_index)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100918
919 i = 0
920 for drules in result:
921 for dr in drules.r:
922 self.assertEqual(dr.is_ipv6, r[i][0])
923 self.assertEqual(dr.is_permit, r[i][1])
924 self.assertEqual(dr.proto, r[i][3])
925
926 if r[i][2] > 0:
927 self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
928 else:
929 if r[i][2] < 0:
930 self.assertEqual(dr.srcport_or_icmptype_first, 0)
931 self.assertEqual(dr.srcport_or_icmptype_last, 65535)
932 else:
933 if dr.proto == self.proto[self.IP][self.TCP]:
934 self.assertGreater(dr.srcport_or_icmptype_first,
935 self.tcp_sport_from-1)
936 self.assertLess(dr.srcport_or_icmptype_first,
937 self.tcp_sport_to+1)
938 self.assertGreater(dr.dstport_or_icmpcode_last,
939 self.tcp_dport_from-1)
940 self.assertLess(dr.dstport_or_icmpcode_last,
941 self.tcp_dport_to+1)
942 elif dr.proto == self.proto[self.IP][self.UDP]:
943 self.assertGreater(dr.srcport_or_icmptype_first,
944 self.udp_sport_from-1)
945 self.assertLess(dr.srcport_or_icmptype_first,
946 self.udp_sport_to+1)
947 self.assertGreater(dr.dstport_or_icmpcode_last,
948 self.udp_dport_from-1)
949 self.assertLess(dr.dstport_or_icmpcode_last,
950 self.udp_dport_to+1)
951 i += 1
952
953 self.logger.info("ACLP_TEST_FINISH_0014")
954
955 def test_0015_tcp_permit_port_v4(self):
956 """ permit single TCPv4
957 """
958 self.logger.info("ACLP_TEST_START_0015")
959
960 port = random.randint(0, 65535)
961 # Add an ACL
962 rules = []
963 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
964 self.proto[self.IP][self.TCP]))
965 # deny ip any any in the end
966 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
967
968 # Apply rules
969 self.apply_rules(rules, "permit ip4 tcp "+str(port))
970
971 # Traffic should still pass
972 self.run_verify_test(self.IP, self.IPV4,
973 self.proto[self.IP][self.TCP], port)
974
975 self.logger.info("ACLP_TEST_FINISH_0015")
976
977 def test_0016_udp_permit_port_v4(self):
978 """ permit single UDPv4
979 """
980 self.logger.info("ACLP_TEST_START_0016")
981
982 port = random.randint(0, 65535)
983 # Add an ACL
984 rules = []
985 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
986 self.proto[self.IP][self.UDP]))
987 # deny ip any any in the end
988 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
989
990 # Apply rules
991 self.apply_rules(rules, "permit ip4 tcp "+str(port))
992
993 # Traffic should still pass
994 self.run_verify_test(self.IP, self.IPV4,
995 self.proto[self.IP][self.UDP], port)
996
997 self.logger.info("ACLP_TEST_FINISH_0016")
998
999 def test_0017_tcp_permit_port_v6(self):
1000 """ permit single TCPv6
1001 """
1002 self.logger.info("ACLP_TEST_START_0017")
1003
1004 port = random.randint(0, 65535)
1005 # Add an ACL
1006 rules = []
1007 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1008 self.proto[self.IP][self.TCP]))
1009 # deny ip any any in the end
1010 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1011
1012 # Apply rules
1013 self.apply_rules(rules, "permit ip4 tcp "+str(port))
1014
1015 # Traffic should still pass
1016 self.run_verify_test(self.IP, self.IPV6,
1017 self.proto[self.IP][self.TCP], port)
1018
1019 self.logger.info("ACLP_TEST_FINISH_0017")
1020
1021 def test_0018_udp_permit_port_v6(self):
1022 """ permit single UPPv6
1023 """
1024 self.logger.info("ACLP_TEST_START_0018")
1025
1026 port = random.randint(0, 65535)
1027 # Add an ACL
1028 rules = []
1029 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1030 self.proto[self.IP][self.UDP]))
1031 # deny ip any any in the end
1032 rules.append(self.create_rule(self.IPV6, self.DENY,
1033 self.PORTS_ALL, 0))
1034
1035 # Apply rules
1036 self.apply_rules(rules, "permit ip4 tcp "+str(port))
1037
1038 # Traffic should still pass
1039 self.run_verify_test(self.IP, self.IPV6,
1040 self.proto[self.IP][self.UDP], port)
1041
1042 self.logger.info("ACLP_TEST_FINISH_0018")
1043
1044 def test_0019_udp_deny_port(self):
1045 """ deny single TCPv4/v6
1046 """
1047 self.logger.info("ACLP_TEST_START_0019")
1048
1049 port = random.randint(0, 65535)
1050 # Add an ACL
1051 rules = []
1052 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1053 self.proto[self.IP][self.TCP]))
1054 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1055 self.proto[self.IP][self.TCP]))
1056 # Permit ip any any in the end
1057 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1058 self.PORTS_ALL, 0))
1059 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1060 self.PORTS_ALL, 0))
1061
1062 # Apply rules
1063 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1064
1065 # Traffic should not pass
1066 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1067 self.proto[self.IP][self.TCP], port)
1068
1069 self.logger.info("ACLP_TEST_FINISH_0019")
1070
1071 def test_0020_udp_deny_port(self):
1072 """ deny single UDPv4/v6
1073 """
1074 self.logger.info("ACLP_TEST_START_0020")
1075
1076 port = random.randint(0, 65535)
1077 # Add an ACL
1078 rules = []
1079 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1080 self.proto[self.IP][self.UDP]))
1081 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1082 self.proto[self.IP][self.UDP]))
1083 # Permit ip any any in the end
1084 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1085 self.PORTS_ALL, 0))
1086 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1087 self.PORTS_ALL, 0))
1088
1089 # Apply rules
1090 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1091
1092 # Traffic should not pass
1093 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1094 self.proto[self.IP][self.UDP], port)
1095
1096 self.logger.info("ACLP_TEST_FINISH_0020")
1097
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +00001098 def test_0021_udp_deny_port_verify_fragment_deny(self):
Chris Luked0421942018-04-10 15:19:54 -04001099 """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
1100 blocked
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +00001101 """
1102 self.logger.info("ACLP_TEST_START_0021")
1103
1104 port = random.randint(0, 65535)
1105 # Add an ACL
1106 rules = []
1107 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1108 self.proto[self.IP][self.UDP]))
1109 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1110 self.proto[self.IP][self.UDP]))
1111 # deny ip any any in the end
1112 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1113 self.PORTS_ALL, 0))
1114 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1115 self.PORTS_ALL, 0))
1116
1117 # Apply rules
1118 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1119
1120 # Traffic should not pass
1121 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1122 self.proto[self.IP][self.UDP], port, True)
1123
1124 self.logger.info("ACLP_TEST_FINISH_0021")
1125
Pavel Kotuceke7b67342017-04-18 13:12:20 +02001126 def test_0022_zero_length_udp_ipv4(self):
1127 """ VPP-687 zero length udp ipv4 packet"""
1128 self.logger.info("ACLP_TEST_START_0022")
1129
1130 port = random.randint(0, 65535)
1131 # Add an ACL
1132 rules = []
1133 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1134 self.proto[self.IP][self.UDP]))
1135 # deny ip any any in the end
1136 rules.append(
1137 self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1138
1139 # Apply rules
1140 self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1141
1142 # Traffic should still pass
1143 # Create incoming packet streams for packet-generator interfaces
1144 pkts_cnt = 0
1145 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1146 self.IP, self.IPV4,
1147 self.proto[self.IP][self.UDP], port,
1148 False, False)
1149 if len(pkts) > 0:
1150 self.pg0.add_stream(pkts)
1151 pkts_cnt += len(pkts)
1152
1153 # Enable packet capture and start packet sendingself.IPV
1154 self.pg_enable_capture(self.pg_interfaces)
1155 self.pg_start()
1156
1157 self.pg1.get_capture(pkts_cnt)
1158
1159 self.logger.info("ACLP_TEST_FINISH_0022")
1160
1161 def test_0023_zero_length_udp_ipv6(self):
1162 """ VPP-687 zero length udp ipv6 packet"""
1163 self.logger.info("ACLP_TEST_START_0023")
1164
1165 port = random.randint(0, 65535)
1166 # Add an ACL
1167 rules = []
1168 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1169 self.proto[self.IP][self.UDP]))
1170 # deny ip any any in the end
1171 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1172
1173 # Apply rules
1174 self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1175
1176 # Traffic should still pass
1177 # Create incoming packet streams for packet-generator interfaces
1178 pkts_cnt = 0
1179 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1180 self.IP, self.IPV6,
1181 self.proto[self.IP][self.UDP], port,
1182 False, False)
1183 if len(pkts) > 0:
1184 self.pg0.add_stream(pkts)
1185 pkts_cnt += len(pkts)
1186
1187 # Enable packet capture and start packet sendingself.IPV
1188 self.pg_enable_capture(self.pg_interfaces)
1189 self.pg_start()
1190
1191 # Verify outgoing packet streams per packet-generator interface
1192 self.pg1.get_capture(pkts_cnt)
1193
1194 self.logger.info("ACLP_TEST_FINISH_0023")
1195
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +02001196 def test_0108_tcp_permit_v4(self):
1197 """ permit TCPv4 + non-match range
1198 """
1199 self.logger.info("ACLP_TEST_START_0108")
1200
1201 # Add an ACL
1202 rules = []
1203 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1204 self.proto[self.IP][self.TCP]))
1205 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1206 self.proto[self.IP][self.TCP]))
1207 # deny ip any any in the end
1208 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1209
1210 # Apply rules
1211 self.apply_rules(rules, "permit ipv4 tcp")
1212
1213 # Traffic should still pass
1214 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1215
1216 self.logger.info("ACLP_TEST_FINISH_0108")
1217
1218 def test_0109_tcp_permit_v6(self):
1219 """ permit TCPv6 + non-match range
1220 """
1221 self.logger.info("ACLP_TEST_START_0109")
1222
1223 # Add an ACL
1224 rules = []
1225 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1226 self.proto[self.IP][self.TCP]))
1227 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1228 self.proto[self.IP][self.TCP]))
1229 # deny ip any any in the end
1230 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1231
1232 # Apply rules
1233 self.apply_rules(rules, "permit ip6 tcp")
1234
1235 # Traffic should still pass
1236 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1237
1238 self.logger.info("ACLP_TEST_FINISH_0109")
1239
1240 def test_0110_udp_permit_v4(self):
1241 """ permit UDPv4 + non-match range
1242 """
1243 self.logger.info("ACLP_TEST_START_0110")
1244
1245 # Add an ACL
1246 rules = []
1247 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1248 self.proto[self.IP][self.UDP]))
1249 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1250 self.proto[self.IP][self.UDP]))
1251 # deny ip any any in the end
1252 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1253
1254 # Apply rules
1255 self.apply_rules(rules, "permit ipv4 udp")
1256
1257 # Traffic should still pass
1258 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1259
1260 self.logger.info("ACLP_TEST_FINISH_0110")
1261
1262 def test_0111_udp_permit_v6(self):
1263 """ permit UDPv6 + non-match range
1264 """
1265 self.logger.info("ACLP_TEST_START_0111")
1266
1267 # Add an ACL
1268 rules = []
1269 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1270 self.proto[self.IP][self.UDP]))
1271 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1272 self.proto[self.IP][self.UDP]))
1273 # deny ip any any in the end
1274 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1275
1276 # Apply rules
1277 self.apply_rules(rules, "permit ip6 udp")
1278
1279 # Traffic should still pass
1280 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1281
1282 self.logger.info("ACLP_TEST_FINISH_0111")
1283
1284 def test_0112_tcp_deny(self):
1285 """ deny TCPv4/v6 + non-match range
1286 """
1287 self.logger.info("ACLP_TEST_START_0112")
1288
1289 # Add an ACL
1290 rules = []
1291 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1292 self.PORTS_RANGE_2,
1293 self.proto[self.IP][self.TCP]))
1294 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1295 self.PORTS_RANGE_2,
1296 self.proto[self.IP][self.TCP]))
1297 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1298 self.proto[self.IP][self.TCP]))
1299 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1300 self.proto[self.IP][self.TCP]))
1301 # permit ip any any in the end
1302 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1303 self.PORTS_ALL, 0))
1304 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1305 self.PORTS_ALL, 0))
1306
1307 # Apply rules
1308 self.apply_rules(rules, "deny ip4/ip6 tcp")
1309
1310 # Traffic should not pass
1311 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1312 self.proto[self.IP][self.TCP])
1313
1314 self.logger.info("ACLP_TEST_FINISH_0112")
1315
1316 def test_0113_udp_deny(self):
1317 """ deny UDPv4/v6 + non-match range
1318 """
1319 self.logger.info("ACLP_TEST_START_0113")
1320
1321 # Add an ACL
1322 rules = []
1323 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1324 self.PORTS_RANGE_2,
1325 self.proto[self.IP][self.UDP]))
1326 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1327 self.PORTS_RANGE_2,
1328 self.proto[self.IP][self.UDP]))
1329 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1330 self.proto[self.IP][self.UDP]))
1331 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1332 self.proto[self.IP][self.UDP]))
1333 # permit ip any any in the end
1334 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1335 self.PORTS_ALL, 0))
1336 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1337 self.PORTS_ALL, 0))
1338
1339 # Apply rules
1340 self.apply_rules(rules, "deny ip4/ip6 udp")
1341
1342 # Traffic should not pass
1343 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1344 self.proto[self.IP][self.UDP])
1345
1346 self.logger.info("ACLP_TEST_FINISH_0113")
1347
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +01001348 def test_0300_tcp_permit_v4_etype_aaaa(self):
1349 """ permit TCPv4, send 0xAAAA etype
1350 """
1351 self.logger.info("ACLP_TEST_START_0300")
1352
1353 # Add an ACL
1354 rules = []
1355 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1356 self.proto[self.IP][self.TCP]))
1357 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1358 self.proto[self.IP][self.TCP]))
1359 # deny ip any any in the end
1360 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1361
1362 # Apply rules
1363 self.apply_rules(rules, "permit ipv4 tcp")
1364
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +01001365 # Traffic should still pass also for an odd ethertype
1366 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1367 0, False, True, 0xaaaa)
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +01001368 self.logger.info("ACLP_TEST_FINISH_0300")
1369
1370 def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +02001371 """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +01001372 """
1373 self.logger.info("ACLP_TEST_START_0305")
1374
1375 # Add an ACL
1376 rules = []
1377 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1378 self.proto[self.IP][self.TCP]))
1379 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1380 self.proto[self.IP][self.TCP]))
1381 # deny ip any any in the end
1382 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1383
1384 # Apply rules
1385 self.apply_rules(rules, "permit ipv4 tcp")
1386
1387 # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1388 self.etype_whitelist([0xbbb], 1)
1389
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +01001390 # The oddball ethertype should be blocked
1391 self.run_verify_negat_test(self.IP, self.IPV4,
1392 self.proto[self.IP][self.TCP],
1393 0, False, 0xaaaa)
1394
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +02001395 # remove the whitelist
1396 self.etype_whitelist([], 0)
1397
1398 self.logger.info("ACLP_TEST_FINISH_0305")
1399
1400 def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
1401 """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass
1402 """
1403 self.logger.info("ACLP_TEST_START_0306")
1404
1405 # Add an ACL
1406 rules = []
1407 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1408 self.proto[self.IP][self.TCP]))
1409 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1410 self.proto[self.IP][self.TCP]))
1411 # deny ip any any in the end
1412 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1413
1414 # Apply rules
1415 self.apply_rules(rules, "permit ipv4 tcp")
1416
1417 # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1418 self.etype_whitelist([0xbbb], 1)
1419
1420 # The whitelisted traffic, should pass
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +01001421 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1422 0, False, True, 0x0bbb)
1423
1424 # remove the whitelist, the previously blocked 0xAAAA should pass now
1425 self.etype_whitelist([], 0)
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +02001426
1427 self.logger.info("ACLP_TEST_FINISH_0306")
1428
1429 def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
1430 """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass
1431 """
1432 self.logger.info("ACLP_TEST_START_0307")
1433
1434 # Add an ACL
1435 rules = []
1436 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1437 self.proto[self.IP][self.TCP]))
1438 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1439 self.proto[self.IP][self.TCP]))
1440 # deny ip any any in the end
1441 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1442
1443 # Apply rules
1444 self.apply_rules(rules, "permit ipv4 tcp")
1445
1446 # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
1447 self.etype_whitelist([0xbbb], 1)
1448 # remove the whitelist, the previously blocked 0xAAAA should pass now
1449 self.etype_whitelist([], 0)
1450
1451 # The whitelisted traffic, should pass
Andrew Yourtchenkoc43b3f92018-02-06 17:42:32 +01001452 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
1453 0, False, True, 0xaaaa)
1454
Andrew Yourtchenko7ff74532018-10-06 09:24:28 +02001455 self.logger.info("ACLP_TEST_FINISH_0306")
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +02001456
Andrew Yourtchenkode3682f2018-03-23 10:38:46 +01001457 def test_0315_del_intf(self):
1458 """ apply an acl and delete the interface
1459 """
1460 self.logger.info("ACLP_TEST_START_0315")
1461
1462 # Add an ACL
1463 rules = []
1464 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1465 self.proto[self.IP][self.TCP]))
1466 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1467 self.proto[self.IP][self.TCP]))
1468 # deny ip any any in the end
1469 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1470
1471 # create an interface
1472 intf = []
Klement Sekerabeaded52018-06-24 10:30:37 +02001473 intf.append(VppLoInterface(self))
Andrew Yourtchenkode3682f2018-03-23 10:38:46 +01001474
1475 # Apply rules
1476 self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)
1477
1478 # Remove the interface
1479 intf[0].remove_vpp_config()
1480
1481 self.logger.info("ACLP_TEST_FINISH_0315")
1482
Pavel Kotucek59dda062017-03-02 15:22:47 +01001483if __name__ == '__main__':
1484 unittest.main(testRunner=VppTestRunner)