blob: 939cca5d8d1010ed0498e2615e41aecf4a0c6265 [file] [log] [blame]
Neale Ranns039cbfe2018-02-27 03:45:38 -08001#!/usr/bin/env python
2
3import unittest
Neale Ranns039cbfe2018-02-27 03:45:38 -08004
5from framework import VppTestCase, VppTestRunner
Neale Ranns039cbfe2018-02-27 03:45:38 -08006from vpp_papi_provider import QOS_SOURCE
Klement Sekerab9ef2732018-06-24 22:49:33 +02007from vpp_ip_route import VppIpRoute, VppRoutePath
Neale Ranns039cbfe2018-02-27 03:45:38 -08008
9from scapy.packet import Raw
Klement Sekerab9ef2732018-06-24 22:49:33 +020010from scapy.layers.l2 import Ether
Neale Ranns039cbfe2018-02-27 03:45:38 -080011from scapy.layers.inet import IP, UDP
12from scapy.layers.inet6 import IPv6
13from scapy.contrib.mpls import MPLS
14
15
16class TestQOS(VppTestCase):
17 """ QOS Test Case """
18
19 def setUp(self):
20 super(TestQOS, self).setUp()
21
22 self.create_pg_interfaces(range(5))
23
24 for i in self.pg_interfaces:
25 i.admin_up()
26 i.config_ip4()
27 i.resolve_arp()
28 i.config_ip6()
29 i.resolve_ndp()
30
31 def tearDown(self):
32 for i in self.pg_interfaces:
33 i.unconfig_ip4()
34 i.unconfig_ip6()
35
36 super(TestQOS, self).tearDown()
37
38 def test_qos_ip(self):
39 """ QoS Mark IP """
40
41 #
42 # for table 1 map the n=0xff possible values of input QoS mark,
43 # n to 1-n
44 #
45 output = [chr(0)] * 256
46 for i in range(0, 255):
47 output[i] = chr(255 - i)
48 os = ''.join(output)
49 rows = [{'outputs': os},
50 {'outputs': os},
51 {'outputs': os},
52 {'outputs': os}]
53
54 self.vapi.qos_egress_map_update(1, rows)
55
56 #
57 # For table 2 (and up) use the value n for everything
58 #
59 output = [chr(2)] * 256
60 os = ''.join(output)
61 rows = [{'outputs': os},
62 {'outputs': os},
63 {'outputs': os},
64 {'outputs': os}]
65
66 self.vapi.qos_egress_map_update(2, rows)
67
68 output = [chr(3)] * 256
69 os = ''.join(output)
70 rows = [{'outputs': os},
71 {'outputs': os},
72 {'outputs': os},
73 {'outputs': os}]
74
75 self.vapi.qos_egress_map_update(3, rows)
76
77 output = [chr(4)] * 256
78 os = ''.join(output)
79 rows = [{'outputs': os},
80 {'outputs': os},
81 {'outputs': os},
82 {'outputs': os}]
83 self.vapi.qos_egress_map_update(4, rows)
84 self.vapi.qos_egress_map_update(5, rows)
85 self.vapi.qos_egress_map_update(6, rows)
86 self.vapi.qos_egress_map_update(7, rows)
87
88 self.logger.info(self.vapi.cli("sh qos eg map"))
89
90 #
91 # Bind interface pgN to table n
92 #
93 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
94 QOS_SOURCE.IP,
95 1,
96 1)
97 self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
98 QOS_SOURCE.IP,
99 2,
100 1)
101 self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
102 QOS_SOURCE.IP,
103 3,
104 1)
105 self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
106 QOS_SOURCE.IP,
107 4,
108 1)
109
110 #
111 # packets ingress on Pg0
112 #
113 p_v4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
114 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, tos=1) /
115 UDP(sport=1234, dport=1234) /
116 Raw(chr(100) * 65))
117 p_v6 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
118 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6,
119 tc=1) /
120 UDP(sport=1234, dport=1234) /
121 Raw(chr(100) * 65))
122
123 #
124 # Since we have not yet enabled the recording of the input QoS
125 # from the input iP header, the egress packet's ToS will be unchanged
126 #
127 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
128 for p in rx:
129 self.assertEqual(p[IP].tos, 1)
130 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
131 for p in rx:
132 self.assertEqual(p[IPv6].tc, 1)
133
134 #
135 # Enable QoS recrding on IP input for pg0
136 #
137 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
138 QOS_SOURCE.IP,
139 1)
140
141 #
142 # send the same packets, this time expect the input TOS of 1
143 # to be mapped to pg1's egress value of 254
144 #
145 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
146 for p in rx:
147 self.assertEqual(p[IP].tos, 254)
148 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
149 for p in rx:
150 self.assertEqual(p[IPv6].tc, 254)
151
152 #
153 # different input ToS to test the mapping
154 #
155 p_v4[IP].tos = 127
156 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
157 for p in rx:
158 self.assertEqual(p[IP].tos, 128)
159 p_v6[IPv6].tc = 127
160 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
161 for p in rx:
162 self.assertEqual(p[IPv6].tc, 128)
163
164 p_v4[IP].tos = 254
165 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
166 for p in rx:
167 self.assertEqual(p[IP].tos, 1)
168 p_v6[IPv6].tc = 254
169 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg1)
170 for p in rx:
171 self.assertEqual(p[IPv6].tc, 1)
172
173 #
174 # send packets out the other interfaces to test the maps are
175 # correctly applied
176 #
177 p_v4[IP].dst = self.pg2.remote_ip4
178 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
179 for p in rx:
180 self.assertEqual(p[IP].tos, 2)
181
182 p_v4[IP].dst = self.pg3.remote_ip4
183 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
184 for p in rx:
185 self.assertEqual(p[IP].tos, 3)
186
187 p_v6[IPv6].dst = self.pg3.remote_ip6
188 rx = self.send_and_expect(self.pg0, p_v6 * 65, self.pg3)
189 for p in rx:
190 self.assertEqual(p[IPv6].tc, 3)
191
192 #
193 # remove the map on pg2 and pg3, now expect an unchanged IP tos
194 #
195 self.vapi.qos_mark_enable_disable(self.pg2.sw_if_index,
196 QOS_SOURCE.IP,
197 2,
198 0)
199 self.vapi.qos_mark_enable_disable(self.pg3.sw_if_index,
200 QOS_SOURCE.IP,
201 3,
202 0)
203 self.logger.info(self.vapi.cli("sh int feat pg2"))
204
205 p_v4[IP].dst = self.pg2.remote_ip4
206 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg2)
207 for p in rx:
208 self.assertEqual(p[IP].tos, 254)
209
210 p_v4[IP].dst = self.pg3.remote_ip4
211 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg3)
212 for p in rx:
213 self.assertEqual(p[IP].tos, 254)
214
215 #
216 # still mapping out of pg1
217 #
218 p_v4[IP].dst = self.pg1.remote_ip4
219 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
220 for p in rx:
221 self.assertEqual(p[IP].tos, 1)
222
223 #
224 # disable the input recording on pg0
225 #
226 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
227 QOS_SOURCE.IP,
228 0)
229
230 #
231 # back to an unchanged TOS value
232 #
233 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
234 for p in rx:
235 self.assertEqual(p[IP].tos, 254)
236
237 #
238 # disable the egress map on pg1 and pg4
239 #
240 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
241 QOS_SOURCE.IP,
242 1,
243 0)
244 self.vapi.qos_mark_enable_disable(self.pg4.sw_if_index,
245 QOS_SOURCE.IP,
246 4,
247 0)
248
249 #
250 # unchanged Tos on pg1
251 #
252 rx = self.send_and_expect(self.pg0, p_v4 * 65, self.pg1)
253 for p in rx:
254 self.assertEqual(p[IP].tos, 254)
255
256 #
257 # clean-up the masp
258 #
259 self.vapi.qos_egress_map_delete(1)
260 self.vapi.qos_egress_map_delete(4)
261 self.vapi.qos_egress_map_delete(2)
262 self.vapi.qos_egress_map_delete(3)
263 self.vapi.qos_egress_map_delete(5)
264 self.vapi.qos_egress_map_delete(6)
265 self.vapi.qos_egress_map_delete(7)
266
267 def test_qos_mpls(self):
268 """ QoS Mark MPLS """
269
270 #
271 # 255 QoS for all input values
272 #
273 output = [chr(255)] * 256
274 os = ''.join(output)
275 rows = [{'outputs': os},
276 {'outputs': os},
277 {'outputs': os},
278 {'outputs': os}]
279
280 self.vapi.qos_egress_map_update(1, rows)
281
282 #
283 # a route with 1 MPLS label
284 #
285 route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
286 [VppRoutePath(self.pg1.remote_ip4,
287 self.pg1.sw_if_index,
288 labels=[32])])
289 route_10_0_0_1.add_vpp_config()
290
291 #
292 # a route with 3 MPLS labels
293 #
294 route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
295 [VppRoutePath(self.pg1.remote_ip4,
296 self.pg1.sw_if_index,
297 labels=[63, 33, 34])])
298 route_10_0_0_3.add_vpp_config()
299
300 #
301 # enable IP QoS recording on the input Pg0 and MPLS egress marking
302 # on Pg1
303 #
304 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
305 QOS_SOURCE.IP,
306 1)
307 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
308 QOS_SOURCE.MPLS,
309 1,
310 1)
311
312 #
313 # packet that will get one label added and 3 labels added resp.
314 #
315 p_1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
316 IP(src=self.pg0.remote_ip4, dst="10.0.0.1", tos=1) /
317 UDP(sport=1234, dport=1234) /
318 Raw(chr(100) * 65))
319 p_3 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
320 IP(src=self.pg0.remote_ip4, dst="10.0.0.3", tos=1) /
321 UDP(sport=1234, dport=1234) /
322 Raw(chr(100) * 65))
323
324 rx = self.send_and_expect(self.pg0, p_1 * 65, self.pg1)
325
326 #
327 # only 3 bits of ToS value in MPLS make sure tos is correct
328 # and the label and EOS bit have not been corrupted
329 #
330 for p in rx:
331 self.assertEqual(p[MPLS].cos, 7)
332 self.assertEqual(p[MPLS].label, 32)
333 self.assertEqual(p[MPLS].s, 1)
334 rx = self.send_and_expect(self.pg0, p_3 * 65, self.pg1)
335 for p in rx:
336 self.assertEqual(p[MPLS].cos, 7)
337 self.assertEqual(p[MPLS].label, 63)
338 self.assertEqual(p[MPLS].s, 0)
339 h = p[MPLS].payload
340 self.assertEqual(h[MPLS].cos, 7)
341 self.assertEqual(h[MPLS].label, 33)
342 self.assertEqual(h[MPLS].s, 0)
343 h = h[MPLS].payload
344 self.assertEqual(h[MPLS].cos, 7)
345 self.assertEqual(h[MPLS].label, 34)
346 self.assertEqual(h[MPLS].s, 1)
347
348 #
349 # cleanup
350 #
351 self.vapi.qos_record_enable_disable(self.pg0.sw_if_index,
352 QOS_SOURCE.IP,
353 0)
354 self.vapi.qos_mark_enable_disable(self.pg1.sw_if_index,
355 QOS_SOURCE.MPLS,
356 1,
357 0)
358 self.vapi.qos_egress_map_delete(1)
359
360
361if __name__ == '__main__':
362 unittest.main(testRunner=VppTestRunner)