blob: b64d3bc8367d020296c1664cf71137d999616fdf [file] [log] [blame]
Kris Michielsen91074432017-06-22 13:00:20 +02001#!/usr/bin/env python
2
3import unittest
Gabriel Ganne8527f122017-10-02 11:41:24 +02004import binascii
Kris Michielsen91074432017-06-22 13:00:20 +02005from socket import AF_INET6
6
7from framework import VppTestCase, VppTestRunner
Neale Ranns8f6dd322018-05-17 06:34:24 -07008from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppIpTable
Kris Michielsen91074432017-06-22 13:00:20 +02009from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
10 SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
11
12from scapy.packet import Raw
13from scapy.layers.l2 import Ether, Dot1Q
14from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
15from scapy.layers.inet import IP, UDP
16
17from scapy.utils import inet_pton, inet_ntop
18
19from util import ppp
20
21
22class TestSRv6(VppTestCase):
23 """ SRv6 Test Case """
24
@classmethod
def setUpClass(cls):
    """ Run the class-level setup of the parent test case.

    Nothing SRv6-specific is prepared here; the call is delegated
    unchanged to the parent class.
    """
    # a classmethod receives the class, so name the argument 'cls'
    super(TestSRv6, cls).setUpClass()
28
def setUp(self):
    """ Prepare the per-test state before each test case.

    Runs the parent class setUp, then sets the list of packet sizes
    used by the tests and resets the packet infos.
    """
    super(TestSRv6, self).setUp()

    # sizes of the generated packets (L2 overhead included)
    self.pg_packet_sizes = [64, 512, 1518, 9018]

    # start every test with clean packet_infos
    self.reset_packet_infos()
39
def tearDown(self):
    """ Undo the test setup after each test case.

    The interfaces are torn down first, then the parent class
    tearDown runs.
    """
    self.teardown_interfaces()
    super(TestSRv6, self).tearDown()
46
def configure_interface(self,
                        interface,
                        ipv6=False, ipv4=False,
                        ipv6_table_id=0, ipv4_table_id=0):
    """ Configure a single interface and bring it up.

    :param interface: interface to configure
    :param ipv6: configure IPv6 on interface
    :param ipv4: configure IPv4 on interface
    :param ipv6_table_id: FIB table_id for IPv6
    :param ipv4_table_id: FIB table_id for IPv4
    """
    log = self.logger.debug
    log("Configuring interface %s" % interface.name)

    if ipv6:
        # bind to the table first, then address and neighbor resolution
        log("Configuring IPv6")
        interface.set_table_ip6(ipv6_table_id)
        interface.config_ip6()
        interface.resolve_ndp(timeout=5)

    if ipv4:
        # same sequence for IPv4, resolving through ARP
        log("Configuring IPv4")
        interface.set_table_ip4(ipv4_table_id)
        interface.config_ip4()
        interface.resolve_arp()

    # the interface is brought up regardless of the address families
    interface.admin_up()
69
def setup_interfaces(self, ipv6=None, ipv4=None,
                     ipv6_table_id=None, ipv4_table_id=None):
    """ Create and configure interfaces.

    The number of interfaces is the length of ``ipv6`` when that list
    is non-empty, otherwise the length of ``ipv4``. Lists shorter than
    that count are padded with the defaults (not enabled / table 0).

    The arguments are never modified: padding is applied to private
    copies, so neither the caller's lists nor the defaults are changed
    from one call to the next.

    :param ipv6: list of interface IPv6 capabilities
    :param ipv4: list of interface IPv4 capabilities
    :param ipv6_table_id: list of intf IPv6 FIB table_ids
    :param ipv4_table_id: list of intf IPv4 FIB table_ids
    :returns: List of created interfaces.
    """
    def padded(values, size, fill):
        """ Return a copy of values padded with fill up to size. """
        result = list(values) if values is not None else []
        # a negative repeat count yields an empty list: no truncation
        result.extend([fill] * (size - len(result)))
        return result

    # how many interfaces?
    if ipv6:
        count = len(ipv6)
    else:
        count = len(ipv4) if ipv4 else 0
    self.logger.debug("Creating and configuring %d interfaces" % (count))

    # fill up ipv6 and ipv4 lists if needed
    # not enabled (False) is the default
    ipv6 = padded(ipv6, count, False)
    ipv4 = padded(ipv4, count, False)

    # fill up table_id lists if needed
    # table_id 0 (global) is the default
    ipv6_table_id = padded(ipv6_table_id, count, 0)
    ipv4_table_id = padded(ipv4_table_id, count, 0)

    # create 'count' pg interfaces
    self.create_pg_interfaces(range(count))

    # setup all interfaces
    for i in range(count):
        intf = self.pg_interfaces[i]
        self.configure_interface(intf,
                                 ipv6[i], ipv4[i],
                                 ipv6_table_id[i], ipv4_table_id[i])

    # log the resulting state for debugging
    if any(ipv6):
        self.logger.debug(self.vapi.cli("show ip6 neighbors"))
    if any(ipv4):
        self.logger.debug(self.vapi.cli("show ip arp"))
    self.logger.debug(self.vapi.cli("show interface"))
    self.logger.debug(self.vapi.cli("show hardware"))

    return self.pg_interfaces
119
def teardown_interfaces(self):
    """ Unconfigure all pg interfaces and bring them down.

    Every interface is set admin-down, unconfigured and put back into
    table 0 for both IPv4 and IPv6.
    """
    log = self.logger.debug
    log("Tearing down interfaces")
    # the interfaces are only reset here
    # AFAIK they cannot be deleted
    for intf in self.pg_interfaces:
        log("Tear down interface %s" % intf.name)
        intf.admin_down()
        intf.unconfig()
        intf.set_table_ip4(0)
        intf.set_table_ip6(0)
Kris Michielsen91074432017-06-22 13:00:20 +0200132
@unittest.skipUnless(0, "PC to fix")
def test_SRv6_T_Encaps(self):
    """ Test SRv6 Transit.Encaps behavior for IPv6.
    """
    # two IPv6-only interfaces: traffic is sent on pg0, expected on pg1
    self.setup_interfaces(ipv6=[True, True])

    # FIB entry: a4::/64 via pg1
    via_pg1 = VppRoutePath(self.pg1.remote_ip6,
                           self.pg1.sw_if_index,
                           proto=DpoProto.DPO_PROTO_IP6)
    fib_entry = VppIpRoute(self, "a4::", 64, [via_pg1], is_ip6=1)
    fib_entry.add_vpp_config()

    # the encaps IPv6 source address has to be set before the
    # SR Policy is configured
    # TODO: API?
    self.vapi.cli("set sr encaps source addr a3::")

    # SRv6 Policy, segment list ordered first -> last
    binding_sid = 'a3::9999:1'
    policy = VppSRv6Policy(
        self, bsid=binding_sid,
        is_encap=1,
        sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
        weight=1, fib_table=0,
        segments=['a4::', 'a5::', 'a6::c7'],
        source='a3::')
    policy.add_vpp_config()
    self.sr_policy = policy
    self.logger.info(self.vapi.cli("show sr policies"))

    # steer IPv6 traffic to a7::/64 into the policy, selected by the
    # bsid of self.sr_policy
    steering = VppSRv6Steering(
        self,
        bsid=self.sr_policy.bsid,
        prefix="a7::", mask_width=64,
        traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
        sr_policy_index=0, table_id=0,
        sw_if_index=0)
    steering.add_vpp_config()
    self.logger.info(self.vapi.cli("show sr steering policies"))

    # traffic to send
    per_stream = len(self.pg_packet_sizes)
    inner_dst = 'a7::1234'
    tx_pkts = []

    def enqueue(header):
        # append a pg0->pg1 stream built from the given header
        tx_pkts.extend(self.create_stream(self.pg0, self.pg1, header,
                                          self.pg_packet_sizes,
                                          per_stream))

    # IPv6 packets without SRH
    enqueue(self.create_packet_header_IPv6(inner_dst))

    # IPv6 packets with SRH, segments-left 1, active segment a7::
    enqueue(self.create_packet_header_IPv6_SRH(
        sidlist=['a8::', 'a7::', 'a6::'],
        segleft=1))

    # IPv6 packets with SRH and inner IPv6,
    # segments-left 1, active segment a7::
    enqueue(self.create_packet_header_IPv6_SRH_IPv6(
        inner_dst,
        sidlist=['a8::', 'a7::', 'a6::'],
        segleft=1))

    # send on pg0 and verify what arrives on pg1
    self.send_and_verify_pkts(self.pg0, tx_pkts, self.pg1,
                              self.compare_rx_tx_packet_T_Encaps)

    # log the localsid counters
    self.logger.info(self.vapi.cli("show sr localsid"))

    # remove the steering, then the policy, logging the state each time
    steering.remove_vpp_config()
    self.logger.info(self.vapi.cli("show sr steering policies"))
    self.sr_policy.remove_vpp_config()
    self.logger.info(self.vapi.cli("show sr policies"))

    # FIB entries are removed by tearDown

    # cleanup interfaces
    self.teardown_interfaces()
234
@unittest.skipUnless(0, "PC to fix")
def test_SRv6_T_Insert(self):
    """ Test SRv6 Transit.Insert behavior (IPv6 only).
    """
    # two IPv6-only interfaces: traffic is sent on pg0, expected on pg1
    self.setup_interfaces(ipv6=[True, True])

    # FIB entry: a4::/64 via pg1
    via_pg1 = VppRoutePath(self.pg1.remote_ip6,
                           self.pg1.sw_if_index,
                           proto=DpoProto.DPO_PROTO_IP6)
    fib_entry = VppIpRoute(self, "a4::", 64, [via_pg1], is_ip6=1)
    fib_entry.add_vpp_config()

    # the encaps IPv6 source address has to be set before the
    # SR Policy is configured
    # TODO: API?
    self.vapi.cli("set sr encaps source addr a3::")

    # SRv6 Policy with is_encap=0, segment list ordered first -> last
    binding_sid = 'a3::9999:1'
    policy = VppSRv6Policy(
        self, bsid=binding_sid,
        is_encap=0,
        sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
        weight=1, fib_table=0,
        segments=['a4::', 'a5::', 'a6::c7'],
        source='a3::')
    policy.add_vpp_config()
    self.sr_policy = policy
    self.logger.info(self.vapi.cli("show sr policies"))

    # steer IPv6 traffic to a7::/64 into the policy, selected by the
    # bsid of self.sr_policy
    steering = VppSRv6Steering(
        self,
        bsid=self.sr_policy.bsid,
        prefix="a7::", mask_width=64,
        traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
        sr_policy_index=0, table_id=0,
        sw_if_index=0)
    steering.add_vpp_config()
    self.logger.info(self.vapi.cli("show sr steering policies"))

    # traffic to send
    per_stream = len(self.pg_packet_sizes)
    inner_dst = 'a7::1234'
    tx_pkts = []

    def enqueue(header):
        # append a pg0->pg1 stream built from the given header
        tx_pkts.extend(self.create_stream(self.pg0, self.pg1, header,
                                          self.pg_packet_sizes,
                                          per_stream))

    # IPv6 packets without SRH
    enqueue(self.create_packet_header_IPv6(inner_dst))

    # IPv6 packets with SRH, segments-left 1, active segment a7::
    enqueue(self.create_packet_header_IPv6_SRH(
        sidlist=['a8::', 'a7::', 'a6::'],
        segleft=1))

    # send on pg0 and verify what arrives on pg1
    self.send_and_verify_pkts(self.pg0, tx_pkts, self.pg1,
                              self.compare_rx_tx_packet_T_Insert)

    # log the localsid counters
    self.logger.info(self.vapi.cli("show sr localsid"))

    # remove the steering, then the policy, logging the state each time
    steering.remove_vpp_config()
    self.logger.info(self.vapi.cli("show sr steering policies"))
    self.sr_policy.remove_vpp_config()
    self.logger.info(self.vapi.cli("show sr policies"))

    # FIB entries are removed by tearDown

    # cleanup interfaces
    self.teardown_interfaces()
326
@unittest.skipUnless(0, "PC to fix")
def test_SRv6_T_Encaps_IPv4(self):
    """ Test SRv6 Transit.Encaps behavior for IPv4.
    """
    # pg0 (source) is IPv4 only, pg1 (destination) is IPv6 only
    self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])

    # FIB entry: a4::/64 via pg1
    via_pg1 = VppRoutePath(self.pg1.remote_ip6,
                           self.pg1.sw_if_index,
                           proto=DpoProto.DPO_PROTO_IP6)
    fib_entry = VppIpRoute(self, "a4::", 64, [via_pg1], is_ip6=1)
    fib_entry.add_vpp_config()

    # the encaps IPv6 source address has to be set before the
    # SR Policy is configured
    # TODO: API?
    self.vapi.cli("set sr encaps source addr a3::")

    # SRv6 Policy, segment list ordered first -> last
    binding_sid = 'a3::9999:1'
    policy = VppSRv6Policy(
        self, bsid=binding_sid,
        is_encap=1,
        sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
        weight=1, fib_table=0,
        segments=['a4::', 'a5::', 'a6::c7'],
        source='a3::')
    policy.add_vpp_config()
    self.sr_policy = policy
    self.logger.info(self.vapi.cli("show sr policies"))

    # steer IPv4 traffic to 7.1.1.0/24 into the policy, selected by
    # the bsid of self.sr_policy
    steering = VppSRv6Steering(
        self,
        bsid=self.sr_policy.bsid,
        prefix="7.1.1.0", mask_width=24,
        traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
        sr_policy_index=0, table_id=0,
        sw_if_index=0)
    steering.add_vpp_config()
    self.logger.info(self.vapi.cli("show sr steering policies"))

    # traffic to send: a single pg0->pg1 stream of IPv4 packets
    per_stream = len(self.pg_packet_sizes)
    inner_dst = '7.1.1.123'
    tx_pkts = []
    ipv4_header = self.create_packet_header_IPv4(inner_dst)
    tx_pkts.extend(self.create_stream(self.pg0, self.pg1, ipv4_header,
                                      self.pg_packet_sizes, per_stream))

    # send on pg0 and verify what arrives on pg1
    self.send_and_verify_pkts(self.pg0, tx_pkts, self.pg1,
                              self.compare_rx_tx_packet_T_Encaps_IPv4)

    # log the localsid counters
    self.logger.info(self.vapi.cli("show sr localsid"))

    # remove the steering, then the policy, logging the state each time
    steering.remove_vpp_config()
    self.logger.info(self.vapi.cli("show sr steering policies"))
    self.sr_policy.remove_vpp_config()
    self.logger.info(self.vapi.cli("show sr policies"))

    # FIB entries are removed by tearDown

    # cleanup interfaces
    self.teardown_interfaces()
410
@unittest.skip("VPP crashes after running this test")
def test_SRv6_T_Encaps_L2(self):
    """ Test SRv6 Transit.Encaps behavior for L2.
    """
    # pg0 (source) gets no IP configuration (TODO?),
    # pg1 (destination) is IPv6 only
    self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])

    # FIB entry: a4::/64 via pg1
    via_pg1 = VppRoutePath(self.pg1.remote_ip6,
                           self.pg1.sw_if_index,
                           proto=DpoProto.DPO_PROTO_IP6)
    fib_entry = VppIpRoute(self, "a4::", 64, [via_pg1], is_ip6=1)
    fib_entry.add_vpp_config()

    # the encaps IPv6 source address has to be set before the
    # SR Policy is configured
    # TODO: API?
    self.vapi.cli("set sr encaps source addr a3::")

    # SRv6 Policy, segment list ordered first -> last
    binding_sid = 'a3::9999:1'
    policy = VppSRv6Policy(
        self, bsid=binding_sid,
        is_encap=1,
        sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
        weight=1, fib_table=0,
        segments=['a4::', 'a5::', 'a6::c7'],
        source='a3::')
    policy.add_vpp_config()
    self.sr_policy = policy
    self.logger.info(self.vapi.cli("show sr policies"))

    # steer L2 traffic received on pg0 into the policy, selected by
    # the bsid of self.sr_policy
    steering = VppSRv6Steering(
        self,
        bsid=self.sr_policy.bsid,
        prefix="::", mask_width=0,
        traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
        sr_policy_index=0, table_id=0,
        sw_if_index=self.pg0.sw_if_index)
    steering.add_vpp_config()
    self.logger.info(self.vapi.cli("show sr steering policies"))

    # traffic to send
    per_stream = len(self.pg_packet_sizes)
    tx_pkts = []

    def enqueue(header):
        # append a pg0->pg1 stream built from the given header
        tx_pkts.extend(self.create_stream(self.pg0, self.pg1, header,
                                          self.pg_packet_sizes,
                                          per_stream))

    # L2 packets without dot1q header
    enqueue(self.create_packet_header_L2())

    # L2 packets with dot1q header
    enqueue(self.create_packet_header_L2(vlan=123))

    # send on pg0 and verify what arrives on pg1
    self.send_and_verify_pkts(self.pg0, tx_pkts, self.pg1,
                              self.compare_rx_tx_packet_T_Encaps_L2)

    # log the localsid counters
    self.logger.info(self.vapi.cli("show sr localsid"))

    # remove the steering, then the policy, logging the state each time
    steering.remove_vpp_config()
    self.logger.info(self.vapi.cli("show sr steering policies"))
    self.sr_policy.remove_vpp_config()
    self.logger.info(self.vapi.cli("show sr policies"))

    # FIB entries are removed by tearDown

    # cleanup interfaces
    self.teardown_interfaces()
499
def test_SRv6_End(self):
    """ Test SRv6 End (without PSP) behavior.
    """
    # two IPv6-only interfaces: traffic is sent on pg0, expected on pg1
    self.setup_interfaces(ipv6=[True, True])

    # FIB entry: a4::/64 via pg1
    via_pg1 = VppRoutePath(self.pg1.remote_ip6,
                           self.pg1.sw_if_index,
                           proto=DpoProto.DPO_PROTO_IP6)
    fib_entry = VppIpRoute(self, "a4::", 64, [via_pg1], is_ip6=1)
    fib_entry.add_vpp_config()

    # SRv6 localSID with End behavior, PSP disabled (end_psp=0)
    end_sid = VppSRv6LocalSID(
        self, localsid_addr='A3::0',
        behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
        nh_addr='::',
        end_psp=0,
        sw_if_index=0,
        vlan_index=0,
        fib_table=0)
    end_sid.add_vpp_config()
    # log the localsids
    self.logger.debug(self.vapi.cli("show sr localsid"))

    # IPv6 packets with SRH are sent for SL=2 and SL=1
    per_stream = len(self.pg_packet_sizes)
    inner_dst = 'a4::1234'
    tx_pkts = []

    def enqueue(header):
        # append a pg0->pg1 stream built from the given header
        tx_pkts.extend(self.create_stream(self.pg0, self.pg1, header,
                                          self.pg_packet_sizes,
                                          per_stream))

    # segments-left 2, active segment a3::
    enqueue(self.create_packet_header_IPv6_SRH_IPv6(
        inner_dst,
        sidlist=['a5::', 'a4::', 'a3::'],
        segleft=2))

    # segments-left 1, active segment a3::
    enqueue(self.create_packet_header_IPv6_SRH_IPv6(
        inner_dst,
        sidlist=['a4::', 'a3::', 'a2::'],
        segleft=1))

    # TODO: test behavior with SL=0 packet (needs 2*SRH?)

    # send on pg0 and verify what arrives on pg1
    self.send_and_verify_pkts(self.pg0, tx_pkts, self.pg1,
                              self.compare_rx_tx_packet_End)

    # log the localsid counters
    self.logger.info(self.vapi.cli("show sr localsid"))

    # remove SRv6 localSIDs
    end_sid.remove_vpp_config()

    # FIB entries are removed by tearDown

    # cleanup interfaces
    self.teardown_interfaces()
570
def test_SRv6_End_with_PSP(self):
    """ Test SRv6 End with PSP behavior.
    """
    # two IPv6-only interfaces: traffic is sent on pg0, expected on pg1
    self.setup_interfaces(ipv6=[True, True])

    # FIB entry: a4::/64 via pg1
    via_pg1 = VppRoutePath(self.pg1.remote_ip6,
                           self.pg1.sw_if_index,
                           proto=DpoProto.DPO_PROTO_IP6)
    fib_entry = VppIpRoute(self, "a4::", 64, [via_pg1], is_ip6=1)
    fib_entry.add_vpp_config()

    # SRv6 localSID with End behavior, PSP enabled (end_psp=1)
    end_sid = VppSRv6LocalSID(
        self, localsid_addr='A3::0',
        behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
        nh_addr='::',
        end_psp=1,
        sw_if_index=0,
        vlan_index=0,
        fib_table=0)
    end_sid.add_vpp_config()
    # log the localsids
    self.logger.debug(self.vapi.cli("show sr localsid"))

    # IPv6 packets with SRH are sent for SL=2 and SL=1
    # (SL=0 packet with localSID End with PSP is dropped)
    per_stream = len(self.pg_packet_sizes)
    inner_dst = 'a4::1234'
    tx_pkts = []

    def enqueue(header):
        # append a pg0->pg1 stream built from the given header
        tx_pkts.extend(self.create_stream(self.pg0, self.pg1, header,
                                          self.pg_packet_sizes,
                                          per_stream))

    # segments-left 2, active segment a3::
    enqueue(self.create_packet_header_IPv6_SRH_IPv6(
        inner_dst,
        sidlist=['a5::', 'a4::', 'a3::'],
        segleft=2))

    # segments-left 1, active segment a3::
    enqueue(self.create_packet_header_IPv6_SRH_IPv6(
        inner_dst,
        sidlist=['a4::', 'a3::', 'a2::'],
        segleft=1))

    # send on pg0 and verify what arrives on pg1
    self.send_and_verify_pkts(self.pg0, tx_pkts, self.pg1,
                              self.compare_rx_tx_packet_End_PSP)

    # log the localsid counters
    self.logger.info(self.vapi.cli("show sr localsid"))

    # remove SRv6 localSIDs
    end_sid.remove_vpp_config()

    # FIB entries are removed by tearDown

    # cleanup interfaces
    self.teardown_interfaces()
639
def test_SRv6_End_X(self):
    """ Test SRv6 End.X (without PSP) behavior.
    """
    # three IPv6-only interfaces (1 source, 2 destinations)
    self.setup_interfaces(ipv6=[True, True, True])

    # FIB entry: a4::/64 via pg1 and pg2
    via_pg1 = VppRoutePath(self.pg1.remote_ip6,
                           self.pg1.sw_if_index,
                           proto=DpoProto.DPO_PROTO_IP6)
    via_pg2 = VppRoutePath(self.pg2.remote_ip6,
                           self.pg2.sw_if_index,
                           proto=DpoProto.DPO_PROTO_IP6)
    fib_entry = VppIpRoute(self, "a4::", 64, [via_pg1, via_pg2],
                           is_ip6=1)
    fib_entry.add_vpp_config()
    self.logger.debug(self.vapi.cli("show ip6 fib"))

    # SRv6 localSID with End.X behavior, PSP disabled (end_psp=0)
    # End.X points to interface pg1
    end_x_sid = VppSRv6LocalSID(
        self, localsid_addr='A3::C4',
        behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
        nh_addr=self.pg1.remote_ip6,
        end_psp=0,
        sw_if_index=self.pg1.sw_if_index,
        vlan_index=0,
        fib_table=0)
    end_x_sid.add_vpp_config()
    # log the localsids
    self.logger.debug(self.vapi.cli("show sr localsid"))

    # IPv6 packets with SRH are sent for SL=2 and SL=1
    per_stream = len(self.pg_packet_sizes)
    inner_dst = 'a4::1234'
    tx_pkts = []

    def enqueue(header):
        # append a pg0->pg1 stream built from the given header
        tx_pkts.extend(self.create_stream(self.pg0, self.pg1, header,
                                          self.pg_packet_sizes,
                                          per_stream))

    # segments-left 2, active segment a3::c4
    enqueue(self.create_packet_header_IPv6_SRH_IPv6(
        inner_dst,
        sidlist=['a5::', 'a4::', 'a3::c4'],
        segleft=2))

    # segments-left 1, active segment a3::c4
    enqueue(self.create_packet_header_IPv6_SRH_IPv6(
        inner_dst,
        sidlist=['a4::', 'a3::c4', 'a2::'],
        segleft=1))

    # send on pg0 and verify what arrives on pg1
    # using same comparison function as End (no PSP)
    self.send_and_verify_pkts(self.pg0, tx_pkts, self.pg1,
                              self.compare_rx_tx_packet_End)

    # assert nothing was received on the other interface (pg2)
    self.pg2.assert_nothing_captured("mis-directed packet(s)")

    # log the localsid counters
    self.logger.info(self.vapi.cli("show sr localsid"))

    # remove SRv6 localSIDs
    end_x_sid.remove_vpp_config()

    # FIB entries are removed by tearDown

    # cleanup interfaces
    self.teardown_interfaces()
718
def test_SRv6_End_X_with_PSP(self):
    """ Test SRv6 End.X with PSP behavior.
    """
    # three IPv6-only interfaces (1 source, 2 destinations)
    self.setup_interfaces(ipv6=[True, True, True])

    # FIB entry: a4::/64 via pg1 and pg2
    via_pg1 = VppRoutePath(self.pg1.remote_ip6,
                           self.pg1.sw_if_index,
                           proto=DpoProto.DPO_PROTO_IP6)
    via_pg2 = VppRoutePath(self.pg2.remote_ip6,
                           self.pg2.sw_if_index,
                           proto=DpoProto.DPO_PROTO_IP6)
    fib_entry = VppIpRoute(self, "a4::", 64, [via_pg1, via_pg2],
                           is_ip6=1)
    fib_entry.add_vpp_config()

    # SRv6 localSID with End.X behavior, PSP enabled (end_psp=1)
    # End.X points to interface pg1
    end_x_sid = VppSRv6LocalSID(
        self, localsid_addr='A3::C4',
        behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
        nh_addr=self.pg1.remote_ip6,
        end_psp=1,
        sw_if_index=self.pg1.sw_if_index,
        vlan_index=0,
        fib_table=0)
    end_x_sid.add_vpp_config()
    # log the localsids
    self.logger.debug(self.vapi.cli("show sr localsid"))

    # IPv6 packets with SRH are sent for SL=2 and SL=1
    # (SL=0 packet with localSID End with PSP is dropped)
    per_stream = len(self.pg_packet_sizes)
    inner_dst = 'a4::1234'
    tx_pkts = []

    def enqueue(header):
        # append a pg0->pg1 stream built from the given header
        tx_pkts.extend(self.create_stream(self.pg0, self.pg1, header,
                                          self.pg_packet_sizes,
                                          per_stream))

    # segments-left 2, active segment a3::c4
    enqueue(self.create_packet_header_IPv6_SRH_IPv6(
        inner_dst,
        sidlist=['a5::', 'a4::', 'a3::c4'],
        segleft=2))

    # segments-left 1, active segment a3::c4
    enqueue(self.create_packet_header_IPv6_SRH_IPv6(
        inner_dst,
        sidlist=['a4::', 'a3::c4', 'a2::'],
        segleft=1))

    # send on pg0 and verify what arrives on pg1
    # using same comparison function as End with PSP
    self.send_and_verify_pkts(self.pg0, tx_pkts, self.pg1,
                              self.compare_rx_tx_packet_End_PSP)

    # assert nothing was received on the other interface (pg2)
    self.pg2.assert_nothing_captured("mis-directed packet(s)")

    # log the localsid counters
    self.logger.info(self.vapi.cli("show sr localsid"))

    # remove SRv6 localSIDs
    end_x_sid.remove_vpp_config()

    # FIB entries are removed by tearDown

    # cleanup interfaces
    self.teardown_interfaces()
795
def test_SRv6_End_DX6(self):
    """ Test SRv6 End.DX6 behavior.
    """
    # two IPv6-only interfaces: traffic is sent on pg0, expected on pg1
    self.setup_interfaces(ipv6=[True, True])

    # SRv6 localSID with End.DX6 behavior, pointing to pg1
    dx6_sid = VppSRv6LocalSID(
        self, localsid_addr='a3::c4',
        behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
        nh_addr=self.pg1.remote_ip6,
        end_psp=0,
        sw_if_index=self.pg1.sw_if_index,
        vlan_index=0,
        fib_table=0)
    dx6_sid.add_vpp_config()
    # log the localsids
    self.logger.debug(self.vapi.cli("show sr localsid"))

    # traffic to send
    per_stream = len(self.pg_packet_sizes)
    inner_dst = 'a4::1234'  # inner header destination address
    tx_pkts = []

    def enqueue(header):
        # append a pg0->pg1 stream built from the given header
        tx_pkts.extend(self.create_stream(self.pg0, self.pg1, header,
                                          self.pg_packet_sizes,
                                          per_stream))

    # packets with SRH, segments-left 0, active segment a3::c4
    enqueue(self.create_packet_header_IPv6_SRH_IPv6(
        inner_dst,
        sidlist=['a3::c4', 'a2::', 'a1::'],
        segleft=0))

    # packets without SRH, IPv6 in IPv6
    # outer IPv6 dest addr is the localsid End.DX6
    enqueue(self.create_packet_header_IPv6_IPv6(
        inner_dst,
        dst_outer='a3::c4'))

    # send on pg0 and verify what arrives on pg1
    self.send_and_verify_pkts(self.pg0, tx_pkts, self.pg1,
                              self.compare_rx_tx_packet_End_DX6)

    # log the localsid counters
    self.logger.info(self.vapi.cli("show sr localsid"))

    # remove SRv6 localSIDs
    dx6_sid.remove_vpp_config()

    # cleanup interfaces
    self.teardown_interfaces()
852
def test_SRv6_End_DT6(self):
    """ Test SRv6 End.DT6 behavior.
    """
    # three IPv6-only interfaces (1 source, 2 destinations):
    # pg0 and pg1 in the global FIB (0), pg2 in table vrf_1
    vrf_1 = 1
    vrf_table = VppIpTable(self, vrf_1, is_ip6=True)
    vrf_table.add_vpp_config()
    self.setup_interfaces(ipv6=[True, True, True],
                          ipv6_table_id=[0, 0, vrf_1])

    # FIB entries: a4::/64 is reachable
    # via pg1 in table 0 (global)
    # and via pg2 in table vrf_1
    via_pg1 = VppRoutePath(self.pg1.remote_ip6,
                           self.pg1.sw_if_index,
                           proto=DpoProto.DPO_PROTO_IP6,
                           nh_table_id=0)
    global_route = VppIpRoute(self, "a4::", 64, [via_pg1],
                              table_id=0,
                              is_ip6=1)
    global_route.add_vpp_config()
    via_pg2 = VppRoutePath(self.pg2.remote_ip6,
                           self.pg2.sw_if_index,
                           proto=DpoProto.DPO_PROTO_IP6,
                           nh_table_id=vrf_1)
    vrf_route = VppIpRoute(self, "a4::", 64, [via_pg2],
                           table_id=vrf_1,
                           is_ip6=1)
    vrf_route.add_vpp_config()
    self.logger.debug(self.vapi.cli("show ip6 fib"))

    # SRv6 localSID with End.DT6 behavior
    # Note:
    # fib_table: where the localsid is installed
    # sw_if_index: in T-variants of localsid this is the vrf table_id
    dt6_sid = VppSRv6LocalSID(
        self, localsid_addr='a3::c4',
        behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
        nh_addr='::',
        end_psp=0,
        sw_if_index=vrf_1,
        vlan_index=0,
        fib_table=0)
    dt6_sid.add_vpp_config()
    # log the localsids
    self.logger.debug(self.vapi.cli("show sr localsid"))

    # traffic to send
    per_stream = len(self.pg_packet_sizes)
    inner_dst = 'a4::1234'  # inner header destination address
    tx_pkts = []

    def enqueue(header):
        # append a pg0->pg2 stream built from the given header
        tx_pkts.extend(self.create_stream(self.pg0, self.pg2, header,
                                          self.pg_packet_sizes,
                                          per_stream))

    # packets with SRH, segments-left 0, active segment a3::c4
    enqueue(self.create_packet_header_IPv6_SRH_IPv6(
        inner_dst,
        sidlist=['a3::c4', 'a2::', 'a1::'],
        segleft=0))

    # packets without SRH, IPv6 in IPv6
    # outer IPv6 dest addr is the localsid End.DT6
    enqueue(self.create_packet_header_IPv6_IPv6(
        inner_dst,
        dst_outer='a3::c4'))

    # send on pg0 and verify what arrives on pg2
    # using same comparison function as End.DX6
    self.send_and_verify_pkts(self.pg0, tx_pkts, self.pg2,
                              self.compare_rx_tx_packet_End_DX6)

    # assert nothing was received on the other interface (pg1)
    self.pg1.assert_nothing_captured("mis-directed packet(s)")

    # log the localsid counters
    self.logger.info(self.vapi.cli("show sr localsid"))

    # remove SRv6 localSIDs
    dt6_sid.remove_vpp_config()

    # FIB entries are removed by tearDown

    # cleanup interfaces
    self.teardown_interfaces()
947
def test_SRv6_End_DX4(self):
    """ Test SRv6 End.DX4 behavior.
    """
    # pg0 (source) is IPv6 only, pg1 (destination) is IPv4 only
    self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])

    # SRv6 localSID with End.DX4 behavior, pointing to pg1
    dx4_sid = VppSRv6LocalSID(
        self, localsid_addr='a3::c4',
        behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
        nh_addr=self.pg1.remote_ip4,
        end_psp=0,
        sw_if_index=self.pg1.sw_if_index,
        vlan_index=0,
        fib_table=0)
    dx4_sid.add_vpp_config()
    # log the localsids
    self.logger.debug(self.vapi.cli("show sr localsid"))

    # traffic to send
    per_stream = len(self.pg_packet_sizes)
    inner_dst = '4.1.1.123'  # inner header destination address
    tx_pkts = []

    def enqueue(header):
        # append a pg0->pg1 stream built from the given header
        tx_pkts.extend(self.create_stream(self.pg0, self.pg1, header,
                                          self.pg_packet_sizes,
                                          per_stream))

    # packets with SRH, segments-left 0, active segment a3::c4
    enqueue(self.create_packet_header_IPv6_SRH_IPv4(
        inner_dst,
        sidlist=['a3::c4', 'a2::', 'a1::'],
        segleft=0))

    # packets without SRH, IPv4 in IPv6
    # outer IPv6 dest addr is the localsid End.DX4
    enqueue(self.create_packet_header_IPv6_IPv4(
        inner_dst,
        dst_outer='a3::c4'))

    # send on pg0 and verify what arrives on pg1
    self.send_and_verify_pkts(self.pg0, tx_pkts, self.pg1,
                              self.compare_rx_tx_packet_End_DX4)

    # log the localsid counters
    self.logger.info(self.vapi.cli("show sr localsid"))

    # remove SRv6 localSIDs
    dx4_sid.remove_vpp_config()

    # cleanup interfaces
    self.teardown_interfaces()
1004
def test_SRv6_End_DT4(self):
    """ Test SRv6 End.DT4 behavior.

    4.1.1.0/24 is routed via pg1 in IPv4 table 0 and via pg2 in IPv4
    table vrf_1.  The End.DT4 localSID a3::c4 is configured with
    vrf_1, so the test expects the traffic on pg2 and nothing on pg1.
    """
    vrf_1 = 1
    vrf_table = VppIpTable(self, vrf_1)
    vrf_table.add_vpp_config()

    # pg0: IPv6 only, table 0 (source)
    # pg1: IPv4 only, table 0
    # pg2: IPv4 only, table vrf_1
    self.setup_interfaces(ipv6=[True, False, False],
                          ipv4=[False, True, True],
                          ipv6_table_id=[0, 0, 0],
                          ipv4_table_id=[0, 0, vrf_1])

    # same prefix in both tables, each with its own egress interface;
    # the route objects stay referenced until the end of the test
    routes = []
    for egress_if, table in ((self.pg1, 0), (self.pg2, vrf_1)):
        route = VppIpRoute(self, "4.1.1.0", 24,
                           [VppRoutePath(egress_if.remote_ip4,
                                         egress_if.sw_if_index,
                                         nh_table_id=table)],
                           table_id=table,
                           is_ip6=0)
        route.add_vpp_config()
        routes.append(route)
    self.logger.debug(self.vapi.cli("show ip fib"))

    # End.DT4 localSID
    # Note:
    #   fib_table: where the localsid is installed
    #   sw_if_index: in T-variants of localsid: vrf table_id
    end_dt4 = VppSRv6LocalSID(
        self, localsid_addr='a3::c4',
        behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
        nh_addr='::',
        end_psp=0,
        sw_if_index=vrf_1,
        vlan_index=0,
        fib_table=0)
    end_dt4.add_vpp_config()
    self.logger.debug(self.vapi.cli("show sr localsid"))

    # the count handed to create_stream is the number of packet sizes
    sizes = self.pg_packet_sizes
    per_header = len(sizes)
    inner_dst = '4.1.1.123'  # destination of the inner IPv4 header

    def pg0_to_pg2(header):
        # build the pg0->pg2 stream for one packet header
        return self.create_stream(self.pg0, self.pg2, header,
                                  sizes, per_header)

    # first batch: SRH present, segments-left 0, active segment a3::c4
    tx_pkts = pg0_to_pg2(self.create_packet_header_IPv6_SRH_IPv4(
        inner_dst,
        sidlist=['a3::c4', 'a2::', 'a1::'],
        segleft=0))

    # second batch: IPv4 in IPv6 without SRH,
    # outer IPv6 destination is the End.DT4 localSID
    tx_pkts += pg0_to_pg2(self.create_packet_header_IPv6_IPv4(
        inner_dst,
        dst_outer='a3::c4'))

    # inject on pg0, verify on pg2
    # (End.DX4 comparison function is reused for End.DT4)
    self.send_and_verify_pkts(self.pg0, tx_pkts, self.pg2,
                              self.compare_rx_tx_packet_End_DX4)

    # nothing may show up on the other IPv4 interface (pg1)
    self.pg1.assert_nothing_captured("mis-directed packet(s)")

    # dump the localSID counters, then remove the localSID
    self.logger.info(self.vapi.cli("show sr localsid"))
    end_dt4.remove_vpp_config()

    # FIB entries are not removed here (left to tearDown)

    self.teardown_interfaces()
1100
def test_SRv6_End_DX2(self):
    """ Test SRv6 End.DX2 behavior.

    L2 frames encapsulated in IPv6 (with and without SRH, with and
    without a dot1q tag) are sent on pg0 towards the End.DX2 localSID
    a3::c4; the received frames are checked on pg1 with
    compare_rx_tx_packet_End_DX2.
    """
    # pg0 is IPv6 only (source); pg1 gets no IP configuration
    self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])

    # End.DX2 localSID bound to pg1
    end_dx2 = VppSRv6LocalSID(
        self, localsid_addr='a3::c4',
        behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
        nh_addr='::',
        end_psp=0,
        sw_if_index=self.pg1.sw_if_index,
        vlan_index=0,
        fib_table=0)
    end_dx2.add_vpp_config()
    self.logger.debug(self.vapi.cli("show sr localsid"))

    # the count handed to create_stream is the number of packet sizes
    sizes = self.pg_packet_sizes
    per_header = len(sizes)

    def pg0_to_pg1(header):
        # build the pg0->pg1 stream for one packet header
        return self.create_stream(self.pg0, self.pg1, header,
                                  sizes, per_header)

    # SRH (segments-left 0, active segment a3::c4), untagged L2
    tx_pkts = pg0_to_pg1(self.create_packet_header_IPv6_SRH_L2(
        sidlist=['a3::c4', 'a2::', 'a1::'],
        segleft=0,
        vlan=0))

    # SRH (segments-left 0, active segment a3::c4), dot1q-tagged L2
    tx_pkts += pg0_to_pg1(self.create_packet_header_IPv6_SRH_L2(
        sidlist=['a3::c4', 'a2::', 'a1::'],
        segleft=0,
        vlan=123))

    # no SRH, outer IPv6 destination is the localSID, untagged L2
    tx_pkts += pg0_to_pg1(self.create_packet_header_IPv6_L2(
        dst_outer='a3::c4',
        vlan=0))

    # no SRH, outer IPv6 destination is the localSID, dot1q-tagged L2
    tx_pkts += pg0_to_pg1(self.create_packet_header_IPv6_L2(
        dst_outer='a3::c4',
        vlan=123))

    # inject on pg0, verify what arrives on pg1
    self.send_and_verify_pkts(self.pg0, tx_pkts, self.pg1,
                              self.compare_rx_tx_packet_End_DX2)

    # dump the localSID counters, then undo the configuration
    self.logger.info(self.vapi.cli("show sr localsid"))
    end_dx2.remove_vpp_config()
    self.teardown_interfaces()
1177
@unittest.skipUnless(0, "PC to fix")
def test_SRv6_T_Insert_Classifier(self):
    """ Test SRv6 Transit.Insert behavior (IPv6 only).
        steer packets using the classifier
    """
    # NOTE: the decorator above skips this test unconditionally
    # (skipUnless(0, ...)) with the reason "PC to fix".

    # send traffic to one destination interface
    # only pg3 (source) and pg4 (destination) are configured, IPv6 only
    self.setup_interfaces(ipv6=[False, False, False, True, True])

    # configure FIB entries: a4::/64 via pg4
    route = VppIpRoute(self, "a4::", 64,
                       [VppRoutePath(self.pg4.remote_ip6,
                                     self.pg4.sw_if_index,
                                     proto=DpoProto.DPO_PROTO_IP6)],
                       is_ip6=1)
    route.add_vpp_config()

    # configure encaps IPv6 source address
    # needs to be done before SR Policy config
    # TODO: API?
    self.vapi.cli("set sr encaps source addr a3::")

    bsid = 'a3::9999:1'
    # configure SRv6 Policy (is_encap=0)
    # Note: segment list order: first -> last
    sr_policy = VppSRv6Policy(
        self, bsid=bsid,
        is_encap=0,
        sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
        weight=1, fib_table=0,
        segments=['a4::', 'a5::', 'a6::c7'],
        source='a3::')
    sr_policy.add_vpp_config()
    # stored on self because compare_rx_tx_packet_T_Insert reads
    # self.sr_policy.segments
    self.sr_policy = sr_policy

    # log the sr policies
    self.logger.info(self.vapi.cli("show sr policies"))

    # add classify table
    # mask on dst ip address prefix a7::/8
    # mask is the hex string 'ff' right-padded with '0' to 16 characters
    # (8 bytes after unhexlify), so match_n_vectors evaluates to 1
    mask = '{:0<16}'.format('ff')
    r = self.vapi.classify_add_del_table(
        1,
        binascii.unhexlify(mask),
        match_n_vectors=(len(mask) - 1) // 32 + 1,
        current_data_flag=1,
        skip_n_vectors=2)  # data offset
    self.assertIsNotNone(r, msg='No response msg for add_del_table')
    table_index = r.new_table_index

    # add the source routing node as an ip6-inacl next node
    r = self.vapi.add_node_next('ip6-inacl',
                                'sr-pl-rewrite-insert')
    inacl_next_node_index = r.node_index

    # session key: 'a7' padded the same way as the mask
    match = '{:0<16}'.format('a7')
    r = self.vapi.classify_add_del_session(
        1,
        table_index,
        binascii.unhexlify(match),
        hit_next_index=inacl_next_node_index,
        action=3,
        metadata=0)  # sr policy index
    self.assertIsNotNone(r, msg='No response msg for add_del_session')

    # log the classify table used in the steering policy
    self.logger.info(self.vapi.cli("show classify table"))

    # attach the classify table as ip6 input ACL on pg3
    r = self.vapi.input_acl_set_interface(
        is_add=1,
        sw_if_index=self.pg3.sw_if_index,
        ip6_table_index=table_index)
    self.assertIsNotNone(r,
                         msg='No response msg for input_acl_set_interface')

    # log the ip6 inacl
    self.logger.info(self.vapi.cli("show inacl type ip6"))

    # create packets
    count = len(self.pg_packet_sizes)
    dst_inner = 'a7::1234'
    pkts = []

    # create IPv6 packets without SRH
    packet_header = self.create_packet_header_IPv6(dst_inner)
    # create traffic stream pg3->pg4
    pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
                                   self.pg_packet_sizes, count))

    # create IPv6 packets with SRH
    # packets with segments-left 1, active segment a7::
    packet_header = self.create_packet_header_IPv6_SRH(
        sidlist=['a8::', 'a7::', 'a6::'],
        segleft=1)
    # create traffic stream pg3->pg4
    pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
                                   self.pg_packet_sizes, count))

    # send packets and verify received packets
    self.send_and_verify_pkts(self.pg3, pkts, self.pg4,
                              self.compare_rx_tx_packet_T_Insert)

    # detach the ip6 input ACL table from pg3 again
    r = self.vapi.input_acl_set_interface(
        is_add=0,
        sw_if_index=self.pg3.sw_if_index,
        ip6_table_index=table_index)
    self.assertIsNotNone(r,
                         msg='No response msg for input_acl_set_interface')

    # log the ip6 inacl after cleaning
    self.logger.info(self.vapi.cli("show inacl type ip6"))

    # log the localsid counters
    self.logger.info(self.vapi.cli("show sr localsid"))

    # no steering policy object was created in this test, so there is
    # nothing to remove here; only log the steering policies
    # classifier_steering.remove_vpp_config()
    self.logger.info(self.vapi.cli("show sr steering policies"))

    # remove SR Policies
    self.sr_policy.remove_vpp_config()
    self.logger.info(self.vapi.cli("show sr policies"))

    # remove classify session and table
    r = self.vapi.classify_add_del_session(
        0,
        table_index,
        binascii.unhexlify(match))
    self.assertIsNotNone(r, msg='No response msg for add_del_session')

    r = self.vapi.classify_add_del_table(
        0,
        binascii.unhexlify(mask),
        table_index=table_index)
    self.assertIsNotNone(r, msg='No response msg for add_del_table')

    self.logger.info(self.vapi.cli("show classify table"))

    # remove FIB entries
    # done by tearDown

    # cleanup interfaces
    self.teardown_interfaces()
1322
def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
    """ Compare input and output packet after passing T.Encaps

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet

    Expected header transformation:
        SR Policy seglist (S3, S2, S1)
        SR Policy source C
        IPv6:
          in:  IPv6(A, B2)
          out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)
        IPv6 + SRH:
          in:  IPv6(A, B2)SRH(B3, B2, B1; SL=1)
          out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(a, B2)SRH(B3, B2, B1; SL=1)

    Note: the hop limit of the IPv6 layer inside tx_pkt is decremented
    in place before the final comparison.
    """
    # outer IPv6 header as received, original IPv6 header as sent
    outer_ip = rx_pkt.getlayer(IPv6)
    sent_ip = tx_pkt.getlayer(IPv6)

    # the SRH lists the policy segments in reverse order
    expected_sids = self.sr_policy.segments[::-1]
    policy_source = self.sr_policy.source

    # an SRH must be present in the received packet
    self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
    srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)

    # outer source address is the SR Policy source
    self.assertEqual(outer_ip.src, policy_source)
    # outer destination address is the last entry of the SRH list
    self.assertEqual(outer_ip.dst, expected_sids[-1])
    # SRH address list matches the expected list
    self.assertEqual(srh.addresses, expected_sids)
    # segments-left points at the last list entry ...
    self.assertEqual(srh.segleft, len(expected_sids)-1)
    # ... and therefore equals lastentry
    self.assertEqual(srh.segleft, srh.lastentry)

    # everything behind the SRH must equal the sent IPv6 packet,
    # apart from the hop limit, which is expected to be one lower
    sent_ip.hlim -= 1
    self.assertEqual(srh.payload, sent_ip)

    self.logger.debug("packet verification: SUCCESS")
1377
def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
    """ Compare input and output packet after passing T.Encaps for IPv4

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet

    T.Encaps for IPv4 updates the headers as follows:
        SR Policy seglist (S3, S2, S1)
        SR Policy source C
        IPv4:
          in:  IPv4(A, B2)
          out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2)

    Note: ttl and chksum of the IPv4 layer inside tx_pkt are modified
    in place while building the expected packet.
    """
    # get first (outer) IPv6 header of rx'ed packet
    rx_ip = rx_pkt.getlayer(IPv6)

    tx_ip = tx_pkt.getlayer(IP)

    # expected segment-list: SR Policy segments reversed,
    # which is the order used inside the SRH
    tx_seglist = self.sr_policy.segments[::-1]

    # get source address of SR Policy
    sr_policy_source = self.sr_policy.source

    # rx'ed packet should have SRH and IPv4 header
    self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
    self.assertTrue(rx_ip.payload.haslayer(IP))
    # get SRH
    rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)

    # received ip.src should be equal to SR Policy source
    self.assertEqual(rx_ip.src, sr_policy_source)
    # received ip.dst should be equal to sidlist[lastentry]
    self.assertEqual(rx_ip.dst, tx_seglist[-1])
    # rx'ed seglist should be equal to seglist
    self.assertEqual(rx_srh.addresses, tx_seglist)
    # segleft should be equal to size seglist-1
    self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
    # segleft should be equal to lastentry
    self.assertEqual(rx_srh.segleft, rx_srh.lastentry)

    # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
    # except for the ttl field and ip checksum
    # -> adjust tx'ed ttl to expected ttl
    tx_ip.ttl = tx_ip.ttl - 1
    # -> set tx'ed ip checksum to None and let scapy recompute
    tx_ip.chksum = None
    # serialise and re-dissect the packet to force computing these fields.
    # bytes() is used instead of str(): on Python 2 both are the same
    # type, on Python 3 only bytes() yields the wire format.
    tx_ip = IP(bytes(tx_ip))

    self.assertEqual(rx_srh.payload, tx_ip)

    self.logger.debug("packet verification: SUCCESS")
1436
def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
    """ Compare input and output packet after passing T.Encaps for L2

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet

    T.Encaps for L2 updates the headers as follows:
        SR Policy seglist (S3, S2, S1)
        SR Policy source C
        L2:
          in:  L2
          out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2
    """
    # get first (outer) IPv6 header of rx'ed packet
    rx_ip = rx_pkt.getlayer(IPv6)

    tx_ether = tx_pkt.getlayer(Ether)

    # expected segment-list: SR Policy segments reversed,
    # which is the order used inside the SRH
    tx_seglist = self.sr_policy.segments[::-1]

    # get source address of SR Policy
    sr_policy_source = self.sr_policy.source

    # rx'ed packet should have SRH
    self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
    # get SRH
    rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)

    # received ip.src should be equal to SR Policy source
    self.assertEqual(rx_ip.src, sr_policy_source)
    # received ip.dst should be equal to sidlist[lastentry]
    self.assertEqual(rx_ip.dst, tx_seglist[-1])
    # rx'ed seglist should be equal to seglist
    self.assertEqual(rx_srh.addresses, tx_seglist)
    # segleft should be equal to size seglist-1
    self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
    # segleft should be equal to lastentry
    self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
    # nh should be "No Next Header" (59)
    self.assertEqual(rx_srh.nh, 59)

    # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt.
    # The SRH payload is serialised and re-dissected as Ethernet;
    # bytes() is used instead of str(): on Python 2 both are the same
    # type, on Python 3 only bytes() yields the wire format.
    self.assertEqual(Ether(bytes(rx_srh.payload)), tx_ether)

    self.logger.debug("packet verification: SUCCESS")
1486
def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
    """ Compare input and output packet after passing T.Insert

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet

    T.Insert updates the headers as follows:
        IPv6:
          in:  IPv6(A, B2)
          out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)
        IPv6 + SRH:
          in:  IPv6(A, B2)SRH(B3, B2, B1; SL=1)
          out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1)
    """
    # get first (outer) IPv6 header of rx'ed packet
    rx_ip = rx_pkt.getlayer(IPv6)
    rx_udp = rx_pkt[UDP]

    tx_ip = tx_pkt.getlayer(IPv6)
    # some packets have been tx'ed with an SRH, some without it
    # get SRH if tx'ed packet has it
    tx_srh = None
    if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting):
        tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
    tx_udp = tx_pkt[UDP]

    # expected segment-list (make copy of SR Policy segment list)
    seglist = self.sr_policy.segments[:]
    # expected seglist has initial dest addr as last segment
    seglist.append(tx_ip.dst)
    # reverse list to get order as in SRH
    tx_seglist = seglist[::-1]

    # checks common to cases tx with and without SRH
    # rx'ed packet should have SRH and only one IPv6 header
    self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
    self.assertFalse(rx_ip.payload.haslayer(IPv6))
    # get SRH
    rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)

    # rx'ed ip.src should be equal to tx'ed ip.src
    self.assertEqual(rx_ip.src, tx_ip.src)
    # rx'ed ip.dst should be equal to sidlist[lastentry]
    self.assertEqual(rx_ip.dst, tx_seglist[-1])

    # rx'ed seglist should be equal to expected seglist
    self.assertEqual(rx_srh.addresses, tx_seglist)
    # segleft should be equal to size(expected seglist)-1
    self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
    # segleft should be equal to lastentry
    self.assertEqual(rx_srh.segleft, rx_srh.lastentry)

    if tx_srh:  # packet was tx'ed with SRH
        # packet should have 2nd SRH
        self.assertTrue(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
        # get 2nd SRH
        rx_srh2 = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2)

        # rx'ed srh2.addresses should be equal to tx'ed srh.addresses
        self.assertEqual(rx_srh2.addresses, tx_srh.addresses)
        # rx'ed srh2.segleft should be equal to tx'ed srh.segleft
        self.assertEqual(rx_srh2.segleft, tx_srh.segleft)
        # rx'ed srh2.lastentry should be equal to tx'ed srh.lastentry
        self.assertEqual(rx_srh2.lastentry, tx_srh.lastentry)

    else:  # packet was tx'ed without SRH
        # rx packet should have no other SRH
        self.assertFalse(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))

    # UDP layer should be unchanged
    self.assertEqual(rx_udp, tx_udp)

    self.logger.debug("packet verification: SUCCESS")
1569
def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
    """ Compare input and output packet after passing End (without PSP)

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet

    Expected header transformation (End, no PSP):
        IPv6 + SRH:
          in:  IPv6(A, S1)SRH(S3, S2, S1; SL=2)
          out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
    """
    # received side: outer IPv6 header and UDP layer
    outer_rx = rx_pkt.getlayer(IPv6)
    udp_rx = rx_pkt[UDP]

    # sent side: the packet is known to carry an SRH
    # and an inner IPv6 header
    outer_tx = tx_pkt.getlayer(IPv6)
    inner_tx = tx_pkt.getlayer(IPv6, 2)
    srh_tx = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
    udp_tx = tx_pkt[UDP]

    # whatever segments-left was, a second (inner) IPv6 header
    # has to be present in the received packet
    self.assertTrue(outer_rx.payload.haslayer(IPv6))
    inner_rx = rx_pkt.getlayer(IPv6, 2)

    if outer_tx.segleft > 0:
        # without PSP the SRH stays in place when segleft > 0
        self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
        srh_rx = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)

        # outer source address untouched
        self.assertEqual(outer_rx.src, outer_tx.src)
        # segment list untouched
        self.assertEqual(srh_rx.addresses, srh_tx.addresses)
        # segments-left decremented by one
        self.assertEqual(srh_rx.segleft, srh_tx.segleft-1)
        # outer destination is the segment selected by the new segleft
        self.assertEqual(outer_rx.dst, srh_rx.addresses[srh_rx.segleft])
        # lastentry untouched
        self.assertEqual(srh_rx.lastentry, srh_tx.lastentry)
        # inner IPv6 addresses untouched
        self.assertEqual(inner_rx.src, inner_tx.src)
        self.assertEqual(inner_rx.dst, inner_tx.dst)
    # no checks for segleft == 0
    # TODO: Does this work with 2 SRHs in ingress packet?

    # UDP layer must be identical
    self.assertEqual(udp_rx, udp_tx)

    self.logger.debug("packet verification: SUCCESS")
1626
def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
    """ Compare input and output packet after passing End with PSP

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet

    Expected header transformation (End with PSP):
        IPv6 + SRH (SL>1):
          in:  IPv6(A, S1)SRH(S3, S2, S1; SL=2)
          out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
        IPv6 + SRH (SL=1):
          in:  IPv6(A, S2)SRH(S3, S2, S1; SL=1)
          out: IPv6(A, S3)
    """
    # received side: outer IPv6 header and UDP layer
    outer_rx = rx_pkt.getlayer(IPv6)
    udp_rx = rx_pkt[UDP]

    # sent side: the packet is known to carry an SRH
    # and an inner IPv6 header
    outer_tx = tx_pkt.getlayer(IPv6)
    inner_tx = tx_pkt.getlayer(IPv6, 2)
    srh_tx = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
    udp_tx = tx_pkt[UDP]

    # independent of segments-left: inner IPv6 header is still there
    # and its addresses are untouched
    self.assertTrue(outer_rx.payload.haslayer(IPv6))
    inner_rx = rx_pkt.getlayer(IPv6, 2)
    self.assertEqual(inner_rx.src, inner_tx.src)
    self.assertEqual(inner_rx.dst, inner_tx.dst)

    if outer_tx.segleft <= 1:
        # PSP: the SRH is expected to be popped,
        # both IPv6 headers remain
        self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
        # outer source address untouched
        self.assertEqual(outer_rx.src, outer_tx.src)
        # outer destination is tx'ed sidlist[segleft-1]
        self.assertEqual(outer_rx.dst,
                         srh_tx.addresses[srh_tx.segleft-1])
    else:
        # segleft > 1: the SRH must still be present
        self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
        srh_rx = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)

        # outer source address untouched
        self.assertEqual(outer_rx.src, outer_tx.src)
        # segment list untouched
        self.assertEqual(srh_rx.addresses, srh_tx.addresses)
        # segments-left decremented by one
        self.assertEqual(srh_rx.segleft, srh_tx.segleft-1)
        # outer destination is the segment selected by the new segleft
        self.assertEqual(outer_rx.dst, srh_rx.addresses[srh_rx.segleft])
        # lastentry untouched
        self.assertEqual(srh_rx.lastentry, srh_tx.lastentry)

    # UDP layer must be identical
    self.assertEqual(udp_rx, udp_tx)

    self.logger.debug("packet verification: SUCCESS")
1695
def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
    """ Compare input and output packet after passing End.DX6

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet

    End.DX6 updates the headers as follows:
        IPv6 + SRH (SL=0):
          in:  IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D)
          out: IPv6(B, D)
        IPv6:
          in:  IPv6(A, S3)IPv6(B, D)
          out: IPv6(B, D)

    Note: the hop limit of the inner IPv6 layer of tx_pkt is
    decremented in place before the comparison.
    """
    # get first (outer) IPv6 header of rx'ed packet
    rx_ip = rx_pkt.getlayer(IPv6)

    # only the inner (2nd) IPv6 header of the tx'ed packet is compared
    tx_ip2 = tx_pkt.getlayer(IPv6, 2)

    # verify if rx'ed packet has no SRH
    self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))

    # the whole rx_ip pkt should be equal to tx_ip2
    # except for the hlim field
    # -> adjust tx'ed hlim to expected hlim
    tx_ip2.hlim = tx_ip2.hlim - 1

    self.assertEqual(rx_ip, tx_ip2)

    self.logger.debug("packet verification: SUCCESS")
1727
def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
    """ Compare input and output packet after passing End.DX4

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet

    End.DX4 updates the headers as follows:
        IPv6 + SRH (SL=0):
          in:  IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D)
          out: IPv4(B, D)
        IPv6:
          in:  IPv6(A, S3)IPv4(B, D)
          out: IPv4(B, D)

    Note: ttl and chksum of the IPv4 layer inside tx_pkt are modified
    in place while building the expected packet.
    """
    # get IPv4 header of rx'ed packet
    rx_ip = rx_pkt.getlayer(IP)

    # only the inner IPv4 header of the tx'ed packet is compared
    tx_ip2 = tx_pkt.getlayer(IP)

    # verify if rx'ed packet has no SRH
    self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))

    # the whole rx_ip pkt should be equal to tx_ip2
    # except for the ttl field and ip checksum
    # -> adjust tx'ed ttl to expected ttl
    tx_ip2.ttl = tx_ip2.ttl - 1
    # -> set tx'ed ip checksum to None and let scapy recompute
    tx_ip2.chksum = None
    # serialise and re-dissect the packet to force computing these fields.
    # bytes() is used instead of str(): on Python 2 both are the same
    # type, on Python 3 only bytes() yields the wire format.
    tx_ip2 = IP(bytes(tx_ip2))

    self.assertEqual(rx_ip, tx_ip2)

    self.logger.debug("packet verification: SUCCESS")
1764
def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
    """ Compare input and output packet after passing End.DX2

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet

    End.DX2 updates the headers as follows:
        IPv6 + SRH (SL=0):
          in:  IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2
          out: L2
        IPv6:
          in:  IPv6(A, S3)L2
          out: L2
    """
    # get Ethernet header of rx'ed packet
    rx_eth = rx_pkt.getlayer(Ether)

    # we can't just get the 2nd Ether layer of the tx'ed packet:
    # get the Raw content and dissect it as Ether.
    # bytes() is used instead of str(): on Python 2 both are the same
    # type, on Python 3 only bytes() yields the wire format.
    tx_eth1 = Ether(bytes(tx_pkt[Raw]))

    # verify if rx'ed packet has no SRH
    self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))

    # the whole rx_eth pkt should be equal to tx_eth1
    self.assertEqual(rx_eth, tx_eth1)

    self.logger.debug("packet verification: SUCCESS")
1794
def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
                  count):
    """Create SRv6 input packet stream for defined interface.

    :param VppInterface src_if: Interface to create packet stream for
    :param VppInterface dst_if: destination interface of packet stream
    :param packet_header: Layer3 scapy packet headers,
            L2 is added when not provided,
            Raw(payload) with packet_info is added
    :param list packet_sizes: packet stream pckt sizes, sequentially
            applied to the packets in the stream
    :param int count: packet count bound; the loop below iterates over
            range(0, count-1), so count-1 packets are generated
    :return: list of packets
    """
    self.logger.info("Creating packets")
    pkts = []
    # NOTE(review): this produces count-1 packets, not count. Callers in
    # this file pass len(packet_sizes), so the last packet size is never
    # used. Left unchanged here to keep the generated traffic identical;
    # confirm whether this is intentional.
    for i in range(0, count-1):
        payload_info = self.create_packet_info(src_if, dst_if)
        self.logger.debug(
            "Creating packet with index %d" % (payload_info.index))
        payload = self.info_to_payload(payload_info)
        # add L2 header if not yet provided in packet_header
        if packet_header.getlayer(0).name == 'Ethernet':
            p = (packet_header /
                 Raw(payload))
        else:
            p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                 packet_header /
                 Raw(payload))
        size = packet_sizes[i % len(packet_sizes)]
        self.logger.debug("Packet size %d" % (size))
        self.extend_packet(p, size)
        # we need to store the packet with the automatic fields computed:
        # serialise the packet and dissect it again to force computing
        # these fields.
        # bytes() is used instead of str(): on Python 2 both are the same
        # type, on Python 3 only bytes() yields the wire format.
        p = Ether(bytes(p))
        # keep a copy of the sent packet in the packet info
        payload_info.data = p.copy()
        self.logger.debug(ppp("Created packet:", p))
        pkts.append(p)
    self.logger.info("Done creating packets")
    return pkts
1837
def send_and_verify_pkts(self, input, pkts, output, compare_func):
    """Send packets and verify received packets using compare_func

    The packets are queued on the ingress interface, capture is enabled
    on all pg interfaces, traffic is started, and the capture taken on
    the egress interface is passed to verify_captured_pkts.

    :param input: ingress interface of DUT
    :param pkts: list of packets to transmit
    :param output: egress interface of DUT
    :param compare_func: function to compare in and out packets
    """
    # queue the packets on the ingress interface
    input.add_stream(pkts)

    # capture on every pg interface, not only on the egress one
    self.pg_enable_capture(self.pg_interfaces)

    self.logger.info("Starting traffic")
    self.pg_start()

    self.logger.info("Getting packet capture")
    rx_pkts = output.get_capture()

    # the ingress interface itself must not have captured anything
    input.assert_nothing_captured()

    # compare what was captured on the egress interface
    self.verify_captured_pkts(output, rx_pkts, compare_func)
1865
def create_packet_header_IPv6(self, dst):
    """Create packet header: IPv6 header, UDP header

    :param dst: IPv6 destination address
    :return: scapy packet IPv6 / UDP

    The IPv6 source address is fixed to 1234::1,
    UDP source and destination port are both 1234.
    """
    ip6 = IPv6(src='1234::1', dst=dst)
    udp = UDP(sport=1234, dport=1234)
    return ip6 / udp
1878
def create_packet_header_IPv6_SRH(self, sidlist, segleft):
    """Create packet header: IPv6 header with SRH, UDP header

    :param list sidlist: segment list
    :param int segleft: segments-left field value
    :return: scapy packet IPv6 / SRH / UDP

    IPv6 destination address is set to sidlist[segleft]
    IPv6 source address is 1234::1
    UDP source port and destination port are 1234
    """
    # segleft is written into the SRH explicitly so that it agrees with
    # the destination address picked from sidlist[segleft]; previously it
    # was not passed to the SRH at all, so the header field did not
    # follow this parameter.
    p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
         IPv6ExtHdrSegmentRouting(addresses=sidlist,
                                  segleft=segleft) /
         UDP(sport=1234, dport=1234))
    return p
1894
def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
    """Create packet header: IPv6 encapsulated in SRv6:
    IPv6 header with SRH, IPv6 header, UDP header

    :param ipv6address dst: inner IPv6 destination address
    :param list sidlist: segment list of outer IPv6 SRH
    :param int segleft: segments-left field of outer IPv6 SRH
    :return: scapy packet IPv6 / SRH / IPv6 / UDP

    The outer destination address is sidlist[segleft]; the source
    addresses are 1234::1 (outer) and 4321::1 (inner); UDP source and
    destination port are both 1234.
    """
    outer_ip = IPv6(src='1234::1', dst=sidlist[segleft])
    # nh=41: the SRH is followed by the inner IPv6 header
    srh = IPv6ExtHdrSegmentRouting(addresses=sidlist,
                                   segleft=segleft, nh=41)
    inner_ip = IPv6(src='4321::1', dst=dst)
    udp = UDP(sport=1234, dport=1234)
    return outer_ip / srh / inner_ip / udp
1914
def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
    """Build the header stack of an IPv6 packet encapsulated in IPv6.

    Layers, outermost first: IPv6 header, IPv6 header, UDP header.

    :param ipv6address dst_inner: inner IPv6 destination address
    :param ipv6address dst_outer: outer IPv6 destination address

    Outer and inner IPv6 source addresses are 1234::1 and 4321::1.
    UDP source port and destination port are both 1234.
    """
    outer_ip6 = IPv6(src='1234::1', dst=dst_outer)
    inner_ip6 = IPv6(src='4321::1', dst=dst_inner)
    udp_hdr = UDP(sport=1234, dport=1234)
    return outer_ip6 / inner_ip6 / udp_hdr
1930
def create_packet_header_IPv6_SRH_SRH_IPv6(self, dst, sidlist1, segleft1,
                                           sidlist2, segleft2):
    """Build the header stack of an IPv6 packet encapsulated in SRv6
    with two SRHs.

    Layers, outermost first: IPv6 header, first SRH, second SRH,
    IPv6 header, UDP header.

    :param ipv6address dst: inner IPv6 destination address
    :param list sidlist1: segment list of outer IPv6 SRH
    :param int segleft1: segments-left field of outer IPv6 SRH
    :param list sidlist2: segment list of inner IPv6 SRH
    :param int segleft2: segments-left field of inner IPv6 SRH

    The outer IPv6 destination address is sidlist1[segleft1].
    Outer and inner IPv6 source addresses are 1234::1 and 4321::1.
    UDP source port and destination port are both 1234.
    """
    outer_ip6 = IPv6(src='1234::1', dst=sidlist1[segleft1])
    # nh=43: the first SRH is followed by another routing header
    first_srh = IPv6ExtHdrSegmentRouting(addresses=sidlist1,
                                         segleft=segleft1, nh=43)
    # nh=41: the second SRH is followed by the inner IPv6 header
    second_srh = IPv6ExtHdrSegmentRouting(addresses=sidlist2,
                                          segleft=segleft2, nh=41)
    inner_ip6 = IPv6(src='4321::1', dst=dst)
    udp_hdr = UDP(sport=1234, dport=1234)
    return outer_ip6 / first_srh / second_srh / inner_ip6 / udp_hdr
1955
def create_packet_header_IPv4(self, dst):
    """Build an IPv4 / UDP header stack.

    :param dst: IPv4 destination address

    The IPv4 source address is 123.1.1.1.
    UDP source port and destination port are both 1234.
    """
    ip4_hdr = IP(src='123.1.1.1', dst=dst)
    udp_hdr = UDP(sport=1234, dport=1234)
    return ip4_hdr / udp_hdr
1968
def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
    """Build the header stack of an IPv4 packet encapsulated in IPv6.

    Layers, outermost first: IPv6 header, IPv4 header, UDP header.

    :param ipv4address dst_inner: inner IPv4 destination address
    :param ipv6address dst_outer: outer IPv6 destination address

    The IPv6 source address is 1234::1.
    The IPv4 source address is 123.1.1.1.
    UDP source port and destination port are both 1234.
    """
    outer_ip6 = IPv6(src='1234::1', dst=dst_outer)
    inner_ip4 = IP(src='123.1.1.1', dst=dst_inner)
    udp_hdr = UDP(sport=1234, dport=1234)
    return outer_ip6 / inner_ip4 / udp_hdr
1985
def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
    """Build the header stack of an IPv4 packet encapsulated in SRv6.

    Layers, outermost first: IPv6 header, SRH, IPv4 header, UDP header.

    :param ipv4address dst: inner IPv4 destination address
    :param list sidlist: segment list of outer IPv6 SRH
    :param int segleft: segments-left field of outer IPv6 SRH

    The outer IPv6 destination address is sidlist[segleft].
    The IPv6 source address is 1234::1.
    The IPv4 source address is 123.1.1.1.
    UDP source port and destination port are both 1234.
    """
    outer_ip6 = IPv6(src='1234::1', dst=sidlist[segleft])
    # nh=4: the SRH is followed by an IPv4 header
    srh = IPv6ExtHdrSegmentRouting(addresses=sidlist,
                                   segleft=segleft, nh=4)
    inner_ip4 = IP(src='123.1.1.1', dst=dst)
    udp_hdr = UDP(sport=1234, dport=1234)
    return outer_ip6 / srh / inner_ip4 / udp_hdr
2006
def create_packet_header_L2(self, vlan=0):
    """Build an Ethernet (L2) header carrying the IPX ethertype.

    :param vlan: if vlan!=0 then an 802.1q header is added
    """
    ipx_ethertype = 0x8137  # IPX

    # Note: the dst addr ('00:55:44:33:22:11') is used in
    # the compare function compare_rx_tx_packet_T_Encaps_L2
    # to detect presence of L2 in SRH payload
    frame = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')

    if not vlan:
        # untagged: the ethertype goes directly into the Ethernet header
        frame.type = ipx_ethertype
        return frame

    # tagged: the ethertype is carried by the 802.1q layer instead
    frame /= Dot1Q(vlan=vlan, type=ipx_ethertype)
    return frame
2023
def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
    """Create packet header: L2 encapsulated in SRv6:
    IPv6 header with SRH, L2

    :param list sidlist: segment list of outer IPv6 SRH
    :param int segleft: segments-left field of outer IPv6 SRH
    :param vlan: L2 vlan; if vlan!=0 then add 802.1q header

    Outer IPv6 destination address is set to sidlist[segleft]
    IPv6 source address is 1234::1
    """
    # Reuse the shared L2 builder instead of duplicating it, so that the
    # MAC addresses and ethertype (which the L2 compare function relies
    # on) are defined in exactly one place.
    eth = self.create_packet_header_L2(vlan)

    # nh=59 is set explicitly on the SRH that precedes the L2 frame
    p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
         IPv6ExtHdrSegmentRouting(addresses=sidlist,
                                  segleft=segleft, nh=59) /
         eth)
    return p
2048
def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
    """Create packet header: L2 encapsulated in IPv6:
    IPv6 header, L2

    :param ipv6address dst_outer: outer IPv6 destination address
    :param vlan: L2 vlan; if vlan!=0 then add 802.1q header

    IPv6 source address is 1234::1
    """
    # Reuse the shared L2 builder instead of duplicating it, so that the
    # MAC addresses and ethertype (which the L2 compare function relies
    # on) are defined in exactly one place.
    eth = self.create_packet_header_L2(vlan)

    # nh=59 is set explicitly on the IPv6 header that precedes the L2 frame
    p = (IPv6(src='1234::1', dst=dst_outer, nh=59) / eth)
    return p
2066
def get_payload_info(self, packet):
    """Extract the payload_info from the packet.

    :param packet: packet whose Raw layer carries the payload info
    :returns: the result of self.payload_to_info for that payload
    """
    # in most cases, payload_info is in packet[Raw]
    # but packet[Raw] gives the complete payload
    # (incl L2 header) for the T.Encaps L2 case
    # NOTE(review): str(packet[Raw]) presumably relies on Python 2
    # semantics to obtain the raw bytes -- confirm before porting.
    try:
        payload_info = self.payload_to_info(str(packet[Raw]))

    except Exception:
        # Only ordinary errors trigger the fallback; KeyboardInterrupt
        # and SystemExit are no longer swallowed here.
        # remove L2 header from packet[Raw]:
        # take packet[Raw], convert it to an Ether layer
        # and then extract Raw from it
        payload_info = self.payload_to_info(
            str(Ether(str(packet[Raw]))[Raw]))

    return payload_info
2084
def verify_captured_pkts(self, dst_if, capture, compare_func):
    """
    Verify captured packet stream for specified interface.
    Compare ingress with egress packets using the specified compare fn

    :param dst_if: egress interface of DUT
    :param capture: captured packets
    :param compare_func: function to compare in and out packet; called as
        compare_func(transmitted_packet, received_packet)
    :raises: re-raises whatever a failed check or compare_func raised,
        after logging the offending packet
    """
    # func_name is a Python 2-only attribute; __name__ is portable, and
    # the repr fallback keeps callables without a name from crashing
    # the log statement
    compare_name = getattr(compare_func, '__name__', repr(compare_func))
    self.logger.info("Verifying capture on interface %s using function %s"
                     % (dst_if.name, compare_name))

    # last packet info seen per source interface, seeded with None for
    # every pg interface
    last_info = {i.sw_if_index: None for i in self.pg_interfaces}
    dst_sw_if_index = dst_if.sw_if_index

    for packet in capture:
        try:
            # extract payload_info from packet's payload
            payload_info = self.get_payload_info(packet)
            packet_index = payload_info.index

            self.logger.debug("Verifying packet with index %d"
                              % (packet_index))
            # packet should have arrived on the expected interface
            self.assertEqual(payload_info.dst, dst_sw_if_index)
            self.logger.debug(
                "Got packet on interface %s: src=%u (idx=%u)" %
                (dst_if.name, payload_info.src, packet_index))

            # search for payload_info with same src and dst if_index
            # this will give us the transmitted packet
            next_info = self.get_next_packet_info_for_interface2(
                payload_info.src, dst_sw_if_index,
                last_info[payload_info.src])
            last_info[payload_info.src] = next_info
            # next_info should not be None
            self.assertIsNotNone(next_info)
            # index of tx and rx packets should be equal
            self.assertEqual(packet_index, next_info.index)
            # data field of next_info contains the tx packet
            txed_packet = next_info.data

            self.logger.debug(ppp("Transmitted packet:",
                                  txed_packet))  # ppp=Pretty Print Packet

            self.logger.debug(ppp("Received packet:", packet))

            # compare rcvd packet with expected packet using compare_func
            compare_func(txed_packet, packet)

        except Exception:
            # dump the offending packet before propagating the failure;
            # print(...) with one argument behaves the same on Python 2
            # and Python 3
            print(packet.command())
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # have all expected packets arrived?
    for i in self.pg_interfaces:
        remaining_packet = self.get_next_packet_info_for_interface2(
            i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
        self.assertIsNone(remaining_packet,
                          "Interface %s: Packet expected from interface %s "
                          "didn't arrive" % (dst_if.name, i.name))
2149
2150
# When this module is executed directly, run its tests through unittest
# using VppTestRunner as the test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)