blob: 58a7ec3cd9a45e4554b4dfd7256d4405df6ae719 [file] [log] [blame]
Filip Tehlar12b517b2020-04-26 18:05:05 +00001import os
Filip Tehlar2008e312020-11-09 13:23:24 +00002import time
Filip Tehlarec112e52020-10-07 23:52:37 +00003from socket import inet_pton
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00004from cryptography import x509
Filip Tehlar12b517b2020-04-26 18:05:05 +00005from cryptography.hazmat.backends import default_backend
6from cryptography.hazmat.primitives import hashes, hmac
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00007from cryptography.hazmat.primitives.asymmetric import dh, padding
8from cryptography.hazmat.primitives.serialization import load_pem_private_key
Filip Tehlar12b517b2020-04-26 18:05:05 +00009from cryptography.hazmat.primitives.ciphers import (
10 Cipher,
11 algorithms,
12 modes,
13)
Filip Tehlar84962d12020-09-08 06:08:05 +000014from ipaddress import IPv4Address, IPv6Address, ip_address
Paul Vinciguerrae061dad2020-12-04 14:57:51 -050015import unittest
Klement Sekerab23ffd72021-05-31 16:08:53 +020016from config import config
Filip Tehlarbfeae8c2020-06-23 20:35:58 +000017from scapy.layers.ipsec import ESP
Filip Tehlar12b517b2020-04-26 18:05:05 +000018from scapy.layers.inet import IP, UDP, Ether
Filip Tehlar84962d12020-09-08 06:08:05 +000019from scapy.layers.inet6 import IPv6
Filip Tehlar12b517b2020-04-26 18:05:05 +000020from scapy.packet import raw, Raw
21from scapy.utils import long_converter
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +000022from framework import tag_fixme_vpp_workers
Filip Tehlar12b517b2020-04-26 18:05:05 +000023from framework import VppTestCase, VppTestRunner
Filip Tehlarbfeae8c2020-06-23 20:35:58 +000024from vpp_ikev2 import Profile, IDType, AuthMethod
Filip Tehlar4f42a712020-07-01 08:56:59 +000025from vpp_papi import VppEnum
Filip Tehlar12b517b2020-04-26 18:05:05 +000026
Filip Tehlar84962d12020-09-08 06:08:05 +000027try:
28 text_type = unicode
29except NameError:
30 text_type = str
Filip Tehlar12b517b2020-04-26 18:05:05 +000031
32KEY_PAD = b"Key Pad for IKEv2"
Filip Tehlara7b963d2020-07-08 13:25:34 +000033SALT_SIZE = 4
34GCM_ICV_SIZE = 16
35GCM_IV_SIZE = 8
Filip Tehlar12b517b2020-04-26 18:05:05 +000036
37
38# defined in rfc3526
39# tuple structure is (p, g, key_len)
40DH = {
41 '2048MODPgr': (long_converter("""
42 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
43 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
44 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
45 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
46 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
47 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
48 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
49 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
50 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
51 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
Filip Tehlar4f42a712020-07-01 08:56:59 +000052 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
53
54 '3072MODPgr': (long_converter("""
55 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
56 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
57 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
58 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
59 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
60 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
61 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
62 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
63 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
64 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
65 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
66 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
67 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
68 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
69 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
70 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
Filip Tehlar12b517b2020-04-26 18:05:05 +000071}
72
73
74class CryptoAlgo(object):
75 def __init__(self, name, cipher, mode):
76 self.name = name
77 self.cipher = cipher
78 self.mode = mode
79 if self.cipher is not None:
80 self.bs = self.cipher.block_size // 8
81
Filip Tehlara7b963d2020-07-08 13:25:34 +000082 if self.name == 'AES-GCM-16ICV':
83 self.iv_len = GCM_IV_SIZE
84 else:
85 self.iv_len = self.bs
Filip Tehlar12b517b2020-04-26 18:05:05 +000086
Filip Tehlara7b963d2020-07-08 13:25:34 +000087 def encrypt(self, data, key, aad=None):
88 iv = os.urandom(self.iv_len)
89 if aad is None:
90 encryptor = Cipher(self.cipher(key), self.mode(iv),
91 default_backend()).encryptor()
92 return iv + encryptor.update(data) + encryptor.finalize()
93 else:
94 salt = key[-SALT_SIZE:]
95 nonce = salt + iv
96 encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
97 default_backend()).encryptor()
98 encryptor.authenticate_additional_data(aad)
99 data = encryptor.update(data) + encryptor.finalize()
100 data += encryptor.tag[:GCM_ICV_SIZE]
101 return iv + data
102
103 def decrypt(self, data, key, aad=None, icv=None):
104 if aad is None:
105 iv = data[:self.iv_len]
106 ct = data[self.iv_len:]
107 decryptor = Cipher(algorithms.AES(key),
108 self.mode(iv),
109 default_backend()).decryptor()
110 return decryptor.update(ct) + decryptor.finalize()
111 else:
112 salt = key[-SALT_SIZE:]
113 nonce = salt + data[:GCM_IV_SIZE]
114 ct = data[GCM_IV_SIZE:]
115 key = key[:-SALT_SIZE]
116 decryptor = Cipher(algorithms.AES(key),
117 self.mode(nonce, icv, len(icv)),
118 default_backend()).decryptor()
119 decryptor.authenticate_additional_data(aad)
Filip Tehlaredf29002020-10-10 04:39:11 +0000120 return decryptor.update(ct) + decryptor.finalize()
Filip Tehlar12b517b2020-04-26 18:05:05 +0000121
122 def pad(self, data):
123 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
124 data = data + b'\x00' * (pad_len - 1)
Filip Tehlar558607d2020-07-16 07:25:56 +0000125 return data + bytes([pad_len - 1])
Filip Tehlar12b517b2020-04-26 18:05:05 +0000126
127
128class AuthAlgo(object):
129 def __init__(self, name, mac, mod, key_len, trunc_len=None):
130 self.name = name
131 self.mac = mac
132 self.mod = mod
133 self.key_len = key_len
134 self.trunc_len = trunc_len or key_len
135
136
137CRYPTO_ALGOS = {
138 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
139 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
Filip Tehlar4f42a712020-07-01 08:56:59 +0000140 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
141 mode=modes.GCM),
Filip Tehlar12b517b2020-04-26 18:05:05 +0000142}
143
144AUTH_ALGOS = {
145 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
146 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
Filip Tehlar4f42a712020-07-01 08:56:59 +0000147 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
148 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
149 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
Filip Tehlar12b517b2020-04-26 18:05:05 +0000150}
151
152PRF_ALGOS = {
153 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
154 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
155 hashes.SHA256, 32),
156}
157
Filip Tehlar68ad6252020-10-30 05:28:11 +0000158CRYPTO_IDS = {
159 12: 'AES-CBC',
160 20: 'AES-GCM-16ICV',
161}
162
163INTEG_IDS = {
164 2: 'HMAC-SHA1-96',
165 12: 'SHA2-256-128',
166 13: 'SHA2-384-192',
167 14: 'SHA2-512-256',
168}
169
Filip Tehlar12b517b2020-04-26 18:05:05 +0000170
171class IKEv2ChildSA(object):
Filip Tehlar68ad6252020-10-30 05:28:11 +0000172 def __init__(self, local_ts, remote_ts, is_initiator):
173 spi = os.urandom(4)
174 if is_initiator:
175 self.ispi = spi
176 self.rspi = None
177 else:
178 self.rspi = spi
179 self.ispi = None
Filip Tehlar12b517b2020-04-26 18:05:05 +0000180 self.local_ts = local_ts
181 self.remote_ts = remote_ts
182
183
184class IKEv2SA(object):
Filip Tehlar84962d12020-09-08 06:08:05 +0000185 def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
186 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
187 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
Filip Tehlar027d8132020-12-04 17:38:11 +0000188 auth_method='shared-key', priv_key=None, i_natt=False,
189 r_natt=False, udp_encap=False):
Filip Tehlar67b8a7f2020-11-06 11:00:42 +0000190 self.udp_encap = udp_encap
Filip Tehlar027d8132020-12-04 17:38:11 +0000191 self.i_natt = i_natt
192 self.r_natt = r_natt
193 if i_natt or r_natt:
Filip Tehlarbfeae8c2020-06-23 20:35:58 +0000194 self.sport = 4500
195 self.dport = 4500
196 else:
197 self.sport = 500
198 self.dport = 500
Filip Tehlar558607d2020-07-16 07:25:56 +0000199 self.msg_id = 0
Filip Tehlar12b517b2020-04-26 18:05:05 +0000200 self.dh_params = None
201 self.test = test
Filip Tehlarbfeae8c2020-06-23 20:35:58 +0000202 self.priv_key = priv_key
Filip Tehlar12b517b2020-04-26 18:05:05 +0000203 self.is_initiator = is_initiator
204 nonce = nonce or os.urandom(32)
205 self.auth_data = auth_data
Filip Tehlar4128c7b2020-05-10 05:18:37 +0000206 self.i_id = i_id
207 self.r_id = r_id
Filip Tehlar12b517b2020-04-26 18:05:05 +0000208 if isinstance(id_type, str):
209 self.id_type = IDType.value(id_type)
210 else:
211 self.id_type = id_type
212 self.auth_method = auth_method
213 if self.is_initiator:
Filip Tehlar84962d12020-09-08 06:08:05 +0000214 self.rspi = 8 * b'\x00'
Filip Tehlar12b517b2020-04-26 18:05:05 +0000215 self.ispi = spi
Filip Tehlar12b517b2020-04-26 18:05:05 +0000216 self.i_nonce = nonce
217 else:
218 self.rspi = spi
Filip Tehlar84962d12020-09-08 06:08:05 +0000219 self.ispi = 8 * b'\x00'
Filip Tehlare7c83962020-09-23 11:20:12 +0000220 self.r_nonce = nonce
Filip Tehlar68ad6252020-10-30 05:28:11 +0000221 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
222 self.is_initiator)]
Filip Tehlar12b517b2020-04-26 18:05:05 +0000223
Filip Tehlar558607d2020-07-16 07:25:56 +0000224 def new_msg_id(self):
225 self.msg_id += 1
226 return self.msg_id
227
Filip Tehlare7c83962020-09-23 11:20:12 +0000228 @property
229 def my_dh_pub_key(self):
230 if self.is_initiator:
231 return self.i_dh_data
232 return self.r_dh_data
233
234 @property
235 def peer_dh_pub_key(self):
236 if self.is_initiator:
237 return self.r_dh_data
Filip Tehlar12b517b2020-04-26 18:05:05 +0000238 return self.i_dh_data
239
Filip Tehlar027d8132020-12-04 17:38:11 +0000240 @property
241 def natt(self):
242 return self.i_natt or self.r_natt
243
Filip Tehlar12b517b2020-04-26 18:05:05 +0000244 def compute_secret(self):
245 priv = self.dh_private_key
Filip Tehlare7c83962020-09-23 11:20:12 +0000246 peer = self.peer_dh_pub_key
Filip Tehlar12b517b2020-04-26 18:05:05 +0000247 p, g, l = self.ike_group
248 return pow(int.from_bytes(peer, 'big'),
249 int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
250
251 def generate_dh_data(self):
252 # generate DH keys
Filip Tehlare7c83962020-09-23 11:20:12 +0000253 if self.ike_dh not in DH:
254 raise NotImplementedError('%s not in DH group' % self.ike_dh)
255
256 if self.dh_params is None:
257 dhg = DH[self.ike_dh]
258 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
259 self.dh_params = pn.parameters(default_backend())
260
261 priv = self.dh_params.generate_private_key()
262 pub = priv.public_key()
263 x = priv.private_numbers().x
264 self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
265 y = pub.public_numbers().y
266
Filip Tehlar12b517b2020-04-26 18:05:05 +0000267 if self.is_initiator:
Filip Tehlar12b517b2020-04-26 18:05:05 +0000268 self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
Filip Tehlare7c83962020-09-23 11:20:12 +0000269 else:
270 self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
Filip Tehlar12b517b2020-04-26 18:05:05 +0000271
272 def complete_dh_data(self):
273 self.dh_shared_secret = self.compute_secret()
274
275 def calc_child_keys(self):
276 prf = self.ike_prf_alg.mod()
277 s = self.i_nonce + self.r_nonce
278 c = self.child_sas[0]
279
280 encr_key_len = self.esp_crypto_key_len
Filip Tehlar4f42a712020-07-01 08:56:59 +0000281 integ_key_len = self.esp_integ_alg.key_len
282 salt_len = 0 if integ_key_len else 4
283
Filip Tehlar12b517b2020-04-26 18:05:05 +0000284 l = (integ_key_len * 2 +
Filip Tehlar4f42a712020-07-01 08:56:59 +0000285 encr_key_len * 2 +
286 salt_len * 2)
Filip Tehlar12b517b2020-04-26 18:05:05 +0000287 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
288
289 pos = 0
290 c.sk_ei = keymat[pos:pos+encr_key_len]
291 pos += encr_key_len
292
Filip Tehlar4f42a712020-07-01 08:56:59 +0000293 if integ_key_len:
294 c.sk_ai = keymat[pos:pos+integ_key_len]
295 pos += integ_key_len
296 else:
297 c.salt_ei = keymat[pos:pos+salt_len]
298 pos += salt_len
Filip Tehlar12b517b2020-04-26 18:05:05 +0000299
300 c.sk_er = keymat[pos:pos+encr_key_len]
301 pos += encr_key_len
302
Filip Tehlar4f42a712020-07-01 08:56:59 +0000303 if integ_key_len:
304 c.sk_ar = keymat[pos:pos+integ_key_len]
305 pos += integ_key_len
306 else:
307 c.salt_er = keymat[pos:pos+salt_len]
308 pos += salt_len
Filip Tehlar12b517b2020-04-26 18:05:05 +0000309
310 def calc_prfplus(self, prf, key, seed, length):
311 r = b''
312 t = None
313 x = 1
314 while len(r) < length and x < 255:
315 if t is not None:
316 s = t
317 else:
318 s = b''
319 s = s + seed + bytes([x])
320 t = self.calc_prf(prf, key, s)
321 r = r + t
322 x = x + 1
323
324 if x == 255:
325 return None
326 return r
327
328 def calc_prf(self, prf, key, data):
Filip Tehlara7b963d2020-07-08 13:25:34 +0000329 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
Filip Tehlar12b517b2020-04-26 18:05:05 +0000330 h.update(data)
331 return h.finalize()
332
333 def calc_keys(self):
334 prf = self.ike_prf_alg.mod()
335 # SKEYSEED = prf(Ni | Nr, g^ir)
336 s = self.i_nonce + self.r_nonce
337 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
338
339 # calculate S = Ni | Nr | SPIi SPIr
340 s = s + self.ispi + self.rspi
341
342 prf_key_trunc = self.ike_prf_alg.trunc_len
343 encr_key_len = self.ike_crypto_key_len
344 tr_prf_key_len = self.ike_prf_alg.key_len
345 integ_key_len = self.ike_integ_alg.key_len
Filip Tehlara7b963d2020-07-08 13:25:34 +0000346 if integ_key_len == 0:
347 salt_size = 4
348 else:
349 salt_size = 0
350
Filip Tehlar12b517b2020-04-26 18:05:05 +0000351 l = (prf_key_trunc +
352 integ_key_len * 2 +
353 encr_key_len * 2 +
Filip Tehlara7b963d2020-07-08 13:25:34 +0000354 tr_prf_key_len * 2 +
355 salt_size * 2)
Filip Tehlar12b517b2020-04-26 18:05:05 +0000356 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
357
358 pos = 0
359 self.sk_d = keymat[:pos+prf_key_trunc]
360 pos += prf_key_trunc
361
362 self.sk_ai = keymat[pos:pos+integ_key_len]
363 pos += integ_key_len
364 self.sk_ar = keymat[pos:pos+integ_key_len]
365 pos += integ_key_len
366
Filip Tehlara7b963d2020-07-08 13:25:34 +0000367 self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
368 pos += encr_key_len + salt_size
369 self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
370 pos += encr_key_len + salt_size
Filip Tehlar12b517b2020-04-26 18:05:05 +0000371
372 self.sk_pi = keymat[pos:pos+tr_prf_key_len]
373 pos += tr_prf_key_len
374 self.sk_pr = keymat[pos:pos+tr_prf_key_len]
375
376 def generate_authmsg(self, prf, packet):
377 if self.is_initiator:
378 id = self.i_id
379 nonce = self.r_nonce
380 key = self.sk_pi
Filip Tehlare7c83962020-09-23 11:20:12 +0000381 else:
382 id = self.r_id
383 nonce = self.i_nonce
384 key = self.sk_pr
Filip Tehlar12b517b2020-04-26 18:05:05 +0000385 data = bytes([self.id_type, 0, 0, 0]) + id
386 id_hash = self.calc_prf(prf, key, data)
387 return packet + nonce + id_hash
388
389 def auth_init(self):
390 prf = self.ike_prf_alg.mod()
Filip Tehlare7c83962020-09-23 11:20:12 +0000391 if self.is_initiator:
392 packet = self.init_req_packet
393 else:
394 packet = self.init_resp_packet
395 authmsg = self.generate_authmsg(prf, raw(packet))
Filip Tehlarbfeae8c2020-06-23 20:35:58 +0000396 if self.auth_method == 'shared-key':
397 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
398 self.auth_data = self.calc_prf(prf, psk, authmsg)
399 elif self.auth_method == 'rsa-sig':
400 self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
401 hashes.SHA1())
402 else:
403 raise TypeError('unknown auth method type!')
Filip Tehlar12b517b2020-04-26 18:05:05 +0000404
Filip Tehlara7b963d2020-07-08 13:25:34 +0000405 def encrypt(self, data, aad=None):
Filip Tehlar12b517b2020-04-26 18:05:05 +0000406 data = self.ike_crypto_alg.pad(data)
Filip Tehlara7b963d2020-07-08 13:25:34 +0000407 return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
Filip Tehlar12b517b2020-04-26 18:05:05 +0000408
409 @property
410 def peer_authkey(self):
411 if self.is_initiator:
412 return self.sk_ar
413 return self.sk_ai
414
415 @property
416 def my_authkey(self):
417 if self.is_initiator:
418 return self.sk_ai
419 return self.sk_ar
420
421 @property
422 def my_cryptokey(self):
423 if self.is_initiator:
424 return self.sk_ei
425 return self.sk_er
426
427 @property
428 def peer_cryptokey(self):
429 if self.is_initiator:
430 return self.sk_er
431 return self.sk_ei
432
Filip Tehlar4f42a712020-07-01 08:56:59 +0000433 def concat(self, alg, key_len):
434 return alg + '-' + str(key_len * 8)
435
436 @property
437 def vpp_ike_cypto_alg(self):
438 return self.concat(self.ike_crypto, self.ike_crypto_key_len)
439
440 @property
441 def vpp_esp_cypto_alg(self):
442 return self.concat(self.esp_crypto, self.esp_crypto_key_len)
443
Filip Tehlar12b517b2020-04-26 18:05:05 +0000444 def verify_hmac(self, ikemsg):
445 integ_trunc = self.ike_integ_alg.trunc_len
446 exp_hmac = ikemsg[-integ_trunc:]
447 data = ikemsg[:-integ_trunc]
448 computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
449 self.peer_authkey, data)
450 self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
451
452 def compute_hmac(self, integ, key, data):
453 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
454 h.update(data)
455 return h.finalize()
456
Filip Tehlara7b963d2020-07-08 13:25:34 +0000457 def decrypt(self, data, aad=None, icv=None):
458 return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
Filip Tehlar12b517b2020-04-26 18:05:05 +0000459
460 def hmac_and_decrypt(self, ike):
461 ep = ike[ikev2.IKEv2_payload_Encrypted]
Filip Tehlara7b963d2020-07-08 13:25:34 +0000462 if self.ike_crypto == 'AES-GCM-16ICV':
463 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
464 ct = ep.load[:-GCM_ICV_SIZE]
465 tag = ep.load[-GCM_ICV_SIZE:]
Filip Tehlaredf29002020-10-10 04:39:11 +0000466 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
Filip Tehlara7b963d2020-07-08 13:25:34 +0000467 else:
468 self.verify_hmac(raw(ike))
469 integ_trunc = self.ike_integ_alg.trunc_len
Filip Tehlar12b517b2020-04-26 18:05:05 +0000470
Filip Tehlara7b963d2020-07-08 13:25:34 +0000471 # remove ICV and decrypt payload
472 ct = ep.load[:-integ_trunc]
Filip Tehlaredf29002020-10-10 04:39:11 +0000473 plain = self.decrypt(ct)
474 # remove padding
475 pad_len = plain[-1]
476 return plain[:-pad_len - 1]
Filip Tehlar12b517b2020-04-26 18:05:05 +0000477
Filip Tehlar84962d12020-09-08 06:08:05 +0000478 def build_ts_addr(self, ts, version):
479 return {'starting_address_v' + version: ts['start_addr'],
480 'ending_address_v' + version: ts['end_addr']}
481
482 def generate_ts(self, is_ip4):
Filip Tehlar12b517b2020-04-26 18:05:05 +0000483 c = self.child_sas[0]
Filip Tehlar84962d12020-09-08 06:08:05 +0000484 ts_data = {'IP_protocol_ID': 0,
485 'start_port': 0,
486 'end_port': 0xffff}
487 if is_ip4:
488 ts_data.update(self.build_ts_addr(c.local_ts, '4'))
489 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
490 ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
491 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
492 else:
493 ts_data.update(self.build_ts_addr(c.local_ts, '6'))
494 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
495 ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
496 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
Filip Tehlare7c83962020-09-23 11:20:12 +0000497
498 if self.is_initiator:
499 return ([ts1], [ts2])
500 return ([ts2], [ts1])
Filip Tehlar12b517b2020-04-26 18:05:05 +0000501
502 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
503 if crypto not in CRYPTO_ALGOS:
504 raise TypeError('unsupported encryption algo %r' % crypto)
505 self.ike_crypto = crypto
506 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
507 self.ike_crypto_key_len = crypto_key_len
508
509 if integ not in AUTH_ALGOS:
510 raise TypeError('unsupported auth algo %r' % integ)
Filip Tehlara7b963d2020-07-08 13:25:34 +0000511 self.ike_integ = None if integ == 'NULL' else integ
Filip Tehlar12b517b2020-04-26 18:05:05 +0000512 self.ike_integ_alg = AUTH_ALGOS[integ]
513
514 if prf not in PRF_ALGOS:
515 raise TypeError('unsupported prf algo %r' % prf)
516 self.ike_prf = prf
517 self.ike_prf_alg = PRF_ALGOS[prf]
518 self.ike_dh = dh
519 self.ike_group = DH[self.ike_dh]
520
521 def set_esp_props(self, crypto, crypto_key_len, integ):
522 self.esp_crypto_key_len = crypto_key_len
523 if crypto not in CRYPTO_ALGOS:
524 raise TypeError('unsupported encryption algo %r' % crypto)
525 self.esp_crypto = crypto
526 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
527
528 if integ not in AUTH_ALGOS:
529 raise TypeError('unsupported auth algo %r' % integ)
Filip Tehlar4f42a712020-07-01 08:56:59 +0000530 self.esp_integ = None if integ == 'NULL' else integ
Filip Tehlar12b517b2020-04-26 18:05:05 +0000531 self.esp_integ_alg = AUTH_ALGOS[integ]
532
533 def crypto_attr(self, key_len):
Filip Tehlara7b963d2020-07-08 13:25:34 +0000534 if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
Filip Tehlar12b517b2020-04-26 18:05:05 +0000535 return (0x800e << 16 | key_len << 3, 12)
536 else:
537 raise Exception('unsupported attribute type')
538
539 def ike_crypto_attr(self):
540 return self.crypto_attr(self.ike_crypto_key_len)
541
542 def esp_crypto_attr(self):
543 return self.crypto_attr(self.esp_crypto_key_len)
544
Filip Tehlarec112e52020-10-07 23:52:37 +0000545 def compute_nat_sha1(self, ip, port, rspi=None):
546 if rspi is None:
547 rspi = self.rspi
548 data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
Filip Tehlarbfeae8c2020-06-23 20:35:58 +0000549 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
550 digest.update(data)
551 return digest.finalize()
Filip Tehlar12b517b2020-04-26 18:05:05 +0000552
Filip Tehlarbfeae8c2020-06-23 20:35:58 +0000553
Filip Tehlare7c83962020-09-23 11:20:12 +0000554class IkePeer(VppTestCase):
555 """ common class for initiator and responder """
Filip Tehlar12b517b2020-04-26 18:05:05 +0000556
557 @classmethod
558 def setUpClass(cls):
559 import scapy.contrib.ikev2 as _ikev2
560 globals()['ikev2'] = _ikev2
Filip Tehlare7c83962020-09-23 11:20:12 +0000561 super(IkePeer, cls).setUpClass()
Filip Tehlar12b517b2020-04-26 18:05:05 +0000562 cls.create_pg_interfaces(range(2))
563 for i in cls.pg_interfaces:
564 i.admin_up()
565 i.config_ip4()
566 i.resolve_arp()
Filip Tehlar84962d12020-09-08 06:08:05 +0000567 i.config_ip6()
568 i.resolve_ndp()
Filip Tehlar12b517b2020-04-26 18:05:05 +0000569
570 @classmethod
571 def tearDownClass(cls):
Filip Tehlare7c83962020-09-23 11:20:12 +0000572 super(IkePeer, cls).tearDownClass()
Filip Tehlar12b517b2020-04-26 18:05:05 +0000573
Filip Tehlaredf29002020-10-10 04:39:11 +0000574 def tearDown(self):
575 super(IkePeer, self).tearDown()
576 if self.del_sa_from_responder:
577 self.initiate_del_sa_from_responder()
578 else:
579 self.initiate_del_sa_from_initiator()
580 r = self.vapi.ikev2_sa_dump()
581 self.assertEqual(len(r), 0)
582 sas = self.vapi.ipsec_sa_dump()
583 self.assertEqual(len(sas), 0)
584 self.p.remove_vpp_config()
585 self.assertIsNone(self.p.query_vpp_config())
586
Filip Tehlar12b517b2020-04-26 18:05:05 +0000587 def setUp(self):
Filip Tehlare7c83962020-09-23 11:20:12 +0000588 super(IkePeer, self).setUp()
Filip Tehlar12b517b2020-04-26 18:05:05 +0000589 self.config_tc()
Filip Tehlar12b517b2020-04-26 18:05:05 +0000590 self.p.add_vpp_config()
Filip Tehlar459d17b2020-07-06 15:40:08 +0000591 self.assertIsNotNone(self.p.query_vpp_config())
Filip Tehlar68ad6252020-10-30 05:28:11 +0000592 if self.sa.is_initiator:
593 self.sa.generate_dh_data()
Filip Tehlar84962d12020-09-08 06:08:05 +0000594 self.vapi.cli('ikev2 set logging level 4')
595 self.vapi.cli('event-lo clear')
Filip Tehlar12b517b2020-04-26 18:05:05 +0000596
Filip Tehlarfab5e7f2021-01-14 13:32:01 +0000597 def assert_counter(self, count, name, version='ip4'):
598 node_name = '/err/ikev2-%s/' % version + name
599 self.assertEqual(count, self.statistics.get_err_counter(node_name))
600
Filip Tehlar38340fa2020-11-19 21:34:48 +0000601 def create_rekey_request(self):
602 sa, first_payload = self.generate_auth_payload(is_rekey=True)
603 header = ikev2.IKEv2(
604 init_SPI=self.sa.ispi,
605 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
606 flags='Initiator', exch_type='CREATE_CHILD_SA')
607
608 ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
609 return self.create_packet(self.pg0, ike_msg, self.sa.sport,
610 self.sa.dport, self.sa.natt, self.ip6)
611
612 def create_empty_request(self):
613 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
614 id=self.sa.new_msg_id(), flags='Initiator',
615 exch_type='INFORMATIONAL',
616 next_payload='Encrypted')
617
618 msg = self.encrypt_ike_msg(header, b'', None)
619 return self.create_packet(self.pg0, msg, self.sa.sport,
620 self.sa.dport, self.sa.natt, self.ip6)
621
Filip Tehlar84962d12020-09-08 06:08:05 +0000622 def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
623 use_ip6=False):
624 if use_ip6:
625 src_ip = src_if.remote_ip6
626 dst_ip = src_if.local_ip6
627 ip_layer = IPv6
628 else:
629 src_ip = src_if.remote_ip4
630 dst_ip = src_if.local_ip4
631 ip_layer = IP
Filip Tehlarbfeae8c2020-06-23 20:35:58 +0000632 res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
Filip Tehlar84962d12020-09-08 06:08:05 +0000633 ip_layer(src=src_ip, dst=dst_ip) /
Filip Tehlarbfeae8c2020-06-23 20:35:58 +0000634 UDP(sport=sport, dport=dport))
635 if natt:
636 # insert non ESP marker
637 res = res / Raw(b'\x00' * 4)
638 return res / msg
Filip Tehlar12b517b2020-04-26 18:05:05 +0000639
Filip Tehlare7c83962020-09-23 11:20:12 +0000640 def verify_udp(self, udp):
641 self.assertEqual(udp.sport, self.sa.sport)
642 self.assertEqual(udp.dport, self.sa.dport)
Filip Tehlar12b517b2020-04-26 18:05:05 +0000643
Filip Tehlare7c83962020-09-23 11:20:12 +0000644 def get_ike_header(self, packet):
645 try:
646 ih = packet[ikev2.IKEv2]
Filip Tehlar18107c92020-12-01 14:51:09 +0000647 ih = self.verify_and_remove_non_esp_marker(ih)
Filip Tehlare7c83962020-09-23 11:20:12 +0000648 except IndexError as e:
649 # this is a workaround for getting IKEv2 layer as both ikev2 and
650 # ipsec register for port 4500
651 esp = packet[ESP]
652 ih = self.verify_and_remove_non_esp_marker(esp)
653 self.assertEqual(ih.version, 0x20)
Filip Tehlaredf29002020-10-10 04:39:11 +0000654 self.assertNotIn('Version', ih.flags)
Filip Tehlare7c83962020-09-23 11:20:12 +0000655 return ih
Filip Tehlar12b517b2020-04-26 18:05:05 +0000656
Filip Tehlare7c83962020-09-23 11:20:12 +0000657 def verify_and_remove_non_esp_marker(self, packet):
658 if self.sa.natt:
659 # if we are in nat traversal mode check for non esp marker
660 # and remove it
661 data = raw(packet)
662 self.assertEqual(data[:4], b'\x00' * 4)
663 return ikev2.IKEv2(data[4:])
Filip Tehlarbfeae8c2020-06-23 20:35:58 +0000664 else:
Filip Tehlare7c83962020-09-23 11:20:12 +0000665 return packet
Filip Tehlar12b517b2020-04-26 18:05:05 +0000666
Filip Tehlar558607d2020-07-16 07:25:56 +0000667 def encrypt_ike_msg(self, header, plain, first_payload):
668 if self.sa.ike_crypto == 'AES-GCM-16ICV':
669 data = self.sa.ike_crypto_alg.pad(raw(plain))
670 plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
671 len(ikev2.IKEv2_payload_Encrypted())
672 tlen = plen + len(ikev2.IKEv2())
673
674 # prepare aad data
675 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
676 length=plen)
677 header.length = tlen
678 res = header / sk_p
679 encr = self.sa.encrypt(raw(plain), raw(res))
680 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
681 length=plen, load=encr)
682 res = header / sk_p
683 else:
684 encr = self.sa.encrypt(raw(plain))
685 trunc_len = self.sa.ike_integ_alg.trunc_len
686 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
687 tlen = plen + len(ikev2.IKEv2())
688
689 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
690 length=plen, load=encr)
691 header.length = tlen
692 res = header / sk_p
693
694 integ_data = raw(res)
695 hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
696 self.sa.my_authkey, integ_data)
697 res = res / Raw(hmac_data[:trunc_len])
698 assert(len(res) == tlen)
699 return res
700
Filip Tehlar67b8a7f2020-11-06 11:00:42 +0000701 def verify_udp_encap(self, ipsec_sa):
702 e = VppEnum.vl_api_ipsec_sad_flags_t
703 if self.sa.udp_encap or self.sa.natt:
704 self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
705 else:
706 self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
707
Filip Tehlar68ad6252020-10-30 05:28:11 +0000708 def verify_ipsec_sas(self, is_rekey=False):
Filip Tehlar12b517b2020-04-26 18:05:05 +0000709 sas = self.vapi.ipsec_sa_dump()
Filip Tehlar68ad6252020-10-30 05:28:11 +0000710 if is_rekey:
711 # after rekey there is a short period of time in which old
712 # inbound SA is still present
713 sa_count = 3
714 else:
715 sa_count = 2
716 self.assertEqual(len(sas), sa_count)
Filip Tehlare7c83962020-09-23 11:20:12 +0000717 if self.sa.is_initiator:
Filip Tehlar68ad6252020-10-30 05:28:11 +0000718 if is_rekey:
719 sa0 = sas[0].entry
720 sa1 = sas[2].entry
721 else:
722 sa0 = sas[0].entry
723 sa1 = sas[1].entry
Filip Tehlare7c83962020-09-23 11:20:12 +0000724 else:
Filip Tehlar68ad6252020-10-30 05:28:11 +0000725 if is_rekey:
726 sa0 = sas[2].entry
727 sa1 = sas[0].entry
728 else:
729 sa1 = sas[0].entry
730 sa0 = sas[1].entry
Filip Tehlare7c83962020-09-23 11:20:12 +0000731
Filip Tehlar12b517b2020-04-26 18:05:05 +0000732 c = self.sa.child_sas[0]
733
Filip Tehlar67b8a7f2020-11-06 11:00:42 +0000734 self.verify_udp_encap(sa0)
735 self.verify_udp_encap(sa1)
Filip Tehlar4f42a712020-07-01 08:56:59 +0000736 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
737 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
738 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
739
740 if self.sa.esp_integ is None:
741 vpp_integ_alg = 0
742 else:
743 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
744 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
745 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
746
Filip Tehlar12b517b2020-04-26 18:05:05 +0000747 # verify crypto keys
748 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
749 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
750 self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
751 self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
752
753 # verify integ keys
Filip Tehlar4f42a712020-07-01 08:56:59 +0000754 if vpp_integ_alg:
755 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
756 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
757 self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
758 self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
759 else:
760 self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
761 self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
Filip Tehlar12b517b2020-04-26 18:05:05 +0000762
jan_cavojskya340fe12020-07-08 09:24:12 +0200763 def verify_keymat(self, api_keys, keys, name):
764 km = getattr(keys, name)
765 api_km = getattr(api_keys, name)
766 api_km_len = getattr(api_keys, name + '_len')
767 self.assertEqual(len(km), api_km_len)
768 self.assertEqual(km, api_km[:api_km_len])
769
770 def verify_id(self, api_id, exp_id):
771 self.assertEqual(api_id.type, IDType.value(exp_id.type))
772 self.assertEqual(api_id.data_len, exp_id.data_len)
773 self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
774
775 def verify_ike_sas(self):
776 r = self.vapi.ikev2_sa_dump()
777 self.assertEqual(len(r), 1)
778 sa = r[0].sa
Filip Tehlar84962d12020-09-08 06:08:05 +0000779 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
jan_cavojskya340fe12020-07-08 09:24:12 +0200780 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
Filip Tehlar84962d12020-09-08 06:08:05 +0000781 if self.ip6:
Filip Tehlare7c83962020-09-23 11:20:12 +0000782 if self.sa.is_initiator:
783 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
784 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
785 else:
786 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
787 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
Filip Tehlar84962d12020-09-08 06:08:05 +0000788 else:
Filip Tehlare7c83962020-09-23 11:20:12 +0000789 if self.sa.is_initiator:
790 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
791 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
792 else:
793 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
794 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
jan_cavojskya340fe12020-07-08 09:24:12 +0200795 self.verify_keymat(sa.keys, self.sa, 'sk_d')
796 self.verify_keymat(sa.keys, self.sa, 'sk_ai')
797 self.verify_keymat(sa.keys, self.sa, 'sk_ar')
798 self.verify_keymat(sa.keys, self.sa, 'sk_ei')
799 self.verify_keymat(sa.keys, self.sa, 'sk_er')
800 self.verify_keymat(sa.keys, self.sa, 'sk_pi')
801 self.verify_keymat(sa.keys, self.sa, 'sk_pr')
802
803 self.assertEqual(sa.i_id.type, self.sa.id_type)
804 self.assertEqual(sa.r_id.type, self.sa.id_type)
805 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
Benoît Gannec7cceee2021-09-28 11:19:37 +0200806 self.assertEqual(sa.r_id.data_len, len(self.idr))
jan_cavojskya340fe12020-07-08 09:24:12 +0200807 self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
Benoît Gannec7cceee2021-09-28 11:19:37 +0200808 self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.idr)
jan_cavojskya340fe12020-07-08 09:24:12 +0200809
810 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
811 self.assertEqual(len(r), 1)
812 csa = r[0].child_sa
813 self.assertEqual(csa.sa_index, sa.sa_index)
814 c = self.sa.child_sas[0]
815 if hasattr(c, 'sk_ai'):
816 self.verify_keymat(csa.keys, c, 'sk_ai')
817 self.verify_keymat(csa.keys, c, 'sk_ar')
818 self.verify_keymat(csa.keys, c, 'sk_ei')
819 self.verify_keymat(csa.keys, c, 'sk_er')
Filip Tehlar68ad6252020-10-30 05:28:11 +0000820 self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
821 self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
jan_cavojskya340fe12020-07-08 09:24:12 +0200822
Filip Tehlar84962d12020-09-08 06:08:05 +0000823 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
jan_cavojskya340fe12020-07-08 09:24:12 +0200824 tsi = tsi[0]
825 tsr = tsr[0]
826 r = self.vapi.ikev2_traffic_selector_dump(
827 is_initiator=True, sa_index=sa.sa_index,
828 child_sa_index=csa.child_sa_index)
829 self.assertEqual(len(r), 1)
830 ts = r[0].ts
831 self.verify_ts(r[0].ts, tsi[0], True)
832
833 r = self.vapi.ikev2_traffic_selector_dump(
834 is_initiator=False, sa_index=sa.sa_index,
835 child_sa_index=csa.child_sa_index)
836 self.assertEqual(len(r), 1)
837 self.verify_ts(r[0].ts, tsr[0], False)
838
839 n = self.vapi.ikev2_nonce_get(is_initiator=True,
840 sa_index=sa.sa_index)
841 self.verify_nonce(n, self.sa.i_nonce)
842 n = self.vapi.ikev2_nonce_get(is_initiator=False,
843 sa_index=sa.sa_index)
844 self.verify_nonce(n, self.sa.r_nonce)
845
846 def verify_nonce(self, api_nonce, nonce):
847 self.assertEqual(api_nonce.data_len, len(nonce))
848 self.assertEqual(api_nonce.nonce, nonce)
849
850 def verify_ts(self, api_ts, ts, is_initiator):
851 if is_initiator:
852 self.assertTrue(api_ts.is_local)
853 else:
854 self.assertFalse(api_ts.is_local)
Filip Tehlar84962d12020-09-08 06:08:05 +0000855
856 if self.p.ts_is_ip4:
857 self.assertEqual(api_ts.start_addr,
858 IPv4Address(ts.starting_address_v4))
859 self.assertEqual(api_ts.end_addr,
860 IPv4Address(ts.ending_address_v4))
861 else:
862 self.assertEqual(api_ts.start_addr,
863 IPv6Address(ts.starting_address_v6))
864 self.assertEqual(api_ts.end_addr,
865 IPv6Address(ts.ending_address_v6))
jan_cavojskya340fe12020-07-08 09:24:12 +0200866 self.assertEqual(api_ts.start_port, ts.start_port)
867 self.assertEqual(api_ts.end_port, ts.end_port)
868 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
869
Filip Tehlare7c83962020-09-23 11:20:12 +0000870
871class TemplateInitiator(IkePeer):
872 """ initiator test template """
873
Filip Tehlaredf29002020-10-10 04:39:11 +0000874 def initiate_del_sa_from_initiator(self):
875 ispi = int.from_bytes(self.sa.ispi, 'little')
876 self.pg0.enable_capture()
877 self.pg_start()
878 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
879 capture = self.pg0.get_capture(1)
880 ih = self.get_ike_header(capture[0])
881 self.assertNotIn('Response', ih.flags)
882 self.assertIn('Initiator', ih.flags)
883 self.assertEqual(ih.init_SPI, self.sa.ispi)
884 self.assertEqual(ih.resp_SPI, self.sa.rspi)
885 plain = self.sa.hmac_and_decrypt(ih)
886 d = ikev2.IKEv2_payload_Delete(plain)
887 self.assertEqual(d.proto, 1) # proto=IKEv2
888 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
889 flags='Response', exch_type='INFORMATIONAL',
890 id=ih.id, next_payload='Encrypted')
891 resp = self.encrypt_ike_msg(header, b'', None)
892 self.send_and_assert_no_replies(self.pg0, resp)
893
894 def verify_del_sa(self, packet):
895 ih = self.get_ike_header(packet)
896 self.assertEqual(ih.id, self.sa.msg_id)
897 self.assertEqual(ih.exch_type, 37) # exchange informational
898 self.assertIn('Response', ih.flags)
899 self.assertIn('Initiator', ih.flags)
900 plain = self.sa.hmac_and_decrypt(ih)
901 self.assertEqual(plain, b'')
902
903 def initiate_del_sa_from_responder(self):
904 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
905 exch_type='INFORMATIONAL',
906 id=self.sa.new_msg_id())
907 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
908 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
909 packet = self.create_packet(self.pg0, ike_msg,
910 self.sa.sport, self.sa.dport,
911 self.sa.natt, self.ip6)
912 self.pg0.add_stream(packet)
913 self.pg0.enable_capture()
914 self.pg_start()
915 capture = self.pg0.get_capture(1)
916 self.verify_del_sa(capture[0])
Filip Tehlare7c83962020-09-23 11:20:12 +0000917
Filip Tehlarec112e52020-10-07 23:52:37 +0000918 @staticmethod
919 def find_notify_payload(packet, notify_type):
920 n = packet[ikev2.IKEv2_payload_Notify]
921 while n is not None:
922 if n.type == notify_type:
923 return n
924 n = n.payload
925 return None
926
927 def verify_nat_detection(self, packet):
928 if self.ip6:
929 iph = packet[IPv6]
930 else:
931 iph = packet[IP]
932 udp = packet[UDP]
933
934 # NAT_DETECTION_SOURCE_IP
935 s = self.find_notify_payload(packet, 16388)
936 self.assertIsNotNone(s)
937 src_sha = self.sa.compute_nat_sha1(
938 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
939 self.assertEqual(s.load, src_sha)
940
941 # NAT_DETECTION_DESTINATION_IP
942 s = self.find_notify_payload(packet, 16389)
943 self.assertIsNotNone(s)
944 dst_sha = self.sa.compute_nat_sha1(
945 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
946 self.assertEqual(s.load, dst_sha)
947
Filip Tehlare7c83962020-09-23 11:20:12 +0000948 def verify_sa_init_request(self, packet):
Filip Tehlar18107c92020-12-01 14:51:09 +0000949 udp = packet[UDP]
950 self.sa.dport = udp.sport
Filip Tehlare7c83962020-09-23 11:20:12 +0000951 ih = packet[ikev2.IKEv2]
952 self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
953 self.assertEqual(ih.exch_type, 34) # SA_INIT
954 self.sa.ispi = ih.init_SPI
955 self.assertEqual(ih.resp_SPI, 8 * b'\x00')
956 self.assertIn('Initiator', ih.flags)
957 self.assertNotIn('Response', ih.flags)
958 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
959 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
960
961 prop = packet[ikev2.IKEv2_payload_Proposal]
962 self.assertEqual(prop.proto, 1) # proto = ikev2
963 self.assertEqual(prop.proposal, 1)
964 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
965 self.assertEqual(prop.trans[0].transform_id,
966 self.p.ike_transforms['crypto_alg'])
967 self.assertEqual(prop.trans[1].transform_type, 2) # prf
968 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
969 self.assertEqual(prop.trans[2].transform_type, 4) # dh
970 self.assertEqual(prop.trans[2].transform_id,
971 self.p.ike_transforms['dh_group'])
972
Filip Tehlarec112e52020-10-07 23:52:37 +0000973 self.verify_nat_detection(packet)
Filip Tehlar68ad6252020-10-30 05:28:11 +0000974 self.sa.set_ike_props(
975 crypto='AES-GCM-16ICV', crypto_key_len=32,
976 integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
977 self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
978 integ='SHA2-256-128')
979 self.sa.generate_dh_data()
Filip Tehlare7c83962020-09-23 11:20:12 +0000980 self.sa.complete_dh_data()
981 self.sa.calc_keys()
982
Filip Tehlar68ad6252020-10-30 05:28:11 +0000983 def update_esp_transforms(self, trans, sa):
984 while trans:
985 if trans.transform_type == 1: # ecryption
986 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
987 elif trans.transform_type == 3: # integrity
988 sa.esp_integ = INTEG_IDS[trans.transform_id]
989 trans = trans.payload
990
Filip Tehlare7c83962020-09-23 11:20:12 +0000991 def verify_sa_auth_req(self, packet):
Filip Tehlar18107c92020-12-01 14:51:09 +0000992 udp = packet[UDP]
993 self.sa.dport = udp.sport
Filip Tehlare7c83962020-09-23 11:20:12 +0000994 ih = self.get_ike_header(packet)
995 self.assertEqual(ih.resp_SPI, self.sa.rspi)
996 self.assertEqual(ih.init_SPI, self.sa.ispi)
997 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
998 self.assertIn('Initiator', ih.flags)
999 self.assertNotIn('Response', ih.flags)
1000
1001 udp = packet[UDP]
1002 self.verify_udp(udp)
1003 self.assertEqual(ih.id, self.sa.msg_id + 1)
1004 self.sa.msg_id += 1
1005 plain = self.sa.hmac_and_decrypt(ih)
1006 idi = ikev2.IKEv2_payload_IDi(plain)
Filip Tehlare7c83962020-09-23 11:20:12 +00001007 self.assertEqual(idi.load, self.sa.i_id)
Benoît Gannec7cceee2021-09-28 11:19:37 +02001008 if self.no_idr_auth:
1009 self.assertEqual(idi.next_payload, 39) # AUTH
1010 else:
1011 idr = ikev2.IKEv2_payload_IDr(idi.payload)
1012 self.assertEqual(idr.load, self.sa.r_id)
Filip Tehlar68ad6252020-10-30 05:28:11 +00001013 prop = idi[ikev2.IKEv2_payload_Proposal]
1014 c = self.sa.child_sas[0]
1015 c.ispi = prop.SPI
1016 self.update_esp_transforms(
1017 prop[ikev2.IKEv2_payload_Transform], self.sa)
Filip Tehlare7c83962020-09-23 11:20:12 +00001018
1019 def send_init_response(self):
1020 tr_attr = self.sa.ike_crypto_attr()
1021 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1022 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1023 key_length=tr_attr[0]) /
1024 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1025 transform_id=self.sa.ike_integ) /
1026 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1027 transform_id=self.sa.ike_prf_alg.name) /
1028 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1029 transform_id=self.sa.ike_dh))
1030 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1031 trans_nb=4, trans=trans))
Filip Tehlar18107c92020-12-01 14:51:09 +00001032
1033 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1034 if self.sa.natt:
1035 dst_address = b'\x0a\x0a\x0a\x0a'
1036 else:
1037 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1038 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1039 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1040
Filip Tehlare7c83962020-09-23 11:20:12 +00001041 self.sa.init_resp_packet = (
1042 ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1043 exch_type='IKE_SA_INIT', flags='Response') /
1044 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1045 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1046 group=self.sa.ike_dh,
1047 load=self.sa.my_dh_pub_key) /
Filip Tehlar18107c92020-12-01 14:51:09 +00001048 ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
1049 next_payload='Notify') /
1050 ikev2.IKEv2_payload_Notify(
1051 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1052 next_payload='Notify') / ikev2.IKEv2_payload_Notify(
1053 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))
Filip Tehlare7c83962020-09-23 11:20:12 +00001054
1055 ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
1056 self.sa.sport, self.sa.dport,
Filip Tehlar18107c92020-12-01 14:51:09 +00001057 False, self.ip6)
Filip Tehlare7c83962020-09-23 11:20:12 +00001058 self.pg_send(self.pg0, ike_msg)
1059 capture = self.pg0.get_capture(1)
1060 self.verify_sa_auth_req(capture[0])
1061
1062 def initiate_sa_init(self):
1063 self.pg0.enable_capture()
1064 self.pg_start()
1065 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1066
1067 capture = self.pg0.get_capture(1)
1068 self.verify_sa_init_request(capture[0])
1069 self.send_init_response()
1070
1071 def send_auth_response(self):
1072 tr_attr = self.sa.esp_crypto_attr()
1073 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1074 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1075 key_length=tr_attr[0]) /
1076 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1077 transform_id=self.sa.esp_integ) /
1078 ikev2.IKEv2_payload_Transform(
1079 transform_type='Extended Sequence Number',
1080 transform_id='No ESN') /
1081 ikev2.IKEv2_payload_Transform(
1082 transform_type='Extended Sequence Number',
1083 transform_id='ESN'))
1084
Filip Tehlar68ad6252020-10-30 05:28:11 +00001085 c = self.sa.child_sas[0]
Filip Tehlare7c83962020-09-23 11:20:12 +00001086 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
Filip Tehlar68ad6252020-10-30 05:28:11 +00001087 SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
Filip Tehlare7c83962020-09-23 11:20:12 +00001088
1089 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1090 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1091 IDtype=self.sa.id_type, load=self.sa.i_id) /
1092 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1093 IDtype=self.sa.id_type, load=self.sa.r_id) /
1094 ikev2.IKEv2_payload_AUTH(next_payload='SA',
1095 auth_type=AuthMethod.value(self.sa.auth_method),
1096 load=self.sa.auth_data) /
1097 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1098 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1099 number_of_TSs=len(tsi),
1100 traffic_selector=tsi) /
1101 ikev2.IKEv2_payload_TSr(next_payload='Notify',
1102 number_of_TSs=len(tsr),
1103 traffic_selector=tsr) /
1104 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1105
1106 header = ikev2.IKEv2(
1107 init_SPI=self.sa.ispi,
1108 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1109 flags='Response', exch_type='IKE_AUTH')
1110
1111 ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1112 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1113 self.sa.dport, self.sa.natt, self.ip6)
1114 self.pg_send(self.pg0, packet)
1115
1116 def test_initiator(self):
1117 self.initiate_sa_init()
1118 self.sa.auth_init()
1119 self.sa.calc_child_keys()
1120 self.send_auth_response()
1121 self.verify_ike_sas()
1122
1123
1124class TemplateResponder(IkePeer):
1125 """ responder test template """
1126
Filip Tehlaredf29002020-10-10 04:39:11 +00001127 def initiate_del_sa_from_responder(self):
1128 self.pg0.enable_capture()
1129 self.pg_start()
1130 self.vapi.ikev2_initiate_del_ike_sa(
1131 ispi=int.from_bytes(self.sa.ispi, 'little'))
1132 capture = self.pg0.get_capture(1)
1133 ih = self.get_ike_header(capture[0])
1134 self.assertNotIn('Response', ih.flags)
1135 self.assertNotIn('Initiator', ih.flags)
1136 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1137 plain = self.sa.hmac_and_decrypt(ih)
1138 d = ikev2.IKEv2_payload_Delete(plain)
1139 self.assertEqual(d.proto, 1) # proto=IKEv2
1140 self.assertEqual(ih.init_SPI, self.sa.ispi)
1141 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1142 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1143 flags='Initiator+Response',
1144 exch_type='INFORMATIONAL',
1145 id=ih.id, next_payload='Encrypted')
1146 resp = self.encrypt_ike_msg(header, b'', None)
1147 self.send_and_assert_no_replies(self.pg0, resp)
Filip Tehlare7c83962020-09-23 11:20:12 +00001148
1149 def verify_del_sa(self, packet):
1150 ih = self.get_ike_header(packet)
1151 self.assertEqual(ih.id, self.sa.msg_id)
1152 self.assertEqual(ih.exch_type, 37) # exchange informational
Filip Tehlaredf29002020-10-10 04:39:11 +00001153 self.assertIn('Response', ih.flags)
1154 self.assertNotIn('Initiator', ih.flags)
1155 self.assertEqual(ih.next_payload, 46) # Encrypted
1156 self.assertEqual(ih.init_SPI, self.sa.ispi)
1157 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1158 plain = self.sa.hmac_and_decrypt(ih)
1159 self.assertEqual(plain, b'')
Filip Tehlare7c83962020-09-23 11:20:12 +00001160
Filip Tehlaredf29002020-10-10 04:39:11 +00001161 def initiate_del_sa_from_initiator(self):
Filip Tehlare7c83962020-09-23 11:20:12 +00001162 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1163 flags='Initiator', exch_type='INFORMATIONAL',
1164 id=self.sa.new_msg_id())
1165 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
1166 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
1167 packet = self.create_packet(self.pg0, ike_msg,
1168 self.sa.sport, self.sa.dport,
1169 self.sa.natt, self.ip6)
1170 self.pg0.add_stream(packet)
1171 self.pg0.enable_capture()
1172 self.pg_start()
1173 capture = self.pg0.get_capture(1)
1174 self.verify_del_sa(capture[0])
1175
Filip Tehlar027d8132020-12-04 17:38:11 +00001176 def send_sa_init_req(self):
Filip Tehlare7c83962020-09-23 11:20:12 +00001177 tr_attr = self.sa.ike_crypto_attr()
1178 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1179 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1180 key_length=tr_attr[0]) /
1181 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1182 transform_id=self.sa.ike_integ) /
1183 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1184 transform_id=self.sa.ike_prf_alg.name) /
1185 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1186 transform_id=self.sa.ike_dh))
1187
1188 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1189 trans_nb=4, trans=trans))
1190
Filip Tehlar027d8132020-12-04 17:38:11 +00001191 next_payload = None if self.ip6 else 'Notify'
1192
Filip Tehlare7c83962020-09-23 11:20:12 +00001193 self.sa.init_req_packet = (
1194 ikev2.IKEv2(init_SPI=self.sa.ispi,
1195 flags='Initiator', exch_type='IKE_SA_INIT') /
1196 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1197 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1198 group=self.sa.ike_dh,
1199 load=self.sa.my_dh_pub_key) /
Filip Tehlar027d8132020-12-04 17:38:11 +00001200 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
Filip Tehlare7c83962020-09-23 11:20:12 +00001201 load=self.sa.i_nonce))
1202
Filip Tehlar027d8132020-12-04 17:38:11 +00001203 if not self.ip6:
1204 if self.sa.i_natt:
1205 src_address = b'\x0a\x0a\x0a\x01'
1206 else:
1207 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
Filip Tehlare7c83962020-09-23 11:20:12 +00001208
Filip Tehlar027d8132020-12-04 17:38:11 +00001209 if self.sa.r_natt:
1210 dst_address = b'\x0a\x0a\x0a\x0a'
1211 else:
1212 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1213
1214 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1215 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1216 nat_src_detection = ikev2.IKEv2_payload_Notify(
1217 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1218 next_payload='Notify')
1219 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1220 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1221 self.sa.init_req_packet = (self.sa.init_req_packet /
1222 nat_src_detection /
1223 nat_dst_detection)
Filip Tehlare7c83962020-09-23 11:20:12 +00001224
1225 ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1226 self.sa.sport, self.sa.dport,
1227 self.sa.natt, self.ip6)
1228 self.pg0.add_stream(ike_msg)
1229 self.pg0.enable_capture()
1230 self.pg_start()
1231 capture = self.pg0.get_capture(1)
1232 self.verify_sa_init(capture[0])
1233
Filip Tehlar68ad6252020-10-30 05:28:11 +00001234 def generate_auth_payload(self, last_payload=None, is_rekey=False):
Filip Tehlare7c83962020-09-23 11:20:12 +00001235 tr_attr = self.sa.esp_crypto_attr()
Filip Tehlar68ad6252020-10-30 05:28:11 +00001236 last_payload = last_payload or 'Notify'
Filip Tehlare7c83962020-09-23 11:20:12 +00001237 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1238 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1239 key_length=tr_attr[0]) /
1240 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1241 transform_id=self.sa.esp_integ) /
1242 ikev2.IKEv2_payload_Transform(
1243 transform_type='Extended Sequence Number',
1244 transform_id='No ESN') /
1245 ikev2.IKEv2_payload_Transform(
1246 transform_type='Extended Sequence Number',
1247 transform_id='ESN'))
1248
Filip Tehlar68ad6252020-10-30 05:28:11 +00001249 c = self.sa.child_sas[0]
Filip Tehlare7c83962020-09-23 11:20:12 +00001250 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
Filip Tehlar68ad6252020-10-30 05:28:11 +00001251 SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
Filip Tehlare7c83962020-09-23 11:20:12 +00001252
1253 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
Filip Tehlar68ad6252020-10-30 05:28:11 +00001254 plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
Filip Tehlare7c83962020-09-23 11:20:12 +00001255 auth_type=AuthMethod.value(self.sa.auth_method),
1256 load=self.sa.auth_data) /
1257 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1258 ikev2.IKEv2_payload_TSi(next_payload='TSr',
Filip Tehlar68ad6252020-10-30 05:28:11 +00001259 number_of_TSs=len(tsi), traffic_selector=tsi) /
1260 ikev2.IKEv2_payload_TSr(next_payload=last_payload,
1261 number_of_TSs=len(tsr), traffic_selector=tsr))
Filip Tehlare7c83962020-09-23 11:20:12 +00001262
Filip Tehlar68ad6252020-10-30 05:28:11 +00001263 if is_rekey:
1264 first_payload = 'Nonce'
1265 plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
1266 next_payload='SA') / plain /
1267 ikev2.IKEv2_payload_Notify(type='REKEY_SA',
1268 proto='ESP', SPI=c.ispi))
1269 else:
1270 first_payload = 'IDi'
Benoît Gannec7cceee2021-09-28 11:19:37 +02001271 if self.no_idr_auth:
1272 ids = ikev2.IKEv2_payload_IDi(next_payload='AUTH',
1273 IDtype=self.sa.id_type,
1274 load=self.sa.i_id)
1275 else:
1276 ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1277 IDtype=self.sa.id_type, load=self.sa.i_id) /
1278 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1279 IDtype=self.sa.id_type, load=self.sa.r_id))
Filip Tehlar68ad6252020-10-30 05:28:11 +00001280 plain = ids / plain
1281 return plain, first_payload
1282
1283 def send_sa_auth(self):
1284 plain, first_payload = self.generate_auth_payload(
1285 last_payload='Notify')
1286 plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
Filip Tehlare7c83962020-09-23 11:20:12 +00001287 header = ikev2.IKEv2(
1288 init_SPI=self.sa.ispi,
1289 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1290 flags='Initiator', exch_type='IKE_AUTH')
1291
Filip Tehlar68ad6252020-10-30 05:28:11 +00001292 ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
Filip Tehlare7c83962020-09-23 11:20:12 +00001293 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1294 self.sa.dport, self.sa.natt, self.ip6)
1295 self.pg0.add_stream(packet)
1296 self.pg0.enable_capture()
1297 self.pg_start()
1298 capture = self.pg0.get_capture(1)
1299 self.verify_sa_auth_resp(capture[0])
1300
1301 def verify_sa_init(self, packet):
1302 ih = self.get_ike_header(packet)
1303
1304 self.assertEqual(ih.id, self.sa.msg_id)
1305 self.assertEqual(ih.exch_type, 34)
Filip Tehlaredf29002020-10-10 04:39:11 +00001306 self.assertIn('Response', ih.flags)
Filip Tehlare7c83962020-09-23 11:20:12 +00001307 self.assertEqual(ih.init_SPI, self.sa.ispi)
1308 self.assertNotEqual(ih.resp_SPI, 0)
1309 self.sa.rspi = ih.resp_SPI
1310 try:
1311 sa = ih[ikev2.IKEv2_payload_SA]
1312 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1313 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1314 except IndexError as e:
1315 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1316 self.logger.error(ih.show())
1317 raise
1318 self.sa.complete_dh_data()
1319 self.sa.calc_keys()
1320 self.sa.auth_init()
1321
1322 def verify_sa_auth_resp(self, packet):
1323 ike = self.get_ike_header(packet)
1324 udp = packet[UDP]
1325 self.verify_udp(udp)
1326 self.assertEqual(ike.id, self.sa.msg_id)
1327 plain = self.sa.hmac_and_decrypt(ike)
Filip Tehlar68ad6252020-10-30 05:28:11 +00001328 idr = ikev2.IKEv2_payload_IDr(plain)
1329 prop = idr[ikev2.IKEv2_payload_Proposal]
1330 self.assertEqual(prop.SPIsize, 4)
1331 self.sa.child_sas[0].rspi = prop.SPI
Filip Tehlare7c83962020-09-23 11:20:12 +00001332 self.sa.calc_child_keys()
1333
Filip Tehlarfab5e7f2021-01-14 13:32:01 +00001334 IKE_NODE_SUFFIX = 'ip4'
1335
1336 def verify_counters(self):
1337 self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX)
Filip Tehlar68d27532021-01-25 10:09:27 +00001338 self.assert_counter(1, 'init_sa_req', self.IKE_NODE_SUFFIX)
Filip Tehlarfab5e7f2021-01-14 13:32:01 +00001339 self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX)
1340
Filip Tehlar68d27532021-01-25 10:09:27 +00001341 r = self.vapi.ikev2_sa_dump()
1342 s = r[0].sa.stats
1343 self.assertEqual(1, s.n_sa_auth_req)
1344 self.assertEqual(1, s.n_sa_init_req)
1345
Filip Tehlar12b517b2020-04-26 18:05:05 +00001346 def test_responder(self):
Filip Tehlar027d8132020-12-04 17:38:11 +00001347 self.send_sa_init_req()
Filip Tehlar12b517b2020-04-26 18:05:05 +00001348 self.send_sa_auth()
jan_cavojskya340fe12020-07-08 09:24:12 +02001349 self.verify_ipsec_sas()
1350 self.verify_ike_sas()
Filip Tehlarfab5e7f2021-01-14 13:32:01 +00001351 self.verify_counters()
Filip Tehlar12b517b2020-04-26 18:05:05 +00001352
1353
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001354class Ikev2Params(object):
1355 def config_params(self, params={}):
Filip Tehlar4f42a712020-07-01 08:56:59 +00001356 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1357 ei = VppEnum.vl_api_ipsec_integ_alg_t
1358 self.vpp_enums = {
1359 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1360 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1361 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1362 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1363 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1364 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1365
1366 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1367 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1368 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1369 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1370
Filip Tehlar2008e312020-11-09 13:23:24 +00001371 dpd_disabled = True if 'dpd_disabled' not in params else\
1372 params['dpd_disabled']
1373 if dpd_disabled:
1374 self.vapi.cli('ikev2 dpd disable')
Filip Tehlaredf29002020-10-10 04:39:11 +00001375 self.del_sa_from_responder = False if 'del_sa_from_responder'\
1376 not in params else params['del_sa_from_responder']
Filip Tehlar027d8132020-12-04 17:38:11 +00001377 i_natt = False if 'i_natt' not in params else params['i_natt']
1378 r_natt = False if 'r_natt' not in params else params['r_natt']
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001379 self.p = Profile(self, 'pr1')
Filip Tehlar84962d12020-09-08 06:08:05 +00001380 self.ip6 = False if 'ip6' not in params else params['ip6']
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001381
1382 if 'auth' in params and params['auth'] == 'rsa-sig':
1383 auth_method = 'rsa-sig'
Klement Sekerab23ffd72021-05-31 16:08:53 +02001384 work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/"
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001385 self.vapi.ikev2_set_local_key(
1386 key_file=work_dir + params['server-key'])
1387
1388 client_file = work_dir + params['client-cert']
1389 server_pem = open(work_dir + params['server-cert']).read()
1390 client_priv = open(work_dir + params['client-key']).read()
1391 client_priv = load_pem_private_key(str.encode(client_priv), None,
1392 default_backend())
1393 self.peer_cert = x509.load_pem_x509_certificate(
1394 str.encode(server_pem),
1395 default_backend())
1396 self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1397 auth_data = None
1398 else:
1399 auth_data = b'$3cr3tpa$$w0rd'
1400 self.p.add_auth(method='shared-key', data=auth_data)
1401 auth_method = 'shared-key'
1402 client_priv = None
1403
Filip Tehlare7c83962020-09-23 11:20:12 +00001404 is_init = True if 'is_initiator' not in params else\
1405 params['is_initiator']
Benoît Gannec7cceee2021-09-28 11:19:37 +02001406 self.no_idr_auth = params.get('no_idr_in_auth', False)
Filip Tehlare7c83962020-09-23 11:20:12 +00001407
1408 idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1409 idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
Benoît Gannec7cceee2021-09-28 11:19:37 +02001410 r_id = self.idr = idr['data']
1411 i_id = self.idi = idi['data']
Filip Tehlare7c83962020-09-23 11:20:12 +00001412 if is_init:
Benoît Gannec7cceee2021-09-28 11:19:37 +02001413 # scapy is initiator, VPP is responder
Filip Tehlare7c83962020-09-23 11:20:12 +00001414 self.p.add_local_id(**idr)
1415 self.p.add_remote_id(**idi)
Benoît Gannec7cceee2021-09-28 11:19:37 +02001416 if self.no_idr_auth:
1417 r_id = None
Filip Tehlare7c83962020-09-23 11:20:12 +00001418 else:
Benoît Gannec7cceee2021-09-28 11:19:37 +02001419 # VPP is initiator, scapy is responder
Filip Tehlare7c83962020-09-23 11:20:12 +00001420 self.p.add_local_id(**idi)
Benoît Gannec7cceee2021-09-28 11:19:37 +02001421 if not self.no_idr_auth:
1422 self.p.add_remote_id(**idr)
Filip Tehlare7c83962020-09-23 11:20:12 +00001423
Filip Tehlar84962d12020-09-08 06:08:05 +00001424 loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1425 'loc_ts' not in params else params['loc_ts']
1426 rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1427 'rem_ts' not in params else params['rem_ts']
1428 self.p.add_local_ts(**loc_ts)
1429 self.p.add_remote_ts(**rem_ts)
Filip Tehlare7c83962020-09-23 11:20:12 +00001430 if 'responder' in params:
1431 self.p.add_responder(params['responder'])
1432 if 'ike_transforms' in params:
1433 self.p.add_ike_transforms(params['ike_transforms'])
1434 if 'esp_transforms' in params:
1435 self.p.add_esp_transforms(params['esp_transforms'])
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001436
Filip Tehlar67b8a7f2020-11-06 11:00:42 +00001437 udp_encap = False if 'udp_encap' not in params else\
1438 params['udp_encap']
1439 if udp_encap:
1440 self.p.set_udp_encap(True)
1441
Filip Tehlaraf2cc642021-02-22 16:15:51 +00001442 if 'responder_hostname' in params:
1443 hn = params['responder_hostname']
1444 self.p.add_responder_hostname(hn)
1445
1446 # configure static dns record
1447 self.vapi.dns_name_server_add_del(
1448 is_ip6=0, is_add=1,
1449 server_address=IPv4Address(u'8.8.8.8').packed)
1450 self.vapi.dns_enable_disable(enable=1)
1451
1452 cmd = "dns cache add {} {}".format(hn['hostname'],
1453 self.pg0.remote_ip4)
1454 self.vapi.cli(cmd)
1455
Benoît Gannec7cceee2021-09-28 11:19:37 +02001456 self.sa = IKEv2SA(self, i_id=i_id, r_id=r_id,
Filip Tehlare7c83962020-09-23 11:20:12 +00001457 is_initiator=is_init,
Filip Tehlar027d8132020-12-04 17:38:11 +00001458 id_type=self.p.local_id['id_type'],
1459 i_natt=i_natt, r_natt=r_natt,
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001460 priv_key=client_priv, auth_method=auth_method,
Benoît Gannea4276902021-09-27 15:37:48 +02001461 nonce=params.get('nonce'),
Filip Tehlar67b8a7f2020-11-06 11:00:42 +00001462 auth_data=auth_data, udp_encap=udp_encap,
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001463 local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
Benoît Gannec7cceee2021-09-28 11:19:37 +02001464
Filip Tehlar68ad6252020-10-30 05:28:11 +00001465 if is_init:
1466 ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1467 params['ike-crypto']
1468 ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1469 params['ike-integ']
1470 ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1471 params['ike-dh']
Filip Tehlar4f42a712020-07-01 08:56:59 +00001472
Filip Tehlar68ad6252020-10-30 05:28:11 +00001473 esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1474 params['esp-crypto']
1475 esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1476 params['esp-integ']
Filip Tehlar4f42a712020-07-01 08:56:59 +00001477
Filip Tehlar68ad6252020-10-30 05:28:11 +00001478 self.sa.set_ike_props(
1479 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1480 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1481 self.sa.set_esp_props(
1482 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1483 integ=esp_integ)
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001484
1485
Filip Tehlar459d17b2020-07-06 15:40:08 +00001486class TestApi(VppTestCase):
1487 """ Test IKEV2 API """
1488 @classmethod
1489 def setUpClass(cls):
1490 super(TestApi, cls).setUpClass()
1491
1492 @classmethod
1493 def tearDownClass(cls):
1494 super(TestApi, cls).tearDownClass()
1495
1496 def tearDown(self):
1497 super(TestApi, self).tearDown()
1498 self.p1.remove_vpp_config()
1499 self.p2.remove_vpp_config()
1500 r = self.vapi.ikev2_profile_dump()
1501 self.assertEqual(len(r), 0)
1502
1503 def configure_profile(self, cfg):
1504 p = Profile(self, cfg['name'])
1505 p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1506 p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1507 p.add_local_ts(**cfg['loc_ts'])
1508 p.add_remote_ts(**cfg['rem_ts'])
1509 p.add_responder(cfg['responder'])
1510 p.add_ike_transforms(cfg['ike_ts'])
1511 p.add_esp_transforms(cfg['esp_ts'])
1512 p.add_auth(**cfg['auth'])
1513 p.set_udp_encap(cfg['udp_encap'])
1514 p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1515 if 'lifetime_data' in cfg:
1516 p.set_lifetime_data(cfg['lifetime_data'])
1517 if 'tun_itf' in cfg:
1518 p.set_tunnel_interface(cfg['tun_itf'])
Filip Tehlard7fc12f2020-10-30 04:47:44 +00001519 if 'natt_disabled' in cfg and cfg['natt_disabled']:
1520 p.disable_natt()
Filip Tehlar459d17b2020-07-06 15:40:08 +00001521 p.add_vpp_config()
1522 return p
1523
1524 def test_profile_api(self):
1525 """ test profile dump API """
Filip Tehlar84962d12020-09-08 06:08:05 +00001526 loc_ts4 = {
Filip Tehlar459d17b2020-07-06 15:40:08 +00001527 'proto': 8,
1528 'start_port': 1,
1529 'end_port': 19,
1530 'start_addr': '3.3.3.2',
1531 'end_addr': '3.3.3.3',
1532 }
Filip Tehlar84962d12020-09-08 06:08:05 +00001533 rem_ts4 = {
Filip Tehlar459d17b2020-07-06 15:40:08 +00001534 'proto': 9,
1535 'start_port': 10,
1536 'end_port': 119,
1537 'start_addr': '4.5.76.80',
1538 'end_addr': '2.3.4.6',
1539 }
1540
Filip Tehlar84962d12020-09-08 06:08:05 +00001541 loc_ts6 = {
1542 'proto': 8,
1543 'start_port': 1,
1544 'end_port': 19,
1545 'start_addr': 'ab::1',
1546 'end_addr': 'ab::4',
1547 }
1548 rem_ts6 = {
1549 'proto': 9,
1550 'start_port': 10,
1551 'end_port': 119,
1552 'start_addr': 'cd::12',
1553 'end_addr': 'cd::13',
1554 }
1555
Filip Tehlar459d17b2020-07-06 15:40:08 +00001556 conf = {
1557 'p1': {
1558 'name': 'p1',
Filip Tehlard7fc12f2020-10-30 04:47:44 +00001559 'natt_disabled': True,
Filip Tehlar459d17b2020-07-06 15:40:08 +00001560 'loc_id': ('fqdn', b'vpp.home'),
1561 'rem_id': ('fqdn', b'roadwarrior.example.com'),
Filip Tehlar84962d12020-09-08 06:08:05 +00001562 'loc_ts': loc_ts4,
1563 'rem_ts': rem_ts4,
1564 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
Filip Tehlar459d17b2020-07-06 15:40:08 +00001565 'ike_ts': {
1566 'crypto_alg': 20,
1567 'crypto_key_size': 32,
Benoît Ganne1eaaba42020-12-14 19:31:16 +01001568 'integ_alg': 0,
Filip Tehlar459d17b2020-07-06 15:40:08 +00001569 'dh_group': 1},
1570 'esp_ts': {
1571 'crypto_alg': 13,
1572 'crypto_key_size': 24,
1573 'integ_alg': 2},
1574 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1575 'udp_encap': True,
1576 'ipsec_over_udp_port': 4501,
1577 'lifetime_data': {
1578 'lifetime': 123,
1579 'lifetime_maxdata': 20192,
1580 'lifetime_jitter': 9,
1581 'handover': 132},
1582 },
1583 'p2': {
1584 'name': 'p2',
1585 'loc_id': ('ip4-addr', b'192.168.2.1'),
Filip Tehlar84962d12020-09-08 06:08:05 +00001586 'rem_id': ('ip6-addr', b'abcd::1'),
1587 'loc_ts': loc_ts6,
1588 'rem_ts': rem_ts6,
1589 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
Filip Tehlar459d17b2020-07-06 15:40:08 +00001590 'ike_ts': {
1591 'crypto_alg': 12,
1592 'crypto_key_size': 16,
1593 'integ_alg': 3,
1594 'dh_group': 3},
1595 'esp_ts': {
1596 'crypto_alg': 9,
1597 'crypto_key_size': 24,
1598 'integ_alg': 4},
1599 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1600 'udp_encap': False,
1601 'ipsec_over_udp_port': 4600,
1602 'tun_itf': 0}
1603 }
1604 self.p1 = self.configure_profile(conf['p1'])
1605 self.p2 = self.configure_profile(conf['p2'])
1606
1607 r = self.vapi.ikev2_profile_dump()
1608 self.assertEqual(len(r), 2)
1609 self.verify_profile(r[0].profile, conf['p1'])
1610 self.verify_profile(r[1].profile, conf['p2'])
1611
1612 def verify_id(self, api_id, cfg_id):
1613 self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1614 self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
1615
1616 def verify_ts(self, api_ts, cfg_ts):
1617 self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
1618 self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
1619 self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
Filip Tehlar84962d12020-09-08 06:08:05 +00001620 self.assertEqual(api_ts.start_addr,
1621 ip_address(text_type(cfg_ts['start_addr'])))
1622 self.assertEqual(api_ts.end_addr,
1623 ip_address(text_type(cfg_ts['end_addr'])))
Filip Tehlar459d17b2020-07-06 15:40:08 +00001624
1625 def verify_responder(self, api_r, cfg_r):
1626 self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
Filip Tehlar84962d12020-09-08 06:08:05 +00001627 self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))
Filip Tehlar459d17b2020-07-06 15:40:08 +00001628
1629 def verify_transforms(self, api_ts, cfg_ts):
1630 self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
1631 self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
1632 self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
1633
1634 def verify_ike_transforms(self, api_ts, cfg_ts):
1635 self.verify_transforms(api_ts, cfg_ts)
1636 self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
1637
1638 def verify_esp_transforms(self, api_ts, cfg_ts):
1639 self.verify_transforms(api_ts, cfg_ts)
1640
1641 def verify_auth(self, api_auth, cfg_auth):
1642 self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
1643 self.assertEqual(api_auth.data, cfg_auth['data'])
1644 self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
1645
1646 def verify_lifetime_data(self, p, ld):
1647 self.assertEqual(p.lifetime, ld['lifetime'])
1648 self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
1649 self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
1650 self.assertEqual(p.handover, ld['handover'])
1651
1652 def verify_profile(self, ap, cp):
1653 self.assertEqual(ap.name, cp['name'])
1654 self.assertEqual(ap.udp_encap, cp['udp_encap'])
1655 self.verify_id(ap.loc_id, cp['loc_id'])
1656 self.verify_id(ap.rem_id, cp['rem_id'])
1657 self.verify_ts(ap.loc_ts, cp['loc_ts'])
1658 self.verify_ts(ap.rem_ts, cp['rem_ts'])
1659 self.verify_responder(ap.responder, cp['responder'])
1660 self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1661 self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1662 self.verify_auth(ap.auth, cp['auth'])
Filip Tehlard7fc12f2020-10-30 04:47:44 +00001663 natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled']
1664 self.assertTrue(natt_dis == ap.natt_disabled)
1665
Filip Tehlar459d17b2020-07-06 15:40:08 +00001666 if 'lifetime_data' in cp:
1667 self.verify_lifetime_data(ap, cp['lifetime_data'])
1668 self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1669 if 'tun_itf' in cp:
1670 self.assertEqual(ap.tun_itf, cp['tun_itf'])
1671 else:
1672 self.assertEqual(ap.tun_itf, 0xffffffff)
1673
1674
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001675@tag_fixme_vpp_workers
Filip Tehlar027d8132020-12-04 17:38:11 +00001676class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
1677 """ test responder - responder behind NAT """
1678
Filip Tehlarfab5e7f2021-01-14 13:32:01 +00001679 IKE_NODE_SUFFIX = 'ip4-natt'
1680
Filip Tehlar027d8132020-12-04 17:38:11 +00001681 def config_tc(self):
1682 self.config_params({'r_natt': True})
1683
1684
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001685@tag_fixme_vpp_workers
Filip Tehlar18107c92020-12-01 14:51:09 +00001686class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
1687 """ test ikev2 initiator - NAT traversal (intitiator behind NAT) """
1688
1689 def config_tc(self):
1690 self.config_params({
Filip Tehlar027d8132020-12-04 17:38:11 +00001691 'i_natt': True,
Filip Tehlar18107c92020-12-01 14:51:09 +00001692 'is_initiator': False, # seen from test case perspective
1693 # thus vpp is initiator
1694 'responder': {'sw_if_index': self.pg0.sw_if_index,
1695 'addr': self.pg0.remote_ip4},
1696 'ike-crypto': ('AES-GCM-16ICV', 32),
1697 'ike-integ': 'NULL',
1698 'ike-dh': '3072MODPgr',
1699 'ike_transforms': {
1700 'crypto_alg': 20, # "aes-gcm-16"
1701 'crypto_key_size': 256,
1702 'dh_group': 15, # "modp-3072"
1703 },
1704 'esp_transforms': {
1705 'crypto_alg': 12, # "aes-cbc"
1706 'crypto_key_size': 256,
1707 # "hmac-sha2-256-128"
1708 'integ_alg': 12}})
1709
1710
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001711@tag_fixme_vpp_workers
Filip Tehlare7c83962020-09-23 11:20:12 +00001712class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1713 """ test ikev2 initiator - pre shared key auth """
Filip Tehlaredf29002020-10-10 04:39:11 +00001714
Filip Tehlare7c83962020-09-23 11:20:12 +00001715 def config_tc(self):
1716 self.config_params({
1717 'is_initiator': False, # seen from test case perspective
1718 # thus vpp is initiator
Filip Tehlare7c83962020-09-23 11:20:12 +00001719 'ike-crypto': ('AES-GCM-16ICV', 32),
1720 'ike-integ': 'NULL',
1721 'ike-dh': '3072MODPgr',
1722 'ike_transforms': {
1723 'crypto_alg': 20, # "aes-gcm-16"
1724 'crypto_key_size': 256,
1725 'dh_group': 15, # "modp-3072"
1726 },
1727 'esp_transforms': {
1728 'crypto_alg': 12, # "aes-cbc"
1729 'crypto_key_size': 256,
1730 # "hmac-sha2-256-128"
Filip Tehlaraf2cc642021-02-22 16:15:51 +00001731 'integ_alg': 12},
1732 'responder_hostname': {'hostname': 'vpp.responder.org',
1733 'sw_if_index': self.pg0.sw_if_index}})
Filip Tehlare7c83962020-09-23 11:20:12 +00001734
1735
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001736@tag_fixme_vpp_workers
Filip Tehlar38340fa2020-11-19 21:34:48 +00001737class TestInitiatorRequestWindowSize(TestInitiatorPsk):
1738 """ test initiator - request window size (1) """
1739
1740 def rekey_respond(self, req, update_child_sa_data):
1741 ih = self.get_ike_header(req)
1742 plain = self.sa.hmac_and_decrypt(ih)
1743 sa = ikev2.IKEv2_payload_SA(plain)
1744 if update_child_sa_data:
1745 prop = sa[ikev2.IKEv2_payload_Proposal]
1746 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1747 self.sa.r_nonce = self.sa.i_nonce
1748 self.sa.child_sas[0].ispi = prop.SPI
1749 self.sa.child_sas[0].rspi = prop.SPI
1750 self.sa.calc_child_keys()
1751
1752 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1753 flags='Response', exch_type=36,
1754 id=ih.id, next_payload='Encrypted')
1755 resp = self.encrypt_ike_msg(header, sa, 'SA')
1756 packet = self.create_packet(self.pg0, resp, self.sa.sport,
1757 self.sa.dport, self.sa.natt, self.ip6)
1758 self.send_and_assert_no_replies(self.pg0, packet)
1759
1760 def test_initiator(self):
1761 super(TestInitiatorRequestWindowSize, self).test_initiator()
1762 self.pg0.enable_capture()
1763 self.pg_start()
1764 ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1765 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1766 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1767 capture = self.pg0.get_capture(2)
1768
1769 # reply in reverse order
1770 self.rekey_respond(capture[1], True)
1771 self.rekey_respond(capture[0], False)
1772
1773 # verify that only the second request was accepted
1774 self.verify_ike_sas()
1775 self.verify_ipsec_sas(is_rekey=True)
1776
1777
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001778@tag_fixme_vpp_workers
Filip Tehlar68ad6252020-10-30 05:28:11 +00001779class TestInitiatorRekey(TestInitiatorPsk):
1780 """ test ikev2 initiator - rekey """
1781
1782 def rekey_from_initiator(self):
1783 ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1784 self.pg0.enable_capture()
1785 self.pg_start()
1786 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1787 capture = self.pg0.get_capture(1)
1788 ih = self.get_ike_header(capture[0])
1789 self.assertEqual(ih.exch_type, 36) # CHILD_SA
1790 self.assertNotIn('Response', ih.flags)
1791 self.assertIn('Initiator', ih.flags)
1792 plain = self.sa.hmac_and_decrypt(ih)
1793 sa = ikev2.IKEv2_payload_SA(plain)
1794 prop = sa[ikev2.IKEv2_payload_Proposal]
Filip Tehlar68ad6252020-10-30 05:28:11 +00001795 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1796 self.sa.r_nonce = self.sa.i_nonce
1797 # update new responder SPI
1798 self.sa.child_sas[0].ispi = prop.SPI
1799 self.sa.child_sas[0].rspi = prop.SPI
1800 self.sa.calc_child_keys()
1801 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1802 flags='Response', exch_type=36,
1803 id=ih.id, next_payload='Encrypted')
1804 resp = self.encrypt_ike_msg(header, sa, 'SA')
1805 packet = self.create_packet(self.pg0, resp, self.sa.sport,
1806 self.sa.dport, self.sa.natt, self.ip6)
1807 self.send_and_assert_no_replies(self.pg0, packet)
1808
1809 def test_initiator(self):
1810 super(TestInitiatorRekey, self).test_initiator()
1811 self.rekey_from_initiator()
1812 self.verify_ike_sas()
1813 self.verify_ipsec_sas(is_rekey=True)
1814
1815
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001816@tag_fixme_vpp_workers
Filip Tehlaredf29002020-10-10 04:39:11 +00001817class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1818 """ test ikev2 initiator - delete IKE SA from responder """
1819
1820 def config_tc(self):
1821 self.config_params({
1822 'del_sa_from_responder': True,
1823 'is_initiator': False, # seen from test case perspective
1824 # thus vpp is initiator
1825 'responder': {'sw_if_index': self.pg0.sw_if_index,
1826 'addr': self.pg0.remote_ip4},
1827 'ike-crypto': ('AES-GCM-16ICV', 32),
1828 'ike-integ': 'NULL',
1829 'ike-dh': '3072MODPgr',
1830 'ike_transforms': {
1831 'crypto_alg': 20, # "aes-gcm-16"
1832 'crypto_key_size': 256,
1833 'dh_group': 15, # "modp-3072"
1834 },
1835 'esp_transforms': {
1836 'crypto_alg': 12, # "aes-cbc"
1837 'crypto_key_size': 256,
1838 # "hmac-sha2-256-128"
Benoît Gannec7cceee2021-09-28 11:19:37 +02001839 'integ_alg': 12},
1840 'no_idr_in_auth': True})
Filip Tehlaredf29002020-10-10 04:39:11 +00001841
1842
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001843@tag_fixme_vpp_workers
Filip Tehlar027d8132020-12-04 17:38:11 +00001844class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
1845 """ test ikev2 responder - initiator behind NAT """
Filip Tehlarfab5e7f2021-01-14 13:32:01 +00001846
1847 IKE_NODE_SUFFIX = 'ip4-natt'
1848
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001849 def config_tc(self):
1850 self.config_params(
Filip Tehlar027d8132020-12-04 17:38:11 +00001851 {'i_natt': True})
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001852
1853
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001854@tag_fixme_vpp_workers
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001855class TestResponderPsk(TemplateResponder, Ikev2Params):
1856 """ test ikev2 responder - pre shared key auth """
1857 def config_tc(self):
1858 self.config_params()
1859
1860
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001861@tag_fixme_vpp_workers
Filip Tehlar2008e312020-11-09 13:23:24 +00001862class TestResponderDpd(TestResponderPsk):
1863 """
1864 Dead peer detection test
1865 """
1866 def config_tc(self):
1867 self.config_params({'dpd_disabled': False})
1868
1869 def tearDown(self):
1870 pass
1871
1872 def test_responder(self):
1873 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
1874 super(TestResponderDpd, self).test_responder()
1875 self.pg0.enable_capture()
1876 self.pg_start()
1877 # capture empty request but don't reply
1878 capture = self.pg0.get_capture(expected_count=1, timeout=5)
1879 ih = self.get_ike_header(capture[0])
1880 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1881 plain = self.sa.hmac_and_decrypt(ih)
1882 self.assertEqual(plain, b'')
1883 # wait for SA expiration
1884 time.sleep(3)
1885 ike_sas = self.vapi.ikev2_sa_dump()
1886 self.assertEqual(len(ike_sas), 0)
1887 ipsec_sas = self.vapi.ipsec_sa_dump()
1888 self.assertEqual(len(ipsec_sas), 0)
1889
1890
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001891@tag_fixme_vpp_workers
Filip Tehlar68ad6252020-10-30 05:28:11 +00001892class TestResponderRekey(TestResponderPsk):
1893 """ test ikev2 responder - rekey """
1894
1895 def rekey_from_initiator(self):
Filip Tehlar38340fa2020-11-19 21:34:48 +00001896 packet = self.create_rekey_request()
Filip Tehlar68ad6252020-10-30 05:28:11 +00001897 self.pg0.add_stream(packet)
1898 self.pg0.enable_capture()
1899 self.pg_start()
1900 capture = self.pg0.get_capture(1)
1901 ih = self.get_ike_header(capture[0])
1902 plain = self.sa.hmac_and_decrypt(ih)
1903 sa = ikev2.IKEv2_payload_SA(plain)
1904 prop = sa[ikev2.IKEv2_payload_Proposal]
Filip Tehlar68ad6252020-10-30 05:28:11 +00001905 self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1906 # update new responder SPI
1907 self.sa.child_sas[0].rspi = prop.SPI
1908
1909 def test_responder(self):
1910 super(TestResponderRekey, self).test_responder()
1911 self.rekey_from_initiator()
1912 self.sa.calc_child_keys()
1913 self.verify_ike_sas()
1914 self.verify_ipsec_sas(is_rekey=True)
Filip Tehlar68d27532021-01-25 10:09:27 +00001915 self.assert_counter(1, 'rekey_req', 'ip4')
1916 r = self.vapi.ikev2_sa_dump()
1917 self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
Filip Tehlar68ad6252020-10-30 05:28:11 +00001918
1919
Filip Tehlard28196f2021-01-27 18:08:21 +00001920class TestResponderVrf(TestResponderPsk, Ikev2Params):
1921 """ test ikev2 responder - non-default table id """
1922
1923 @classmethod
1924 def setUpClass(cls):
1925 import scapy.contrib.ikev2 as _ikev2
1926 globals()['ikev2'] = _ikev2
1927 super(IkePeer, cls).setUpClass()
1928 cls.create_pg_interfaces(range(1))
1929 cls.vapi.cli("ip table add 1")
1930 cls.vapi.cli("set interface ip table pg0 1")
1931 for i in cls.pg_interfaces:
1932 i.admin_up()
1933 i.config_ip4()
1934 i.resolve_arp()
1935 i.config_ip6()
1936 i.resolve_ndp()
1937
1938 def config_tc(self):
1939 self.config_params({'dpd_disabled': False})
1940
1941 def test_responder(self):
1942 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
1943 super(TestResponderVrf, self).test_responder()
1944 self.pg0.enable_capture()
1945 self.pg_start()
1946 capture = self.pg0.get_capture(expected_count=1, timeout=5)
1947 ih = self.get_ike_header(capture[0])
1948 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1949 plain = self.sa.hmac_and_decrypt(ih)
1950 self.assertEqual(plain, b'')
1951
1952
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001953@tag_fixme_vpp_workers
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001954class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1955 """ test ikev2 responder - cert based auth """
1956 def config_tc(self):
1957 self.config_params({
Filip Tehlar67b8a7f2020-11-06 11:00:42 +00001958 'udp_encap': True,
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001959 'auth': 'rsa-sig',
1960 'server-key': 'server-key.pem',
1961 'client-key': 'client-key.pem',
1962 'client-cert': 'client-cert.pem',
1963 'server-cert': 'server-cert.pem'})
1964
Filip Tehlar4f42a712020-07-01 08:56:59 +00001965
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001966@tag_fixme_vpp_workers
Filip Tehlar4f42a712020-07-01 08:56:59 +00001967class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1968 (TemplateResponder, Ikev2Params):
1969 """
1970 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1971 """
1972 def config_tc(self):
1973 self.config_params({
1974 'ike-crypto': ('AES-CBC', 16),
1975 'ike-integ': 'SHA2-256-128',
1976 'esp-crypto': ('AES-CBC', 24),
1977 'esp-integ': 'SHA2-384-192',
Benoît Gannea4276902021-09-27 15:37:48 +02001978 'ike-dh': '2048MODPgr',
Benoît Gannec7cceee2021-09-28 11:19:37 +02001979 'nonce': os.urandom(256),
1980 'no_idr_in_auth': True})
Filip Tehlar4f42a712020-07-01 08:56:59 +00001981
1982
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001983@tag_fixme_vpp_workers
Filip Tehlar4f42a712020-07-01 08:56:59 +00001984class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1985 (TemplateResponder, Ikev2Params):
Filip Tehlarfab5e7f2021-01-14 13:32:01 +00001986
Filip Tehlar4f42a712020-07-01 08:56:59 +00001987 """
1988 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1989 """
1990 def config_tc(self):
1991 self.config_params({
1992 'ike-crypto': ('AES-CBC', 32),
1993 'ike-integ': 'SHA2-256-128',
1994 'esp-crypto': ('AES-GCM-16ICV', 32),
1995 'esp-integ': 'NULL',
1996 'ike-dh': '3072MODPgr'})
1997
1998
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001999@tag_fixme_vpp_workers
Filip Tehlara7b963d2020-07-08 13:25:34 +00002000class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
2001 """
2002 IKE:AES_GCM_16_256
2003 """
Filip Tehlarfab5e7f2021-01-14 13:32:01 +00002004
2005 IKE_NODE_SUFFIX = 'ip6'
2006
Filip Tehlara7b963d2020-07-08 13:25:34 +00002007 def config_tc(self):
2008 self.config_params({
Filip Tehlaredf29002020-10-10 04:39:11 +00002009 'del_sa_from_responder': True,
Filip Tehlar84962d12020-09-08 06:08:05 +00002010 'ip6': True,
2011 'natt': True,
Filip Tehlara7b963d2020-07-08 13:25:34 +00002012 'ike-crypto': ('AES-GCM-16ICV', 32),
2013 'ike-integ': 'NULL',
Filip Tehlar84962d12020-09-08 06:08:05 +00002014 'ike-dh': '2048MODPgr',
2015 'loc_ts': {'start_addr': 'ab:cd::0',
2016 'end_addr': 'ab:cd::10'},
2017 'rem_ts': {'start_addr': '11::0',
2018 'end_addr': '11::100'}})
Filip Tehlara7b963d2020-07-08 13:25:34 +00002019
2020
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00002021@tag_fixme_vpp_workers
Filip Tehlar2008e312020-11-09 13:23:24 +00002022class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
2023 """
2024 Test for keep alive messages
2025 """
2026
2027 def send_empty_req_from_responder(self):
Filip Tehlar38340fa2020-11-19 21:34:48 +00002028 packet = self.create_empty_request()
Filip Tehlar2008e312020-11-09 13:23:24 +00002029 self.pg0.add_stream(packet)
2030 self.pg0.enable_capture()
2031 self.pg_start()
2032 capture = self.pg0.get_capture(1)
2033 ih = self.get_ike_header(capture[0])
2034 self.assertEqual(ih.id, self.sa.msg_id)
2035 plain = self.sa.hmac_and_decrypt(ih)
2036 self.assertEqual(plain, b'')
Filip Tehlarfab5e7f2021-01-14 13:32:01 +00002037 self.assert_counter(1, 'keepalive', 'ip4')
Filip Tehlar68d27532021-01-25 10:09:27 +00002038 r = self.vapi.ikev2_sa_dump()
2039 self.assertEqual(1, r[0].sa.stats.n_keepalives)
Filip Tehlar2008e312020-11-09 13:23:24 +00002040
2041 def test_initiator(self):
2042 super(TestInitiatorKeepaliveMsg, self).test_initiator()
2043 self.send_empty_req_from_responder()
2044
2045
Filip Tehlar558607d2020-07-16 07:25:56 +00002046class TestMalformedMessages(TemplateResponder, Ikev2Params):
2047 """ malformed packet test """
2048
2049 def tearDown(self):
2050 pass
2051
2052 def config_tc(self):
2053 self.config_params()
2054
Filip Tehlar558607d2020-07-16 07:25:56 +00002055 def create_ike_init_msg(self, length=None, payload=None):
2056 msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
2057 flags='Initiator', exch_type='IKE_SA_INIT')
2058 if payload is not None:
2059 msg /= payload
2060 return self.create_packet(self.pg0, msg, self.sa.sport,
2061 self.sa.dport)
2062
2063 def verify_bad_packet_length(self):
2064 ike_msg = self.create_ike_init_msg(length=0xdead)
2065 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
Filip Tehlarfab5e7f2021-01-14 13:32:01 +00002066 self.assert_counter(self.pkt_count, 'bad_length')
Filip Tehlar558607d2020-07-16 07:25:56 +00002067
2068 def verify_bad_sa_payload_length(self):
2069 p = ikev2.IKEv2_payload_SA(length=0xdead)
2070 ike_msg = self.create_ike_init_msg(payload=p)
2071 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
Filip Tehlarfab5e7f2021-01-14 13:32:01 +00002072 self.assert_counter(self.pkt_count, 'malformed_packet')
Filip Tehlar558607d2020-07-16 07:25:56 +00002073
2074 def test_responder(self):
2075 self.pkt_count = 254
2076 self.verify_bad_packet_length()
2077 self.verify_bad_sa_payload_length()
2078
2079
Filip Tehlar12b517b2020-04-26 18:05:05 +00002080if __name__ == '__main__':
2081 unittest.main(testRunner=VppTestRunner)