| #!/usr/bin/env python |
| |
| import socket |
| import unittest |
| from logging import * |
| |
| from framework import VppTestCase, VppTestRunner |
| |
| from scapy.layers.inet import IP, TCP, UDP, ICMP |
| from scapy.layers.l2 import Ether |
| |
| |
| class TestSNAT(VppTestCase): |
| """ SNAT Test Cases """ |
| |
| @classmethod |
| def setUpClass(cls): |
| super(TestSNAT, cls).setUpClass() |
| |
| try: |
| cls.tcp_port_in = 6303 |
| cls.tcp_port_out = 6303 |
| cls.udp_port_in = 6304 |
| cls.udp_port_out = 6304 |
| cls.icmp_id_in = 6305 |
| cls.icmp_id_out = 6305 |
| cls.snat_addr = '10.0.0.3' |
| |
| cls.create_pg_interfaces(range(7)) |
| cls.interfaces = list(cls.pg_interfaces[0:4]) |
| |
| for i in cls.interfaces: |
| i.admin_up() |
| i.config_ip4() |
| i.resolve_arp() |
| |
| cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7])) |
| |
| for i in cls.overlapping_interfaces: |
| i._local_ip4 = "172.16.255.1" |
| i._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4) |
| i._remote_hosts[0]._ip4 = "172.16.255.2" |
| i.set_table_ip4(i.sw_if_index) |
| i.config_ip4() |
| i.admin_up() |
| i.resolve_arp() |
| |
| except Exception: |
| super(TestSNAT, cls).tearDownClass() |
| raise |
| |
| def create_stream_in(self, in_if, out_if): |
| """ |
| Create packet stream for inside network |
| |
| :param in_if: Inside interface |
| :param out_if: Outside interface |
| """ |
| pkts = [] |
| # TCP |
| p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / |
| IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / |
| TCP(sport=self.tcp_port_in)) |
| pkts.append(p) |
| |
| # UDP |
| p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / |
| IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / |
| UDP(sport=self.udp_port_in)) |
| pkts.append(p) |
| |
| # ICMP |
| p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / |
| IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / |
| ICMP(id=self.icmp_id_in, type='echo-request')) |
| pkts.append(p) |
| |
| return pkts |
| |
| def create_stream_out(self, out_if, dst_ip=None): |
| """ |
| Create packet stream for outside network |
| |
| :param out_if: Outside interface |
| :param dst_ip: Destination IP address (Default use global SNAT address) |
| """ |
| if dst_ip is None: |
| dst_ip=self.snat_addr |
| pkts = [] |
| # TCP |
| p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / |
| IP(src=out_if.remote_ip4, dst=dst_ip) / |
| TCP(dport=self.tcp_port_out)) |
| pkts.append(p) |
| |
| # UDP |
| p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / |
| IP(src=out_if.remote_ip4, dst=dst_ip) / |
| UDP(dport=self.udp_port_out)) |
| pkts.append(p) |
| |
| # ICMP |
| p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / |
| IP(src=out_if.remote_ip4, dst=dst_ip) / |
| ICMP(id=self.icmp_id_out, type='echo-reply')) |
| pkts.append(p) |
| |
| return pkts |
| |
| def verify_capture_out(self, capture, nat_ip=None, same_port=False, |
| packet_num=3): |
| """ |
| Verify captured packets on outside network |
| |
| :param capture: Captured packets |
| :param nat_ip: Translated IP address (Default use global SNAT address) |
| :param same_port: Sorce port number is not translated (Default False) |
| :param packet_num: Expected number of packets (Default 3) |
| """ |
| if nat_ip is None: |
| nat_ip = self.snat_addr |
| self.assertEqual(packet_num, len(capture)) |
| for packet in capture: |
| try: |
| self.assertEqual(packet[IP].src, nat_ip) |
| if packet.haslayer(TCP): |
| if same_port: |
| self.assertEqual(packet[TCP].sport, self.tcp_port_in) |
| else: |
| self.assertNotEqual(packet[TCP].sport, self.tcp_port_in) |
| self.tcp_port_out = packet[TCP].sport |
| elif packet.haslayer(UDP): |
| if same_port: |
| self.assertEqual(packet[UDP].sport, self.udp_port_in) |
| else: |
| self.assertNotEqual(packet[UDP].sport, self.udp_port_in) |
| self.udp_port_out = packet[UDP].sport |
| else: |
| if same_port: |
| self.assertEqual(packet[ICMP].id, self.icmp_id_in) |
| else: |
| self.assertNotEqual(packet[ICMP].id, self.icmp_id_in) |
| self.icmp_id_out = packet[ICMP].id |
| except: |
| error("Unexpected or invalid packet (outside network):") |
| error(packet.show()) |
| raise |
| |
| def verify_capture_in(self, capture, in_if, packet_num=3): |
| """ |
| Verify captured packets on inside network |
| |
| :param capture: Captured packets |
| :param in_if: Inside interface |
| :param packet_num: Expected number of packets (Default 3) |
| """ |
| self.assertEqual(packet_num, len(capture)) |
| for packet in capture: |
| try: |
| self.assertEqual(packet[IP].dst, in_if.remote_ip4) |
| if packet.haslayer(TCP): |
| self.assertEqual(packet[TCP].dport, self.tcp_port_in) |
| elif packet.haslayer(UDP): |
| self.assertEqual(packet[UDP].dport, self.udp_port_in) |
| else: |
| self.assertEqual(packet[ICMP].id, self.icmp_id_in) |
| except: |
| error("Unexpected or invalid packet (inside network):") |
| error(packet.show()) |
| raise |
| |
| def clear_snat(self): |
| """ |
| Clear SNAT configuration. |
| """ |
| interfaces = self.vapi.snat_interface_dump() |
| for intf in interfaces: |
| self.vapi.snat_interface_add_del_feature(intf.sw_if_index, |
| intf.is_inside, |
| is_add=0) |
| |
| static_mappings = self.vapi.snat_static_mapping_dump() |
| for sm in static_mappings: |
| self.vapi.snat_add_static_mapping(sm.local_ip_address, |
| sm.external_ip_address, |
| local_port=sm.local_port, |
| external_port=sm.external_port, |
| addr_only=sm.addr_only, |
| vrf_id=sm.vrf_id, |
| is_add=0) |
| |
| adresses = self.vapi.snat_address_dump() |
| for addr in adresses: |
| self.vapi.snat_add_address_range(addr.ip_address, |
| addr.ip_address, |
| is_add=0) |
| |
| def snat_add_static_mapping(self, local_ip, external_ip, local_port=0, |
| external_port=0, vrf_id=0, is_add=1): |
| """ |
| Add/delete S-NAT static mapping |
| |
| :param local_ip: Local IP address |
| :param external_ip: External IP address |
| :param local_port: Local port number (Optional) |
| :param external_port: External port number (Optional) |
| :param vrf_id: VRF ID (Default 0) |
| :param is_add: 1 if add, 0 if delete (Default add) |
| """ |
| addr_only = 1 |
| if local_port and external_port: |
| addr_only = 0 |
| l_ip = socket.inet_pton(socket.AF_INET, local_ip) |
| e_ip = socket.inet_pton(socket.AF_INET, external_ip) |
| self.vapi.snat_add_static_mapping(l_ip, e_ip, local_port, external_port, |
| addr_only, vrf_id, is_add) |
| |
| def snat_add_address(self, ip, is_add=1): |
| """ |
| Add/delete S-NAT address |
| |
| :param ip: IP address |
| :param is_add: 1 if add, 0 if delete (Default add) |
| """ |
| snat_addr = socket.inet_pton(socket.AF_INET, ip) |
| self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add) |
| |
| def test_dynamic(self): |
| """ SNAT dynamic translation test """ |
| |
| self.snat_add_address(self.snat_addr) |
| self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) |
| self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, |
| is_inside=0) |
| |
| # in2out |
| pkts = self.create_stream_in(self.pg0, self.pg1) |
| self.pg0.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| capture = self.pg1.get_capture() |
| self.verify_capture_out(capture) |
| |
| # out2in |
| pkts = self.create_stream_out(self.pg1) |
| self.pg1.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| capture = self.pg0.get_capture() |
| self.verify_capture_in(capture, self.pg0) |
| |
| def test_static_in(self): |
| """ SNAT 1:1 NAT initialized from inside network """ |
| |
| nat_ip = "10.0.0.10" |
| self.tcp_port_out = 6303 |
| self.udp_port_out = 6304 |
| self.icmp_id_out = 6305 |
| |
| self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip) |
| self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) |
| self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, |
| is_inside=0) |
| |
| # in2out |
| pkts = self.create_stream_in(self.pg0, self.pg1) |
| self.pg0.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| capture = self.pg1.get_capture() |
| self.verify_capture_out(capture, nat_ip, True) |
| |
| # out2in |
| pkts = self.create_stream_out(self.pg1, nat_ip) |
| self.pg1.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| capture = self.pg0.get_capture() |
| self.verify_capture_in(capture, self.pg0) |
| |
| def test_static_out(self): |
| """ SNAT 1:1 NAT initialized from outside network """ |
| |
| nat_ip = "10.0.0.20" |
| self.tcp_port_out = 6303 |
| self.udp_port_out = 6304 |
| self.icmp_id_out = 6305 |
| |
| self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip) |
| self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) |
| self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, |
| is_inside=0) |
| |
| # out2in |
| pkts = self.create_stream_out(self.pg1, nat_ip) |
| self.pg1.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| capture = self.pg0.get_capture() |
| self.verify_capture_in(capture, self.pg0) |
| |
| # in2out |
| pkts = self.create_stream_in(self.pg0, self.pg1) |
| self.pg0.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| capture = self.pg1.get_capture() |
| self.verify_capture_out(capture, nat_ip, True) |
| |
| def test_static_with_port_in(self): |
| """ SNAT 1:1 NAT with port initialized from inside network """ |
| |
| self.tcp_port_out = 3606 |
| self.udp_port_out = 3607 |
| self.icmp_id_out = 3608 |
| |
| self.snat_add_address(self.snat_addr) |
| self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr, |
| self.tcp_port_in, self.tcp_port_out) |
| self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr, |
| self.udp_port_in, self.udp_port_out) |
| self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr, |
| self.icmp_id_in, self.icmp_id_out) |
| self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) |
| self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, |
| is_inside=0) |
| |
| # in2out |
| pkts = self.create_stream_in(self.pg0, self.pg1) |
| self.pg0.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| capture = self.pg1.get_capture() |
| self.verify_capture_out(capture) |
| |
| # out2in |
| pkts = self.create_stream_out(self.pg1) |
| self.pg1.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| capture = self.pg0.get_capture() |
| self.verify_capture_in(capture, self.pg0) |
| |
| def test_static_with_port_out(self): |
| """ SNAT 1:1 NAT with port initialized from outside network """ |
| |
| self.tcp_port_out = 30606 |
| self.udp_port_out = 30607 |
| self.icmp_id_out = 30608 |
| |
| self.snat_add_address(self.snat_addr) |
| self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr, |
| self.tcp_port_in, self.tcp_port_out) |
| self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr, |
| self.udp_port_in, self.udp_port_out) |
| self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr, |
| self.icmp_id_in, self.icmp_id_out) |
| self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) |
| self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, |
| is_inside=0) |
| |
| # out2in |
| pkts = self.create_stream_out(self.pg1) |
| self.pg1.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| capture = self.pg0.get_capture() |
| self.verify_capture_in(capture, self.pg0) |
| |
| # in2out |
| pkts = self.create_stream_in(self.pg0, self.pg1) |
| self.pg0.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| capture = self.pg1.get_capture() |
| self.verify_capture_out(capture) |
| |
| def test_static_vrf_aware(self): |
| """ SNAT 1:1 NAT VRF awareness """ |
| |
| nat_ip1 = "10.0.0.30" |
| nat_ip2 = "10.0.0.40" |
| self.tcp_port_out = 6303 |
| self.udp_port_out = 6304 |
| self.icmp_id_out = 6305 |
| |
| self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1, |
| vrf_id=self.pg4.sw_if_index) |
| self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2, |
| vrf_id=self.pg4.sw_if_index) |
| self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index, |
| is_inside=0) |
| self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) |
| self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index) |
| |
| # inside interface VRF match SNAT static mapping VRF |
| pkts = self.create_stream_in(self.pg4, self.pg3) |
| self.pg4.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| capture = self.pg3.get_capture() |
| self.verify_capture_out(capture, nat_ip1, True) |
| |
| # inside interface VRF don't match SNAT static mapping VRF (packets |
| # are dropped) |
| pkts = self.create_stream_in(self.pg0, self.pg3) |
| self.pg0.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| capture = self.pg3.get_capture() |
| self.verify_capture_out(capture, packet_num=0) |
| |
| def test_multiple_inside_interfaces(self): |
| """ SNAT multiple inside interfaces with non-overlapping address space """ |
| |
| self.snat_add_address(self.snat_addr) |
| self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) |
| self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index) |
| self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index) |
| self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index, |
| is_inside=0) |
| |
| # in2out 1st interface |
| pkts = self.create_stream_in(self.pg0, self.pg3) |
| self.pg0.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| capture = self.pg3.get_capture() |
| self.verify_capture_out(capture) |
| |
| # out2in 1st interface |
| pkts = self.create_stream_out(self.pg3) |
| self.pg3.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| capture = self.pg0.get_capture() |
| self.verify_capture_in(capture, self.pg0) |
| |
| # in2out 2nd interface |
| pkts = self.create_stream_in(self.pg1, self.pg3) |
| self.pg1.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| capture = self.pg3.get_capture() |
| self.verify_capture_out(capture) |
| |
| # out2in 2nd interface |
| pkts = self.create_stream_out(self.pg3) |
| self.pg3.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| capture = self.pg1.get_capture() |
| self.verify_capture_in(capture, self.pg1) |
| |
| # in2out 3rd interface |
| pkts = self.create_stream_in(self.pg2, self.pg3) |
| self.pg2.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| capture = self.pg3.get_capture() |
| self.verify_capture_out(capture) |
| |
| # out2in 3rd interface |
| pkts = self.create_stream_out(self.pg3) |
| self.pg3.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| capture = self.pg2.get_capture() |
| self.verify_capture_in(capture, self.pg2) |
| |
| def test_inside_overlapping_interfaces(self): |
| """ SNAT multiple inside interfaces with overlapping address space """ |
| |
| self.snat_add_address(self.snat_addr) |
| self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index, |
| is_inside=0) |
| self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index) |
| self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index) |
| self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index) |
| |
| # in2out 1st interface |
| pkts = self.create_stream_in(self.pg4, self.pg3) |
| self.pg4.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| capture = self.pg3.get_capture() |
| self.verify_capture_out(capture) |
| |
| # out2in 1st interface |
| pkts = self.create_stream_out(self.pg3) |
| self.pg3.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| capture = self.pg4.get_capture() |
| self.verify_capture_in(capture, self.pg4) |
| |
| # in2out 2nd interface |
| pkts = self.create_stream_in(self.pg5, self.pg3) |
| self.pg5.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| capture = self.pg3.get_capture() |
| self.verify_capture_out(capture) |
| |
| # out2in 2nd interface |
| pkts = self.create_stream_out(self.pg3) |
| self.pg3.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| capture = self.pg5.get_capture() |
| self.verify_capture_in(capture, self.pg5) |
| |
| # in2out 3rd interface |
| pkts = self.create_stream_in(self.pg6, self.pg3) |
| self.pg6.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| capture = self.pg3.get_capture() |
| self.verify_capture_out(capture) |
| |
| # out2in 3rd interface |
| pkts = self.create_stream_out(self.pg3) |
| self.pg3.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| capture = self.pg6.get_capture() |
| self.verify_capture_in(capture, self.pg6) |
| |
| def tearDown(self): |
| super(TestSNAT, self).tearDown() |
| if not self.vpp_dead: |
| self.logger.info(self.vapi.cli("show snat verbose")) |
| self.clear_snat() |
| |
| if __name__ == '__main__': |
| unittest.main(testRunner=VppTestRunner) |