| """ |
| IP Types |
| |
| """ |
| import logging |
| |
| from ipaddress import ip_address |
| from socket import AF_INET, AF_INET6 |
| from vpp_papi import VppEnum |
| from vpp_object import VppObject |
| try: |
| text_type = unicode |
| except NameError: |
| text_type = str |
| |
| _log = logging.getLogger(__name__) |
| |
| |
| class DpoProto: |
| DPO_PROTO_IP4 = 0 |
| DPO_PROTO_IP6 = 1 |
| DPO_PROTO_MPLS = 2 |
| DPO_PROTO_ETHERNET = 3 |
| DPO_PROTO_BIER = 4 |
| DPO_PROTO_NSH = 5 |
| |
| |
| INVALID_INDEX = 0xffffffff |
| |
| |
| def get_dpo_proto(addr): |
| if ip_address(addr).version == 6: |
| return DpoProto.DPO_PROTO_IP6 |
| else: |
| return DpoProto.DPO_PROTO_IP4 |
| |
| |
| class VppIpAddressUnion(): |
| def __init__(self, addr): |
| self.addr = addr |
| self.ip_addr = ip_address(text_type(self.addr)) |
| |
| def encode(self): |
| if self.version == 6: |
| return {'ip6': self.ip_addr} |
| else: |
| return {'ip4': self.ip_addr} |
| |
| @property |
| def version(self): |
| return self.ip_addr.version |
| |
| @property |
| def address(self): |
| return self.addr |
| |
| @property |
| def length(self): |
| return self.ip_addr.max_prefixlen |
| |
| @property |
| def bytes(self): |
| return self.ip_addr.packed |
| |
| def __eq__(self, other): |
| if isinstance(other, self.__class__): |
| return self.ip_addr == other.ip_addr |
| elif hasattr(other, "ip4") and hasattr(other, "ip6"): |
| # vl_api_address_union_t |
| if 4 == self.version: |
| return self.ip_addr == other.ip4 |
| else: |
| return self.ip_addr == other.ip6 |
| else: |
| raise Exception("Comparing VppIpAddressUnions:%s" |
| " with incomparable type: %s", |
| self, other) |
| |
| def __ne__(self, other): |
| return not (self == other) |
| |
| def __str__(self): |
| return str(self.ip_addr) |
| |
| |
| class VppIpMPrefix(): |
| def __init__(self, saddr, gaddr, glen): |
| self.saddr = saddr |
| self.gaddr = gaddr |
| self.glen = glen |
| if ip_address(self.saddr).version != \ |
| ip_address(self.gaddr).version: |
| raise ValueError('Source and group addresses must be of the ' |
| 'same address family.') |
| |
| def encode(self): |
| return { |
| 'af': ip_address(self.gaddr).vapi_af, |
| 'grp_address': { |
| ip_address(self.gaddr).vapi_af_name: self.gaddr |
| }, |
| 'src_address': { |
| ip_address(self.saddr).vapi_af_name: self.saddr |
| }, |
| 'grp_address_length': self.glen, |
| } |
| |
| @property |
| def length(self): |
| return self.glen |
| |
| @property |
| def version(self): |
| return ip_address(self.gaddr).version |
| |
| def __str__(self): |
| return "(%s,%s)/%d" % (self.saddr, self.gaddr, self.glen) |
| |
| def __eq__(self, other): |
| if isinstance(other, self.__class__): |
| return (self.glen == other.glen and |
| self.saddr == other.gaddr and |
| self.saddr == other.saddr) |
| elif (hasattr(other, "grp_address_length") and |
| hasattr(other, "grp_address") and |
| hasattr(other, "src_address")): |
| # vl_api_mprefix_t |
| if 4 == self.version: |
| return (self.glen == other.grp_address_length and |
| self.gaddr == str(other.grp_address.ip4) and |
| self.saddr == str(other.src_address.ip4)) |
| else: |
| return (self.glen == other.grp_address_length and |
| self.gaddr == str(other.grp_address.ip6) and |
| self.saddr == str(other.src_address.ip6)) |
| return NotImplemented |
| |
| |
| class VppIpPuntPolicer(VppObject): |
| def __init__(self, test, policer_index, is_ip6=False): |
| self._test = test |
| self._policer_index = policer_index |
| self._is_ip6 = is_ip6 |
| |
| def add_vpp_config(self): |
| self._test.vapi.ip_punt_police(policer_index=self._policer_index, |
| is_ip6=self._is_ip6, is_add=True) |
| |
| def remove_vpp_config(self): |
| self._test.vapi.ip_punt_police(policer_index=self._policer_index, |
| is_ip6=self._is_ip6, is_add=False) |
| |
| def query_vpp_config(self): |
| NotImplemented |
| |
| |
| class VppIpPuntRedirect(VppObject): |
| def __init__(self, test, rx_index, tx_index, nh_addr): |
| self._test = test |
| self._rx_index = rx_index |
| self._tx_index = tx_index |
| self._nh_addr = ip_address(nh_addr) |
| |
| def encode(self): |
| return {"rx_sw_if_index": self._rx_index, |
| "tx_sw_if_index": self._tx_index, "nh": self._nh_addr} |
| |
| def add_vpp_config(self): |
| self._test.vapi.ip_punt_redirect(punt=self.encode(), is_add=True) |
| self._test.registry.register(self, self._test.logger) |
| |
| def remove_vpp_config(self): |
| self._test.vapi.ip_punt_redirect(punt=self.encode(), is_add=False) |
| |
| def get_vpp_config(self): |
| is_ipv6 = True if self._nh_addr.version == 6 else False |
| return self._test.vapi.ip_punt_redirect_dump( |
| sw_if_index=self._rx_index, is_ipv6=is_ipv6) |
| |
| def query_vpp_config(self): |
| if self.get_vpp_config(): |
| return True |
| return False |
| |
| |
| class VppIpPathMtu(VppObject): |
| def __init__(self, test, nh, pmtu, table_id=0): |
| self._test = test |
| self.nh = nh |
| self.pmtu = pmtu |
| self.table_id = table_id |
| |
| def add_vpp_config(self): |
| self._test.vapi.ip_path_mtu_update(pmtu={'nh': self.nh, |
| 'table_id': self.table_id, |
| 'path_mtu': self.pmtu}) |
| self._test.registry.register(self, self._test.logger) |
| return self |
| |
| def modify(self, pmtu): |
| self.pmtu = pmtu |
| self._test.vapi.ip_path_mtu_update(pmtu={'nh': self.nh, |
| 'table_id': self.table_id, |
| 'path_mtu': self.pmtu}) |
| return self |
| |
| def remove_vpp_config(self): |
| self._test.vapi.ip_path_mtu_update(pmtu={'nh': self.nh, |
| 'table_id': self.table_id, |
| 'path_mtu': 0}) |
| |
| def query_vpp_config(self): |
| ds = list(self._test.vapi.vpp.details_iter( |
| self._test.vapi.ip_path_mtu_get)) |
| |
| for d in ds: |
| if self.nh == str(d.pmtu.nh) \ |
| and self.table_id == d.pmtu.table_id \ |
| and self.pmtu == d.pmtu.path_mtu: |
| return True |
| return False |
| |
| def object_id(self): |
| return ("ip-path-mtu-%d-%s-%d" % (self.table_id, |
| self.nh, |
| self.pmtu)) |
| |
| def __str__(self): |
| return self.object_id() |