blob: 307dbabc39b941210cba5649e6549c1e0b16af05 [file] [log] [blame]
#!/usr/bin/env python3
import unittest
import ipaddress
from framework import VppTestCase
from asfframework import VppTestRunner
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6DestUnreach
from scapy.layers.l2 import Ether
from scapy.packet import Raw
class TestNPT66(VppTestCase):
"""NPTv6 Test Case"""
extra_vpp_plugin_config = [
"plugin npt66_plugin.so {enable}",
]
def setUp(self):
super(TestNPT66, self).setUp()
# create 2 pg interfaces
self.create_pg_interfaces(range(2))
for i in self.pg_interfaces:
i.admin_up()
i.config_ip6()
i.resolve_ndp()
def tearDown(self):
for i in self.pg_interfaces:
i.unconfig_ip6()
i.admin_down()
super(TestNPT66, self).tearDown()
def send_and_verify(self, internal, reply_icmp_error=False):
sendif = self.pg0
recvif = self.pg1
local_mac = self.pg0.local_mac
remote_mac = self.pg0.remote_mac
src = ipaddress.ip_interface(internal).ip + 1
dst = self.pg1.remote_ip6
p = (
Ether(dst=local_mac, src=remote_mac)
/ IPv6(src=src, dst=dst)
/ ICMPv6EchoRequest()
/ Raw(b"Request")
)
# print('Sending packet')
# p.show2()
rxs = self.send_and_expect(sendif, p, recvif)
for rx in rxs:
# print('Received packet')
# rx.show2()
original_cksum = rx[ICMPv6EchoRequest].cksum
del rx[ICMPv6EchoRequest].cksum
rx = rx.__class__(bytes(rx))
self.assertEqual(original_cksum, rx[ICMPv6EchoRequest].cksum)
# Generate a replies
if reply_icmp_error:
# print('Generating an ICMP error message')
reply = (
Ether(dst=rx[Ether].src, src=local_mac)
/ IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
/ ICMPv6DestUnreach()
/ rx[IPv6]
)
# print('Sending ICMP error message reply')
# reply.show2()
replies = self.send_and_expect(recvif, reply, sendif)
for r in replies:
# print('Received ICMP error message reply on the other side')
# r.show2()
self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
original_cksum = r[ICMPv6EchoRequest].cksum
del r[ICMPv6EchoRequest].cksum
r = r.__class__(bytes(r))
self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
else:
reply = (
Ether(dst=rx[Ether].src, src=local_mac)
/ IPv6(src=rx[IPv6].dst, dst=rx[IPv6].src)
/ ICMPv6EchoRequest()
/ Raw(b"Reply")
)
replies = self.send_and_expect(recvif, reply, sendif)
for r in replies:
# r.show2()
self.assertEqual(str(p[IPv6].src), r[IPv6].dst)
original_cksum = r[ICMPv6EchoRequest].cksum
del r[ICMPv6EchoRequest].cksum
r = r.__class__(bytes(r))
self.assertEqual(original_cksum, r[ICMPv6EchoRequest].cksum)
def do_test(self, internal, external, reply_icmp_error=False):
"""Add NPT66 binding and send packet"""
self.vapi.npt66_binding_add_del(
sw_if_index=self.pg1.sw_if_index,
internal=internal,
external=external,
is_add=True,
)
## TODO use route api
self.vapi.cli(f"ip route add {internal} via {self.pg0.remote_ip6}")
self.send_and_verify(internal, reply_icmp_error=reply_icmp_error)
self.vapi.npt66_binding_add_del(
sw_if_index=self.pg1.sw_if_index,
internal=internal,
external=external,
is_add=False,
)
def test_npt66_simple(self):
"""Send and receive a packet through NPT66"""
self.do_test("fd00:0000:0000::/48", "2001:4650:c3ed::/48")
self.do_test("fc00:1::/48", "2001:db8:1::/48")
self.do_test("fc00:1234::/32", "2001:db8:1::/32")
self.do_test("fc00:1234::/63", "2001:db8:1::/56")
def test_npt66_icmp6(self):
"""Send and receive a packet through NPT66"""
# Test ICMP6 error packets
self.do_test(
"fd00:0000:0000::/48", "2001:4650:c3ed::/48", reply_icmp_error=True
)
self.do_test("fc00:1::/48", "2001:db8:1::/48", reply_icmp_error=True)
self.do_test("fc00:1234::/32", "2001:db8:1::/32", reply_icmp_error=True)
self.do_test("fc00:1234::/63", "2001:db8:1::/56", reply_icmp_error=True)
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)