blob: 464034df1afed305794089ff05d49a06ff2e93e1 [file] [log] [blame]
Renato Botelho do Coutoead1e532019-10-31 13:31:07 -05001#!/usr/bin/env python3
Jan Gelety059d1d02018-07-03 13:58:24 +02002
3import unittest
4import socket
5import binascii
6import sys
7
8from framework import VppTestCase, VppTestRunner
9
10from scapy.packet import Raw
11from scapy.layers.l2 import Ether
12from scapy.layers.inet6 import IPv6, UDP, TCP
13from util import ppp
14
15
16class TestClassifier(VppTestCase):
17 """ Classifier Test Case """
18
19 @classmethod
20 def setUpClass(cls):
21 """
22 Perform standard class setup (defined by class method setUpClass in
23 class VppTestCase) before running the test case, set test case related
24 variables and configure VPP.
25 """
26 super(TestClassifier, cls).setUpClass()
27 cls.acl_active_table = ''
28
Paul Vinciguerra7f9b7f92019-03-12 19:23:27 -070029 @classmethod
30 def tearDownClass(cls):
31 super(TestClassifier, cls).tearDownClass()
32
Jan Gelety059d1d02018-07-03 13:58:24 +020033 def setUp(self):
34 """
35 Perform test setup before test case.
36
37 **Config:**
38 - create 4 pg interfaces
39 - untagged pg0/pg1/pg2 interface
40 pg0 -------> pg1 (IP ACL)
41 \
42 ---> pg2 (MAC ACL))
43 - setup interfaces:
44 - put it into UP state
45 - set IPv6 addresses
46 - resolve neighbor address using NDP
47
48 :ivar list interfaces: pg interfaces.
49 :ivar list pg_if_packet_sizes: packet sizes in test.
50 :ivar dict acl_tbl_idx: ACL table index.
51 :ivar int pbr_vrfid: VRF id for PBR test.
52 """
53 self.reset_packet_infos()
54 super(TestClassifier, self).setUp()
55
56 # create 4 pg interfaces
57 self.create_pg_interfaces(range(3))
58
59 # packet sizes to test
60 self.pg_if_packet_sizes = [64, 9018]
61
62 self.interfaces = list(self.pg_interfaces)
63
64 # ACL vars
65 self.acl_tbl_idx = {}
66
67 # setup all interfaces
68 for intf in self.interfaces:
69 intf.admin_up()
70 intf.config_ip6()
71 intf.resolve_ndp()
72
73 def tearDown(self):
74 """Run standard test teardown and acl related log."""
75 if not self.vpp_dead:
Jan Gelety059d1d02018-07-03 13:58:24 +020076 if self.acl_active_table == 'ip6_out':
77 self.output_acl_set_interface(
78 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
79 self.acl_active_table = ''
80 elif self.acl_active_table != '':
81 self.input_acl_set_interface(
82 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
83 self.acl_active_table = ''
84 for intf in self.interfaces:
85 intf.unconfig_ip6()
86 intf.admin_down()
87
88 super(TestClassifier, self).tearDown()
89
Paul Vinciguerra90cf21b2019-03-13 09:23:05 -070090 def show_commands_at_teardown(self):
91 self.logger.info(self.vapi.ppcli("show inacl type ip6"))
92 self.logger.info(self.vapi.ppcli("show outacl type ip6"))
93 self.logger.info(self.vapi.cli("show classify table verbose"))
94 self.logger.info(self.vapi.cli("show ip fib"))
95
Jan Gelety059d1d02018-07-03 13:58:24 +020096 def create_stream(self, src_if, dst_if, packet_sizes,
97 proto_l=UDP(sport=1234, dport=5678)):
98 """Create input packet stream for defined interfaces.
99
100 :param VppInterface src_if: Source Interface for packet stream.
101 :param VppInterface dst_if: Destination Interface for packet stream.
102 :param list packet_sizes: packet size to test.
103 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
104 """
105 pkts = []
106
107 for size in packet_sizes:
108 info = self.create_packet_info(src_if, dst_if)
109 payload = self.info_to_payload(info)
110 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
111 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
112 proto_l /
113 Raw(payload))
114 info.data = p.copy()
115 self.extend_packet(p, size)
116 pkts.append(p)
117 return pkts
118
119 def verify_capture(self, dst_if, capture, proto_l=UDP):
120 """Verify captured input packet stream for defined interface.
121
122 :param VppInterface dst_if: Interface to verify captured packet stream.
123 :param list capture: Captured packet stream.
124 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
125 """
126 self.logger.info("Verifying capture on interface %s" % dst_if.name)
127 last_info = dict()
128 for i in self.interfaces:
129 last_info[i.sw_if_index] = None
130 dst_sw_if_index = dst_if.sw_if_index
131 for packet in capture:
132 try:
133 ip6_received = packet[IPv6]
134 proto_received = packet[proto_l]
Paul Vinciguerraeaea4212019-03-06 11:58:06 -0800135 payload_info = self.payload_to_info(packet[Raw])
Jan Gelety059d1d02018-07-03 13:58:24 +0200136 packet_index = payload_info.index
137 self.assertEqual(payload_info.dst, dst_sw_if_index)
138 self.logger.debug(
139 "Got packet on port %s: src=%u (id=%u)" %
140 (dst_if.name, payload_info.src, packet_index))
141 next_info = self.get_next_packet_info_for_interface2(
142 payload_info.src, dst_sw_if_index,
143 last_info[payload_info.src])
144 last_info[payload_info.src] = next_info
145 self.assertTrue(next_info is not None)
146 self.assertEqual(packet_index, next_info.index)
147 saved_packet = next_info.data
148 ip_saved = saved_packet[IPv6]
149 proto_saved = saved_packet[proto_l]
150 # Check standard fields
151 self.assertEqual(ip6_received.src, ip_saved.src)
152 self.assertEqual(ip6_received.dst, ip_saved.dst)
153 self.assertEqual(proto_received.sport, proto_saved.sport)
154 self.assertEqual(proto_received.dport, proto_saved.dport)
155 except:
156 self.logger.error(ppp("Unexpected or invalid packet:", packet))
157 raise
158 for i in self.interfaces:
159 remaining_packet = self.get_next_packet_info_for_interface2(
160 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
161 self.assertTrue(remaining_packet is None,
162 "Interface %s: Packet expected from interface %s "
163 "didn't arrive" % (dst_if.name, i.name))
164
165 @staticmethod
166 def build_ip6_mask(nh='', src_ip='', dst_ip='',
167 src_port='', dst_port=''):
168 """Build IPv6 ACL mask data with hexstring format.
169
170 :param str nh: next header number <0-ff>
171 :param str src_ip: source ip address <0-ffffffff>
172 :param str dst_ip: destination ip address <0-ffffffff>
173 :param str src_port: source port number <0-ffff>
174 :param str dst_port: destination port number <0-ffff>
175 """
176
Paul Vinciguerraea2450f2019-03-06 08:23:58 -0800177 return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
Jan Gelety059d1d02018-07-03 13:58:24 +0200178 nh, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
179
180 @staticmethod
181 def build_ip6_match(nh=0, src_ip='', dst_ip='',
182 src_port=0, dst_port=0):
183 """Build IPv6 ACL match data with hexstring format.
184
185 :param int nh: next header number with valid option "x"
186 :param str src_ip: source ip6 address with format of "xxx:xxxx::xxxx"
187 :param str dst_ip: destination ip6 address with format of
188 "xxx:xxxx::xxxx"
189 :param int src_port: source port number "x"
190 :param int dst_port: destination port number "x"
191 """
192 if src_ip:
Ole Troan1556b3a2019-10-21 19:52:06 +0200193 if sys.version_info[0] == 2:
194 src_ip = binascii.hexlify(socket.inet_pton(
195 socket.AF_INET6, src_ip))
196 else:
197 src_ip = socket.inet_pton(socket.AF_INET6, src_ip).hex()
198
Jan Gelety059d1d02018-07-03 13:58:24 +0200199 if dst_ip:
Ole Troan1556b3a2019-10-21 19:52:06 +0200200 if sys.version_info[0] == 2:
201 dst_ip = binascii.hexlify(socket.inet_pton(
202 socket.AF_INET6, dst_ip))
203 else:
204 dst_ip = socket.inet_pton(socket.AF_INET6, dst_ip).hex()
Jan Gelety059d1d02018-07-03 13:58:24 +0200205
Paul Vinciguerraea2450f2019-03-06 08:23:58 -0800206 return ('{!s:0>14}{!s:0>34}{!s:0>32}{!s:0>4}{!s:0>4}'.format(
Jan Gelety059d1d02018-07-03 13:58:24 +0200207 hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:],
208 hex(dst_port)[2:])).rstrip('0')
209
210 @staticmethod
211 def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
212 """Build MAC ACL mask data with hexstring format.
213
214 :param str dst_mac: source MAC address <0-ffffffffffff>
215 :param str src_mac: destination MAC address <0-ffffffffffff>
216 :param str ether_type: ethernet type <0-ffff>
217 """
218
Paul Vinciguerraea2450f2019-03-06 08:23:58 -0800219 return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
220 dst_mac, src_mac, ether_type)).rstrip('0')
Jan Gelety059d1d02018-07-03 13:58:24 +0200221
222 @staticmethod
223 def build_mac_match(dst_mac='', src_mac='', ether_type=''):
224 """Build MAC ACL match data with hexstring format.
225
226 :param str dst_mac: source MAC address <x:x:x:x:x:x>
227 :param str src_mac: destination MAC address <x:x:x:x:x:x>
228 :param str ether_type: ethernet type <0-ffff>
229 """
230 if dst_mac:
231 dst_mac = dst_mac.replace(':', '')
232 if src_mac:
233 src_mac = src_mac.replace(':', '')
234
Paul Vinciguerraea2450f2019-03-06 08:23:58 -0800235 return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format(
236 dst_mac, src_mac, ether_type)).rstrip('0')
Jan Gelety059d1d02018-07-03 13:58:24 +0200237
238 def create_classify_table(self, key, mask, data_offset=0):
239 """Create Classify Table
240
241 :param str key: key for classify table (ex, ACL name).
242 :param str mask: mask value for interested traffic.
243 :param int data_offset:
244 """
245 r = self.vapi.classify_add_del_table(
246 is_add=1,
247 mask=binascii.unhexlify(mask),
248 match_n_vectors=(len(mask) - 1) // 32 + 1,
249 miss_next_index=0,
250 current_data_flag=1,
251 current_data_offset=data_offset)
Paul Vinciguerra8d991d92019-01-25 14:05:48 -0800252 self.assertIsNotNone(r, 'No response msg for add_del_table')
Jan Gelety059d1d02018-07-03 13:58:24 +0200253 self.acl_tbl_idx[key] = r.new_table_index
254
255 def create_classify_session(self, table_index, match, vrfid=0, is_add=1):
256 """Create Classify Session
257
258 :param int table_index: table index to identify classify table.
259 :param str match: matched value for interested traffic.
260 :param int vrfid: VRF id.
261 :param int is_add: option to configure classify session.
262 - create(1) or delete(0)
263 """
264 r = self.vapi.classify_add_del_session(
265 is_add,
266 table_index,
267 binascii.unhexlify(match),
268 opaque_index=0,
269 metadata=vrfid)
Paul Vinciguerra8d991d92019-01-25 14:05:48 -0800270 self.assertIsNotNone(r, 'No response msg for add_del_session')
Jan Gelety059d1d02018-07-03 13:58:24 +0200271
272 def input_acl_set_interface(self, intf, table_index, is_add=1):
273 """Configure Input ACL interface
274
275 :param VppInterface intf: Interface to apply Input ACL feature.
276 :param int table_index: table index to identify classify table.
277 :param int is_add: option to configure classify session.
278 - enable(1) or disable(0)
279 """
280 r = self.vapi.input_acl_set_interface(
281 is_add,
282 intf.sw_if_index,
283 ip6_table_index=table_index)
Paul Vinciguerra8d991d92019-01-25 14:05:48 -0800284 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
Jan Gelety059d1d02018-07-03 13:58:24 +0200285
286 def output_acl_set_interface(self, intf, table_index, is_add=1):
287 """Configure Output ACL interface
288
289 :param VppInterface intf: Interface to apply Output ACL feature.
290 :param int table_index: table index to identify classify table.
291 :param int is_add: option to configure classify session.
292 - enable(1) or disable(0)
293 """
294 r = self.vapi.output_acl_set_interface(
295 is_add,
296 intf.sw_if_index,
297 ip6_table_index=table_index)
Paul Vinciguerra8d991d92019-01-25 14:05:48 -0800298 self.assertIsNotNone(r, 'No response msg for acl_set_interface')
Jan Gelety059d1d02018-07-03 13:58:24 +0200299
300
301class TestClassifierIP6(TestClassifier):
302 """ Classifier IP6 Test Case """
303
Paul Vinciguerra7f9b7f92019-03-12 19:23:27 -0700304 @classmethod
305 def setUpClass(cls):
306 super(TestClassifierIP6, cls).setUpClass()
307
308 @classmethod
309 def tearDownClass(cls):
310 super(TestClassifierIP6, cls).tearDownClass()
311
Jan Gelety059d1d02018-07-03 13:58:24 +0200312 def test_iacl_src_ip(self):
313 """ Source IP6 iACL test
314
315 Test scenario for basic IP ACL with source IP
316 - Create IPv6 stream for pg0 -> pg1 interface.
317 - Create iACL with source IP address.
318 - Send and verify received packets on pg1 interface.
319 """
320
321 # Basic iACL testing with source IP
322 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
323 self.pg0.add_stream(pkts)
324
325 key = 'ip6_src'
326 self.create_classify_table(
327 key,
328 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff'))
329 self.create_classify_session(
330 self.acl_tbl_idx.get(key),
331 self.build_ip6_match(src_ip=self.pg0.remote_ip6))
332 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
333 self.acl_active_table = key
334
335 self.pg_enable_capture(self.pg_interfaces)
336 self.pg_start()
337
338 pkts = self.pg1.get_capture(len(pkts))
339 self.verify_capture(self.pg1, pkts)
340 self.pg0.assert_nothing_captured(remark="packets forwarded")
341 self.pg2.assert_nothing_captured(remark="packets forwarded")
342
343 def test_iacl_dst_ip(self):
344 """ Destination IP6 iACL test
345
346 Test scenario for basic IP ACL with destination IP
347 - Create IPv6 stream for pg0 -> pg1 interface.
348 - Create iACL with destination IP address.
349 - Send and verify received packets on pg1 interface.
350 """
351
352 # Basic iACL testing with destination IP
353 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
354 self.pg0.add_stream(pkts)
355
356 key = 'ip6_dst'
357 self.create_classify_table(
358 key,
359 self.build_ip6_mask(dst_ip='ffffffffffffffffffffffffffffffff'))
360 self.create_classify_session(
361 self.acl_tbl_idx.get(key),
362 self.build_ip6_match(dst_ip=self.pg1.remote_ip6))
363 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
364 self.acl_active_table = key
365
366 self.pg_enable_capture(self.pg_interfaces)
367 self.pg_start()
368
369 pkts = self.pg1.get_capture(len(pkts))
370 self.verify_capture(self.pg1, pkts)
371 self.pg0.assert_nothing_captured(remark="packets forwarded")
372 self.pg2.assert_nothing_captured(remark="packets forwarded")
373
374 def test_iacl_src_dst_ip(self):
375 """ Source and destination IP6 iACL test
376
377 Test scenario for basic IP ACL with source and destination IP
378 - Create IPv4 stream for pg0 -> pg1 interface.
379 - Create iACL with source and destination IP addresses.
380 - Send and verify received packets on pg1 interface.
381 """
382
383 # Basic iACL testing with source and destination IP
384 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
385 self.pg0.add_stream(pkts)
386
387 key = 'ip6'
388 self.create_classify_table(
389 key,
390 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff',
391 dst_ip='ffffffffffffffffffffffffffffffff'))
392 self.create_classify_session(
393 self.acl_tbl_idx.get(key),
394 self.build_ip6_match(src_ip=self.pg0.remote_ip6,
395 dst_ip=self.pg1.remote_ip6))
396 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
397 self.acl_active_table = key
398
399 self.pg_enable_capture(self.pg_interfaces)
400 self.pg_start()
401
402 pkts = self.pg1.get_capture(len(pkts))
403 self.verify_capture(self.pg1, pkts)
404 self.pg0.assert_nothing_captured(remark="packets forwarded")
405 self.pg2.assert_nothing_captured(remark="packets forwarded")
406
407
408# Tests split to different test case classes because of issue reported in
409# ticket VPP-1336
410class TestClassifierIP6UDP(TestClassifier):
411 """ Classifier IP6 UDP proto Test Case """
412
Paul Vinciguerra7f9b7f92019-03-12 19:23:27 -0700413 @classmethod
414 def setUpClass(cls):
415 super(TestClassifierIP6UDP, cls).setUpClass()
416
417 @classmethod
418 def tearDownClass(cls):
419 super(TestClassifierIP6UDP, cls).tearDownClass()
420
Jan Gelety059d1d02018-07-03 13:58:24 +0200421 def test_iacl_proto_udp(self):
422 """ IP6 UDP protocol iACL test
423
424 Test scenario for basic protocol ACL with UDP protocol
425 - Create IPv6 stream for pg0 -> pg1 interface.
426 - Create iACL with UDP IP protocol.
427 - Send and verify received packets on pg1 interface.
428 """
429
430 # Basic iACL testing with UDP protocol
431 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
432 self.pg0.add_stream(pkts)
433
434 key = 'nh_udp'
435 self.create_classify_table(key, self.build_ip6_mask(nh='ff'))
436 self.create_classify_session(
437 self.acl_tbl_idx.get(key),
438 self.build_ip6_match(nh=socket.IPPROTO_UDP))
439 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
440 self.acl_active_table = key
441
442 self.pg_enable_capture(self.pg_interfaces)
443 self.pg_start()
444
445 pkts = self.pg1.get_capture(len(pkts))
446 self.verify_capture(self.pg1, pkts)
447 self.pg0.assert_nothing_captured(remark="packets forwarded")
448 self.pg2.assert_nothing_captured(remark="packets forwarded")
449
450 def test_iacl_proto_udp_sport(self):
451 """ IP6 UDP source port iACL test
452
453 Test scenario for basic protocol ACL with UDP and sport
454 - Create IPv6 stream for pg0 -> pg1 interface.
455 - Create iACL with UDP IP protocol and defined sport.
456 - Send and verify received packets on pg1 interface.
457 """
458
459 # Basic iACL testing with UDP and sport
460 sport = 38
461 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
462 UDP(sport=sport, dport=5678))
463 self.pg0.add_stream(pkts)
464
465 key = 'nh_udp_sport'
466 self.create_classify_table(
467 key, self.build_ip6_mask(nh='ff', src_port='ffff'))
468 self.create_classify_session(
469 self.acl_tbl_idx.get(key),
470 self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport))
471 self.input_acl_set_interface(
472 self.pg0, self.acl_tbl_idx.get(key))
473 self.acl_active_table = key
474
475 self.pg_enable_capture(self.pg_interfaces)
476 self.pg_start()
477
478 pkts = self.pg1.get_capture(len(pkts))
479 self.verify_capture(self.pg1, pkts)
480 self.pg0.assert_nothing_captured(remark="packets forwarded")
481 self.pg2.assert_nothing_captured(remark="packets forwarded")
482
483 def test_iacl_proto_udp_dport(self):
484 """ IP6 UDP destination port iACL test
485
486 Test scenario for basic protocol ACL with UDP and dport
487 - Create IPv6 stream for pg0 -> pg1 interface.
488 - Create iACL with UDP IP protocol and defined dport.
489 - Send and verify received packets on pg1 interface.
490 """
491
492 # Basic iACL testing with UDP and dport
493 dport = 427
494 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
495 UDP(sport=1234, dport=dport))
496 self.pg0.add_stream(pkts)
497
498 key = 'nh_udp_dport'
499 self.create_classify_table(
500 key, self.build_ip6_mask(nh='ff', dst_port='ffff'))
501 self.create_classify_session(
502 self.acl_tbl_idx.get(key),
503 self.build_ip6_match(nh=socket.IPPROTO_UDP, dst_port=dport))
504 self.input_acl_set_interface(
505 self.pg0, self.acl_tbl_idx.get(key))
506 self.acl_active_table = key
507
508 self.pg_enable_capture(self.pg_interfaces)
509 self.pg_start()
510
511 pkts = self.pg1.get_capture(len(pkts))
512 self.verify_capture(self.pg1, pkts)
513 self.pg0.assert_nothing_captured(remark="packets forwarded")
514 self.pg2.assert_nothing_captured(remark="packets forwarded")
515
516 def test_iacl_proto_udp_sport_dport(self):
517 """ IP6 UDP source and destination ports iACL test
518
519 Test scenario for basic protocol ACL with UDP and sport and dport
520 - Create IPv6 stream for pg0 -> pg1 interface.
521 - Create iACL with UDP IP protocol and defined sport and dport.
522 - Send and verify received packets on pg1 interface.
523 """
524
525 # Basic iACL testing with UDP and sport and dport
526 sport = 13720
527 dport = 9080
528 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
529 UDP(sport=sport, dport=dport))
530 self.pg0.add_stream(pkts)
531
532 key = 'nh_udp_ports'
533 self.create_classify_table(
534 key,
535 self.build_ip6_mask(nh='ff', src_port='ffff', dst_port='ffff'))
536 self.create_classify_session(
537 self.acl_tbl_idx.get(key),
538 self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport,
539 dst_port=dport))
540 self.input_acl_set_interface(
541 self.pg0, self.acl_tbl_idx.get(key))
542 self.acl_active_table = key
543
544 self.pg_enable_capture(self.pg_interfaces)
545 self.pg_start()
546
547 pkts = self.pg1.get_capture(len(pkts))
548 self.verify_capture(self.pg1, pkts)
549 self.pg0.assert_nothing_captured(remark="packets forwarded")
550 self.pg2.assert_nothing_captured(remark="packets forwarded")
551
552
553class TestClassifierIP6TCP(TestClassifier):
554 """ Classifier IP6 TCP proto Test Case """
555
Paul Vinciguerra7f9b7f92019-03-12 19:23:27 -0700556 @classmethod
557 def setUpClass(cls):
558 super(TestClassifierIP6TCP, cls).setUpClass()
559
560 @classmethod
561 def tearDownClass(cls):
562 super(TestClassifierIP6TCP, cls).tearDownClass()
563
Jan Gelety059d1d02018-07-03 13:58:24 +0200564 def test_iacl_proto_tcp(self):
565 """ IP6 TCP protocol iACL test
566
567 Test scenario for basic protocol ACL with TCP protocol
568 - Create IPv6 stream for pg0 -> pg1 interface.
569 - Create iACL with TCP IP protocol.
570 - Send and verify received packets on pg1 interface.
571 """
572
573 # Basic iACL testing with TCP protocol
574 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
575 TCP(sport=1234, dport=5678))
576 self.pg0.add_stream(pkts)
577
578 key = 'nh_tcp'
579 self.create_classify_table(key, self.build_ip6_mask(nh='ff'))
580 self.create_classify_session(
581 self.acl_tbl_idx.get(key),
582 self.build_ip6_match(nh=socket.IPPROTO_TCP))
583 self.input_acl_set_interface(
584 self.pg0, self.acl_tbl_idx.get(key))
585 self.acl_active_table = key
586
587 self.pg_enable_capture(self.pg_interfaces)
588 self.pg_start()
589
590 pkts = self.pg1.get_capture(len(pkts))
591 self.verify_capture(self.pg1, pkts, TCP)
592 self.pg0.assert_nothing_captured(remark="packets forwarded")
593 self.pg2.assert_nothing_captured(remark="packets forwarded")
594
595 def test_iacl_proto_tcp_sport(self):
596 """ IP6 TCP source port iACL test
597
598 Test scenario for basic protocol ACL with TCP and sport
599 - Create IPv6 stream for pg0 -> pg1 interface.
600 - Create iACL with TCP IP protocol and defined sport.
601 - Send and verify received packets on pg1 interface.
602 """
603
604 # Basic iACL testing with TCP and sport
605 sport = 38
606 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
607 TCP(sport=sport, dport=5678))
608 self.pg0.add_stream(pkts)
609
610 key = 'nh_tcp_sport'
611 self.create_classify_table(
612 key, self.build_ip6_mask(nh='ff', src_port='ffff'))
613 self.create_classify_session(
614 self.acl_tbl_idx.get(key),
615 self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport))
616 self.input_acl_set_interface(
617 self.pg0, self.acl_tbl_idx.get(key))
618 self.acl_active_table = key
619
620 self.pg_enable_capture(self.pg_interfaces)
621 self.pg_start()
622
623 pkts = self.pg1.get_capture(len(pkts))
624 self.verify_capture(self.pg1, pkts, TCP)
625 self.pg0.assert_nothing_captured(remark="packets forwarded")
626 self.pg2.assert_nothing_captured(remark="packets forwarded")
627
628 def test_iacl_proto_tcp_dport(self):
629 """ IP6 TCP destination port iACL test
630
631 Test scenario for basic protocol ACL with TCP and dport
632 - Create IPv6 stream for pg0 -> pg1 interface.
633 - Create iACL with TCP IP protocol and defined dport.
634 - Send and verify received packets on pg1 interface.
635 """
636
637 # Basic iACL testing with TCP and dport
638 dport = 427
639 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
640 TCP(sport=1234, dport=dport))
641 self.pg0.add_stream(pkts)
642
643 key = 'nh_tcp_dport'
644 self.create_classify_table(
645 key, self.build_ip6_mask(nh='ff', dst_port='ffff'))
646 self.create_classify_session(
647 self.acl_tbl_idx.get(key),
648 self.build_ip6_match(nh=socket.IPPROTO_TCP, dst_port=dport))
649 self.input_acl_set_interface(
650 self.pg0, self.acl_tbl_idx.get(key))
651 self.acl_active_table = key
652
653 self.pg_enable_capture(self.pg_interfaces)
654 self.pg_start()
655
656 pkts = self.pg1.get_capture(len(pkts))
657 self.verify_capture(self.pg1, pkts, TCP)
658 self.pg0.assert_nothing_captured(remark="packets forwarded")
659 self.pg2.assert_nothing_captured(remark="packets forwarded")
660
661 def test_iacl_proto_tcp_sport_dport(self):
662 """ IP6 TCP source and destination ports iACL test
663
664 Test scenario for basic protocol ACL with TCP and sport and dport
665 - Create IPv6 stream for pg0 -> pg1 interface.
666 - Create iACL with TCP IP protocol and defined sport and dport.
667 - Send and verify received packets on pg1 interface.
668 """
669
670 # Basic iACL testing with TCP and sport and dport
671 sport = 13720
672 dport = 9080
673 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
674 TCP(sport=sport, dport=dport))
675 self.pg0.add_stream(pkts)
676
677 key = 'nh_tcp_ports'
678 self.create_classify_table(
679 key,
680 self.build_ip6_mask(nh='ff', src_port='ffff', dst_port='ffff'))
681 self.create_classify_session(
682 self.acl_tbl_idx.get(key),
683 self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport,
684 dst_port=dport))
685 self.input_acl_set_interface(
686 self.pg0, self.acl_tbl_idx.get(key))
687 self.acl_active_table = key
688
689 self.pg_enable_capture(self.pg_interfaces)
690 self.pg_start()
691
692 pkts = self.pg1.get_capture(len(pkts))
693 self.verify_capture(self.pg1, pkts, TCP)
694 self.pg0.assert_nothing_captured(remark="packets forwarded")
695 self.pg2.assert_nothing_captured(remark="packets forwarded")
696
697
698class TestClassifierIP6Out(TestClassifier):
699 """ Classifier output IP6 Test Case """
700
Paul Vinciguerra7f9b7f92019-03-12 19:23:27 -0700701 @classmethod
702 def setUpClass(cls):
703 super(TestClassifierIP6Out, cls).setUpClass()
704
705 @classmethod
706 def tearDownClass(cls):
707 super(TestClassifierIP6Out, cls).tearDownClass()
708
Jan Gelety059d1d02018-07-03 13:58:24 +0200709 def test_acl_ip_out(self):
710 """ Output IP6 ACL test
711
712 Test scenario for basic IP ACL with source IP
713 - Create IPv6 stream for pg1 -> pg0 interface.
714 - Create ACL with source IP address.
715 - Send and verify received packets on pg0 interface.
716 """
717
718 # Basic oACL testing with source IP
719 pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
720 self.pg1.add_stream(pkts)
721
722 key = 'ip6_out'
723 self.create_classify_table(
724 key,
725 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff'),
726 data_offset=0)
727 self.create_classify_session(
728 self.acl_tbl_idx.get(key),
729 self.build_ip6_match(src_ip=self.pg1.remote_ip6))
730 self.output_acl_set_interface(
731 self.pg0, self.acl_tbl_idx.get(key))
732 self.acl_active_table = key
733
734 self.pg_enable_capture(self.pg_interfaces)
735 self.pg_start()
736
737 pkts = self.pg0.get_capture(len(pkts))
738 self.verify_capture(self.pg0, pkts)
739 self.pg1.assert_nothing_captured(remark="packets forwarded")
740 self.pg2.assert_nothing_captured(remark="packets forwarded")
741
742
743class TestClassifierIP6MAC(TestClassifier):
744 """ Classifier IP6 MAC Test Case """
745
Paul Vinciguerra7f9b7f92019-03-12 19:23:27 -0700746 @classmethod
747 def setUpClass(cls):
748 super(TestClassifierIP6MAC, cls).setUpClass()
749
750 @classmethod
751 def tearDownClass(cls):
752 super(TestClassifierIP6MAC, cls).tearDownClass()
753
Jan Gelety059d1d02018-07-03 13:58:24 +0200754 def test_acl_mac(self):
755 """ IP6 MAC iACL test
756
757 Test scenario for basic MAC ACL with source MAC
758 - Create IPv6 stream for pg0 -> pg2 interface.
759 - Create ACL with source MAC address.
760 - Send and verify received packets on pg2 interface.
761 """
762
763 # Basic iACL testing with source MAC
764 pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
765 self.pg0.add_stream(pkts)
766
767 key = 'mac'
768 self.create_classify_table(
769 key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
770 self.create_classify_session(
771 self.acl_tbl_idx.get(key),
772 self.build_mac_match(src_mac=self.pg0.remote_mac))
773 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
774 self.acl_active_table = key
775
776 self.pg_enable_capture(self.pg_interfaces)
777 self.pg_start()
778
779 pkts = self.pg2.get_capture(len(pkts))
780 self.verify_capture(self.pg2, pkts)
781 self.pg0.assert_nothing_captured(remark="packets forwarded")
782 self.pg1.assert_nothing_captured(remark="packets forwarded")
783
784
785if __name__ == '__main__':
786 unittest.main(testRunner=VppTestRunner)