blob: 3a180d21961515fdf14b1f6f706b81985d0b7fff [file] [log] [blame]
Pavel Kotucek59dda062017-03-02 15:22:47 +01001#!/usr/bin/env python
2"""ACL plugin Test Case HLD:
3"""
4
5import unittest
6import random
7
8from scapy.packet import Raw
9from scapy.layers.l2 import Ether
10from scapy.layers.inet import IP, TCP, UDP, ICMP
11from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +000012from scapy.layers.inet6 import IPv6ExtHdrFragment
Pavel Kotucek59dda062017-03-02 15:22:47 +010013from framework import VppTestCase, VppTestRunner
14from util import Host, ppp
15
16
17class TestACLplugin(VppTestCase):
18 """ ACL plugin Test Case """
19
20 # traffic types
21 IP = 0
22 ICMP = 1
23
24 # IP version
25 IPRANDOM = -1
26 IPV4 = 0
27 IPV6 = 1
28
29 # rule types
30 DENY = 0
31 PERMIT = 1
32
33 # supported protocols
34 proto = [[6, 17], [1, 58]]
35 proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
36 ICMPv4 = 0
37 ICMPv6 = 1
38 TCP = 0
39 UDP = 1
40 PROTO_ALL = 0
41
42 # port ranges
43 PORTS_ALL = -1
44 PORTS_RANGE = 0
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +020045 PORTS_RANGE_2 = 1
Pavel Kotucek59dda062017-03-02 15:22:47 +010046 udp_sport_from = 10
47 udp_sport_to = udp_sport_from + 5
48 udp_dport_from = 20000
49 udp_dport_to = udp_dport_from + 5000
50 tcp_sport_from = 30
51 tcp_sport_to = tcp_sport_from + 5
52 tcp_dport_from = 40000
53 tcp_dport_to = tcp_dport_from + 5000
54
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +020055 udp_sport_from_2 = 90
56 udp_sport_to_2 = udp_sport_from_2 + 5
57 udp_dport_from_2 = 30000
58 udp_dport_to_2 = udp_dport_from_2 + 5000
59 tcp_sport_from_2 = 130
60 tcp_sport_to_2 = tcp_sport_from_2 + 5
61 tcp_dport_from_2 = 20000
62 tcp_dport_to_2 = tcp_dport_from_2 + 5000
63
Pavel Kotucek59dda062017-03-02 15:22:47 +010064 icmp4_type = 8 # echo request
65 icmp4_code = 3
66 icmp6_type = 128 # echo request
67 icmp6_code = 3
68
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +020069 icmp4_type_2 = 8
70 icmp4_code_from_2 = 5
71 icmp4_code_to_2 = 20
72 icmp6_type_2 = 128
73 icmp6_code_from_2 = 8
74 icmp6_code_to_2 = 42
75
Pavel Kotucek59dda062017-03-02 15:22:47 +010076 # Test variables
77 bd_id = 1
78
79 @classmethod
80 def setUpClass(cls):
81 """
82 Perform standard class setup (defined by class method setUpClass in
83 class VppTestCase) before running the test case, set test case related
84 variables and configure VPP.
85 """
86 super(TestACLplugin, cls).setUpClass()
87
88 random.seed()
89
90 try:
91 # Create 2 pg interfaces
92 cls.create_pg_interfaces(range(2))
93
94 # Packet flows mapping pg0 -> pg1, pg2 etc.
95 cls.flows = dict()
96 cls.flows[cls.pg0] = [cls.pg1]
97
98 # Packet sizes
99 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
100
101 # Create BD with MAC learning and unknown unicast flooding disabled
102 # and put interfaces to this BD
103 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
104 learn=1)
105 for pg_if in cls.pg_interfaces:
106 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
107 bd_id=cls.bd_id)
108
109 # Set up all interfaces
110 for i in cls.pg_interfaces:
111 i.admin_up()
112
113 # Mapping between packet-generator index and lists of test hosts
114 cls.hosts_by_pg_idx = dict()
115 for pg_if in cls.pg_interfaces:
116 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
117
118 # Create list of deleted hosts
119 cls.deleted_hosts_by_pg_idx = dict()
120 for pg_if in cls.pg_interfaces:
121 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
122
123 # warm-up the mac address tables
124 # self.warmup_test()
125
126 except Exception:
127 super(TestACLplugin, cls).tearDownClass()
128 raise
129
130 def setUp(self):
131 super(TestACLplugin, self).setUp()
132 self.reset_packet_infos()
133
134 def tearDown(self):
135 """
136 Show various debug prints after each test.
137 """
138 super(TestACLplugin, self).tearDown()
139 if not self.vpp_dead:
140 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
Andrew Yourtchenko7f4d5772017-05-24 13:20:47 +0200141 self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
142 self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
143 self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
Pavel Kotucek59dda062017-03-02 15:22:47 +0100144 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
145 % self.bd_id))
146
147 def create_hosts(self, count, start=0):
148 """
149 Create required number of host MAC addresses and distribute them among
150 interfaces. Create host IPv4 address for every host MAC address.
151
152 :param int count: Number of hosts to create MAC/IPv4 addresses for.
153 :param int start: Number to start numbering from.
154 """
155 n_int = len(self.pg_interfaces)
156 macs_per_if = count / n_int
157 i = -1
158 for pg_if in self.pg_interfaces:
159 i += 1
160 start_nr = macs_per_if * i + start
161 end_nr = count + start if i == (n_int - 1) \
162 else macs_per_if * (i + 1) + start
163 hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
164 for j in range(start_nr, end_nr):
165 host = Host(
166 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
167 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
168 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
169 hosts.append(host)
170
171 def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
172 s_prefix=0, s_ip='\x00\x00\x00\x00',
173 d_prefix=0, d_ip='\x00\x00\x00\x00'):
174 if proto == -1:
175 return
176 if ports == self.PORTS_ALL:
177 sport_from = 0
178 dport_from = 0
179 sport_to = 65535 if proto != 1 and proto != 58 else 255
180 dport_to = sport_to
181 elif ports == self.PORTS_RANGE:
182 if proto == 1:
183 sport_from = self.icmp4_type
184 sport_to = self.icmp4_type
185 dport_from = self.icmp4_code
186 dport_to = self.icmp4_code
187 elif proto == 58:
188 sport_from = self.icmp6_type
189 sport_to = self.icmp6_type
190 dport_from = self.icmp6_code
191 dport_to = self.icmp6_code
192 elif proto == self.proto[self.IP][self.TCP]:
193 sport_from = self.tcp_sport_from
194 sport_to = self.tcp_sport_to
195 dport_from = self.tcp_dport_from
196 dport_to = self.tcp_dport_to
197 elif proto == self.proto[self.IP][self.UDP]:
198 sport_from = self.udp_sport_from
199 sport_to = self.udp_sport_to
200 dport_from = self.udp_dport_from
201 dport_to = self.udp_dport_to
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +0200202 elif ports == self.PORTS_RANGE_2:
203 if proto == 1:
204 sport_from = self.icmp4_type_2
205 sport_to = self.icmp4_type_2
206 dport_from = self.icmp4_code_from_2
207 dport_to = self.icmp4_code_to_2
208 elif proto == 58:
209 sport_from = self.icmp6_type_2
210 sport_to = self.icmp6_type_2
211 dport_from = self.icmp6_code_from_2
212 dport_to = self.icmp6_code_to_2
213 elif proto == self.proto[self.IP][self.TCP]:
214 sport_from = self.tcp_sport_from_2
215 sport_to = self.tcp_sport_to_2
216 dport_from = self.tcp_dport_from_2
217 dport_to = self.tcp_dport_to_2
218 elif proto == self.proto[self.IP][self.UDP]:
219 sport_from = self.udp_sport_from_2
220 sport_to = self.udp_sport_to_2
221 dport_from = self.udp_dport_from_2
222 dport_to = self.udp_dport_to_2
Pavel Kotucek59dda062017-03-02 15:22:47 +0100223 else:
224 sport_from = ports
225 sport_to = ports
226 dport_from = ports
227 dport_to = ports
228
229 rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
230 'srcport_or_icmptype_first': sport_from,
231 'srcport_or_icmptype_last': sport_to,
232 'src_ip_prefix_len': s_prefix,
233 'src_ip_addr': s_ip,
234 'dstport_or_icmpcode_first': dport_from,
235 'dstport_or_icmpcode_last': dport_to,
236 'dst_ip_prefix_len': d_prefix,
237 'dst_ip_addr': d_ip})
238 return rule
239
240 def apply_rules(self, rules, tag=''):
241 reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
242 count=len(rules),
243 tag=tag)
244 self.logger.info("Dumped ACL: " + str(
245 self.api_acl_dump(reply.acl_index)))
246 # Apply a ACL on the interface as inbound
247 for i in self.pg_interfaces:
248 self.api_acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
249 count=1, n_input=1,
250 acls=[reply.acl_index])
251 return
252
253 def create_upper_layer(self, packet_index, proto, ports=0):
254 p = self.proto_map[proto]
255 if p == 'UDP':
256 if ports == 0:
257 return UDP(sport=random.randint(self.udp_sport_from,
258 self.udp_sport_to),
259 dport=random.randint(self.udp_dport_from,
260 self.udp_dport_to))
261 else:
262 return UDP(sport=ports, dport=ports)
263 elif p == 'TCP':
264 if ports == 0:
265 return TCP(sport=random.randint(self.tcp_sport_from,
266 self.tcp_sport_to),
267 dport=random.randint(self.tcp_dport_from,
268 self.tcp_dport_to))
269 else:
270 return TCP(sport=ports, dport=ports)
271 return ''
272
273 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
Pavel Kotuceke7b67342017-04-18 13:12:20 +0200274 proto=-1, ports=0, fragments=False, pkt_raw=True):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100275 """
276 Create input packet stream for defined interface using hosts or
277 deleted_hosts list.
278
279 :param object src_if: Interface to create packet stream for.
280 :param list packet_sizes: List of required packet sizes.
281 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
282 :return: Stream of packets.
283 """
284 pkts = []
285 if self.flows.__contains__(src_if):
286 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
287 for dst_if in self.flows[src_if]:
288 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
289 n_int = len(dst_hosts) * len(src_hosts)
290 for i in range(0, n_int):
291 dst_host = dst_hosts[i / len(src_hosts)]
292 src_host = src_hosts[i % len(src_hosts)]
293 pkt_info = self.create_packet_info(src_if, dst_if)
294 if ipv6 == 1:
295 pkt_info.ip = 1
296 elif ipv6 == 0:
297 pkt_info.ip = 0
298 else:
299 pkt_info.ip = random.choice([0, 1])
300 if proto == -1:
301 pkt_info.proto = random.choice(self.proto[self.IP])
302 else:
303 pkt_info.proto = proto
304 payload = self.info_to_payload(pkt_info)
305 p = Ether(dst=dst_host.mac, src=src_host.mac)
306 if pkt_info.ip:
307 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000308 if fragments:
309 p /= IPv6ExtHdrFragment(offset=64, m=1)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100310 else:
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000311 if fragments:
312 p /= IP(src=src_host.ip4, dst=dst_host.ip4,
313 flags=1, frag=64)
314 else:
315 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100316 if traffic_type == self.ICMP:
317 if pkt_info.ip:
318 p /= ICMPv6EchoRequest(type=self.icmp6_type,
319 code=self.icmp6_code)
320 else:
321 p /= ICMP(type=self.icmp4_type,
322 code=self.icmp4_code)
323 else:
324 p /= self.create_upper_layer(i, pkt_info.proto, ports)
Pavel Kotuceke7b67342017-04-18 13:12:20 +0200325 if pkt_raw:
326 p /= Raw(payload)
327 pkt_info.data = p.copy()
328 if pkt_raw:
329 size = random.choice(packet_sizes)
330 self.extend_packet(p, size)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100331 pkts.append(p)
332 return pkts
333
334 def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
335 """
336 Verify captured input packet stream for defined interface.
337
338 :param object pg_if: Interface to verify captured packet stream for.
339 :param list capture: Captured packet stream.
340 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
341 """
342 last_info = dict()
343 for i in self.pg_interfaces:
344 last_info[i.sw_if_index] = None
345 dst_sw_if_index = pg_if.sw_if_index
346 for packet in capture:
347 try:
348 # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
349 if traffic_type == self.ICMP and ip_type == self.IPV6:
350 payload_info = self.payload_to_info(
351 packet[ICMPv6EchoRequest].data)
352 payload = packet[ICMPv6EchoRequest]
353 else:
354 payload_info = self.payload_to_info(str(packet[Raw]))
355 payload = packet[self.proto_map[payload_info.proto]]
356 except:
357 self.logger.error(ppp("Unexpected or invalid packet "
358 "(outside network):", packet))
359 raise
360
361 if ip_type != 0:
362 self.assertEqual(payload_info.ip, ip_type)
363 if traffic_type == self.ICMP:
364 try:
365 if payload_info.ip == 0:
366 self.assertEqual(payload.type, self.icmp4_type)
367 self.assertEqual(payload.code, self.icmp4_code)
368 else:
369 self.assertEqual(payload.type, self.icmp6_type)
370 self.assertEqual(payload.code, self.icmp6_code)
371 except:
372 self.logger.error(ppp("Unexpected or invalid packet "
373 "(outside network):", packet))
374 raise
375 else:
376 try:
377 ip_version = IPv6 if payload_info.ip == 1 else IP
378
379 ip = packet[ip_version]
380 packet_index = payload_info.index
381
382 self.assertEqual(payload_info.dst, dst_sw_if_index)
383 self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
384 (pg_if.name, payload_info.src,
385 packet_index))
386 next_info = self.get_next_packet_info_for_interface2(
387 payload_info.src, dst_sw_if_index,
388 last_info[payload_info.src])
389 last_info[payload_info.src] = next_info
390 self.assertTrue(next_info is not None)
391 self.assertEqual(packet_index, next_info.index)
392 saved_packet = next_info.data
393 # Check standard fields
394 self.assertEqual(ip.src, saved_packet[ip_version].src)
395 self.assertEqual(ip.dst, saved_packet[ip_version].dst)
396 p = self.proto_map[payload_info.proto]
397 if p == 'TCP':
398 tcp = packet[TCP]
399 self.assertEqual(tcp.sport, saved_packet[
400 TCP].sport)
401 self.assertEqual(tcp.dport, saved_packet[
402 TCP].dport)
403 elif p == 'UDP':
404 udp = packet[UDP]
405 self.assertEqual(udp.sport, saved_packet[
406 UDP].sport)
407 self.assertEqual(udp.dport, saved_packet[
408 UDP].dport)
409 except:
410 self.logger.error(ppp("Unexpected or invalid packet:",
411 packet))
412 raise
413 for i in self.pg_interfaces:
414 remaining_packet = self.get_next_packet_info_for_interface2(
415 i, dst_sw_if_index, last_info[i.sw_if_index])
416 self.assertTrue(
417 remaining_packet is None,
418 "Port %u: Packet expected from source %u didn't arrive" %
419 (dst_sw_if_index, i.sw_if_index))
420
421 def run_traffic_no_check(self):
422 # Test
423 # Create incoming packet streams for packet-generator interfaces
424 for i in self.pg_interfaces:
425 if self.flows.__contains__(i):
426 pkts = self.create_stream(i, self.pg_if_packet_sizes)
427 if len(pkts) > 0:
428 i.add_stream(pkts)
429
430 # Enable packet capture and start packet sending
431 self.pg_enable_capture(self.pg_interfaces)
432 self.pg_start()
433
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000434 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
Pavel Kotuceke7b67342017-04-18 13:12:20 +0200435 frags=False, pkt_raw=True):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100436 # Test
437 # Create incoming packet streams for packet-generator interfaces
438 pkts_cnt = 0
439 for i in self.pg_interfaces:
440 if self.flows.__contains__(i):
441 pkts = self.create_stream(i, self.pg_if_packet_sizes,
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000442 traffic_type, ip_type, proto, ports,
Pavel Kotuceke7b67342017-04-18 13:12:20 +0200443 frags, pkt_raw)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100444 if len(pkts) > 0:
445 i.add_stream(pkts)
446 pkts_cnt += len(pkts)
447
448 # Enable packet capture and start packet sendingself.IPV
449 self.pg_enable_capture(self.pg_interfaces)
450 self.pg_start()
451
452 # Verify
453 # Verify outgoing packet streams per packet-generator interface
454 for src_if in self.pg_interfaces:
455 if self.flows.__contains__(src_if):
456 for dst_if in self.flows[src_if]:
457 capture = dst_if.get_capture(pkts_cnt)
458 self.logger.info("Verifying capture on interface %s" %
459 dst_if.name)
460 self.verify_capture(dst_if, capture, traffic_type, ip_type)
461
462 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000463 ports=0, frags=False):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100464 # Test
465 self.reset_packet_infos()
466 for i in self.pg_interfaces:
467 if self.flows.__contains__(i):
468 pkts = self.create_stream(i, self.pg_if_packet_sizes,
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000469 traffic_type, ip_type, proto, ports,
470 frags)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100471 if len(pkts) > 0:
472 i.add_stream(pkts)
473
474 # Enable packet capture and start packet sending
475 self.pg_enable_capture(self.pg_interfaces)
476 self.pg_start()
477
478 # Verify
479 # Verify outgoing packet streams per packet-generator interface
480 for src_if in self.pg_interfaces:
481 if self.flows.__contains__(src_if):
482 for dst_if in self.flows[src_if]:
483 self.logger.info("Verifying capture on interface %s" %
484 dst_if.name)
485 capture = dst_if.get_capture(0)
486 self.assertEqual(len(capture), 0)
487
488 def api_acl_add_replace(self, acl_index, r, count, tag='',
489 expected_retval=0):
490 """Add/replace an ACL
491
492 :param int acl_index: ACL index to replace,
493 4294967295 to create new ACL.
494 :param acl_rule r: ACL rules array.
495 :param str tag: symbolic tag (description) for this ACL.
496 :param int count: number of rules.
497 """
498 return self.vapi.api(self.vapi.papi.acl_add_replace,
499 {'acl_index': acl_index,
500 'r': r,
501 'count': count,
502 'tag': tag},
503 expected_retval=expected_retval)
504
505 def api_acl_interface_set_acl_list(self, sw_if_index, count, n_input, acls,
506 expected_retval=0):
507 return self.vapi.api(self.vapi.papi.acl_interface_set_acl_list,
508 {'sw_if_index': sw_if_index,
509 'count': count,
510 'n_input': n_input,
511 'acls': acls},
512 expected_retval=expected_retval)
513
514 def api_acl_dump(self, acl_index, expected_retval=0):
515 return self.vapi.api(self.vapi.papi.acl_dump,
516 {'acl_index': acl_index},
517 expected_retval=expected_retval)
518
519 def test_0000_warmup_test(self):
520 """ ACL plugin version check; learn MACs
521 """
522 self.create_hosts(16)
523 self.run_traffic_no_check()
524 reply = self.vapi.papi.acl_plugin_get_version()
525 self.assertEqual(reply.major, 1)
526 self.logger.info("Working with ACL plugin version: %d.%d" % (
527 reply.major, reply.minor))
528 # minor version changes are non breaking
529 # self.assertEqual(reply.minor, 0)
530
531 def test_0001_acl_create(self):
532 """ ACL create test
533 """
534
535 self.logger.info("ACLP_TEST_START_0001")
536 # Add an ACL
537 r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
538 'srcport_or_icmptype_first': 1234,
539 'srcport_or_icmptype_last': 1235,
540 'src_ip_prefix_len': 0,
541 'src_ip_addr': '\x00\x00\x00\x00',
542 'dstport_or_icmpcode_first': 1234,
543 'dstport_or_icmpcode_last': 1234,
544 'dst_ip_addr': '\x00\x00\x00\x00',
545 'dst_ip_prefix_len': 0}]
546 # Test 1: add a new ACL
547 reply = self.api_acl_add_replace(acl_index=4294967295, r=r,
548 count=len(r), tag="permit 1234")
549 self.assertEqual(reply.retval, 0)
550 # The very first ACL gets #0
551 self.assertEqual(reply.acl_index, 0)
552 rr = self.api_acl_dump(reply.acl_index)
553 self.logger.info("Dumped ACL: " + str(rr))
554 self.assertEqual(len(rr), 1)
555 # We should have the same number of ACL entries as we had asked
556 self.assertEqual(len(rr[0].r), len(r))
557 # The rules should be the same. But because the submitted and returned
558 # are different types, we need to iterate over rules and keys to get
559 # to basic values.
560 for i_rule in range(0, len(r) - 1):
561 for rule_key in r[i_rule]:
562 self.assertEqual(rr[0].r[i_rule][rule_key],
563 r[i_rule][rule_key])
564
565 # Add a deny-1234 ACL
566 r_deny = ({'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
567 'srcport_or_icmptype_first': 1234,
568 'srcport_or_icmptype_last': 1235,
569 'src_ip_prefix_len': 0,
570 'src_ip_addr': '\x00\x00\x00\x00',
571 'dstport_or_icmpcode_first': 1234,
572 'dstport_or_icmpcode_last': 1234,
573 'dst_ip_addr': '\x00\x00\x00\x00',
574 'dst_ip_prefix_len': 0},
575 {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
576 'srcport_or_icmptype_first': 0,
577 'srcport_or_icmptype_last': 0,
578 'src_ip_prefix_len': 0,
579 'src_ip_addr': '\x00\x00\x00\x00',
580 'dstport_or_icmpcode_first': 0,
581 'dstport_or_icmpcode_last': 0,
582 'dst_ip_addr': '\x00\x00\x00\x00',
583 'dst_ip_prefix_len': 0})
584
585 reply = self.api_acl_add_replace(acl_index=4294967295, r=r_deny,
586 count=len(r_deny),
587 tag="deny 1234;permit all")
588 self.assertEqual(reply.retval, 0)
589 # The second ACL gets #1
590 self.assertEqual(reply.acl_index, 1)
591
592 # Test 2: try to modify a nonexistent ACL
593 reply = self.api_acl_add_replace(acl_index=432, r=r, count=len(r),
594 tag="FFFF:FFFF", expected_retval=-1)
595 self.assertEqual(reply.retval, -1)
596 # The ACL number should pass through
597 self.assertEqual(reply.acl_index, 432)
598
599 self.logger.info("ACLP_TEST_FINISH_0001")
600
601 def test_0002_acl_permit_apply(self):
602 """ permit ACL apply test
603 """
604 self.logger.info("ACLP_TEST_START_0002")
605
606 rules = []
607 rules.append(self.create_rule(self.IPV4, self.PERMIT,
608 0, self.proto[self.IP][self.UDP]))
609 rules.append(self.create_rule(self.IPV4, self.PERMIT,
610 0, self.proto[self.IP][self.TCP]))
611
612 # Apply rules
613 self.apply_rules(rules, "permit per-flow")
614
615 # Traffic should still pass
616 self.run_verify_test(self.IP, self.IPV4, -1)
617 self.logger.info("ACLP_TEST_FINISH_0002")
618
619 def test_0003_acl_deny_apply(self):
620 """ deny ACL apply test
621 """
622 self.logger.info("ACLP_TEST_START_0003")
623 # Add a deny-flows ACL
624 rules = []
625 rules.append(self.create_rule(self.IPV4, self.DENY,
626 self.PORTS_ALL, self.proto[self.IP][self.UDP]))
627 # Permit ip any any in the end
628 rules.append(self.create_rule(self.IPV4, self.PERMIT,
629 self.PORTS_ALL, 0))
630
631 # Apply rules
632 self.apply_rules(rules, "deny per-flow;permit all")
633
634 # Traffic should not pass
635 self.run_verify_negat_test(self.IP, self.IPV4,
636 self.proto[self.IP][self.UDP])
637 self.logger.info("ACLP_TEST_FINISH_0003")
638 # self.assertEqual(1, 0)
639
640 def test_0004_vpp624_permit_icmpv4(self):
641 """ VPP_624 permit ICMPv4
642 """
643 self.logger.info("ACLP_TEST_START_0004")
644
645 # Add an ACL
646 rules = []
647 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
648 self.proto[self.ICMP][self.ICMPv4]))
649 # deny ip any any in the end
650 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
651
652 # Apply rules
653 self.apply_rules(rules, "permit icmpv4")
654
655 # Traffic should still pass
656 self.run_verify_test(self.ICMP, self.IPV4,
657 self.proto[self.ICMP][self.ICMPv4])
658
659 self.logger.info("ACLP_TEST_FINISH_0004")
660
661 def test_0005_vpp624_permit_icmpv6(self):
662 """ VPP_624 permit ICMPv6
663 """
664 self.logger.info("ACLP_TEST_START_0005")
665
666 # Add an ACL
667 rules = []
668 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
669 self.proto[self.ICMP][self.ICMPv6]))
670 # deny ip any any in the end
671 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
672
673 # Apply rules
674 self.apply_rules(rules, "permit icmpv6")
675
676 # Traffic should still pass
677 self.run_verify_test(self.ICMP, self.IPV6,
678 self.proto[self.ICMP][self.ICMPv6])
679
680 self.logger.info("ACLP_TEST_FINISH_0005")
681
682 def test_0006_vpp624_deny_icmpv4(self):
683 """ VPP_624 deny ICMPv4
684 """
685 self.logger.info("ACLP_TEST_START_0006")
686 # Add an ACL
687 rules = []
688 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
689 self.proto[self.ICMP][self.ICMPv4]))
690 # permit ip any any in the end
691 rules.append(self.create_rule(self.IPV4, self.PERMIT,
692 self.PORTS_ALL, 0))
693
694 # Apply rules
695 self.apply_rules(rules, "deny icmpv4")
696
697 # Traffic should not pass
698 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
699
700 self.logger.info("ACLP_TEST_FINISH_0006")
701
702 def test_0007_vpp624_deny_icmpv6(self):
703 """ VPP_624 deny ICMPv6
704 """
705 self.logger.info("ACLP_TEST_START_0007")
706 # Add an ACL
707 rules = []
708 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
709 self.proto[self.ICMP][self.ICMPv6]))
710 # deny ip any any in the end
711 rules.append(self.create_rule(self.IPV6, self.PERMIT,
712 self.PORTS_ALL, 0))
713
714 # Apply rules
715 self.apply_rules(rules, "deny icmpv6")
716
717 # Traffic should not pass
718 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
719
720 self.logger.info("ACLP_TEST_FINISH_0007")
721
722 def test_0008_tcp_permit_v4(self):
723 """ permit TCPv4
724 """
725 self.logger.info("ACLP_TEST_START_0008")
726
727 # Add an ACL
728 rules = []
729 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
730 self.proto[self.IP][self.TCP]))
731 # deny ip any any in the end
732 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
733
734 # Apply rules
735 self.apply_rules(rules, "permit ipv4 tcp")
736
737 # Traffic should still pass
738 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
739
740 self.logger.info("ACLP_TEST_FINISH_0008")
741
742 def test_0009_tcp_permit_v6(self):
743 """ permit TCPv6
744 """
745 self.logger.info("ACLP_TEST_START_0009")
746
747 # Add an ACL
748 rules = []
749 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
750 self.proto[self.IP][self.TCP]))
751 # deny ip any any in the end
752 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
753
754 # Apply rules
755 self.apply_rules(rules, "permit ip6 tcp")
756
757 # Traffic should still pass
758 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
759
760 self.logger.info("ACLP_TEST_FINISH_0008")
761
762 def test_0010_udp_permit_v4(self):
763 """ permit UDPv4
764 """
765 self.logger.info("ACLP_TEST_START_0010")
766
767 # Add an ACL
768 rules = []
769 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
770 self.proto[self.IP][self.UDP]))
771 # deny ip any any in the end
772 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
773
774 # Apply rules
775 self.apply_rules(rules, "permit ipv udp")
776
777 # Traffic should still pass
778 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
779
780 self.logger.info("ACLP_TEST_FINISH_0010")
781
782 def test_0011_udp_permit_v6(self):
783 """ permit UDPv6
784 """
785 self.logger.info("ACLP_TEST_START_0011")
786
787 # Add an ACL
788 rules = []
789 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
790 self.proto[self.IP][self.UDP]))
791 # deny ip any any in the end
792 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
793
794 # Apply rules
795 self.apply_rules(rules, "permit ip6 udp")
796
797 # Traffic should still pass
798 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
799
800 self.logger.info("ACLP_TEST_FINISH_0011")
801
802 def test_0012_tcp_deny(self):
803 """ deny TCPv4/v6
804 """
805 self.logger.info("ACLP_TEST_START_0012")
806
807 # Add an ACL
808 rules = []
809 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
810 self.proto[self.IP][self.TCP]))
811 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
812 self.proto[self.IP][self.TCP]))
813 # permit ip any any in the end
814 rules.append(self.create_rule(self.IPV4, self.PERMIT,
815 self.PORTS_ALL, 0))
816 rules.append(self.create_rule(self.IPV6, self.PERMIT,
817 self.PORTS_ALL, 0))
818
819 # Apply rules
820 self.apply_rules(rules, "deny ip4/ip6 tcp")
821
822 # Traffic should not pass
823 self.run_verify_negat_test(self.IP, self.IPRANDOM,
824 self.proto[self.IP][self.TCP])
825
826 self.logger.info("ACLP_TEST_FINISH_0012")
827
828 def test_0013_udp_deny(self):
829 """ deny UDPv4/v6
830 """
831 self.logger.info("ACLP_TEST_START_0013")
832
833 # Add an ACL
834 rules = []
835 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
836 self.proto[self.IP][self.UDP]))
837 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
838 self.proto[self.IP][self.UDP]))
839 # permit ip any any in the end
840 rules.append(self.create_rule(self.IPV4, self.PERMIT,
841 self.PORTS_ALL, 0))
842 rules.append(self.create_rule(self.IPV6, self.PERMIT,
843 self.PORTS_ALL, 0))
844
845 # Apply rules
846 self.apply_rules(rules, "deny ip4/ip6 udp")
847
848 # Traffic should not pass
849 self.run_verify_negat_test(self.IP, self.IPRANDOM,
850 self.proto[self.IP][self.UDP])
851
852 self.logger.info("ACLP_TEST_FINISH_0013")
853
854 def test_0014_acl_dump(self):
855 """ verify add/dump acls
856 """
857 self.logger.info("ACLP_TEST_START_0014")
858
859 r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
860 [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
861 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
862 [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
863 [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
864 [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
865 [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
866 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
867 [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
868 [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
869 [self.IPV4, self.DENY, self.PORTS_ALL, 0],
870 [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
871 [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
872 [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
873 [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
874 [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
875 [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
876 [self.IPV6, self.DENY, self.PORTS_ALL, 0]
877 ]
878
879 # Add and verify new ACLs
880 rules = []
881 for i in range(len(r)):
882 rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
883
884 reply = self.api_acl_add_replace(acl_index=4294967295, r=rules,
885 count=len(rules))
886 result = self.api_acl_dump(reply.acl_index)
887
888 i = 0
889 for drules in result:
890 for dr in drules.r:
891 self.assertEqual(dr.is_ipv6, r[i][0])
892 self.assertEqual(dr.is_permit, r[i][1])
893 self.assertEqual(dr.proto, r[i][3])
894
895 if r[i][2] > 0:
896 self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
897 else:
898 if r[i][2] < 0:
899 self.assertEqual(dr.srcport_or_icmptype_first, 0)
900 self.assertEqual(dr.srcport_or_icmptype_last, 65535)
901 else:
902 if dr.proto == self.proto[self.IP][self.TCP]:
903 self.assertGreater(dr.srcport_or_icmptype_first,
904 self.tcp_sport_from-1)
905 self.assertLess(dr.srcport_or_icmptype_first,
906 self.tcp_sport_to+1)
907 self.assertGreater(dr.dstport_or_icmpcode_last,
908 self.tcp_dport_from-1)
909 self.assertLess(dr.dstport_or_icmpcode_last,
910 self.tcp_dport_to+1)
911 elif dr.proto == self.proto[self.IP][self.UDP]:
912 self.assertGreater(dr.srcport_or_icmptype_first,
913 self.udp_sport_from-1)
914 self.assertLess(dr.srcport_or_icmptype_first,
915 self.udp_sport_to+1)
916 self.assertGreater(dr.dstport_or_icmpcode_last,
917 self.udp_dport_from-1)
918 self.assertLess(dr.dstport_or_icmpcode_last,
919 self.udp_dport_to+1)
920 i += 1
921
922 self.logger.info("ACLP_TEST_FINISH_0014")
923
924 def test_0015_tcp_permit_port_v4(self):
925 """ permit single TCPv4
926 """
927 self.logger.info("ACLP_TEST_START_0015")
928
929 port = random.randint(0, 65535)
930 # Add an ACL
931 rules = []
932 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
933 self.proto[self.IP][self.TCP]))
934 # deny ip any any in the end
935 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
936
937 # Apply rules
938 self.apply_rules(rules, "permit ip4 tcp "+str(port))
939
940 # Traffic should still pass
941 self.run_verify_test(self.IP, self.IPV4,
942 self.proto[self.IP][self.TCP], port)
943
944 self.logger.info("ACLP_TEST_FINISH_0015")
945
946 def test_0016_udp_permit_port_v4(self):
947 """ permit single UDPv4
948 """
949 self.logger.info("ACLP_TEST_START_0016")
950
951 port = random.randint(0, 65535)
952 # Add an ACL
953 rules = []
954 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
955 self.proto[self.IP][self.UDP]))
956 # deny ip any any in the end
957 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
958
959 # Apply rules
960 self.apply_rules(rules, "permit ip4 tcp "+str(port))
961
962 # Traffic should still pass
963 self.run_verify_test(self.IP, self.IPV4,
964 self.proto[self.IP][self.UDP], port)
965
966 self.logger.info("ACLP_TEST_FINISH_0016")
967
968 def test_0017_tcp_permit_port_v6(self):
969 """ permit single TCPv6
970 """
971 self.logger.info("ACLP_TEST_START_0017")
972
973 port = random.randint(0, 65535)
974 # Add an ACL
975 rules = []
976 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
977 self.proto[self.IP][self.TCP]))
978 # deny ip any any in the end
979 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
980
981 # Apply rules
982 self.apply_rules(rules, "permit ip4 tcp "+str(port))
983
984 # Traffic should still pass
985 self.run_verify_test(self.IP, self.IPV6,
986 self.proto[self.IP][self.TCP], port)
987
988 self.logger.info("ACLP_TEST_FINISH_0017")
989
990 def test_0018_udp_permit_port_v6(self):
991 """ permit single UPPv6
992 """
993 self.logger.info("ACLP_TEST_START_0018")
994
995 port = random.randint(0, 65535)
996 # Add an ACL
997 rules = []
998 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
999 self.proto[self.IP][self.UDP]))
1000 # deny ip any any in the end
1001 rules.append(self.create_rule(self.IPV6, self.DENY,
1002 self.PORTS_ALL, 0))
1003
1004 # Apply rules
1005 self.apply_rules(rules, "permit ip4 tcp "+str(port))
1006
1007 # Traffic should still pass
1008 self.run_verify_test(self.IP, self.IPV6,
1009 self.proto[self.IP][self.UDP], port)
1010
1011 self.logger.info("ACLP_TEST_FINISH_0018")
1012
1013 def test_0019_udp_deny_port(self):
1014 """ deny single TCPv4/v6
1015 """
1016 self.logger.info("ACLP_TEST_START_0019")
1017
1018 port = random.randint(0, 65535)
1019 # Add an ACL
1020 rules = []
1021 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1022 self.proto[self.IP][self.TCP]))
1023 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1024 self.proto[self.IP][self.TCP]))
1025 # Permit ip any any in the end
1026 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1027 self.PORTS_ALL, 0))
1028 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1029 self.PORTS_ALL, 0))
1030
1031 # Apply rules
1032 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1033
1034 # Traffic should not pass
1035 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1036 self.proto[self.IP][self.TCP], port)
1037
1038 self.logger.info("ACLP_TEST_FINISH_0019")
1039
1040 def test_0020_udp_deny_port(self):
1041 """ deny single UDPv4/v6
1042 """
1043 self.logger.info("ACLP_TEST_START_0020")
1044
1045 port = random.randint(0, 65535)
1046 # Add an ACL
1047 rules = []
1048 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1049 self.proto[self.IP][self.UDP]))
1050 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1051 self.proto[self.IP][self.UDP]))
1052 # Permit ip any any in the end
1053 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1054 self.PORTS_ALL, 0))
1055 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1056 self.PORTS_ALL, 0))
1057
1058 # Apply rules
1059 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1060
1061 # Traffic should not pass
1062 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1063 self.proto[self.IP][self.UDP], port)
1064
1065 self.logger.info("ACLP_TEST_FINISH_0020")
1066
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +00001067 def test_0021_udp_deny_port_verify_fragment_deny(self):
1068 """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
1069 """
1070 self.logger.info("ACLP_TEST_START_0021")
1071
1072 port = random.randint(0, 65535)
1073 # Add an ACL
1074 rules = []
1075 rules.append(self.create_rule(self.IPV4, self.DENY, port,
1076 self.proto[self.IP][self.UDP]))
1077 rules.append(self.create_rule(self.IPV6, self.DENY, port,
1078 self.proto[self.IP][self.UDP]))
1079 # deny ip any any in the end
1080 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1081 self.PORTS_ALL, 0))
1082 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1083 self.PORTS_ALL, 0))
1084
1085 # Apply rules
1086 self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
1087
1088 # Traffic should not pass
1089 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1090 self.proto[self.IP][self.UDP], port, True)
1091
1092 self.logger.info("ACLP_TEST_FINISH_0021")
1093
Pavel Kotuceke7b67342017-04-18 13:12:20 +02001094 def test_0022_zero_length_udp_ipv4(self):
1095 """ VPP-687 zero length udp ipv4 packet"""
1096 self.logger.info("ACLP_TEST_START_0022")
1097
1098 port = random.randint(0, 65535)
1099 # Add an ACL
1100 rules = []
1101 rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
1102 self.proto[self.IP][self.UDP]))
1103 # deny ip any any in the end
1104 rules.append(
1105 self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1106
1107 # Apply rules
1108 self.apply_rules(rules, "permit empty udp ip4 " + str(port))
1109
1110 # Traffic should still pass
1111 # Create incoming packet streams for packet-generator interfaces
1112 pkts_cnt = 0
1113 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1114 self.IP, self.IPV4,
1115 self.proto[self.IP][self.UDP], port,
1116 False, False)
1117 if len(pkts) > 0:
1118 self.pg0.add_stream(pkts)
1119 pkts_cnt += len(pkts)
1120
1121 # Enable packet capture and start packet sendingself.IPV
1122 self.pg_enable_capture(self.pg_interfaces)
1123 self.pg_start()
1124
1125 self.pg1.get_capture(pkts_cnt)
1126
1127 self.logger.info("ACLP_TEST_FINISH_0022")
1128
1129 def test_0023_zero_length_udp_ipv6(self):
1130 """ VPP-687 zero length udp ipv6 packet"""
1131 self.logger.info("ACLP_TEST_START_0023")
1132
1133 port = random.randint(0, 65535)
1134 # Add an ACL
1135 rules = []
1136 rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
1137 self.proto[self.IP][self.UDP]))
1138 # deny ip any any in the end
1139 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1140
1141 # Apply rules
1142 self.apply_rules(rules, "permit empty udp ip6 "+str(port))
1143
1144 # Traffic should still pass
1145 # Create incoming packet streams for packet-generator interfaces
1146 pkts_cnt = 0
1147 pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
1148 self.IP, self.IPV6,
1149 self.proto[self.IP][self.UDP], port,
1150 False, False)
1151 if len(pkts) > 0:
1152 self.pg0.add_stream(pkts)
1153 pkts_cnt += len(pkts)
1154
1155 # Enable packet capture and start packet sendingself.IPV
1156 self.pg_enable_capture(self.pg_interfaces)
1157 self.pg_start()
1158
1159 # Verify outgoing packet streams per packet-generator interface
1160 self.pg1.get_capture(pkts_cnt)
1161
1162 self.logger.info("ACLP_TEST_FINISH_0023")
1163
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +02001164 def test_0108_tcp_permit_v4(self):
1165 """ permit TCPv4 + non-match range
1166 """
1167 self.logger.info("ACLP_TEST_START_0108")
1168
1169 # Add an ACL
1170 rules = []
1171 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1172 self.proto[self.IP][self.TCP]))
1173 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1174 self.proto[self.IP][self.TCP]))
1175 # deny ip any any in the end
1176 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1177
1178 # Apply rules
1179 self.apply_rules(rules, "permit ipv4 tcp")
1180
1181 # Traffic should still pass
1182 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
1183
1184 self.logger.info("ACLP_TEST_FINISH_0108")
1185
1186 def test_0109_tcp_permit_v6(self):
1187 """ permit TCPv6 + non-match range
1188 """
1189 self.logger.info("ACLP_TEST_START_0109")
1190
1191 # Add an ACL
1192 rules = []
1193 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1194 self.proto[self.IP][self.TCP]))
1195 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1196 self.proto[self.IP][self.TCP]))
1197 # deny ip any any in the end
1198 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1199
1200 # Apply rules
1201 self.apply_rules(rules, "permit ip6 tcp")
1202
1203 # Traffic should still pass
1204 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
1205
1206 self.logger.info("ACLP_TEST_FINISH_0109")
1207
1208 def test_0110_udp_permit_v4(self):
1209 """ permit UDPv4 + non-match range
1210 """
1211 self.logger.info("ACLP_TEST_START_0110")
1212
1213 # Add an ACL
1214 rules = []
1215 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
1216 self.proto[self.IP][self.UDP]))
1217 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
1218 self.proto[self.IP][self.UDP]))
1219 # deny ip any any in the end
1220 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
1221
1222 # Apply rules
1223 self.apply_rules(rules, "permit ipv4 udp")
1224
1225 # Traffic should still pass
1226 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
1227
1228 self.logger.info("ACLP_TEST_FINISH_0110")
1229
1230 def test_0111_udp_permit_v6(self):
1231 """ permit UDPv6 + non-match range
1232 """
1233 self.logger.info("ACLP_TEST_START_0111")
1234
1235 # Add an ACL
1236 rules = []
1237 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
1238 self.proto[self.IP][self.UDP]))
1239 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
1240 self.proto[self.IP][self.UDP]))
1241 # deny ip any any in the end
1242 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
1243
1244 # Apply rules
1245 self.apply_rules(rules, "permit ip6 udp")
1246
1247 # Traffic should still pass
1248 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
1249
1250 self.logger.info("ACLP_TEST_FINISH_0111")
1251
1252 def test_0112_tcp_deny(self):
1253 """ deny TCPv4/v6 + non-match range
1254 """
1255 self.logger.info("ACLP_TEST_START_0112")
1256
1257 # Add an ACL
1258 rules = []
1259 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1260 self.PORTS_RANGE_2,
1261 self.proto[self.IP][self.TCP]))
1262 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1263 self.PORTS_RANGE_2,
1264 self.proto[self.IP][self.TCP]))
1265 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1266 self.proto[self.IP][self.TCP]))
1267 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1268 self.proto[self.IP][self.TCP]))
1269 # permit ip any any in the end
1270 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1271 self.PORTS_ALL, 0))
1272 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1273 self.PORTS_ALL, 0))
1274
1275 # Apply rules
1276 self.apply_rules(rules, "deny ip4/ip6 tcp")
1277
1278 # Traffic should not pass
1279 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1280 self.proto[self.IP][self.TCP])
1281
1282 self.logger.info("ACLP_TEST_FINISH_0112")
1283
1284 def test_0113_udp_deny(self):
1285 """ deny UDPv4/v6 + non-match range
1286 """
1287 self.logger.info("ACLP_TEST_START_0113")
1288
1289 # Add an ACL
1290 rules = []
1291 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1292 self.PORTS_RANGE_2,
1293 self.proto[self.IP][self.UDP]))
1294 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1295 self.PORTS_RANGE_2,
1296 self.proto[self.IP][self.UDP]))
1297 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
1298 self.proto[self.IP][self.UDP]))
1299 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
1300 self.proto[self.IP][self.UDP]))
1301 # permit ip any any in the end
1302 rules.append(self.create_rule(self.IPV4, self.PERMIT,
1303 self.PORTS_ALL, 0))
1304 rules.append(self.create_rule(self.IPV6, self.PERMIT,
1305 self.PORTS_ALL, 0))
1306
1307 # Apply rules
1308 self.apply_rules(rules, "deny ip4/ip6 udp")
1309
1310 # Traffic should not pass
1311 self.run_verify_negat_test(self.IP, self.IPRANDOM,
1312 self.proto[self.IP][self.UDP])
1313
1314 self.logger.info("ACLP_TEST_FINISH_0113")
1315
1316
Pavel Kotucek59dda062017-03-02 15:22:47 +01001317if __name__ == '__main__':
1318 unittest.main(testRunner=VppTestRunner)