blob: 78f51501f9b8ac31822d762d27d7caa68b5300a9 [file] [log] [blame]
“mystarrocks”23f0c452017-12-11 07:11:51 -08001import socket
2
3from scapy.layers.inet import IP, ICMP
4from scapy.layers.l2 import Ether
5from scapy.layers.ipsec import *
6
7from framework import VppTestCase
8from vpp_ip_route import VppIpRoute
9
10from util import ppp
11
12
13class TestIpsecEsp(VppTestCase):
14 """
15 Basic test for ipsec esp sanity - tunnel and transport modes.
16
17 Below 4 cases are covered as part of this test
18 1) ipsec esp v4 transport basic test - IPv4 Transport mode
19 scenario using HMAC-SHA1-96 intergrity algo
20 2) ipsec esp v4 transport burst test
21 Above test for 257 pkts
22 3) ipsec esp 4o4 tunnel basic test - IPv4 Tunnel mode
23 scenario using HMAC-SHA1-96 intergrity algo
24 4) ipsec esp 4o4 tunnel burst test
25 Above test for 257 pkts
26
27 TRANSPORT MODE:
28
29 --- encrypt ---
30 |pg2| <-------> |VPP|
31 --- decrypt ---
32
33 TUNNEL MODE:
34
35 --- encrypt --- plain ---
36 |pg0| -------> |VPP| ------> |pg1|
37 --- --- ---
38
39 --- decrypt --- plain ---
40 |pg0| <------- |VPP| <------ |pg1|
41 --- --- ---
42
43 Note : IPv6 is not covered
44 """
45
46 remote_pg0_lb_addr = '1.1.1.1'
47 remote_pg1_lb_addr = '2.2.2.2'
48
49 @classmethod
50 def setUpClass(cls):
51 super(TestIpsecEsp, cls).setUpClass()
52 try:
53 cls.create_pg_interfaces(range(3))
54 cls.interfaces = list(cls.pg_interfaces)
55 for i in cls.interfaces:
56 i.admin_up()
57 i.config_ip4()
58 i.resolve_arp()
59 cls.logger.info(cls.vapi.ppcli("show int addr"))
60 cls.configEspTra()
61 cls.logger.info(cls.vapi.ppcli("show ipsec"))
62 cls.configEspTun()
63 cls.logger.info(cls.vapi.ppcli("show ipsec"))
64 except Exception:
65 super(TestIpsecEsp, cls).tearDownClass()
66 raise
67
68 @classmethod
69 def configEspTun(cls):
70 try:
71 spd_id = 1
72 remote_sa_id = 10
73 local_sa_id = 20
74 remote_tun_spi = 1001
75 local_tun_spi = 1000
76 src4 = socket.inet_pton(socket.AF_INET, cls.remote_pg0_lb_addr)
77 cls.vapi.ip_add_del_route(src4, 32, cls.pg0.remote_ip4n)
78 dst4 = socket.inet_pton(socket.AF_INET, cls.remote_pg1_lb_addr)
79 cls.vapi.ip_add_del_route(dst4, 32, cls.pg1.remote_ip4n)
80 cls.vapi.ipsec_sad_add_del_entry(
81 remote_sa_id,
82 remote_tun_spi,
83 cls.pg0.local_ip4n,
84 cls.pg0.remote_ip4n,
85 integrity_key_length=20,
86 crypto_key_length=16,
87 protocol=1)
88 cls.vapi.ipsec_sad_add_del_entry(
89 local_sa_id,
90 local_tun_spi,
91 cls.pg0.remote_ip4n,
92 cls.pg0.local_ip4n,
93 integrity_key_length=20,
94 crypto_key_length=16,
95 protocol=1)
96 cls.vapi.ipsec_spd_add_del(spd_id)
97 cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg0.sw_if_index)
98 l_startaddr = r_startaddr = socket.inet_pton(
99 socket.AF_INET, "0.0.0.0")
100 l_stopaddr = r_stopaddr = socket.inet_pton(
101 socket.AF_INET, "255.255.255.255")
102 cls.vapi.ipsec_spd_add_del_entry(
103 spd_id,
104 l_startaddr,
105 l_stopaddr,
106 r_startaddr,
107 r_stopaddr,
108 protocol=50)
109 cls.vapi.ipsec_spd_add_del_entry(
110 spd_id,
111 l_startaddr,
112 l_stopaddr,
113 r_startaddr,
114 r_stopaddr,
115 protocol=50,
116 is_outbound=0)
117 l_startaddr = l_stopaddr = socket.inet_pton(
118 socket.AF_INET, cls.remote_pg0_lb_addr)
119 r_startaddr = r_stopaddr = socket.inet_pton(
120 socket.AF_INET, cls.remote_pg1_lb_addr)
121 cls.vapi.ipsec_spd_add_del_entry(
122 spd_id,
123 l_startaddr,
124 l_stopaddr,
125 r_startaddr,
126 r_stopaddr,
127 priority=10,
128 policy=3,
129 is_outbound=0,
130 sa_id=local_sa_id)
131 cls.vapi.ipsec_spd_add_del_entry(
132 spd_id,
133 r_startaddr,
134 r_stopaddr,
135 l_startaddr,
136 l_stopaddr,
137 priority=10,
138 policy=3,
139 sa_id=remote_sa_id)
140 except Exception:
141 raise
142
143 @classmethod
144 def configEspTra(cls):
145 try:
146 spd_id = 2
147 remote_sa_id = 30
148 local_sa_id = 40
149 remote_tra_spi = 2001
150 local_tra_spi = 2000
151 cls.vapi.ipsec_sad_add_del_entry(
152 remote_sa_id,
153 remote_tra_spi,
154 integrity_key_length=20,
155 crypto_key_length=16,
156 protocol=1,
157 is_tunnel=0)
158 cls.vapi.ipsec_sad_add_del_entry(
159 local_sa_id,
160 local_tra_spi,
161 integrity_key_length=20,
162 crypto_key_length=16,
163 protocol=1,
164 is_tunnel=0)
165 cls.vapi.ipsec_spd_add_del(spd_id)
166 cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg2.sw_if_index)
167 l_startaddr = r_startaddr = socket.inet_pton(
168 socket.AF_INET, "0.0.0.0")
169 l_stopaddr = r_stopaddr = socket.inet_pton(
170 socket.AF_INET, "255.255.255.255")
171 cls.vapi.ipsec_spd_add_del_entry(
172 spd_id,
173 l_startaddr,
174 l_stopaddr,
175 r_startaddr,
176 r_stopaddr,
177 protocol=50)
178 cls.vapi.ipsec_spd_add_del_entry(
179 spd_id,
180 l_startaddr,
181 l_stopaddr,
182 r_startaddr,
183 r_stopaddr,
184 protocol=50,
185 is_outbound=0)
186 l_startaddr = l_stopaddr = cls.pg2.local_ip4n
187 r_startaddr = r_stopaddr = cls.pg2.remote_ip4n
188 cls.vapi.ipsec_spd_add_del_entry(
189 spd_id,
190 l_startaddr,
191 l_stopaddr,
192 r_startaddr,
193 r_stopaddr,
194 priority=10,
195 policy=3,
196 is_outbound=0,
197 sa_id=local_sa_id)
198 cls.vapi.ipsec_spd_add_del_entry(
199 spd_id,
200 l_startaddr,
201 l_stopaddr,
202 r_startaddr,
203 r_stopaddr,
204 priority=10,
205 policy=3,
206 sa_id=remote_sa_id)
207 except Exception:
208 raise
209
210 def configScapySA(self, is_tun=False):
211 if is_tun:
212 self.remote_tun_sa = SecurityAssociation(
213 ESP,
214 spi=0x000003e8,
215 crypt_algo='AES-CBC',
216 crypt_key='JPjyOWBeVEQiMe7h',
217 auth_algo='HMAC-SHA1-96',
218 auth_key='C91KUR9GYMm5GfkEvNjX',
219 tunnel_header=IP(
220 src=self.pg0.remote_ip4,
221 dst=self.pg0.local_ip4))
222 self.local_tun_sa = SecurityAssociation(
223 ESP,
224 spi=0x000003e9,
225 crypt_algo='AES-CBC',
226 crypt_key='JPjyOWBeVEQiMe7h',
227 auth_algo='HMAC-SHA1-96',
228 auth_key='C91KUR9GYMm5GfkEvNjX',
229 tunnel_header=IP(
230 dst=self.pg0.remote_ip4,
231 src=self.pg0.local_ip4))
232 else:
233 self.remote_tra_sa = SecurityAssociation(
234 ESP,
235 spi=0x000007d0,
236 crypt_algo='AES-CBC',
237 crypt_key='JPjyOWBeVEQiMe7h',
238 auth_algo='HMAC-SHA1-96',
239 auth_key='C91KUR9GYMm5GfkEvNjX')
240 self.local_tra_sa = SecurityAssociation(
241 ESP,
242 spi=0x000007d1,
243 crypt_algo='AES-CBC',
244 crypt_key='JPjyOWBeVEQiMe7h',
245 auth_algo='HMAC-SHA1-96',
246 auth_key='C91KUR9GYMm5GfkEvNjX')
247
248 def tearDown(self):
249 super(TestIpsecEsp, self).tearDown()
250 if not self.vpp_dead:
251 self.vapi.cli("show hardware")
252
253 def send_and_expect(self, input, pkts, output, count=1):
254 input.add_stream(pkts)
255 self.pg_enable_capture(self.pg_interfaces)
256 self.pg_start()
257 rx = output.get_capture(count)
258 return rx
259
260 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
261 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
262 sa.encrypt(IP(src=src, dst=dst) / ICMP() /
263 "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
264 ] * count
265
266 def gen_pkts(self, sw_intf, src, dst, count=1):
267 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
268 IP(src=src, dst=dst) / ICMP() /
269 "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
270 ] * count
271
272 def test_ipsec_esp_tra_basic(self, count=1):
273 """ ipsec esp v4 transport basic test """
274 try:
275 self.configScapySA()
276 send_pkts = self.gen_encrypt_pkts(
277 self.remote_tra_sa,
278 self.pg2,
279 src=self.pg2.remote_ip4,
280 dst=self.pg2.local_ip4,
281 count=count)
282 recv_pkts = self.send_and_expect(
283 self.pg2, send_pkts, self.pg2, count=count)
284 # ESP TRA VPP encryption/decryption verification
285 for Pkts in recv_pkts:
286 decrypt_pkt = self.local_tra_sa.decrypt(Pkts[IP])
287 finally:
288 self.logger.info(self.vapi.ppcli("show error"))
289 self.logger.info(self.vapi.ppcli("show ipsec"))
290
291 def test_ipsec_esp_tra_burst(self):
292 """ ipsec esp v4 transport burst test """
293 try:
294 self.test_ipsec_esp_tra_basic(count=257)
295 finally:
296 self.logger.info(self.vapi.ppcli("show error"))
297 self.logger.info(self.vapi.ppcli("show ipsec"))
298
299 def test_ipsec_esp_tun_basic(self, count=1):
300 """ ipsec esp 4o4 tunnel basic test """
301 try:
302 self.configScapySA(is_tun=True)
303 send_pkts = self.gen_encrypt_pkts(
304 self.remote_tun_sa,
305 self.pg0,
306 src=self.remote_pg0_lb_addr,
307 dst=self.remote_pg1_lb_addr,
308 count=count)
309 recv_pkts = self.send_and_expect(
310 self.pg0, send_pkts, self.pg1, count=count)
311 # ESP TUN VPP decryption verification
312 for recv_pkt in recv_pkts:
313 self.assert_equal(recv_pkt[IP].src, self.remote_pg0_lb_addr)
314 self.assert_equal(recv_pkt[IP].dst, self.remote_pg1_lb_addr)
315 send_pkts = self.gen_pkts(
316 self.pg1,
317 src=self.remote_pg1_lb_addr,
318 dst=self.remote_pg0_lb_addr,
319 count=count)
320 recv_pkts = self.send_and_expect(
321 self.pg1, send_pkts, self.pg0, count=count)
322 # ESP TUN VPP encryption verification
323 for recv_pkt in recv_pkts:
324 decrypt_pkt = self.local_tun_sa.decrypt(recv_pkt[IP])
325 self.assert_equal(decrypt_pkt.src, self.remote_pg1_lb_addr)
326 self.assert_equal(decrypt_pkt.dst, self.remote_pg0_lb_addr)
327 finally:
328 self.logger.info(self.vapi.ppcli("show error"))
329 self.logger.info(self.vapi.ppcli("show ipsec"))
330
331 def test_ipsec_esp_tun_burst(self):
332 """ ipsec esp 4o4 tunnel burst test """
333 try:
334 self.test_ipsec_esp_tun_basic(count=257)
335 finally:
336 self.logger.info(self.vapi.ppcli("show error"))
337 self.logger.info(self.vapi.ppcli("show ipsec"))
338
339
340if __name__ == '__main__':
341 unittest.main(testRunner=VppTestRunner)