Filip Varga | eb60124 | 2018-10-31 07:58:05 +0100 | [diff] [blame] | 1 | #!/usr/bin/env python |
| 2 | """ CDP tests """ |
| 3 | |
| 4 | from scapy.packet import Packet |
| 5 | from scapy.all import ShortField, StrField |
| 6 | from scapy.layers.l2 import Dot3, LLC, SNAP |
| 7 | from scapy.contrib.cdp import CDPMsgDeviceID, CDPMsgSoftwareVersion, \ |
| 8 | CDPMsgPlatform, CDPMsgPortID, CDPv2_HDR |
| 9 | |
| 10 | from framework import VppTestCase |
| 11 | from scapy.all import raw |
| 12 | from re import compile |
| 13 | from time import sleep |
| 14 | from util import ppp |
| 15 | import platform |
| 16 | |
| 17 | |
| 18 | """ TestCDP is a subclass of the VppTestCase class. |
| 19 | |
| 20 | CDP test. |
| 21 | |
| 22 | """ |
| 23 | |
| 24 | |
class CustomTLV(Packet):
    """ Custom TLV protocol layer for scapy

    A bare type/length/value record. "length" is an ordinary 16-bit field
    (default 4), not derived from "value", so a caller can set it to a
    number that disagrees with the bytes actually carried in "value".
    """

    fields_desc = [
        # 16-bit TLV type code.
        ShortField("type", 0),
        # 16-bit declared length; taken verbatim from the caller.
        ShortField("length", 4),
        # Payload bytes, emitted as given.
        StrField("value", ""),
    ]
| 35 | |
class TestCDP(VppTestCase):
    """ CDP Test Case """

    # Searched for in "show cdp" output to detect that CDP is still disabled.
    nen_ptr = compile(r"not enabled")
    # One "show cdp" row: four whitespace-separated columns. Only the first
    # two (port, system) are consumed by show_cdp().
    cdp_ptr = compile(r"^([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)\s+([-\.\w]+)$")
    # One "show errors" row: numeric count, node name, free-text reason.
    err_ptr = compile(r"^([\d]+)\s+([-\w]+)\s+([ -\.\w)(]+)$")

    @property
    def device_id(self):
        """ Device ID placed in the CDP packet: this host's node name """
        return platform.node()

    @property
    def version(self):
        """ Software version placed in the CDP packet: host OS release """
        return platform.release()

    @property
    def port_id(self):
        """ Port ID placed in the CDP packet: name of the test interface """
        return self.interface.name

    @property
    def platform(self):
        """ Platform placed in the CDP packet: host OS name """
        # "platform" below resolves to the imported module (a global), not
        # to this property, which lives in the class namespace only.
        return platform.system()

    @classmethod
    def setUpClass(cls):
        """ Create and bring up the single interface used by every test

        Any failure during interface setup tears the base class down again
        before the exception is re-raised.
        """
        super(TestCDP, cls).setUpClass()
        try:
            cls.create_pg_interfaces(range(1))
            cls.interface = cls.pg_interfaces[0]

            cls.interface.admin_up()
            cls.interface.config_ip4()
            cls.interface.resolve_arp()

        except Exception:
            # Undo the base-class setup so nothing is left half-initialised.
            super(TestCDP, cls).tearDownClass()
            raise

    def test_enable_cdp(self):
        """ "cdp enable" makes "show cdp" stop reporting "not enabled" """
        self.logger.info(self.vapi.cli("cdp enable"))
        ret = self.vapi.cli("show cdp")
        self.logger.info(ret)
        not_enabled = self.nen_ptr.search(ret)
        self.assertFalse(not_enabled, "CDP isn't enabled")

    def test_send_cdp_packet(self):
        """ A well-formed CDP packet is listed as a neighbor """
        self.logger.info(self.vapi.cli("cdp enable"))
        self.send_packet(self.create_packet())

        neighbors = list(self.show_cdp())
        self.assertTrue(neighbors, "CDP didn't register neighbor")

        port, system = neighbors[0]
        # Compare only the common prefix of the reported and the sent
        # device id, so a name shortened on either side still matches.
        length = min(len(system), len(self.device_id))

        self.assert_equal(port, self.port_id, "CDP received invalid port id")
        self.assert_equal(system[:length], self.device_id[:length],
                          "CDP received invalid device id")

    def test_cdp_underflow_tlv(self):
        """ TLV declaring length 3 while carrying one value byte is dropped """
        self.send_bad_packet(3, ".")

    def test_cdp_overflow_tlv(self):
        """ TLV declaring length 8 while carrying one value byte is dropped """
        self.send_bad_packet(8, ".")

    def send_bad_packet(self, l, v):
        """ Send a malformed TLV and assert that cdp-input counted a drop

        :param l: value written into the TLV length field
        :param v: TLV value payload
        """
        self.logger.info(self.vapi.cli("cdp enable"))

        # Take a baseline first: nothing in this class resets the error
        # counters, so a drop recorded by an earlier test would satisfy a
        # plain "count >= 1" check even if this packet were accepted.
        before = self._bad_tlv_count()
        self.send_packet(self.create_bad_packet(l, v))
        after = self._bad_tlv_count()

        self.assertTrue(after > before, "CDP didn't drop bad packet")

    def _bad_tlv_count(self):
        """ Sum of the cdp-input "bad TLVs" counters in "show errors"

        Returns 0 when no such row is present.
        """
        return sum(int(count)
                   for count, node, reason in self.show_errors()
                   if node == u'cdp-input' and
                   reason == u'cdp packets with bad TLVs')

    def send_packet(self, packet):
        """ Queue packet on the test interface and start the generator """
        self.logger.debug(ppp("Sending packet:", packet))
        self.interface.add_stream(packet)
        self.pg_start()

    def create_base_packet(self):
        """ Build the 802.3/LLC/SNAP/CDPv2 header shared by all packets

        The source is the interface's remote MAC and the destination is
        the fixed address 01:00:0c:cc:cc:cc.
        """
        packet = (Dot3(src=self.interface.remote_mac,
                       dst="01:00:0c:cc:cc:cc") /
                  LLC(dsap=0xaa, ssap=0xaa, ctrl=0x03) /
                  SNAP()/CDPv2_HDR())
        return packet

    def create_packet(self):
        """ Build a valid CDP packet describing this host """
        packet = (self.create_base_packet() /
                  CDPMsgDeviceID(val=self.device_id) /
                  CDPMsgSoftwareVersion(val=self.version) /
                  CDPMsgPortID(iface=self.port_id) /
                  CDPMsgPlatform(val=self.platform))
        return packet

    def create_bad_packet(self, tl=4, tv=""):
        """ Build a CDP packet holding one type-1 TLV with a chosen length

        :param tl: value written into the TLV length field
        :param tv: TLV value payload
        """
        packet = (self.create_base_packet() /
                  CustomTLV(type=1,
                            length=tl,
                            value=tv))
        return packet

    def process_cli(self, exp, ptr):
        """ Run a CLI command and yield regex groups for each matching line

        :param exp: CLI command to execute
        :param ptr: compiled pattern matched against each stripped line
        :returns: generator of match.groups() tuples
        """
        # The first output line is skipped; non-matching lines are ignored.
        for line in self.vapi.cli(exp).split('\n')[1:]:
            m = ptr.match(line.strip())
            if m:
                yield m.groups()

    def show_cdp(self):
        """ Yield (port, system) for every neighbor row of "show cdp" """
        # cdp_ptr always produces four groups; the last two are unused.
        for port, system, _, _ in self.process_cli("show cdp", self.cdp_ptr):
            yield port, system

    def show_errors(self):
        """ Yield (count, node, reason) for every row of "show errors" """
        # err_ptr always produces exactly these three groups.
        for count, node, reason in self.process_cli("show errors",
                                                    self.err_ptr):
            yield count, node, reason