| #!/usr/bin/env python3 |
| |
| import unittest |
| import ipaddress |
| from framework import VppTestCase |
| from asfframework import VppTestRunner |
| |
| from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6DestUnreach |
| from scapy.layers.l2 import Ether |
| from scapy.packet import Raw |
| |
| |
class TestNPT66(VppTestCase):
    """NPTv6 Test Case"""

    # Enable the NPT66 plugin for the VPP instance used by this test class.
    extra_vpp_plugin_config = [
        "plugin npt66_plugin.so {enable}",
    ]

    def setUp(self):
        """Create two pg interfaces, bring them up and configure IPv6."""
        super(TestNPT66, self).setUp()

        # create 2 pg interfaces
        self.create_pg_interfaces(range(2))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        """Unconfigure IPv6 on the pg interfaces and bring them down."""
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()
        super(TestNPT66, self).tearDown()

    def _verify_echo_cksum(self, pkt):
        """Assert that the ICMPv6 echo checksum carried by ``pkt`` is valid.

        The received checksum is remembered, the field is deleted and the
        packet is re-parsed from its own bytes so that scapy fills the
        checksum in again; the two values must match.

        Note that ``pkt`` itself is modified (its checksum field is cleared).

        :param pkt: captured packet containing an ``ICMPv6EchoRequest`` layer
            (possibly nested inside an ICMPv6 error message).
        :returns: the rebuilt packet.
        """
        received_cksum = pkt[ICMPv6EchoRequest].cksum
        del pkt[ICMPv6EchoRequest].cksum
        rebuilt = pkt.__class__(bytes(pkt))
        self.assertEqual(received_cksum, rebuilt[ICMPv6EchoRequest].cksum)
        return rebuilt

    def send_and_verify(self, internal, reply_icmp_error=False):
        """Send an echo request pg0 -> pg1 and a reply pg1 -> pg0.

        The request is sourced from the first address after the network
        address of ``internal``. For every packet captured on pg1 the ICMPv6
        checksum is verified, then a reply is injected on pg1 and the packets
        captured on pg0 must be addressed to the original source with a valid
        ICMPv6 echo checksum.

        :param internal: internal prefix as a string, e.g. ``"fc00:1::/48"``.
        :param reply_icmp_error: when true the reply is an ICMPv6 destination
            unreachable message quoting the received packet; otherwise it is
            a plain ICMPv6 echo packet with a ``b"Reply"`` payload.
        """
        sendif = self.pg0
        recvif = self.pg1
        src = ipaddress.ip_interface(internal).ip + 1
        dst = recvif.remote_ip6

        p = (
            Ether(dst=sendif.local_mac, src=sendif.remote_mac)
            / IPv6(src=src, dst=dst)
            / ICMPv6EchoRequest()
            / Raw(b"Request")
        )

        for rx in self.send_and_expect(sendif, p, recvif):
            # Continue with the rebuilt packet, as the reply is derived
            # from its addresses (and, for errors, quotes it).
            rx = self._verify_echo_cksum(rx)

            # Generate the reply
            if reply_icmp_error:
                # The error message quotes the packet as seen on pg1.
                reply_payload = ICMPv6DestUnreach() / rx[IPv6]
            else:
                reply_payload = ICMPv6EchoRequest() / Raw(b"Reply")

            # The reply enters on pg1, so its Ethernet source is the host
            # behind pg1 rather than a MAC address belonging to pg0.
            reply = (
                Ether(dst=rx[Ether].src, src=recvif.remote_mac)
                / IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
                / reply_payload
            )

            for r in self.send_and_expect(recvif, reply, sendif):
                # The destination must be the original source address again.
                self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
                # For error replies this checks the quoted echo request.
                self._verify_echo_cksum(r)

    def do_test(self, internal, external, reply_icmp_error=False):
        """Add an NPT66 binding on pg1, run the traffic check, remove it.

        The binding is removed even when the traffic check fails, so that a
        failing case does not leave it configured for the cases that follow.

        NOTE: the static route added for ``internal`` is not removed here.

        :param internal: internal prefix string.
        :param external: external prefix string.
        :param reply_icmp_error: forwarded to :meth:`send_and_verify`.
        """
        self.vapi.npt66_binding_add_del(
            sw_if_index=self.pg1.sw_if_index,
            internal=internal,
            external=external,
            is_add=True,
        )
        try:
            ## TODO use route api
            self.vapi.cli(f"ip route add {internal} via {self.pg0.remote_ip6}")

            self.send_and_verify(internal, reply_icmp_error=reply_icmp_error)
        finally:
            self.vapi.npt66_binding_add_del(
                sw_if_index=self.pg1.sw_if_index,
                internal=internal,
                external=external,
                is_add=False,
            )

    def test_npt66_simple(self):
        """Send and receive a packet through NPT66"""

        self.do_test("fd00:0000:0000::/48", "2001:4650:c3ed::/48")
        self.do_test("fc00:1::/48", "2001:db8:1::/48")
        self.do_test("fc00:1234::/32", "2001:db8:1::/32")
        self.do_test("fc00:1234::/63", "2001:db8:1::/56")

    def test_npt66_icmp6(self):
        """Send an ICMPv6 error message back through NPT66"""

        # Test ICMP6 error packets
        self.do_test(
            "fd00:0000:0000::/48", "2001:4650:c3ed::/48", reply_icmp_error=True
        )
        self.do_test("fc00:1::/48", "2001:db8:1::/48", reply_icmp_error=True)
        self.do_test("fc00:1234::/32", "2001:db8:1::/32", reply_icmp_error=True)
        self.do_test("fc00:1234::/63", "2001:db8:1::/56", reply_icmp_error=True)
| |
| |
if __name__ == "__main__":
    # Executed directly: run this module's tests with the VPP test runner.
    runner_cls = VppTestRunner
    unittest.main(testRunner=runner_cls)