blob: cd375a2cea7d5a98535998216822f35f12779d70 [file] [log] [blame]
Pavel Kotucek59dda062017-03-02 15:22:47 +01001#!/usr/bin/env python
2"""ACL plugin Test Case HLD:
3"""
4
5import unittest
6import random
7
8from scapy.packet import Raw
9from scapy.layers.l2 import Ether
10from scapy.layers.inet import IP, TCP, UDP, ICMP
11from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +000012from scapy.layers.inet6 import IPv6ExtHdrFragment
Pavel Kotucek59dda062017-03-02 15:22:47 +010013from framework import VppTestCase, VppTestRunner
14from util import Host, ppp
15
16
17class TestACLplugin(VppTestCase):
18 """ ACL plugin Test Case """
19
# Traffic types; also the first-level index into ``proto`` below
IP, ICMP = 0, 1

# IP version selectors (create_stream picks v4/v6 at random for IPRANDOM)
IPRANDOM, IPV4, IPV6 = -1, 0, 1

# Rule actions, used as the is_permit value of a rule
DENY, PERMIT = 0, 1

# Supported protocol numbers: proto[IP] is [TCP, UDP] and proto[ICMP] is
# [ICMPv4, ICMPv6]; proto_map gives the layer name for each number
proto = [[6, 17], [1, 58]]
proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
ICMPv4, ICMPv6 = 0, 1
TCP, UDP = 0, 1
PROTO_ALL = 0

# Port-range selectors understood by create_rule
PORTS_ALL = -1
PORTS_RANGE = 0
PORTS_RANGE_2 = 1

# Port ranges selected by PORTS_RANGE
udp_sport_from = 10
udp_sport_to = udp_sport_from + 5
udp_dport_from = 20000
udp_dport_to = udp_dport_from + 5000
tcp_sport_from = 30
tcp_sport_to = tcp_sport_from + 5
tcp_dport_from = 40000
tcp_dport_to = tcp_dport_from + 5000

# Port ranges selected by PORTS_RANGE_2
udp_sport_from_2 = 90
udp_sport_to_2 = udp_sport_from_2 + 5
udp_dport_from_2 = 30000
udp_dport_to_2 = udp_dport_from_2 + 5000
tcp_sport_from_2 = 130
tcp_sport_to_2 = tcp_sport_from_2 + 5
tcp_dport_from_2 = 20000
tcp_dport_to_2 = tcp_dport_from_2 + 5000

# ICMP type/code selected by PORTS_RANGE (type 8 / 128: echo request)
icmp4_type = 8
icmp4_code = 3
icmp6_type = 128
icmp6_code = 3

# ICMP type and code range selected by PORTS_RANGE_2
icmp4_type_2 = 8
icmp4_code_from_2 = 5
icmp4_code_to_2 = 20
icmp6_type_2 = 128
icmp6_code_from_2 = 8
icmp6_code_to_2 = 42

# Bridge domain used by all tests
bd_id = 1
78
@classmethod
def setUpClass(cls):
    """
    Run the VppTestCase class setup, then configure the topology shared
    by all tests of this class: two pg interfaces in one bridge domain,
    a pg0 -> pg1 flow and empty per-interface host lists.

    If any configuration step fails, the class is torn down and the
    exception is re-raised.
    """
    super(TestACLplugin, cls).setUpClass()

    random.seed()

    try:
        # Two packet-generator interfaces (pg0 and pg1)
        cls.create_pg_interfaces(range(2))

        # Flow map: source interface -> list of destination interfaces
        cls.flows = {cls.pg0: [cls.pg1]}

        # Sizes a generated packet may be extended to
        cls.pg_if_packet_sizes = [64, 512, 1518, 9018]

        # Create the bridge domain with uu_flood=1 and learn=1 and attach
        # every interface to it.
        # NOTE(review): presumably 1 enables unknown-unicast flooding and
        # MAC learning - confirm against the bridge_domain_add_del API.
        cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
                                       learn=1)
        for port in cls.pg_interfaces:
            cls.vapi.sw_interface_set_l2_bridge(port.sw_if_index,
                                                bd_id=cls.bd_id)

        # Bring all interfaces up
        for port in cls.pg_interfaces:
            port.admin_up()

        # sw_if_index -> list of test hosts behind that interface
        cls.hosts_by_pg_idx = dict(
            (port.sw_if_index, []) for port in cls.pg_interfaces)

        # sw_if_index -> list of deleted hosts
        cls.deleted_hosts_by_pg_idx = dict(
            (port.sw_if_index, []) for port in cls.pg_interfaces)

    except Exception:
        super(TestACLplugin, cls).tearDownClass()
        raise
129
def setUp(self):
    """Run the base-class setUp, then reset the stored packet infos."""
    super(TestACLplugin, self).setUp()
    self.reset_packet_infos()
133
def tearDown(self):
    """
    Run the base-class tearDown, then log L2 FIB, ACL plugin and
    bridge-domain CLI output unless VPP is dead.
    """
    super(TestACLplugin, self).tearDown()
    if self.vpp_dead:
        return
    show_commands = (
        "show l2fib verbose",
        "show acl-plugin acl",
        "show acl-plugin interface",
        "show acl-plugin tables",
        "show bridge-domain %s detail" % self.bd_id,
    )
    for command in show_commands:
        self.logger.info(self.vapi.ppcli(command))
146
def create_hosts(self, count, start=0):
    """
    Create the required number of hosts and distribute them among the
    pg interfaces. Each host gets a MAC, an IPv4 and an IPv6 address
    built from the interface's sw_if_index and the host number.

    Hosts are split evenly between interfaces; the last interface also
    receives the remainder.

    :param int count: Number of hosts to create.
    :param int start: Number to start numbering from.
    """
    n_int = len(self.pg_interfaces)
    # Floor division: "/" yields a float on Python 3, which range()
    # below would reject. On Python 2 ints the result is unchanged.
    macs_per_if = count // n_int
    for i, pg_if in enumerate(self.pg_interfaces):
        start_nr = macs_per_if * i + start
        if i == n_int - 1:
            # Last interface takes everything that is left
            end_nr = count + start
        else:
            end_nr = macs_per_if * (i + 1) + start
        hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
        for j in range(start_nr, end_nr):
            host = Host(
                "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
                "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
                "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
            hosts.append(host)
170
def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
                s_prefix=0, s_ip='\x00\x00\x00\x00',
                d_prefix=0, d_ip='\x00\x00\x00\x00'):
    """
    Build one ACL rule dictionary in the form passed to acl_add_replace.

    :param int ip: Value of the rule's is_ipv6 field (IPV4 or IPV6).
    :param int permit_deny: Value of the is_permit field (DENY/PERMIT).
    :param int ports: PORTS_ALL, PORTS_RANGE or PORTS_RANGE_2 to select a
        predefined range; any other value is used as-is for both the
        source port/ICMP type and the destination port/ICMP code.
    :param int proto: IP protocol number; -1 produces no rule.
    :param int s_prefix: Source prefix length.
    :param s_ip: Source address.
    :param int d_prefix: Destination prefix length.
    :param d_ip: Destination address.
    :return: The rule dictionary, or None when proto is -1.
    :raises ValueError: If a predefined range is requested for a protocol
        that has no such range (previously this surfaced as an
        UnboundLocalError).
    """
    if proto == -1:
        return None

    # Each tuple is (sport/type first, sport/type last,
    #                dport/code first, dport/code last)
    if ports == self.PORTS_ALL:
        # ICMP and ICMPv6 get an upper bound of 255, everything else 65535
        upper = 255 if proto in (1, 58) else 65535
        bounds = (0, upper, 0, upper)
    elif ports in (self.PORTS_RANGE, self.PORTS_RANGE_2):
        tcp = self.proto[self.IP][self.TCP]
        udp = self.proto[self.IP][self.UDP]
        if ports == self.PORTS_RANGE:
            predefined = {
                1: (self.icmp4_type, self.icmp4_type,
                    self.icmp4_code, self.icmp4_code),
                58: (self.icmp6_type, self.icmp6_type,
                     self.icmp6_code, self.icmp6_code),
                tcp: (self.tcp_sport_from, self.tcp_sport_to,
                      self.tcp_dport_from, self.tcp_dport_to),
                udp: (self.udp_sport_from, self.udp_sport_to,
                      self.udp_dport_from, self.udp_dport_to),
            }
        else:
            predefined = {
                1: (self.icmp4_type_2, self.icmp4_type_2,
                    self.icmp4_code_from_2, self.icmp4_code_to_2),
                58: (self.icmp6_type_2, self.icmp6_type_2,
                     self.icmp6_code_from_2, self.icmp6_code_to_2),
                tcp: (self.tcp_sport_from_2, self.tcp_sport_to_2,
                      self.tcp_dport_from_2, self.tcp_dport_to_2),
                udp: (self.udp_sport_from_2, self.udp_sport_to_2,
                      self.udp_dport_from_2, self.udp_dport_to_2),
            }
        if proto not in predefined:
            raise ValueError(
                "no predefined port range %r for protocol %r" %
                (ports, proto))
        bounds = predefined[proto]
    else:
        # A literal value: match exactly that port (or type and code)
        bounds = (ports, ports, ports, ports)

    sport_from, sport_to, dport_from, dport_to = bounds
    return {'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
            'srcport_or_icmptype_first': sport_from,
            'srcport_or_icmptype_last': sport_to,
            'src_ip_prefix_len': s_prefix,
            'src_ip_addr': s_ip,
            'dstport_or_icmpcode_first': dport_from,
            'dstport_or_icmpcode_last': dport_to,
            'dst_ip_prefix_len': d_prefix,
            'dst_ip_addr': d_ip}
239
def apply_rules(self, rules, tag=''):
    """
    Add ``rules`` as an ACL, log its dump and set it as the single input
    ACL on every pg interface.

    :param list rules: Rule dictionaries as built by create_rule.
    :param str tag: Tag passed along with the ACL.
    """
    # acl_index 4294967295 is the value this file uses to add a new ACL
    reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
                                      tag=tag)
    acl_index = reply.acl_index
    self.logger.info("Dumped ACL: " + str(self.vapi.acl_dump(acl_index)))
    # Apply the ACL on each interface as inbound
    for pg_if in self.pg_interfaces:
        self.vapi.acl_interface_set_acl_list(
            sw_if_index=pg_if.sw_if_index, n_input=1, acls=[acl_index])
251
def create_upper_layer(self, packet_index, proto, ports=0):
    """
    Build the TCP or UDP layer of a test packet.

    :param packet_index: Not used by this method.
    :param int proto: Protocol number; must be a key of proto_map.
    :param int ports: 0 picks random source and destination ports from
        the first predefined range of the protocol; any other value is
        used as both source and destination port.
    :return: A TCP or UDP layer, or '' for any other protocol.
    """
    layer_name = self.proto_map[proto]
    if layer_name == 'UDP':
        layer = UDP
        sport_range = (self.udp_sport_from, self.udp_sport_to)
        dport_range = (self.udp_dport_from, self.udp_dport_to)
    elif layer_name == 'TCP':
        layer = TCP
        sport_range = (self.tcp_sport_from, self.tcp_sport_to)
        dport_range = (self.tcp_dport_from, self.tcp_dport_to)
    else:
        return ''

    if ports != 0:
        return layer(sport=ports, dport=ports)
    return layer(sport=random.randint(*sport_range),
                 dport=random.randint(*dport_range))
271
def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
                  proto=-1, ports=0, fragments=False, pkt_raw=True):
    """
    Create input packet stream for defined interface: one packet for
    every (destination host, source host) pair of every flow that
    starts at ``src_if``.

    :param object src_if: Interface to create packet stream for.
    :param list packet_sizes: List of required packet sizes.
    :param traffic_type: ICMP (1) builds ICMP/ICMPv6 echo requests; any
        other value builds the layer returned by create_upper_layer.
    :param ipv6: 1 for IPv6, 0 for IPv4, anything else picks one of the
        two at random for each packet.
    :param proto: Protocol number stored in the packet info; -1 picks
        TCP or UDP at random for each packet.
    :param ports: Passed through to create_upper_layer.
    :param fragments: If true, add a fragment header (IPv6) or set
        flags=1/frag=64 (IPv4) on each packet.
    :param pkt_raw: If true, append the packet-info payload and extend
        the packet to a random size from ``packet_sizes``.
    :return: Stream of packets (empty if src_if has no flows).
    """
    pkts = []
    if src_if not in self.flows:
        return pkts

    src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
    n_src = len(src_hosts)
    for dst_if in self.flows[src_if]:
        dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
        for i in range(len(dst_hosts) * n_src):
            # Floor division: "/" would produce a float index on
            # Python 3; on Python 2 ints the result is unchanged.
            dst_host = dst_hosts[i // n_src]
            src_host = src_hosts[i % n_src]
            pkt_info = self.create_packet_info(src_if, dst_if)
            if ipv6 == 1:
                pkt_info.ip = 1
            elif ipv6 == 0:
                pkt_info.ip = 0
            else:
                pkt_info.ip = random.choice([0, 1])
            if proto == -1:
                pkt_info.proto = random.choice(self.proto[self.IP])
            else:
                pkt_info.proto = proto
            payload = self.info_to_payload(pkt_info)

            p = Ether(dst=dst_host.mac, src=src_host.mac)
            if pkt_info.ip:
                p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
                if fragments:
                    p /= IPv6ExtHdrFragment(offset=64, m=1)
            elif fragments:
                p /= IP(src=src_host.ip4, dst=dst_host.ip4,
                        flags=1, frag=64)
            else:
                p /= IP(src=src_host.ip4, dst=dst_host.ip4)

            if traffic_type == self.ICMP:
                if pkt_info.ip:
                    p /= ICMPv6EchoRequest(type=self.icmp6_type,
                                           code=self.icmp6_code)
                else:
                    p /= ICMP(type=self.icmp4_type,
                              code=self.icmp4_code)
            else:
                p /= self.create_upper_layer(i, pkt_info.proto, ports)

            if pkt_raw:
                p /= Raw(payload)
            # Keep a copy of the packet before it is extended, for
            # comparison in verify_capture
            pkt_info.data = p.copy()
            if pkt_raw:
                size = random.choice(packet_sizes)
                self.extend_packet(p, size)
            pkts.append(p)
    return pkts
332
def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
    """
    Verify captured input packet stream for defined interface.

    :param object pg_if: Interface to verify captured packet stream for.
    :param list capture: Captured packet stream.
    :param traffic_type: ICMP (1) checks only the ICMP type and code;
        any other value checks ordering, addresses and TCP/UDP ports
        against the saved packet.
    :param ip_type: IPV4 or IPV6. A non-zero value is also asserted
        against the IP version recorded in the payload.
    """
    last_info = dict((i.sw_if_index, None) for i in self.pg_interfaces)
    dst_sw_if_index = pg_if.sw_if_index
    for packet in capture:
        try:
            # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
            if traffic_type == self.ICMP and ip_type == self.IPV6:
                payload_info = self.payload_to_info(
                    packet[ICMPv6EchoRequest].data)
                payload = packet[ICMPv6EchoRequest]
            else:
                payload_info = self.payload_to_info(str(packet[Raw]))
                payload = packet[self.proto_map[payload_info.proto]]
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise

        if ip_type != 0:
            self.assertEqual(payload_info.ip, ip_type)
        if traffic_type == self.ICMP:
            try:
                if payload_info.ip == 0:
                    self.assertEqual(payload.type, self.icmp4_type)
                    self.assertEqual(payload.code, self.icmp4_code)
                else:
                    self.assertEqual(payload.type, self.icmp6_type)
                    self.assertEqual(payload.code, self.icmp6_code)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise
        else:
            try:
                ip_version = IPv6 if payload_info.ip == 1 else IP

                ip = packet[ip_version]
                packet_index = payload_info.index

                self.assertEqual(payload_info.dst, dst_sw_if_index)
                self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
                                  (pg_if.name, payload_info.src,
                                   packet_index))
                # Packets from one source must arrive in the order sent
                next_info = self.get_next_packet_info_for_interface2(
                    payload_info.src, dst_sw_if_index,
                    last_info[payload_info.src])
                last_info[payload_info.src] = next_info
                self.assertTrue(next_info is not None)
                self.assertEqual(packet_index, next_info.index)
                saved_packet = next_info.data
                # Check standard fields
                self.assertEqual(ip.src, saved_packet[ip_version].src)
                self.assertEqual(ip.dst, saved_packet[ip_version].dst)
                p = self.proto_map[payload_info.proto]
                if p == 'TCP':
                    tcp = packet[TCP]
                    self.assertEqual(tcp.sport, saved_packet[TCP].sport)
                    self.assertEqual(tcp.dport, saved_packet[TCP].dport)
                elif p == 'UDP':
                    udp = packet[UDP]
                    self.assertEqual(udp.sport, saved_packet[UDP].sport)
                    self.assertEqual(udp.dport, saved_packet[UDP].dport)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:",
                                      packet))
                raise
    for i in self.pg_interfaces:
        # NOTE(review): the interface object itself is passed as the
        # source here, while the call in the loop above passes a source
        # index (payload_info.src) - confirm this is intended before
        # relying on this check.
        remaining_packet = self.get_next_packet_info_for_interface2(
            i, dst_sw_if_index, last_info[i.sw_if_index])
        self.assertTrue(
            remaining_packet is None,
            "Port %u: Packet expected from source %u didn't arrive" %
            (dst_sw_if_index, i.sw_if_index))
419
def run_traffic_no_check(self):
    """
    Send a default stream from every interface that has flows, without
    verifying what is received.
    """
    # Queue a packet stream on each source interface
    for pg_if in self.pg_interfaces:
        if pg_if not in self.flows:
            continue
        stream = self.create_stream(pg_if, self.pg_if_packet_sizes)
        if stream:
            pg_if.add_stream(stream)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
432
def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
                    frags=False, pkt_raw=True):
    """
    Send traffic on every flow and verify that all of it arrives.

    The arguments are passed through to create_stream; traffic_type and
    ip_type are also passed to verify_capture.
    """
    # Queue a packet stream on each source interface, counting packets
    sent = 0
    for pg_if in self.pg_interfaces:
        if pg_if not in self.flows:
            continue
        stream = self.create_stream(pg_if, self.pg_if_packet_sizes,
                                    traffic_type, ip_type, proto, ports,
                                    frags, pkt_raw)
        if stream:
            pg_if.add_stream(stream)
            sent += len(stream)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Verify the capture on every destination interface of every flow
    for src_if in self.pg_interfaces:
        for dst_if in self.flows.get(src_if, ()):
            capture = dst_if.get_capture(sent)
            self.logger.info("Verifying capture on interface %s" %
                             dst_if.name)
            self.verify_capture(dst_if, capture, traffic_type, ip_type)
460
def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
                          ports=0, frags=False):
    """
    Send traffic on every flow and verify that none of it arrives.

    The arguments are passed through to create_stream.
    """
    self.reset_packet_infos()
    for pg_if in self.pg_interfaces:
        if pg_if not in self.flows:
            continue
        stream = self.create_stream(pg_if, self.pg_if_packet_sizes,
                                    traffic_type, ip_type, proto, ports,
                                    frags)
        if stream:
            pg_if.add_stream(stream)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Every destination interface of every flow must capture nothing
    for src_if in self.pg_interfaces:
        for dst_if in self.flows.get(src_if, ()):
            self.logger.info("Verifying capture on interface %s" %
                             dst_if.name)
            capture = dst_if.get_capture(0)
            self.assertEqual(len(capture), 0)
486
def test_0000_warmup_test(self):
    """ ACL plugin version check; learn MACs
    """
    # Create the hosts and send unchecked traffic before the other tests
    self.create_hosts(16)
    self.run_traffic_no_check()

    version = self.vapi.papi.acl_plugin_get_version()
    self.assertEqual(version.major, 1)
    self.logger.info("Working with ACL plugin version: %d.%d" % (
        version.major, version.minor))
    # Minor version changes are non breaking, so minor is not asserted
498
def test_0001_acl_create(self):
    """ ACL create/delete test
    """

    self.logger.info("ACLP_TEST_START_0001")

    def udp_rule(is_permit, sport_first, sport_last, dport_first,
                 dport_last):
        # IPv4 UDP rule with zero-length source/destination prefixes
        return {'is_permit': is_permit, 'is_ipv6': 0, 'proto': 17,
                'srcport_or_icmptype_first': sport_first,
                'srcport_or_icmptype_last': sport_last,
                'src_ip_prefix_len': 0,
                'src_ip_addr': '\x00\x00\x00\x00',
                'dstport_or_icmpcode_first': dport_first,
                'dstport_or_icmpcode_last': dport_last,
                'dst_ip_addr': '\x00\x00\x00\x00',
                'dst_ip_prefix_len': 0}

    pg0_index = self.pg0.sw_if_index

    # Test 1: add a new ACL
    r = [udp_rule(1, 1234, 1235, 1234, 1234)]
    reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
                                      tag="permit 1234")
    self.assertEqual(reply.retval, 0)
    # The very first ACL gets #0
    self.assertEqual(reply.acl_index, 0)
    first_acl = reply.acl_index
    rr = self.vapi.acl_dump(reply.acl_index)
    self.logger.info("Dumped ACL: " + str(rr))
    self.assertEqual(len(rr), 1)
    # We should have the same number of ACL entries as we had asked
    self.assertEqual(len(rr[0].r), len(r))
    # The rules should be the same. But because the submitted and returned
    # are different types, we need to iterate over rules and keys to get
    # to basic values.
    # NOTE(review): range(0, len(r) - 1) is empty for this one-rule list,
    # so the comparison below never runs - confirm whether the final rule
    # was meant to be compared as well.
    for i_rule in range(0, len(r) - 1):
        for rule_key in r[i_rule]:
            self.assertEqual(rr[0].r[i_rule][rule_key],
                             r[i_rule][rule_key])

    # Add a deny-1234 ACL that permits everything else
    r_deny = [udp_rule(0, 1234, 1235, 1234, 1234),
              udp_rule(1, 0, 0, 0, 0)]
    reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
                                      tag="deny 1234;permit all")
    self.assertEqual(reply.retval, 0)
    # The second ACL gets #1
    self.assertEqual(reply.acl_index, 1)
    second_acl = reply.acl_index

    # Test 2: try to modify a nonexistent ACL
    reply = self.vapi.acl_add_replace(acl_index=432, r=r,
                                      tag="FFFF:FFFF", expected_retval=-1)
    self.assertEqual(reply.retval, -1)
    # The ACL number should pass through
    self.assertEqual(reply.acl_index, 432)

    # apply an ACL on an interface inbound, try to delete ACL, must fail
    self.vapi.acl_interface_set_acl_list(sw_if_index=pg0_index,
                                         n_input=1,
                                         acls=[first_acl])
    self.vapi.acl_del(acl_index=first_acl, expected_retval=-1)
    # Unapply an ACL and then try to delete it - must be ok
    self.vapi.acl_interface_set_acl_list(sw_if_index=pg0_index,
                                         n_input=0,
                                         acls=[])
    self.vapi.acl_del(acl_index=first_acl, expected_retval=0)

    # apply an ACL on an interface outbound, try to delete ACL, must fail
    self.vapi.acl_interface_set_acl_list(sw_if_index=pg0_index,
                                         n_input=0,
                                         acls=[second_acl])
    self.vapi.acl_del(acl_index=second_acl, expected_retval=-1)
    # Unapply the ACL and then try to delete it - must be ok
    self.vapi.acl_interface_set_acl_list(sw_if_index=pg0_index,
                                         n_input=0,
                                         acls=[])
    self.vapi.acl_del(acl_index=second_acl, expected_retval=0)

    # try to apply a nonexistent ACL - must fail
    self.vapi.acl_interface_set_acl_list(sw_if_index=pg0_index,
                                         n_input=1,
                                         acls=[first_acl],
                                         expected_retval=-1)

    self.logger.info("ACLP_TEST_FINISH_0001")
596
def test_0002_acl_permit_apply(self):
    """ permit ACL apply test
    """
    self.logger.info("ACLP_TEST_START_0002")

    # Permit the first predefined UDP and TCP port ranges
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, 0,
                         self.proto[self.IP][l4_proto])
        for l4_proto in (self.UDP, self.TCP)
    ]

    # Apply rules
    self.apply_rules(rules, "permit per-flow")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, -1)
    self.logger.info("ACLP_TEST_FINISH_0002")
614
def test_0003_acl_deny_apply(self):
    """ deny ACL apply test
    """
    self.logger.info("ACLP_TEST_START_0003")

    udp = self.proto[self.IP][self.UDP]
    # Deny all UDP, then permit ip any any in the end
    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, udp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "deny per-flow;permit all")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPV4, udp)
    self.logger.info("ACLP_TEST_FINISH_0003")
635
def test_0004_vpp624_permit_icmpv4(self):
    """ VPP_624 permit ICMPv4
    """
    self.logger.info("ACLP_TEST_START_0004")

    icmp4 = self.proto[self.ICMP][self.ICMPv4]
    # Permit the predefined ICMPv4 type/code, deny ip any any in the end
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, icmp4),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit icmpv4")

    # Traffic should still pass
    self.run_verify_test(self.ICMP, self.IPV4, icmp4)

    self.logger.info("ACLP_TEST_FINISH_0004")
656
def test_0005_vpp624_permit_icmpv6(self):
    """ VPP_624 permit ICMPv6
    """
    self.logger.info("ACLP_TEST_START_0005")

    icmp6 = self.proto[self.ICMP][self.ICMPv6]
    # Permit the predefined ICMPv6 type/code, deny ip any any in the end
    rules = [
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, icmp6),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit icmpv6")

    # Traffic should still pass
    self.run_verify_test(self.ICMP, self.IPV6, icmp6)

    self.logger.info("ACLP_TEST_FINISH_0005")
677
def test_0006_vpp624_deny_icmpv4(self):
    """ VPP_624 deny ICMPv4
    """
    self.logger.info("ACLP_TEST_START_0006")

    icmp4 = self.proto[self.ICMP][self.ICMPv4]
    # Deny the predefined ICMPv4 type/code, permit ip any any in the end
    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, icmp4),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "deny icmpv4")

    # Traffic should not pass
    self.run_verify_negat_test(self.ICMP, self.IPV4, 0)

    self.logger.info("ACLP_TEST_FINISH_0006")
697
def test_0007_vpp624_deny_icmpv6(self):
    """ VPP_624 deny ICMPv6
    """
    self.logger.info("ACLP_TEST_START_0007")

    icmp6 = self.proto[self.ICMP][self.ICMPv6]
    # Deny the predefined ICMPv6 type/code, permit ip any any in the end
    rules = [
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, icmp6),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "deny icmpv6")

    # Traffic should not pass
    self.run_verify_negat_test(self.ICMP, self.IPV6, 0)

    self.logger.info("ACLP_TEST_FINISH_0007")
717
def test_0008_tcp_permit_v4(self):
    """ permit TCPv4
    """
    self.logger.info("ACLP_TEST_START_0008")

    tcp = self.proto[self.IP][self.TCP]
    # Permit the predefined TCP port range, deny ip any any in the end
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ipv4 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp)

    self.logger.info("ACLP_TEST_FINISH_0008")
737
def test_0009_tcp_permit_v6(self):
    """ permit TCPv6
    """
    self.logger.info("ACLP_TEST_START_0009")

    tcp = self.proto[self.IP][self.TCP]
    # Permit the predefined TCP port range, deny ip any any in the end
    rules = [
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, tcp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ip6 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, tcp)

    # The finish marker must carry this test's own number (it previously
    # logged 0008), so the START/FINISH pairs in the log line up
    self.logger.info("ACLP_TEST_FINISH_0009")
757
def test_0010_udp_permit_v4(self):
    """ permit UDPv4
    """
    self.logger.info("ACLP_TEST_START_0010")

    udp = self.proto[self.IP][self.UDP]
    # Permit the predefined UDP port range, deny ip any any in the end
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, udp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ipv udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, udp)

    self.logger.info("ACLP_TEST_FINISH_0010")
777
def test_0011_udp_permit_v6(self):
    """ permit UDPv6
    """
    self.logger.info("ACLP_TEST_START_0011")

    udp = self.proto[self.IP][self.UDP]
    # Permit the predefined UDP port range, deny ip any any in the end
    rules = [
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, udp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "permit ip6 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, udp)

    self.logger.info("ACLP_TEST_FINISH_0011")
797
def test_0012_tcp_deny(self):
    """ deny TCPv4/v6
    """
    self.logger.info("ACLP_TEST_START_0012")

    tcp = self.proto[self.IP][self.TCP]
    # Deny the predefined TCP port range for both IP versions, then
    # permit ip any any in the end
    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, tcp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, tcp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "deny ip4/ip6 tcp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, tcp)

    self.logger.info("ACLP_TEST_FINISH_0012")
823
def test_0013_udp_deny(self):
    """ deny UDPv4/v6
    """
    self.logger.info("ACLP_TEST_START_0013")

    udp = self.proto[self.IP][self.UDP]
    # Deny the predefined UDP port range for both IP versions, then
    # permit ip any any in the end
    rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, udp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, udp),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(rules, "deny ip4/ip6 udp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp)

    self.logger.info("ACLP_TEST_FINISH_0013")
849
def test_0014_acl_dump(self):
    """ verify add/dump acls
    """
    self.logger.info("ACLP_TEST_START_0014")

    tcp = self.proto[self.IP][self.TCP]
    udp = self.proto[self.IP][self.UDP]
    icmp4 = self.proto[self.ICMP][self.ICMPv4]
    icmp6 = self.proto[self.ICMP][self.ICMPv6]

    # Each entry is [is_ipv6, is_permit, ports, proto]
    specs = [[self.IPV4, self.PERMIT, 1234, tcp],
             [self.IPV4, self.PERMIT, 2345, udp],
             [self.IPV4, self.PERMIT, 0, tcp],
             [self.IPV4, self.PERMIT, 0, udp],
             [self.IPV4, self.PERMIT, 5, icmp4],
             [self.IPV6, self.PERMIT, 4321, tcp],
             [self.IPV6, self.PERMIT, 5432, udp],
             [self.IPV6, self.PERMIT, 0, tcp],
             [self.IPV6, self.PERMIT, 0, udp],
             [self.IPV6, self.PERMIT, 6, icmp6],
             [self.IPV4, self.DENY, self.PORTS_ALL, 0],
             [self.IPV4, self.DENY, 1234, tcp],
             [self.IPV4, self.DENY, 2345, udp],
             [self.IPV4, self.DENY, 5, icmp4],
             [self.IPV6, self.DENY, 4321, tcp],
             [self.IPV6, self.DENY, 5432, udp],
             [self.IPV6, self.DENY, 6, icmp6],
             [self.IPV6, self.DENY, self.PORTS_ALL, 0]]

    # Add the rules as one new ACL and dump it back
    rules = [self.create_rule(*spec) for spec in specs]
    reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
    result = self.vapi.acl_dump(reply.acl_index)

    # Compare every dumped rule with the spec at the same position
    dumped_rules = [dr for drules in result for dr in drules.r]
    for idx, dr in enumerate(dumped_rules):
        is_ipv6, is_permit, ports, proto_nr = specs[idx]
        self.assertEqual(dr.is_ipv6, is_ipv6)
        self.assertEqual(dr.is_permit, is_permit)
        self.assertEqual(dr.proto, proto_nr)

        if ports > 0:
            # Literal port / ICMP type
            self.assertEqual(dr.srcport_or_icmptype_first, ports)
        elif ports < 0:
            # PORTS_ALL
            self.assertEqual(dr.srcport_or_icmptype_first, 0)
            self.assertEqual(dr.srcport_or_icmptype_last, 65535)
        elif dr.proto == tcp:
            # Predefined TCP range
            self.assertGreater(dr.srcport_or_icmptype_first,
                               self.tcp_sport_from-1)
            self.assertLess(dr.srcport_or_icmptype_first,
                            self.tcp_sport_to+1)
            self.assertGreater(dr.dstport_or_icmpcode_last,
                               self.tcp_dport_from-1)
            self.assertLess(dr.dstport_or_icmpcode_last,
                            self.tcp_dport_to+1)
        elif dr.proto == udp:
            # Predefined UDP range
            self.assertGreater(dr.srcport_or_icmptype_first,
                               self.udp_sport_from-1)
            self.assertLess(dr.srcport_or_icmptype_first,
                            self.udp_sport_to+1)
            self.assertGreater(dr.dstport_or_icmpcode_last,
                               self.udp_dport_from-1)
            self.assertLess(dr.dstport_or_icmpcode_last,
                            self.udp_dport_to+1)

    self.logger.info("ACLP_TEST_FINISH_0014")
918
def test_0015_tcp_permit_port_v4(self):
    """ permit single TCPv4

    Builds a two-rule ACL: an IPv4 PERMIT rule for one randomly chosen
    port with the TCP protocol number, followed by an IPv4 DENY rule
    using PORTS_ALL and protocol 0.  The ACL is applied and
    run_verify_test is called for IPv4 TCP on the chosen port.
    """
    self.logger.info("ACLP_TEST_START_0015")

    # Single port under test, drawn from the full 16-bit range.
    tcp_port = random.randint(0, 65535)
    tcp_proto = self.proto[self.IP][self.TCP]

    acl_rules = [
        # permit the chosen TCP port only
        self.create_rule(self.IPV4, self.PERMIT, tcp_port, tcp_proto),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(acl_rules, "permit ip4 tcp " + str(tcp_port))

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp_proto, tcp_port)

    self.logger.info("ACLP_TEST_FINISH_0015")
940
def test_0016_udp_permit_port_v4(self):
    """ permit single UDPv4

    Builds a two-rule ACL: an IPv4 PERMIT rule for one randomly chosen
    port with the UDP protocol number, followed by an IPv4 DENY rule
    using PORTS_ALL and protocol 0.  The ACL is applied and
    run_verify_test is called for IPv4 UDP on the chosen port.
    """
    self.logger.info("ACLP_TEST_START_0016")

    port = random.randint(0, 65535)
    # Add an ACL
    rules = []
    rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
                                  self.proto[self.IP][self.UDP]))
    # deny ip any any in the end
    rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))

    # Apply rules; the label names the protocol actually under test (UDP).
    # NOTE(review): the second argument is assumed to be a purely
    # descriptive ACL label - confirm against apply_rules.
    self.apply_rules(rules, "permit ip4 udp " + str(port))

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4,
                         self.proto[self.IP][self.UDP], port)

    self.logger.info("ACLP_TEST_FINISH_0016")
962
def test_0017_tcp_permit_port_v6(self):
    """ permit single TCPv6

    Builds a two-rule ACL: an IPv6 PERMIT rule for one randomly chosen
    port with the TCP protocol number, followed by an IPv6 DENY rule
    using PORTS_ALL and protocol 0.  The ACL is applied and
    run_verify_test is called for IPv6 TCP on the chosen port.
    """
    self.logger.info("ACLP_TEST_START_0017")

    port = random.randint(0, 65535)
    # Add an ACL
    rules = []
    rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
                                  self.proto[self.IP][self.TCP]))
    # deny ip any any in the end
    rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))

    # Apply rules; the label names the address family actually under
    # test (IPv6).
    # NOTE(review): the second argument is assumed to be a purely
    # descriptive ACL label - confirm against apply_rules.
    self.apply_rules(rules, "permit ip6 tcp " + str(port))

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6,
                         self.proto[self.IP][self.TCP], port)

    self.logger.info("ACLP_TEST_FINISH_0017")
984
def test_0018_udp_permit_port_v6(self):
    """ permit single UDPv6

    Builds a two-rule ACL: an IPv6 PERMIT rule for one randomly chosen
    port with the UDP protocol number, followed by an IPv6 DENY rule
    using PORTS_ALL and protocol 0.  The ACL is applied and
    run_verify_test is called for IPv6 UDP on the chosen port.
    """
    self.logger.info("ACLP_TEST_START_0018")

    port = random.randint(0, 65535)
    # Add an ACL
    rules = []
    rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
                                  self.proto[self.IP][self.UDP]))
    # deny ip any any in the end
    rules.append(self.create_rule(self.IPV6, self.DENY,
                                  self.PORTS_ALL, 0))

    # Apply rules; the label names the address family and protocol
    # actually under test (IPv6 / UDP).
    # NOTE(review): the second argument is assumed to be a purely
    # descriptive ACL label - confirm against apply_rules.
    self.apply_rules(rules, "permit ip6 udp " + str(port))

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6,
                         self.proto[self.IP][self.UDP], port)

    self.logger.info("ACLP_TEST_FINISH_0018")
1007
def test_0019_udp_deny_port(self):
    """ deny single TCPv4/v6

    NOTE: despite "udp" in the method name, every rule and the traffic
    check in this test use the TCP protocol number; the name is kept so
    existing test selections keep working.

    Builds a four-rule ACL: DENY rules for one randomly chosen TCP port
    on IPv4 and IPv6, followed by PERMIT rules using PORTS_ALL and
    protocol 0 for both address families.  The ACL is applied and
    run_verify_negat_test is called with IPRANDOM for TCP on that port.
    """
    self.logger.info("ACLP_TEST_START_0019")

    port = random.randint(0, 65535)
    # Add an ACL
    rules = []
    rules.append(self.create_rule(self.IPV4, self.DENY, port,
                                  self.proto[self.IP][self.TCP]))
    rules.append(self.create_rule(self.IPV6, self.DENY, port,
                                  self.proto[self.IP][self.TCP]))
    # Permit ip any any in the end
    rules.append(self.create_rule(self.IPV4, self.PERMIT,
                                  self.PORTS_ALL, 0))
    rules.append(self.create_rule(self.IPV6, self.PERMIT,
                                  self.PORTS_ALL, 0))

    # Apply rules; the label names the protocol actually denied (TCP).
    # NOTE(review): the second argument is assumed to be a purely
    # descriptive ACL label - confirm against apply_rules.
    self.apply_rules(rules, "deny ip4/ip6 tcp " + str(port))

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM,
                               self.proto[self.IP][self.TCP], port)

    self.logger.info("ACLP_TEST_FINISH_0019")
1034
def test_0020_udp_deny_port(self):
    """ deny single UDPv4/v6

    Builds a four-rule ACL: DENY rules for one randomly chosen UDP port
    on IPv4 and IPv6, followed by PERMIT rules using PORTS_ALL and
    protocol 0 for both address families.  The ACL is applied and
    run_verify_negat_test is called with IPRANDOM for UDP on that port.
    """
    self.logger.info("ACLP_TEST_START_0020")

    udp_port = random.randint(0, 65535)
    udp_proto = self.proto[self.IP][self.UDP]
    ip_versions = (self.IPV4, self.IPV6)

    # Deny the chosen UDP port for each address family (IPv4 first) ...
    acl_rules = [
        self.create_rule(ip_version, self.DENY, udp_port, udp_proto)
        for ip_version in ip_versions]
    # ... and permit ip any any in the end
    acl_rules += [
        self.create_rule(ip_version, self.PERMIT, self.PORTS_ALL, 0)
        for ip_version in ip_versions]

    # Apply rules
    self.apply_rules(acl_rules, "deny ip4/ip6 udp " + str(udp_port))

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM,
                               udp_proto, udp_port)

    self.logger.info("ACLP_TEST_FINISH_0020")
1061
def test_0021_udp_deny_port_verify_fragment_deny(self):
    """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked

    Builds the same four-rule ACL shape as the plain UDP deny-port
    test: DENY rules for one randomly chosen UDP port on IPv4 and IPv6,
    then PERMIT rules using PORTS_ALL and protocol 0 for both address
    families.  run_verify_negat_test is then called with an extra
    trailing True argument.
    """
    self.logger.info("ACLP_TEST_START_0021")

    udp_port = random.randint(0, 65535)
    udp_proto = self.proto[self.IP][self.UDP]
    ip_versions = (self.IPV4, self.IPV6)

    # Deny the chosen UDP port for each address family (IPv4 first) ...
    acl_rules = [
        self.create_rule(ip_version, self.DENY, udp_port, udp_proto)
        for ip_version in ip_versions]
    # ... and permit ip any any in the end
    acl_rules += [
        self.create_rule(ip_version, self.PERMIT, self.PORTS_ALL, 0)
        for ip_version in ip_versions]

    # Apply rules
    self.apply_rules(acl_rules, "deny ip4/ip6 udp " + str(udp_port))

    # Traffic should not pass.
    # NOTE(review): the trailing True presumably switches the generated
    # traffic to non-initial fragments - confirm against
    # run_verify_negat_test.
    self.run_verify_negat_test(self.IP, self.IPRANDOM,
                               udp_proto, udp_port, True)

    self.logger.info("ACLP_TEST_FINISH_0021")
1088
def test_0022_zero_length_udp_ipv4(self):
    """ VPP-687 zero length udp ipv4 packet

    Applies an ACL that permits one randomly chosen IPv4 UDP port and
    denies everything else on IPv4, injects a stream built by
    create_stream on pg0, and expects exactly that many packets to be
    captured on pg1.
    """
    self.logger.info("ACLP_TEST_START_0022")

    udp_port = random.randint(0, 65535)
    udp_proto = self.proto[self.IP][self.UDP]

    acl_rules = [
        # permit the chosen UDP port only
        self.create_rule(self.IPV4, self.PERMIT, udp_port, udp_proto),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(acl_rules, "permit empty udp ip4 " + str(udp_port))

    # Traffic should still pass
    # Create the incoming packet stream for the pg0 interface.
    # NOTE(review): the two trailing False flags presumably select the
    # payload-less (zero length) variant - confirm against create_stream.
    stream = self.create_stream(self.pg0, self.pg_if_packet_sizes,
                                self.IP, self.IPV4,
                                udp_proto, udp_port,
                                False, False)
    expected_cnt = len(stream)
    if expected_cnt > 0:
        self.pg0.add_stream(stream)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Every injected packet is expected on pg1
    self.pg1.get_capture(expected_cnt)

    self.logger.info("ACLP_TEST_FINISH_0022")
1123
def test_0023_zero_length_udp_ipv6(self):
    """ VPP-687 zero length udp ipv6 packet

    Applies an ACL that permits one randomly chosen IPv6 UDP port and
    denies everything else on IPv6, injects a stream built by
    create_stream on pg0, and expects exactly that many packets to be
    captured on pg1.
    """
    self.logger.info("ACLP_TEST_START_0023")

    udp_port = random.randint(0, 65535)
    udp_proto = self.proto[self.IP][self.UDP]

    acl_rules = [
        # permit the chosen UDP port only
        self.create_rule(self.IPV6, self.PERMIT, udp_port, udp_proto),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(acl_rules, "permit empty udp ip6 " + str(udp_port))

    # Traffic should still pass
    # Create the incoming packet stream for the pg0 interface.
    # NOTE(review): the two trailing False flags presumably select the
    # payload-less (zero length) variant - confirm against create_stream.
    stream = self.create_stream(self.pg0, self.pg_if_packet_sizes,
                                self.IP, self.IPV6,
                                udp_proto, udp_port,
                                False, False)
    expected_cnt = len(stream)
    if expected_cnt > 0:
        self.pg0.add_stream(stream)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Verify outgoing packet streams per packet-generator interface
    self.pg1.get_capture(expected_cnt)

    self.logger.info("ACLP_TEST_FINISH_0023")
1158
def test_0108_tcp_permit_v4(self):
    """ permit TCPv4 + non-match range

    Builds a three-rule IPv4 ACL: a TCP DENY rule created with
    PORTS_RANGE_2, a TCP PERMIT rule created with PORTS_RANGE, and a
    final DENY rule using PORTS_ALL and protocol 0.  run_verify_test is
    then called for IPv4 TCP without an explicit port.
    """
    self.logger.info("ACLP_TEST_START_0108")

    tcp_proto = self.proto[self.IP][self.TCP]

    # NOTE(review): PORTS_RANGE_2 presumably maps to the *_2 port ranges
    # defined on the class, which the test traffic is not expected to
    # match - confirm against create_rule.
    acl_rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
                         tcp_proto),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
                         tcp_proto),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(acl_rules, "permit ipv4 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp_proto)

    self.logger.info("ACLP_TEST_FINISH_0108")
1180
def test_0109_tcp_permit_v6(self):
    """ permit TCPv6 + non-match range

    Builds a three-rule IPv6 ACL: a TCP DENY rule created with
    PORTS_RANGE_2, a TCP PERMIT rule created with PORTS_RANGE, and a
    final DENY rule using PORTS_ALL and protocol 0.  run_verify_test is
    then called for IPv6 TCP without an explicit port.
    """
    self.logger.info("ACLP_TEST_START_0109")

    tcp_proto = self.proto[self.IP][self.TCP]

    # NOTE(review): PORTS_RANGE_2 presumably maps to the *_2 port ranges
    # defined on the class, which the test traffic is not expected to
    # match - confirm against create_rule.
    acl_rules = [
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
                         tcp_proto),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
                         tcp_proto),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(acl_rules, "permit ip6 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, tcp_proto)

    self.logger.info("ACLP_TEST_FINISH_0109")
1202
def test_0110_udp_permit_v4(self):
    """ permit UDPv4 + non-match range

    Builds a three-rule IPv4 ACL: a UDP DENY rule created with
    PORTS_RANGE_2, a UDP PERMIT rule created with PORTS_RANGE, and a
    final DENY rule using PORTS_ALL and protocol 0.  run_verify_test is
    then called for IPv4 UDP without an explicit port.
    """
    self.logger.info("ACLP_TEST_START_0110")

    udp_proto = self.proto[self.IP][self.UDP]

    # NOTE(review): PORTS_RANGE_2 presumably maps to the *_2 port ranges
    # defined on the class, which the test traffic is not expected to
    # match - confirm against create_rule.
    acl_rules = [
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
                         udp_proto),
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
                         udp_proto),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(acl_rules, "permit ipv4 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, udp_proto)

    self.logger.info("ACLP_TEST_FINISH_0110")
1224
def test_0111_udp_permit_v6(self):
    """ permit UDPv6 + non-match range

    Builds a three-rule IPv6 ACL: a UDP DENY rule created with
    PORTS_RANGE_2, a UDP PERMIT rule created with PORTS_RANGE, and a
    final DENY rule using PORTS_ALL and protocol 0.  run_verify_test is
    then called for IPv6 UDP without an explicit port.
    """
    self.logger.info("ACLP_TEST_START_0111")

    udp_proto = self.proto[self.IP][self.UDP]

    # NOTE(review): PORTS_RANGE_2 presumably maps to the *_2 port ranges
    # defined on the class, which the test traffic is not expected to
    # match - confirm against create_rule.
    acl_rules = [
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
                         udp_proto),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
                         udp_proto),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # Apply rules
    self.apply_rules(acl_rules, "permit ip6 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, udp_proto)

    self.logger.info("ACLP_TEST_FINISH_0111")
1246
def test_0112_tcp_deny(self):
    """ deny TCPv4/v6 + non-match range

    Builds a six-rule ACL, each step emitted for IPv4 then IPv6:
    TCP PERMIT rules created with PORTS_RANGE_2, TCP DENY rules created
    with PORTS_RANGE, and final PERMIT rules using PORTS_ALL and
    protocol 0.  run_verify_negat_test is then called with IPRANDOM for
    TCP without an explicit port.
    """
    self.logger.info("ACLP_TEST_START_0112")

    tcp_proto = self.proto[self.IP][self.TCP]

    # (action, port selector, protocol) in match order; the last entry
    # is the "permit ip any any in the end" pair.
    rule_plan = (
        (self.PERMIT, self.PORTS_RANGE_2, tcp_proto),
        (self.DENY, self.PORTS_RANGE, tcp_proto),
        (self.PERMIT, self.PORTS_ALL, 0),
    )

    acl_rules = []
    for action, ports, proto in rule_plan:
        for ip_version in (self.IPV4, self.IPV6):
            acl_rules.append(
                self.create_rule(ip_version, action, ports, proto))

    # Apply rules
    self.apply_rules(acl_rules, "deny ip4/ip6 tcp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, tcp_proto)

    self.logger.info("ACLP_TEST_FINISH_0112")
1278
def test_0113_udp_deny(self):
    """ deny UDPv4/v6 + non-match range

    Builds a six-rule ACL, each step emitted for IPv4 then IPv6:
    UDP PERMIT rules created with PORTS_RANGE_2, UDP DENY rules created
    with PORTS_RANGE, and final PERMIT rules using PORTS_ALL and
    protocol 0.  run_verify_negat_test is then called with IPRANDOM for
    UDP without an explicit port.
    """
    self.logger.info("ACLP_TEST_START_0113")

    udp_proto = self.proto[self.IP][self.UDP]

    # (action, port selector, protocol) in match order; the last entry
    # is the "permit ip any any in the end" pair.
    rule_plan = (
        (self.PERMIT, self.PORTS_RANGE_2, udp_proto),
        (self.DENY, self.PORTS_RANGE, udp_proto),
        (self.PERMIT, self.PORTS_ALL, 0),
    )

    acl_rules = []
    for action, ports, proto in rule_plan:
        for ip_version in (self.IPV4, self.IPV6):
            acl_rules.append(
                self.create_rule(ip_version, action, ports, proto))

    # Apply rules
    self.apply_rules(acl_rules, "deny ip4/ip6 udp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp_proto)

    self.logger.info("ACLP_TEST_FINISH_0113")
1310
1311
# Script entry point: when executed directly, run this module's tests
# through unittest using the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)