npt66: icmp6 alg to handle icmp6 error messages

Support rewriting the inner packet for ICMP6 error messages.

Type: feature
Change-Id: I7e11f53626037075a23310f1cb7e673b0cb52843
Signed-off-by: Ole Troan <otroan@employees.org>
diff --git a/test/test_npt66.py b/test/test_npt66.py
index 44a9e87..c867621 100644
--- a/test/test_npt66.py
+++ b/test/test_npt66.py
@@ -4,7 +4,7 @@
 import ipaddress
 from framework import VppTestCase, VppTestRunner
 
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
+from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6DestUnreach
 from scapy.layers.l2 import Ether
 from scapy.packet import Raw
 
@@ -33,7 +33,7 @@
             i.admin_down()
         super(TestNPT66, self).tearDown()
 
-    def send_and_verify(self, internal):
+    def send_and_verify(self, internal, reply_icmp_error=False):
         sendif = self.pg0
         recvif = self.pg1
         local_mac = self.pg0.local_mac
@@ -47,30 +47,57 @@
             / ICMPv6EchoRequest()
             / Raw(b"Request")
         )
+        # print('Sending packet')
+        # p.show2()
         rxs = self.send_and_expect(sendif, p, recvif)
         for rx in rxs:
+            # print('Received packet')
+            # rx.show2()
             original_cksum = rx[ICMPv6EchoRequest].cksum
             del rx[ICMPv6EchoRequest].cksum
             rx = rx.__class__(bytes(rx))
             self.assertEqual(original_cksum, rx[ICMPv6EchoRequest].cksum)
 
             # Generate a replies
-            reply = (
-                Ether(dst=rx[Ether].src, src=local_mac)
-                / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
-                / ICMPv6EchoRequest()
-                / Raw(b"Reply")
-            )
+            if reply_icmp_error:
+                # print('Generating an ICMP error message')
+                reply = (
+                    Ether(dst=rx[Ether].src, src=local_mac)
+                    / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
+                    / ICMPv6DestUnreach()
+                    / rx[IPv6]
+                )
+                # print('Sending ICMP error message reply')
+                # reply.show2()
+                replies = self.send_and_expect(recvif, reply, sendif)
+                for r in replies:
+                    # print('Received ICMP error message reply on the other side')
+                    # r.show2()
+                    self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
+                    original_cksum = r[ICMPv6EchoRequest].cksum
+                    del r[ICMPv6EchoRequest].cksum
+                    r = r.__class__(bytes(r))
+                    self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
 
-            replies = self.send_and_expect(recvif, reply, sendif)
-            for r in replies:
-                self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
-                original_cksum = r[ICMPv6EchoRequest].cksum
-                del r[ICMPv6EchoRequest].cksum
-                r = r.__class__(bytes(r))
-                self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
+            else:
+                reply = (
+                    Ether(dst=rx[Ether].src, src=local_mac)
+                    / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
+                    / ICMPv6EchoRequest()
+                    / Raw(b"Reply")
+                )
 
-    def do_test(self, internal, external):
+                replies = self.send_and_expect(recvif, reply, sendif)
+                for r in replies:
+                    r.show2()
+                    self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
+                    original_cksum = r[ICMPv6EchoRequest].cksum
+                    del r[ICMPv6EchoRequest].cksum
+                    r = r.__class__(bytes(r))
+                    self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
+
+    def do_test(self, internal, external, reply_icmp_error=False):
+        """Add NPT66 binding and send packet"""
         self.vapi.npt66_binding_add_del(
             sw_if_index=self.pg1.sw_if_index,
             internal=internal,
@@ -80,7 +107,7 @@
         ## TODO use route api
         self.vapi.cli(f"ip route add {internal} via {self.pg0.remote_ip6}")
 
-        self.send_and_verify(internal)
+        self.send_and_verify(internal, reply_icmp_error=reply_icmp_error)
 
         self.vapi.npt66_binding_add_del(
             sw_if_index=self.pg1.sw_if_index,
@@ -97,6 +124,17 @@
         self.do_test("fc00:1234::/32", "2001:db8:1::/32")
         self.do_test("fc00:1234::/63", "2001:db8:1::/56")
 
+    def test_npt66_icmp6(self):
+        """Send and receive a packet through NPT66"""
+
+        # Test ICMP6 error packets
+        self.do_test(
+            "fd00:0000:0000::/48", "2001:4650:c3ed::/48", reply_icmp_error=True
+        )
+        self.do_test("fc00:1::/48", "2001:db8:1::/48", reply_icmp_error=True)
+        self.do_test("fc00:1234::/32", "2001:db8:1::/32", reply_icmp_error=True)
+        self.do_test("fc00:1234::/63", "2001:db8:1::/56", reply_icmp_error=True)
+
 
 if __name__ == "__main__":
     unittest.main(testRunner=VppTestRunner)