blob: ecdd5e837aa5ea922b28fb1e0dbdd0e7dde26566 [file] [log] [blame]
Dave Wallace8800f732023-08-31 00:47:44 -04001from asfframework import VppTestRunner
2from framework import VppTestCase
adrianvillin4d05f062023-10-25 10:36:05 +02003import unittest
4from config import config
5from scapy.layers.l2 import Ether
6from scapy.packet import Raw
7from scapy.layers.inet import IP, UDP
8from random import randint
9from util import ppp
10
11
@unittest.skipIf("bufmon" in config.excluded_plugins, "Exclude bufmon plugin tests")
class TestBufmon(VppTestCase):
    """bufmon plugin test"""

    @classmethod
    def setUpClass(cls):
        """Create two pg interfaces and configure IPv4 on each of them.

        For every interface this calls ``config_ip4()``, ``resolve_arp()``
        and ``admin_up()``, in that order.  If anything in that sequence
        raises, ``tearDownClass()`` is invoked explicitly and the original
        exception is re-raised.
        """
        super(TestBufmon, cls).setUpClass()
        try:
            cls.create_pg_interfaces(range(2))
            for i in cls.pg_interfaces:
                i.config_ip4()
                i.resolve_arp()
                i.admin_up()
        except Exception:
            cls.tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Unconfigure the pg interfaces, then run the base-class teardown.

        The base-class teardown always runs, even when unconfiguring an
        interface raises; in that case the exception still propagates
        afterwards.
        """
        try:
            # setUpClass() calls this from its error path, possibly before
            # pg_interfaces was ever assigned; fall back to "nothing to undo"
            # so the original setup failure is not masked by AttributeError.
            for i in getattr(cls, "pg_interfaces", ()):
                i.unconfig_ip4()
                i.admin_down()
        finally:
            super(TestBufmon, cls).tearDownClass()

    # https://fd.io/docs/vpp/master/developer/tests/overview.html#example-how-to-add-a-new-test
    def create_stream(self, src_if, dst_if, count):
        """Build ``count`` Ether/IP/UDP/Raw packets going from src_if to dst_if.

        Each packet carries a payload produced by ``info_to_payload()`` from
        a freshly created packet-info object, uses a random UDP source port
        in 49152..65535 and destination port 5678.  A copy of every packet is
        stored in its packet-info's ``data`` attribute so that
        ``verify_capture()`` can compare against it later.

        :param src_if: interface the packets are built to be sent on.
        :param dst_if: interface whose remote IPv4 address is the destination.
        :param count: number of packets to build.
        :returns: list of the built packets.
        """
        packets = []
        for _ in range(count):
            info = self.create_packet_info(src_if, dst_if)
            payload = self.info_to_payload(info)
            p = (
                Ether(dst=src_if.local_mac, src=src_if.remote_mac)
                / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
                / UDP(sport=randint(49152, 65535), dport=5678)
                / Raw(payload)
            )
            info.data = p.copy()
            packets.append(p)
        return packets

    def verify_capture(self, src_if, dst_if, capture):
        """Check captured packets against the packet infos saved for them.

        For every packet in ``capture`` the payload is decoded back into a
        packet-info object and its source/destination ``sw_if_index`` values,
        its index, the IP source address and the UDP source port are compared
        with the next saved packet info for the (src_if, dst_if) pair.  A
        packet that fails any step is logged and the exception re-raised.
        Finally it is asserted that no saved packet info is left over.

        :param src_if: interface the packets were sent from.
        :param dst_if: interface the packets were captured on.
        :param capture: iterable of captured packets.
        """
        packet_info = None
        for packet in capture:
            try:
                ip = packet[IP]
                udp = packet[UDP]
                # convert the payload to packet info object
                payload_info = self.payload_to_info(packet[Raw])
                # make sure the indexes match
                self.assert_equal(
                    payload_info.src, src_if.sw_if_index, "source sw_if_index"
                )
                self.assert_equal(
                    payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
                )
                packet_info = self.get_next_packet_info_for_interface2(
                    src_if.sw_if_index, dst_if.sw_if_index, packet_info
                )
                # make sure we didn't run out of saved packets
                self.assertIsNotNone(packet_info)
                self.assert_equal(
                    payload_info.index, packet_info.index, "packet info index"
                )
                saved_packet = packet_info.data  # fetch the saved packet
                # assert the values match
                self.assert_equal(ip.src, saved_packet[IP].src, "IP source address")
                # ... more assertions here
                self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise
        # every saved packet info must have been matched by a captured packet
        remaining_packet = self.get_next_packet_info_for_interface2(
            src_if.sw_if_index, dst_if.sw_if_index, packet_info
        )
        self.assertIsNone(
            remaining_packet,
            "Interface %s: Packet expected from interface "
            "%s didn't arrive" % (dst_if.name, src_if.name),
        )

    def test_bufmon(self):
        # Documented with comments rather than a docstring so that the
        # description unittest reports for this test stays unchanged.
        #
        # Flow: switch buffer tracing on, send 5 packets pg0 -> pg1, verify
        # them, then check the output of the "show buffer traces" CLI
        # variants before and after "clear buffer traces".
        self.vapi.cli("set buffer traces on")

        reply = self.vapi.cli("show buffer traces status")
        self.assertIn("buffers tracing is on", reply)

        packets = self.create_stream(self.pg0, self.pg1, 5)
        self.pg0.add_stream(packets)
        self.pg0.enable_capture()
        self.pg1.enable_capture()
        self.pg_start()

        capture = self.pg1.get_capture()
        self.pg0.assert_nothing_captured()
        self.verify_capture(self.pg0, self.pg1, capture)

        # node names that must all appear in the verbose trace output
        expected = [
            "pg-input",
            "ip4-input",
            "ip4-rewrite",
            "ip4-lookup",
            "ethernet-input",
            "pg1-tx",
            "pg1-output",
        ]
        reply = self.vapi.cli("show buffer traces verbose")
        for entry in expected:
            self.assertIn(entry, reply)

        # not verbose, skips nodes w/o buffered buffers
        reply = self.vapi.cli("show buffer traces")
        self.assertNotIn("pg-input", reply)

        # after clearing, even the verbose output must not list pg-input
        self.vapi.cli("clear buffer traces")
        reply = self.vapi.cli("show buffer traces verbose")
        self.assertNotIn("pg-input", reply)
128
129
# Allow running this test module directly; the tests are executed with the
# VPP-specific test runner instead of unittest's default one.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)