blob: 198dd739d943eb53aef8063dd962ada311bb2769 [file] [log] [blame]
Neale Ranns29f3c7d2020-05-19 07:17:19 +00001#!/usr/bin/env python3
2
3import unittest
4
5from framework import VppTestCase, VppTestRunner
Nathan Skrzypczakaf897c52020-09-21 19:14:08 +02006from vpp_ip import DpoProto, INVALID_INDEX
7from itertools import product
Neale Ranns29f3c7d2020-05-19 07:17:19 +00008
9from scapy.packet import Raw
10from scapy.layers.l2 import Ether
Nathan Skrzypczakece39212020-09-08 15:16:08 +020011from scapy.layers.inet import IP, UDP, TCP, ICMP
12from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
Nathan Skrzypczak613b2c32020-09-10 17:44:41 +020014from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
Nathan Skrzypczakece39212020-09-08 15:16:08 +020015
16import struct
Neale Ranns29f3c7d2020-05-19 07:17:19 +000017
18from ipaddress import ip_address, ip_network, \
19 IPv4Address, IPv6Address, IPv4Network, IPv6Network
20
21from vpp_object import VppObject
22from vpp_papi import VppEnum
23
24N_PKTS = 15
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +020025N_REMOTE_HOSTS = 3
26
27SRC = 0
28DST = 1
Neale Ranns29f3c7d2020-05-19 07:17:19 +000029
30
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +020031class CnatCommonTestCase(VppTestCase):
32 """ CNat common test class """
33
34 #
35 # turn the scanner off whilst testing otherwise sessions
36 # will time out
37 #
38 extra_vpp_punt_config = ["cnat", "{",
39 "session-db-buckets", "64",
40 "session-cleanup-timeout", "0.1",
41 "session-max-age", "1",
42 "tcp-max-age", "1",
43 "scanner", "off", "}"]
44
45 @classmethod
46 def setUpClass(cls):
47 super(CnatCommonTestCase, cls).setUpClass()
48
49 @classmethod
50 def tearDownClass(cls):
51 super(CnatCommonTestCase, cls).tearDownClass()
52
53
54class Endpoint(object):
Neale Ranns29f3c7d2020-05-19 07:17:19 +000055 """ CNat endpoint """
56
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +020057 def __init__(self, pg=None, pgi=None, port=0, is_v6=False, ip=None):
Neale Ranns29f3c7d2020-05-19 07:17:19 +000058 self.port = port
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +020059 self.is_v6 = is_v6
60 self.sw_if_index = INVALID_INDEX
61 if pg is not None and pgi is not None:
62 # pg interface specified and remote index
63 self.ip = self.get_ip46(pg.remote_hosts[pgi])
64 elif pg is not None:
65 self.ip = None
66 self.sw_if_index = pg.sw_if_index
67 elif ip is not None:
68 self.ip = ip
Nathan Skrzypczakaf897c52020-09-21 19:14:08 +020069 else:
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +020070 self.ip = "::" if self.is_v6 else "0.0.0.0"
71
72 def get_ip46(self, obj):
73 if self.is_v6:
74 return obj.ip6
75 return obj.ip4
76
77 def udpate(self, **kwargs):
78 self.__init__(**kwargs)
79
80 def _vpp_if_af(self):
81 if self.is_v6:
82 return VppEnum.vl_api_address_family_t.ADDRESS_IP6
83 return VppEnum.vl_api_address_family_t.ADDRESS_IP4
Neale Ranns29f3c7d2020-05-19 07:17:19 +000084
85 def encode(self):
86 return {'addr': self.ip,
Nathan Skrzypczakaf897c52020-09-21 19:14:08 +020087 'port': self.port,
88 'sw_if_index': self.sw_if_index,
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +020089 'if_af': self._vpp_if_af()}
Nathan Skrzypczakece39212020-09-08 15:16:08 +020090
Neale Ranns29f3c7d2020-05-19 07:17:19 +000091 def __str__(self):
92 return ("%s:%d" % (self.ip, self.port))
93
94
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +020095class Translation(VppObject):
Neale Ranns29f3c7d2020-05-19 07:17:19 +000096
97 def __init__(self, test, iproto, vip, paths):
98 self._test = test
99 self.vip = vip
100 self.iproto = iproto
101 self.paths = paths
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200102 self.id = None
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000103
Nathan Skrzypczakaf897c52020-09-21 19:14:08 +0200104 def __str__(self):
105 return ("%s %s %s" % (self.vip, self.iproto, self.paths))
106
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200107 def _vl4_proto(self):
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000108 ip_proto = VppEnum.vl_api_ip_proto_t
109 return {
110 UDP: ip_proto.IP_API_PROTO_UDP,
111 TCP: ip_proto.IP_API_PROTO_TCP,
112 }[self.iproto]
113
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200114 def _encoded_paths(self):
115 return [{'src_ep': src.encode(),
116 'dst_ep': dst.encode()} for (src, dst) in self.paths]
117
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000118 def add_vpp_config(self):
119 r = self._test.vapi.cnat_translation_update(
120 {'vip': self.vip.encode(),
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200121 'ip_proto': self._vl4_proto(),
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000122 'n_paths': len(self.paths),
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200123 'paths': self._encoded_paths()})
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000124 self._test.registry.register(self, self._test.logger)
125 self.id = r.id
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200126 return self
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000127
128 def remove_vpp_config(self):
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200129 assert(self.id is not None)
Nathan Skrzypczakaf897c52020-09-21 19:14:08 +0200130 self._test.vapi.cnat_translation_del(id=self.id)
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200131 return self
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000132
133 def query_vpp_config(self):
Nathan Skrzypczakaf897c52020-09-21 19:14:08 +0200134 for t in self._test.vapi.cnat_translation_dump():
135 if self.id == t.translation.id:
136 return t.translation
137 return None
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000138
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000139
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200140class CnatTestContext(object):
141 """
142 Usage :
143
144 ctx = CnatTestContext(self, TCP, is_v6=True)
145
146 # send pg0.remote[0]:1234 -> pg1.remote[0]:6661
147 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
148
149 # We expect this to be NATed as
150 # pg2.remote[0]:<anyport> -> pg1.remote[0]:6661
151 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
152
153 # After running cnat_expect, we can send back the received packet
154 # and expect it be 'unnated' so that we get the original packet
155 ctx.cnat_send_return().cnat_expect_return()
156
157 # same thing for ICMP errors
158 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
159 """
160
161 def __init__(self, test, L4PROTO, is_v6):
162 self.L4PROTO = L4PROTO
163 self.is_v6 = is_v6
164 self._test = test
165
166 def get_ip46(self, obj):
167 if self.is_v6:
168 return obj.ip6
169 return obj.ip4
170
171 @property
172 def IP46(self):
173 return IPv6 if self.is_v6 else IP
174
175 def cnat_send(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port,
176 no_replies=False):
177 if isinstance(src_id, int):
178 self.src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
179 else:
180 self.dst_addr = src_id
181 if isinstance(dst_id, int):
182 self.dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
183 else:
184 self.dst_addr = dst_id
185 self.src_port = src_port # also ICMP id
186 self.dst_port = dst_port # also ICMP type
187
188 if self.L4PROTO in [TCP, UDP]:
189 l4 = self.L4PROTO(sport=self.src_port, dport=self.dst_port)
190 elif self.L4PROTO in [ICMP] and not self.is_v6:
191 l4 = self.L4PROTO(id=self.src_port, type=self.dst_port)
192 elif self.L4PROTO in [ICMP] and self.is_v6:
193 l4 = ICMPv6EchoRequest(id=self.src_port)
194 p1 = (Ether(src=src_pg.remote_mac,
195 dst=src_pg.local_mac) /
196 self.IP46(src=self.src_addr, dst=self.dst_addr) /
197 l4 /
198 Raw())
199
200 if no_replies:
201 self._test.send_and_assert_no_replies(src_pg, p1 * N_PKTS, dst_pg)
202 else:
203 self.rxs = self._test.send_and_expect(src_pg, p1 * N_PKTS, dst_pg)
204 self.expected_src_pg = src_pg
205 self.expected_dst_pg = dst_pg
206 return self
207
208 def cnat_expect(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port):
209 if isinstance(src_id, int):
210 self.expect_src_addr = self.get_ip46(src_pg.remote_hosts[src_id])
211 else:
212 self.expect_src_addr = src_id
213 if isinstance(dst_id, int):
214 self.expect_dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id])
215 else:
216 self.expect_dst_addr = dst_id
217 self.expect_src_port = src_port
218 self.expect_dst_port = dst_port
219
220 if self.expect_src_port is None:
221 if self.L4PROTO in [TCP, UDP]:
222 self.expect_src_port = self.rxs[0][self.L4PROTO].sport
223 elif self.L4PROTO in [ICMP] and not self.is_v6:
224 self.expect_src_port = self.rxs[0][self.L4PROTO].id
225 elif self.L4PROTO in [ICMP] and self.is_v6:
226 self.expect_src_port = self.rxs[0][ICMPv6EchoRequest].id
227
228 for rx in self.rxs:
229 self._test.assert_packet_checksums_valid(rx)
230 self._test.assertEqual(rx[self.IP46].dst, self.expect_dst_addr)
231 self._test.assertEqual(rx[self.IP46].src, self.expect_src_addr)
232 if self.L4PROTO in [TCP, UDP]:
233 self._test.assertEqual(
234 rx[self.L4PROTO].dport, self.expect_dst_port)
235 self._test.assertEqual(
236 rx[self.L4PROTO].sport, self.expect_src_port)
237 elif self.L4PROTO in [ICMP] and not self.is_v6:
238 self._test.assertEqual(
239 rx[self.L4PROTO].type, self.expect_dst_port)
240 self._test.assertEqual(
241 rx[self.L4PROTO].id, self.expect_src_port)
242 elif self.L4PROTO in [ICMP] and self.is_v6:
243 self._test.assertEqual(
244 rx[ICMPv6EchoRequest].id, self.expect_src_port)
245 return self
246
247 def cnat_send_return(self):
248 """This sends the return traffic"""
249 if self.L4PROTO in [TCP, UDP]:
250 l4 = self.L4PROTO(sport=self.expect_dst_port,
251 dport=self.expect_src_port)
252 elif self.L4PROTO in [ICMP] and not self.is_v6:
253 # icmp type 0 if echo reply
254 l4 = self.L4PROTO(id=self.expect_src_port, type=0)
255 elif self.L4PROTO in [ICMP] and self.is_v6:
256 l4 = ICMPv6EchoReply(id=self.expect_src_port)
257 src_mac = self.expected_dst_pg.remote_mac
258 p1 = (Ether(src=src_mac, dst=self.expected_dst_pg.local_mac) /
259 self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr) /
260 l4 /
261 Raw())
262
263 self.return_rxs = self._test.send_and_expect(
264 self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg)
265 return self
266
267 def cnat_expect_return(self):
268 for rx in self.return_rxs:
269 self._test.assert_packet_checksums_valid(rx)
270 self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
271 self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
272 if self.L4PROTO in [TCP, UDP]:
273 self._test.assertEqual(rx[self.L4PROTO].dport, self.src_port)
274 self._test.assertEqual(rx[self.L4PROTO].sport, self.dst_port)
275 elif self.L4PROTO in [ICMP] and not self.is_v6:
276 # icmp type 0 if echo reply
277 self._test.assertEqual(rx[self.L4PROTO].type, 0)
278 self._test.assertEqual(rx[self.L4PROTO].id, self.src_port)
279 elif self.L4PROTO in [ICMP] and self.is_v6:
280 self._test.assertEqual(rx[ICMPv6EchoReply].id, self.src_port)
281 return self
282
283 def cnat_send_icmp_return_error(self):
284 """
285 This called after cnat_expect will send an icmp error
286 on the reverse path
287 """
288 ICMPelem = ICMPv6DestUnreach(code=1) if self.is_v6 else ICMP(type=11)
289 InnerIP = self.rxs[0][self.IP46]
290 p1 = (
291 Ether(src=self.expected_dst_pg.remote_mac,
292 dst=self.expected_dst_pg.local_mac) /
293 self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr) /
294 ICMPelem / InnerIP)
295 self.return_rxs = self._test.send_and_expect(
296 self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg)
297 return self
298
299 def cnat_expect_icmp_error_return(self):
300 ICMP46 = ICMPv6DestUnreach if self.is_v6 else ICMP
301 IP46err = IPerror6 if self.is_v6 else IPerror
302 L4err = TCPerror if self.L4PROTO is TCP else UDPerror
303 for rx in self.return_rxs:
304 self._test.assert_packet_checksums_valid(rx)
305 self._test.assertEqual(rx[self.IP46].dst, self.src_addr)
306 self._test.assertEqual(rx[self.IP46].src, self.dst_addr)
307 self._test.assertEqual(rx[ICMP46][IP46err].src, self.src_addr)
308 self._test.assertEqual(rx[ICMP46][IP46err].dst, self.dst_addr)
309 self._test.assertEqual(
310 rx[ICMP46][IP46err][L4err].sport, self.src_port)
311 self._test.assertEqual(
312 rx[ICMP46][IP46err][L4err].dport, self.dst_port)
313 return self
314
315# -------------------------------------------------------------------
316# -------------------------------------------------------------------
317# -------------------------------------------------------------------
318# -------------------------------------------------------------------
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000319
320
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200321class TestCNatTranslation(CnatCommonTestCase):
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000322 """ CNat Translation """
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000323
324 @classmethod
325 def setUpClass(cls):
326 super(TestCNatTranslation, cls).setUpClass()
327
328 @classmethod
329 def tearDownClass(cls):
330 super(TestCNatTranslation, cls).tearDownClass()
331
332 def setUp(self):
333 super(TestCNatTranslation, self).setUp()
334
335 self.create_pg_interfaces(range(3))
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200336 self.pg0.generate_remote_hosts(N_REMOTE_HOSTS)
337 self.pg1.generate_remote_hosts(N_REMOTE_HOSTS)
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000338
339 for i in self.pg_interfaces:
340 i.admin_up()
341 i.config_ip4()
342 i.resolve_arp()
343 i.config_ip6()
344 i.resolve_ndp()
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200345 i.configure_ipv4_neighbors()
346 i.configure_ipv6_neighbors()
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000347
348 def tearDown(self):
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200349 for translation in self.translations:
350 translation.remove_vpp_config()
351
352 self.vapi.cnat_session_purge()
353 self.assertFalse(self.vapi.cnat_session_dump())
354
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000355 for i in self.pg_interfaces:
356 i.unconfig_ip4()
357 i.unconfig_ip6()
358 i.admin_down()
359 super(TestCNatTranslation, self).tearDown()
360
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200361 def cnat_translation(self):
362 """ CNat Translation """
363 self.logger.info(self.vapi.cli("sh cnat client"))
364 self.logger.info(self.vapi.cli("sh cnat translation"))
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000365
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200366 for nbr, translation in enumerate(self.translations):
367 vip = translation.vip
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000368
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200369 #
370 # Test Flows to the VIP
371 #
372 ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
373 for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]):
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000374 # from client to vip
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200375 ctx.cnat_send(self.pg0, src_pgi, sport,
376 self.pg1, vip.ip, vip.port)
377 dst_port = translation.paths[0][DST].port
378 ctx.cnat_expect(self.pg0, src_pgi, sport,
379 self.pg1, nbr, dst_port)
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000380 # from vip to client
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200381 ctx.cnat_send_return().cnat_expect_return()
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000382
383 #
384 # packets to the VIP that do not match a
385 # translation are dropped
386 #
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200387 ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1,
388 vip.ip, 6666, no_replies=True)
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000389
390 #
391 # packets from the VIP that do not match a
392 # session are forwarded
393 #
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200394 ctx.cnat_send(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
395 ctx.cnat_expect(self.pg1, nbr, 6666, self.pg0, src_pgi, sport)
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000396
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200397 #
398 # modify the translation to use a different backend
399 #
400 old_dst_port = translation.paths[0][DST].port
401 translation.paths[0][DST].udpate(
402 pg=self.pg2, pgi=0, port=5000, is_v6=vip.is_v6)
403 translation.add_vpp_config()
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000404
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200405 #
406 # existing flows follow the old path
407 #
408 for src_pgi in range(N_REMOTE_HOSTS):
409 for sport in [1234, 1233]:
410 # from client to vip
411 ctx.cnat_send(self.pg0, src_pgi, sport,
412 self.pg1, vip.ip, vip.port)
413 ctx.cnat_expect(self.pg0, src_pgi, sport,
414 self.pg1, nbr, old_dst_port)
415 # from vip to client
416 ctx.cnat_send_return().cnat_expect_return()
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000417
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200418 #
419 # new flows go to the new backend
420 #
421 for src_pgi in range(N_REMOTE_HOSTS):
422 ctx.cnat_send(self.pg0, src_pgi, 9999,
423 self.pg2, vip.ip, vip.port)
424 ctx.cnat_expect(self.pg0, src_pgi, 9999, self.pg2, 0, 5000)
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000425
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000426 self.logger.info(self.vapi.cli("sh cnat session verbose"))
427
428 #
Nathan Skrzypczakd63f73b2020-09-23 10:43:16 +0200429 # turn the scanner back on and wait until the sessions
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000430 # all disapper
431 #
432 self.vapi.cli("test cnat scanner on")
433
434 n_tries = 0
435 sessions = self.vapi.cnat_session_dump()
436 while (len(sessions) and n_tries < 100):
437 n_tries += 1
438 sessions = self.vapi.cnat_session_dump()
439 self.sleep(2)
Nathan Skrzypczakaf897c52020-09-21 19:14:08 +0200440 self.logger.info(self.vapi.cli("show cnat session verbose"))
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000441
442 self.assertTrue(n_tries < 100)
Nathan Skrzypczakece39212020-09-08 15:16:08 +0200443 self.vapi.cli("test cnat scanner off")
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000444
445 #
446 # load some flows again and purge
447 #
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200448 for translation in self.translations:
449 vip = translation.vip
450 ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
451 for src_pgi in range(N_REMOTE_HOSTS):
452 for sport in [1234, 1233]:
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000453 # from client to vip
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200454 ctx.cnat_send(self.pg0, src_pgi, sport,
455 self.pg2, vip.ip, vip.port)
456 ctx.cnat_expect(self.pg0, src_pgi,
457 sport, self.pg2, 0, 5000)
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000458
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200459 def _test_icmp(self):
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000460
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200461 #
462 # Testing ICMP
463 #
464 for nbr, translation in enumerate(self.translations):
465 vip = translation.vip
466 ctx = CnatTestContext(self, translation.iproto, vip.is_v6)
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000467
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200468 #
469 # NATing ICMP errors
470 #
471 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, vip.ip, vip.port)
472 dst_port = translation.paths[0][DST].port
473 ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, nbr, dst_port)
474 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
Nathan Skrzypczakece39212020-09-08 15:16:08 +0200475
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200476 #
477 # ICMP errors with no VIP associated should not be
478 # modified
479 #
480 ctx.cnat_send(self.pg0, 0, 1234, self.pg2, 0, vip.port)
481 dst_port = translation.paths[0][DST].port
482 ctx.cnat_expect(self.pg0, 0, 1234, self.pg2, 0, vip.port)
483 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
Nathan Skrzypczakece39212020-09-08 15:16:08 +0200484
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200485 def _make_translations_v4(self):
486 self.translations = []
487 self.translations.append(Translation(
488 self, TCP, Endpoint(ip="30.0.0.1", port=5555, is_v6=False),
489 [(
490 Endpoint(is_v6=False),
491 Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False),
492 )]
493 ).add_vpp_config())
494 self.translations.append(Translation(
495 self, TCP, Endpoint(ip="30.0.0.2", port=5554, is_v6=False),
496 [(
497 Endpoint(is_v6=False),
498 Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=False),
499 )]
500 ).add_vpp_config())
501 self.translations.append(Translation(
502 self, UDP, Endpoint(ip="30.0.0.2", port=5553, is_v6=False),
503 [(
504 Endpoint(is_v6=False),
505 Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=False),
506 )]
507 ).add_vpp_config())
Nathan Skrzypczakece39212020-09-08 15:16:08 +0200508
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200509 def _make_translations_v6(self):
510 self.translations = []
511 self.translations.append(Translation(
512 self, TCP, Endpoint(ip="30::1", port=5555, is_v6=True),
513 [(
514 Endpoint(is_v6=True),
515 Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=True),
516 )]
517 ).add_vpp_config())
518 self.translations.append(Translation(
519 self, TCP, Endpoint(ip="30::2", port=5554, is_v6=True),
520 [(
521 Endpoint(is_v6=True),
522 Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=True),
523 )]
524 ).add_vpp_config())
525 self.translations.append(Translation(
526 self, UDP, Endpoint(ip="30::2", port=5553, is_v6=True),
527 [(
528 Endpoint(is_v6=True),
529 Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=True),
530 )]
531 ).add_vpp_config())
Nathan Skrzypczakece39212020-09-08 15:16:08 +0200532
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200533 def test_icmp4(self):
534 # """ CNat Translation icmp v4 """
535 self._make_translations_v4()
536 self._test_icmp()
Nathan Skrzypczakece39212020-09-08 15:16:08 +0200537
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200538 def test_icmp6(self):
539 # """ CNat Translation icmp v6 """
540 self._make_translations_v6()
541 self._test_icmp()
Nathan Skrzypczakece39212020-09-08 15:16:08 +0200542
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000543 def test_cnat6(self):
544 # """ CNat Translation ipv6 """
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200545 self._make_translations_v6()
546 self.cnat_translation()
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000547
548 def test_cnat4(self):
549 # """ CNat Translation ipv4 """
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200550 self._make_translations_v4()
551 self.cnat_translation()
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000552
553
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200554class TestCNatSourceNAT(CnatCommonTestCase):
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000555 """ CNat Source NAT """
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000556
557 @classmethod
558 def setUpClass(cls):
559 super(TestCNatSourceNAT, cls).setUpClass()
560
561 @classmethod
562 def tearDownClass(cls):
563 super(TestCNatSourceNAT, cls).tearDownClass()
564
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200565 def _enable_disable_snat(self, is_enable=True):
Nathan Skrzypczak613b2c32020-09-10 17:44:41 +0200566 self.vapi.cnat_set_snat_addresses(
567 snat_ip4=self.pg2.remote_hosts[0].ip4,
Nathan Skrzypczak8786a4c2021-02-26 18:12:20 +0100568 snat_ip6=self.pg2.remote_hosts[0].ip6,
569 sw_if_index=INVALID_INDEX)
Nathan Skrzypczak613b2c32020-09-10 17:44:41 +0200570 self.vapi.feature_enable_disable(
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200571 enable=1 if is_enable else 0,
Nathan Skrzypczak613b2c32020-09-10 17:44:41 +0200572 arc_name="ip6-unicast",
Nathan Skrzypczak27647a22021-02-25 11:01:41 +0100573 feature_name="cnat-snat-ip6",
Nathan Skrzypczak613b2c32020-09-10 17:44:41 +0200574 sw_if_index=self.pg0.sw_if_index)
575 self.vapi.feature_enable_disable(
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200576 enable=1 if is_enable else 0,
Nathan Skrzypczak613b2c32020-09-10 17:44:41 +0200577 arc_name="ip4-unicast",
Nathan Skrzypczak27647a22021-02-25 11:01:41 +0100578 feature_name="cnat-snat-ip4",
Nathan Skrzypczak613b2c32020-09-10 17:44:41 +0200579 sw_if_index=self.pg0.sw_if_index)
580
Nathan Skrzypczak3fd77f72021-02-25 17:39:03 +0100581 policie_tbls = VppEnum.vl_api_cnat_snat_policy_table_t
582 self.vapi.cnat_set_snat_policy(
583 policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX)
584 for i in self.pg_interfaces:
585 self.vapi.cnat_snat_policy_add_del_if(
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200586 sw_if_index=i.sw_if_index, is_add=1 if is_enable else 0,
Nathan Skrzypczak3fd77f72021-02-25 17:39:03 +0100587 table=policie_tbls.CNAT_POLICY_INCLUDE_V6)
588 self.vapi.cnat_snat_policy_add_del_if(
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200589 sw_if_index=i.sw_if_index, is_add=1 if is_enable else 0,
Nathan Skrzypczak3fd77f72021-02-25 17:39:03 +0100590 table=policie_tbls.CNAT_POLICY_INCLUDE_V4)
591
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200592 def setUp(self):
593 super(TestCNatSourceNAT, self).setUp()
594
595 self.create_pg_interfaces(range(3))
596 self.pg1.generate_remote_hosts(2)
597
598 for i in self.pg_interfaces:
599 i.admin_up()
600 i.config_ip4()
601 i.resolve_arp()
602 i.config_ip6()
603 i.resolve_ndp()
604 i.configure_ipv6_neighbors()
605 i.configure_ipv4_neighbors()
606
607 self._enable_disable_snat(is_enable=True)
608
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000609 def tearDown(self):
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200610 self._enable_disable_snat(is_enable=True)
611
Nathan Skrzypczak613b2c32020-09-10 17:44:41 +0200612 self.vapi.cnat_session_purge()
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000613 for i in self.pg_interfaces:
614 i.unconfig_ip4()
615 i.unconfig_ip6()
616 i.admin_down()
617 super(TestCNatSourceNAT, self).tearDown()
618
Nathan Skrzypczak613b2c32020-09-10 17:44:41 +0200619 def test_snat_v6(self):
620 # """ CNat Source Nat v6 """
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200621 self.sourcenat_test_tcp_udp_conf(TCP, is_v6=True)
622 self.sourcenat_test_tcp_udp_conf(UDP, is_v6=True)
623 self.sourcenat_test_icmp_echo_conf(is_v6=True)
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000624
Nathan Skrzypczak613b2c32020-09-10 17:44:41 +0200625 def test_snat_v4(self):
626 # """ CNat Source Nat v4 """
627 self.sourcenat_test_tcp_udp_conf(TCP)
628 self.sourcenat_test_tcp_udp_conf(UDP)
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200629 self.sourcenat_test_icmp_echo_conf()
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000630
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200631 def sourcenat_test_icmp_echo_conf(self, is_v6=False):
632 ctx = CnatTestContext(self, ICMP, is_v6=is_v6)
633 # 8 is ICMP type echo (v4 only)
634 ctx.cnat_send(self.pg0, 0, 0xfeed, self.pg1, 0, 8)
635 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 8)
636 ctx.cnat_send_return().cnat_expect_return()
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000637
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200638 def sourcenat_test_tcp_udp_conf(self, L4PROTO, is_v6=False):
639 ctx = CnatTestContext(self, L4PROTO, is_v6)
640 # we should source NAT
641 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
642 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
643 ctx.cnat_send_return().cnat_expect_return()
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000644
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200645 # exclude dst address of pg1.1 from snat
646 if is_v6:
647 exclude_prefix = ip_network(
648 "%s/100" % self.pg1.remote_hosts[1].ip6, strict=False)
649 else:
650 exclude_prefix = ip_network(
651 "%s/16" % self.pg1.remote_hosts[1].ip4, strict=False)
Nathan Skrzypczak613b2c32020-09-10 17:44:41 +0200652
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200653 # add remote host to exclude list
654 self.vapi.cnat_snat_policy_add_del_exclude_pfx(
655 prefix=exclude_prefix, is_add=1)
Nathan Skrzypczak613b2c32020-09-10 17:44:41 +0200656
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200657 # We should not source NAT the id=1
658 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
659 ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, 1, 6661)
660 ctx.cnat_send_return().cnat_expect_return()
Nathan Skrzypczak613b2c32020-09-10 17:44:41 +0200661
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200662 # But we should source NAT the id=0
663 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661)
664 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661)
665 ctx.cnat_send_return().cnat_expect_return()
Nathan Skrzypczak613b2c32020-09-10 17:44:41 +0200666
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200667 # remove remote host from exclude list
668 self.vapi.cnat_snat_policy_add_del_exclude_pfx(
669 prefix=exclude_prefix, is_add=0)
670 self.vapi.cnat_session_purge()
Nathan Skrzypczak613b2c32020-09-10 17:44:41 +0200671
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200672 # We should source NAT again
673 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
674 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
675 ctx.cnat_send_return().cnat_expect_return()
Nathan Skrzypczak613b2c32020-09-10 17:44:41 +0200676
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200677 # test return ICMP error nating
678 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661)
679 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661)
680 ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return()
Nathan Skrzypczak613b2c32020-09-10 17:44:41 +0200681
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200682 self.vapi.cnat_session_purge()
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000683
Nathan Skrzypczakd63f73b2020-09-23 10:43:16 +0200684
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200685class TestCNatDHCP(CnatCommonTestCase):
Nathan Skrzypczakaf897c52020-09-21 19:14:08 +0200686 """ CNat Translation """
Nathan Skrzypczakaf897c52020-09-21 19:14:08 +0200687
688 @classmethod
689 def setUpClass(cls):
690 super(TestCNatDHCP, cls).setUpClass()
691
692 @classmethod
693 def tearDownClass(cls):
694 super(TestCNatDHCP, cls).tearDownClass()
695
696 def tearDown(self):
697 for i in self.pg_interfaces:
698 i.admin_down()
699 super(TestCNatDHCP, self).tearDown()
700
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200701 def make_addr(self, sw_if_index, addr_id, is_v6):
Nathan Skrzypczakaf897c52020-09-21 19:14:08 +0200702 if is_v6:
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200703 return "fd01:%x::%u" % (sw_if_index, addr_id + 1)
704 return "172.16.%u.%u" % (sw_if_index, addr_id)
Nathan Skrzypczakaf897c52020-09-21 19:14:08 +0200705
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200706 def make_prefix(self, sw_if_index, addr_id, is_v6):
Nathan Skrzypczakaf897c52020-09-21 19:14:08 +0200707 if is_v6:
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200708 return "%s/128" % self.make_addr(sw_if_index, addr_id, is_v6)
709 return "%s/32" % self.make_addr(sw_if_index, addr_id, is_v6)
Nathan Skrzypczakaf897c52020-09-21 19:14:08 +0200710
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200711 def check_resolved(self, tr, addr_id, is_v6=False):
712 qt = tr.query_vpp_config()
713 self.assertEqual(str(qt.vip.addr), self.make_addr(
714 tr.vip.sw_if_index, addr_id, is_v6))
715 self.assertEqual(len(qt.paths), len(tr.paths))
716 for path_tr, path_qt in zip(tr.paths, qt.paths):
717 src_qt = path_qt.src_ep
718 dst_qt = path_qt.dst_ep
719 src_tr, dst_tr = path_tr
720 self.assertEqual(str(src_qt.addr), self.make_addr(
721 src_tr.sw_if_index, addr_id, is_v6))
722 self.assertEqual(str(dst_qt.addr), self.make_addr(
723 dst_tr.sw_if_index, addr_id, is_v6))
Nathan Skrzypczakaf897c52020-09-21 19:14:08 +0200724
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200725 def add_del_address(self, pg, addr_id, is_add=True, is_v6=False):
726 self.vapi.sw_interface_add_del_address(
727 sw_if_index=pg.sw_if_index,
728 prefix=self.make_prefix(pg.sw_if_index, addr_id, is_v6),
729 is_add=1 if is_add else 0)
730
731 def _test_dhcp_v46(self, is_v6):
732 self.create_pg_interfaces(range(4))
733 for i in self.pg_interfaces:
734 i.admin_up()
735 paths = [
736 (Endpoint(pg=self.pg1, is_v6=is_v6),
737 Endpoint(pg=self.pg2, is_v6=is_v6)),
738 (Endpoint(pg=self.pg1, is_v6=is_v6),
739 Endpoint(pg=self.pg3, is_v6=is_v6))
740 ]
741 ep = Endpoint(pg=self.pg0, is_v6=is_v6)
742 t = Translation(self, TCP, ep, paths).add_vpp_config()
743 # Add an address on every interface
744 # and check it is reflected in the cnat config
745 for pg in self.pg_interfaces:
746 self.add_del_address(pg, addr_id=0, is_add=True, is_v6=is_v6)
747 self.check_resolved(t, addr_id=0, is_v6=is_v6)
748 # Add a new address on every interface, remove the old one
749 # and check it is reflected in the cnat config
750 for pg in self.pg_interfaces:
751 self.add_del_address(pg, addr_id=1, is_add=True, is_v6=is_v6)
752 self.add_del_address(pg, addr_id=0, is_add=False, is_v6=is_v6)
753 self.check_resolved(t, addr_id=1, is_v6=is_v6)
754 # remove the configuration
755 for pg in self.pg_interfaces:
756 self.add_del_address(pg, addr_id=1, is_add=False, is_v6=is_v6)
757 t.remove_vpp_config()
Nathan Skrzypczakaf897c52020-09-21 19:14:08 +0200758
759 def test_dhcp_v4(self):
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200760 self._test_dhcp_v46(False)
Nathan Skrzypczakaf897c52020-09-21 19:14:08 +0200761
762 def test_dhcp_v6(self):
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200763 self._test_dhcp_v46(True)
Nathan Skrzypczakaf897c52020-09-21 19:14:08 +0200764
765 def test_dhcp_snat(self):
766 self.create_pg_interfaces(range(1))
767 for i in self.pg_interfaces:
768 i.admin_up()
769 self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index)
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200770 # Add an address on every interface
771 # and check it is reflected in the cnat config
772 for pg in self.pg_interfaces:
773 self.add_del_address(pg, addr_id=0, is_add=True, is_v6=False)
774 self.add_del_address(pg, addr_id=0, is_add=True, is_v6=True)
Nathan Skrzypczakaf897c52020-09-21 19:14:08 +0200775 r = self.vapi.cnat_get_snat_addresses()
776 self.assertEqual(str(r.snat_ip4), self.make_addr(
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200777 self.pg0.sw_if_index, addr_id=0, is_v6=False))
Nathan Skrzypczakaf897c52020-09-21 19:14:08 +0200778 self.assertEqual(str(r.snat_ip6), self.make_addr(
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200779 self.pg0.sw_if_index, addr_id=0, is_v6=True))
780 # Add a new address on every interface, remove the old one
781 # and check it is reflected in the cnat config
782 for pg in self.pg_interfaces:
783 self.add_del_address(pg, addr_id=1, is_add=True, is_v6=False)
784 self.add_del_address(pg, addr_id=1, is_add=True, is_v6=True)
785 self.add_del_address(pg, addr_id=0, is_add=False, is_v6=False)
786 self.add_del_address(pg, addr_id=0, is_add=False, is_v6=True)
Nathan Skrzypczakaf897c52020-09-21 19:14:08 +0200787 r = self.vapi.cnat_get_snat_addresses()
788 self.assertEqual(str(r.snat_ip4), self.make_addr(
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200789 self.pg0.sw_if_index, addr_id=1, is_v6=False))
Nathan Skrzypczakaf897c52020-09-21 19:14:08 +0200790 self.assertEqual(str(r.snat_ip6), self.make_addr(
Nathan Skrzypczak1dff6b32021-10-20 17:41:07 +0200791 self.pg0.sw_if_index, addr_id=1, is_v6=True))
792 # remove the configuration
793 for pg in self.pg_interfaces:
794 self.add_del_address(pg, addr_id=1, is_add=False, is_v6=False)
795 self.add_del_address(pg, addr_id=1, is_add=False, is_v6=True)
Nathan Skrzypczak8786a4c2021-02-26 18:12:20 +0100796 self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX)
Nathan Skrzypczakaf897c52020-09-21 19:14:08 +0200797
798
Neale Ranns29f3c7d2020-05-19 07:17:19 +0000799if __name__ == '__main__':
800 unittest.main(testRunner=VppTestRunner)