blob: 7479d3029b88a8a4b101388162e281725d0d51cd [file] [log] [blame]
Ole Troan6ee3aa42023-08-17 13:36:08 +02001#!/usr/bin/env python3
2
3import unittest
4import ipaddress
Dave Wallace8800f732023-08-31 00:47:44 -04005from framework import VppTestCase
6from asfframework import VppTestRunner
Dmitry Valter34fa0ce2024-03-11 10:38:46 +00007from config import config
Ole Troan6ee3aa42023-08-17 13:36:08 +02008
Ole Troanff344a92023-10-12 18:54:55 +02009from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6DestUnreach
Ole Troan6ee3aa42023-08-17 13:36:08 +020010from scapy.layers.l2 import Ether
11from scapy.packet import Raw
12
13
Dmitry Valter34fa0ce2024-03-11 10:38:46 +000014@unittest.skipIf("npt66" in config.excluded_plugins, "Exclude NPTv6 plugin tests")
Ole Troan6ee3aa42023-08-17 13:36:08 +020015class TestNPT66(VppTestCase):
16 """NPTv6 Test Case"""
17
Ole Troan9587d392023-09-01 14:18:23 +020018 extra_vpp_plugin_config = [
19 "plugin npt66_plugin.so {enable}",
20 ]
21
Ole Troan6ee3aa42023-08-17 13:36:08 +020022 def setUp(self):
23 super(TestNPT66, self).setUp()
24
25 # create 2 pg interfaces
26 self.create_pg_interfaces(range(2))
27
28 for i in self.pg_interfaces:
29 i.admin_up()
30 i.config_ip6()
31 i.resolve_ndp()
32
33 def tearDown(self):
34 for i in self.pg_interfaces:
35 i.unconfig_ip6()
36 i.admin_down()
37 super(TestNPT66, self).tearDown()
38
Ole Troanff344a92023-10-12 18:54:55 +020039 def send_and_verify(self, internal, reply_icmp_error=False):
Ole Troan34850e02023-09-01 14:15:39 +020040 sendif = self.pg0
41 recvif = self.pg1
42 local_mac = self.pg0.local_mac
43 remote_mac = self.pg0.remote_mac
44 src = ipaddress.ip_interface(internal).ip + 1
45 dst = self.pg1.remote_ip6
Ole Troan6ee3aa42023-08-17 13:36:08 +020046
47 p = (
48 Ether(dst=local_mac, src=remote_mac)
49 / IPv6(src=src, dst=dst)
50 / ICMPv6EchoRequest()
Ole Troan34850e02023-09-01 14:15:39 +020051 / Raw(b"Request")
Ole Troan6ee3aa42023-08-17 13:36:08 +020052 )
Ole Troanff344a92023-10-12 18:54:55 +020053 # print('Sending packet')
54 # p.show2()
Ole Troan6ee3aa42023-08-17 13:36:08 +020055 rxs = self.send_and_expect(sendif, p, recvif)
56 for rx in rxs:
Ole Troanff344a92023-10-12 18:54:55 +020057 # print('Received packet')
58 # rx.show2()
Ole Troan6ee3aa42023-08-17 13:36:08 +020059 original_cksum = rx[ICMPv6EchoRequest].cksum
60 del rx[ICMPv6EchoRequest].cksum
61 rx = rx.__class__(bytes(rx))
62 self.assertEqual(original_cksum, rx[ICMPv6EchoRequest].cksum)
63
Ole Troan34850e02023-09-01 14:15:39 +020064 # Generate a replies
Ole Troanff344a92023-10-12 18:54:55 +020065 if reply_icmp_error:
66 # print('Generating an ICMP error message')
67 reply = (
68 Ether(dst=rx[Ether].src, src=local_mac)
69 / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
70 / ICMPv6DestUnreach()
71 / rx[IPv6]
72 )
73 # print('Sending ICMP error message reply')
74 # reply.show2()
75 replies = self.send_and_expect(recvif, reply, sendif)
76 for r in replies:
77 # print('Received ICMP error message reply on the other side')
78 # r.show2()
79 self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
80 original_cksum = r[ICMPv6EchoRequest].cksum
81 del r[ICMPv6EchoRequest].cksum
82 r = r.__class__(bytes(r))
83 self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
Ole Troan34850e02023-09-01 14:15:39 +020084
Ole Troanff344a92023-10-12 18:54:55 +020085 else:
86 reply = (
87 Ether(dst=rx[Ether].src, src=local_mac)
88 / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
89 / ICMPv6EchoRequest()
90 / Raw(b"Reply")
91 )
Ole Troan34850e02023-09-01 14:15:39 +020092
Ole Troanff344a92023-10-12 18:54:55 +020093 replies = self.send_and_expect(recvif, reply, sendif)
94 for r in replies:
Dave Wallacee3f59e22023-10-30 17:05:23 -040095 # r.show2()
Ole Troanff344a92023-10-12 18:54:55 +020096 self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
97 original_cksum = r[ICMPv6EchoRequest].cksum
98 del r[ICMPv6EchoRequest].cksum
99 r = r.__class__(bytes(r))
100 self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
101
102 def do_test(self, internal, external, reply_icmp_error=False):
103 """Add NPT66 binding and send packet"""
Ole Troan6ee3aa42023-08-17 13:36:08 +0200104 self.vapi.npt66_binding_add_del(
105 sw_if_index=self.pg1.sw_if_index,
106 internal=internal,
107 external=external,
108 is_add=True,
109 )
Ole Troan34850e02023-09-01 14:15:39 +0200110 ## TODO use route api
Ole Troan6ee3aa42023-08-17 13:36:08 +0200111 self.vapi.cli(f"ip route add {internal} via {self.pg0.remote_ip6}")
112
Ole Troanff344a92023-10-12 18:54:55 +0200113 self.send_and_verify(internal, reply_icmp_error=reply_icmp_error)
Ole Troan6ee3aa42023-08-17 13:36:08 +0200114
115 self.vapi.npt66_binding_add_del(
116 sw_if_index=self.pg1.sw_if_index,
117 internal=internal,
118 external=external,
119 is_add=False,
120 )
121
122 def test_npt66_simple(self):
123 """Send and receive a packet through NPT66"""
124
Ole Troan34850e02023-09-01 14:15:39 +0200125 self.do_test("fd00:0000:0000::/48", "2001:4650:c3ed::/48")
Ole Troan6ee3aa42023-08-17 13:36:08 +0200126 self.do_test("fc00:1::/48", "2001:db8:1::/48")
127 self.do_test("fc00:1234::/32", "2001:db8:1::/32")
128 self.do_test("fc00:1234::/63", "2001:db8:1::/56")
129
Ole Troanff344a92023-10-12 18:54:55 +0200130 def test_npt66_icmp6(self):
131 """Send and receive a packet through NPT66"""
132
133 # Test ICMP6 error packets
134 self.do_test(
135 "fd00:0000:0000::/48", "2001:4650:c3ed::/48", reply_icmp_error=True
136 )
137 self.do_test("fc00:1::/48", "2001:db8:1::/48", reply_icmp_error=True)
138 self.do_test("fc00:1234::/32", "2001:db8:1::/32", reply_icmp_error=True)
139 self.do_test("fc00:1234::/63", "2001:db8:1::/56", reply_icmp_error=True)
140
Ole Troan6ee3aa42023-08-17 13:36:08 +0200141
142if __name__ == "__main__":
143 unittest.main(testRunner=VppTestRunner)