tests: Added NSIM plugin tests

Type: test

Change-Id: Id621a806b853688ced7c6a38e1a9e5f298d2b97e
Signed-off-by: adrianvillin <avillin@cisco.com>
diff --git a/test/test_nsim.py b/test/test_nsim.py
new file mode 100644
index 0000000..16b5fd0
--- /dev/null
+++ b/test/test_nsim.py
@@ -0,0 +1,267 @@
+from framework import VppTestCase
+from asfframework import VppTestRunner
+from config import config
+import unittest
+import re
+
+from scapy.layers.l2 import Ether
+from scapy.layers.inet import IP, UDP
+from scapy.packet import Raw
+from random import randint
+from util import ppp
+
+
+def create_stream(self, src_if, dst_if, count):
+    packets = []
+    for i in range(count):
+        # create packet info stored in the test case instance
+        info = self.create_packet_info(src_if, dst_if)
+        # convert the info into packet payload
+        payload = self.info_to_payload(info)
+        # create the packet itself
+        p = (
+            Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+            / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+            / UDP(sport=randint(49152, 65535), dport=5678)
+            / Raw(payload)
+        )
+        # store a copy of the packet in the packet info
+        info.data = p.copy()
+        # append the packet to the list
+        packets.append(p)
+
+    # return the created packet list
+    return packets
+
+
+def verify_capture(self, src_if, dst_if, capture, reply):
+    packet_info = None
+    for packet in capture:
+        try:
+            ip = packet[IP]
+            udp = packet[UDP]
+            # convert the payload to packet info object
+            payload_info = self.payload_to_info(packet[Raw])
+            # make sure the indexes match
+            self.assert_equal(
+                payload_info.src, src_if.sw_if_index, "source sw_if_index"
+            )
+            self.assert_equal(
+                payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
+            )
+            packet_info = self.get_next_packet_info_for_interface2(
+                src_if.sw_if_index, dst_if.sw_if_index, packet_info
+            )
+            # make sure we didn't run out of saved packets
+            self.assertIsNotNone(packet_info)
+            self.assert_equal(
+                payload_info.index, packet_info.index, "packet info index"
+            )
+            saved_packet = packet_info.data  # fetch the saved packet
+            # assert the values match
+            self.assert_equal(ip.src, saved_packet[IP].src, "IP source address")
+            # ... more assertions here
+            self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
+        except Exception:
+            self.logger.error(ppp("Unexpected or invalid packet:", packet))
+            raise
+    remaining_packet = self.get_next_packet_info_for_interface2(
+        src_if.sw_if_index, dst_if.sw_if_index, packet_info
+    )
+    self.assertIsNone(
+        remaining_packet,
+        "Interface %s: Packet expected from interface "
+        "%s didn't arrive" % (dst_if.name, src_if.name),
+    )
+
+    # find timestamps and get actual delay
+    pattern = r"\d{2}:\d{2}:\d{2}:\d{6}"
+    timestamps = re.findall(pattern, reply)
+    actual_delay = int(timestamps[2][9:]) - int(timestamps[0][9:])
+    self.assertTrue(
+        actual_delay >= 100000, f"Delay is lower than expected: {actual_delay} < 100000"
+    )
+
+
+@unittest.skipIf("nsim" in config.excluded_plugins, "Exclude NSIM plugin tests")
+class TestNsimCli(VppTestCase):
+    """NSIM plugin tests [CLI]"""
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestNsimCli, cls).setUpClass()
+        try:
+            cls.create_pg_interfaces(range(2))
+            for i in cls.pg_interfaces:
+                i.config_ip4()
+                i.resolve_arp()
+                i.admin_up()
+        except Exception:
+            cls.tearDownClass()
+            raise
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.vapi.cli("nsim cross-connect enable-disable pg0 pg1 disable")
+        cls.vapi.cli("nsim output-feature enable-disable pg0 disable")
+        for i in cls.pg_interfaces:
+            i.unconfig_ip4()
+            i.admin_down()
+        super(TestNsimCli, cls).tearDownClass()
+
+    def test_nsim_delay(self):
+        """Add 100ms delay"""
+        packets = create_stream(self, self.pg0, self.pg1, 5)
+        self.pg0.add_stream(packets)
+        self.pg0.enable_capture()
+        self.pg1.enable_capture()
+
+        self.vapi.cli(
+            "set nsim delay 100.0 ms bandwidth 1 gbit packet-size 128 drop-fraction 0.0"
+        )
+        self.vapi.cli("nsim cross-connect enable-disable pg0 pg1")
+        self.vapi.cli("nsim output-feature enable-disable pg0")
+
+        self.pg_start()
+        capture = self.pg1.get_capture()
+        self.pg0.assert_nothing_captured()
+        reply = self.vapi.cli("show trace")
+        verify_capture(self, self.pg0, self.pg1, capture, reply)
+        self.assertIn("nsim", reply)
+        reply = self.vapi.cli("show nsim")
+        self.assertIn("delay: 100.0 ms", reply)
+
+    def test_nsim_drop(self):
+        """Drop all packets"""
+        packets = create_stream(self, self.pg0, self.pg1, 5)
+        self.pg0.add_stream(packets)
+        self.vapi.cli("clear trace")
+        # test fails if running test-debug and no delay is set ("invalid delay 0.00")
+        self.vapi.cli(
+            "set nsim delay 1 us bandwidth 1 gbit packet-size 128 drop-fraction 1.0 packets-per-drop 0"
+        )
+
+        self.pg_start()
+        self.pg1.assert_nothing_captured()
+        reply = self.vapi.cli("show nsim")
+        self.assertIn("drop fraction: 1.0", reply)
+        reply = self.vapi.cli("show trace")
+        self.assertIn("sw_if_index -1", reply)
+
+
+@unittest.skipIf("nsim" in config.excluded_plugins, "Exclude NSIM plugin tests")
+class TestNsimApi(VppTestCase):
+    """NSIM plugin tests [API]"""
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestNsimApi, cls).setUpClass()
+        try:
+            cls.create_pg_interfaces(range(2))
+            for i in cls.pg_interfaces:
+                i.config_ip4()
+                i.resolve_arp()
+                i.admin_up()
+        except Exception:
+            cls.tearDownClass()
+            raise
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.vapi.nsim_cross_connect_enable_disable(
+            enable_disable=False, sw_if_index0=1, sw_if_index1=2
+        )
+        cls.vapi.nsim_output_feature_enable_disable(enable_disable=False, sw_if_index=1)
+        for i in cls.pg_interfaces:
+            i.unconfig_ip4()
+            i.admin_down()
+        super(TestNsimApi, cls).tearDownClass()
+
+    def test_nsim_delay(self):
+        """Add 100ms delay"""
+        packets = create_stream(self, self.pg0, self.pg1, 5)
+        self.pg0.add_stream(packets)
+        self.pg0.enable_capture()
+        self.pg1.enable_capture()
+
+        # "show nsim" shows 99.9ms if delay is exactly 100000
+        self.vapi.nsim_configure2(
+            delay_in_usec=100001,
+            average_packet_size=128,
+            bandwidth_in_bits_per_second=100000000000,
+            packets_per_drop=0,
+            packets_per_reorder=0,
+        )
+        self.vapi.nsim_cross_connect_enable_disable(
+            enable_disable=True, sw_if_index0=1, sw_if_index1=2
+        )
+        self.vapi.nsim_output_feature_enable_disable(enable_disable=True, sw_if_index=1)
+        self.pg_start()
+        capture = self.pg1.get_capture()
+        reply = self.vapi.cli("show trace")
+        verify_capture(self, self.pg0, self.pg1, capture, reply)
+        self.assertIn("nsim", reply)
+        reply = self.vapi.cli("show nsim")
+        self.assertIn("delay: 100.0 ms", reply)
+
+
+# has to be separated, otherwise we get "VPP API client: read failed"
+# when configuring NSIM (nsim_configure2) and then VPP crashes on teardown
+@unittest.skipIf("nsim" in config.excluded_plugins, "Exclude NSIM plugin tests")
+class TestNsimApi2(VppTestCase):
+    """NSIM plugin tests [API]"""
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestNsimApi2, cls).setUpClass()
+        try:
+            cls.create_pg_interfaces(range(2))
+            for i in cls.pg_interfaces:
+                i.config_ip4()
+                i.resolve_arp()
+                i.admin_up()
+        except Exception:
+            cls.tearDownClass()
+            raise
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.vapi.nsim_cross_connect_enable_disable(
+            enable_disable=False, sw_if_index0=1, sw_if_index1=2
+        )
+        cls.vapi.nsim_output_feature_enable_disable(enable_disable=False, sw_if_index=1)
+        for i in cls.pg_interfaces:
+            i.unconfig_ip4()
+            i.admin_down()
+        super(TestNsimApi2, cls).tearDownClass()
+
+    def test_nsim_drop(self):
+        """Drop all packets"""
+        packets = create_stream(self, self.pg0, self.pg1, 5)
+        self.pg0.add_stream(packets)
+        self.pg0.enable_capture()
+        self.pg1.enable_capture()
+        self.vapi.cli("clear trace")
+
+        self.vapi.nsim_configure2(
+            delay_in_usec=10,
+            average_packet_size=128,
+            bandwidth_in_bits_per_second=100000000,
+            packets_per_drop=1,
+            packets_per_reorder=0,
+        )
+        self.vapi.nsim_cross_connect_enable_disable(
+            enable_disable=True, sw_if_index0=1, sw_if_index1=2
+        )
+        self.vapi.nsim_output_feature_enable_disable(enable_disable=True, sw_if_index=1)
+
+        self.pg_start()
+        self.pg1.assert_nothing_captured()
+        reply = self.vapi.cli("show nsim")
+        self.assertIn("drop fraction: 1.0", reply)
+        reply = self.vapi.cli("show trace")
+        self.assertIn("sw_if_index -1", reply)
+
+
+if __name__ == "__main__":
+    unittest.main(testRunner=VppTestRunner)