#!/usr/bin/env python3
2
3import unittest
4import ipaddress
Dave Wallace8800f732023-08-31 00:47:44 -04005from framework import VppTestCase
6from asfframework import VppTestRunner
Ole Troan6ee3aa42023-08-17 13:36:08 +02007
Ole Troanff344a92023-10-12 18:54:55 +02008from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6DestUnreach
Ole Troan6ee3aa42023-08-17 13:36:08 +02009from scapy.layers.l2 import Ether
10from scapy.packet import Raw
11
12
class TestNPT66(VppTestCase):
    """NPTv6 Test Case"""

    # Enable the NPT66 plugin for the VPP instance used by this test case.
    extra_vpp_plugin_config = [
        "plugin npt66_plugin.so {enable}",
    ]

    # (internal, external) prefix pairs exercised by every test method.
    _PREFIX_PAIRS = (
        ("fd00:0000:0000::/48", "2001:4650:c3ed::/48"),
        ("fc00:1::/48", "2001:db8:1::/48"),
        ("fc00:1234::/32", "2001:db8:1::/32"),
        ("fc00:1234::/63", "2001:db8:1::/56"),
    )

    def setUp(self):
        """Create two pg interfaces, bring them up and configure IPv6 on them."""
        super().setUp()

        # create 2 pg interfaces
        self.create_pg_interfaces(range(2))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        """Unconfigure IPv6 on the pg interfaces and bring them down."""
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()
        super().tearDown()

    def _verify_icmp6_cksum(self, pkt):
        """Check the ICMPv6 echo request checksum of ``pkt``.

        The checksum carried by the packet is remembered, the field is
        cleared and the packet is re-parsed from its own bytes, which makes
        scapy fill the checksum in again. Both values must be equal.

        :param pkt: packet containing an ``ICMPv6EchoRequest`` layer; its
            checksum field is cleared as a side effect.
        :returns: the packet rebuilt from ``bytes(pkt)``.
        """
        original_cksum = pkt[ICMPv6EchoRequest].cksum
        del pkt[ICMPv6EchoRequest].cksum
        rebuilt = pkt.__class__(bytes(pkt))
        self.assertEqual(original_cksum, rebuilt[ICMPv6EchoRequest].cksum)
        return rebuilt

    def send_and_verify(self, internal, reply_icmp_error=False):
        """Send a request pg0 -> pg1, answer it pg1 -> pg0 and verify both.

        :param internal: internal prefix as a string (e.g. ``"fc00:1::/48"``).
            The request is sourced from the address part of the prefix
            plus one.
        :param reply_icmp_error: when true the answer is an ICMPv6
            destination unreachable message quoting the received packet;
            otherwise it is another echo request carrying ``b"Reply"``.
        """
        sendif = self.pg0
        recvif = self.pg1
        src = ipaddress.ip_interface(internal).ip + 1
        dst = recvif.remote_ip6

        p = (
            Ether(dst=sendif.local_mac, src=sendif.remote_mac)
            / IPv6(src=src, dst=dst)
            / ICMPv6EchoRequest()
            / Raw(b"Request")
        )
        rxs = self.send_and_expect(sendif, p, recvif)

        # Verify every received packet; the last one (rebuilt from its
        # bytes) is the one the reply below is derived from.
        rx = None
        for pkt in rxs:
            rx = self._verify_icmp6_cksum(pkt)
        self.assertIsNotNone(rx, "no packet received on the external interface")

        # The reply swaps the addresses of the packet seen on pg1. It is
        # injected on pg1, so the frame is sourced from pg1's remote MAC.
        reply_base = Ether(dst=rx[Ether].src, src=recvif.remote_mac) / IPv6(
            src=rx[IPv6].dst, dst=rx[IPv6].src
        )
        if reply_icmp_error:
            # ICMPv6 error quoting the packet that was received on pg1.
            reply = reply_base / ICMPv6DestUnreach() / rx[IPv6]
        else:
            reply = reply_base / ICMPv6EchoRequest() / Raw(b"Reply")

        replies = self.send_and_expect(recvif, reply, sendif)
        for r in replies:
            # The reply must arrive addressed to the original internal source.
            self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
            self._verify_icmp6_cksum(r)

    def do_test(self, internal, external, reply_icmp_error=False):
        """Add NPT66 binding and send packet

        The binding on pg1 and the route for ``internal`` towards pg0's
        remote address are removed again even when verification fails, so
        that a following call starts from the same state.

        :param internal: internal prefix string.
        :param external: external prefix string.
        :param reply_icmp_error: forwarded to :meth:`send_and_verify`.
        """
        binding = {
            "sw_if_index": self.pg1.sw_if_index,
            "internal": internal,
            "external": external,
        }
        route = f"{internal} via {self.pg0.remote_ip6}"

        self.vapi.npt66_binding_add_del(is_add=True, **binding)
        try:
            # TODO: use the route API instead of the CLI
            self.vapi.cli(f"ip route add {route}")
            try:
                self.send_and_verify(internal, reply_icmp_error=reply_icmp_error)
            finally:
                self.vapi.cli(f"ip route del {route}")
        finally:
            self.vapi.npt66_binding_add_del(is_add=False, **binding)

    def test_npt66_simple(self):
        """Send and receive a packet through NPT66"""

        for internal, external in self._PREFIX_PAIRS:
            self.do_test(internal, external)

    def test_npt66_icmp6(self):
        """Send and receive an ICMPv6 error message through NPT66"""

        # Test ICMP6 error packets
        for internal, external in self._PREFIX_PAIRS:
            self.do_test(internal, external, reply_icmp_error=True)
Ole Troan6ee3aa42023-08-17 13:36:08 +0200139
# Run this module's tests with the VPP test runner when executed directly.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)