| #!/usr/bin/env python |
| |
| import unittest |
| import socket |
| from logging import * |
| |
| from framework import VppTestCase, VppTestRunner |
| from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint |
| |
| from scapy.packet import Raw |
| from scapy.layers.l2 import Ether, Dot1Q, ARP |
| from scapy.layers.inet import IP, UDP |
| from scapy.layers.inet6 import ICMPv6ND_NS, IPv6, UDP |
| from scapy.contrib.mpls import MPLS |
| |
| class TestMPLS(VppTestCase): |
| """ MPLS Test Case """ |
| |
| @classmethod |
| def setUpClass(cls): |
| super(TestMPLS, cls).setUpClass() |
| |
| def setUp(self): |
| super(TestMPLS, self).setUp() |
| |
| # create 2 pg interfaces |
| self.create_pg_interfaces(range(3)) |
| |
| # setup both interfaces |
| # assign them different tables. |
| table_id = 0 |
| |
| for i in self.pg_interfaces: |
| i.admin_up() |
| i.set_table_ip4(table_id) |
| i.set_table_ip6(table_id) |
| i.config_ip4() |
| i.config_ip6() |
| i.enable_mpls() |
| i.resolve_arp() |
| i.resolve_ndp() |
| table_id += 1 |
| |
| def tearDown(self): |
| super(TestMPLS, self).tearDown() |
| |
| def create_stream_ip4(self, src_if, mpls_label, mpls_ttl): |
| pkts = [] |
| for i in range(0, 257): |
| info = self.create_packet_info(src_if.sw_if_index, |
| src_if.sw_if_index) |
| payload = self.info_to_payload(info) |
| p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / |
| MPLS(label=mpls_label, ttl=mpls_ttl) / |
| IP(src=src_if.remote_ip4, dst=src_if.remote_ip4) / |
| UDP(sport=1234, dport=1234) / |
| Raw(payload)) |
| info.data = p.copy() |
| pkts.append(p) |
| return pkts |
| |
| def create_stream_ip6(self, src_if, mpls_label, mpls_ttl): |
| pkts = [] |
| for i in range(0, 257): |
| info = self.create_packet_info(src_if.sw_if_index, |
| src_if.sw_if_index) |
| payload = self.info_to_payload(info) |
| p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / |
| MPLS(label=mpls_label, ttl=mpls_ttl) / |
| IPv6(src=src_if.remote_ip6, dst=src_if.remote_ip6) / |
| UDP(sport=1234, dport=1234) / |
| Raw(payload)) |
| info.data = p.copy() |
| pkts.append(p) |
| return pkts |
| |
| def verify_capture_ip4(self, src_if, capture, sent): |
| try: |
| self.assertEqual(len(capture), len(sent)) |
| |
| for i in range(len(capture)): |
| tx = sent[i] |
| rx = capture[i] |
| |
| # the rx'd packet has the MPLS label popped |
| eth = rx[Ether]; |
| self.assertEqual(eth.type, 0x800); |
| |
| tx_ip = tx[IP] |
| rx_ip = rx[IP] |
| |
| self.assertEqual(rx_ip.src, tx_ip.src) |
| self.assertEqual(rx_ip.dst, tx_ip.dst) |
| # IP processing post pop has decremented the TTL |
| self.assertEqual(rx_ip.ttl+1, tx_ip.ttl) |
| |
| except: |
| raise; |
| |
| def verify_capture_ip6(self, src_if, capture, sent): |
| try: |
| self.assertEqual(len(capture), len(sent)) |
| |
| for i in range(len(capture)): |
| tx = sent[i] |
| rx = capture[i] |
| |
| # the rx'd packet has the MPLS label popped |
| eth = rx[Ether]; |
| self.assertEqual(eth.type, 0x86DD); |
| |
| tx_ip = tx[IPv6] |
| rx_ip = rx[IPv6] |
| |
| self.assertEqual(rx_ip.src, tx_ip.src) |
| self.assertEqual(rx_ip.dst, tx_ip.dst) |
| # IP processing post pop has decremented the TTL |
| self.assertEqual(rx_ip.hlim + 1, tx_ip.hlim) |
| |
| except: |
| raise; |
| |
| |
| def test_v4_exp_null(self): |
| """ MPLS V4 Explicit NULL test """ |
| |
| # |
| # The first test case has an MPLS TTL of 0 |
| # all packet should be dropped |
| # |
| tx = self.create_stream_ip4(self.pg0, 0, 0) |
| self.pg0.add_stream(tx) |
| |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| |
| rx = self.pg0.get_capture() |
| |
| try: |
| self.assertEqual(0, len(rx)); |
| except: |
| error("MPLS TTL=0 packets forwarded") |
| error(packet.show()) |
| raise |
| |
| # |
| # a stream with a non-zero MPLS TTL |
| # PG0 is in the default table |
| # |
| self.vapi.cli("clear trace") |
| tx = self.create_stream_ip4(self.pg0, 0, 2) |
| self.pg0.add_stream(tx) |
| |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| |
| rx = self.pg0.get_capture() |
| self.verify_capture_ip4(self.pg0, rx, tx) |
| |
| # |
| # a stream with a non-zero MPLS TTL |
| # PG1 is in table 1 |
| # we are ensuring the post-pop lookup occurs in the VRF table |
| # |
| self.vapi.cli("clear trace") |
| tx = self.create_stream_ip4(self.pg1, 0, 2) |
| self.pg1.add_stream(tx) |
| |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| |
| rx = self.pg1.get_capture() |
| self.verify_capture_ip4(self.pg0, rx, tx) |
| |
| def test_v6_exp_null(self): |
| """ MPLS V6 Explicit NULL test """ |
| |
| # |
| # a stream with a non-zero MPLS TTL |
| # PG0 is in the default table |
| # |
| self.vapi.cli("clear trace") |
| tx = self.create_stream_ip6(self.pg0, 2, 2) |
| self.pg0.add_stream(tx) |
| |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| |
| rx = self.pg0.get_capture() |
| self.verify_capture_ip6(self.pg0, rx, tx) |
| |
| # |
| # a stream with a non-zero MPLS TTL |
| # PG1 is in table 1 |
| # we are ensuring the post-pop lookup occurs in the VRF table |
| # |
| self.vapi.cli("clear trace") |
| tx = self.create_stream_ip6(self.pg1, 2, 2) |
| self.pg1.add_stream(tx) |
| |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| |
| rx = self.pg1.get_capture() |
| self.verify_capture_ip6(self.pg0, rx, tx) |
| |
| |
| if __name__ == '__main__': |
| unittest.main(testRunner=VppTestRunner) |