blob: 45af470421ebffc199a050eecd1f90eba86f0af4 [file] [log] [blame]
#!/usr/bin/env python
import unittest
import socket
from logging import *
from framework import VppTestCase, VppTestRunner
from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q, ARP
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import ICMPv6ND_NS, IPv6, UDP
from scapy.contrib.mpls import MPLS
class TestMPLS(VppTestCase):
    """ MPLS Test Case """

    # Number of packets placed in every generated test stream.
    _STREAM_LEN = 257

    @classmethod
    def setUpClass(cls):
        super(TestMPLS, cls).setUpClass()

    def setUp(self):
        super(TestMPLS, self).setUp()

        # create 3 pg interfaces
        self.create_pg_interfaces(range(3))

        # set up every interface and bind each one to its own
        # IPv4/IPv6 table (table ids 0, 1, 2 in creation order).
        for table_id, intf in enumerate(self.pg_interfaces):
            intf.admin_up()
            intf.set_table_ip4(table_id)
            intf.set_table_ip6(table_id)
            intf.config_ip4()
            intf.config_ip6()
            intf.enable_mpls()
            intf.resolve_arp()
            intf.resolve_ndp()

    def tearDown(self):
        super(TestMPLS, self).tearDown()

    def _create_stream(self, src_if, mpls_label, mpls_ttl, ip_cls, ip_addr):
        # Build _STREAM_LEN Ether/MPLS/<ip_cls>/UDP/Raw packets whose IP
        # source and destination are both ip_addr; ip_cls is IP or IPv6.
        pkts = []
        for _ in range(self._STREAM_LEN):
            info = self.create_packet_info(src_if.sw_if_index,
                                           src_if.sw_if_index)
            payload = self.info_to_payload(info)
            p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                 MPLS(label=mpls_label, ttl=mpls_ttl) /
                 ip_cls(src=ip_addr, dst=ip_addr) /
                 UDP(sport=1234, dport=1234) /
                 Raw(payload))
            # keep a pristine copy of what was sent alongside the info
            info.data = p.copy()
            pkts.append(p)
        return pkts

    def create_stream_ip4(self, src_if, mpls_label, mpls_ttl):
        # Return a list of MPLS-encapsulated IPv4/UDP packets to inject on
        # src_if. Both IP addresses are src_if.remote_ip4.
        return self._create_stream(src_if, mpls_label, mpls_ttl,
                                   IP, src_if.remote_ip4)

    def create_stream_ip6(self, src_if, mpls_label, mpls_ttl):
        # Return a list of MPLS-encapsulated IPv6/UDP packets to inject on
        # src_if. Both IP addresses are src_if.remote_ip6.
        return self._create_stream(src_if, mpls_label, mpls_ttl,
                                   IPv6, src_if.remote_ip6)

    def _send_and_capture(self, intf, pkts):
        # Inject pkts on intf, run the packet generator and return what
        # was captured on that same interface.
        intf.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return intf.get_capture()

    def _verify_popped(self, capture, sent, eth_type, ip_cls, ttl_attr):
        # Check every captured packet against the packet sent at the same
        # position: expected ethertype, unchanged addresses and a
        # TTL/hop-limit (field name ttl_attr) that is lower by exactly one.
        self.assertEqual(len(capture), len(sent))

        for tx, rx in zip(sent, capture):
            # the rx'd packet has the MPLS label popped
            eth = rx[Ether]
            self.assertEqual(eth.type, eth_type)

            tx_ip = tx[ip_cls]
            rx_ip = rx[ip_cls]

            self.assertEqual(rx_ip.src, tx_ip.src)
            self.assertEqual(rx_ip.dst, tx_ip.dst)
            # IP processing post pop has decremented the TTL
            self.assertEqual(getattr(rx_ip, ttl_attr) + 1,
                             getattr(tx_ip, ttl_attr))

    def verify_capture_ip4(self, src_if, capture, sent):
        # Assert that capture holds the IPv4 packets of sent with the MPLS
        # header removed (ethertype 0x800) and the IP TTL decremented.
        # src_if is accepted for interface compatibility but is not used.
        self._verify_popped(capture, sent, 0x800, IP, "ttl")

    def verify_capture_ip6(self, src_if, capture, sent):
        # Assert that capture holds the IPv6 packets of sent with the MPLS
        # header removed (ethertype 0x86DD) and the hop limit decremented.
        # src_if is accepted for interface compatibility but is not used.
        self._verify_popped(capture, sent, 0x86DD, IPv6, "hlim")

    def test_v4_exp_null(self):
        """ MPLS V4 Explicit NULL test """

        #
        # The first test case has an MPLS TTL of 0
        # all packet should be dropped
        #
        tx = self.create_stream_ip4(self.pg0, 0, 0)
        rx = self._send_and_capture(self.pg0, tx)

        try:
            self.assertEqual(0, len(rx))
        except AssertionError:
            # Log what leaked through before re-raising the failure.
            error("MPLS TTL=0 packets forwarded")
            for packet in rx:
                error(repr(packet))
            raise

        #
        # a stream with a non-zero MPLS TTL
        # PG0 is in the default table
        #
        self.vapi.cli("clear trace")
        tx = self.create_stream_ip4(self.pg0, 0, 2)
        rx = self._send_and_capture(self.pg0, tx)
        self.verify_capture_ip4(self.pg0, rx, tx)

        #
        # a stream with a non-zero MPLS TTL
        # PG1 is in table 1
        # we are ensuring the post-pop lookup occurs in the VRF table
        #
        self.vapi.cli("clear trace")
        tx = self.create_stream_ip4(self.pg1, 0, 2)
        rx = self._send_and_capture(self.pg1, tx)
        self.verify_capture_ip4(self.pg1, rx, tx)

    def test_v6_exp_null(self):
        """ MPLS V6 Explicit NULL test """

        #
        # a stream with a non-zero MPLS TTL
        # PG0 is in the default table
        #
        self.vapi.cli("clear trace")
        tx = self.create_stream_ip6(self.pg0, 2, 2)
        rx = self._send_and_capture(self.pg0, tx)
        self.verify_capture_ip6(self.pg0, rx, tx)

        #
        # a stream with a non-zero MPLS TTL
        # PG1 is in table 1
        # we are ensuring the post-pop lookup occurs in the VRF table
        #
        self.vapi.cli("clear trace")
        tx = self.create_stream_ip6(self.pg1, 2, 2)
        rx = self._send_and_capture(self.pg1, tx)
        self.verify_capture_ip6(self.pg1, rx, tx)
if __name__ == "__main__":
    # Executed as a script: hand this module's tests to the VPP runner.
    unittest.main(testRunner=VppTestRunner)