| #!/usr/bin/env python |
| |
| # Utility functions for QEMU tests ## |
| |
| import subprocess |
| import sys |
| import os |
| import time |
| import random |
| import string |
| from multiprocessing import Lock, Process |
| |
| lock = Lock() |
| |
| |
| def can_create_namespaces(namespace="vpp_chk_4212"): |
| """Check if the environment allows creating the namespaces""" |
| with lock: |
| try: |
| result = subprocess.run( |
| ["ip", "netns", "add", namespace], capture_output=True |
| ) |
| if result.returncode != 0: |
| return False |
| subprocess.run(["ip", "netns", "del", namespace], capture_output=True) |
| return True |
| except Exception: |
| return False |
| |
| |
| def create_namespace(history_file, ns=None): |
| """Create one or more namespaces.""" |
| |
| with lock: |
| namespaces = [] |
| retries = 5 |
| |
| if ns is None: |
| result = None |
| |
| for retry in range(retries): |
| suffix = "".join( |
| random.choices(string.ascii_lowercase + string.digits, k=8) |
| ) |
| new_namespace_name = f"vpp_ns{suffix}" |
| # Check if the namespace already exists |
| result = subprocess.run( |
| ["ip", "netns", "add", new_namespace_name], |
| capture_output=True, |
| text=True, |
| ) |
| if result.returncode == 0: |
| with open(history_file, "a") as ns_file: |
| ns_file.write(f"{new_namespace_name}\n") |
| return new_namespace_name |
| error_message = result.stderr if result else "Unknown error" |
| raise Exception( |
| f"Failed to generate a unique namespace name after {retries} attempts." |
| f"Error from last attempt: {error_message}" |
| ) |
| elif isinstance(ns, str): |
| namespaces = [ns] |
| else: |
| namespaces = ns |
| |
| for namespace in namespaces: |
| for attempt in range(retries): |
| result = subprocess.run( |
| ["ip", "netns", "add", namespace], |
| capture_output=True, |
| text=True, |
| ) |
| |
| if result.returncode == 0: |
| with open(history_file, "a") as ns_file: |
| ns_file.write(f"{namespace}\n") |
| break |
| if attempt >= retries - 1: |
| raise Exception( |
| f"Failed to create namespace {namespace} after {retries} attempts. Error: {result.stderr.decode()}" |
| ) |
| return ns |
| |
| |
| def add_namespace_route(ns, prefix, gw_ip): |
| """Add a route to a namespace. |
| arguments: |
| ns -- namespace string value |
| prefix -- NETWORK/MASK or "default" |
| gw_ip -- Gateway IP |
| """ |
| with lock: |
| try: |
| subprocess.run( |
| ["ip", "netns", "exec", ns, "ip", "route", "add", prefix, "via", gw_ip], |
| capture_output=True, |
| ) |
| except subprocess.CalledProcessError as e: |
| raise Exception("Error adding route to namespace:", e.output) |
| |
| |
| def delete_all_host_interfaces(history_file): |
| """Delete all host interfaces whose names have been added to the history file.""" |
| |
| with lock: |
| if os.path.exists(history_file): |
| with open(history_file, "r") as if_file: |
| for line in if_file: |
| if_name = line.strip() |
| if if_name: |
| _delete_host_interfaces(if_name) |
| os.remove(history_file) |
| |
| |
| def _delete_host_interfaces(*host_interface_names): |
| """Delete host interfaces. |
| arguments: |
| host_interface_names - sequence of host interface names to be deleted |
| """ |
| for host_interface_name in host_interface_names: |
| retries = 3 |
| for attempt in range(retries): |
| check_result = subprocess.run( |
| ["ip", "link", "show", host_interface_name], |
| capture_output=True, |
| text=True, |
| ) |
| if check_result.returncode != 0: |
| break |
| |
| result = subprocess.run( |
| ["ip", "link", "del", host_interface_name], |
| capture_output=True, |
| text=True, |
| ) |
| |
| if result.returncode == 0: |
| break |
| if attempt < retries - 1: |
| time.sleep(1) |
| else: |
| raise Exception( |
| f"Failed to delete host interface {host_interface_name} after {retries} attempts" |
| ) |
| |
| |
| def create_host_interface( |
| history_file, host_namespace, *host_ip_prefixes, vpp_if_name=None, host_if_name=None |
| ): |
| """Create a host interface of type veth. |
| arguments: |
| host_namespace -- host namespace into which the host_interface needs to be set |
| host_ip_prefixes -- a sequence of ip/prefix-lengths to be set |
| on the host_interface |
| vpp_if_name -- name of the veth interface on the VPP side |
| host_if_name -- name of the veth interface on the host side |
| """ |
| with lock: |
| retries = 5 |
| |
| for attempt in range(retries): |
| if_name = ( |
| host_if_name |
| or f"hostif{''.join(random.choices(string.ascii_lowercase + string.digits, k=8))}" |
| ) |
| new_vpp_if_name = ( |
| vpp_if_name |
| or f"vppout{''.join(random.choices(string.ascii_lowercase + string.digits, k=8))}" |
| ) |
| |
| result = subprocess.run( |
| [ |
| "ip", |
| "link", |
| "add", |
| "name", |
| new_vpp_if_name, |
| "type", |
| "veth", |
| "peer", |
| "name", |
| if_name, |
| ], |
| capture_output=True, |
| ) |
| if result.returncode == 0: |
| host_if_name = if_name |
| vpp_if_name = new_vpp_if_name |
| with open(history_file, "a") as if_file: |
| if_file.write(f"{host_if_name}\n{vpp_if_name}\n") |
| break |
| if attempt >= retries - 1: |
| raise Exception( |
| f"Failed to create host interface {if_name} and vpp {new_vpp_if_name} after {retries} attempts. Error: {result.stderr.decode()}" |
| ) |
| |
| result = subprocess.run( |
| ["ip", "link", "set", host_if_name, "netns", host_namespace], |
| capture_output=True, |
| ) |
| if result.returncode != 0: |
| raise Exception( |
| f"Error setting host interface namespace: {result.stderr.decode()}" |
| ) |
| |
| result = subprocess.run( |
| ["ip", "link", "set", "dev", vpp_if_name, "up"], capture_output=True |
| ) |
| if result.returncode != 0: |
| raise Exception( |
| f"Error bringing up the host interface: {result.stderr.decode()}" |
| ) |
| |
| result = subprocess.run( |
| [ |
| "ip", |
| "netns", |
| "exec", |
| host_namespace, |
| "ip", |
| "link", |
| "set", |
| "dev", |
| host_if_name, |
| "up", |
| ], |
| capture_output=True, |
| ) |
| if result.returncode != 0: |
| raise Exception( |
| f"Error bringing up the host interface in namespace: {result.stderr.decode()}" |
| ) |
| |
| for host_ip_prefix in host_ip_prefixes: |
| result = subprocess.run( |
| [ |
| "ip", |
| "netns", |
| "exec", |
| host_namespace, |
| "ip", |
| "addr", |
| "add", |
| host_ip_prefix, |
| "dev", |
| host_if_name, |
| ], |
| capture_output=True, |
| ) |
| if result.returncode != 0: |
| raise Exception( |
| f"Error setting ip prefix on the host interface: {result.stderr.decode()}" |
| ) |
| |
| return host_if_name, vpp_if_name |
| |
| |
| def set_interface_mtu(namespace, interface, mtu, logger): |
| """Set an MTU number on a linux device interface.""" |
| args = ["ip", "link", "set", "mtu", str(mtu), "dev", interface] |
| if namespace: |
| args = ["ip", "netns", "exec", namespace] + args |
| with lock: |
| retries = 3 |
| for attempt in range(retries): |
| result = subprocess.run(args, capture_output=True) |
| if result.returncode == 0: |
| break |
| if attempt < retries - 1: |
| time.sleep(1) |
| else: |
| raise Exception( |
| f"Failed to set MTU on interface {interface} in namespace {namespace} after {retries} attempts" |
| ) |
| |
| |
| def enable_interface_gso(namespace, interface): |
| """Enable GSO offload on a linux device interface.""" |
| args = ["ethtool", "-K", interface, "rx", "on", "tx", "on"] |
| if namespace: |
| args = ["ip", "netns", "exec", namespace] + args |
| with lock: |
| result = subprocess.run(args, capture_output=True) |
| if result.returncode != 0: |
| raise Exception( |
| f"Error enabling GSO offload on interface {interface} in namespace {namespace}: {result.stderr.decode()}" |
| ) |
| |
| |
| def disable_interface_gso(namespace, interface): |
| """Disable GSO offload on a linux device interface.""" |
| args = ["ethtool", "-K", interface, "rx", "off", "tx", "off"] |
| if namespace: |
| args = ["ip", "netns", "exec", namespace] + args |
| with lock: |
| result = subprocess.run(args, capture_output=True) |
| if result.returncode != 0: |
| raise Exception( |
| f"Error disabling GSO offload on interface {interface} in namespace {namespace}: {result.stderr.decode()}" |
| ) |
| |
| |
| def delete_all_namespaces(history_file): |
| """Delete all namespaces whose names have been added to the history file.""" |
| with lock: |
| if os.path.exists(history_file): |
| with open(history_file, "r") as ns_file: |
| for line in ns_file: |
| ns_name = line.strip() |
| if ns_name: |
| _delete_namespace(ns_name) |
| os.remove(history_file) |
| |
| |
| def _delete_namespace(ns): |
| """Delete one or more namespaces. |
| |
| arguments: |
| ns -- a list of namespace names or namespace |
| """ |
| if isinstance(ns, str): |
| namespaces = [ns] |
| else: |
| namespaces = ns |
| |
| existing_namespaces = subprocess.run( |
| ["ip", "netns", "list"], capture_output=True, text=True |
| ).stdout.splitlines() |
| existing_namespaces = {line.split()[0] for line in existing_namespaces} |
| |
| for namespace in namespaces: |
| if namespace not in existing_namespaces: |
| continue |
| |
| retries = 3 |
| for attempt in range(retries): |
| result = subprocess.run( |
| ["ip", "netns", "del", namespace], capture_output=True |
| ) |
| if result.returncode == 0: |
| break |
| if attempt < retries - 1: |
| time.sleep(1) |
| else: |
| raise Exception( |
| f"Failed to delete namespace {namespace} after {retries} attempts" |
| ) |
| |
| |
| def list_namespace(ns): |
| """List the IP address of a namespace.""" |
| with lock: |
| result = subprocess.run( |
| ["ip", "netns", "exec", ns, "ip", "addr"], capture_output=True |
| ) |
| if result.returncode != 0: |
| raise Exception( |
| f"Error listing IP addresses in namespace {ns}: {result.stderr.decode()}" |
| ) |
| |
| |
| def libmemif_test_app(memif_sock_path, logger): |
| """Build & run the libmemif test_app for memif interface testing.""" |
| test_dir = os.path.dirname(os.path.realpath(__file__)) |
| ws_root = os.path.dirname(test_dir) |
| libmemif_app = os.path.join( |
| ws_root, "extras", "libmemif", "build", "examples", "test_app" |
| ) |
| |
| def build_libmemif_app(): |
| if not os.path.exists(libmemif_app): |
| logger.info(f"Building app:{libmemif_app} for memif interface testing") |
| libmemif_app_dir = os.path.join(ws_root, "extras", "libmemif", "build") |
| os.makedirs(libmemif_app_dir, exist_ok=True) |
| os.chdir(libmemif_app_dir) |
| subprocess.run(["cmake", ".."], check=True) |
| subprocess.run(["make"], check=True) |
| |
| def start_libmemif_app(): |
| """Restart once if the initial run fails.""" |
| max_tries = 2 |
| run = 0 |
| while run < max_tries: |
| result = subprocess.run( |
| [libmemif_app, "-b", "9216", "-s", memif_sock_path], capture_output=True |
| ) |
| if result.returncode == 0: |
| break |
| logger.error( |
| f"Restarting libmemif app due to error: {result.stderr.decode()}" |
| ) |
| run += 1 |
| time.sleep(1) |
| |
| build_libmemif_app() |
| process = Process(target=start_libmemif_app) |
| process.start() |
| return process |