blob: 361ced14c21aee7d1e086e4a5bb6c5e6c3a55e77 [file] [log] [blame]
Pavel Kotucek59dda062017-03-02 15:22:47 +01001#!/usr/bin/env python
2"""ACL plugin Test Case HLD:
3"""
4
5import unittest
6import random
7
8from scapy.packet import Raw
9from scapy.layers.l2 import Ether
10from scapy.layers.inet import IP, TCP, UDP, ICMP
11from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +000012from scapy.layers.inet6 import IPv6ExtHdrFragment
Pavel Kotucek59dda062017-03-02 15:22:47 +010013from framework import VppTestCase, VppTestRunner
14from util import Host, ppp
15
16
class TestACLplugin(VppTestCase):
    """ ACL plugin Test Case """

    # traffic types; also the first index into ``proto`` below
    IP = 0
    ICMP = 1

    # IP version
    # create_stream() treats 1 as IPv6, 0 as IPv4 and any other value
    # (such as IPRANDOM) as "pick one at random for every packet".
    IPRANDOM = -1
    IPV4 = 0
    IPV6 = 1

    # rule types (passed as 'is_permit' by create_rule())
    DENY = 0
    PERMIT = 1

    # supported protocols
    # proto[IP] is indexed by TCP/UDP, proto[ICMP] by ICMPv4/ICMPv6;
    # proto_map names the layer used for each protocol number.
    proto = [[6, 17], [1, 58]]
    proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
    ICMPv4 = 0
    ICMPv6 = 1
    TCP = 0
    UDP = 1
    PROTO_ALL = 0

    # port ranges
    # PORTS_ALL, PORTS_RANGE and PORTS_RANGE_2 are selectors understood by
    # create_rule(); any other value is used there as a literal port.
    PORTS_ALL = -1
    PORTS_RANGE = 0
    PORTS_RANGE_2 = 1
    # Bounds used for PORTS_RANGE rules and for randomly generated traffic
    udp_sport_from = 10
    udp_sport_to = udp_sport_from + 5
    udp_dport_from = 20000
    udp_dport_to = udp_dport_from + 5000
    tcp_sport_from = 30
    tcp_sport_to = tcp_sport_from + 5
    tcp_dport_from = 40000
    tcp_dport_to = tcp_dport_from + 5000

    # Bounds used for PORTS_RANGE_2 rules
    udp_sport_from_2 = 90
    udp_sport_to_2 = udp_sport_from_2 + 5
    udp_dport_from_2 = 30000
    udp_dport_to_2 = udp_dport_from_2 + 5000
    tcp_sport_from_2 = 130
    tcp_sport_to_2 = tcp_sport_from_2 + 5
    tcp_dport_from_2 = 20000
    tcp_dport_to_2 = tcp_dport_from_2 + 5000

    # ICMP type/code used for PORTS_RANGE rules and for generated ICMP
    # traffic
    icmp4_type = 8  # echo request
    icmp4_code = 3
    icmp6_type = 128  # echo request
    icmp6_code = 3

    # ICMP type and code range used for PORTS_RANGE_2 rules
    icmp4_type_2 = 8
    icmp4_code_from_2 = 5
    icmp4_code_to_2 = 20
    icmp6_type_2 = 128
    icmp6_code_from_2 = 8
    icmp6_code_to_2 = 42

    # Test variables
    # Bridge domain that setUpClass() creates and joins the interfaces to
    bd_id = 1
78
79 @classmethod
80 def setUpClass(cls):
81 """
82 Perform standard class setup (defined by class method setUpClass in
83 class VppTestCase) before running the test case, set test case related
84 variables and configure VPP.
85 """
86 super(TestACLplugin, cls).setUpClass()
87
Pavel Kotucek59dda062017-03-02 15:22:47 +010088 try:
89 # Create 2 pg interfaces
90 cls.create_pg_interfaces(range(2))
91
92 # Packet flows mapping pg0 -> pg1, pg2 etc.
93 cls.flows = dict()
94 cls.flows[cls.pg0] = [cls.pg1]
95
96 # Packet sizes
97 cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
98
99 # Create BD with MAC learning and unknown unicast flooding disabled
100 # and put interfaces to this BD
101 cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
102 learn=1)
103 for pg_if in cls.pg_interfaces:
104 cls.vapi.sw_interface_set_l2_bridge(pg_if.sw_if_index,
105 bd_id=cls.bd_id)
106
107 # Set up all interfaces
108 for i in cls.pg_interfaces:
109 i.admin_up()
110
111 # Mapping between packet-generator index and lists of test hosts
112 cls.hosts_by_pg_idx = dict()
113 for pg_if in cls.pg_interfaces:
114 cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
115
116 # Create list of deleted hosts
117 cls.deleted_hosts_by_pg_idx = dict()
118 for pg_if in cls.pg_interfaces:
119 cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
120
121 # warm-up the mac address tables
122 # self.warmup_test()
123
124 except Exception:
125 super(TestACLplugin, cls).tearDownClass()
126 raise
127
128 def setUp(self):
129 super(TestACLplugin, self).setUp()
130 self.reset_packet_infos()
131
132 def tearDown(self):
133 """
134 Show various debug prints after each test.
135 """
136 super(TestACLplugin, self).tearDown()
137 if not self.vpp_dead:
138 self.logger.info(self.vapi.ppcli("show l2fib verbose"))
Andrew Yourtchenko7f4d5772017-05-24 13:20:47 +0200139 self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
140 self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
141 self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
Pavel Kotucek59dda062017-03-02 15:22:47 +0100142 self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
143 % self.bd_id))
144
145 def create_hosts(self, count, start=0):
146 """
147 Create required number of host MAC addresses and distribute them among
148 interfaces. Create host IPv4 address for every host MAC address.
149
150 :param int count: Number of hosts to create MAC/IPv4 addresses for.
151 :param int start: Number to start numbering from.
152 """
153 n_int = len(self.pg_interfaces)
154 macs_per_if = count / n_int
155 i = -1
156 for pg_if in self.pg_interfaces:
157 i += 1
158 start_nr = macs_per_if * i + start
159 end_nr = count + start if i == (n_int - 1) \
160 else macs_per_if * (i + 1) + start
161 hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
162 for j in range(start_nr, end_nr):
163 host = Host(
164 "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
165 "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
166 "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
167 hosts.append(host)
168
169 def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
170 s_prefix=0, s_ip='\x00\x00\x00\x00',
171 d_prefix=0, d_ip='\x00\x00\x00\x00'):
172 if proto == -1:
173 return
174 if ports == self.PORTS_ALL:
175 sport_from = 0
176 dport_from = 0
177 sport_to = 65535 if proto != 1 and proto != 58 else 255
178 dport_to = sport_to
179 elif ports == self.PORTS_RANGE:
180 if proto == 1:
181 sport_from = self.icmp4_type
182 sport_to = self.icmp4_type
183 dport_from = self.icmp4_code
184 dport_to = self.icmp4_code
185 elif proto == 58:
186 sport_from = self.icmp6_type
187 sport_to = self.icmp6_type
188 dport_from = self.icmp6_code
189 dport_to = self.icmp6_code
190 elif proto == self.proto[self.IP][self.TCP]:
191 sport_from = self.tcp_sport_from
192 sport_to = self.tcp_sport_to
193 dport_from = self.tcp_dport_from
194 dport_to = self.tcp_dport_to
195 elif proto == self.proto[self.IP][self.UDP]:
196 sport_from = self.udp_sport_from
197 sport_to = self.udp_sport_to
198 dport_from = self.udp_dport_from
199 dport_to = self.udp_dport_to
Andrew Yourtchenko6be72cd2017-08-10 17:02:58 +0200200 elif ports == self.PORTS_RANGE_2:
201 if proto == 1:
202 sport_from = self.icmp4_type_2
203 sport_to = self.icmp4_type_2
204 dport_from = self.icmp4_code_from_2
205 dport_to = self.icmp4_code_to_2
206 elif proto == 58:
207 sport_from = self.icmp6_type_2
208 sport_to = self.icmp6_type_2
209 dport_from = self.icmp6_code_from_2
210 dport_to = self.icmp6_code_to_2
211 elif proto == self.proto[self.IP][self.TCP]:
212 sport_from = self.tcp_sport_from_2
213 sport_to = self.tcp_sport_to_2
214 dport_from = self.tcp_dport_from_2
215 dport_to = self.tcp_dport_to_2
216 elif proto == self.proto[self.IP][self.UDP]:
217 sport_from = self.udp_sport_from_2
218 sport_to = self.udp_sport_to_2
219 dport_from = self.udp_dport_from_2
220 dport_to = self.udp_dport_to_2
Pavel Kotucek59dda062017-03-02 15:22:47 +0100221 else:
222 sport_from = ports
223 sport_to = ports
224 dport_from = ports
225 dport_to = ports
226
227 rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
228 'srcport_or_icmptype_first': sport_from,
229 'srcport_or_icmptype_last': sport_to,
230 'src_ip_prefix_len': s_prefix,
231 'src_ip_addr': s_ip,
232 'dstport_or_icmpcode_first': dport_from,
233 'dstport_or_icmpcode_last': dport_to,
234 'dst_ip_prefix_len': d_prefix,
235 'dst_ip_addr': d_ip})
236 return rule
237
238 def apply_rules(self, rules, tag=''):
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200239 reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
240 tag=tag)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100241 self.logger.info("Dumped ACL: " + str(
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200242 self.vapi.acl_dump(reply.acl_index)))
Pavel Kotucek59dda062017-03-02 15:22:47 +0100243 # Apply a ACL on the interface as inbound
244 for i in self.pg_interfaces:
Andrew Yourtchenko51d26512017-09-14 18:26:36 +0200245 self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
246 n_input=1,
247 acls=[reply.acl_index])
Pavel Kotucek59dda062017-03-02 15:22:47 +0100248 return
249
250 def create_upper_layer(self, packet_index, proto, ports=0):
251 p = self.proto_map[proto]
252 if p == 'UDP':
253 if ports == 0:
254 return UDP(sport=random.randint(self.udp_sport_from,
255 self.udp_sport_to),
256 dport=random.randint(self.udp_dport_from,
257 self.udp_dport_to))
258 else:
259 return UDP(sport=ports, dport=ports)
260 elif p == 'TCP':
261 if ports == 0:
262 return TCP(sport=random.randint(self.tcp_sport_from,
263 self.tcp_sport_to),
264 dport=random.randint(self.tcp_dport_from,
265 self.tcp_dport_to))
266 else:
267 return TCP(sport=ports, dport=ports)
268 return ''
269
270 def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
Pavel Kotuceke7b67342017-04-18 13:12:20 +0200271 proto=-1, ports=0, fragments=False, pkt_raw=True):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100272 """
273 Create input packet stream for defined interface using hosts or
274 deleted_hosts list.
275
276 :param object src_if: Interface to create packet stream for.
277 :param list packet_sizes: List of required packet sizes.
278 :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
279 :return: Stream of packets.
280 """
281 pkts = []
282 if self.flows.__contains__(src_if):
283 src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
284 for dst_if in self.flows[src_if]:
285 dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
286 n_int = len(dst_hosts) * len(src_hosts)
287 for i in range(0, n_int):
288 dst_host = dst_hosts[i / len(src_hosts)]
289 src_host = src_hosts[i % len(src_hosts)]
290 pkt_info = self.create_packet_info(src_if, dst_if)
291 if ipv6 == 1:
292 pkt_info.ip = 1
293 elif ipv6 == 0:
294 pkt_info.ip = 0
295 else:
296 pkt_info.ip = random.choice([0, 1])
297 if proto == -1:
298 pkt_info.proto = random.choice(self.proto[self.IP])
299 else:
300 pkt_info.proto = proto
301 payload = self.info_to_payload(pkt_info)
302 p = Ether(dst=dst_host.mac, src=src_host.mac)
303 if pkt_info.ip:
304 p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000305 if fragments:
306 p /= IPv6ExtHdrFragment(offset=64, m=1)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100307 else:
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000308 if fragments:
309 p /= IP(src=src_host.ip4, dst=dst_host.ip4,
310 flags=1, frag=64)
311 else:
312 p /= IP(src=src_host.ip4, dst=dst_host.ip4)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100313 if traffic_type == self.ICMP:
314 if pkt_info.ip:
315 p /= ICMPv6EchoRequest(type=self.icmp6_type,
316 code=self.icmp6_code)
317 else:
318 p /= ICMP(type=self.icmp4_type,
319 code=self.icmp4_code)
320 else:
321 p /= self.create_upper_layer(i, pkt_info.proto, ports)
Pavel Kotuceke7b67342017-04-18 13:12:20 +0200322 if pkt_raw:
323 p /= Raw(payload)
324 pkt_info.data = p.copy()
325 if pkt_raw:
326 size = random.choice(packet_sizes)
327 self.extend_packet(p, size)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100328 pkts.append(p)
329 return pkts
330
    def verify_capture(self, pg_if, capture, traffic_type=0, ip_type=0):
        """
        Verify captured input packet stream for defined interface.

        For ICMP traffic only the ICMP type and code of every packet are
        checked. For other traffic every packet is matched, in order, against
        the packet infos recorded for its source, and its IP addresses and
        TCP/UDP ports are compared with the saved copy of the sent packet.

        :param object pg_if: Interface to verify captured packet stream for.
        :param list capture: Captured packet stream.
        :param traffic_type: self.ICMP for ICMP traffic, anything else for
            TCP/UDP traffic.
        :param ip_type: When non-zero, every packet's payload info must carry
            this IP version; together with traffic_type it also selects how
            the payload info is extracted.
        """
        # Last packet info seen per source sw_if_index
        last_info = dict()
        for i in self.pg_interfaces:
            last_info[i.sw_if_index] = None
        dst_sw_if_index = pg_if.sw_if_index
        for packet in capture:
            try:
                # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
                if traffic_type == self.ICMP and ip_type == self.IPV6:
                    payload_info = self.payload_to_info(
                        packet[ICMPv6EchoRequest].data)
                    payload = packet[ICMPv6EchoRequest]
                else:
                    payload_info = self.payload_to_info(str(packet[Raw]))
                    payload = packet[self.proto_map[payload_info.proto]]
            except:
                # Log the offending packet, then let the error propagate
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

            if ip_type != 0:
                self.assertEqual(payload_info.ip, ip_type)
            if traffic_type == self.ICMP:
                try:
                    if payload_info.ip == 0:
                        self.assertEqual(payload.type, self.icmp4_type)
                        self.assertEqual(payload.code, self.icmp4_code)
                    else:
                        self.assertEqual(payload.type, self.icmp6_type)
                        self.assertEqual(payload.code, self.icmp6_code)
                except:
                    self.logger.error(ppp("Unexpected or invalid packet "
                                          "(outside network):", packet))
                    raise
            else:
                try:
                    ip_version = IPv6 if payload_info.ip == 1 else IP

                    ip = packet[ip_version]
                    packet_index = payload_info.index

                    self.assertEqual(payload_info.dst, dst_sw_if_index)
                    self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
                                      (pg_if.name, payload_info.src,
                                       packet_index))
                    # Advance to the next recorded packet info for this
                    # source/destination pair; it must match this packet
                    next_info = self.get_next_packet_info_for_interface2(
                        payload_info.src, dst_sw_if_index,
                        last_info[payload_info.src])
                    last_info[payload_info.src] = next_info
                    self.assertTrue(next_info is not None)
                    self.assertEqual(packet_index, next_info.index)
                    saved_packet = next_info.data
                    # Check standard fields
                    self.assertEqual(ip.src, saved_packet[ip_version].src)
                    self.assertEqual(ip.dst, saved_packet[ip_version].dst)
                    p = self.proto_map[payload_info.proto]
                    if p == 'TCP':
                        tcp = packet[TCP]
                        self.assertEqual(tcp.sport, saved_packet[
                            TCP].sport)
                        self.assertEqual(tcp.dport, saved_packet[
                            TCP].dport)
                    elif p == 'UDP':
                        udp = packet[UDP]
                        self.assertEqual(udp.sport, saved_packet[
                            UDP].sport)
                        self.assertEqual(udp.dport, saved_packet[
                            UDP].dport)
                except:
                    self.logger.error(ppp("Unexpected or invalid packet:",
                                          packet))
                    raise
        # Every recorded packet info should have been consumed by now.
        # NOTE(review): the first argument here is the interface object,
        # while the call above passes a source sw_if_index - confirm which
        # one get_next_packet_info_for_interface2() expects. Also note that
        # last_info is only advanced in the non-ICMP branch.
        for i in self.pg_interfaces:
            remaining_packet = self.get_next_packet_info_for_interface2(
                i, dst_sw_if_index, last_info[i.sw_if_index])
            self.assertTrue(
                remaining_packet is None,
                "Port %u: Packet expected from source %u didn't arrive" %
                (dst_sw_if_index, i.sw_if_index))
417
418 def run_traffic_no_check(self):
419 # Test
420 # Create incoming packet streams for packet-generator interfaces
421 for i in self.pg_interfaces:
422 if self.flows.__contains__(i):
423 pkts = self.create_stream(i, self.pg_if_packet_sizes)
424 if len(pkts) > 0:
425 i.add_stream(pkts)
426
427 # Enable packet capture and start packet sending
428 self.pg_enable_capture(self.pg_interfaces)
429 self.pg_start()
430
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000431 def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
Pavel Kotuceke7b67342017-04-18 13:12:20 +0200432 frags=False, pkt_raw=True):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100433 # Test
434 # Create incoming packet streams for packet-generator interfaces
435 pkts_cnt = 0
436 for i in self.pg_interfaces:
437 if self.flows.__contains__(i):
438 pkts = self.create_stream(i, self.pg_if_packet_sizes,
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000439 traffic_type, ip_type, proto, ports,
Pavel Kotuceke7b67342017-04-18 13:12:20 +0200440 frags, pkt_raw)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100441 if len(pkts) > 0:
442 i.add_stream(pkts)
443 pkts_cnt += len(pkts)
444
445 # Enable packet capture and start packet sendingself.IPV
446 self.pg_enable_capture(self.pg_interfaces)
447 self.pg_start()
448
449 # Verify
450 # Verify outgoing packet streams per packet-generator interface
451 for src_if in self.pg_interfaces:
452 if self.flows.__contains__(src_if):
453 for dst_if in self.flows[src_if]:
454 capture = dst_if.get_capture(pkts_cnt)
455 self.logger.info("Verifying capture on interface %s" %
456 dst_if.name)
457 self.verify_capture(dst_if, capture, traffic_type, ip_type)
458
459 def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000460 ports=0, frags=False):
Pavel Kotucek59dda062017-03-02 15:22:47 +0100461 # Test
462 self.reset_packet_infos()
463 for i in self.pg_interfaces:
464 if self.flows.__contains__(i):
465 pkts = self.create_stream(i, self.pg_if_packet_sizes,
Andrew Yourtchenkod1b05642017-04-04 14:10:40 +0000466 traffic_type, ip_type, proto, ports,
467 frags)
Pavel Kotucek59dda062017-03-02 15:22:47 +0100468 if len(pkts) > 0:
469 i.add_stream(pkts)
470
471 # Enable packet capture and start packet sending
472 self.pg_enable_capture(self.pg_interfaces)
473 self.pg_start()
474
475 # Verify
476 # Verify outgoing packet streams per packet-generator interface
477 for src_if in self.pg_interfaces:
478 if self.flows.__contains__(src_if):
479 for dst_if in self.flows[src_if]:
480 self.logger.info("Verifying capture on interface %s" %
481 dst_if.name)
482 capture = dst_if.get_capture(0)
483 self.assertEqual(len(capture), 0)
484
Pavel Kotucek59dda062017-03-02 15:22:47 +0100485 def test_0000_warmup_test(self):
486 """ ACL plugin version check; learn MACs
487 """
488 self.create_hosts(16)
489 self.run_traffic_no_check()
490 reply = self.vapi.papi.acl_plugin_get_version()
491 self.assertEqual(reply.major, 1)
492 self.logger.info("Working with ACL plugin version: %d.%d" % (
493 reply.major, reply.minor))
494 # minor version changes are non breaking
495 # self.assertEqual(reply.minor, 0)
496
    def test_0001_acl_create(self):
        """ ACL create/delete test
        """

        self.logger.info("ACLP_TEST_START_0001")
        # Add an ACL
        # A single rule: permit, protocol 17, source ports 1234-1235,
        # destination port 1234, zero-length prefixes
        r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
              'srcport_or_icmptype_first': 1234,
              'srcport_or_icmptype_last': 1235,
              'src_ip_prefix_len': 0,
              'src_ip_addr': '\x00\x00\x00\x00',
              'dstport_or_icmpcode_first': 1234,
              'dstport_or_icmpcode_last': 1234,
              'dst_ip_addr': '\x00\x00\x00\x00',
              'dst_ip_prefix_len': 0}]
        # Test 1: add a new ACL
        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
                                          tag="permit 1234")
        self.assertEqual(reply.retval, 0)
        # The very first ACL gets #0
        self.assertEqual(reply.acl_index, 0)
        first_acl = reply.acl_index
        rr = self.vapi.acl_dump(reply.acl_index)
        self.logger.info("Dumped ACL: " + str(rr))
        self.assertEqual(len(rr), 1)
        # We should have the same number of ACL entries as we had asked
        self.assertEqual(len(rr[0].r), len(r))
        # The rules should be the same. But because the submitted and returned
        # are different types, we need to iterate over rules and keys to get
        # to basic values.
        # NOTE(review): range(0, len(r) - 1) is empty for this one-rule
        # list, so the comparison below never runs - confirm whether
        # range(len(r)) was intended.
        for i_rule in range(0, len(r) - 1):
            for rule_key in r[i_rule]:
                self.assertEqual(rr[0].r[i_rule][rule_key],
                                 r[i_rule][rule_key])

        # Add a deny-1234 ACL
        # Two rules: deny the flow permitted above, then a permit rule for
        # protocol 17 with all port fields set to 0
        r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
                   'srcport_or_icmptype_first': 1234,
                   'srcport_or_icmptype_last': 1235,
                   'src_ip_prefix_len': 0,
                   'src_ip_addr': '\x00\x00\x00\x00',
                   'dstport_or_icmpcode_first': 1234,
                   'dstport_or_icmpcode_last': 1234,
                   'dst_ip_addr': '\x00\x00\x00\x00',
                   'dst_ip_prefix_len': 0},
                  {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
                   'srcport_or_icmptype_first': 0,
                   'srcport_or_icmptype_last': 0,
                   'src_ip_prefix_len': 0,
                   'src_ip_addr': '\x00\x00\x00\x00',
                   'dstport_or_icmpcode_first': 0,
                   'dstport_or_icmpcode_last': 0,
                   'dst_ip_addr': '\x00\x00\x00\x00',
                   'dst_ip_prefix_len': 0}]

        reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
                                          tag="deny 1234;permit all")
        self.assertEqual(reply.retval, 0)
        # The second ACL gets #1
        self.assertEqual(reply.acl_index, 1)
        second_acl = reply.acl_index

        # Test 2: try to modify a nonexistent ACL
        reply = self.vapi.acl_add_replace(acl_index=432, r=r,
                                          tag="FFFF:FFFF", expected_retval=-6)
        self.assertEqual(reply.retval, -6)
        # The ACL number should pass through
        self.assertEqual(reply.acl_index, 432)
        # apply an ACL on an interface inbound, try to delete ACL, must fail
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=1,
                                             acls=[first_acl])
        reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
        # Unapply an ACL and then try to delete it - must be ok
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=0,
                                             acls=[])
        reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)

        # apply an ACL on an interface outbound, try to delete ACL, must fail
        # (n_input=0 with one ACL in the list, so none of it is inbound)
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=0,
                                             acls=[second_acl])
        reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
        # Unapply the ACL and then try to delete it - must be ok
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=0,
                                             acls=[])
        reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)

        # try to apply a nonexistent ACL - must fail
        # (first_acl was deleted above)
        self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
                                             n_input=1,
                                             acls=[first_acl],
                                             expected_retval=-6)

        self.logger.info("ACLP_TEST_FINISH_0001")
594
595 def test_0002_acl_permit_apply(self):
596 """ permit ACL apply test
597 """
598 self.logger.info("ACLP_TEST_START_0002")
599
600 rules = []
601 rules.append(self.create_rule(self.IPV4, self.PERMIT,
602 0, self.proto[self.IP][self.UDP]))
603 rules.append(self.create_rule(self.IPV4, self.PERMIT,
604 0, self.proto[self.IP][self.TCP]))
605
606 # Apply rules
607 self.apply_rules(rules, "permit per-flow")
608
609 # Traffic should still pass
610 self.run_verify_test(self.IP, self.IPV4, -1)
611 self.logger.info("ACLP_TEST_FINISH_0002")
612
613 def test_0003_acl_deny_apply(self):
614 """ deny ACL apply test
615 """
616 self.logger.info("ACLP_TEST_START_0003")
617 # Add a deny-flows ACL
618 rules = []
619 rules.append(self.create_rule(self.IPV4, self.DENY,
620 self.PORTS_ALL, self.proto[self.IP][self.UDP]))
621 # Permit ip any any in the end
622 rules.append(self.create_rule(self.IPV4, self.PERMIT,
623 self.PORTS_ALL, 0))
624
625 # Apply rules
626 self.apply_rules(rules, "deny per-flow;permit all")
627
628 # Traffic should not pass
629 self.run_verify_negat_test(self.IP, self.IPV4,
630 self.proto[self.IP][self.UDP])
631 self.logger.info("ACLP_TEST_FINISH_0003")
632 # self.assertEqual(1, 0)
633
634 def test_0004_vpp624_permit_icmpv4(self):
635 """ VPP_624 permit ICMPv4
636 """
637 self.logger.info("ACLP_TEST_START_0004")
638
639 # Add an ACL
640 rules = []
641 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
642 self.proto[self.ICMP][self.ICMPv4]))
643 # deny ip any any in the end
644 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
645
646 # Apply rules
647 self.apply_rules(rules, "permit icmpv4")
648
649 # Traffic should still pass
650 self.run_verify_test(self.ICMP, self.IPV4,
651 self.proto[self.ICMP][self.ICMPv4])
652
653 self.logger.info("ACLP_TEST_FINISH_0004")
654
655 def test_0005_vpp624_permit_icmpv6(self):
656 """ VPP_624 permit ICMPv6
657 """
658 self.logger.info("ACLP_TEST_START_0005")
659
660 # Add an ACL
661 rules = []
662 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
663 self.proto[self.ICMP][self.ICMPv6]))
664 # deny ip any any in the end
665 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
666
667 # Apply rules
668 self.apply_rules(rules, "permit icmpv6")
669
670 # Traffic should still pass
671 self.run_verify_test(self.ICMP, self.IPV6,
672 self.proto[self.ICMP][self.ICMPv6])
673
674 self.logger.info("ACLP_TEST_FINISH_0005")
675
676 def test_0006_vpp624_deny_icmpv4(self):
677 """ VPP_624 deny ICMPv4
678 """
679 self.logger.info("ACLP_TEST_START_0006")
680 # Add an ACL
681 rules = []
682 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
683 self.proto[self.ICMP][self.ICMPv4]))
684 # permit ip any any in the end
685 rules.append(self.create_rule(self.IPV4, self.PERMIT,
686 self.PORTS_ALL, 0))
687
688 # Apply rules
689 self.apply_rules(rules, "deny icmpv4")
690
691 # Traffic should not pass
692 self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
693
694 self.logger.info("ACLP_TEST_FINISH_0006")
695
696 def test_0007_vpp624_deny_icmpv6(self):
697 """ VPP_624 deny ICMPv6
698 """
699 self.logger.info("ACLP_TEST_START_0007")
700 # Add an ACL
701 rules = []
702 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
703 self.proto[self.ICMP][self.ICMPv6]))
704 # deny ip any any in the end
705 rules.append(self.create_rule(self.IPV6, self.PERMIT,
706 self.PORTS_ALL, 0))
707
708 # Apply rules
709 self.apply_rules(rules, "deny icmpv6")
710
711 # Traffic should not pass
712 self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
713
714 self.logger.info("ACLP_TEST_FINISH_0007")
715
716 def test_0008_tcp_permit_v4(self):
717 """ permit TCPv4
718 """
719 self.logger.info("ACLP_TEST_START_0008")
720
721 # Add an ACL
722 rules = []
723 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
724 self.proto[self.IP][self.TCP]))
725 # deny ip any any in the end
726 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
727
728 # Apply rules
729 self.apply_rules(rules, "permit ipv4 tcp")
730
731 # Traffic should still pass
732 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
733
734 self.logger.info("ACLP_TEST_FINISH_0008")
735
736 def test_0009_tcp_permit_v6(self):
737 """ permit TCPv6
738 """
739 self.logger.info("ACLP_TEST_START_0009")
740
741 # Add an ACL
742 rules = []
743 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
744 self.proto[self.IP][self.TCP]))
745 # deny ip any any in the end
746 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
747
748 # Apply rules
749 self.apply_rules(rules, "permit ip6 tcp")
750
751 # Traffic should still pass
752 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
753
754 self.logger.info("ACLP_TEST_FINISH_0008")
755
756 def test_0010_udp_permit_v4(self):
757 """ permit UDPv4
758 """
759 self.logger.info("ACLP_TEST_START_0010")
760
761 # Add an ACL
762 rules = []
763 rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
764 self.proto[self.IP][self.UDP]))
765 # deny ip any any in the end
766 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
767
768 # Apply rules
769 self.apply_rules(rules, "permit ipv udp")
770
771 # Traffic should still pass
772 self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
773
774 self.logger.info("ACLP_TEST_FINISH_0010")
775
776 def test_0011_udp_permit_v6(self):
777 """ permit UDPv6
778 """
779 self.logger.info("ACLP_TEST_START_0011")
780
781 # Add an ACL
782 rules = []
783 rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
784 self.proto[self.IP][self.UDP]))
785 # deny ip any any in the end
786 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
787
788 # Apply rules
789 self.apply_rules(rules, "permit ip6 udp")
790
791 # Traffic should still pass
792 self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
793
794 self.logger.info("ACLP_TEST_FINISH_0011")
795
796 def test_0012_tcp_deny(self):
797 """ deny TCPv4/v6
798 """
799 self.logger.info("ACLP_TEST_START_0012")
800
801 # Add an ACL
802 rules = []
803 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
804 self.proto[self.IP][self.TCP]))
805 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
806 self.proto[self.IP][self.TCP]))
807 # permit ip any any in the end
808 rules.append(self.create_rule(self.IPV4, self.PERMIT,
809 self.PORTS_ALL, 0))
810 rules.append(self.create_rule(self.IPV6, self.PERMIT,
811 self.PORTS_ALL, 0))
812
813 # Apply rules
814 self.apply_rules(rules, "deny ip4/ip6 tcp")
815
816 # Traffic should not pass
817 self.run_verify_negat_test(self.IP, self.IPRANDOM,
818 self.proto[self.IP][self.TCP])
819
820 self.logger.info("ACLP_TEST_FINISH_0012")
821
822 def test_0013_udp_deny(self):
823 """ deny UDPv4/v6
824 """
825 self.logger.info("ACLP_TEST_START_0013")
826
827 # Add an ACL
828 rules = []
829 rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
830 self.proto[self.IP][self.UDP]))
831 rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
832 self.proto[self.IP][self.UDP]))
833 # permit ip any any in the end
834 rules.append(self.create_rule(self.IPV4, self.PERMIT,
835 self.PORTS_ALL, 0))
836 rules.append(self.create_rule(self.IPV6, self.PERMIT,
837 self.PORTS_ALL, 0))
838
839 # Apply rules
840 self.apply_rules(rules, "deny ip4/ip6 udp")
841
842 # Traffic should not pass
843 self.run_verify_negat_test(self.IP, self.IPRANDOM,
844 self.proto[self.IP][self.UDP])
845
846 self.logger.info("ACLP_TEST_FINISH_0013")
847
def test_0014_acl_dump(self):
    """ verify add/dump acls

    Build a single ACL from a table of rule specifications, dump it back
    through the API and compare every dumped rule with the specification
    it was created from.
    """
    self.logger.info("ACLP_TEST_START_0014")

    tcp = self.proto[self.IP][self.TCP]
    udp = self.proto[self.IP][self.UDP]
    icmp4 = self.proto[self.ICMP][self.ICMPv4]
    icmp6 = self.proto[self.ICMP][self.ICMPv6]

    # Each entry is [ip version, action, ports, protocol], i.e. the
    # positional arguments handed to create_rule() below.
    r = [[self.IPV4, self.PERMIT, 1234, tcp],
         [self.IPV4, self.PERMIT, 2345, udp],
         [self.IPV4, self.PERMIT, 0, tcp],
         [self.IPV4, self.PERMIT, 0, udp],
         [self.IPV4, self.PERMIT, 5, icmp4],
         [self.IPV6, self.PERMIT, 4321, tcp],
         [self.IPV6, self.PERMIT, 5432, udp],
         [self.IPV6, self.PERMIT, 0, tcp],
         [self.IPV6, self.PERMIT, 0, udp],
         [self.IPV6, self.PERMIT, 6, icmp6],
         [self.IPV4, self.DENY, self.PORTS_ALL, 0],
         [self.IPV4, self.DENY, 1234, tcp],
         [self.IPV4, self.DENY, 2345, udp],
         [self.IPV4, self.DENY, 5, icmp4],
         [self.IPV6, self.DENY, 4321, tcp],
         [self.IPV6, self.DENY, 5432, udp],
         [self.IPV6, self.DENY, 6, icmp6],
         [self.IPV6, self.DENY, self.PORTS_ALL, 0]
         ]

    # Expected (sport_from, sport_to, dport_from, dport_to) per protocol
    # for rules created with the default port range (ports == 0).
    port_ranges = {
        tcp: (self.tcp_sport_from, self.tcp_sport_to,
              self.tcp_dport_from, self.tcp_dport_to),
        udp: (self.udp_sport_from, self.udp_sport_to,
              self.udp_dport_from, self.udp_dport_to),
    }

    # Add and verify new ACLs
    rules = [self.create_rule(*spec) for spec in r]

    # acl_index 4294967295 (all ones) is the value used throughout this
    # test to request a new ACL rather than replace an existing one.
    reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
    result = self.vapi.acl_dump(reply.acl_index)

    dumped = [dr for drules in result for dr in drules.r]

    # Without this check an empty or truncated dump would make the
    # per-rule loop below pass vacuously.
    self.assertEqual(len(dumped), len(r))

    for dr, (is_ipv6, is_permit, ports, proto) in zip(dumped, r):
        self.assertEqual(dr.is_ipv6, is_ipv6)
        self.assertEqual(dr.is_permit, is_permit)
        self.assertEqual(dr.proto, proto)

        if ports > 0:
            # explicit single port / icmp type
            self.assertEqual(dr.srcport_or_icmptype_first, ports)
        elif ports < 0:
            # PORTS_ALL: the whole port space is expected
            self.assertEqual(dr.srcport_or_icmptype_first, 0)
            self.assertEqual(dr.srcport_or_icmptype_last, 65535)
        elif dr.proto in port_ranges:
            # default range: values must lie inside the class-level range
            sport_from, sport_to, dport_from, dport_to = \
                port_ranges[dr.proto]
            self.assertGreaterEqual(dr.srcport_or_icmptype_first,
                                    sport_from)
            self.assertLessEqual(dr.srcport_or_icmptype_first, sport_to)
            self.assertGreaterEqual(dr.dstport_or_icmpcode_last,
                                    dport_from)
            self.assertLessEqual(dr.dstport_or_icmpcode_last, dport_to)

    self.logger.info("ACLP_TEST_FINISH_0014")
916
def test_0015_tcp_permit_port_v4(self):
    """ permit single TCPv4

    Permit one randomly chosen TCP port over IPv4, deny everything else,
    and check that traffic on that port is forwarded.
    """
    self.logger.info("ACLP_TEST_START_0015")

    port = random.randint(0, 65535)
    tcp = self.proto[self.IP][self.TCP]

    # The permit rule comes first; the trailing rule denies ip any any.
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, port, tcp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "permit ip4 tcp " + str(port))

    # Traffic on the permitted port is expected to pass
    self.run_verify_test(self.IP, self.IPV4, tcp, port)

    self.logger.info("ACLP_TEST_FINISH_0015")
938
def test_0016_udp_permit_port_v4(self):
    """ permit single UDPv4

    Permit one randomly chosen UDP port over IPv4, deny everything else,
    and check that traffic on that port is forwarded.
    """
    self.logger.info("ACLP_TEST_START_0016")

    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]

    # The permit rule comes first; the trailing rule denies ip any any.
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, port, udp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]

    # The tag names the protocol actually under test (it used to say
    # "tcp", copied from the TCP variant of this test).
    self.apply_rules(rules, "permit ip4 udp " + str(port))

    # Traffic on the permitted port is expected to pass
    self.run_verify_test(self.IP, self.IPV4, udp, port)

    self.logger.info("ACLP_TEST_FINISH_0016")
960
def test_0017_tcp_permit_port_v6(self):
    """ permit single TCPv6

    Permit one randomly chosen TCP port over IPv6, deny everything else,
    and check that traffic on that port is forwarded.
    """
    self.logger.info("ACLP_TEST_START_0017")

    port = random.randint(0, 65535)
    tcp = self.proto[self.IP][self.TCP]

    # The permit rule comes first; the trailing rule denies ip any any.
    rules = [
        self.create_rule(self.IPV6, self.PERMIT, port, tcp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # The tag names the address family actually under test (it used to
    # say "ip4", copied from the IPv4 variant of this test).
    self.apply_rules(rules, "permit ip6 tcp " + str(port))

    # Traffic on the permitted port is expected to pass
    self.run_verify_test(self.IP, self.IPV6, tcp, port)

    self.logger.info("ACLP_TEST_FINISH_0017")
982
def test_0018_udp_permit_port_v6(self):
    """ permit single UDPv6

    Permit one randomly chosen UDP port over IPv6, deny everything else,
    and check that traffic on that port is forwarded.
    """
    self.logger.info("ACLP_TEST_START_0018")

    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]

    # The permit rule comes first; the trailing rule denies ip any any.
    rules = [
        self.create_rule(self.IPV6, self.PERMIT, port, udp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]

    # The tag names the family/protocol actually under test (it used to
    # say "ip4 tcp", copied from the IPv4/TCP variant of this test).
    self.apply_rules(rules, "permit ip6 udp " + str(port))

    # Traffic on the permitted port is expected to pass
    self.run_verify_test(self.IP, self.IPV6, udp, port)

    self.logger.info("ACLP_TEST_FINISH_0018")
1005
def test_0019_udp_deny_port(self):
    """ deny single TCPv4/v6

    Deny one randomly chosen TCP port for both IPv4 and IPv6, permit
    everything else, and check that traffic on that port is dropped.

    NOTE(review): the method name says "udp" although the rules and the
    traffic are TCP; the name is left untouched so test selection by
    name keeps working.
    """
    self.logger.info("ACLP_TEST_START_0019")

    port = random.randint(0, 65535)
    tcp = self.proto[self.IP][self.TCP]

    rules = [
        # deny the chosen TCP port for both address families
        self.create_rule(self.IPV4, self.DENY, port, tcp),
        self.create_rule(self.IPV6, self.DENY, port, tcp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]

    # The tag names the protocol actually denied (it used to say "udp").
    self.apply_rules(rules, "deny ip4/ip6 tcp " + str(port))

    # Traffic on the denied port is expected not to pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, tcp, port)

    self.logger.info("ACLP_TEST_FINISH_0019")
1032
def test_0020_udp_deny_port(self):
    """ deny single UDPv4/v6

    Deny one randomly chosen UDP port for both IPv4 and IPv6, permit
    everything else, and check that traffic on that port is dropped.
    """
    self.logger.info("ACLP_TEST_START_0020")

    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]

    rules = [
        # deny the chosen UDP port for both address families
        self.create_rule(self.IPV4, self.DENY, port, udp),
        self.create_rule(self.IPV6, self.DENY, port, udp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "deny ip4/ip6 udp " + str(port))

    # Traffic on the denied port is expected not to pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp, port)

    self.logger.info("ACLP_TEST_FINISH_0020")
1059
def test_0021_udp_deny_port_verify_fragment_deny(self):
    """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked

    Same rule set as the plain single-port UDP deny test, but the
    negative verification is invoked with an extra trailing True.
    """
    self.logger.info("ACLP_TEST_START_0021")

    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]

    rules = [
        # deny the chosen UDP port for both address families
        self.create_rule(self.IPV4, self.DENY, port, udp),
        self.create_rule(self.IPV6, self.DENY, port, udp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "deny ip4/ip6 udp " + str(port))

    # Traffic should not pass.
    # NOTE(review): the trailing True presumably switches the helper to
    # fragmented traffic - confirm against run_verify_negat_test().
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp, port, True)

    self.logger.info("ACLP_TEST_FINISH_0021")
1086
def test_0022_zero_length_udp_ipv4(self):
    """ VPP-687 zero length udp ipv4 packet

    Permit one random UDP port over IPv4, send a stream built with
    create_stream() on pg0 and expect every packet to arrive on pg1.
    """
    self.logger.info("ACLP_TEST_START_0022")

    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]

    # Permit the chosen port, then deny ip any any in the end
    rules = [
        self.create_rule(self.IPV4, self.PERMIT, port, udp),
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "permit empty udp ip4 " + str(port))

    # Traffic should still pass.
    # NOTE(review): the two trailing False flags presumably disable
    # fragmentation and the raw payload - confirm against create_stream().
    pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
                              self.IP, self.IPV4, udp, port,
                              False, False)
    expected = len(pkts)
    if expected:
        self.pg0.add_stream(pkts)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Every packet that was queued must show up on the output interface
    self.pg1.get_capture(expected)

    self.logger.info("ACLP_TEST_FINISH_0022")
1121
def test_0023_zero_length_udp_ipv6(self):
    """ VPP-687 zero length udp ipv6 packet

    Permit one random UDP port over IPv6, send a stream built with
    create_stream() on pg0 and expect every packet to arrive on pg1.
    """
    self.logger.info("ACLP_TEST_START_0023")

    port = random.randint(0, 65535)
    udp = self.proto[self.IP][self.UDP]

    # Permit the chosen port, then deny ip any any in the end
    rules = [
        self.create_rule(self.IPV6, self.PERMIT, port, udp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "permit empty udp ip6 " + str(port))

    # Traffic should still pass.
    # NOTE(review): the two trailing False flags presumably disable
    # fragmentation and the raw payload - confirm against create_stream().
    pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
                              self.IP, self.IPV6, udp, port,
                              False, False)
    expected = len(pkts)
    if expected:
        self.pg0.add_stream(pkts)

    # Enable packet capture and start packet sending
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Verify outgoing packet streams per packet-generator interface
    self.pg1.get_capture(expected)

    self.logger.info("ACLP_TEST_FINISH_0023")
1156
def test_0108_tcp_permit_v4(self):
    """ permit TCPv4 + non-match range

    A deny rule on the second port range precedes the permit rule on the
    default range; traffic sent by run_verify_test() is expected to pass.
    """
    self.logger.info("ACLP_TEST_START_0108")

    tcp = self.proto[self.IP][self.TCP]
    rules = [
        # range the test traffic is not expected to match
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, tcp),
        # range the test traffic is expected to match
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "permit ipv4 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, tcp)

    self.logger.info("ACLP_TEST_FINISH_0108")
1178
def test_0109_tcp_permit_v6(self):
    """ permit TCPv6 + non-match range

    A deny rule on the second port range precedes the permit rule on the
    default range; traffic sent by run_verify_test() is expected to pass.
    """
    self.logger.info("ACLP_TEST_START_0109")

    tcp = self.proto[self.IP][self.TCP]
    rules = [
        # range the test traffic is not expected to match
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2, tcp),
        # range the test traffic is expected to match
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, tcp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "permit ip6 tcp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, tcp)

    self.logger.info("ACLP_TEST_FINISH_0109")
1200
def test_0110_udp_permit_v4(self):
    """ permit UDPv4 + non-match range

    A deny rule on the second port range precedes the permit rule on the
    default range; traffic sent by run_verify_test() is expected to pass.
    """
    self.logger.info("ACLP_TEST_START_0110")

    udp = self.proto[self.IP][self.UDP]
    rules = [
        # range the test traffic is not expected to match
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, udp),
        # range the test traffic is expected to match
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "permit ipv4 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV4, udp)

    self.logger.info("ACLP_TEST_FINISH_0110")
1222
def test_0111_udp_permit_v6(self):
    """ permit UDPv6 + non-match range

    A deny rule on the second port range precedes the permit rule on the
    default range; traffic sent by run_verify_test() is expected to pass.
    """
    self.logger.info("ACLP_TEST_START_0111")

    udp = self.proto[self.IP][self.UDP]
    rules = [
        # range the test traffic is not expected to match
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2, udp),
        # range the test traffic is expected to match
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, udp),
        # deny ip any any in the end
        self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "permit ip6 udp")

    # Traffic should still pass
    self.run_verify_test(self.IP, self.IPV6, udp)

    self.logger.info("ACLP_TEST_FINISH_0111")
1244
def test_0112_tcp_deny(self):
    """ deny TCPv4/v6 + non-match range

    Permit rules on the second port range precede deny rules on the
    default range; traffic sent by run_verify_negat_test() is expected
    to be dropped despite the trailing permit-all rules.
    """
    self.logger.info("ACLP_TEST_START_0112")

    tcp = self.proto[self.IP][self.TCP]
    rules = [
        # range the test traffic is not expected to match
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE_2, tcp),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE_2, tcp),
        # range the test traffic is expected to match
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, tcp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, tcp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "deny ip4/ip6 tcp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, tcp)

    self.logger.info("ACLP_TEST_FINISH_0112")
1276
def test_0113_udp_deny(self):
    """ deny UDPv4/v6 + non-match range

    Permit rules on the second port range precede deny rules on the
    default range; traffic sent by run_verify_negat_test() is expected
    to be dropped despite the trailing permit-all rules.
    """
    self.logger.info("ACLP_TEST_START_0113")

    udp = self.proto[self.IP][self.UDP]
    rules = [
        # range the test traffic is not expected to match
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE_2, udp),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE_2, udp),
        # range the test traffic is expected to match
        self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, udp),
        self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, udp),
        # permit ip any any in the end
        self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0),
        self.create_rule(self.IPV6, self.PERMIT, self.PORTS_ALL, 0),
    ]
    self.apply_rules(rules, "deny ip4/ip6 udp")

    # Traffic should not pass
    self.run_verify_negat_test(self.IP, self.IPRANDOM, udp)

    self.logger.info("ACLP_TEST_FINISH_0113")
1308
1309
# Allow running this test module directly as a script; the tests are then
# executed with the VppTestRunner imported from the framework module.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)