blob: 438a674977f77bab7f93a94a6e3bf336d0083d3d [file] [log] [blame]
Filip Tehlar12b517b2020-04-26 18:05:05 +00001import os
Filip Tehlar2008e312020-11-09 13:23:24 +00002import time
Filip Tehlarec112e52020-10-07 23:52:37 +00003from socket import inet_pton
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00004from cryptography import x509
Filip Tehlar12b517b2020-04-26 18:05:05 +00005from cryptography.hazmat.backends import default_backend
6from cryptography.hazmat.primitives import hashes, hmac
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00007from cryptography.hazmat.primitives.asymmetric import dh, padding
8from cryptography.hazmat.primitives.serialization import load_pem_private_key
Filip Tehlar12b517b2020-04-26 18:05:05 +00009from cryptography.hazmat.primitives.ciphers import (
10 Cipher,
11 algorithms,
12 modes,
13)
Filip Tehlar84962d12020-09-08 06:08:05 +000014from ipaddress import IPv4Address, IPv6Address, ip_address
Paul Vinciguerrae061dad2020-12-04 14:57:51 -050015import unittest
Filip Tehlarbfeae8c2020-06-23 20:35:58 +000016from scapy.layers.ipsec import ESP
Filip Tehlar12b517b2020-04-26 18:05:05 +000017from scapy.layers.inet import IP, UDP, Ether
Filip Tehlar84962d12020-09-08 06:08:05 +000018from scapy.layers.inet6 import IPv6
Filip Tehlar12b517b2020-04-26 18:05:05 +000019from scapy.packet import raw, Raw
20from scapy.utils import long_converter
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +000021from framework import tag_fixme_vpp_workers
Filip Tehlar12b517b2020-04-26 18:05:05 +000022from framework import VppTestCase, VppTestRunner
Filip Tehlarbfeae8c2020-06-23 20:35:58 +000023from vpp_ikev2 import Profile, IDType, AuthMethod
Filip Tehlar4f42a712020-07-01 08:56:59 +000024from vpp_papi import VppEnum
Filip Tehlar12b517b2020-04-26 18:05:05 +000025
# Python 2/3 compatibility alias: the name `unicode` only resolves on
# Python 2, so fall back to `str` when the lookup raises NameError.
try:
    text_type = unicode
except NameError:
    text_type = str

# Constant label that auth_init() runs through the PRF together with the
# shared secret before signing the auth message (shared-key auth method).
KEY_PAD = b"Key Pad for IKEv2"
# Sizes, in bytes, used by the AES-GCM code paths in this file.
# Number of trailing key bytes that CryptoAlgo treats as the GCM salt.
SALT_SIZE = 4
# Length of the GCM tag appended to / stripped from the ciphertext.
GCM_ICV_SIZE = 16
# Length of the explicit IV carried in front of a GCM ciphertext.
GCM_IV_SIZE = 8
Filip Tehlar12b517b2020-04-26 18:05:05 +000035
36
# Diffie-Hellman MODP groups, defined in rfc3526.
# Keys are the group names used by IKEv2SA.set_ike_props(); each value is a
# tuple (p, g, key_len): the prime modulus (parsed from the hex dump by
# scapy's long_converter), the generator, and the length in bytes used when
# serialising the shared secret in IKEv2SA.compute_secret().
DH = {
    '2048MODPgr': (long_converter("""
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),

    '3072MODPgr': (long_converter("""
    FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
    29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
    EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
    E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
    EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
    C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
    83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
    670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
    E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
    DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
    15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
    ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
    ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
    F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
    BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
    43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
}
71
72
class CryptoAlgo(object):
    """Descriptor of a symmetric cipher plus helpers to use it.

    :param name: algorithm name, e.g. 'AES-CBC' or 'AES-GCM-16ICV'
    :param cipher: cipher class from ``cryptography`` (called with the key),
        or None for the NULL algorithm
    :param mode: mode class from ``cryptography`` (called with the IV/nonce),
        or None for the NULL algorithm

    When ``cipher`` is None neither ``bs`` nor ``iv_len`` is set, so the
    NULL entry can be constructed but cannot encrypt, decrypt or pad.
    """

    def __init__(self, name, cipher, mode):
        self.name = name
        self.cipher = cipher
        self.mode = mode
        if self.cipher is not None:
            # block_size is expressed in bits, bs in bytes
            self.bs = self.cipher.block_size // 8

            if self.name == 'AES-GCM-16ICV':
                self.iv_len = GCM_IV_SIZE
            else:
                self.iv_len = self.bs

    def encrypt(self, data, key, aad=None):
        """Encrypt ``data`` and return ``iv + ciphertext`` (+ tag for AEAD).

        :param data: plaintext bytes (already padded by the caller)
        :param key: raw key; when ``aad`` is given the last SALT_SIZE bytes
            are the salt and the rest is the cipher key
        :param aad: additional authenticated data; a non-None value selects
            the AEAD path
        :returns: a fresh random IV followed by the ciphertext and, on the
            AEAD path, the first GCM_ICV_SIZE bytes of the tag
        """
        iv = os.urandom(self.iv_len)
        if aad is None:
            encryptor = Cipher(self.cipher(key), self.mode(iv),
                               default_backend()).encryptor()
            return iv + encryptor.update(data) + encryptor.finalize()

        # AEAD: nonce = salt (tail of the key material) | explicit IV
        salt = key[-SALT_SIZE:]
        nonce = salt + iv
        encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
                           default_backend()).encryptor()
        encryptor.authenticate_additional_data(aad)
        data = encryptor.update(data) + encryptor.finalize()
        data += encryptor.tag[:GCM_ICV_SIZE]
        return iv + data

    def decrypt(self, data, key, aad=None, icv=None):
        """Decrypt ``data`` produced by the peer and return the plaintext.

        :param data: IV followed by the ciphertext (without the ICV)
        :param key: raw key; on the AEAD path the last SALT_SIZE bytes are
            the salt
        :param aad: additional authenticated data; a non-None value selects
            the AEAD path
        :param icv: authentication tag, required on the AEAD path
        :returns: plaintext bytes, padding still included
        """
        if aad is None:
            iv = data[:self.iv_len]
            ct = data[self.iv_len:]
            # use the configured cipher, exactly as encrypt() does, instead
            # of a hard-coded AES
            decryptor = Cipher(self.cipher(key),
                               self.mode(iv),
                               default_backend()).decryptor()
            return decryptor.update(ct) + decryptor.finalize()

        salt = key[-SALT_SIZE:]
        nonce = salt + data[:GCM_IV_SIZE]
        ct = data[GCM_IV_SIZE:]
        key = key[:-SALT_SIZE]
        decryptor = Cipher(self.cipher(key),
                           self.mode(nonce, icv, len(icv)),
                           default_backend()).decryptor()
        decryptor.authenticate_additional_data(aad)
        return decryptor.update(ct) + decryptor.finalize()

    def pad(self, data):
        """Pad ``data`` to a multiple of the block size.

        At least one byte is always added: zero bytes followed by a final
        byte holding the number of zero bytes that precede it.
        """
        pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
        data = data + b'\x00' * (pad_len - 1)
        return data + bytes([pad_len - 1])
Filip Tehlar12b517b2020-04-26 18:05:05 +0000125
126
class AuthAlgo(object):
    """Descriptor of a keyed-hash algorithm used for integrity or as PRF.

    :param name: algorithm name
    :param mac: MAC constructor (None for the NULL algorithm)
    :param mod: hash class handed to ``mac`` (None for the NULL algorithm)
    :param key_len: key length in bytes
    :param trunc_len: length in bytes the MAC output is truncated to; a
        falsy value (None or 0) means "same as key_len"
    """

    def __init__(self, name, mac, mod, key_len, trunc_len=None):
        self.name, self.mac, self.mod = name, mac, mod
        self.key_len = key_len
        self.trunc_len = trunc_len if trunc_len else key_len
134
135
# Encryption algorithms selectable through IKEv2SA.set_ike_props() and
# IKEv2SA.set_esp_props(), keyed by algorithm name.
CRYPTO_ALGOS = {
    'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
    'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
    'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
                                mode=modes.GCM),
}
142
# Integrity algorithms selectable through IKEv2SA.set_ike_props() and
# IKEv2SA.set_esp_props(), keyed by algorithm name.  Positional arguments
# are (name, mac, hash, key_len, trunc_len), lengths in bytes.
# Each HMAC-SHA2 variant must use the hash matching its name (RFC 4868):
# SHA-384 for the 192-bit ICV and SHA-512 for the 256-bit ICV.
AUTH_ALGOS = {
    'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
    'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
    'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
    'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA384, 48, 24),
    'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA512, 64, 32),
}
150
# Pseudo-random functions selectable through IKEv2SA.set_ike_props(), keyed
# by algorithm name.  trunc_len is omitted for the real PRF, so AuthAlgo
# defaults it to key_len.
PRF_ALGOS = {
    'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
    'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
                                  hashes.SHA256, 32),
}

# Numeric identifier -> name used as key in CRYPTO_ALGOS.
# NOTE(review): presumably the IANA IKEv2 encryption transform IDs - confirm.
CRYPTO_IDS = {
    12: 'AES-CBC',
    20: 'AES-GCM-16ICV',
}

# Numeric identifier -> name used as key in AUTH_ALGOS.
# NOTE(review): presumably the IANA IKEv2 integrity transform IDs - confirm.
INTEG_IDS = {
    2: 'HMAC-SHA1-96',
    12: 'SHA2-256-128',
    13: 'SHA2-384-192',
    14: 'SHA2-512-256',
}
168
Filip Tehlar12b517b2020-04-26 18:05:05 +0000169
class IKEv2ChildSA(object):
    """State of one child SA negotiated under an IKE SA.

    A random 4-byte SPI is generated for the local side: it is stored as
    ``ispi`` when acting as initiator and as ``rspi`` otherwise; the
    peer's SPI starts out as None.

    :param local_ts: local traffic selector
    :param remote_ts: remote traffic selector
    :param is_initiator: True when the test side initiates the exchange
    """

    def __init__(self, local_ts, remote_ts, is_initiator):
        own_spi = os.urandom(4)
        self.ispi, self.rspi = ((own_spi, None) if is_initiator
                                else (None, own_spi))
        self.local_ts, self.remote_ts = local_ts, remote_ts
181
182
class IKEv2SA(object):
    """Test-side model of one IKEv2 security association.

    Keeps the SPIs, nonces, DH material, selected algorithms and the keys
    derived from them, and offers helpers to encrypt, decrypt and
    authenticate IKE messages exchanged with the device under test.

    :param test: test case object; its assert methods are used for checks
    :param is_initiator: True when the test side is the IKE initiator
    :param i_id: initiator identity (bytes)
    :param r_id: responder identity (bytes)
    :param spi: 8-byte SPI of the local side
    :param id_type: identity type, either a name understood by
        ``IDType.value`` or an already numeric value
    :param nonce: local nonce; 32 random bytes are generated when falsy
    :param auth_data: shared secret (replaced by the computed AUTH value
        once auth_init() has run)
    :param local_ts: local traffic selector for the first child SA
    :param remote_ts: remote traffic selector for the first child SA
    :param auth_method: 'shared-key' or 'rsa-sig'
    :param priv_key: private key used with 'rsa-sig'
    :param i_natt: initiator is behind NAT
    :param r_natt: responder is behind NAT
    :param udp_encap: UDP encapsulation requested
    """

    def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
                 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
                 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
                 auth_method='shared-key', priv_key=None, i_natt=False,
                 r_natt=False, udp_encap=False):
        self.udp_encap = udp_encap
        self.i_natt = i_natt
        self.r_natt = r_natt
        # with NAT traversal both ends talk on port 4500 instead of 500
        if i_natt or r_natt:
            self.sport = 4500
            self.dport = 4500
        else:
            self.sport = 500
            self.dport = 500
        self.msg_id = 0
        self.dh_params = None
        self.test = test
        self.priv_key = priv_key
        self.is_initiator = is_initiator
        nonce = nonce or os.urandom(32)
        self.auth_data = auth_data
        self.i_id = i_id
        self.r_id = r_id
        if isinstance(id_type, str):
            self.id_type = IDType.value(id_type)
        else:
            self.id_type = id_type
        self.auth_method = auth_method
        # only the local SPI and nonce are known up front; the peer's SPI
        # starts as all zeroes and its nonce attribute is not set here
        if self.is_initiator:
            self.rspi = 8 * b'\x00'
            self.ispi = spi
            self.i_nonce = nonce
        else:
            self.rspi = spi
            self.ispi = 8 * b'\x00'
            self.r_nonce = nonce
        self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
                                       self.is_initiator)]

    def new_msg_id(self):
        """Increment the message id counter and return the new value."""
        self.msg_id += 1
        return self.msg_id

    @property
    def my_dh_pub_key(self):
        """DH public value of the local side."""
        if self.is_initiator:
            return self.i_dh_data
        return self.r_dh_data

    @property
    def peer_dh_pub_key(self):
        """DH public value of the peer."""
        if self.is_initiator:
            return self.r_dh_data
        return self.i_dh_data

    @property
    def natt(self):
        """True when either side is behind NAT."""
        return self.i_natt or self.r_natt

    def compute_secret(self):
        """Return the DH shared secret as big-endian bytes.

        Computes peer_public ** private mod p for the configured group and
        serialises the result to the group's key length.
        """
        priv = self.dh_private_key
        peer = self.peer_dh_pub_key
        p, _, key_len = self.ike_group
        return pow(int.from_bytes(peer, 'big'),
                   int.from_bytes(priv, 'big'), p).to_bytes(key_len, 'big')

    def generate_dh_data(self):
        """Generate the local DH key pair for the configured group.

        Stores the private exponent in ``dh_private_key`` and the public
        value in ``i_dh_data`` or ``r_dh_data`` depending on the role.

        :raises NotImplementedError: if the configured group is not in DH
        """
        # generate DH keys
        if self.ike_dh not in DH:
            raise NotImplementedError('%s not in DH group' % self.ike_dh)

        # the parameter object is built once and reused afterwards
        if self.dh_params is None:
            dhg = DH[self.ike_dh]
            pn = dh.DHParameterNumbers(dhg[0], dhg[1])
            self.dh_params = pn.parameters(default_backend())

        priv = self.dh_params.generate_private_key()
        pub = priv.public_key()
        x = priv.private_numbers().x
        self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
        y = pub.public_numbers().y

        if self.is_initiator:
            self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
        else:
            self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')

    def complete_dh_data(self):
        """Compute and store the DH shared secret."""
        self.dh_shared_secret = self.compute_secret()

    def calc_child_keys(self):
        """Derive the keys of the first child SA from SK_d and the nonces.

        The key material is consumed in the order: initiator encryption
        key, initiator integrity key (or 4-byte salt when the ESP integrity
        key length is 0), then the same for the responder.  Results are
        stored on ``child_sas[0]``.
        """
        prf = self.ike_prf_alg.mod()
        s = self.i_nonce + self.r_nonce
        c = self.child_sas[0]

        encr_key_len = self.esp_crypto_key_len
        integ_key_len = self.esp_integ_alg.key_len
        # without an integrity key a salt is derived instead
        salt_len = 0 if integ_key_len else 4

        total_len = (integ_key_len * 2 +
                     encr_key_len * 2 +
                     salt_len * 2)
        keymat = self.calc_prfplus(prf, self.sk_d, s, total_len)

        pos = 0
        c.sk_ei = keymat[pos:pos+encr_key_len]
        pos += encr_key_len

        if integ_key_len:
            c.sk_ai = keymat[pos:pos+integ_key_len]
            pos += integ_key_len
        else:
            c.salt_ei = keymat[pos:pos+salt_len]
            pos += salt_len

        c.sk_er = keymat[pos:pos+encr_key_len]
        pos += encr_key_len

        if integ_key_len:
            c.sk_ar = keymat[pos:pos+integ_key_len]
            pos += integ_key_len
        else:
            c.salt_er = keymat[pos:pos+salt_len]
            pos += salt_len

    def calc_prfplus(self, prf, key, seed, length):
        """Expand ``key``/``seed`` into at least ``length`` bytes (prf+).

        Each round computes T(n) = prf(key, T(n-1) | seed | n) with a
        one-byte counter n starting at 1, and the rounds are concatenated.
        The result is not truncated, callers slice what they need.

        :returns: the concatenated output, or None when the one-byte
            counter range (1..255) is exhausted before ``length`` bytes
            are available
        """
        r = b''
        t = b''
        x = 1
        # the counter is a single octet, so rounds 1..255 are all valid
        while len(r) < length and x <= 255:
            t = self.calc_prf(prf, key, t + seed + bytes([x]))
            r = r + t
            x = x + 1

        # fail only when the output is really too short, not merely
        # because the last permitted round was needed
        if len(r) < length:
            return None
        return r

    def calc_prf(self, prf, key, data):
        """Return the PRF MAC of ``data`` under ``key`` with hash ``prf``."""
        h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
        h.update(data)
        return h.finalize()

    def calc_keys(self):
        """Derive SKEYSEED and the IKE SA keys.

        The key material is consumed in the order SK_d, SK_ai, SK_ar,
        SK_ei, SK_er, SK_pi, SK_pr.  When the integrity key length is 0 the
        encryption keys are extended by a 4-byte salt each.
        """
        prf = self.ike_prf_alg.mod()
        # SKEYSEED = prf(Ni | Nr, g^ir)
        s = self.i_nonce + self.r_nonce
        self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)

        # calculate S = Ni | Nr | SPIi SPIr
        s = s + self.ispi + self.rspi

        prf_key_trunc = self.ike_prf_alg.trunc_len
        encr_key_len = self.ike_crypto_key_len
        tr_prf_key_len = self.ike_prf_alg.key_len
        integ_key_len = self.ike_integ_alg.key_len
        if integ_key_len == 0:
            salt_size = 4
        else:
            salt_size = 0

        total_len = (prf_key_trunc +
                     integ_key_len * 2 +
                     encr_key_len * 2 +
                     tr_prf_key_len * 2 +
                     salt_size * 2)
        keymat = self.calc_prfplus(prf, self.skeyseed, s, total_len)

        pos = 0
        self.sk_d = keymat[:pos+prf_key_trunc]
        pos += prf_key_trunc

        self.sk_ai = keymat[pos:pos+integ_key_len]
        pos += integ_key_len
        self.sk_ar = keymat[pos:pos+integ_key_len]
        pos += integ_key_len

        self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
        pos += encr_key_len + salt_size
        self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
        pos += encr_key_len + salt_size

        self.sk_pi = keymat[pos:pos+tr_prf_key_len]
        pos += tr_prf_key_len
        self.sk_pr = keymat[pos:pos+tr_prf_key_len]

    def generate_authmsg(self, prf, packet):
        """Build the octets that get signed for authentication.

        :param prf: hash object handed to calc_prf()
        :param packet: raw bytes of the local side's IKE_SA_INIT message
        :returns: packet | peer nonce | prf(SK_p, id type header | id)
        """
        if self.is_initiator:
            ident = self.i_id
            nonce = self.r_nonce
            key = self.sk_pi
        else:
            ident = self.r_id
            nonce = self.i_nonce
            key = self.sk_pr
        data = bytes([self.id_type, 0, 0, 0]) + ident
        id_hash = self.calc_prf(prf, key, data)
        return packet + nonce + id_hash

    def auth_init(self):
        """Compute the AUTH payload value and store it in ``auth_data``.

        :raises TypeError: when ``auth_method`` is neither 'shared-key'
            nor 'rsa-sig'
        """
        prf = self.ike_prf_alg.mod()
        if self.is_initiator:
            packet = self.init_req_packet
        else:
            packet = self.init_resp_packet
        authmsg = self.generate_authmsg(prf, raw(packet))
        if self.auth_method == 'shared-key':
            psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
            self.auth_data = self.calc_prf(prf, psk, authmsg)
        elif self.auth_method == 'rsa-sig':
            self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
                                                hashes.SHA1())
        else:
            raise TypeError('unknown auth method type!')

    def encrypt(self, data, aad=None):
        """Pad ``data`` and encrypt it with the local encryption key."""
        data = self.ike_crypto_alg.pad(data)
        return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)

    @property
    def peer_authkey(self):
        """Integrity key used by the peer."""
        if self.is_initiator:
            return self.sk_ar
        return self.sk_ai

    @property
    def my_authkey(self):
        """Integrity key used by the local side."""
        if self.is_initiator:
            return self.sk_ai
        return self.sk_ar

    @property
    def my_cryptokey(self):
        """Encryption key used by the local side."""
        if self.is_initiator:
            return self.sk_ei
        return self.sk_er

    @property
    def peer_cryptokey(self):
        """Encryption key used by the peer."""
        if self.is_initiator:
            return self.sk_er
        return self.sk_ei

    def concat(self, alg, key_len):
        """Return ``alg`` suffixed with the key length in bits."""
        return alg + '-' + str(key_len * 8)

    @property
    def vpp_ike_cypto_alg(self):
        """IKE encryption algorithm name including the key size in bits."""
        return self.concat(self.ike_crypto, self.ike_crypto_key_len)

    @property
    def vpp_esp_cypto_alg(self):
        """ESP encryption algorithm name including the key size in bits."""
        return self.concat(self.esp_crypto, self.esp_crypto_key_len)

    def verify_hmac(self, ikemsg):
        """Assert that the trailing ICV of ``ikemsg`` matches our own MAC."""
        integ_trunc = self.ike_integ_alg.trunc_len
        exp_hmac = ikemsg[-integ_trunc:]
        data = ikemsg[:-integ_trunc]
        computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
                                          self.peer_authkey, data)
        self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)

    def compute_hmac(self, integ, key, data):
        """Return the untruncated integrity MAC of ``data`` under ``key``."""
        h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
        h.update(data)
        return h.finalize()

    def decrypt(self, data, aad=None, icv=None):
        """Decrypt ``data`` with the peer's encryption key."""
        return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)

    def hmac_and_decrypt(self, ike):
        """Authenticate and decrypt the Encrypted payload of ``ike``.

        For AES-GCM the IKE header plus the Encrypted payload header are
        used as additional authenticated data; otherwise the HMAC over the
        whole message is verified first.

        :returns: the plaintext with padding and pad-length byte removed
        """
        ep = ike[ikev2.IKEv2_payload_Encrypted]
        if self.ike_crypto == 'AES-GCM-16ICV':
            aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
            ct = ep.load[:-GCM_ICV_SIZE]
            tag = ep.load[-GCM_ICV_SIZE:]
            plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
        else:
            self.verify_hmac(raw(ike))
            integ_trunc = self.ike_integ_alg.trunc_len

            # remove ICV and decrypt payload
            ct = ep.load[:-integ_trunc]
            plain = self.decrypt(ct)
        # remove padding
        pad_len = plain[-1]
        return plain[:-pad_len - 1]

    def build_ts_addr(self, ts, version):
        """Return traffic selector address kwargs for IP ``version``.

        :param ts: mapping with 'start_addr' and 'end_addr'
        :param version: '4' or '6', appended to the keyword names
        """
        return {'starting_address_v' + version: ts['start_addr'],
                'ending_address_v' + version: ts['end_addr']}

    def generate_ts(self, is_ip4):
        """Build the traffic selectors of the first child SA.

        :returns: a pair of one-element lists; the selector built from the
            local addresses comes first when acting as initiator, second
            otherwise
        """
        c = self.child_sas[0]
        ts_data = {'IP_protocol_ID': 0,
                   'start_port': 0,
                   'end_port': 0xffff}
        if is_ip4:
            ts_data.update(self.build_ts_addr(c.local_ts, '4'))
            ts1 = ikev2.IPv4TrafficSelector(**ts_data)
            ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
            ts2 = ikev2.IPv4TrafficSelector(**ts_data)
        else:
            ts_data.update(self.build_ts_addr(c.local_ts, '6'))
            ts1 = ikev2.IPv6TrafficSelector(**ts_data)
            ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
            ts2 = ikev2.IPv6TrafficSelector(**ts_data)

        if self.is_initiator:
            return ([ts1], [ts2])
        return ([ts2], [ts1])

    def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
        """Select the algorithms protecting the IKE SA.

        :param crypto: key of CRYPTO_ALGOS
        :param crypto_key_len: encryption key length in bytes
        :param integ: key of AUTH_ALGOS; 'NULL' is stored as None in
            ``ike_integ``
        :param prf: key of PRF_ALGOS
        :param dh: key of DH
        :raises TypeError: for an unknown crypto, integ or prf name
        :raises KeyError: for an unknown dh group name
        """
        if crypto not in CRYPTO_ALGOS:
            raise TypeError('unsupported encryption algo %r' % crypto)
        self.ike_crypto = crypto
        self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
        self.ike_crypto_key_len = crypto_key_len

        if integ not in AUTH_ALGOS:
            raise TypeError('unsupported auth algo %r' % integ)
        self.ike_integ = None if integ == 'NULL' else integ
        self.ike_integ_alg = AUTH_ALGOS[integ]

        if prf not in PRF_ALGOS:
            raise TypeError('unsupported prf algo %r' % prf)
        self.ike_prf = prf
        self.ike_prf_alg = PRF_ALGOS[prf]
        self.ike_dh = dh
        self.ike_group = DH[self.ike_dh]

    def set_esp_props(self, crypto, crypto_key_len, integ):
        """Select the algorithms protecting the child (ESP) SA.

        :param crypto: key of CRYPTO_ALGOS
        :param crypto_key_len: encryption key length in bytes
        :param integ: key of AUTH_ALGOS; 'NULL' is stored as None in
            ``esp_integ``
        :raises TypeError: for an unknown crypto or integ name
        """
        self.esp_crypto_key_len = crypto_key_len
        if crypto not in CRYPTO_ALGOS:
            raise TypeError('unsupported encryption algo %r' % crypto)
        self.esp_crypto = crypto
        self.esp_crypto_alg = CRYPTO_ALGOS[crypto]

        if integ not in AUTH_ALGOS:
            raise TypeError('unsupported auth algo %r' % integ)
        self.esp_integ = None if integ == 'NULL' else integ
        self.esp_integ_alg = AUTH_ALGOS[integ]

    def crypto_attr(self, key_len):
        """Return the (attribute word, second value) pair for a key length.

        The first element packs 0x800e in the upper 16 bits and the key
        length in bits (``key_len`` bytes * 8) in the lower ones.

        :raises Exception: when the IKE cipher is not AES-CBC or AES-GCM
        """
        # NOTE(review): the check looks at ike_crypto even when called via
        # esp_crypto_attr() - confirm that this is intended.
        if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
            return (0x800e << 16 | key_len << 3, 12)
        else:
            raise Exception('unsupported attribute type')

    def ike_crypto_attr(self):
        """crypto_attr() for the IKE encryption key length."""
        return self.crypto_attr(self.ike_crypto_key_len)

    def esp_crypto_attr(self):
        """crypto_attr() for the ESP encryption key length."""
        return self.crypto_attr(self.esp_crypto_key_len)

    def compute_nat_sha1(self, ip, port, rspi=None):
        """Return SHA1(SPIi | SPIr | ip | port) for NAT detection.

        :param ip: packed IP address bytes
        :param port: port number, serialised as 2 big-endian bytes
        :param rspi: responder SPI to use instead of ``self.rspi``
        """
        if rspi is None:
            rspi = self.rspi
        data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
        digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
        digest.update(data)
        return digest.finalize()
Filip Tehlar12b517b2020-04-26 18:05:05 +0000551
Filip Tehlarbfeae8c2020-06-23 20:35:58 +0000552
Filip Tehlare7c83962020-09-23 11:20:12 +0000553class IkePeer(VppTestCase):
554 """ common class for initiator and responder """
Filip Tehlar12b517b2020-04-26 18:05:05 +0000555
@classmethod
def setUpClass(cls):
    """Publish the scapy IKEv2 layer and bring up two pg interfaces.

    scapy.contrib.ikev2 is imported here rather than at module import
    time and stored as the module global ``ikev2`` used by the rest of
    this file.  Each packet-generator interface is set admin-up and gets
    IPv4 and IPv6 configured and resolved.
    """
    import scapy.contrib.ikev2 as _ikev2
    globals().update(ikev2=_ikev2)
    super(IkePeer, cls).setUpClass()
    cls.create_pg_interfaces(range(2))
    for pg_if in cls.pg_interfaces:
        pg_if.admin_up()
        pg_if.config_ip4()
        pg_if.resolve_arp()
        pg_if.config_ip6()
        pg_if.resolve_ndp()
Filip Tehlar12b517b2020-04-26 18:05:05 +0000568
@classmethod
def tearDownClass(cls):
    """Delegate class-level cleanup to the parent class."""
    super(IkePeer, cls).tearDownClass()
Filip Tehlar12b517b2020-04-26 18:05:05 +0000572
def tearDown(self):
    """Delete the SA and check that nothing is left configured.

    After the parent tearDown, the SA deletion is started from the side
    selected by ``del_sa_from_responder``; then the IKE SA dump and the
    IPsec SA dump must both be empty, and the profile is removed.
    """
    super(IkePeer, self).tearDown()
    start_delete = (self.initiate_del_sa_from_responder
                    if self.del_sa_from_responder
                    else self.initiate_del_sa_from_initiator)
    start_delete()
    ike_sas = self.vapi.ikev2_sa_dump()
    self.assertEqual(len(ike_sas), 0)
    ipsec_sas = self.vapi.ipsec_sa_dump()
    self.assertEqual(len(ipsec_sas), 0)
    self.p.remove_vpp_config()
    self.assertIsNone(self.p.query_vpp_config())
585
def setUp(self):
    """Configure the test case, install the profile and prepare logging.

    When the test side is the initiator its DH key pair is generated
    up front.
    """
    super(IkePeer, self).setUp()
    self.config_tc()
    self.p.add_vpp_config()
    self.assertIsNotNone(self.p.query_vpp_config())
    if self.sa.is_initiator:
        self.sa.generate_dh_data()
    for command in ('ikev2 set logging level 4', 'event-lo clear'):
        self.vapi.cli(command)
Filip Tehlar12b517b2020-04-26 18:05:05 +0000595
def assert_counter(self, count, name, version='ip4'):
    """Assert that an ikev2 error counter has the expected value.

    :param count: expected counter value
    :param name: counter name below the ikev2 node
    :param version: node suffix, 'ip4' by default
    """
    counter_path = ('/err/ikev2-%s/' % version) + name
    observed = self.statistics.get_err_counter(counter_path)
    self.assertEqual(count, observed)
599
def create_rekey_request(self):
    """Return a CREATE_CHILD_SA request packet that rekeys the child SA.

    The payload comes from generate_auth_payload(is_rekey=True), is
    encrypted under the IKE SA and sent from pg0 with a new message id.
    """
    sa_payload, first_payload = self.generate_auth_payload(is_rekey=True)
    ike_sa = self.sa
    header = ikev2.IKEv2(init_SPI=ike_sa.ispi, resp_SPI=ike_sa.rspi,
                         id=ike_sa.new_msg_id(), flags='Initiator',
                         exch_type='CREATE_CHILD_SA')

    encrypted = self.encrypt_ike_msg(header, sa_payload, first_payload)
    return self.create_packet(self.pg0, encrypted, self.sa.sport,
                              self.sa.dport, self.sa.natt, self.ip6)
610
def create_empty_request(self):
    """Return an INFORMATIONAL request packet with an empty payload.

    The message carries only an Encrypted payload built from an empty
    plaintext and uses a new message id.
    """
    ike_sa = self.sa
    header = ikev2.IKEv2(init_SPI=ike_sa.ispi, resp_SPI=ike_sa.rspi,
                         id=ike_sa.new_msg_id(), flags='Initiator',
                         exch_type='INFORMATIONAL',
                         next_payload='Encrypted')

    encrypted = self.encrypt_ike_msg(header, b'', None)
    return self.create_packet(self.pg0, encrypted, self.sa.sport,
                              self.sa.dport, self.sa.natt, self.ip6)
620
def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
                  use_ip6=False):
    """Wrap ``msg`` in Ether / IP(v6) / UDP headers.

    :param src_if: interface whose remote addresses are used as source
        and whose local addresses are used as destination
    :param msg: payload appended after the UDP header
    :param sport: UDP source port
    :param dport: UDP destination port
    :param natt: when true, four zero bytes (non-ESP marker) are inserted
        in front of ``msg``
    :param use_ip6: build an IPv6 instead of an IPv4 header
    """
    if use_ip6:
        ip_hdr = IPv6(src=src_if.remote_ip6, dst=src_if.local_ip6)
    else:
        ip_hdr = IP(src=src_if.remote_ip4, dst=src_if.local_ip4)
    eth_hdr = Ether(dst=src_if.local_mac, src=src_if.remote_mac)
    pkt = eth_hdr / ip_hdr / UDP(sport=sport, dport=dport)
    if natt:
        # insert non ESP marker
        pkt = pkt / Raw(b'\x00' * 4)
    return pkt / msg
Filip Tehlar12b517b2020-04-26 18:05:05 +0000638
def verify_udp(self, udp):
    """Assert that the UDP ports of ``udp`` match the ports of the SA."""
    for field in ('sport', 'dport'):
        self.assertEqual(getattr(udp, field), getattr(self.sa, field))
Filip Tehlar12b517b2020-04-26 18:05:05 +0000642
def get_ike_header(self, packet):
    """Return the IKEv2 header of ``packet`` after basic checks.

    Asserts that the version field is 0x20 and that the 'Version' flag
    is not set.
    """
    try:
        layer = packet[ikev2.IKEv2]
        header = self.verify_and_remove_non_esp_marker(layer)
    except IndexError:
        # this is a workaround for getting IKEv2 layer as both ikev2 and
        # ipsec register for port 4500
        header = self.verify_and_remove_non_esp_marker(packet[ESP])
    self.assertEqual(header.version, 0x20)
    self.assertNotIn('Version', header.flags)
    return header
Filip Tehlar12b517b2020-04-26 18:05:05 +0000655
def verify_and_remove_non_esp_marker(self, packet):
    """Strip the non-ESP marker when NAT traversal is in use.

    Without NAT traversal ``packet`` is returned unchanged.  Otherwise
    the first four raw bytes must be zero and the remainder is re-parsed
    as an IKEv2 message.
    """
    if not self.sa.natt:
        return packet
    # if we are in nat traversal mode check for non esp marker
    # and remove it
    payload = raw(packet)
    self.assertEqual(payload[:4], b'\x00' * 4)
    return ikev2.IKEv2(payload[4:])
Filip Tehlar12b517b2020-04-26 18:05:05 +0000665
def encrypt_ike_msg(self, header, plain, first_payload):
    """Encrypt ``plain`` and return the complete protected IKE message.

    :param header: IKEv2 header; its ``length`` field is set here
    :param plain: payload(s) to protect
    :param first_payload: type of the first payload inside the Encrypted
        payload
    :returns: header / Encrypted payload, followed by the truncated HMAC
        when a non-AEAD cipher is used
    """
    if self.sa.ike_crypto == 'AES-GCM-16ICV':
        # the padded plaintext is needed only to know the final length;
        # sa.encrypt() pads again internally
        data = self.sa.ike_crypto_alg.pad(raw(plain))
        plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
            len(ikev2.IKEv2_payload_Encrypted())
        tlen = plen + len(ikev2.IKEv2())

        # prepare aad data
        sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
                                             length=plen)
        header.length = tlen
        res = header / sk_p
        encr = self.sa.encrypt(raw(plain), raw(res))
        sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
                                             length=plen, load=encr)
        res = header / sk_p
    else:
        encr = self.sa.encrypt(raw(plain))
        trunc_len = self.sa.ike_integ_alg.trunc_len
        plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
        tlen = plen + len(ikev2.IKEv2())

        sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
                                             length=plen, load=encr)
        header.length = tlen
        res = header / sk_p

        # the HMAC covers everything built so far and is appended
        # truncated to the algorithm's ICV length
        integ_data = raw(res)
        hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
                                         self.sa.my_authkey, integ_data)
        res = res / Raw(hmac_data[:trunc_len])
    # a test-case assertion is not stripped under -O and reports both values
    self.assertEqual(len(res), tlen)
    return res
699
def verify_udp_encap(self, ipsec_sa):
    """Check the UDP-encap flag of ``ipsec_sa`` against the SA settings.

    The flag must be present when UDP encapsulation or NAT traversal is
    in use and absent otherwise.
    """
    sad_flags = VppEnum.vl_api_ipsec_sad_flags_t
    expect_encap = self.sa.udp_encap or self.sa.natt
    check = self.assertIn if expect_encap else self.assertNotIn
    check(sad_flags.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
706
def verify_ipsec_sas(self, is_rekey=False):
    """Compare the dumped IPsec SAs with the locally derived child keys.

    Checks the number of SAs, the UDP-encap flag, the crypto and
    integrity algorithms and the key material (or the salts when no
    integrity algorithm is configured).

    :param is_rekey: True when called right after a rekey
    """
    sas = self.vapi.ipsec_sa_dump()
    # after rekey there is a short period of time in which old
    # inbound SA is still present
    self.assertEqual(len(sas), 3 if is_rekey else 2)

    # entry 0 is always one of the pair; its partner is entry 2 after a
    # rekey and entry 1 otherwise
    first = sas[0].entry
    partner = sas[2].entry if is_rekey else sas[1].entry
    if self.sa.is_initiator:
        sa0, sa1 = first, partner
    else:
        sa0, sa1 = partner, first

    c = self.sa.child_sas[0]

    for entry in (sa0, sa1):
        self.verify_udp_encap(entry)
    vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
    for entry in (sa0, sa1):
        self.assertEqual(entry.crypto_algorithm, vpp_crypto_alg)

    vpp_integ_alg = (0 if self.sa.esp_integ is None
                     else self.vpp_enums[self.sa.esp_integ])
    for entry in (sa0, sa1):
        self.assertEqual(entry.integrity_algorithm, vpp_integ_alg)

    # verify crypto keys
    crypto_pairs = ((sa0, c.sk_er), (sa1, c.sk_ei))
    for entry, key in crypto_pairs:
        self.assertEqual(entry.crypto_key.length, len(key))
    for entry, key in crypto_pairs:
        self.assertEqual(entry.crypto_key.data[:len(key)], key)

    # verify integ keys
    if vpp_integ_alg:
        integ_pairs = ((sa0, c.sk_ar), (sa1, c.sk_ai))
        for entry, key in integ_pairs:
            self.assertEqual(entry.integrity_key.length, len(key))
        for entry, key in integ_pairs:
            self.assertEqual(entry.integrity_key.data[:len(key)], key)
    else:
        for entry, salt in ((sa0, c.salt_er), (sa1, c.salt_ei)):
            self.assertEqual(entry.salt.to_bytes(4, 'little'), salt)
Filip Tehlar12b517b2020-04-26 18:05:05 +0000761
def verify_keymat(self, api_keys, keys, name):
    """Assert that key ``name`` reported by the API matches ours.

    :param api_keys: object exposing ``<name>`` and ``<name>_len``
    :param keys: object exposing the expected ``<name>`` bytes
    :param name: attribute name of the key, e.g. 'sk_d'
    """
    expected = getattr(keys, name)
    reported = getattr(api_keys, name)
    reported_len = getattr(api_keys, name + '_len')
    self.assertEqual(len(expected), reported_len)
    # only the first reported_len bytes of the API buffer are meaningful
    self.assertEqual(expected, reported[:reported_len])
768
def verify_id(self, api_id, exp_id):
    """Assert that an identity reported by the API matches ``exp_id``.

    Compares the type, the data length and the data bytes.
    """
    expected_type = IDType.value(exp_id.type)
    self.assertEqual(api_id.type, expected_type)
    self.assertEqual(api_id.data_len, exp_id.data_len)
    # NOTE(review): the data bytes are compared against exp_id.type, not
    # against the expected data - looks unintended, confirm with callers.
    reported_data = bytes(api_id.data, 'ascii')
    self.assertEqual(reported_data, exp_id.type)
773
774 def verify_ike_sas(self):
775 r = self.vapi.ikev2_sa_dump()
776 self.assertEqual(len(r), 1)
777 sa = r[0].sa
Filip Tehlar84962d12020-09-08 06:08:05 +0000778 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
jan_cavojskya340fe12020-07-08 09:24:12 +0200779 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
Filip Tehlar84962d12020-09-08 06:08:05 +0000780 if self.ip6:
Filip Tehlare7c83962020-09-23 11:20:12 +0000781 if self.sa.is_initiator:
782 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
783 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
784 else:
785 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
786 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
Filip Tehlar84962d12020-09-08 06:08:05 +0000787 else:
Filip Tehlare7c83962020-09-23 11:20:12 +0000788 if self.sa.is_initiator:
789 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
790 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
791 else:
792 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
793 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
jan_cavojskya340fe12020-07-08 09:24:12 +0200794 self.verify_keymat(sa.keys, self.sa, 'sk_d')
795 self.verify_keymat(sa.keys, self.sa, 'sk_ai')
796 self.verify_keymat(sa.keys, self.sa, 'sk_ar')
797 self.verify_keymat(sa.keys, self.sa, 'sk_ei')
798 self.verify_keymat(sa.keys, self.sa, 'sk_er')
799 self.verify_keymat(sa.keys, self.sa, 'sk_pi')
800 self.verify_keymat(sa.keys, self.sa, 'sk_pr')
801
802 self.assertEqual(sa.i_id.type, self.sa.id_type)
803 self.assertEqual(sa.r_id.type, self.sa.id_type)
804 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
Benoît Gannec7cceee2021-09-28 11:19:37 +0200805 self.assertEqual(sa.r_id.data_len, len(self.idr))
jan_cavojskya340fe12020-07-08 09:24:12 +0200806 self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
Benoît Gannec7cceee2021-09-28 11:19:37 +0200807 self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.idr)
jan_cavojskya340fe12020-07-08 09:24:12 +0200808
809 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
810 self.assertEqual(len(r), 1)
811 csa = r[0].child_sa
812 self.assertEqual(csa.sa_index, sa.sa_index)
813 c = self.sa.child_sas[0]
814 if hasattr(c, 'sk_ai'):
815 self.verify_keymat(csa.keys, c, 'sk_ai')
816 self.verify_keymat(csa.keys, c, 'sk_ar')
817 self.verify_keymat(csa.keys, c, 'sk_ei')
818 self.verify_keymat(csa.keys, c, 'sk_er')
Filip Tehlar68ad6252020-10-30 05:28:11 +0000819 self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
820 self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
jan_cavojskya340fe12020-07-08 09:24:12 +0200821
Filip Tehlar84962d12020-09-08 06:08:05 +0000822 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
jan_cavojskya340fe12020-07-08 09:24:12 +0200823 tsi = tsi[0]
824 tsr = tsr[0]
825 r = self.vapi.ikev2_traffic_selector_dump(
826 is_initiator=True, sa_index=sa.sa_index,
827 child_sa_index=csa.child_sa_index)
828 self.assertEqual(len(r), 1)
829 ts = r[0].ts
830 self.verify_ts(r[0].ts, tsi[0], True)
831
832 r = self.vapi.ikev2_traffic_selector_dump(
833 is_initiator=False, sa_index=sa.sa_index,
834 child_sa_index=csa.child_sa_index)
835 self.assertEqual(len(r), 1)
836 self.verify_ts(r[0].ts, tsr[0], False)
837
838 n = self.vapi.ikev2_nonce_get(is_initiator=True,
839 sa_index=sa.sa_index)
840 self.verify_nonce(n, self.sa.i_nonce)
841 n = self.vapi.ikev2_nonce_get(is_initiator=False,
842 sa_index=sa.sa_index)
843 self.verify_nonce(n, self.sa.r_nonce)
844
845 def verify_nonce(self, api_nonce, nonce):
846 self.assertEqual(api_nonce.data_len, len(nonce))
847 self.assertEqual(api_nonce.nonce, nonce)
848
849 def verify_ts(self, api_ts, ts, is_initiator):
850 if is_initiator:
851 self.assertTrue(api_ts.is_local)
852 else:
853 self.assertFalse(api_ts.is_local)
Filip Tehlar84962d12020-09-08 06:08:05 +0000854
855 if self.p.ts_is_ip4:
856 self.assertEqual(api_ts.start_addr,
857 IPv4Address(ts.starting_address_v4))
858 self.assertEqual(api_ts.end_addr,
859 IPv4Address(ts.ending_address_v4))
860 else:
861 self.assertEqual(api_ts.start_addr,
862 IPv6Address(ts.starting_address_v6))
863 self.assertEqual(api_ts.end_addr,
864 IPv6Address(ts.ending_address_v6))
jan_cavojskya340fe12020-07-08 09:24:12 +0200865 self.assertEqual(api_ts.start_port, ts.start_port)
866 self.assertEqual(api_ts.end_port, ts.end_port)
867 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
868
Filip Tehlare7c83962020-09-23 11:20:12 +0000869
870class TemplateInitiator(IkePeer):
871 """ initiator test template """
872
Filip Tehlaredf29002020-10-10 04:39:11 +0000873 def initiate_del_sa_from_initiator(self):
874 ispi = int.from_bytes(self.sa.ispi, 'little')
875 self.pg0.enable_capture()
876 self.pg_start()
877 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
878 capture = self.pg0.get_capture(1)
879 ih = self.get_ike_header(capture[0])
880 self.assertNotIn('Response', ih.flags)
881 self.assertIn('Initiator', ih.flags)
882 self.assertEqual(ih.init_SPI, self.sa.ispi)
883 self.assertEqual(ih.resp_SPI, self.sa.rspi)
884 plain = self.sa.hmac_and_decrypt(ih)
885 d = ikev2.IKEv2_payload_Delete(plain)
886 self.assertEqual(d.proto, 1) # proto=IKEv2
887 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
888 flags='Response', exch_type='INFORMATIONAL',
889 id=ih.id, next_payload='Encrypted')
890 resp = self.encrypt_ike_msg(header, b'', None)
891 self.send_and_assert_no_replies(self.pg0, resp)
892
893 def verify_del_sa(self, packet):
894 ih = self.get_ike_header(packet)
895 self.assertEqual(ih.id, self.sa.msg_id)
896 self.assertEqual(ih.exch_type, 37) # exchange informational
897 self.assertIn('Response', ih.flags)
898 self.assertIn('Initiator', ih.flags)
899 plain = self.sa.hmac_and_decrypt(ih)
900 self.assertEqual(plain, b'')
901
902 def initiate_del_sa_from_responder(self):
903 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
904 exch_type='INFORMATIONAL',
905 id=self.sa.new_msg_id())
906 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
907 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
908 packet = self.create_packet(self.pg0, ike_msg,
909 self.sa.sport, self.sa.dport,
910 self.sa.natt, self.ip6)
911 self.pg0.add_stream(packet)
912 self.pg0.enable_capture()
913 self.pg_start()
914 capture = self.pg0.get_capture(1)
915 self.verify_del_sa(capture[0])
Filip Tehlare7c83962020-09-23 11:20:12 +0000916
Filip Tehlarec112e52020-10-07 23:52:37 +0000917 @staticmethod
918 def find_notify_payload(packet, notify_type):
919 n = packet[ikev2.IKEv2_payload_Notify]
920 while n is not None:
921 if n.type == notify_type:
922 return n
923 n = n.payload
924 return None
925
926 def verify_nat_detection(self, packet):
927 if self.ip6:
928 iph = packet[IPv6]
929 else:
930 iph = packet[IP]
931 udp = packet[UDP]
932
933 # NAT_DETECTION_SOURCE_IP
934 s = self.find_notify_payload(packet, 16388)
935 self.assertIsNotNone(s)
936 src_sha = self.sa.compute_nat_sha1(
937 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
938 self.assertEqual(s.load, src_sha)
939
940 # NAT_DETECTION_DESTINATION_IP
941 s = self.find_notify_payload(packet, 16389)
942 self.assertIsNotNone(s)
943 dst_sha = self.sa.compute_nat_sha1(
944 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
945 self.assertEqual(s.load, dst_sha)
946
Filip Tehlare7c83962020-09-23 11:20:12 +0000947 def verify_sa_init_request(self, packet):
Filip Tehlar18107c92020-12-01 14:51:09 +0000948 udp = packet[UDP]
949 self.sa.dport = udp.sport
Filip Tehlare7c83962020-09-23 11:20:12 +0000950 ih = packet[ikev2.IKEv2]
951 self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
952 self.assertEqual(ih.exch_type, 34) # SA_INIT
953 self.sa.ispi = ih.init_SPI
954 self.assertEqual(ih.resp_SPI, 8 * b'\x00')
955 self.assertIn('Initiator', ih.flags)
956 self.assertNotIn('Response', ih.flags)
957 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
958 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
959
960 prop = packet[ikev2.IKEv2_payload_Proposal]
961 self.assertEqual(prop.proto, 1) # proto = ikev2
962 self.assertEqual(prop.proposal, 1)
963 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
964 self.assertEqual(prop.trans[0].transform_id,
965 self.p.ike_transforms['crypto_alg'])
966 self.assertEqual(prop.trans[1].transform_type, 2) # prf
967 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
968 self.assertEqual(prop.trans[2].transform_type, 4) # dh
969 self.assertEqual(prop.trans[2].transform_id,
970 self.p.ike_transforms['dh_group'])
971
Filip Tehlarec112e52020-10-07 23:52:37 +0000972 self.verify_nat_detection(packet)
Filip Tehlar68ad6252020-10-30 05:28:11 +0000973 self.sa.set_ike_props(
974 crypto='AES-GCM-16ICV', crypto_key_len=32,
975 integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
976 self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
977 integ='SHA2-256-128')
978 self.sa.generate_dh_data()
Filip Tehlare7c83962020-09-23 11:20:12 +0000979 self.sa.complete_dh_data()
980 self.sa.calc_keys()
981
Filip Tehlar68ad6252020-10-30 05:28:11 +0000982 def update_esp_transforms(self, trans, sa):
983 while trans:
984 if trans.transform_type == 1: # ecryption
985 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
986 elif trans.transform_type == 3: # integrity
987 sa.esp_integ = INTEG_IDS[trans.transform_id]
988 trans = trans.payload
989
Filip Tehlare7c83962020-09-23 11:20:12 +0000990 def verify_sa_auth_req(self, packet):
Filip Tehlar18107c92020-12-01 14:51:09 +0000991 udp = packet[UDP]
992 self.sa.dport = udp.sport
Filip Tehlare7c83962020-09-23 11:20:12 +0000993 ih = self.get_ike_header(packet)
994 self.assertEqual(ih.resp_SPI, self.sa.rspi)
995 self.assertEqual(ih.init_SPI, self.sa.ispi)
996 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
997 self.assertIn('Initiator', ih.flags)
998 self.assertNotIn('Response', ih.flags)
999
1000 udp = packet[UDP]
1001 self.verify_udp(udp)
1002 self.assertEqual(ih.id, self.sa.msg_id + 1)
1003 self.sa.msg_id += 1
1004 plain = self.sa.hmac_and_decrypt(ih)
1005 idi = ikev2.IKEv2_payload_IDi(plain)
Filip Tehlare7c83962020-09-23 11:20:12 +00001006 self.assertEqual(idi.load, self.sa.i_id)
Benoît Gannec7cceee2021-09-28 11:19:37 +02001007 if self.no_idr_auth:
1008 self.assertEqual(idi.next_payload, 39) # AUTH
1009 else:
1010 idr = ikev2.IKEv2_payload_IDr(idi.payload)
1011 self.assertEqual(idr.load, self.sa.r_id)
Filip Tehlar68ad6252020-10-30 05:28:11 +00001012 prop = idi[ikev2.IKEv2_payload_Proposal]
1013 c = self.sa.child_sas[0]
1014 c.ispi = prop.SPI
1015 self.update_esp_transforms(
1016 prop[ikev2.IKEv2_payload_Transform], self.sa)
Filip Tehlare7c83962020-09-23 11:20:12 +00001017
1018 def send_init_response(self):
1019 tr_attr = self.sa.ike_crypto_attr()
1020 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1021 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1022 key_length=tr_attr[0]) /
1023 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1024 transform_id=self.sa.ike_integ) /
1025 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1026 transform_id=self.sa.ike_prf_alg.name) /
1027 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1028 transform_id=self.sa.ike_dh))
1029 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1030 trans_nb=4, trans=trans))
Filip Tehlar18107c92020-12-01 14:51:09 +00001031
1032 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1033 if self.sa.natt:
1034 dst_address = b'\x0a\x0a\x0a\x0a'
1035 else:
1036 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1037 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1038 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1039
Filip Tehlare7c83962020-09-23 11:20:12 +00001040 self.sa.init_resp_packet = (
1041 ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1042 exch_type='IKE_SA_INIT', flags='Response') /
1043 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1044 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1045 group=self.sa.ike_dh,
1046 load=self.sa.my_dh_pub_key) /
Filip Tehlar18107c92020-12-01 14:51:09 +00001047 ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
1048 next_payload='Notify') /
1049 ikev2.IKEv2_payload_Notify(
1050 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1051 next_payload='Notify') / ikev2.IKEv2_payload_Notify(
1052 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))
Filip Tehlare7c83962020-09-23 11:20:12 +00001053
1054 ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
1055 self.sa.sport, self.sa.dport,
Filip Tehlar18107c92020-12-01 14:51:09 +00001056 False, self.ip6)
Filip Tehlare7c83962020-09-23 11:20:12 +00001057 self.pg_send(self.pg0, ike_msg)
1058 capture = self.pg0.get_capture(1)
1059 self.verify_sa_auth_req(capture[0])
1060
1061 def initiate_sa_init(self):
1062 self.pg0.enable_capture()
1063 self.pg_start()
1064 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1065
1066 capture = self.pg0.get_capture(1)
1067 self.verify_sa_init_request(capture[0])
1068 self.send_init_response()
1069
1070 def send_auth_response(self):
1071 tr_attr = self.sa.esp_crypto_attr()
1072 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1073 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1074 key_length=tr_attr[0]) /
1075 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1076 transform_id=self.sa.esp_integ) /
1077 ikev2.IKEv2_payload_Transform(
1078 transform_type='Extended Sequence Number',
1079 transform_id='No ESN') /
1080 ikev2.IKEv2_payload_Transform(
1081 transform_type='Extended Sequence Number',
1082 transform_id='ESN'))
1083
Filip Tehlar68ad6252020-10-30 05:28:11 +00001084 c = self.sa.child_sas[0]
Filip Tehlare7c83962020-09-23 11:20:12 +00001085 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
Filip Tehlar68ad6252020-10-30 05:28:11 +00001086 SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
Filip Tehlare7c83962020-09-23 11:20:12 +00001087
1088 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1089 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1090 IDtype=self.sa.id_type, load=self.sa.i_id) /
1091 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1092 IDtype=self.sa.id_type, load=self.sa.r_id) /
1093 ikev2.IKEv2_payload_AUTH(next_payload='SA',
1094 auth_type=AuthMethod.value(self.sa.auth_method),
1095 load=self.sa.auth_data) /
1096 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1097 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1098 number_of_TSs=len(tsi),
1099 traffic_selector=tsi) /
1100 ikev2.IKEv2_payload_TSr(next_payload='Notify',
1101 number_of_TSs=len(tsr),
1102 traffic_selector=tsr) /
1103 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1104
1105 header = ikev2.IKEv2(
1106 init_SPI=self.sa.ispi,
1107 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1108 flags='Response', exch_type='IKE_AUTH')
1109
1110 ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1111 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1112 self.sa.dport, self.sa.natt, self.ip6)
1113 self.pg_send(self.pg0, packet)
1114
1115 def test_initiator(self):
1116 self.initiate_sa_init()
1117 self.sa.auth_init()
1118 self.sa.calc_child_keys()
1119 self.send_auth_response()
1120 self.verify_ike_sas()
1121
1122
1123class TemplateResponder(IkePeer):
1124 """ responder test template """
1125
Filip Tehlaredf29002020-10-10 04:39:11 +00001126 def initiate_del_sa_from_responder(self):
1127 self.pg0.enable_capture()
1128 self.pg_start()
1129 self.vapi.ikev2_initiate_del_ike_sa(
1130 ispi=int.from_bytes(self.sa.ispi, 'little'))
1131 capture = self.pg0.get_capture(1)
1132 ih = self.get_ike_header(capture[0])
1133 self.assertNotIn('Response', ih.flags)
1134 self.assertNotIn('Initiator', ih.flags)
1135 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1136 plain = self.sa.hmac_and_decrypt(ih)
1137 d = ikev2.IKEv2_payload_Delete(plain)
1138 self.assertEqual(d.proto, 1) # proto=IKEv2
1139 self.assertEqual(ih.init_SPI, self.sa.ispi)
1140 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1141 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1142 flags='Initiator+Response',
1143 exch_type='INFORMATIONAL',
1144 id=ih.id, next_payload='Encrypted')
1145 resp = self.encrypt_ike_msg(header, b'', None)
1146 self.send_and_assert_no_replies(self.pg0, resp)
Filip Tehlare7c83962020-09-23 11:20:12 +00001147
1148 def verify_del_sa(self, packet):
1149 ih = self.get_ike_header(packet)
1150 self.assertEqual(ih.id, self.sa.msg_id)
1151 self.assertEqual(ih.exch_type, 37) # exchange informational
Filip Tehlaredf29002020-10-10 04:39:11 +00001152 self.assertIn('Response', ih.flags)
1153 self.assertNotIn('Initiator', ih.flags)
1154 self.assertEqual(ih.next_payload, 46) # Encrypted
1155 self.assertEqual(ih.init_SPI, self.sa.ispi)
1156 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1157 plain = self.sa.hmac_and_decrypt(ih)
1158 self.assertEqual(plain, b'')
Filip Tehlare7c83962020-09-23 11:20:12 +00001159
Filip Tehlaredf29002020-10-10 04:39:11 +00001160 def initiate_del_sa_from_initiator(self):
Filip Tehlare7c83962020-09-23 11:20:12 +00001161 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1162 flags='Initiator', exch_type='INFORMATIONAL',
1163 id=self.sa.new_msg_id())
1164 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
1165 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
1166 packet = self.create_packet(self.pg0, ike_msg,
1167 self.sa.sport, self.sa.dport,
1168 self.sa.natt, self.ip6)
1169 self.pg0.add_stream(packet)
1170 self.pg0.enable_capture()
1171 self.pg_start()
1172 capture = self.pg0.get_capture(1)
1173 self.verify_del_sa(capture[0])
1174
Filip Tehlar027d8132020-12-04 17:38:11 +00001175 def send_sa_init_req(self):
Filip Tehlare7c83962020-09-23 11:20:12 +00001176 tr_attr = self.sa.ike_crypto_attr()
1177 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1178 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1179 key_length=tr_attr[0]) /
1180 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1181 transform_id=self.sa.ike_integ) /
1182 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1183 transform_id=self.sa.ike_prf_alg.name) /
1184 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1185 transform_id=self.sa.ike_dh))
1186
1187 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1188 trans_nb=4, trans=trans))
1189
Filip Tehlar027d8132020-12-04 17:38:11 +00001190 next_payload = None if self.ip6 else 'Notify'
1191
Filip Tehlare7c83962020-09-23 11:20:12 +00001192 self.sa.init_req_packet = (
1193 ikev2.IKEv2(init_SPI=self.sa.ispi,
1194 flags='Initiator', exch_type='IKE_SA_INIT') /
1195 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1196 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1197 group=self.sa.ike_dh,
1198 load=self.sa.my_dh_pub_key) /
Filip Tehlar027d8132020-12-04 17:38:11 +00001199 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
Filip Tehlare7c83962020-09-23 11:20:12 +00001200 load=self.sa.i_nonce))
1201
Filip Tehlar027d8132020-12-04 17:38:11 +00001202 if not self.ip6:
1203 if self.sa.i_natt:
1204 src_address = b'\x0a\x0a\x0a\x01'
1205 else:
1206 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
Filip Tehlare7c83962020-09-23 11:20:12 +00001207
Filip Tehlar027d8132020-12-04 17:38:11 +00001208 if self.sa.r_natt:
1209 dst_address = b'\x0a\x0a\x0a\x0a'
1210 else:
1211 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1212
1213 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1214 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1215 nat_src_detection = ikev2.IKEv2_payload_Notify(
1216 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1217 next_payload='Notify')
1218 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1219 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1220 self.sa.init_req_packet = (self.sa.init_req_packet /
1221 nat_src_detection /
1222 nat_dst_detection)
Filip Tehlare7c83962020-09-23 11:20:12 +00001223
1224 ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1225 self.sa.sport, self.sa.dport,
1226 self.sa.natt, self.ip6)
1227 self.pg0.add_stream(ike_msg)
1228 self.pg0.enable_capture()
1229 self.pg_start()
1230 capture = self.pg0.get_capture(1)
1231 self.verify_sa_init(capture[0])
1232
Filip Tehlar68ad6252020-10-30 05:28:11 +00001233 def generate_auth_payload(self, last_payload=None, is_rekey=False):
Filip Tehlare7c83962020-09-23 11:20:12 +00001234 tr_attr = self.sa.esp_crypto_attr()
Filip Tehlar68ad6252020-10-30 05:28:11 +00001235 last_payload = last_payload or 'Notify'
Filip Tehlare7c83962020-09-23 11:20:12 +00001236 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1237 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1238 key_length=tr_attr[0]) /
1239 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1240 transform_id=self.sa.esp_integ) /
1241 ikev2.IKEv2_payload_Transform(
1242 transform_type='Extended Sequence Number',
1243 transform_id='No ESN') /
1244 ikev2.IKEv2_payload_Transform(
1245 transform_type='Extended Sequence Number',
1246 transform_id='ESN'))
1247
Filip Tehlar68ad6252020-10-30 05:28:11 +00001248 c = self.sa.child_sas[0]
Filip Tehlare7c83962020-09-23 11:20:12 +00001249 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
Filip Tehlar68ad6252020-10-30 05:28:11 +00001250 SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
Filip Tehlare7c83962020-09-23 11:20:12 +00001251
1252 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
Filip Tehlar68ad6252020-10-30 05:28:11 +00001253 plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
Filip Tehlare7c83962020-09-23 11:20:12 +00001254 auth_type=AuthMethod.value(self.sa.auth_method),
1255 load=self.sa.auth_data) /
1256 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1257 ikev2.IKEv2_payload_TSi(next_payload='TSr',
Filip Tehlar68ad6252020-10-30 05:28:11 +00001258 number_of_TSs=len(tsi), traffic_selector=tsi) /
1259 ikev2.IKEv2_payload_TSr(next_payload=last_payload,
1260 number_of_TSs=len(tsr), traffic_selector=tsr))
Filip Tehlare7c83962020-09-23 11:20:12 +00001261
Filip Tehlar68ad6252020-10-30 05:28:11 +00001262 if is_rekey:
1263 first_payload = 'Nonce'
1264 plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
1265 next_payload='SA') / plain /
1266 ikev2.IKEv2_payload_Notify(type='REKEY_SA',
1267 proto='ESP', SPI=c.ispi))
1268 else:
1269 first_payload = 'IDi'
Benoît Gannec7cceee2021-09-28 11:19:37 +02001270 if self.no_idr_auth:
1271 ids = ikev2.IKEv2_payload_IDi(next_payload='AUTH',
1272 IDtype=self.sa.id_type,
1273 load=self.sa.i_id)
1274 else:
1275 ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1276 IDtype=self.sa.id_type, load=self.sa.i_id) /
1277 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1278 IDtype=self.sa.id_type, load=self.sa.r_id))
Filip Tehlar68ad6252020-10-30 05:28:11 +00001279 plain = ids / plain
1280 return plain, first_payload
1281
1282 def send_sa_auth(self):
1283 plain, first_payload = self.generate_auth_payload(
1284 last_payload='Notify')
1285 plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
Filip Tehlare7c83962020-09-23 11:20:12 +00001286 header = ikev2.IKEv2(
1287 init_SPI=self.sa.ispi,
1288 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1289 flags='Initiator', exch_type='IKE_AUTH')
1290
Filip Tehlar68ad6252020-10-30 05:28:11 +00001291 ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
Filip Tehlare7c83962020-09-23 11:20:12 +00001292 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1293 self.sa.dport, self.sa.natt, self.ip6)
1294 self.pg0.add_stream(packet)
1295 self.pg0.enable_capture()
1296 self.pg_start()
1297 capture = self.pg0.get_capture(1)
1298 self.verify_sa_auth_resp(capture[0])
1299
1300 def verify_sa_init(self, packet):
1301 ih = self.get_ike_header(packet)
1302
1303 self.assertEqual(ih.id, self.sa.msg_id)
1304 self.assertEqual(ih.exch_type, 34)
Filip Tehlaredf29002020-10-10 04:39:11 +00001305 self.assertIn('Response', ih.flags)
Filip Tehlare7c83962020-09-23 11:20:12 +00001306 self.assertEqual(ih.init_SPI, self.sa.ispi)
1307 self.assertNotEqual(ih.resp_SPI, 0)
1308 self.sa.rspi = ih.resp_SPI
1309 try:
1310 sa = ih[ikev2.IKEv2_payload_SA]
1311 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1312 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1313 except IndexError as e:
1314 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1315 self.logger.error(ih.show())
1316 raise
1317 self.sa.complete_dh_data()
1318 self.sa.calc_keys()
1319 self.sa.auth_init()
1320
1321 def verify_sa_auth_resp(self, packet):
1322 ike = self.get_ike_header(packet)
1323 udp = packet[UDP]
1324 self.verify_udp(udp)
1325 self.assertEqual(ike.id, self.sa.msg_id)
1326 plain = self.sa.hmac_and_decrypt(ike)
Filip Tehlar68ad6252020-10-30 05:28:11 +00001327 idr = ikev2.IKEv2_payload_IDr(plain)
1328 prop = idr[ikev2.IKEv2_payload_Proposal]
1329 self.assertEqual(prop.SPIsize, 4)
1330 self.sa.child_sas[0].rspi = prop.SPI
Filip Tehlare7c83962020-09-23 11:20:12 +00001331 self.sa.calc_child_keys()
1332
Filip Tehlarfab5e7f2021-01-14 13:32:01 +00001333 IKE_NODE_SUFFIX = 'ip4'
1334
1335 def verify_counters(self):
1336 self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX)
Filip Tehlar68d27532021-01-25 10:09:27 +00001337 self.assert_counter(1, 'init_sa_req', self.IKE_NODE_SUFFIX)
Filip Tehlarfab5e7f2021-01-14 13:32:01 +00001338 self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX)
1339
Filip Tehlar68d27532021-01-25 10:09:27 +00001340 r = self.vapi.ikev2_sa_dump()
1341 s = r[0].sa.stats
1342 self.assertEqual(1, s.n_sa_auth_req)
1343 self.assertEqual(1, s.n_sa_init_req)
1344
Filip Tehlar12b517b2020-04-26 18:05:05 +00001345 def test_responder(self):
Filip Tehlar027d8132020-12-04 17:38:11 +00001346 self.send_sa_init_req()
Filip Tehlar12b517b2020-04-26 18:05:05 +00001347 self.send_sa_auth()
jan_cavojskya340fe12020-07-08 09:24:12 +02001348 self.verify_ipsec_sas()
1349 self.verify_ike_sas()
Filip Tehlarfab5e7f2021-01-14 13:32:01 +00001350 self.verify_counters()
Filip Tehlar12b517b2020-04-26 18:05:05 +00001351
1352
class Ikev2Params(object):
    """Configuration helper for the IKEv2 tests.

    ``config_params`` uses ``self.vapi`` and ``self.pg0``, which this class
    does not define; they have to be provided by the class it is combined
    with.
    """

    def config_params(self, params=None):
        """Configure the profile (``self.p``) and the test peer's SA
        (``self.sa``).

        Also sets ``self.vpp_enums``, ``self.del_sa_from_responder``,
        ``self.ip6``, ``self.no_idr_auth``, ``self.idr``, ``self.idi`` and,
        for 'rsa-sig' authentication, ``self.peer_cert``.

        :param params: optional dict of overrides. Keys read here:
            'dpd_disabled' (default True), 'del_sa_from_responder',
            'i_natt', 'r_natt', 'ip6', 'udp_encap' (default False),
            'auth' ('rsa-sig' selects certificate authentication and
            requires 'server-key', 'server-cert', 'client-key' and
            'client-cert'; anything else selects a shared key),
            'is_initiator' (default True), 'no_idr_in_auth', 'loc_ts',
            'rem_ts', 'responder', 'ike_transforms', 'esp_transforms',
            'responder_hostname', 'nonce', and - only when 'is_initiator'
            is true - 'ike-crypto', 'ike-integ', 'ike-dh', 'esp-crypto',
            'esp-integ'.
        """
        # a fresh dict per call instead of a shared mutable default
        if params is None:
            params = {}

        ec = VppEnum.vl_api_ipsec_crypto_alg_t
        ei = VppEnum.vl_api_ipsec_integ_alg_t
        self.vpp_enums = {
            'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
            'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
            'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
            'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
            'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
            'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,

            'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
            'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
            'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
            'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}

        if params.get('dpd_disabled', True):
            self.vapi.cli('ikev2 dpd disable')
        self.del_sa_from_responder = params.get('del_sa_from_responder',
                                                False)
        i_natt = params.get('i_natt', False)
        r_natt = params.get('r_natt', False)
        self.p = Profile(self, 'pr1')
        self.ip6 = params.get('ip6', False)

        if params.get('auth') == 'rsa-sig':
            auth_method = 'rsa-sig'
            # NOTE(review): raises TypeError when the BR environment
            # variable is not set
            work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
            self.vapi.ikev2_set_local_key(
                key_file=work_dir + params['server-key'])

            client_file = work_dir + params['client-cert']
            # context managers make sure both files get closed
            with open(work_dir + params['server-cert']) as cert_file:
                server_pem = cert_file.read()
            with open(work_dir + params['client-key']) as key_file:
                client_pem = key_file.read()
            client_priv = load_pem_private_key(str.encode(client_pem), None,
                                               default_backend())
            self.peer_cert = x509.load_pem_x509_certificate(
                str.encode(server_pem),
                default_backend())
            self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
            auth_data = None
        else:
            auth_data = b'$3cr3tpa$$w0rd'
            self.p.add_auth(method='shared-key', data=auth_data)
            auth_method = 'shared-key'
            client_priv = None

        is_init = params.get('is_initiator', True)
        self.no_idr_auth = params.get('no_idr_in_auth', False)

        idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
        idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
        r_id = self.idr = idr['data']
        i_id = self.idi = idi['data']
        if is_init:
            # scapy is initiator, VPP is responder
            self.p.add_local_id(**idr)
            self.p.add_remote_id(**idi)
            if self.no_idr_auth:
                r_id = None
        else:
            # VPP is initiator, scapy is responder
            self.p.add_local_id(**idi)
            if not self.no_idr_auth:
                self.p.add_remote_id(**idr)

        loc_ts = params.get(
            'loc_ts',
            {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'})
        rem_ts = params.get(
            'rem_ts',
            {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'})
        self.p.add_local_ts(**loc_ts)
        self.p.add_remote_ts(**rem_ts)
        if 'responder' in params:
            self.p.add_responder(params['responder'])
        if 'ike_transforms' in params:
            self.p.add_ike_transforms(params['ike_transforms'])
        if 'esp_transforms' in params:
            self.p.add_esp_transforms(params['esp_transforms'])

        udp_encap = params.get('udp_encap', False)
        if udp_encap:
            self.p.set_udp_encap(True)

        if 'responder_hostname' in params:
            hn = params['responder_hostname']
            self.p.add_responder_hostname(hn)

            # configure static dns record
            self.vapi.dns_name_server_add_del(
                is_ip6=0, is_add=1,
                server_address=IPv4Address(u'8.8.8.8').packed)
            self.vapi.dns_enable_disable(enable=1)

            cmd = "dns cache add {} {}".format(hn['hostname'],
                                               self.pg0.remote_ip4)
            self.vapi.cli(cmd)

        # the peer's traffic selectors mirror the profile's: what is local
        # to the profile is remote to the peer
        self.sa = IKEv2SA(self, i_id=i_id, r_id=r_id,
                          is_initiator=is_init,
                          id_type=self.p.local_id['id_type'],
                          i_natt=i_natt, r_natt=r_natt,
                          priv_key=client_priv, auth_method=auth_method,
                          nonce=params.get('nonce'),
                          auth_data=auth_data, udp_encap=udp_encap,
                          local_ts=self.p.remote_ts,
                          remote_ts=self.p.local_ts)

        if is_init:
            ike_crypto = params.get('ike-crypto', ('AES-CBC', 32))
            ike_integ = params.get('ike-integ', 'HMAC-SHA1-96')
            ike_dh = params.get('ike-dh', '2048MODPgr')

            esp_crypto = params.get('esp-crypto', ('AES-CBC', 32))
            esp_integ = params.get('esp-integ', 'HMAC-SHA1-96')

            self.sa.set_ike_props(
                crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
                integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
            self.sa.set_esp_props(
                crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
                integ=esp_integ)
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001483
1484
class TestApi(VppTestCase):
    """ Test IKEV2 API

    Creates two IKEv2 profiles through the Profile helper, reads them
    back with ikev2_profile_dump and compares each dumped field with the
    configuration dict the profile was built from.
    """
    @classmethod
    def setUpClass(cls):
        super(TestApi, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestApi, cls).tearDownClass()

    def tearDown(self):
        """ Remove both profiles and check that the dump is empty. """
        super(TestApi, self).tearDown()
        self.p1.remove_vpp_config()
        self.p2.remove_vpp_config()
        r = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(r), 0)

    def configure_profile(self, cfg):
        """ Build a Profile from cfg, call add_vpp_config() and return it.

        'lifetime_data', 'tun_itf' and 'natt_disabled' are optional keys;
        every other key read below must be present in cfg.
        """
        p = Profile(self, cfg['name'])
        p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
        p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
        p.add_local_ts(**cfg['loc_ts'])
        p.add_remote_ts(**cfg['rem_ts'])
        p.add_responder(cfg['responder'])
        p.add_ike_transforms(cfg['ike_ts'])
        p.add_esp_transforms(cfg['esp_ts'])
        p.add_auth(**cfg['auth'])
        p.set_udp_encap(cfg['udp_encap'])
        p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
        if 'lifetime_data' in cfg:
            p.set_lifetime_data(cfg['lifetime_data'])
        if 'tun_itf' in cfg:
            p.set_tunnel_interface(cfg['tun_itf'])
        # NAT traversal is only switched off when explicitly requested
        if cfg.get('natt_disabled'):
            p.disable_natt()
        p.add_vpp_config()
        return p

    def test_profile_api(self):
        """ test profile dump API """
        # IPv4 traffic selectors, used by profile p1
        loc_ts4 = {
            'proto': 8,
            'start_port': 1,
            'end_port': 19,
            'start_addr': '3.3.3.2',
            'end_addr': '3.3.3.3',
        }
        rem_ts4 = {
            'proto': 9,
            'start_port': 10,
            'end_port': 119,
            'start_addr': '4.5.76.80',
            'end_addr': '2.3.4.6',
        }

        # IPv6 traffic selectors, used by profile p2
        loc_ts6 = {
            'proto': 8,
            'start_port': 1,
            'end_port': 19,
            'start_addr': 'ab::1',
            'end_addr': 'ab::4',
        }
        rem_ts6 = {
            'proto': 9,
            'start_port': 10,
            'end_port': 119,
            'start_addr': 'cd::12',
            'end_addr': 'cd::13',
        }

        conf = {
            'p1': {
                'name': 'p1',
                'natt_disabled': True,
                'loc_id': ('fqdn', b'vpp.home'),
                'rem_id': ('fqdn', b'roadwarrior.example.com'),
                'loc_ts': loc_ts4,
                'rem_ts': rem_ts4,
                'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
                'ike_ts': {
                    'crypto_alg': 20,
                    'crypto_key_size': 32,
                    'integ_alg': 0,
                    'dh_group': 1},
                'esp_ts': {
                    'crypto_alg': 13,
                    'crypto_key_size': 24,
                    'integ_alg': 2},
                'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
                'udp_encap': True,
                'ipsec_over_udp_port': 4501,
                'lifetime_data': {
                    'lifetime': 123,
                    'lifetime_maxdata': 20192,
                    'lifetime_jitter': 9,
                    'handover': 132},
            },
            'p2': {
                'name': 'p2',
                'loc_id': ('ip4-addr', b'192.168.2.1'),
                'rem_id': ('ip6-addr', b'abcd::1'),
                'loc_ts': loc_ts6,
                'rem_ts': rem_ts6,
                'responder': {'sw_if_index': 4, 'addr': 'def::10'},
                'ike_ts': {
                    'crypto_alg': 12,
                    'crypto_key_size': 16,
                    'integ_alg': 3,
                    'dh_group': 3},
                'esp_ts': {
                    'crypto_alg': 9,
                    'crypto_key_size': 24,
                    'integ_alg': 4},
                'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
                'udp_encap': False,
                'ipsec_over_udp_port': 4600,
                'tun_itf': 0}
        }
        self.p1 = self.configure_profile(conf['p1'])
        self.p2 = self.configure_profile(conf['p2'])

        # the dump is compared positionally: p1 first, p2 second
        r = self.vapi.ikev2_profile_dump()
        self.assertEqual(len(r), 2)
        self.verify_profile(r[0].profile, conf['p1'])
        self.verify_profile(r[1].profile, conf['p2'])

    def verify_id(self, api_id, cfg_id):
        """ Compare a dumped identity with its (type name, data) tuple. """
        self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
        self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])

    def verify_ts(self, api_ts, cfg_ts):
        """ Compare a dumped traffic selector with its config dict. """
        self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
        self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
        self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
        self.assertEqual(api_ts.start_addr,
                         ip_address(text_type(cfg_ts['start_addr'])))
        self.assertEqual(api_ts.end_addr,
                         ip_address(text_type(cfg_ts['end_addr'])))

    def verify_responder(self, api_r, cfg_r):
        """ Compare the dumped responder interface index and address. """
        self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
        self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))

    def verify_transforms(self, api_ts, cfg_ts):
        """ Compare the fields shared by IKE and ESP transforms. """
        self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
        self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
        self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])

    def verify_ike_transforms(self, api_ts, cfg_ts):
        """ Compare IKE transforms: the common fields plus the DH group. """
        self.verify_transforms(api_ts, cfg_ts)
        self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])

    def verify_esp_transforms(self, api_ts, cfg_ts):
        """ Compare ESP transforms (common fields only). """
        self.verify_transforms(api_ts, cfg_ts)

    def verify_auth(self, api_auth, cfg_auth):
        """ Compare the dumped auth method, data and data length. """
        self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
        self.assertEqual(api_auth.data, cfg_auth['data'])
        self.assertEqual(api_auth.data_len, len(cfg_auth['data']))

    def verify_lifetime_data(self, p, ld):
        """ Compare the dumped lifetime settings with the ld dict. """
        self.assertEqual(p.lifetime, ld['lifetime'])
        self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
        self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
        self.assertEqual(p.handover, ld['handover'])

    def verify_profile(self, ap, cp):
        """ Compare one dumped profile (ap) with its config dict (cp). """
        self.assertEqual(ap.name, cp['name'])
        self.assertEqual(ap.udp_encap, cp['udp_encap'])
        self.verify_id(ap.loc_id, cp['loc_id'])
        self.verify_id(ap.rem_id, cp['rem_id'])
        self.verify_ts(ap.loc_ts, cp['loc_ts'])
        self.verify_ts(ap.rem_ts, cp['rem_ts'])
        self.verify_responder(ap.responder, cp['responder'])
        self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
        self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
        self.verify_auth(ap.auth, cp['auth'])
        # assertEqual (rather than assertTrue(a == b)) reports both
        # values when the check fails
        natt_dis = cp.get('natt_disabled', False)
        self.assertEqual(natt_dis, ap.natt_disabled)

        if 'lifetime_data' in cp:
            self.verify_lifetime_data(ap, cp['lifetime_data'])
        self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
        if 'tun_itf' in cp:
            self.assertEqual(ap.tun_itf, cp['tun_itf'])
        else:
            # a profile configured without a tunnel interface is
            # expected to dump 0xffffffff
            self.assertEqual(ap.tun_itf, 0xffffffff)
1672
1673
@tag_fixme_vpp_workers
class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
    """ test responder - responder behind NAT """

    IKE_NODE_SUFFIX = 'ip4-natt'

    def config_tc(self):
        # only the responder-side NAT flag differs from the defaults
        params = {'r_natt': True}
        self.config_params(params)
1682
1683
@tag_fixme_vpp_workers
class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - NAT traversal (initiator behind NAT) """

    def config_tc(self):
        # transforms that config_params adds to the profile
        ike_transforms = {
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
        }
        esp_transforms = {
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            'integ_alg': 12,  # "hmac-sha2-256-128"
        }
        responder = {'sw_if_index': self.pg0.sw_if_index,
                     'addr': self.pg0.remote_ip4}
        self.config_params({
            'i_natt': True,
            # seen from test case perspective thus vpp is initiator
            'is_initiator': False,
            'responder': responder,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'ike_transforms': ike_transforms,
            'esp_transforms': esp_transforms})
1708
1709
@tag_fixme_vpp_workers
class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - pre shared key auth """

    def config_tc(self):
        # transforms that config_params adds to the profile
        ike_transforms = {
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
        }
        esp_transforms = {
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            'integ_alg': 12,  # "hmac-sha2-256-128"
        }
        # the responder is given by hostname rather than by address
        responder_hostname = {'hostname': 'vpp.responder.org',
                              'sw_if_index': self.pg0.sw_if_index}
        self.config_params({
            # seen from test case perspective thus vpp is initiator
            'is_initiator': False,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'ike_transforms': ike_transforms,
            'esp_transforms': esp_transforms,
            'responder_hostname': responder_hostname})
Filip Tehlare7c83962020-09-23 11:20:12 +00001733
1734
@tag_fixme_vpp_workers
class TestInitiatorRequestWindowSize(TestInitiatorPsk):
    """ test initiator - request window size (1) """

    def rekey_respond(self, req, update_child_sa_data):
        """ Answer one captured rekey request, echoing back its SA payload.

        When update_child_sa_data is true the local SA state (nonces,
        child SPIs, child keys) is refreshed from the request first.
        """
        ike_hdr = self.get_ike_header(req)
        decrypted = self.sa.hmac_and_decrypt(ike_hdr)
        sa_payload = ikev2.IKEv2_payload_SA(decrypted)
        if update_child_sa_data:
            proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
            self.sa.i_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
            self.sa.r_nonce = self.sa.i_nonce
            # both SPIs of the child SA take the proposed SPI
            self.sa.child_sas[0].ispi = proposal.SPI
            self.sa.child_sas[0].rspi = proposal.SPI
            self.sa.calc_child_keys()

        # exchange type 36 is CREATE_CHILD_SA
        reply_hdr = ikev2.IKEv2(
            init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
            flags='Response', exch_type=36,
            id=ike_hdr.id, next_payload='Encrypted')
        encrypted = self.encrypt_ike_msg(reply_hdr, sa_payload, 'SA')
        pkt = self.create_packet(self.pg0, encrypted, self.sa.sport,
                                 self.sa.dport, self.sa.natt, self.ip6)
        self.send_and_assert_no_replies(self.pg0, pkt)

    def test_initiator(self):
        super(TestInitiatorRequestWindowSize, self).test_initiator()
        self.pg0.enable_capture()
        self.pg_start()
        child_ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
        # trigger two rekey requests for the same child SA
        self.vapi.ikev2_initiate_rekey_child_sa(ispi=child_ispi)
        self.vapi.ikev2_initiate_rekey_child_sa(ispi=child_ispi)
        requests = self.pg0.get_capture(2)

        # reply in reverse order
        self.rekey_respond(requests[1], True)
        self.rekey_respond(requests[0], False)

        # verify that only the second request was accepted
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
1775
1776
@tag_fixme_vpp_workers
class TestInitiatorRekey(TestInitiatorPsk):
    """ test ikev2 initiator - rekey """

    def rekey_from_initiator(self):
        """ Trigger a child SA rekey via the API and answer the request. """
        child_ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
        self.pg0.enable_capture()
        self.pg_start()
        self.vapi.ikev2_initiate_rekey_child_sa(ispi=child_ispi)
        requests = self.pg0.get_capture(1)
        ike_hdr = self.get_ike_header(requests[0])
        self.assertEqual(ike_hdr.exch_type, 36)  # CHILD_SA
        self.assertNotIn('Response', ike_hdr.flags)
        self.assertIn('Initiator', ike_hdr.flags)
        decrypted = self.sa.hmac_and_decrypt(ike_hdr)
        sa_payload = ikev2.IKEv2_payload_SA(decrypted)
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        self.sa.i_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        self.sa.r_nonce = self.sa.i_nonce
        # update new responder SPI
        self.sa.child_sas[0].ispi = proposal.SPI
        self.sa.child_sas[0].rspi = proposal.SPI
        self.sa.calc_child_keys()
        # the reply echoes the received SA payload back
        reply_hdr = ikev2.IKEv2(
            init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
            flags='Response', exch_type=36,
            id=ike_hdr.id, next_payload='Encrypted')
        encrypted = self.encrypt_ike_msg(reply_hdr, sa_payload, 'SA')
        pkt = self.create_packet(self.pg0, encrypted, self.sa.sport,
                                 self.sa.dport, self.sa.natt, self.ip6)
        self.send_and_assert_no_replies(self.pg0, pkt)

    def test_initiator(self):
        super(TestInitiatorRekey, self).test_initiator()
        self.rekey_from_initiator()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
1813
1814
@tag_fixme_vpp_workers
class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
    """ test ikev2 initiator - delete IKE SA from responder """

    def config_tc(self):
        # transforms that config_params adds to the profile
        ike_transforms = {
            'crypto_alg': 20,  # "aes-gcm-16"
            'crypto_key_size': 256,
            'dh_group': 15,  # "modp-3072"
        }
        esp_transforms = {
            'crypto_alg': 12,  # "aes-cbc"
            'crypto_key_size': 256,
            'integ_alg': 12,  # "hmac-sha2-256-128"
        }
        responder = {'sw_if_index': self.pg0.sw_if_index,
                     'addr': self.pg0.remote_ip4}
        self.config_params({
            'del_sa_from_responder': True,
            # seen from test case perspective thus vpp is initiator
            'is_initiator': False,
            'responder': responder,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '3072MODPgr',
            'ike_transforms': ike_transforms,
            'esp_transforms': esp_transforms,
            'no_idr_in_auth': True})
Filip Tehlaredf29002020-10-10 04:39:11 +00001840
1841
@tag_fixme_vpp_workers
class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - initiator behind NAT """

    IKE_NODE_SUFFIX = 'ip4-natt'

    def config_tc(self):
        # only the initiator-side NAT flag differs from the defaults
        params = {'i_natt': True}
        self.config_params(params)
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001851
1852
@tag_fixme_vpp_workers
class TestResponderPsk(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - pre shared key auth """

    def config_tc(self):
        # no overrides: run with the defaults chosen by config_params
        self.config_params()
1858
1859
@tag_fixme_vpp_workers
class TestResponderDpd(TestResponderPsk):
    """
    Dead peer detection test
    """
    def config_tc(self):
        params = {'dpd_disabled': False}
        self.config_params(params)

    def tearDown(self):
        # intentionally a no-op: the inherited tearDown is not run
        pass

    def test_responder(self):
        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
        super(TestResponderDpd, self).test_responder()
        self.pg0.enable_capture()
        self.pg_start()
        # capture empty request but don't reply
        probes = self.pg0.get_capture(expected_count=1, timeout=5)
        ike_hdr = self.get_ike_header(probes[0])
        self.assertEqual(ike_hdr.exch_type, 37)  # INFORMATIONAL
        self.assertEqual(self.sa.hmac_and_decrypt(ike_hdr), b'')
        # wait for SA expiration
        time.sleep(3)
        # both the IKE SA and the IPsec SAs must be gone by now
        self.assertEqual(len(self.vapi.ikev2_sa_dump()), 0)
        self.assertEqual(len(self.vapi.ipsec_sa_dump()), 0)
1888
1889
@tag_fixme_vpp_workers
class TestResponderRekey(TestResponderPsk):
    """ test ikev2 responder - rekey """

    def rekey_from_initiator(self):
        """ Send a rekey request and record the nonce and SPI replied. """
        self.pg0.add_stream(self.create_rekey_request())
        self.pg0.enable_capture()
        self.pg_start()
        replies = self.pg0.get_capture(1)
        ike_hdr = self.get_ike_header(replies[0])
        decrypted = self.sa.hmac_and_decrypt(ike_hdr)
        sa_payload = ikev2.IKEv2_payload_SA(decrypted)
        proposal = sa_payload[ikev2.IKEv2_payload_Proposal]
        self.sa.r_nonce = sa_payload[ikev2.IKEv2_payload_Nonce].load
        # update new responder SPI
        self.sa.child_sas[0].rspi = proposal.SPI

    def test_responder(self):
        super(TestResponderRekey, self).test_responder()
        self.rekey_from_initiator()
        self.sa.calc_child_keys()
        self.verify_ike_sas()
        self.verify_ipsec_sas(is_rekey=True)
        # exactly one rekey request must show up in the node counter
        # and in the per-SA statistics
        self.assert_counter(1, 'rekey_req', 'ip4')
        sa_dump = self.vapi.ikev2_sa_dump()
        self.assertEqual(sa_dump[0].sa.stats.n_rekey_req, 1)
Filip Tehlar68ad6252020-10-30 05:28:11 +00001917
1918
class TestResponderVrf(TestResponderPsk, Ikev2Params):
    """ test ikev2 responder - non-default table id """

    @classmethod
    def setUpClass(cls):
        # publish scapy's ikev2 module under the module-level name
        # 'ikev2', which the test code refers to
        import scapy.contrib.ikev2 as _ikev2
        globals()['ikev2'] = _ikev2
        # start the MRO lookup after IkePeer, so IkePeer.setUpClass
        # itself is not run
        super(IkePeer, cls).setUpClass()
        cls.create_pg_interfaces(range(1))
        # put pg0 into IP table 1 before it gets any address
        cls.vapi.cli("ip table add 1")
        cls.vapi.cli("set interface ip table pg0 1")
        for intf in cls.pg_interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()

    def config_tc(self):
        params = {'dpd_disabled': False}
        self.config_params(params)

    def test_responder(self):
        self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
        super(TestResponderVrf, self).test_responder()
        self.pg0.enable_capture()
        self.pg_start()
        # expect one empty INFORMATIONAL request on pg0
        probes = self.pg0.get_capture(expected_count=1, timeout=5)
        ike_hdr = self.get_ike_header(probes[0])
        self.assertEqual(ike_hdr.exch_type, 37)  # INFORMATIONAL
        self.assertEqual(self.sa.hmac_and_decrypt(ike_hdr), b'')
1950
1951
@tag_fixme_vpp_workers
class TestResponderRsaSign(TemplateResponder, Ikev2Params):
    """ test ikev2 responder - cert based auth """

    def config_tc(self):
        # key and certificate file names for both peers
        credentials = {
            'server-key': 'server-key.pem',
            'client-key': 'client-key.pem',
            'client-cert': 'client-cert.pem',
            'server-cert': 'server-cert.pem'}
        params = {'udp_encap': True, 'auth': 'rsa-sig'}
        params.update(credentials)
        self.config_params(params)
1963
Filip Tehlar4f42a712020-07-01 08:56:59 +00001964
@tag_fixme_vpp_workers
class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
        (TemplateResponder, Ikev2Params):
    """
    IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
    """
    def config_tc(self):
        ike_settings = {
            'ike-crypto': ('AES-CBC', 16),
            'ike-integ': 'SHA2-256-128',
            'ike-dh': '2048MODPgr'}
        esp_settings = {
            'esp-crypto': ('AES-CBC', 24),
            'esp-integ': 'SHA2-384-192'}
        params = {}
        params.update(ike_settings)
        params.update(esp_settings)
        # a random 256-byte nonce is supplied explicitly
        params['nonce'] = os.urandom(256)
        params['no_idr_in_auth'] = True
        self.config_params(params)
Filip Tehlar4f42a712020-07-01 08:56:59 +00001980
1981
@tag_fixme_vpp_workers
class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
        (TemplateResponder, Ikev2Params):

    """
    IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
    """
    def config_tc(self):
        ike_settings = {
            'ike-crypto': ('AES-CBC', 32),
            'ike-integ': 'SHA2-256-128',
            'ike-dh': '3072MODPgr'}
        # ESP runs AES-GCM with the integrity algorithm set to 'NULL'
        esp_settings = {
            'esp-crypto': ('AES-GCM-16ICV', 32),
            'esp-integ': 'NULL'}
        params = {}
        params.update(ike_settings)
        params.update(esp_settings)
        self.config_params(params)
1996
1997
@tag_fixme_vpp_workers
class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
    """
    IKE:AES_GCM_16_256
    """

    IKE_NODE_SUFFIX = 'ip6'

    def config_tc(self):
        # IPv6 traffic selector ranges for the local and remote side
        local_ts = {'start_addr': 'ab:cd::0',
                    'end_addr': 'ab:cd::10'}
        remote_ts = {'start_addr': '11::0',
                     'end_addr': '11::100'}
        self.config_params({
            'del_sa_from_responder': True,
            'ip6': True,
            'natt': True,
            'ike-crypto': ('AES-GCM-16ICV', 32),
            'ike-integ': 'NULL',
            'ike-dh': '2048MODPgr',
            'loc_ts': local_ts,
            'rem_ts': remote_ts})
Filip Tehlara7b963d2020-07-08 13:25:34 +00002018
2019
@tag_fixme_vpp_workers
class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
    """
    Test for keep alive messages
    """

    def send_empty_req_from_responder(self):
        """ Send an empty request and check the empty reply and stats. """
        self.pg0.add_stream(self.create_empty_request())
        self.pg0.enable_capture()
        self.pg_start()
        replies = self.pg0.get_capture(1)
        ike_hdr = self.get_ike_header(replies[0])
        # the reply must carry the SA's current message id and an
        # empty payload
        self.assertEqual(ike_hdr.id, self.sa.msg_id)
        self.assertEqual(self.sa.hmac_and_decrypt(ike_hdr), b'')
        self.assert_counter(1, 'keepalive', 'ip4')
        sa_dump = self.vapi.ikev2_sa_dump()
        self.assertEqual(1, sa_dump[0].sa.stats.n_keepalives)

    def test_initiator(self):
        super(TestInitiatorKeepaliveMsg, self).test_initiator()
        self.send_empty_req_from_responder()
2043
2044
class TestMalformedMessages(TemplateResponder, Ikev2Params):
    """ malformed packet test """

    def tearDown(self):
        # intentionally a no-op: the inherited tearDown is not run
        pass

    def config_tc(self):
        self.config_params()

    def create_ike_init_msg(self, length=None, payload=None):
        """ Build an IKE_SA_INIT packet for pg0.

        length, when given, sets the length field of the IKE header;
        payload, when given, is appended after the header.
        """
        ike_hdr = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
                              flags='Initiator', exch_type='IKE_SA_INIT')
        if payload is not None:
            ike_hdr /= payload
        return self.create_packet(self.pg0, ike_hdr, self.sa.sport,
                                  self.sa.dport)

    def verify_bad_packet_length(self):
        """ A bogus header length must bump the 'bad_length' counter. """
        pkt = self.create_ike_init_msg(length=0xdead)
        self.send_and_assert_no_replies(self.pg0, pkt * self.pkt_count)
        self.assert_counter(self.pkt_count, 'bad_length')

    def verify_bad_sa_payload_length(self):
        """ A bogus SA payload length must count as 'malformed_packet'. """
        bad_sa = ikev2.IKEv2_payload_SA(length=0xdead)
        pkt = self.create_ike_init_msg(payload=bad_sa)
        self.send_and_assert_no_replies(self.pg0, pkt * self.pkt_count)
        self.assert_counter(self.pkt_count, 'malformed_packet')

    def test_responder(self):
        # number of copies of each malformed packet to send
        self.pkt_count = 254
        self.verify_bad_packet_length()
        self.verify_bad_sa_payload_length()
2077
2078
if __name__ == '__main__':
    # when run as a script, execute the tests with the VPP test runner
    unittest.main(testRunner=VppTestRunner)