tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is
much faster, stable PEP8 compliant code style checker offering also
automatic formatting. It aims to be very stable and produce smallest
diffs. It's used by many small and big projects.
Running checkstyle with black takes a few seconds with a terse output.
Thus, test-checkstyle-diff is no longer necessary.
Expand scope of checkstyle to all python files in the repo, replacing
test-checkstyle with checkstyle-python.
Also, fixstyle-python is now available for automatic style formatting.
Note: python virtualenv has been consolidated in test/Makefile,
test/requirements*.txt which will eventually be moved to a central
location. This is required to simply the automated generation of
docker executor images in the CI.
Type: improvement
Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
diff --git a/test/test_srv6.py b/test/test_srv6.py
index ec2fb6d..40b5337 100644
--- a/test/test_srv6.py
+++ b/test/test_srv6.py
@@ -6,8 +6,14 @@
from framework import VppTestCase, VppTestRunner
from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto, VppIpTable
-from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
- SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
+from vpp_srv6 import (
+ SRv6LocalSIDBehaviors,
+ VppSRv6LocalSID,
+ VppSRv6Policy,
+ SRv6PolicyType,
+ VppSRv6Steering,
+ SRv6PolicySteeringTypes,
+)
import scapy.compat
from scapy.packet import Raw
@@ -19,7 +25,7 @@
class TestSRv6(VppTestCase):
- """ SRv6 Test Case """
+ """SRv6 Test Case"""
@classmethod
def setUpClass(cls):
@@ -30,8 +36,7 @@
super(TestSRv6, cls).tearDownClass()
def setUp(self):
- """ Perform test setup before each test case.
- """
+ """Perform test setup before each test case."""
super(TestSRv6, self).setUp()
# packet sizes, inclusive L2 overhead
@@ -41,17 +46,15 @@
self.reset_packet_infos()
def tearDown(self):
- """ Clean up test setup after each test case.
- """
+ """Clean up test setup after each test case."""
self.teardown_interfaces()
super(TestSRv6, self).tearDown()
- def configure_interface(self,
- interface,
- ipv6=False, ipv4=False,
- ipv6_table_id=0, ipv4_table_id=0):
- """ Configure interface.
+ def configure_interface(
+ self, interface, ipv6=False, ipv4=False, ipv6_table_id=0, ipv4_table_id=0
+ ):
+ """Configure interface.
:param ipv6: configure IPv6 on interface
:param ipv4: configure IPv4 on interface
:param ipv6_table_id: FIB table_id for IPv6
@@ -70,9 +73,8 @@
interface.resolve_arp()
interface.admin_up()
- def setup_interfaces(self, ipv6=[], ipv4=[],
- ipv6_table_id=[], ipv4_table_id=[]):
- """ Create and configure interfaces.
+ def setup_interfaces(self, ipv6=[], ipv4=[], ipv6_table_id=[], ipv4_table_id=[]):
+ """Create and configure interfaces.
:param ipv6: list of interface IPv6 capabilities
:param ipv4: list of interface IPv4 capabilities
@@ -107,9 +109,9 @@
# setup all interfaces
for i in range(count):
intf = self.pg_interfaces[i]
- self.configure_interface(intf,
- ipv6[i], ipv4[i],
- ipv6_table_id[i], ipv4_table_id[i])
+ self.configure_interface(
+ intf, ipv6[i], ipv4[i], ipv6_table_id[i], ipv4_table_id[i]
+ )
if any(ipv6):
self.logger.debug(self.vapi.cli("show ip6 neighbors"))
@@ -121,8 +123,7 @@
return self.pg_interfaces
def teardown_interfaces(self):
- """ Unconfigure and bring down interface.
- """
+ """Unconfigure and bring down interface."""
self.logger.debug("Tearing down interfaces")
# tear down all interfaces
# AFAIK they cannot be deleted
@@ -135,16 +136,15 @@
@unittest.skipUnless(0, "PC to fix")
def test_SRv6_T_Encaps(self):
- """ Test SRv6 Transit.Encaps behavior for IPv6.
- """
+ """Test SRv6 Transit.Encaps behavior for IPv6."""
# send traffic to one destination interface
# source and destination are IPv6 only
self.setup_interfaces(ipv6=[True, True])
# configure FIB entries
- route = VppIpRoute(self, "a4::", 64,
- [VppRoutePath(self.pg1.remote_ip6,
- self.pg1.sw_if_index)])
+ route = VppIpRoute(
+ self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
+ )
route.add_vpp_config()
# configure encaps IPv6 source address
@@ -152,16 +152,19 @@
# TODO: API?
self.vapi.cli("set sr encaps source addr a3::")
- bsid = 'a3::9999:1'
+ bsid = "a3::9999:1"
# configure SRv6 Policy
# Note: segment list order: first -> last
sr_policy = VppSRv6Policy(
- self, bsid=bsid,
+ self,
+ bsid=bsid,
is_encap=1,
sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
- weight=1, fib_table=0,
- segments=['a4::', 'a5::', 'a6::c7'],
- source='a3::')
+ weight=1,
+ fib_table=0,
+ segments=["a4::", "a5::", "a6::c7"],
+ source="a3::",
+ )
sr_policy.add_vpp_config()
self.sr_policy = sr_policy
@@ -171,12 +174,15 @@
# steer IPv6 traffic to a7::/64 into SRv6 Policy
# use the bsid of the above self.sr_policy
pol_steering = VppSRv6Steering(
- self,
- bsid=self.sr_policy.bsid,
- prefix="a7::", mask_width=64,
- traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
- sr_policy_index=0, table_id=0,
- sw_if_index=0)
+ self,
+ bsid=self.sr_policy.bsid,
+ prefix="a7::",
+ mask_width=64,
+ traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
+ sr_policy_index=0,
+ table_id=0,
+ sw_if_index=0,
+ )
pol_steering.add_vpp_config()
# log the sr steering policies
@@ -184,37 +190,46 @@
# create packets
count = len(self.pg_packet_sizes)
- dst_inner = 'a7::1234'
+ dst_inner = "a7::1234"
pkts = []
# create IPv6 packets without SRH
packet_header = self.create_packet_header_IPv6(dst_inner)
# create traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# create IPv6 packets with SRH
# packets with segments-left 1, active segment a7::
packet_header = self.create_packet_header_IPv6_SRH(
- sidlist=['a8::', 'a7::', 'a6::'],
- segleft=1)
+ sidlist=["a8::", "a7::", "a6::"], segleft=1
+ )
# create traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# create IPv6 packets with SRH and IPv6
# packets with segments-left 1, active segment a7::
packet_header = self.create_packet_header_IPv6_SRH_IPv6(
- dst_inner,
- sidlist=['a8::', 'a7::', 'a6::'],
- segleft=1)
+ dst_inner, sidlist=["a8::", "a7::", "a6::"], segleft=1
+ )
# create traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# send packets and verify received packets
- self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
- self.compare_rx_tx_packet_T_Encaps)
+ self.send_and_verify_pkts(
+ self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps
+ )
# log the localsid counters
self.logger.info(self.vapi.cli("show sr localsid"))
@@ -235,16 +250,15 @@
@unittest.skipUnless(0, "PC to fix")
def test_SRv6_T_Insert(self):
- """ Test SRv6 Transit.Insert behavior (IPv6 only).
- """
+ """Test SRv6 Transit.Insert behavior (IPv6 only)."""
# send traffic to one destination interface
# source and destination are IPv6 only
self.setup_interfaces(ipv6=[True, True])
# configure FIB entries
- route = VppIpRoute(self, "a4::", 64,
- [VppRoutePath(self.pg1.remote_ip6,
- self.pg1.sw_if_index)])
+ route = VppIpRoute(
+ self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
+ )
route.add_vpp_config()
# configure encaps IPv6 source address
@@ -252,16 +266,19 @@
# TODO: API?
self.vapi.cli("set sr encaps source addr a3::")
- bsid = 'a3::9999:1'
+ bsid = "a3::9999:1"
# configure SRv6 Policy
# Note: segment list order: first -> last
sr_policy = VppSRv6Policy(
- self, bsid=bsid,
+ self,
+ bsid=bsid,
is_encap=0,
sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
- weight=1, fib_table=0,
- segments=['a4::', 'a5::', 'a6::c7'],
- source='a3::')
+ weight=1,
+ fib_table=0,
+ segments=["a4::", "a5::", "a6::c7"],
+ source="a3::",
+ )
sr_policy.add_vpp_config()
self.sr_policy = sr_policy
@@ -271,12 +288,15 @@
# steer IPv6 traffic to a7::/64 into SRv6 Policy
# use the bsid of the above self.sr_policy
pol_steering = VppSRv6Steering(
- self,
- bsid=self.sr_policy.bsid,
- prefix="a7::", mask_width=64,
- traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
- sr_policy_index=0, table_id=0,
- sw_if_index=0)
+ self,
+ bsid=self.sr_policy.bsid,
+ prefix="a7::",
+ mask_width=64,
+ traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
+ sr_policy_index=0,
+ table_id=0,
+ sw_if_index=0,
+ )
pol_steering.add_vpp_config()
# log the sr steering policies
@@ -284,27 +304,34 @@
# create packets
count = len(self.pg_packet_sizes)
- dst_inner = 'a7::1234'
+ dst_inner = "a7::1234"
pkts = []
# create IPv6 packets without SRH
packet_header = self.create_packet_header_IPv6(dst_inner)
# create traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# create IPv6 packets with SRH
# packets with segments-left 1, active segment a7::
packet_header = self.create_packet_header_IPv6_SRH(
- sidlist=['a8::', 'a7::', 'a6::'],
- segleft=1)
+ sidlist=["a8::", "a7::", "a6::"], segleft=1
+ )
# create traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# send packets and verify received packets
- self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
- self.compare_rx_tx_packet_T_Insert)
+ self.send_and_verify_pkts(
+ self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Insert
+ )
# log the localsid counters
self.logger.info(self.vapi.cli("show sr localsid"))
@@ -325,17 +352,16 @@
@unittest.skipUnless(0, "PC to fix")
def test_SRv6_T_Encaps_IPv4(self):
- """ Test SRv6 Transit.Encaps behavior for IPv4.
- """
+ """Test SRv6 Transit.Encaps behavior for IPv4."""
# send traffic to one destination interface
# source interface is IPv4 only
# destination interface is IPv6 only
self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])
# configure FIB entries
- route = VppIpRoute(self, "a4::", 64,
- [VppRoutePath(self.pg1.remote_ip6,
- self.pg1.sw_if_index)])
+ route = VppIpRoute(
+ self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
+ )
route.add_vpp_config()
# configure encaps IPv6 source address
@@ -343,16 +369,19 @@
# TODO: API?
self.vapi.cli("set sr encaps source addr a3::")
- bsid = 'a3::9999:1'
+ bsid = "a3::9999:1"
# configure SRv6 Policy
# Note: segment list order: first -> last
sr_policy = VppSRv6Policy(
- self, bsid=bsid,
+ self,
+ bsid=bsid,
is_encap=1,
sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
- weight=1, fib_table=0,
- segments=['a4::', 'a5::', 'a6::c7'],
- source='a3::')
+ weight=1,
+ fib_table=0,
+ segments=["a4::", "a5::", "a6::c7"],
+ source="a3::",
+ )
sr_policy.add_vpp_config()
self.sr_policy = sr_policy
@@ -362,12 +391,15 @@
# steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
# use the bsid of the above self.sr_policy
pol_steering = VppSRv6Steering(
- self,
- bsid=self.sr_policy.bsid,
- prefix="7.1.1.0", mask_width=24,
- traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
- sr_policy_index=0, table_id=0,
- sw_if_index=0)
+ self,
+ bsid=self.sr_policy.bsid,
+ prefix="7.1.1.0",
+ mask_width=24,
+ traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
+ sr_policy_index=0,
+ table_id=0,
+ sw_if_index=0,
+ )
pol_steering.add_vpp_config()
# log the sr steering policies
@@ -375,18 +407,22 @@
# create packets
count = len(self.pg_packet_sizes)
- dst_inner = '7.1.1.123'
+ dst_inner = "7.1.1.123"
pkts = []
# create IPv4 packets
packet_header = self.create_packet_header_IPv4(dst_inner)
# create traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# send packets and verify received packets
- self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
- self.compare_rx_tx_packet_T_Encaps_IPv4)
+ self.send_and_verify_pkts(
+ self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps_IPv4
+ )
# log the localsid counters
self.logger.info(self.vapi.cli("show sr localsid"))
@@ -407,17 +443,16 @@
@unittest.skip("VPP crashes after running this test")
def test_SRv6_T_Encaps_L2(self):
- """ Test SRv6 Transit.Encaps behavior for L2.
- """
+ """Test SRv6 Transit.Encaps behavior for L2."""
# send traffic to one destination interface
# source interface is IPv4 only TODO?
# destination interface is IPv6 only
self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])
# configure FIB entries
- route = VppIpRoute(self, "a4::", 64,
- [VppRoutePath(self.pg1.remote_ip6,
- self.pg1.sw_if_index)])
+ route = VppIpRoute(
+ self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
+ )
route.add_vpp_config()
# configure encaps IPv6 source address
@@ -425,16 +460,19 @@
# TODO: API?
self.vapi.cli("set sr encaps source addr a3::")
- bsid = 'a3::9999:1'
+ bsid = "a3::9999:1"
# configure SRv6 Policy
# Note: segment list order: first -> last
sr_policy = VppSRv6Policy(
- self, bsid=bsid,
+ self,
+ bsid=bsid,
is_encap=1,
sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
- weight=1, fib_table=0,
- segments=['a4::', 'a5::', 'a6::c7'],
- source='a3::')
+ weight=1,
+ fib_table=0,
+ segments=["a4::", "a5::", "a6::c7"],
+ source="a3::",
+ )
sr_policy.add_vpp_config()
self.sr_policy = sr_policy
@@ -444,12 +482,15 @@
# steer L2 traffic into SRv6 Policy
# use the bsid of the above self.sr_policy
pol_steering = VppSRv6Steering(
- self,
- bsid=self.sr_policy.bsid,
- prefix="::", mask_width=0,
- traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
- sr_policy_index=0, table_id=0,
- sw_if_index=self.pg0.sw_if_index)
+ self,
+ bsid=self.sr_policy.bsid,
+ prefix="::",
+ mask_width=0,
+ traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
+ sr_policy_index=0,
+ table_id=0,
+ sw_if_index=self.pg0.sw_if_index,
+ )
pol_steering.add_vpp_config()
# log the sr steering policies
@@ -462,18 +503,25 @@
# create L2 packets without dot1q header
packet_header = self.create_packet_header_L2()
# create traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# create L2 packets with dot1q header
packet_header = self.create_packet_header_L2(vlan=123)
# create traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# send packets and verify received packets
- self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
- self.compare_rx_tx_packet_T_Encaps_L2)
+ self.send_and_verify_pkts(
+ self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps_L2
+ )
# log the localsid counters
self.logger.info(self.vapi.cli("show sr localsid"))
@@ -493,27 +541,28 @@
self.teardown_interfaces()
def test_SRv6_End(self):
- """ Test SRv6 End (without PSP) behavior.
- """
+ """Test SRv6 End (without PSP) behavior."""
# send traffic to one destination interface
# source and destination interfaces are IPv6 only
self.setup_interfaces(ipv6=[True, True])
# configure FIB entries
- route = VppIpRoute(self, "a4::", 64,
- [VppRoutePath(self.pg1.remote_ip6,
- self.pg1.sw_if_index)])
+ route = VppIpRoute(
+ self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
+ )
route.add_vpp_config()
# configure SRv6 localSID End without PSP behavior
localsid = VppSRv6LocalSID(
- self, localsid='A3::0',
- behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
- nh_addr=0,
- end_psp=0,
- sw_if_index=0,
- vlan_index=0,
- fib_table=0)
+ self,
+ localsid="A3::0",
+ behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
+ nh_addr=0,
+ end_psp=0,
+ sw_if_index=0,
+ vlan_index=0,
+ fib_table=0,
+ )
localsid.add_vpp_config()
# log the localsids
self.logger.debug(self.vapi.cli("show sr localsid"))
@@ -522,41 +571,52 @@
# send one packet per SL value per packet size
# SL=0 packet with localSID End with USP needs 2nd SRH
count = len(self.pg_packet_sizes)
- dst_inner = 'a4::1234'
+ dst_inner = "a4::1234"
pkts = []
# packets with segments-left 2, active segment a3::
packet_header = self.create_packet_header_IPv6_SRH_IPv6(
- dst_inner,
- sidlist=['a5::', 'a4::', 'a3::'],
- segleft=2)
+ dst_inner, sidlist=["a5::", "a4::", "a3::"], segleft=2
+ )
# create traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# packets with segments-left 1, active segment a3::
packet_header = self.create_packet_header_IPv6_SRH_IPv6(
- dst_inner,
- sidlist=['a4::', 'a3::', 'a2::'],
- segleft=1)
+ dst_inner, sidlist=["a4::", "a3::", "a2::"], segleft=1
+ )
# add to traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# TODO: test behavior with SL=0 packet (needs 2*SRH?)
expected_count = len(pkts)
# packets without SRH (should not crash)
- packet_header = self.create_packet_header_IPv6('a3::')
+ packet_header = self.create_packet_header_IPv6("a3::")
# create traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# send packets and verify received packets
- self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
- self.compare_rx_tx_packet_End,
- expected_count=expected_count)
+ self.send_and_verify_pkts(
+ self.pg0,
+ pkts,
+ self.pg1,
+ self.compare_rx_tx_packet_End,
+ expected_count=expected_count,
+ )
# log the localsid counters
self.logger.info(self.vapi.cli("show sr localsid"))
@@ -571,27 +631,28 @@
self.teardown_interfaces()
def test_SRv6_End_with_PSP(self):
- """ Test SRv6 End with PSP behavior.
- """
+ """Test SRv6 End with PSP behavior."""
# send traffic to one destination interface
# source and destination interfaces are IPv6 only
self.setup_interfaces(ipv6=[True, True])
# configure FIB entries
- route = VppIpRoute(self, "a4::", 64,
- [VppRoutePath(self.pg1.remote_ip6,
- self.pg1.sw_if_index)])
+ route = VppIpRoute(
+ self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
+ )
route.add_vpp_config()
# configure SRv6 localSID End with PSP behavior
localsid = VppSRv6LocalSID(
- self, localsid='A3::0',
- behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
- nh_addr=0,
- end_psp=1,
- sw_if_index=0,
- vlan_index=0,
- fib_table=0)
+ self,
+ localsid="A3::0",
+ behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
+ nh_addr=0,
+ end_psp=1,
+ sw_if_index=0,
+ vlan_index=0,
+ fib_table=0,
+ )
localsid.add_vpp_config()
# log the localsids
self.logger.debug(self.vapi.cli("show sr localsid"))
@@ -600,30 +661,35 @@
# send one packet per SL value per packet size
# SL=0 packet with localSID End with PSP is dropped
count = len(self.pg_packet_sizes)
- dst_inner = 'a4::1234'
+ dst_inner = "a4::1234"
pkts = []
# packets with segments-left 2, active segment a3::
packet_header = self.create_packet_header_IPv6_SRH_IPv6(
- dst_inner,
- sidlist=['a5::', 'a4::', 'a3::'],
- segleft=2)
+ dst_inner, sidlist=["a5::", "a4::", "a3::"], segleft=2
+ )
# create traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# packets with segments-left 1, active segment a3::
packet_header = self.create_packet_header_IPv6_SRH_IPv6(
- dst_inner,
- sidlist=['a4::', 'a3::', 'a2::'],
- segleft=1)
+ dst_inner, sidlist=["a4::", "a3::", "a2::"], segleft=1
+ )
# add to traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# send packets and verify received packets
- self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
- self.compare_rx_tx_packet_End_PSP)
+ self.send_and_verify_pkts(
+ self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_PSP
+ )
# log the localsid counters
self.logger.info(self.vapi.cli("show sr localsid"))
@@ -638,32 +704,37 @@
self.teardown_interfaces()
def test_SRv6_End_X(self):
- """ Test SRv6 End.X (without PSP) behavior.
- """
+ """Test SRv6 End.X (without PSP) behavior."""
# create three interfaces (1 source, 2 destinations)
# source and destination interfaces are IPv6 only
self.setup_interfaces(ipv6=[True, True, True])
# configure FIB entries
# a4::/64 via pg1 and pg2
- route = VppIpRoute(self, "a4::", 64,
- [VppRoutePath(self.pg1.remote_ip6,
- self.pg1.sw_if_index),
- VppRoutePath(self.pg2.remote_ip6,
- self.pg2.sw_if_index)])
+ route = VppIpRoute(
+ self,
+ "a4::",
+ 64,
+ [
+ VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index),
+ VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index),
+ ],
+ )
route.add_vpp_config()
self.logger.debug(self.vapi.cli("show ip6 fib"))
# configure SRv6 localSID End.X without PSP behavior
# End.X points to interface pg1
localsid = VppSRv6LocalSID(
- self, localsid='A3::C4',
- behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
- nh_addr=self.pg1.remote_ip6,
- end_psp=0,
- sw_if_index=self.pg1.sw_if_index,
- vlan_index=0,
- fib_table=0)
+ self,
+ localsid="A3::C4",
+ behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
+ nh_addr=self.pg1.remote_ip6,
+ end_psp=0,
+ sw_if_index=self.pg1.sw_if_index,
+ vlan_index=0,
+ fib_table=0,
+ )
localsid.add_vpp_config()
# log the localsids
self.logger.debug(self.vapi.cli("show sr localsid"))
@@ -672,31 +743,36 @@
# send one packet per SL value per packet size
# SL=0 packet with localSID End with PSP is dropped
count = len(self.pg_packet_sizes)
- dst_inner = 'a4::1234'
+ dst_inner = "a4::1234"
pkts = []
# packets with segments-left 2, active segment a3::c4
packet_header = self.create_packet_header_IPv6_SRH_IPv6(
- dst_inner,
- sidlist=['a5::', 'a4::', 'a3::c4'],
- segleft=2)
+ dst_inner, sidlist=["a5::", "a4::", "a3::c4"], segleft=2
+ )
# create traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# packets with segments-left 1, active segment a3::c4
packet_header = self.create_packet_header_IPv6_SRH_IPv6(
- dst_inner,
- sidlist=['a4::', 'a3::c4', 'a2::'],
- segleft=1)
+ dst_inner, sidlist=["a4::", "a3::c4", "a2::"], segleft=1
+ )
# add to traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# send packets and verify received packets
# using same comparison function as End (no PSP)
- self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
- self.compare_rx_tx_packet_End)
+ self.send_and_verify_pkts(
+ self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End
+ )
# assert nothing was received on the other interface (pg2)
self.pg2.assert_nothing_captured(remark="mis-directed packet(s)")
@@ -714,31 +790,35 @@
self.teardown_interfaces()
def test_SRv6_End_X_with_PSP(self):
- """ Test SRv6 End.X with PSP behavior.
- """
+ """Test SRv6 End.X with PSP behavior."""
# create three interfaces (1 source, 2 destinations)
# source and destination interfaces are IPv6 only
self.setup_interfaces(ipv6=[True, True, True])
# configure FIB entries
# a4::/64 via pg1 and pg2
- route = VppIpRoute(self, "a4::", 64,
- [VppRoutePath(
- self.pg1.remote_ip6,
- self.pg1.sw_if_index),
- VppRoutePath(self.pg2.remote_ip6,
- self.pg2.sw_if_index)])
+ route = VppIpRoute(
+ self,
+ "a4::",
+ 64,
+ [
+ VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index),
+ VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index),
+ ],
+ )
route.add_vpp_config()
# configure SRv6 localSID End with PSP behavior
localsid = VppSRv6LocalSID(
- self, localsid='A3::C4',
- behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
- nh_addr=self.pg1.remote_ip6,
- end_psp=1,
- sw_if_index=self.pg1.sw_if_index,
- vlan_index=0,
- fib_table=0)
+ self,
+ localsid="A3::C4",
+ behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
+ nh_addr=self.pg1.remote_ip6,
+ end_psp=1,
+ sw_if_index=self.pg1.sw_if_index,
+ vlan_index=0,
+ fib_table=0,
+ )
localsid.add_vpp_config()
# log the localsids
self.logger.debug(self.vapi.cli("show sr localsid"))
@@ -747,31 +827,36 @@
# send one packet per SL value per packet size
# SL=0 packet with localSID End with PSP is dropped
count = len(self.pg_packet_sizes)
- dst_inner = 'a4::1234'
+ dst_inner = "a4::1234"
pkts = []
# packets with segments-left 2, active segment a3::
packet_header = self.create_packet_header_IPv6_SRH_IPv6(
- dst_inner,
- sidlist=['a5::', 'a4::', 'a3::c4'],
- segleft=2)
+ dst_inner, sidlist=["a5::", "a4::", "a3::c4"], segleft=2
+ )
# create traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# packets with segments-left 1, active segment a3::
packet_header = self.create_packet_header_IPv6_SRH_IPv6(
- dst_inner,
- sidlist=['a4::', 'a3::c4', 'a2::'],
- segleft=1)
+ dst_inner, sidlist=["a4::", "a3::c4", "a2::"], segleft=1
+ )
# add to traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# send packets and verify received packets
# using same comparison function as End with PSP
- self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
- self.compare_rx_tx_packet_End_PSP)
+ self.send_and_verify_pkts(
+ self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_PSP
+ )
# assert nothing was received on the other interface (pg2)
self.pg2.assert_nothing_captured(remark="mis-directed packet(s)")
@@ -789,21 +874,22 @@
self.teardown_interfaces()
def test_SRv6_End_DX6(self):
- """ Test SRv6 End.DX6 behavior.
- """
+ """Test SRv6 End.DX6 behavior."""
# send traffic to one destination interface
# source and destination interfaces are IPv6 only
self.setup_interfaces(ipv6=[True, True])
# configure SRv6 localSID End.DX6 behavior
localsid = VppSRv6LocalSID(
- self, localsid='A3::C4',
- behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
- nh_addr=self.pg1.remote_ip6,
- end_psp=0,
- sw_if_index=self.pg1.sw_if_index,
- vlan_index=0,
- fib_table=0)
+ self,
+ localsid="A3::C4",
+ behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
+ nh_addr=self.pg1.remote_ip6,
+ end_psp=0,
+ sw_if_index=self.pg1.sw_if_index,
+ vlan_index=0,
+ fib_table=0,
+ )
localsid.add_vpp_config()
# log the localsids
self.logger.debug(self.vapi.cli("show sr localsid"))
@@ -811,30 +897,36 @@
# create IPv6 packets with SRH (SL=0)
# send one packet per packet size
count = len(self.pg_packet_sizes)
- dst_inner = 'a4::1234' # inner header destination address
+ dst_inner = "a4::1234" # inner header destination address
pkts = []
# packets with SRH, segments-left 0, active segment a3::c4
packet_header = self.create_packet_header_IPv6_SRH_IPv6(
- dst_inner,
- sidlist=['a3::c4', 'a2::', 'a1::'],
- segleft=0)
+ dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
+ )
# add to traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# packets without SRH, IPv6 in IPv6
# outer IPv6 dest addr is the localsid End.DX6
packet_header = self.create_packet_header_IPv6_IPv6(
- dst_inner,
- dst_outer='a3::c4')
+ dst_inner, dst_outer="a3::c4"
+ )
# add to traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# send packets and verify received packets
- self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
- self.compare_rx_tx_packet_End_DX6)
+ self.send_and_verify_pkts(
+ self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_DX6
+ )
# log the localsid counters
self.logger.info(self.vapi.cli("show sr localsid"))
@@ -846,8 +938,7 @@
self.teardown_interfaces()
def test_SRv6_End_DT6(self):
- """ Test SRv6 End.DT6 behavior.
- """
+ """Test SRv6 End.DT6 behavior."""
# create three interfaces (1 source, 2 destinations)
# all interfaces are IPv6 only
# source interface in global FIB (0)
@@ -855,24 +946,31 @@
vrf_1 = 1
ipt = VppIpTable(self, vrf_1, is_ip6=True)
ipt.add_vpp_config()
- self.setup_interfaces(ipv6=[True, True, True],
- ipv6_table_id=[0, 0, vrf_1])
+ self.setup_interfaces(ipv6=[True, True, True], ipv6_table_id=[0, 0, vrf_1])
# configure FIB entries
# a4::/64 is reachable
# via pg1 in table 0 (global)
# and via pg2 in table vrf_1
- route0 = VppIpRoute(self, "a4::", 64,
- [VppRoutePath(self.pg1.remote_ip6,
- self.pg1.sw_if_index,
- nh_table_id=0)],
- table_id=0)
+ route0 = VppIpRoute(
+ self,
+ "a4::",
+ 64,
+ [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index, nh_table_id=0)],
+ table_id=0,
+ )
route0.add_vpp_config()
- route1 = VppIpRoute(self, "a4::", 64,
- [VppRoutePath(self.pg2.remote_ip6,
- self.pg2.sw_if_index,
- nh_table_id=vrf_1)],
- table_id=vrf_1)
+ route1 = VppIpRoute(
+ self,
+ "a4::",
+ 64,
+ [
+ VppRoutePath(
+ self.pg2.remote_ip6, self.pg2.sw_if_index, nh_table_id=vrf_1
+ )
+ ],
+ table_id=vrf_1,
+ )
route1.add_vpp_config()
self.logger.debug(self.vapi.cli("show ip6 fib"))
@@ -881,13 +979,15 @@
# fib_table: where the localsid is installed
# sw_if_index: in T-variants of localsid this is the vrf table_id
localsid = VppSRv6LocalSID(
- self, localsid='A3::C4',
- behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
- nh_addr=0,
- end_psp=0,
- sw_if_index=vrf_1,
- vlan_index=0,
- fib_table=0)
+ self,
+ localsid="A3::C4",
+ behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
+ nh_addr=0,
+ end_psp=0,
+ sw_if_index=vrf_1,
+ vlan_index=0,
+ fib_table=0,
+ )
localsid.add_vpp_config()
# log the localsids
self.logger.debug(self.vapi.cli("show sr localsid"))
@@ -895,31 +995,37 @@
# create IPv6 packets with SRH (SL=0)
# send one packet per packet size
count = len(self.pg_packet_sizes)
- dst_inner = 'a4::1234' # inner header destination address
+ dst_inner = "a4::1234" # inner header destination address
pkts = []
# packets with SRH, segments-left 0, active segment a3::c4
packet_header = self.create_packet_header_IPv6_SRH_IPv6(
- dst_inner,
- sidlist=['a3::c4', 'a2::', 'a1::'],
- segleft=0)
+ dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
+ )
# add to traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
+ )
+ )
# packets without SRH, IPv6 in IPv6
# outer IPv6 dest addr is the localsid End.DT6
packet_header = self.create_packet_header_IPv6_IPv6(
- dst_inner,
- dst_outer='a3::c4')
+ dst_inner, dst_outer="a3::c4"
+ )
# add to traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
+ )
+ )
# send packets and verify received packets
# using same comparison function as End.DX6
- self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
- self.compare_rx_tx_packet_End_DX6)
+ self.send_and_verify_pkts(
+ self.pg0, pkts, self.pg2, self.compare_rx_tx_packet_End_DX6
+ )
# assert nothing was received on the other interface (pg2)
self.pg1.assert_nothing_captured(remark="mis-directed packet(s)")
@@ -937,8 +1043,7 @@
self.teardown_interfaces()
def test_SRv6_End_DX4(self):
- """ Test SRv6 End.DX4 behavior.
- """
+ """Test SRv6 End.DX4 behavior."""
# send traffic to one destination interface
# source interface is IPv6 only
# destination interface is IPv4 only
@@ -946,43 +1051,51 @@
# configure SRv6 localSID End.DX4 behavior
localsid = VppSRv6LocalSID(
- self, localsid='A3::C4',
- behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
- nh_addr=self.pg1.remote_ip4,
- end_psp=0,
- sw_if_index=self.pg1.sw_if_index,
- vlan_index=0,
- fib_table=0)
+ self,
+ localsid="A3::C4",
+ behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
+ nh_addr=self.pg1.remote_ip4,
+ end_psp=0,
+ sw_if_index=self.pg1.sw_if_index,
+ vlan_index=0,
+ fib_table=0,
+ )
localsid.add_vpp_config()
# log the localsids
self.logger.debug(self.vapi.cli("show sr localsid"))
# send one packet per packet size
count = len(self.pg_packet_sizes)
- dst_inner = '4.1.1.123' # inner header destination address
+ dst_inner = "4.1.1.123" # inner header destination address
pkts = []
# packets with SRH, segments-left 0, active segment a3::c4
packet_header = self.create_packet_header_IPv6_SRH_IPv4(
- dst_inner,
- sidlist=['a3::c4', 'a2::', 'a1::'],
- segleft=0)
+ dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
+ )
# add to traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# packets without SRH, IPv4 in IPv6
# outer IPv6 dest addr is the localsid End.DX4
packet_header = self.create_packet_header_IPv6_IPv4(
- dst_inner,
- dst_outer='a3::c4')
+ dst_inner, dst_outer="a3::c4"
+ )
# add to traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# send packets and verify received packets
- self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
- self.compare_rx_tx_packet_End_DX4)
+ self.send_and_verify_pkts(
+ self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_DX4
+ )
# log the localsid counters
self.logger.info(self.vapi.cli("show sr localsid"))
@@ -994,8 +1107,7 @@
self.teardown_interfaces()
def test_SRv6_End_DT4(self):
- """ Test SRv6 End.DT4 behavior.
- """
+ """Test SRv6 End.DT4 behavior."""
# create three interfaces (1 source, 2 destinations)
# source interface is IPv6-only
# destination interfaces are IPv4 only
@@ -1004,26 +1116,36 @@
vrf_1 = 1
ipt = VppIpTable(self, vrf_1)
ipt.add_vpp_config()
- self.setup_interfaces(ipv6=[True, False, False],
- ipv4=[False, True, True],
- ipv6_table_id=[0, 0, 0],
- ipv4_table_id=[0, 0, vrf_1])
+ self.setup_interfaces(
+ ipv6=[True, False, False],
+ ipv4=[False, True, True],
+ ipv6_table_id=[0, 0, 0],
+ ipv4_table_id=[0, 0, vrf_1],
+ )
# configure FIB entries
# 4.1.1.0/24 is reachable
# via pg1 in table 0 (global)
# and via pg2 in table vrf_1
- route0 = VppIpRoute(self, "4.1.1.0", 24,
- [VppRoutePath(self.pg1.remote_ip4,
- self.pg1.sw_if_index,
- nh_table_id=0)],
- table_id=0)
+ route0 = VppIpRoute(
+ self,
+ "4.1.1.0",
+ 24,
+ [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index, nh_table_id=0)],
+ table_id=0,
+ )
route0.add_vpp_config()
- route1 = VppIpRoute(self, "4.1.1.0", 24,
- [VppRoutePath(self.pg2.remote_ip4,
- self.pg2.sw_if_index,
- nh_table_id=vrf_1)],
- table_id=vrf_1)
+ route1 = VppIpRoute(
+ self,
+ "4.1.1.0",
+ 24,
+ [
+ VppRoutePath(
+ self.pg2.remote_ip4, self.pg2.sw_if_index, nh_table_id=vrf_1
+ )
+ ],
+ table_id=vrf_1,
+ )
route1.add_vpp_config()
self.logger.debug(self.vapi.cli("show ip fib"))
@@ -1032,13 +1154,15 @@
# fib_table: where the localsid is installed
# sw_if_index: in T-variants of localsid: vrf table_id
localsid = VppSRv6LocalSID(
- self, localsid='A3::C4',
- behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
- nh_addr=0,
- end_psp=0,
- sw_if_index=vrf_1,
- vlan_index=0,
- fib_table=0)
+ self,
+ localsid="A3::C4",
+ behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
+ nh_addr=0,
+ end_psp=0,
+ sw_if_index=vrf_1,
+ vlan_index=0,
+ fib_table=0,
+ )
localsid.add_vpp_config()
# log the localsids
self.logger.debug(self.vapi.cli("show sr localsid"))
@@ -1046,31 +1170,37 @@
# create IPv6 packets with SRH (SL=0)
# send one packet per packet size
count = len(self.pg_packet_sizes)
- dst_inner = '4.1.1.123' # inner header destination address
+ dst_inner = "4.1.1.123" # inner header destination address
pkts = []
# packets with SRH, segments-left 0, active segment a3::c4
packet_header = self.create_packet_header_IPv6_SRH_IPv4(
- dst_inner,
- sidlist=['a3::c4', 'a2::', 'a1::'],
- segleft=0)
+ dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
+ )
# add to traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
+ )
+ )
# packets without SRH, IPv6 in IPv6
# outer IPv6 dest addr is the localsid End.DX4
packet_header = self.create_packet_header_IPv6_IPv4(
- dst_inner,
- dst_outer='a3::c4')
+ dst_inner, dst_outer="a3::c4"
+ )
# add to traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
+ )
+ )
# send packets and verify received packets
# using same comparison function as End.DX4
- self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
- self.compare_rx_tx_packet_End_DX4)
+ self.send_and_verify_pkts(
+ self.pg0, pkts, self.pg2, self.compare_rx_tx_packet_End_DX4
+ )
# assert nothing was received on the other interface (pg2)
self.pg1.assert_nothing_captured(remark="mis-directed packet(s)")
@@ -1088,21 +1218,22 @@
self.teardown_interfaces()
def test_SRv6_End_DX2(self):
- """ Test SRv6 End.DX2 behavior.
- """
+ """Test SRv6 End.DX2 behavior."""
# send traffic to one destination interface
# source interface is IPv6 only
self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])
# configure SRv6 localSID End.DX2 behavior
localsid = VppSRv6LocalSID(
- self, localsid='A3::C4',
- behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
- nh_addr=0,
- end_psp=0,
- sw_if_index=self.pg1.sw_if_index,
- vlan_index=0,
- fib_table=0)
+ self,
+ localsid="A3::C4",
+ behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
+ nh_addr=0,
+ end_psp=0,
+ sw_if_index=self.pg1.sw_if_index,
+ vlan_index=0,
+ fib_table=0,
+ )
localsid.add_vpp_config()
# log the localsids
self.logger.debug(self.vapi.cli("show sr localsid"))
@@ -1114,46 +1245,53 @@
# packets with SRH, segments-left 0, active segment a3::c4
# L2 has no dot1q header
packet_header = self.create_packet_header_IPv6_SRH_L2(
- sidlist=['a3::c4', 'a2::', 'a1::'],
- segleft=0,
- vlan=0)
+ sidlist=["a3::c4", "a2::", "a1::"], segleft=0, vlan=0
+ )
# add to traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# packets with SRH, segments-left 0, active segment a3::c4
# L2 has dot1q header
packet_header = self.create_packet_header_IPv6_SRH_L2(
- sidlist=['a3::c4', 'a2::', 'a1::'],
- segleft=0,
- vlan=123)
+ sidlist=["a3::c4", "a2::", "a1::"], segleft=0, vlan=123
+ )
# add to traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# packets without SRH, L2 in IPv6
# outer IPv6 dest addr is the localsid End.DX2
# L2 has no dot1q header
- packet_header = self.create_packet_header_IPv6_L2(
- dst_outer='a3::c4',
- vlan=0)
+ packet_header = self.create_packet_header_IPv6_L2(dst_outer="a3::c4", vlan=0)
# add to traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# packets without SRH, L2 in IPv6
# outer IPv6 dest addr is the localsid End.DX2
# L2 has dot1q header
- packet_header = self.create_packet_header_IPv6_L2(
- dst_outer='a3::c4',
- vlan=123)
+ packet_header = self.create_packet_header_IPv6_L2(dst_outer="a3::c4", vlan=123)
# add to traffic stream pg0->pg1
- pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
+ )
+ )
# send packets and verify received packets
- self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
- self.compare_rx_tx_packet_End_DX2)
+ self.send_and_verify_pkts(
+ self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_DX2
+ )
# log the localsid counters
self.logger.info(self.vapi.cli("show sr localsid"))
@@ -1166,18 +1304,17 @@
@unittest.skipUnless(0, "PC to fix")
def test_SRv6_T_Insert_Classifier(self):
- """ Test SRv6 Transit.Insert behavior (IPv6 only).
- steer packets using the classifier
+ """Test SRv6 Transit.Insert behavior (IPv6 only).
+ steer packets using the classifier
"""
# send traffic to one destination interface
# source and destination are IPv6 only
self.setup_interfaces(ipv6=[False, False, False, True, True])
# configure FIB entries
- route = VppIpRoute(self, "a4::", 64,
- [VppRoutePath(
- self.pg4.remote_ip6,
- self.pg4.sw_if_index)])
+ route = VppIpRoute(
+ self, "a4::", 64, [VppRoutePath(self.pg4.remote_ip6, self.pg4.sw_if_index)]
+ )
route.add_vpp_config()
# configure encaps IPv6 source address
@@ -1185,16 +1322,19 @@
# TODO: API?
self.vapi.cli("set sr encaps source addr a3::")
- bsid = 'a3::9999:1'
+ bsid = "a3::9999:1"
# configure SRv6 Policy
# Note: segment list order: first -> last
sr_policy = VppSRv6Policy(
- self, bsid=bsid,
+ self,
+ bsid=bsid,
is_encap=0,
sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
- weight=1, fib_table=0,
- segments=['a4::', 'a5::', 'a6::c7'],
- source='a3::')
+ weight=1,
+ fib_table=0,
+ segments=["a4::", "a5::", "a6::c7"],
+ source="a3::",
+ )
sr_policy.add_vpp_config()
self.sr_policy = sr_policy
@@ -1203,75 +1343,79 @@
# add classify table
# mask on dst ip address prefix a7::/8
- mask = '{!s:0<16}'.format('ff')
+ mask = "{!s:0<16}".format("ff")
r = self.vapi.classify_add_del_table(
1,
binascii.unhexlify(mask),
match_n_vectors=(len(mask) - 1) // 32 + 1,
current_data_flag=1,
- skip_n_vectors=2) # data offset
- self.assertIsNotNone(r, 'No response msg for add_del_table')
+ skip_n_vectors=2,
+ ) # data offset
+ self.assertIsNotNone(r, "No response msg for add_del_table")
table_index = r.new_table_index
# add the source routing node as a ip6 inacl netxt node
- r = self.vapi.add_node_next('ip6-inacl',
- 'sr-pl-rewrite-insert')
+ r = self.vapi.add_node_next("ip6-inacl", "sr-pl-rewrite-insert")
inacl_next_node_index = r.node_index
- match = '{!s:0<16}'.format('a7')
+ match = "{!s:0<16}".format("a7")
r = self.vapi.classify_add_del_session(
1,
table_index,
binascii.unhexlify(match),
hit_next_index=inacl_next_node_index,
action=3,
- metadata=0) # sr policy index
- self.assertIsNotNone(r, 'No response msg for add_del_session')
+ metadata=0,
+ ) # sr policy index
+ self.assertIsNotNone(r, "No response msg for add_del_session")
# log the classify table used in the steering policy
self.logger.info(self.vapi.cli("show classify table"))
r = self.vapi.input_acl_set_interface(
- is_add=1,
- sw_if_index=self.pg3.sw_if_index,
- ip6_table_index=table_index)
- self.assertIsNotNone(r,
- 'No response msg for input_acl_set_interface')
+ is_add=1, sw_if_index=self.pg3.sw_if_index, ip6_table_index=table_index
+ )
+ self.assertIsNotNone(r, "No response msg for input_acl_set_interface")
# log the ip6 inacl
self.logger.info(self.vapi.cli("show inacl type ip6"))
# create packets
count = len(self.pg_packet_sizes)
- dst_inner = 'a7::1234'
+ dst_inner = "a7::1234"
pkts = []
# create IPv6 packets without SRH
packet_header = self.create_packet_header_IPv6(dst_inner)
# create traffic stream pg3->pg4
- pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg3, self.pg4, packet_header, self.pg_packet_sizes, count
+ )
+ )
# create IPv6 packets with SRH
# packets with segments-left 1, active segment a7::
packet_header = self.create_packet_header_IPv6_SRH(
- sidlist=['a8::', 'a7::', 'a6::'],
- segleft=1)
+ sidlist=["a8::", "a7::", "a6::"], segleft=1
+ )
# create traffic stream pg3->pg4
- pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
- self.pg_packet_sizes, count))
+ pkts.extend(
+ self.create_stream(
+ self.pg3, self.pg4, packet_header, self.pg_packet_sizes, count
+ )
+ )
# send packets and verify received packets
- self.send_and_verify_pkts(self.pg3, pkts, self.pg4,
- self.compare_rx_tx_packet_T_Insert)
+ self.send_and_verify_pkts(
+ self.pg3, pkts, self.pg4, self.compare_rx_tx_packet_T_Insert
+ )
# remove the interface l2 input feature
r = self.vapi.input_acl_set_interface(
- is_add=0,
- sw_if_index=self.pg3.sw_if_index,
- ip6_table_index=table_index)
- self.assertIsNotNone(r,
- 'No response msg for input_acl_set_interface')
+ is_add=0, sw_if_index=self.pg3.sw_if_index, ip6_table_index=table_index
+ )
+ self.assertIsNotNone(r, "No response msg for input_acl_set_interface")
# log the ip6 inacl after cleaning
self.logger.info(self.vapi.cli("show inacl type ip6"))
@@ -1289,16 +1433,14 @@
# remove classify session and table
r = self.vapi.classify_add_del_session(
- 0,
- table_index,
- binascii.unhexlify(match))
- self.assertIsNotNone(r, 'No response msg for add_del_session')
+ 0, table_index, binascii.unhexlify(match)
+ )
+ self.assertIsNotNone(r, "No response msg for add_del_session")
r = self.vapi.classify_add_del_table(
- 0,
- binascii.unhexlify(mask),
- table_index=table_index)
- self.assertIsNotNone(r, 'No response msg for add_del_table')
+ 0, binascii.unhexlify(mask), table_index=table_index
+ )
+ self.assertIsNotNone(r, "No response msg for add_del_table")
self.logger.info(self.vapi.cli("show classify table"))
@@ -1309,7 +1451,7 @@
self.teardown_interfaces()
def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
- """ Compare input and output packet after passing T.Encaps
+ """Compare input and output packet after passing T.Encaps
:param tx_pkt: transmitted packet
:param rx_pkt: received packet
@@ -1350,7 +1492,7 @@
# rx'ed seglist should be equal to expected seglist
self.assertEqual(rx_srh.addresses, tx_seglist)
# segleft should be equal to size expected seglist-1
- self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
+ self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
# segleft should be equal to lastentry
self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
@@ -1364,7 +1506,7 @@
self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
- """ Compare input and output packet after passing T.Encaps for IPv4
+ """Compare input and output packet after passing T.Encaps for IPv4
:param tx_pkt: transmitted packet
:param rx_pkt: received packet
@@ -1404,7 +1546,7 @@
# rx'ed seglist should be equal to seglist
self.assertEqual(rx_srh.addresses, tx_seglist)
# segleft should be equal to size seglist-1
- self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
+ self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
# segleft should be equal to lastentry
self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
@@ -1423,7 +1565,7 @@
self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
- """ Compare input and output packet after passing T.Encaps for L2
+ """Compare input and output packet after passing T.Encaps for L2
:param tx_pkt: transmitted packet
:param rx_pkt: received packet
@@ -1461,7 +1603,7 @@
# rx'ed seglist should be equal to seglist
self.assertEqual(rx_srh.addresses, tx_seglist)
# segleft should be equal to size seglist-1
- self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
+ self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
# segleft should be equal to lastentry
self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
# nh should be "No Next Header" (143)
@@ -1473,7 +1615,7 @@
self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
- """ Compare input and output packet after passing T.Insert
+ """Compare input and output packet after passing T.Insert
:param tx_pkt: transmitted packet
:param rx_pkt: received packet
@@ -1529,7 +1671,7 @@
# rx'ed seglist should be equal to expected seglist
self.assertEqual(rx_srh.addresses, tx_seglist)
# segleft should be equal to size(expected seglist)-1
- self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
+ self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
# segleft should be equal to lastentry
self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
@@ -1556,7 +1698,7 @@
self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
- """ Compare input and output packet after passing End (without PSP)
+ """Compare input and output packet after passing End (without PSP)
:param tx_pkt: transmitted packet
:param rx_pkt: received packet
@@ -1596,7 +1738,7 @@
# sidlist should be unchanged
self.assertEqual(rx_srh.addresses, tx_srh.addresses)
# segleft should have been decremented
- self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
+ self.assertEqual(rx_srh.segleft, tx_srh.segleft - 1)
# received ip.dst should be equal to sidlist[segleft]
self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
# lastentry should be unchanged
@@ -1605,7 +1747,7 @@
self.assertEqual(rx_ip2.src, tx_ip2.src)
self.assertEqual(rx_ip2.dst, tx_ip2.dst)
# else: # tx_ip.segleft == 0
- # TODO: Does this work with 2 SRHs in ingress packet?
+ # TODO: Does this work with 2 SRHs in ingress packet?
# UDP layer should be unchanged
self.assertEqual(rx_udp, tx_udp)
@@ -1613,7 +1755,7 @@
self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
- """ Compare input and output packet after passing End with PSP
+ """Compare input and output packet after passing End with PSP
:param tx_pkt: transmitted packet
:param rx_pkt: received packet
@@ -1658,7 +1800,7 @@
# sidlist should be unchanged
self.assertEqual(rx_srh.addresses, tx_srh.addresses)
# segleft should have been decremented
- self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
+ self.assertEqual(rx_srh.segleft, tx_srh.segleft - 1)
# received ip.dst should be equal to sidlist[segleft]
self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
# lastentry should be unchanged
@@ -1674,7 +1816,7 @@
# outer IPv6 header ip.src should be equal to tx'ed ip.src
self.assertEqual(rx_ip.src, tx_ip.src)
# outer IPv6 header ip.dst should be = to tx'ed sidlist[segleft-1]
- self.assertEqual(rx_ip.dst, tx_srh.addresses[tx_srh.segleft-1])
+ self.assertEqual(rx_ip.dst, tx_srh.addresses[tx_srh.segleft - 1])
# UDP layer should be unchanged
self.assertEqual(rx_udp, tx_udp)
@@ -1682,7 +1824,7 @@
self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
- """ Compare input and output packet after passing End.DX6
+ """Compare input and output packet after passing End.DX6
:param tx_pkt: transmitted packet
:param rx_pkt: received packet
@@ -1714,7 +1856,7 @@
self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
- """ Compare input and output packet after passing End.DX4
+ """Compare input and output packet after passing End.DX4
:param tx_pkt: transmitted packet
:param rx_pkt: received packet
@@ -1751,7 +1893,7 @@
self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
- """ Compare input and output packet after passing End.DX2
+ """Compare input and output packet after passing End.DX2
:param tx_pkt: transmitted packet
:param rx_pkt: received packet
@@ -1780,8 +1922,7 @@
self.logger.debug("packet verification: SUCCESS")
- def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
- count):
+ def create_stream(self, src_if, dst_if, packet_header, packet_sizes, count):
"""Create SRv6 input packet stream for defined interface.
:param VppInterface src_if: Interface to create packet stream for
@@ -1796,19 +1937,19 @@
"""
self.logger.info("Creating packets")
pkts = []
- for i in range(0, count-1):
+ for i in range(0, count - 1):
payload_info = self.create_packet_info(src_if, dst_if)
- self.logger.debug(
- "Creating packet with index %d" % (payload_info.index))
+ self.logger.debug("Creating packet with index %d" % (payload_info.index))
payload = self.info_to_payload(payload_info)
# add L2 header if not yet provided in packet_header
- if packet_header.getlayer(0).name == 'Ethernet':
- p = (packet_header /
- Raw(payload))
+ if packet_header.getlayer(0).name == "Ethernet":
+ p = packet_header / Raw(payload)
else:
- p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
- packet_header /
- Raw(payload))
+ p = (
+ Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+ / packet_header
+ / Raw(payload)
+ )
size = packet_sizes[i % len(packet_sizes)]
self.logger.debug("Packet size %d" % (size))
self.extend_packet(p, size)
@@ -1823,8 +1964,9 @@
self.logger.info("Done creating packets")
return pkts
- def send_and_verify_pkts(self, input, pkts, output, compare_func,
- expected_count=None):
+ def send_and_verify_pkts(
+ self, input, pkts, output, compare_func, expected_count=None
+ ):
"""Send packets and verify received packets using compare_func
:param input: ingress interface of DUT
@@ -1863,8 +2005,7 @@
UDP source port and destination port are 1234
"""
- p = (IPv6(src='1234::1', dst=dst) /
- UDP(sport=1234, dport=1234))
+ p = IPv6(src="1234::1", dst=dst) / UDP(sport=1234, dport=1234)
return p
def create_packet_header_IPv6_SRH(self, sidlist, segleft):
@@ -1878,9 +2019,11 @@
UDP source port and destination port are 1234
"""
- p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
- IPv6ExtHdrSegmentRouting(addresses=sidlist) /
- UDP(sport=1234, dport=1234))
+ p = (
+ IPv6(src="1234::1", dst=sidlist[segleft])
+ / IPv6ExtHdrSegmentRouting(addresses=sidlist)
+ / UDP(sport=1234, dport=1234)
+ )
return p
def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
@@ -1896,11 +2039,12 @@
UDP source port and destination port are 1234
"""
- p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
- IPv6ExtHdrSegmentRouting(addresses=sidlist,
- segleft=segleft, nh=41) /
- IPv6(src='4321::1', dst=dst) /
- UDP(sport=1234, dport=1234))
+ p = (
+ IPv6(src="1234::1", dst=sidlist[segleft])
+ / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=41)
+ / IPv6(src="4321::1", dst=dst)
+ / UDP(sport=1234, dport=1234)
+ )
return p
def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
@@ -1914,13 +2058,16 @@
UDP source port and destination port are 1234
"""
- p = (IPv6(src='1234::1', dst=dst_outer) /
- IPv6(src='4321::1', dst=dst_inner) /
- UDP(sport=1234, dport=1234))
+ p = (
+ IPv6(src="1234::1", dst=dst_outer)
+ / IPv6(src="4321::1", dst=dst_inner)
+ / UDP(sport=1234, dport=1234)
+ )
return p
- def create_packet_header_IPv6_SRH_SRH_IPv6(self, dst, sidlist1, segleft1,
- sidlist2, segleft2):
+ def create_packet_header_IPv6_SRH_SRH_IPv6(
+ self, dst, sidlist1, segleft1, sidlist2, segleft2
+ ):
"""Create packet header: IPv6 encapsulated in SRv6 with 2 SRH:
IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header
@@ -1935,13 +2082,13 @@
UDP source port and destination port are 1234
"""
- p = (IPv6(src='1234::1', dst=sidlist1[segleft1]) /
- IPv6ExtHdrSegmentRouting(addresses=sidlist1,
- segleft=segleft1, nh=43) /
- IPv6ExtHdrSegmentRouting(addresses=sidlist2,
- segleft=segleft2, nh=41) /
- IPv6(src='4321::1', dst=dst) /
- UDP(sport=1234, dport=1234))
+ p = (
+ IPv6(src="1234::1", dst=sidlist1[segleft1])
+ / IPv6ExtHdrSegmentRouting(addresses=sidlist1, segleft=segleft1, nh=43)
+ / IPv6ExtHdrSegmentRouting(addresses=sidlist2, segleft=segleft2, nh=41)
+ / IPv6(src="4321::1", dst=dst)
+ / UDP(sport=1234, dport=1234)
+ )
return p
def create_packet_header_IPv4(self, dst):
@@ -1953,8 +2100,7 @@
UDP source port and destination port are 1234
"""
- p = (IP(src='123.1.1.1', dst=dst) /
- UDP(sport=1234, dport=1234))
+ p = IP(src="123.1.1.1", dst=dst) / UDP(sport=1234, dport=1234)
return p
def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
@@ -1969,9 +2115,11 @@
UDP source port and destination port are 1234
"""
- p = (IPv6(src='1234::1', dst=dst_outer) /
- IP(src='123.1.1.1', dst=dst_inner) /
- UDP(sport=1234, dport=1234))
+ p = (
+ IPv6(src="1234::1", dst=dst_outer)
+ / IP(src="123.1.1.1", dst=dst_inner)
+ / UDP(sport=1234, dport=1234)
+ )
return p
def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
@@ -1988,11 +2136,12 @@
UDP source port and destination port are 1234
"""
- p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
- IPv6ExtHdrSegmentRouting(addresses=sidlist,
- segleft=segleft, nh=4) /
- IP(src='123.1.1.1', dst=dst) /
- UDP(sport=1234, dport=1234))
+ p = (
+ IPv6(src="1234::1", dst=sidlist[segleft])
+ / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=4)
+ / IP(src="123.1.1.1", dst=dst)
+ / UDP(sport=1234, dport=1234)
+ )
return p
def create_packet_header_L2(self, vlan=0):
@@ -2003,7 +2152,7 @@
# Note: the dst addr ('00:55:44:33:22:11') is used in
# the compare function compare_rx_tx_packet_T_Encaps_L2
# to detect presence of L2 in SRH payload
- p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
+ p = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
etype = 0x8137 # IPX
if vlan:
# add 802.1q layer
@@ -2023,7 +2172,7 @@
Outer IPv6 destination address is set to sidlist[segleft]
IPv6 source address is 1234::1
"""
- eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
+ eth = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
etype = 0x8137 # IPX
if vlan:
# add 802.1q layer
@@ -2031,10 +2180,11 @@
else:
eth.type = etype
- p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
- IPv6ExtHdrSegmentRouting(addresses=sidlist,
- segleft=segleft, nh=143) /
- eth)
+ p = (
+ IPv6(src="1234::1", dst=sidlist[segleft])
+ / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=143)
+ / eth
+ )
return p
def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
@@ -2044,7 +2194,7 @@
:param ipv6address dst_outer: outer IPv6 destination address
:param vlan: L2 vlan; if vlan!=0 then add 802.1q header
"""
- eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
+ eth = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
etype = 0x8137 # IPX
if vlan:
# add 802.1q layer
@@ -2052,12 +2202,11 @@
else:
eth.type = etype
- p = (IPv6(src='1234::1', dst=dst_outer, nh=143) / eth)
+ p = IPv6(src="1234::1", dst=dst_outer, nh=143) / eth
return p
def get_payload_info(self, packet):
- """ Extract the payload_info from the packet
- """
+ """Extract the payload_info from the packet"""
# in most cases, payload_info is in packet[Raw]
# but packet[Raw] gives the complete payload
# (incl L2 header) for the T.Encaps L2 case
@@ -2068,8 +2217,7 @@
# remote L2 header from packet[Raw]:
# take packet[Raw], convert it to an Ether layer
# and then extract Raw from it
- payload_info = self.payload_to_info(
- Ether(scapy.compat.r(packet[Raw]))[Raw])
+ payload_info = self.payload_to_info(Ether(scapy.compat.r(packet[Raw]))[Raw])
return payload_info
@@ -2082,8 +2230,10 @@
:param capture: captured packets
:param compare_func: function to compare in and out packet
"""
- self.logger.info("Verifying capture on interface %s using function %s"
- % (dst_if.name, compare_func.__name__))
+ self.logger.info(
+ "Verifying capture on interface %s using function %s"
+ % (dst_if.name, compare_func.__name__)
+ )
last_info = dict()
for i in self.pg_interfaces:
@@ -2096,19 +2246,19 @@
payload_info = self.get_payload_info(packet)
packet_index = payload_info.index
- self.logger.debug("Verifying packet with index %d"
- % (packet_index))
+ self.logger.debug("Verifying packet with index %d" % (packet_index))
# packet should have arrived on the expected interface
self.assertEqual(payload_info.dst, dst_sw_if_index)
self.logger.debug(
- "Got packet on interface %s: src=%u (idx=%u)" %
- (dst_if.name, payload_info.src, packet_index))
+ "Got packet on interface %s: src=%u (idx=%u)"
+ % (dst_if.name, payload_info.src, packet_index)
+ )
# search for payload_info with same src and dst if_index
# this will give us the transmitted packet
next_info = self.get_next_packet_info_for_interface2(
- payload_info.src, dst_sw_if_index,
- last_info[payload_info.src])
+ payload_info.src, dst_sw_if_index, last_info[payload_info.src]
+ )
last_info[payload_info.src] = next_info
# next_info should not be None
self.assertTrue(next_info is not None)
@@ -2117,8 +2267,9 @@
# data field of next_info contains the tx packet
txed_packet = next_info.data
- self.logger.debug(ppp("Transmitted packet:",
- txed_packet)) # ppp=Pretty Print Packet
+ self.logger.debug(
+ ppp("Transmitted packet:", txed_packet)
+ ) # ppp=Pretty Print Packet
self.logger.debug(ppp("Received packet:", packet))
@@ -2143,5 +2294,5 @@
# "didn't arrive" % (dst_if.name, i.name))
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)