| #!/usr/bin/env python |
| ## @file test_l2xc.py |
| # Module to provide L2 cross-connect test case. |
| # |
| # The module provides a set of tools for L2 cross-connect tests. |
| |
| import logging |
| logging.getLogger("scapy.runtime").setLevel(logging.ERROR) |
| |
| import unittest |
| import random |
| from framework import VppTestCase, VppTestRunner |
| from scapy.layers.l2 import Ether, Raw |
| from scapy.layers.inet import IP, UDP |
| |
| |
| ## Subclass of the VppTestCase class. |
| # |
| # This subclass is a class for L2 cross-connect test cases. It provides methods |
| # to create interfaces, configuring L2 cross-connects, creating and verifying |
| # packet streams. |
| class TestL2xc(VppTestCase): |
| """ L2XC Test Case """ |
| |
| # Test variables |
| interf_nr = 4 # Number of interfaces |
| hosts_nr = 10 # Number of hosts |
| pkts_per_burst = 257 # Number of packets per burst |
| |
| ## Class method to start the test case. |
| # Overrides setUpClass method in VppTestCase class. |
| # There is used try..except statement to ensure that the tear down of |
| # the class will be executed even if any exception is raised. |
| # @param cls The class pointer. |
| @classmethod |
| def setUpClass(cls): |
| super(TestL2xc, cls).setUpClass() |
| |
| try: |
| ## Create interfaces |
| cls.interfaces = range(TestL2xc.interf_nr) |
| cls.create_interfaces(cls.interfaces) |
| |
| ## Create bi-directional cross-connects between pg0 and pg1 |
| cls.api("sw_interface_set_l2_xconnect rx pg0 tx pg1 enable") |
| cls.api("sw_interface_set_l2_xconnect rx pg1 tx pg0 enable") |
| |
| ## Create bi-directional cross-connects between pg2 and pg3 |
| cls.api("sw_interface_set_l2_xconnect rx pg2 tx pg3 enable") |
| cls.api("sw_interface_set_l2_xconnect rx pg3 tx pg2 enable") |
| |
| cls.cli(0, "show l2patch") |
| |
| ## Create host MAC and IPv4 lists |
| cls.create_host_lists(TestL2xc.hosts_nr) |
| |
| except Exception as e: |
| cls.tearDownClass() |
| raise e |
| |
| ## Method to define tear down VPP actions of the test case. |
| # Overrides tearDown method in VppTestCase class. |
| # @param self The object pointer. |
| def tearDown(self): |
| self.cli(2, "show int") |
| self.cli(2, "show trace") |
| self.cli(2, "show hardware") |
| self.cli(2, "show l2patch") |
| self.cli(2, "show error") |
| self.cli(2, "show run") |
| |
| ## Class method to create required number of MAC and IPv4 addresses. |
| # Create required number of host MAC addresses and distribute them among |
| # interfaces. Create host IPv4 address for every host MAC address too. |
| # @param cls The class pointer. |
| # @param count Integer variable to store the number of MAC addresses to be |
| # created. |
| @classmethod |
| def create_host_lists(cls, count): |
| for i in cls.interfaces: |
| cls.MY_MACS[i] = [] |
| cls.MY_IP4S[i] = [] |
| for j in range(0, count): |
| cls.MY_MACS[i].append("00:00:00:ff:%02x:%02x" % (i, j)) |
| cls.MY_IP4S[i].append("172.17.1%02x.%u" % (i, j)) |
| ## @var MY_MACS |
| # Dictionary variable to store list of MAC addresses per interface. |
| ## @var MY_IP4S |
| # Dictionary variable to store list of IPv4 addresses per interface. |
| |
| ## Method to create packet stream for the packet generator interface. |
| # Create input packet stream for the given packet generator interface with |
| # packets of different length targeted for all other created packet |
| # generator interfaces. |
| # @param self The object pointer. |
| # @param pg_id Integer variable to store the index of the interface to |
| # create the input packet stream. |
| # @return pkts List variable to store created input stream of packets. |
| def create_stream(self, pg_id): |
| # TODO: use variables to create lists based on interface number |
| pg_targets = [None] * 4 |
| pg_targets[0] = [1] |
| pg_targets[1] = [0] |
| pg_targets[2] = [3] |
| pg_targets[3] = [2] |
| pkts = [] |
| for i in range(0, TestL2xc.pkts_per_burst): |
| target_pg_id = pg_targets[pg_id][0] |
| target_host_id = random.randrange(len(self.MY_MACS[target_pg_id])) |
| source_host_id = random.randrange(len(self.MY_MACS[pg_id])) |
| pkt_info = self.create_packet_info(pg_id, target_pg_id) |
| payload = self.info_to_payload(pkt_info) |
| p = (Ether(dst=self.MY_MACS[target_pg_id][target_host_id], |
| src=self.MY_MACS[pg_id][source_host_id]) / |
| IP(src=self.MY_IP4S[pg_id][source_host_id], |
| dst=self.MY_IP4S[target_pg_id][target_host_id]) / |
| UDP(sport=1234, dport=1234) / |
| Raw(payload)) |
| pkt_info.data = p.copy() |
| packet_sizes = [64, 512, 1518, 9018] |
| size = packet_sizes[(i / 2) % len(packet_sizes)] |
| self.extend_packet(p, size) |
| pkts.append(p) |
| return pkts |
| ## @var pg_targets |
| # List variable to store list of indexes of target packet generator |
| # interfaces for every source packet generator interface. |
| ## @var target_pg_id |
| # Integer variable to store the index of the random target packet |
| # generator interfaces. |
| ## @var target_host_id |
| # Integer variable to store the index of the randomly chosen |
| # destination host MAC/IPv4 address. |
| ## @var source_host_id |
| # Integer variable to store the index of the randomly chosen source |
| # host MAC/IPv4 address. |
| ## @var pkt_info |
| # Object variable to store the information about the generated packet. |
| ## @var payload |
| # String variable to store the payload of the packet to be generated. |
| ## @var p |
| # Object variable to store the generated packet. |
| ## @var packet_sizes |
| # List variable to store required packet sizes. |
| ## @var size |
| # List variable to store required packet sizes. |
| |
| ## Method to verify packet stream received on the packet generator interface. |
| # Verify packet-by-packet the output stream captured on a given packet |
| # generator (pg) interface using following packet payload data - order of |
| # packet in the stream, index of the source and destination pg interface, |
| # src and dst host IPv4 addresses and src port and dst port values of UDP |
| # layer. |
| # @param self The object pointer. |
| # @param o Integer variable to store the index of the interface to |
| # verify the output packet stream. |
| # @param capture List variable to store the captured output packet stream. |
| def verify_capture(self, o, capture): |
| last_info = {} |
| for i in self.interfaces: |
| last_info[i] = None |
| for packet in capture: |
| try: |
| ip = packet[IP] |
| udp = packet[UDP] |
| payload_info = self.payload_to_info(str(packet[Raw])) |
| self.assertEqual(payload_info.dst, o) |
| self.log("Got packet on port %u: src=%u (id=%u)" |
| % (o, payload_info.src, payload_info.index), 2) |
| next_info = self.get_next_packet_info_for_interface2( |
| payload_info.src, payload_info.dst, |
| last_info[payload_info.src]) |
| last_info[payload_info.src] = next_info |
| self.assertTrue(next_info is not None) |
| self.assertEqual(payload_info.index, next_info.index) |
| # Check standard fields |
| self.assertEqual(ip.src, next_info.data[IP].src) |
| self.assertEqual(ip.dst, next_info.data[IP].dst) |
| self.assertEqual(udp.sport, next_info.data[UDP].sport) |
| self.assertEqual(udp.dport, next_info.data[UDP].dport) |
| except: |
| self.log("Unexpected or invalid packet:") |
| packet.show() |
| raise |
| for i in self.interfaces: |
| remaining_packet = self.get_next_packet_info_for_interface2( |
| i, o, last_info[i]) |
| self.assertTrue(remaining_packet is None, |
| "Port %u: Packet expected from source %u didn't" |
| " arrive" % (o, i)) |
| ## @var last_info |
| # Dictionary variable to store verified packets per packet generator |
| # interface. |
| ## @var ip |
| # Object variable to store the IP layer of the packet. |
| ## @var udp |
| # Object variable to store the UDP layer of the packet. |
| ## @var payload_info |
| # Object variable to store required information about the packet. |
| ## @var next_info |
| # Object variable to store information about next packet. |
| ## @var remaining_packet |
| # Object variable to store information about remaining packet. |
| |
| ## Method defining L2 cross-connect test case. |
| # Contains steps of the test case. |
| # @param self The object pointer. |
| def test_l2xc(self): |
| """ L2XC test |
| |
| Test scenario: |
| 1.config |
| 2 pairs of 2 interfaces, l2xconnected |
| |
| 2.sending l2 eth packets between 4 interfaces |
| 64B, 512B, 1518B, 9018B (ether_size) |
| burst of packets per interface |
| """ |
| |
| ## Create incoming packet streams for packet-generator interfaces |
| for i in self.interfaces: |
| pkts = self.create_stream(i) |
| self.pg_add_stream(i, pkts) |
| |
| ## Enable packet capturing and start packet sending |
| self.pg_enable_capture(self.interfaces) |
| self.pg_start() |
| |
| ## Verify outgoing packet streams per packet-generator interface |
| for i in self.interfaces: |
| out = self.pg_get_capture(i) |
| self.log("Verifying capture %u" % i) |
| self.verify_capture(i, out) |
| ## @var pkts |
| # List variable to store created input stream of packets for the packet |
| # generator interface. |
| ## @var out |
| # List variable to store captured output stream of packets for |
| # the packet generator interface. |
| |
| |
| if __name__ == '__main__': |
| unittest.main(testRunner = VppTestRunner) |