blob: ecdd5e837aa5ea922b28fb1e0dbdd0e7dde26566 [file] [log] [blame]
from asfframework import VppTestRunner
from framework import VppTestCase
import unittest
from config import config
from scapy.layers.l2 import Ether
from scapy.packet import Raw
from scapy.layers.inet import IP, UDP
from random import randint
from util import ppp
@unittest.skipIf("bufmon" in config.excluded_plugins, "Exclude bufmon plugin tests")
class TestBufmon(VppTestCase):
"""bufmon plugin test"""
@classmethod
def setUpClass(cls):
super(TestBufmon, cls).setUpClass()
try:
cls.create_pg_interfaces(range(2))
for i in cls.pg_interfaces:
i.config_ip4()
i.resolve_arp()
i.admin_up()
except Exception:
cls.tearDownClass()
raise
@classmethod
def tearDownClass(cls):
for i in cls.pg_interfaces:
i.unconfig_ip4()
i.admin_down()
super(TestBufmon, cls).tearDownClass()
# https://fd.io/docs/vpp/master/developer/tests/overview.html#example-how-to-add-a-new-test
def create_stream(self, src_if, dst_if, count):
packets = []
for i in range(count):
info = self.create_packet_info(src_if, dst_if)
payload = self.info_to_payload(info)
p = (
Ether(dst=src_if.local_mac, src=src_if.remote_mac)
/ IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
/ UDP(sport=randint(49152, 65535), dport=5678)
/ Raw(payload)
)
info.data = p.copy()
packets.append(p)
return packets
def verify_capture(self, src_if, dst_if, capture):
packet_info = None
for packet in capture:
try:
ip = packet[IP]
udp = packet[UDP]
# convert the payload to packet info object
payload_info = self.payload_to_info(packet[Raw])
# make sure the indexes match
self.assert_equal(
payload_info.src, src_if.sw_if_index, "source sw_if_index"
)
self.assert_equal(
payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
)
packet_info = self.get_next_packet_info_for_interface2(
src_if.sw_if_index, dst_if.sw_if_index, packet_info
)
# make sure we didn't run out of saved packets
self.assertIsNotNone(packet_info)
self.assert_equal(
payload_info.index, packet_info.index, "packet info index"
)
saved_packet = packet_info.data # fetch the saved packet
# assert the values match
self.assert_equal(ip.src, saved_packet[IP].src, "IP source address")
# ... more assertions here
self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
except Exception:
self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
remaining_packet = self.get_next_packet_info_for_interface2(
src_if.sw_if_index, dst_if.sw_if_index, packet_info
)
self.assertIsNone(
remaining_packet,
"Interface %s: Packet expected from interface "
"%s didn't arrive" % (dst_if.name, src_if.name),
)
def test_bufmon(self):
self.vapi.cli("set buffer traces on")
reply = self.vapi.cli("show buffer traces status")
self.assertIn("buffers tracing is on", reply)
packets = self.create_stream(self.pg0, self.pg1, 5)
self.pg0.add_stream(packets)
self.pg0.enable_capture()
self.pg1.enable_capture()
self.pg_start()
capture = self.pg1.get_capture()
self.pg0.assert_nothing_captured()
self.verify_capture(self.pg0, self.pg1, capture)
expected = [
"pg-input",
"ip4-input",
"ip4-rewrite",
"ip4-lookup",
"ethernet-input",
"pg1-tx",
"pg1-output",
]
reply = self.vapi.cli("show buffer traces verbose")
for entry in expected:
self.assertIn(entry, reply)
# not verbose, skips nodes w/o buffered buffers
reply = self.vapi.cli("show buffer traces")
self.assertNotIn("pg-input", reply)
self.vapi.cli("clear buffer traces")
reply = self.vapi.cli("show buffer traces verbose")
self.assertNotIn("pg-input", reply)
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)