blob: af256e39fc486c17fbadff7e6515fb5c50946dad [file] [log] [blame]
Ole Troan2c6639c2019-12-19 11:55:54 +01001#!/usr/bin/env python3
2
3import socket
4import unittest
5import struct
6import random
7
8from framework import VppTestCase, VppTestRunner, running_extended_tests
9
10import scapy.compat
11from scapy.layers.inet import IP, TCP, UDP, ICMP
12from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
14 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
15from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
16from scapy.layers.l2 import Ether, ARP, GRE
17from scapy.data import IP_PROTOS
18from scapy.packet import bind_layers, Raw
19from util import ppp
20from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21from time import sleep
22from util import ip4_range
23from vpp_papi import mac_pton
24from syslog_rfc5424_parser import SyslogMessage, ParseError
25from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
26from io import BytesIO
27from vpp_papi import VppEnum
28from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType
29from vpp_neighbor import VppNeighbor
30from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
31 IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
32 PacketListField
33from ipaddress import IPv6Network
34
35
class TestDSlite(VppTestCase):
    """ DS-Lite Test Cases """

    # Interfaces created in setUpClass:
    #   pg0 - IPv4 interface; decapsulated and translated packets are
    #         captured here and replies are injected from here
    #   pg1 - IPv6 interface with two generated remote hosts that send
    #         IPv4-in-IPv6 packets towards the AFTR address
    #   pg2 - IPv4 interface whose addresses are configured as the syslog
    #         sender; syslog messages are captured here

    @classmethod
    def setUpClass(cls):
        """Create and bring up the three pg interfaces used by the tests.

        If any step fails, the parent class tear-down is run before the
        exception is re-raised.
        """
        super(TestDSlite, cls).setUpClass()

        try:
            # Single-address pool handed to dslite_add_del_pool_addr_range
            cls.nat_addr = '10.0.0.3'

            cls.create_pg_interfaces(range(3))
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()
            cls.pg1.admin_up()
            cls.pg1.config_ip6()
            cls.pg1.generate_remote_hosts(2)
            cls.pg1.configure_ipv6_neighbors()
            cls.pg2.admin_up()
            cls.pg2.config_ip4()
            cls.pg2.resolve_arp()

        except Exception:
            super(TestDSlite, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level tear-down to the parent class."""
        super(TestDSlite, cls).tearDownClass()

    def verify_syslog_apmadd(self, data, isaddr, isport, xsaddr, xsport,
                             sv6enc, proto):
        """Check that a syslog payload is the expected NAT APMADD message.

        :param data: raw syslog payload (bytes, decoded as UTF-8)
        :param isaddr: expected value of the ISADDR parameter
        :param isport: expected ISPORT value (integer, compared as text)
        :param xsaddr: expected value of the XSADDR parameter
        :param xsport: expected XSPORT value (integer, compared as text)
        :param sv6enc: expected value of the SV6ENC parameter
        :param proto: expected PROTO value (integer, compared as text)

        The test fails if the payload cannot be parsed as a syslog
        message; otherwise an unparseable message would skip every
        assertion below and the check would pass vacuously.
        """
        message = data.decode('utf-8')
        try:
            message = SyslogMessage.parse(message)
        except ParseError as e:
            self.logger.error(e)
            self.fail("unable to parse syslog message: %s" % e)

        self.assertEqual(message.severity, SyslogSeverity.info)
        self.assertEqual(message.appname, 'NAT')
        self.assertEqual(message.msgid, 'APMADD')
        sd_params = message.sd.get('napmap')
        self.assertIsNotNone(sd_params)
        self.assertEqual(sd_params.get('IATYP'), 'IPv4')
        self.assertEqual(sd_params.get('ISADDR'), isaddr)
        self.assertEqual(sd_params.get('ISPORT'), "%d" % isport)
        self.assertEqual(sd_params.get('XATYP'), 'IPv4')
        self.assertEqual(sd_params.get('XSADDR'), xsaddr)
        self.assertEqual(sd_params.get('XSPORT'), "%d" % xsport)
        self.assertEqual(sd_params.get('PROTO'), "%d" % proto)
        # SSUBIX only has to be present; its value is not checked
        self.assertIsNotNone(sd_params.get('SSUBIX'))
        self.assertEqual(sd_params.get('SV6ENC'), sv6enc)

    def test_dslite(self):
        """ Test DS-Lite """
        # NOTE: the docstring above is kept verbatim; unittest reports the
        # first docstring line as the test's short description.
        nat_config = self.vapi.nat_show_config()
        self.assertEqual(0, nat_config.dslite_ce)

        self.vapi.dslite_add_del_pool_addr_range(start_addr=self.nat_addr,
                                                 end_addr=self.nat_addr,
                                                 is_add=1)
        aftr_ip4 = '192.0.0.1'
        aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
        self.vapi.dslite_set_aftr_addr(ip4_addr=aftr_ip4, ip6_addr=aftr_ip6)
        self.vapi.syslog_set_sender(self.pg2.local_ip4, self.pg2.remote_ip4)

        # UDP: IPv4-in-IPv6 from remote host 0 towards the AFTR address
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
             IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
             UDP(sport=20000, dport=10000))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(1)
        capture = capture[0]
        # The outer IPv6 header must be gone and the source rewritten
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.nat_addr)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertNotEqual(capture[UDP].sport, 20000)
        self.assertEqual(capture[UDP].dport, 10000)
        self.assert_packet_checksums_valid(capture)
        out_port = capture[UDP].sport
        # One syslog message is expected on pg2 for the new mapping
        capture = self.pg2.get_capture(1)
        self.verify_syslog_apmadd(capture[0][Raw].load, '192.168.1.1',
                                  20000, self.nat_addr, out_port,
                                  self.pg1.remote_hosts[0].ip6, IP_PROTOS.udp)

        # UDP reply from the IPv4 side to the translated port
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
             UDP(sport=10000, dport=out_port))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        capture = capture[0]
        self.assertEqual(capture[IPv6].src, aftr_ip6)
        self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, '192.168.1.1')
        self.assertEqual(capture[UDP].sport, 10000)
        self.assertEqual(capture[UDP].dport, 20000)
        self.assert_packet_checksums_valid(capture)

        # TCP: same inner source address, sent from remote host 1
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
             IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
             TCP(sport=20001, dport=10001))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(1)
        capture = capture[0]
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.nat_addr)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertNotEqual(capture[TCP].sport, 20001)
        self.assertEqual(capture[TCP].dport, 10001)
        self.assert_packet_checksums_valid(capture)
        out_port = capture[TCP].sport

        # TCP reply from the IPv4 side to the translated port
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
             TCP(sport=10001, dport=out_port))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        capture = capture[0]
        self.assertEqual(capture[IPv6].src, aftr_ip6)
        self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, '192.168.1.1')
        self.assertEqual(capture[TCP].sport, 10001)
        self.assertEqual(capture[TCP].dport, 20001)
        self.assert_packet_checksums_valid(capture)

        # ICMP echo request from remote host 1; the identifier is rewritten
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
             IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
             ICMP(id=4000, type='echo-request'))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(1)
        capture = capture[0]
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.nat_addr)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertNotEqual(capture[ICMP].id, 4000)
        self.assert_packet_checksums_valid(capture)
        out_id = capture[ICMP].id

        # ICMP echo reply carrying the translated identifier
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
             ICMP(id=out_id, type='echo-reply'))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        capture = capture[0]
        self.assertEqual(capture[IPv6].src, aftr_ip6)
        self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, '192.168.1.1')
        self.assertEqual(capture[ICMP].id, 4000)
        self.assert_packet_checksums_valid(capture)

        # ping DS-Lite AFTR tunnel endpoint address
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
             ICMPv6EchoRequest())
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        capture = capture[0]
        self.assertEqual(capture[IPv6].src, aftr_ip6)
        self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
        self.assertTrue(capture.haslayer(ICMPv6EchoReply))

        # NOTE(review): presumably 2 B4s because two distinct IPv6 remote
        # hosts were used above, and 3 sessions for the UDP, TCP and ICMP
        # flows - confirm against the dslite counter semantics.
        b4s = self.statistics.get_counter('/dslite/total-b4s')
        self.assertEqual(b4s[0][0], 2)
        sessions = self.statistics.get_counter('/dslite/total-sessions')
        self.assertEqual(sessions[0][0], 3)

    def tearDown(self):
        """Delegate per-test tear-down to the parent class."""
        super(TestDSlite, self).tearDown()

    def show_commands_at_teardown(self):
        """Log the output of the DS-Lite 'show' CLI commands."""
        self.logger.info(self.vapi.cli("show dslite pool"))
        self.logger.info(
            self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
        self.logger.info(self.vapi.cli("show dslite sessions"))
232
233
class TestDSliteCE(VppTestCase):
    """ DS-Lite CE Test Cases """

    # Interfaces created in setUpClass:
    #   pg0 - IPv4 interface; plain IPv4 packets enter and leave here
    #   pg1 - IPv6 interface with one generated remote host; the
    #         IPv4-in-IPv6 packets are captured and injected here

    @classmethod
    def setUpConstants(cls):
        """Append the 'dslite { ce }' section to the VPP command line."""
        super(TestDSliteCE, cls).setUpConstants()
        cls.vpp_cmdline.extend(["dslite", "{", "ce", "}"])

    @classmethod
    def setUpClass(cls):
        """Create and bring up the two pg interfaces used by the test.

        If any step fails, the parent class tear-down is run before the
        exception is re-raised.
        """
        super(TestDSliteCE, cls).setUpClass()

        try:
            cls.create_pg_interfaces(range(2))
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()
            cls.pg1.admin_up()
            cls.pg1.config_ip6()
            cls.pg1.generate_remote_hosts(1)
            cls.pg1.configure_ipv6_neighbors()

        except Exception:
            super(TestDSliteCE, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level tear-down to the parent class."""
        super(TestDSliteCE, cls).tearDownClass()

    def test_dslite_ce(self):
        """ Test DS-Lite CE """
        # NOTE: the docstring above is kept verbatim; unittest reports the
        # first docstring line as the test's short description.

        # TODO: add message to retrieve dslite config
        # nat_config = self.vapi.nat_show_config()
        # self.assertEqual(1, nat_config.dslite_ce)

        b4_ip4 = '192.0.0.2'
        b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
        self.vapi.dslite_set_b4_addr(ip4_addr=b4_ip4, ip6_addr=b4_ip6)

        aftr_ip4 = '192.0.0.1'
        aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
        self.vapi.dslite_set_aftr_addr(ip4_addr=aftr_ip4, ip6_addr=aftr_ip6)

        # Host route to the AFTR address via pg1
        r1 = VppIpRoute(self, aftr_ip6, 128,
                        [VppRoutePath(self.pg1.remote_ip6,
                                      self.pg1.sw_if_index)])
        r1.add_vpp_config()

        # UDP encapsulation
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
             UDP(sport=10000, dport=20000))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        capture = capture[0]
        # Outer header goes B4 -> AFTR; the inner packet is unchanged
        self.assertEqual(capture[IPv6].src, b4_ip6)
        self.assertEqual(capture[IPv6].dst, aftr_ip6)
        self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
        self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
        self.assertEqual(capture[UDP].sport, 10000)
        self.assertEqual(capture[UDP].dport, 20000)
        self.assert_packet_checksums_valid(capture)

        # UDP decapsulation
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(dst=b4_ip6, src=aftr_ip6) /
             IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
             UDP(sport=20000, dport=10000))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(1)
        capture = capture[0]
        # The outer IPv6 header must be stripped on the IPv4 side
        self.assertFalse(capture.haslayer(IPv6))
        self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
        self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
        self.assertEqual(capture[UDP].sport, 20000)
        self.assertEqual(capture[UDP].dport, 10000)
        self.assert_packet_checksums_valid(capture)

        # ping DS-Lite B4 tunnel endpoint address
        p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
             IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
             ICMPv6EchoRequest())
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        capture = capture[0]
        self.assertEqual(capture[IPv6].src, b4_ip6)
        self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
        self.assertTrue(capture.haslayer(ICMPv6EchoReply))

    def tearDown(self):
        """Delegate per-test tear-down to the parent class."""
        super(TestDSliteCE, self).tearDown()

    def show_commands_at_teardown(self):
        """Log the output of the DS-Lite tunnel endpoint 'show' commands."""
        self.logger.info(
            self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
        self.logger.info(
            self.vapi.cli("show dslite b4-tunnel-endpoint-address"))