blob: 6e246f681a9cd91ce1ce0a4b4c9ccfe57e6892f0 [file] [log] [blame]
Klement Sekerab4d30532018-11-08 13:00:02 +01001import unittest
2
3from framework import VppTestCase, VppTestRunner
Neale Ranns17dcec02019-01-09 21:22:20 -08004from template_ipsec import TemplateIpsec, IPsecIPv4Params
5from vpp_papi import VppEnum
Klement Sekerab4d30532018-11-08 13:00:02 +01006
Maxime Peim1271e3a2023-03-20 14:13:56 +00007from vpp_ipsec import VppIpsecSA
8
Klement Sekerab4d30532018-11-08 13:00:02 +01009
class IpsecApiTestCase(VppTestCase):
    """IPSec API tests"""

    # Ask the framework for two VPP workers; test_sa_worker_bind binds an
    # SA to worker 1, so at least two workers must exist.
    vpp_worker_count = 2

    @classmethod
    def setUpClass(cls):
        super(IpsecApiTestCase, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(IpsecApiTestCase, cls).tearDownClass()

    def setUp(self):
        """Bring up pg0 with IPv4 and cache the values the tests share."""
        super(IpsecApiTestCase, self).setUp()
        self.create_pg_interfaces([0])
        self.pg0.config_ip4()
        self.pg0.admin_up()

        ipsec_proto = VppEnum.vl_api_ipsec_proto_t
        self.vpp_esp_protocol = ipsec_proto.IPSEC_API_PROTO_ESP
        self.vpp_ah_protocol = ipsec_proto.IPSEC_API_PROTO_AH
        self.ipv4_params = IPsecIPv4Params()

    def tearDown(self):
        """Undo the pg0 configuration from setUp, then run base teardown."""
        self.pg0.unconfig_ip4()
        self.pg0.admin_down()
        super(IpsecApiTestCase, self).tearDown()

    def test_backend_dump(self):
        """backend dump"""
        backends = self.vapi.ipsec_backend_dump()
        self.assert_equal(len(backends), 2, "number of ipsec backends in dump")

        # The dump is expected to list the AH backend first, then ESP; both
        # at index 0 and active.
        expected_protocols = (self.vpp_ah_protocol, self.vpp_esp_protocol)
        for backend, protocol in zip(backends, expected_protocols):
            self.assert_equal(
                backend.protocol, protocol, "ipsec protocol in dump entry"
            )
            self.assert_equal(backend.index, 0, "index in dump entry")
            self.assert_equal(backend.active, 1, "active flag in dump entry")

    def test_select_valid_backend(self):
        """select valid backend"""
        self.vapi.ipsec_select_backend(self.vpp_ah_protocol, 0)
        self.vapi.ipsec_select_backend(self.vpp_esp_protocol, 0)

    def test_select_invalid_backend(self):
        """select invalid backend"""
        # Backend index 200 is used as an index that is not registered, so
        # each call must return a negative API retval.
        with self.vapi.assert_negative_api_retval():
            self.vapi.ipsec_select_backend(self.vpp_ah_protocol, 200)
        with self.vapi.assert_negative_api_retval():
            self.vapi.ipsec_select_backend(self.vpp_esp_protocol, 200)

    def test_select_backend_in_use(self):
        """attempt to change backend while sad configured"""
        params = self.ipv4_params
        addr_type = params.addr_type
        auth_key = params.auth_key
        crypt_key = params.crypt_key

        # One description of the AH SA, reused for both the add and the
        # delete request so the two can never drift apart.
        entry = {
            "sad_id": params.scapy_tun_sa_id,
            "spi": params.scapy_tun_spi,
            "integrity_algorithm": params.auth_algo_vpp_id,
            "integrity_key": {
                "data": auth_key,
                "length": len(auth_key),
            },
            "crypto_algorithm": params.crypt_algo_vpp_id,
            "crypto_key": {
                "data": crypt_key,
                "length": len(crypt_key),
            },
            "protocol": self.vpp_ah_protocol,
            "tunnel_src": self.pg0.local_addr[addr_type],
            "tunnel_dst": self.pg0.remote_addr[addr_type],
        }

        self.vapi.ipsec_sad_entry_add_del(is_add=1, entry=entry)
        try:
            # With an SA configured, selecting a backend must be refused.
            with self.vapi.assert_negative_api_retval():
                self.vapi.ipsec_select_backend(protocol=self.vpp_ah_protocol, index=0)
        finally:
            # Always delete the SA, even when the assertion above fails, so
            # a failure here does not leave an SA behind for later tests.
            self.vapi.ipsec_sad_entry_add_del(is_add=0, entry=entry)

        # Once the SA is gone the same selection must succeed again.
        self.vapi.ipsec_select_backend(protocol=self.vpp_ah_protocol, index=0)

    def __check_sa_binding(self, sa_id, thread_index):
        """Assert that the SA with id sa_id reports thread_index.

        Dumps the SAs with ipsec_sa_v5_dump and checks the first entry whose
        sad_id equals sa_id. Fails the test if no entry matches.
        """
        for dump in self.vapi.ipsec_sa_v5_dump():
            if dump.entry.sad_id == sa_id:
                self.assertEqual(dump.thread_index, thread_index)
                return

        self.fail("SA not found in VPP")

    def test_sa_worker_bind(self):
        """Bind an SA to a worker"""
        params = self.ipv4_params
        sa = VppIpsecSA(
            self,
            params.scapy_tun_sa_id,
            params.scapy_tun_spi,
            params.auth_algo_vpp_id,
            params.auth_key,
            params.crypt_algo_vpp_id,
            params.crypt_key,
            self.vpp_esp_protocol,
        )
        sa.add_vpp_config()
        try:
            # A freshly added SA is expected to report thread index 0xFFFF
            # (presumably the "not bound yet" marker - confirm against VPP).
            self.__check_sa_binding(sa.id, 0xFFFF)

            self.vapi.ipsec_sad_bind(sa_id=sa.id, worker=1)

            # Binding to worker 1 is expected to show up as thread index 2
            # (presumably thread 0 is main, so worker N is thread N + 1).
            self.__check_sa_binding(sa.id, 2)
        finally:
            # Remove the SA even if a binding check failed.
            sa.remove_vpp_config()
155
Klement Sekerab4d30532018-11-08 13:00:02 +0100156
# Running this file directly executes the tests with the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)