blob: 970249489d35894b532bd7c6ff66f6e84de1c8d9 [file] [log] [blame]
Ole Troan18327be2021-01-12 21:49:38 +01001#!/usr/bin/env python3
2"""Policy 1:1 NAT functional tests"""
3
4import unittest
5from scapy.layers.inet import Ether, IP, UDP, ICMP
6from framework import VppTestCase, VppTestRunner
7from vpp_papi import VppEnum
8
9
10class TestPNAT(VppTestCase):
Klement Sekerad9b0c6f2022-04-26 19:02:15 +020011 """PNAT Test Case"""
12
Ole Troan18327be2021-01-12 21:49:38 +010013 maxDiff = None
14
15 @classmethod
16 def setUpClass(cls):
17 super(TestPNAT, cls).setUpClass()
18 cls.create_pg_interfaces(range(2))
19 cls.interfaces = list(cls.pg_interfaces)
20
21 @classmethod
22 def tearDownClass(cls):
23 super(TestPNAT, cls).tearDownClass()
24
25 def setUp(self):
26 super(TestPNAT, self).setUp()
27 for i in self.interfaces:
28 i.admin_up()
29 i.config_ip4()
30 i.resolve_arp()
31
32 def tearDown(self):
33 super(TestPNAT, self).tearDown()
34 if not self.vpp_dead:
35 for i in self.pg_interfaces:
36 i.unconfig_ip4()
37 i.admin_down()
38
39 def validate(self, rx, expected):
40 self.assertEqual(rx, expected.__class__(expected))
41
42 def validate_bytes(self, rx, expected):
43 self.assertEqual(rx, expected)
44
45 def ping_check(self):
Klement Sekerad9b0c6f2022-04-26 19:02:15 +020046 """Verify non matching traffic works."""
Ole Troan18327be2021-01-12 21:49:38 +010047 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
48
Klement Sekerad9b0c6f2022-04-26 19:02:15 +020049 icmpecho = IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / ICMP()
50 reply = IP(src=self.pg0.local_ip4, dst=self.pg0.remote_ip4) / ICMP(
51 type="echo-reply"
52 )
53 rx = self.send_and_expect(self.pg0, p_ether / icmpecho * 1, self.pg0)
Ole Troan18327be2021-01-12 21:49:38 +010054 for p in rx:
55 reply[IP].id = p[IP].id
56 self.validate(p[1], reply)
57
58 def test_pnat(self):
Klement Sekerad9b0c6f2022-04-26 19:02:15 +020059 """PNAT test"""
Ole Troan18327be2021-01-12 21:49:38 +010060
61 PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
Klement Sekerad9b0c6f2022-04-26 19:02:15 +020062 PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
Ole Troan18327be2021-01-12 21:49:38 +010063
64 tests = [
65 {
Klement Sekerad9b0c6f2022-04-26 19:02:15 +020066 "input": PNAT_IP4_INPUT,
67 "sw_if_index": self.pg0.sw_if_index,
68 "match": {
69 "mask": 0xA,
70 "dst": "10.10.10.10",
71 "proto": 17,
72 "dport": 6871,
73 },
74 "rewrite": {"mask": 0x2, "dst": self.pg1.remote_ip4},
75 "send": (
76 IP(src=self.pg0.remote_ip4, dst="10.10.10.10") / UDP(dport=6871)
77 ),
78 "reply": (
79 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
80 / UDP(dport=6871)
81 ),
Ole Troan18327be2021-01-12 21:49:38 +010082 },
83 {
Klement Sekerad9b0c6f2022-04-26 19:02:15 +020084 "input": PNAT_IP4_OUTPUT,
85 "sw_if_index": self.pg1.sw_if_index,
86 "match": {
87 "mask": 0x9,
88 "src": self.pg0.remote_ip4,
89 "proto": 17,
90 "dport": 6871,
91 },
92 "rewrite": {"mask": 0x1, "src": "11.11.11.11"},
93 "send": (
94 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
95 / UDP(dport=6871)
96 ),
97 "reply": (
98 IP(src="11.11.11.11", dst=self.pg1.remote_ip4) / UDP(dport=6871)
99 ),
Ole Troan18327be2021-01-12 21:49:38 +0100100 },
101 {
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200102 "input": PNAT_IP4_INPUT,
103 "sw_if_index": self.pg0.sw_if_index,
104 "match": {
105 "mask": 0xA,
106 "dst": "10.10.10.10",
107 "proto": 17,
108 "dport": 6871,
109 },
110 "rewrite": {"mask": 0xA, "dst": self.pg1.remote_ip4, "dport": 5555},
111 "send": (
112 IP(src=self.pg0.remote_ip4, dst="10.10.10.10")
113 / UDP(sport=65530, dport=6871)
114 ),
115 "reply": (
116 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
117 / UDP(sport=65530, dport=5555)
118 ),
Ole Troan18327be2021-01-12 21:49:38 +0100119 },
120 {
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200121 "input": PNAT_IP4_INPUT,
122 "sw_if_index": self.pg0.sw_if_index,
123 "match": {
124 "mask": 0xA,
125 "dst": self.pg1.remote_ip4,
126 "proto": 17,
127 "dport": 6871,
128 },
129 "rewrite": {"mask": 0x8, "dport": 5555},
130 "send": (
131 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
132 / UDP(dport=6871, chksum=0)
133 ),
134 "reply": (
135 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
136 / UDP(dport=5555, chksum=0)
137 ),
Ole Troan18327be2021-01-12 21:49:38 +0100138 },
139 {
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200140 "input": PNAT_IP4_INPUT,
141 "sw_if_index": self.pg0.sw_if_index,
142 "match": {"mask": 0x2, "dst": self.pg1.remote_ip4, "proto": 1},
143 "rewrite": {"mask": 0x1, "src": "8.8.8.8"},
144 "send": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / ICMP()),
145 "reply": IP(src="8.8.8.8", dst=self.pg1.remote_ip4) / ICMP(),
Ole Troan18327be2021-01-12 21:49:38 +0100146 },
147 ]
148
149 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
150 for t in tests:
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200151 rv = self.vapi.pnat_binding_add(match=t["match"], rewrite=t["rewrite"])
152 self.vapi.pnat_binding_attach(
153 sw_if_index=t["sw_if_index"],
154 attachment=t["input"],
155 binding_index=rv.binding_index,
156 )
Ole Troan18327be2021-01-12 21:49:38 +0100157
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200158 reply = t["reply"]
Ole Troan18327be2021-01-12 21:49:38 +0100159 reply[IP].ttl -= 1
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200160 rx = self.send_and_expect(self.pg0, p_ether / t["send"] * 1, self.pg1)
Ole Troan18327be2021-01-12 21:49:38 +0100161 for p in rx:
162 # p.show2()
163 self.validate(p[1], reply)
164
165 self.ping_check()
166
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200167 self.vapi.pnat_binding_detach(
168 sw_if_index=t["sw_if_index"],
169 attachment=t["input"],
170 binding_index=rv.binding_index,
171 )
Ole Troan18327be2021-01-12 21:49:38 +0100172 self.vapi.pnat_binding_del(binding_index=rv.binding_index)
173
174 def test_pnat_show(self):
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200175 """PNAT show tests"""
Ole Troan18327be2021-01-12 21:49:38 +0100176
177 PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200178 PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
Ole Troan18327be2021-01-12 21:49:38 +0100179
180 tests = [
181 {
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200182 "input": PNAT_IP4_INPUT,
183 "sw_if_index": self.pg0.sw_if_index,
184 "match": {
185 "mask": 0xA,
186 "dst": "10.10.10.10",
187 "proto": 17,
188 "dport": 6871,
189 },
190 "rewrite": {"mask": 0x2, "dst": self.pg1.remote_ip4},
191 "send": (
192 IP(src=self.pg0.remote_ip4, dst="10.10.10.10") / UDP(dport=6871)
193 ),
194 "reply": (
195 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
196 / UDP(dport=6871)
197 ),
Ole Troan18327be2021-01-12 21:49:38 +0100198 },
199 {
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200200 "input": PNAT_IP4_OUTPUT,
201 "sw_if_index": self.pg1.sw_if_index,
202 "match": {
203 "mask": 0x9,
204 "src": self.pg0.remote_ip4,
205 "proto": 17,
206 "dport": 6871,
207 },
208 "rewrite": {"mask": 0x1, "src": "11.11.11.11"},
209 "send": (
210 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
211 / UDP(dport=6871)
212 ),
213 "reply": (
214 IP(src="11.11.11.11", dst=self.pg1.remote_ip4) / UDP(dport=6871)
215 ),
Ole Troan18327be2021-01-12 21:49:38 +0100216 },
217 ]
218 binding_index = []
219 for t in tests:
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200220 rv = self.vapi.pnat_binding_add(match=t["match"], rewrite=t["rewrite"])
Ole Troan18327be2021-01-12 21:49:38 +0100221 binding_index.append(rv.binding_index)
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200222 self.vapi.pnat_binding_attach(
223 sw_if_index=t["sw_if_index"],
224 attachment=t["input"],
225 binding_index=rv.binding_index,
226 )
Ole Troan18327be2021-01-12 21:49:38 +0100227
228 rv, l = self.vapi.pnat_bindings_get()
229 self.assertEqual(len(l), len(tests))
230
231 rv, l = self.vapi.pnat_interfaces_get()
232 self.assertEqual(len(l), 2)
233
234 self.logger.info(self.vapi.cli("show pnat translations"))
235 self.logger.info(self.vapi.cli("show pnat interfaces"))
236
237 for i, t in enumerate(tests):
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200238 self.vapi.pnat_binding_detach(
239 sw_if_index=t["sw_if_index"],
240 attachment=t["input"],
241 binding_index=binding_index[i],
242 )
Ole Troan18327be2021-01-12 21:49:38 +0100243 self.vapi.pnat_binding_del(binding_index=binding_index[i])
244
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500245 def test_pnat_wildcard_proto(self):
246 """
247 PNAT test wildcard IP protocol, PNAT_PROTO for mask should be set by
248 handler
249 """
250
251 PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000252 PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500253
254 tests = [
255 {
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000256 "input": PNAT_IP4_INPUT,
257 "sw_if_index": self.pg0.sw_if_index,
258 "match": {"mask": 0x2, "dst": "10.10.10.10"},
259 "rewrite": {"mask": 0x2, "dst": self.pg1.remote_ip4},
260 "send": (IP(src=self.pg0.remote_ip4, dst="10.10.10.10")),
261 "reply": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500262 },
263 {
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000264 "input": PNAT_IP4_OUTPUT,
265 "sw_if_index": self.pg1.sw_if_index,
266 "match": {"mask": 0x1, "src": self.pg0.remote_ip4},
267 "rewrite": {"mask": 0x1, "src": "11.11.11.11"},
268 "send": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
269 "reply": (IP(src="11.11.11.11", dst=self.pg1.remote_ip4)),
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500270 },
271 ]
272
273 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
274 for t in tests:
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000275 rv = self.vapi.pnat_binding_add(match=t["match"], rewrite=t["rewrite"])
276 self.vapi.pnat_binding_attach(
277 sw_if_index=t["sw_if_index"],
278 attachment=t["input"],
279 binding_index=rv.binding_index,
280 )
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500281
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000282 reply = t["reply"]
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500283 reply[IP].ttl -= 1
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000284 rx = self.send_and_expect(self.pg0, p_ether / t["send"] * 1, self.pg1)
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500285 for p in rx:
286 self.validate(p[1], reply)
287
288 self.ping_check()
289
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000290 self.vapi.pnat_binding_detach(
291 sw_if_index=t["sw_if_index"],
292 attachment=t["input"],
293 binding_index=rv.binding_index,
294 )
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500295 self.vapi.pnat_binding_del(binding_index=rv.binding_index)
296
297 def test_pnat_wildcard_proto_v2(self):
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000298 """PNAT test wildcard IP protocol using pnat_binding_add_v2"""
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500299
300 PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000301 PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500302
303 tests = [
304 {
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000305 "input": PNAT_IP4_INPUT,
306 "sw_if_index": self.pg0.sw_if_index,
307 "match": {"mask": 0x42, "dst": "10.10.10.10"},
308 "rewrite": {"mask": 0x42, "dst": self.pg1.remote_ip4},
309 "send": (IP(src=self.pg0.remote_ip4, dst="10.10.10.10")),
310 "reply": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500311 },
312 {
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000313 "input": PNAT_IP4_OUTPUT,
314 "sw_if_index": self.pg1.sw_if_index,
315 "match": {"mask": 0x41, "src": self.pg0.remote_ip4},
316 "rewrite": {"mask": 0x41, "src": "11.11.11.11"},
317 "send": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
318 "reply": (IP(src="11.11.11.11", dst=self.pg1.remote_ip4)),
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500319 },
320 ]
321
322 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
323 for t in tests:
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000324 rv = self.vapi.pnat_binding_add_v2(match=t["match"], rewrite=t["rewrite"])
325 self.vapi.pnat_binding_attach(
326 sw_if_index=t["sw_if_index"],
327 attachment=t["input"],
328 binding_index=rv.binding_index,
329 )
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500330
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000331 reply = t["reply"]
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500332 reply[IP].ttl -= 1
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000333 rx = self.send_and_expect(self.pg0, p_ether / t["send"] * 1, self.pg1)
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500334 for p in rx:
335 self.validate(p[1], reply)
336
337 self.ping_check()
338
Alexander Chernavind0e0e062022-05-13 08:34:34 +0000339 self.vapi.pnat_binding_detach(
340 sw_if_index=t["sw_if_index"],
341 attachment=t["input"],
342 binding_index=rv.binding_index,
343 )
Fahad Naeem0891b6a2022-05-10 01:03:52 -0500344 self.vapi.pnat_binding_del(binding_index=rv.binding_index)
345
Klement Sekerad9b0c6f2022-04-26 19:02:15 +0200346
347if __name__ == "__main__":
Ole Troan18327be2021-01-12 21:49:38 +0100348 unittest.main(testRunner=VppTestRunner)