blob: d5b600506912b2f26a5255ca22d9c2c249f6c39d [file] [log] [blame]
Ole Troan18327be2021-01-12 21:49:38 +01001#!/usr/bin/env python3
2"""Policy 1:1 NAT functional tests"""
3
4import unittest
5from scapy.layers.inet import Ether, IP, UDP, ICMP
6from framework import VppTestCase, VppTestRunner
7from vpp_papi import VppEnum
8
9
10class TestPNAT(VppTestCase):
11 """ PNAT Test Case """
12 maxDiff = None
13
14 @classmethod
15 def setUpClass(cls):
16 super(TestPNAT, cls).setUpClass()
17 cls.create_pg_interfaces(range(2))
18 cls.interfaces = list(cls.pg_interfaces)
19
20 @classmethod
21 def tearDownClass(cls):
22 super(TestPNAT, cls).tearDownClass()
23
24 def setUp(self):
25 super(TestPNAT, self).setUp()
26 for i in self.interfaces:
27 i.admin_up()
28 i.config_ip4()
29 i.resolve_arp()
30
31 def tearDown(self):
32 super(TestPNAT, self).tearDown()
33 if not self.vpp_dead:
34 for i in self.pg_interfaces:
35 i.unconfig_ip4()
36 i.admin_down()
37
38 def validate(self, rx, expected):
39 self.assertEqual(rx, expected.__class__(expected))
40
41 def validate_bytes(self, rx, expected):
42 self.assertEqual(rx, expected)
43
44 def ping_check(self):
45 """ Verify non matching traffic works. """
46 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
47
48 icmpecho = (IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
49 ICMP())
50 reply = (IP(src=self.pg0.local_ip4, dst=self.pg0.remote_ip4) /
51 ICMP(type='echo-reply'))
52 rx = self.send_and_expect(self.pg0, p_ether/icmpecho * 1, self.pg0)
53 for p in rx:
54 reply[IP].id = p[IP].id
55 self.validate(p[1], reply)
56
57 def test_pnat(self):
58 """ PNAT test """
59
60 PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
61 PNAT_IP4_OUTPUT = \
62 VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
63
64 tests = [
65 {
66 'input': PNAT_IP4_INPUT,
67 'sw_if_index': self.pg0.sw_if_index,
68 'match': {'mask': 0xa, 'dst': '10.10.10.10', 'proto': 17,
69 'dport': 6871},
70 'rewrite': {'mask': 0x2, 'dst': self.pg1.remote_ip4},
71 'send': (IP(src=self.pg0.remote_ip4, dst='10.10.10.10') /
72 UDP(dport=6871)),
73 'reply': (IP(src=self.pg0.remote_ip4,
74 dst=self.pg1.remote_ip4) /
75 UDP(dport=6871))
76 },
77 {
78 'input': PNAT_IP4_OUTPUT,
79 'sw_if_index': self.pg1.sw_if_index,
80 'match': {'mask': 0x9, 'src': self.pg0.remote_ip4, 'proto': 17,
81 'dport': 6871},
82 'rewrite': {'mask': 0x1, 'src': '11.11.11.11'},
83 'send': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
84 UDP(dport=6871)),
85 'reply': (IP(src='11.11.11.11', dst=self.pg1.remote_ip4) /
86 UDP(dport=6871))
87 },
88 {
89 'input': PNAT_IP4_INPUT,
90 'sw_if_index': self.pg0.sw_if_index,
91 'match': {'mask': 0xa, 'dst': '10.10.10.10', 'proto': 17,
92 'dport': 6871},
93 'rewrite': {'mask': 0xa, 'dst': self.pg1.remote_ip4,
94 'dport': 5555},
95 'send': (IP(src=self.pg0.remote_ip4, dst='10.10.10.10') /
Ole Troanc9c91432021-02-06 13:02:41 +010096 UDP(sport=65530, dport=6871)),
Ole Troan18327be2021-01-12 21:49:38 +010097 'reply': (IP(src=self.pg0.remote_ip4,
98 dst=self.pg1.remote_ip4) /
Ole Troanc9c91432021-02-06 13:02:41 +010099 UDP(sport=65530, dport=5555))
Ole Troan18327be2021-01-12 21:49:38 +0100100 },
101 {
102 'input': PNAT_IP4_INPUT,
103 'sw_if_index': self.pg0.sw_if_index,
104 'match': {'mask': 0xa, 'dst': self.pg1.remote_ip4, 'proto': 17,
105 'dport': 6871},
106 'rewrite': {'mask': 0x8, 'dport': 5555},
107 'send': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
108 UDP(dport=6871, chksum=0)),
109 'reply': (IP(src=self.pg0.remote_ip4,
110 dst=self.pg1.remote_ip4) /
111 UDP(dport=5555, chksum=0))
112 },
113 {
114 'input': PNAT_IP4_INPUT,
115 'sw_if_index': self.pg0.sw_if_index,
116 'match': {'mask': 0x2, 'dst': self.pg1.remote_ip4, 'proto': 1},
117 'rewrite': {'mask': 0x1, 'src': '8.8.8.8'},
118 'send': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
119 ICMP()),
120 'reply': IP(src='8.8.8.8', dst=self.pg1.remote_ip4)/ICMP(),
121 },
122 ]
123
124 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
125 for t in tests:
126 rv = self.vapi.pnat_binding_add(match=t['match'],
127 rewrite=t['rewrite'])
128 self.vapi.pnat_binding_attach(sw_if_index=t['sw_if_index'],
129 attachment=t['input'],
130 binding_index=rv.binding_index)
131
132 reply = t['reply']
133 reply[IP].ttl -= 1
134 rx = self.send_and_expect(self.pg0, p_ether/t['send']*1, self.pg1)
135 for p in rx:
136 # p.show2()
137 self.validate(p[1], reply)
138
139 self.ping_check()
140
141 self.vapi.pnat_binding_detach(sw_if_index=t['sw_if_index'],
142 attachment=t['input'],
143 binding_index=rv.binding_index)
144 self.vapi.pnat_binding_del(binding_index=rv.binding_index)
145
146 def test_pnat_show(self):
147 """ PNAT show tests """
148
149 PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
150 PNAT_IP4_OUTPUT = \
151 VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
152
153 tests = [
154 {
155 'input': PNAT_IP4_INPUT,
156 'sw_if_index': self.pg0.sw_if_index,
157 'match': {'mask': 0xa, 'dst': '10.10.10.10', 'proto': 17,
158 'dport': 6871},
159 'rewrite': {'mask': 0x2, 'dst': self.pg1.remote_ip4},
160 'send': (IP(src=self.pg0.remote_ip4, dst='10.10.10.10') /
161 UDP(dport=6871)),
162 'reply': (IP(src=self.pg0.remote_ip4,
163 dst=self.pg1.remote_ip4) /
164 UDP(dport=6871))
165 },
166 {
167 'input': PNAT_IP4_OUTPUT,
168 'sw_if_index': self.pg1.sw_if_index,
169 'match': {'mask': 0x9, 'src': self.pg0.remote_ip4, 'proto': 17,
170 'dport': 6871},
171 'rewrite': {'mask': 0x1, 'src': '11.11.11.11'},
172 'send': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
173 UDP(dport=6871)),
174 'reply': (IP(src='11.11.11.11', dst=self.pg1.remote_ip4) /
175 UDP(dport=6871))
176 },
177 ]
178 binding_index = []
179 for t in tests:
180 rv = self.vapi.pnat_binding_add(match=t['match'],
181 rewrite=t['rewrite'])
182 binding_index.append(rv.binding_index)
183 self.vapi.pnat_binding_attach(sw_if_index=t['sw_if_index'],
184 attachment=t['input'],
185 binding_index=rv.binding_index)
186
187 rv, l = self.vapi.pnat_bindings_get()
188 self.assertEqual(len(l), len(tests))
189
190 rv, l = self.vapi.pnat_interfaces_get()
191 self.assertEqual(len(l), 2)
192
193 self.logger.info(self.vapi.cli("show pnat translations"))
194 self.logger.info(self.vapi.cli("show pnat interfaces"))
195
196 for i, t in enumerate(tests):
197 self.vapi.pnat_binding_detach(sw_if_index=t['sw_if_index'],
198 attachment=t['input'],
199 binding_index=binding_index[i])
200 self.vapi.pnat_binding_del(binding_index=binding_index[i])
201
202if __name__ == '__main__':
203 unittest.main(testRunner=VppTestRunner)