| #!/usr/bin/env python3 |
| """Policy 1:1 NAT functional tests""" |
| |
| import unittest |
| from scapy.layers.inet import Ether, IP, UDP, ICMP |
| from framework import VppTestCase, VppTestRunner |
| from vpp_papi import VppEnum |
| |
| |
class TestPNAT(VppTestCase):
    """ PNAT Test Case """
    # Functional tests for policy 1:1 NAT (PNAT).  Each test creates a
    # binding (a match rule plus a rewrite rule), attaches it to an
    # interface at an attachment point, and checks the result.
    #
    # NOTE(review): the 'mask' values in the tables below look like a bit
    # set selecting which fields are used (0x1 src, 0x2 dst, 0x8 dport,
    # inferred from which keys accompany each mask) - confirm against the
    # pnat API definition.

    # No limit on the diff printed when an assertEqual fails.
    maxDiff = None

    @classmethod
    def setUpClass(cls):
        """Create the two pg interfaces (pg0, pg1) shared by all tests."""
        super(TestPNAT, cls).setUpClass()
        cls.create_pg_interfaces(range(2))
        cls.interfaces = list(cls.pg_interfaces)

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the framework."""
        super(TestPNAT, cls).tearDownClass()

    def setUp(self):
        """Bring every interface up with IPv4 configured and ARP resolved."""
        super(TestPNAT, self).setUp()
        for i in self.interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

    def tearDown(self):
        """Undo setUp's interface configuration unless VPP has died."""
        super(TestPNAT, self).tearDown()
        if not self.vpp_dead:
            # Same list that setUp configured.
            for i in self.interfaces:
                i.unconfig_ip4()
                i.admin_down()

    def validate(self, rx, expected):
        """Assert that received packet rx equals the expected packet.

        The expected packet is re-instantiated from itself before the
        comparison - presumably so that auto-computed fields (lengths,
        checksums) are filled in; confirm against scapy.
        """
        self.assertEqual(rx, expected.__class__(expected))

    def validate_bytes(self, rx, expected):
        """Assert that rx equals expected, compared as given."""
        self.assertEqual(rx, expected)

    def ping_check(self):
        """ Verify non matching traffic works.

        Sends an ICMP echo request from pg0's remote address to pg0's
        local address and expects an echo reply back on pg0.
        """
        p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)

        icmpecho = (IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
                    ICMP())
        reply = (IP(src=self.pg0.local_ip4, dst=self.pg0.remote_ip4) /
                 ICMP(type='echo-reply'))
        rx = self.send_and_expect(self.pg0, p_ether / icmpecho * 1, self.pg0)
        for p in rx:
            # Take the IP id from the received packet; it is not pinned
            # by this test.
            reply[IP].id = p[IP].id
            # p[1] is the layer above Ethernet, i.e. the IP packet.
            self.validate(p[1], reply)

    def _bind(self, t):
        """Create the binding described by t and attach it.

        :param t: dict with 'match', 'rewrite', 'sw_if_index' and 'input'
                  (the attachment point) entries.
        :returns: the binding index reported by pnat_binding_add.
        """
        rv = self.vapi.pnat_binding_add(match=t['match'],
                                        rewrite=t['rewrite'])
        self.vapi.pnat_binding_attach(sw_if_index=t['sw_if_index'],
                                      attachment=t['input'],
                                      binding_index=rv.binding_index)
        return rv.binding_index

    def _unbind(self, t, binding_index):
        """Detach and delete a binding previously set up with _bind.

        :param t: the same dict that was passed to _bind.
        :param binding_index: the index returned by _bind.
        """
        self.vapi.pnat_binding_detach(sw_if_index=t['sw_if_index'],
                                      attachment=t['input'],
                                      binding_index=binding_index)
        self.vapi.pnat_binding_del(binding_index=binding_index)

    def test_pnat(self):
        """ PNAT test """
        # For each table entry: install the binding, send 'send' into
        # pg0, expect 'reply' on pg1, check that unrelated ping traffic
        # still works, then remove the binding again.

        PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
        PNAT_IP4_OUTPUT = \
            VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT

        tests = [
            {
                # Input on pg0: rewrite the destination address.
                'input': PNAT_IP4_INPUT,
                'sw_if_index': self.pg0.sw_if_index,
                'match': {'mask': 0xa, 'dst': '10.10.10.10', 'proto': 17,
                          'dport': 6871},
                'rewrite': {'mask': 0x2, 'dst': self.pg1.remote_ip4},
                'send': (IP(src=self.pg0.remote_ip4, dst='10.10.10.10') /
                         UDP(dport=6871)),
                'reply': (IP(src=self.pg0.remote_ip4,
                             dst=self.pg1.remote_ip4) /
                          UDP(dport=6871))
            },
            {
                # Output on pg1: rewrite the source address.
                'input': PNAT_IP4_OUTPUT,
                'sw_if_index': self.pg1.sw_if_index,
                'match': {'mask': 0x9, 'src': self.pg0.remote_ip4, 'proto': 17,
                          'dport': 6871},
                'rewrite': {'mask': 0x1, 'src': '11.11.11.11'},
                'send': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                         UDP(dport=6871)),
                'reply': (IP(src='11.11.11.11', dst=self.pg1.remote_ip4) /
                          UDP(dport=6871))
            },
            {
                # Input on pg0: rewrite destination address and port.
                'input': PNAT_IP4_INPUT,
                'sw_if_index': self.pg0.sw_if_index,
                'match': {'mask': 0xa, 'dst': '10.10.10.10', 'proto': 17,
                          'dport': 6871},
                'rewrite': {'mask': 0xa, 'dst': self.pg1.remote_ip4,
                            'dport': 5555},
                'send': (IP(src=self.pg0.remote_ip4, dst='10.10.10.10') /
                         UDP(sport=65530, dport=6871)),
                'reply': (IP(src=self.pg0.remote_ip4,
                             dst=self.pg1.remote_ip4) /
                          UDP(sport=65530, dport=5555))
            },
            {
                # Input on pg0: rewrite the destination port of a UDP
                # packet sent with checksum 0; the expected packet also
                # carries checksum 0.
                'input': PNAT_IP4_INPUT,
                'sw_if_index': self.pg0.sw_if_index,
                'match': {'mask': 0xa, 'dst': self.pg1.remote_ip4, 'proto': 17,
                          'dport': 6871},
                'rewrite': {'mask': 0x8, 'dport': 5555},
                'send': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                         UDP(dport=6871, chksum=0)),
                'reply': (IP(src=self.pg0.remote_ip4,
                             dst=self.pg1.remote_ip4) /
                          UDP(dport=5555, chksum=0))
            },
            {
                # Input on pg0: rewrite the source address of ICMP.
                'input': PNAT_IP4_INPUT,
                'sw_if_index': self.pg0.sw_if_index,
                'match': {'mask': 0x2, 'dst': self.pg1.remote_ip4, 'proto': 1},
                'rewrite': {'mask': 0x1, 'src': '8.8.8.8'},
                'send': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                         ICMP()),
                'reply': IP(src='8.8.8.8', dst=self.pg1.remote_ip4)/ICMP(),
            },
        ]

        p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        for t in tests:
            binding_index = self._bind(t)
            # Remove the binding even when a check below fails, so that a
            # failure here cannot leave state behind for later tests.
            try:
                reply = t['reply']
                # The packet is forwarded from pg0 to pg1, so the expected
                # TTL is one lower than the one sent.
                reply[IP].ttl -= 1
                rx = self.send_and_expect(self.pg0,
                                          p_ether / t['send'] * 1,
                                          self.pg1)
                for p in rx:
                    self.validate(p[1], reply)

                self.ping_check()
            finally:
                self._unbind(t, binding_index)

    def test_pnat_show(self):
        """ PNAT show tests """
        # Install two bindings, check that the bindings/interfaces dump
        # APIs report them, and exercise the "show pnat" CLI commands.

        PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
        PNAT_IP4_OUTPUT = \
            VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT

        # No traffic is sent in this test, so only the binding definition
        # and its attachment are needed.
        tests = [
            {
                'input': PNAT_IP4_INPUT,
                'sw_if_index': self.pg0.sw_if_index,
                'match': {'mask': 0xa, 'dst': '10.10.10.10', 'proto': 17,
                          'dport': 6871},
                'rewrite': {'mask': 0x2, 'dst': self.pg1.remote_ip4},
            },
            {
                'input': PNAT_IP4_OUTPUT,
                'sw_if_index': self.pg1.sw_if_index,
                'match': {'mask': 0x9, 'src': self.pg0.remote_ip4, 'proto': 17,
                          'dport': 6871},
                'rewrite': {'mask': 0x1, 'src': '11.11.11.11'},
            },
        ]

        # (test, binding index) for every binding installed so far, so
        # the cleanup below removes exactly what was installed.
        bindings = []
        try:
            for t in tests:
                bindings.append((t, self._bind(t)))

            _, binding_details = self.vapi.pnat_bindings_get()
            self.assertEqual(len(binding_details), len(tests))

            # The two bindings are attached to two different interfaces.
            _, interface_details = self.vapi.pnat_interfaces_get()
            self.assertEqual(len(interface_details), 2)

            self.logger.info(self.vapi.cli("show pnat translations"))
            self.logger.info(self.vapi.cli("show pnat interfaces"))
        finally:
            for t, binding_index in bindings:
                self._unbind(t, binding_index)
| |
# Script entry point: run this module's tests with the VPP test runner.
if __name__ == '__main__':
    runner_cls = VppTestRunner
    unittest.main(testRunner=runner_cls)