blob: ef5b3a107dcfe28dea7e12bb53e12890aa1887d1 [file] [log] [blame]
Ole Troan18327be2021-01-12 21:49:38 +01001#!/usr/bin/env python3
2"""Policy 1:1 NAT functional tests"""
3
4import unittest
5from scapy.layers.inet import Ether, IP, UDP, ICMP
Dave Wallace8800f732023-08-31 00:47:44 -04006from framework import VppTestCase
7from asfframework import VppTestRunner
Ole Troan18327be2021-01-12 21:49:38 +01008from vpp_papi import VppEnum
Dmitry Valter34fa0ce2024-03-11 10:38:46 +00009from config import config
Ole Troan18327be2021-01-12 21:49:38 +010010
11
Dmitry Valter34fa0ce2024-03-11 10:38:46 +000012@unittest.skipIf("nat" in config.excluded_plugins, "Exclude NAT plugin tests")
Ole Troan18327be2021-01-12 21:49:38 +010013class TestPNAT(VppTestCase):
Klement Sekerad9b0c6f2022-04-26 19:02:15 +020014 """PNAT Test Case"""
15
Ole Troan18327be2021-01-12 21:49:38 +010016 maxDiff = None
17
18 @classmethod
19 def setUpClass(cls):
20 super(TestPNAT, cls).setUpClass()
21 cls.create_pg_interfaces(range(2))
22 cls.interfaces = list(cls.pg_interfaces)
23
24 @classmethod
25 def tearDownClass(cls):
26 super(TestPNAT, cls).tearDownClass()
27
28 def setUp(self):
29 super(TestPNAT, self).setUp()
30 for i in self.interfaces:
31 i.admin_up()
32 i.config_ip4()
33 i.resolve_arp()
34
35 def tearDown(self):
36 super(TestPNAT, self).tearDown()
37 if not self.vpp_dead:
38 for i in self.pg_interfaces:
39 i.unconfig_ip4()
40 i.admin_down()
41
42 def validate(self, rx, expected):
Dave Wallacecf9356d2024-07-23 01:28:19 -040043 self.assertTrue(bytes(rx), bytes(expected))
Ole Troan18327be2021-01-12 21:49:38 +010044
45 def validate_bytes(self, rx, expected):
46 self.assertEqual(rx, expected)
47
48 def ping_check(self):
Klement Sekerad9b0c6f2022-04-26 19:02:15 +020049 """Verify non matching traffic works."""
Ole Troan18327be2021-01-12 21:49:38 +010050 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
51
Klement Sekerad9b0c6f2022-04-26 19:02:15 +020052 icmpecho = IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / ICMP()
53 reply = IP(src=self.pg0.local_ip4, dst=self.pg0.remote_ip4) / ICMP(
54 type="echo-reply"
55 )
56 rx = self.send_and_expect(self.pg0, p_ether / icmpecho * 1, self.pg0)
Ole Troan18327be2021-01-12 21:49:38 +010057 for p in rx:
58 reply[IP].id = p[IP].id
59 self.validate(p[1], reply)
60
61 def test_pnat(self):
Klement Sekerad9b0c6f2022-04-26 19:02:15 +020062 """PNAT test"""
Ole Troan18327be2021-01-12 21:49:38 +010063
64 PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
Klement Sekerad9b0c6f2022-04-26 19:02:15 +020065 PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
Ole Troan18327be2021-01-12 21:49:38 +010066
67 tests = [
68 {
Klement Sekerad9b0c6f2022-04-26 19:02:15 +020069 "input": PNAT_IP4_INPUT,
70 "sw_if_index": self.pg0.sw_if_index,
71 "match": {
72 "mask": 0xA,
73 "dst": "10.10.10.10",
74 "proto": 17,
75 "dport": 6871,
76 },
77 "rewrite": {"mask": 0x2, "dst": self.pg1.remote_ip4},
78 "send": (
79 IP(src=self.pg0.remote_ip4, dst="10.10.10.10") / UDP(dport=6871)
80 ),
81 "reply": (
82 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
83 / UDP(dport=6871)
84 ),
Ole Troan18327be2021-01-12 21:49:38 +010085 },
86 {
Klement Sekerad9b0c6f2022-04-26 19:02:15 +020087 "input": PNAT_IP4_OUTPUT,
88 "sw_if_index": self.pg1.sw_if_index,
89 "match": {
90 "mask": 0x9,
91 "src": self.pg0.remote_ip4,
92 "proto": 17,
93 "dport": 6871,
94 },
95 "rewrite": {"mask": 0x1, "src": "11.11.11.11"},
96 "send": (
97 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
98 / UDP(dport=6871)
99 ),
100 "reply": (
101 IP(src="11.11.11.11", dst=self.pg1.remote_ip4) / UDP(dport=6871)
102 ),
Ole Troan18327be2021-01-12 21:49:38 +0100103 },
104 {
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200105 "input": PNAT_IP4_INPUT,
106 "sw_if_index": self.pg0.sw_if_index,
107 "match": {
108 "mask": 0xA,
109 "dst": "10.10.10.10",
110 "proto": 17,
111 "dport": 6871,
112 },
113 "rewrite": {"mask": 0xA, "dst": self.pg1.remote_ip4, "dport": 5555},
114 "send": (
115 IP(src=self.pg0.remote_ip4, dst="10.10.10.10")
116 / UDP(sport=65530, dport=6871)
117 ),
118 "reply": (
119 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
120 / UDP(sport=65530, dport=5555)
121 ),
Ole Troan18327be2021-01-12 21:49:38 +0100122 },
123 {
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200124 "input": PNAT_IP4_INPUT,
125 "sw_if_index": self.pg0.sw_if_index,
126 "match": {
127 "mask": 0xA,
128 "dst": self.pg1.remote_ip4,
129 "proto": 17,
130 "dport": 6871,
131 },
132 "rewrite": {"mask": 0x8, "dport": 5555},
133 "send": (
134 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
135 / UDP(dport=6871, chksum=0)
136 ),
137 "reply": (
138 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
139 / UDP(dport=5555, chksum=0)
140 ),
Ole Troan18327be2021-01-12 21:49:38 +0100141 },
142 {
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200143 "input": PNAT_IP4_INPUT,
144 "sw_if_index": self.pg0.sw_if_index,
145 "match": {"mask": 0x2, "dst": self.pg1.remote_ip4, "proto": 1},
146 "rewrite": {"mask": 0x1, "src": "8.8.8.8"},
147 "send": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / ICMP()),
148 "reply": IP(src="8.8.8.8", dst=self.pg1.remote_ip4) / ICMP(),
Ole Troan18327be2021-01-12 21:49:38 +0100149 },
150 ]
151
152 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
153 for t in tests:
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200154 rv = self.vapi.pnat_binding_add(match=t["match"], rewrite=t["rewrite"])
155 self.vapi.pnat_binding_attach(
156 sw_if_index=t["sw_if_index"],
157 attachment=t["input"],
158 binding_index=rv.binding_index,
159 )
Ole Troan18327be2021-01-12 21:49:38 +0100160
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200161 reply = t["reply"]
Ole Troan18327be2021-01-12 21:49:38 +0100162 reply[IP].ttl -= 1
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200163 rx = self.send_and_expect(self.pg0, p_ether / t["send"] * 1, self.pg1)
Ole Troan18327be2021-01-12 21:49:38 +0100164 for p in rx:
165 # p.show2()
166 self.validate(p[1], reply)
167
168 self.ping_check()
169
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200170 self.vapi.pnat_binding_detach(
171 sw_if_index=t["sw_if_index"],
172 attachment=t["input"],
173 binding_index=rv.binding_index,
174 )
Ole Troan18327be2021-01-12 21:49:38 +0100175 self.vapi.pnat_binding_del(binding_index=rv.binding_index)
176
177 def test_pnat_show(self):
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200178 """PNAT show tests"""
Ole Troan18327be2021-01-12 21:49:38 +0100179
180 PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200181 PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
Ole Troan18327be2021-01-12 21:49:38 +0100182
183 tests = [
184 {
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200185 "input": PNAT_IP4_INPUT,
186 "sw_if_index": self.pg0.sw_if_index,
187 "match": {
188 "mask": 0xA,
189 "dst": "10.10.10.10",
190 "proto": 17,
191 "dport": 6871,
192 },
193 "rewrite": {"mask": 0x2, "dst": self.pg1.remote_ip4},
194 "send": (
195 IP(src=self.pg0.remote_ip4, dst="10.10.10.10") / UDP(dport=6871)
196 ),
197 "reply": (
198 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
199 / UDP(dport=6871)
200 ),
Ole Troan18327be2021-01-12 21:49:38 +0100201 },
202 {
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200203 "input": PNAT_IP4_OUTPUT,
204 "sw_if_index": self.pg1.sw_if_index,
205 "match": {
206 "mask": 0x9,
207 "src": self.pg0.remote_ip4,
208 "proto": 17,
209 "dport": 6871,
210 },
211 "rewrite": {"mask": 0x1, "src": "11.11.11.11"},
212 "send": (
213 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
214 / UDP(dport=6871)
215 ),
216 "reply": (
217 IP(src="11.11.11.11", dst=self.pg1.remote_ip4) / UDP(dport=6871)
218 ),
Ole Troan18327be2021-01-12 21:49:38 +0100219 },
220 ]
221 binding_index = []
222 for t in tests:
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200223 rv = self.vapi.pnat_binding_add(match=t["match"], rewrite=t["rewrite"])
Ole Troan18327be2021-01-12 21:49:38 +0100224 binding_index.append(rv.binding_index)
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200225 self.vapi.pnat_binding_attach(
226 sw_if_index=t["sw_if_index"],
227 attachment=t["input"],
228 binding_index=rv.binding_index,
229 )
Ole Troan18327be2021-01-12 21:49:38 +0100230
231 rv, l = self.vapi.pnat_bindings_get()
232 self.assertEqual(len(l), len(tests))
233
234 rv, l = self.vapi.pnat_interfaces_get()
235 self.assertEqual(len(l), 2)
236
237 self.logger.info(self.vapi.cli("show pnat translations"))
238 self.logger.info(self.vapi.cli("show pnat interfaces"))
239
240 for i, t in enumerate(tests):
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200241 self.vapi.pnat_binding_detach(
242 sw_if_index=t["sw_if_index"],
243 attachment=t["input"],
244 binding_index=binding_index[i],
245 )
Ole Troan18327be2021-01-12 21:49:38 +0100246 self.vapi.pnat_binding_del(binding_index=binding_index[i])
247
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500248 def test_pnat_wildcard_proto(self):
249 """
250 PNAT test wildcard IP protocol, PNAT_PROTO for mask should be set by
251 handler
252 """
253
254 PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000255 PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500256
257 tests = [
258 {
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000259 "input": PNAT_IP4_INPUT,
260 "sw_if_index": self.pg0.sw_if_index,
261 "match": {"mask": 0x2, "dst": "10.10.10.10"},
262 "rewrite": {"mask": 0x2, "dst": self.pg1.remote_ip4},
263 "send": (IP(src=self.pg0.remote_ip4, dst="10.10.10.10")),
264 "reply": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500265 },
266 {
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000267 "input": PNAT_IP4_OUTPUT,
268 "sw_if_index": self.pg1.sw_if_index,
269 "match": {"mask": 0x1, "src": self.pg0.remote_ip4},
270 "rewrite": {"mask": 0x1, "src": "11.11.11.11"},
271 "send": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
272 "reply": (IP(src="11.11.11.11", dst=self.pg1.remote_ip4)),
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500273 },
274 ]
275
276 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
277 for t in tests:
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000278 rv = self.vapi.pnat_binding_add(match=t["match"], rewrite=t["rewrite"])
279 self.vapi.pnat_binding_attach(
280 sw_if_index=t["sw_if_index"],
281 attachment=t["input"],
282 binding_index=rv.binding_index,
283 )
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500284
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000285 reply = t["reply"]
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500286 reply[IP].ttl -= 1
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000287 rx = self.send_and_expect(self.pg0, p_ether / t["send"] * 1, self.pg1)
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500288 for p in rx:
289 self.validate(p[1], reply)
290
291 self.ping_check()
292
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000293 self.vapi.pnat_binding_detach(
294 sw_if_index=t["sw_if_index"],
295 attachment=t["input"],
296 binding_index=rv.binding_index,
297 )
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500298 self.vapi.pnat_binding_del(binding_index=rv.binding_index)
299
300 def test_pnat_wildcard_proto_v2(self):
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000301 """PNAT test wildcard IP protocol using pnat_binding_add_v2"""
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500302
303 PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000304 PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500305
306 tests = [
307 {
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000308 "input": PNAT_IP4_INPUT,
309 "sw_if_index": self.pg0.sw_if_index,
310 "match": {"mask": 0x42, "dst": "10.10.10.10"},
311 "rewrite": {"mask": 0x42, "dst": self.pg1.remote_ip4},
312 "send": (IP(src=self.pg0.remote_ip4, dst="10.10.10.10")),
313 "reply": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500314 },
315 {
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000316 "input": PNAT_IP4_OUTPUT,
317 "sw_if_index": self.pg1.sw_if_index,
318 "match": {"mask": 0x41, "src": self.pg0.remote_ip4},
319 "rewrite": {"mask": 0x41, "src": "11.11.11.11"},
320 "send": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
321 "reply": (IP(src="11.11.11.11", dst=self.pg1.remote_ip4)),
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500322 },
323 ]
324
325 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
326 for t in tests:
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000327 rv = self.vapi.pnat_binding_add_v2(match=t["match"], rewrite=t["rewrite"])
328 self.vapi.pnat_binding_attach(
329 sw_if_index=t["sw_if_index"],
330 attachment=t["input"],
331 binding_index=rv.binding_index,
332 )
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500333
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000334 reply = t["reply"]
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500335 reply[IP].ttl -= 1
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000336 rx = self.send_and_expect(self.pg0, p_ether / t["send"] * 1, self.pg1)
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500337 for p in rx:
338 self.validate(p[1], reply)
339
340 self.ping_check()
341
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000342 self.vapi.pnat_binding_detach(
343 sw_if_index=t["sw_if_index"],
344 attachment=t["input"],
345 binding_index=rv.binding_index,
346 )
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500347 self.vapi.pnat_binding_del(binding_index=rv.binding_index)
348
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200349
350if __name__ == "__main__":
Ole Troan18327be2021-01-12 21:49:38 +0100351 unittest.main(testRunner=VppTestRunner)