blob: 6787ca1e3d79aa6fc1b60f2afc61f731920a9b4d [file] [log] [blame]
Naveen Joy7ea7ab52021-05-11 10:31:18 -07001#!/usr/bin/env python3
2import unittest
3from ipaddress import ip_interface
4from vpp_qemu_utils import create_namespace
5from vpp_iperf import VppIperf
6from framework import VppTestCase, VppTestRunner
7from config import config
8
9
class TestTapQemu(VppTestCase):
    """Test Tap interfaces inside a QEMU VM.

    Runs an iPerf stream between two Linux network namespaces that are
    joined through VPP by a pair of tap v2 interfaces placed in one
    bridge-domain.

    Linux_ns1 -- iperf_client -- tap1 -- VPP-BD -- tap2 --
                              -- iperfServer -- Linux_ns2
    """

    @classmethod
    def setUpClass(cls):
        """Delegate class-level setup to the base test case."""
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the base test case."""
        super().tearDownClass()

    def setUp(self):
        """Build the topology used by the tests in this class.

        1. Create one namespace for the iPerf client and one for the server.
        2. Create a VPP tap interface for each namespace.
        3. Put both tap interfaces into the same bridge-domain.
        """
        super().setUp()
        self.client_namespace = "iprf_client_ns"
        self.server_namespace = "iprf_server_ns"
        self.client_ip4_prefix = "10.0.0.101/24"
        self.server_ip4_prefix = "10.0.0.102/24"

        # Namespaces must exist before the taps that reference them.
        for namespace in (self.client_namespace, self.server_namespace):
            create_namespace(namespace)

        # (tap id, host namespace, host IPv4 prefix) -- client first, then
        # server, which is the order the taps are created in.
        tap_specs = (
            (101, self.client_namespace, self.client_ip4_prefix),
            (102, self.server_namespace, self.server_ip4_prefix),
        )
        tap_if_indices = [self.create_tap(*spec) for spec in tap_specs]
        self.l2_connect_interfaces(*tap_if_indices)

    def create_tap(self, id, host_namespace, host_ip4_prefix):
        """Create a tap v2 interface and set its interface flags.

        :param id: tap identifier passed to ``tap_create_v2``.
        :param host_namespace: name of the namespace for the host side.
        :param host_ip4_prefix: host-side IPv4 address with prefix length,
            e.g. ``"10.0.0.101/24"``; it is parsed with ``ip_interface``.
        :returns: the ``sw_if_index`` reported by ``tap_create_v2``.
        """
        tap_request = {
            "id": id,
            "use_random_mac": True,
            "host_namespace_set": True,
            "host_namespace": host_namespace,
            "host_if_name_set": False,
            "host_bridge_set": False,
            "host_mac_addr_set": False,
            "host_ip4_prefix": ip_interface(host_ip4_prefix),
            "host_ip4_prefix_set": True,
        }
        reply = self.vapi.api(self.vapi.papi.tap_create_v2, tap_request)
        tap_sw_if_index = reply.sw_if_index

        # NOTE(review): flags=1 presumably marks the interface admin-up;
        # confirm against the VPP interface API definition.
        flags_request = {"sw_if_index": tap_sw_if_index, "flags": 1}
        self.vapi.api(self.vapi.papi.sw_interface_set_flags, flags_request)
        return tap_sw_if_index

    def dump_vpp_tap_interfaces(self):
        """Return the reply of ``sw_interface_tap_v2_dump`` (no filter)."""
        dump_request = {}
        return self.vapi.api(
            self.vapi.papi.sw_interface_tap_v2_dump, dump_request
        )

    def dump_bridge_domain_details(self):
        """Return the reply of ``bridge_domain_dump`` for bridge-domain 1."""
        dump_request = {"bd_id": 1}
        return self.vapi.api(self.vapi.papi.bridge_domain_dump, dump_request)

    def l2_connect_interfaces(self, *sw_if_idxs):
        """Add every given interface to bridge-domain 1.

        :param sw_if_idxs: software interface indices to bridge together.
        """
        for member_if_index in sw_if_idxs:
            # NOTE(review): shg=0 and port_type=0 look like the defaults
            # (no split-horizon group, normal port) -- confirm against the
            # VPP L2 API definition.
            bridge_request = {
                "rx_sw_if_index": member_if_index,
                "bd_id": 1,
                "shg": 0,
                "port_type": 0,
                "enable": True,
            }
            self.vapi.api(
                self.vapi.papi.sw_interface_set_l2_bridge, bridge_request
            )

    @unittest.skipUnless(config.extended, "part of extended tests")
    def test_tap_iperf(self):
        """Start an iperf connection stream between QEMU & VPP via tap."""
        iperf = VppIperf()
        iperf.client_ns = self.client_namespace
        iperf.server_ns = self.server_namespace
        # The client needs the bare server address, without the prefix
        # length carried by server_ip4_prefix.
        server_address = ip_interface(self.server_ip4_prefix).ip
        iperf.server_ip = str(server_address)
        iperf.start()
99
100
if __name__ == "__main__":
    # When executed as a script, run this module's tests with the runner
    # imported from the project's framework module.
    unittest.main(testRunner=VppTestRunner)