blob: fb30fc3018ceb412062c89eddbf5d49cac18f356 [file] [log] [blame]
Andrew Yourtchenko669d07d2017-11-17 14:38:18 +01001#!/usr/bin/env python
2
3from framework import VppTestCase, VppTestRunner
4from vpp_udp_encap import *
Neale Rannsc0a93142018-09-05 15:42:26 -07005from vpp_ip import DpoProto
6from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable
Andrew Yourtchenko669d07d2017-11-17 14:38:18 +01007
8from scapy.packet import Raw
9from scapy.layers.l2 import Ether, ARP
10from scapy.layers.inet import IP, UDP
11from scapy.layers.inet6 import IPv6
12from scapy.contrib.mpls import MPLS
13
14from vpp_object import *
15from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
16
17
18def find_abf_policy(test, id):
19 policies = test.vapi.abf_policy_dump()
20 for p in policies:
21 if id == p.policy.policy_id:
22 return True
23 return False
24
25
26def find_abf_itf_attach(test, id, sw_if_index):
27 attachs = test.vapi.abf_itf_attach_dump()
28 for a in attachs:
29 if id == a.attach.policy_id and \
30 sw_if_index == a.attach.sw_if_index:
31 return True
32 return False
33
34
35class VppAbfPolicy(VppObject):
36
37 def __init__(self,
38 test,
39 policy_id,
40 acl,
41 paths):
42 self._test = test
43 self.policy_id = policy_id
44 self.acl = acl
45 self.paths = paths
46
47 def encode_paths(self):
48 br_paths = []
49 for p in self.paths:
50 lstack = []
51 for l in p.nh_labels:
52 if type(l) == VppMplsLabel:
53 lstack.append(l.encode())
54 else:
55 lstack.append({'label': l, 'ttl': 255})
56 n_labels = len(lstack)
57 while (len(lstack) < 16):
58 lstack.append({})
59 br_paths.append({'next_hop': p.nh_addr,
60 'weight': 1,
61 'afi': p.proto,
62 'sw_if_index': 0xffffffff,
63 'preference': 0,
64 'table_id': p.nh_table_id,
65 'next_hop_id': p.next_hop_id,
66 'is_udp_encap': p.is_udp_encap,
67 'n_labels': n_labels,
68 'label_stack': lstack})
69 return br_paths
70
71 def add_vpp_config(self):
72 self._test.vapi.abf_policy_add_del(
73 1,
74 {'policy_id': self.policy_id,
75 'acl_index': self.acl.acl_index,
76 'n_paths': len(self.paths),
77 'paths': self.encode_paths()})
78 self._test.registry.register(self, self._test.logger)
79
80 def remove_vpp_config(self):
81 self._test.vapi.abf_policy_add_del(
82 0,
83 {'policy_id': self.policy_id,
84 'acl_index': self.acl.acl_index,
85 'n_paths': len(self.paths),
86 'paths': self.encode_paths()})
87
88 def query_vpp_config(self):
89 return find_abf_policy(self._test, self.policy_id)
90
91 def __str__(self):
92 return self.object_id()
93
94 def object_id(self):
95 return ("abf-policy-%d" % self.policy_id)
96
97
98class VppAbfAttach(VppObject):
99
100 def __init__(self,
101 test,
102 policy_id,
103 sw_if_index,
104 priority,
105 is_ipv6=0):
106 self._test = test
107 self.policy_id = policy_id
108 self.sw_if_index = sw_if_index
109 self.priority = priority
110 self.is_ipv6 = is_ipv6
111
112 def add_vpp_config(self):
113 self._test.vapi.abf_itf_attach_add_del(
114 1,
115 {'policy_id': self.policy_id,
116 'sw_if_index': self.sw_if_index,
117 'priority': self.priority,
118 'is_ipv6': self.is_ipv6})
119 self._test.registry.register(self, self._test.logger)
120
121 def remove_vpp_config(self):
122 self._test.vapi.abf_itf_attach_add_del(
123 0,
124 {'policy_id': self.policy_id,
125 'sw_if_index': self.sw_if_index,
126 'priority': self.priority,
127 'is_ipv6': self.is_ipv6})
128
129 def query_vpp_config(self):
130 return find_abf_itf_attach(self._test,
131 self.policy_id,
132 self.sw_if_index)
133
134 def __str__(self):
135 return self.object_id()
136
137 def object_id(self):
138 return ("abf-attach-%d-%d" % (self.policy_id, self.sw_if_index))
139
140
141class TestAbf(VppTestCase):
142 """ ABF Test Case """
143
144 def setUp(self):
145 super(TestAbf, self).setUp()
146
147 self.create_pg_interfaces(range(4))
148
149 for i in self.pg_interfaces:
150 i.admin_up()
151 i.config_ip4()
152 i.resolve_arp()
153 i.config_ip6()
154 i.resolve_ndp()
155
156 def tearDown(self):
157 for i in self.pg_interfaces:
158 i.unconfig_ip4()
159 i.unconfig_ip6()
160 i.ip6_disable()
161 i.admin_down()
162 super(TestAbf, self).tearDown()
163
164 def test_abf4(self):
165 """ IPv4 ACL Based Forwarding
166 """
167
168 #
169 # We are not testing the various matching capabilities
170 # of ACLs, that's done elsewhere. Here ware are testing
171 # the application of ACLs to a forwarding path to achieve
172 # ABF
173 # So we construct just a few ACLs to ensure the ABF policies
174 # are correclty constructed and used. And a few path types
175 # to test the API path decoding.
176 #
177
178 #
179 # Rule 1
180 #
181 rule_1 = ({'is_permit': 1,
182 'is_ipv6': 0,
183 'proto': 17,
184 'srcport_or_icmptype_first': 1234,
185 'srcport_or_icmptype_last': 1234,
186 'src_ip_prefix_len': 32,
187 'src_ip_addr': inet_pton(AF_INET, "1.1.1.1"),
188 'dstport_or_icmpcode_first': 1234,
189 'dstport_or_icmpcode_last': 1234,
190 'dst_ip_prefix_len': 32,
191 'dst_ip_addr': inet_pton(AF_INET, "1.1.1.2")})
192 acl_1 = self.vapi.acl_add_replace(acl_index=4294967295, r=[rule_1])
193
194 #
195 # ABF policy for ACL 1 - path via interface 1
196 #
197 abf_1 = VppAbfPolicy(self, 10, acl_1,
198 [VppRoutePath(self.pg1.remote_ip4,
199 self.pg1.sw_if_index)])
200 abf_1.add_vpp_config()
201
202 #
203 # Attach the policy to input interface Pg0
204 #
205 attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
206 attach_1.add_vpp_config()
207
208 #
209 # fire in packet matching the ACL src,dst. If it's forwarded
210 # then the ABF was successful, since default routing will drop it
211 #
212 p_1 = (Ether(src=self.pg0.remote_mac,
213 dst=self.pg0.local_mac) /
214 IP(src="1.1.1.1", dst="1.1.1.2") /
215 UDP(sport=1234, dport=1234) /
216 Raw('\xa5' * 100))
217 self.send_and_expect(self.pg0, p_1*65, self.pg1)
218
219 #
220 # Attach a 'better' priority policy to the same interface
221 #
222 abf_2 = VppAbfPolicy(self, 11, acl_1,
223 [VppRoutePath(self.pg2.remote_ip4,
224 self.pg2.sw_if_index)])
225 abf_2.add_vpp_config()
226 attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
227 attach_2.add_vpp_config()
228
229 self.send_and_expect(self.pg0, p_1*65, self.pg2)
230
231 #
232 # Attach a policy with priority in the middle
233 #
234 abf_3 = VppAbfPolicy(self, 12, acl_1,
235 [VppRoutePath(self.pg3.remote_ip4,
236 self.pg3.sw_if_index)])
237 abf_3.add_vpp_config()
238 attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
239 attach_3.add_vpp_config()
240
241 self.send_and_expect(self.pg0, p_1*65, self.pg2)
242
243 #
244 # remove the best priority
245 #
246 attach_2.remove_vpp_config()
247 self.send_and_expect(self.pg0, p_1*65, self.pg3)
248
249 #
250 # Attach one of the same policies to Pg1
251 #
252 attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
253 attach_4.add_vpp_config()
254
255 p_2 = (Ether(src=self.pg1.remote_mac,
256 dst=self.pg1.local_mac) /
257 IP(src="1.1.1.1", dst="1.1.1.2") /
258 UDP(sport=1234, dport=1234) /
259 Raw('\xa5' * 100))
260 self.send_and_expect(self.pg1, p_2 * 65, self.pg3)
261
262 #
263 # detach the policy from PG1, now expect traffic to be dropped
264 #
265 attach_4.remove_vpp_config()
266
267 self.send_and_assert_no_replies(self.pg1, p_2 * 65, "Detached")
268
269 def test_abf6(self):
270 """ IPv6 ACL Based Forwarding
271 """
272
273 #
274 # Simple test for matching IPv6 packets
275 #
276
277 #
278 # Rule 1
279 #
280 rule_1 = ({'is_permit': 1,
281 'is_ipv6': 1,
282 'proto': 17,
283 'srcport_or_icmptype_first': 1234,
284 'srcport_or_icmptype_last': 1234,
285 'src_ip_prefix_len': 128,
286 'src_ip_addr': inet_pton(AF_INET6, "2001::2"),
287 'dstport_or_icmpcode_first': 1234,
288 'dstport_or_icmpcode_last': 1234,
289 'dst_ip_prefix_len': 128,
290 'dst_ip_addr': inet_pton(AF_INET6, "2001::1")})
291 acl_1 = self.vapi.acl_add_replace(acl_index=4294967295,
292 r=[rule_1])
293
294 #
295 # ABF policy for ACL 1 - path via interface 1
296 #
297 abf_1 = VppAbfPolicy(self, 10, acl_1,
298 [VppRoutePath("3001::1",
299 0xffffffff,
300 proto=DpoProto.DPO_PROTO_IP6)])
301 abf_1.add_vpp_config()
302
303 attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index,
304 45, is_ipv6=True)
305 attach_1.add_vpp_config()
306
307 #
308 # a packet matching the rule
309 #
310 p = (Ether(src=self.pg0.remote_mac,
311 dst=self.pg0.local_mac) /
312 IPv6(src="2001::2", dst="2001::1") /
313 UDP(sport=1234, dport=1234) /
314 Raw('\xa5' * 100))
315
316 #
317 # packets are dropped because there is no route to the policy's
318 # next hop
319 #
320 self.send_and_assert_no_replies(self.pg1, p * 65, "no route")
321
322 #
323 # add a route resolving the next-hop
324 #
325 route = VppIpRoute(self, "3001::1", 32,
326 [VppRoutePath(self.pg1.remote_ip6,
327 self.pg1.sw_if_index,
328 proto=DpoProto.DPO_PROTO_IP6)],
329 is_ip6=1)
330 route.add_vpp_config()
331
332 #
333 # now expect packets forwarded.
334 #
335 self.send_and_expect(self.pg0, p * 65, self.pg1)
336
337
338if __name__ == '__main__':
339 unittest.main(testRunner=VppTestRunner)