blob: 50a0ed0a3eec1a9f2c9045ea8fcadf5fb592547b [file] [log] [blame]
#!/usr/bin/env python3
# The MIT License (MIT)
#
# Copyright (c) 2015
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# File included from https://github.com/ejordangottlieb/pyswmap
# Thanks to jordan ;)
# - Pierre
#
# There is still a great deal of work required on this module. Please
# use with caution.
# -Jordan
import sys
from ipaddress import (
IPv6Address,
IPv6Network,
ip_network,
ip_address,
)
from math import (
log,
)
class MapCalc(object):
def __init__(self,**bmr):
#rulev6,rulev4):
self.portranges = False
# Validate and set BMR and BMR derived values
self._check_bmr_values(bmr)
def _check_bmr_values(self,bmr):
# Assume these values have not been supplied. Validate later.
self.ealen = False
self.ratio = False
# Validate that a proper PSID Offset has been set
if 'psidoffset' not in bmr:
# Set Default PSID Offset of 6 if it is not set
self.psidoffset = 6
else:
self.psidoffset = self._psid_offset(bmr['psidoffset'])
# Validate that a proper IPv4 rule prefix is defined
if 'rulev4' not in bmr:
print("The rule IPv4 prefix has not been set")
sys.exit(1)
else:
self.rulev4 = self._ipv4_rule(bmr['rulev4'])
# Validate that a proper IPv6 rule prefix is defined
if 'rulev6' not in bmr:
print("The rule IPv6 prefix has not been set")
sys.exit(1)
else:
self.rulev6 = self._ipv6_rule(bmr['rulev6'])
# Check if EA length was passed
if 'ealen' not in bmr:
self.ealen = False
else:
self.ealen = bmr['ealen']
self.ratio = self._calc_ratio(bmr['ealen'])
# Check if sharing ratio was passed or calculated by _calc_ratio
if 'ratio' not in bmr:
# Skip if we have already calculated ratio
if not (self.ratio):
self.ratio = False
else:
if (self.ealen):
# Check to see if supplied EA length contradicts supplied ratio
if ( bmr['ratio'] != self.ratio ):
eavalue = "EA value {}".format(self.ealen)
sharingratio = "sharing ratio {}".format(bmr['ratio'])
print("Supplied {} and {} are contradictory".format(
eavalue,
sharingratio)
)
sys.exit(1)
else:
self.ratio = bmr['ratio']
self.ealen = self._calc_ea(bmr['ratio'])
# EA length or sharing ratio must be set
if not ( self.ealen or self.ratio):
print("The BMR must include an EA length or sharing ratio")
sys.exit(1)
# Since we have not hit an exception we can calculate the port bits
self.portbits = self._calc_port_bits()
def _ipv4_rule(self,rulev4):
try:
self.rulev4mask = ip_network(
rulev4,
strict=False
).prefixlen
except ValueError:
print("Invalid IPv4 prefix {}".format(rulev4))
sys.exit(1)
self.rulev4object = ip_network(rulev4)
return rulev4
def _ipv6_rule(self,rulev6):
try:
self.rulev6mask = IPv6Network(
rulev6,
strict=False
).prefixlen
except ValueError:
print("Invalid IPv6 prefix {}".format(rulev6))
sys.exit(1)
return rulev6
def _psid_offset(self,psidoffset):
PSIDOFFSET_MAX = 6
if psidoffset in range(0,PSIDOFFSET_MAX+1):
return psidoffset
else:
print("Invalid PSID Offset value: {}".format(psidoffset))
sys.exit(1)
def _psid_range(self,x):
rset = []
for i in range(0,x+1):
rset.append(2**i)
return rset
def _calc_port_bits(self):
portbits = 16 - self.psidoffset - self.psidbits
return portbits
def _calc_ea(self,ratio):
if ratio not in ( self._psid_range(16) ):
print("Invalid ratio {}".format(ratio))
print("Ratio between 2 to the power of 0 thru 16")
sys.exit(1)
if ( 1 == ratio):
self.psidbits = 0
else:
self.psidbits = int(log(ratio,2))
ealen = self.psidbits + ( 32 - self.rulev4mask )
return ealen
def _calc_ratio(self,ealen):
maskbits = 32 - self.rulev4mask
if ( ealen < maskbits ):
print("EA of {} incompatible with rule IPv4 prefix {}".format(
ealen,
self.rulev4,
)
)
print("EA length must be at least {} bits".format(
maskbits,
)
)
sys.exit(1)
self.psidbits = ealen - ( 32 - self.rulev4mask )
if ( self.psidbits > 16):
print("EA length of {} is too large".format(
ealen,
)
)
print("EA should not exceed {} for rule IPv4 prefix {}".format(
maskbits + 16,
self.rulev4,
)
)
sys.exit(1)
ratio = 2**self.psidbits
return ratio
def gen_psid(self,portnum):
if ( portnum < self.start_port() ):
print("port value is less than allowed by PSID Offset")
sys.exit(1)
psid = (portnum & ((2**self.psidbits - 1) << self.portbits))
psid = psid >> self.portbits
return psid
def port_ranges(self):
return 2**self.psidoffset - 1
def start_port(self):
if self.psidoffset == 0: return 0
return 2**(16 - self.psidoffset)
def port_list(self,psid):
startrange = psid * (2**self.portbits) + self.start_port()
increment = (2**self.psidbits) * (2**self.portbits)
portlist = [ ]
for port in range(startrange,startrange + 2**self.portbits):
if port >= 65536: continue
portlist.append(port)
for x in range(1,self.port_ranges()):
startrange += increment
for port in range(startrange,startrange + 2**self.portbits):
portlist.append(port)
return portlist
def ipv4_index(self,ipv4addr):
if ip_address(ipv4addr) in ip_network(self.rulev4):
x = ip_address(ipv4addr)
y = ip_network(self.rulev4,strict=False).network_address
self.ipv4addr = x
return ( int(x) - int(y) )
else:
print("Error: IPv4 address {} not in Rule IPv4 subnet {}".format(
ipv4add,
ip_network(self.rulev4,strict=False).network_address))
sys.exit(1)
def _calc_ipv6bit_pos(self):
addroffset = 128 - (self.rulev6mask + ( self.ealen - self.psidbits))
psidshift = 128 - ( self.rulev6mask + self.ealen )
return [addroffset,psidshift]
def _append_map_eabits(self,ipv4index,addroffset,psidshift,psid):
rulev6base = IPv6Network(self.rulev6,strict=False).network_address
map_prefix = int(rulev6base) | ( ipv4index << addroffset )
map_fullprefix = map_prefix | ( psid << psidshift)
return map_fullprefix
def get_mapce_addr(self,ipv4addr,psid):
ipv4index = self.ipv4_index(ipv4addr)
(addroffset,psidshift) = self._calc_ipv6bit_pos()
map_fullprefix = self._append_map_eabits(ipv4index,
addroffset,
psidshift,
psid)
mapv4iid = map_fullprefix | ( int(self.ipv4addr) << 16 )
map_full_address = mapv4iid | psid
mapce_address = "{}".format(IPv6Address(map_full_address))
return mapce_address
def get_mapce_prefix(self,ipv4addr,psid):
ipv4index = self.ipv4_index(ipv4addr)
(addroffset,psidshift) = self._calc_ipv6bit_pos()
map_fullprefix = self._append_map_eabits(ipv4index,
addroffset,
psidshift,
psid)
mapce_prefix = "{}/{}".format(
IPv6Address(map_fullprefix),
self.rulev6mask + self.ealen
)
return mapce_prefix
def get_map_ipv4(self,mapce_address):
ipv4 = (int(IPv6Address(mapce_address)) & ( 0xffffffff << 16 )) >> 16
return ip_address(ipv4)
class DmrCalc(object):
def __init__(self,dmr):
# Validate and set BMR and BMR derived values
self.dmrprefix = self._check_dmr_prefix(dmr)
def embed_6052addr(self,ipv4addr):
try:
ipv4addrint = int(ip_address(ipv4addr))
except ValueError:
print("Invalid IPv4 address {}".format(ipv4addr))
sys.exit(1)
if ( self.dmrprefix.prefixlen == 64 ):
ipv6int = ipv4addrint << 24
ipv6int += int(self.dmrprefix.network_address)
return IPv6Address(ipv6int)
if ( self.dmrprefix.prefixlen == 96 ):
ipv6int = ipv4addrint
ipv6int += int(self.dmrprefix.network_address)
return IPv6Address(ipv6int)
def _check_dmr_prefix(self,dmrprefix):
try:
self.dmrmask = IPv6Network(
dmrprefix,
strict=False
).prefixlen
except ValueError:
print("Invalid IPv6 prefix {}".format(prefix))
sys.exit(1)
if self.dmrmask not in (32,40,48,56,64,96):
print("Invalid prefix mask /{}".format(self.dmrmask))
sys.exit(1)
return IPv6Network(dmrprefix)
if __name__ == "__main__":
m = DmrCalc('fd80::/48')
print(m.dmrprefix)