npt66: icmp6 alg to handle icmp6 error messages
Support rewriting the inner packet for ICMP6 error messages.
Type: feature
Change-Id: I7e11f53626037075a23310f1cb7e673b0cb52843
Signed-off-by: Ole Troan <otroan@employees.org>
diff --git a/test/test_npt66.py b/test/test_npt66.py
index 44a9e87..c867621 100644
--- a/test/test_npt66.py
+++ b/test/test_npt66.py
@@ -4,7 +4,7 @@
import ipaddress
from framework import VppTestCase, VppTestRunner
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6DestUnreach
from scapy.layers.l2 import Ether
from scapy.packet import Raw
@@ -33,7 +33,7 @@
i.admin_down()
super(TestNPT66, self).tearDown()
- def send_and_verify(self, internal):
+ def send_and_verify(self, internal, reply_icmp_error=False):
sendif = self.pg0
recvif = self.pg1
local_mac = self.pg0.local_mac
@@ -47,30 +47,57 @@
/ ICMPv6EchoRequest()
/ Raw(b"Request")
)
+ # print('Sending packet')
+ # p.show2()
rxs = self.send_and_expect(sendif, p, recvif)
for rx in rxs:
+ # print('Received packet')
+ # rx.show2()
original_cksum = rx[ICMPv6EchoRequest].cksum
del rx[ICMPv6EchoRequest].cksum
rx = rx.__class__(bytes(rx))
self.assertEqual(original_cksum, rx[ICMPv6EchoRequest].cksum)
# Generate a replies
- reply = (
- Ether(dst=rx[Ether].src, src=local_mac)
- / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
- / ICMPv6EchoRequest()
- / Raw(b"Reply")
- )
+ if reply_icmp_error:
+ # print('Generating an ICMP error message')
+ reply = (
+ Ether(dst=rx[Ether].src, src=local_mac)
+ / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
+ / ICMPv6DestUnreach()
+ / rx[IPv6]
+ )
+ # print('Sending ICMP error message reply')
+ # reply.show2()
+ replies = self.send_and_expect(recvif, reply, sendif)
+ for r in replies:
+ # print('Received ICMP error message reply on the other side')
+ # r.show2()
+ self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
+ original_cksum = r[ICMPv6EchoRequest].cksum
+ del r[ICMPv6EchoRequest].cksum
+ r = r.__class__(bytes(r))
+ self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
- replies = self.send_and_expect(recvif, reply, sendif)
- for r in replies:
- self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
- original_cksum = r[ICMPv6EchoRequest].cksum
- del r[ICMPv6EchoRequest].cksum
- r = r.__class__(bytes(r))
- self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
+ else:
+ reply = (
+ Ether(dst=rx[Ether].src, src=local_mac)
+ / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
+ / ICMPv6EchoRequest()
+ / Raw(b"Reply")
+ )
- def do_test(self, internal, external):
+ replies = self.send_and_expect(recvif, reply, sendif)
+ for r in replies:
+ r.show2()
+ self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
+ original_cksum = r[ICMPv6EchoRequest].cksum
+ del r[ICMPv6EchoRequest].cksum
+ r = r.__class__(bytes(r))
+ self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
+
+ def do_test(self, internal, external, reply_icmp_error=False):
+ """Add NPT66 binding and send packet"""
self.vapi.npt66_binding_add_del(
sw_if_index=self.pg1.sw_if_index,
internal=internal,
@@ -80,7 +107,7 @@
## TODO use route api
self.vapi.cli(f"ip route add {internal} via {self.pg0.remote_ip6}")
- self.send_and_verify(internal)
+ self.send_and_verify(internal, reply_icmp_error=reply_icmp_error)
self.vapi.npt66_binding_add_del(
sw_if_index=self.pg1.sw_if_index,
@@ -97,6 +124,17 @@
self.do_test("fc00:1234::/32", "2001:db8:1::/32")
self.do_test("fc00:1234::/63", "2001:db8:1::/56")
+ def test_npt66_icmp6(self):
+ """Send a packet and receive an ICMPv6 error reply through NPT66"""
+
+ # Test ICMP6 error packets
+ self.do_test(
+ "fd00:0000:0000::/48", "2001:4650:c3ed::/48", reply_icmp_error=True
+ )
+ self.do_test("fc00:1::/48", "2001:db8:1::/48", reply_icmp_error=True)
+ self.do_test("fc00:1234::/32", "2001:db8:1::/32", reply_icmp_error=True)
+ self.do_test("fc00:1234::/63", "2001:db8:1::/56", reply_icmp_error=True)
+
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)