blob: cdb0badfcd4b221324233b7e0527121766e2095e [file] [log] [blame]
Ole Troan6ee3aa42023-08-17 13:36:08 +02001#!/usr/bin/env python3
2
3import unittest
4import ipaddress
5from framework import VppTestCase, VppTestRunner
6
Ole Troanff344a92023-10-12 18:54:55 +02007from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6DestUnreach
Ole Troan6ee3aa42023-08-17 13:36:08 +02008from scapy.layers.l2 import Ether
9from scapy.packet import Raw
10
11
class TestNPT66(VppTestCase):
    """NPTv6 Test Case

    Each test installs an NPT66 binding on ``pg1``, sends an ICMPv6 echo
    request in on ``pg0`` sourced from the internal prefix, expects it out
    of ``pg1``, then injects a reply on ``pg1`` and expects it back out of
    ``pg0`` addressed to the original internal source.
    """

    extra_vpp_plugin_config = [
        "plugin npt66_plugin.so {enable}",
    ]

    # (internal, external) prefix pairs exercised by every test method.
    PREFIX_PAIRS = (
        ("fd00:0000:0000::/48", "2001:4650:c3ed::/48"),
        ("fc00:1::/48", "2001:db8:1::/48"),
        ("fc00:1234::/32", "2001:db8:1::/32"),
        ("fc00:1234::/63", "2001:db8:1::/56"),
    )

    def setUp(self):
        """Create two pg interfaces and bring them up with IPv6 configured."""
        super().setUp()

        # create 2 pg interfaces
        self.create_pg_interfaces(range(2))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        """Remove the IPv6 configuration and shut the pg interfaces down."""
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()
        super().tearDown()

    def _assert_echo_request_cksum(self, pkt):
        """Assert that the ICMPv6 echo request checksum in *pkt* is valid.

        The checksum field is deleted and the packet is re-parsed from its
        own bytes, so that scapy fills the field in again; the recomputed
        value must equal the one that was received.

        :param pkt: scapy packet holding an ICMPv6EchoRequest layer, either
            directly or quoted inside an ICMPv6 error message.
        :returns: the packet rebuilt from bytes.
        """
        received_cksum = pkt[ICMPv6EchoRequest].cksum
        del pkt[ICMPv6EchoRequest].cksum
        rebuilt = pkt.__class__(bytes(pkt))
        self.assertEqual(received_cksum, rebuilt[ICMPv6EchoRequest].cksum)
        return rebuilt

    @staticmethod
    def _build_reply(rx, src_mac, reply_icmp_error):
        """Build the packet sent back in response to the forwarded *rx*.

        :param rx: packet received on the far side of the binding.
        :param src_mac: Ethernet source address to use for the reply.
        :param reply_icmp_error: when true, answer with an ICMPv6
            destination-unreachable quoting the IPv6 layer of *rx*;
            otherwise answer with an echo request carrying b"Reply".
        :returns: the scapy reply packet, with IPv6 addresses swapped
            relative to *rx*.
        """
        outer = Ether(dst=rx[Ether].src, src=src_mac) / IPv6(
            src=rx[IPv6].dst, dst=rx[IPv6].src
        )
        if reply_icmp_error:
            return outer / ICMPv6DestUnreach() / rx[IPv6]
        return outer / ICMPv6EchoRequest() / Raw(b"Reply")

    def send_and_verify(self, internal, reply_icmp_error=False):
        """Send a packet through the binding and verify both directions.

        An echo request sourced from the first address after the network
        address of *internal* is sent on pg0 and expected on pg1.  For each
        packet received there a reply is sent on pg1 and expected on pg0,
        where its destination must equal the original source address.  The
        ICMPv6 echo request checksum is verified on every received packet.

        :param internal: internal prefix of the binding, e.g. "fc00:1::/48".
        :param reply_icmp_error: reply with an ICMPv6 error message instead
            of a plain echo packet.
        """
        sendif = self.pg0
        recvif = self.pg1
        local_mac = self.pg0.local_mac
        remote_mac = self.pg0.remote_mac
        src = ipaddress.ip_interface(internal).ip + 1
        dst = self.pg1.remote_ip6

        p = (
            Ether(dst=local_mac, src=remote_mac)
            / IPv6(src=src, dst=dst)
            / ICMPv6EchoRequest()
            / Raw(b"Request")
        )

        for rx in self.send_and_expect(sendif, p, recvif):
            # The reply is derived from the rebuilt packet.
            rx = self._assert_echo_request_cksum(rx)

            # Generate a reply
            reply = self._build_reply(rx, local_mac, reply_icmp_error)
            for r in self.send_and_expect(recvif, reply, sendif):
                self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
                self._assert_echo_request_cksum(r)

    def do_test(self, internal, external, reply_icmp_error=False):
        """Add NPT66 binding and send packet

        The binding is removed again even when verification fails, so that
        a failing case does not leave it configured on pg1.

        :param internal: internal prefix of the binding.
        :param external: external prefix of the binding.
        :param reply_icmp_error: passed through to send_and_verify().
        """
        binding = dict(
            sw_if_index=self.pg1.sw_if_index,
            internal=internal,
            external=external,
        )
        self.vapi.npt66_binding_add_del(is_add=True, **binding)
        try:
            # TODO use route api
            # NOTE(review): this route is never deleted here -- confirm
            # whether it should be cleaned up as well.
            self.vapi.cli(f"ip route add {internal} via {self.pg0.remote_ip6}")

            self.send_and_verify(internal, reply_icmp_error=reply_icmp_error)
        finally:
            self.vapi.npt66_binding_add_del(is_add=False, **binding)

    def test_npt66_simple(self):
        """Send and receive a packet through NPT66"""

        for internal, external in self.PREFIX_PAIRS:
            self.do_test(internal, external)

    def test_npt66_icmp6(self):
        """Send and receive an ICMPv6 error through NPT66"""

        # Test ICMP6 error packets
        for internal, external in self.PREFIX_PAIRS:
            self.do_test(internal, external, reply_icmp_error=True)
137
Ole Troan6ee3aa42023-08-17 13:36:08 +0200138
# Run the test cases with the VPP test runner when executed as a script.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)