blob: 9f4c20aae3e4891886469598d4c6faaf307c4167 [file] [log] [blame]
Jan Gelety059d1d02018-07-03 13:58:24 +02001#!/usr/bin/env python
2
3import unittest
4import socket
5import binascii
6import sys
7
8from framework import VppTestCase, VppTestRunner
9
10from scapy.packet import Raw
11from scapy.layers.l2 import Ether
12from scapy.layers.inet6 import IPv6, UDP, TCP
13from util import ppp
14
15
16class TestClassifier(VppTestCase):
17 """ Classifier Test Case """
18
19 @classmethod
20 def setUpClass(cls):
21 """
22 Perform standard class setup (defined by class method setUpClass in
23 class VppTestCase) before running the test case, set test case related
24 variables and configure VPP.
25 """
26 super(TestClassifier, cls).setUpClass()
27 cls.acl_active_table = ''
28
29 def setUp(self):
30 """
31 Perform test setup before test case.
32
33 **Config:**
34 - create 4 pg interfaces
35 - untagged pg0/pg1/pg2 interface
36 pg0 -------> pg1 (IP ACL)
37 \
38 ---> pg2 (MAC ACL))
39 - setup interfaces:
40 - put it into UP state
41 - set IPv6 addresses
42 - resolve neighbor address using NDP
43
44 :ivar list interfaces: pg interfaces.
45 :ivar list pg_if_packet_sizes: packet sizes in test.
46 :ivar dict acl_tbl_idx: ACL table index.
47 :ivar int pbr_vrfid: VRF id for PBR test.
48 """
49 self.reset_packet_infos()
50 super(TestClassifier, self).setUp()
51
52 # create 4 pg interfaces
53 self.create_pg_interfaces(range(3))
54
55 # packet sizes to test
56 self.pg_if_packet_sizes = [64, 9018]
57
58 self.interfaces = list(self.pg_interfaces)
59
60 # ACL vars
61 self.acl_tbl_idx = {}
62
63 # setup all interfaces
64 for intf in self.interfaces:
65 intf.admin_up()
66 intf.config_ip6()
67 intf.resolve_ndp()
68
69 def tearDown(self):
70 """Run standard test teardown and acl related log."""
71 if not self.vpp_dead:
72 self.logger.info(self.vapi.ppcli("show inacl type ip6"))
73 self.logger.info(self.vapi.ppcli("show outacl type ip6"))
74 self.logger.info(self.vapi.cli("show classify table verbose"))
75 self.logger.info(self.vapi.cli("show ip fib"))
76 if self.acl_active_table == 'ip6_out':
77 self.output_acl_set_interface(
78 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
79 self.acl_active_table = ''
80 elif self.acl_active_table != '':
81 self.input_acl_set_interface(
82 self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
83 self.acl_active_table = ''
84 for intf in self.interfaces:
85 intf.unconfig_ip6()
86 intf.admin_down()
87
88 super(TestClassifier, self).tearDown()
89
90 def create_stream(self, src_if, dst_if, packet_sizes,
91 proto_l=UDP(sport=1234, dport=5678)):
92 """Create input packet stream for defined interfaces.
93
94 :param VppInterface src_if: Source Interface for packet stream.
95 :param VppInterface dst_if: Destination Interface for packet stream.
96 :param list packet_sizes: packet size to test.
97 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
98 """
99 pkts = []
100
101 for size in packet_sizes:
102 info = self.create_packet_info(src_if, dst_if)
103 payload = self.info_to_payload(info)
104 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
105 IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
106 proto_l /
107 Raw(payload))
108 info.data = p.copy()
109 self.extend_packet(p, size)
110 pkts.append(p)
111 return pkts
112
113 def verify_capture(self, dst_if, capture, proto_l=UDP):
114 """Verify captured input packet stream for defined interface.
115
116 :param VppInterface dst_if: Interface to verify captured packet stream.
117 :param list capture: Captured packet stream.
118 :param Scapy proto_l: Required IP protocol. Default protocol is UDP.
119 """
120 self.logger.info("Verifying capture on interface %s" % dst_if.name)
121 last_info = dict()
122 for i in self.interfaces:
123 last_info[i.sw_if_index] = None
124 dst_sw_if_index = dst_if.sw_if_index
125 for packet in capture:
126 try:
127 ip6_received = packet[IPv6]
128 proto_received = packet[proto_l]
129 payload_info = self.payload_to_info(str(packet[Raw]))
130 packet_index = payload_info.index
131 self.assertEqual(payload_info.dst, dst_sw_if_index)
132 self.logger.debug(
133 "Got packet on port %s: src=%u (id=%u)" %
134 (dst_if.name, payload_info.src, packet_index))
135 next_info = self.get_next_packet_info_for_interface2(
136 payload_info.src, dst_sw_if_index,
137 last_info[payload_info.src])
138 last_info[payload_info.src] = next_info
139 self.assertTrue(next_info is not None)
140 self.assertEqual(packet_index, next_info.index)
141 saved_packet = next_info.data
142 ip_saved = saved_packet[IPv6]
143 proto_saved = saved_packet[proto_l]
144 # Check standard fields
145 self.assertEqual(ip6_received.src, ip_saved.src)
146 self.assertEqual(ip6_received.dst, ip_saved.dst)
147 self.assertEqual(proto_received.sport, proto_saved.sport)
148 self.assertEqual(proto_received.dport, proto_saved.dport)
149 except:
150 self.logger.error(ppp("Unexpected or invalid packet:", packet))
151 raise
152 for i in self.interfaces:
153 remaining_packet = self.get_next_packet_info_for_interface2(
154 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
155 self.assertTrue(remaining_packet is None,
156 "Interface %s: Packet expected from interface %s "
157 "didn't arrive" % (dst_if.name, i.name))
158
159 @staticmethod
160 def build_ip6_mask(nh='', src_ip='', dst_ip='',
161 src_port='', dst_port=''):
162 """Build IPv6 ACL mask data with hexstring format.
163
164 :param str nh: next header number <0-ff>
165 :param str src_ip: source ip address <0-ffffffff>
166 :param str dst_ip: destination ip address <0-ffffffff>
167 :param str src_port: source port number <0-ffff>
168 :param str dst_port: destination port number <0-ffff>
169 """
170
171 return ('{:0>14}{:0>34}{:0>32}{:0>4}{:0>4}'.format(
172 nh, src_ip, dst_ip, src_port, dst_port)).rstrip('0')
173
174 @staticmethod
175 def build_ip6_match(nh=0, src_ip='', dst_ip='',
176 src_port=0, dst_port=0):
177 """Build IPv6 ACL match data with hexstring format.
178
179 :param int nh: next header number with valid option "x"
180 :param str src_ip: source ip6 address with format of "xxx:xxxx::xxxx"
181 :param str dst_ip: destination ip6 address with format of
182 "xxx:xxxx::xxxx"
183 :param int src_port: source port number "x"
184 :param int dst_port: destination port number "x"
185 """
186 if src_ip:
187 src_ip = socket.inet_pton(socket.AF_INET6, src_ip).encode('hex')
188 if dst_ip:
189 dst_ip = socket.inet_pton(socket.AF_INET6, dst_ip).encode('hex')
190
191 return ('{:0>14}{:0>34}{:0>32}{:0>4}{:0>4}'.format(
192 hex(nh)[2:], src_ip, dst_ip, hex(src_port)[2:],
193 hex(dst_port)[2:])).rstrip('0')
194
195 @staticmethod
196 def build_mac_mask(dst_mac='', src_mac='', ether_type=''):
197 """Build MAC ACL mask data with hexstring format.
198
199 :param str dst_mac: source MAC address <0-ffffffffffff>
200 :param str src_mac: destination MAC address <0-ffffffffffff>
201 :param str ether_type: ethernet type <0-ffff>
202 """
203
204 return ('{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac,
205 ether_type)).rstrip('0')
206
207 @staticmethod
208 def build_mac_match(dst_mac='', src_mac='', ether_type=''):
209 """Build MAC ACL match data with hexstring format.
210
211 :param str dst_mac: source MAC address <x:x:x:x:x:x>
212 :param str src_mac: destination MAC address <x:x:x:x:x:x>
213 :param str ether_type: ethernet type <0-ffff>
214 """
215 if dst_mac:
216 dst_mac = dst_mac.replace(':', '')
217 if src_mac:
218 src_mac = src_mac.replace(':', '')
219
220 return ('{:0>12}{:0>12}{:0>4}'.format(dst_mac, src_mac,
221 ether_type)).rstrip('0')
222
223 def create_classify_table(self, key, mask, data_offset=0):
224 """Create Classify Table
225
226 :param str key: key for classify table (ex, ACL name).
227 :param str mask: mask value for interested traffic.
228 :param int data_offset:
229 """
230 r = self.vapi.classify_add_del_table(
231 is_add=1,
232 mask=binascii.unhexlify(mask),
233 match_n_vectors=(len(mask) - 1) // 32 + 1,
234 miss_next_index=0,
235 current_data_flag=1,
236 current_data_offset=data_offset)
237 self.assertIsNotNone(r, msg='No response msg for add_del_table')
238 self.acl_tbl_idx[key] = r.new_table_index
239
240 def create_classify_session(self, table_index, match, vrfid=0, is_add=1):
241 """Create Classify Session
242
243 :param int table_index: table index to identify classify table.
244 :param str match: matched value for interested traffic.
245 :param int vrfid: VRF id.
246 :param int is_add: option to configure classify session.
247 - create(1) or delete(0)
248 """
249 r = self.vapi.classify_add_del_session(
250 is_add,
251 table_index,
252 binascii.unhexlify(match),
253 opaque_index=0,
254 metadata=vrfid)
255 self.assertIsNotNone(r, msg='No response msg for add_del_session')
256
257 def input_acl_set_interface(self, intf, table_index, is_add=1):
258 """Configure Input ACL interface
259
260 :param VppInterface intf: Interface to apply Input ACL feature.
261 :param int table_index: table index to identify classify table.
262 :param int is_add: option to configure classify session.
263 - enable(1) or disable(0)
264 """
265 r = self.vapi.input_acl_set_interface(
266 is_add,
267 intf.sw_if_index,
268 ip6_table_index=table_index)
269 self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
270
271 def output_acl_set_interface(self, intf, table_index, is_add=1):
272 """Configure Output ACL interface
273
274 :param VppInterface intf: Interface to apply Output ACL feature.
275 :param int table_index: table index to identify classify table.
276 :param int is_add: option to configure classify session.
277 - enable(1) or disable(0)
278 """
279 r = self.vapi.output_acl_set_interface(
280 is_add,
281 intf.sw_if_index,
282 ip6_table_index=table_index)
283 self.assertIsNotNone(r, msg='No response msg for acl_set_interface')
284
285
286class TestClassifierIP6(TestClassifier):
287 """ Classifier IP6 Test Case """
288
289 def test_iacl_src_ip(self):
290 """ Source IP6 iACL test
291
292 Test scenario for basic IP ACL with source IP
293 - Create IPv6 stream for pg0 -> pg1 interface.
294 - Create iACL with source IP address.
295 - Send and verify received packets on pg1 interface.
296 """
297
298 # Basic iACL testing with source IP
299 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
300 self.pg0.add_stream(pkts)
301
302 key = 'ip6_src'
303 self.create_classify_table(
304 key,
305 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff'))
306 self.create_classify_session(
307 self.acl_tbl_idx.get(key),
308 self.build_ip6_match(src_ip=self.pg0.remote_ip6))
309 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
310 self.acl_active_table = key
311
312 self.pg_enable_capture(self.pg_interfaces)
313 self.pg_start()
314
315 pkts = self.pg1.get_capture(len(pkts))
316 self.verify_capture(self.pg1, pkts)
317 self.pg0.assert_nothing_captured(remark="packets forwarded")
318 self.pg2.assert_nothing_captured(remark="packets forwarded")
319
320 def test_iacl_dst_ip(self):
321 """ Destination IP6 iACL test
322
323 Test scenario for basic IP ACL with destination IP
324 - Create IPv6 stream for pg0 -> pg1 interface.
325 - Create iACL with destination IP address.
326 - Send and verify received packets on pg1 interface.
327 """
328
329 # Basic iACL testing with destination IP
330 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
331 self.pg0.add_stream(pkts)
332
333 key = 'ip6_dst'
334 self.create_classify_table(
335 key,
336 self.build_ip6_mask(dst_ip='ffffffffffffffffffffffffffffffff'))
337 self.create_classify_session(
338 self.acl_tbl_idx.get(key),
339 self.build_ip6_match(dst_ip=self.pg1.remote_ip6))
340 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
341 self.acl_active_table = key
342
343 self.pg_enable_capture(self.pg_interfaces)
344 self.pg_start()
345
346 pkts = self.pg1.get_capture(len(pkts))
347 self.verify_capture(self.pg1, pkts)
348 self.pg0.assert_nothing_captured(remark="packets forwarded")
349 self.pg2.assert_nothing_captured(remark="packets forwarded")
350
351 def test_iacl_src_dst_ip(self):
352 """ Source and destination IP6 iACL test
353
354 Test scenario for basic IP ACL with source and destination IP
355 - Create IPv4 stream for pg0 -> pg1 interface.
356 - Create iACL with source and destination IP addresses.
357 - Send and verify received packets on pg1 interface.
358 """
359
360 # Basic iACL testing with source and destination IP
361 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
362 self.pg0.add_stream(pkts)
363
364 key = 'ip6'
365 self.create_classify_table(
366 key,
367 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff',
368 dst_ip='ffffffffffffffffffffffffffffffff'))
369 self.create_classify_session(
370 self.acl_tbl_idx.get(key),
371 self.build_ip6_match(src_ip=self.pg0.remote_ip6,
372 dst_ip=self.pg1.remote_ip6))
373 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
374 self.acl_active_table = key
375
376 self.pg_enable_capture(self.pg_interfaces)
377 self.pg_start()
378
379 pkts = self.pg1.get_capture(len(pkts))
380 self.verify_capture(self.pg1, pkts)
381 self.pg0.assert_nothing_captured(remark="packets forwarded")
382 self.pg2.assert_nothing_captured(remark="packets forwarded")
383
384
385# Tests split to different test case classes because of issue reported in
386# ticket VPP-1336
387class TestClassifierIP6UDP(TestClassifier):
388 """ Classifier IP6 UDP proto Test Case """
389
390 def test_iacl_proto_udp(self):
391 """ IP6 UDP protocol iACL test
392
393 Test scenario for basic protocol ACL with UDP protocol
394 - Create IPv6 stream for pg0 -> pg1 interface.
395 - Create iACL with UDP IP protocol.
396 - Send and verify received packets on pg1 interface.
397 """
398
399 # Basic iACL testing with UDP protocol
400 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
401 self.pg0.add_stream(pkts)
402
403 key = 'nh_udp'
404 self.create_classify_table(key, self.build_ip6_mask(nh='ff'))
405 self.create_classify_session(
406 self.acl_tbl_idx.get(key),
407 self.build_ip6_match(nh=socket.IPPROTO_UDP))
408 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
409 self.acl_active_table = key
410
411 self.pg_enable_capture(self.pg_interfaces)
412 self.pg_start()
413
414 pkts = self.pg1.get_capture(len(pkts))
415 self.verify_capture(self.pg1, pkts)
416 self.pg0.assert_nothing_captured(remark="packets forwarded")
417 self.pg2.assert_nothing_captured(remark="packets forwarded")
418
419 def test_iacl_proto_udp_sport(self):
420 """ IP6 UDP source port iACL test
421
422 Test scenario for basic protocol ACL with UDP and sport
423 - Create IPv6 stream for pg0 -> pg1 interface.
424 - Create iACL with UDP IP protocol and defined sport.
425 - Send and verify received packets on pg1 interface.
426 """
427
428 # Basic iACL testing with UDP and sport
429 sport = 38
430 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
431 UDP(sport=sport, dport=5678))
432 self.pg0.add_stream(pkts)
433
434 key = 'nh_udp_sport'
435 self.create_classify_table(
436 key, self.build_ip6_mask(nh='ff', src_port='ffff'))
437 self.create_classify_session(
438 self.acl_tbl_idx.get(key),
439 self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport))
440 self.input_acl_set_interface(
441 self.pg0, self.acl_tbl_idx.get(key))
442 self.acl_active_table = key
443
444 self.pg_enable_capture(self.pg_interfaces)
445 self.pg_start()
446
447 pkts = self.pg1.get_capture(len(pkts))
448 self.verify_capture(self.pg1, pkts)
449 self.pg0.assert_nothing_captured(remark="packets forwarded")
450 self.pg2.assert_nothing_captured(remark="packets forwarded")
451
452 def test_iacl_proto_udp_dport(self):
453 """ IP6 UDP destination port iACL test
454
455 Test scenario for basic protocol ACL with UDP and dport
456 - Create IPv6 stream for pg0 -> pg1 interface.
457 - Create iACL with UDP IP protocol and defined dport.
458 - Send and verify received packets on pg1 interface.
459 """
460
461 # Basic iACL testing with UDP and dport
462 dport = 427
463 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
464 UDP(sport=1234, dport=dport))
465 self.pg0.add_stream(pkts)
466
467 key = 'nh_udp_dport'
468 self.create_classify_table(
469 key, self.build_ip6_mask(nh='ff', dst_port='ffff'))
470 self.create_classify_session(
471 self.acl_tbl_idx.get(key),
472 self.build_ip6_match(nh=socket.IPPROTO_UDP, dst_port=dport))
473 self.input_acl_set_interface(
474 self.pg0, self.acl_tbl_idx.get(key))
475 self.acl_active_table = key
476
477 self.pg_enable_capture(self.pg_interfaces)
478 self.pg_start()
479
480 pkts = self.pg1.get_capture(len(pkts))
481 self.verify_capture(self.pg1, pkts)
482 self.pg0.assert_nothing_captured(remark="packets forwarded")
483 self.pg2.assert_nothing_captured(remark="packets forwarded")
484
485 def test_iacl_proto_udp_sport_dport(self):
486 """ IP6 UDP source and destination ports iACL test
487
488 Test scenario for basic protocol ACL with UDP and sport and dport
489 - Create IPv6 stream for pg0 -> pg1 interface.
490 - Create iACL with UDP IP protocol and defined sport and dport.
491 - Send and verify received packets on pg1 interface.
492 """
493
494 # Basic iACL testing with UDP and sport and dport
495 sport = 13720
496 dport = 9080
497 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
498 UDP(sport=sport, dport=dport))
499 self.pg0.add_stream(pkts)
500
501 key = 'nh_udp_ports'
502 self.create_classify_table(
503 key,
504 self.build_ip6_mask(nh='ff', src_port='ffff', dst_port='ffff'))
505 self.create_classify_session(
506 self.acl_tbl_idx.get(key),
507 self.build_ip6_match(nh=socket.IPPROTO_UDP, src_port=sport,
508 dst_port=dport))
509 self.input_acl_set_interface(
510 self.pg0, self.acl_tbl_idx.get(key))
511 self.acl_active_table = key
512
513 self.pg_enable_capture(self.pg_interfaces)
514 self.pg_start()
515
516 pkts = self.pg1.get_capture(len(pkts))
517 self.verify_capture(self.pg1, pkts)
518 self.pg0.assert_nothing_captured(remark="packets forwarded")
519 self.pg2.assert_nothing_captured(remark="packets forwarded")
520
521
522class TestClassifierIP6TCP(TestClassifier):
523 """ Classifier IP6 TCP proto Test Case """
524
525 def test_iacl_proto_tcp(self):
526 """ IP6 TCP protocol iACL test
527
528 Test scenario for basic protocol ACL with TCP protocol
529 - Create IPv6 stream for pg0 -> pg1 interface.
530 - Create iACL with TCP IP protocol.
531 - Send and verify received packets on pg1 interface.
532 """
533
534 # Basic iACL testing with TCP protocol
535 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
536 TCP(sport=1234, dport=5678))
537 self.pg0.add_stream(pkts)
538
539 key = 'nh_tcp'
540 self.create_classify_table(key, self.build_ip6_mask(nh='ff'))
541 self.create_classify_session(
542 self.acl_tbl_idx.get(key),
543 self.build_ip6_match(nh=socket.IPPROTO_TCP))
544 self.input_acl_set_interface(
545 self.pg0, self.acl_tbl_idx.get(key))
546 self.acl_active_table = key
547
548 self.pg_enable_capture(self.pg_interfaces)
549 self.pg_start()
550
551 pkts = self.pg1.get_capture(len(pkts))
552 self.verify_capture(self.pg1, pkts, TCP)
553 self.pg0.assert_nothing_captured(remark="packets forwarded")
554 self.pg2.assert_nothing_captured(remark="packets forwarded")
555
556 def test_iacl_proto_tcp_sport(self):
557 """ IP6 TCP source port iACL test
558
559 Test scenario for basic protocol ACL with TCP and sport
560 - Create IPv6 stream for pg0 -> pg1 interface.
561 - Create iACL with TCP IP protocol and defined sport.
562 - Send and verify received packets on pg1 interface.
563 """
564
565 # Basic iACL testing with TCP and sport
566 sport = 38
567 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
568 TCP(sport=sport, dport=5678))
569 self.pg0.add_stream(pkts)
570
571 key = 'nh_tcp_sport'
572 self.create_classify_table(
573 key, self.build_ip6_mask(nh='ff', src_port='ffff'))
574 self.create_classify_session(
575 self.acl_tbl_idx.get(key),
576 self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport))
577 self.input_acl_set_interface(
578 self.pg0, self.acl_tbl_idx.get(key))
579 self.acl_active_table = key
580
581 self.pg_enable_capture(self.pg_interfaces)
582 self.pg_start()
583
584 pkts = self.pg1.get_capture(len(pkts))
585 self.verify_capture(self.pg1, pkts, TCP)
586 self.pg0.assert_nothing_captured(remark="packets forwarded")
587 self.pg2.assert_nothing_captured(remark="packets forwarded")
588
589 def test_iacl_proto_tcp_dport(self):
590 """ IP6 TCP destination port iACL test
591
592 Test scenario for basic protocol ACL with TCP and dport
593 - Create IPv6 stream for pg0 -> pg1 interface.
594 - Create iACL with TCP IP protocol and defined dport.
595 - Send and verify received packets on pg1 interface.
596 """
597
598 # Basic iACL testing with TCP and dport
599 dport = 427
600 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
601 TCP(sport=1234, dport=dport))
602 self.pg0.add_stream(pkts)
603
604 key = 'nh_tcp_dport'
605 self.create_classify_table(
606 key, self.build_ip6_mask(nh='ff', dst_port='ffff'))
607 self.create_classify_session(
608 self.acl_tbl_idx.get(key),
609 self.build_ip6_match(nh=socket.IPPROTO_TCP, dst_port=dport))
610 self.input_acl_set_interface(
611 self.pg0, self.acl_tbl_idx.get(key))
612 self.acl_active_table = key
613
614 self.pg_enable_capture(self.pg_interfaces)
615 self.pg_start()
616
617 pkts = self.pg1.get_capture(len(pkts))
618 self.verify_capture(self.pg1, pkts, TCP)
619 self.pg0.assert_nothing_captured(remark="packets forwarded")
620 self.pg2.assert_nothing_captured(remark="packets forwarded")
621
622 def test_iacl_proto_tcp_sport_dport(self):
623 """ IP6 TCP source and destination ports iACL test
624
625 Test scenario for basic protocol ACL with TCP and sport and dport
626 - Create IPv6 stream for pg0 -> pg1 interface.
627 - Create iACL with TCP IP protocol and defined sport and dport.
628 - Send and verify received packets on pg1 interface.
629 """
630
631 # Basic iACL testing with TCP and sport and dport
632 sport = 13720
633 dport = 9080
634 pkts = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes,
635 TCP(sport=sport, dport=dport))
636 self.pg0.add_stream(pkts)
637
638 key = 'nh_tcp_ports'
639 self.create_classify_table(
640 key,
641 self.build_ip6_mask(nh='ff', src_port='ffff', dst_port='ffff'))
642 self.create_classify_session(
643 self.acl_tbl_idx.get(key),
644 self.build_ip6_match(nh=socket.IPPROTO_TCP, src_port=sport,
645 dst_port=dport))
646 self.input_acl_set_interface(
647 self.pg0, self.acl_tbl_idx.get(key))
648 self.acl_active_table = key
649
650 self.pg_enable_capture(self.pg_interfaces)
651 self.pg_start()
652
653 pkts = self.pg1.get_capture(len(pkts))
654 self.verify_capture(self.pg1, pkts, TCP)
655 self.pg0.assert_nothing_captured(remark="packets forwarded")
656 self.pg2.assert_nothing_captured(remark="packets forwarded")
657
658
659class TestClassifierIP6Out(TestClassifier):
660 """ Classifier output IP6 Test Case """
661
662 def test_acl_ip_out(self):
663 """ Output IP6 ACL test
664
665 Test scenario for basic IP ACL with source IP
666 - Create IPv6 stream for pg1 -> pg0 interface.
667 - Create ACL with source IP address.
668 - Send and verify received packets on pg0 interface.
669 """
670
671 # Basic oACL testing with source IP
672 pkts = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
673 self.pg1.add_stream(pkts)
674
675 key = 'ip6_out'
676 self.create_classify_table(
677 key,
678 self.build_ip6_mask(src_ip='ffffffffffffffffffffffffffffffff'),
679 data_offset=0)
680 self.create_classify_session(
681 self.acl_tbl_idx.get(key),
682 self.build_ip6_match(src_ip=self.pg1.remote_ip6))
683 self.output_acl_set_interface(
684 self.pg0, self.acl_tbl_idx.get(key))
685 self.acl_active_table = key
686
687 self.pg_enable_capture(self.pg_interfaces)
688 self.pg_start()
689
690 pkts = self.pg0.get_capture(len(pkts))
691 self.verify_capture(self.pg0, pkts)
692 self.pg1.assert_nothing_captured(remark="packets forwarded")
693 self.pg2.assert_nothing_captured(remark="packets forwarded")
694
695
696class TestClassifierIP6MAC(TestClassifier):
697 """ Classifier IP6 MAC Test Case """
698
699 def test_acl_mac(self):
700 """ IP6 MAC iACL test
701
702 Test scenario for basic MAC ACL with source MAC
703 - Create IPv6 stream for pg0 -> pg2 interface.
704 - Create ACL with source MAC address.
705 - Send and verify received packets on pg2 interface.
706 """
707
708 # Basic iACL testing with source MAC
709 pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
710 self.pg0.add_stream(pkts)
711
712 key = 'mac'
713 self.create_classify_table(
714 key, self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
715 self.create_classify_session(
716 self.acl_tbl_idx.get(key),
717 self.build_mac_match(src_mac=self.pg0.remote_mac))
718 self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
719 self.acl_active_table = key
720
721 self.pg_enable_capture(self.pg_interfaces)
722 self.pg_start()
723
724 pkts = self.pg2.get_capture(len(pkts))
725 self.verify_capture(self.pg2, pkts)
726 self.pg0.assert_nothing_captured(remark="packets forwarded")
727 self.pg1.assert_nothing_captured(remark="packets forwarded")
728
729
730if __name__ == '__main__':
731 unittest.main(testRunner=VppTestRunner)