blob: 54f9321d44e841450fbb224d3d4c88737382e701 [file] [log] [blame]
Ole Troan96068d62020-07-01 13:16:16 +02001#!/usr/bin/env python3
2
Ole Troan96068d62020-07-01 13:16:16 +02003import unittest
Ole Troan96068d62020-07-01 13:16:16 +02004
Dave Wallace8800f732023-08-31 00:47:44 -04005from framework import VppTestCase
6from asfframework import VppTestRunner
7from scapy.layers.inet import IP, TCP, UDP
Klement Sekerad9b0c6f2022-04-26 19:02:15 +02008from scapy.layers.inet6 import (
9 IPv6,
10 ICMPv6EchoRequest,
11 ICMPv6EchoReply,
Klement Sekerad9b0c6f2022-04-26 19:02:15 +020012)
Dave Wallace8800f732023-08-31 00:47:44 -040013from scapy.layers.l2 import Ether, GRE
14from util import ppp
Ole Troan96068d62020-07-01 13:16:16 +020015from vpp_papi import VppEnum
Dmitry Valter34fa0ce2024-03-11 10:38:46 +000016from config import config
Ole Troan96068d62020-07-01 13:16:16 +020017
18
Dmitry Valter34fa0ce2024-03-11 10:38:46 +000019@unittest.skipIf("nat" in config.excluded_plugins, "Exclude NAT plugin tests")
Filip Vargaed2ee5e2021-03-23 12:57:58 +010020class TestNAT66(VppTestCase):
Klement Sekerad9b0c6f2022-04-26 19:02:15 +020021 """NAT66 Test Cases"""
Ole Troan96068d62020-07-01 13:16:16 +020022
23 @classmethod
24 def setUpClass(cls):
25 super(TestNAT66, cls).setUpClass()
26
Klement Sekerad9b0c6f2022-04-26 19:02:15 +020027 cls.nat_addr = "fd01:ff::2"
Ole Troan96068d62020-07-01 13:16:16 +020028 cls.create_pg_interfaces(range(2))
29 cls.interfaces = list(cls.pg_interfaces)
30
31 for i in cls.interfaces:
32 i.admin_up()
33 i.config_ip6()
34 i.configure_ipv6_neighbors()
35
Filip Vargaed2ee5e2021-03-23 12:57:58 +010036 @property
37 def config_flags(self):
38 return VppEnum.vl_api_nat_config_flags_t
39
40 def plugin_enable(self):
41 self.vapi.nat66_plugin_enable_disable(enable=1)
42
43 def plugin_disable(self):
44 self.vapi.nat66_plugin_enable_disable(enable=0)
45
46 def setUp(self):
47 super(TestNAT66, self).setUp()
48 self.plugin_enable()
49
50 def tearDown(self):
51 super(TestNAT66, self).tearDown()
52 if not self.vpp_dead:
53 self.plugin_disable()
Ole Troan96068d62020-07-01 13:16:16 +020054
55 def test_static(self):
Klement Sekerad9b0c6f2022-04-26 19:02:15 +020056 """1:1 NAT66 test"""
Ole Troan96068d62020-07-01 13:16:16 +020057 flags = self.config_flags.NAT_IS_INSIDE
Klement Sekerad9b0c6f2022-04-26 19:02:15 +020058 self.vapi.nat66_add_del_interface(
59 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
60 )
61 self.vapi.nat66_add_del_interface(is_add=1, sw_if_index=self.pg1.sw_if_index)
Ole Troan96068d62020-07-01 13:16:16 +020062 self.vapi.nat66_add_del_static_mapping(
63 local_ip_address=self.pg0.remote_ip6,
64 external_ip_address=self.nat_addr,
Klement Sekerad9b0c6f2022-04-26 19:02:15 +020065 is_add=1,
66 )
Ole Troan96068d62020-07-01 13:16:16 +020067
68 # in2out
69 pkts = []
Klement Sekerad9b0c6f2022-04-26 19:02:15 +020070 p = (
71 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
72 / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6)
73 / TCP()
74 )
Ole Troan96068d62020-07-01 13:16:16 +020075 pkts.append(p)
Klement Sekerad9b0c6f2022-04-26 19:02:15 +020076 p = (
77 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
78 / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6)
79 / UDP()
80 )
Ole Troan96068d62020-07-01 13:16:16 +020081 pkts.append(p)
Klement Sekerad9b0c6f2022-04-26 19:02:15 +020082 p = (
83 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
84 / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6)
85 / ICMPv6EchoRequest()
86 )
Ole Troan96068d62020-07-01 13:16:16 +020087 pkts.append(p)
Klement Sekerad9b0c6f2022-04-26 19:02:15 +020088 p = (
89 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
90 / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6)
91 / GRE()
92 / IP()
93 / TCP()
94 )
Ole Troan96068d62020-07-01 13:16:16 +020095 pkts.append(p)
96 self.pg0.add_stream(pkts)
97 self.pg_enable_capture(self.pg_interfaces)
98 self.pg_start()
99 capture = self.pg1.get_capture(len(pkts))
100
101 for packet in capture:
102 try:
103 self.assertEqual(packet[IPv6].src, self.nat_addr)
104 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
105 self.assert_packet_checksums_valid(packet)
106 except:
107 self.logger.error(ppp("Unexpected or invalid packet:", packet))
108 raise
109
110 # out2in
111 pkts = []
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200112 p = (
113 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
114 / IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr)
115 / TCP()
116 )
Ole Troan96068d62020-07-01 13:16:16 +0200117 pkts.append(p)
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200118 p = (
119 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
120 / IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr)
121 / UDP()
122 )
Ole Troan96068d62020-07-01 13:16:16 +0200123 pkts.append(p)
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200124 p = (
125 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
126 / IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr)
127 / ICMPv6EchoReply()
128 )
Ole Troan96068d62020-07-01 13:16:16 +0200129 pkts.append(p)
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200130 p = (
131 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
132 / IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr)
133 / GRE()
134 / IP()
135 / TCP()
136 )
Ole Troan96068d62020-07-01 13:16:16 +0200137 pkts.append(p)
138 self.pg1.add_stream(pkts)
139 self.pg_enable_capture(self.pg_interfaces)
140 self.pg_start()
141 capture = self.pg0.get_capture(len(pkts))
142 for packet in capture:
143 try:
144 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
145 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
146 self.assert_packet_checksums_valid(packet)
147 except:
148 self.logger.error(ppp("Unexpected or invalid packet:", packet))
149 raise
150
151 sm = self.vapi.nat66_static_mapping_dump()
152 self.assertEqual(len(sm), 1)
153 self.assertEqual(sm[0].total_pkts, 8)
154
155 def test_check_no_translate(self):
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200156 """NAT66 translate only when egress interface is outside interface"""
Ole Troan96068d62020-07-01 13:16:16 +0200157 flags = self.config_flags.NAT_IS_INSIDE
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200158 self.vapi.nat66_add_del_interface(
159 is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
160 )
161 self.vapi.nat66_add_del_interface(
162 is_add=1, flags=flags, sw_if_index=self.pg1.sw_if_index
163 )
Ole Troan96068d62020-07-01 13:16:16 +0200164 self.vapi.nat66_add_del_static_mapping(
165 local_ip_address=self.pg0.remote_ip6,
166 external_ip_address=self.nat_addr,
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200167 is_add=1,
168 )
Ole Troan96068d62020-07-01 13:16:16 +0200169
170 # in2out
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200171 p = (
172 Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
173 / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6)
174 / UDP()
175 )
Ole Troan96068d62020-07-01 13:16:16 +0200176 self.pg0.add_stream([p])
177 self.pg_enable_capture(self.pg_interfaces)
178 self.pg_start()
179 capture = self.pg1.get_capture(1)
180 packet = capture[0]
181 try:
182 self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
183 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
184 except:
185 self.logger.error(ppp("Unexpected or invalid packet:", packet))
186 raise
187
Ole Troan96068d62020-07-01 13:16:16 +0200188
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200189if __name__ == "__main__":
Ole Troan96068d62020-07-01 13:16:16 +0200190 unittest.main(testRunner=VppTestRunner)