blob: a31b30eb6192db7a0c492c6a1df8165429e784f1 [file] [log] [blame]
Kris Michielsen91074432017-06-22 13:00:20 +02001#!/usr/bin/env python
2
3import unittest
4from socket import AF_INET6
5
6from framework import VppTestCase, VppTestRunner
Florin Corasd0a59722017-10-15 17:41:21 +00007from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
Kris Michielsen91074432017-06-22 13:00:20 +02008from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
9 SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
10
11from scapy.packet import Raw
12from scapy.layers.l2 import Ether, Dot1Q
13from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
14from scapy.layers.inet import IP, UDP
15
16from scapy.utils import inet_pton, inet_ntop
17
18from util import ppp
19
20
21class TestSRv6(VppTestCase):
22 """ SRv6 Test Case """
23
24 @classmethod
25 def setUpClass(self):
26 super(TestSRv6, self).setUpClass()
27
28 def setUp(self):
29 """ Perform test setup before each test case.
30 """
31 super(TestSRv6, self).setUp()
32
33 # packet sizes, inclusive L2 overhead
34 self.pg_packet_sizes = [64, 512, 1518, 9018]
35
36 # reset packet_infos
37 self.reset_packet_infos()
38
39 def tearDown(self):
40 """ Clean up test setup after each test case.
41 """
42 self.teardown_interfaces()
43
44 super(TestSRv6, self).tearDown()
45
46 def configure_interface(self,
47 interface,
48 ipv6=False, ipv4=False,
49 ipv6_table_id=0, ipv4_table_id=0):
50 """ Configure interface.
51 :param ipv6: configure IPv6 on interface
52 :param ipv4: configure IPv4 on interface
53 :param ipv6_table_id: FIB table_id for IPv6
54 :param ipv4_table_id: FIB table_id for IPv4
55 """
56 self.logger.debug("Configuring interface %s" % (interface.name))
57 if ipv6:
58 self.logger.debug("Configuring IPv6")
59 interface.set_table_ip6(ipv6_table_id)
60 interface.config_ip6()
61 interface.resolve_ndp(timeout=5)
62 if ipv4:
63 self.logger.debug("Configuring IPv4")
64 interface.set_table_ip4(ipv4_table_id)
65 interface.config_ip4()
66 interface.resolve_arp()
67 interface.admin_up()
68
69 def setup_interfaces(self, ipv6=[], ipv4=[],
70 ipv6_table_id=[], ipv4_table_id=[]):
71 """ Create and configure interfaces.
72
73 :param ipv6: list of interface IPv6 capabilities
74 :param ipv4: list of interface IPv4 capabilities
75 :param ipv6_table_id: list of intf IPv6 FIB table_ids
76 :param ipv4_table_id: list of intf IPv4 FIB table_ids
77 :returns: List of created interfaces.
78 """
79 # how many interfaces?
80 if len(ipv6):
81 count = len(ipv6)
82 else:
83 count = len(ipv4)
84 self.logger.debug("Creating and configuring %d interfaces" % (count))
85
86 # fill up ipv6 and ipv4 lists if needed
87 # not enabled (False) is the default
88 if len(ipv6) < count:
89 ipv6 += (count - len(ipv6)) * [False]
90 if len(ipv4) < count:
91 ipv4 += (count - len(ipv4)) * [False]
92
93 # fill up table_id lists if needed
94 # table_id 0 (global) is the default
95 if len(ipv6_table_id) < count:
96 ipv6_table_id += (count - len(ipv6_table_id)) * [0]
97 if len(ipv4_table_id) < count:
98 ipv4_table_id += (count - len(ipv4_table_id)) * [0]
99
100 # create 'count' pg interfaces
101 self.create_pg_interfaces(range(count))
102
103 # setup all interfaces
104 for i in range(count):
105 intf = self.pg_interfaces[i]
106 self.configure_interface(intf,
107 ipv6[i], ipv4[i],
108 ipv6_table_id[i], ipv4_table_id[i])
109
110 if any(ipv6):
111 self.logger.debug(self.vapi.cli("show ip6 neighbors"))
112 if any(ipv4):
113 self.logger.debug(self.vapi.cli("show ip arp"))
114 self.logger.debug(self.vapi.cli("show interface"))
115 self.logger.debug(self.vapi.cli("show hardware"))
116
117 return self.pg_interfaces
118
119 def teardown_interfaces(self):
120 """ Unconfigure and bring down interface.
121 """
122 self.logger.debug("Tearing down interfaces")
123 # tear down all interfaces
124 # AFAIK they cannot be deleted
125 for i in self.pg_interfaces:
126 self.logger.debug("Tear down interface %s" % (i.name))
Neale Rannsf9342022017-10-13 02:43:33 -0700127 i.admin_down()
Florin Corasd0a59722017-10-15 17:41:21 +0000128 i.unconfig()
Kris Michielsen91074432017-06-22 13:00:20 +0200129
130 def test_SRv6_T_Encaps(self):
131 """ Test SRv6 Transit.Encaps behavior for IPv6.
132 """
133 # send traffic to one destination interface
134 # source and destination are IPv6 only
135 self.setup_interfaces(ipv6=[True, True])
136
137 # configure FIB entries
138 route = VppIpRoute(self, "a4::", 64,
139 [VppRoutePath(self.pg1.remote_ip6,
140 self.pg1.sw_if_index,
141 proto=DpoProto.DPO_PROTO_IP6)],
142 is_ip6=1)
143 route.add_vpp_config()
144
145 # configure encaps IPv6 source address
146 # needs to be done before SR Policy config
147 # TODO: API?
148 self.vapi.cli("set sr encaps source addr a3::")
149
150 bsid = 'a3::9999:1'
151 # configure SRv6 Policy
152 # Note: segment list order: first -> last
153 sr_policy = VppSRv6Policy(
154 self, bsid=bsid,
155 is_encap=1,
156 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
157 weight=1, fib_table=0,
158 segments=['a4::', 'a5::', 'a6::c7'],
159 source='a3::')
160 sr_policy.add_vpp_config()
161 self.sr_policy = sr_policy
162
163 # log the sr policies
164 self.logger.info(self.vapi.cli("show sr policies"))
165
166 # steer IPv6 traffic to a7::/64 into SRv6 Policy
167 # use the bsid of the above self.sr_policy
168 pol_steering = VppSRv6Steering(
169 self,
170 bsid=self.sr_policy.bsid,
171 prefix="a7::", mask_width=64,
172 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
173 sr_policy_index=0, table_id=0,
174 sw_if_index=0)
175 pol_steering.add_vpp_config()
176
177 # log the sr steering policies
178 self.logger.info(self.vapi.cli("show sr steering policies"))
179
180 # create packets
181 count = len(self.pg_packet_sizes)
182 dst_inner = 'a7::1234'
183 pkts = []
184
185 # create IPv6 packets without SRH
186 packet_header = self.create_packet_header_IPv6(dst_inner)
187 # create traffic stream pg0->pg1
188 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
189 self.pg_packet_sizes, count))
190
191 # create IPv6 packets with SRH
192 # packets with segments-left 1, active segment a7::
193 packet_header = self.create_packet_header_IPv6_SRH(
194 sidlist=['a8::', 'a7::', 'a6::'],
195 segleft=1)
196 # create traffic stream pg0->pg1
197 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
198 self.pg_packet_sizes, count))
199
200 # create IPv6 packets with SRH and IPv6
201 # packets with segments-left 1, active segment a7::
202 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
203 dst_inner,
204 sidlist=['a8::', 'a7::', 'a6::'],
205 segleft=1)
206 # create traffic stream pg0->pg1
207 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
208 self.pg_packet_sizes, count))
209
210 # send packets and verify received packets
211 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
212 self.compare_rx_tx_packet_T_Encaps)
213
214 # log the localsid counters
215 self.logger.info(self.vapi.cli("show sr localsid"))
216
217 # remove SR steering
218 pol_steering.remove_vpp_config()
219 self.logger.info(self.vapi.cli("show sr steering policies"))
220
221 # remove SR Policies
222 self.sr_policy.remove_vpp_config()
223 self.logger.info(self.vapi.cli("show sr policies"))
224
225 # remove FIB entries
226 # done by tearDown
227
228 # cleanup interfaces
229 self.teardown_interfaces()
230
231 def test_SRv6_T_Insert(self):
232 """ Test SRv6 Transit.Insert behavior (IPv6 only).
233 """
234 # send traffic to one destination interface
235 # source and destination are IPv6 only
236 self.setup_interfaces(ipv6=[True, True])
237
238 # configure FIB entries
239 route = VppIpRoute(self, "a4::", 64,
240 [VppRoutePath(self.pg1.remote_ip6,
241 self.pg1.sw_if_index,
242 proto=DpoProto.DPO_PROTO_IP6)],
243 is_ip6=1)
244 route.add_vpp_config()
245
246 # configure encaps IPv6 source address
247 # needs to be done before SR Policy config
248 # TODO: API?
249 self.vapi.cli("set sr encaps source addr a3::")
250
251 bsid = 'a3::9999:1'
252 # configure SRv6 Policy
253 # Note: segment list order: first -> last
254 sr_policy = VppSRv6Policy(
255 self, bsid=bsid,
256 is_encap=0,
257 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
258 weight=1, fib_table=0,
259 segments=['a4::', 'a5::', 'a6::c7'],
260 source='a3::')
261 sr_policy.add_vpp_config()
262 self.sr_policy = sr_policy
263
264 # log the sr policies
265 self.logger.info(self.vapi.cli("show sr policies"))
266
267 # steer IPv6 traffic to a7::/64 into SRv6 Policy
268 # use the bsid of the above self.sr_policy
269 pol_steering = VppSRv6Steering(
270 self,
271 bsid=self.sr_policy.bsid,
272 prefix="a7::", mask_width=64,
273 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
274 sr_policy_index=0, table_id=0,
275 sw_if_index=0)
276 pol_steering.add_vpp_config()
277
278 # log the sr steering policies
279 self.logger.info(self.vapi.cli("show sr steering policies"))
280
281 # create packets
282 count = len(self.pg_packet_sizes)
283 dst_inner = 'a7::1234'
284 pkts = []
285
286 # create IPv6 packets without SRH
287 packet_header = self.create_packet_header_IPv6(dst_inner)
288 # create traffic stream pg0->pg1
289 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
290 self.pg_packet_sizes, count))
291
292 # create IPv6 packets with SRH
293 # packets with segments-left 1, active segment a7::
294 packet_header = self.create_packet_header_IPv6_SRH(
295 sidlist=['a8::', 'a7::', 'a6::'],
296 segleft=1)
297 # create traffic stream pg0->pg1
298 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
299 self.pg_packet_sizes, count))
300
301 # send packets and verify received packets
302 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
303 self.compare_rx_tx_packet_T_Insert)
304
305 # log the localsid counters
306 self.logger.info(self.vapi.cli("show sr localsid"))
307
308 # remove SR steering
309 pol_steering.remove_vpp_config()
310 self.logger.info(self.vapi.cli("show sr steering policies"))
311
312 # remove SR Policies
313 self.sr_policy.remove_vpp_config()
314 self.logger.info(self.vapi.cli("show sr policies"))
315
316 # remove FIB entries
317 # done by tearDown
318
319 # cleanup interfaces
320 self.teardown_interfaces()
321
322 def test_SRv6_T_Encaps_IPv4(self):
323 """ Test SRv6 Transit.Encaps behavior for IPv4.
324 """
325 # send traffic to one destination interface
326 # source interface is IPv4 only
327 # destination interface is IPv6 only
328 self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])
329
330 # configure FIB entries
331 route = VppIpRoute(self, "a4::", 64,
332 [VppRoutePath(self.pg1.remote_ip6,
333 self.pg1.sw_if_index,
334 proto=DpoProto.DPO_PROTO_IP6)],
335 is_ip6=1)
336 route.add_vpp_config()
337
338 # configure encaps IPv6 source address
339 # needs to be done before SR Policy config
340 # TODO: API?
341 self.vapi.cli("set sr encaps source addr a3::")
342
343 bsid = 'a3::9999:1'
344 # configure SRv6 Policy
345 # Note: segment list order: first -> last
346 sr_policy = VppSRv6Policy(
347 self, bsid=bsid,
348 is_encap=1,
349 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
350 weight=1, fib_table=0,
351 segments=['a4::', 'a5::', 'a6::c7'],
352 source='a3::')
353 sr_policy.add_vpp_config()
354 self.sr_policy = sr_policy
355
356 # log the sr policies
357 self.logger.info(self.vapi.cli("show sr policies"))
358
359 # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
360 # use the bsid of the above self.sr_policy
361 pol_steering = VppSRv6Steering(
362 self,
363 bsid=self.sr_policy.bsid,
364 prefix="7.1.1.0", mask_width=24,
365 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
366 sr_policy_index=0, table_id=0,
367 sw_if_index=0)
368 pol_steering.add_vpp_config()
369
370 # log the sr steering policies
371 self.logger.info(self.vapi.cli("show sr steering policies"))
372
373 # create packets
374 count = len(self.pg_packet_sizes)
375 dst_inner = '7.1.1.123'
376 pkts = []
377
378 # create IPv4 packets
379 packet_header = self.create_packet_header_IPv4(dst_inner)
380 # create traffic stream pg0->pg1
381 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
382 self.pg_packet_sizes, count))
383
384 # send packets and verify received packets
385 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
386 self.compare_rx_tx_packet_T_Encaps_IPv4)
387
388 # log the localsid counters
389 self.logger.info(self.vapi.cli("show sr localsid"))
390
391 # remove SR steering
392 pol_steering.remove_vpp_config()
393 self.logger.info(self.vapi.cli("show sr steering policies"))
394
395 # remove SR Policies
396 self.sr_policy.remove_vpp_config()
397 self.logger.info(self.vapi.cli("show sr policies"))
398
399 # remove FIB entries
400 # done by tearDown
401
402 # cleanup interfaces
403 self.teardown_interfaces()
404
405 @unittest.skip("VPP crashes after running this test")
406 def test_SRv6_T_Encaps_L2(self):
407 """ Test SRv6 Transit.Encaps behavior for L2.
408 """
409 # send traffic to one destination interface
410 # source interface is IPv4 only TODO?
411 # destination interface is IPv6 only
412 self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])
413
414 # configure FIB entries
415 route = VppIpRoute(self, "a4::", 64,
416 [VppRoutePath(self.pg1.remote_ip6,
417 self.pg1.sw_if_index,
418 proto=DpoProto.DPO_PROTO_IP6)],
419 is_ip6=1)
420 route.add_vpp_config()
421
422 # configure encaps IPv6 source address
423 # needs to be done before SR Policy config
424 # TODO: API?
425 self.vapi.cli("set sr encaps source addr a3::")
426
427 bsid = 'a3::9999:1'
428 # configure SRv6 Policy
429 # Note: segment list order: first -> last
430 sr_policy = VppSRv6Policy(
431 self, bsid=bsid,
432 is_encap=1,
433 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
434 weight=1, fib_table=0,
435 segments=['a4::', 'a5::', 'a6::c7'],
436 source='a3::')
437 sr_policy.add_vpp_config()
438 self.sr_policy = sr_policy
439
440 # log the sr policies
441 self.logger.info(self.vapi.cli("show sr policies"))
442
443 # steer L2 traffic into SRv6 Policy
444 # use the bsid of the above self.sr_policy
445 pol_steering = VppSRv6Steering(
446 self,
447 bsid=self.sr_policy.bsid,
448 prefix="::", mask_width=0,
449 traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
450 sr_policy_index=0, table_id=0,
451 sw_if_index=self.pg0.sw_if_index)
452 pol_steering.add_vpp_config()
453
454 # log the sr steering policies
455 self.logger.info(self.vapi.cli("show sr steering policies"))
456
457 # create packets
458 count = len(self.pg_packet_sizes)
459 pkts = []
460
461 # create L2 packets without dot1q header
462 packet_header = self.create_packet_header_L2()
463 # create traffic stream pg0->pg1
464 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
465 self.pg_packet_sizes, count))
466
467 # create L2 packets with dot1q header
468 packet_header = self.create_packet_header_L2(vlan=123)
469 # create traffic stream pg0->pg1
470 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
471 self.pg_packet_sizes, count))
472
473 # send packets and verify received packets
474 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
475 self.compare_rx_tx_packet_T_Encaps_L2)
476
477 # log the localsid counters
478 self.logger.info(self.vapi.cli("show sr localsid"))
479
480 # remove SR steering
481 pol_steering.remove_vpp_config()
482 self.logger.info(self.vapi.cli("show sr steering policies"))
483
484 # remove SR Policies
485 self.sr_policy.remove_vpp_config()
486 self.logger.info(self.vapi.cli("show sr policies"))
487
488 # remove FIB entries
489 # done by tearDown
490
491 # cleanup interfaces
492 self.teardown_interfaces()
493
494 def test_SRv6_End(self):
495 """ Test SRv6 End (without PSP) behavior.
496 """
497 # send traffic to one destination interface
498 # source and destination interfaces are IPv6 only
499 self.setup_interfaces(ipv6=[True, True])
500
501 # configure FIB entries
502 route = VppIpRoute(self, "a4::", 64,
503 [VppRoutePath(self.pg1.remote_ip6,
504 self.pg1.sw_if_index,
505 proto=DpoProto.DPO_PROTO_IP6)],
506 is_ip6=1)
507 route.add_vpp_config()
508
509 # configure SRv6 localSID End without PSP behavior
510 localsid = VppSRv6LocalSID(
511 self, localsid_addr='A3::0',
512 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
513 nh_addr='::',
514 end_psp=0,
515 sw_if_index=0,
516 vlan_index=0,
517 fib_table=0)
518 localsid.add_vpp_config()
519 # log the localsids
520 self.logger.debug(self.vapi.cli("show sr localsid"))
521
522 # create IPv6 packets with SRH (SL=2, SL=1, SL=0)
523 # send one packet per SL value per packet size
524 # SL=0 packet with localSID End with USP needs 2nd SRH
525 count = len(self.pg_packet_sizes)
526 dst_inner = 'a4::1234'
527 pkts = []
528
529 # packets with segments-left 2, active segment a3::
530 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
531 dst_inner,
532 sidlist=['a5::', 'a4::', 'a3::'],
533 segleft=2)
534 # create traffic stream pg0->pg1
535 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
536 self.pg_packet_sizes, count))
537
538 # packets with segments-left 1, active segment a3::
539 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
540 dst_inner,
541 sidlist=['a4::', 'a3::', 'a2::'],
542 segleft=1)
543 # add to traffic stream pg0->pg1
544 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
545 self.pg_packet_sizes, count))
546
547 # TODO: test behavior with SL=0 packet (needs 2*SRH?)
548
549 # send packets and verify received packets
550 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
551 self.compare_rx_tx_packet_End)
552
553 # log the localsid counters
554 self.logger.info(self.vapi.cli("show sr localsid"))
555
556 # remove SRv6 localSIDs
557 localsid.remove_vpp_config()
558
559 # remove FIB entries
560 # done by tearDown
561
562 # cleanup interfaces
563 self.teardown_interfaces()
564
565 def test_SRv6_End_with_PSP(self):
566 """ Test SRv6 End with PSP behavior.
567 """
568 # send traffic to one destination interface
569 # source and destination interfaces are IPv6 only
570 self.setup_interfaces(ipv6=[True, True])
571
572 # configure FIB entries
573 route = VppIpRoute(self, "a4::", 64,
574 [VppRoutePath(self.pg1.remote_ip6,
575 self.pg1.sw_if_index,
576 proto=DpoProto.DPO_PROTO_IP6)],
577 is_ip6=1)
578 route.add_vpp_config()
579
580 # configure SRv6 localSID End with PSP behavior
581 localsid = VppSRv6LocalSID(
582 self, localsid_addr='A3::0',
583 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
584 nh_addr='::',
585 end_psp=1,
586 sw_if_index=0,
587 vlan_index=0,
588 fib_table=0)
589 localsid.add_vpp_config()
590 # log the localsids
591 self.logger.debug(self.vapi.cli("show sr localsid"))
592
593 # create IPv6 packets with SRH (SL=2, SL=1)
594 # send one packet per SL value per packet size
595 # SL=0 packet with localSID End with PSP is dropped
596 count = len(self.pg_packet_sizes)
597 dst_inner = 'a4::1234'
598 pkts = []
599
600 # packets with segments-left 2, active segment a3::
601 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
602 dst_inner,
603 sidlist=['a5::', 'a4::', 'a3::'],
604 segleft=2)
605 # create traffic stream pg0->pg1
606 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
607 self.pg_packet_sizes, count))
608
609 # packets with segments-left 1, active segment a3::
610 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
611 dst_inner,
612 sidlist=['a4::', 'a3::', 'a2::'],
613 segleft=1)
614 # add to traffic stream pg0->pg1
615 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
616 self.pg_packet_sizes, count))
617
618 # send packets and verify received packets
619 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
620 self.compare_rx_tx_packet_End_PSP)
621
622 # log the localsid counters
623 self.logger.info(self.vapi.cli("show sr localsid"))
624
625 # remove SRv6 localSIDs
626 localsid.remove_vpp_config()
627
628 # remove FIB entries
629 # done by tearDown
630
631 # cleanup interfaces
632 self.teardown_interfaces()
633
634 def test_SRv6_End_X(self):
635 """ Test SRv6 End.X (without PSP) behavior.
636 """
637 # create three interfaces (1 source, 2 destinations)
638 # source and destination interfaces are IPv6 only
639 self.setup_interfaces(ipv6=[True, True, True])
640
641 # configure FIB entries
642 # a4::/64 via pg1 and pg2
643 route = VppIpRoute(self, "a4::", 64,
644 [VppRoutePath(self.pg1.remote_ip6,
645 self.pg1.sw_if_index,
646 proto=DpoProto.DPO_PROTO_IP6),
647 VppRoutePath(self.pg2.remote_ip6,
648 self.pg2.sw_if_index,
649 proto=DpoProto.DPO_PROTO_IP6)],
650 is_ip6=1)
651 route.add_vpp_config()
652 self.logger.debug(self.vapi.cli("show ip6 fib"))
653
654 # configure SRv6 localSID End.X without PSP behavior
655 # End.X points to interface pg1
656 localsid = VppSRv6LocalSID(
657 self, localsid_addr='A3::C4',
658 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
659 nh_addr=self.pg1.remote_ip6,
660 end_psp=0,
661 sw_if_index=self.pg1.sw_if_index,
662 vlan_index=0,
663 fib_table=0)
664 localsid.add_vpp_config()
665 # log the localsids
666 self.logger.debug(self.vapi.cli("show sr localsid"))
667
668 # create IPv6 packets with SRH (SL=2, SL=1)
669 # send one packet per SL value per packet size
670 # SL=0 packet with localSID End with PSP is dropped
671 count = len(self.pg_packet_sizes)
672 dst_inner = 'a4::1234'
673 pkts = []
674
675 # packets with segments-left 2, active segment a3::c4
676 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
677 dst_inner,
678 sidlist=['a5::', 'a4::', 'a3::c4'],
679 segleft=2)
680 # create traffic stream pg0->pg1
681 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
682 self.pg_packet_sizes, count))
683
684 # packets with segments-left 1, active segment a3::c4
685 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
686 dst_inner,
687 sidlist=['a4::', 'a3::c4', 'a2::'],
688 segleft=1)
689 # add to traffic stream pg0->pg1
690 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
691 self.pg_packet_sizes, count))
692
693 # send packets and verify received packets
694 # using same comparison function as End (no PSP)
695 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
696 self.compare_rx_tx_packet_End)
697
698 # assert nothing was received on the other interface (pg2)
699 self.pg2.assert_nothing_captured("mis-directed packet(s)")
700
701 # log the localsid counters
702 self.logger.info(self.vapi.cli("show sr localsid"))
703
704 # remove SRv6 localSIDs
705 localsid.remove_vpp_config()
706
707 # remove FIB entries
708 # done by tearDown
709
710 # cleanup interfaces
711 self.teardown_interfaces()
712
713 def test_SRv6_End_X_with_PSP(self):
714 """ Test SRv6 End.X with PSP behavior.
715 """
716 # create three interfaces (1 source, 2 destinations)
717 # source and destination interfaces are IPv6 only
718 self.setup_interfaces(ipv6=[True, True, True])
719
720 # configure FIB entries
721 # a4::/64 via pg1 and pg2
722 route = VppIpRoute(self, "a4::", 64,
723 [VppRoutePath(self.pg1.remote_ip6,
724 self.pg1.sw_if_index,
725 proto=DpoProto.DPO_PROTO_IP6),
726 VppRoutePath(self.pg2.remote_ip6,
727 self.pg2.sw_if_index,
728 proto=DpoProto.DPO_PROTO_IP6)],
729 is_ip6=1)
730 route.add_vpp_config()
731
732 # configure SRv6 localSID End with PSP behavior
733 localsid = VppSRv6LocalSID(
734 self, localsid_addr='A3::C4',
735 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
736 nh_addr=self.pg1.remote_ip6,
737 end_psp=1,
738 sw_if_index=self.pg1.sw_if_index,
739 vlan_index=0,
740 fib_table=0)
741 localsid.add_vpp_config()
742 # log the localsids
743 self.logger.debug(self.vapi.cli("show sr localsid"))
744
745 # create IPv6 packets with SRH (SL=2, SL=1)
746 # send one packet per SL value per packet size
747 # SL=0 packet with localSID End with PSP is dropped
748 count = len(self.pg_packet_sizes)
749 dst_inner = 'a4::1234'
750 pkts = []
751
752 # packets with segments-left 2, active segment a3::
753 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
754 dst_inner,
755 sidlist=['a5::', 'a4::', 'a3::c4'],
756 segleft=2)
757 # create traffic stream pg0->pg1
758 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
759 self.pg_packet_sizes, count))
760
761 # packets with segments-left 1, active segment a3::
762 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
763 dst_inner,
764 sidlist=['a4::', 'a3::c4', 'a2::'],
765 segleft=1)
766 # add to traffic stream pg0->pg1
767 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
768 self.pg_packet_sizes, count))
769
770 # send packets and verify received packets
771 # using same comparison function as End with PSP
772 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
773 self.compare_rx_tx_packet_End_PSP)
774
775 # assert nothing was received on the other interface (pg2)
776 self.pg2.assert_nothing_captured("mis-directed packet(s)")
777
778 # log the localsid counters
779 self.logger.info(self.vapi.cli("show sr localsid"))
780
781 # remove SRv6 localSIDs
782 localsid.remove_vpp_config()
783
784 # remove FIB entries
785 # done by tearDown
786
787 # cleanup interfaces
788 self.teardown_interfaces()
789
790 def test_SRv6_End_DX6(self):
791 """ Test SRv6 End.DX6 behavior.
792 """
793 # send traffic to one destination interface
794 # source and destination interfaces are IPv6 only
795 self.setup_interfaces(ipv6=[True, True])
796
797 # configure SRv6 localSID End.DX6 behavior
798 localsid = VppSRv6LocalSID(
799 self, localsid_addr='a3::c4',
800 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
801 nh_addr=self.pg1.remote_ip6,
802 end_psp=0,
803 sw_if_index=self.pg1.sw_if_index,
804 vlan_index=0,
805 fib_table=0)
806 localsid.add_vpp_config()
807 # log the localsids
808 self.logger.debug(self.vapi.cli("show sr localsid"))
809
810 # create IPv6 packets with SRH (SL=0)
811 # send one packet per packet size
812 count = len(self.pg_packet_sizes)
813 dst_inner = 'a4::1234' # inner header destination address
814 pkts = []
815
816 # packets with SRH, segments-left 0, active segment a3::c4
817 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
818 dst_inner,
819 sidlist=['a3::c4', 'a2::', 'a1::'],
820 segleft=0)
821 # add to traffic stream pg0->pg1
822 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
823 self.pg_packet_sizes, count))
824
825 # packets without SRH, IPv6 in IPv6
826 # outer IPv6 dest addr is the localsid End.DX6
827 packet_header = self.create_packet_header_IPv6_IPv6(
828 dst_inner,
829 dst_outer='a3::c4')
830 # add to traffic stream pg0->pg1
831 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
832 self.pg_packet_sizes, count))
833
834 # send packets and verify received packets
835 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
836 self.compare_rx_tx_packet_End_DX6)
837
838 # log the localsid counters
839 self.logger.info(self.vapi.cli("show sr localsid"))
840
841 # remove SRv6 localSIDs
842 localsid.remove_vpp_config()
843
844 # cleanup interfaces
845 self.teardown_interfaces()
846
847 def test_SRv6_End_DT6(self):
848 """ Test SRv6 End.DT6 behavior.
849 """
850 # create three interfaces (1 source, 2 destinations)
851 # all interfaces are IPv6 only
852 # source interface in global FIB (0)
853 # destination interfaces in global and vrf
854 vrf_1 = 1
855 self.setup_interfaces(ipv6=[True, True, True],
856 ipv6_table_id=[0, 0, vrf_1])
857
858 # configure FIB entries
859 # a4::/64 is reachable
860 # via pg1 in table 0 (global)
861 # and via pg2 in table vrf_1
862 route0 = VppIpRoute(self, "a4::", 64,
863 [VppRoutePath(self.pg1.remote_ip6,
864 self.pg1.sw_if_index,
865 proto=DpoProto.DPO_PROTO_IP6,
866 nh_table_id=0)],
867 table_id=0,
868 is_ip6=1)
869 route0.add_vpp_config()
870 route1 = VppIpRoute(self, "a4::", 64,
871 [VppRoutePath(self.pg2.remote_ip6,
872 self.pg2.sw_if_index,
873 proto=DpoProto.DPO_PROTO_IP6,
874 nh_table_id=vrf_1)],
875 table_id=vrf_1,
876 is_ip6=1)
877 route1.add_vpp_config()
878 self.logger.debug(self.vapi.cli("show ip6 fib"))
879
880 # configure SRv6 localSID End.DT6 behavior
881 # Note:
882 # fib_table: where the localsid is installed
883 # sw_if_index: in T-variants of localsid this is the vrf table_id
884 localsid = VppSRv6LocalSID(
885 self, localsid_addr='a3::c4',
886 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
887 nh_addr='::',
888 end_psp=0,
889 sw_if_index=vrf_1,
890 vlan_index=0,
891 fib_table=0)
892 localsid.add_vpp_config()
893 # log the localsids
894 self.logger.debug(self.vapi.cli("show sr localsid"))
895
896 # create IPv6 packets with SRH (SL=0)
897 # send one packet per packet size
898 count = len(self.pg_packet_sizes)
899 dst_inner = 'a4::1234' # inner header destination address
900 pkts = []
901
902 # packets with SRH, segments-left 0, active segment a3::c4
903 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
904 dst_inner,
905 sidlist=['a3::c4', 'a2::', 'a1::'],
906 segleft=0)
907 # add to traffic stream pg0->pg1
908 pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
909 self.pg_packet_sizes, count))
910
911 # packets without SRH, IPv6 in IPv6
912 # outer IPv6 dest addr is the localsid End.DT6
913 packet_header = self.create_packet_header_IPv6_IPv6(
914 dst_inner,
915 dst_outer='a3::c4')
916 # add to traffic stream pg0->pg1
917 pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
918 self.pg_packet_sizes, count))
919
920 # send packets and verify received packets
921 # using same comparison function as End.DX6
922 self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
923 self.compare_rx_tx_packet_End_DX6)
924
925 # assert nothing was received on the other interface (pg2)
926 self.pg1.assert_nothing_captured("mis-directed packet(s)")
927
928 # log the localsid counters
929 self.logger.info(self.vapi.cli("show sr localsid"))
930
931 # remove SRv6 localSIDs
932 localsid.remove_vpp_config()
933
934 # remove FIB entries
935 # done by tearDown
936
937 # cleanup interfaces
938 self.teardown_interfaces()
939
940 def test_SRv6_End_DX4(self):
941 """ Test SRv6 End.DX4 behavior.
942 """
943 # send traffic to one destination interface
944 # source interface is IPv6 only
945 # destination interface is IPv4 only
946 self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
947
948 # configure SRv6 localSID End.DX4 behavior
949 localsid = VppSRv6LocalSID(
950 self, localsid_addr='a3::c4',
951 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
952 nh_addr=self.pg1.remote_ip4,
953 end_psp=0,
954 sw_if_index=self.pg1.sw_if_index,
955 vlan_index=0,
956 fib_table=0)
957 localsid.add_vpp_config()
958 # log the localsids
959 self.logger.debug(self.vapi.cli("show sr localsid"))
960
961 # send one packet per packet size
962 count = len(self.pg_packet_sizes)
963 dst_inner = '4.1.1.123' # inner header destination address
964 pkts = []
965
966 # packets with SRH, segments-left 0, active segment a3::c4
967 packet_header = self.create_packet_header_IPv6_SRH_IPv4(
968 dst_inner,
969 sidlist=['a3::c4', 'a2::', 'a1::'],
970 segleft=0)
971 # add to traffic stream pg0->pg1
972 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
973 self.pg_packet_sizes, count))
974
975 # packets without SRH, IPv4 in IPv6
976 # outer IPv6 dest addr is the localsid End.DX4
977 packet_header = self.create_packet_header_IPv6_IPv4(
978 dst_inner,
979 dst_outer='a3::c4')
980 # add to traffic stream pg0->pg1
981 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
982 self.pg_packet_sizes, count))
983
984 # send packets and verify received packets
985 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
986 self.compare_rx_tx_packet_End_DX4)
987
988 # log the localsid counters
989 self.logger.info(self.vapi.cli("show sr localsid"))
990
991 # remove SRv6 localSIDs
992 localsid.remove_vpp_config()
993
994 # cleanup interfaces
995 self.teardown_interfaces()
996
997 def test_SRv6_End_DT4(self):
998 """ Test SRv6 End.DT4 behavior.
999 """
1000 # create three interfaces (1 source, 2 destinations)
1001 # source interface is IPv6-only
1002 # destination interfaces are IPv4 only
1003 # source interface in global FIB (0)
1004 # destination interfaces in global and vrf
1005 vrf_1 = 1
1006 self.setup_interfaces(ipv6=[True, False, False],
1007 ipv4=[False, True, True],
1008 ipv6_table_id=[0, 0, 0],
1009 ipv4_table_id=[0, 0, vrf_1])
1010
1011 # configure FIB entries
1012 # 4.1.1.0/24 is reachable
1013 # via pg1 in table 0 (global)
1014 # and via pg2 in table vrf_1
1015 route0 = VppIpRoute(self, "4.1.1.0", 24,
1016 [VppRoutePath(self.pg1.remote_ip4,
1017 self.pg1.sw_if_index,
1018 nh_table_id=0)],
1019 table_id=0,
1020 is_ip6=0)
1021 route0.add_vpp_config()
1022 route1 = VppIpRoute(self, "4.1.1.0", 24,
1023 [VppRoutePath(self.pg2.remote_ip4,
1024 self.pg2.sw_if_index,
1025 nh_table_id=vrf_1)],
1026 table_id=vrf_1,
1027 is_ip6=0)
1028 route1.add_vpp_config()
1029 self.logger.debug(self.vapi.cli("show ip fib"))
1030
1031 # configure SRv6 localSID End.DT6 behavior
1032 # Note:
1033 # fib_table: where the localsid is installed
1034 # sw_if_index: in T-variants of localsid: vrf table_id
1035 localsid = VppSRv6LocalSID(
1036 self, localsid_addr='a3::c4',
1037 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
1038 nh_addr='::',
1039 end_psp=0,
1040 sw_if_index=vrf_1,
1041 vlan_index=0,
1042 fib_table=0)
1043 localsid.add_vpp_config()
1044 # log the localsids
1045 self.logger.debug(self.vapi.cli("show sr localsid"))
1046
1047 # create IPv6 packets with SRH (SL=0)
1048 # send one packet per packet size
1049 count = len(self.pg_packet_sizes)
1050 dst_inner = '4.1.1.123' # inner header destination address
1051 pkts = []
1052
1053 # packets with SRH, segments-left 0, active segment a3::c4
1054 packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1055 dst_inner,
1056 sidlist=['a3::c4', 'a2::', 'a1::'],
1057 segleft=0)
1058 # add to traffic stream pg0->pg1
1059 pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1060 self.pg_packet_sizes, count))
1061
1062 # packets without SRH, IPv6 in IPv6
1063 # outer IPv6 dest addr is the localsid End.DX4
1064 packet_header = self.create_packet_header_IPv6_IPv4(
1065 dst_inner,
1066 dst_outer='a3::c4')
1067 # add to traffic stream pg0->pg1
1068 pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1069 self.pg_packet_sizes, count))
1070
1071 # send packets and verify received packets
1072 # using same comparison function as End.DX4
1073 self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
1074 self.compare_rx_tx_packet_End_DX4)
1075
1076 # assert nothing was received on the other interface (pg2)
1077 self.pg1.assert_nothing_captured("mis-directed packet(s)")
1078
1079 # log the localsid counters
1080 self.logger.info(self.vapi.cli("show sr localsid"))
1081
1082 # remove SRv6 localSIDs
1083 localsid.remove_vpp_config()
1084
1085 # remove FIB entries
1086 # done by tearDown
1087
1088 # cleanup interfaces
1089 self.teardown_interfaces()
1090
1091 def test_SRv6_End_DX2(self):
1092 """ Test SRv6 End.DX2 behavior.
1093 """
1094 # send traffic to one destination interface
1095 # source interface is IPv6 only
1096 self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])
1097
1098 # configure SRv6 localSID End.DX2 behavior
1099 localsid = VppSRv6LocalSID(
1100 self, localsid_addr='a3::c4',
1101 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
1102 nh_addr='::',
1103 end_psp=0,
1104 sw_if_index=self.pg1.sw_if_index,
1105 vlan_index=0,
1106 fib_table=0)
1107 localsid.add_vpp_config()
1108 # log the localsids
1109 self.logger.debug(self.vapi.cli("show sr localsid"))
1110
1111 # send one packet per packet size
1112 count = len(self.pg_packet_sizes)
1113 pkts = []
1114
1115 # packets with SRH, segments-left 0, active segment a3::c4
1116 # L2 has no dot1q header
1117 packet_header = self.create_packet_header_IPv6_SRH_L2(
1118 sidlist=['a3::c4', 'a2::', 'a1::'],
1119 segleft=0,
1120 vlan=0)
1121 # add to traffic stream pg0->pg1
1122 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1123 self.pg_packet_sizes, count))
1124
1125 # packets with SRH, segments-left 0, active segment a3::c4
1126 # L2 has dot1q header
1127 packet_header = self.create_packet_header_IPv6_SRH_L2(
1128 sidlist=['a3::c4', 'a2::', 'a1::'],
1129 segleft=0,
1130 vlan=123)
1131 # add to traffic stream pg0->pg1
1132 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1133 self.pg_packet_sizes, count))
1134
1135 # packets without SRH, L2 in IPv6
1136 # outer IPv6 dest addr is the localsid End.DX2
1137 # L2 has no dot1q header
1138 packet_header = self.create_packet_header_IPv6_L2(
1139 dst_outer='a3::c4',
1140 vlan=0)
1141 # add to traffic stream pg0->pg1
1142 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1143 self.pg_packet_sizes, count))
1144
1145 # packets without SRH, L2 in IPv6
1146 # outer IPv6 dest addr is the localsid End.DX2
1147 # L2 has dot1q header
1148 packet_header = self.create_packet_header_IPv6_L2(
1149 dst_outer='a3::c4',
1150 vlan=123)
1151 # add to traffic stream pg0->pg1
1152 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1153 self.pg_packet_sizes, count))
1154
1155 # send packets and verify received packets
1156 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
1157 self.compare_rx_tx_packet_End_DX2)
1158
1159 # log the localsid counters
1160 self.logger.info(self.vapi.cli("show sr localsid"))
1161
1162 # remove SRv6 localSIDs
1163 localsid.remove_vpp_config()
1164
1165 # cleanup interfaces
1166 self.teardown_interfaces()
1167
1168 def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
1169 """ Compare input and output packet after passing T.Encaps
1170
1171 :param tx_pkt: transmitted packet
1172 :param rx_pkt: received packet
1173 """
1174 # T.Encaps updates the headers as follows:
1175 # SR Policy seglist (S3, S2, S1)
1176 # SR Policy source C
1177 # IPv6:
1178 # in: IPv6(A, B2)
1179 # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)
1180 # IPv6 + SRH:
1181 # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1182 # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(a, B2)SRH(B3, B2, B1; SL=1)
1183
1184 # get first (outer) IPv6 header of rx'ed packet
1185 rx_ip = rx_pkt.getlayer(IPv6)
1186 rx_srh = None
1187
1188 tx_ip = tx_pkt.getlayer(IPv6)
1189
1190 # expected segment-list
1191 seglist = self.sr_policy.segments
1192 # reverse list to get order as in SRH
1193 tx_seglist = seglist[::-1]
1194
1195 # get source address of SR Policy
1196 sr_policy_source = self.sr_policy.source
1197
1198 # rx'ed packet should have SRH
1199 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1200 # get SRH
1201 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1202
1203 # received ip.src should be equal to SR Policy source
1204 self.assertEqual(rx_ip.src, sr_policy_source)
1205 # received ip.dst should be equal to expected sidlist[lastentry]
1206 self.assertEqual(rx_ip.dst, tx_seglist[-1])
1207 # rx'ed seglist should be equal to expected seglist
1208 self.assertEqual(rx_srh.addresses, tx_seglist)
1209 # segleft should be equal to size expected seglist-1
1210 self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1211 # segleft should be equal to lastentry
1212 self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1213
1214 # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1215 # except for the hop-limit field
1216 # -> update tx'ed hlim to the expected hlim
1217 tx_ip.hlim = tx_ip.hlim - 1
1218
1219 self.assertEqual(rx_srh.payload, tx_ip)
1220
1221 self.logger.debug("packet verification: SUCCESS")
1222
1223 def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
1224 """ Compare input and output packet after passing T.Encaps for IPv4
1225
1226 :param tx_pkt: transmitted packet
1227 :param rx_pkt: received packet
1228 """
1229 # T.Encaps for IPv4 updates the headers as follows:
1230 # SR Policy seglist (S3, S2, S1)
1231 # SR Policy source C
1232 # IPv4:
1233 # in: IPv4(A, B2)
1234 # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2)
1235
1236 # get first (outer) IPv6 header of rx'ed packet
1237 rx_ip = rx_pkt.getlayer(IPv6)
1238 rx_srh = None
1239
1240 tx_ip = tx_pkt.getlayer(IP)
1241
1242 # expected segment-list
1243 seglist = self.sr_policy.segments
1244 # reverse list to get order as in SRH
1245 tx_seglist = seglist[::-1]
1246
1247 # get source address of SR Policy
1248 sr_policy_source = self.sr_policy.source
1249
1250 # checks common to cases tx with and without SRH
1251 # rx'ed packet should have SRH and IPv4 header
1252 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1253 self.assertTrue(rx_ip.payload.haslayer(IP))
1254 # get SRH
1255 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1256
1257 # received ip.src should be equal to SR Policy source
1258 self.assertEqual(rx_ip.src, sr_policy_source)
1259 # received ip.dst should be equal to sidlist[lastentry]
1260 self.assertEqual(rx_ip.dst, tx_seglist[-1])
1261 # rx'ed seglist should be equal to seglist
1262 self.assertEqual(rx_srh.addresses, tx_seglist)
1263 # segleft should be equal to size seglist-1
1264 self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1265 # segleft should be equal to lastentry
1266 self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1267
1268 # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1269 # except for the ttl field and ip checksum
1270 # -> adjust tx'ed ttl to expected ttl
1271 tx_ip.ttl = tx_ip.ttl - 1
1272 # -> set tx'ed ip checksum to None and let scapy recompute
1273 tx_ip.chksum = None
1274 # read back the pkt (with str()) to force computing these fields
1275 # probably other ways to accomplish this are possible
1276 tx_ip = IP(str(tx_ip))
1277
1278 self.assertEqual(rx_srh.payload, tx_ip)
1279
1280 self.logger.debug("packet verification: SUCCESS")
1281
1282 def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
1283 """ Compare input and output packet after passing T.Encaps for L2
1284
1285 :param tx_pkt: transmitted packet
1286 :param rx_pkt: received packet
1287 """
1288 # T.Encaps for L2 updates the headers as follows:
1289 # SR Policy seglist (S3, S2, S1)
1290 # SR Policy source C
1291 # L2:
1292 # in: L2
1293 # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2
1294
1295 # get first (outer) IPv6 header of rx'ed packet
1296 rx_ip = rx_pkt.getlayer(IPv6)
1297 rx_srh = None
1298
1299 tx_ether = tx_pkt.getlayer(Ether)
1300
1301 # expected segment-list
1302 seglist = self.sr_policy.segments
1303 # reverse list to get order as in SRH
1304 tx_seglist = seglist[::-1]
1305
1306 # get source address of SR Policy
1307 sr_policy_source = self.sr_policy.source
1308
1309 # rx'ed packet should have SRH
1310 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1311 # get SRH
1312 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1313
1314 # received ip.src should be equal to SR Policy source
1315 self.assertEqual(rx_ip.src, sr_policy_source)
1316 # received ip.dst should be equal to sidlist[lastentry]
1317 self.assertEqual(rx_ip.dst, tx_seglist[-1])
1318 # rx'ed seglist should be equal to seglist
1319 self.assertEqual(rx_srh.addresses, tx_seglist)
1320 # segleft should be equal to size seglist-1
1321 self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1322 # segleft should be equal to lastentry
1323 self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1324 # nh should be "No Next Header" (59)
1325 self.assertEqual(rx_srh.nh, 59)
1326
1327 # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1328 self.assertEqual(Ether(str(rx_srh.payload)), tx_ether)
1329
1330 self.logger.debug("packet verification: SUCCESS")
1331
1332 def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
1333 """ Compare input and output packet after passing T.Insert
1334
1335 :param tx_pkt: transmitted packet
1336 :param rx_pkt: received packet
1337 """
1338 # T.Insert updates the headers as follows:
1339 # IPv6:
1340 # in: IPv6(A, B2)
1341 # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)
1342 # IPv6 + SRH:
1343 # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1344 # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1)
1345
1346 # get first (outer) IPv6 header of rx'ed packet
1347 rx_ip = rx_pkt.getlayer(IPv6)
1348 rx_srh = None
1349 rx_ip2 = None
1350 rx_srh2 = None
1351 rx_ip3 = None
1352 rx_udp = rx_pkt[UDP]
1353
1354 tx_ip = tx_pkt.getlayer(IPv6)
1355 tx_srh = None
1356 tx_ip2 = None
1357 # some packets have been tx'ed with an SRH, some without it
1358 # get SRH if tx'ed packet has it
1359 if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting):
1360 tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1361 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1362 tx_udp = tx_pkt[UDP]
1363
1364 # expected segment-list (make copy of SR Policy segment list)
1365 seglist = self.sr_policy.segments[:]
1366 # expected seglist has initial dest addr as last segment
1367 seglist.append(tx_ip.dst)
1368 # reverse list to get order as in SRH
1369 tx_seglist = seglist[::-1]
1370
1371 # get source address of SR Policy
1372 sr_policy_source = self.sr_policy.source
1373
1374 # checks common to cases tx with and without SRH
1375 # rx'ed packet should have SRH and only one IPv6 header
1376 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1377 self.assertFalse(rx_ip.payload.haslayer(IPv6))
1378 # get SRH
1379 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1380
1381 # rx'ed ip.src should be equal to tx'ed ip.src
1382 self.assertEqual(rx_ip.src, tx_ip.src)
1383 # rx'ed ip.dst should be equal to sidlist[lastentry]
1384 self.assertEqual(rx_ip.dst, tx_seglist[-1])
1385
1386 # rx'ed seglist should be equal to expected seglist
1387 self.assertEqual(rx_srh.addresses, tx_seglist)
1388 # segleft should be equal to size(expected seglist)-1
1389 self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1390 # segleft should be equal to lastentry
1391 self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1392
1393 if tx_srh: # packet was tx'ed with SRH
1394 # packet should have 2nd SRH
1395 self.assertTrue(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1396 # get 2nd SRH
1397 rx_srh2 = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2)
1398
1399 # rx'ed srh2.addresses should be equal to tx'ed srh.addresses
1400 self.assertEqual(rx_srh2.addresses, tx_srh.addresses)
1401 # rx'ed srh2.segleft should be equal to tx'ed srh.segleft
1402 self.assertEqual(rx_srh2.segleft, tx_srh.segleft)
1403 # rx'ed srh2.lastentry should be equal to tx'ed srh.lastentry
1404 self.assertEqual(rx_srh2.lastentry, tx_srh.lastentry)
1405
1406 else: # packet was tx'ed without SRH
1407 # rx packet should have no other SRH
1408 self.assertFalse(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1409
1410 # UDP layer should be unchanged
1411 self.assertEqual(rx_udp, tx_udp)
1412
1413 self.logger.debug("packet verification: SUCCESS")
1414
1415 def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
1416 """ Compare input and output packet after passing End (without PSP)
1417
1418 :param tx_pkt: transmitted packet
1419 :param rx_pkt: received packet
1420 """
1421 # End (no PSP) updates the headers as follows:
1422 # IPv6 + SRH:
1423 # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1424 # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1425
1426 # get first (outer) IPv6 header of rx'ed packet
1427 rx_ip = rx_pkt.getlayer(IPv6)
1428 rx_srh = None
1429 rx_ip2 = None
1430 rx_udp = rx_pkt[UDP]
1431
1432 tx_ip = tx_pkt.getlayer(IPv6)
1433 # we know the packet has been tx'ed
1434 # with an inner IPv6 header and an SRH
1435 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1436 tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1437 tx_udp = tx_pkt[UDP]
1438
1439 # common checks, regardless of tx segleft value
1440 # rx'ed packet should have 2nd IPv6 header
1441 self.assertTrue(rx_ip.payload.haslayer(IPv6))
1442 # get second (inner) IPv6 header
1443 rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1444
1445 if tx_ip.segleft > 0:
1446 # SRH should NOT have been popped:
1447 # End SID without PSP does not pop SRH if segleft>0
1448 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1449 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1450
1451 # received ip.src should be equal to expected ip.src
1452 self.assertEqual(rx_ip.src, tx_ip.src)
1453 # sidlist should be unchanged
1454 self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1455 # segleft should have been decremented
1456 self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1457 # received ip.dst should be equal to sidlist[segleft]
1458 self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1459 # lastentry should be unchanged
1460 self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1461 # inner IPv6 packet (ip2) should be unchanged
1462 self.assertEqual(rx_ip2.src, tx_ip2.src)
1463 self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1464 # else: # tx_ip.segleft == 0
1465 # TODO: Does this work with 2 SRHs in ingress packet?
1466
1467 # UDP layer should be unchanged
1468 self.assertEqual(rx_udp, tx_udp)
1469
1470 self.logger.debug("packet verification: SUCCESS")
1471
1472 def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
1473 """ Compare input and output packet after passing End with PSP
1474
1475 :param tx_pkt: transmitted packet
1476 :param rx_pkt: received packet
1477 """
1478 # End (PSP) updates the headers as follows:
1479 # IPv6 + SRH (SL>1):
1480 # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1481 # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1482 # IPv6 + SRH (SL=1):
1483 # in: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1484 # out: IPv6(A, S3)
1485
1486 # get first (outer) IPv6 header of rx'ed packet
1487 rx_ip = rx_pkt.getlayer(IPv6)
1488 rx_srh = None
1489 rx_ip2 = None
1490 rx_udp = rx_pkt[UDP]
1491
1492 tx_ip = tx_pkt.getlayer(IPv6)
1493 # we know the packet has been tx'ed
1494 # with an inner IPv6 header and an SRH
1495 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1496 tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1497 tx_udp = tx_pkt[UDP]
1498
1499 # common checks, regardless of tx segleft value
1500 self.assertTrue(rx_ip.payload.haslayer(IPv6))
1501 rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1502 # inner IPv6 packet (ip2) should be unchanged
1503 self.assertEqual(rx_ip2.src, tx_ip2.src)
1504 self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1505
1506 if tx_ip.segleft > 1:
1507 # SRH should NOT have been popped:
1508 # End SID with PSP does not pop SRH if segleft>1
1509 # rx'ed packet should have SRH
1510 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1511 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1512
1513 # received ip.src should be equal to expected ip.src
1514 self.assertEqual(rx_ip.src, tx_ip.src)
1515 # sidlist should be unchanged
1516 self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1517 # segleft should have been decremented
1518 self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1519 # received ip.dst should be equal to sidlist[segleft]
1520 self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1521 # lastentry should be unchanged
1522 self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1523
1524 else: # tx_ip.segleft <= 1
1525 # SRH should have been popped:
1526 # End SID with PSP and segleft=1 pops SRH
1527 # the two IPv6 headers are still present
1528 # outer IPv6 header has DA == last segment of popped SRH
1529 # SRH should not be present
1530 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1531 # outer IPv6 header ip.src should be equal to tx'ed ip.src
1532 self.assertEqual(rx_ip.src, tx_ip.src)
1533 # outer IPv6 header ip.dst should be = to tx'ed sidlist[segleft-1]
1534 self.assertEqual(rx_ip.dst, tx_srh.addresses[tx_srh.segleft-1])
1535
1536 # UDP layer should be unchanged
1537 self.assertEqual(rx_udp, tx_udp)
1538
1539 self.logger.debug("packet verification: SUCCESS")
1540
1541 def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
1542 """ Compare input and output packet after passing End.DX6
1543
1544 :param tx_pkt: transmitted packet
1545 :param rx_pkt: received packet
1546 """
1547 # End.DX6 updates the headers as follows:
1548 # IPv6 + SRH (SL=0):
1549 # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D)
1550 # out: IPv6(B, D)
1551 # IPv6:
1552 # in: IPv6(A, S3)IPv6(B, D)
1553 # out: IPv6(B, D)
1554
1555 # get first (outer) IPv6 header of rx'ed packet
1556 rx_ip = rx_pkt.getlayer(IPv6)
1557
1558 tx_ip = tx_pkt.getlayer(IPv6)
1559 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1560
1561 # verify if rx'ed packet has no SRH
1562 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1563
1564 # the whole rx_ip pkt should be equal to tx_ip2
1565 # except for the hlim field
1566 # -> adjust tx'ed hlim to expected hlim
1567 tx_ip2.hlim = tx_ip2.hlim - 1
1568
1569 self.assertEqual(rx_ip, tx_ip2)
1570
1571 self.logger.debug("packet verification: SUCCESS")
1572
1573 def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
1574 """ Compare input and output packet after passing End.DX4
1575
1576 :param tx_pkt: transmitted packet
1577 :param rx_pkt: received packet
1578 """
1579 # End.DX4 updates the headers as follows:
1580 # IPv6 + SRH (SL=0):
1581 # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D)
1582 # out: IPv4(B, D)
1583 # IPv6:
1584 # in: IPv6(A, S3)IPv4(B, D)
1585 # out: IPv4(B, D)
1586
1587 # get IPv4 header of rx'ed packet
1588 rx_ip = rx_pkt.getlayer(IP)
1589
1590 tx_ip = tx_pkt.getlayer(IPv6)
1591 tx_ip2 = tx_pkt.getlayer(IP)
1592
1593 # verify if rx'ed packet has no SRH
1594 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1595
1596 # the whole rx_ip pkt should be equal to tx_ip2
1597 # except for the ttl field and ip checksum
1598 # -> adjust tx'ed ttl to expected ttl
1599 tx_ip2.ttl = tx_ip2.ttl - 1
1600 # -> set tx'ed ip checksum to None and let scapy recompute
1601 tx_ip2.chksum = None
1602 # read back the pkt (with str()) to force computing these fields
1603 # probably other ways to accomplish this are possible
1604 tx_ip2 = IP(str(tx_ip2))
1605
1606 self.assertEqual(rx_ip, tx_ip2)
1607
1608 self.logger.debug("packet verification: SUCCESS")
1609
1610 def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
1611 """ Compare input and output packet after passing End.DX2
1612
1613 :param tx_pkt: transmitted packet
1614 :param rx_pkt: received packet
1615 """
1616 # End.DX2 updates the headers as follows:
1617 # IPv6 + SRH (SL=0):
1618 # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2
1619 # out: L2
1620 # IPv6:
1621 # in: IPv6(A, S3)L2
1622 # out: L2
1623
1624 # get IPv4 header of rx'ed packet
1625 rx_eth = rx_pkt.getlayer(Ether)
1626
1627 tx_ip = tx_pkt.getlayer(IPv6)
1628 # we can't just get the 2nd Ether layer
1629 # get the Raw content and dissect it as Ether
1630 tx_eth1 = Ether(str(tx_pkt[Raw]))
1631
1632 # verify if rx'ed packet has no SRH
1633 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1634
1635 # the whole rx_eth pkt should be equal to tx_eth1
1636 self.assertEqual(rx_eth, tx_eth1)
1637
1638 self.logger.debug("packet verification: SUCCESS")
1639
1640 def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
1641 count):
1642 """Create SRv6 input packet stream for defined interface.
1643
1644 :param VppInterface src_if: Interface to create packet stream for
1645 :param VppInterface dst_if: destination interface of packet stream
1646 :param packet_header: Layer3 scapy packet headers,
1647 L2 is added when not provided,
1648 Raw(payload) with packet_info is added
1649 :param list packet_sizes: packet stream pckt sizes,sequentially applied
1650 to packets in stream have
1651 :param int count: number of packets in packet stream
1652 :return: list of packets
1653 """
1654 self.logger.info("Creating packets")
1655 pkts = []
1656 for i in range(0, count-1):
1657 payload_info = self.create_packet_info(src_if, dst_if)
1658 self.logger.debug(
1659 "Creating packet with index %d" % (payload_info.index))
1660 payload = self.info_to_payload(payload_info)
1661 # add L2 header if not yet provided in packet_header
1662 if packet_header.getlayer(0).name == 'Ethernet':
1663 p = (packet_header /
1664 Raw(payload))
1665 else:
1666 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
1667 packet_header /
1668 Raw(payload))
1669 size = packet_sizes[i % len(packet_sizes)]
1670 self.logger.debug("Packet size %d" % (size))
1671 self.extend_packet(p, size)
1672 # we need to store the packet with the automatic fields computed
1673 # read back the dumped packet (with str())
1674 # to force computing these fields
1675 # probably other ways are possible
1676 p = Ether(str(p))
1677 payload_info.data = p.copy()
1678 self.logger.debug(ppp("Created packet:", p))
1679 pkts.append(p)
1680 self.logger.info("Done creating packets")
1681 return pkts
1682
1683 def send_and_verify_pkts(self, input, pkts, output, compare_func):
1684 """Send packets and verify received packets using compare_func
1685
1686 :param input: ingress interface of DUT
1687 :param pkts: list of packets to transmit
1688 :param output: egress interface of DUT
1689 :param compare_func: function to compare in and out packets
1690 """
1691 # add traffic stream to input interface
1692 input.add_stream(pkts)
1693
1694 # enable capture on all interfaces
1695 self.pg_enable_capture(self.pg_interfaces)
1696
1697 # start traffic
1698 self.logger.info("Starting traffic")
1699 self.pg_start()
1700
1701 # get output capture
1702 self.logger.info("Getting packet capture")
1703 capture = output.get_capture()
1704
1705 # assert nothing was captured on input interface
1706 input.assert_nothing_captured()
1707
1708 # verify captured packets
1709 self.verify_captured_pkts(output, capture, compare_func)
1710
1711 def create_packet_header_IPv6(self, dst):
1712 """Create packet header: IPv6 header, UDP header
1713
1714 :param dst: IPv6 destination address
1715
1716 IPv6 source address is 1234::1
1717 UDP source port and destination port are 1234
1718 """
1719
1720 p = (IPv6(src='1234::1', dst=dst) /
1721 UDP(sport=1234, dport=1234))
1722 return p
1723
1724 def create_packet_header_IPv6_SRH(self, sidlist, segleft):
1725 """Create packet header: IPv6 header with SRH, UDP header
1726
1727 :param list sidlist: segment list
1728 :param int segleft: segments-left field value
1729
1730 IPv6 destination address is set to sidlist[segleft]
1731 IPv6 source addresses are 1234::1 and 4321::1
1732 UDP source port and destination port are 1234
1733 """
1734
1735 p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1736 IPv6ExtHdrSegmentRouting(addresses=sidlist) /
1737 UDP(sport=1234, dport=1234))
1738 return p
1739
1740 def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
1741 """Create packet header: IPv6 encapsulated in SRv6:
1742 IPv6 header with SRH, IPv6 header, UDP header
1743
1744 :param ipv6address dst: inner IPv6 destination address
1745 :param list sidlist: segment list of outer IPv6 SRH
1746 :param int segleft: segments-left field of outer IPv6 SRH
1747
1748 Outer IPv6 destination address is set to sidlist[segleft]
1749 IPv6 source addresses are 1234::1 and 4321::1
1750 UDP source port and destination port are 1234
1751 """
1752
1753 p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1754 IPv6ExtHdrSegmentRouting(addresses=sidlist,
1755 segleft=segleft, nh=41) /
1756 IPv6(src='4321::1', dst=dst) /
1757 UDP(sport=1234, dport=1234))
1758 return p
1759
1760 def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
1761 """Create packet header: IPv6 encapsulated in IPv6:
1762 IPv6 header, IPv6 header, UDP header
1763
1764 :param ipv6address dst_inner: inner IPv6 destination address
1765 :param ipv6address dst_outer: outer IPv6 destination address
1766
1767 IPv6 source addresses are 1234::1 and 4321::1
1768 UDP source port and destination port are 1234
1769 """
1770
1771 p = (IPv6(src='1234::1', dst=dst_outer) /
1772 IPv6(src='4321::1', dst=dst_inner) /
1773 UDP(sport=1234, dport=1234))
1774 return p
1775
1776 def create_packet_header_IPv6_SRH_SRH_IPv6(self, dst, sidlist1, segleft1,
1777 sidlist2, segleft2):
1778 """Create packet header: IPv6 encapsulated in SRv6 with 2 SRH:
1779 IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header
1780
1781 :param ipv6address dst: inner IPv6 destination address
1782 :param list sidlist1: segment list of outer IPv6 SRH
1783 :param int segleft1: segments-left field of outer IPv6 SRH
1784 :param list sidlist2: segment list of inner IPv6 SRH
1785 :param int segleft2: segments-left field of inner IPv6 SRH
1786
1787 Outer IPv6 destination address is set to sidlist[segleft]
1788 IPv6 source addresses are 1234::1 and 4321::1
1789 UDP source port and destination port are 1234
1790 """
1791
1792 p = (IPv6(src='1234::1', dst=sidlist1[segleft1]) /
1793 IPv6ExtHdrSegmentRouting(addresses=sidlist1,
1794 segleft=segleft1, nh=43) /
1795 IPv6ExtHdrSegmentRouting(addresses=sidlist2,
1796 segleft=segleft2, nh=41) /
1797 IPv6(src='4321::1', dst=dst) /
1798 UDP(sport=1234, dport=1234))
1799 return p
1800
1801 def create_packet_header_IPv4(self, dst):
1802 """Create packet header: IPv4 header, UDP header
1803
1804 :param dst: IPv4 destination address
1805
1806 IPv4 source address is 123.1.1.1
1807 UDP source port and destination port are 1234
1808 """
1809
1810 p = (IP(src='123.1.1.1', dst=dst) /
1811 UDP(sport=1234, dport=1234))
1812 return p
1813
1814 def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
1815 """Create packet header: IPv4 encapsulated in IPv6:
1816 IPv6 header, IPv4 header, UDP header
1817
1818 :param ipv4address dst_inner: inner IPv4 destination address
1819 :param ipv6address dst_outer: outer IPv6 destination address
1820
1821 IPv6 source address is 1234::1
1822 IPv4 source address is 123.1.1.1
1823 UDP source port and destination port are 1234
1824 """
1825
1826 p = (IPv6(src='1234::1', dst=dst_outer) /
1827 IP(src='123.1.1.1', dst=dst_inner) /
1828 UDP(sport=1234, dport=1234))
1829 return p
1830
1831 def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
1832 """Create packet header: IPv4 encapsulated in SRv6:
1833 IPv6 header with SRH, IPv4 header, UDP header
1834
1835 :param ipv4address dst: inner IPv4 destination address
1836 :param list sidlist: segment list of outer IPv6 SRH
1837 :param int segleft: segments-left field of outer IPv6 SRH
1838
1839 Outer IPv6 destination address is set to sidlist[segleft]
1840 IPv6 source address is 1234::1
1841 IPv4 source address is 123.1.1.1
1842 UDP source port and destination port are 1234
1843 """
1844
1845 p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1846 IPv6ExtHdrSegmentRouting(addresses=sidlist,
1847 segleft=segleft, nh=4) /
1848 IP(src='123.1.1.1', dst=dst) /
1849 UDP(sport=1234, dport=1234))
1850 return p
1851
1852 def create_packet_header_L2(self, vlan=0):
1853 """Create packet header: L2 header
1854
1855 :param vlan: if vlan!=0 then add 802.1q header
1856 """
1857 # Note: the dst addr ('00:55:44:33:22:11') is used in
1858 # the compare function compare_rx_tx_packet_T_Encaps_L2
1859 # to detect presence of L2 in SRH payload
1860 p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
1861 etype = 0x8137 # IPX
1862 if vlan:
1863 # add 802.1q layer
1864 p /= Dot1Q(vlan=vlan, type=etype)
1865 else:
1866 p.type = etype
1867 return p
1868
1869 def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
1870 """Create packet header: L2 encapsulated in SRv6:
1871 IPv6 header with SRH, L2
1872
1873 :param list sidlist: segment list of outer IPv6 SRH
1874 :param int segleft: segments-left field of outer IPv6 SRH
1875 :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
1876
1877 Outer IPv6 destination address is set to sidlist[segleft]
1878 IPv6 source address is 1234::1
1879 """
1880 eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
1881 etype = 0x8137 # IPX
1882 if vlan:
1883 # add 802.1q layer
1884 eth /= Dot1Q(vlan=vlan, type=etype)
1885 else:
1886 eth.type = etype
1887
1888 p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1889 IPv6ExtHdrSegmentRouting(addresses=sidlist,
1890 segleft=segleft, nh=59) /
1891 eth)
1892 return p
1893
1894 def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
1895 """Create packet header: L2 encapsulated in IPv6:
1896 IPv6 header, L2
1897
1898 :param ipv6address dst_outer: outer IPv6 destination address
1899 :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
1900 """
1901 eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
1902 etype = 0x8137 # IPX
1903 if vlan:
1904 # add 802.1q layer
1905 eth /= Dot1Q(vlan=vlan, type=etype)
1906 else:
1907 eth.type = etype
1908
1909 p = (IPv6(src='1234::1', dst=dst_outer, nh=59) / eth)
1910 return p
1911
1912 def get_payload_info(self, packet):
1913 """ Extract the payload_info from the packet
1914 """
1915 # in most cases, payload_info is in packet[Raw]
1916 # but packet[Raw] gives the complete payload
1917 # (incl L2 header) for the T.Encaps L2 case
1918 try:
1919 payload_info = self.payload_to_info(str(packet[Raw]))
1920
1921 except:
1922 # remote L2 header from packet[Raw]:
1923 # take packet[Raw], convert it to an Ether layer
1924 # and then extract Raw from it
1925 payload_info = self.payload_to_info(
1926 str(Ether(str(packet[Raw]))[Raw]))
1927
1928 return payload_info
1929
1930 def verify_captured_pkts(self, dst_if, capture, compare_func):
1931 """
1932 Verify captured packet stream for specified interface.
1933 Compare ingress with egress packets using the specified compare fn
1934
1935 :param dst_if: egress interface of DUT
1936 :param capture: captured packets
1937 :param compare_func: function to compare in and out packet
1938 """
1939 self.logger.info("Verifying capture on interface %s using function %s"
1940 % (dst_if.name, compare_func.func_name))
1941
1942 last_info = dict()
1943 for i in self.pg_interfaces:
1944 last_info[i.sw_if_index] = None
1945 dst_sw_if_index = dst_if.sw_if_index
1946
1947 for packet in capture:
1948 try:
1949 # extract payload_info from packet's payload
1950 payload_info = self.get_payload_info(packet)
1951 packet_index = payload_info.index
1952
1953 self.logger.debug("Verifying packet with index %d"
1954 % (packet_index))
1955 # packet should have arrived on the expected interface
1956 self.assertEqual(payload_info.dst, dst_sw_if_index)
1957 self.logger.debug(
1958 "Got packet on interface %s: src=%u (idx=%u)" %
1959 (dst_if.name, payload_info.src, packet_index))
1960
1961 # search for payload_info with same src and dst if_index
1962 # this will give us the transmitted packet
1963 next_info = self.get_next_packet_info_for_interface2(
1964 payload_info.src, dst_sw_if_index,
1965 last_info[payload_info.src])
1966 last_info[payload_info.src] = next_info
1967 # next_info should not be None
1968 self.assertTrue(next_info is not None)
1969 # index of tx and rx packets should be equal
1970 self.assertEqual(packet_index, next_info.index)
1971 # data field of next_info contains the tx packet
1972 txed_packet = next_info.data
1973
1974 self.logger.debug(ppp("Transmitted packet:",
1975 txed_packet)) # ppp=Pretty Print Packet
1976
1977 self.logger.debug(ppp("Received packet:", packet))
1978
1979 # compare rcvd packet with expected packet using compare_func
1980 compare_func(txed_packet, packet)
1981
1982 except:
1983 print packet.command()
1984 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1985 raise
1986
1987 # have all expected packets arrived?
1988 for i in self.pg_interfaces:
1989 remaining_packet = self.get_next_packet_info_for_interface2(
1990 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
1991 self.assertTrue(remaining_packet is None,
1992 "Interface %s: Packet expected from interface %s "
1993 "didn't arrive" % (dst_if.name, i.name))
1994
1995
1996if __name__ == '__main__':
1997 unittest.main(testRunner=VppTestRunner)