| #!/usr/bin/env python3 |
| import unittest |
| from ipaddress import ip_address, ip_interface |
| from vpp_qemu_utils import ( |
| create_namespace, |
| delete_namespace, |
| create_host_interface, |
| delete_host_interfaces, |
| set_interface_mtu, |
| disable_interface_gso, |
| add_namespace_route, |
| libmemif_test_app, |
| ) |
| from vpp_iperf import start_iperf, stop_iperf |
| from framework import VppTestCase |
| from asfframework import VppTestRunner, tag_fixme_debian11, is_distro_debian11 |
| from config import config |
| from vpp_papi import VppEnum |
| import time |
| import sys |
| from vm_test_config import test_config |
| |
| # |
| # Tests for: |
| # - tapv2, tunv2, af_packet_v2/v3 & memif interfaces. |
| # - reads test config from the file vm_test_config.py |
| # - Uses iPerf to send TCP/IP streams to VPP |
| # - VPP ingress interface runs the iperf client |
| # - VPP egress interface runs the iperf server |
| # - Runs tests specified in the vm_test_config module and verifies that: |
| # - TCP over IPv4 and IPv6 is enabled correctly for Bridged and Routed topologies. |
| # sending jumbo frames (9000/9001 MTUs) with GSO/GRO is enabled correctly. |
| # sending VPP buffer-sized frames(2048 MTU) with GSO/GRO is enabled correctly. |
| # sending standard frames (1500 MTU) with GSO/GRO is enabled correctly. |
| # sending smaller frames (512 MTU) with GSO/GRO is enabled correctly for IPv4 |
| # sending odd sized frames (9001, 2049 MTU) with GSO/GRO is enabled correctly. |
| # |
| |
| |
| class TestSelector: |
| """Selects specified test(s) from vm_test_config to run |
| |
| The selected_test field specifies a comma separated or range(s) of |
| tests to run (default='' i.e all_tests) e.g. setting the selected_tests |
| attribute to "1,3-4,19-23" runs tests with ID's 1, 3, 4, 19, 20, 21, |
| 22 & 23 from the spec file vm_test_config |
| """ |
| |
| def __init__(self, selected_tests="") -> None: |
| self.selected_tests = selected_tests |
| |
| def filter_tests(self, test): |
| """Works with the filter fn. to include only selected tests.""" |
| |
| if self.selected_tests: |
| selection = self.selected_tests |
| else: |
| selection = test_config["tests_to_run"] |
| |
| if not selection or selection == " ": |
| return True |
| |
| test_ids_to_run = [] |
| for test_id in selection.split(","): |
| if "-" in test_id.strip(): |
| start, end = map(int, test_id.split("-")) |
| test_ids_to_run.extend(list(range(start, end + 1))) |
| elif test_id.strip(): |
| test_ids_to_run.append(int(test_id)) |
| return test["id"] in test_ids_to_run |
| |
| |
| # Test Config variables |
| af_packet_config = test_config["af_packet"] |
| layer2 = test_config["L2"] |
| layer3 = test_config["L3"] |
| |
| |
| def create_test(test_name, test, ip_version, mtu): |
| """Create and return a unittest method for a test.""" |
| |
| @unittest.skipIf( |
| is_distro_debian11, "FIXME intermittent test failures on debian11 distro" |
| ) |
| @unittest.skipIf( |
| config.skip_netns_tests, "netns not available or disabled from cli" |
| ) |
| def test_func(self): |
| self.logger.debug(f"Starting unittest:{test_name}") |
| self.setUpTestToplogy(test=test, ip_version=ip_version) |
| result = self.set_interfaces_mtu( |
| mtu=mtu, |
| ip_version=ip_version, |
| vpp_interfaces=self.vpp_interfaces, |
| linux_interfaces=self.linux_interfaces, |
| ) |
| if "memif" in self.if_types: |
| self.logger.debug("Starting libmemif test_app for memif test") |
| self.memif_process = libmemif_test_app( |
| memif_sock_path=self.get_memif_sock_path(), logger=self.logger |
| ) |
| if result is True: |
| # Start an instance of an iperf server using |
| # a unique port. Save the iperf cmdline for |
| # terminating the iperf_server process after the test. |
| self.iperf_cmd = start_iperf( |
| ip_version=6, |
| client_ns=self.client_namespace, |
| server_ns=self.server_namespace, |
| server_only=True, |
| server_args=f"-p {self.iperf_port}", |
| logger=self.logger, |
| ) |
| # Send traffic between iperf client & server |
| self.assertTrue( |
| start_iperf( |
| ip_version=ip_version, |
| client_ns=self.client_namespace, |
| server_ns=self.server_namespace, |
| server_ipv4_address=self.server_ip4_address, |
| server_ipv6_address=self.server_ip6_address, |
| client_args=f"-p {self.iperf_port}", |
| client_only=True, |
| duration=2, |
| logger=self.logger, |
| ) |
| ) |
| else: |
| return unittest.skip( |
| f"Skipping test:{test_name} as mtu:{mtu} is " |
| f"invalid for TCP/IPv{ip_version}" |
| ) |
| |
| test_func.__name__ = test_name |
| return test_func |
| |
| |
| def generate_vpp_interface_tests(tests, test_class): |
| """Generate unittests for testing vpp interfaces |
| |
| Generates unittests from test spec. and sets them as attributes |
| to the test_class. |
| Args: |
| tests : list of test specs from vm_test_config['tests'] |
| test_class : the name of the test class to which the |
| generated tests are set as attributes. |
| """ |
| |
| for test in tests: |
| for ip_version in test_config["ip_versions"]: |
| for mtu in test_config["mtus"]: |
| test_name = ( |
| f"test_id_{test['id']}_" |
| + f"client_{test['client_if_type']}" |
| + f"_v{test['client_if_version']}_" |
| + f"gso_{test.get('client_if_gso', 0)}_" |
| + f"gro_{test.get('client_if_gro', 0)}_" |
| + f"checksum_{test.get('client_if_checksum_offload', 0)}_" |
| + f"to_server_{test['server_if_type']}" |
| + f"_v{test['server_if_version']}_" |
| + f"gso_{test.get('server_if_gso', 0)}_" |
| + f"gro_{test.get('server_if_gro', 0)}_" |
| + f"checksum_{test.get('server_if_checksum_offload', 0)}_" |
| + f"mtu_{mtu}_mode_{test['x_connect_mode']}_" |
| + f"tcp_ipv{ip_version}" |
| ) |
| test_func = create_test( |
| test_name=test_name, test=test, ip_version=ip_version, mtu=mtu |
| ) |
| setattr(test_class, test_name, test_func) |
| |
| |
| @tag_fixme_debian11 |
| class TestVPPInterfacesQemu: |
| """Test VPP interfaces inside a QEMU VM for IPv4/v6. |
| |
| Test Setup: |
| Linux_ns1--iperfClient--host-int1--vpp-af_packet-int1--VPP-BD |
| --vppaf_packet_int2--host-int2--iperfServer--Linux_ns2 |
| """ |
| |
| def setUpTestToplogy(self, test, ip_version): |
| """Setup the test topology. |
| |
| 1. Create Linux Namespaces for iPerf Client & Server. |
| 2. Create VPP iPerf client and server virtual interfaces. |
| 3. Enable desired vif features such as GSO & GRO. |
| 3. Cross-Connect interfaces in VPP using L2 or L3. |
| """ |
| |
| # Need to support multiple interface types as the memif interface |
| # in VPP is connected to the iPerf client & server by x-connecting |
| # to a tap interface in their respective namespaces. |
| client_if_types = test["client_if_type"].split(",") |
| server_if_types = test["server_if_type"].split(",") |
| client_if_version = test["client_if_version"] |
| server_if_version = test["server_if_version"] |
| x_connect_mode = test["x_connect_mode"] |
| # server ip4/ip6 addresses required by iperf client |
| server_ip4_prefix = ( |
| layer2["server_ip4_prefix"] |
| if x_connect_mode == "L2" |
| else layer3["server_ip4_prefix"] |
| ) |
| server_ip6_prefix = ( |
| layer2["server_ip6_prefix"] |
| if x_connect_mode == "L2" |
| else layer3["server_ip6_prefix"] |
| ) |
| self.server_ip4_address = str(ip_interface(server_ip4_prefix).ip) |
| self.server_ip6_address = str(ip_interface(server_ip6_prefix).ip) |
| # next-hop IP address on VPP for routing from client & server namespaces |
| vpp_client_prefix = ( |
| layer3["vpp_client_ip4_prefix"] |
| if ip_version == 4 |
| else layer3["vpp_client_ip6_prefix"] |
| ) |
| vpp_client_nexthop = str(ip_interface(vpp_client_prefix).ip) |
| vpp_server_prefix = ( |
| layer3["vpp_server_ip4_prefix"] |
| if ip_version == 4 |
| else layer3["vpp_server_ip6_prefix"] |
| ) |
| vpp_server_nexthop = str(ip_interface(vpp_server_prefix).ip) |
| # Create unique namespaces for iperf client & iperf server to |
| # prevent conflicts when TEST_JOBS > 1 |
| self.client_namespace = test_config["client_namespace"] + str(test["id"]) |
| self.server_namespace = test_config["server_namespace"] + str(test["id"]) |
| create_namespace([self.client_namespace, self.server_namespace]) |
| # Set a unique iPerf port for parallel server and client runs |
| self.iperf_port = 5000 + test["id"] |
| # IPerf client & server ingress/egress interface indexes in VPP |
| self.tap_interfaces = [] |
| self.memif_interfaces = [] |
| self.ingress_if_idxes = [] |
| self.egress_if_idxes = [] |
| self.vpp_interfaces = [] |
| self.linux_interfaces = [] |
| enable_client_if_gso = test.get("client_if_gso", 0) |
| enable_server_if_gso = test.get("server_if_gso", 0) |
| enable_client_if_gro = test.get("client_if_gro", 0) |
| enable_server_if_gro = test.get("server_if_gro", 0) |
| enable_client_if_checksum_offload = test.get("client_if_checksum_offload", 0) |
| enable_server_if_checksum_offload = test.get("server_if_checksum_offload", 0) |
| |
| # Create unique host interfaces in Linux and VPP for connecting to iperf |
| # client & iperf server to prevent conflicts when TEST_JOBS > 1 |
| self.iprf_client_host_interface_on_linux = af_packet_config[ |
| "iprf_client_interface_on_linux" |
| ] + str(test["id"]) |
| self.iprf_client_host_interface_on_vpp = af_packet_config[ |
| "iprf_client_interface_on_vpp" |
| ] + str(test["id"]) |
| self.iprf_server_host_interface_on_linux = af_packet_config[ |
| "iprf_server_interface_on_linux" |
| ] + str(test["id"]) |
| self.iprf_server_host_interface_on_vpp = af_packet_config[ |
| "iprf_server_interface_on_vpp" |
| ] + str(test["id"]) |
| # Handle client interface types |
| for client_if_type in client_if_types: |
| if client_if_type == "af_packet": |
| create_host_interface( |
| self.iprf_client_host_interface_on_linux, |
| self.iprf_client_host_interface_on_vpp, |
| self.client_namespace, |
| ( |
| layer2["client_ip4_prefix"] |
| if x_connect_mode == "L2" |
| else layer3["client_ip4_prefix"] |
| ), |
| ( |
| layer2["client_ip6_prefix"] |
| if x_connect_mode == "L2" |
| else layer3["client_ip6_prefix"] |
| ), |
| ) |
| self.ingress_if_idx = self.create_af_packet( |
| version=client_if_version, |
| host_if_name=self.iprf_client_host_interface_on_vpp, |
| enable_gso=enable_client_if_gso, |
| ) |
| self.ingress_if_idxes.append(self.ingress_if_idx) |
| self.vpp_interfaces.append(self.ingress_if_idx) |
| self.linux_interfaces.append( |
| ["", self.iprf_client_host_interface_on_vpp] |
| ) |
| self.linux_interfaces.append( |
| [ |
| self.client_namespace, |
| self.iprf_client_host_interface_on_linux, |
| ] |
| ) |
| if enable_client_if_gso == 0: |
| disable_interface_gso("", self.iprf_client_host_interface_on_vpp) |
| disable_interface_gso( |
| self.client_namespace, |
| self.iprf_client_host_interface_on_linux, |
| ) |
| elif client_if_type == "tap" or client_if_type == "tun": |
| self.ingress_if_idx = self.create_tap_tun( |
| id=101, |
| host_namespace=self.client_namespace, |
| ip_version=ip_version, |
| host_ip4_prefix=( |
| layer2["client_ip4_prefix"] |
| if x_connect_mode == "L2" |
| else layer3["client_ip4_prefix"] |
| ), |
| host_ip6_prefix=( |
| layer2["client_ip6_prefix"] |
| if x_connect_mode == "L2" |
| else layer3["client_ip6_prefix"] |
| ), |
| host_ip4_gw=( |
| vpp_client_nexthop |
| if x_connect_mode == "L3" and ip_version == 4 |
| else None |
| ), |
| host_ip6_gw=( |
| vpp_client_nexthop |
| if x_connect_mode == "L3" and ip_version == 6 |
| else None |
| ), |
| int_type=client_if_type, |
| host_if_name=f"{client_if_type}0", |
| enable_gso=enable_client_if_gso, |
| enable_gro=enable_client_if_gro, |
| enable_checksum_offload=enable_client_if_checksum_offload, |
| ) |
| self.tap_interfaces.append(self.ingress_if_idx) |
| self.ingress_if_idxes.append(self.ingress_if_idx) |
| self.vpp_interfaces.append(self.ingress_if_idx) |
| self.linux_interfaces.append( |
| [self.client_namespace, f"{client_if_type}0"] |
| ) |
| # Seeing TCP timeouts if tx=on & rx=on Linux tap & tun interfaces |
| disable_interface_gso(self.client_namespace, f"{client_if_type}0") |
| elif client_if_type == "memif": |
| self.ingress_if_idx = self.create_memif( |
| memif_id=0, mode=0 if x_connect_mode == "L2" else 1 |
| ) |
| self.memif_interfaces.append(self.ingress_if_idx) |
| self.ingress_if_idxes.append(self.ingress_if_idx) |
| self.vpp_interfaces.append(self.ingress_if_idx) |
| else: |
| print( |
| f"Unsupported client interface type: {client_if_type} " |
| f"for test - ID={test['id']}" |
| ) |
| sys.exit(1) |
| for server_if_type in server_if_types: |
| if server_if_type == "af_packet": |
| create_host_interface( |
| self.iprf_server_host_interface_on_linux, |
| self.iprf_server_host_interface_on_vpp, |
| self.server_namespace, |
| server_ip4_prefix, |
| server_ip6_prefix, |
| ) |
| self.egress_if_idx = self.create_af_packet( |
| version=server_if_version, |
| host_if_name=self.iprf_server_host_interface_on_vpp, |
| enable_gso=enable_server_if_gso, |
| ) |
| self.egress_if_idxes.append(self.egress_if_idx) |
| self.vpp_interfaces.append(self.egress_if_idx) |
| self.linux_interfaces.append( |
| ["", self.iprf_server_host_interface_on_vpp] |
| ) |
| self.linux_interfaces.append( |
| [ |
| self.server_namespace, |
| self.iprf_server_host_interface_on_linux, |
| ] |
| ) |
| if enable_server_if_gso == 0: |
| disable_interface_gso("", self.iprf_server_host_interface_on_vpp) |
| disable_interface_gso( |
| self.server_namespace, |
| self.iprf_server_host_interface_on_linux, |
| ) |
| elif server_if_type == "tap" or server_if_type == "tun": |
| self.egress_if_idx = self.create_tap_tun( |
| id=102, |
| host_namespace=self.server_namespace, |
| ip_version=ip_version, |
| host_ip4_prefix=( |
| layer2["server_ip4_prefix"] |
| if x_connect_mode == "L2" |
| else layer3["server_ip4_prefix"] |
| ), |
| host_ip6_prefix=( |
| layer2["server_ip6_prefix"] |
| if x_connect_mode == "L2" |
| else layer3["server_ip6_prefix"] |
| ), |
| int_type=server_if_type, |
| host_if_name=f"{server_if_type}0", |
| enable_gso=enable_server_if_gso, |
| enable_gro=enable_server_if_gro, |
| enable_checksum_offload=enable_server_if_checksum_offload, |
| ) |
| self.tap_interfaces.append(self.egress_if_idx) |
| self.egress_if_idxes.append(self.egress_if_idx) |
| self.vpp_interfaces.append(self.egress_if_idx) |
| self.linux_interfaces.append( |
| [self.server_namespace, f"{server_if_type}0"] |
| ) |
| # Seeing TCP timeouts if tx=on & rx=on Linux tap & tun interfaces |
| disable_interface_gso(self.server_namespace, f"{server_if_type}0") |
| elif server_if_type == "memif": |
| self.egress_if_idx = self.create_memif( |
| memif_id=1, mode=0 if x_connect_mode == "L2" else 1 |
| ) |
| self.memif_interfaces.append(self.egress_if_idx) |
| self.egress_if_idxes.append(self.egress_if_idx) |
| self.vpp_interfaces.append(self.egress_if_idx) |
| else: |
| print( |
| f"Unsupported server interface type: {server_if_type} " |
| f"for test - ID={test['id']}" |
| ) |
| sys.exit(1) |
| self.if_types = set(client_if_types).union(set(server_if_types)) |
| # for memif testing: tapv2, memif & libmemif_app are connected |
| if "memif" not in self.if_types: |
| if x_connect_mode == "L2": |
| self.l2_connect_interfaces(1, self.ingress_if_idx, self.egress_if_idx) |
| elif x_connect_mode == "L3": |
| # L3 connect client & server side |
| vrf_id = layer3["ip4_vrf"] if ip_version == 4 else layer3["ip6_vrf"] |
| self.l3_connect_interfaces( |
| ip_version, |
| vrf_id, |
| (self.ingress_if_idx, vpp_client_prefix), |
| (self.egress_if_idx, vpp_server_prefix), |
| ) |
| # Setup namespace routing |
| if ip_version == 4: |
| add_namespace_route( |
| self.client_namespace, "0.0.0.0/0", vpp_client_nexthop |
| ) |
| add_namespace_route( |
| self.server_namespace, "0.0.0.0/0", vpp_server_nexthop |
| ) |
| else: |
| add_namespace_route( |
| self.client_namespace, "::/0", vpp_client_nexthop |
| ) |
| add_namespace_route( |
| self.server_namespace, "::/0", vpp_server_nexthop |
| ) |
| else: |
| # connect: ingress tap & memif & egress tap and memif interfaces |
| if x_connect_mode == "L2": |
| self.l2_connect_interfaces(1, *self.ingress_if_idxes) |
| self.l2_connect_interfaces(2, *self.egress_if_idxes) |
| # Wait for Linux IPv6 stack to become ready |
| if ip_version == 6: |
| time.sleep(2) |
| |
| def tearDown(self): |
| try: |
| for interface_if_idx in self.tap_interfaces: |
| self.vapi.tap_delete_v2(sw_if_index=interface_if_idx) |
| except Exception: |
| pass |
| try: |
| for interface_if_idx in self.memif_interfaces: |
| self.vapi.memif_delete(sw_if_index=interface_if_idx) |
| except Exception: |
| pass |
| try: |
| for interface in self.vapi.af_packet_dump(): |
| if interface.host_if_name == self.iprf_client_host_interface_on_vpp: |
| self.vapi.af_packet_delete(self.iprf_client_host_interface_on_vpp) |
| elif interface.host_if_name == self.iprf_server_host_interface_on_vpp: |
| self.vapi.af_packet_delete(self.iprf_server_host_interface_on_vpp) |
| except Exception: |
| pass |
| try: |
| delete_host_interfaces( |
| self.iprf_client_host_interface_on_linux, |
| self.iprf_server_host_interface_on_linux, |
| self.iprf_client_host_interface_on_vpp, |
| self.iprf_server_host_interface_on_vpp, |
| ) |
| except Exception: |
| pass |
| try: |
| self.vapi.ip_table_add_del_v2( |
| is_add=0, table={"table_id": layer3["ip4_vrf"]} |
| ) |
| except Exception: |
| pass |
| try: |
| self.vapi.ip_table_add_del_v2( |
| is_add=0, table={"table_id": layer3["ip6_vrf"]} |
| ) |
| except Exception: |
| pass |
| try: |
| self.vapi.bridge_domain_add_del_v2(bd_id=1, is_add=0) |
| self.vapi.bridge_domain_add_del_v2(bd_id=2, is_add=0) |
| except Exception: |
| pass |
| try: |
| delete_namespace( |
| [ |
| self.client_namespace, |
| self.server_namespace, |
| ] |
| ) |
| except Exception: |
| pass |
| try: |
| if hasattr(self, "iperf_cmd"): |
| stop_iperf(" ".join(self.iperf_cmd)) |
| except Exception: |
| pass |
| try: |
| if self.memif_process: |
| self.memif_process.terminate() |
| self.memif_process.join() |
| except Exception: |
| pass |
| |
| def create_af_packet(self, version, host_if_name, enable_gso=0): |
| """Create an af_packetv3 interface in VPP. |
| |
| Parameters: |
| version -- 2 for af_packet_create_v2 |
| -- 3 for af_packet_create_v3 |
| host_if_name -- host interface name |
| enable_gso -- Enable GSO on the interface when True |
| """ |
| af_packet_mode = VppEnum.vl_api_af_packet_mode_t |
| af_packet_interface_mode = af_packet_mode.AF_PACKET_API_MODE_ETHERNET |
| af_packet_flags = VppEnum.vl_api_af_packet_flags_t |
| af_packet_interface_flags = af_packet_flags.AF_PACKET_API_FLAG_QDISC_BYPASS |
| if enable_gso: |
| af_packet_interface_flags = ( |
| af_packet_interface_flags | af_packet_flags.AF_PACKET_API_FLAG_CKSUM_GSO |
| ) |
| if version == 2: |
| af_packet_interface_flags = ( |
| af_packet_interface_flags | af_packet_flags.AF_PACKET_API_FLAG_VERSION_2 |
| ) |
| api_args = { |
| "use_random_hw_addr": True, |
| "host_if_name": host_if_name, |
| "flags": af_packet_interface_flags, |
| } |
| api_args["mode"] = af_packet_interface_mode |
| result = self.vapi.af_packet_create_v3(**api_args) |
| sw_if_index = result.sw_if_index |
| # Enable software GSO chunking when interface doesn't support GSO offload |
| if enable_gso == 0: |
| self.vapi.feature_gso_enable_disable( |
| sw_if_index=sw_if_index, enable_disable=1 |
| ) |
| else: |
| self.vapi.feature_gso_enable_disable( |
| sw_if_index=sw_if_index, enable_disable=0 |
| ) |
| self.vapi.sw_interface_set_flags(sw_if_index=sw_if_index, flags=1) |
| return sw_if_index |
| |
| def create_tap_tun( |
| self, |
| id, |
| host_namespace, |
| ip_version, |
| host_ip4_prefix=None, |
| host_ip6_prefix=None, |
| host_ip4_gw=None, |
| host_ip6_gw=None, |
| int_type="tap", |
| host_if_name=None, |
| enable_gso=0, |
| enable_gro=0, |
| enable_checksum_offload=0, |
| ): |
| """Create a tapv2 or tunv2 interface in VPP and attach to host. |
| |
| Parameters: |
| id -- interface ID |
| host_namespace -- host namespace to attach the tap/tun interface to |
| ip_version -- 4 or 6 |
| host_ip4_prefix -- ipv4 host interface address in CIDR notation |
| if ip_version=4 |
| host_ip6_prefix -- ipv6 host interface address in CIDR notation |
| if ip_version=6 |
| host_ip4_gw -- host IPv4 default gateway IP Address |
| host_ip6_gw -- host IPv6 default gateway IP address |
| int_type -- "tap" for tapv2 & "tun" for tunv2 interface |
| host_if_name -- host side interface name |
| enable_gso -- enable GSO |
| enable_gro -- enable GSO/GRO-Coalesce |
| enable_checksum_offload -- enable checksum offload without gso |
| """ |
| TapFlags = VppEnum.vl_api_tap_flags_t |
| tap_flags = 0 |
| if int_type == "tun": |
| tap_flags = TapFlags.TAP_API_FLAG_TUN |
| if enable_gro: |
| tap_flags = tap_flags | ( |
| TapFlags.TAP_API_FLAG_GSO | TapFlags.TAP_API_FLAG_GRO_COALESCE |
| ) |
| elif enable_gso: |
| tap_flags = tap_flags | TapFlags.TAP_API_FLAG_GSO |
| elif enable_checksum_offload: |
| tap_flags = tap_flags | TapFlags.TAP_API_FLAG_CSUM_OFFLOAD |
| elif int_type == "tap": |
| if enable_gro: |
| tap_flags = ( |
| TapFlags.TAP_API_FLAG_GSO | TapFlags.TAP_API_FLAG_GRO_COALESCE |
| ) |
| elif enable_gso: |
| tap_flags = TapFlags.TAP_API_FLAG_GSO |
| elif enable_checksum_offload: |
| tap_flags = tap_flags | TapFlags.TAP_API_FLAG_CSUM_OFFLOAD |
| |
| api_args = { |
| "id": id, |
| "host_namespace_set": True, |
| "host_namespace": host_namespace, |
| "host_if_name_set": False, |
| "host_bridge_set": False, |
| "host_mac_addr_set": False, |
| } |
| if tap_flags != 0: |
| api_args["tap_flags"] = tap_flags |
| if ip_version == 4: |
| api_args["host_ip4_prefix"] = ip_interface(host_ip4_prefix) |
| api_args["host_ip4_prefix_set"] = True |
| if host_ip4_gw: |
| api_args["host_ip4_gw"] = ip_address(host_ip4_gw) |
| api_args["host_ip4_gw_set"] = True |
| if ip_version == 6: |
| api_args["host_ip6_prefix"] = ip_interface(host_ip6_prefix) |
| api_args["host_ip6_prefix_set"] = True |
| if host_ip6_gw: |
| api_args["host_ip6_gw"] = ip_address(host_ip6_gw) |
| api_args["host_ip6_gw_set"] = True |
| if host_if_name: |
| api_args["host_if_name"] = host_if_name |
| api_args["host_if_name_set"] = True |
| |
| result = self.vapi.tap_create_v2(**api_args) |
| sw_if_index = result.sw_if_index |
| # Enable software GSO chunking when interface doesn't support GSO offload and |
| # GRO coalesce |
| if enable_gso == 0 and enable_gro == 0: |
| self.vapi.feature_gso_enable_disable( |
| sw_if_index=sw_if_index, enable_disable=1 |
| ) |
| else: |
| self.vapi.feature_gso_enable_disable( |
| sw_if_index=sw_if_index, enable_disable=0 |
| ) |
| # Admin up |
| self.vapi.sw_interface_set_flags(sw_if_index=sw_if_index, flags=1) |
| return sw_if_index |
| |
| def create_memif(self, memif_id, mode): |
| """Create memif interface in VPP. |
| |
| Parameters: |
| memif_id: A unique ID for the memif interface |
| mode: 0 = ethernet, 1 = ip, 2 = punt/inject |
| """ |
| # create memif interface with role=0 (i.e. master) |
| result = self.vapi.memif_create_v2( |
| role=0, mode=mode, id=memif_id, buffer_size=9216 |
| ) |
| sw_if_index = result.sw_if_index |
| self.vapi.sw_interface_set_flags(sw_if_index=sw_if_index, flags=1) |
| return sw_if_index |
| |
| def dump_bridge_domain_details(self, bd_id): |
| return self.vapi.bridge_domain_dump(bd_id=bd_id) |
| |
| def l2_connect_interfaces(self, bridge_id, *sw_if_idxs): |
| for if_idx in sw_if_idxs: |
| self.vapi.sw_interface_set_l2_bridge( |
| rx_sw_if_index=if_idx, bd_id=bridge_id, shg=0, port_type=0, enable=True |
| ) |
| |
| def l3_connect_interfaces(self, ip_version, vrf_id, *if_idx_ip_prefixes): |
| """Setup routing for (if_idx, ip_prefix) inside VPP. |
| |
| arguments: |
| if_idx_ip_prefixes -- sequence of (if_idx, ip_prefix) tuples |
| ip_version -- 4 or 6 |
| vrf_id -- vrf_id |
| """ |
| is_ipv6 = 0 if ip_version == 4 else 1 |
| self.vapi.ip_table_add_del_v2( |
| is_add=1, table={"table_id": vrf_id, "is_ip6": is_ipv6} |
| ) |
| for sw_if_index, ip_prefix in if_idx_ip_prefixes: |
| self.vapi.sw_interface_set_table( |
| sw_if_index=sw_if_index, is_ipv6=is_ipv6, vrf_id=vrf_id |
| ) |
| self.vapi.sw_interface_add_del_address( |
| sw_if_index=sw_if_index, is_add=1, prefix=ip_interface(ip_prefix) |
| ) |
| |
| def set_interfaces_mtu(self, mtu, ip_version, **kwargs): |
| """Set MTUs on VPP and Linux interfaces. |
| |
| arguments -- |
| mtu -- mtu value |
| ip_version - 4 or 6 |
| kwargs['vpp_interfaces'] -- list of vpp interface if indexes |
| kwargs['linux_interfaces'] -- list of tuples (namespace, interface_names) |
| return True if mtu is set, else False |
| """ |
| vpp_interfaces = kwargs.get("vpp_interfaces") |
| linux_interfaces = kwargs.get("linux_interfaces") |
| # IPv6 on Linux requires an MTU value >=1280 |
| if (ip_version == 6 and mtu >= 1280) or ip_version == 4: |
| for sw_if_idx in vpp_interfaces: |
| self.vapi.sw_interface_set_mtu( |
| sw_if_index=sw_if_idx, mtu=[mtu, 0, 0, 0] |
| ) |
| for namespace, interface_name in linux_interfaces: |
| set_interface_mtu( |
| namespace=namespace, |
| interface=interface_name, |
| mtu=mtu, |
| logger=self.logger, |
| ) |
| return True |
| else: |
| return False |
| |
| |
| if __name__ == "__main__": |
| unittest.main(testRunner=VppTestRunner) |