| #!/usr/bin/env python3 |
| """ CDP tests """ |
| |
| from scapy.packet import Packet |
| from scapy.all import ShortField, StrField |
| from scapy.layers.l2 import Dot3, LLC, SNAP |
| from scapy.contrib.cdp import ( |
| CDPMsgDeviceID, |
| CDPMsgSoftwareVersion, |
| CDPMsgPlatform, |
| CDPMsgPortID, |
| CDPv2_HDR, |
| ) |
| |
| from framework import VppTestCase |
| from scapy.all import raw |
| from re import compile |
| from time import sleep |
| from util import ppp |
| from config import config |
| import platform |
| import sys |
| import unittest |
| |
| |
| """ TestCDP is a subclass of VPPTestCase classes. |
| |
| CDP test. |
| |
| """ |
| |
| |
| class CustomTLV(Packet): |
| """Custom TLV protocol layer for scapy""" |
| |
| fields_desc = [ |
| ShortField("type", 0), |
| ShortField("length", 4), |
| StrField("value", ""), |
| ] |
| |
| |
| @unittest.skipIf("cdp" in config.excluded_plugins, "Exclude CDP plugin tests") |
| class TestCDP(VppTestCase): |
| """CDP Test Case""" |
| |
| nen_ptr = compile(r"not enabled") |
| cdp_ptr = compile(r"^([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)$") |
| err_ptr = compile(r"^([\d]+)\s+([-\w]+)\s+([ -\.\w)(]+)$") |
| |
| @property |
| def device_id(self): |
| return platform.node() |
| |
| @property |
| def version(self): |
| return platform.release() |
| |
| @property |
| def port_id(self): |
| return self.interface.name |
| |
| @property |
| def platform(self): |
| return platform.system() |
| |
| @classmethod |
| def setUpClass(cls): |
| super(TestCDP, cls).setUpClass() |
| try: |
| cls.create_pg_interfaces(range(1)) |
| cls.interface = cls.pg_interfaces[0] |
| |
| cls.interface.admin_up() |
| cls.interface.config_ip4() |
| cls.interface.resolve_arp() |
| |
| except Exception: |
| super(TestCDP, cls).tearDownClass() |
| raise |
| |
| @classmethod |
| def tearDownClass(cls): |
| super(TestCDP, cls).tearDownClass() |
| |
| def test_enable_cdp(self): |
| self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1)) |
| ret = self.vapi.cli("show cdp") |
| self.logger.info(ret) |
| not_enabled = self.nen_ptr.search(ret) |
| self.assertFalse(not_enabled, "CDP isn't enabled") |
| |
| def test_send_cdp_packet(self): |
| self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1)) |
| self.send_packet(self.create_packet()) |
| |
| neighbors = list(self.show_cdp()) |
| self.assertTrue(neighbors, "CDP didn't register neighbor") |
| |
| port, system = neighbors[0] |
| length = min(len(system), len(self.device_id)) |
| |
| self.assert_equal(port, self.port_id, "CDP received invalid port id") |
| self.assert_equal( |
| system[:length], self.device_id[:length], "CDP received invalid device id" |
| ) |
| |
| def test_cdp_underflow_tlv(self): |
| self.send_bad_packet(3, ".") |
| |
| def test_cdp_overflow_tlv(self): |
| self.send_bad_packet(8, ".") |
| |
| def send_bad_packet(self, l, v): |
| self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1)) |
| self.send_packet(self.create_bad_packet(l, v)) |
| |
| err = self.statistics.get_err_counter( |
| "/err/cdp-input/cdp packets with bad TLVs" |
| ) |
| self.assertTrue(err >= 1, "CDP didn't drop bad packet") |
| |
| def send_packet(self, packet): |
| self.logger.debug(ppp("Sending packet:", packet)) |
| self.interface.add_stream(packet) |
| self.pg_start() |
| |
| def create_base_packet(self): |
| packet = ( |
| Dot3(src=self.interface.remote_mac, dst="01:00:0c:cc:cc:cc") |
| / LLC(dsap=0xAA, ssap=0xAA, ctrl=0x03) |
| / SNAP() |
| / CDPv2_HDR() |
| ) |
| return packet |
| |
| def create_packet(self): |
| packet = ( |
| self.create_base_packet() |
| / CDPMsgDeviceID(val=self.device_id) |
| / CDPMsgSoftwareVersion(val=self.version) |
| / CDPMsgPortID(iface=self.port_id) |
| / CDPMsgPlatform(val=self.platform) |
| ) |
| return packet |
| |
| def create_bad_packet(self, tl=4, tv=""): |
| packet = self.create_base_packet() / CustomTLV(type=1, length=tl, value=tv) |
| return packet |
| |
| def process_cli(self, exp, ptr): |
| for line in self.vapi.cli(exp).split("\n")[1:]: |
| m = ptr.match(line.strip()) |
| if m: |
| yield m.groups() |
| |
| def show_cdp(self): |
| for pack in self.process_cli("show cdp", self.cdp_ptr): |
| try: |
| port, system, _, _ = pack |
| except ValueError: |
| pass |
| else: |
| yield port, system |