#!/usr/bin/env python
# Source blob: e90d9c0b71d08986efbe68a09625e635e7ea5367
# Blame: Matus Fabian, de88675, 2016-12-07 03:38:19 -0800
2
3import socket
4import unittest
5from logging import *
6
7from framework import VppTestCase, VppTestRunner
8
9from scapy.layers.inet import IP, TCP, UDP, ICMP
10from scapy.layers.l2 import Ether
11
12
class TestSNAT(VppTestCase):
    """ SNAT Test Cases """

    @classmethod
    def setUpClass(cls):
        """
        Create and configure the packet-generator interfaces for all tests

        Stores the default inside/outside TCP and UDP ports, ICMP identifiers
        and the SNAT pool address on the class, then creates seven pg
        interfaces. pg0-pg3 are brought up with IPv4 configured and ARP
        resolved. pg4-pg6 are all given the same local and remote IPv4
        address, and each is assigned to the IPv4 table whose ID equals its
        own sw_if_index.

        If any step fails, the parent tearDownClass is invoked before the
        exception is re-raised.
        """
        super(TestSNAT, cls).setUpClass()

        try:
            cls.tcp_port_in = 6303
            cls.tcp_port_out = 6303
            cls.udp_port_in = 6304
            cls.udp_port_out = 6304
            cls.icmp_id_in = 6305
            cls.icmp_id_out = 6305
            cls.snat_addr = '10.0.0.3'

            cls.create_pg_interfaces(range(7))
            cls.interfaces = list(cls.pg_interfaces[0:4])

            for i in cls.interfaces:
                i.admin_up()
                i.config_ip4()
                i.resolve_arp()

            cls.overlapping_interfaces = list(cls.pg_interfaces[4:7])

            for i in cls.overlapping_interfaces:
                # Every overlapping interface deliberately reuses the same
                # addresses; the per-interface table keeps them apart.
                i._local_ip4 = "172.16.255.1"
                i._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
                i._remote_hosts[0]._ip4 = "172.16.255.2"
                i.set_table_ip4(i.sw_if_index)
                i.config_ip4()
                i.admin_up()
                i.resolve_arp()

        except Exception:
            super(TestSNAT, cls).tearDownClass()
            raise

    def _send_and_capture(self, tx_if, rx_if, pkts):
        """
        Send a packet stream on one interface and capture on another

        :param tx_if: Interface the stream is added to
        :param rx_if: Interface the capture is read from
        :param pkts: Packets to send
        :returns: Result of rx_if.get_capture()
        """
        tx_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture()

    def create_stream_in(self, in_if, out_if):
        """
        Create packet stream for inside network

        Builds one TCP, one UDP and one ICMP echo-request packet, sourced
        from the remote host of in_if and addressed to the remote host of
        out_if, using the configured inside ports / ICMP identifier.

        :param in_if: Inside interface
        :param out_if: Outside interface
        :returns: List of the three packets (TCP, UDP, ICMP)
        """
        pkts = []
        # TCP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
             TCP(sport=self.tcp_port_in))
        pkts.append(p)

        # UDP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
             UDP(sport=self.udp_port_in))
        pkts.append(p)

        # ICMP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
             ICMP(id=self.icmp_id_in, type='echo-request'))
        pkts.append(p)

        return pkts

    def create_stream_out(self, out_if, dst_ip=None):
        """
        Create packet stream for outside network

        Builds one TCP, one UDP and one ICMP echo-reply packet, sourced from
        the remote host of out_if and addressed to dst_ip, using the current
        outside ports / ICMP identifier.

        :param out_if: Outside interface
        :param dst_ip: Destination IP address (Default use global SNAT address)
        :returns: List of the three packets (TCP, UDP, ICMP)
        """
        if dst_ip is None:
            dst_ip = self.snat_addr
        pkts = []
        # TCP
        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
             IP(src=out_if.remote_ip4, dst=dst_ip) /
             TCP(dport=self.tcp_port_out))
        pkts.append(p)

        # UDP
        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
             IP(src=out_if.remote_ip4, dst=dst_ip) /
             UDP(dport=self.udp_port_out))
        pkts.append(p)

        # ICMP
        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
             IP(src=out_if.remote_ip4, dst=dst_ip) /
             ICMP(id=self.icmp_id_out, type='echo-reply'))
        pkts.append(p)

        return pkts

    def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                           packet_num=3):
        """
        Verify captured packets on outside network

        When same_port is False, the translated source port (TCP/UDP) or
        ICMP identifier seen in the capture is stored on the instance so
        that a following create_stream_out() targets the same session.

        :param capture: Captured packets
        :param nat_ip: Translated IP address (Default use global SNAT address)
        :param same_port: Source port number is not translated (Default False)
        :param packet_num: Expected number of packets (Default 3)
        """
        if nat_ip is None:
            nat_ip = self.snat_addr
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, nat_ip)
                if packet.haslayer(TCP):
                    if same_port:
                        self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                    else:
                        self.assertNotEqual(packet[TCP].sport,
                                            self.tcp_port_in)
                    self.tcp_port_out = packet[TCP].sport
                elif packet.haslayer(UDP):
                    if same_port:
                        self.assertEqual(packet[UDP].sport, self.udp_port_in)
                    else:
                        self.assertNotEqual(packet[UDP].sport,
                                            self.udp_port_in)
                    self.udp_port_out = packet[UDP].sport
                else:
                    if same_port:
                        self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                    else:
                        self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
                    self.icmp_id_out = packet[ICMP].id
            except Exception:
                self.logger.error(
                    "Unexpected or invalid packet (outside network):")
                # show() prints and returns None unless dump=True is given,
                # so ask for the string in order to actually log the packet.
                self.logger.error(packet.show(dump=True))
                raise

    def verify_capture_in(self, capture, in_if, packet_num=3):
        """
        Verify captured packets on inside network

        Checks that every packet is addressed to the remote host of in_if
        and carries the inside destination port (TCP/UDP) or ICMP identifier.

        :param capture: Captured packets
        :param in_if: Inside interface
        :param packet_num: Expected number of packets (Default 3)
        """
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].dst, in_if.remote_ip4)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
            except Exception:
                self.logger.error(
                    "Unexpected or invalid packet (inside network):")
                # show() prints and returns None unless dump=True is given,
                # so ask for the string in order to actually log the packet.
                self.logger.error(packet.show(dump=True))
                raise

    def clear_snat(self):
        """
        Clear SNAT configuration.

        Removes the SNAT feature from every dumped interface, then deletes
        every dumped static mapping and every dumped pool address.
        """
        interfaces = self.vapi.snat_interface_dump()
        for intf in interfaces:
            self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
                                                     intf.is_inside,
                                                     is_add=0)

        static_mappings = self.vapi.snat_static_mapping_dump()
        for sm in static_mappings:
            self.vapi.snat_add_static_mapping(sm.local_ip_address,
                                              sm.external_ip_address,
                                              local_port=sm.local_port,
                                              external_port=sm.external_port,
                                              addr_only=sm.addr_only,
                                              vrf_id=sm.vrf_id,
                                              is_add=0)

        addresses = self.vapi.snat_address_dump()
        for addr in addresses:
            self.vapi.snat_add_address_range(addr.ip_address,
                                             addr.ip_address,
                                             is_add=0)

    def snat_add_static_mapping(self, local_ip, external_ip, local_port=0,
                                external_port=0, vrf_id=0, is_add=1):
        """
        Add/delete S-NAT static mapping

        The mapping is address-only unless both local_port and
        external_port are non-zero.

        :param local_ip: Local IP address
        :param external_ip: External IP address
        :param local_port: Local port number (Optional)
        :param external_port: External port number (Optional)
        :param vrf_id: VRF ID (Default 0)
        :param is_add: 1 if add, 0 if delete (Default add)
        """
        addr_only = 1
        if local_port and external_port:
            addr_only = 0
        l_ip = socket.inet_pton(socket.AF_INET, local_ip)
        e_ip = socket.inet_pton(socket.AF_INET, external_ip)
        self.vapi.snat_add_static_mapping(l_ip, e_ip, local_port,
                                          external_port, addr_only, vrf_id,
                                          is_add)

    def snat_add_address(self, ip, is_add=1):
        """
        Add/delete S-NAT address

        The address is passed as both ends of a single-address range.

        :param ip: IP address
        :param is_add: 1 if add, 0 if delete (Default add)
        """
        snat_addr = socket.inet_pton(socket.AF_INET, ip)
        self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add)

    def test_dynamic(self):
        """ SNAT dynamic translation test """

        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # in2out
        pkts = self.create_stream_in(self.pg0, self.pg1)
        capture = self._send_and_capture(self.pg0, self.pg1, pkts)
        self.verify_capture_out(capture)

        # out2in
        pkts = self.create_stream_out(self.pg1)
        capture = self._send_and_capture(self.pg1, self.pg0, pkts)
        self.verify_capture_in(capture, self.pg0)

    def test_static_in(self):
        """ SNAT 1:1 NAT initialized from inside network """

        nat_ip = "10.0.0.10"
        self.tcp_port_out = 6303
        self.udp_port_out = 6304
        self.icmp_id_out = 6305

        self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # in2out
        pkts = self.create_stream_in(self.pg0, self.pg1)
        capture = self._send_and_capture(self.pg0, self.pg1, pkts)
        self.verify_capture_out(capture, nat_ip, True)

        # out2in
        pkts = self.create_stream_out(self.pg1, nat_ip)
        capture = self._send_and_capture(self.pg1, self.pg0, pkts)
        self.verify_capture_in(capture, self.pg0)

    def test_static_out(self):
        """ SNAT 1:1 NAT initialized from outside network """

        nat_ip = "10.0.0.20"
        self.tcp_port_out = 6303
        self.udp_port_out = 6304
        self.icmp_id_out = 6305

        self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # out2in
        pkts = self.create_stream_out(self.pg1, nat_ip)
        capture = self._send_and_capture(self.pg1, self.pg0, pkts)
        self.verify_capture_in(capture, self.pg0)

        # in2out
        pkts = self.create_stream_in(self.pg0, self.pg1)
        capture = self._send_and_capture(self.pg0, self.pg1, pkts)
        self.verify_capture_out(capture, nat_ip, True)

    def test_static_with_port_in(self):
        """ SNAT 1:1 NAT with port initialized from inside network """

        self.tcp_port_out = 3606
        self.udp_port_out = 3607
        self.icmp_id_out = 3608

        self.snat_add_address(self.snat_addr)
        self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
                                     self.tcp_port_in, self.tcp_port_out)
        self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
                                     self.udp_port_in, self.udp_port_out)
        self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
                                     self.icmp_id_in, self.icmp_id_out)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # in2out
        pkts = self.create_stream_in(self.pg0, self.pg1)
        capture = self._send_and_capture(self.pg0, self.pg1, pkts)
        self.verify_capture_out(capture)

        # out2in
        pkts = self.create_stream_out(self.pg1)
        capture = self._send_and_capture(self.pg1, self.pg0, pkts)
        self.verify_capture_in(capture, self.pg0)

    def test_static_with_port_out(self):
        """ SNAT 1:1 NAT with port initialized from outside network """

        self.tcp_port_out = 30606
        self.udp_port_out = 30607
        self.icmp_id_out = 30608

        self.snat_add_address(self.snat_addr)
        self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
                                     self.tcp_port_in, self.tcp_port_out)
        self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
                                     self.udp_port_in, self.udp_port_out)
        self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
                                     self.icmp_id_in, self.icmp_id_out)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # out2in
        pkts = self.create_stream_out(self.pg1)
        capture = self._send_and_capture(self.pg1, self.pg0, pkts)
        self.verify_capture_in(capture, self.pg0)

        # in2out
        pkts = self.create_stream_in(self.pg0, self.pg1)
        capture = self._send_and_capture(self.pg0, self.pg1, pkts)
        self.verify_capture_out(capture)

    def test_static_vrf_aware(self):
        """ SNAT 1:1 NAT VRF awareness """

        nat_ip1 = "10.0.0.30"
        nat_ip2 = "10.0.0.40"
        self.tcp_port_out = 6303
        self.udp_port_out = 6304
        self.icmp_id_out = 6305

        # Both mappings use pg4's table as VRF, so only the pg4 host's
        # mapping matches the VRF of its own inside interface.
        self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
                                     vrf_id=self.pg4.sw_if_index)
        self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
                                     vrf_id=self.pg4.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
                                                 is_inside=0)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)

        # inside interface VRF match SNAT static mapping VRF
        pkts = self.create_stream_in(self.pg4, self.pg3)
        capture = self._send_and_capture(self.pg4, self.pg3, pkts)
        self.verify_capture_out(capture, nat_ip1, True)

        # inside interface VRF don't match SNAT static mapping VRF (packets
        # are dropped)
        pkts = self.create_stream_in(self.pg0, self.pg3)
        capture = self._send_and_capture(self.pg0, self.pg3, pkts)
        self.verify_capture_out(capture, packet_num=0)

    def test_multiple_inside_interfaces(self):
        """ SNAT multiple inside interfaces with non-overlapping address space """

        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
                                                 is_inside=0)

        # 1st, 2nd and 3rd inside interface in turn; out2in must follow
        # in2out because it reuses the ports learned by verify_capture_out
        for in_if in (self.pg0, self.pg1, self.pg2):
            # in2out
            pkts = self.create_stream_in(in_if, self.pg3)
            capture = self._send_and_capture(in_if, self.pg3, pkts)
            self.verify_capture_out(capture)

            # out2in
            pkts = self.create_stream_out(self.pg3)
            capture = self._send_and_capture(self.pg3, in_if, pkts)
            self.verify_capture_in(capture, in_if)

    def test_inside_overlapping_interfaces(self):
        """ SNAT multiple inside interfaces with overlapping address space """

        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
                                                 is_inside=0)
        self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)

        # 1st, 2nd and 3rd inside interface in turn; out2in must follow
        # in2out because it reuses the ports learned by verify_capture_out
        for in_if in (self.pg4, self.pg5, self.pg6):
            # in2out
            pkts = self.create_stream_in(in_if, self.pg3)
            capture = self._send_and_capture(in_if, self.pg3, pkts)
            self.verify_capture_out(capture)

            # out2in
            pkts = self.create_stream_out(self.pg3)
            capture = self._send_and_capture(self.pg3, in_if, pkts)
            self.verify_capture_in(capture, in_if)

    def tearDown(self):
        """
        Log the SNAT state and remove all SNAT configuration after each test

        Both steps are skipped when self.vpp_dead is set.
        """
        super(TestSNAT, self).tearDown()
        if not self.vpp_dead:
            self.logger.info(self.vapi.cli("show snat verbose"))
            self.clear_snat()
534
if __name__ == '__main__':
    # Executed directly (not imported): run this module's test cases using
    # the VPP-specific runner instead of unittest's default one.
    unittest.main(testRunner=VppTestRunner)