blob: 5267cd27d989ab51407affc5fe67c718b452031a [file] [log] [blame]
Pavel Kotucek59dda062017-03-02 15:22:47 +01001#!/usr/bin/env python
2"""ACL plugin Test Case HLD:
3"""
4
5import unittest
6import random
7
8from scapy.packet import Raw
9from scapy.layers.l2 import Ether
10from scapy.layers.inet import IP, TCP, UDP, ICMP
11from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +000012from scapy.layers.inet6 import IPv6ExtHdrFragment
Pavel Kotucek59dda062017-03-02 15:22:47 +010013from framework import VppTestCase, VppTestRunner
14from util import Host, ppp
15
16
17class TestACLplugin(VppTestCase):
18 """ ACL plugin Test Case """
19
20 # traffic types
21 IP = 0
22 ICMP = 1
23
24 # IP version
25 IPRANDOM = -1
26 IPV4 = 0
27 IPV6 = 1
28
29 # rule types
30 DENY = 0
31 PERMIT = 1
32
33 # supported protocols
34 proto = [[6, 17], [1, 58]]
35 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
36 ICMPv4 = 0
37 ICMPv6 = 1
38 TCP = 0
39 UDP = 1
40 PROTO_ALL = 0
41
42 # port ranges
43 PORTS_ALL = -1
44 PORTS_RANGE = 0
45 udp_sport_from = 10
46 udp_sport_to = udp_sport_from + 5
47 udp_dport_from = 20000
48 udp_dport_to = udp_dport_from + 5000
49 tcp_sport_from = 30
50 tcp_sport_to = tcp_sport_from + 5
51 tcp_dport_from = 40000
52 tcp_dport_to = tcp_dport_from + 5000
53
54 icmp4_type = 8 # echo request
55 icmp4_code = 3
56 icmp6_type = 128 # echo request
57 icmp6_code = 3
58
59 # Test variables
60 bd_id = 1
61
@classmethod
def setUpClass(cls):
    """
    Run the VppTestCase class-level setup and prepare the ACL test bed.

    Creates two packet-generator interfaces, records the pg0 -> pg1 flow,
    puts both interfaces into bridge domain ``cls.bd_id``, brings them up
    and initialises the per-interface host bookkeeping. If any of these
    steps raises, the parent tearDownClass is called and the exception is
    re-raised.
    """
    super(TestACLplugin, cls).setUpClass()

    random.seed()

    try:
        # Two pg interfaces: one traffic source, one traffic sink
        cls.create_pg_interfaces(range(2))

        # Packet flows: traffic sent on pg0 is expected on pg1
        cls.flows = {cls.pg0: [cls.pg1]}

        # Frame sizes the generated packets are chosen from
        cls.pg_if_packet_sizes = [64, 512, 1518, 9018]

        # Create the bridge domain (uu_flood=1 and learn=1 are passed)
        # and attach every pg interface to it
        cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
                                       learn=1)
        for member in cls.pg_interfaces:
            cls.vapi.sw_interface_set_l2_bridge(member.sw_if_index,
                                                bd_id=cls.bd_id)

        # Bring all interfaces up
        for member in cls.pg_interfaces:
            member.admin_up()

        # sw_if_index -> list of test hosts behind that interface
        cls.hosts_by_pg_idx = dict(
            (member.sw_if_index, []) for member in cls.pg_interfaces)

        # sw_if_index -> list of deleted hosts
        cls.deleted_hosts_by_pg_idx = dict(
            (member.sw_if_index, []) for member in cls.pg_interfaces)

    except Exception:
        super(TestACLplugin, cls).tearDownClass()
        raise
112
def setUp(self):
    """Run the base-class setUp, then call reset_packet_infos()."""
    super(TestACLplugin, self).setUp()
    self.reset_packet_infos()
116
def tearDown(self):
    """
    Run the base-class tearDown, then log L2 state for debugging.

    The output of "show l2fib verbose" and of the bridge-domain detail
    command is written to the test log, but only when ``self.vpp_dead``
    is false.
    """
    super(TestACLplugin, self).tearDown()
    if self.vpp_dead:
        return
    debug_commands = ("show l2fib verbose",
                      "show bridge-domain %s detail" % self.bd_id)
    for command in debug_commands:
        self.logger.info(self.vapi.ppcli(command))
126
def create_hosts(self, count, start=0):
    """
    Create host records and distribute them among the pg interfaces.

    Every host gets a MAC, an IPv4 and an IPv6 address derived from the
    interface's sw_if_index and the host number. Hosts are split evenly
    between the interfaces; the last interface additionally receives the
    remainder when ``count`` is not divisible by the interface count.

    :param int count: Number of hosts to create addresses for.
    :param int start: Number to start numbering from.
    """
    n_int = len(self.pg_interfaces)
    # Floor division: true division would yield a float on Python 3 and
    # make the range() bounds below invalid.
    macs_per_if = count // n_int
    for i, pg_if in enumerate(self.pg_interfaces):
        start_nr = macs_per_if * i + start
        if i == n_int - 1:
            # Last interface takes everything that is left
            end_nr = count + start
        else:
            end_nr = macs_per_if * (i + 1) + start
        hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
        for j in range(start_nr, end_nr):
            hosts.append(Host(
                "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
                "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
                "2017:dead:%02x::%u" % (pg_if.sw_if_index, j)))
150
def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
                s_prefix=0, s_ip='\x00\x00\x00\x00',
                d_prefix=0, d_ip='\x00\x00\x00\x00'):
    """
    Build one ACL rule dictionary in the shape passed to acl_add_replace.

    :param ip: Value stored as 'is_ipv6'.
    :param permit_deny: Value stored as 'is_permit'.
    :param ports: PORTS_ALL selects 0..65535 (0..255 when proto is 1 or
        58); PORTS_RANGE selects the class-level range for the protocol
        (ICMP type/code for proto 1 and 58, the tcp_*/udp_* ranges for
        TCP and UDP); any other value is used as a single
        source/destination port.
    :param proto: Protocol number; -1 means "no rule".
    :param s_prefix: Source prefix length.
    :param s_ip: Source address bytes.
    :param d_prefix: Destination prefix length.
    :param d_ip: Destination address bytes.
    :return: The rule dictionary, or None when ``proto`` is -1.
    :raises ValueError: if ``ports`` is PORTS_RANGE and no range is
        defined for ``proto``.
    """
    if proto == -1:
        return None

    if ports == self.PORTS_ALL:
        sport_from = 0
        dport_from = 0
        # ICMP type/code are 8-bit values, ports are 16-bit
        sport_to = 255 if proto in (1, 58) else 65535
        dport_to = sport_to
    elif ports == self.PORTS_RANGE:
        if proto == 1:
            sport_from = self.icmp4_type
            sport_to = self.icmp4_type
            dport_from = self.icmp4_code
            dport_to = self.icmp4_code
        elif proto == 58:
            sport_from = self.icmp6_type
            sport_to = self.icmp6_type
            dport_from = self.icmp6_code
            dport_to = self.icmp6_code
        elif proto == self.proto[self.IP][self.TCP]:
            sport_from = self.tcp_sport_from
            sport_to = self.tcp_sport_to
            dport_from = self.tcp_dport_from
            dport_to = self.tcp_dport_to
        elif proto == self.proto[self.IP][self.UDP]:
            sport_from = self.udp_sport_from
            sport_to = self.udp_sport_to
            dport_from = self.udp_dport_from
            dport_to = self.udp_dport_to
        else:
            # Previously this fell through and failed later with an
            # UnboundLocalError while building the rule.
            raise ValueError(
                "no default port range defined for protocol %r" % (proto,))
    else:
        # A single explicit port (or ICMP type/code) value
        sport_from = ports
        sport_to = ports
        dport_from = ports
        dport_to = ports

    return {'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
            'srcport_or_icmptype_first': sport_from,
            'srcport_or_icmptype_last': sport_to,
            'src_ip_prefix_len': s_prefix,
            'src_ip_addr': s_ip,
            'dstport_or_icmpcode_first': dport_from,
            'dstport_or_icmpcode_last': dport_to,
            'dst_ip_prefix_len': d_prefix,
            'dst_ip_addr': d_ip}
198
def apply_rules(self, rules, tag=''):
    """
    Create a new ACL from ``rules`` and bind it to every pg interface.

    The ACL is added with acl_index 4294967295 (which requests a new
    ACL), its dump is written to the log, and it is then set as the
    single input ACL (count=1, n_input=1) on each pg interface.

    :param rules: ACL rule dictionaries.
    :param str tag: Symbolic tag passed along with the ACL.
    """
    reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
                                     count=len(rules),
                                     tag=tag)
    dumped = self.api_acl_dump(reply.acl_index)
    self.logger.info("Dumped ACL: " + str(dumped))
    # Apply the ACL on each interface as inbound
    for pg_if in self.pg_interfaces:
        self.api_acl_interface_set_acl_list(
            sw_if_index=pg_if.sw_if_index, count=1, n_input=1,
            acls=[reply.acl_index])
211
def create_upper_layer(self, packet_index, proto, ports=0):
    """
    Build the transport layer for one generated packet.

    :param packet_index: Not used by this method.
    :param proto: Protocol number, looked up in ``proto_map``.
    :param ports: 0 picks random source/destination ports from the class
        level ranges; any other value is used for both ports.
    :return: A UDP or TCP layer, or '' when the protocol maps to neither.
    """
    layer_name = self.proto_map[proto]
    if layer_name == 'UDP':
        if ports != 0:
            return UDP(sport=ports, dport=ports)
        src_port = random.randint(self.udp_sport_from, self.udp_sport_to)
        dst_port = random.randint(self.udp_dport_from, self.udp_dport_to)
        return UDP(sport=src_port, dport=dst_port)
    if layer_name == 'TCP':
        if ports != 0:
            return TCP(sport=ports, dport=ports)
        src_port = random.randint(self.tcp_sport_from, self.tcp_sport_to)
        dst_port = random.randint(self.tcp_dport_from, self.tcp_dport_to)
        return TCP(sport=src_port, dport=dst_port)
    return ''
231
def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
                  proto=-1, ports=0, fragments=False, pkt_raw=True):
    """
    Create the input packet stream for one source interface.

    One packet is built for every (source host, destination host) pair
    of every destination interface listed in ``self.flows[src_if]``.

    :param object src_if: Interface to create the packet stream for.
    :param list packet_sizes: Sizes a packet is randomly extended to.
    :param traffic_type: ``self.ICMP`` builds ICMP/ICMPv6 echo requests,
        any other value builds the layer from create_upper_layer.
    :param ipv6: 1 forces IPv6, 0 forces IPv4, any other value picks the
        IP version randomly per packet.
    :param proto: Protocol number stored in the packet info; -1 picks
        randomly between the entries of ``self.proto[self.IP]``.
    :param ports: Passed through to create_upper_layer.
    :param bool fragments: Build non-initial fragments (IPv4 frag=64 with
        flags=1, or an IPv6 fragment extension header).
    :param bool pkt_raw: Append the Raw payload, remember a copy of the
        packet in the packet info and extend it to a random size.
    :return: List of packets; empty when ``src_if`` has no flow.
    """
    pkts = []
    if src_if not in self.flows:
        return pkts

    src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
    for dst_if in self.flows[src_if]:
        dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
        n_int = len(dst_hosts) * len(src_hosts)
        for i in range(0, n_int):
            # Floor division: a list index must be an int on Python 3
            dst_host = dst_hosts[i // len(src_hosts)]
            src_host = src_hosts[i % len(src_hosts)]
            pkt_info = self.create_packet_info(src_if, dst_if)
            if ipv6 == 1:
                pkt_info.ip = 1
            elif ipv6 == 0:
                pkt_info.ip = 0
            else:
                pkt_info.ip = random.choice([0, 1])
            if proto == -1:
                pkt_info.proto = random.choice(self.proto[self.IP])
            else:
                pkt_info.proto = proto
            payload = self.info_to_payload(pkt_info)
            p = Ether(dst=dst_host.mac, src=src_host.mac)
            if pkt_info.ip:
                p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
                if fragments:
                    p /= IPv6ExtHdrFragment(offset=64, m=1)
            else:
                if fragments:
                    p /= IP(src=src_host.ip4, dst=dst_host.ip4,
                            flags=1, frag=64)
                else:
                    p /= IP(src=src_host.ip4, dst=dst_host.ip4)
            if traffic_type == self.ICMP:
                if pkt_info.ip:
                    p /= ICMPv6EchoRequest(type=self.icmp6_type,
                                           code=self.icmp6_code)
                else:
                    p /= ICMP(type=self.icmp4_type,
                              code=self.icmp4_code)
            else:
                p /= self.create_upper_layer(i, pkt_info.proto, ports)
            if pkt_raw:
                p /= Raw(payload)
                # Keep an unextended copy for later verification
                pkt_info.data = p.copy()
                size = random.choice(packet_sizes)
                self.extend_packet(p, size)
            pkts.append(p)
    return pkts
292
def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
    """
    Verify the packet stream captured on one interface.

    For ICMP traffic the echo type and code of every packet are checked.
    For other traffic each packet is matched, in order, against the
    packet info recorded when the stream was created, and the IP
    addresses and TCP/UDP ports are compared with the saved packet.

    :param object pg_if: Interface the capture was taken on.
    :param list capture: Captured packet stream.
    :param traffic_type: ``self.ICMP`` selects the ICMP checks, any other
        value selects the IP/TCP/UDP checks.
    :param ip_type: When non-zero, the IP version recorded in each
        payload must equal this value.
    """
    last_info = dict()
    for i in self.pg_interfaces:
        last_info[i.sw_if_index] = None
    dst_sw_if_index = pg_if.sw_if_index
    for packet in capture:
        try:
            # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
            if traffic_type == self.ICMP and ip_type == self.IPV6:
                payload_info = self.payload_to_info(
                    packet[ICMPv6EchoRequest].data)
                payload = packet[ICMPv6EchoRequest]
            else:
                payload_info = self.payload_to_info(str(packet[Raw]))
                payload = packet[self.proto_map[payload_info.proto]]
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise

        if ip_type != 0:
            self.assertEqual(payload_info.ip, ip_type)
        if traffic_type == self.ICMP:
            try:
                if payload_info.ip == 0:
                    self.assertEqual(payload.type, self.icmp4_type)
                    self.assertEqual(payload.code, self.icmp4_code)
                else:
                    self.assertEqual(payload.type, self.icmp6_type)
                    self.assertEqual(payload.code, self.icmp6_code)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise
        else:
            try:
                ip_version = IPv6 if payload_info.ip == 1 else IP

                ip = packet[ip_version]
                packet_index = payload_info.index

                self.assertEqual(payload_info.dst, dst_sw_if_index)
                self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
                                  (pg_if.name, payload_info.src,
                                   packet_index))
                next_info = self.get_next_packet_info_for_interface2(
                    payload_info.src, dst_sw_if_index,
                    last_info[payload_info.src])
                last_info[payload_info.src] = next_info
                self.assertTrue(next_info is not None)
                self.assertEqual(packet_index, next_info.index)
                saved_packet = next_info.data
                # Check standard fields
                self.assertEqual(ip.src, saved_packet[ip_version].src)
                self.assertEqual(ip.dst, saved_packet[ip_version].dst)
                p = self.proto_map[payload_info.proto]
                if p == 'TCP':
                    tcp = packet[TCP]
                    self.assertEqual(tcp.sport, saved_packet[
                        TCP].sport)
                    self.assertEqual(tcp.dport, saved_packet[
                        TCP].dport)
                elif p == 'UDP':
                    udp = packet[UDP]
                    self.assertEqual(udp.sport, saved_packet[
                        UDP].sport)
                    self.assertEqual(udp.dport, saved_packet[
                        UDP].dport)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:",
                                      packet))
                raise
    # Every expected packet must have been consumed above.
    # NOTE(review): the call inside the loop passes a sw_if_index as the
    # first argument, this one passes the interface object - confirm
    # which of the two the helper expects.
    for i in self.pg_interfaces:
        remaining_packet = self.get_next_packet_info_for_interface2(
            i, dst_sw_if_index, last_info[i.sw_if_index])
        self.assertTrue(
            remaining_packet is None,
            "Port %u: Packet expected from source %u didn't arrive" %
            (dst_sw_if_index, i.sw_if_index))
379
def run_traffic_no_check(self):
    """Send default traffic for every configured flow; verify nothing."""
    # Queue one stream per source interface that has a flow defined
    for pg_if in self.pg_interfaces:
        if pg_if not in self.flows:
            continue
        stream = self.create_stream(pg_if, self.pg_if_packet_sizes)
        if stream:
            pg_if.add_stream(stream)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
392
def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
                    frags=False, pkt_raw=True):
    """
    Send traffic for every flow and verify it arrives at its destination.

    All arguments are forwarded to create_stream; ``traffic_type`` and
    ``ip_type`` are additionally forwarded to verify_capture. The total
    number of packets sent is the number each destination interface is
    asked to capture.
    """
    # Create incoming packet streams for packet-generator interfaces
    pkts_cnt = 0
    for pg_if in self.pg_interfaces:
        if pg_if not in self.flows:
            continue
        stream = self.create_stream(pg_if, self.pg_if_packet_sizes,
                                    traffic_type, ip_type, proto, ports,
                                    frags, pkt_raw)
        if stream:
            pg_if.add_stream(stream)
            pkts_cnt += len(stream)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Verify outgoing packet streams per packet-generator interface
    for src_if in self.pg_interfaces:
        if src_if not in self.flows:
            continue
        for dst_if in self.flows[src_if]:
            capture = dst_if.get_capture(pkts_cnt)
            self.logger.info("Verifying capture on interface %s" %
                             dst_if.name)
            self.verify_capture(dst_if, capture, traffic_type, ip_type)
420
def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
                          ports=0, frags=False):
    """
    Send traffic for every flow and verify that none of it is forwarded.

    The packet infos are reset first; the arguments are forwarded to
    create_stream. Every destination interface must capture 0 packets.
    """
    self.reset_packet_infos()
    for pg_if in self.pg_interfaces:
        if pg_if not in self.flows:
            continue
        stream = self.create_stream(pg_if, self.pg_if_packet_sizes,
                                    traffic_type, ip_type, proto, ports,
                                    frags)
        if stream:
            pg_if.add_stream(stream)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Verify that nothing left through any destination interface
    for src_if in self.pg_interfaces:
        if src_if not in self.flows:
            continue
        for dst_if in self.flows[src_if]:
            self.logger.info("Verifying capture on interface %s" %
                             dst_if.name)
            capture = dst_if.get_capture(0)
            self.assertEqual(len(capture), 0)
446
def api_acl_add_replace(self, acl_index, r, count, tag='',
                        expected_retval=0):
    """Add/replace an ACL

    :param int acl_index: ACL index to replace,
        4294967295 to create new ACL.
    :param acl_rule r: ACL rules array.
    :param int count: number of rules.
    :param str tag: symbolic tag (description) for this ACL.
    :param int expected_retval: forwarded to ``self.vapi.api``.
    :return: whatever ``self.vapi.api`` returns for the call.
    """
    request = {'acl_index': acl_index,
               'r': r,
               'count': count,
               'tag': tag}
    return self.vapi.api(self.vapi.papi.acl_add_replace, request,
                         expected_retval=expected_retval)
463
def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
                                   expected_retval=0):
    """Issue the acl_interface_set_acl_list API call

    :param sw_if_index: sent as 'sw_if_index'.
    :param count: sent as 'count'.
    :param n_input: sent as 'n_input'.
    :param acls: sent as 'acls'.
    :param int expected_retval: forwarded to ``self.vapi.api``.
    :return: whatever ``self.vapi.api`` returns for the call.
    """
    request = {'sw_if_index': sw_if_index,
               'count': count,
               'n_input': n_input,
               'acls': acls}
    return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
                         request,
                         expected_retval=expected_retval)
472
def api_acl_dump(self, acl_index, expected_retval=0):
    """Issue the acl_dump API call for ``acl_index``

    :param acl_index: sent as 'acl_index'.
    :param int expected_retval: forwarded to ``self.vapi.api``.
    :return: whatever ``self.vapi.api`` returns for the call.
    """
    request = {'acl_index': acl_index}
    return self.vapi.api(self.vapi.papi.acl_dump, request,
                         expected_retval=expected_retval)
477
def test_0000_warmup_test(self):
    """ ACL plugin version check; learn MACs
    """
    # Create the hosts and push one round of unverified traffic
    self.create_hosts(16)
    self.run_traffic_no_check()

    version = self.vapi.papi.acl_plugin_get_version()
    # Only the major number is pinned; minor version changes are
    # non breaking.
    self.assertEqual(version.major, 1)
    self.logger.info("Working with ACL plugin version: %d.%d" % (
        version.major, version.minor))
489
def test_0001_acl_create(self):
    """ ACL create test
    """

    self.logger.info("ACLP_TEST_START_0001")

    # Rule permitting UDP with source ports 1234-1235, destination 1234
    udp_1234 = {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
                'srcport_or_icmptype_first': 1234,
                'srcport_or_icmptype_last': 1235,
                'src_ip_prefix_len': 0,
                'src_ip_addr': '\x00\x00\x00\x00',
                'dstport_or_icmpcode_first': 1234,
                'dstport_or_icmpcode_last': 1234,
                'dst_ip_addr': '\x00\x00\x00\x00',
                'dst_ip_prefix_len': 0}
    r = [udp_1234]

    # Test 1: add a new ACL
    reply = self.api_acl_add_replace(acl_index=4294967295, r=r,
                                     count=len(r), tag="permit 1234")
    self.assertEqual(reply.retval, 0)
    # The very first ACL gets #0
    self.assertEqual(reply.acl_index, 0)
    rr = self.api_acl_dump(reply.acl_index)
    self.logger.info("Dumped ACL: " + str(rr))
    self.assertEqual(len(rr), 1)
    # We should have the same number of ACL entries as we had asked
    dumped_rules = rr[0].r
    self.assertEqual(len(dumped_rules), len(r))
    # The submitted and returned rules are different types, so compare
    # them key by key.
    # NOTE(review): the range stops before the last rule, so with the
    # single rule submitted here nothing is compared - confirm whether
    # that is intended.
    for idx in range(len(r) - 1):
        submitted = r[idx]
        for rule_key in submitted:
            self.assertEqual(rr[0].r[idx][rule_key],
                             submitted[rule_key])

    # Add a deny-1234 ACL: same match as above but denying, followed by
    # a permit rule with all ports and prefixes set to 0
    deny_1234 = dict(udp_1234, is_permit=0)
    permit_rest = {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
                   'srcport_or_icmptype_first': 0,
                   'srcport_or_icmptype_last': 0,
                   'src_ip_prefix_len': 0,
                   'src_ip_addr': '\x00\x00\x00\x00',
                   'dstport_or_icmpcode_first': 0,
                   'dstport_or_icmpcode_last': 0,
                   'dst_ip_addr': '\x00\x00\x00\x00',
                   'dst_ip_prefix_len': 0}
    r_deny = (deny_1234, permit_rest)

    reply = self.api_acl_add_replace(acl_index=4294967295, r=r_deny,
                                     count=len(r_deny),
                                     tag="deny 1234;permit all")
    self.assertEqual(reply.retval, 0)
    # The second ACL gets #1
    self.assertEqual(reply.acl_index, 1)

    # Test 2: try to modify a nonexistent ACL
    reply = self.api_acl_add_replace(acl_index=432, r=r, count=len(r),
                                     tag="FFFF:FFFF", expected_retval=-1)
    self.assertEqual(reply.retval, -1)
    # The ACL number should pass through
    self.assertEqual(reply.acl_index, 432)

    self.logger.info("ACLP_TEST_FINISH_0001")
559
def test_0002_acl_permit_apply(self):
    """ permit ACL apply test
    """
    self.logger.info("ACLP_TEST_START_0002")

    udp = self.proto[self.IP][self.UDP]
    tcp = self.proto[self.IP][self.TCP]
    # Permit the UDP and TCP port ranges (ports value 0) on IPv4
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, 0, udp),
        self.create_rule(self.IPV4, self.PERMIT, 0, tcp),
    ]

    self.apply_rules(rules, "permit per-flow")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, -1)
    self.logger.info("ACLP_TEST_FINISH_0002")
577
def test_0003_acl_deny_apply(self):
    """ deny ACL apply test
    """
    self.logger.info("ACLP_TEST_START_0003")

    udp = self.proto[self.IP][self.UDP]
    rules = [
        # Deny all UDP flows
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, udp),
        # Permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
    ]

    self.apply_rules(rules, "deny per-flow;permit all")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPV4, udp)
    self.logger.info("ACLP_TEST_FINISH_0003")
598
def test_0004_vpp624_permit_icmpv4(self):
    """ VPP_624 permit ICMPv4
    """
    self.logger.info("ACLP_TEST_START_0004")

    icmp4 = self.proto[self.ICMP][self.ICMPv4]
    rules = [
        # Permit the ICMPv4 type/code used by the generated traffic
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, icmp4),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    self.apply_rules(rules, "permit icmpv4")

    # Traffic should still pass
    self.run_verify_test(self.ICMP, self.IPV4, icmp4)

    self.logger.info("ACLP_TEST_FINISH_0004")
619
def test_0005_vpp624_permit_icmpv6(self):
    """ VPP_624 permit ICMPv6
    """
    self.logger.info("ACLP_TEST_START_0005")

    icmp6 = self.proto[self.ICMP][self.ICMPv6]
    rules = [
        # Permit the ICMPv6 type/code used by the generated traffic
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, icmp6),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    self.apply_rules(rules, "permit icmpv6")

    # Traffic should still pass
    self.run_verify_test(self.ICMP, self.IPV6, icmp6)

    self.logger.info("ACLP_TEST_FINISH_0005")
640
def test_0006_vpp624_deny_icmpv4(self):
    """ VPP_624 deny ICMPv4
    """
    self.logger.info("ACLP_TEST_START_0006")

    icmp4 = self.proto[self.ICMP][self.ICMPv4]
    rules = [
        # Deny the ICMPv4 type/code used by the generated traffic
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, icmp4),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
    ]

    self.apply_rules(rules, "deny icmpv4")

    # Traffic should not pass
    self.run_verify_negat_test(self.ICMP, self.IPV4, 0)

    self.logger.info("ACLP_TEST_FINISH_0006")
660
def test_0007_vpp624_deny_icmpv6(self):
    """ VPP_624 deny ICMPv6
    """
    self.logger.info("ACLP_TEST_START_0007")

    icmp6 = self.proto[self.ICMP][self.ICMPv6]
    rules = [
        # Deny the ICMPv6 type/code used by the generated traffic
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, icmp6),
        # permit ip any any in the end
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]

    self.apply_rules(rules, "deny icmpv6")

    # Traffic should not pass
    self.run_verify_negat_test(self.ICMP, self.IPV6, 0)

    self.logger.info("ACLP_TEST_FINISH_0007")
680
def test_0008_tcp_permit_v4(self):
    """ permit TCPv4
    """
    self.logger.info("ACLP_TEST_START_0008")

    tcp = self.proto[self.IP][self.TCP]
    rules = [
        # Permit the TCP port ranges on IPv4
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    self.apply_rules(rules, "permit ipv4 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp)

    self.logger.info("ACLP_TEST_FINISH_0008")
700
def test_0009_tcp_permit_v6(self):
    """ permit TCPv6
    """
    self.logger.info("ACLP_TEST_START_0009")

    # Add an ACL
    rules = []
    rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
                                  self.proto[self.IP][self.TCP]))
    # deny ip any any in the end
    rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))

    # Apply rules
    self.apply_rules(rules, "permit ip6 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])

    # The finish marker carries this test's own number (it used to log
    # 0008, copied from the previous test)
    self.logger.info("ACLP_TEST_FINISH_0009")
720
def test_0010_udp_permit_v4(self):
    """ permit UDPv4
    """
    self.logger.info("ACLP_TEST_START_0010")

    # Add an ACL
    rules = []
    rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
                                  self.proto[self.IP][self.UDP]))
    # deny ip any any in the end
    rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))

    # Apply rules; the tag names the IP version in full (was "ipv")
    self.apply_rules(rules, "permit ipv4 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])

    self.logger.info("ACLP_TEST_FINISH_0010")
740
def test_0011_udp_permit_v6(self):
    """ permit UDPv6
    """
    self.logger.info("ACLP_TEST_START_0011")

    udp = self.proto[self.IP][self.UDP]
    rules = [
        # Permit the UDP port ranges on IPv6
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    self.apply_rules(rules, "permit ip6 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, udp)

    self.logger.info("ACLP_TEST_FINISH_0011")
760
def test_0012_tcp_deny(self):
    """ deny TCPv4/v6
    """
    self.logger.info("ACLP_TEST_START_0012")

    tcp = self.proto[self.IP][self.TCP]
    rules = [
        # Deny the TCP port ranges for both IP versions
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, tcp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, tcp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]

    self.apply_rules(rules, "deny ip4/ip6 tcp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, tcp)

    self.logger.info("ACLP_TEST_FINISH_0012")
786
def test_0013_udp_deny(self):
    """ deny UDPv4/v6
    """
    self.logger.info("ACLP_TEST_START_0013")

    udp = self.proto[self.IP][self.UDP]
    rules = [
        # Deny the UDP port ranges for both IP versions
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, udp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, udp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]

    self.apply_rules(rules, "deny ip4/ip6 udp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp)

    self.logger.info("ACLP_TEST_FINISH_0013")
812
def test_0014_acl_dump(self):
    """ verify add/dump acls
    """
    self.logger.info("ACLP_TEST_START_0014")

    tcp = self.proto[self.IP][self.TCP]
    udp = self.proto[self.IP][self.UDP]
    icmp4 = self.proto[self.ICMP][self.ICMPv4]
    icmp6 = self.proto[self.ICMP][self.ICMPv6]

    # Each entry: [ip version, permit/deny, ports, protocol]
    r = [[self.IPV4, self.PERMIT, 1234, tcp],
         [self.IPV4, self.PERMIT, 2345, udp],
         [self.IPV4, self.PERMIT, 0, tcp],
         [self.IPV4, self.PERMIT, 0, udp],
         [self.IPV4, self.PERMIT, 5, icmp4],
         [self.IPV6, self.PERMIT, 4321, tcp],
         [self.IPV6, self.PERMIT, 5432, udp],
         [self.IPV6, self.PERMIT, 0, tcp],
         [self.IPV6, self.PERMIT, 0, udp],
         [self.IPV6, self.PERMIT, 6, icmp6],
         [self.IPV4, self.DENY, self.PORTS_ALL, 0],
         [self.IPV4, self.DENY, 1234, tcp],
         [self.IPV4, self.DENY, 2345, udp],
         [self.IPV4, self.DENY, 5, icmp4],
         [self.IPV6, self.DENY, 4321, tcp],
         [self.IPV6, self.DENY, 5432, udp],
         [self.IPV6, self.DENY, 6, icmp6],
         [self.IPV6, self.DENY, self.PORTS_ALL, 0]
         ]

    # Add and verify new ACLs
    rules = [self.create_rule(spec[0], spec[1], spec[2], spec[3])
             for spec in r]

    reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
                                     count=len(rules))
    result = self.api_acl_dump(reply.acl_index)

    # Walk all dumped rules in order, pairing each with its spec in r
    dumped_rules = (dr for drules in result for dr in drules.r)
    for i, dr in enumerate(dumped_rules):
        ip_version, action, ports, proto = r[i]
        self.assertEqual(dr.is_ipv6, ip_version)
        self.assertEqual(dr.is_permit, action)
        self.assertEqual(dr.proto, proto)

        if ports > 0:
            # Explicit single port / ICMP type
            self.assertEqual(dr.srcport_or_icmptype_first, ports)
        elif ports < 0:
            # PORTS_ALL
            self.assertEqual(dr.srcport_or_icmptype_first, 0)
            self.assertEqual(dr.srcport_or_icmptype_last, 65535)
        elif dr.proto == tcp:
            # Port range: values must lie within the TCP ranges
            self.assertGreater(dr.srcport_or_icmptype_first,
                               self.tcp_sport_from-1)
            self.assertLess(dr.srcport_or_icmptype_first,
                            self.tcp_sport_to+1)
            self.assertGreater(dr.dstport_or_icmpcode_last,
                               self.tcp_dport_from-1)
            self.assertLess(dr.dstport_or_icmpcode_last,
                            self.tcp_dport_to+1)
        elif dr.proto == udp:
            # Port range: values must lie within the UDP ranges
            self.assertGreater(dr.srcport_or_icmptype_first,
                               self.udp_sport_from-1)
            self.assertLess(dr.srcport_or_icmptype_first,
                            self.udp_sport_to+1)
            self.assertGreater(dr.dstport_or_icmpcode_last,
                               self.udp_dport_from-1)
            self.assertLess(dr.dstport_or_icmpcode_last,
                            self.udp_dport_to+1)

    self.logger.info("ACLP_TEST_FINISH_0014")
882
def test_0015_tcp_permit_port_v4(self):
    """ permit single TCPv4
    """
    self.logger.info("ACLP_TEST_START_0015")

    port = random.randint(0, 65535)
    tcp = self.proto[self.IP][self.TCP]
    rules = [
        # Permit TCP on the randomly chosen port
        self.create_rule(self.IPV4, self.PERMIT, port, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    self.apply_rules(rules, "permit ip4 tcp "+str(port))

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp, port)

    self.logger.info("ACLP_TEST_FINISH_0015")
904
def test_0016_udp_permit_port_v4(self):
    """ permit single UDPv4
    """
    self.logger.info("ACLP_TEST_START_0016")

    port = random.randint(0, 65535)
    # Add an ACL
    rules = []
    rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
                                  self.proto[self.IP][self.UDP]))
    # deny ip any any in the end
    rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))

    # Apply rules; the tag names the protocol actually permitted
    # (it used to say "tcp")
    self.apply_rules(rules, "permit ip4 udp "+str(port))

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4,
                         self.proto[self.IP][self.UDP], port)

    self.logger.info("ACLP_TEST_FINISH_0016")
926
def test_0017_tcp_permit_port_v6(self):
    """ permit single TCPv6
    """
    self.logger.info("ACLP_TEST_START_0017")

    port = random.randint(0, 65535)
    # Add an ACL
    rules = []
    rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
                                  self.proto[self.IP][self.TCP]))
    # deny ip any any in the end
    rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))

    # Apply rules; the tag names the IP version actually permitted
    # (it used to say "ip4")
    self.apply_rules(rules, "permit ip6 tcp "+str(port))

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6,
                         self.proto[self.IP][self.TCP], port)

    self.logger.info("ACLP_TEST_FINISH_0017")
948
def test_0018_udp_permit_port_v6(self):
    """ permit single UDPv6
    """
    self.logger.info("ACLP_TEST_START_0018")

    port = random.randint(0, 65535)
    # Add an ACL
    rules = []
    rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
                                  self.proto[self.IP][self.UDP]))
    # deny ip any any in the end
    rules.append(self.create_rule(self.IPV6, self.DENY,
                                  self.PORTS_ALL, 0))

    # Apply rules; the tag names the IP version and protocol actually
    # permitted (it used to say "ip4 tcp")
    self.apply_rules(rules, "permit ip6 udp "+str(port))

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6,
                         self.proto[self.IP][self.UDP], port)

    self.logger.info("ACLP_TEST_FINISH_0018")
971
def test_0019_udp_deny_port(self):
    """ deny single TCPv4/v6
    """
    # NOTE: despite "udp" in the method name, the rules and the traffic
    # of this test are TCP, as the docstring says.
    self.logger.info("ACLP_TEST_START_0019")

    port = random.randint(0, 65535)
    # Add an ACL
    rules = []
    rules.append(self.create_rule(self.IPV4, self.DENY, port,
                                  self.proto[self.IP][self.TCP]))
    rules.append(self.create_rule(self.IPV6, self.DENY, port,
                                  self.proto[self.IP][self.TCP]))
    # Permit ip any any in the end
    rules.append(self.create_rule(self.IPV4, self.PERMIT,
                                  self.PORTS_ALL, 0))
    rules.append(self.create_rule(self.IPV6, self.PERMIT,
                                  self.PORTS_ALL, 0))

    # Apply rules; the tag names the protocol actually denied
    # (it used to say "udp")
    self.apply_rules(rules, "deny ip4/ip6 tcp "+str(port))

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM,
                               self.proto[self.IP][self.TCP], port)

    self.logger.info("ACLP_TEST_FINISH_0019")
998
def test_0020_udp_deny_port(self):
    """ deny single UDPv4/v6

    Builds an ACL that denies UDP on one randomly chosen port for both
    IPv4 and IPv6, followed by PERMIT rules for both IP versions
    created with PORTS_ALL and protocol 0.  The ACL is applied and the
    negative verification helper is run for UDP on that port with a
    random IP version.
    """
    self.logger.info("ACLP_TEST_START_0020")

    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]
    ip_versions = (self.IPV4, self.IPV6)

    # Specific deny rules first (v4 then v6) ...
    acl = [self.create_rule(ip_ver, self.DENY, port, udp)
           for ip_ver in ip_versions]
    # ... then permit ip any any in the end (v4 then v6).
    acl += [self.create_rule(ip_ver, self.PERMIT, self.PORTS_ALL, 0)
            for ip_ver in ip_versions]

    # Apply rules
    tag = "deny ip4/ip6 udp " + str(port)
    self.apply_rules(acl, tag)

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp, port)

    self.logger.info("ACLP_TEST_FINISH_0020")
1025
def test_0021_udp_deny_port_verify_fragment_deny(self):
    """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked

    Same ACL layout as the plain UDP deny test: UDP on one random port
    is denied for IPv4 and IPv6, then PERMIT rules created with
    PORTS_ALL and protocol 0 are appended for both IP versions.  The
    negative verification helper is then called with an extra trailing
    True argument.
    """
    self.logger.info("ACLP_TEST_START_0021")

    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]
    ip_versions = (self.IPV4, self.IPV6)

    # Specific deny rules first (v4 then v6) ...
    acl = [self.create_rule(ip_ver, self.DENY, port, udp)
           for ip_ver in ip_versions]
    # ... then permit ip any any in the end (v4 then v6).
    acl += [self.create_rule(ip_ver, self.PERMIT, self.PORTS_ALL, 0)
            for ip_ver in ip_versions]

    # Apply rules
    tag = "deny ip4/ip6 udp " + str(port)
    self.apply_rules(acl, tag)

    # Traffic should not pass.
    # NOTE(review): the trailing True presumably switches the helper to
    # fragmented traffic, as the docstring suggests - confirm against
    # run_verify_negat_test.
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp, port, True)

    self.logger.info("ACLP_TEST_FINISH_0021")
1052
def test_0022_zero_length_udp_ipv4(self):
    """ VPP-687 zero length udp ipv4 packet

    Applies an ACL that permits UDP over IPv4 on one random port
    (followed by an IPv4 DENY rule created with PORTS_ALL and protocol
    0), sends a stream built by create_stream from pg0 and fetches the
    capture on pg1.
    """
    self.logger.info("ACLP_TEST_START_0022")

    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]

    # Add an ACL: the specific permit first, deny ip any any in the end
    acl = [
        self.create_rule(self.IPV4, self.PERMIT, port, udp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(acl, "permit empty udp ip4 " + str(port))

    # Traffic should still pass.
    # Create the incoming packet stream for the packet-generator
    # interface.
    # NOTE(review): the two trailing False flags are positional options
    # of create_stream; presumably they select the payload-less packet
    # form this test is named after - confirm against create_stream.
    stream = self.create_stream(self.pg0, self.pg_if_packet_sizes,
                                self.IP, self.IPV4, udp, port,
                                False, False)
    expected = 0
    stream_len = len(stream)
    if stream_len > 0:
        self.pg0.add_stream(stream)
        expected += stream_len

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Fetch the capture on pg1, passing the number of packets queued
    # on pg0
    self.pg1.get_capture(expected)

    self.logger.info("ACLP_TEST_FINISH_0022")
1087
def test_0023_zero_length_udp_ipv6(self):
    """ VPP-687 zero length udp ipv6 packet

    Applies an ACL that permits UDP over IPv6 on one random port
    (followed by an IPv6 DENY rule created with PORTS_ALL and protocol
    0), sends a stream built by create_stream from pg0 and fetches the
    capture on pg1.
    """
    self.logger.info("ACLP_TEST_START_0023")

    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]

    # Add an ACL: the specific permit first, deny ip any any in the end
    acl = [
        self.create_rule(self.IPV6, self.PERMIT, port, udp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(acl, "permit empty udp ip6 " + str(port))

    # Traffic should still pass.
    # Create the incoming packet stream for the packet-generator
    # interface.
    # NOTE(review): the two trailing False flags are positional options
    # of create_stream; presumably they select the payload-less packet
    # form this test is named after - confirm against create_stream.
    stream = self.create_stream(self.pg0, self.pg_if_packet_sizes,
                                self.IP, self.IPV6, udp, port,
                                False, False)
    expected = 0
    stream_len = len(stream)
    if stream_len > 0:
        self.pg0.add_stream(stream)
        expected += stream_len

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Verify the outgoing packet stream: fetch the capture on pg1,
    # passing the number of packets queued on pg0
    self.pg1.get_capture(expected)

    self.logger.info("ACLP_TEST_FINISH_0023")
1122
# Script entry point: when this module is executed directly (rather than
# imported), hand control to unittest and have it use VppTestRunner as
# the test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)