blob: 558e8a02f87f91bc25f0673e58082f62bc8074b0 [file] [log] [blame]
Filip Tehlar12b517b2020-04-26 18:05:05 +00001import os
Filip Tehlar2008e312020-11-09 13:23:24 +00002import time
Filip Tehlarec112e52020-10-07 23:52:37 +00003from socket import inet_pton
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00004from cryptography import x509
Filip Tehlar12b517b2020-04-26 18:05:05 +00005from cryptography.hazmat.backends import default_backend
6from cryptography.hazmat.primitives import hashes, hmac
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00007from cryptography.hazmat.primitives.asymmetric import dh, padding
8from cryptography.hazmat.primitives.serialization import load_pem_private_key
Filip Tehlar12b517b2020-04-26 18:05:05 +00009from cryptography.hazmat.primitives.ciphers import (
10 Cipher,
11 algorithms,
12 modes,
13)
Filip Tehlar84962d12020-09-08 06:08:05 +000014from ipaddress import IPv4Address, IPv6Address, ip_address
Paul Vinciguerrae061dad2020-12-04 14:57:51 -050015import unittest
Filip Tehlarbfeae8c2020-06-23 20:35:58 +000016from scapy.layers.ipsec import ESP
Filip Tehlar12b517b2020-04-26 18:05:05 +000017from scapy.layers.inet import IP, UDP, Ether
Filip Tehlar84962d12020-09-08 06:08:05 +000018from scapy.layers.inet6 import IPv6
Filip Tehlar12b517b2020-04-26 18:05:05 +000019from scapy.packet import raw, Raw
20from scapy.utils import long_converter
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +000021from framework import tag_fixme_vpp_workers
Filip Tehlar12b517b2020-04-26 18:05:05 +000022from framework import VppTestCase, VppTestRunner
Filip Tehlarbfeae8c2020-06-23 20:35:58 +000023from vpp_ikev2 import Profile, IDType, AuthMethod
Filip Tehlar4f42a712020-07-01 08:56:59 +000024from vpp_papi import VppEnum
Filip Tehlar12b517b2020-04-26 18:05:05 +000025
Filip Tehlar84962d12020-09-08 06:08:05 +000026try:
27 text_type = unicode
28except NameError:
29 text_type = str
Filip Tehlar12b517b2020-04-26 18:05:05 +000030
31KEY_PAD = b"Key Pad for IKEv2"
Filip Tehlara7b963d2020-07-08 13:25:34 +000032SALT_SIZE = 4
33GCM_ICV_SIZE = 16
34GCM_IV_SIZE = 8
Filip Tehlar12b517b2020-04-26 18:05:05 +000035
36
37# defined in rfc3526
38# tuple structure is (p, g, key_len)
39DH = {
40 '2048MODPgr': (long_converter("""
41 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
42 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
43 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
44 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
45 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
46 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
47 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
48 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
49 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
50 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
Filip Tehlar4f42a712020-07-01 08:56:59 +000051 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
52
53 '3072MODPgr': (long_converter("""
54 FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
55 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
56 EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
57 E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED
58 EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D
59 C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F
60 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D
61 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
62 E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
63 DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
64 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64
65 ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7
66 ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
67 F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
68 BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
69 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
Filip Tehlar12b517b2020-04-26 18:05:05 +000070}
71
72
73class CryptoAlgo(object):
74 def __init__(self, name, cipher, mode):
75 self.name = name
76 self.cipher = cipher
77 self.mode = mode
78 if self.cipher is not None:
79 self.bs = self.cipher.block_size // 8
80
Filip Tehlara7b963d2020-07-08 13:25:34 +000081 if self.name == 'AES-GCM-16ICV':
82 self.iv_len = GCM_IV_SIZE
83 else:
84 self.iv_len = self.bs
Filip Tehlar12b517b2020-04-26 18:05:05 +000085
Filip Tehlara7b963d2020-07-08 13:25:34 +000086 def encrypt(self, data, key, aad=None):
87 iv = os.urandom(self.iv_len)
88 if aad is None:
89 encryptor = Cipher(self.cipher(key), self.mode(iv),
90 default_backend()).encryptor()
91 return iv + encryptor.update(data) + encryptor.finalize()
92 else:
93 salt = key[-SALT_SIZE:]
94 nonce = salt + iv
95 encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
96 default_backend()).encryptor()
97 encryptor.authenticate_additional_data(aad)
98 data = encryptor.update(data) + encryptor.finalize()
99 data += encryptor.tag[:GCM_ICV_SIZE]
100 return iv + data
101
102 def decrypt(self, data, key, aad=None, icv=None):
103 if aad is None:
104 iv = data[:self.iv_len]
105 ct = data[self.iv_len:]
106 decryptor = Cipher(algorithms.AES(key),
107 self.mode(iv),
108 default_backend()).decryptor()
109 return decryptor.update(ct) + decryptor.finalize()
110 else:
111 salt = key[-SALT_SIZE:]
112 nonce = salt + data[:GCM_IV_SIZE]
113 ct = data[GCM_IV_SIZE:]
114 key = key[:-SALT_SIZE]
115 decryptor = Cipher(algorithms.AES(key),
116 self.mode(nonce, icv, len(icv)),
117 default_backend()).decryptor()
118 decryptor.authenticate_additional_data(aad)
Filip Tehlaredf29002020-10-10 04:39:11 +0000119 return decryptor.update(ct) + decryptor.finalize()
Filip Tehlar12b517b2020-04-26 18:05:05 +0000120
121 def pad(self, data):
122 pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
123 data = data + b'\x00' * (pad_len - 1)
Filip Tehlar558607d2020-07-16 07:25:56 +0000124 return data + bytes([pad_len - 1])
Filip Tehlar12b517b2020-04-26 18:05:05 +0000125
126
127class AuthAlgo(object):
128 def __init__(self, name, mac, mod, key_len, trunc_len=None):
129 self.name = name
130 self.mac = mac
131 self.mod = mod
132 self.key_len = key_len
133 self.trunc_len = trunc_len or key_len
134
135
136CRYPTO_ALGOS = {
137 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
138 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
Filip Tehlar4f42a712020-07-01 08:56:59 +0000139 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
140 mode=modes.GCM),
Filip Tehlar12b517b2020-04-26 18:05:05 +0000141}
142
143AUTH_ALGOS = {
144 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
145 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
Filip Tehlar4f42a712020-07-01 08:56:59 +0000146 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
147 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
148 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
Filip Tehlar12b517b2020-04-26 18:05:05 +0000149}
150
151PRF_ALGOS = {
152 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
153 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
154 hashes.SHA256, 32),
155}
156
Filip Tehlar68ad6252020-10-30 05:28:11 +0000157CRYPTO_IDS = {
158 12: 'AES-CBC',
159 20: 'AES-GCM-16ICV',
160}
161
162INTEG_IDS = {
163 2: 'HMAC-SHA1-96',
164 12: 'SHA2-256-128',
165 13: 'SHA2-384-192',
166 14: 'SHA2-512-256',
167}
168
Filip Tehlar12b517b2020-04-26 18:05:05 +0000169
170class IKEv2ChildSA(object):
Filip Tehlar68ad6252020-10-30 05:28:11 +0000171 def __init__(self, local_ts, remote_ts, is_initiator):
172 spi = os.urandom(4)
173 if is_initiator:
174 self.ispi = spi
175 self.rspi = None
176 else:
177 self.rspi = spi
178 self.ispi = None
Filip Tehlar12b517b2020-04-26 18:05:05 +0000179 self.local_ts = local_ts
180 self.remote_ts = remote_ts
181
182
183class IKEv2SA(object):
Filip Tehlar84962d12020-09-08 06:08:05 +0000184 def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
185 spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
186 nonce=None, auth_data=None, local_ts=None, remote_ts=None,
Filip Tehlar027d8132020-12-04 17:38:11 +0000187 auth_method='shared-key', priv_key=None, i_natt=False,
188 r_natt=False, udp_encap=False):
Filip Tehlar67b8a7f2020-11-06 11:00:42 +0000189 self.udp_encap = udp_encap
Filip Tehlar027d8132020-12-04 17:38:11 +0000190 self.i_natt = i_natt
191 self.r_natt = r_natt
192 if i_natt or r_natt:
Filip Tehlarbfeae8c2020-06-23 20:35:58 +0000193 self.sport = 4500
194 self.dport = 4500
195 else:
196 self.sport = 500
197 self.dport = 500
Filip Tehlar558607d2020-07-16 07:25:56 +0000198 self.msg_id = 0
Filip Tehlar12b517b2020-04-26 18:05:05 +0000199 self.dh_params = None
200 self.test = test
Filip Tehlarbfeae8c2020-06-23 20:35:58 +0000201 self.priv_key = priv_key
Filip Tehlar12b517b2020-04-26 18:05:05 +0000202 self.is_initiator = is_initiator
203 nonce = nonce or os.urandom(32)
204 self.auth_data = auth_data
Filip Tehlar4128c7b2020-05-10 05:18:37 +0000205 self.i_id = i_id
206 self.r_id = r_id
Filip Tehlar12b517b2020-04-26 18:05:05 +0000207 if isinstance(id_type, str):
208 self.id_type = IDType.value(id_type)
209 else:
210 self.id_type = id_type
211 self.auth_method = auth_method
212 if self.is_initiator:
Filip Tehlar84962d12020-09-08 06:08:05 +0000213 self.rspi = 8 * b'\x00'
Filip Tehlar12b517b2020-04-26 18:05:05 +0000214 self.ispi = spi
Filip Tehlar12b517b2020-04-26 18:05:05 +0000215 self.i_nonce = nonce
216 else:
217 self.rspi = spi
Filip Tehlar84962d12020-09-08 06:08:05 +0000218 self.ispi = 8 * b'\x00'
Filip Tehlare7c83962020-09-23 11:20:12 +0000219 self.r_nonce = nonce
Filip Tehlar68ad6252020-10-30 05:28:11 +0000220 self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
221 self.is_initiator)]
Filip Tehlar12b517b2020-04-26 18:05:05 +0000222
Filip Tehlar558607d2020-07-16 07:25:56 +0000223 def new_msg_id(self):
224 self.msg_id += 1
225 return self.msg_id
226
Filip Tehlare7c83962020-09-23 11:20:12 +0000227 @property
228 def my_dh_pub_key(self):
229 if self.is_initiator:
230 return self.i_dh_data
231 return self.r_dh_data
232
233 @property
234 def peer_dh_pub_key(self):
235 if self.is_initiator:
236 return self.r_dh_data
Filip Tehlar12b517b2020-04-26 18:05:05 +0000237 return self.i_dh_data
238
Filip Tehlar027d8132020-12-04 17:38:11 +0000239 @property
240 def natt(self):
241 return self.i_natt or self.r_natt
242
Filip Tehlar12b517b2020-04-26 18:05:05 +0000243 def compute_secret(self):
244 priv = self.dh_private_key
Filip Tehlare7c83962020-09-23 11:20:12 +0000245 peer = self.peer_dh_pub_key
Filip Tehlar12b517b2020-04-26 18:05:05 +0000246 p, g, l = self.ike_group
247 return pow(int.from_bytes(peer, 'big'),
248 int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
249
250 def generate_dh_data(self):
251 # generate DH keys
Filip Tehlare7c83962020-09-23 11:20:12 +0000252 if self.ike_dh not in DH:
253 raise NotImplementedError('%s not in DH group' % self.ike_dh)
254
255 if self.dh_params is None:
256 dhg = DH[self.ike_dh]
257 pn = dh.DHParameterNumbers(dhg[0], dhg[1])
258 self.dh_params = pn.parameters(default_backend())
259
260 priv = self.dh_params.generate_private_key()
261 pub = priv.public_key()
262 x = priv.private_numbers().x
263 self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
264 y = pub.public_numbers().y
265
Filip Tehlar12b517b2020-04-26 18:05:05 +0000266 if self.is_initiator:
Filip Tehlar12b517b2020-04-26 18:05:05 +0000267 self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
Filip Tehlare7c83962020-09-23 11:20:12 +0000268 else:
269 self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
Filip Tehlar12b517b2020-04-26 18:05:05 +0000270
271 def complete_dh_data(self):
272 self.dh_shared_secret = self.compute_secret()
273
274 def calc_child_keys(self):
275 prf = self.ike_prf_alg.mod()
276 s = self.i_nonce + self.r_nonce
277 c = self.child_sas[0]
278
279 encr_key_len = self.esp_crypto_key_len
Filip Tehlar4f42a712020-07-01 08:56:59 +0000280 integ_key_len = self.esp_integ_alg.key_len
281 salt_len = 0 if integ_key_len else 4
282
Filip Tehlar12b517b2020-04-26 18:05:05 +0000283 l = (integ_key_len * 2 +
Filip Tehlar4f42a712020-07-01 08:56:59 +0000284 encr_key_len * 2 +
285 salt_len * 2)
Filip Tehlar12b517b2020-04-26 18:05:05 +0000286 keymat = self.calc_prfplus(prf, self.sk_d, s, l)
287
288 pos = 0
289 c.sk_ei = keymat[pos:pos+encr_key_len]
290 pos += encr_key_len
291
Filip Tehlar4f42a712020-07-01 08:56:59 +0000292 if integ_key_len:
293 c.sk_ai = keymat[pos:pos+integ_key_len]
294 pos += integ_key_len
295 else:
296 c.salt_ei = keymat[pos:pos+salt_len]
297 pos += salt_len
Filip Tehlar12b517b2020-04-26 18:05:05 +0000298
299 c.sk_er = keymat[pos:pos+encr_key_len]
300 pos += encr_key_len
301
Filip Tehlar4f42a712020-07-01 08:56:59 +0000302 if integ_key_len:
303 c.sk_ar = keymat[pos:pos+integ_key_len]
304 pos += integ_key_len
305 else:
306 c.salt_er = keymat[pos:pos+salt_len]
307 pos += salt_len
Filip Tehlar12b517b2020-04-26 18:05:05 +0000308
309 def calc_prfplus(self, prf, key, seed, length):
310 r = b''
311 t = None
312 x = 1
313 while len(r) < length and x < 255:
314 if t is not None:
315 s = t
316 else:
317 s = b''
318 s = s + seed + bytes([x])
319 t = self.calc_prf(prf, key, s)
320 r = r + t
321 x = x + 1
322
323 if x == 255:
324 return None
325 return r
326
327 def calc_prf(self, prf, key, data):
Filip Tehlara7b963d2020-07-08 13:25:34 +0000328 h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
Filip Tehlar12b517b2020-04-26 18:05:05 +0000329 h.update(data)
330 return h.finalize()
331
332 def calc_keys(self):
333 prf = self.ike_prf_alg.mod()
334 # SKEYSEED = prf(Ni | Nr, g^ir)
335 s = self.i_nonce + self.r_nonce
336 self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
337
338 # calculate S = Ni | Nr | SPIi SPIr
339 s = s + self.ispi + self.rspi
340
341 prf_key_trunc = self.ike_prf_alg.trunc_len
342 encr_key_len = self.ike_crypto_key_len
343 tr_prf_key_len = self.ike_prf_alg.key_len
344 integ_key_len = self.ike_integ_alg.key_len
Filip Tehlara7b963d2020-07-08 13:25:34 +0000345 if integ_key_len == 0:
346 salt_size = 4
347 else:
348 salt_size = 0
349
Filip Tehlar12b517b2020-04-26 18:05:05 +0000350 l = (prf_key_trunc +
351 integ_key_len * 2 +
352 encr_key_len * 2 +
Filip Tehlara7b963d2020-07-08 13:25:34 +0000353 tr_prf_key_len * 2 +
354 salt_size * 2)
Filip Tehlar12b517b2020-04-26 18:05:05 +0000355 keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
356
357 pos = 0
358 self.sk_d = keymat[:pos+prf_key_trunc]
359 pos += prf_key_trunc
360
361 self.sk_ai = keymat[pos:pos+integ_key_len]
362 pos += integ_key_len
363 self.sk_ar = keymat[pos:pos+integ_key_len]
364 pos += integ_key_len
365
Filip Tehlara7b963d2020-07-08 13:25:34 +0000366 self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
367 pos += encr_key_len + salt_size
368 self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
369 pos += encr_key_len + salt_size
Filip Tehlar12b517b2020-04-26 18:05:05 +0000370
371 self.sk_pi = keymat[pos:pos+tr_prf_key_len]
372 pos += tr_prf_key_len
373 self.sk_pr = keymat[pos:pos+tr_prf_key_len]
374
375 def generate_authmsg(self, prf, packet):
376 if self.is_initiator:
377 id = self.i_id
378 nonce = self.r_nonce
379 key = self.sk_pi
Filip Tehlare7c83962020-09-23 11:20:12 +0000380 else:
381 id = self.r_id
382 nonce = self.i_nonce
383 key = self.sk_pr
Filip Tehlar12b517b2020-04-26 18:05:05 +0000384 data = bytes([self.id_type, 0, 0, 0]) + id
385 id_hash = self.calc_prf(prf, key, data)
386 return packet + nonce + id_hash
387
388 def auth_init(self):
389 prf = self.ike_prf_alg.mod()
Filip Tehlare7c83962020-09-23 11:20:12 +0000390 if self.is_initiator:
391 packet = self.init_req_packet
392 else:
393 packet = self.init_resp_packet
394 authmsg = self.generate_authmsg(prf, raw(packet))
Filip Tehlarbfeae8c2020-06-23 20:35:58 +0000395 if self.auth_method == 'shared-key':
396 psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
397 self.auth_data = self.calc_prf(prf, psk, authmsg)
398 elif self.auth_method == 'rsa-sig':
399 self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
400 hashes.SHA1())
401 else:
402 raise TypeError('unknown auth method type!')
Filip Tehlar12b517b2020-04-26 18:05:05 +0000403
Filip Tehlara7b963d2020-07-08 13:25:34 +0000404 def encrypt(self, data, aad=None):
Filip Tehlar12b517b2020-04-26 18:05:05 +0000405 data = self.ike_crypto_alg.pad(data)
Filip Tehlara7b963d2020-07-08 13:25:34 +0000406 return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
Filip Tehlar12b517b2020-04-26 18:05:05 +0000407
408 @property
409 def peer_authkey(self):
410 if self.is_initiator:
411 return self.sk_ar
412 return self.sk_ai
413
414 @property
415 def my_authkey(self):
416 if self.is_initiator:
417 return self.sk_ai
418 return self.sk_ar
419
420 @property
421 def my_cryptokey(self):
422 if self.is_initiator:
423 return self.sk_ei
424 return self.sk_er
425
426 @property
427 def peer_cryptokey(self):
428 if self.is_initiator:
429 return self.sk_er
430 return self.sk_ei
431
Filip Tehlar4f42a712020-07-01 08:56:59 +0000432 def concat(self, alg, key_len):
433 return alg + '-' + str(key_len * 8)
434
435 @property
436 def vpp_ike_cypto_alg(self):
437 return self.concat(self.ike_crypto, self.ike_crypto_key_len)
438
439 @property
440 def vpp_esp_cypto_alg(self):
441 return self.concat(self.esp_crypto, self.esp_crypto_key_len)
442
Filip Tehlar12b517b2020-04-26 18:05:05 +0000443 def verify_hmac(self, ikemsg):
444 integ_trunc = self.ike_integ_alg.trunc_len
445 exp_hmac = ikemsg[-integ_trunc:]
446 data = ikemsg[:-integ_trunc]
447 computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
448 self.peer_authkey, data)
449 self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
450
451 def compute_hmac(self, integ, key, data):
452 h = self.ike_integ_alg.mac(key, integ, backend=default_backend())
453 h.update(data)
454 return h.finalize()
455
Filip Tehlara7b963d2020-07-08 13:25:34 +0000456 def decrypt(self, data, aad=None, icv=None):
457 return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
Filip Tehlar12b517b2020-04-26 18:05:05 +0000458
459 def hmac_and_decrypt(self, ike):
460 ep = ike[ikev2.IKEv2_payload_Encrypted]
Filip Tehlara7b963d2020-07-08 13:25:34 +0000461 if self.ike_crypto == 'AES-GCM-16ICV':
462 aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
463 ct = ep.load[:-GCM_ICV_SIZE]
464 tag = ep.load[-GCM_ICV_SIZE:]
Filip Tehlaredf29002020-10-10 04:39:11 +0000465 plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
Filip Tehlara7b963d2020-07-08 13:25:34 +0000466 else:
467 self.verify_hmac(raw(ike))
468 integ_trunc = self.ike_integ_alg.trunc_len
Filip Tehlar12b517b2020-04-26 18:05:05 +0000469
Filip Tehlara7b963d2020-07-08 13:25:34 +0000470 # remove ICV and decrypt payload
471 ct = ep.load[:-integ_trunc]
Filip Tehlaredf29002020-10-10 04:39:11 +0000472 plain = self.decrypt(ct)
473 # remove padding
474 pad_len = plain[-1]
475 return plain[:-pad_len - 1]
Filip Tehlar12b517b2020-04-26 18:05:05 +0000476
Filip Tehlar84962d12020-09-08 06:08:05 +0000477 def build_ts_addr(self, ts, version):
478 return {'starting_address_v' + version: ts['start_addr'],
479 'ending_address_v' + version: ts['end_addr']}
480
481 def generate_ts(self, is_ip4):
Filip Tehlar12b517b2020-04-26 18:05:05 +0000482 c = self.child_sas[0]
Filip Tehlar84962d12020-09-08 06:08:05 +0000483 ts_data = {'IP_protocol_ID': 0,
484 'start_port': 0,
485 'end_port': 0xffff}
486 if is_ip4:
487 ts_data.update(self.build_ts_addr(c.local_ts, '4'))
488 ts1 = ikev2.IPv4TrafficSelector(**ts_data)
489 ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
490 ts2 = ikev2.IPv4TrafficSelector(**ts_data)
491 else:
492 ts_data.update(self.build_ts_addr(c.local_ts, '6'))
493 ts1 = ikev2.IPv6TrafficSelector(**ts_data)
494 ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
495 ts2 = ikev2.IPv6TrafficSelector(**ts_data)
Filip Tehlare7c83962020-09-23 11:20:12 +0000496
497 if self.is_initiator:
498 return ([ts1], [ts2])
499 return ([ts2], [ts1])
Filip Tehlar12b517b2020-04-26 18:05:05 +0000500
501 def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
502 if crypto not in CRYPTO_ALGOS:
503 raise TypeError('unsupported encryption algo %r' % crypto)
504 self.ike_crypto = crypto
505 self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
506 self.ike_crypto_key_len = crypto_key_len
507
508 if integ not in AUTH_ALGOS:
509 raise TypeError('unsupported auth algo %r' % integ)
Filip Tehlara7b963d2020-07-08 13:25:34 +0000510 self.ike_integ = None if integ == 'NULL' else integ
Filip Tehlar12b517b2020-04-26 18:05:05 +0000511 self.ike_integ_alg = AUTH_ALGOS[integ]
512
513 if prf not in PRF_ALGOS:
514 raise TypeError('unsupported prf algo %r' % prf)
515 self.ike_prf = prf
516 self.ike_prf_alg = PRF_ALGOS[prf]
517 self.ike_dh = dh
518 self.ike_group = DH[self.ike_dh]
519
520 def set_esp_props(self, crypto, crypto_key_len, integ):
521 self.esp_crypto_key_len = crypto_key_len
522 if crypto not in CRYPTO_ALGOS:
523 raise TypeError('unsupported encryption algo %r' % crypto)
524 self.esp_crypto = crypto
525 self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
526
527 if integ not in AUTH_ALGOS:
528 raise TypeError('unsupported auth algo %r' % integ)
Filip Tehlar4f42a712020-07-01 08:56:59 +0000529 self.esp_integ = None if integ == 'NULL' else integ
Filip Tehlar12b517b2020-04-26 18:05:05 +0000530 self.esp_integ_alg = AUTH_ALGOS[integ]
531
532 def crypto_attr(self, key_len):
Filip Tehlara7b963d2020-07-08 13:25:34 +0000533 if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
Filip Tehlar12b517b2020-04-26 18:05:05 +0000534 return (0x800e << 16 | key_len << 3, 12)
535 else:
536 raise Exception('unsupported attribute type')
537
538 def ike_crypto_attr(self):
539 return self.crypto_attr(self.ike_crypto_key_len)
540
541 def esp_crypto_attr(self):
542 return self.crypto_attr(self.esp_crypto_key_len)
543
Filip Tehlarec112e52020-10-07 23:52:37 +0000544 def compute_nat_sha1(self, ip, port, rspi=None):
545 if rspi is None:
546 rspi = self.rspi
547 data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
Filip Tehlarbfeae8c2020-06-23 20:35:58 +0000548 digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
549 digest.update(data)
550 return digest.finalize()
Filip Tehlar12b517b2020-04-26 18:05:05 +0000551
Filip Tehlarbfeae8c2020-06-23 20:35:58 +0000552
Filip Tehlare7c83962020-09-23 11:20:12 +0000553class IkePeer(VppTestCase):
554 """ common class for initiator and responder """
Filip Tehlar12b517b2020-04-26 18:05:05 +0000555
556 @classmethod
557 def setUpClass(cls):
558 import scapy.contrib.ikev2 as _ikev2
559 globals()['ikev2'] = _ikev2
Filip Tehlare7c83962020-09-23 11:20:12 +0000560 super(IkePeer, cls).setUpClass()
Filip Tehlar12b517b2020-04-26 18:05:05 +0000561 cls.create_pg_interfaces(range(2))
562 for i in cls.pg_interfaces:
563 i.admin_up()
564 i.config_ip4()
565 i.resolve_arp()
Filip Tehlar84962d12020-09-08 06:08:05 +0000566 i.config_ip6()
567 i.resolve_ndp()
Filip Tehlar12b517b2020-04-26 18:05:05 +0000568
569 @classmethod
570 def tearDownClass(cls):
Filip Tehlare7c83962020-09-23 11:20:12 +0000571 super(IkePeer, cls).tearDownClass()
Filip Tehlar12b517b2020-04-26 18:05:05 +0000572
Filip Tehlaredf29002020-10-10 04:39:11 +0000573 def tearDown(self):
574 super(IkePeer, self).tearDown()
575 if self.del_sa_from_responder:
576 self.initiate_del_sa_from_responder()
577 else:
578 self.initiate_del_sa_from_initiator()
579 r = self.vapi.ikev2_sa_dump()
580 self.assertEqual(len(r), 0)
581 sas = self.vapi.ipsec_sa_dump()
582 self.assertEqual(len(sas), 0)
583 self.p.remove_vpp_config()
584 self.assertIsNone(self.p.query_vpp_config())
585
Filip Tehlar12b517b2020-04-26 18:05:05 +0000586 def setUp(self):
Filip Tehlare7c83962020-09-23 11:20:12 +0000587 super(IkePeer, self).setUp()
Filip Tehlar12b517b2020-04-26 18:05:05 +0000588 self.config_tc()
Filip Tehlar12b517b2020-04-26 18:05:05 +0000589 self.p.add_vpp_config()
Filip Tehlar459d17b2020-07-06 15:40:08 +0000590 self.assertIsNotNone(self.p.query_vpp_config())
Filip Tehlar68ad6252020-10-30 05:28:11 +0000591 if self.sa.is_initiator:
592 self.sa.generate_dh_data()
Filip Tehlar84962d12020-09-08 06:08:05 +0000593 self.vapi.cli('ikev2 set logging level 4')
594 self.vapi.cli('event-lo clear')
Filip Tehlar12b517b2020-04-26 18:05:05 +0000595
Filip Tehlarfab5e7f2021-01-14 13:32:01 +0000596 def assert_counter(self, count, name, version='ip4'):
597 node_name = '/err/ikev2-%s/' % version + name
598 self.assertEqual(count, self.statistics.get_err_counter(node_name))
599
Filip Tehlar38340fa2020-11-19 21:34:48 +0000600 def create_rekey_request(self):
601 sa, first_payload = self.generate_auth_payload(is_rekey=True)
602 header = ikev2.IKEv2(
603 init_SPI=self.sa.ispi,
604 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
605 flags='Initiator', exch_type='CREATE_CHILD_SA')
606
607 ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
608 return self.create_packet(self.pg0, ike_msg, self.sa.sport,
609 self.sa.dport, self.sa.natt, self.ip6)
610
611 def create_empty_request(self):
612 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
613 id=self.sa.new_msg_id(), flags='Initiator',
614 exch_type='INFORMATIONAL',
615 next_payload='Encrypted')
616
617 msg = self.encrypt_ike_msg(header, b'', None)
618 return self.create_packet(self.pg0, msg, self.sa.sport,
619 self.sa.dport, self.sa.natt, self.ip6)
620
Filip Tehlar84962d12020-09-08 06:08:05 +0000621 def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
622 use_ip6=False):
623 if use_ip6:
624 src_ip = src_if.remote_ip6
625 dst_ip = src_if.local_ip6
626 ip_layer = IPv6
627 else:
628 src_ip = src_if.remote_ip4
629 dst_ip = src_if.local_ip4
630 ip_layer = IP
Filip Tehlarbfeae8c2020-06-23 20:35:58 +0000631 res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
Filip Tehlar84962d12020-09-08 06:08:05 +0000632 ip_layer(src=src_ip, dst=dst_ip) /
Filip Tehlarbfeae8c2020-06-23 20:35:58 +0000633 UDP(sport=sport, dport=dport))
634 if natt:
635 # insert non ESP marker
636 res = res / Raw(b'\x00' * 4)
637 return res / msg
Filip Tehlar12b517b2020-04-26 18:05:05 +0000638
Filip Tehlare7c83962020-09-23 11:20:12 +0000639 def verify_udp(self, udp):
640 self.assertEqual(udp.sport, self.sa.sport)
641 self.assertEqual(udp.dport, self.sa.dport)
Filip Tehlar12b517b2020-04-26 18:05:05 +0000642
Filip Tehlare7c83962020-09-23 11:20:12 +0000643 def get_ike_header(self, packet):
644 try:
645 ih = packet[ikev2.IKEv2]
Filip Tehlar18107c92020-12-01 14:51:09 +0000646 ih = self.verify_and_remove_non_esp_marker(ih)
Filip Tehlare7c83962020-09-23 11:20:12 +0000647 except IndexError as e:
648 # this is a workaround for getting IKEv2 layer as both ikev2 and
649 # ipsec register for port 4500
650 esp = packet[ESP]
651 ih = self.verify_and_remove_non_esp_marker(esp)
652 self.assertEqual(ih.version, 0x20)
Filip Tehlaredf29002020-10-10 04:39:11 +0000653 self.assertNotIn('Version', ih.flags)
Filip Tehlare7c83962020-09-23 11:20:12 +0000654 return ih
Filip Tehlar12b517b2020-04-26 18:05:05 +0000655
Filip Tehlare7c83962020-09-23 11:20:12 +0000656 def verify_and_remove_non_esp_marker(self, packet):
657 if self.sa.natt:
658 # if we are in nat traversal mode check for non esp marker
659 # and remove it
660 data = raw(packet)
661 self.assertEqual(data[:4], b'\x00' * 4)
662 return ikev2.IKEv2(data[4:])
Filip Tehlarbfeae8c2020-06-23 20:35:58 +0000663 else:
Filip Tehlare7c83962020-09-23 11:20:12 +0000664 return packet
Filip Tehlar12b517b2020-04-26 18:05:05 +0000665
Filip Tehlar558607d2020-07-16 07:25:56 +0000666 def encrypt_ike_msg(self, header, plain, first_payload):
667 if self.sa.ike_crypto == 'AES-GCM-16ICV':
668 data = self.sa.ike_crypto_alg.pad(raw(plain))
669 plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
670 len(ikev2.IKEv2_payload_Encrypted())
671 tlen = plen + len(ikev2.IKEv2())
672
673 # prepare aad data
674 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
675 length=plen)
676 header.length = tlen
677 res = header / sk_p
678 encr = self.sa.encrypt(raw(plain), raw(res))
679 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
680 length=plen, load=encr)
681 res = header / sk_p
682 else:
683 encr = self.sa.encrypt(raw(plain))
684 trunc_len = self.sa.ike_integ_alg.trunc_len
685 plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
686 tlen = plen + len(ikev2.IKEv2())
687
688 sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
689 length=plen, load=encr)
690 header.length = tlen
691 res = header / sk_p
692
693 integ_data = raw(res)
694 hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
695 self.sa.my_authkey, integ_data)
696 res = res / Raw(hmac_data[:trunc_len])
697 assert(len(res) == tlen)
698 return res
699
Filip Tehlar67b8a7f2020-11-06 11:00:42 +0000700 def verify_udp_encap(self, ipsec_sa):
701 e = VppEnum.vl_api_ipsec_sad_flags_t
702 if self.sa.udp_encap or self.sa.natt:
703 self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
704 else:
705 self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
706
Filip Tehlar68ad6252020-10-30 05:28:11 +0000707 def verify_ipsec_sas(self, is_rekey=False):
Filip Tehlar12b517b2020-04-26 18:05:05 +0000708 sas = self.vapi.ipsec_sa_dump()
Filip Tehlar68ad6252020-10-30 05:28:11 +0000709 if is_rekey:
710 # after rekey there is a short period of time in which old
711 # inbound SA is still present
712 sa_count = 3
713 else:
714 sa_count = 2
715 self.assertEqual(len(sas), sa_count)
Filip Tehlare7c83962020-09-23 11:20:12 +0000716 if self.sa.is_initiator:
Filip Tehlar68ad6252020-10-30 05:28:11 +0000717 if is_rekey:
718 sa0 = sas[0].entry
719 sa1 = sas[2].entry
720 else:
721 sa0 = sas[0].entry
722 sa1 = sas[1].entry
Filip Tehlare7c83962020-09-23 11:20:12 +0000723 else:
Filip Tehlar68ad6252020-10-30 05:28:11 +0000724 if is_rekey:
725 sa0 = sas[2].entry
726 sa1 = sas[0].entry
727 else:
728 sa1 = sas[0].entry
729 sa0 = sas[1].entry
Filip Tehlare7c83962020-09-23 11:20:12 +0000730
Filip Tehlar12b517b2020-04-26 18:05:05 +0000731 c = self.sa.child_sas[0]
732
Filip Tehlar67b8a7f2020-11-06 11:00:42 +0000733 self.verify_udp_encap(sa0)
734 self.verify_udp_encap(sa1)
Filip Tehlar4f42a712020-07-01 08:56:59 +0000735 vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
736 self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg)
737 self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg)
738
739 if self.sa.esp_integ is None:
740 vpp_integ_alg = 0
741 else:
742 vpp_integ_alg = self.vpp_enums[self.sa.esp_integ]
743 self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg)
744 self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg)
745
Filip Tehlar12b517b2020-04-26 18:05:05 +0000746 # verify crypto keys
747 self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
748 self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
749 self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
750 self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
751
752 # verify integ keys
Filip Tehlar4f42a712020-07-01 08:56:59 +0000753 if vpp_integ_alg:
754 self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
755 self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
756 self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
757 self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
758 else:
759 self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
760 self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
Filip Tehlar12b517b2020-04-26 18:05:05 +0000761
jan_cavojskya340fe12020-07-08 09:24:12 +0200762 def verify_keymat(self, api_keys, keys, name):
763 km = getattr(keys, name)
764 api_km = getattr(api_keys, name)
765 api_km_len = getattr(api_keys, name + '_len')
766 self.assertEqual(len(km), api_km_len)
767 self.assertEqual(km, api_km[:api_km_len])
768
769 def verify_id(self, api_id, exp_id):
770 self.assertEqual(api_id.type, IDType.value(exp_id.type))
771 self.assertEqual(api_id.data_len, exp_id.data_len)
772 self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
773
774 def verify_ike_sas(self):
775 r = self.vapi.ikev2_sa_dump()
776 self.assertEqual(len(r), 1)
777 sa = r[0].sa
Filip Tehlar84962d12020-09-08 06:08:05 +0000778 self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
jan_cavojskya340fe12020-07-08 09:24:12 +0200779 self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
Filip Tehlar84962d12020-09-08 06:08:05 +0000780 if self.ip6:
Filip Tehlare7c83962020-09-23 11:20:12 +0000781 if self.sa.is_initiator:
782 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
783 self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
784 else:
785 self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
786 self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
Filip Tehlar84962d12020-09-08 06:08:05 +0000787 else:
Filip Tehlare7c83962020-09-23 11:20:12 +0000788 if self.sa.is_initiator:
789 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
790 self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
791 else:
792 self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
793 self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
jan_cavojskya340fe12020-07-08 09:24:12 +0200794 self.verify_keymat(sa.keys, self.sa, 'sk_d')
795 self.verify_keymat(sa.keys, self.sa, 'sk_ai')
796 self.verify_keymat(sa.keys, self.sa, 'sk_ar')
797 self.verify_keymat(sa.keys, self.sa, 'sk_ei')
798 self.verify_keymat(sa.keys, self.sa, 'sk_er')
799 self.verify_keymat(sa.keys, self.sa, 'sk_pi')
800 self.verify_keymat(sa.keys, self.sa, 'sk_pr')
801
802 self.assertEqual(sa.i_id.type, self.sa.id_type)
803 self.assertEqual(sa.r_id.type, self.sa.id_type)
804 self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
805 self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
806 self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
807 self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
808
809 r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
810 self.assertEqual(len(r), 1)
811 csa = r[0].child_sa
812 self.assertEqual(csa.sa_index, sa.sa_index)
813 c = self.sa.child_sas[0]
814 if hasattr(c, 'sk_ai'):
815 self.verify_keymat(csa.keys, c, 'sk_ai')
816 self.verify_keymat(csa.keys, c, 'sk_ar')
817 self.verify_keymat(csa.keys, c, 'sk_ei')
818 self.verify_keymat(csa.keys, c, 'sk_er')
Filip Tehlar68ad6252020-10-30 05:28:11 +0000819 self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
820 self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
jan_cavojskya340fe12020-07-08 09:24:12 +0200821
Filip Tehlar84962d12020-09-08 06:08:05 +0000822 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
jan_cavojskya340fe12020-07-08 09:24:12 +0200823 tsi = tsi[0]
824 tsr = tsr[0]
825 r = self.vapi.ikev2_traffic_selector_dump(
826 is_initiator=True, sa_index=sa.sa_index,
827 child_sa_index=csa.child_sa_index)
828 self.assertEqual(len(r), 1)
829 ts = r[0].ts
830 self.verify_ts(r[0].ts, tsi[0], True)
831
832 r = self.vapi.ikev2_traffic_selector_dump(
833 is_initiator=False, sa_index=sa.sa_index,
834 child_sa_index=csa.child_sa_index)
835 self.assertEqual(len(r), 1)
836 self.verify_ts(r[0].ts, tsr[0], False)
837
838 n = self.vapi.ikev2_nonce_get(is_initiator=True,
839 sa_index=sa.sa_index)
840 self.verify_nonce(n, self.sa.i_nonce)
841 n = self.vapi.ikev2_nonce_get(is_initiator=False,
842 sa_index=sa.sa_index)
843 self.verify_nonce(n, self.sa.r_nonce)
844
845 def verify_nonce(self, api_nonce, nonce):
846 self.assertEqual(api_nonce.data_len, len(nonce))
847 self.assertEqual(api_nonce.nonce, nonce)
848
849 def verify_ts(self, api_ts, ts, is_initiator):
850 if is_initiator:
851 self.assertTrue(api_ts.is_local)
852 else:
853 self.assertFalse(api_ts.is_local)
Filip Tehlar84962d12020-09-08 06:08:05 +0000854
855 if self.p.ts_is_ip4:
856 self.assertEqual(api_ts.start_addr,
857 IPv4Address(ts.starting_address_v4))
858 self.assertEqual(api_ts.end_addr,
859 IPv4Address(ts.ending_address_v4))
860 else:
861 self.assertEqual(api_ts.start_addr,
862 IPv6Address(ts.starting_address_v6))
863 self.assertEqual(api_ts.end_addr,
864 IPv6Address(ts.ending_address_v6))
jan_cavojskya340fe12020-07-08 09:24:12 +0200865 self.assertEqual(api_ts.start_port, ts.start_port)
866 self.assertEqual(api_ts.end_port, ts.end_port)
867 self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
868
Filip Tehlare7c83962020-09-23 11:20:12 +0000869
870class TemplateInitiator(IkePeer):
871 """ initiator test template """
872
Filip Tehlaredf29002020-10-10 04:39:11 +0000873 def initiate_del_sa_from_initiator(self):
874 ispi = int.from_bytes(self.sa.ispi, 'little')
875 self.pg0.enable_capture()
876 self.pg_start()
877 self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
878 capture = self.pg0.get_capture(1)
879 ih = self.get_ike_header(capture[0])
880 self.assertNotIn('Response', ih.flags)
881 self.assertIn('Initiator', ih.flags)
882 self.assertEqual(ih.init_SPI, self.sa.ispi)
883 self.assertEqual(ih.resp_SPI, self.sa.rspi)
884 plain = self.sa.hmac_and_decrypt(ih)
885 d = ikev2.IKEv2_payload_Delete(plain)
886 self.assertEqual(d.proto, 1) # proto=IKEv2
887 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
888 flags='Response', exch_type='INFORMATIONAL',
889 id=ih.id, next_payload='Encrypted')
890 resp = self.encrypt_ike_msg(header, b'', None)
891 self.send_and_assert_no_replies(self.pg0, resp)
892
893 def verify_del_sa(self, packet):
894 ih = self.get_ike_header(packet)
895 self.assertEqual(ih.id, self.sa.msg_id)
896 self.assertEqual(ih.exch_type, 37) # exchange informational
897 self.assertIn('Response', ih.flags)
898 self.assertIn('Initiator', ih.flags)
899 plain = self.sa.hmac_and_decrypt(ih)
900 self.assertEqual(plain, b'')
901
902 def initiate_del_sa_from_responder(self):
903 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
904 exch_type='INFORMATIONAL',
905 id=self.sa.new_msg_id())
906 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
907 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
908 packet = self.create_packet(self.pg0, ike_msg,
909 self.sa.sport, self.sa.dport,
910 self.sa.natt, self.ip6)
911 self.pg0.add_stream(packet)
912 self.pg0.enable_capture()
913 self.pg_start()
914 capture = self.pg0.get_capture(1)
915 self.verify_del_sa(capture[0])
Filip Tehlare7c83962020-09-23 11:20:12 +0000916
Filip Tehlarec112e52020-10-07 23:52:37 +0000917 @staticmethod
918 def find_notify_payload(packet, notify_type):
919 n = packet[ikev2.IKEv2_payload_Notify]
920 while n is not None:
921 if n.type == notify_type:
922 return n
923 n = n.payload
924 return None
925
926 def verify_nat_detection(self, packet):
927 if self.ip6:
928 iph = packet[IPv6]
929 else:
930 iph = packet[IP]
931 udp = packet[UDP]
932
933 # NAT_DETECTION_SOURCE_IP
934 s = self.find_notify_payload(packet, 16388)
935 self.assertIsNotNone(s)
936 src_sha = self.sa.compute_nat_sha1(
937 inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
938 self.assertEqual(s.load, src_sha)
939
940 # NAT_DETECTION_DESTINATION_IP
941 s = self.find_notify_payload(packet, 16389)
942 self.assertIsNotNone(s)
943 dst_sha = self.sa.compute_nat_sha1(
944 inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
945 self.assertEqual(s.load, dst_sha)
946
Filip Tehlare7c83962020-09-23 11:20:12 +0000947 def verify_sa_init_request(self, packet):
Filip Tehlar18107c92020-12-01 14:51:09 +0000948 udp = packet[UDP]
949 self.sa.dport = udp.sport
Filip Tehlare7c83962020-09-23 11:20:12 +0000950 ih = packet[ikev2.IKEv2]
951 self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
952 self.assertEqual(ih.exch_type, 34) # SA_INIT
953 self.sa.ispi = ih.init_SPI
954 self.assertEqual(ih.resp_SPI, 8 * b'\x00')
955 self.assertIn('Initiator', ih.flags)
956 self.assertNotIn('Response', ih.flags)
957 self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
958 self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
959
960 prop = packet[ikev2.IKEv2_payload_Proposal]
961 self.assertEqual(prop.proto, 1) # proto = ikev2
962 self.assertEqual(prop.proposal, 1)
963 self.assertEqual(prop.trans[0].transform_type, 1) # encryption
964 self.assertEqual(prop.trans[0].transform_id,
965 self.p.ike_transforms['crypto_alg'])
966 self.assertEqual(prop.trans[1].transform_type, 2) # prf
967 self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
968 self.assertEqual(prop.trans[2].transform_type, 4) # dh
969 self.assertEqual(prop.trans[2].transform_id,
970 self.p.ike_transforms['dh_group'])
971
Filip Tehlarec112e52020-10-07 23:52:37 +0000972 self.verify_nat_detection(packet)
Filip Tehlar68ad6252020-10-30 05:28:11 +0000973 self.sa.set_ike_props(
974 crypto='AES-GCM-16ICV', crypto_key_len=32,
975 integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
976 self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
977 integ='SHA2-256-128')
978 self.sa.generate_dh_data()
Filip Tehlare7c83962020-09-23 11:20:12 +0000979 self.sa.complete_dh_data()
980 self.sa.calc_keys()
981
Filip Tehlar68ad6252020-10-30 05:28:11 +0000982 def update_esp_transforms(self, trans, sa):
983 while trans:
984 if trans.transform_type == 1: # ecryption
985 sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
986 elif trans.transform_type == 3: # integrity
987 sa.esp_integ = INTEG_IDS[trans.transform_id]
988 trans = trans.payload
989
Filip Tehlare7c83962020-09-23 11:20:12 +0000990 def verify_sa_auth_req(self, packet):
Filip Tehlar18107c92020-12-01 14:51:09 +0000991 udp = packet[UDP]
992 self.sa.dport = udp.sport
Filip Tehlare7c83962020-09-23 11:20:12 +0000993 ih = self.get_ike_header(packet)
994 self.assertEqual(ih.resp_SPI, self.sa.rspi)
995 self.assertEqual(ih.init_SPI, self.sa.ispi)
996 self.assertEqual(ih.exch_type, 35) # IKE_AUTH
997 self.assertIn('Initiator', ih.flags)
998 self.assertNotIn('Response', ih.flags)
999
1000 udp = packet[UDP]
1001 self.verify_udp(udp)
1002 self.assertEqual(ih.id, self.sa.msg_id + 1)
1003 self.sa.msg_id += 1
1004 plain = self.sa.hmac_and_decrypt(ih)
1005 idi = ikev2.IKEv2_payload_IDi(plain)
1006 idr = ikev2.IKEv2_payload_IDr(idi.payload)
1007 self.assertEqual(idi.load, self.sa.i_id)
1008 self.assertEqual(idr.load, self.sa.r_id)
Filip Tehlar68ad6252020-10-30 05:28:11 +00001009 prop = idi[ikev2.IKEv2_payload_Proposal]
1010 c = self.sa.child_sas[0]
1011 c.ispi = prop.SPI
1012 self.update_esp_transforms(
1013 prop[ikev2.IKEv2_payload_Transform], self.sa)
Filip Tehlare7c83962020-09-23 11:20:12 +00001014
1015 def send_init_response(self):
1016 tr_attr = self.sa.ike_crypto_attr()
1017 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1018 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1019 key_length=tr_attr[0]) /
1020 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1021 transform_id=self.sa.ike_integ) /
1022 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1023 transform_id=self.sa.ike_prf_alg.name) /
1024 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1025 transform_id=self.sa.ike_dh))
1026 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1027 trans_nb=4, trans=trans))
Filip Tehlar18107c92020-12-01 14:51:09 +00001028
1029 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
1030 if self.sa.natt:
1031 dst_address = b'\x0a\x0a\x0a\x0a'
1032 else:
1033 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1034 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1035 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1036
Filip Tehlare7c83962020-09-23 11:20:12 +00001037 self.sa.init_resp_packet = (
1038 ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1039 exch_type='IKE_SA_INIT', flags='Response') /
1040 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1041 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1042 group=self.sa.ike_dh,
1043 load=self.sa.my_dh_pub_key) /
Filip Tehlar18107c92020-12-01 14:51:09 +00001044 ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
1045 next_payload='Notify') /
1046 ikev2.IKEv2_payload_Notify(
1047 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1048 next_payload='Notify') / ikev2.IKEv2_payload_Notify(
1049 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))
Filip Tehlare7c83962020-09-23 11:20:12 +00001050
1051 ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
1052 self.sa.sport, self.sa.dport,
Filip Tehlar18107c92020-12-01 14:51:09 +00001053 False, self.ip6)
Filip Tehlare7c83962020-09-23 11:20:12 +00001054 self.pg_send(self.pg0, ike_msg)
1055 capture = self.pg0.get_capture(1)
1056 self.verify_sa_auth_req(capture[0])
1057
1058 def initiate_sa_init(self):
1059 self.pg0.enable_capture()
1060 self.pg_start()
1061 self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
1062
1063 capture = self.pg0.get_capture(1)
1064 self.verify_sa_init_request(capture[0])
1065 self.send_init_response()
1066
1067 def send_auth_response(self):
1068 tr_attr = self.sa.esp_crypto_attr()
1069 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1070 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1071 key_length=tr_attr[0]) /
1072 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1073 transform_id=self.sa.esp_integ) /
1074 ikev2.IKEv2_payload_Transform(
1075 transform_type='Extended Sequence Number',
1076 transform_id='No ESN') /
1077 ikev2.IKEv2_payload_Transform(
1078 transform_type='Extended Sequence Number',
1079 transform_id='ESN'))
1080
Filip Tehlar68ad6252020-10-30 05:28:11 +00001081 c = self.sa.child_sas[0]
Filip Tehlare7c83962020-09-23 11:20:12 +00001082 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
Filip Tehlar68ad6252020-10-30 05:28:11 +00001083 SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
Filip Tehlare7c83962020-09-23 11:20:12 +00001084
1085 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
1086 plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1087 IDtype=self.sa.id_type, load=self.sa.i_id) /
1088 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1089 IDtype=self.sa.id_type, load=self.sa.r_id) /
1090 ikev2.IKEv2_payload_AUTH(next_payload='SA',
1091 auth_type=AuthMethod.value(self.sa.auth_method),
1092 load=self.sa.auth_data) /
1093 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1094 ikev2.IKEv2_payload_TSi(next_payload='TSr',
1095 number_of_TSs=len(tsi),
1096 traffic_selector=tsi) /
1097 ikev2.IKEv2_payload_TSr(next_payload='Notify',
1098 number_of_TSs=len(tsr),
1099 traffic_selector=tsr) /
1100 ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
1101
1102 header = ikev2.IKEv2(
1103 init_SPI=self.sa.ispi,
1104 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1105 flags='Response', exch_type='IKE_AUTH')
1106
1107 ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
1108 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1109 self.sa.dport, self.sa.natt, self.ip6)
1110 self.pg_send(self.pg0, packet)
1111
1112 def test_initiator(self):
1113 self.initiate_sa_init()
1114 self.sa.auth_init()
1115 self.sa.calc_child_keys()
1116 self.send_auth_response()
1117 self.verify_ike_sas()
1118
1119
1120class TemplateResponder(IkePeer):
1121 """ responder test template """
1122
Filip Tehlaredf29002020-10-10 04:39:11 +00001123 def initiate_del_sa_from_responder(self):
1124 self.pg0.enable_capture()
1125 self.pg_start()
1126 self.vapi.ikev2_initiate_del_ike_sa(
1127 ispi=int.from_bytes(self.sa.ispi, 'little'))
1128 capture = self.pg0.get_capture(1)
1129 ih = self.get_ike_header(capture[0])
1130 self.assertNotIn('Response', ih.flags)
1131 self.assertNotIn('Initiator', ih.flags)
1132 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1133 plain = self.sa.hmac_and_decrypt(ih)
1134 d = ikev2.IKEv2_payload_Delete(plain)
1135 self.assertEqual(d.proto, 1) # proto=IKEv2
1136 self.assertEqual(ih.init_SPI, self.sa.ispi)
1137 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1138 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1139 flags='Initiator+Response',
1140 exch_type='INFORMATIONAL',
1141 id=ih.id, next_payload='Encrypted')
1142 resp = self.encrypt_ike_msg(header, b'', None)
1143 self.send_and_assert_no_replies(self.pg0, resp)
Filip Tehlare7c83962020-09-23 11:20:12 +00001144
1145 def verify_del_sa(self, packet):
1146 ih = self.get_ike_header(packet)
1147 self.assertEqual(ih.id, self.sa.msg_id)
1148 self.assertEqual(ih.exch_type, 37) # exchange informational
Filip Tehlaredf29002020-10-10 04:39:11 +00001149 self.assertIn('Response', ih.flags)
1150 self.assertNotIn('Initiator', ih.flags)
1151 self.assertEqual(ih.next_payload, 46) # Encrypted
1152 self.assertEqual(ih.init_SPI, self.sa.ispi)
1153 self.assertEqual(ih.resp_SPI, self.sa.rspi)
1154 plain = self.sa.hmac_and_decrypt(ih)
1155 self.assertEqual(plain, b'')
Filip Tehlare7c83962020-09-23 11:20:12 +00001156
Filip Tehlaredf29002020-10-10 04:39:11 +00001157 def initiate_del_sa_from_initiator(self):
Filip Tehlare7c83962020-09-23 11:20:12 +00001158 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1159 flags='Initiator', exch_type='INFORMATIONAL',
1160 id=self.sa.new_msg_id())
1161 del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
1162 ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
1163 packet = self.create_packet(self.pg0, ike_msg,
1164 self.sa.sport, self.sa.dport,
1165 self.sa.natt, self.ip6)
1166 self.pg0.add_stream(packet)
1167 self.pg0.enable_capture()
1168 self.pg_start()
1169 capture = self.pg0.get_capture(1)
1170 self.verify_del_sa(capture[0])
1171
Filip Tehlar027d8132020-12-04 17:38:11 +00001172 def send_sa_init_req(self):
Filip Tehlare7c83962020-09-23 11:20:12 +00001173 tr_attr = self.sa.ike_crypto_attr()
1174 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1175 transform_id=self.sa.ike_crypto, length=tr_attr[1],
1176 key_length=tr_attr[0]) /
1177 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1178 transform_id=self.sa.ike_integ) /
1179 ikev2.IKEv2_payload_Transform(transform_type='PRF',
1180 transform_id=self.sa.ike_prf_alg.name) /
1181 ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
1182 transform_id=self.sa.ike_dh))
1183
1184 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
1185 trans_nb=4, trans=trans))
1186
Filip Tehlar027d8132020-12-04 17:38:11 +00001187 next_payload = None if self.ip6 else 'Notify'
1188
Filip Tehlare7c83962020-09-23 11:20:12 +00001189 self.sa.init_req_packet = (
1190 ikev2.IKEv2(init_SPI=self.sa.ispi,
1191 flags='Initiator', exch_type='IKE_SA_INIT') /
1192 ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
1193 ikev2.IKEv2_payload_KE(next_payload='Nonce',
1194 group=self.sa.ike_dh,
1195 load=self.sa.my_dh_pub_key) /
Filip Tehlar027d8132020-12-04 17:38:11 +00001196 ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
Filip Tehlare7c83962020-09-23 11:20:12 +00001197 load=self.sa.i_nonce))
1198
Filip Tehlar027d8132020-12-04 17:38:11 +00001199 if not self.ip6:
1200 if self.sa.i_natt:
1201 src_address = b'\x0a\x0a\x0a\x01'
1202 else:
1203 src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
Filip Tehlare7c83962020-09-23 11:20:12 +00001204
Filip Tehlar027d8132020-12-04 17:38:11 +00001205 if self.sa.r_natt:
1206 dst_address = b'\x0a\x0a\x0a\x0a'
1207 else:
1208 dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
1209
1210 src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
1211 dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
1212 nat_src_detection = ikev2.IKEv2_payload_Notify(
1213 type='NAT_DETECTION_SOURCE_IP', load=src_nat,
1214 next_payload='Notify')
1215 nat_dst_detection = ikev2.IKEv2_payload_Notify(
1216 type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
1217 self.sa.init_req_packet = (self.sa.init_req_packet /
1218 nat_src_detection /
1219 nat_dst_detection)
Filip Tehlare7c83962020-09-23 11:20:12 +00001220
1221 ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
1222 self.sa.sport, self.sa.dport,
1223 self.sa.natt, self.ip6)
1224 self.pg0.add_stream(ike_msg)
1225 self.pg0.enable_capture()
1226 self.pg_start()
1227 capture = self.pg0.get_capture(1)
1228 self.verify_sa_init(capture[0])
1229
Filip Tehlar68ad6252020-10-30 05:28:11 +00001230 def generate_auth_payload(self, last_payload=None, is_rekey=False):
Filip Tehlare7c83962020-09-23 11:20:12 +00001231 tr_attr = self.sa.esp_crypto_attr()
Filip Tehlar68ad6252020-10-30 05:28:11 +00001232 last_payload = last_payload or 'Notify'
Filip Tehlare7c83962020-09-23 11:20:12 +00001233 trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
1234 transform_id=self.sa.esp_crypto, length=tr_attr[1],
1235 key_length=tr_attr[0]) /
1236 ikev2.IKEv2_payload_Transform(transform_type='Integrity',
1237 transform_id=self.sa.esp_integ) /
1238 ikev2.IKEv2_payload_Transform(
1239 transform_type='Extended Sequence Number',
1240 transform_id='No ESN') /
1241 ikev2.IKEv2_payload_Transform(
1242 transform_type='Extended Sequence Number',
1243 transform_id='ESN'))
1244
Filip Tehlar68ad6252020-10-30 05:28:11 +00001245 c = self.sa.child_sas[0]
Filip Tehlare7c83962020-09-23 11:20:12 +00001246 props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
Filip Tehlar68ad6252020-10-30 05:28:11 +00001247 SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
Filip Tehlare7c83962020-09-23 11:20:12 +00001248
1249 tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
Filip Tehlar68ad6252020-10-30 05:28:11 +00001250 plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
Filip Tehlare7c83962020-09-23 11:20:12 +00001251 auth_type=AuthMethod.value(self.sa.auth_method),
1252 load=self.sa.auth_data) /
1253 ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
1254 ikev2.IKEv2_payload_TSi(next_payload='TSr',
Filip Tehlar68ad6252020-10-30 05:28:11 +00001255 number_of_TSs=len(tsi), traffic_selector=tsi) /
1256 ikev2.IKEv2_payload_TSr(next_payload=last_payload,
1257 number_of_TSs=len(tsr), traffic_selector=tsr))
Filip Tehlare7c83962020-09-23 11:20:12 +00001258
Filip Tehlar68ad6252020-10-30 05:28:11 +00001259 if is_rekey:
1260 first_payload = 'Nonce'
1261 plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
1262 next_payload='SA') / plain /
1263 ikev2.IKEv2_payload_Notify(type='REKEY_SA',
1264 proto='ESP', SPI=c.ispi))
1265 else:
1266 first_payload = 'IDi'
1267 ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
1268 IDtype=self.sa.id_type, load=self.sa.i_id) /
1269 ikev2.IKEv2_payload_IDr(next_payload='AUTH',
1270 IDtype=self.sa.id_type, load=self.sa.r_id))
1271 plain = ids / plain
1272 return plain, first_payload
1273
1274 def send_sa_auth(self):
1275 plain, first_payload = self.generate_auth_payload(
1276 last_payload='Notify')
1277 plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
Filip Tehlare7c83962020-09-23 11:20:12 +00001278 header = ikev2.IKEv2(
1279 init_SPI=self.sa.ispi,
1280 resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
1281 flags='Initiator', exch_type='IKE_AUTH')
1282
Filip Tehlar68ad6252020-10-30 05:28:11 +00001283 ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
Filip Tehlare7c83962020-09-23 11:20:12 +00001284 packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
1285 self.sa.dport, self.sa.natt, self.ip6)
1286 self.pg0.add_stream(packet)
1287 self.pg0.enable_capture()
1288 self.pg_start()
1289 capture = self.pg0.get_capture(1)
1290 self.verify_sa_auth_resp(capture[0])
1291
1292 def verify_sa_init(self, packet):
1293 ih = self.get_ike_header(packet)
1294
1295 self.assertEqual(ih.id, self.sa.msg_id)
1296 self.assertEqual(ih.exch_type, 34)
Filip Tehlaredf29002020-10-10 04:39:11 +00001297 self.assertIn('Response', ih.flags)
Filip Tehlare7c83962020-09-23 11:20:12 +00001298 self.assertEqual(ih.init_SPI, self.sa.ispi)
1299 self.assertNotEqual(ih.resp_SPI, 0)
1300 self.sa.rspi = ih.resp_SPI
1301 try:
1302 sa = ih[ikev2.IKEv2_payload_SA]
1303 self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
1304 self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
1305 except IndexError as e:
1306 self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
1307 self.logger.error(ih.show())
1308 raise
1309 self.sa.complete_dh_data()
1310 self.sa.calc_keys()
1311 self.sa.auth_init()
1312
1313 def verify_sa_auth_resp(self, packet):
1314 ike = self.get_ike_header(packet)
1315 udp = packet[UDP]
1316 self.verify_udp(udp)
1317 self.assertEqual(ike.id, self.sa.msg_id)
1318 plain = self.sa.hmac_and_decrypt(ike)
Filip Tehlar68ad6252020-10-30 05:28:11 +00001319 idr = ikev2.IKEv2_payload_IDr(plain)
1320 prop = idr[ikev2.IKEv2_payload_Proposal]
1321 self.assertEqual(prop.SPIsize, 4)
1322 self.sa.child_sas[0].rspi = prop.SPI
Filip Tehlare7c83962020-09-23 11:20:12 +00001323 self.sa.calc_child_keys()
1324
Filip Tehlarfab5e7f2021-01-14 13:32:01 +00001325 IKE_NODE_SUFFIX = 'ip4'
1326
1327 def verify_counters(self):
1328 self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX)
Filip Tehlar68d27532021-01-25 10:09:27 +00001329 self.assert_counter(1, 'init_sa_req', self.IKE_NODE_SUFFIX)
Filip Tehlarfab5e7f2021-01-14 13:32:01 +00001330 self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX)
1331
Filip Tehlar68d27532021-01-25 10:09:27 +00001332 r = self.vapi.ikev2_sa_dump()
1333 s = r[0].sa.stats
1334 self.assertEqual(1, s.n_sa_auth_req)
1335 self.assertEqual(1, s.n_sa_init_req)
1336
Filip Tehlar12b517b2020-04-26 18:05:05 +00001337 def test_responder(self):
Filip Tehlar027d8132020-12-04 17:38:11 +00001338 self.send_sa_init_req()
Filip Tehlar12b517b2020-04-26 18:05:05 +00001339 self.send_sa_auth()
jan_cavojskya340fe12020-07-08 09:24:12 +02001340 self.verify_ipsec_sas()
1341 self.verify_ike_sas()
Filip Tehlarfab5e7f2021-01-14 13:32:01 +00001342 self.verify_counters()
Filip Tehlar12b517b2020-04-26 18:05:05 +00001343
1344
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001345class Ikev2Params(object):
1346 def config_params(self, params={}):
Filip Tehlar4f42a712020-07-01 08:56:59 +00001347 ec = VppEnum.vl_api_ipsec_crypto_alg_t
1348 ei = VppEnum.vl_api_ipsec_integ_alg_t
1349 self.vpp_enums = {
1350 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
1351 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
1352 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
1353 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
1354 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
1355 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
1356
1357 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
1358 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
1359 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
1360 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
1361
Filip Tehlar2008e312020-11-09 13:23:24 +00001362 dpd_disabled = True if 'dpd_disabled' not in params else\
1363 params['dpd_disabled']
1364 if dpd_disabled:
1365 self.vapi.cli('ikev2 dpd disable')
Filip Tehlaredf29002020-10-10 04:39:11 +00001366 self.del_sa_from_responder = False if 'del_sa_from_responder'\
1367 not in params else params['del_sa_from_responder']
Filip Tehlar027d8132020-12-04 17:38:11 +00001368 i_natt = False if 'i_natt' not in params else params['i_natt']
1369 r_natt = False if 'r_natt' not in params else params['r_natt']
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001370 self.p = Profile(self, 'pr1')
Filip Tehlar84962d12020-09-08 06:08:05 +00001371 self.ip6 = False if 'ip6' not in params else params['ip6']
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001372
1373 if 'auth' in params and params['auth'] == 'rsa-sig':
1374 auth_method = 'rsa-sig'
1375 work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
1376 self.vapi.ikev2_set_local_key(
1377 key_file=work_dir + params['server-key'])
1378
1379 client_file = work_dir + params['client-cert']
1380 server_pem = open(work_dir + params['server-cert']).read()
1381 client_priv = open(work_dir + params['client-key']).read()
1382 client_priv = load_pem_private_key(str.encode(client_priv), None,
1383 default_backend())
1384 self.peer_cert = x509.load_pem_x509_certificate(
1385 str.encode(server_pem),
1386 default_backend())
1387 self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
1388 auth_data = None
1389 else:
1390 auth_data = b'$3cr3tpa$$w0rd'
1391 self.p.add_auth(method='shared-key', data=auth_data)
1392 auth_method = 'shared-key'
1393 client_priv = None
1394
Filip Tehlare7c83962020-09-23 11:20:12 +00001395 is_init = True if 'is_initiator' not in params else\
1396 params['is_initiator']
1397
1398 idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
1399 idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
1400 if is_init:
1401 self.p.add_local_id(**idr)
1402 self.p.add_remote_id(**idi)
1403 else:
1404 self.p.add_local_id(**idi)
1405 self.p.add_remote_id(**idr)
1406
Filip Tehlar84962d12020-09-08 06:08:05 +00001407 loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
1408 'loc_ts' not in params else params['loc_ts']
1409 rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
1410 'rem_ts' not in params else params['rem_ts']
1411 self.p.add_local_ts(**loc_ts)
1412 self.p.add_remote_ts(**rem_ts)
Filip Tehlare7c83962020-09-23 11:20:12 +00001413 if 'responder' in params:
1414 self.p.add_responder(params['responder'])
1415 if 'ike_transforms' in params:
1416 self.p.add_ike_transforms(params['ike_transforms'])
1417 if 'esp_transforms' in params:
1418 self.p.add_esp_transforms(params['esp_transforms'])
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001419
Filip Tehlar67b8a7f2020-11-06 11:00:42 +00001420 udp_encap = False if 'udp_encap' not in params else\
1421 params['udp_encap']
1422 if udp_encap:
1423 self.p.set_udp_encap(True)
1424
Filip Tehlaraf2cc642021-02-22 16:15:51 +00001425 if 'responder_hostname' in params:
1426 hn = params['responder_hostname']
1427 self.p.add_responder_hostname(hn)
1428
1429 # configure static dns record
1430 self.vapi.dns_name_server_add_del(
1431 is_ip6=0, is_add=1,
1432 server_address=IPv4Address(u'8.8.8.8').packed)
1433 self.vapi.dns_enable_disable(enable=1)
1434
1435 cmd = "dns cache add {} {}".format(hn['hostname'],
1436 self.pg0.remote_ip4)
1437 self.vapi.cli(cmd)
1438
Filip Tehlare7c83962020-09-23 11:20:12 +00001439 self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
1440 is_initiator=is_init,
Filip Tehlar027d8132020-12-04 17:38:11 +00001441 id_type=self.p.local_id['id_type'],
1442 i_natt=i_natt, r_natt=r_natt,
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001443 priv_key=client_priv, auth_method=auth_method,
Filip Tehlar67b8a7f2020-11-06 11:00:42 +00001444 auth_data=auth_data, udp_encap=udp_encap,
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001445 local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
Filip Tehlar68ad6252020-10-30 05:28:11 +00001446 if is_init:
1447 ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
1448 params['ike-crypto']
1449 ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
1450 params['ike-integ']
1451 ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
1452 params['ike-dh']
Filip Tehlar4f42a712020-07-01 08:56:59 +00001453
Filip Tehlar68ad6252020-10-30 05:28:11 +00001454 esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
1455 params['esp-crypto']
1456 esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
1457 params['esp-integ']
Filip Tehlar4f42a712020-07-01 08:56:59 +00001458
Filip Tehlar68ad6252020-10-30 05:28:11 +00001459 self.sa.set_ike_props(
1460 crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
1461 integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
1462 self.sa.set_esp_props(
1463 crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
1464 integ=esp_integ)
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001465
1466
Filip Tehlar459d17b2020-07-06 15:40:08 +00001467class TestApi(VppTestCase):
1468 """ Test IKEV2 API """
1469 @classmethod
1470 def setUpClass(cls):
1471 super(TestApi, cls).setUpClass()
1472
1473 @classmethod
1474 def tearDownClass(cls):
1475 super(TestApi, cls).tearDownClass()
1476
1477 def tearDown(self):
1478 super(TestApi, self).tearDown()
1479 self.p1.remove_vpp_config()
1480 self.p2.remove_vpp_config()
1481 r = self.vapi.ikev2_profile_dump()
1482 self.assertEqual(len(r), 0)
1483
1484 def configure_profile(self, cfg):
1485 p = Profile(self, cfg['name'])
1486 p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
1487 p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
1488 p.add_local_ts(**cfg['loc_ts'])
1489 p.add_remote_ts(**cfg['rem_ts'])
1490 p.add_responder(cfg['responder'])
1491 p.add_ike_transforms(cfg['ike_ts'])
1492 p.add_esp_transforms(cfg['esp_ts'])
1493 p.add_auth(**cfg['auth'])
1494 p.set_udp_encap(cfg['udp_encap'])
1495 p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
1496 if 'lifetime_data' in cfg:
1497 p.set_lifetime_data(cfg['lifetime_data'])
1498 if 'tun_itf' in cfg:
1499 p.set_tunnel_interface(cfg['tun_itf'])
Filip Tehlard7fc12f2020-10-30 04:47:44 +00001500 if 'natt_disabled' in cfg and cfg['natt_disabled']:
1501 p.disable_natt()
Filip Tehlar459d17b2020-07-06 15:40:08 +00001502 p.add_vpp_config()
1503 return p
1504
1505 def test_profile_api(self):
1506 """ test profile dump API """
Filip Tehlar84962d12020-09-08 06:08:05 +00001507 loc_ts4 = {
Filip Tehlar459d17b2020-07-06 15:40:08 +00001508 'proto': 8,
1509 'start_port': 1,
1510 'end_port': 19,
1511 'start_addr': '3.3.3.2',
1512 'end_addr': '3.3.3.3',
1513 }
Filip Tehlar84962d12020-09-08 06:08:05 +00001514 rem_ts4 = {
Filip Tehlar459d17b2020-07-06 15:40:08 +00001515 'proto': 9,
1516 'start_port': 10,
1517 'end_port': 119,
1518 'start_addr': '4.5.76.80',
1519 'end_addr': '2.3.4.6',
1520 }
1521
Filip Tehlar84962d12020-09-08 06:08:05 +00001522 loc_ts6 = {
1523 'proto': 8,
1524 'start_port': 1,
1525 'end_port': 19,
1526 'start_addr': 'ab::1',
1527 'end_addr': 'ab::4',
1528 }
1529 rem_ts6 = {
1530 'proto': 9,
1531 'start_port': 10,
1532 'end_port': 119,
1533 'start_addr': 'cd::12',
1534 'end_addr': 'cd::13',
1535 }
1536
Filip Tehlar459d17b2020-07-06 15:40:08 +00001537 conf = {
1538 'p1': {
1539 'name': 'p1',
Filip Tehlard7fc12f2020-10-30 04:47:44 +00001540 'natt_disabled': True,
Filip Tehlar459d17b2020-07-06 15:40:08 +00001541 'loc_id': ('fqdn', b'vpp.home'),
1542 'rem_id': ('fqdn', b'roadwarrior.example.com'),
Filip Tehlar84962d12020-09-08 06:08:05 +00001543 'loc_ts': loc_ts4,
1544 'rem_ts': rem_ts4,
1545 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
Filip Tehlar459d17b2020-07-06 15:40:08 +00001546 'ike_ts': {
1547 'crypto_alg': 20,
1548 'crypto_key_size': 32,
1549 'integ_alg': 1,
1550 'dh_group': 1},
1551 'esp_ts': {
1552 'crypto_alg': 13,
1553 'crypto_key_size': 24,
1554 'integ_alg': 2},
1555 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1556 'udp_encap': True,
1557 'ipsec_over_udp_port': 4501,
1558 'lifetime_data': {
1559 'lifetime': 123,
1560 'lifetime_maxdata': 20192,
1561 'lifetime_jitter': 9,
1562 'handover': 132},
1563 },
1564 'p2': {
1565 'name': 'p2',
1566 'loc_id': ('ip4-addr', b'192.168.2.1'),
Filip Tehlar84962d12020-09-08 06:08:05 +00001567 'rem_id': ('ip6-addr', b'abcd::1'),
1568 'loc_ts': loc_ts6,
1569 'rem_ts': rem_ts6,
1570 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
Filip Tehlar459d17b2020-07-06 15:40:08 +00001571 'ike_ts': {
1572 'crypto_alg': 12,
1573 'crypto_key_size': 16,
1574 'integ_alg': 3,
1575 'dh_group': 3},
1576 'esp_ts': {
1577 'crypto_alg': 9,
1578 'crypto_key_size': 24,
1579 'integ_alg': 4},
1580 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
1581 'udp_encap': False,
1582 'ipsec_over_udp_port': 4600,
1583 'tun_itf': 0}
1584 }
1585 self.p1 = self.configure_profile(conf['p1'])
1586 self.p2 = self.configure_profile(conf['p2'])
1587
1588 r = self.vapi.ikev2_profile_dump()
1589 self.assertEqual(len(r), 2)
1590 self.verify_profile(r[0].profile, conf['p1'])
1591 self.verify_profile(r[1].profile, conf['p2'])
1592
1593 def verify_id(self, api_id, cfg_id):
1594 self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
1595 self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
1596
1597 def verify_ts(self, api_ts, cfg_ts):
1598 self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
1599 self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
1600 self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
Filip Tehlar84962d12020-09-08 06:08:05 +00001601 self.assertEqual(api_ts.start_addr,
1602 ip_address(text_type(cfg_ts['start_addr'])))
1603 self.assertEqual(api_ts.end_addr,
1604 ip_address(text_type(cfg_ts['end_addr'])))
Filip Tehlar459d17b2020-07-06 15:40:08 +00001605
1606 def verify_responder(self, api_r, cfg_r):
1607 self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
Filip Tehlar84962d12020-09-08 06:08:05 +00001608 self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))
Filip Tehlar459d17b2020-07-06 15:40:08 +00001609
1610 def verify_transforms(self, api_ts, cfg_ts):
1611 self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
1612 self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
1613 self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
1614
1615 def verify_ike_transforms(self, api_ts, cfg_ts):
1616 self.verify_transforms(api_ts, cfg_ts)
1617 self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
1618
1619 def verify_esp_transforms(self, api_ts, cfg_ts):
1620 self.verify_transforms(api_ts, cfg_ts)
1621
1622 def verify_auth(self, api_auth, cfg_auth):
1623 self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
1624 self.assertEqual(api_auth.data, cfg_auth['data'])
1625 self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
1626
1627 def verify_lifetime_data(self, p, ld):
1628 self.assertEqual(p.lifetime, ld['lifetime'])
1629 self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
1630 self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
1631 self.assertEqual(p.handover, ld['handover'])
1632
1633 def verify_profile(self, ap, cp):
1634 self.assertEqual(ap.name, cp['name'])
1635 self.assertEqual(ap.udp_encap, cp['udp_encap'])
1636 self.verify_id(ap.loc_id, cp['loc_id'])
1637 self.verify_id(ap.rem_id, cp['rem_id'])
1638 self.verify_ts(ap.loc_ts, cp['loc_ts'])
1639 self.verify_ts(ap.rem_ts, cp['rem_ts'])
1640 self.verify_responder(ap.responder, cp['responder'])
1641 self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
1642 self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
1643 self.verify_auth(ap.auth, cp['auth'])
Filip Tehlard7fc12f2020-10-30 04:47:44 +00001644 natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled']
1645 self.assertTrue(natt_dis == ap.natt_disabled)
1646
Filip Tehlar459d17b2020-07-06 15:40:08 +00001647 if 'lifetime_data' in cp:
1648 self.verify_lifetime_data(ap, cp['lifetime_data'])
1649 self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
1650 if 'tun_itf' in cp:
1651 self.assertEqual(ap.tun_itf, cp['tun_itf'])
1652 else:
1653 self.assertEqual(ap.tun_itf, 0xffffffff)
1654
1655
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001656@tag_fixme_vpp_workers
Filip Tehlar027d8132020-12-04 17:38:11 +00001657class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
1658 """ test responder - responder behind NAT """
1659
Filip Tehlarfab5e7f2021-01-14 13:32:01 +00001660 IKE_NODE_SUFFIX = 'ip4-natt'
1661
Filip Tehlar027d8132020-12-04 17:38:11 +00001662 def config_tc(self):
1663 self.config_params({'r_natt': True})
1664
1665
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001666@tag_fixme_vpp_workers
Filip Tehlar18107c92020-12-01 14:51:09 +00001667class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
1668 """ test ikev2 initiator - NAT traversal (intitiator behind NAT) """
1669
1670 def config_tc(self):
1671 self.config_params({
Filip Tehlar027d8132020-12-04 17:38:11 +00001672 'i_natt': True,
Filip Tehlar18107c92020-12-01 14:51:09 +00001673 'is_initiator': False, # seen from test case perspective
1674 # thus vpp is initiator
1675 'responder': {'sw_if_index': self.pg0.sw_if_index,
1676 'addr': self.pg0.remote_ip4},
1677 'ike-crypto': ('AES-GCM-16ICV', 32),
1678 'ike-integ': 'NULL',
1679 'ike-dh': '3072MODPgr',
1680 'ike_transforms': {
1681 'crypto_alg': 20, # "aes-gcm-16"
1682 'crypto_key_size': 256,
1683 'dh_group': 15, # "modp-3072"
1684 },
1685 'esp_transforms': {
1686 'crypto_alg': 12, # "aes-cbc"
1687 'crypto_key_size': 256,
1688 # "hmac-sha2-256-128"
1689 'integ_alg': 12}})
1690
1691
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001692@tag_fixme_vpp_workers
Filip Tehlare7c83962020-09-23 11:20:12 +00001693class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
1694 """ test ikev2 initiator - pre shared key auth """
Filip Tehlaredf29002020-10-10 04:39:11 +00001695
Filip Tehlare7c83962020-09-23 11:20:12 +00001696 def config_tc(self):
1697 self.config_params({
1698 'is_initiator': False, # seen from test case perspective
1699 # thus vpp is initiator
Filip Tehlare7c83962020-09-23 11:20:12 +00001700 'ike-crypto': ('AES-GCM-16ICV', 32),
1701 'ike-integ': 'NULL',
1702 'ike-dh': '3072MODPgr',
1703 'ike_transforms': {
1704 'crypto_alg': 20, # "aes-gcm-16"
1705 'crypto_key_size': 256,
1706 'dh_group': 15, # "modp-3072"
1707 },
1708 'esp_transforms': {
1709 'crypto_alg': 12, # "aes-cbc"
1710 'crypto_key_size': 256,
1711 # "hmac-sha2-256-128"
Filip Tehlaraf2cc642021-02-22 16:15:51 +00001712 'integ_alg': 12},
1713 'responder_hostname': {'hostname': 'vpp.responder.org',
1714 'sw_if_index': self.pg0.sw_if_index}})
Filip Tehlare7c83962020-09-23 11:20:12 +00001715
1716
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001717@tag_fixme_vpp_workers
Filip Tehlar38340fa2020-11-19 21:34:48 +00001718class TestInitiatorRequestWindowSize(TestInitiatorPsk):
1719 """ test initiator - request window size (1) """
1720
1721 def rekey_respond(self, req, update_child_sa_data):
1722 ih = self.get_ike_header(req)
1723 plain = self.sa.hmac_and_decrypt(ih)
1724 sa = ikev2.IKEv2_payload_SA(plain)
1725 if update_child_sa_data:
1726 prop = sa[ikev2.IKEv2_payload_Proposal]
1727 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1728 self.sa.r_nonce = self.sa.i_nonce
1729 self.sa.child_sas[0].ispi = prop.SPI
1730 self.sa.child_sas[0].rspi = prop.SPI
1731 self.sa.calc_child_keys()
1732
1733 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1734 flags='Response', exch_type=36,
1735 id=ih.id, next_payload='Encrypted')
1736 resp = self.encrypt_ike_msg(header, sa, 'SA')
1737 packet = self.create_packet(self.pg0, resp, self.sa.sport,
1738 self.sa.dport, self.sa.natt, self.ip6)
1739 self.send_and_assert_no_replies(self.pg0, packet)
1740
1741 def test_initiator(self):
1742 super(TestInitiatorRequestWindowSize, self).test_initiator()
1743 self.pg0.enable_capture()
1744 self.pg_start()
1745 ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1746 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1747 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1748 capture = self.pg0.get_capture(2)
1749
1750 # reply in reverse order
1751 self.rekey_respond(capture[1], True)
1752 self.rekey_respond(capture[0], False)
1753
1754 # verify that only the second request was accepted
1755 self.verify_ike_sas()
1756 self.verify_ipsec_sas(is_rekey=True)
1757
1758
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001759@tag_fixme_vpp_workers
Filip Tehlar68ad6252020-10-30 05:28:11 +00001760class TestInitiatorRekey(TestInitiatorPsk):
1761 """ test ikev2 initiator - rekey """
1762
1763 def rekey_from_initiator(self):
1764 ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
1765 self.pg0.enable_capture()
1766 self.pg_start()
1767 self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
1768 capture = self.pg0.get_capture(1)
1769 ih = self.get_ike_header(capture[0])
1770 self.assertEqual(ih.exch_type, 36) # CHILD_SA
1771 self.assertNotIn('Response', ih.flags)
1772 self.assertIn('Initiator', ih.flags)
1773 plain = self.sa.hmac_and_decrypt(ih)
1774 sa = ikev2.IKEv2_payload_SA(plain)
1775 prop = sa[ikev2.IKEv2_payload_Proposal]
Filip Tehlar68ad6252020-10-30 05:28:11 +00001776 self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1777 self.sa.r_nonce = self.sa.i_nonce
1778 # update new responder SPI
1779 self.sa.child_sas[0].ispi = prop.SPI
1780 self.sa.child_sas[0].rspi = prop.SPI
1781 self.sa.calc_child_keys()
1782 header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
1783 flags='Response', exch_type=36,
1784 id=ih.id, next_payload='Encrypted')
1785 resp = self.encrypt_ike_msg(header, sa, 'SA')
1786 packet = self.create_packet(self.pg0, resp, self.sa.sport,
1787 self.sa.dport, self.sa.natt, self.ip6)
1788 self.send_and_assert_no_replies(self.pg0, packet)
1789
1790 def test_initiator(self):
1791 super(TestInitiatorRekey, self).test_initiator()
1792 self.rekey_from_initiator()
1793 self.verify_ike_sas()
1794 self.verify_ipsec_sas(is_rekey=True)
1795
1796
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001797@tag_fixme_vpp_workers
Filip Tehlaredf29002020-10-10 04:39:11 +00001798class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
1799 """ test ikev2 initiator - delete IKE SA from responder """
1800
1801 def config_tc(self):
1802 self.config_params({
1803 'del_sa_from_responder': True,
1804 'is_initiator': False, # seen from test case perspective
1805 # thus vpp is initiator
1806 'responder': {'sw_if_index': self.pg0.sw_if_index,
1807 'addr': self.pg0.remote_ip4},
1808 'ike-crypto': ('AES-GCM-16ICV', 32),
1809 'ike-integ': 'NULL',
1810 'ike-dh': '3072MODPgr',
1811 'ike_transforms': {
1812 'crypto_alg': 20, # "aes-gcm-16"
1813 'crypto_key_size': 256,
1814 'dh_group': 15, # "modp-3072"
1815 },
1816 'esp_transforms': {
1817 'crypto_alg': 12, # "aes-cbc"
1818 'crypto_key_size': 256,
1819 # "hmac-sha2-256-128"
1820 'integ_alg': 12}})
1821
1822
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001823@tag_fixme_vpp_workers
Filip Tehlar027d8132020-12-04 17:38:11 +00001824class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
1825 """ test ikev2 responder - initiator behind NAT """
Filip Tehlarfab5e7f2021-01-14 13:32:01 +00001826
1827 IKE_NODE_SUFFIX = 'ip4-natt'
1828
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001829 def config_tc(self):
1830 self.config_params(
Filip Tehlar027d8132020-12-04 17:38:11 +00001831 {'i_natt': True})
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001832
1833
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001834@tag_fixme_vpp_workers
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001835class TestResponderPsk(TemplateResponder, Ikev2Params):
1836 """ test ikev2 responder - pre shared key auth """
1837 def config_tc(self):
1838 self.config_params()
1839
1840
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001841@tag_fixme_vpp_workers
Filip Tehlar2008e312020-11-09 13:23:24 +00001842class TestResponderDpd(TestResponderPsk):
1843 """
1844 Dead peer detection test
1845 """
1846 def config_tc(self):
1847 self.config_params({'dpd_disabled': False})
1848
1849 def tearDown(self):
1850 pass
1851
1852 def test_responder(self):
1853 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
1854 super(TestResponderDpd, self).test_responder()
1855 self.pg0.enable_capture()
1856 self.pg_start()
1857 # capture empty request but don't reply
1858 capture = self.pg0.get_capture(expected_count=1, timeout=5)
1859 ih = self.get_ike_header(capture[0])
1860 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1861 plain = self.sa.hmac_and_decrypt(ih)
1862 self.assertEqual(plain, b'')
1863 # wait for SA expiration
1864 time.sleep(3)
1865 ike_sas = self.vapi.ikev2_sa_dump()
1866 self.assertEqual(len(ike_sas), 0)
1867 ipsec_sas = self.vapi.ipsec_sa_dump()
1868 self.assertEqual(len(ipsec_sas), 0)
1869
1870
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001871@tag_fixme_vpp_workers
Filip Tehlar68ad6252020-10-30 05:28:11 +00001872class TestResponderRekey(TestResponderPsk):
1873 """ test ikev2 responder - rekey """
1874
1875 def rekey_from_initiator(self):
Filip Tehlar38340fa2020-11-19 21:34:48 +00001876 packet = self.create_rekey_request()
Filip Tehlar68ad6252020-10-30 05:28:11 +00001877 self.pg0.add_stream(packet)
1878 self.pg0.enable_capture()
1879 self.pg_start()
1880 capture = self.pg0.get_capture(1)
1881 ih = self.get_ike_header(capture[0])
1882 plain = self.sa.hmac_and_decrypt(ih)
1883 sa = ikev2.IKEv2_payload_SA(plain)
1884 prop = sa[ikev2.IKEv2_payload_Proposal]
Filip Tehlar68ad6252020-10-30 05:28:11 +00001885 self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
1886 # update new responder SPI
1887 self.sa.child_sas[0].rspi = prop.SPI
1888
1889 def test_responder(self):
1890 super(TestResponderRekey, self).test_responder()
1891 self.rekey_from_initiator()
1892 self.sa.calc_child_keys()
1893 self.verify_ike_sas()
1894 self.verify_ipsec_sas(is_rekey=True)
Filip Tehlar68d27532021-01-25 10:09:27 +00001895 self.assert_counter(1, 'rekey_req', 'ip4')
1896 r = self.vapi.ikev2_sa_dump()
1897 self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
Filip Tehlar68ad6252020-10-30 05:28:11 +00001898
1899
Filip Tehlard28196f2021-01-27 18:08:21 +00001900class TestResponderVrf(TestResponderPsk, Ikev2Params):
1901 """ test ikev2 responder - non-default table id """
1902
1903 @classmethod
1904 def setUpClass(cls):
1905 import scapy.contrib.ikev2 as _ikev2
1906 globals()['ikev2'] = _ikev2
1907 super(IkePeer, cls).setUpClass()
1908 cls.create_pg_interfaces(range(1))
1909 cls.vapi.cli("ip table add 1")
1910 cls.vapi.cli("set interface ip table pg0 1")
1911 for i in cls.pg_interfaces:
1912 i.admin_up()
1913 i.config_ip4()
1914 i.resolve_arp()
1915 i.config_ip6()
1916 i.resolve_ndp()
1917
1918 def config_tc(self):
1919 self.config_params({'dpd_disabled': False})
1920
1921 def test_responder(self):
1922 self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
1923 super(TestResponderVrf, self).test_responder()
1924 self.pg0.enable_capture()
1925 self.pg_start()
1926 capture = self.pg0.get_capture(expected_count=1, timeout=5)
1927 ih = self.get_ike_header(capture[0])
1928 self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
1929 plain = self.sa.hmac_and_decrypt(ih)
1930 self.assertEqual(plain, b'')
1931
1932
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001933@tag_fixme_vpp_workers
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001934class TestResponderRsaSign(TemplateResponder, Ikev2Params):
1935 """ test ikev2 responder - cert based auth """
1936 def config_tc(self):
1937 self.config_params({
Filip Tehlar67b8a7f2020-11-06 11:00:42 +00001938 'udp_encap': True,
Filip Tehlarbfeae8c2020-06-23 20:35:58 +00001939 'auth': 'rsa-sig',
1940 'server-key': 'server-key.pem',
1941 'client-key': 'client-key.pem',
1942 'client-cert': 'client-cert.pem',
1943 'server-cert': 'server-cert.pem'})
1944
Filip Tehlar4f42a712020-07-01 08:56:59 +00001945
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001946@tag_fixme_vpp_workers
Filip Tehlar4f42a712020-07-01 08:56:59 +00001947class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
1948 (TemplateResponder, Ikev2Params):
1949 """
1950 IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
1951 """
1952 def config_tc(self):
1953 self.config_params({
1954 'ike-crypto': ('AES-CBC', 16),
1955 'ike-integ': 'SHA2-256-128',
1956 'esp-crypto': ('AES-CBC', 24),
1957 'esp-integ': 'SHA2-384-192',
1958 'ike-dh': '2048MODPgr'})
1959
1960
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001961@tag_fixme_vpp_workers
Filip Tehlar4f42a712020-07-01 08:56:59 +00001962class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
1963 (TemplateResponder, Ikev2Params):
Filip Tehlarfab5e7f2021-01-14 13:32:01 +00001964
Filip Tehlar4f42a712020-07-01 08:56:59 +00001965 """
1966 IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
1967 """
1968 def config_tc(self):
1969 self.config_params({
1970 'ike-crypto': ('AES-CBC', 32),
1971 'ike-integ': 'SHA2-256-128',
1972 'esp-crypto': ('AES-GCM-16ICV', 32),
1973 'esp-integ': 'NULL',
1974 'ike-dh': '3072MODPgr'})
1975
1976
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001977@tag_fixme_vpp_workers
Filip Tehlara7b963d2020-07-08 13:25:34 +00001978class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
1979 """
1980 IKE:AES_GCM_16_256
1981 """
Filip Tehlarfab5e7f2021-01-14 13:32:01 +00001982
1983 IKE_NODE_SUFFIX = 'ip6'
1984
Filip Tehlara7b963d2020-07-08 13:25:34 +00001985 def config_tc(self):
1986 self.config_params({
Filip Tehlaredf29002020-10-10 04:39:11 +00001987 'del_sa_from_responder': True,
Filip Tehlar84962d12020-09-08 06:08:05 +00001988 'ip6': True,
1989 'natt': True,
Filip Tehlara7b963d2020-07-08 13:25:34 +00001990 'ike-crypto': ('AES-GCM-16ICV', 32),
1991 'ike-integ': 'NULL',
Filip Tehlar84962d12020-09-08 06:08:05 +00001992 'ike-dh': '2048MODPgr',
1993 'loc_ts': {'start_addr': 'ab:cd::0',
1994 'end_addr': 'ab:cd::10'},
1995 'rem_ts': {'start_addr': '11::0',
1996 'end_addr': '11::100'}})
Filip Tehlara7b963d2020-07-08 13:25:34 +00001997
1998
Andrew Yourtchenko8dc0d482021-01-29 13:17:19 +00001999@tag_fixme_vpp_workers
Filip Tehlar2008e312020-11-09 13:23:24 +00002000class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
2001 """
2002 Test for keep alive messages
2003 """
2004
2005 def send_empty_req_from_responder(self):
Filip Tehlar38340fa2020-11-19 21:34:48 +00002006 packet = self.create_empty_request()
Filip Tehlar2008e312020-11-09 13:23:24 +00002007 self.pg0.add_stream(packet)
2008 self.pg0.enable_capture()
2009 self.pg_start()
2010 capture = self.pg0.get_capture(1)
2011 ih = self.get_ike_header(capture[0])
2012 self.assertEqual(ih.id, self.sa.msg_id)
2013 plain = self.sa.hmac_and_decrypt(ih)
2014 self.assertEqual(plain, b'')
Filip Tehlarfab5e7f2021-01-14 13:32:01 +00002015 self.assert_counter(1, 'keepalive', 'ip4')
Filip Tehlar68d27532021-01-25 10:09:27 +00002016 r = self.vapi.ikev2_sa_dump()
2017 self.assertEqual(1, r[0].sa.stats.n_keepalives)
Filip Tehlar2008e312020-11-09 13:23:24 +00002018
2019 def test_initiator(self):
2020 super(TestInitiatorKeepaliveMsg, self).test_initiator()
2021 self.send_empty_req_from_responder()
2022
2023
Filip Tehlar558607d2020-07-16 07:25:56 +00002024class TestMalformedMessages(TemplateResponder, Ikev2Params):
2025 """ malformed packet test """
2026
2027 def tearDown(self):
2028 pass
2029
2030 def config_tc(self):
2031 self.config_params()
2032
Filip Tehlar558607d2020-07-16 07:25:56 +00002033 def create_ike_init_msg(self, length=None, payload=None):
2034 msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
2035 flags='Initiator', exch_type='IKE_SA_INIT')
2036 if payload is not None:
2037 msg /= payload
2038 return self.create_packet(self.pg0, msg, self.sa.sport,
2039 self.sa.dport)
2040
2041 def verify_bad_packet_length(self):
2042 ike_msg = self.create_ike_init_msg(length=0xdead)
2043 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
Filip Tehlarfab5e7f2021-01-14 13:32:01 +00002044 self.assert_counter(self.pkt_count, 'bad_length')
Filip Tehlar558607d2020-07-16 07:25:56 +00002045
2046 def verify_bad_sa_payload_length(self):
2047 p = ikev2.IKEv2_payload_SA(length=0xdead)
2048 ike_msg = self.create_ike_init_msg(payload=p)
2049 self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
Filip Tehlarfab5e7f2021-01-14 13:32:01 +00002050 self.assert_counter(self.pkt_count, 'malformed_packet')
Filip Tehlar558607d2020-07-16 07:25:56 +00002051
2052 def test_responder(self):
2053 self.pkt_count = 254
2054 self.verify_bad_packet_length()
2055 self.verify_bad_sa_payload_length()
2056
2057
Filip Tehlar12b517b2020-04-26 18:05:05 +00002058if __name__ == '__main__':
2059 unittest.main(testRunner=VppTestRunner)