| #!/usr/bin/env python |
| import binascii |
| import random |
| import socket |
| import os |
| import threading |
| import struct |
| from struct import unpack, unpack_from |
| |
| try: |
| import unittest2 as unittest |
| except ImportError: |
| import unittest |
| |
| from util import ppp, ppc |
| from re import compile |
| import scapy.compat |
| from scapy.packet import Raw |
| from scapy.layers.l2 import Ether |
| from scapy.layers.inet import IP, UDP, ICMP |
| import scapy.layers.inet6 as inet6 |
| from scapy.layers.inet6 import IPv6, ICMPv6DestUnreach |
| import six |
| from framework import VppTestCase, VppTestRunner |
| |
| from vpp_ip import DpoProto |
| from vpp_ip_route import VppIpRoute, VppRoutePath |
| |
# Number of packets sent per stream in TestPunt.test_punt; the punt counter
# assertions in that test are written as multiples of this value.
NUM_PKTS = 67
| |
| |
| # Format MAC Address |
def get_mac_addr(bytes_addr):
    """Format a raw MAC address as colon-separated, two-digit hex octets.

    :param bytes_addr: iterable of address bytes; every element is
        converted to its integer value with ``scapy.compat.orb``.
    :returns: string with one lowercase ``%02x`` group per input element,
        joined by ``':'``.
    """
    octets = ['%02x' % scapy.compat.orb(octet) for octet in bytes_addr]
    return ':'.join(octets)
| |
| |
| # Format IP Address |
def ipv4(bytes_addr):
    """Format a raw IPv4 address in dotted-decimal notation.

    :param bytes_addr: iterable of address bytes; every element is
        converted to its integer value with ``scapy.compat.orb``.
    :returns: string with one decimal number per input element, joined
        by ``'.'``.
    """
    parts = ['%d' % scapy.compat.orb(octet) for octet in bytes_addr]
    return '.'.join(parts)
| |
| |
| # Unpack Ethernet Frame |
def ethernet_frame(data):
    """Split a raw Ethernet frame into header fields and payload.

    :param data: frame bytes starting at the destination MAC address.
    :returns: tuple ``(dest_mac, src_mac, proto, payload)`` holding the two
        6-byte MAC addresses, the EtherType after it has been passed
        through ``socket.htons``, and everything after the 14-byte header.
    :raises struct.error: if fewer than 14 bytes are supplied.
    """
    header, payload = data[:14], data[14:]
    dst, src, ether_type = struct.unpack('! 6s 6s H', header)
    # The '!' format already decoded the field as big-endian; htons() is
    # applied on top of that, so on a little-endian host the value comes
    # back byte-swapped (IPv4's 0x0800 is returned as 0x0008).
    swapped_type = socket.htons(ether_type)
    return dst, src, swapped_type, payload
| |
| |
| # Unpack IPv4 Packets |
def ipv4_packet(data):
    """Extract protocol, addresses and payload from a raw IPv4 packet.

    The payload offset follows the header-length (IHL) field, so a packet
    carrying IPv4 options is split after the options instead of at a fixed
    20 bytes.

    :param data: packet bytes starting at the version/IHL byte.
    :returns: tuple ``(proto, src, target, payload)``: the protocol number,
        the 4-byte source and destination addresses, and the bytes that
        follow the IPv4 header.
    :raises struct.error: if fewer than 20 bytes are supplied.
    """
    ver_ihl, proto, src, target = struct.unpack(
        '! B 8x B 2x 4s 4s', data[:20])
    # IHL is the low nibble and counts 32-bit words. Never go below the
    # 20-byte fixed header, so a malformed IHL cannot make the payload
    # overlap the fields decoded above.
    header_len = max((ver_ihl & 0x0f) * 4, 20)
    return proto, src, target, data[header_len:]
| |
| |
| # Unpack IPv6 Packets |
def ipv6_packet(data):
    """Extract next-header, addresses and payload from a raw IPv6 packet.

    Only the fixed 40-byte header is consumed; the payload is whatever
    follows it.

    :param data: packet bytes starting at the first byte of the IPv6 header.
    :returns: tuple ``(nh, src, target, payload)``: the next-header value
        (byte 6), the 16-byte source and destination addresses, and the
        bytes after the first 40.
    :raises struct.error: if fewer than 40 bytes are supplied.
    """
    fixed_header, payload = data[:40], data[40:]
    next_header, src_addr, dst_addr = struct.unpack(
        '! 6x B 1x 16s 16s', fixed_header)
    return next_header, src_addr, dst_addr, payload
| |
| |
| # Unpacks any UDP Packet |
def udp_seg(data):
    """Extract ports, length and payload from a raw UDP datagram.

    The UDP header is source port, destination port, length, checksum
    (2 bytes each). The length field is the third one, so it is read
    directly and the trailing checksum is skipped.

    :param data: bytes starting at the UDP source port.
    :returns: tuple ``(src_port, dest_port, size, payload)`` where ``size``
        is the UDP length field and ``payload`` is everything after the
        8-byte header.
    :raises struct.error: if fewer than 8 bytes are supplied.
    """
    src_port, dest_port, size = struct.unpack('! H H H 2x', data[:8])
    return src_port, dest_port, size, data[8:]
| |
| |
| # Unpacks any TCP Packet |
def tcp_seg(data):
    """Extract ports, sequence number and payload from a raw TCP segment.

    :param data: bytes starting at the TCP source port.
    :returns: tuple ``(src_port, dest_port, seq, payload)``; the payload
        starts at the offset given by the top four bits of the 16-bit
        offset/flags word, counted in 4-byte units.
    :raises struct.error: if fewer than 14 bytes are supplied.
    """
    src_port, dest_port, seq, offset_and_flags = struct.unpack(
        '! H H L 4x H', data[:14])
    header_words = offset_and_flags >> 12
    payload = data[header_words * 4:]
    return src_port, dest_port, seq, payload
| |
| |
def receivePackets(sock, counters):
    """Receive punted packets from `sock` and flag the UDP ports seen.

    Each datagram is expected to carry 8 bytes of punt socket metadata
    followed by an Ethernet frame. For every IPv4 or IPv6 UDP packet the
    entry ``counters[dst_port]`` is set to 0 (it is not decremented).
    TCP packets are parsed but leave `counters` untouched; anything else
    is ignored.

    The loop ends when ``sock.recv`` raises ``socket.error``, which is what
    happens once the socket has been closed by the owning test; the function
    then returns instead of letting the exception escape.

    :param sock: object providing ``recv(bufsize)``.
    :param counters: dict keyed by destination port, updated in place.
    """
    while True:
        try:
            data = sock.recv(65536)
        except socket.error:
            # Socket closed (or otherwise unusable): stop receiving.
            return

        # Skip the 8 bytes of punt socket metadata (packet descriptor).
        _, _, eth_proto, data = ethernet_frame(data[8:])

        # ethernet_frame() returns the EtherType run through htons(), so
        # the values compared here are the byte-swapped forms of
        # 0x0800 (IPv4) and 0x86dd (IPv6).
        if eth_proto == 8:
            l4_proto, _, _, data = ipv4_packet(data)
        elif eth_proto == 0xdd86:
            l4_proto, _, _, data = ipv6_packet(data)
        else:
            continue

        # TCP (6) and UDP (17) both start with source/destination port, so
        # udp_seg() is used to read the ports of either.
        if l4_proto in (6, 17):
            _, dst_port, _, _ = udp_seg(data)
            if l4_proto == 17:
                counters[dst_port] = 0
| |
| |
class serverSocketThread(threading.Thread):
    """ Socket server thread

    Binds an AF_UNIX datagram socket to `sockName` and hands it, together
    with `counters`, to receivePackets().
    """

    def __init__(self, threadID, sockName, counters):
        """
        :param threadID: identifier stored on the thread object.
        :param sockName: filesystem path the socket is bound to.
        :param counters: dict passed through to receivePackets().
        """
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.sockName = sockName
        # Created in run(); stays None until the thread has started.
        self.sock = None
        self.counters = counters

    def run(self):
        """Create and bind the socket, then receive packets on it."""
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        # Best-effort removal of an existing file at the socket path before
        # binding. Only filesystem errors (typically "no such file") are
        # ignored; anything else is allowed to propagate.
        try:
            os.unlink(self.sockName)
        except OSError:
            pass
        self.sock.bind(self.sockName)

        receivePackets(self.sock, self.counters)
| |
| |
class TestPuntSocket(VppTestCase):
    """ Punt Socket """

    # UDP destination ports used by the traffic tests
    ports = [1111, 2222, 3333, 4444]
    # receiver threads started through socket_client_create(); class-level,
    # so the list is shared and emptied in place by tearDown()
    sock_servers = []
    # per-port bookkeeping handed to every receiver thread
    portsCheck = {}
    # number of packets per generated stream
    nr_packets = 256

    @classmethod
    def setUpClass(cls):
        super(TestPuntSocket, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestPuntSocket, cls).tearDownClass()

    @classmethod
    def setUpConstants(cls):
        """ Set the punt socket config before the base class constants """
        punt_socket_path = cls.tempdir + "/socket_punt"
        cls.extra_vpp_punt_config = [
            "punt", "{", "socket", punt_socket_path, "}"]
        super(TestPuntSocket, cls).setUpConstants()

    def setUp(self):
        """ Seed the RNG and bring up two pg interfaces """
        super(TestPuntSocket, self).setUp()
        random.seed()

        self.create_pg_interfaces(range(2))
        for pg_if in self.pg_interfaces:
            pg_if.admin_up()

    def tearDown(self):
        """ Forget the receiver threads, then run the base tearDown """
        del self.sock_servers[:]
        super(TestPuntSocket, self).tearDown()

    def socket_client_create(self, sock_name, id=None):
        """ Start a receiver thread bound to sock_name

        :param sock_name: path of the UNIX socket the thread binds to.
        :param id: identifier passed to the thread as its threadID.
        """
        server = serverSocketThread(id, sock_name, self.portsCheck)
        self.sock_servers.append(server)
        server.start()

    def socket_client_close(self):
        """ Close the socket of every receiver thread created so far """
        for server in self.sock_servers:
            server.sock.close()
| |
| |
class TestIP4PuntSocket(TestPuntSocket):
    """ Punt Socket for IPv4 """

    @classmethod
    def setUpClass(cls):
        super(TestIP4PuntSocket, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP4PuntSocket, cls).tearDownClass()

    def setUp(self):
        super(TestIP4PuntSocket, self).setUp()

        for i in self.pg_interfaces:
            i.config_ip4()
            i.resolve_arp()

    def tearDown(self):
        super(TestIP4PuntSocket, self).tearDown()
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.admin_down()

    def _punt_sock_path(self, name):
        """ Return the bytes path of socket file `name` in the tempdir """
        return b"%s/%s" % (six.ensure_binary(self.tempdir),
                           six.ensure_binary(name))

    def _punt_udp_pkt(self, dport):
        """ Build one UDP packet from pg0's peer to pg0's own address """
        # bytes literal so the payload is the same 100 bytes on python 2
        # and python 3
        payload = Raw(b'\xa5' * 100)
        return (Ether(src=self.pg0.remote_mac,
                      dst=self.pg0.local_mac) /
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
                UDP(sport=9876, dport=dport) /
                payload)

    def _punt_random_stream(self):
        """ Build nr_packets packets to randomly chosen ports

        portsCheck is reset first and afterwards holds, per port, the
        number of packets of the returned stream addressed to it.
        """
        for p in self.ports:
            self.portsCheck[p] = 0

        pkts = list()
        for _ in range(self.nr_packets):
            # choose port from port list
            p = random.choice(self.ports)
            pkts.append(self._punt_udp_pkt(p))
            self.portsCheck[p] += 1
        return pkts

    def _punt_send_expect_none_on_pg0(self, pkts):
        """ Send pkts on pg0, expect nothing back, close the receivers """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear trace")
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(0)
        self.logger.info(self.vapi.cli("show trace"))
        self.socket_client_close()

    def _punt_check_and_deregister_all(self):
        """ Check every port was seen, then remove all registrations """
        for p in self.ports:
            self.assertEqual(self.portsCheck[p], 0)
            self.vapi.punt_socket_deregister(p)
        punts = self.vapi.punt_socket_dump(is_ip6=0)
        self.assertEqual(len(punts), 0)

    def test_punt_socket_dump(self):
        """ Punt socket registration/deregistration"""

        punts = self.vapi.punt_socket_dump(is_ip6=0)
        self.assertEqual(len(punts), 0)

        #
        # configure a punt socket
        #
        self.vapi.punt_socket_register(
            1111, self._punt_sock_path("socket_punt_1111"))
        self.vapi.punt_socket_register(
            2222, self._punt_sock_path("socket_punt_2222"))
        punts = self.vapi.punt_socket_dump(is_ip6=0)
        self.assertEqual(len(punts), 2)
        self.assertEqual(punts[0].punt.l4_port, 1111)
        self.assertEqual(punts[1].punt.l4_port, 2222)

        #
        # deregister a punt socket
        #
        self.vapi.punt_socket_deregister(1111)
        punts = self.vapi.punt_socket_dump(is_ip6=0)
        self.assertEqual(len(punts), 1)

        #
        # configure a punt socket again
        #
        self.vapi.punt_socket_register(
            1111, self._punt_sock_path("socket_punt_1111"))
        self.vapi.punt_socket_register(
            3333, self._punt_sock_path("socket_punt_3333"))
        punts = self.vapi.punt_socket_dump(is_ip6=0)
        self.assertEqual(len(punts), 3)

        #
        # deregister all punt socket
        #
        self.vapi.punt_socket_deregister(1111)
        self.vapi.punt_socket_deregister(2222)
        self.vapi.punt_socket_deregister(3333)
        punts = self.vapi.punt_socket_dump(is_ip6=0)
        self.assertEqual(len(punts), 0)

    def test_punt_socket_traffic_single_port_single_socket(self):
        """ Punt socket traffic single port single socket"""

        port = self.ports[0]
        sock_path = self._punt_sock_path("socket_%d" % port)

        pkts = self._punt_udp_pkt(port) * self.nr_packets
        self.portsCheck[port] = self.nr_packets

        punts = self.vapi.punt_socket_dump(is_ip6=0)
        self.assertEqual(len(punts), 0)

        #
        # expect ICMP - port unreachable for all packets
        #
        self.vapi.cli("clear trace")
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        # FIXME - when punt socket deregister is implemented
        # rx = self.pg0.get_capture(self.nr_packets)
        # for p in rx:
        #     self.assertEqual(int(p[IP].proto), 1)   # ICMP
        #     self.assertEqual(int(p[ICMP].code), 3)  # unreachable

        #
        # configure a punt socket
        #
        self.socket_client_create(sock_path)
        self.vapi.punt_socket_register(port, sock_path)
        punts = self.vapi.punt_socket_dump(is_ip6=0)
        self.assertEqual(len(punts), 1)

        self.logger.debug("Sending %s packets to port %d",
                          str(self.portsCheck[port]), port)
        #
        # expect punt socket and no packets on pg0
        #
        self._punt_send_expect_none_on_pg0(pkts)
        self.assertEqual(self.portsCheck[port], 0)

        #
        # remove punt socket. expect ICMP - port unreachable for all packets
        #
        self.vapi.punt_socket_deregister(port)
        punts = self.vapi.punt_socket_dump(is_ip6=0)
        self.assertEqual(len(punts), 0)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        # FIXME - when punt socket deregister is implemented
        # self.pg0.get_capture(self.nr_packets)

    def test_punt_socket_traffic_multi_port_multi_sockets(self):
        """ Punt socket traffic multi ports and multi sockets"""

        #
        # create stream with random packets count per given ports
        #
        pkts = self._punt_random_stream()

        #
        # no punt socket
        #
        punts = self.vapi.punt_socket_dump(is_ip6=0)
        self.assertEqual(len(punts), 0)

        #
        # configure a punt socket
        #
        for p in self.ports:
            sock_path = self._punt_sock_path("socket_%d" % p)
            self.socket_client_create(sock_path)
            self.vapi.punt_socket_register(p, sock_path)
        punts = self.vapi.punt_socket_dump(is_ip6=0)
        self.assertEqual(len(punts), len(self.ports))

        for p in self.ports:
            self.logger.debug("Sending %s packets to port %d",
                              str(self.portsCheck[p]), p)

        #
        # expect punt socket and no packets on pg0
        #
        self._punt_send_expect_none_on_pg0(pkts)

        self._punt_check_and_deregister_all()

    def test_punt_socket_traffic_multi_ports_single_socket(self):
        """ Punt socket traffic multi ports and single socket"""

        #
        # create stream with random packets count per given ports
        #
        pkts = self._punt_random_stream()

        #
        # no punt socket
        #
        punts = self.vapi.punt_socket_dump(is_ip6=0)
        self.assertEqual(len(punts), 0)

        #
        # configure a punt socket
        #
        sock_path = self._punt_sock_path("socket_multi")
        self.socket_client_create(sock_path)
        for p in self.ports:
            self.vapi.punt_socket_register(p, sock_path)
        punts = self.vapi.punt_socket_dump(is_ip6=0)
        self.assertEqual(len(punts), len(self.ports))

        for p in self.ports:
            self.logger.debug("Sending %s packets to port %d",
                              str(self.portsCheck[p]), p)
        #
        # expect punt socket and no packets on pg0
        #
        self._punt_send_expect_none_on_pg0(pkts)

        self._punt_check_and_deregister_all()
| |
| |
class TestIP6PuntSocket(TestPuntSocket):
    """ Punt Socket for IPv6"""

    @classmethod
    def setUpClass(cls):
        super(TestIP6PuntSocket, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestIP6PuntSocket, cls).tearDownClass()

    def setUp(self):
        super(TestIP6PuntSocket, self).setUp()

        for i in self.pg_interfaces:
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        super(TestIP6PuntSocket, self).tearDown()
        for i in self.pg_interfaces:
            i.unconfig_ip6()
            i.admin_down()

    def _punt_sock_path(self, name):
        """ Return the bytes path of socket file `name` in the tempdir """
        return b"%s/%s" % (six.ensure_binary(self.tempdir),
                           six.ensure_binary(name))

    def _punt_udp_pkt(self, dport):
        """ Build one UDP/IPv6 packet from pg0's peer to pg0's address """
        # bytes literal so the payload is the same 100 bytes on python 2
        # and python 3
        payload = Raw(b'\xa5' * 100)
        return (Ether(src=self.pg0.remote_mac,
                      dst=self.pg0.local_mac) /
                IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
                inet6.UDP(sport=9876, dport=dport) /
                payload)

    def _punt_random_stream(self):
        """ Build nr_packets packets to randomly chosen ports

        portsCheck is reset first and afterwards holds, per port, the
        number of packets of the returned stream addressed to it.
        """
        for p in self.ports:
            self.portsCheck[p] = 0

        pkts = list()
        for _ in range(self.nr_packets):
            # choose port from port list
            p = random.choice(self.ports)
            pkts.append(self._punt_udp_pkt(p))
            self.portsCheck[p] += 1
        return pkts

    def _punt_send_expect_none_on_pg0(self, pkts):
        """ Send pkts on pg0, expect nothing back, close the receivers """
        self.vapi.cli("clear errors")
        self.vapi.cli("clear trace")
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.get_capture(0)
        self.logger.info(self.vapi.cli("show trace"))
        self.socket_client_close()

    def _punt_check_and_deregister_all(self):
        """ Check every port was seen, then remove all registrations """
        for p in self.ports:
            self.assertEqual(self.portsCheck[p], 0)
            self.vapi.punt_socket_deregister(p, is_ip4=0)
        punts = self.vapi.punt_socket_dump(is_ip6=1)
        self.assertEqual(len(punts), 0)

    def test_punt_socket_dump(self):
        """ Punt socket registration/deregistration"""

        punts = self.vapi.punt_socket_dump(is_ip6=1)
        self.assertEqual(len(punts), 0)

        #
        # configure a punt socket
        #
        self.vapi.punt_socket_register(
            1111, self._punt_sock_path("socket_1111"), is_ip4=0)
        self.vapi.punt_socket_register(
            2222, self._punt_sock_path("socket_2222"), is_ip4=0)
        punts = self.vapi.punt_socket_dump(is_ip6=1)
        self.assertEqual(len(punts), 2)
        self.assertEqual(punts[0].punt.l4_port, 1111)
        self.assertEqual(punts[1].punt.l4_port, 2222)

        #
        # deregister a punt socket
        #
        self.vapi.punt_socket_deregister(1111, is_ip4=0)
        punts = self.vapi.punt_socket_dump(is_ip6=1)
        self.assertEqual(len(punts), 1)

        #
        # configure a punt socket again
        #
        # Port 3333 is registered here as well, mirroring the IPv4 test,
        # so that every port deregistered below really was registered.
        self.vapi.punt_socket_register(
            1111, self._punt_sock_path("socket_1111"), is_ip4=0)
        self.vapi.punt_socket_register(
            3333, self._punt_sock_path("socket_3333"), is_ip4=0)
        punts = self.vapi.punt_socket_dump(is_ip6=1)
        self.assertEqual(len(punts), 3)

        #
        # deregister all punt socket
        #
        self.vapi.punt_socket_deregister(1111, is_ip4=0)
        self.vapi.punt_socket_deregister(2222, is_ip4=0)
        self.vapi.punt_socket_deregister(3333, is_ip4=0)
        punts = self.vapi.punt_socket_dump(is_ip6=1)
        self.assertEqual(len(punts), 0)

    def test_punt_socket_traffic_single_port_single_socket(self):
        """ Punt socket traffic single port single socket"""

        port = self.ports[0]
        sock_path = self._punt_sock_path("socket_%d" % port)

        pkts = self._punt_udp_pkt(port) * self.nr_packets
        self.portsCheck[port] = self.nr_packets

        punts = self.vapi.punt_socket_dump(is_ip6=1)
        self.assertEqual(len(punts), 0)

        #
        # expect ICMPv6 - destination unreachable for all packets
        #
        self.vapi.cli("clear trace")
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        # FIXME - when punt socket deregister is implemented
        # rx = self.pg0.get_capture(self.nr_packets)
        # for p in rx:
        #     self.assertEqual(int(p[IPv6].nh), 58)  # ICMPv6
        #     self.assertEqual(int(p[ICMPv6DestUnreach].code), 4)

        #
        # configure a punt socket
        #
        self.socket_client_create(sock_path)
        self.vapi.punt_socket_register(port, sock_path, is_ip4=0)
        punts = self.vapi.punt_socket_dump(is_ip6=1)
        self.assertEqual(len(punts), 1)

        self.logger.debug("Sending %s packets to port %d",
                          str(self.portsCheck[port]), port)
        #
        # expect punt socket and no packets on pg0
        #
        self._punt_send_expect_none_on_pg0(pkts)
        self.assertEqual(self.portsCheck[port], 0)

        #
        # remove punt socket. expect ICMP - dest. unreachable for all packets
        #
        self.vapi.punt_socket_deregister(port, is_ip4=0)
        punts = self.vapi.punt_socket_dump(is_ip6=1)
        self.assertEqual(len(punts), 0)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        # FIXME - when punt socket deregister is implemented
        # self.pg0.get_capture(self.nr_packets)

    def test_punt_socket_traffic_multi_port_multi_sockets(self):
        """ Punt socket traffic multi ports and multi sockets"""

        #
        # create stream with random packets count per given ports
        #
        pkts = self._punt_random_stream()

        #
        # no punt socket
        #
        punts = self.vapi.punt_socket_dump(is_ip6=1)
        self.assertEqual(len(punts), 0)

        #
        # configure a punt socket
        #
        for p in self.ports:
            sock_path = self._punt_sock_path("socket_%d" % p)
            self.socket_client_create(sock_path)
            self.vapi.punt_socket_register(p, sock_path, is_ip4=0)
        punts = self.vapi.punt_socket_dump(is_ip6=1)
        self.assertEqual(len(punts), len(self.ports))

        for p in self.ports:
            self.logger.debug("Sending %s packets to port %d",
                              str(self.portsCheck[p]), p)

        #
        # expect punt socket and no packets on pg0
        #
        self._punt_send_expect_none_on_pg0(pkts)

        self._punt_check_and_deregister_all()

    def test_punt_socket_traffic_multi_ports_single_socket(self):
        """ Punt socket traffic multi ports and single socket"""

        #
        # create stream with random packets count per given ports
        #
        pkts = self._punt_random_stream()

        #
        # no punt socket
        #
        punts = self.vapi.punt_socket_dump(is_ip6=1)
        self.assertEqual(len(punts), 0)

        #
        # configure a punt socket
        #
        sock_path = self._punt_sock_path("socket_multi")
        self.socket_client_create(sock_path)
        for p in self.ports:
            self.vapi.punt_socket_register(p, sock_path, is_ip4=0)
        punts = self.vapi.punt_socket_dump(is_ip6=1)
        self.assertEqual(len(punts), len(self.ports))

        for p in self.ports:
            self.logger.debug("Send %s packets to port %d",
                              str(self.portsCheck[p]), p)
        #
        # expect punt socket and no packets on pg0
        #
        self._punt_send_expect_none_on_pg0(pkts)

        self._punt_check_and_deregister_all()
| |
| |
class TestPunt(VppTestCase):
    """ Punt Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestPunt, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestPunt, cls).tearDownClass()

    def setUp(self):
        super(TestPunt, self).setUp()

        self.create_pg_interfaces(range(4))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()

    def tearDown(self):
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.unconfig_ip6()
            i.ip6_disable()
            i.admin_down()
        super(TestPunt, self).tearDown()

    def _verify_punted_ip4(self, rxs, tx, intf):
        """ Check rxs were sent to intf's peer with IPv4 dst/TTL of tx """
        for rx in rxs:
            self.assertEqual(rx[Ether].dst, intf.remote_mac)
            self.assertEqual(rx[Ether].src, intf.local_mac)
            self.assertEqual(tx[IP].dst, rx[IP].dst)
            self.assertEqual(tx[IP].ttl, rx[IP].ttl)

    def _verify_punted_ip6(self, rxs, tx, intf):
        """ Check rxs were sent to intf's peer with IPv6 dst/hlim of tx """
        for rx in rxs:
            self.assertEqual(rx[Ether].dst, intf.remote_mac)
            self.assertEqual(rx[Ether].src, intf.local_mac)
            self.assertEqual(tx[IPv6].dst, rx[IPv6].dst)
            self.assertEqual(tx[IPv6].hlim, rx[IPv6].hlim)

    def test_punt(self):
        """ Exception Path testing """

        #
        # Using the test CLI we will hook in an exception path to
        # send ACL deny packets out of pg0 and pg1.
        # The ACL is src,dst = 1.1.1.1,1.1.1.2
        #
        ip_1_1_1_2 = VppIpRoute(self, "1.1.1.2", 32,
                                [VppRoutePath(self.pg3.remote_ip4,
                                              self.pg3.sw_if_index)])
        ip_1_1_1_2.add_vpp_config()
        ip_1_2 = VppIpRoute(self, "1::2", 128,
                            [VppRoutePath(self.pg3.remote_ip6,
                                          self.pg3.sw_if_index,
                                          proto=DpoProto.DPO_PROTO_IP6)],
                            is_ip6=1)
        ip_1_2.add_vpp_config()

        # bytes literal so the payload is the same 100 bytes on python 2
        # and python 3
        p4 = (Ether(src=self.pg2.remote_mac,
                    dst=self.pg2.local_mac) /
              IP(src="1.1.1.1", dst="1.1.1.2") /
              UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100))
        p6 = (Ether(src=self.pg2.remote_mac,
                    dst=self.pg2.local_mac) /
              IPv6(src="1::1", dst="1::2") /
              UDP(sport=1234, dport=1234) /
              Raw(b'\xa5' * 100))
        self.send_and_expect(self.pg2, p4*1, self.pg3)
        self.send_and_expect(self.pg2, p6*1, self.pg3)

        #
        # apply the punting features
        #
        self.vapi.cli("test punt pg2")

        #
        # pkts now dropped
        #
        self.send_and_assert_no_replies(self.pg2, p4*NUM_PKTS)
        self.send_and_assert_no_replies(self.pg2, p6*NUM_PKTS)

        #
        # Check state:
        #  1 - node error counters
        #  2 - per-reason counters
        #    7, 8 are the indices read from /net/punt below for the
        #    assigned punt reasons
        #
        stats = self.statistics.get_counter(
            "/err/punt-dispatch/No registrations")
        self.assertEqual(stats, 2*NUM_PKTS)

        stats = self.statistics.get_counter("/net/punt")
        self.assertEqual(stats[0][7]['packets'], NUM_PKTS)
        self.assertEqual(stats[0][8]['packets'], NUM_PKTS)

        #
        # use the test CLI to test a client that punts exception
        # packets out of pg0
        #
        self.vapi.cli("test punt pg0 %s" % self.pg0.remote_ip4)
        self.vapi.cli("test punt pg0 %s" % self.pg0.remote_ip6)

        rx4s = self.send_and_expect(self.pg2, p4*NUM_PKTS, self.pg0)
        rx6s = self.send_and_expect(self.pg2, p6*NUM_PKTS, self.pg0)

        #
        # check the packets come out IP unmodified but destined to pg0 host
        #
        self._verify_punted_ip4(rx4s, p4, self.pg0)
        self._verify_punted_ip6(rx6s, p6, self.pg0)

        stats = self.statistics.get_counter("/net/punt")
        self.assertEqual(stats[0][7]['packets'], 2*NUM_PKTS)
        self.assertEqual(stats[0][8]['packets'], 2*NUM_PKTS)

        #
        # add another registration for the same reason to send packets
        # out of pg1
        #
        self.vapi.cli("test punt pg1 %s" % self.pg1.remote_ip4)
        self.vapi.cli("test punt pg1 %s" % self.pg1.remote_ip6)

        # every packet is now expected on both pg0 and pg1
        self.vapi.cli("clear trace")
        self.pg2.add_stream(p4 * NUM_PKTS)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rxd = self.pg0.get_capture(NUM_PKTS)
        self._verify_punted_ip4(rxd, p4, self.pg0)
        rxd = self.pg1.get_capture(NUM_PKTS)
        self._verify_punted_ip4(rxd, p4, self.pg1)

        self.vapi.cli("clear trace")
        self.pg2.add_stream(p6 * NUM_PKTS)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rxd = self.pg0.get_capture(NUM_PKTS)
        self._verify_punted_ip6(rxd, p6, self.pg0)
        rxd = self.pg1.get_capture(NUM_PKTS)
        self._verify_punted_ip6(rxd, p6, self.pg1)

        stats = self.statistics.get_counter("/net/punt")
        self.assertEqual(stats[0][7]['packets'], 3*NUM_PKTS)
        self.assertEqual(stats[0][8]['packets'], 3*NUM_PKTS)

        self.logger.info(self.vapi.cli("show vlib graph punt-dispatch"))
        self.logger.info(self.vapi.cli("show punt client"))
        self.logger.info(self.vapi.cli("show punt reason"))
        self.logger.info(self.vapi.cli("show punt stats"))
        self.logger.info(self.vapi.cli("show punt db"))
| |
| |
# When executed as a script, run the tests in this module with
# VppTestRunner as the unittest runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)