blob: 4cbcf6c5a0732c74dc1ea0f0b7b7c2014eab9ecb [file] [log] [blame]
Neale Ranns44db1ca2020-12-24 09:16:09 +00001#!/usr/bin/env python3
2
3import unittest
4
5from scapy.layers.inet import IP, UDP
6from scapy.layers.inet6 import IPv6, Raw
7from scapy.layers.l2 import Ether, ARP, Dot1Q
8
Neale Ranns89d939e2021-06-07 09:34:07 +00009from util import reassemble4
Neale Ranns44db1ca2020-12-24 09:16:09 +000010from vpp_object import VppObject
11from framework import VppTestCase, VppTestRunner
Neale Ranns89d939e2021-06-07 09:34:07 +000012from vpp_ipip_tun_interface import VppIpIpTunInterface
13from template_ipsec import TemplateIpsec, IpsecTun4Tests, \
14 IpsecTun4, mk_scapy_crypt_key, config_tun_params
15from template_ipsec import TemplateIpsec, IpsecTun4Tests, \
16 IpsecTun4, mk_scapy_crypt_key, config_tun_params
17from test_ipsec_tun_if_esp import TemplateIpsecItf4
18from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
Neale Ranns44db1ca2020-12-24 09:16:09 +000019
20
class VppLcpPair(VppObject):
    """A linux-cp phy/host interface pair, driven through the test CLI."""

    def __init__(self, test, phy, host):
        self._test = test
        self.phy = phy
        self.host = host

    def add_vpp_config(self):
        """Create the pair and register it with the test's object registry.

        Returns self so that construction and configuration can be chained.
        """
        command = "test lcp add phy %s host %s" % (self.phy, self.host)
        self._test.vapi.cli(command)
        self._test.registry.register(self, self._test.logger)
        return self

    def remove_vpp_config(self):
        """Delete the pair using the matching 'del' CLI command."""
        command = "test lcp del phy %s host %s" % (self.phy, self.host)
        self._test.vapi.cli(command)

    def object_id(self):
        """Return an identifier built from the two sw_if_index values."""
        indices = (self.phy.sw_if_index, self.host.sw_if_index)
        return "lcp:%d:%d" % indices

    def query_vpp_config(self):
        """Return True if lcp_itf_pair_get reports this phy/host pair."""
        # Materialise the whole reply first, then search it.
        entries = list(self._test.vapi.vpp.details_iter(
            self._test.vapi.lcp_itf_pair_get))

        return any(
            entry.phy_sw_if_index == self.phy.sw_if_index and
            entry.host_sw_if_index == self.host.sw_if_index
            for entry in entries
        )
50
51
class TestLinuxCP(VppTestCase):
    """ Linux Control Plane """

    # load both the linux-cp plugin and its unit-test companion plugin
    extra_vpp_plugin_config = ["plugin",
                               "linux_cp_plugin.so",
                               "{", "enable", "}",
                               "plugin",
                               "linux_cp_unittest_plugin.so",
                               "{", "enable", "}"]

    @classmethod
    def setUpClass(cls):
        super(TestLinuxCP, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestLinuxCP, cls).tearDownClass()

    def setUp(self):
        super(TestLinuxCP, self).setUp()

        # create 4 pg interfaces so we can create two pairs
        self.create_pg_interfaces(range(4))

        # create one ip4 and one ip6 pg tun
        self.pg_interfaces += self.create_pg_ip4_interfaces(range(4, 5))
        self.pg_interfaces += self.create_pg_ip6_interfaces(range(5, 6))

        for i in self.pg_interfaces:
            i.admin_up()

    def tearDown(self):
        for i in self.pg_interfaces:
            i.admin_down()
        super(TestLinuxCP, self).tearDown()

    def _send_and_verify_unchanged(self, tx_intf, pkt, rx_intf):
        # Send one packet in on tx_intf, expect it out of rx_intf, and check
        # that every captured packet dissects identically to the one sent.
        rxs = self.send_and_expect(tx_intf, [pkt], rx_intf)

        for rx in rxs:
            self.assertEqual(pkt.show2(True), rx.show2(True))

    def test_linux_cp_tap(self):
        """ Linux CP TAP """

        #
        # Setup
        #

        # create two pairs, with a bunch of hosts on the phys
        hosts = [self.pg0, self.pg1]
        phys = [self.pg2, self.pg3]
        N_HOSTS = 4

        for phy in phys:
            phy.config_ip4()
            phy.generate_remote_hosts(N_HOSTS)
            phy.configure_ipv4_neighbors()

        # pair phys[0] with hosts[0] and phys[1] with hosts[1]
        for phy, host in zip(phys, hosts):
            VppLcpPair(self, phy, host).add_vpp_config()

        self.logger.info(self.vapi.cli("sh lcp adj verbose"))
        self.logger.info(self.vapi.cli("sh lcp"))

        #
        # Traffic Tests
        #

        # hosts to phys
        for phy, host in zip(phys, hosts):
            for j in range(N_HOSTS):
                remote = phy.remote_hosts[j]

                p = (Ether(src=phy.local_mac,
                           dst=remote.mac) /
                     IP(src=phy.local_ip4,
                        dst=remote.ip4) /
                     UDP(sport=1234, dport=1234) /
                     Raw())

                # verify packet is unchanged
                self._send_and_verify_unchanged(host, p, phy)

                # ARPs x-connect to phy
                p = (Ether(dst="ff:ff:ff:ff:ff:ff",
                           src=remote.mac) /
                     ARP(op="who-has",
                         hwdst=remote.mac,
                         hwsrc=phy.local_mac,
                         psrc=phy.local_ip4,
                         pdst=remote.ip4))

                # verify packet is unchanged
                self._send_and_verify_unchanged(host, p, phy)

        # phy to host
        for phy, host in zip(phys, hosts):
            for j in range(N_HOSTS):
                remote = phy.remote_hosts[j]

                p = (Ether(dst=phy.local_mac,
                           src=remote.mac) /
                     IP(dst=phy.local_ip4,
                        src=remote.ip4) /
                     UDP(sport=1234, dport=1234) /
                     Raw())

                # verify packet is unchanged
                self._send_and_verify_unchanged(phy, p, host)

                # ARPs rx'd on the phy are sent to the host
                p = (Ether(dst="ff:ff:ff:ff:ff:ff",
                           src=remote.mac) /
                     ARP(op="is-at",
                         hwsrc=remote.mac,
                         hwdst=phy.local_mac,
                         pdst=phy.local_ip4,
                         psrc=remote.ip4))

                # verify packet is unchanged
                self._send_and_verify_unchanged(phy, p, host)

        # cleanup
        for phy in phys:
            phy.unconfig_ip4()

    def test_linux_cp_tun(self):
        """ Linux CP TUN """

        #
        # Setup
        #
        N_PKTS = 31

        # one v4 and one v6 host interface, each paired with an IPIP tunnel
        # that runs over the same phy
        host4, host6 = self.pg4, self.pg5
        phy = self.pg2

        phy.config_ip4()
        phy.config_ip6()
        phy.resolve_arp()
        phy.resolve_ndp()

        tun4 = VppIpIpTunInterface(
            self,
            phy,
            phy.local_ip4,
            phy.remote_ip4).add_vpp_config()
        tun6 = VppIpIpTunInterface(
            self,
            phy,
            phy.local_ip6,
            phy.remote_ip6).add_vpp_config()

        tun4.admin_up()
        tun4.config_ip4()
        tun6.admin_up()
        tun6.config_ip6()

        VppLcpPair(self, tun4, host4).add_vpp_config()
        VppLcpPair(self, tun6, host6).add_vpp_config()

        self.logger.info(self.vapi.cli("sh lcp adj verbose"))
        self.logger.info(self.vapi.cli("sh lcp"))
        self.logger.info(self.vapi.cli("sh ip punt redirect"))

        #
        # Traffic Tests
        #

        # host to phy for v4
        p = (IP(src=tun4.local_ip4, dst="2.2.2.2") /
             UDP(sport=1234, dport=1234) /
             Raw())

        rxs = self.send_and_expect(host4, p * N_PKTS, phy)

        # verify inner packet is unchanged and has the tunnel encap
        for rx in rxs:
            self.assertEqual(rx[Ether].dst, phy.remote_mac)
            self.assertEqual(rx[IP].dst, phy.remote_ip4)
            self.assertEqual(rx[IP].src, phy.local_ip4)
            inner = IP(rx[IP].payload)
            self.assertEqual(inner.src, tun4.local_ip4)
            self.assertEqual(inner.dst, "2.2.2.2")

        # host to phy for v6
        p = (IPv6(src=tun6.local_ip6, dst="2::2") /
             UDP(sport=1234, dport=1234) /
             Raw())

        rxs = self.send_and_expect(host6, p * N_PKTS, phy)

        # verify inner packet is unchanged and has the tunnel encap
        for rx in rxs:
            self.assertEqual(rx[IPv6].dst, phy.remote_ip6)
            self.assertEqual(rx[IPv6].src, phy.local_ip6)
            inner = IPv6(rx[IPv6].payload)
            self.assertEqual(inner.src, tun6.local_ip6)
            self.assertEqual(inner.dst, "2::2")

        # phy to host v4
        p = (Ether(dst=phy.local_mac, src=phy.remote_mac) /
             IP(dst=phy.local_ip4, src=phy.remote_ip4) /
             IP(dst=tun4.local_ip4, src=tun4.remote_ip4) /
             UDP(sport=1234, dport=1234) /
             Raw())

        rxs = self.send_and_expect(phy, p * N_PKTS, host4)
        for rx in rxs:
            # the capture is re-dissected starting at the IP layer
            rx = IP(rx)
            self.assertEqual(rx[IP].dst, tun4.local_ip4)
            self.assertEqual(rx[IP].src, tun4.remote_ip4)

        # phy to host v6
        p = (Ether(dst=phy.local_mac, src=phy.remote_mac) /
             IPv6(dst=phy.local_ip6, src=phy.remote_ip6) /
             IPv6(dst=tun6.local_ip6, src=tun6.remote_ip6) /
             UDP(sport=1234, dport=1234) /
             Raw())

        rxs = self.send_and_expect(phy, p * N_PKTS, host6)
        for rx in rxs:
            # the capture is re-dissected starting at the IPv6 layer
            rx = IPv6(rx)
            self.assertEqual(rx[IPv6].dst, tun6.local_ip6)
            self.assertEqual(rx[IPv6].src, tun6.remote_ip6)

        # cleanup
        phy.unconfig_ip4()
        phy.unconfig_ip6()

        tun4.unconfig_ip4()
        tun6.unconfig_ip6()
291
292
class TestLinuxCPIpsec(TemplateIpsec,
                       TemplateIpsecItf4,
                       IpsecTun4):
    """ IPsec Interface IPv4 """

    # load both the linux-cp plugin and its unit-test companion plugin
    extra_vpp_plugin_config = ["plugin",
                               "linux_cp_plugin.so",
                               "{", "enable", "}",
                               "plugin",
                               "linux_cp_unittest_plugin.so",
                               "{", "enable", "}"]

    def setUp(self):
        super(TestLinuxCPIpsec, self).setUp()

        self.tun_if = self.pg0
        self.pg_interfaces += self.create_pg_ip4_interfaces(range(3, 4))
        self.pg_interfaces += self.create_pg_ip6_interfaces(range(4, 5))

    def tearDown(self):
        super(TestLinuxCPIpsec, self).tearDown()

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt each received packet and check its inner addresses.

        :param p: IPsec parameter object; nat_header, vpp_tun_sa and tun_if
                  are read from it
        :param sa: SA handed to verify_esp_padding for ESP packets
        :param rxs: the captured packets to verify

        Any failure while handling a packet is logged at debug level and
        then re-raised.
        """
        # This module does not import these names at file level, so bring
        # them into scope here; without them the ESP check and the failure
        # logging below raise NameError.
        import socket
        from scapy.layers.ipsec import ESP
        from util import ppp

        decrypt_pkts = []
        for rx in rxs:
            if p.nat_header:
                self.assertEqual(rx[UDP].dport, 4500)
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(len(rx) - len(Ether()), rx[IP].len)

            # reset per packet so the failure log never shows a stale
            # packet left over from an earlier iteration
            decrypt_pkt = None
            try:
                rx_ip = rx[IP]
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx_ip)
                if not decrypt_pkt.haslayer(IP):
                    decrypt_pkt = IP(decrypt_pkt[Raw].load)
                if rx_ip.proto == socket.IPPROTO_ESP:
                    self.verify_esp_padding(sa, rx_ip[ESP].data, decrypt_pkt)
                decrypt_pkts.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, p.tun_if.local_ip4)
                self.assert_equal(decrypt_pkt.dst, p.tun_if.remote_ip4)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    try:
                        self.logger.debug(
                            ppp("Decrypted packet:", decrypt_pkt))
                    except Exception:
                        # diagnostics are best-effort; never let them mask
                        # the failure that is re-raised below
                        pass
                raise

        pkts = reassemble4(decrypt_pkts)
        for pkt in pkts:
            self.assert_packet_checksums_valid(pkt)

    def verify_decrypted(self, p, rxs):
        """Check addresses and checksums of packets received in the clear.

        :param p: IPsec parameter object; tun_if is read from it
        :param rxs: the captured packets to verify
        """
        for rx in rxs:
            # the capture is re-dissected starting at the IP layer
            rx = IP(rx)
            self.assert_equal(rx[IP].src, p.tun_if.remote_ip4)
            self.assert_equal(rx[IP].dst, p.tun_if.local_ip4)
            self.assert_packet_checksums_valid(rx)

    def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
                         payload_size=54):
        """Build `count` Ethernet frames carrying sa-encrypted IP/UDP.

        :param p: unused here; kept so the signature stays unchanged
        :param sa: object whose encrypt() wraps the inner packet
        :param sw_intf: interface supplying the Ethernet src/dst MACs
        :param src: inner IP source address
        :param dst: inner IP destination address
        :param count: number of packets to generate
        :param payload_size: number of b'X' payload bytes per packet
        :returns: list of packets
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=src, dst=dst) /
                           UDP(sport=1111, dport=2222) /
                           Raw(b'X' * payload_size))
                for _ in range(count)]

    def test_linux_cp_ipsec4_tun(self):
        """ Linux CP Ipsec TUN """

        #
        # Setup
        #
        N_PKTS = 31

        # the pg that pairs with the tunnel
        self.host = self.pg3

        # tunnel and protection setup
        p = self.ipv4_params

        self.config_network(p)
        self.config_sa_tun(p,
                           self.pg0.local_ip4,
                           self.pg0.remote_ip4)
        self.config_protect(p)

        pair = VppLcpPair(self, p.tun_if, self.host).add_vpp_config()

        self.logger.error(self.vapi.cli("sh int addr"))
        self.logger.info(self.vapi.cli("sh lcp"))
        self.logger.info(self.vapi.cli("sh ip punt redirect"))

        #
        # Traffic Tests
        #

        # host to phy for v4
        pkt = (IP(src=p.tun_if.local_ip4,
                  dst=p.tun_if.remote_ip4) /
               UDP(sport=1234, dport=1234) /
               Raw())

        rxs = self.send_and_expect(self.host, pkt * N_PKTS, self.tun_if)
        self.verify_encrypted(p, p.vpp_tun_sa, rxs)

        # phy to host for v4
        pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
                                     src=p.tun_if.remote_ip4,
                                     dst=p.tun_if.local_ip4,
                                     count=N_PKTS)
        try:
            rxs = self.send_and_expect(self.tun_if, pkts, self.host)
            self.verify_decrypted(p, rxs)
        finally:
            # always dump the packet trace, whether or not the checks passed
            self.logger.error(self.vapi.cli("sh trace"))

        # cleanup
        pair.remove_vpp_config()
        self.unconfig_protect(p)
        self.unconfig_sa(p)
        self.unconfig_network(p)
413
Neale Ranns44db1ca2020-12-24 09:16:09 +0000414
# Script entry point: run this module's tests with VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)