blob: 9e0b9ea912e754816fc31950f6b20113ec8672ad [file] [log] [blame]
Klement Sekera4b089f22018-04-17 18:04:57 +02001#!/usr/bin/env python
2
3import socket
4
5from scapy.layers.l2 import Ether
6from scapy.layers.inet import ICMP, IP, TCP, UDP
7from scapy.layers.ipsec import SecurityAssociation, ESP
8from util import ppp, ppc
Klement Sekerabeaded52018-06-24 10:30:37 +02009from template_ipsec import TemplateIpsec
Klement Sekera4b089f22018-04-17 18:04:57 +020010
11
class IPSecNATTestCase(TemplateIpsec):
    """ IPSec/NAT
    TUNNEL MODE:


     public network  |   private network
     ---   encrypt  ---   plain   ---
    |pg0| <------- |VPP| <------ |pg1|
     ---            ---           ---

     ---   decrypt  ---   plain   ---
    |pg0| -------> |VPP| ------> |pg1|
     ---            ---           ---
    """

    # Identifiers carried by the generated test traffic.  The ``*_in``
    # values are the TCP/UDP source port and ICMP id of the plain packets
    # built by create_stream_plain() and are also what
    # verify_capture_plain() expects as destination port / ICMP id; the
    # ``*_out`` values are the destination port / ICMP id of the inner
    # packets built by create_stream_encrypted().
    tcp_port_in = 6303
    tcp_port_out = 6303
    udp_port_in = 6304
    udp_port_out = 6304
    icmp_id_in = 6305
    icmp_id_out = 6305

    @classmethod
    def setUpClass(cls):
        """Prepare the IPsec configuration shared by the tests of this class.

        Selects pg0 as the tunnel interface, installs the ESP tunnel
        SAs/SPD via config_esp_tun(), logs "show ipsec" and adds a route
        with prefix length 32 for ``remote_tun_if_host``.
        """
        super(IPSecNATTestCase, cls).setUpClass()
        cls.tun_if = cls.pg0
        cls.config_esp_tun()
        cls.logger.info(cls.vapi.ppcli("show ipsec"))
        client = socket.inet_pton(socket.AF_INET, cls.remote_tun_if_host)
        # NOTE(review): third argument is presumably the next hop, i.e. the
        # host is reached through the tunnel interface's peer - confirm
        # against the vapi wrapper.
        cls.vapi.ip_add_del_route(client, 32, cls.tun_if.remote_ip4n)

    def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
        """Build one unencrypted TCP, UDP and ICMP echo-request packet.

        :param src_mac: Ethernet source address
        :param dst_mac: Ethernet destination address
        :param src_ip: IPv4 source address
        :param dst_ip: IPv4 destination address
        :returns: list of three scapy packets in TCP, UDP, ICMP order
        """
        return [
            # TCP
            Ether(src=src_mac, dst=dst_mac) /
            IP(src=src_ip, dst=dst_ip) /
            TCP(sport=self.tcp_port_in, dport=20),
            # UDP
            Ether(src=src_mac, dst=dst_mac) /
            IP(src=src_ip, dst=dst_ip) /
            UDP(sport=self.udp_port_in, dport=20),
            # ICMP
            Ether(src=src_mac, dst=dst_mac) /
            IP(src=src_ip, dst=dst_ip) /
            ICMP(id=self.icmp_id_in, type='echo-request')
        ]

    def create_stream_encrypted(self, src_mac, dst_mac, src_ip, dst_ip, sa):
        """Build one TCP, UDP and ICMP echo-request packet encrypted by sa.

        The inner IP packet is passed through ``sa.encrypt()`` and the
        result is placed behind a plain Ethernet header.

        :param src_mac: Ethernet source address
        :param dst_mac: Ethernet destination address
        :param src_ip: inner IPv4 source address
        :param dst_ip: inner IPv4 destination address
        :param sa: scapy SecurityAssociation used for encryption
        :returns: list of three scapy packets in TCP, UDP, ICMP order
        """
        return [
            # TCP
            Ether(src=src_mac, dst=dst_mac) /
            sa.encrypt(IP(src=src_ip, dst=dst_ip) /
                       TCP(dport=self.tcp_port_out, sport=20)),
            # UDP
            Ether(src=src_mac, dst=dst_mac) /
            sa.encrypt(IP(src=src_ip, dst=dst_ip) /
                       UDP(dport=self.udp_port_out, sport=20)),
            # ICMP
            Ether(src=src_mac, dst=dst_mac) /
            sa.encrypt(IP(src=src_ip, dst=dst_ip) /
                       ICMP(id=self.icmp_id_out, type='echo-request'))
        ]

    def verify_capture_plain(self, capture):
        """Check packets that are expected to arrive decrypted.

        For every packet the checksums, the IP source (tunnel interface
        peer) and the IP destination (pg1 peer) are verified, followed by
        the protocol specific port / ICMP id.  The offending packet is
        logged before any failure is re-raised.

        :param capture: iterable of captured scapy packets
        """
        for packet in capture:
            try:
                self.assert_packet_checksums_valid(packet)
                self.assert_equal(packet[IP].src, self.tun_if.remote_ip4,
                                  "decrypted packet source address")
                self.assert_equal(packet[IP].dst, self.pg1.remote_ip4,
                                  "decrypted packet destination address")
                if packet.haslayer(TCP):
                    self.assertFalse(
                        packet.haslayer(UDP),
                        "unexpected UDP header in decrypted packet")
                    self.assert_equal(packet[TCP].dport, self.tcp_port_in,
                                      "decrypted packet TCP destination port")
                elif packet.haslayer(UDP):
                    # Only look for a second UDP header when there is
                    # something behind the first one.
                    if packet[UDP].payload:
                        self.assertFalse(
                            packet[UDP][1].haslayer(UDP),
                            "unexpected UDP header in decrypted packet")
                    self.assert_equal(packet[UDP].dport, self.udp_port_in,
                                      "decrypted packet UDP destination port")
                else:
                    self.assertFalse(
                        packet.haslayer(UDP),
                        "unexpected UDP header in decrypted packet")
                    self.assert_equal(packet[ICMP].id, self.icmp_id_in,
                                      "decrypted packet ICMP ID")
            except Exception:
                self.logger.error(
                    ppp("Unexpected or invalid plain packet:", packet))
                raise

    def verify_capture_encrypted(self, capture, sa):
        """Check packets that are expected to arrive UDP-encapsulated ESP.

        For every packet the UDP length field is compared with the value
        scapy computes itself, checksums are validated, the presence of
        ESP is asserted and the packet is decrypted with ``sa`` so the
        inner addresses can be verified.  The offending packet is logged
        before any failure is re-raised.

        :param capture: iterable of captured scapy packets
        :param sa: scapy SecurityAssociation used for decryption
        """
        for packet in capture:
            try:
                # Serialize with bytes() rather than str(): on Python 2
                # bytes is str so nothing changes, while on Python 3
                # str(packet) is a text rendering and not the wire bytes,
                # which would make the re-parse below meaningless.
                rebuilt = packet.__class__(bytes(packet))
                # Dropping the length and rebuilding makes scapy
                # recalculate it, giving the reference value.
                del rebuilt[UDP].len
                rebuilt = packet.__class__(bytes(rebuilt))
                self.assert_equal(packet[UDP].len, rebuilt[UDP].len,
                                  "UDP header length")
                self.assert_packet_checksums_valid(packet)
                self.assertIn(ESP, packet[IP])
                decrypt_pkt = sa.decrypt(packet[IP])
                self.assert_packet_checksums_valid(decrypt_pkt)
                self.assert_equal(decrypt_pkt[IP].src, self.pg1.remote_ip4,
                                  "encrypted packet source address")
                self.assert_equal(decrypt_pkt[IP].dst, self.tun_if.remote_ip4,
                                  "encrypted packet destination address")
            except Exception:
                self.logger.error(
                    ppp("Unexpected or invalid encrypted packet:", packet))
                raise

    @classmethod
    def config_esp_tun(cls):
        """Install the SAs, the SPD and the SPD entries used by the test.

        Two SAD entries are created with ``udp_encap=1`` (one per SA id /
        SPI pair, with the pg1 and tunnel peer addresses swapped), an SPD
        is created and attached to the tunnel interface, and six SPD
        entries are added: four any-address entries for ESP and for UDP
        remote port 4500, and two ``policy=3`` entries between the tunnel
        peer and the pg1 peer.
        """
        # NOTE(review): the two trailing addresses are presumably the
        # tunnel source and destination - confirm against the vapi wrapper.
        cls.vapi.ipsec_sad_add_del_entry(cls.scapy_tun_sa_id,
                                         cls.scapy_tun_spi,
                                         cls.auth_algo_vpp_id, cls.auth_key,
                                         cls.crypt_algo_vpp_id,
                                         cls.crypt_key, cls.vpp_esp_protocol,
                                         cls.pg1.remote_ip4n,
                                         cls.tun_if.remote_ip4n,
                                         udp_encap=1)
        cls.vapi.ipsec_sad_add_del_entry(cls.vpp_tun_sa_id,
                                         cls.vpp_tun_spi,
                                         cls.auth_algo_vpp_id, cls.auth_key,
                                         cls.crypt_algo_vpp_id,
                                         cls.crypt_key, cls.vpp_esp_protocol,
                                         cls.tun_if.remote_ip4n,
                                         cls.pg1.remote_ip4n,
                                         udp_encap=1)
        cls.vapi.ipsec_spd_add_del(cls.tun_spd_id)
        cls.vapi.ipsec_interface_add_del_spd(cls.tun_spd_id,
                                             cls.tun_if.sw_if_index)

        # Entries covering the whole IPv4 range for ESP and for UDP port
        # 4500.  No policy is passed, so the wrapper's default applies
        # (presumably bypass - TODO confirm).  Calls without is_outbound
        # rely on the wrapper's default direction.
        l_startaddr = r_startaddr = socket.inet_pton(socket.AF_INET,
                                                     "0.0.0.0")
        l_stopaddr = r_stopaddr = socket.inet_pton(socket.AF_INET,
                                                   "255.255.255.255")
        cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
                                         l_startaddr, l_stopaddr, r_startaddr,
                                         r_stopaddr,
                                         protocol=socket.IPPROTO_ESP)
        cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
                                         l_startaddr, l_stopaddr, r_startaddr,
                                         r_stopaddr, is_outbound=0,
                                         protocol=socket.IPPROTO_ESP)
        cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
                                         l_startaddr, l_stopaddr, r_startaddr,
                                         r_stopaddr, remote_port_start=4500,
                                         remote_port_stop=4500,
                                         protocol=socket.IPPROTO_UDP)
        cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
                                         l_startaddr, l_stopaddr, r_startaddr,
                                         r_stopaddr, remote_port_start=4500,
                                         remote_port_stop=4500,
                                         protocol=socket.IPPROTO_UDP,
                                         is_outbound=0)

        # Entries for the actual test traffic between the tunnel peer and
        # the pg1 peer (policy=3 is presumably "protect" - TODO confirm).
        l_startaddr = l_stopaddr = cls.tun_if.remote_ip4n
        r_startaddr = r_stopaddr = cls.pg1.remote_ip4n
        cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.vpp_tun_sa_id,
                                         l_startaddr, l_stopaddr, r_startaddr,
                                         r_stopaddr, priority=10, policy=3,
                                         is_outbound=0)
        # Local and remote ranges are deliberately swapped for the
        # opposite direction.
        cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
                                         r_startaddr, r_stopaddr, l_startaddr,
                                         l_stopaddr, priority=10, policy=3)

    def test_ipsec_nat_tun(self):
        """ IPSec/NAT tunnel test case """
        # SA mirroring the one VPP encrypts with; used to decrypt what is
        # captured on the tunnel interface.
        scapy_tun_sa = SecurityAssociation(ESP, spi=self.scapy_tun_spi,
                                           crypt_algo=self.crypt_algo,
                                           crypt_key=self.crypt_key,
                                           auth_algo=self.auth_algo,
                                           auth_key=self.auth_key,
                                           tunnel_header=IP(
                                               src=self.pg1.remote_ip4,
                                               dst=self.tun_if.remote_ip4),
                                           nat_t_header=UDP(
                                               sport=4500,
                                               dport=4500))
        # in2out - from private network to public
        pkts = self.create_stream_plain(
            self.pg1.remote_mac, self.pg1.local_mac,
            self.pg1.remote_ip4, self.tun_if.remote_ip4)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.tun_if.get_capture(len(pkts))
        self.verify_capture_encrypted(capture, scapy_tun_sa)

        # SA used to encrypt the packets injected on the tunnel interface.
        vpp_tun_sa = SecurityAssociation(ESP,
                                         spi=self.vpp_tun_spi,
                                         crypt_algo=self.crypt_algo,
                                         crypt_key=self.crypt_key,
                                         auth_algo=self.auth_algo,
                                         auth_key=self.auth_key,
                                         tunnel_header=IP(
                                             src=self.tun_if.remote_ip4,
                                             dst=self.pg1.remote_ip4),
                                         nat_t_header=UDP(
                                             sport=4500,
                                             dport=4500))

        # out2in - from public network to private
        pkts = self.create_stream_encrypted(
            self.tun_if.remote_mac, self.tun_if.local_mac,
            self.tun_if.remote_ip4, self.pg1.remote_ip4, vpp_tun_sa)
        self.logger.info(ppc("Sending packets:", pkts))
        self.tun_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_plain(capture)