blob: ce53e53fc984d95f329663b9a18bfffa7a54e972 [file] [log] [blame]
Andrew Yourtchenko669d07d2017-11-17 14:38:18 +01001#!/usr/bin/env python
2
3from framework import VppTestCase, VppTestRunner
4from vpp_udp_encap import *
5from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable, DpoProto
6
7from scapy.packet import Raw
8from scapy.layers.l2 import Ether, ARP
9from scapy.layers.inet import IP, UDP
10from scapy.layers.inet6 import IPv6
11from scapy.contrib.mpls import MPLS
12
13from vpp_object import *
14from socket import inet_pton, inet_ntop, AF_INET, AF_INET6
15
16
def find_abf_policy(test, id):
    """Report whether VPP currently has an ABF policy with the given ID.

    :param test: object exposing ``vapi.abf_policy_dump()``
    :param id: policy ID to search for
    :returns: True if a dumped entry's ``policy.policy_id`` equals ``id``,
              False otherwise
    """
    return any(id == entry.policy.policy_id
               for entry in test.vapi.abf_policy_dump())
23
24
def find_abf_itf_attach(test, id, sw_if_index):
    """Report whether a policy is attached to an interface in VPP.

    :param test: object exposing ``vapi.abf_itf_attach_dump()``
    :param id: policy ID of the attachment to look for
    :param sw_if_index: interface index of the attachment to look for
    :returns: True if a dumped entry matches both the policy ID and the
              interface index, False otherwise
    """
    return any(id == entry.attach.policy_id and
               sw_if_index == entry.attach.sw_if_index
               for entry in test.vapi.abf_itf_attach_dump())
32
33
class VppAbfPolicy(VppObject):
    """An ABF policy: an ACL paired with the paths matching packets take.

    The object is added to / removed from VPP through
    ``vapi.abf_policy_add_del`` and looked up with ``find_abf_policy``.
    """

    # Every encoded label stack is padded with empty entries up to this
    # length (presumably the fixed array size in the API message - TODO
    # confirm against the ABF API definition).
    _LABEL_STACK_SLOTS = 16

    def __init__(self,
                 test,
                 policy_id,
                 acl,
                 paths):
        """
        :param test: the test case; provides ``vapi``, ``registry``, ``logger``
        :param policy_id: numeric ID of the policy
        :param acl: object with an ``acl_index`` attribute
        :param paths: list of path objects (the tests pass VppRoutePath)
        """
        self._test = test
        self.policy_id = policy_id
        self.acl = acl
        self.paths = paths

    def encode_paths(self):
        """Encode ``self.paths`` as a list of dicts for the API request.

        :returns: one dict per path, in the order of ``self.paths``
        """
        br_paths = []
        for path in self.paths:
            # Labels may be label objects (encoded via their own encode())
            # or raw label values, which are sent with a TTL of 255.
            # NOTE(review): VppMplsLabel has no explicit import in this
            # file; it must arrive through one of the star imports. The
            # name is only touched when a path actually carries labels.
            lstack = [label.encode() if isinstance(label, VppMplsLabel)
                      else {'label': label, 'ttl': 255}
                      for label in path.nh_labels]
            n_labels = len(lstack)
            # Pad to the fixed length; a stack that is already longer is
            # left untouched (range() of a negative count is empty).
            lstack.extend({} for _ in
                          range(self._LABEL_STACK_SLOTS - n_labels))
            # NOTE(review): sw_if_index is always sent as 0xffffffff,
            # whatever interface the path object was built with, and
            # weight/preference are fixed - confirm this is intended.
            br_paths.append({'next_hop': path.nh_addr,
                             'weight': 1,
                             'afi': path.proto,
                             'sw_if_index': 0xffffffff,
                             'preference': 0,
                             'table_id': path.nh_table_id,
                             'next_hop_id': path.next_hop_id,
                             'is_udp_encap': path.is_udp_encap,
                             'n_labels': n_labels,
                             'label_stack': lstack})
        return br_paths

    def _encode(self):
        """Build the policy dict shared by the add and delete requests."""
        return {'policy_id': self.policy_id,
                'acl_index': self.acl.acl_index,
                'n_paths': len(self.paths),
                'paths': self.encode_paths()}

    def add_vpp_config(self):
        """Add the policy to VPP and register this object with the test."""
        self._test.vapi.abf_policy_add_del(1, self._encode())
        self._test.registry.register(self, self._test.logger)

    def remove_vpp_config(self):
        """Delete the policy from VPP."""
        self._test.vapi.abf_policy_add_del(0, self._encode())

    def query_vpp_config(self):
        """Return True if VPP's policy dump contains this policy ID."""
        return find_abf_policy(self._test, self.policy_id)

    def __str__(self):
        return self.object_id()

    def object_id(self):
        """Return the string identifying this object, e.g. abf-policy-10."""
        return ("abf-policy-%d" % self.policy_id)
95
96
class VppAbfAttach(VppObject):
    """The attachment of an ABF policy to an interface.

    Added to / removed from VPP through ``vapi.abf_itf_attach_add_del``
    and looked up with ``find_abf_itf_attach``.
    """

    def __init__(self, test, policy_id, sw_if_index, priority, is_ipv6=0):
        """
        :param test: the test case; provides ``vapi``, ``registry``, ``logger``
        :param policy_id: ID of the policy being attached
        :param sw_if_index: index of the interface to attach to
        :param priority: priority value sent with the attachment
        :param is_ipv6: sent as-is in the request's ``is_ipv6`` field
        """
        self._test = test
        self.policy_id = policy_id
        self.sw_if_index = sw_if_index
        self.priority = priority
        self.is_ipv6 = is_ipv6

    def _encode(self):
        """Build the attachment dict shared by the add and delete requests."""
        return {'policy_id': self.policy_id,
                'sw_if_index': self.sw_if_index,
                'priority': self.priority,
                'is_ipv6': self.is_ipv6}

    def add_vpp_config(self):
        """Create the attachment in VPP and register this object."""
        api = self._test.vapi
        api.abf_itf_attach_add_del(1, self._encode())
        self._test.registry.register(self, self._test.logger)

    def remove_vpp_config(self):
        """Delete the attachment from VPP."""
        api = self._test.vapi
        api.abf_itf_attach_add_del(0, self._encode())

    def query_vpp_config(self):
        """Return True if VPP's attach dump contains this attachment."""
        return find_abf_itf_attach(self._test,
                                   self.policy_id,
                                   self.sw_if_index)

    def __str__(self):
        return self.object_id()

    def object_id(self):
        """Return the identifying string: abf-attach-<policy>-<sw_if_index>."""
        ident = (self.policy_id, self.sw_if_index)
        return ("abf-attach-%d-%d" % ident)
138
139
class TestAbf(VppTestCase):
    """ ABF Test Case """

    def setUp(self):
        """Create four pg interfaces and bring each up with IPv4 and IPv6
        addressing and resolved neighbours."""
        super(TestAbf, self).setUp()

        self.create_pg_interfaces(range(4))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        """Remove the addressing from every pg interface and bring it down
        before running the base-class teardown."""
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.ip6_disable()
            i.admin_down()
        super(TestAbf, self).tearDown()

    def test_abf4(self):
        """ IPv4 ACL Based Forwarding
        """

        #
        # We are not testing the various matching capabilities
        # of ACLs, that's done elsewhere. Here we are testing
        # the application of ACLs to a forwarding path to achieve
        # ABF
        # So we construct just a few ACLs to ensure the ABF policies
        # are correctly constructed and used. And a few path types
        # to test the API path decoding.
        #

        #
        # Rule 1
        #
        rule_1 = ({'is_permit': 1,
                   'is_ipv6': 0,
                   'proto': 17,
                   'srcport_or_icmptype_first': 1234,
                   'srcport_or_icmptype_last': 1234,
                   'src_ip_prefix_len': 32,
                   'src_ip_addr': inet_pton(AF_INET, "1.1.1.1"),
                   'dstport_or_icmpcode_first': 1234,
                   'dstport_or_icmpcode_last': 1234,
                   'dst_ip_prefix_len': 32,
                   'dst_ip_addr': inet_pton(AF_INET, "1.1.1.2")})
        acl_1 = self.vapi.acl_add_replace(acl_index=4294967295, r=[rule_1])

        #
        # ABF policy for ACL 1 - path via interface 1
        #
        abf_1 = VppAbfPolicy(self, 10, acl_1,
                             [VppRoutePath(self.pg1.remote_ip4,
                                           self.pg1.sw_if_index)])
        abf_1.add_vpp_config()

        #
        # Attach the policy to input interface Pg0
        #
        attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 50)
        attach_1.add_vpp_config()

        #
        # fire in packet matching the ACL src,dst. If it's forwarded
        # then the ABF was successful, since default routing will drop it
        #
        p_1 = (Ether(src=self.pg0.remote_mac,
                     dst=self.pg0.local_mac) /
               IP(src="1.1.1.1", dst="1.1.1.2") /
               UDP(sport=1234, dport=1234) /
               Raw('\xa5' * 100))
        self.send_and_expect(self.pg0, p_1*65, self.pg1)

        #
        # Attach a 'better' priority policy to the same interface
        #
        abf_2 = VppAbfPolicy(self, 11, acl_1,
                             [VppRoutePath(self.pg2.remote_ip4,
                                           self.pg2.sw_if_index)])
        abf_2.add_vpp_config()
        attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
        attach_2.add_vpp_config()

        self.send_and_expect(self.pg0, p_1*65, self.pg2)

        #
        # Attach a policy with priority in the middle
        #
        abf_3 = VppAbfPolicy(self, 12, acl_1,
                             [VppRoutePath(self.pg3.remote_ip4,
                                           self.pg3.sw_if_index)])
        abf_3.add_vpp_config()
        attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
        attach_3.add_vpp_config()

        self.send_and_expect(self.pg0, p_1*65, self.pg2)

        #
        # remove the best priority
        #
        attach_2.remove_vpp_config()
        self.send_and_expect(self.pg0, p_1*65, self.pg3)

        #
        # Attach one of the same policies to Pg1
        #
        attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45)
        attach_4.add_vpp_config()

        p_2 = (Ether(src=self.pg1.remote_mac,
                     dst=self.pg1.local_mac) /
               IP(src="1.1.1.1", dst="1.1.1.2") /
               UDP(sport=1234, dport=1234) /
               Raw('\xa5' * 100))
        self.send_and_expect(self.pg1, p_2 * 65, self.pg3)

        #
        # detach the policy from PG1, now expect traffic to be dropped
        #
        attach_4.remove_vpp_config()

        self.send_and_assert_no_replies(self.pg1, p_2 * 65, "Detached")

    def test_abf6(self):
        """ IPv6 ACL Based Forwarding
        """

        #
        # Simple test for matching IPv6 packets
        #

        #
        # Rule 1
        #
        rule_1 = ({'is_permit': 1,
                   'is_ipv6': 1,
                   'proto': 17,
                   'srcport_or_icmptype_first': 1234,
                   'srcport_or_icmptype_last': 1234,
                   'src_ip_prefix_len': 128,
                   'src_ip_addr': inet_pton(AF_INET6, "2001::2"),
                   'dstport_or_icmpcode_first': 1234,
                   'dstport_or_icmpcode_last': 1234,
                   'dst_ip_prefix_len': 128,
                   'dst_ip_addr': inet_pton(AF_INET6, "2001::1")})
        acl_1 = self.vapi.acl_add_replace(acl_index=4294967295,
                                          r=[rule_1])

        #
        # ABF policy for ACL 1 - path via interface 1
        #
        abf_1 = VppAbfPolicy(self, 10, acl_1,
                             [VppRoutePath("3001::1",
                                           0xffffffff,
                                           proto=DpoProto.DPO_PROTO_IP6)])
        abf_1.add_vpp_config()

        attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index,
                                45, is_ipv6=True)
        attach_1.add_vpp_config()

        #
        # a packet matching the rule
        #
        p = (Ether(src=self.pg0.remote_mac,
                   dst=self.pg0.local_mac) /
             IPv6(src="2001::2", dst="2001::1") /
             UDP(sport=1234, dport=1234) /
             Raw('\xa5' * 100))

        #
        # packets are dropped because there is no route to the policy's
        # next hop.
        # The packet must be injected on pg0: it is addressed to pg0's
        # MAC and the policy is attached to pg0. Sending it on any other
        # interface would not exercise the policy, so the check would
        # pass for an unrelated reason.
        #
        self.send_and_assert_no_replies(self.pg0, p * 65, "no route")

        #
        # add a route resolving the next-hop
        #
        route = VppIpRoute(self, "3001::1", 32,
                           [VppRoutePath(self.pg1.remote_ip6,
                                         self.pg1.sw_if_index,
                                         proto=DpoProto.DPO_PROTO_IP6)],
                           is_ip6=1)
        route.add_vpp_config()

        #
        # now expect packets forwarded.
        #
        self.send_and_expect(self.pg0, p * 65, self.pg1)
335
336
if __name__ == '__main__':
    # No explicit "import unittest" is visible among this file's imports,
    # so bring it into scope here rather than rely on a star import to
    # provide the name when the file is run as a script.
    import unittest
    unittest.main(testRunner=VppTestRunner)