blob: ea6adcb713ffdc30931170bb69b514edc24be338 [file] [log] [blame]
Jan Gelety059d1d02018-07-03 13:58:24 +02001#!/usr/bin/env python
2
3import unittest
4import socket
5import binascii
6import sys
7
8from framework import VppTestCase, VppTestRunner
9
10from scapy.packet import Raw
11from scapy.layers.l2 import Ether
12from scapy.layers.inet6 import IPv6, UDP, TCP
13from util import ppp
14
15
16class TestClassifier(VppTestCase):
17 """ Classifier Test Case """
18
19 @classmethod
20 def setUpClass(cls):
21 """
22 Perform standard class setup (defined by class method setUpClass in
23 class VppTestCase) before running the test case, set test case related
24 variables and configure VPP.
25 """
26 super(TestClassifier, cls).setUpClass()
27 cls.acl_active_table = ''
28
Paul Vinciguerra7f9b7f92019-03-12 19:23:27 -070029 @classmethod
30 def tearDownClass(cls):
31 super(TestClassifier, cls).tearDownClass()
32
Jan Gelety059d1d02018-07-03 13:58:24 +020033 def setUp(self):
34 """
35 Perform test setup before test case.
36
37 **Config:**
38 - create 4 pg interfaces
39 - untagged pg0/pg1/pg2 interface
40 pg0 -------> pg1 (IP ACL)
41 \
42 ---> pg2 (MAC ACL))
43 - setup interfaces:
44 - put it into UP state
45 - set IPv6 addresses
46 - resolve neighbor address using NDP
47
48 :ivar list interfaces: pg interfaces.
49 :ivar list pg_if_packet_sizes: packet sizes in test.
50 :ivar dict acl_tbl_idx: ACL table index.
51 :ivar int pbr_vrfid: VRF id for PBR test.
52 """
53 self.reset_packet_infos()
54 super(TestClassifier, self).setUp()
55
56 # create 4 pg interfaces
57 self.create_pg_interfaces(range(3))
58
59 # packet sizes to test
60 self.pg_if_packet_sizes = [64, 9018]
61
62 self.interfaces = list(self.pg_interfaces)
63
64 # ACL vars
65 self.acl_tbl_idx = {}
66
67 # setup all interfaces
68 for intf in self.interfaces:
69 intf.admin_up()
70 intf.config_ip6()
71 intf.resolve_ndp()
72
73 def tearDown(self):
74 """Run standard test teardown and acl related log."""
75 if not self.vpp_dead:
Jan Gelety059d1d02018-07-03 13:58:24 +020076 if self.acl_active_table == 'ip6_out':
77 self.output_acl_set_interface(
78 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
79 self.acl_active_table = ''
80 elif self.acl_active_table != '':
81 self.input_acl_set_interface(
82 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
83 self.acl_active_table = ''
84 for intf in self.interfaces:
85 intf.unconfig_ip6()
86 intf.admin_down()
87
88 super(TestClassifier, self).tearDown()
89
Paul Vinciguerra90cf21b2019-03-13 09:23:05 -070090 def show_commands_at_teardown(self):
91 self.logger.info(self.vapi.ppcli("show inacl type ip6"))
92 self.logger.info(self.vapi.ppcli("show outacl type ip6"))
93 self.logger.info(self.vapi.cli("show classify table verbose"))
94 self.logger.info(self.vapi.cli("show ip fib"))
95
Jan Gelety059d1d02018-07-03 13:58:24 +020096 def create_stream(self, src_if, dst_if, packet_sizes,
97 proto_l=UDP(sport=1234, dport=5678)):
98 """Create input packet stream for defined interfaces.
99
100 :param VppInterface src_if: Source Interface for packet stream.
101 :param VppInterface dst_if: Destination Interface for packet stream.
102 :param list packet_sizes: packet size to test.
103 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
104 """
105 pkts = []
106
107 for size in packet_sizes:
108 info = self.create_packet_info(src_if, dst_if)
109 payload = self.info_to_payload(info)
110 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
111 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
112 proto_l /
113 Raw(payload))
114 info.data = p.copy()
115 self.extend_packet(p, size)
116 pkts.append(p)
117 return pkts
118
119 def verify_capture(self, dst_if, capture, proto_l=UDP):
120 """Verify captured input packet stream for defined interface.
121
122 :param VppInterface dst_if: Interface to verify captured packet stream.
123 :param list capture: Captured packet stream.
124 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
125 """
126 self.logger.info("Verifying capture on interface %s" % dst_if.name)
127 last_info = dict()
128 for i in self.interfaces:
129 last_info[i.sw_if_index] = None
130 dst_sw_if_index = dst_if.sw_if_index
131 for packet in capture:
132 try:
133 ip6_received = packet[IPv6]
134 proto_received = packet[proto_l]
Paul Vinciguerraeaea4212019-03-06 11:58:06 -0800135 payload_info = self.payload_to_info(packet[Raw])
Jan Gelety059d1d02018-07-03 13:58:24 +0200136 packet_index = payload_info.index
137 self.assertEqual(payload_info.dst, dst_sw_if_index)
138 self.logger.debug(
139 "Got packet on port %s: src=%u (id=%u)" %
140 (dst_if.name, payload_info.src, packet_index))
141 next_info = self.get_next_packet_info_for_interface2(
142 payload_info.src, dst_sw_if_index,
143 last_info[payload_info.src])
144 last_info[payload_info.src] = next_info
145 self.assertTrue(next_info is not None)
146 self.assertEqual(packet_index, next_info.index)
147 saved_packet = next_info.data
148 ip_saved = saved_packet[IPv6]
149 proto_saved = saved_packet[proto_l]
150 # Check standard fields
151 self.assertEqual(ip6_received.src, ip_saved.src)
152 self.assertEqual(ip6_received.dst, ip_saved.dst)
153 self.assertEqual(proto_received.sport, proto_saved.sport)
154 self.assertEqual(proto_received.dport, proto_saved.dport)
155 except:
156 self.logger.error(ppp("Unexpected or invalid packet:", packet))
157 raise
158 for i in self.interfaces:
159 remaining_packet = self.get_next_packet_info_for_interface2(
160 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
161 self.assertTrue(remaining_packet is None,
162 "Interface %s: Packet expected from interface %s "
163 "didn't arrive" % (dst_if.name, i.name))
164
165 @staticmethod
166 def build_ip6_mask(nh='', src_ip='', dst_ip='',
167 src_port='', dst_port=''):
168 """Build IPv6 ACL mask data with hexstring format.
169
170 :param str nh: next header number <0-ff>
171 :param str src_ip: source ip address <0-ffffffff>
172 :param str dst_ip: destination ip address <0-ffffffff>
173 :param str src_port: source port number <0-ffff>
174 :param str dst_port: destination port number <0-ffff>
175 """
176
Paul Vinciguerraea2450f2019-03-06 08:23:58 -0800177 return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
Jan Gelety059d1d02018-07-03 13:58:24 +0200178 nh, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
179
180 @staticmethod
181 def build_ip6_match(nh=0, src_ip='', dst_ip='',
182 src_port=0, dst_port=0):
183 """Build IPv6 ACL match data with hexstring format.
184
185 :param int nh: next header number with valid option "x"
186 :param str src_ip: source ip6 address with format of "xxx:xxxx::xxxx"
187 :param str dst_ip: destination ip6 address with format of
188 "xxx:xxxx::xxxx"
189 :param int src_port: source port number "x"
190 :param int dst_port: destination port number "x"
191 """
192 if src_ip:
Paul Vinciguerra6e4c6ad2018-11-25 10:35:29 -0800193 src_ip = binascii.hexlify(socket.inet_pton(
194 socket.AF_INET6, src_ip))
Jan Gelety059d1d02018-07-03 13:58:24 +0200195 if dst_ip:
Paul Vinciguerra6e4c6ad2018-11-25 10:35:29 -0800196 dst_ip = binascii.hexlify(socket.inet_pton(
197 socket.AF_INET6, dst_ip))
Jan Gelety059d1d02018-07-03 13:58:24 +0200198
Paul Vinciguerraea2450f2019-03-06 08:23:58 -0800199 return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
Jan Gelety059d1d02018-07-03 13:58:24 +0200200 hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:],
201 hex(dst_port)[2:])).rstrip('0')
202
203 @staticmethod
204 def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
205 """Build MAC ACL mask data with hexstring format.
206
207 :param str dst_mac: source MAC address <0-ffffffffffff>
208 :param str src_mac: destination MAC address <0-ffffffffffff>
209 :param str ether_type: ethernet type <0-ffff>
210 """
211
Paul Vinciguerraea2450f2019-03-06 08:23:58 -0800212 return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
213 dst_mac, src_mac, ether_type)).rstrip('0')
Jan Gelety059d1d02018-07-03 13:58:24 +0200214
215 @staticmethod
216 def build_mac_match(dst_mac='', src_mac='', ether_type=''):
217 """Build MAC ACL match data with hexstring format.
218
219 :param str dst_mac: source MAC address <x:x:x:x:x:x>
220 :param str src_mac: destination MAC address <x:x:x:x:x:x>
221 :param str ether_type: ethernet type <0-ffff>
222 """
223 if dst_mac:
224 dst_mac = dst_mac.replace(':', '')
225 if src_mac:
226 src_mac = src_mac.replace(':', '')
227
Paul Vinciguerraea2450f2019-03-06 08:23:58 -0800228 return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
229 dst_mac, src_mac, ether_type)).rstrip('0')
Jan Gelety059d1d02018-07-03 13:58:24 +0200230
231 def create_classify_table(self, key, mask, data_offset=0):
232 """Create Classify Table
233
234 :param str key: key for classify table (ex, ACL name).
235 :param str mask: mask value for interested traffic.
236 :param int data_offset:
237 """
238 r = self.vapi.classify_add_del_table(
239 is_add=1,
240 mask=binascii.unhexlify(mask),
241 match_n_vectors=(len(mask) - 1) // 32 + 1,
242 miss_next_index=0,
243 current_data_flag=1,
244 current_data_offset=data_offset)
Paul Vinciguerra8d991d92019-01-25 14:05:48 -0800245 self.assertIsNotNone(r, 'No response msg for add_del_table')
Jan Gelety059d1d02018-07-03 13:58:24 +0200246 self.acl_tbl_idx[key] = r.new_table_index
247
248 def create_classify_session(self, table_index, match, vrfid=0, is_add=1):
249 """Create Classify Session
250
251 :param int table_index: table index to identify classify table.
252 :param str match: matched value for interested traffic.
253 :param int vrfid: VRF id.
254 :param int is_add: option to configure classify session.
255 - create(1) or delete(0)
256 """
257 r = self.vapi.classify_add_del_session(
258 is_add,
259 table_index,
260 binascii.unhexlify(match),
261 opaque_index=0,
262 metadata=vrfid)
Paul Vinciguerra8d991d92019-01-25 14:05:48 -0800263 self.assertIsNotNone(r, 'No response msg for add_del_session')
Jan Gelety059d1d02018-07-03 13:58:24 +0200264
265 def input_acl_set_interface(self, intf, table_index, is_add=1):
266 """Configure Input ACL interface
267
268 :param VppInterface intf: Interface to apply Input ACL feature.
269 :param int table_index: table index to identify classify table.
270 :param int is_add: option to configure classify session.
271 - enable(1) or disable(0)
272 """
273 r = self.vapi.input_acl_set_interface(
274 is_add,
275 intf.sw_if_index,
276 ip6_table_index=table_index)
Paul Vinciguerra8d991d92019-01-25 14:05:48 -0800277 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
Jan Gelety059d1d02018-07-03 13:58:24 +0200278
279 def output_acl_set_interface(self, intf, table_index, is_add=1):
280 """Configure Output ACL interface
281
282 :param VppInterface intf: Interface to apply Output ACL feature.
283 :param int table_index: table index to identify classify table.
284 :param int is_add: option to configure classify session.
285 - enable(1) or disable(0)
286 """
287 r = self.vapi.output_acl_set_interface(
288 is_add,
289 intf.sw_if_index,
290 ip6_table_index=table_index)
Paul Vinciguerra8d991d92019-01-25 14:05:48 -0800291 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
Jan Gelety059d1d02018-07-03 13:58:24 +0200292
293
294class TestClassifierIP6(TestClassifier):
295 """ Classifier IP6 Test Case """
296
Paul Vinciguerra7f9b7f92019-03-12 19:23:27 -0700297 @classmethod
298 def setUpClass(cls):
299 super(TestClassifierIP6, cls).setUpClass()
300
301 @classmethod
302 def tearDownClass(cls):
303 super(TestClassifierIP6, cls).tearDownClass()
304
Jan Gelety059d1d02018-07-03 13:58:24 +0200305 def test_iacl_src_ip(self):
306 """ Source IP6 iACL test
307
308 Test scenario for basic IP ACL with source IP
309 - Create IPv6 stream for pg0 -> pg1 interface.
310 - Create iACL with source IP address.
311 - Send and verify received packets on pg1 interface.
312 """
313
314 # Basic iACL testing with source IP
315 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
316 self.pg0.add_stream(pkts)
317
318 key = 'ip6_src'
319 self.create_classify_table(
320 key,
321 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff'))
322 self.create_classify_session(
323 self.acl_tbl_idx.get(key),
324 self.build_ip6_match(src_ip=self.pg0.remote_ip6))
325 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
326 self.acl_active_table = key
327
328 self.pg_enable_capture(self.pg_interfaces)
329 self.pg_start()
330
331 pkts = self.pg1.get_capture(len(pkts))
332 self.verify_capture(self.pg1, pkts)
333 self.pg0.assert_nothing_captured(remark="packets forwarded")
334 self.pg2.assert_nothing_captured(remark="packets forwarded")
335
336 def test_iacl_dst_ip(self):
337 """ Destination IP6 iACL test
338
339 Test scenario for basic IP ACL with destination IP
340 - Create IPv6 stream for pg0 -> pg1 interface.
341 - Create iACL with destination IP address.
342 - Send and verify received packets on pg1 interface.
343 """
344
345 # Basic iACL testing with destination IP
346 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
347 self.pg0.add_stream(pkts)
348
349 key = 'ip6_dst'
350 self.create_classify_table(
351 key,
352 self.build_ip6_mask(dst_ip='ffffffffffffffffffffffffffffffff'))
353 self.create_classify_session(
354 self.acl_tbl_idx.get(key),
355 self.build_ip6_match(dst_ip=self.pg1.remote_ip6))
356 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
357 self.acl_active_table = key
358
359 self.pg_enable_capture(self.pg_interfaces)
360 self.pg_start()
361
362 pkts = self.pg1.get_capture(len(pkts))
363 self.verify_capture(self.pg1, pkts)
364 self.pg0.assert_nothing_captured(remark="packets forwarded")
365 self.pg2.assert_nothing_captured(remark="packets forwarded")
366
367 def test_iacl_src_dst_ip(self):
368 """ Source and destination IP6 iACL test
369
370 Test scenario for basic IP ACL with source and destination IP
371 - Create IPv4 stream for pg0 -> pg1 interface.
372 - Create iACL with source and destination IP addresses.
373 - Send and verify received packets on pg1 interface.
374 """
375
376 # Basic iACL testing with source and destination IP
377 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
378 self.pg0.add_stream(pkts)
379
380 key = 'ip6'
381 self.create_classify_table(
382 key,
383 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff',
384 dst_ip='ffffffffffffffffffffffffffffffff'))
385 self.create_classify_session(
386 self.acl_tbl_idx.get(key),
387 self.build_ip6_match(src_ip=self.pg0.remote_ip6,
388 dst_ip=self.pg1.remote_ip6))
389 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
390 self.acl_active_table = key
391
392 self.pg_enable_capture(self.pg_interfaces)
393 self.pg_start()
394
395 pkts = self.pg1.get_capture(len(pkts))
396 self.verify_capture(self.pg1, pkts)
397 self.pg0.assert_nothing_captured(remark="packets forwarded")
398 self.pg2.assert_nothing_captured(remark="packets forwarded")
399
400
401# Tests split to different test case classes because of issue reported in
402# ticket VPP-1336
403class TestClassifierIP6UDP(TestClassifier):
404 """ Classifier IP6 UDP proto Test Case """
405
Paul Vinciguerra7f9b7f92019-03-12 19:23:27 -0700406 @classmethod
407 def setUpClass(cls):
408 super(TestClassifierIP6UDP, cls).setUpClass()
409
410 @classmethod
411 def tearDownClass(cls):
412 super(TestClassifierIP6UDP, cls).tearDownClass()
413
Jan Gelety059d1d02018-07-03 13:58:24 +0200414 def test_iacl_proto_udp(self):
415 """ IP6 UDP protocol iACL test
416
417 Test scenario for basic protocol ACL with UDP protocol
418 - Create IPv6 stream for pg0 -> pg1 interface.
419 - Create iACL with UDP IP protocol.
420 - Send and verify received packets on pg1 interface.
421 """
422
423 # Basic iACL testing with UDP protocol
424 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
425 self.pg0.add_stream(pkts)
426
427 key = 'nh_udp'
428 self.create_classify_table(key, self.build_ip6_mask(nh='ff'))
429 self.create_classify_session(
430 self.acl_tbl_idx.get(key),
431 self.build_ip6_match(nh=socket.IPPROTO_UDP))
432 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
433 self.acl_active_table = key
434
435 self.pg_enable_capture(self.pg_interfaces)
436 self.pg_start()
437
438 pkts = self.pg1.get_capture(len(pkts))
439 self.verify_capture(self.pg1, pkts)
440 self.pg0.assert_nothing_captured(remark="packets forwarded")
441 self.pg2.assert_nothing_captured(remark="packets forwarded")
442
443 def test_iacl_proto_udp_sport(self):
444 """ IP6 UDP source port iACL test
445
446 Test scenario for basic protocol ACL with UDP and sport
447 - Create IPv6 stream for pg0 -> pg1 interface.
448 - Create iACL with UDP IP protocol and defined sport.
449 - Send and verify received packets on pg1 interface.
450 """
451
452 # Basic iACL testing with UDP and sport
453 sport = 38
454 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
455 UDP(sport=sport, dport=5678))
456 self.pg0.add_stream(pkts)
457
458 key = 'nh_udp_sport'
459 self.create_classify_table(
460 key, self.build_ip6_mask(nh='ff', src_port='ffff'))
461 self.create_classify_session(
462 self.acl_tbl_idx.get(key),
463 self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport))
464 self.input_acl_set_interface(
465 self.pg0, self.acl_tbl_idx.get(key))
466 self.acl_active_table = key
467
468 self.pg_enable_capture(self.pg_interfaces)
469 self.pg_start()
470
471 pkts = self.pg1.get_capture(len(pkts))
472 self.verify_capture(self.pg1, pkts)
473 self.pg0.assert_nothing_captured(remark="packets forwarded")
474 self.pg2.assert_nothing_captured(remark="packets forwarded")
475
476 def test_iacl_proto_udp_dport(self):
477 """ IP6 UDP destination port iACL test
478
479 Test scenario for basic protocol ACL with UDP and dport
480 - Create IPv6 stream for pg0 -> pg1 interface.
481 - Create iACL with UDP IP protocol and defined dport.
482 - Send and verify received packets on pg1 interface.
483 """
484
485 # Basic iACL testing with UDP and dport
486 dport = 427
487 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
488 UDP(sport=1234, dport=dport))
489 self.pg0.add_stream(pkts)
490
491 key = 'nh_udp_dport'
492 self.create_classify_table(
493 key, self.build_ip6_mask(nh='ff', dst_port='ffff'))
494 self.create_classify_session(
495 self.acl_tbl_idx.get(key),
496 self.build_ip6_match(nh=socket.IPPROTO_UDP, dst_port=dport))
497 self.input_acl_set_interface(
498 self.pg0, self.acl_tbl_idx.get(key))
499 self.acl_active_table = key
500
501 self.pg_enable_capture(self.pg_interfaces)
502 self.pg_start()
503
504 pkts = self.pg1.get_capture(len(pkts))
505 self.verify_capture(self.pg1, pkts)
506 self.pg0.assert_nothing_captured(remark="packets forwarded")
507 self.pg2.assert_nothing_captured(remark="packets forwarded")
508
509 def test_iacl_proto_udp_sport_dport(self):
510 """ IP6 UDP source and destination ports iACL test
511
512 Test scenario for basic protocol ACL with UDP and sport and dport
513 - Create IPv6 stream for pg0 -> pg1 interface.
514 - Create iACL with UDP IP protocol and defined sport and dport.
515 - Send and verify received packets on pg1 interface.
516 """
517
518 # Basic iACL testing with UDP and sport and dport
519 sport = 13720
520 dport = 9080
521 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
522 UDP(sport=sport, dport=dport))
523 self.pg0.add_stream(pkts)
524
525 key = 'nh_udp_ports'
526 self.create_classify_table(
527 key,
528 self.build_ip6_mask(nh='ff', src_port='ffff', dst_port='ffff'))
529 self.create_classify_session(
530 self.acl_tbl_idx.get(key),
531 self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport,
532 dst_port=dport))
533 self.input_acl_set_interface(
534 self.pg0, self.acl_tbl_idx.get(key))
535 self.acl_active_table = key
536
537 self.pg_enable_capture(self.pg_interfaces)
538 self.pg_start()
539
540 pkts = self.pg1.get_capture(len(pkts))
541 self.verify_capture(self.pg1, pkts)
542 self.pg0.assert_nothing_captured(remark="packets forwarded")
543 self.pg2.assert_nothing_captured(remark="packets forwarded")
544
545
546class TestClassifierIP6TCP(TestClassifier):
547 """ Classifier IP6 TCP proto Test Case """
548
Paul Vinciguerra7f9b7f92019-03-12 19:23:27 -0700549 @classmethod
550 def setUpClass(cls):
551 super(TestClassifierIP6TCP, cls).setUpClass()
552
553 @classmethod
554 def tearDownClass(cls):
555 super(TestClassifierIP6TCP, cls).tearDownClass()
556
Jan Gelety059d1d02018-07-03 13:58:24 +0200557 def test_iacl_proto_tcp(self):
558 """ IP6 TCP protocol iACL test
559
560 Test scenario for basic protocol ACL with TCP protocol
561 - Create IPv6 stream for pg0 -> pg1 interface.
562 - Create iACL with TCP IP protocol.
563 - Send and verify received packets on pg1 interface.
564 """
565
566 # Basic iACL testing with TCP protocol
567 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
568 TCP(sport=1234, dport=5678))
569 self.pg0.add_stream(pkts)
570
571 key = 'nh_tcp'
572 self.create_classify_table(key, self.build_ip6_mask(nh='ff'))
573 self.create_classify_session(
574 self.acl_tbl_idx.get(key),
575 self.build_ip6_match(nh=socket.IPPROTO_TCP))
576 self.input_acl_set_interface(
577 self.pg0, self.acl_tbl_idx.get(key))
578 self.acl_active_table = key
579
580 self.pg_enable_capture(self.pg_interfaces)
581 self.pg_start()
582
583 pkts = self.pg1.get_capture(len(pkts))
584 self.verify_capture(self.pg1, pkts, TCP)
585 self.pg0.assert_nothing_captured(remark="packets forwarded")
586 self.pg2.assert_nothing_captured(remark="packets forwarded")
587
588 def test_iacl_proto_tcp_sport(self):
589 """ IP6 TCP source port iACL test
590
591 Test scenario for basic protocol ACL with TCP and sport
592 - Create IPv6 stream for pg0 -> pg1 interface.
593 - Create iACL with TCP IP protocol and defined sport.
594 - Send and verify received packets on pg1 interface.
595 """
596
597 # Basic iACL testing with TCP and sport
598 sport = 38
599 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
600 TCP(sport=sport, dport=5678))
601 self.pg0.add_stream(pkts)
602
603 key = 'nh_tcp_sport'
604 self.create_classify_table(
605 key, self.build_ip6_mask(nh='ff', src_port='ffff'))
606 self.create_classify_session(
607 self.acl_tbl_idx.get(key),
608 self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport))
609 self.input_acl_set_interface(
610 self.pg0, self.acl_tbl_idx.get(key))
611 self.acl_active_table = key
612
613 self.pg_enable_capture(self.pg_interfaces)
614 self.pg_start()
615
616 pkts = self.pg1.get_capture(len(pkts))
617 self.verify_capture(self.pg1, pkts, TCP)
618 self.pg0.assert_nothing_captured(remark="packets forwarded")
619 self.pg2.assert_nothing_captured(remark="packets forwarded")
620
621 def test_iacl_proto_tcp_dport(self):
622 """ IP6 TCP destination port iACL test
623
624 Test scenario for basic protocol ACL with TCP and dport
625 - Create IPv6 stream for pg0 -> pg1 interface.
626 - Create iACL with TCP IP protocol and defined dport.
627 - Send and verify received packets on pg1 interface.
628 """
629
630 # Basic iACL testing with TCP and dport
631 dport = 427
632 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
633 TCP(sport=1234, dport=dport))
634 self.pg0.add_stream(pkts)
635
636 key = 'nh_tcp_dport'
637 self.create_classify_table(
638 key, self.build_ip6_mask(nh='ff', dst_port='ffff'))
639 self.create_classify_session(
640 self.acl_tbl_idx.get(key),
641 self.build_ip6_match(nh=socket.IPPROTO_TCP, dst_port=dport))
642 self.input_acl_set_interface(
643 self.pg0, self.acl_tbl_idx.get(key))
644 self.acl_active_table = key
645
646 self.pg_enable_capture(self.pg_interfaces)
647 self.pg_start()
648
649 pkts = self.pg1.get_capture(len(pkts))
650 self.verify_capture(self.pg1, pkts, TCP)
651 self.pg0.assert_nothing_captured(remark="packets forwarded")
652 self.pg2.assert_nothing_captured(remark="packets forwarded")
653
654 def test_iacl_proto_tcp_sport_dport(self):
655 """ IP6 TCP source and destination ports iACL test
656
657 Test scenario for basic protocol ACL with TCP and sport and dport
658 - Create IPv6 stream for pg0 -> pg1 interface.
659 - Create iACL with TCP IP protocol and defined sport and dport.
660 - Send and verify received packets on pg1 interface.
661 """
662
663 # Basic iACL testing with TCP and sport and dport
664 sport = 13720
665 dport = 9080
666 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
667 TCP(sport=sport, dport=dport))
668 self.pg0.add_stream(pkts)
669
670 key = 'nh_tcp_ports'
671 self.create_classify_table(
672 key,
673 self.build_ip6_mask(nh='ff', src_port='ffff', dst_port='ffff'))
674 self.create_classify_session(
675 self.acl_tbl_idx.get(key),
676 self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport,
677 dst_port=dport))
678 self.input_acl_set_interface(
679 self.pg0, self.acl_tbl_idx.get(key))
680 self.acl_active_table = key
681
682 self.pg_enable_capture(self.pg_interfaces)
683 self.pg_start()
684
685 pkts = self.pg1.get_capture(len(pkts))
686 self.verify_capture(self.pg1, pkts, TCP)
687 self.pg0.assert_nothing_captured(remark="packets forwarded")
688 self.pg2.assert_nothing_captured(remark="packets forwarded")
689
690
691class TestClassifierIP6Out(TestClassifier):
692 """ Classifier output IP6 Test Case """
693
Paul Vinciguerra7f9b7f92019-03-12 19:23:27 -0700694 @classmethod
695 def setUpClass(cls):
696 super(TestClassifierIP6Out, cls).setUpClass()
697
698 @classmethod
699 def tearDownClass(cls):
700 super(TestClassifierIP6Out, cls).tearDownClass()
701
Jan Gelety059d1d02018-07-03 13:58:24 +0200702 def test_acl_ip_out(self):
703 """ Output IP6 ACL test
704
705 Test scenario for basic IP ACL with source IP
706 - Create IPv6 stream for pg1 -> pg0 interface.
707 - Create ACL with source IP address.
708 - Send and verify received packets on pg0 interface.
709 """
710
711 # Basic oACL testing with source IP
712 pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
713 self.pg1.add_stream(pkts)
714
715 key = 'ip6_out'
716 self.create_classify_table(
717 key,
718 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff'),
719 data_offset=0)
720 self.create_classify_session(
721 self.acl_tbl_idx.get(key),
722 self.build_ip6_match(src_ip=self.pg1.remote_ip6))
723 self.output_acl_set_interface(
724 self.pg0, self.acl_tbl_idx.get(key))
725 self.acl_active_table = key
726
727 self.pg_enable_capture(self.pg_interfaces)
728 self.pg_start()
729
730 pkts = self.pg0.get_capture(len(pkts))
731 self.verify_capture(self.pg0, pkts)
732 self.pg1.assert_nothing_captured(remark="packets forwarded")
733 self.pg2.assert_nothing_captured(remark="packets forwarded")
734
735
736class TestClassifierIP6MAC(TestClassifier):
737 """ Classifier IP6 MAC Test Case """
738
Paul Vinciguerra7f9b7f92019-03-12 19:23:27 -0700739 @classmethod
740 def setUpClass(cls):
741 super(TestClassifierIP6MAC, cls).setUpClass()
742
743 @classmethod
744 def tearDownClass(cls):
745 super(TestClassifierIP6MAC, cls).tearDownClass()
746
Jan Gelety059d1d02018-07-03 13:58:24 +0200747 def test_acl_mac(self):
748 """ IP6 MAC iACL test
749
750 Test scenario for basic MAC ACL with source MAC
751 - Create IPv6 stream for pg0 -> pg2 interface.
752 - Create ACL with source MAC address.
753 - Send and verify received packets on pg2 interface.
754 """
755
756 # Basic iACL testing with source MAC
757 pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
758 self.pg0.add_stream(pkts)
759
760 key = 'mac'
761 self.create_classify_table(
762 key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
763 self.create_classify_session(
764 self.acl_tbl_idx.get(key),
765 self.build_mac_match(src_mac=self.pg0.remote_mac))
766 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
767 self.acl_active_table = key
768
769 self.pg_enable_capture(self.pg_interfaces)
770 self.pg_start()
771
772 pkts = self.pg2.get_capture(len(pkts))
773 self.verify_capture(self.pg2, pkts)
774 self.pg0.assert_nothing_captured(remark="packets forwarded")
775 self.pg1.assert_nothing_captured(remark="packets forwarded")
776
777
778if __name__ == '__main__':
779 unittest.main(testRunner=VppTestRunner)