blob: 2bbebe85b1b6c1a0ecfaa558105bcfa73e04548e [file] [log] [blame]
Pavel Kotucek59dda062017-03-02 15:22:47 +01001#!/usr/bin/env python
2"""ACL plugin Test Case HLD:
3"""
4
5import unittest
6import random
7
8from scapy.packet import Raw
9from scapy.layers.l2 import Ether
10from scapy.layers.inet import IP, TCP, UDP, ICMP
11from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
12from framework import VppTestCase, VppTestRunner
13from util import Host, ppp
14
15
16class TestACLplugin(VppTestCase):
17 """ ACL plugin Test Case """
18
19 # traffic types
20 IP = 0
21 ICMP = 1
22
23 # IP version
24 IPRANDOM = -1
25 IPV4 = 0
26 IPV6 = 1
27
28 # rule types
29 DENY = 0
30 PERMIT = 1
31
32 # supported protocols
33 proto = [[6, 17], [1, 58]]
34 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
35 ICMPv4 = 0
36 ICMPv6 = 1
37 TCP = 0
38 UDP = 1
39 PROTO_ALL = 0
40
41 # port ranges
42 PORTS_ALL = -1
43 PORTS_RANGE = 0
44 udp_sport_from = 10
45 udp_sport_to = udp_sport_from + 5
46 udp_dport_from = 20000
47 udp_dport_to = udp_dport_from + 5000
48 tcp_sport_from = 30
49 tcp_sport_to = tcp_sport_from + 5
50 tcp_dport_from = 40000
51 tcp_dport_to = tcp_dport_from + 5000
52
53 icmp4_type = 8 # echo request
54 icmp4_code = 3
55 icmp6_type = 128 # echo request
56 icmp6_code = 3
57
58 # Test variables
59 bd_id = 1
60
61 @classmethod
62 def setUpClass(cls):
63 """
64 Perform standard class setup (defined by class method setUpClass in
65 class VppTestCase) before running the test case, set test case related
66 variables and configure VPP.
67 """
68 super(TestACLplugin, cls).setUpClass()
69
70 random.seed()
71
72 try:
73 # Create 2 pg interfaces
74 cls.create_pg_interfaces(range(2))
75
76 # Packet flows mapping pg0 -> pg1, pg2 etc.
77 cls.flows = dict()
78 cls.flows[cls.pg0] = [cls.pg1]
79
80 # Packet sizes
81 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
82
83 # Create BD with MAC learning and unknown unicast flooding disabled
84 # and put interfaces to this BD
85 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
86 learn=1)
87 for pg_if in cls.pg_interfaces:
88 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
89 bd_id=cls.bd_id)
90
91 # Set up all interfaces
92 for i in cls.pg_interfaces:
93 i.admin_up()
94
95 # Mapping between packet-generator index and lists of test hosts
96 cls.hosts_by_pg_idx = dict()
97 for pg_if in cls.pg_interfaces:
98 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
99
100 # Create list of deleted hosts
101 cls.deleted_hosts_by_pg_idx = dict()
102 for pg_if in cls.pg_interfaces:
103 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
104
105 # warm-up the mac address tables
106 # self.warmup_test()
107
108 except Exception:
109 super(TestACLplugin, cls).tearDownClass()
110 raise
111
112 def setUp(self):
113 super(TestACLplugin, self).setUp()
114 self.reset_packet_infos()
115
116 def tearDown(self):
117 """
118 Show various debug prints after each test.
119 """
120 super(TestACLplugin, self).tearDown()
121 if not self.vpp_dead:
122 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
123 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
124 % self.bd_id))
125
126 def create_hosts(self, count, start=0):
127 """
128 Create required number of host MAC addresses and distribute them among
129 interfaces. Create host IPv4 address for every host MAC address.
130
131 :param int count: Number of hosts to create MAC/IPv4 addresses for.
132 :param int start: Number to start numbering from.
133 """
134 n_int = len(self.pg_interfaces)
135 macs_per_if = count / n_int
136 i = -1
137 for pg_if in self.pg_interfaces:
138 i += 1
139 start_nr = macs_per_if * i + start
140 end_nr = count + start if i == (n_int - 1) \
141 else macs_per_if * (i + 1) + start
142 hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
143 for j in range(start_nr, end_nr):
144 host = Host(
145 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
146 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
147 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
148 hosts.append(host)
149
150 def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
151 s_prefix=0, s_ip='\x00\x00\x00\x00',
152 d_prefix=0, d_ip='\x00\x00\x00\x00'):
153 if proto == -1:
154 return
155 if ports == self.PORTS_ALL:
156 sport_from = 0
157 dport_from = 0
158 sport_to = 65535 if proto != 1 and proto != 58 else 255
159 dport_to = sport_to
160 elif ports == self.PORTS_RANGE:
161 if proto == 1:
162 sport_from = self.icmp4_type
163 sport_to = self.icmp4_type
164 dport_from = self.icmp4_code
165 dport_to = self.icmp4_code
166 elif proto == 58:
167 sport_from = self.icmp6_type
168 sport_to = self.icmp6_type
169 dport_from = self.icmp6_code
170 dport_to = self.icmp6_code
171 elif proto == self.proto[self.IP][self.TCP]:
172 sport_from = self.tcp_sport_from
173 sport_to = self.tcp_sport_to
174 dport_from = self.tcp_dport_from
175 dport_to = self.tcp_dport_to
176 elif proto == self.proto[self.IP][self.UDP]:
177 sport_from = self.udp_sport_from
178 sport_to = self.udp_sport_to
179 dport_from = self.udp_dport_from
180 dport_to = self.udp_dport_to
181 else:
182 sport_from = ports
183 sport_to = ports
184 dport_from = ports
185 dport_to = ports
186
187 rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
188 'srcport_or_icmptype_first': sport_from,
189 'srcport_or_icmptype_last': sport_to,
190 'src_ip_prefix_len': s_prefix,
191 'src_ip_addr': s_ip,
192 'dstport_or_icmpcode_first': dport_from,
193 'dstport_or_icmpcode_last': dport_to,
194 'dst_ip_prefix_len': d_prefix,
195 'dst_ip_addr': d_ip})
196 return rule
197
198 def apply_rules(self, rules, tag=''):
199 reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
200 count=len(rules),
201 tag=tag)
202 self.logger.info("Dumped ACL: " + str(
203 self.api_acl_dump(reply.acl_index)))
204 # Apply a ACL on the interface as inbound
205 for i in self.pg_interfaces:
206 self.api_acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
207 count=1, n_input=1,
208 acls=[reply.acl_index])
209 return
210
211 def create_upper_layer(self, packet_index, proto, ports=0):
212 p = self.proto_map[proto]
213 if p == 'UDP':
214 if ports == 0:
215 return UDP(sport=random.randint(self.udp_sport_from,
216 self.udp_sport_to),
217 dport=random.randint(self.udp_dport_from,
218 self.udp_dport_to))
219 else:
220 return UDP(sport=ports, dport=ports)
221 elif p == 'TCP':
222 if ports == 0:
223 return TCP(sport=random.randint(self.tcp_sport_from,
224 self.tcp_sport_to),
225 dport=random.randint(self.tcp_dport_from,
226 self.tcp_dport_to))
227 else:
228 return TCP(sport=ports, dport=ports)
229 return ''
230
231 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
232 proto=-1, ports=0):
233 """
234 Create input packet stream for defined interface using hosts or
235 deleted_hosts list.
236
237 :param object src_if: Interface to create packet stream for.
238 :param list packet_sizes: List of required packet sizes.
239 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
240 :return: Stream of packets.
241 """
242 pkts = []
243 if self.flows.__contains__(src_if):
244 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
245 for dst_if in self.flows[src_if]:
246 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
247 n_int = len(dst_hosts) * len(src_hosts)
248 for i in range(0, n_int):
249 dst_host = dst_hosts[i / len(src_hosts)]
250 src_host = src_hosts[i % len(src_hosts)]
251 pkt_info = self.create_packet_info(src_if, dst_if)
252 if ipv6 == 1:
253 pkt_info.ip = 1
254 elif ipv6 == 0:
255 pkt_info.ip = 0
256 else:
257 pkt_info.ip = random.choice([0, 1])
258 if proto == -1:
259 pkt_info.proto = random.choice(self.proto[self.IP])
260 else:
261 pkt_info.proto = proto
262 payload = self.info_to_payload(pkt_info)
263 p = Ether(dst=dst_host.mac, src=src_host.mac)
264 if pkt_info.ip:
265 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
266 else:
267 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
268 if traffic_type == self.ICMP:
269 if pkt_info.ip:
270 p /= ICMPv6EchoRequest(type=self.icmp6_type,
271 code=self.icmp6_code)
272 else:
273 p /= ICMP(type=self.icmp4_type,
274 code=self.icmp4_code)
275 else:
276 p /= self.create_upper_layer(i, pkt_info.proto, ports)
277 p /= Raw(payload)
278 pkt_info.data = p.copy()
279 size = random.choice(packet_sizes)
280 self.extend_packet(p, size)
281 pkts.append(p)
282 return pkts
283
284 def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
285 """
286 Verify captured input packet stream for defined interface.
287
288 :param object pg_if: Interface to verify captured packet stream for.
289 :param list capture: Captured packet stream.
290 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
291 """
292 last_info = dict()
293 for i in self.pg_interfaces:
294 last_info[i.sw_if_index] = None
295 dst_sw_if_index = pg_if.sw_if_index
296 for packet in capture:
297 try:
298 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
299 if traffic_type == self.ICMP and ip_type == self.IPV6:
300 payload_info = self.payload_to_info(
301 packet[ICMPv6EchoRequest].data)
302 payload = packet[ICMPv6EchoRequest]
303 else:
304 payload_info = self.payload_to_info(str(packet[Raw]))
305 payload = packet[self.proto_map[payload_info.proto]]
306 except:
307 self.logger.error(ppp("Unexpected or invalid packet "
308 "(outside network):", packet))
309 raise
310
311 if ip_type != 0:
312 self.assertEqual(payload_info.ip, ip_type)
313 if traffic_type == self.ICMP:
314 try:
315 if payload_info.ip == 0:
316 self.assertEqual(payload.type, self.icmp4_type)
317 self.assertEqual(payload.code, self.icmp4_code)
318 else:
319 self.assertEqual(payload.type, self.icmp6_type)
320 self.assertEqual(payload.code, self.icmp6_code)
321 except:
322 self.logger.error(ppp("Unexpected or invalid packet "
323 "(outside network):", packet))
324 raise
325 else:
326 try:
327 ip_version = IPv6 if payload_info.ip == 1 else IP
328
329 ip = packet[ip_version]
330 packet_index = payload_info.index
331
332 self.assertEqual(payload_info.dst, dst_sw_if_index)
333 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
334 (pg_if.name, payload_info.src,
335 packet_index))
336 next_info = self.get_next_packet_info_for_interface2(
337 payload_info.src, dst_sw_if_index,
338 last_info[payload_info.src])
339 last_info[payload_info.src] = next_info
340 self.assertTrue(next_info is not None)
341 self.assertEqual(packet_index, next_info.index)
342 saved_packet = next_info.data
343 # Check standard fields
344 self.assertEqual(ip.src, saved_packet[ip_version].src)
345 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
346 p = self.proto_map[payload_info.proto]
347 if p == 'TCP':
348 tcp = packet[TCP]
349 self.assertEqual(tcp.sport, saved_packet[
350 TCP].sport)
351 self.assertEqual(tcp.dport, saved_packet[
352 TCP].dport)
353 elif p == 'UDP':
354 udp = packet[UDP]
355 self.assertEqual(udp.sport, saved_packet[
356 UDP].sport)
357 self.assertEqual(udp.dport, saved_packet[
358 UDP].dport)
359 except:
360 self.logger.error(ppp("Unexpected or invalid packet:",
361 packet))
362 raise
363 for i in self.pg_interfaces:
364 remaining_packet = self.get_next_packet_info_for_interface2(
365 i, dst_sw_if_index, last_info[i.sw_if_index])
366 self.assertTrue(
367 remaining_packet is None,
368 "Port %u: Packet expected from source %u didn't arrive" %
369 (dst_sw_if_index, i.sw_if_index))
370
371 def run_traffic_no_check(self):
372 # Test
373 # Create incoming packet streams for packet-generator interfaces
374 for i in self.pg_interfaces:
375 if self.flows.__contains__(i):
376 pkts = self.create_stream(i, self.pg_if_packet_sizes)
377 if len(pkts) > 0:
378 i.add_stream(pkts)
379
380 # Enable packet capture and start packet sending
381 self.pg_enable_capture(self.pg_interfaces)
382 self.pg_start()
383
384 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0):
385 # Test
386 # Create incoming packet streams for packet-generator interfaces
387 pkts_cnt = 0
388 for i in self.pg_interfaces:
389 if self.flows.__contains__(i):
390 pkts = self.create_stream(i, self.pg_if_packet_sizes,
391 traffic_type, ip_type, proto, ports)
392 if len(pkts) > 0:
393 i.add_stream(pkts)
394 pkts_cnt += len(pkts)
395
396 # Enable packet capture and start packet sendingself.IPV
397 self.pg_enable_capture(self.pg_interfaces)
398 self.pg_start()
399
400 # Verify
401 # Verify outgoing packet streams per packet-generator interface
402 for src_if in self.pg_interfaces:
403 if self.flows.__contains__(src_if):
404 for dst_if in self.flows[src_if]:
405 capture = dst_if.get_capture(pkts_cnt)
406 self.logger.info("Verifying capture on interface %s" %
407 dst_if.name)
408 self.verify_capture(dst_if, capture, traffic_type, ip_type)
409
410 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
411 ports=0):
412 # Test
413 self.reset_packet_infos()
414 for i in self.pg_interfaces:
415 if self.flows.__contains__(i):
416 pkts = self.create_stream(i, self.pg_if_packet_sizes,
417 traffic_type, ip_type, proto, ports)
418 if len(pkts) > 0:
419 i.add_stream(pkts)
420
421 # Enable packet capture and start packet sending
422 self.pg_enable_capture(self.pg_interfaces)
423 self.pg_start()
424
425 # Verify
426 # Verify outgoing packet streams per packet-generator interface
427 for src_if in self.pg_interfaces:
428 if self.flows.__contains__(src_if):
429 for dst_if in self.flows[src_if]:
430 self.logger.info("Verifying capture on interface %s" %
431 dst_if.name)
432 capture = dst_if.get_capture(0)
433 self.assertEqual(len(capture), 0)
434
435 def api_acl_add_replace(self, acl_index, r, count, tag='',
436 expected_retval=0):
437 """Add/replace an ACL
438
439 :param int acl_index: ACL index to replace,
440 4294967295 to create new ACL.
441 :param acl_rule r: ACL rules array.
442 :param str tag: symbolic tag (description) for this ACL.
443 :param int count: number of rules.
444 """
445 return self.vapi.api(self.vapi.papi.acl_add_replace,
446 {'acl_index': acl_index,
447 'r': r,
448 'count': count,
449 'tag': tag},
450 expected_retval=expected_retval)
451
452 def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
453 expected_retval=0):
454 return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
455 {'sw_if_index': sw_if_index,
456 'count': count,
457 'n_input': n_input,
458 'acls': acls},
459 expected_retval=expected_retval)
460
461 def api_acl_dump(self, acl_index, expected_retval=0):
462 return self.vapi.api(self.vapi.papi.acl_dump,
463 {'acl_index': acl_index},
464 expected_retval=expected_retval)
465
466 def test_0000_warmup_test(self):
467 """ ACL plugin version check; learn MACs
468 """
469 self.create_hosts(16)
470 self.run_traffic_no_check()
471 reply = self.vapi.papi.acl_plugin_get_version()
472 self.assertEqual(reply.major, 1)
473 self.logger.info("Working with ACL plugin version: %d.%d" % (
474 reply.major, reply.minor))
475 # minor version changes are non breaking
476 # self.assertEqual(reply.minor, 0)
477
478 def test_0001_acl_create(self):
479 """ ACL create test
480 """
481
482 self.logger.info("ACLP_TEST_START_0001")
483 # Add an ACL
484 r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
485 'srcport_or_icmptype_first': 1234,
486 'srcport_or_icmptype_last': 1235,
487 'src_ip_prefix_len': 0,
488 'src_ip_addr': '\x00\x00\x00\x00',
489 'dstport_or_icmpcode_first': 1234,
490 'dstport_or_icmpcode_last': 1234,
491 'dst_ip_addr': '\x00\x00\x00\x00',
492 'dst_ip_prefix_len': 0}]
493 # Test 1: add a new ACL
494 reply = self.api_acl_add_replace(acl_index=4294967295, r=r,
495 count=len(r), tag="permit 1234")
496 self.assertEqual(reply.retval, 0)
497 # The very first ACL gets #0
498 self.assertEqual(reply.acl_index, 0)
499 rr = self.api_acl_dump(reply.acl_index)
500 self.logger.info("Dumped ACL: " + str(rr))
501 self.assertEqual(len(rr), 1)
502 # We should have the same number of ACL entries as we had asked
503 self.assertEqual(len(rr[0].r), len(r))
504 # The rules should be the same. But because the submitted and returned
505 # are different types, we need to iterate over rules and keys to get
506 # to basic values.
507 for i_rule in range(0, len(r) - 1):
508 for rule_key in r[i_rule]:
509 self.assertEqual(rr[0].r[i_rule][rule_key],
510 r[i_rule][rule_key])
511
512 # Add a deny-1234 ACL
513 r_deny = ({'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
514 'srcport_or_icmptype_first': 1234,
515 'srcport_or_icmptype_last': 1235,
516 'src_ip_prefix_len': 0,
517 'src_ip_addr': '\x00\x00\x00\x00',
518 'dstport_or_icmpcode_first': 1234,
519 'dstport_or_icmpcode_last': 1234,
520 'dst_ip_addr': '\x00\x00\x00\x00',
521 'dst_ip_prefix_len': 0},
522 {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
523 'srcport_or_icmptype_first': 0,
524 'srcport_or_icmptype_last': 0,
525 'src_ip_prefix_len': 0,
526 'src_ip_addr': '\x00\x00\x00\x00',
527 'dstport_or_icmpcode_first': 0,
528 'dstport_or_icmpcode_last': 0,
529 'dst_ip_addr': '\x00\x00\x00\x00',
530 'dst_ip_prefix_len': 0})
531
532 reply = self.api_acl_add_replace(acl_index=4294967295, r=r_deny,
533 count=len(r_deny),
534 tag="deny 1234;permit all")
535 self.assertEqual(reply.retval, 0)
536 # The second ACL gets #1
537 self.assertEqual(reply.acl_index, 1)
538
539 # Test 2: try to modify a nonexistent ACL
540 reply = self.api_acl_add_replace(acl_index=432, r=r, count=len(r),
541 tag="FFFF:FFFF", expected_retval=-1)
542 self.assertEqual(reply.retval, -1)
543 # The ACL number should pass through
544 self.assertEqual(reply.acl_index, 432)
545
546 self.logger.info("ACLP_TEST_FINISH_0001")
547
548 def test_0002_acl_permit_apply(self):
549 """ permit ACL apply test
550 """
551 self.logger.info("ACLP_TEST_START_0002")
552
553 rules = []
554 rules.append(self.create_rule(self.IPV4, self.PERMIT,
555 0, self.proto[self.IP][self.UDP]))
556 rules.append(self.create_rule(self.IPV4, self.PERMIT,
557 0, self.proto[self.IP][self.TCP]))
558
559 # Apply rules
560 self.apply_rules(rules, "permit per-flow")
561
562 # Traffic should still pass
563 self.run_verify_test(self.IP, self.IPV4, -1)
564 self.logger.info("ACLP_TEST_FINISH_0002")
565
566 def test_0003_acl_deny_apply(self):
567 """ deny ACL apply test
568 """
569 self.logger.info("ACLP_TEST_START_0003")
570 # Add a deny-flows ACL
571 rules = []
572 rules.append(self.create_rule(self.IPV4, self.DENY,
573 self.PORTS_ALL, self.proto[self.IP][self.UDP]))
574 # Permit ip any any in the end
575 rules.append(self.create_rule(self.IPV4, self.PERMIT,
576 self.PORTS_ALL, 0))
577
578 # Apply rules
579 self.apply_rules(rules, "deny per-flow;permit all")
580
581 # Traffic should not pass
582 self.run_verify_negat_test(self.IP, self.IPV4,
583 self.proto[self.IP][self.UDP])
584 self.logger.info("ACLP_TEST_FINISH_0003")
585 # self.assertEqual(1, 0)
586
587 def test_0004_vpp624_permit_icmpv4(self):
588 """ VPP_624 permit ICMPv4
589 """
590 self.logger.info("ACLP_TEST_START_0004")
591
592 # Add an ACL
593 rules = []
594 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
595 self.proto[self.ICMP][self.ICMPv4]))
596 # deny ip any any in the end
597 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
598
599 # Apply rules
600 self.apply_rules(rules, "permit icmpv4")
601
602 # Traffic should still pass
603 self.run_verify_test(self.ICMP, self.IPV4,
604 self.proto[self.ICMP][self.ICMPv4])
605
606 self.logger.info("ACLP_TEST_FINISH_0004")
607
608 def test_0005_vpp624_permit_icmpv6(self):
609 """ VPP_624 permit ICMPv6
610 """
611 self.logger.info("ACLP_TEST_START_0005")
612
613 # Add an ACL
614 rules = []
615 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
616 self.proto[self.ICMP][self.ICMPv6]))
617 # deny ip any any in the end
618 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
619
620 # Apply rules
621 self.apply_rules(rules, "permit icmpv6")
622
623 # Traffic should still pass
624 self.run_verify_test(self.ICMP, self.IPV6,
625 self.proto[self.ICMP][self.ICMPv6])
626
627 self.logger.info("ACLP_TEST_FINISH_0005")
628
629 def test_0006_vpp624_deny_icmpv4(self):
630 """ VPP_624 deny ICMPv4
631 """
632 self.logger.info("ACLP_TEST_START_0006")
633 # Add an ACL
634 rules = []
635 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
636 self.proto[self.ICMP][self.ICMPv4]))
637 # permit ip any any in the end
638 rules.append(self.create_rule(self.IPV4, self.PERMIT,
639 self.PORTS_ALL, 0))
640
641 # Apply rules
642 self.apply_rules(rules, "deny icmpv4")
643
644 # Traffic should not pass
645 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
646
647 self.logger.info("ACLP_TEST_FINISH_0006")
648
649 def test_0007_vpp624_deny_icmpv6(self):
650 """ VPP_624 deny ICMPv6
651 """
652 self.logger.info("ACLP_TEST_START_0007")
653 # Add an ACL
654 rules = []
655 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
656 self.proto[self.ICMP][self.ICMPv6]))
657 # deny ip any any in the end
658 rules.append(self.create_rule(self.IPV6, self.PERMIT,
659 self.PORTS_ALL, 0))
660
661 # Apply rules
662 self.apply_rules(rules, "deny icmpv6")
663
664 # Traffic should not pass
665 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
666
667 self.logger.info("ACLP_TEST_FINISH_0007")
668
669 def test_0008_tcp_permit_v4(self):
670 """ permit TCPv4
671 """
672 self.logger.info("ACLP_TEST_START_0008")
673
674 # Add an ACL
675 rules = []
676 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
677 self.proto[self.IP][self.TCP]))
678 # deny ip any any in the end
679 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
680
681 # Apply rules
682 self.apply_rules(rules, "permit ipv4 tcp")
683
684 # Traffic should still pass
685 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
686
687 self.logger.info("ACLP_TEST_FINISH_0008")
688
689 def test_0009_tcp_permit_v6(self):
690 """ permit TCPv6
691 """
692 self.logger.info("ACLP_TEST_START_0009")
693
694 # Add an ACL
695 rules = []
696 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
697 self.proto[self.IP][self.TCP]))
698 # deny ip any any in the end
699 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
700
701 # Apply rules
702 self.apply_rules(rules, "permit ip6 tcp")
703
704 # Traffic should still pass
705 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
706
707 self.logger.info("ACLP_TEST_FINISH_0008")
708
709 def test_0010_udp_permit_v4(self):
710 """ permit UDPv4
711 """
712 self.logger.info("ACLP_TEST_START_0010")
713
714 # Add an ACL
715 rules = []
716 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
717 self.proto[self.IP][self.UDP]))
718 # deny ip any any in the end
719 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
720
721 # Apply rules
722 self.apply_rules(rules, "permit ipv udp")
723
724 # Traffic should still pass
725 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
726
727 self.logger.info("ACLP_TEST_FINISH_0010")
728
729 def test_0011_udp_permit_v6(self):
730 """ permit UDPv6
731 """
732 self.logger.info("ACLP_TEST_START_0011")
733
734 # Add an ACL
735 rules = []
736 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
737 self.proto[self.IP][self.UDP]))
738 # deny ip any any in the end
739 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
740
741 # Apply rules
742 self.apply_rules(rules, "permit ip6 udp")
743
744 # Traffic should still pass
745 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
746
747 self.logger.info("ACLP_TEST_FINISH_0011")
748
749 def test_0012_tcp_deny(self):
750 """ deny TCPv4/v6
751 """
752 self.logger.info("ACLP_TEST_START_0012")
753
754 # Add an ACL
755 rules = []
756 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
757 self.proto[self.IP][self.TCP]))
758 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
759 self.proto[self.IP][self.TCP]))
760 # permit ip any any in the end
761 rules.append(self.create_rule(self.IPV4, self.PERMIT,
762 self.PORTS_ALL, 0))
763 rules.append(self.create_rule(self.IPV6, self.PERMIT,
764 self.PORTS_ALL, 0))
765
766 # Apply rules
767 self.apply_rules(rules, "deny ip4/ip6 tcp")
768
769 # Traffic should not pass
770 self.run_verify_negat_test(self.IP, self.IPRANDOM,
771 self.proto[self.IP][self.TCP])
772
773 self.logger.info("ACLP_TEST_FINISH_0012")
774
775 def test_0013_udp_deny(self):
776 """ deny UDPv4/v6
777 """
778 self.logger.info("ACLP_TEST_START_0013")
779
780 # Add an ACL
781 rules = []
782 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
783 self.proto[self.IP][self.UDP]))
784 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
785 self.proto[self.IP][self.UDP]))
786 # permit ip any any in the end
787 rules.append(self.create_rule(self.IPV4, self.PERMIT,
788 self.PORTS_ALL, 0))
789 rules.append(self.create_rule(self.IPV6, self.PERMIT,
790 self.PORTS_ALL, 0))
791
792 # Apply rules
793 self.apply_rules(rules, "deny ip4/ip6 udp")
794
795 # Traffic should not pass
796 self.run_verify_negat_test(self.IP, self.IPRANDOM,
797 self.proto[self.IP][self.UDP])
798
799 self.logger.info("ACLP_TEST_FINISH_0013")
800
801 def test_0014_acl_dump(self):
802 """ verify add/dump acls
803 """
804 self.logger.info("ACLP_TEST_START_0014")
805
806 r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
807 [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
808 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
809 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
810 [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
811 [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
812 [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
813 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
814 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
815 [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
816 [self.IPV4, self.DENY, self.PORTS_ALL, 0],
817 [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
818 [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
819 [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
820 [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
821 [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
822 [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
823 [self.IPV6, self.DENY, self.PORTS_ALL, 0]
824 ]
825
826 # Add and verify new ACLs
827 rules = []
828 for i in range(len(r)):
829 rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
830
831 reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
832 count=len(rules))
833 result = self.api_acl_dump(reply.acl_index)
834
835 i = 0
836 for drules in result:
837 for dr in drules.r:
838 self.assertEqual(dr.is_ipv6, r[i][0])
839 self.assertEqual(dr.is_permit, r[i][1])
840 self.assertEqual(dr.proto, r[i][3])
841
842 if r[i][2] > 0:
843 self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
844 else:
845 if r[i][2] < 0:
846 self.assertEqual(dr.srcport_or_icmptype_first, 0)
847 self.assertEqual(dr.srcport_or_icmptype_last, 65535)
848 else:
849 if dr.proto == self.proto[self.IP][self.TCP]:
850 self.assertGreater(dr.srcport_or_icmptype_first,
851 self.tcp_sport_from-1)
852 self.assertLess(dr.srcport_or_icmptype_first,
853 self.tcp_sport_to+1)
854 self.assertGreater(dr.dstport_or_icmpcode_last,
855 self.tcp_dport_from-1)
856 self.assertLess(dr.dstport_or_icmpcode_last,
857 self.tcp_dport_to+1)
858 elif dr.proto == self.proto[self.IP][self.UDP]:
859 self.assertGreater(dr.srcport_or_icmptype_first,
860 self.udp_sport_from-1)
861 self.assertLess(dr.srcport_or_icmptype_first,
862 self.udp_sport_to+1)
863 self.assertGreater(dr.dstport_or_icmpcode_last,
864 self.udp_dport_from-1)
865 self.assertLess(dr.dstport_or_icmpcode_last,
866 self.udp_dport_to+1)
867 i += 1
868
869 self.logger.info("ACLP_TEST_FINISH_0014")
870
871 def test_0015_tcp_permit_port_v4(self):
872 """ permit single TCPv4
873 """
874 self.logger.info("ACLP_TEST_START_0015")
875
876 port = random.randint(0, 65535)
877 # Add an ACL
878 rules = []
879 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
880 self.proto[self.IP][self.TCP]))
881 # deny ip any any in the end
882 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
883
884 # Apply rules
885 self.apply_rules(rules, "permit ip4 tcp "+str(port))
886
887 # Traffic should still pass
888 self.run_verify_test(self.IP, self.IPV4,
889 self.proto[self.IP][self.TCP], port)
890
891 self.logger.info("ACLP_TEST_FINISH_0015")
892
893 def test_0016_udp_permit_port_v4(self):
894 """ permit single UDPv4
895 """
896 self.logger.info("ACLP_TEST_START_0016")
897
898 port = random.randint(0, 65535)
899 # Add an ACL
900 rules = []
901 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
902 self.proto[self.IP][self.UDP]))
903 # deny ip any any in the end
904 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
905
906 # Apply rules
907 self.apply_rules(rules, "permit ip4 tcp "+str(port))
908
909 # Traffic should still pass
910 self.run_verify_test(self.IP, self.IPV4,
911 self.proto[self.IP][self.UDP], port)
912
913 self.logger.info("ACLP_TEST_FINISH_0016")
914
915 def test_0017_tcp_permit_port_v6(self):
916 """ permit single TCPv6
917 """
918 self.logger.info("ACLP_TEST_START_0017")
919
920 port = random.randint(0, 65535)
921 # Add an ACL
922 rules = []
923 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
924 self.proto[self.IP][self.TCP]))
925 # deny ip any any in the end
926 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
927
928 # Apply rules
929 self.apply_rules(rules, "permit ip4 tcp "+str(port))
930
931 # Traffic should still pass
932 self.run_verify_test(self.IP, self.IPV6,
933 self.proto[self.IP][self.TCP], port)
934
935 self.logger.info("ACLP_TEST_FINISH_0017")
936
937 def test_0018_udp_permit_port_v6(self):
938 """ permit single UPPv6
939 """
940 self.logger.info("ACLP_TEST_START_0018")
941
942 port = random.randint(0, 65535)
943 # Add an ACL
944 rules = []
945 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
946 self.proto[self.IP][self.UDP]))
947 # deny ip any any in the end
948 rules.append(self.create_rule(self.IPV6, self.DENY,
949 self.PORTS_ALL, 0))
950
951 # Apply rules
952 self.apply_rules(rules, "permit ip4 tcp "+str(port))
953
954 # Traffic should still pass
955 self.run_verify_test(self.IP, self.IPV6,
956 self.proto[self.IP][self.UDP], port)
957
958 self.logger.info("ACLP_TEST_FINISH_0018")
959
960 def test_0019_udp_deny_port(self):
961 """ deny single TCPv4/v6
962 """
963 self.logger.info("ACLP_TEST_START_0019")
964
965 port = random.randint(0, 65535)
966 # Add an ACL
967 rules = []
968 rules.append(self.create_rule(self.IPV4, self.DENY, port,
969 self.proto[self.IP][self.TCP]))
970 rules.append(self.create_rule(self.IPV6, self.DENY, port,
971 self.proto[self.IP][self.TCP]))
972 # Permit ip any any in the end
973 rules.append(self.create_rule(self.IPV4, self.PERMIT,
974 self.PORTS_ALL, 0))
975 rules.append(self.create_rule(self.IPV6, self.PERMIT,
976 self.PORTS_ALL, 0))
977
978 # Apply rules
979 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
980
981 # Traffic should not pass
982 self.run_verify_negat_test(self.IP, self.IPRANDOM,
983 self.proto[self.IP][self.TCP], port)
984
985 self.logger.info("ACLP_TEST_FINISH_0019")
986
987 def test_0020_udp_deny_port(self):
988 """ deny single UDPv4/v6
989 """
990 self.logger.info("ACLP_TEST_START_0020")
991
992 port = random.randint(0, 65535)
993 # Add an ACL
994 rules = []
995 rules.append(self.create_rule(self.IPV4, self.DENY, port,
996 self.proto[self.IP][self.UDP]))
997 rules.append(self.create_rule(self.IPV6, self.DENY, port,
998 self.proto[self.IP][self.UDP]))
999 # Permit ip any any in the end
1000 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1001 self.PORTS_ALL, 0))
1002 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1003 self.PORTS_ALL, 0))
1004
1005 # Apply rules
1006 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1007
1008 # Traffic should not pass
1009 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1010 self.proto[self.IP][self.UDP], port)
1011
1012 self.logger.info("ACLP_TEST_FINISH_0020")
1013
1014if __name__ == '__main__':
1015 unittest.main(testRunner=VppTestRunner)