blob: 5f830ea2b50079218d54dfd34f6ed348edbfc061 [file] [log] [blame]
Pavel Kotucek59dda062017-03-02 15:22:47 +01001#!/usr/bin/env python
2"""ACL plugin Test Case HLD:
3"""
4
5import unittest
6import random
7
8from scapy.packet import Raw
9from scapy.layers.l2 import Ether
10from scapy.layers.inet import IP, TCP, UDP, ICMP
11from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +000012from scapy.layers.inet6 import IPv6ExtHdrFragment
Pavel Kotucek59dda062017-03-02 15:22:47 +010013from framework import VppTestCase, VppTestRunner
14from util import Host, ppp
15
16
17class TestACLplugin(VppTestCase):
18 """ ACL plugin Test Case """
19
20 # traffic types
21 IP = 0
22 ICMP = 1
23
24 # IP version
25 IPRANDOM = -1
26 IPV4 = 0
27 IPV6 = 1
28
29 # rule types
30 DENY = 0
31 PERMIT = 1
32
33 # supported protocols
34 proto = [[6, 17], [1, 58]]
35 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
36 ICMPv4 = 0
37 ICMPv6 = 1
38 TCP = 0
39 UDP = 1
40 PROTO_ALL = 0
41
42 # port ranges
43 PORTS_ALL = -1
44 PORTS_RANGE = 0
45 udp_sport_from = 10
46 udp_sport_to = udp_sport_from + 5
47 udp_dport_from = 20000
48 udp_dport_to = udp_dport_from + 5000
49 tcp_sport_from = 30
50 tcp_sport_to = tcp_sport_from + 5
51 tcp_dport_from = 40000
52 tcp_dport_to = tcp_dport_from + 5000
53
54 icmp4_type = 8 # echo request
55 icmp4_code = 3
56 icmp6_type = 128 # echo request
57 icmp6_code = 3
58
59 # Test variables
60 bd_id = 1
61
62 @classmethod
63 def setUpClass(cls):
64 """
65 Perform standard class setup (defined by class method setUpClass in
66 class VppTestCase) before running the test case, set test case related
67 variables and configure VPP.
68 """
69 super(TestACLplugin, cls).setUpClass()
70
71 random.seed()
72
73 try:
74 # Create 2 pg interfaces
75 cls.create_pg_interfaces(range(2))
76
77 # Packet flows mapping pg0 -> pg1, pg2 etc.
78 cls.flows = dict()
79 cls.flows[cls.pg0] = [cls.pg1]
80
81 # Packet sizes
82 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
83
84 # Create BD with MAC learning and unknown unicast flooding disabled
85 # and put interfaces to this BD
86 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
87 learn=1)
88 for pg_if in cls.pg_interfaces:
89 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
90 bd_id=cls.bd_id)
91
92 # Set up all interfaces
93 for i in cls.pg_interfaces:
94 i.admin_up()
95
96 # Mapping between packet-generator index and lists of test hosts
97 cls.hosts_by_pg_idx = dict()
98 for pg_if in cls.pg_interfaces:
99 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
100
101 # Create list of deleted hosts
102 cls.deleted_hosts_by_pg_idx = dict()
103 for pg_if in cls.pg_interfaces:
104 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
105
106 # warm-up the mac address tables
107 # self.warmup_test()
108
109 except Exception:
110 super(TestACLplugin, cls).tearDownClass()
111 raise
112
113 def setUp(self):
114 super(TestACLplugin, self).setUp()
115 self.reset_packet_infos()
116
117 def tearDown(self):
118 """
119 Show various debug prints after each test.
120 """
121 super(TestACLplugin, self).tearDown()
122 if not self.vpp_dead:
123 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
Andrew Yourtchenko7f4d5772017-05-24 13:20:47 +0200124 self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
125 self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
126 self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
Pavel Kotucek59dda062017-03-02 15:22:47 +0100127 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
128 % self.bd_id))
129
130 def create_hosts(self, count, start=0):
131 """
132 Create required number of host MAC addresses and distribute them among
133 interfaces. Create host IPv4 address for every host MAC address.
134
135 :param int count: Number of hosts to create MAC/IPv4 addresses for.
136 :param int start: Number to start numbering from.
137 """
138 n_int = len(self.pg_interfaces)
139 macs_per_if = count / n_int
140 i = -1
141 for pg_if in self.pg_interfaces:
142 i += 1
143 start_nr = macs_per_if * i + start
144 end_nr = count + start if i == (n_int - 1) \
145 else macs_per_if * (i + 1) + start
146 hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
147 for j in range(start_nr, end_nr):
148 host = Host(
149 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
150 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
151 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
152 hosts.append(host)
153
154 def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
155 s_prefix=0, s_ip='\x00\x00\x00\x00',
156 d_prefix=0, d_ip='\x00\x00\x00\x00'):
157 if proto == -1:
158 return
159 if ports == self.PORTS_ALL:
160 sport_from = 0
161 dport_from = 0
162 sport_to = 65535 if proto != 1 and proto != 58 else 255
163 dport_to = sport_to
164 elif ports == self.PORTS_RANGE:
165 if proto == 1:
166 sport_from = self.icmp4_type
167 sport_to = self.icmp4_type
168 dport_from = self.icmp4_code
169 dport_to = self.icmp4_code
170 elif proto == 58:
171 sport_from = self.icmp6_type
172 sport_to = self.icmp6_type
173 dport_from = self.icmp6_code
174 dport_to = self.icmp6_code
175 elif proto == self.proto[self.IP][self.TCP]:
176 sport_from = self.tcp_sport_from
177 sport_to = self.tcp_sport_to
178 dport_from = self.tcp_dport_from
179 dport_to = self.tcp_dport_to
180 elif proto == self.proto[self.IP][self.UDP]:
181 sport_from = self.udp_sport_from
182 sport_to = self.udp_sport_to
183 dport_from = self.udp_dport_from
184 dport_to = self.udp_dport_to
185 else:
186 sport_from = ports
187 sport_to = ports
188 dport_from = ports
189 dport_to = ports
190
191 rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
192 'srcport_or_icmptype_first': sport_from,
193 'srcport_or_icmptype_last': sport_to,
194 'src_ip_prefix_len': s_prefix,
195 'src_ip_addr': s_ip,
196 'dstport_or_icmpcode_first': dport_from,
197 'dstport_or_icmpcode_last': dport_to,
198 'dst_ip_prefix_len': d_prefix,
199 'dst_ip_addr': d_ip})
200 return rule
201
202 def apply_rules(self, rules, tag=''):
203 reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
204 count=len(rules),
205 tag=tag)
206 self.logger.info("Dumped ACL: " + str(
207 self.api_acl_dump(reply.acl_index)))
208 # Apply a ACL on the interface as inbound
209 for i in self.pg_interfaces:
210 self.api_acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
211 count=1, n_input=1,
212 acls=[reply.acl_index])
213 return
214
215 def create_upper_layer(self, packet_index, proto, ports=0):
216 p = self.proto_map[proto]
217 if p == 'UDP':
218 if ports == 0:
219 return UDP(sport=random.randint(self.udp_sport_from,
220 self.udp_sport_to),
221 dport=random.randint(self.udp_dport_from,
222 self.udp_dport_to))
223 else:
224 return UDP(sport=ports, dport=ports)
225 elif p == 'TCP':
226 if ports == 0:
227 return TCP(sport=random.randint(self.tcp_sport_from,
228 self.tcp_sport_to),
229 dport=random.randint(self.tcp_dport_from,
230 self.tcp_dport_to))
231 else:
232 return TCP(sport=ports, dport=ports)
233 return ''
234
235 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
Pavel Kotuceke7b67342017-04-18 13:12:20 +0200236 proto=-1, ports=0, fragments=False, pkt_raw=True):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100237 """
238 Create input packet stream for defined interface using hosts or
239 deleted_hosts list.
240
241 :param object src_if: Interface to create packet stream for.
242 :param list packet_sizes: List of required packet sizes.
243 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
244 :return: Stream of packets.
245 """
246 pkts = []
247 if self.flows.__contains__(src_if):
248 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
249 for dst_if in self.flows[src_if]:
250 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
251 n_int = len(dst_hosts) * len(src_hosts)
252 for i in range(0, n_int):
253 dst_host = dst_hosts[i / len(src_hosts)]
254 src_host = src_hosts[i % len(src_hosts)]
255 pkt_info = self.create_packet_info(src_if, dst_if)
256 if ipv6 == 1:
257 pkt_info.ip = 1
258 elif ipv6 == 0:
259 pkt_info.ip = 0
260 else:
261 pkt_info.ip = random.choice([0, 1])
262 if proto == -1:
263 pkt_info.proto = random.choice(self.proto[self.IP])
264 else:
265 pkt_info.proto = proto
266 payload = self.info_to_payload(pkt_info)
267 p = Ether(dst=dst_host.mac, src=src_host.mac)
268 if pkt_info.ip:
269 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000270 if fragments:
271 p /= IPv6ExtHdrFragment(offset=64, m=1)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100272 else:
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000273 if fragments:
274 p /= IP(src=src_host.ip4, dst=dst_host.ip4,
275 flags=1, frag=64)
276 else:
277 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100278 if traffic_type == self.ICMP:
279 if pkt_info.ip:
280 p /= ICMPv6EchoRequest(type=self.icmp6_type,
281 code=self.icmp6_code)
282 else:
283 p /= ICMP(type=self.icmp4_type,
284 code=self.icmp4_code)
285 else:
286 p /= self.create_upper_layer(i, pkt_info.proto, ports)
Pavel Kotuceke7b67342017-04-18 13:12:20 +0200287 if pkt_raw:
288 p /= Raw(payload)
289 pkt_info.data = p.copy()
290 if pkt_raw:
291 size = random.choice(packet_sizes)
292 self.extend_packet(p, size)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100293 pkts.append(p)
294 return pkts
295
296 def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
297 """
298 Verify captured input packet stream for defined interface.
299
300 :param object pg_if: Interface to verify captured packet stream for.
301 :param list capture: Captured packet stream.
302 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
303 """
304 last_info = dict()
305 for i in self.pg_interfaces:
306 last_info[i.sw_if_index] = None
307 dst_sw_if_index = pg_if.sw_if_index
308 for packet in capture:
309 try:
310 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
311 if traffic_type == self.ICMP and ip_type == self.IPV6:
312 payload_info = self.payload_to_info(
313 packet[ICMPv6EchoRequest].data)
314 payload = packet[ICMPv6EchoRequest]
315 else:
316 payload_info = self.payload_to_info(str(packet[Raw]))
317 payload = packet[self.proto_map[payload_info.proto]]
318 except:
319 self.logger.error(ppp("Unexpected or invalid packet "
320 "(outside network):", packet))
321 raise
322
323 if ip_type != 0:
324 self.assertEqual(payload_info.ip, ip_type)
325 if traffic_type == self.ICMP:
326 try:
327 if payload_info.ip == 0:
328 self.assertEqual(payload.type, self.icmp4_type)
329 self.assertEqual(payload.code, self.icmp4_code)
330 else:
331 self.assertEqual(payload.type, self.icmp6_type)
332 self.assertEqual(payload.code, self.icmp6_code)
333 except:
334 self.logger.error(ppp("Unexpected or invalid packet "
335 "(outside network):", packet))
336 raise
337 else:
338 try:
339 ip_version = IPv6 if payload_info.ip == 1 else IP
340
341 ip = packet[ip_version]
342 packet_index = payload_info.index
343
344 self.assertEqual(payload_info.dst, dst_sw_if_index)
345 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
346 (pg_if.name, payload_info.src,
347 packet_index))
348 next_info = self.get_next_packet_info_for_interface2(
349 payload_info.src, dst_sw_if_index,
350 last_info[payload_info.src])
351 last_info[payload_info.src] = next_info
352 self.assertTrue(next_info is not None)
353 self.assertEqual(packet_index, next_info.index)
354 saved_packet = next_info.data
355 # Check standard fields
356 self.assertEqual(ip.src, saved_packet[ip_version].src)
357 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
358 p = self.proto_map[payload_info.proto]
359 if p == 'TCP':
360 tcp = packet[TCP]
361 self.assertEqual(tcp.sport, saved_packet[
362 TCP].sport)
363 self.assertEqual(tcp.dport, saved_packet[
364 TCP].dport)
365 elif p == 'UDP':
366 udp = packet[UDP]
367 self.assertEqual(udp.sport, saved_packet[
368 UDP].sport)
369 self.assertEqual(udp.dport, saved_packet[
370 UDP].dport)
371 except:
372 self.logger.error(ppp("Unexpected or invalid packet:",
373 packet))
374 raise
375 for i in self.pg_interfaces:
376 remaining_packet = self.get_next_packet_info_for_interface2(
377 i, dst_sw_if_index, last_info[i.sw_if_index])
378 self.assertTrue(
379 remaining_packet is None,
380 "Port %u: Packet expected from source %u didn't arrive" %
381 (dst_sw_if_index, i.sw_if_index))
382
383 def run_traffic_no_check(self):
384 # Test
385 # Create incoming packet streams for packet-generator interfaces
386 for i in self.pg_interfaces:
387 if self.flows.__contains__(i):
388 pkts = self.create_stream(i, self.pg_if_packet_sizes)
389 if len(pkts) > 0:
390 i.add_stream(pkts)
391
392 # Enable packet capture and start packet sending
393 self.pg_enable_capture(self.pg_interfaces)
394 self.pg_start()
395
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000396 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
Pavel Kotuceke7b67342017-04-18 13:12:20 +0200397 frags=False, pkt_raw=True):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100398 # Test
399 # Create incoming packet streams for packet-generator interfaces
400 pkts_cnt = 0
401 for i in self.pg_interfaces:
402 if self.flows.__contains__(i):
403 pkts = self.create_stream(i, self.pg_if_packet_sizes,
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000404 traffic_type, ip_type, proto, ports,
Pavel Kotuceke7b67342017-04-18 13:12:20 +0200405 frags, pkt_raw)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100406 if len(pkts) > 0:
407 i.add_stream(pkts)
408 pkts_cnt += len(pkts)
409
410 # Enable packet capture and start packet sendingself.IPV
411 self.pg_enable_capture(self.pg_interfaces)
412 self.pg_start()
413
414 # Verify
415 # Verify outgoing packet streams per packet-generator interface
416 for src_if in self.pg_interfaces:
417 if self.flows.__contains__(src_if):
418 for dst_if in self.flows[src_if]:
419 capture = dst_if.get_capture(pkts_cnt)
420 self.logger.info("Verifying capture on interface %s" %
421 dst_if.name)
422 self.verify_capture(dst_if, capture, traffic_type, ip_type)
423
424 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000425 ports=0, frags=False):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100426 # Test
427 self.reset_packet_infos()
428 for i in self.pg_interfaces:
429 if self.flows.__contains__(i):
430 pkts = self.create_stream(i, self.pg_if_packet_sizes,
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000431 traffic_type, ip_type, proto, ports,
432 frags)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100433 if len(pkts) > 0:
434 i.add_stream(pkts)
435
436 # Enable packet capture and start packet sending
437 self.pg_enable_capture(self.pg_interfaces)
438 self.pg_start()
439
440 # Verify
441 # Verify outgoing packet streams per packet-generator interface
442 for src_if in self.pg_interfaces:
443 if self.flows.__contains__(src_if):
444 for dst_if in self.flows[src_if]:
445 self.logger.info("Verifying capture on interface %s" %
446 dst_if.name)
447 capture = dst_if.get_capture(0)
448 self.assertEqual(len(capture), 0)
449
450 def api_acl_add_replace(self, acl_index, r, count, tag='',
451 expected_retval=0):
452 """Add/replace an ACL
453
454 :param int acl_index: ACL index to replace,
455 4294967295 to create new ACL.
456 :param acl_rule r: ACL rules array.
457 :param str tag: symbolic tag (description) for this ACL.
458 :param int count: number of rules.
459 """
460 return self.vapi.api(self.vapi.papi.acl_add_replace,
461 {'acl_index': acl_index,
462 'r': r,
463 'count': count,
464 'tag': tag},
465 expected_retval=expected_retval)
466
467 def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
468 expected_retval=0):
469 return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
470 {'sw_if_index': sw_if_index,
471 'count': count,
472 'n_input': n_input,
473 'acls': acls},
474 expected_retval=expected_retval)
475
476 def api_acl_dump(self, acl_index, expected_retval=0):
477 return self.vapi.api(self.vapi.papi.acl_dump,
478 {'acl_index': acl_index},
479 expected_retval=expected_retval)
480
481 def test_0000_warmup_test(self):
482 """ ACL plugin version check; learn MACs
483 """
484 self.create_hosts(16)
485 self.run_traffic_no_check()
486 reply = self.vapi.papi.acl_plugin_get_version()
487 self.assertEqual(reply.major, 1)
488 self.logger.info("Working with ACL plugin version: %d.%d" % (
489 reply.major, reply.minor))
490 # minor version changes are non breaking
491 # self.assertEqual(reply.minor, 0)
492
493 def test_0001_acl_create(self):
494 """ ACL create test
495 """
496
497 self.logger.info("ACLP_TEST_START_0001")
498 # Add an ACL
499 r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
500 'srcport_or_icmptype_first': 1234,
501 'srcport_or_icmptype_last': 1235,
502 'src_ip_prefix_len': 0,
503 'src_ip_addr': '\x00\x00\x00\x00',
504 'dstport_or_icmpcode_first': 1234,
505 'dstport_or_icmpcode_last': 1234,
506 'dst_ip_addr': '\x00\x00\x00\x00',
507 'dst_ip_prefix_len': 0}]
508 # Test 1: add a new ACL
509 reply = self.api_acl_add_replace(acl_index=4294967295, r=r,
510 count=len(r), tag="permit 1234")
511 self.assertEqual(reply.retval, 0)
512 # The very first ACL gets #0
513 self.assertEqual(reply.acl_index, 0)
514 rr = self.api_acl_dump(reply.acl_index)
515 self.logger.info("Dumped ACL: " + str(rr))
516 self.assertEqual(len(rr), 1)
517 # We should have the same number of ACL entries as we had asked
518 self.assertEqual(len(rr[0].r), len(r))
519 # The rules should be the same. But because the submitted and returned
520 # are different types, we need to iterate over rules and keys to get
521 # to basic values.
522 for i_rule in range(0, len(r) - 1):
523 for rule_key in r[i_rule]:
524 self.assertEqual(rr[0].r[i_rule][rule_key],
525 r[i_rule][rule_key])
526
527 # Add a deny-1234 ACL
528 r_deny = ({'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
529 'srcport_or_icmptype_first': 1234,
530 'srcport_or_icmptype_last': 1235,
531 'src_ip_prefix_len': 0,
532 'src_ip_addr': '\x00\x00\x00\x00',
533 'dstport_or_icmpcode_first': 1234,
534 'dstport_or_icmpcode_last': 1234,
535 'dst_ip_addr': '\x00\x00\x00\x00',
536 'dst_ip_prefix_len': 0},
537 {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
538 'srcport_or_icmptype_first': 0,
539 'srcport_or_icmptype_last': 0,
540 'src_ip_prefix_len': 0,
541 'src_ip_addr': '\x00\x00\x00\x00',
542 'dstport_or_icmpcode_first': 0,
543 'dstport_or_icmpcode_last': 0,
544 'dst_ip_addr': '\x00\x00\x00\x00',
545 'dst_ip_prefix_len': 0})
546
547 reply = self.api_acl_add_replace(acl_index=4294967295, r=r_deny,
548 count=len(r_deny),
549 tag="deny 1234;permit all")
550 self.assertEqual(reply.retval, 0)
551 # The second ACL gets #1
552 self.assertEqual(reply.acl_index, 1)
553
554 # Test 2: try to modify a nonexistent ACL
555 reply = self.api_acl_add_replace(acl_index=432, r=r, count=len(r),
556 tag="FFFF:FFFF", expected_retval=-1)
557 self.assertEqual(reply.retval, -1)
558 # The ACL number should pass through
559 self.assertEqual(reply.acl_index, 432)
560
561 self.logger.info("ACLP_TEST_FINISH_0001")
562
563 def test_0002_acl_permit_apply(self):
564 """ permit ACL apply test
565 """
566 self.logger.info("ACLP_TEST_START_0002")
567
568 rules = []
569 rules.append(self.create_rule(self.IPV4, self.PERMIT,
570 0, self.proto[self.IP][self.UDP]))
571 rules.append(self.create_rule(self.IPV4, self.PERMIT,
572 0, self.proto[self.IP][self.TCP]))
573
574 # Apply rules
575 self.apply_rules(rules, "permit per-flow")
576
577 # Traffic should still pass
578 self.run_verify_test(self.IP, self.IPV4, -1)
579 self.logger.info("ACLP_TEST_FINISH_0002")
580
581 def test_0003_acl_deny_apply(self):
582 """ deny ACL apply test
583 """
584 self.logger.info("ACLP_TEST_START_0003")
585 # Add a deny-flows ACL
586 rules = []
587 rules.append(self.create_rule(self.IPV4, self.DENY,
588 self.PORTS_ALL, self.proto[self.IP][self.UDP]))
589 # Permit ip any any in the end
590 rules.append(self.create_rule(self.IPV4, self.PERMIT,
591 self.PORTS_ALL, 0))
592
593 # Apply rules
594 self.apply_rules(rules, "deny per-flow;permit all")
595
596 # Traffic should not pass
597 self.run_verify_negat_test(self.IP, self.IPV4,
598 self.proto[self.IP][self.UDP])
599 self.logger.info("ACLP_TEST_FINISH_0003")
600 # self.assertEqual(1, 0)
601
602 def test_0004_vpp624_permit_icmpv4(self):
603 """ VPP_624 permit ICMPv4
604 """
605 self.logger.info("ACLP_TEST_START_0004")
606
607 # Add an ACL
608 rules = []
609 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
610 self.proto[self.ICMP][self.ICMPv4]))
611 # deny ip any any in the end
612 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
613
614 # Apply rules
615 self.apply_rules(rules, "permit icmpv4")
616
617 # Traffic should still pass
618 self.run_verify_test(self.ICMP, self.IPV4,
619 self.proto[self.ICMP][self.ICMPv4])
620
621 self.logger.info("ACLP_TEST_FINISH_0004")
622
623 def test_0005_vpp624_permit_icmpv6(self):
624 """ VPP_624 permit ICMPv6
625 """
626 self.logger.info("ACLP_TEST_START_0005")
627
628 # Add an ACL
629 rules = []
630 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
631 self.proto[self.ICMP][self.ICMPv6]))
632 # deny ip any any in the end
633 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
634
635 # Apply rules
636 self.apply_rules(rules, "permit icmpv6")
637
638 # Traffic should still pass
639 self.run_verify_test(self.ICMP, self.IPV6,
640 self.proto[self.ICMP][self.ICMPv6])
641
642 self.logger.info("ACLP_TEST_FINISH_0005")
643
644 def test_0006_vpp624_deny_icmpv4(self):
645 """ VPP_624 deny ICMPv4
646 """
647 self.logger.info("ACLP_TEST_START_0006")
648 # Add an ACL
649 rules = []
650 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
651 self.proto[self.ICMP][self.ICMPv4]))
652 # permit ip any any in the end
653 rules.append(self.create_rule(self.IPV4, self.PERMIT,
654 self.PORTS_ALL, 0))
655
656 # Apply rules
657 self.apply_rules(rules, "deny icmpv4")
658
659 # Traffic should not pass
660 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
661
662 self.logger.info("ACLP_TEST_FINISH_0006")
663
664 def test_0007_vpp624_deny_icmpv6(self):
665 """ VPP_624 deny ICMPv6
666 """
667 self.logger.info("ACLP_TEST_START_0007")
668 # Add an ACL
669 rules = []
670 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
671 self.proto[self.ICMP][self.ICMPv6]))
672 # deny ip any any in the end
673 rules.append(self.create_rule(self.IPV6, self.PERMIT,
674 self.PORTS_ALL, 0))
675
676 # Apply rules
677 self.apply_rules(rules, "deny icmpv6")
678
679 # Traffic should not pass
680 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
681
682 self.logger.info("ACLP_TEST_FINISH_0007")
683
684 def test_0008_tcp_permit_v4(self):
685 """ permit TCPv4
686 """
687 self.logger.info("ACLP_TEST_START_0008")
688
689 # Add an ACL
690 rules = []
691 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
692 self.proto[self.IP][self.TCP]))
693 # deny ip any any in the end
694 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
695
696 # Apply rules
697 self.apply_rules(rules, "permit ipv4 tcp")
698
699 # Traffic should still pass
700 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
701
702 self.logger.info("ACLP_TEST_FINISH_0008")
703
704 def test_0009_tcp_permit_v6(self):
705 """ permit TCPv6
706 """
707 self.logger.info("ACLP_TEST_START_0009")
708
709 # Add an ACL
710 rules = []
711 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
712 self.proto[self.IP][self.TCP]))
713 # deny ip any any in the end
714 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
715
716 # Apply rules
717 self.apply_rules(rules, "permit ip6 tcp")
718
719 # Traffic should still pass
720 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
721
722 self.logger.info("ACLP_TEST_FINISH_0008")
723
724 def test_0010_udp_permit_v4(self):
725 """ permit UDPv4
726 """
727 self.logger.info("ACLP_TEST_START_0010")
728
729 # Add an ACL
730 rules = []
731 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
732 self.proto[self.IP][self.UDP]))
733 # deny ip any any in the end
734 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
735
736 # Apply rules
737 self.apply_rules(rules, "permit ipv udp")
738
739 # Traffic should still pass
740 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
741
742 self.logger.info("ACLP_TEST_FINISH_0010")
743
744 def test_0011_udp_permit_v6(self):
745 """ permit UDPv6
746 """
747 self.logger.info("ACLP_TEST_START_0011")
748
749 # Add an ACL
750 rules = []
751 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
752 self.proto[self.IP][self.UDP]))
753 # deny ip any any in the end
754 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
755
756 # Apply rules
757 self.apply_rules(rules, "permit ip6 udp")
758
759 # Traffic should still pass
760 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
761
762 self.logger.info("ACLP_TEST_FINISH_0011")
763
764 def test_0012_tcp_deny(self):
765 """ deny TCPv4/v6
766 """
767 self.logger.info("ACLP_TEST_START_0012")
768
769 # Add an ACL
770 rules = []
771 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
772 self.proto[self.IP][self.TCP]))
773 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
774 self.proto[self.IP][self.TCP]))
775 # permit ip any any in the end
776 rules.append(self.create_rule(self.IPV4, self.PERMIT,
777 self.PORTS_ALL, 0))
778 rules.append(self.create_rule(self.IPV6, self.PERMIT,
779 self.PORTS_ALL, 0))
780
781 # Apply rules
782 self.apply_rules(rules, "deny ip4/ip6 tcp")
783
784 # Traffic should not pass
785 self.run_verify_negat_test(self.IP, self.IPRANDOM,
786 self.proto[self.IP][self.TCP])
787
788 self.logger.info("ACLP_TEST_FINISH_0012")
789
790 def test_0013_udp_deny(self):
791 """ deny UDPv4/v6
792 """
793 self.logger.info("ACLP_TEST_START_0013")
794
795 # Add an ACL
796 rules = []
797 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
798 self.proto[self.IP][self.UDP]))
799 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
800 self.proto[self.IP][self.UDP]))
801 # permit ip any any in the end
802 rules.append(self.create_rule(self.IPV4, self.PERMIT,
803 self.PORTS_ALL, 0))
804 rules.append(self.create_rule(self.IPV6, self.PERMIT,
805 self.PORTS_ALL, 0))
806
807 # Apply rules
808 self.apply_rules(rules, "deny ip4/ip6 udp")
809
810 # Traffic should not pass
811 self.run_verify_negat_test(self.IP, self.IPRANDOM,
812 self.proto[self.IP][self.UDP])
813
814 self.logger.info("ACLP_TEST_FINISH_0013")
815
816 def test_0014_acl_dump(self):
817 """ verify add/dump acls
818 """
819 self.logger.info("ACLP_TEST_START_0014")
820
821 r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
822 [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
823 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
824 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
825 [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
826 [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
827 [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
828 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
829 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
830 [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
831 [self.IPV4, self.DENY, self.PORTS_ALL, 0],
832 [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
833 [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
834 [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
835 [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
836 [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
837 [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
838 [self.IPV6, self.DENY, self.PORTS_ALL, 0]
839 ]
840
841 # Add and verify new ACLs
842 rules = []
843 for i in range(len(r)):
844 rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
845
846 reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
847 count=len(rules))
848 result = self.api_acl_dump(reply.acl_index)
849
850 i = 0
851 for drules in result:
852 for dr in drules.r:
853 self.assertEqual(dr.is_ipv6, r[i][0])
854 self.assertEqual(dr.is_permit, r[i][1])
855 self.assertEqual(dr.proto, r[i][3])
856
857 if r[i][2] > 0:
858 self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
859 else:
860 if r[i][2] < 0:
861 self.assertEqual(dr.srcport_or_icmptype_first, 0)
862 self.assertEqual(dr.srcport_or_icmptype_last, 65535)
863 else:
864 if dr.proto == self.proto[self.IP][self.TCP]:
865 self.assertGreater(dr.srcport_or_icmptype_first,
866 self.tcp_sport_from-1)
867 self.assertLess(dr.srcport_or_icmptype_first,
868 self.tcp_sport_to+1)
869 self.assertGreater(dr.dstport_or_icmpcode_last,
870 self.tcp_dport_from-1)
871 self.assertLess(dr.dstport_or_icmpcode_last,
872 self.tcp_dport_to+1)
873 elif dr.proto == self.proto[self.IP][self.UDP]:
874 self.assertGreater(dr.srcport_or_icmptype_first,
875 self.udp_sport_from-1)
876 self.assertLess(dr.srcport_or_icmptype_first,
877 self.udp_sport_to+1)
878 self.assertGreater(dr.dstport_or_icmpcode_last,
879 self.udp_dport_from-1)
880 self.assertLess(dr.dstport_or_icmpcode_last,
881 self.udp_dport_to+1)
882 i += 1
883
884 self.logger.info("ACLP_TEST_FINISH_0014")
885
886 def test_0015_tcp_permit_port_v4(self):
887 """ permit single TCPv4
888 """
889 self.logger.info("ACLP_TEST_START_0015")
890
891 port = random.randint(0, 65535)
892 # Add an ACL
893 rules = []
894 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
895 self.proto[self.IP][self.TCP]))
896 # deny ip any any in the end
897 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
898
899 # Apply rules
900 self.apply_rules(rules, "permit ip4 tcp "+str(port))
901
902 # Traffic should still pass
903 self.run_verify_test(self.IP, self.IPV4,
904 self.proto[self.IP][self.TCP], port)
905
906 self.logger.info("ACLP_TEST_FINISH_0015")
907
908 def test_0016_udp_permit_port_v4(self):
909 """ permit single UDPv4
910 """
911 self.logger.info("ACLP_TEST_START_0016")
912
913 port = random.randint(0, 65535)
914 # Add an ACL
915 rules = []
916 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
917 self.proto[self.IP][self.UDP]))
918 # deny ip any any in the end
919 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
920
921 # Apply rules
922 self.apply_rules(rules, "permit ip4 tcp "+str(port))
923
924 # Traffic should still pass
925 self.run_verify_test(self.IP, self.IPV4,
926 self.proto[self.IP][self.UDP], port)
927
928 self.logger.info("ACLP_TEST_FINISH_0016")
929
930 def test_0017_tcp_permit_port_v6(self):
931 """ permit single TCPv6
932 """
933 self.logger.info("ACLP_TEST_START_0017")
934
935 port = random.randint(0, 65535)
936 # Add an ACL
937 rules = []
938 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
939 self.proto[self.IP][self.TCP]))
940 # deny ip any any in the end
941 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
942
943 # Apply rules
944 self.apply_rules(rules, "permit ip4 tcp "+str(port))
945
946 # Traffic should still pass
947 self.run_verify_test(self.IP, self.IPV6,
948 self.proto[self.IP][self.TCP], port)
949
950 self.logger.info("ACLP_TEST_FINISH_0017")
951
952 def test_0018_udp_permit_port_v6(self):
953 """ permit single UPPv6
954 """
955 self.logger.info("ACLP_TEST_START_0018")
956
957 port = random.randint(0, 65535)
958 # Add an ACL
959 rules = []
960 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
961 self.proto[self.IP][self.UDP]))
962 # deny ip any any in the end
963 rules.append(self.create_rule(self.IPV6, self.DENY,
964 self.PORTS_ALL, 0))
965
966 # Apply rules
967 self.apply_rules(rules, "permit ip4 tcp "+str(port))
968
969 # Traffic should still pass
970 self.run_verify_test(self.IP, self.IPV6,
971 self.proto[self.IP][self.UDP], port)
972
973 self.logger.info("ACLP_TEST_FINISH_0018")
974
975 def test_0019_udp_deny_port(self):
976 """ deny single TCPv4/v6
977 """
978 self.logger.info("ACLP_TEST_START_0019")
979
980 port = random.randint(0, 65535)
981 # Add an ACL
982 rules = []
983 rules.append(self.create_rule(self.IPV4, self.DENY, port,
984 self.proto[self.IP][self.TCP]))
985 rules.append(self.create_rule(self.IPV6, self.DENY, port,
986 self.proto[self.IP][self.TCP]))
987 # Permit ip any any in the end
988 rules.append(self.create_rule(self.IPV4, self.PERMIT,
989 self.PORTS_ALL, 0))
990 rules.append(self.create_rule(self.IPV6, self.PERMIT,
991 self.PORTS_ALL, 0))
992
993 # Apply rules
994 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
995
996 # Traffic should not pass
997 self.run_verify_negat_test(self.IP, self.IPRANDOM,
998 self.proto[self.IP][self.TCP], port)
999
1000 self.logger.info("ACLP_TEST_FINISH_0019")
1001
1002 def test_0020_udp_deny_port(self):
1003 """ deny single UDPv4/v6
1004 """
1005 self.logger.info("ACLP_TEST_START_0020")
1006
1007 port = random.randint(0, 65535)
1008 # Add an ACL
1009 rules = []
1010 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1011 self.proto[self.IP][self.UDP]))
1012 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1013 self.proto[self.IP][self.UDP]))
1014 # Permit ip any any in the end
1015 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1016 self.PORTS_ALL, 0))
1017 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1018 self.PORTS_ALL, 0))
1019
1020 # Apply rules
1021 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1022
1023 # Traffic should not pass
1024 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1025 self.proto[self.IP][self.UDP], port)
1026
1027 self.logger.info("ACLP_TEST_FINISH_0020")
1028
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +00001029 def test_0021_udp_deny_port_verify_fragment_deny(self):
1030 """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
1031 """
1032 self.logger.info("ACLP_TEST_START_0021")
1033
1034 port = random.randint(0, 65535)
1035 # Add an ACL
1036 rules = []
1037 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1038 self.proto[self.IP][self.UDP]))
1039 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1040 self.proto[self.IP][self.UDP]))
1041 # deny ip any any in the end
1042 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1043 self.PORTS_ALL, 0))
1044 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1045 self.PORTS_ALL, 0))
1046
1047 # Apply rules
1048 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1049
1050 # Traffic should not pass
1051 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1052 self.proto[self.IP][self.UDP], port, True)
1053
1054 self.logger.info("ACLP_TEST_FINISH_0021")
1055
Pavel Kotuceke7b67342017-04-18 13:12:20 +02001056 def test_0022_zero_length_udp_ipv4(self):
1057 """ VPP-687 zero length udp ipv4 packet"""
1058 self.logger.info("ACLP_TEST_START_0022")
1059
1060 port = random.randint(0, 65535)
1061 # Add an ACL
1062 rules = []
1063 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1064 self.proto[self.IP][self.UDP]))
1065 # deny ip any any in the end
1066 rules.append(
1067 self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1068
1069 # Apply rules
1070 self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1071
1072 # Traffic should still pass
1073 # Create incoming packet streams for packet-generator interfaces
1074 pkts_cnt = 0
1075 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1076 self.IP, self.IPV4,
1077 self.proto[self.IP][self.UDP], port,
1078 False, False)
1079 if len(pkts) > 0:
1080 self.pg0.add_stream(pkts)
1081 pkts_cnt += len(pkts)
1082
1083 # Enable packet capture and start packet sendingself.IPV
1084 self.pg_enable_capture(self.pg_interfaces)
1085 self.pg_start()
1086
1087 self.pg1.get_capture(pkts_cnt)
1088
1089 self.logger.info("ACLP_TEST_FINISH_0022")
1090
1091 def test_0023_zero_length_udp_ipv6(self):
1092 """ VPP-687 zero length udp ipv6 packet"""
1093 self.logger.info("ACLP_TEST_START_0023")
1094
1095 port = random.randint(0, 65535)
1096 # Add an ACL
1097 rules = []
1098 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1099 self.proto[self.IP][self.UDP]))
1100 # deny ip any any in the end
1101 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1102
1103 # Apply rules
1104 self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1105
1106 # Traffic should still pass
1107 # Create incoming packet streams for packet-generator interfaces
1108 pkts_cnt = 0
1109 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1110 self.IP, self.IPV6,
1111 self.proto[self.IP][self.UDP], port,
1112 False, False)
1113 if len(pkts) > 0:
1114 self.pg0.add_stream(pkts)
1115 pkts_cnt += len(pkts)
1116
1117 # Enable packet capture and start packet sendingself.IPV
1118 self.pg_enable_capture(self.pg_interfaces)
1119 self.pg_start()
1120
1121 # Verify outgoing packet streams per packet-generator interface
1122 self.pg1.get_capture(pkts_cnt)
1123
1124 self.logger.info("ACLP_TEST_FINISH_0023")
1125
Pavel Kotucek59dda062017-03-02 15:22:47 +01001126if __name__ == '__main__':
1127 unittest.main(testRunner=VppTestRunner)