blob: 8f5995e61fa1c2af2bd990498474839bdb7c2c9f [file] [log] [blame]
Ole Troan2c6639c2019-12-19 11:55:54 +01001#!/usr/bin/env python3
2
3import socket
4import unittest
5import struct
6import random
7
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00008from framework import tag_fixme_vpp_workers
Klement Sekerab23ffd72021-05-31 16:08:53 +02009from framework import VppTestCase, VppTestRunner
Ole Troan2c6639c2019-12-19 11:55:54 +010010
11import scapy.compat
12from scapy.layers.inet import IP, TCP, UDP, ICMP
13from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
14from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
15 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
16from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
17from scapy.layers.l2 import Ether, ARP, GRE
18from scapy.data import IP_PROTOS
19from scapy.packet import bind_layers, Raw
20from util import ppp
21from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
22from time import sleep
23from util import ip4_range
24from vpp_papi import mac_pton
25from syslog_rfc5424_parser import SyslogMessage, ParseError
26from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
27from io import BytesIO
28from vpp_papi import VppEnum
29from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType
30from vpp_neighbor import VppNeighbor
31from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
32 IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
33 PacketListField
34from ipaddress import IPv6Network
35
36
@tag_fixme_vpp_workers
class TestDSlite(VppTestCase):
    """ DS-Lite Test Cases """

    # Topology used by this class:
    #   pg0 - IPv4 interface (packets are expected here without IPv6)
    #   pg1 - IPv6 interface with two remote hosts (IPv4-in-IPv6 side)
    #   pg2 - IPv4 interface on which the syslog message is captured

    @classmethod
    def setUpClass(cls):
        # Create and configure the three packet-generator interfaces once
        # for the whole class.
        super().setUpClass()

        try:
            cls.nat_addr = '10.0.0.3'

            cls.create_pg_interfaces(range(3))
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()
            cls.pg1.admin_up()
            cls.pg1.config_ip6()
            cls.pg1.generate_remote_hosts(2)
            cls.pg1.configure_ipv6_neighbors()
            cls.pg2.admin_up()
            cls.pg2.config_ip4()
            cls.pg2.resolve_arp()

        except Exception:
            # Undo whatever the parent setUpClass did before re-raising.
            super().tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def _send_and_capture_one(self, tx_if, pkt, rx_if):
        """Send ``pkt`` on ``tx_if`` and return the one packet seen on
        ``rx_if``.

        Capture is enabled on every pg interface before the packet
        generator is started, so other interfaces can still be inspected
        by the caller afterwards.
        """
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)[0]

    def verify_syslog_apmadd(self, data, isaddr, isport, xsaddr, xsport,
                             sv6enc, proto):
        """Check that ``data`` is an 'APMADD' syslog message with the
        expected 'napmap' structured-data parameters.

        :param data: raw syslog payload (bytes, decoded as UTF-8)
        :param isaddr: expected 'ISADDR' value
        :param isport: expected 'ISPORT' value (integer)
        :param xsaddr: expected 'XSADDR' value
        :param xsport: expected 'XSPORT' value (integer)
        :param sv6enc: expected 'SV6ENC' value
        :param proto: expected 'PROTO' value (integer)
        """
        text = data.decode('utf-8')
        try:
            message = SyslogMessage.parse(text)
        except ParseError as e:
            # A message that cannot be parsed cannot be verified; fail
            # instead of silently skipping every assertion below.
            self.logger.error(e)
            self.fail("unable to parse syslog message: %s" % e)

        self.assertEqual(message.severity, SyslogSeverity.info)
        self.assertEqual(message.appname, 'NAT')
        self.assertEqual(message.msgid, 'APMADD')
        sd_params = message.sd.get('napmap')
        self.assertIsNotNone(sd_params)
        self.assertEqual(sd_params.get('IATYP'), 'IPv4')
        self.assertEqual(sd_params.get('ISADDR'), isaddr)
        self.assertEqual(sd_params.get('ISPORT'), "%d" % isport)
        self.assertEqual(sd_params.get('XATYP'), 'IPv4')
        self.assertEqual(sd_params.get('XSADDR'), xsaddr)
        self.assertEqual(sd_params.get('XSPORT'), "%d" % xsport)
        self.assertEqual(sd_params.get('PROTO'), "%d" % proto)
        self.assertIsNotNone(sd_params.get('SSUBIX'))
        self.assertEqual(sd_params.get('SV6ENC'), sv6enc)

    def test_dslite(self):
        """ Test DS-Lite """
        nat_config = self.vapi.nat_show_config()
        self.assertEqual(0, nat_config.dslite_ce)

        self.vapi.dslite_add_del_pool_addr_range(start_addr=self.nat_addr,
                                                 end_addr=self.nat_addr,
                                                 is_add=1)
        aftr_ip4 = '192.0.0.1'
        aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
        self.vapi.dslite_set_aftr_addr(ip4_addr=aftr_ip4, ip6_addr=aftr_ip6)
        self.vapi.syslog_set_sender(self.pg2.local_ip4, self.pg2.remote_ip4)

        # Inner IPv4 source address used by every encapsulated packet.
        inside_ip4 = '192.168.1.1'
        host0_ip6 = self.pg1.remote_hosts[0].ip6
        host1_ip6 = self.pg1.remote_hosts[1].ip6

        # UDP: IPv4-in-IPv6 from pg1, plain IPv4 expected on pg0
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst=aftr_ip6, src=host0_ip6) /
             IP(dst=self.pg0.remote_ip4, src=inside_ip4) /
             UDP(sport=20000, dport=10000))
        capture = self._send_and_capture_one(self.pg1, p, self.pg0)
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.nat_addr)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertNotEqual(capture[UDP].sport, 20000)
        self.assertEqual(capture[UDP].dport, 10000)
        self.assert_packet_checksums_valid(capture)
        out_port = capture[UDP].sport
        # The same run must also have produced one syslog packet on pg2.
        capture = self.pg2.get_capture(1)
        self.verify_syslog_apmadd(capture[0][Raw].load, inside_ip4,
                                  20000, self.nat_addr, out_port,
                                  host0_ip6, IP_PROTOS.udp)

        # UDP: reply from pg0, IPv4-in-IPv6 expected back on pg1
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
             UDP(sport=10000, dport=out_port))
        capture = self._send_and_capture_one(self.pg0, p, self.pg1)
        self.assertEqual(capture[IPv6].src, aftr_ip6)
        self.assertEqual(capture[IPv6].dst, host0_ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, inside_ip4)
        self.assertEqual(capture[UDP].sport, 10000)
        self.assertEqual(capture[UDP].dport, 20000)
        self.assert_packet_checksums_valid(capture)

        # TCP: same inner address, but sent by the second remote host
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst=aftr_ip6, src=host1_ip6) /
             IP(dst=self.pg0.remote_ip4, src=inside_ip4) /
             TCP(sport=20001, dport=10001))
        capture = self._send_and_capture_one(self.pg1, p, self.pg0)
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.nat_addr)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertNotEqual(capture[TCP].sport, 20001)
        self.assertEqual(capture[TCP].dport, 10001)
        self.assert_packet_checksums_valid(capture)
        out_port = capture[TCP].sport

        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
             TCP(sport=10001, dport=out_port))
        capture = self._send_and_capture_one(self.pg0, p, self.pg1)
        self.assertEqual(capture[IPv6].src, aftr_ip6)
        self.assertEqual(capture[IPv6].dst, host1_ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, inside_ip4)
        self.assertEqual(capture[TCP].sport, 10001)
        self.assertEqual(capture[TCP].dport, 20001)
        self.assert_packet_checksums_valid(capture)

        # ICMP: echo request out, echo reply back with the original id
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst=aftr_ip6, src=host1_ip6) /
             IP(dst=self.pg0.remote_ip4, src=inside_ip4) /
             ICMP(id=4000, type='echo-request'))
        capture = self._send_and_capture_one(self.pg1, p, self.pg0)
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.nat_addr)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertNotEqual(capture[ICMP].id, 4000)
        self.assert_packet_checksums_valid(capture)
        out_id = capture[ICMP].id

        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
             ICMP(id=out_id, type='echo-reply'))
        capture = self._send_and_capture_one(self.pg0, p, self.pg1)
        self.assertEqual(capture[IPv6].src, aftr_ip6)
        self.assertEqual(capture[IPv6].dst, host1_ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, inside_ip4)
        self.assertEqual(capture[ICMP].id, 4000)
        self.assert_packet_checksums_valid(capture)

        # ping DS-Lite AFTR tunnel endpoint address
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(src=host1_ip6, dst=aftr_ip6) /
             ICMPv6EchoRequest())
        capture = self._send_and_capture_one(self.pg1, p, self.pg1)
        self.assertEqual(capture[IPv6].src, aftr_ip6)
        self.assertEqual(capture[IPv6].dst, host1_ip6)
        self.assertTrue(capture.haslayer(ICMPv6EchoReply))

        # Two distinct IPv6 senders and three flows (UDP, TCP, ICMP) were
        # used above; the counters are expected to match.
        b4s = self.statistics.get_counter('/dslite/total-b4s')
        self.assertEqual(b4s[0][0], 2)
        sessions = self.statistics.get_counter('/dslite/total-sessions')
        self.assertEqual(sessions[0][0], 3)

    def tearDown(self):
        super().tearDown()

    def show_commands_at_teardown(self):
        # Log the DS-Lite state via CLI for post-mortem inspection.
        self.logger.info(self.vapi.cli("show dslite pool"))
        self.logger.info(
            self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
        self.logger.info(self.vapi.cli("show dslite sessions"))
234
235
class TestDSliteCE(VppTestCase):
    """ DS-Lite CE Test Cases """

    # Topology used by this class:
    #   pg0 - IPv4 interface (plain IPv4 side)
    #   pg1 - IPv6 interface with one remote host (IPv4-in-IPv6 side)

    @classmethod
    def setUpConstants(cls):
        # Append the "dslite { ce }" section to the VPP command line.
        super().setUpConstants()
        cls.vpp_cmdline.extend(["dslite", "{", "ce", "}"])

    @classmethod
    def setUpClass(cls):
        # Create and configure the two packet-generator interfaces once
        # for the whole class.
        super().setUpClass()

        try:
            cls.create_pg_interfaces(range(2))
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()
            cls.pg1.admin_up()
            cls.pg1.config_ip6()
            cls.pg1.generate_remote_hosts(1)
            cls.pg1.configure_ipv6_neighbors()

        except Exception:
            # Undo whatever the parent setUpClass did before re-raising.
            super().tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def _send_and_capture_one(self, tx_if, pkt, rx_if):
        """Send ``pkt`` on ``tx_if`` and return the one packet seen on
        ``rx_if``.

        Capture is enabled on every pg interface before the packet
        generator is started.
        """
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)[0]

    def test_dslite_ce(self):
        """ Test DS-Lite CE """

        # TODO: add message to retrieve dslite config
        # nat_config = self.vapi.nat_show_config()
        # self.assertEqual(1, nat_config.dslite_ce)

        b4_ip4 = '192.0.0.2'
        b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
        self.vapi.dslite_set_b4_addr(ip4_addr=b4_ip4, ip6_addr=b4_ip6)

        aftr_ip4 = '192.0.0.1'
        aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
        self.vapi.dslite_set_aftr_addr(ip4_addr=aftr_ip4, ip6_addr=aftr_ip6)

        # Host route towards the AFTR address via pg1.
        r1 = VppIpRoute(self, aftr_ip6, 128,
                        [VppRoutePath(self.pg1.remote_ip6,
                                      self.pg1.sw_if_index)])
        r1.add_vpp_config()

        # UDP encapsulation
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
             UDP(sport=10000, dport=20000))
        capture = self._send_and_capture_one(self.pg0, p, self.pg1)
        self.assertEqual(capture[IPv6].src, b4_ip6)
        self.assertEqual(capture[IPv6].dst, aftr_ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
        self.assertEqual(capture[UDP].sport, 10000)
        self.assertEqual(capture[UDP].dport, 20000)
        self.assert_packet_checksums_valid(capture)

        # UDP decapsulation
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst=b4_ip6, src=aftr_ip6) /
             IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
             UDP(sport=20000, dport=10000))
        capture = self._send_and_capture_one(self.pg1, p, self.pg0)
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertEqual(capture[UDP].sport, 20000)
        self.assertEqual(capture[UDP].dport, 10000)
        self.assert_packet_checksums_valid(capture)

        # ping DS-Lite B4 tunnel endpoint address
        host_ip6 = self.pg1.remote_hosts[0].ip6
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(src=host_ip6, dst=b4_ip6) /
             ICMPv6EchoRequest())
        capture = self._send_and_capture_one(self.pg1, p, self.pg1)
        self.assertEqual(capture[IPv6].src, b4_ip6)
        self.assertEqual(capture[IPv6].dst, host_ip6)
        self.assertTrue(capture.haslayer(ICMPv6EchoReply))

    def tearDown(self):
        super().tearDown()

    def show_commands_at_teardown(self):
        # Log both tunnel endpoint addresses via CLI for post-mortem
        # inspection.
        self.logger.info(
            self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
        self.logger.info(
            self.vapi.cli("show dslite b4-tunnel-endpoint-address"))