blob: a940bd3a64c8f8e251b5c4ce3b3423a6b46ef783 [file] [log] [blame]
Neale Ranns039cbfe2018-02-27 03:45:38 -08001#!/usr/bin/env python
2
3import unittest
4import socket
5import struct
6
7from framework import VppTestCase, VppTestRunner
8from vpp_object import VppObject
9from vpp_papi_provider import QOS_SOURCE
10from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute
11from vpp_sub_interface import VppSubInterface, VppDot1QSubint
12
13from scapy.packet import Raw
14from scapy.layers.l2 import Ether, Dot1Q
15from scapy.layers.inet import IP, UDP
16from scapy.layers.inet6 import IPv6
17from scapy.contrib.mpls import MPLS
18
19
20class TestQOS(VppTestCase):
21 """ QOS Test Case """
22
23 def setUp(self):
24 super(TestQOS, self).setUp()
25
26 self.create_pg_interfaces(range(5))
27
28 for i in self.pg_interfaces:
29 i.admin_up()
30 i.config_ip4()
31 i.resolve_arp()
32 i.config_ip6()
33 i.resolve_ndp()
34
35 def tearDown(self):
36 for i in self.pg_interfaces:
37 i.unconfig_ip4()
38 i.unconfig_ip6()
39
40 super(TestQOS, self).tearDown()
41
42 def test_qos_ip(self):
43 """ QoS Mark IP """
44
45 #
46 # for table 1 map the n=0xff possible values of input QoS mark,
47 # n to 1-n
48 #
49 output = [chr(0)] * 256
50 for i in range(0, 255):
51 output[i] = chr(255 - i)
52 os = ''.join(output)
53 rows = [{'outputs': os},
54 {'outputs': os},
55 {'outputs': os},
56 {'outputs': os}]
57
58 self.vapi.qos_egress_map_update(1, rows)
59
60 #
61 # For table 2 (and up) use the value n for everything
62 #
63 output = [chr(2)] * 256
64 os = ''.join(output)
65 rows = [{'outputs': os},
66 {'outputs': os},
67 {'outputs': os},
68 {'outputs': os}]
69
70 self.vapi.qos_egress_map_update(2, rows)
71
72 output = [chr(3)] * 256
73 os = ''.join(output)
74 rows = [{'outputs': os},
75 {'outputs': os},
76 {'outputs': os},
77 {'outputs': os}]
78
79 self.vapi.qos_egress_map_update(3, rows)
80
81 output = [chr(4)] * 256
82 os = ''.join(output)
83 rows = [{'outputs': os},
84 {'outputs': os},
85 {'outputs': os},
86 {'outputs': os}]
87 self.vapi.qos_egress_map_update(4, rows)
88 self.vapi.qos_egress_map_update(5, rows)
89 self.vapi.qos_egress_map_update(6, rows)
90 self.vapi.qos_egress_map_update(7, rows)
91
92 self.logger.info(self.vapi.cli("sh qos eg map"))
93
94 #
95 # Bind interface pgN to table n
96 #
97 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
98 QOS_SOURCE.IP,
99 1,
100 1)
101 self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
102 QOS_SOURCE.IP,
103 2,
104 1)
105 self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
106 QOS_SOURCE.IP,
107 3,
108 1)
109 self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
110 QOS_SOURCE.IP,
111 4,
112 1)
113
114 #
115 # packets ingress on Pg0
116 #
117 p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
118 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
119 UDP(sport=1234, dport=1234) /
120 Raw(chr(100) * 65))
121 p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
122 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
123 tc=1) /
124 UDP(sport=1234, dport=1234) /
125 Raw(chr(100) * 65))
126
127 #
128 # Since we have not yet enabled the recording of the input QoS
129 # from the input iP header, the egress packet's ToS will be unchanged
130 #
131 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
132 for p in rx:
133 self.assertEqual(p[IP].tos, 1)
134 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
135 for p in rx:
136 self.assertEqual(p[IPv6].tc, 1)
137
138 #
139 # Enable QoS recrding on IP input for pg0
140 #
141 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
142 QOS_SOURCE.IP,
143 1)
144
145 #
146 # send the same packets, this time expect the input TOS of 1
147 # to be mapped to pg1's egress value of 254
148 #
149 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
150 for p in rx:
151 self.assertEqual(p[IP].tos, 254)
152 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
153 for p in rx:
154 self.assertEqual(p[IPv6].tc, 254)
155
156 #
157 # different input ToS to test the mapping
158 #
159 p_v4[IP].tos = 127
160 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
161 for p in rx:
162 self.assertEqual(p[IP].tos, 128)
163 p_v6[IPv6].tc = 127
164 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
165 for p in rx:
166 self.assertEqual(p[IPv6].tc, 128)
167
168 p_v4[IP].tos = 254
169 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
170 for p in rx:
171 self.assertEqual(p[IP].tos, 1)
172 p_v6[IPv6].tc = 254
173 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
174 for p in rx:
175 self.assertEqual(p[IPv6].tc, 1)
176
177 #
178 # send packets out the other interfaces to test the maps are
179 # correctly applied
180 #
181 p_v4[IP].dst = self.pg2.remote_ip4
182 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
183 for p in rx:
184 self.assertEqual(p[IP].tos, 2)
185
186 p_v4[IP].dst = self.pg3.remote_ip4
187 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
188 for p in rx:
189 self.assertEqual(p[IP].tos, 3)
190
191 p_v6[IPv6].dst = self.pg3.remote_ip6
192 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg3)
193 for p in rx:
194 self.assertEqual(p[IPv6].tc, 3)
195
196 #
197 # remove the map on pg2 and pg3, now expect an unchanged IP tos
198 #
199 self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
200 QOS_SOURCE.IP,
201 2,
202 0)
203 self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
204 QOS_SOURCE.IP,
205 3,
206 0)
207 self.logger.info(self.vapi.cli("sh int feat pg2"))
208
209 p_v4[IP].dst = self.pg2.remote_ip4
210 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
211 for p in rx:
212 self.assertEqual(p[IP].tos, 254)
213
214 p_v4[IP].dst = self.pg3.remote_ip4
215 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
216 for p in rx:
217 self.assertEqual(p[IP].tos, 254)
218
219 #
220 # still mapping out of pg1
221 #
222 p_v4[IP].dst = self.pg1.remote_ip4
223 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
224 for p in rx:
225 self.assertEqual(p[IP].tos, 1)
226
227 #
228 # disable the input recording on pg0
229 #
230 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
231 QOS_SOURCE.IP,
232 0)
233
234 #
235 # back to an unchanged TOS value
236 #
237 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
238 for p in rx:
239 self.assertEqual(p[IP].tos, 254)
240
241 #
242 # disable the egress map on pg1 and pg4
243 #
244 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
245 QOS_SOURCE.IP,
246 1,
247 0)
248 self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
249 QOS_SOURCE.IP,
250 4,
251 0)
252
253 #
254 # unchanged Tos on pg1
255 #
256 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
257 for p in rx:
258 self.assertEqual(p[IP].tos, 254)
259
260 #
261 # clean-up the masp
262 #
263 self.vapi.qos_egress_map_delete(1)
264 self.vapi.qos_egress_map_delete(4)
265 self.vapi.qos_egress_map_delete(2)
266 self.vapi.qos_egress_map_delete(3)
267 self.vapi.qos_egress_map_delete(5)
268 self.vapi.qos_egress_map_delete(6)
269 self.vapi.qos_egress_map_delete(7)
270
271 def test_qos_mpls(self):
272 """ QoS Mark MPLS """
273
274 #
275 # 255 QoS for all input values
276 #
277 output = [chr(255)] * 256
278 os = ''.join(output)
279 rows = [{'outputs': os},
280 {'outputs': os},
281 {'outputs': os},
282 {'outputs': os}]
283
284 self.vapi.qos_egress_map_update(1, rows)
285
286 #
287 # a route with 1 MPLS label
288 #
289 route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
290 [VppRoutePath(self.pg1.remote_ip4,
291 self.pg1.sw_if_index,
292 labels=[32])])
293 route_10_0_0_1.add_vpp_config()
294
295 #
296 # a route with 3 MPLS labels
297 #
298 route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
299 [VppRoutePath(self.pg1.remote_ip4,
300 self.pg1.sw_if_index,
301 labels=[63, 33, 34])])
302 route_10_0_0_3.add_vpp_config()
303
304 #
305 # enable IP QoS recording on the input Pg0 and MPLS egress marking
306 # on Pg1
307 #
308 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
309 QOS_SOURCE.IP,
310 1)
311 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
312 QOS_SOURCE.MPLS,
313 1,
314 1)
315
316 #
317 # packet that will get one label added and 3 labels added resp.
318 #
319 p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
320 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
321 UDP(sport=1234, dport=1234) /
322 Raw(chr(100) * 65))
323 p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
324 IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
325 UDP(sport=1234, dport=1234) /
326 Raw(chr(100) * 65))
327
328 rx = self.send_and_expect(self.pg0, p_1 * 65, self.pg1)
329
330 #
331 # only 3 bits of ToS value in MPLS make sure tos is correct
332 # and the label and EOS bit have not been corrupted
333 #
334 for p in rx:
335 self.assertEqual(p[MPLS].cos, 7)
336 self.assertEqual(p[MPLS].label, 32)
337 self.assertEqual(p[MPLS].s, 1)
338 rx = self.send_and_expect(self.pg0, p_3 * 65, self.pg1)
339 for p in rx:
340 self.assertEqual(p[MPLS].cos, 7)
341 self.assertEqual(p[MPLS].label, 63)
342 self.assertEqual(p[MPLS].s, 0)
343 h = p[MPLS].payload
344 self.assertEqual(h[MPLS].cos, 7)
345 self.assertEqual(h[MPLS].label, 33)
346 self.assertEqual(h[MPLS].s, 0)
347 h = h[MPLS].payload
348 self.assertEqual(h[MPLS].cos, 7)
349 self.assertEqual(h[MPLS].label, 34)
350 self.assertEqual(h[MPLS].s, 1)
351
352 #
353 # cleanup
354 #
355 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
356 QOS_SOURCE.IP,
357 0)
358 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
359 QOS_SOURCE.MPLS,
360 1,
361 0)
362 self.vapi.qos_egress_map_delete(1)
363
364
365if __name__ == '__main__':
366 unittest.main(testRunner=VppTestRunner)