blob: cbcf5c472579528241b5501a70fcd914394d3ad1 [file] [log] [blame]
Jan Gelety059d1d02018-07-03 13:58:24 +02001#!/usr/bin/env python
2
3import unittest
4import socket
5import binascii
6import sys
7
8from framework import VppTestCase, VppTestRunner
9
10from scapy.packet import Raw
11from scapy.layers.l2 import Ether
12from scapy.layers.inet6 import IPv6, UDP, TCP
13from util import ppp
14
15
16class TestClassifier(VppTestCase):
17 """ Classifier Test Case """
18
19 @classmethod
20 def setUpClass(cls):
21 """
22 Perform standard class setup (defined by class method setUpClass in
23 class VppTestCase) before running the test case, set test case related
24 variables and configure VPP.
25 """
26 super(TestClassifier, cls).setUpClass()
27 cls.acl_active_table = ''
28
29 def setUp(self):
30 """
31 Perform test setup before test case.
32
33 **Config:**
34 - create 4 pg interfaces
35 - untagged pg0/pg1/pg2 interface
36 pg0 -------> pg1 (IP ACL)
37 \
38 ---> pg2 (MAC ACL))
39 - setup interfaces:
40 - put it into UP state
41 - set IPv6 addresses
42 - resolve neighbor address using NDP
43
44 :ivar list interfaces: pg interfaces.
45 :ivar list pg_if_packet_sizes: packet sizes in test.
46 :ivar dict acl_tbl_idx: ACL table index.
47 :ivar int pbr_vrfid: VRF id for PBR test.
48 """
49 self.reset_packet_infos()
50 super(TestClassifier, self).setUp()
51
52 # create 4 pg interfaces
53 self.create_pg_interfaces(range(3))
54
55 # packet sizes to test
56 self.pg_if_packet_sizes = [64, 9018]
57
58 self.interfaces = list(self.pg_interfaces)
59
60 # ACL vars
61 self.acl_tbl_idx = {}
62
63 # setup all interfaces
64 for intf in self.interfaces:
65 intf.admin_up()
66 intf.config_ip6()
67 intf.resolve_ndp()
68
69 def tearDown(self):
70 """Run standard test teardown and acl related log."""
71 if not self.vpp_dead:
72 self.logger.info(self.vapi.ppcli("show inacl type ip6"))
73 self.logger.info(self.vapi.ppcli("show outacl type ip6"))
74 self.logger.info(self.vapi.cli("show classify table verbose"))
75 self.logger.info(self.vapi.cli("show ip fib"))
76 if self.acl_active_table == 'ip6_out':
77 self.output_acl_set_interface(
78 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
79 self.acl_active_table = ''
80 elif self.acl_active_table != '':
81 self.input_acl_set_interface(
82 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
83 self.acl_active_table = ''
84 for intf in self.interfaces:
85 intf.unconfig_ip6()
86 intf.admin_down()
87
88 super(TestClassifier, self).tearDown()
89
90 def create_stream(self, src_if, dst_if, packet_sizes,
91 proto_l=UDP(sport=1234, dport=5678)):
92 """Create input packet stream for defined interfaces.
93
94 :param VppInterface src_if: Source Interface for packet stream.
95 :param VppInterface dst_if: Destination Interface for packet stream.
96 :param list packet_sizes: packet size to test.
97 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
98 """
99 pkts = []
100
101 for size in packet_sizes:
102 info = self.create_packet_info(src_if, dst_if)
103 payload = self.info_to_payload(info)
104 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
105 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
106 proto_l /
107 Raw(payload))
108 info.data = p.copy()
109 self.extend_packet(p, size)
110 pkts.append(p)
111 return pkts
112
113 def verify_capture(self, dst_if, capture, proto_l=UDP):
114 """Verify captured input packet stream for defined interface.
115
116 :param VppInterface dst_if: Interface to verify captured packet stream.
117 :param list capture: Captured packet stream.
118 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
119 """
120 self.logger.info("Verifying capture on interface %s" % dst_if.name)
121 last_info = dict()
122 for i in self.interfaces:
123 last_info[i.sw_if_index] = None
124 dst_sw_if_index = dst_if.sw_if_index
125 for packet in capture:
126 try:
127 ip6_received = packet[IPv6]
128 proto_received = packet[proto_l]
129 payload_info = self.payload_to_info(str(packet[Raw]))
130 packet_index = payload_info.index
131 self.assertEqual(payload_info.dst, dst_sw_if_index)
132 self.logger.debug(
133 "Got packet on port %s: src=%u (id=%u)" %
134 (dst_if.name, payload_info.src, packet_index))
135 next_info = self.get_next_packet_info_for_interface2(
136 payload_info.src, dst_sw_if_index,
137 last_info[payload_info.src])
138 last_info[payload_info.src] = next_info
139 self.assertTrue(next_info is not None)
140 self.assertEqual(packet_index, next_info.index)
141 saved_packet = next_info.data
142 ip_saved = saved_packet[IPv6]
143 proto_saved = saved_packet[proto_l]
144 # Check standard fields
145 self.assertEqual(ip6_received.src, ip_saved.src)
146 self.assertEqual(ip6_received.dst, ip_saved.dst)
147 self.assertEqual(proto_received.sport, proto_saved.sport)
148 self.assertEqual(proto_received.dport, proto_saved.dport)
149 except:
150 self.logger.error(ppp("Unexpected or invalid packet:", packet))
151 raise
152 for i in self.interfaces:
153 remaining_packet = self.get_next_packet_info_for_interface2(
154 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
155 self.assertTrue(remaining_packet is None,
156 "Interface %s: Packet expected from interface %s "
157 "didn't arrive" % (dst_if.name, i.name))
158
159 @staticmethod
160 def build_ip6_mask(nh='', src_ip='', dst_ip='',
161 src_port='', dst_port=''):
162 """Build IPv6 ACL mask data with hexstring format.
163
164 :param str nh: next header number <0-ff>
165 :param str src_ip: source ip address <0-ffffffff>
166 :param str dst_ip: destination ip address <0-ffffffff>
167 :param str src_port: source port number <0-ffff>
168 :param str dst_port: destination port number <0-ffff>
169 """
170
171 return ('{:0>14}{:0>34}{:0>32}{:0>4}{:0>4}'.format(
172 nh, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
173
174 @staticmethod
175 def build_ip6_match(nh=0, src_ip='', dst_ip='',
176 src_port=0, dst_port=0):
177 """Build IPv6 ACL match data with hexstring format.
178
179 :param int nh: next header number with valid option "x"
180 :param str src_ip: source ip6 address with format of "xxx:xxxx::xxxx"
181 :param str dst_ip: destination ip6 address with format of
182 "xxx:xxxx::xxxx"
183 :param int src_port: source port number "x"
184 :param int dst_port: destination port number "x"
185 """
186 if src_ip:
Paul Vinciguerra6e4c6ad2018-11-25 10:35:29 -0800187 src_ip = binascii.hexlify(socket.inet_pton(
188 socket.AF_INET6, src_ip))
Jan Gelety059d1d02018-07-03 13:58:24 +0200189 if dst_ip:
Paul Vinciguerra6e4c6ad2018-11-25 10:35:29 -0800190 dst_ip = binascii.hexlify(socket.inet_pton(
191 socket.AF_INET6, dst_ip))
Jan Gelety059d1d02018-07-03 13:58:24 +0200192
193 return ('{:0>14}{:0>34}{:0>32}{:0>4}{:0>4}'.format(
194 hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:],
195 hex(dst_port)[2:])).rstrip('0')
196
197 @staticmethod
198 def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
199 """Build MAC ACL mask data with hexstring format.
200
201 :param str dst_mac: source MAC address <0-ffffffffffff>
202 :param str src_mac: destination MAC address <0-ffffffffffff>
203 :param str ether_type: ethernet type <0-ffff>
204 """
205
206 return ('{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac,
207 ether_type)).rstrip('0')
208
209 @staticmethod
210 def build_mac_match(dst_mac='', src_mac='', ether_type=''):
211 """Build MAC ACL match data with hexstring format.
212
213 :param str dst_mac: source MAC address <x:x:x:x:x:x>
214 :param str src_mac: destination MAC address <x:x:x:x:x:x>
215 :param str ether_type: ethernet type <0-ffff>
216 """
217 if dst_mac:
218 dst_mac = dst_mac.replace(':', '')
219 if src_mac:
220 src_mac = src_mac.replace(':', '')
221
222 return ('{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac,
223 ether_type)).rstrip('0')
224
225 def create_classify_table(self, key, mask, data_offset=0):
226 """Create Classify Table
227
228 :param str key: key for classify table (ex, ACL name).
229 :param str mask: mask value for interested traffic.
230 :param int data_offset:
231 """
232 r = self.vapi.classify_add_del_table(
233 is_add=1,
234 mask=binascii.unhexlify(mask),
235 match_n_vectors=(len(mask) - 1) // 32 + 1,
236 miss_next_index=0,
237 current_data_flag=1,
238 current_data_offset=data_offset)
239 self.assertIsNotNone(r, msg='No response msg for add_del_table')
240 self.acl_tbl_idx[key] = r.new_table_index
241
242 def create_classify_session(self, table_index, match, vrfid=0, is_add=1):
243 """Create Classify Session
244
245 :param int table_index: table index to identify classify table.
246 :param str match: matched value for interested traffic.
247 :param int vrfid: VRF id.
248 :param int is_add: option to configure classify session.
249 - create(1) or delete(0)
250 """
251 r = self.vapi.classify_add_del_session(
252 is_add,
253 table_index,
254 binascii.unhexlify(match),
255 opaque_index=0,
256 metadata=vrfid)
257 self.assertIsNotNone(r, msg='No response msg for add_del_session')
258
259 def input_acl_set_interface(self, intf, table_index, is_add=1):
260 """Configure Input ACL interface
261
262 :param VppInterface intf: Interface to apply Input ACL feature.
263 :param int table_index: table index to identify classify table.
264 :param int is_add: option to configure classify session.
265 - enable(1) or disable(0)
266 """
267 r = self.vapi.input_acl_set_interface(
268 is_add,
269 intf.sw_if_index,
270 ip6_table_index=table_index)
271 self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
272
273 def output_acl_set_interface(self, intf, table_index, is_add=1):
274 """Configure Output ACL interface
275
276 :param VppInterface intf: Interface to apply Output ACL feature.
277 :param int table_index: table index to identify classify table.
278 :param int is_add: option to configure classify session.
279 - enable(1) or disable(0)
280 """
281 r = self.vapi.output_acl_set_interface(
282 is_add,
283 intf.sw_if_index,
284 ip6_table_index=table_index)
285 self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
286
287
288class TestClassifierIP6(TestClassifier):
289 """ Classifier IP6 Test Case """
290
291 def test_iacl_src_ip(self):
292 """ Source IP6 iACL test
293
294 Test scenario for basic IP ACL with source IP
295 - Create IPv6 stream for pg0 -> pg1 interface.
296 - Create iACL with source IP address.
297 - Send and verify received packets on pg1 interface.
298 """
299
300 # Basic iACL testing with source IP
301 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
302 self.pg0.add_stream(pkts)
303
304 key = 'ip6_src'
305 self.create_classify_table(
306 key,
307 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff'))
308 self.create_classify_session(
309 self.acl_tbl_idx.get(key),
310 self.build_ip6_match(src_ip=self.pg0.remote_ip6))
311 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
312 self.acl_active_table = key
313
314 self.pg_enable_capture(self.pg_interfaces)
315 self.pg_start()
316
317 pkts = self.pg1.get_capture(len(pkts))
318 self.verify_capture(self.pg1, pkts)
319 self.pg0.assert_nothing_captured(remark="packets forwarded")
320 self.pg2.assert_nothing_captured(remark="packets forwarded")
321
322 def test_iacl_dst_ip(self):
323 """ Destination IP6 iACL test
324
325 Test scenario for basic IP ACL with destination IP
326 - Create IPv6 stream for pg0 -> pg1 interface.
327 - Create iACL with destination IP address.
328 - Send and verify received packets on pg1 interface.
329 """
330
331 # Basic iACL testing with destination IP
332 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
333 self.pg0.add_stream(pkts)
334
335 key = 'ip6_dst'
336 self.create_classify_table(
337 key,
338 self.build_ip6_mask(dst_ip='ffffffffffffffffffffffffffffffff'))
339 self.create_classify_session(
340 self.acl_tbl_idx.get(key),
341 self.build_ip6_match(dst_ip=self.pg1.remote_ip6))
342 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
343 self.acl_active_table = key
344
345 self.pg_enable_capture(self.pg_interfaces)
346 self.pg_start()
347
348 pkts = self.pg1.get_capture(len(pkts))
349 self.verify_capture(self.pg1, pkts)
350 self.pg0.assert_nothing_captured(remark="packets forwarded")
351 self.pg2.assert_nothing_captured(remark="packets forwarded")
352
353 def test_iacl_src_dst_ip(self):
354 """ Source and destination IP6 iACL test
355
356 Test scenario for basic IP ACL with source and destination IP
357 - Create IPv4 stream for pg0 -> pg1 interface.
358 - Create iACL with source and destination IP addresses.
359 - Send and verify received packets on pg1 interface.
360 """
361
362 # Basic iACL testing with source and destination IP
363 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
364 self.pg0.add_stream(pkts)
365
366 key = 'ip6'
367 self.create_classify_table(
368 key,
369 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff',
370 dst_ip='ffffffffffffffffffffffffffffffff'))
371 self.create_classify_session(
372 self.acl_tbl_idx.get(key),
373 self.build_ip6_match(src_ip=self.pg0.remote_ip6,
374 dst_ip=self.pg1.remote_ip6))
375 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
376 self.acl_active_table = key
377
378 self.pg_enable_capture(self.pg_interfaces)
379 self.pg_start()
380
381 pkts = self.pg1.get_capture(len(pkts))
382 self.verify_capture(self.pg1, pkts)
383 self.pg0.assert_nothing_captured(remark="packets forwarded")
384 self.pg2.assert_nothing_captured(remark="packets forwarded")
385
386
387# Tests split to different test case classes because of issue reported in
388# ticket VPP-1336
389class TestClassifierIP6UDP(TestClassifier):
390 """ Classifier IP6 UDP proto Test Case """
391
392 def test_iacl_proto_udp(self):
393 """ IP6 UDP protocol iACL test
394
395 Test scenario for basic protocol ACL with UDP protocol
396 - Create IPv6 stream for pg0 -> pg1 interface.
397 - Create iACL with UDP IP protocol.
398 - Send and verify received packets on pg1 interface.
399 """
400
401 # Basic iACL testing with UDP protocol
402 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
403 self.pg0.add_stream(pkts)
404
405 key = 'nh_udp'
406 self.create_classify_table(key, self.build_ip6_mask(nh='ff'))
407 self.create_classify_session(
408 self.acl_tbl_idx.get(key),
409 self.build_ip6_match(nh=socket.IPPROTO_UDP))
410 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
411 self.acl_active_table = key
412
413 self.pg_enable_capture(self.pg_interfaces)
414 self.pg_start()
415
416 pkts = self.pg1.get_capture(len(pkts))
417 self.verify_capture(self.pg1, pkts)
418 self.pg0.assert_nothing_captured(remark="packets forwarded")
419 self.pg2.assert_nothing_captured(remark="packets forwarded")
420
421 def test_iacl_proto_udp_sport(self):
422 """ IP6 UDP source port iACL test
423
424 Test scenario for basic protocol ACL with UDP and sport
425 - Create IPv6 stream for pg0 -> pg1 interface.
426 - Create iACL with UDP IP protocol and defined sport.
427 - Send and verify received packets on pg1 interface.
428 """
429
430 # Basic iACL testing with UDP and sport
431 sport = 38
432 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
433 UDP(sport=sport, dport=5678))
434 self.pg0.add_stream(pkts)
435
436 key = 'nh_udp_sport'
437 self.create_classify_table(
438 key, self.build_ip6_mask(nh='ff', src_port='ffff'))
439 self.create_classify_session(
440 self.acl_tbl_idx.get(key),
441 self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport))
442 self.input_acl_set_interface(
443 self.pg0, self.acl_tbl_idx.get(key))
444 self.acl_active_table = key
445
446 self.pg_enable_capture(self.pg_interfaces)
447 self.pg_start()
448
449 pkts = self.pg1.get_capture(len(pkts))
450 self.verify_capture(self.pg1, pkts)
451 self.pg0.assert_nothing_captured(remark="packets forwarded")
452 self.pg2.assert_nothing_captured(remark="packets forwarded")
453
454 def test_iacl_proto_udp_dport(self):
455 """ IP6 UDP destination port iACL test
456
457 Test scenario for basic protocol ACL with UDP and dport
458 - Create IPv6 stream for pg0 -> pg1 interface.
459 - Create iACL with UDP IP protocol and defined dport.
460 - Send and verify received packets on pg1 interface.
461 """
462
463 # Basic iACL testing with UDP and dport
464 dport = 427
465 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
466 UDP(sport=1234, dport=dport))
467 self.pg0.add_stream(pkts)
468
469 key = 'nh_udp_dport'
470 self.create_classify_table(
471 key, self.build_ip6_mask(nh='ff', dst_port='ffff'))
472 self.create_classify_session(
473 self.acl_tbl_idx.get(key),
474 self.build_ip6_match(nh=socket.IPPROTO_UDP, dst_port=dport))
475 self.input_acl_set_interface(
476 self.pg0, self.acl_tbl_idx.get(key))
477 self.acl_active_table = key
478
479 self.pg_enable_capture(self.pg_interfaces)
480 self.pg_start()
481
482 pkts = self.pg1.get_capture(len(pkts))
483 self.verify_capture(self.pg1, pkts)
484 self.pg0.assert_nothing_captured(remark="packets forwarded")
485 self.pg2.assert_nothing_captured(remark="packets forwarded")
486
487 def test_iacl_proto_udp_sport_dport(self):
488 """ IP6 UDP source and destination ports iACL test
489
490 Test scenario for basic protocol ACL with UDP and sport and dport
491 - Create IPv6 stream for pg0 -> pg1 interface.
492 - Create iACL with UDP IP protocol and defined sport and dport.
493 - Send and verify received packets on pg1 interface.
494 """
495
496 # Basic iACL testing with UDP and sport and dport
497 sport = 13720
498 dport = 9080
499 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
500 UDP(sport=sport, dport=dport))
501 self.pg0.add_stream(pkts)
502
503 key = 'nh_udp_ports'
504 self.create_classify_table(
505 key,
506 self.build_ip6_mask(nh='ff', src_port='ffff', dst_port='ffff'))
507 self.create_classify_session(
508 self.acl_tbl_idx.get(key),
509 self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport,
510 dst_port=dport))
511 self.input_acl_set_interface(
512 self.pg0, self.acl_tbl_idx.get(key))
513 self.acl_active_table = key
514
515 self.pg_enable_capture(self.pg_interfaces)
516 self.pg_start()
517
518 pkts = self.pg1.get_capture(len(pkts))
519 self.verify_capture(self.pg1, pkts)
520 self.pg0.assert_nothing_captured(remark="packets forwarded")
521 self.pg2.assert_nothing_captured(remark="packets forwarded")
522
523
524class TestClassifierIP6TCP(TestClassifier):
525 """ Classifier IP6 TCP proto Test Case """
526
527 def test_iacl_proto_tcp(self):
528 """ IP6 TCP protocol iACL test
529
530 Test scenario for basic protocol ACL with TCP protocol
531 - Create IPv6 stream for pg0 -> pg1 interface.
532 - Create iACL with TCP IP protocol.
533 - Send and verify received packets on pg1 interface.
534 """
535
536 # Basic iACL testing with TCP protocol
537 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
538 TCP(sport=1234, dport=5678))
539 self.pg0.add_stream(pkts)
540
541 key = 'nh_tcp'
542 self.create_classify_table(key, self.build_ip6_mask(nh='ff'))
543 self.create_classify_session(
544 self.acl_tbl_idx.get(key),
545 self.build_ip6_match(nh=socket.IPPROTO_TCP))
546 self.input_acl_set_interface(
547 self.pg0, self.acl_tbl_idx.get(key))
548 self.acl_active_table = key
549
550 self.pg_enable_capture(self.pg_interfaces)
551 self.pg_start()
552
553 pkts = self.pg1.get_capture(len(pkts))
554 self.verify_capture(self.pg1, pkts, TCP)
555 self.pg0.assert_nothing_captured(remark="packets forwarded")
556 self.pg2.assert_nothing_captured(remark="packets forwarded")
557
558 def test_iacl_proto_tcp_sport(self):
559 """ IP6 TCP source port iACL test
560
561 Test scenario for basic protocol ACL with TCP and sport
562 - Create IPv6 stream for pg0 -> pg1 interface.
563 - Create iACL with TCP IP protocol and defined sport.
564 - Send and verify received packets on pg1 interface.
565 """
566
567 # Basic iACL testing with TCP and sport
568 sport = 38
569 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
570 TCP(sport=sport, dport=5678))
571 self.pg0.add_stream(pkts)
572
573 key = 'nh_tcp_sport'
574 self.create_classify_table(
575 key, self.build_ip6_mask(nh='ff', src_port='ffff'))
576 self.create_classify_session(
577 self.acl_tbl_idx.get(key),
578 self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport))
579 self.input_acl_set_interface(
580 self.pg0, self.acl_tbl_idx.get(key))
581 self.acl_active_table = key
582
583 self.pg_enable_capture(self.pg_interfaces)
584 self.pg_start()
585
586 pkts = self.pg1.get_capture(len(pkts))
587 self.verify_capture(self.pg1, pkts, TCP)
588 self.pg0.assert_nothing_captured(remark="packets forwarded")
589 self.pg2.assert_nothing_captured(remark="packets forwarded")
590
591 def test_iacl_proto_tcp_dport(self):
592 """ IP6 TCP destination port iACL test
593
594 Test scenario for basic protocol ACL with TCP and dport
595 - Create IPv6 stream for pg0 -> pg1 interface.
596 - Create iACL with TCP IP protocol and defined dport.
597 - Send and verify received packets on pg1 interface.
598 """
599
600 # Basic iACL testing with TCP and dport
601 dport = 427
602 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
603 TCP(sport=1234, dport=dport))
604 self.pg0.add_stream(pkts)
605
606 key = 'nh_tcp_dport'
607 self.create_classify_table(
608 key, self.build_ip6_mask(nh='ff', dst_port='ffff'))
609 self.create_classify_session(
610 self.acl_tbl_idx.get(key),
611 self.build_ip6_match(nh=socket.IPPROTO_TCP, dst_port=dport))
612 self.input_acl_set_interface(
613 self.pg0, self.acl_tbl_idx.get(key))
614 self.acl_active_table = key
615
616 self.pg_enable_capture(self.pg_interfaces)
617 self.pg_start()
618
619 pkts = self.pg1.get_capture(len(pkts))
620 self.verify_capture(self.pg1, pkts, TCP)
621 self.pg0.assert_nothing_captured(remark="packets forwarded")
622 self.pg2.assert_nothing_captured(remark="packets forwarded")
623
624 def test_iacl_proto_tcp_sport_dport(self):
625 """ IP6 TCP source and destination ports iACL test
626
627 Test scenario for basic protocol ACL with TCP and sport and dport
628 - Create IPv6 stream for pg0 -> pg1 interface.
629 - Create iACL with TCP IP protocol and defined sport and dport.
630 - Send and verify received packets on pg1 interface.
631 """
632
633 # Basic iACL testing with TCP and sport and dport
634 sport = 13720
635 dport = 9080
636 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
637 TCP(sport=sport, dport=dport))
638 self.pg0.add_stream(pkts)
639
640 key = 'nh_tcp_ports'
641 self.create_classify_table(
642 key,
643 self.build_ip6_mask(nh='ff', src_port='ffff', dst_port='ffff'))
644 self.create_classify_session(
645 self.acl_tbl_idx.get(key),
646 self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport,
647 dst_port=dport))
648 self.input_acl_set_interface(
649 self.pg0, self.acl_tbl_idx.get(key))
650 self.acl_active_table = key
651
652 self.pg_enable_capture(self.pg_interfaces)
653 self.pg_start()
654
655 pkts = self.pg1.get_capture(len(pkts))
656 self.verify_capture(self.pg1, pkts, TCP)
657 self.pg0.assert_nothing_captured(remark="packets forwarded")
658 self.pg2.assert_nothing_captured(remark="packets forwarded")
659
660
661class TestClassifierIP6Out(TestClassifier):
662 """ Classifier output IP6 Test Case """
663
664 def test_acl_ip_out(self):
665 """ Output IP6 ACL test
666
667 Test scenario for basic IP ACL with source IP
668 - Create IPv6 stream for pg1 -> pg0 interface.
669 - Create ACL with source IP address.
670 - Send and verify received packets on pg0 interface.
671 """
672
673 # Basic oACL testing with source IP
674 pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
675 self.pg1.add_stream(pkts)
676
677 key = 'ip6_out'
678 self.create_classify_table(
679 key,
680 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff'),
681 data_offset=0)
682 self.create_classify_session(
683 self.acl_tbl_idx.get(key),
684 self.build_ip6_match(src_ip=self.pg1.remote_ip6))
685 self.output_acl_set_interface(
686 self.pg0, self.acl_tbl_idx.get(key))
687 self.acl_active_table = key
688
689 self.pg_enable_capture(self.pg_interfaces)
690 self.pg_start()
691
692 pkts = self.pg0.get_capture(len(pkts))
693 self.verify_capture(self.pg0, pkts)
694 self.pg1.assert_nothing_captured(remark="packets forwarded")
695 self.pg2.assert_nothing_captured(remark="packets forwarded")
696
697
698class TestClassifierIP6MAC(TestClassifier):
699 """ Classifier IP6 MAC Test Case """
700
701 def test_acl_mac(self):
702 """ IP6 MAC iACL test
703
704 Test scenario for basic MAC ACL with source MAC
705 - Create IPv6 stream for pg0 -> pg2 interface.
706 - Create ACL with source MAC address.
707 - Send and verify received packets on pg2 interface.
708 """
709
710 # Basic iACL testing with source MAC
711 pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
712 self.pg0.add_stream(pkts)
713
714 key = 'mac'
715 self.create_classify_table(
716 key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
717 self.create_classify_session(
718 self.acl_tbl_idx.get(key),
719 self.build_mac_match(src_mac=self.pg0.remote_mac))
720 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
721 self.acl_active_table = key
722
723 self.pg_enable_capture(self.pg_interfaces)
724 self.pg_start()
725
726 pkts = self.pg2.get_capture(len(pkts))
727 self.verify_capture(self.pg2, pkts)
728 self.pg0.assert_nothing_captured(remark="packets forwarded")
729 self.pg1.assert_nothing_captured(remark="packets forwarded")
730
731
732if __name__ == '__main__':
733 unittest.main(testRunner=VppTestRunner)