| #!/usr/bin/env python |
| import binascii |
| import random |
| import socket |
| import unittest |
| import os |
| import scapy.layers.inet6 as inet6 |
| |
| from util import ppp, ppc |
| from re import compile |
| from scapy.packet import Raw |
| from scapy.layers.l2 import Ether |
| from scapy.layers.inet import IP, UDP, ICMP |
| from scapy.layers.inet6 import IPv6, ICMPv6DestUnreach |
| from framework import VppTestCase, VppTestRunner |
| |
| |
| class TestPuntSocket(VppTestCase): |
| """ Punt Socket """ |
| |
| tempdir = "" |
| sock = None |
| err_ptr = compile(r"^([\d]+)\s+([-\w]+)\s+([ -\.\w)(]+)$") |
| |
| @classmethod |
| def setUpConstants(cls): |
| tempdir = cls.tempdir |
| cls.extra_vpp_punt_config = [ |
| "punt", "{", "socket", cls.tempdir+"/socket_punt", "}"] |
| super(TestPuntSocket, cls).setUpConstants() |
| |
| def process_cli(self, exp, ptr): |
| for line in self.vapi.cli(exp).split('\n')[1:]: |
| m = ptr.match(line.strip()) |
| if m: |
| yield m.groups() |
| |
| def show_errors(self): |
| for pack in self.process_cli("show errors", self.err_ptr): |
| try: |
| count, node, reason = pack |
| except ValueError: |
| pass |
| else: |
| yield count, node, reason |
| |
| def get_punt_count(self, counter): |
| errors = list(self.show_errors()) |
| for count, node, reason in errors: |
| if (node == counter and |
| reason == u'Socket TX'): |
| return int(count) |
| return 0 |
| |
| def socket_client_create(self, sock_name): |
| self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) |
| try: |
| os.unlink(sock_name) |
| except: |
| self.logger.debug("Unlink socket faild") |
| self.sock.bind(sock_name) |
| |
| def socket_client_close(self): |
| self.sock.close() |
| |
| |
| class TestIP4PuntSocket(TestPuntSocket): |
| """ Punt Socket for IPv4 """ |
| |
| def setUp(self): |
| super(TestIP4PuntSocket, self).setUp() |
| |
| self.create_pg_interfaces(range(2)) |
| |
| for i in self.pg_interfaces: |
| i.admin_up() |
| i.config_ip4() |
| i.resolve_arp() |
| |
| def tearDown(self): |
| super(TestIP4PuntSocket, self).tearDown() |
| for i in self.pg_interfaces: |
| i.unconfig_ip4() |
| i.admin_down() |
| |
| def test_punt_socket_dump(self): |
| """ Punt socket registration""" |
| |
| punts = self.vapi.punt_socket_dump(0) |
| self.assertEqual(len(punts), 0) |
| |
| # |
| # configure a punt socket |
| # |
| self.vapi.punt_socket_register(1111, self.tempdir+"/socket_punt_1111") |
| self.vapi.punt_socket_register(2222, self.tempdir+"/socket_punt_2222") |
| punts = self.vapi.punt_socket_dump(0) |
| self.assertEqual(len(punts), 2) |
| self.assertEqual(punts[0].punt.l4_port, 1111) |
| # self.assertEqual(punts[0].pathname, "/tmp/punt_socket_udp_1234") |
| self.assertEqual(punts[1].punt.l4_port, 2222) |
| # self.assertEqual(punts[1].pathname, "/tmp/punt_socket_udp_5678") |
| |
| # |
| # deregister a punt socket |
| # |
| self.vapi.punt_socket_deregister(1111) |
| punts = self.vapi.punt_socket_dump(0) |
| self.assertEqual(len(punts), 1) |
| |
| # |
| # configure a punt socket again |
| # |
| self.vapi.punt_socket_register(1111, self.tempdir+"/socket_punt_1111") |
| self.vapi.punt_socket_register(3333, self.tempdir+"/socket_punt_3333") |
| punts = self.vapi.punt_socket_dump(0) |
| self.assertEqual(len(punts), 3) |
| |
| # |
| # deregister all punt socket |
| # |
| self.vapi.punt_socket_deregister(1111) |
| self.vapi.punt_socket_deregister(2222) |
| self.vapi.punt_socket_deregister(3333) |
| punts = self.vapi.punt_socket_dump(0) |
| self.assertEqual(len(punts), 0) |
| |
| def test_punt_socket_traffic(self): |
| """ Punt socket traffic""" |
| |
| nr_packets = 8 |
| p = (Ether(src=self.pg0.remote_mac, |
| dst=self.pg0.local_mac) / |
| IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / |
| UDP(sport=9876, dport=1234) / |
| Raw('\xa5' * 100)) |
| |
| pkts = p * nr_packets |
| |
| punts = self.vapi.punt_socket_dump(0) |
| self.assertEqual(len(punts), 0) |
| |
| # |
| # expect ICMP - port unreachable for all packets |
| # |
| self.vapi.cli("clear trace") |
| self.pg0.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| rx = self.pg0.get_capture(nr_packets) |
| for p in rx: |
| self.assertEqual(int(p[IP].proto), 1) # ICMP |
| self.assertEqual(int(p[ICMP].code), 3) # unreachable |
| |
| # |
| # configure a punt socket |
| # |
| self.socket_client_create(self.tempdir+"/socket_punt_1234") |
| self.vapi.punt_socket_register(1234, self.tempdir+"/socket_punt_1234") |
| punts = self.vapi.punt_socket_dump(0) |
| self.assertEqual(len(punts), 1) |
| |
| # |
| # expect punt socket and no packets on pg0 |
| # |
| self.vapi.cli("clear errors") |
| self.pg0.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| self.pg0.get_capture(0) |
| self.socket_client_close() |
| |
| # |
| # remove punt socket. expect ICMP - port unreachable for all packets |
| # |
| self.vapi.punt_socket_deregister(1234) |
| punts = self.vapi.punt_socket_dump(0) |
| self.assertEqual(len(punts), 0) |
| self.pg0.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| # FIXME - when punt socket deregister is implemented |
| # self.pg0.get_capture(nr_packets) |
| |
| |
| class TestIP6PuntSocket(TestPuntSocket): |
| """ Punt Socket for IPv6""" |
| |
| def setUp(self): |
| super(TestIP6PuntSocket, self).setUp() |
| |
| self.create_pg_interfaces(range(2)) |
| |
| for i in self.pg_interfaces: |
| i.admin_up() |
| i.config_ip6() |
| i.resolve_ndp() |
| |
| def tearDown(self): |
| super(TestIP6PuntSocket, self).tearDown() |
| for i in self.pg_interfaces: |
| i.unconfig_ip6() |
| i.admin_down() |
| |
| def test_punt_socket_dump(self): |
| """ Punt socket registration """ |
| |
| punts = self.vapi.punt_socket_dump(0) |
| self.assertEqual(len(punts), 0) |
| |
| # |
| # configure a punt socket |
| # |
| self.vapi.punt_socket_register(1111, self.tempdir+"/socket_punt_1111", |
| is_ip4=0) |
| self.vapi.punt_socket_register(2222, self.tempdir+"/socket_punt_2222", |
| is_ip4=0) |
| punts = self.vapi.punt_socket_dump(1) |
| self.assertEqual(len(punts), 2) |
| self.assertEqual(punts[0].punt.l4_port, 1111) |
| # self.assertEqual(punts[0].pathname, "/tmp/punt_socket_udp_1234") |
| self.assertEqual(punts[1].punt.l4_port, 2222) |
| # self.assertEqual(punts[1].pathname, "/tmp/punt_socket_udp_5678") |
| |
| # |
| # deregister a punt socket |
| # |
| self.vapi.punt_socket_deregister(1111, is_ip4=0) |
| punts = self.vapi.punt_socket_dump(1) |
| self.assertEqual(len(punts), 1) |
| |
| # |
| # configure a punt socket again |
| # |
| self.vapi.punt_socket_register(1111, self.tempdir+"/socket_punt_1111", |
| is_ip4=0) |
| punts = self.vapi.punt_socket_dump(1) |
| self.assertEqual(len(punts), 2) |
| |
| # |
| # deregister all punt socket |
| # |
| self.vapi.punt_socket_deregister(1111, is_ip4=0) |
| self.vapi.punt_socket_deregister(2222, is_ip4=0) |
| self.vapi.punt_socket_deregister(3333, is_ip4=0) |
| punts = self.vapi.punt_socket_dump(1) |
| self.assertEqual(len(punts), 0) |
| |
| def test_punt_socket_traffic(self): |
| """ Punt socket traffic""" |
| |
| nr_packets = 2 |
| p = (Ether(src=self.pg0.remote_mac, |
| dst=self.pg0.local_mac) / |
| IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) / |
| inet6.UDP(sport=9876, dport=1234) / |
| Raw('\xa5' * 100)) |
| |
| pkts = p * nr_packets |
| |
| punts = self.vapi.punt_socket_dump(1) |
| self.assertEqual(len(punts), 0) |
| |
| # |
| # expect ICMPv6 - destination unreachable for all packets |
| # |
| self.vapi.cli("clear trace") |
| self.pg0.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| rx = self.pg0.get_capture(nr_packets) |
| for p in rx: |
| self.assertEqual(int(p[IPv6].nh), 58) # ICMPv6 |
| self.assertEqual(int(p[ICMPv6DestUnreach].code), 4) # unreachable |
| |
| # |
| # configure a punt socket |
| # |
| self.socket_client_create(self.tempdir+"/socket_punt_1234") |
| self.vapi.punt_socket_register(1234, self.tempdir+"/socket_punt_1234", |
| is_ip4=0) |
| punts = self.vapi.punt_socket_dump(1) |
| self.assertEqual(len(punts), 1) |
| |
| # |
| # expect punt socket and no packets on pg0 |
| # |
| self.vapi.cli("clear errors") |
| self.pg0.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| self.pg0.get_capture(0) |
| self.socket_client_close() |
| |
| # |
| # remove punt socket. expect ICMP - dest. unreachable for all packets |
| # |
| self.vapi.punt_socket_deregister(1234, is_ip4=0) |
| punts = self.vapi.punt_socket_dump(1) |
| self.assertEqual(len(punts), 0) |
| self.pg0.add_stream(pkts) |
| self.pg_enable_capture(self.pg_interfaces) |
| self.pg_start() |
| # FIXME - when punt socket deregister is implemented |
| # self.pg0.get_capture(nr_packets) |
| |
| |
| if __name__ == '__main__': |
| unittest.main(testRunner=VppTestRunner) |