blob: 6787ca1e3d79aa6fc1b60f2afc61f731920a9b4d [file] [log] [blame]
#!/usr/bin/env python3
import unittest
from ipaddress import ip_interface
from vpp_qemu_utils import create_namespace
from vpp_iperf import VppIperf
from framework import VppTestCase, VppTestRunner
from config import config
class TestTapQemu(VppTestCase):
"""Test Tap interfaces inside a QEMU VM.
Start an iPerf connection stream between QEMU and VPP via
tap v2 interfaces.
Linux_ns1 -- iperf_client -- tap1 -- VPP-BD -- tap2 --
-- iperfServer -- Linux_ns2
"""
@classmethod
def setUpClass(cls):
super(TestTapQemu, cls).setUpClass()
@classmethod
def tearDownClass(cls):
super(TestTapQemu, cls).tearDownClass()
def setUp(self):
"""Perform test setup before running QEMU tests.
1. Create a namespace for the iPerf Server & Client.
2. Create 2 tap interfaces in VPP & add them to each namespace.
3. Add the tap interfaces to a bridge-domain.
"""
super(TestTapQemu, self).setUp()
self.client_namespace = "iprf_client_ns"
self.server_namespace = "iprf_server_ns"
self.client_ip4_prefix = "10.0.0.101/24"
self.server_ip4_prefix = "10.0.0.102/24"
create_namespace(self.client_namespace)
create_namespace(self.server_namespace)
tap1_if_idx = self.create_tap(
101, self.client_namespace, self.client_ip4_prefix
)
tap2_if_idx = self.create_tap(
102, self.server_namespace, self.server_ip4_prefix
)
self.l2_connect_interfaces(tap1_if_idx, tap2_if_idx)
def create_tap(self, id, host_namespace, host_ip4_prefix):
result = self.vapi.api(
self.vapi.papi.tap_create_v2,
{
"id": id,
"use_random_mac": True,
"host_namespace_set": True,
"host_namespace": host_namespace,
"host_if_name_set": False,
"host_bridge_set": False,
"host_mac_addr_set": False,
"host_ip4_prefix": ip_interface(host_ip4_prefix),
"host_ip4_prefix_set": True,
},
)
sw_if_index = result.sw_if_index
self.vapi.api(
self.vapi.papi.sw_interface_set_flags,
{"sw_if_index": sw_if_index, "flags": 1},
)
return sw_if_index
def dump_vpp_tap_interfaces(self):
return self.vapi.api(self.vapi.papi.sw_interface_tap_v2_dump, {})
def dump_bridge_domain_details(self):
return self.vapi.api(self.vapi.papi.bridge_domain_dump, {"bd_id": 1})
def l2_connect_interfaces(self, *sw_if_idxs):
for if_idx in sw_if_idxs:
self.vapi.api(
self.vapi.papi.sw_interface_set_l2_bridge,
{
"rx_sw_if_index": if_idx,
"bd_id": 1,
"shg": 0,
"port_type": 0,
"enable": True,
},
)
@unittest.skipUnless(config.extended, "part of extended tests")
def test_tap_iperf(self):
"""Start an iperf connection stream between QEMU & VPP via tap."""
iperf = VppIperf()
iperf.client_ns = self.client_namespace
iperf.server_ns = self.server_namespace
iperf.server_ip = str(ip_interface(self.server_ip4_prefix).ip)
iperf.start()
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)