| from ipaddress import IPv4Network |
| |
| from vpp_object import VppObject |
| from vpp_papi import VppEnum |
| from vpp_ip import INVALID_INDEX |
| from vpp_papi_provider import UnexpectedApiReturnValueError |
| |
| |
class VppAclPlugin(VppObject):
    """Plugin-wide ACL settings; currently only the interface counters switch.

    :param test: test case object; its ``vapi`` attribute is used for API calls
    :param enable_intf_counters: initial value pushed to VPP on construction
    """

    def __init__(self, test, enable_intf_counters=False):
        self._test = test
        # Assigning through the property issues the API call immediately.
        self.enable_intf_counters = enable_intf_counters

    @property
    def enable_intf_counters(self):
        """Value most recently sent to VPP through the setter."""
        return self._enable_intf_counters

    @enable_intf_counters.setter
    def enable_intf_counters(self, enable):
        # Reach the API through the test case, as every other class in this
        # module does; this object has no ``vapi`` attribute of its own.
        self._test.vapi.acl_stats_intf_counters_enable(enable=enable)
        # Remember the value only once the call has gone through, so the
        # getter has something to return.
        self._enable_intf_counters = enable

    def add_vpp_config(self):
        """No-op: the setting is applied by the property setter."""
        pass

    def remove_vpp_config(self):
        """No-op: nothing is undone here."""
        pass

    def query_vpp_config(self):
        """No-op: returns ``None``."""
        pass

    def object_id(self):
        """Return the identifier string for this object.

        This class never assigns an interface index, so the identifier
        carries no numeric suffix.
        """
        return "acl-plugin"
| |
| |
class AclRule:
    """A single ACL rule: action, prefixes, protocol and port ranges.

    The ``sport_*`` attributes are encoded as the source-port / ICMP-type
    range and the ``dport_*`` attributes as the destination-port / ICMP-code
    range (see the key names produced by :meth:`encode`).

    ``ports`` selects how the ranges are derived by :meth:`update_ports`:

    * ``PORTS_ALL``: the full range (0-255 for protocols 1 and 58,
      0-65535 otherwise);
    * ``PORTS_RANGE`` / ``PORTS_RANGE_2``: one of the two canned
      per-protocol ranges defined as class attributes below;
    * any other value: used as a single port for both source and
      destination.

    :param is_permit: action stored on the rule and passed through by encode
    :param src_prefix: source prefix
    :param dst_prefix: destination prefix
    :param proto: IP protocol number
    :param ports: range selector or single port, see above
    :param sport_from: explicit lower source bound, overrides the derived one
    :param sport_to: explicit upper source bound, overrides the derived one
    :param dport_from: explicit lower destination bound, overrides the derived
    :param dport_to: explicit upper destination bound, overrides the derived
    """

    # port ranges
    PORTS_ALL = -1
    PORTS_RANGE = 0
    PORTS_RANGE_2 = 1
    udp_sport_from = 10
    udp_sport_to = udp_sport_from + 5
    udp_dport_from = 20000
    udp_dport_to = udp_dport_from + 5000
    tcp_sport_from = 30
    tcp_sport_to = tcp_sport_from + 5
    tcp_dport_from = 40000
    tcp_dport_to = tcp_dport_from + 5000

    udp_sport_from_2 = 90
    udp_sport_to_2 = udp_sport_from_2 + 5
    udp_dport_from_2 = 30000
    udp_dport_to_2 = udp_dport_from_2 + 5000
    tcp_sport_from_2 = 130
    tcp_sport_to_2 = tcp_sport_from_2 + 5
    tcp_dport_from_2 = 20000
    tcp_dport_to_2 = tcp_dport_from_2 + 5000

    icmp4_type = 8  # echo request
    icmp4_code = 3
    icmp6_type = 128  # echo request
    icmp6_code = 3

    icmp4_type_2 = 8
    icmp4_code_from_2 = 5
    icmp4_code_to_2 = 20
    icmp6_type_2 = 128
    icmp6_code_from_2 = 8
    icmp6_code_to_2 = 42

    def __init__(
        self,
        is_permit,
        src_prefix=IPv4Network("0.0.0.0/0"),
        dst_prefix=IPv4Network("0.0.0.0/0"),
        proto=0,
        ports=PORTS_ALL,
        sport_from=None,
        sport_to=None,
        dport_from=None,
        dport_to=None,
    ):
        self.is_permit = is_permit
        self.src_prefix = src_prefix
        self.dst_prefix = dst_prefix
        self._proto = proto
        self._ports = ports
        # assign ports by range
        self.update_ports()
        # assign specified ports
        # Compare against None instead of relying on truthiness: 0 is a
        # legitimate bound and must not be silently replaced by the derived
        # value (this also keeps __copy__ faithful).
        if sport_from is not None:
            self.sport_from = sport_from
        if sport_to is not None:
            self.sport_to = sport_to
        if dport_from is not None:
            self.dport_from = dport_from
        if dport_to is not None:
            self.dport_to = dport_to

    def __copy__(self):
        """Return a new rule built from this rule's current field values."""
        return AclRule(
            self.is_permit,
            self.src_prefix,
            self.dst_prefix,
            self._proto,
            self._ports,
            self.sport_from,
            self.sport_to,
            self.dport_from,
            self.dport_to,
        )

    def _set_port_ranges(self, sport_from, sport_to, dport_from, dport_to):
        """Store all four range bounds at once."""
        self.sport_from = sport_from
        self.sport_to = sport_to
        self.dport_from = dport_from
        self.dport_to = dport_to

    def update_ports(self):
        """Recompute the four range bounds from ``ports`` and ``proto``.

        In the two canned-range modes a protocol other than ICMP, ICMPv6,
        TCP or UDP leaves the current bounds untouched.
        """
        if self._ports == self.PORTS_ALL:
            # Protocols 1 and 58 get an upper bound of 255, all others 65535.
            upper = 255 if self._proto in (1, 58) else 65535
            self._set_port_ranges(0, upper, 0, upper)
        elif self._ports == self.PORTS_RANGE:
            protos = VppEnum.vl_api_ip_proto_t
            if self._proto == protos.IP_API_PROTO_ICMP:
                self._set_port_ranges(
                    self.icmp4_type,
                    self.icmp4_type,
                    self.icmp4_code,
                    self.icmp4_code,
                )
            elif self._proto == protos.IP_API_PROTO_ICMP6:
                self._set_port_ranges(
                    self.icmp6_type,
                    self.icmp6_type,
                    self.icmp6_code,
                    self.icmp6_code,
                )
            elif self._proto == protos.IP_API_PROTO_TCP:
                self._set_port_ranges(
                    self.tcp_sport_from,
                    self.tcp_sport_to,
                    self.tcp_dport_from,
                    self.tcp_dport_to,
                )
            elif self._proto == protos.IP_API_PROTO_UDP:
                self._set_port_ranges(
                    self.udp_sport_from,
                    self.udp_sport_to,
                    self.udp_dport_from,
                    self.udp_dport_to,
                )
        elif self._ports == self.PORTS_RANGE_2:
            protos = VppEnum.vl_api_ip_proto_t
            if self._proto == protos.IP_API_PROTO_ICMP:
                self._set_port_ranges(
                    self.icmp4_type_2,
                    self.icmp4_type_2,
                    self.icmp4_code_from_2,
                    self.icmp4_code_to_2,
                )
            elif self._proto == protos.IP_API_PROTO_ICMP6:
                self._set_port_ranges(
                    self.icmp6_type_2,
                    self.icmp6_type_2,
                    self.icmp6_code_from_2,
                    self.icmp6_code_to_2,
                )
            elif self._proto == protos.IP_API_PROTO_TCP:
                self._set_port_ranges(
                    self.tcp_sport_from_2,
                    self.tcp_sport_to_2,
                    self.tcp_dport_from_2,
                    self.tcp_dport_to_2,
                )
            elif self._proto == protos.IP_API_PROTO_UDP:
                self._set_port_ranges(
                    self.udp_sport_from_2,
                    self.udp_sport_to_2,
                    self.udp_dport_from_2,
                    self.udp_dport_to_2,
                )
        else:
            # Any other value is a single port used on both sides.
            self._set_port_ranges(
                self._ports, self._ports, self._ports, self._ports
            )

    @property
    def proto(self):
        """IP protocol number of the rule."""
        return self._proto

    @proto.setter
    def proto(self, proto):
        # Changing the protocol re-derives the ranges, discarding any
        # explicit bounds set earlier.
        self._proto = proto
        self.update_ports()

    @property
    def ports(self):
        """Range selector or single port this rule was configured with."""
        return self._ports

    @ports.setter
    def ports(self, ports):
        # Changing the selector re-derives the ranges, discarding any
        # explicit bounds set earlier.
        self._ports = ports
        self.update_ports()

    def encode(self):
        """Return the rule as a dictionary keyed by API field names."""
        return {
            "is_permit": self.is_permit,
            "proto": self.proto,
            "srcport_or_icmptype_first": self.sport_from,
            "srcport_or_icmptype_last": self.sport_to,
            "src_prefix": self.src_prefix,
            "dstport_or_icmpcode_first": self.dport_from,
            "dstport_or_icmpcode_last": self.dport_to,
            "dst_prefix": self.dst_prefix,
        }
| |
| |
class VppAcl(VppObject):
    """Test-side handle for an ACL and the rules it holds.

    :param test: test case object providing ``vapi``, ``registry``,
        ``logger`` and ``fail``
    :param rules: rule objects, each exposing ``encode()``
    :param acl_index: index sent with the add/replace request
    :param tag: tag sent with the add/replace request
    """

    def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
        self._test = test
        self._rules = rules
        self._acl_index = acl_index
        self.tag = tag

    @property
    def rules(self):
        """Rules currently held by this object."""
        return self._rules

    @property
    def acl_index(self):
        """Index of the ACL; updated from the reply of a successful add."""
        return self._acl_index

    @property
    def count(self):
        """Number of rules held."""
        return len(self._rules)

    def encode_rules(self):
        """Return the encoded form of every rule, in order."""
        return [entry.encode() for entry in self._rules]

    def add_vpp_config(self, expect_error=False):
        """Send the ACL to VPP and register this object for cleanup.

        :param expect_error: when true, a successful API call fails the
            test; when false, an API error fails the test
        :returns: ``self`` on success, ``None`` when the API call raised
        """
        test = self._test
        try:
            rv = test.vapi.acl_add_replace(
                acl_index=self._acl_index,
                tag=self.tag,
                count=self.count,
                r=self.encode_rules(),
            )
            # Adopt the index carried in the reply.
            self._acl_index = rv.acl_index
            test.registry.register(self, test.logger)
            if expect_error:
                test.fail("Unexpected api reply")
            return self
        except UnexpectedApiReturnValueError:
            if expect_error:
                return None
            test.fail("Unexpected api reply")
        return None

    def modify_vpp_config(self, rules):
        """Swap in a new rule list and push it via ``add_vpp_config``."""
        self._rules = rules
        self.add_vpp_config()

    def remove_vpp_config(self, expect_error=False):
        """Delete the ACL from VPP.

        :param expect_error: when true, a successful API call fails the
            test; when false, an API error fails the test
        """
        test = self._test
        try:
            test.vapi.acl_del(acl_index=self._acl_index)
            if expect_error:
                test.fail("Unexpected api reply")
        except UnexpectedApiReturnValueError:
            if expect_error:
                return
            test.fail("Unexpected api reply")

    def dump(self):
        """Return the dump reply for this ACL's index."""
        return self._test.vapi.acl_dump(acl_index=self._acl_index)

    def query_vpp_config(self):
        """Return True when the dump contains an entry with this index."""
        wanted = self._acl_index
        return any(entry.acl_index == wanted for entry in self.dump())

    def object_id(self):
        """Return the identifier string built from the tag and index."""
        return "acl-%s-%d" % (self.tag, self._acl_index)
| |
| |
class VppEtypeWhitelist(VppObject):
    """Test-side handle for an interface's ethertype whitelist.

    :param test: test case object providing ``vapi``, ``registry`` and
        ``logger``
    :param sw_if_index: interface the whitelist is applied to
    :param whitelist: whitelist entries, passed to the API unchanged
    :param n_input: passed to the API unchanged
    """

    def __init__(self, test, sw_if_index, whitelist, n_input=0):
        self._test = test
        self._sw_if_index = sw_if_index
        self.whitelist = whitelist
        self.n_input = n_input

    @property
    def sw_if_index(self):
        """Interface index this whitelist targets."""
        return self._sw_if_index

    @property
    def count(self):
        """Number of whitelist entries."""
        return len(self.whitelist)

    def add_vpp_config(self):
        """Apply the whitelist, register for cleanup and return ``self``."""
        test = self._test
        test.vapi.acl_interface_set_etype_whitelist(
            sw_if_index=self._sw_if_index,
            count=self.count,
            n_input=self.n_input,
            whitelist=self.whitelist,
        )
        test.registry.register(self, test.logger)
        return self

    def remove_vpp_config(self):
        """Replace the interface's whitelist with an empty one."""
        empty = []
        self._test.vapi.acl_interface_set_etype_whitelist(
            sw_if_index=self._sw_if_index, count=0, n_input=0, whitelist=empty
        )

    def query_vpp_config(self):
        """Issue a whitelist dump for the interface and return False.

        NOTE(review): the dump reply is discarded and the result is always
        False -- confirm whether that is intentional.
        """
        vapi = self._test.vapi
        vapi.acl_interface_etype_whitelist_dump(sw_if_index=self._sw_if_index)
        return False

    def object_id(self):
        """Return the identifier string built from the interface index."""
        return "acl-etype_wl-%d" % (self._sw_if_index)
| |
| |
class VppAclInterface(VppObject):
    """Test-side handle for the list of ACLs applied to one interface.

    :param test: test case object providing ``vapi``, ``registry``,
        ``logger`` and ``fail``
    :param sw_if_index: interface the ACL list is applied to
    :param acls: ACL objects, each exposing ``acl_index``
    :param n_input: passed to the API unchanged
    """

    def __init__(self, test, sw_if_index, acls, n_input=0):
        self._test = test
        self._sw_if_index = sw_if_index
        self.acls = acls
        self.n_input = n_input

    @property
    def sw_if_index(self):
        """Interface index this ACL list targets."""
        return self._sw_if_index

    @property
    def count(self):
        """Number of ACLs in the list."""
        return len(self.acls)

    def encode_acls(self):
        """Return the index of every ACL, in order."""
        return [entry.acl_index for entry in self.acls]

    def add_vpp_config(self, expect_error=False):
        """Apply the ACL list to the interface and register for cleanup.

        :param expect_error: when true, a successful API call fails the
            test; when false, an API error fails the test
        :returns: ``self`` on success, ``None`` when the API call raised
        """
        test = self._test
        try:
            test.vapi.acl_interface_set_acl_list(
                sw_if_index=self._sw_if_index,
                n_input=self.n_input,
                count=self.count,
                acls=self.encode_acls(),
            )
            test.registry.register(self, test.logger)
            if expect_error:
                test.fail("Unexpected api reply")
            return self
        except UnexpectedApiReturnValueError:
            if expect_error:
                return None
            test.fail("Unexpected api reply")
        return None

    def remove_vpp_config(self, expect_error=False):
        """Replace the interface's ACL list with an empty one.

        :param expect_error: when true, a successful API call fails the
            test; when false, an API error fails the test
        """
        test = self._test
        try:
            test.vapi.acl_interface_set_acl_list(
                sw_if_index=self._sw_if_index, n_input=0, count=0, acls=[]
            )
            if expect_error:
                test.fail("Unexpected api reply")
        except UnexpectedApiReturnValueError:
            if expect_error:
                return
            test.fail("Unexpected api reply")

    def query_vpp_config(self):
        """Return True when any dumped entry reports a non-zero count."""
        entries = self._test.vapi.acl_interface_list_dump(
            sw_if_index=self._sw_if_index
        )
        return any(entry.count > 0 for entry in entries)

    def object_id(self):
        """Return the identifier string built from the interface index."""
        return "acl-if-list-%d" % (self._sw_if_index)
| |
| |
class MacipRule:
    """One MACIP rule: action, source MAC with mask, and source prefix.

    :param is_permit: action stored on the rule and passed through by encode
    :param src_mac: source MAC address
    :param src_mac_mask: mask for the source MAC address
    :param src_prefix: source prefix
    """

    # Attribute names double as the keys of the encoded dictionary.
    _ENCODED_FIELDS = ("is_permit", "src_mac", "src_mac_mask", "src_prefix")

    def __init__(
        self, is_permit, src_mac=0, src_mac_mask=0, src_prefix=IPv4Network("0.0.0.0/0")
    ):
        self.src_prefix = src_prefix
        self.src_mac_mask = src_mac_mask
        self.src_mac = src_mac
        self.is_permit = is_permit

    def encode(self):
        """Return the rule as a dictionary keyed by attribute name."""
        return {field: getattr(self, field) for field in self._ENCODED_FIELDS}
| |
| |
class VppMacipAcl(VppObject):
    """Test-side handle for a MACIP ACL and the rules it holds.

    :param test: test case object providing ``vapi``, ``registry``,
        ``logger`` and ``fail``
    :param rules: rule objects, each exposing ``encode()``
    :param acl_index: index sent with the add/replace request
    :param tag: tag sent with the add/replace request
    """

    def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
        self._test = test
        self._rules = rules
        self._acl_index = acl_index
        self.tag = tag

    @property
    def acl_index(self):
        """Index of the ACL; updated from the reply of a successful add."""
        return self._acl_index

    @property
    def rules(self):
        """Rules currently held by this object."""
        return self._rules

    @property
    def count(self):
        """Number of rules held."""
        return len(self._rules)

    def encode_rules(self):
        """Return the encoded form of every rule, in order."""
        return [entry.encode() for entry in self._rules]

    def add_vpp_config(self, expect_error=False):
        """Send the MACIP ACL to VPP and register this object for cleanup.

        :param expect_error: when true, a successful API call fails the
            test; when false, an API error fails the test
        :returns: ``self`` on success, ``None`` when the API call raised
        """
        test = self._test
        try:
            rv = test.vapi.macip_acl_add_replace(
                acl_index=self._acl_index,
                tag=self.tag,
                count=self.count,
                r=self.encode_rules(),
            )
            # Adopt the index carried in the reply.
            self._acl_index = rv.acl_index
            test.registry.register(self, test.logger)
            if expect_error:
                test.fail("Unexpected api reply")
            return self
        except UnexpectedApiReturnValueError:
            if expect_error:
                return None
            test.fail("Unexpected api reply")
        return None

    def modify_vpp_config(self, rules):
        """Swap in a new rule list and push it via ``add_vpp_config``."""
        self._rules = rules
        self.add_vpp_config()

    def remove_vpp_config(self, expect_error=False):
        """Delete the MACIP ACL from VPP.

        :param expect_error: when true, a successful API call fails the
            test; when false, an API error fails the test
        """
        test = self._test
        try:
            test.vapi.macip_acl_del(acl_index=self._acl_index)
            if expect_error:
                test.fail("Unexpected api reply")
        except UnexpectedApiReturnValueError:
            if expect_error:
                return
            test.fail("Unexpected api reply")

    def dump(self):
        """Return the dump reply for this ACL's index."""
        return self._test.vapi.macip_acl_dump(acl_index=self._acl_index)

    def query_vpp_config(self):
        """Return True when the dump contains an entry with this index."""
        wanted = self._acl_index
        return any(entry.acl_index == wanted for entry in self.dump())

    def object_id(self):
        """Return the identifier string built from the tag and index."""
        return "macip-acl-%s-%d" % (self.tag, self._acl_index)
| |
| |
class VppMacipAclInterface(VppObject):
    """Test-side handle for the MACIP ACLs attached to one interface.

    :param test: test case object providing ``vapi``, ``registry`` and
        ``logger``
    :param sw_if_index: interface the ACLs are attached to
    :param acls: ACL objects, each exposing ``acl_index``
    """

    def __init__(self, test, sw_if_index, acls):
        self._test = test
        self._sw_if_index = sw_if_index
        self.acls = acls

    @property
    def sw_if_index(self):
        """Interface index the ACLs are attached to."""
        return self._sw_if_index

    @property
    def count(self):
        """Number of ACLs held."""
        return len(self.acls)

    def add_vpp_config(self):
        """Attach every ACL to the interface and register for cleanup.

        :returns: ``self``, matching the other ``add_vpp_config`` methods
            in this module so calls can be chained
        """
        for acl in self.acls:
            self._test.vapi.macip_acl_interface_add_del(
                is_add=True, sw_if_index=self._sw_if_index, acl_index=acl.acl_index
            )
        self._test.registry.register(self, self._test.logger)
        return self

    def remove_vpp_config(self):
        """Detach every ACL from the interface."""
        for acl in self.acls:
            self._test.vapi.macip_acl_interface_add_del(
                is_add=False, sw_if_index=self._sw_if_index, acl_index=acl.acl_index
            )

    def dump(self):
        """Return the interface-list dump reply for this interface."""
        return self._test.vapi.macip_acl_interface_list_dump(
            sw_if_index=self._sw_if_index
        )

    def query_vpp_config(self):
        """Return True when any dumped ACL index is not ``INVALID_INDEX``."""
        return any(
            acl_index != INVALID_INDEX
            for acl_list in self.dump()
            for acl_index in acl_list.acls
        )

    def object_id(self):
        """Return the identifier string built from the interface index."""
        return "macip-acl-if-list-%d" % (self._sw_if_index)