blob: 042bb88f5be2acc14598f1d0596b22e401ffb181 [file] [log] [blame]
“mystarrocks”23f0c452017-12-11 07:11:51 -08001import socket
Klement Sekera28fb03f2018-04-17 11:36:55 +02002import unittest
“mystarrocks”23f0c452017-12-11 07:11:51 -08003
4from scapy.layers.inet import IP, ICMP
Klement Sekera28fb03f2018-04-17 11:36:55 +02005from scapy.layers.l2 import Ether, Raw
6from scapy.layers.ipsec import SecurityAssociation, AH
“mystarrocks”23f0c452017-12-11 07:11:51 -08007
Klement Sekera28fb03f2018-04-17 11:36:55 +02008from framework import VppTestCase, VppTestRunner
“mystarrocks”23f0c452017-12-11 07:11:51 -08009
10
11class TestIpsecAh(VppTestCase):
12 """
13 Basic test for IPSEC using AH transport and Tunnel mode
14
15 Below 4 cases are covered as part of this test
16 1) ipsec ah v4 transport basic test - IPv4 Transport mode
17 scenario using HMAC-SHA1-96 intergrity algo
18 2) ipsec ah v4 transport burst test
19 Above test for 257 pkts
20 3) ipsec ah 4o4 tunnel basic test - IPv4 Tunnel mode
21 scenario using HMAC-SHA1-96 intergrity algo
22 4) ipsec ah 4o4 tunnel burst test
23 Above test for 257 pkts
24
25 TRANSPORT MODE:
26
27 --- encrypt ---
28 |pg2| <-------> |VPP|
29 --- decrypt ---
30
31 TUNNEL MODE:
32
33 --- encrypt --- plain ---
34 |pg0| -------> |VPP| ------> |pg1|
35 --- --- ---
36
37 --- decrypt --- plain ---
38 |pg0| <------- |VPP| <------ |pg1|
39 --- --- ---
40
41 Note : IPv6 is not covered
42 """
43
44 remote_pg0_lb_addr = '1.1.1.1'
45 remote_pg1_lb_addr = '2.2.2.2'
46
47 @classmethod
48 def setUpClass(cls):
49 super(TestIpsecAh, cls).setUpClass()
50 try:
51 cls.create_pg_interfaces(range(3))
52 cls.interfaces = list(cls.pg_interfaces)
53 for i in cls.interfaces:
54 i.admin_up()
55 i.config_ip4()
56 i.resolve_arp()
57 cls.logger.info(cls.vapi.ppcli("show int addr"))
58 cls.configAhTra()
59 cls.logger.info(cls.vapi.ppcli("show ipsec"))
60 cls.configAhTun()
61 cls.logger.info(cls.vapi.ppcli("show ipsec"))
62 except Exception:
63 super(TestIpsecAh, cls).tearDownClass()
64 raise
65
66 @classmethod
67 def configAhTun(cls):
68 try:
69 spd_id = 1
70 remote_sa_id = 10
71 local_sa_id = 20
72 remote_tun_spi = 1001
73 local_tun_spi = 1000
74 src4 = socket.inet_pton(socket.AF_INET, cls.remote_pg0_lb_addr)
75 cls.vapi.ip_add_del_route(src4, 32, cls.pg0.remote_ip4n)
76 dst4 = socket.inet_pton(socket.AF_INET, cls.remote_pg1_lb_addr)
77 cls.vapi.ip_add_del_route(dst4, 32, cls.pg1.remote_ip4n)
78 cls.vapi.ipsec_sad_add_del_entry(
79 remote_sa_id,
80 remote_tun_spi,
81 cls.pg0.local_ip4n,
82 cls.pg0.remote_ip4n,
83 integrity_key_length=20)
84 cls.vapi.ipsec_sad_add_del_entry(
85 local_sa_id,
86 local_tun_spi,
87 cls.pg0.remote_ip4n,
88 cls.pg0.local_ip4n,
89 integrity_key_length=20)
90 cls.vapi.ipsec_spd_add_del(spd_id)
91 cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg0.sw_if_index)
92 l_startaddr = r_startaddr = socket.inet_pton(
93 socket.AF_INET, "0.0.0.0")
94 l_stopaddr = r_stopaddr = socket.inet_pton(
95 socket.AF_INET, "255.255.255.255")
96 cls.vapi.ipsec_spd_add_del_entry(
97 spd_id,
98 l_startaddr,
99 l_stopaddr,
100 r_startaddr,
101 r_stopaddr,
102 protocol=51)
103 cls.vapi.ipsec_spd_add_del_entry(
104 spd_id,
105 l_startaddr,
106 l_stopaddr,
107 r_startaddr,
108 r_stopaddr,
109 protocol=51,
110 is_outbound=0)
111 l_startaddr = l_stopaddr = socket.inet_pton(
112 socket.AF_INET, cls.remote_pg0_lb_addr)
113 r_startaddr = r_stopaddr = socket.inet_pton(
114 socket.AF_INET, cls.remote_pg1_lb_addr)
115 cls.vapi.ipsec_spd_add_del_entry(
116 spd_id,
117 l_startaddr,
118 l_stopaddr,
119 r_startaddr,
120 r_stopaddr,
121 priority=10,
122 policy=3,
123 is_outbound=0,
124 sa_id=local_sa_id)
125 cls.vapi.ipsec_spd_add_del_entry(
126 spd_id,
127 r_startaddr,
128 r_stopaddr,
129 l_startaddr,
130 l_stopaddr,
131 priority=10,
132 policy=3,
133 sa_id=remote_sa_id)
134 except Exception:
135 raise
136
137 @classmethod
138 def configAhTra(cls):
139 try:
140 spd_id = 2
141 remote_sa_id = 30
142 local_sa_id = 40
143 remote_tra_spi = 2001
144 local_tra_spi = 2000
145 cls.vapi.ipsec_sad_add_del_entry(
146 remote_sa_id,
147 remote_tra_spi,
148 integrity_key_length=20,
149 is_tunnel=0)
150 cls.vapi.ipsec_sad_add_del_entry(
151 local_sa_id,
152 local_tra_spi,
153 integrity_key_length=20,
154 is_tunnel=0)
155 cls.vapi.ipsec_spd_add_del(spd_id)
156 cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg2.sw_if_index)
157 l_startaddr = r_startaddr = socket.inet_pton(
158 socket.AF_INET, "0.0.0.0")
159 l_stopaddr = r_stopaddr = socket.inet_pton(
160 socket.AF_INET, "255.255.255.255")
161 cls.vapi.ipsec_spd_add_del_entry(
162 spd_id,
163 l_startaddr,
164 l_stopaddr,
165 r_startaddr,
166 r_stopaddr,
167 protocol=51)
168 cls.vapi.ipsec_spd_add_del_entry(
169 spd_id,
170 l_startaddr,
171 l_stopaddr,
172 r_startaddr,
173 r_stopaddr,
174 protocol=51,
175 is_outbound=0)
176 l_startaddr = l_stopaddr = cls.pg2.local_ip4n
177 r_startaddr = r_stopaddr = cls.pg2.remote_ip4n
178 cls.vapi.ipsec_spd_add_del_entry(
179 spd_id,
180 l_startaddr,
181 l_stopaddr,
182 r_startaddr,
183 r_stopaddr,
184 priority=10,
185 policy=3,
186 is_outbound=0,
187 sa_id=local_sa_id)
188 cls.vapi.ipsec_spd_add_del_entry(
189 spd_id,
190 l_startaddr,
191 l_stopaddr,
192 r_startaddr,
193 r_stopaddr,
194 priority=10,
195 policy=3,
196 sa_id=remote_sa_id)
197 except Exception:
198 raise
199
200 def configScapySA(self, is_tun=False):
201 if is_tun:
202 self.remote_tun_sa = SecurityAssociation(
203 AH,
204 spi=0x000003e8,
205 auth_algo='HMAC-SHA1-96',
206 auth_key='C91KUR9GYMm5GfkEvNjX',
207 tunnel_header=IP(
208 src=self.pg0.remote_ip4,
209 dst=self.pg0.local_ip4))
210 self.local_tun_sa = SecurityAssociation(
211 AH,
212 spi=0x000003e9,
213 auth_algo='HMAC-SHA1-96',
214 auth_key='C91KUR9GYMm5GfkEvNjX',
215 tunnel_header=IP(
216 dst=self.pg0.remote_ip4,
217 src=self.pg0.local_ip4))
218 else:
219 self.remote_tra_sa = SecurityAssociation(
220 AH, spi=0x000007d0, auth_algo='HMAC-SHA1-96',
221 auth_key='C91KUR9GYMm5GfkEvNjX')
222 self.local_tra_sa = SecurityAssociation(
223 AH, spi=0x000007d1, auth_algo='HMAC-SHA1-96',
224 auth_key='C91KUR9GYMm5GfkEvNjX')
225
226 def tearDown(self):
227 super(TestIpsecAh, self).tearDown()
228 if not self.vpp_dead:
229 self.vapi.cli("show hardware")
230
231 def send_and_expect(self, input, pkts, output, count=1):
232 input.add_stream(pkts)
233 self.pg_enable_capture(self.pg_interfaces)
234 self.pg_start()
235 rx = output.get_capture(count)
236 return rx
237
238 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
239 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
240 sa.encrypt(IP(src=src, dst=dst) / ICMP() /
241 "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
242 ] * count
243
244 def gen_pkts(self, sw_intf, src, dst, count=1):
245 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
246 IP(src=src, dst=dst) / ICMP() /
247 "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
248 ] * count
249
250 def test_ipsec_ah_tra_basic(self, count=1):
251 """ ipsec ah v4 transport basic test """
252 try:
253 self.configScapySA()
254 send_pkts = self.gen_encrypt_pkts(
255 self.remote_tra_sa,
256 self.pg2,
257 src=self.pg2.remote_ip4,
258 dst=self.pg2.local_ip4,
259 count=count)
260 recv_pkts = self.send_and_expect(
261 self.pg2, send_pkts, self.pg2, count=count)
262 # ESP TRA VPP encryption/decryption verification
263 for Pkts in recv_pkts:
264 Pkts[AH].padding = Pkts[AH].icv[12:]
265 Pkts[AH].icv = Pkts[AH].icv[:12]
Klement Sekera28fb03f2018-04-17 11:36:55 +0200266 self.local_tra_sa.decrypt(Pkts[IP])
“mystarrocks”23f0c452017-12-11 07:11:51 -0800267 finally:
268 self.logger.info(self.vapi.ppcli("show error"))
269 self.logger.info(self.vapi.ppcli("show ipsec"))
270
271 def test_ipsec_ah_tra_burst(self):
272 """ ipsec ah v4 transport burst test """
273 try:
274 self.test_ipsec_ah_tra_basic(count=257)
275 finally:
276 self.logger.info(self.vapi.ppcli("show error"))
277 self.logger.info(self.vapi.ppcli("show ipsec"))
278
279 def test_ipsec_ah_tun_basic(self, count=1):
280 """ ipsec ah 4o4 tunnel basic test """
281 try:
282 self.configScapySA(is_tun=True)
283 send_pkts = self.gen_encrypt_pkts(
284 self.remote_tun_sa,
285 self.pg0,
286 src=self.remote_pg0_lb_addr,
287 dst=self.remote_pg1_lb_addr,
288 count=count)
289 recv_pkts = self.send_and_expect(
290 self.pg0, send_pkts, self.pg1, count=count)
291 # ESP TUN VPP decryption verification
292 for recv_pkt in recv_pkts:
293 self.assert_equal(recv_pkt[IP].src, self.remote_pg0_lb_addr)
294 self.assert_equal(recv_pkt[IP].dst, self.remote_pg1_lb_addr)
295 send_pkts = self.gen_pkts(
296 self.pg1,
297 src=self.remote_pg1_lb_addr,
298 dst=self.remote_pg0_lb_addr,
299 count=count)
300 recv_pkts = self.send_and_expect(
301 self.pg1, send_pkts, self.pg0, count=count)
302 # ESP TUN VPP encryption verification
303 for recv_pkt in recv_pkts:
“mystarrocks”23f0c452017-12-11 07:11:51 -0800304 decrypt_pkt = self.local_tun_sa.decrypt(recv_pkt[IP])
Neale Ranns2bc94022018-02-25 12:27:18 -0800305 decrypt_pkt = IP(decrypt_pkt[Raw].load)
“mystarrocks”23f0c452017-12-11 07:11:51 -0800306 self.assert_equal(decrypt_pkt.src, self.remote_pg1_lb_addr)
307 self.assert_equal(decrypt_pkt.dst, self.remote_pg0_lb_addr)
308 finally:
309 self.logger.info(self.vapi.ppcli("show error"))
310 self.logger.info(self.vapi.ppcli("show ipsec"))
311
312 def test_ipsec_ah_tun_burst(self):
313 """ ipsec ah 4o4 tunnel burst test """
314 try:
315 self.test_ipsec_ah_tun_basic(count=257)
316 finally:
317 self.logger.info(self.vapi.ppcli("show error"))
318 self.logger.info(self.vapi.ppcli("show ipsec"))
319
320
321if __name__ == '__main__':
322 unittest.main(testRunner=VppTestRunner)