| from config import config |
| from asfframework import VppTestRunner |
| import unittest |
| from framework import VppTestCase |
| from scapy.layers.l2 import Ether |
| from scapy.layers.inet import IP, UDP |
| from random import randint |
| import ctypes |
| |
| |
| def create_stream(src_if, dst_if, count): |
| packets = [] |
| for i in range(count): |
| p = ( |
| Ether(dst=src_if.local_mac, src=src_if.remote_mac) |
| / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) |
| / UDP(sport=randint(49152, 65535), dport=5678) |
| ) |
| packets.append(p) |
| |
| return packets |
| |
| |
| @unittest.skipIf( |
| "tracedump" in config.excluded_plugins, "Exclude tracedump plugin tests" |
| ) |
| class TestTracedump(VppTestCase): |
| """Tracedump plugin tests""" |
| |
| @classmethod |
| def setUpClass(cls): |
| super(TestTracedump, cls).setUpClass() |
| cls.create_pg_interfaces(range(2)) |
| for i in cls.pg_interfaces: |
| i.admin_up() |
| i.config_ip4() |
| i.resolve_arp() |
| |
| @classmethod |
| def tearDownClass(cls): |
| for i in cls.pg_interfaces: |
| i.unconfig_ip4() |
| i.admin_down() |
| super(TestTracedump, cls).tearDownClass() |
| |
| def test_tracedump_include(self): |
| """Check API/CLI output + include node""" |
| packets = create_stream(self.pg0, self.pg1, 5) |
| self.pg0.add_stream(packets) |
| |
| self.vapi.trace_clear_cache() |
| self.vapi.trace_clear_capture() |
| # get pg-input node index |
| reply = self.vapi.graph_node_get( |
| cursor=0xFFFFFFFF, |
| index=0xFFFFFFFF, |
| name="pg-input", |
| ) |
| self.assertTrue(reply[1][0].name == "pg-input") |
| pg_input_index = reply[1][0].index |
| self.vapi.trace_set_filters(flag=1, node_index=pg_input_index, count=5) |
| self.vapi.trace_capture_packets( |
| node_index=pg_input_index, |
| max_packets=5, |
| use_filter=True, |
| verbose=True, |
| pre_capture_clear=True, |
| ) |
| |
| reply = self.vapi.cli( |
| "show graph node want_arcs input drop output punt handoff no_free polling interrupt" |
| ) |
| self.assertIn("af-packet-input", reply) |
| |
| self.pg_start() |
| reply = self.vapi.trace_v2_dump( |
| thread_id=0xFFFFFFFF, position=0, clear_cache=False |
| ) |
| self.assertTrue(reply) |
| reply = self.vapi.trace_filter_function_dump() |
| self.assertTrue(reply[1].selected) |
| |
| def test_tracedump_exclude(self): |
| """Exclude node (no trace output)""" |
| self.vapi.trace_clear_cache() |
| self.vapi.trace_clear_capture() |
| |
| packets = create_stream(self.pg0, self.pg1, 5) |
| self.pg0.add_stream(packets) |
| |
| # exclude node |
| reply = self.vapi.graph_node_get( |
| cursor=0xFFFFFFFF, |
| index=0xFFFFFFFF, |
| name="pg-input", |
| ) |
| self.assertTrue(reply[1][0].name == "pg-input") |
| pg_input_index = reply[1][0].index |
| self.vapi.trace_set_filters(flag=2, node_index=pg_input_index, count=5) |
| self.vapi.trace_capture_packets( |
| node_index=pg_input_index, |
| max_packets=5, |
| use_filter=True, |
| verbose=True, |
| pre_capture_clear=True, |
| ) |
| self.pg_start() |
| reply = self.vapi.trace_v2_dump(thread_id=0, position=1, clear_cache=False) |
| self.assertFalse(reply) |
| |
| |
| if __name__ == "__main__": |
| unittest.main(testRunner=VppTestRunner) |