Renato Botelho do Couto | ead1e53 | 2019-10-31 13:31:07 -0500 | [diff] [blame] | 1 | #!/usr/bin/env python3 |
Filip Varga | eb60124 | 2018-10-31 07:58:05 +0100 | [diff] [blame] | 2 | """ CDP tests """ |
| 3 | |
| 4 | from scapy.packet import Packet |
| 5 | from scapy.all import ShortField, StrField |
| 6 | from scapy.layers.l2 import Dot3, LLC, SNAP |
| 7 | from scapy.contrib.cdp import CDPMsgDeviceID, CDPMsgSoftwareVersion, \ |
| 8 | CDPMsgPlatform, CDPMsgPortID, CDPv2_HDR |
| 9 | |
| 10 | from framework import VppTestCase |
| 11 | from scapy.all import raw |
| 12 | from re import compile |
| 13 | from time import sleep |
| 14 | from util import ppp |
| 15 | import platform |
Ole Troan | bd38f7f | 2019-10-22 08:57:31 +0200 | [diff] [blame] | 16 | import sys |
| 17 | import unittest |
Filip Varga | eb60124 | 2018-10-31 07:58:05 +0100 | [diff] [blame] | 18 | |
| 19 | |
""" TestCDP is a subclass of the VppTestCase class.

CDP test.

"""
| 25 | |
| 26 | |
class CustomTLV(Packet):
    """Bare type/length/value scapy layer.

    Unlike the typed CDP message layers, ``length`` is an ordinary field
    that the caller sets explicitly, so it does not have to agree with the
    actual size of ``value``.
    """

    fields_desc = [
        ShortField("type", 0),     # 2-byte TLV type
        ShortField("length", 4),   # 2-byte length; default 4 = header only
        StrField("value", ""),     # raw payload following the header
    ]
| 36 | |
| 37 | |
class TestCDP(VppTestCase):
    """ CDP Test Case """

    # Matches the "not enabled" notice in the output of "show cdp".
    nen_ptr = compile(r"not enabled")
    # Matches a row of four whitespace-separated columns in "show cdp".
    cdp_ptr = compile(r"^([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)$")
    # Matches a "<count> <node> <reason>" row; not used within this class.
    err_ptr = compile(r"^([\d]+)\s+([-\w]+)\s+([ -\.\w)(]+)$")

    @property
    def device_id(self):
        """Device ID advertised in the CDP packet (local host name)."""
        return platform.node()

    @property
    def version(self):
        """Software version advertised in the CDP packet (OS release)."""
        return platform.release()

    @property
    def port_id(self):
        """Port ID advertised in the CDP packet (test interface name)."""
        return self.interface.name

    @property
    def platform(self):
        """Platform advertised in the CDP packet (OS name)."""
        return platform.system()

    @classmethod
    def setUpClass(cls):
        """Create and bring up the single pg interface used by all tests."""
        super(TestCDP, cls).setUpClass()
        try:
            cls.create_pg_interfaces(range(1))
            cls.interface = cls.pg_interfaces[0]

            cls.interface.admin_up()
            cls.interface.config_ip4()
            cls.interface.resolve_arp()

        except Exception:
            # Undo the parent's class-level setup before propagating.
            super(TestCDP, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the parent class."""
        super(TestCDP, cls).tearDownClass()

    def test_enable_cdp(self):
        # After enabling CDP, "show cdp" must not report "not enabled".
        self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1))
        ret = self.vapi.cli("show cdp")
        self.logger.info(ret)
        not_enabled = self.nen_ptr.search(ret)
        self.assertIsNone(not_enabled, "CDP isn't enabled")

    def test_send_cdp_packet(self):
        # A well-formed CDP packet must show up as a neighbor in "show cdp".
        self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1))
        self.send_packet(self.create_packet())

        neighbors = list(self.show_cdp())
        self.assertTrue(neighbors, "CDP didn't register neighbor")

        port, system = neighbors[0]
        # Compare only the common prefix of the two names.
        # NOTE(review): presumably the CLI may print a shortened device
        # id -- confirm against the "show cdp" implementation.
        length = min(len(system), len(self.device_id))

        self.assert_equal(port, self.port_id, "CDP received invalid port id")
        self.assert_equal(system[:length], self.device_id[:length],
                          "CDP received invalid device id")

    def test_cdp_underflow_tlv(self):
        # Declared length 3 is smaller than the 4-byte type/length header.
        self.send_bad_packet(3, ".")

    def test_cdp_overflow_tlv(self):
        # Declared length 8 exceeds the 5 bytes actually present
        # (4-byte header plus the 1-byte value).
        self.send_bad_packet(8, ".")

    def send_bad_packet(self, l, v):
        """Send a malformed TLV and check that the bad-TLV counter grows.

        :param l: value written into the TLV length field.
        :param v: TLV value payload.
        """
        counter = '/err/cdp-input/cdp packets with bad TLVs'

        self.logger.info(self.vapi.cdp_enable_disable(enable_disable=1))

        # The counter accumulates across tests, so an absolute ">= 1" check
        # would pass on stale hits from an earlier test; compare against a
        # baseline taken right before sending instead.
        before = self.statistics.get_err_counter(counter)
        self.send_packet(self.create_bad_packet(l, v))
        after = self.statistics.get_err_counter(counter)

        self.assertGreater(after, before, "CDP didn't drop bad packet")

    def send_packet(self, packet):
        """Queue ``packet`` on the test interface and start the generator."""
        self.logger.debug(ppp("Sending packet:", packet))
        self.interface.add_stream(packet)
        self.pg_start()

    def create_base_packet(self):
        """Return the 802.3 / LLC / SNAP / CDPv2 header stack, no TLVs."""
        packet = (Dot3(src=self.interface.remote_mac,
                       dst="01:00:0c:cc:cc:cc") /
                  LLC(dsap=0xaa, ssap=0xaa, ctrl=0x03) /
                  SNAP()/CDPv2_HDR())
        return packet

    def create_packet(self):
        """Return a CDP packet carrying device, version, port and platform."""
        packet = (self.create_base_packet() /
                  CDPMsgDeviceID(val=self.device_id) /
                  CDPMsgSoftwareVersion(val=self.version) /
                  CDPMsgPortID(iface=self.port_id) /
                  CDPMsgPlatform(val=self.platform))
        return packet

    def create_bad_packet(self, tl=4, tv=""):
        """Return a CDP packet whose only TLV has a caller-chosen length.

        :param tl: value written into the TLV length field.
        :param tv: TLV value payload.
        """
        packet = (self.create_base_packet() /
                  CustomTLV(type=1,
                            length=tl,
                            value=tv))
        return packet

    def process_cli(self, exp, ptr):
        """Run CLI command ``exp`` and yield ``ptr`` match groups per line.

        The first output line is always skipped (presumably a header row --
        verify against the CLI output); lines that do not match ``ptr`` are
        ignored.
        """
        for line in self.vapi.cli(exp).split('\n')[1:]:
            m = ptr.match(line.strip())
            if m:
                yield m.groups()

    def show_cdp(self):
        """Yield the first two columns of every matching "show cdp" row.

        They are yielded as ``(port, system)`` tuples.
        """
        # cdp_ptr has exactly four groups, so every match unpacks cleanly;
        # the remaining two columns are not needed by the tests.
        for port, system, _, _ in self.process_cli("show cdp", self.cdp_ptr):
            yield port, system