blob: baa93740ac8422c2fe9340c560119dab31f3aefb [file] [log] [blame]
adrianvillin4d05f062023-10-25 10:36:05 +02001from framework import VppTestCase, VppTestRunner
2import unittest
3from config import config
4from scapy.layers.l2 import Ether
5from scapy.packet import Raw
6from scapy.layers.inet import IP, UDP
7from random import randint
8from util import ppp
9
10
11@unittest.skipIf("bufmon" in config.excluded_plugins, "Exclude bufmon plugin tests")
12class TestBufmon(VppTestCase):
13 """bufmon plugin test"""
14
15 @classmethod
16 def setUpClass(cls):
17 super(TestBufmon, cls).setUpClass()
18 try:
19 cls.create_pg_interfaces(range(2))
20 for i in cls.pg_interfaces:
21 i.config_ip4()
22 i.resolve_arp()
23 i.admin_up()
24 except Exception:
25 cls.tearDownClass()
26 raise
27
28 @classmethod
29 def tearDownClass(cls):
30 for i in cls.pg_interfaces:
31 i.unconfig_ip4()
32 i.admin_down()
33 super(TestBufmon, cls).tearDownClass()
34
35 # https://fd.io/docs/vpp/master/developer/tests/overview.html#example-how-to-add-a-new-test
36 def create_stream(self, src_if, dst_if, count):
37 packets = []
38 for i in range(count):
39 info = self.create_packet_info(src_if, dst_if)
40 payload = self.info_to_payload(info)
41 p = (
42 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
43 / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
44 / UDP(sport=randint(1000, 2000), dport=5678)
45 / Raw(payload)
46 )
47 info.data = p.copy()
48 packets.append(p)
49 return packets
50
51 def verify_capture(self, src_if, dst_if, capture):
52 packet_info = None
53 for packet in capture:
54 try:
55 ip = packet[IP]
56 udp = packet[UDP]
57 # convert the payload to packet info object
58 payload_info = self.payload_to_info(packet[Raw])
59 # make sure the indexes match
60 self.assert_equal(
61 payload_info.src, src_if.sw_if_index, "source sw_if_index"
62 )
63 self.assert_equal(
64 payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
65 )
66 packet_info = self.get_next_packet_info_for_interface2(
67 src_if.sw_if_index, dst_if.sw_if_index, packet_info
68 )
69 # make sure we didn't run out of saved packets
70 self.assertIsNotNone(packet_info)
71 self.assert_equal(
72 payload_info.index, packet_info.index, "packet info index"
73 )
74 saved_packet = packet_info.data # fetch the saved packet
75 # assert the values match
76 self.assert_equal(ip.src, saved_packet[IP].src, "IP source address")
77 # ... more assertions here
78 self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
79 except Exception:
80 self.logger.error(ppp("Unexpected or invalid packet:", packet))
81 raise
82 remaining_packet = self.get_next_packet_info_for_interface2(
83 src_if.sw_if_index, dst_if.sw_if_index, packet_info
84 )
85 self.assertIsNone(
86 remaining_packet,
87 "Interface %s: Packet expected from interface "
88 "%s didn't arrive" % (dst_if.name, src_if.name),
89 )
90
91 def test_bufmon(self):
92 self.vapi.cli("set buffer traces on")
93
94 reply = self.vapi.cli("show buffer traces status")
95 self.assertIn("buffers tracing is on", reply)
96
97 packets = self.create_stream(self.pg0, self.pg1, 5)
98 self.pg0.add_stream(packets)
99 self.pg0.enable_capture()
100 self.pg1.enable_capture()
101 self.pg_start()
102
103 capture = self.pg1.get_capture()
104 self.pg0.assert_nothing_captured()
105 self.verify_capture(self.pg0, self.pg1, capture)
106
107 expected = [
108 "pg-input",
109 "ip4-input",
110 "ip4-rewrite",
111 "ip4-lookup",
112 "ethernet-input",
113 "pg1-tx",
114 "pg1-output",
115 ]
116 reply = self.vapi.cli("show buffer traces verbose")
117 for entry in expected:
118 self.assertIn(entry, reply)
119
120 # not verbose, skips nodes w/o buffered buffers
121 reply = self.vapi.cli("show buffer traces")
122 self.assertNotIn("pg-input", reply)
123
124 self.vapi.cli("clear buffer traces")
125 reply = self.vapi.cli("show buffer traces verbose")
126 self.assertNotIn("pg-input", reply)
127
128
129if __name__ == "__main__":
130 unittest.main(testRunner=VppTestRunner)