#!/usr/bin/env python3

import socket
import unittest

from asfframework import VppTestRunner
from scapy.packet import Raw

from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, TCP
from template_classifier import TestClassifier, VarMask, VarMatch
from vpp_ip_route import VppIpRoute, VppRoutePath
from vpp_ip import INVALID_INDEX
from vpp_papi import VppEnum


# Tests are split into separate test case classes because of the issue
# reported in ticket VPP-1336.
class TestClassifierIP(TestClassifier):
    """Classifier IP Test Case"""

    @classmethod
    def setUpClass(cls):
        # No class-specific preparation; defer to the classifier template.
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # No class-specific cleanup; defer to the classifier template.
        super().tearDownClass()

    def test_iacl_src_ip(self):
        """Source IP iACL test

        Input ACL keyed on the IPv4 source address:
            - Queue an IPv4 stream on pg0 addressed to pg1.
            - Create a classify table masking the source IP and a session
              matching pg0's remote IPv4 address; bind it to pg0 as input ACL.
            - Expect every packet on pg1 and nothing on pg0, pg2 or pg3.
        """

        stream = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        key = "ip_src"
        mask = self.build_ip_mask(src_ip="ffffffff")
        self.create_classify_table(key, mask)
        table_index = self.acl_tbl_idx.get(key)

        match = self.build_ip_match(src_ip=self.pg0.remote_ip4)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(self.pg0, table_index)
        # Record which table is attached (read outside this method).
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_dst_ip(self):
        """Destination IP iACL test

        Input ACL keyed on the IPv4 destination address:
            - Queue an IPv4 stream on pg0 addressed to pg1.
            - Create a classify table masking the destination IP and a
              session matching pg1's remote IPv4 address; bind it to pg0.
            - Expect every packet on pg1 and nothing on pg0, pg2 or pg3.
        """

        stream = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        key = "ip_dst"
        mask = self.build_ip_mask(dst_ip="ffffffff")
        self.create_classify_table(key, mask)
        table_index = self.acl_tbl_idx.get(key)

        match = self.build_ip_match(dst_ip=self.pg1.remote_ip4)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_src_dst_ip(self):
        """Source and destination IP iACL test

        Input ACL keyed on both IPv4 addresses:
            - Queue an IPv4 stream on pg0 addressed to pg1.
            - Create a classify table masking source and destination IP and
              a session matching pg0's and pg1's remote IPv4 addresses.
            - Expect every packet on pg1 and nothing on pg0, pg2 or pg3.
        """

        stream = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        key = "ip"
        mask = self.build_ip_mask(src_ip="ffffffff", dst_ip="ffffffff")
        self.create_classify_table(key, mask)
        table_index = self.acl_tbl_idx.get(key)

        match = self.build_ip_match(
            src_ip=self.pg0.remote_ip4, dst_ip=self.pg1.remote_ip4
        )
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")


class TestClassifierUDP(TestClassifier):
    """Classifier UDP proto Test Case"""

    @classmethod
    def setUpClass(cls):
        # No class-specific preparation; defer to the classifier template.
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # No class-specific cleanup; defer to the classifier template.
        super().tearDownClass()

    def test_iacl_proto_udp(self):
        """UDP protocol iACL test

        Input ACL keyed on the IP protocol field only:
            - Queue an IPv4 stream on pg0 addressed to pg1.
            - Create a classify table masking the protocol and a session
              matching UDP; bind it to pg0 as input ACL.
            - Expect every packet on pg1 and nothing on pg0, pg2 or pg3.
        """

        stream = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        key = "proto_udp"
        mask = self.build_ip_mask(proto="ff")
        self.create_classify_table(key, mask)
        table_index = self.acl_tbl_idx.get(key)

        match = self.build_ip_match(proto=socket.IPPROTO_UDP)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(self.pg0, table_index)
        # Record which table is attached (read outside this method).
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_sport(self):
        """UDP source port iACL test

        Input ACL keyed on the IP protocol and the L4 source port:
            - Queue an IPv4/UDP stream with a fixed source port on pg0,
              addressed to pg1.
            - Create a classify table masking protocol and source port and a
              session matching UDP with that source port; bind it to pg0.
            - Expect every packet on pg1 and nothing on pg0, pg2 or pg3.
        """

        sport = 38
        l4 = UDP(sport=sport, dport=5678)
        stream = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, l4)
        self.pg0.add_stream(stream)

        key = "proto_udp_sport"
        mask = self.build_ip_mask(proto="ff", src_port="ffff")
        self.create_classify_table(key, mask)
        table_index = self.acl_tbl_idx.get(key)

        match = self.build_ip_match(proto=socket.IPPROTO_UDP, src_port=sport)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_dport(self):
        """UDP destination port iACL test

        Input ACL keyed on the IP protocol and the L4 destination port:
            - Queue an IPv4/UDP stream with a fixed destination port on pg0,
              addressed to pg1.
            - Create a classify table masking protocol and destination port
              and a session matching UDP with that port; bind it to pg0.
            - Expect every packet on pg1 and nothing on pg0, pg2 or pg3.
        """

        dport = 427
        l4 = UDP(sport=1234, dport=dport)
        stream = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, l4)
        self.pg0.add_stream(stream)

        key = "proto_udp_dport"
        mask = self.build_ip_mask(proto="ff", dst_port="ffff")
        self.create_classify_table(key, mask)
        table_index = self.acl_tbl_idx.get(key)

        match = self.build_ip_match(proto=socket.IPPROTO_UDP, dst_port=dport)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_udp_sport_dport(self):
        """UDP source and destination ports iACL test

        Input ACL keyed on the IP protocol and both L4 ports:
            - Queue an IPv4/UDP stream with fixed source and destination
              ports on pg0, addressed to pg1.
            - Create a classify table masking protocol and both ports and a
              session matching UDP with those ports; bind it to pg0.
            - Expect every packet on pg1 and nothing on pg0, pg2 or pg3.
        """

        sport = 13720
        dport = 9080
        l4 = UDP(sport=sport, dport=dport)
        stream = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, l4)
        self.pg0.add_stream(stream)

        key = "proto_udp_ports"
        mask = self.build_ip_mask(proto="ff", src_port="ffff", dst_port="ffff")
        self.create_classify_table(key, mask)
        table_index = self.acl_tbl_idx.get(key)

        match = self.build_ip_match(
            proto=socket.IPPROTO_UDP, src_port=sport, dst_port=dport
        )
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")


class TestClassifierTCP(TestClassifier):
    """Classifier TCP proto Test Case"""

    @classmethod
    def setUpClass(cls):
        # No class-specific preparation; defer to the classifier template.
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # No class-specific cleanup; defer to the classifier template.
        super().tearDownClass()

    def test_iacl_proto_tcp(self):
        """TCP protocol iACL test

        Input ACL keyed on the IP protocol field only:
            - Queue an IPv4/TCP stream on pg0 addressed to pg1.
            - Create a classify table masking the protocol and a session
              matching TCP; bind it to pg0 as input ACL.
            - Expect every packet on pg1 and nothing on pg0, pg2 or pg3.
        """

        l4 = TCP(sport=1234, dport=5678)
        stream = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, l4)
        self.pg0.add_stream(stream)

        key = "proto_tcp"
        self.create_classify_table(key, self.build_ip_mask(proto="ff"))
        table_index = self.acl_tbl_idx.get(key)

        match = self.build_ip_match(proto=socket.IPPROTO_TCP)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(self.pg0, table_index)
        # Record which table is attached (read outside this method).
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        # Pass TCP explicitly: the stream carries TCP rather than UDP.
        self.verify_capture(self.pg1, captured, TCP)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_sport(self):
        """TCP source port iACL test

        Input ACL keyed on the IP protocol and the L4 source port:
            - Queue an IPv4/TCP stream with a fixed source port on pg0,
              addressed to pg1.
            - Create a classify table masking protocol and source port and a
              session matching TCP with that source port; bind it to pg0.
            - Expect every packet on pg1 and nothing on pg0, pg2 or pg3.
        """

        sport = 38
        l4 = TCP(sport=sport, dport=5678)
        stream = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, l4)
        self.pg0.add_stream(stream)

        key = "proto_tcp_sport"
        mask = self.build_ip_mask(proto="ff", src_port="ffff")
        self.create_classify_table(key, mask)
        table_index = self.acl_tbl_idx.get(key)

        match = self.build_ip_match(proto=socket.IPPROTO_TCP, src_port=sport)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured, TCP)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_dport(self):
        """TCP destination port iACL test

        Input ACL keyed on the IP protocol and the L4 destination port:
            - Queue an IPv4/TCP stream with a fixed destination port on pg0,
              addressed to pg1.
            - Create a classify table masking protocol and destination port
              and a session matching TCP with that port; bind it to pg0.
            - Expect every packet on pg1 and nothing on pg0, pg2 or pg3.
        """

        dport = 427
        l4 = TCP(sport=1234, dport=dport)
        stream = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, l4)
        self.pg0.add_stream(stream)

        # The key must be unique to this test so that it does not share an
        # acl_tbl_idx entry with the source-port test.
        key = "proto_tcp_dport"
        mask = self.build_ip_mask(proto="ff", dst_port="ffff")
        self.create_classify_table(key, mask)
        table_index = self.acl_tbl_idx.get(key)

        match = self.build_ip_match(proto=socket.IPPROTO_TCP, dst_port=dport)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured, TCP)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_proto_tcp_sport_dport(self):
        """TCP source and destination ports iACL test

        Input ACL keyed on the IP protocol and both L4 ports:
            - Queue an IPv4/TCP stream with fixed source and destination
              ports on pg0, addressed to pg1.
            - Create a classify table masking protocol and both ports and a
              session matching TCP with those ports; bind it to pg0.
            - Expect every packet on pg1 and nothing on pg0, pg2 or pg3.
        """

        sport = 13720
        dport = 9080
        l4 = TCP(sport=sport, dport=dport)
        stream = self.create_stream(self.pg0, self.pg1, self.pg_if_packet_sizes, l4)
        self.pg0.add_stream(stream)

        key = "proto_tcp_ports"
        mask = self.build_ip_mask(proto="ff", src_port="ffff", dst_port="ffff")
        self.create_classify_table(key, mask)
        table_index = self.acl_tbl_idx.get(key)

        match = self.build_ip_match(
            proto=socket.IPPROTO_TCP, src_port=sport, dst_port=dport
        )
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured, TCP)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")


class TestClassifierIPOut(TestClassifier):
    """Classifier output IP Test Case"""

    @classmethod
    def setUpClass(cls):
        # No class-specific preparation; defer to the classifier template.
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # No class-specific cleanup; defer to the classifier template.
        super().tearDownClass()

    def test_acl_ip_out(self):
        """Output IP ACL test

        Output ACL keyed on the IPv4 source address:
            - Queue an IPv4 stream on pg1 addressed to pg0.
            - Create a classify table masking the source IP and a session
              matching pg1's remote IPv4 address; bind it to pg0 as an
              output ACL.
            - Expect every packet on pg0 and nothing on pg1, pg2 or pg3.
        """

        stream = self.create_stream(self.pg1, self.pg0, self.pg_if_packet_sizes)
        self.pg1.add_stream(stream)

        key = "ip_out"
        mask = self.build_ip_mask(src_ip="ffffffff")
        self.create_classify_table(key, mask, data_offset=0)
        table_index = self.acl_tbl_idx.get(key)

        match = self.build_ip_match(src_ip=self.pg1.remote_ip4)
        self.create_classify_session(table_index, match)
        # The ACL is applied on the egress side of pg0, not on ingress.
        self.output_acl_set_interface(self.pg0, table_index)
        # Record which table is attached (read outside this method).
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg0.get_capture(len(stream))
        self.verify_capture(self.pg0, captured)
        for idle_if in (self.pg1, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")


class TestClassifierMAC(TestClassifier):
    """Classifier MAC Test Case"""

    @classmethod
    def setUpClass(cls):
        # No class-specific preparation; defer to the classifier template.
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # No class-specific cleanup; defer to the classifier template.
        super().tearDownClass()

    def test_acl_mac(self):
        """MAC ACL test

        Input ACL keyed on the source MAC address:
            - Queue an IPv4 stream on pg0 addressed to pg2.
            - Create a classify table masking the source MAC and a session
              matching pg0's remote MAC; bind it to pg0 as input ACL.
            - Expect every packet on pg2 and nothing on pg0, pg1 or pg3.
        """

        stream = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        key = "mac"
        mask = self.build_mac_mask(src_mac="ffffffffffff")
        # data_offset=-14 is used by every table in this file whose mask
        # starts with the MAC fields (14 presumably being the Ethernet header
        # length -- confirm against create_classify_table).
        self.create_classify_table(key, mask, data_offset=-14)
        table_index = self.acl_tbl_idx.get(key)

        match = self.build_mac_match(src_mac=self.pg0.remote_mac)
        self.create_classify_session(table_index, match)
        self.input_acl_set_interface(self.pg0, table_index)
        # Record which table is attached (read outside this method).
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg2.get_capture(len(stream))
        self.verify_capture(self.pg2, captured)
        for idle_if in (self.pg0, self.pg1, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")


class TestClassifierComplex(TestClassifier):
    """Large & Nested Classifiers Test Cases"""

    @classmethod
    def setUpClass(cls):
        # No class-specific preparation; defer to the classifier template.
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # No class-specific cleanup; defer to the classifier template.
        super().tearDownClass()

    def test_iacl_large(self):
        """Large input ACL test

        Large input ACL matching on ethernet+ip+udp headers and payload:
            - Queue an IPv4/UDP stream with extra payload on pg0, addressed
              to pg1.
            - Create one table/session covering MAC, IP, UDP port and a
              payload field; bind it to pg0 as input ACL.
            - Expect every packet on pg1 and nothing on pg0, pg2 or pg3.
        """

        # 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b)
        # + 4b because the build_ip_ma*() functions do not match against the
        # UDP length & checksum
        payload_msk = self.build_payload_mask([VarMask(offset=40, spec="ffff")])
        payload_match = self.build_payload_match(
            [VarMatch(offset=40, value=0x1234, length=2)]
        )

        sport = 13720
        dport = 9080

        # 36b offset = 80bytes - (sizeof(UDP/IP/ETH))
        packet_ex = bytes.fromhex("0" * 36 + "1234")
        stream = self.create_stream(
            self.pg0,
            self.pg1,
            self.pg_if_packet_sizes,
            UDP(sport=sport, dport=dport),
            packet_ex,
        )
        self.pg0.add_stream(stream)

        key = "large_in"
        l2_mask = self.build_mac_mask(
            src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
        )
        l3_l4_mask = self.build_ip_mask(
            proto="ff",
            src_ip="ffffffff",
            dst_ip="ffffffff",
            src_port="ffff",
            dst_port="ffff",
        )
        self.create_classify_table(
            key, l2_mask + l3_l4_mask + payload_msk, data_offset=-14
        )
        table_index = self.acl_tbl_idx.get(key)

        l2_match = self.build_mac_match(
            src_mac=self.pg0.remote_mac,
            dst_mac=self.pg0.local_mac,
            # ipv4 next header
            ether_type="0800",
        )
        l3_l4_match = self.build_ip_match(
            proto=socket.IPPROTO_UDP,
            src_ip=self.pg0.remote_ip4,
            dst_ip=self.pg1.remote_ip4,
            src_port=sport,
            dst_port=dport,
        )
        self.create_classify_session(
            table_index, l2_match + l3_l4_match + payload_match
        )

        self.input_acl_set_interface(self.pg0, table_index)
        # Record which table is attached (read outside this method).
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_oacl_large(self):
        """Large output ACL test

        Large output ACL matching on ethernet+ip+udp headers and payload:
            - Queue an IPv4/UDP stream with extra payload on pg1, addressed
              to pg0.
            - Create one table/session covering MAC, IP, UDP port and a
              payload field; bind it to pg0 as output ACL.
            - Expect every packet on pg0 and nothing on pg1, pg2 or pg3.
        """

        # 40b offset = 80bytes - (sizeof(UDP/IP/ETH) + 4b)
        # + 4b because the build_ip_ma*() functions do not match against the
        # UDP length & checksum
        payload_msk = self.build_payload_mask([VarMask(offset=40, spec="ffff")])
        payload_match = self.build_payload_match(
            [VarMatch(offset=40, value=0x1234, length=2)]
        )

        sport = 13720
        dport = 9080

        # 36b offset = 80bytes - (sizeof(UDP/IP/ETH))
        packet_ex = bytes.fromhex("0" * 36 + "1234")
        stream = self.create_stream(
            self.pg1,
            self.pg0,
            self.pg_if_packet_sizes,
            UDP(sport=sport, dport=dport),
            packet_ex,
        )
        self.pg1.add_stream(stream)

        key = "large_out"
        l2_mask = self.build_mac_mask(
            src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
        )
        l3_l4_mask = self.build_ip_mask(
            proto="ff",
            src_ip="ffffffff",
            dst_ip="ffffffff",
            src_port="ffff",
            dst_port="ffff",
        )
        self.create_classify_table(
            key, l2_mask + l3_l4_mask + payload_msk, data_offset=-14
        )
        table_index = self.acl_tbl_idx.get(key)

        # On egress through pg0 the frame is sourced from pg0's local MAC
        # and destined to its remote MAC.
        l2_match = self.build_mac_match(
            src_mac=self.pg0.local_mac,
            dst_mac=self.pg0.remote_mac,
            # ipv4 next header
            ether_type="0800",
        )
        l3_l4_match = self.build_ip_match(
            proto=socket.IPPROTO_UDP,
            src_ip=self.pg1.remote_ip4,
            dst_ip=self.pg0.remote_ip4,
            src_port=sport,
            dst_port=dport,
        )
        self.create_classify_session(
            table_index, l2_match + l3_l4_match + payload_match
        )

        self.output_acl_set_interface(self.pg0, table_index)
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg0.get_capture(len(stream))
        self.verify_capture(self.pg0, captured)
        for idle_if in (self.pg1, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_iacl_nested(self):
        """Nested input ACL test

        Nested input ACL matching on ethernet+ip+udp headers:
            - Queue an IPv4/UDP stream on pg0 addressed to pg1.
            - Create a subtable holding the matching session.
            - Create a first table without any entries that chains to the
              subtable; bind the first table to pg0 as input ACL.
            - Expect every packet on pg1 and nothing on pg0, pg2 or pg3.
        """

        sport = 13720
        dport = 9080
        stream = self.create_stream(
            self.pg0, self.pg1, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport)
        )
        self.pg0.add_stream(stream)

        # Both tables are created from the same mask specification.
        mac_fields = dict(
            src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
        )
        ip_fields = dict(
            proto="ff",
            src_ip="ffffffff",
            dst_ip="ffffffff",
            src_port="ffff",
            dst_port="ffff",
        )

        subtable_key = "subtable_in"
        self.create_classify_table(
            subtable_key,
            self.build_mac_mask(**mac_fields) + self.build_ip_mask(**ip_fields),
            data_offset=-14,
        )
        subtable_index = self.acl_tbl_idx.get(subtable_key)

        # NOTE(review): unlike the subtable above and the "nested_out" table
        # in test_oacl_nested, this table is created without data_offset=-14
        # although its mask starts with the MAC fields. It holds no sessions,
        # so lookups fall through to the subtable -- confirm this is intended.
        key = "nested_in"
        self.create_classify_table(
            key,
            self.build_mac_mask(**mac_fields) + self.build_ip_mask(**ip_fields),
            next_table_index=subtable_index,
        )

        l2_match = self.build_mac_match(
            src_mac=self.pg0.remote_mac,
            dst_mac=self.pg0.local_mac,
            # ipv4 next header
            ether_type="0800",
        )
        l3_l4_match = self.build_ip_match(
            proto=socket.IPPROTO_UDP,
            src_ip=self.pg0.remote_ip4,
            dst_ip=self.pg1.remote_ip4,
            src_port=sport,
            dst_port=dport,
        )
        # The only session lives in the subtable, not in the bound table.
        self.create_classify_session(subtable_index, l2_match + l3_l4_match)

        self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg1.get_capture(len(stream))
        self.verify_capture(self.pg1, captured)
        for idle_if in (self.pg0, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")

    def test_oacl_nested(self):
        """Nested output ACL test

        Nested output ACL matching on ethernet+ip+udp headers:
            - Queue an IPv4/UDP stream on pg1 addressed to pg0.
            - Create a subtable holding the matching session.
            - Create a first table without any entries that chains to the
              subtable; bind the first table to pg0 as output ACL.
            - Expect every packet on pg0 and nothing on pg1, pg2 or pg3.
        """

        sport = 13720
        dport = 9080
        stream = self.create_stream(
            self.pg1, self.pg0, self.pg_if_packet_sizes, UDP(sport=sport, dport=dport)
        )
        self.pg1.add_stream(stream)

        # Both tables are created from the same mask specification.
        mac_fields = dict(
            src_mac="ffffffffffff", dst_mac="ffffffffffff", ether_type="ffff"
        )
        ip_fields = dict(
            proto="ff",
            src_ip="ffffffff",
            dst_ip="ffffffff",
            src_port="ffff",
            dst_port="ffff",
        )

        subtable_key = "subtable_out"
        self.create_classify_table(
            subtable_key,
            self.build_mac_mask(**mac_fields) + self.build_ip_mask(**ip_fields),
            data_offset=-14,
        )
        subtable_index = self.acl_tbl_idx.get(subtable_key)

        key = "nested_out"
        self.create_classify_table(
            key,
            self.build_mac_mask(**mac_fields) + self.build_ip_mask(**ip_fields),
            next_table_index=subtable_index,
            data_offset=-14,
        )

        # On egress through pg0 the frame is sourced from pg0's local MAC
        # and destined to its remote MAC.
        l2_match = self.build_mac_match(
            src_mac=self.pg0.local_mac,
            dst_mac=self.pg0.remote_mac,
            # ipv4 next header
            ether_type="0800",
        )
        l3_l4_match = self.build_ip_match(
            proto=socket.IPPROTO_UDP,
            src_ip=self.pg1.remote_ip4,
            dst_ip=self.pg0.remote_ip4,
            src_port=sport,
            dst_port=dport,
        )
        # The only session lives in the subtable, not in the bound table.
        self.create_classify_session(subtable_index, l2_match + l3_l4_match)

        self.output_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
        self.acl_active_table = key

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg0.get_capture(len(stream))
        self.verify_capture(self.pg0, captured)
        for idle_if in (self.pg1, self.pg2, self.pg3):
            idle_if.assert_nothing_captured(remark="packets forwarded")


class TestClassifierPBR(TestClassifier):
    """Classifier PBR Test Case"""

    @classmethod
    def setUpClass(cls):
        # No class-specific preparation; defer to the classifier template.
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # No class-specific cleanup; defer to the classifier template.
        super().tearDownClass()

    def test_acl_pbr(self):
        """IP PBR test

        Policy-based routing keyed on the IPv4 source address:
            - Queue an IPv4 stream on pg0 addressed to pg3.
            - Create a classify session carrying the PBR option and VRF id,
              then add a route towards pg3 in that VRF.
            - Expect every packet on pg3 and nothing on pg0, pg1 or pg2.
            - Remove the route and the session and check the VRF is gone.
        """

        stream = self.create_stream(self.pg0, self.pg3, self.pg_if_packet_sizes)
        self.pg0.add_stream(stream)

        key = "pbr"
        self.create_classify_table(key, self.build_ip_mask(src_ip="ffffffff"))
        table_index = self.acl_tbl_idx.get(key)
        pbr_option = 1

        # this will create the VRF/table in which we will insert the route
        session_match = self.build_ip_match(src_ip=self.pg0.remote_ip4)
        self.create_classify_session(
            table_index, session_match, pbr_option, self.pbr_vrfid
        )
        self.assertTrue(self.verify_vrf(self.pbr_vrfid))

        next_hop = VppRoutePath(self.pg3.remote_ip4, INVALID_INDEX)
        route = VppIpRoute(
            self, self.pg3.local_ip4, 24, [next_hop], table_id=self.pbr_vrfid
        )
        route.add_vpp_config()

        self.input_acl_set_interface(self.pg0, table_index)

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        captured = self.pg3.get_capture(len(stream))
        self.verify_capture(self.pg3, captured)
        # Detach the ACL from pg0 here; this test does not set
        # acl_active_table, so the detach is done explicitly.
        self.input_acl_set_interface(self.pg0, table_index, 0)
        for idle_if in (self.pg0, self.pg1, self.pg2):
            idle_if.assert_nothing_captured(remark="packets forwarded")

        # remove the classify session and the route
        route.remove_vpp_config()
        session_match = self.build_ip_match(src_ip=self.pg0.remote_ip4)
        self.create_classify_session(
            table_index, session_match, pbr_option, self.pbr_vrfid, is_add=0
        )

        # and the table should be gone.
        self.assertFalse(self.verify_vrf(self.pbr_vrfid))


class TestClassifierPunt(TestClassifier):
    """Classifier punt Test Case"""

    @classmethod
    def setUpClass(cls):
        # No class-specific preparation; defer to the classifier template.
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # No class-specific cleanup; defer to the classifier template.
        super().tearDownClass()

    def test_punt_udp(self):
        """IPv4/UDP protocol punt ACL test

        Punt ACL keyed on source IP, protocol and source port:
            - Create a classify table and register it as the IPv4 punt ACL.
            - Punt UDP packets for a given port and redirect punted packets
              received on pg0 out of pg1.
            - With a non-matching session only: expect no packets at all.
            - After adding a matching session: expect packets on pg1 only.
            - Check punt_acl_get before and after removing the punt ACL.
        """

        sport = 6754
        dport = 17923
        # Value reported by punt_acl_get for a table index that is not set.
        no_table = 0xFFFFFFFF

        key = "ip4_udp_punt"
        mask = self.build_ip_mask(src_ip="ffffffff", proto="ff", src_port="ffff")
        self.create_classify_table(key, mask)
        table_index = self.acl_tbl_idx.get(key)
        self.vapi.punt_acl_add_del(ip4_table_index=table_index)
        self.acl_active_table = key

        # punt udp packets to dport received on pg0 through pg1
        l4_punt = {
            "af": VppEnum.vl_api_address_family_t.ADDRESS_IP4,
            "protocol": VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP,
            "port": dport,
        }
        punt_spec = {
            "type": VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4,
            "punt": {"l4": l4_punt},
        }
        self.vapi.set_punt(is_add=1, punt=punt_spec)

        redirect = {
            "rx_sw_if_index": self.pg0.sw_if_index,
            "tx_sw_if_index": self.pg1.sw_if_index,
            "nh": self.pg1.remote_ip4,
        }
        self.vapi.ip_punt_redirect(punt=redirect)

        # Two references to one packet addressed to pg0's own (local) IP.
        packet = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
            / UDP(sport=sport, dport=dport)
            / Raw("\x17" * 100)
        )
        stream = [packet] * 2

        # allow a session but not matching the stream: expect to drop
        other_port_match = self.build_ip_match(
            src_ip=self.pg0.remote_ip4,
            proto=socket.IPPROTO_UDP,
            src_port=sport + 10,
        )
        self.create_classify_session(table_index, other_port_match)
        self.send_and_assert_no_replies(self.pg0, stream)

        # allow a session matching the stream: expect to pass
        stream_match = self.build_ip_match(
            src_ip=self.pg0.remote_ip4, proto=socket.IPPROTO_UDP, src_port=sport
        )
        self.create_classify_session(table_index, stream_match)
        self.send_and_expect_only(self.pg0, stream, self.pg1)

        # test dump api: ip4 is set, ip6 is not
        reply = self.vapi.punt_acl_get()
        self.assertEqual(reply.ip4_table_index, table_index)
        self.assertEqual(reply.ip6_table_index, no_table)

        # cleanup
        self.acl_active_table = ""
        self.vapi.punt_acl_add_del(ip4_table_index=table_index, is_add=0)

        # test dump api: nothing set
        reply = self.vapi.punt_acl_get()
        self.assertEqual(reply.ip4_table_index, no_table)
        self.assertEqual(reply.ip6_table_index, no_table)


# Allow running this file directly as a script; the tests are executed by
# unittest with VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)
