tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is
much faster, stable PEP8 compliant code style checker offering also
automatic formatting. It aims to be very stable and produce smallest
diffs. It's used by many small and big projects.
Running checkstyle with black takes a few seconds with a terse output.
Thus, test-checkstyle-diff is no longer necessary.
Expand scope of checkstyle to all python files in the repo, replacing
test-checkstyle with checkstyle-python.
Also, fixstyle-python is now available for automatic style formatting.
Note: python virtualenv has been consolidated in test/Makefile,
test/requirements*.txt which will eventually be moved to a central
location. This is required to simply the automated generation of
docker executor images in the CI.
Type: improvement
Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
diff --git a/test/vpp_ip_route.py b/test/vpp_ip_route.py
index f5e5a88..06a963c 100644
--- a/test/vpp_ip_route.py
+++ b/test/vpp_ip_route.py
@@ -5,13 +5,12 @@
"""
from vpp_object import VppObject
-from vpp_ip import DpoProto, INVALID_INDEX, VppIpAddressUnion, \
- VppIpMPrefix
+from vpp_ip import DpoProto, INVALID_INDEX, VppIpAddressUnion, VppIpMPrefix
from ipaddress import ip_network, ip_address, IPv4Network, IPv6Network
from vpp_papi_exceptions import UnexpectedApiReturnValueError
# from vnet/vnet/mpls/mpls_types.h
-MPLS_IETF_MAX_LABEL = 0xfffff
+MPLS_IETF_MAX_LABEL = 0xFFFFF
MPLS_LABEL_INVALID = MPLS_IETF_MAX_LABEL + 1
try:
@@ -85,8 +84,7 @@
routes = test.vapi.ip_route_dump(table_id, True)
for e in routes:
- if table_id == e.route.table_id \
- and str(e.route.prefix) == str(prefix):
+ if table_id == e.route.table_id and str(e.route.prefix) == str(prefix):
if not sw_if_index:
return True
else:
@@ -95,15 +93,14 @@
if e.route.n_paths != 1:
return False
else:
- return (e.route.paths[0].sw_if_index == sw_if_index)
+ return e.route.paths[0].sw_if_index == sw_if_index
return False
def find_route_in_dump(dump, route, table):
for r in dump:
- if table.table_id == r.route.table_id \
- and route.prefix == r.route.prefix:
+ if table.table_id == r.route.table_id and route.prefix == r.route.prefix:
if len(route.paths) == r.route.n_paths:
return True
return False
@@ -111,17 +108,13 @@
def find_mroute_in_dump(dump, route, table):
for r in dump:
- if table.table_id == r.route.table_id \
- and route.prefix == r.route.prefix:
+ if table.table_id == r.route.table_id and route.prefix == r.route.prefix:
return True
return False
-def find_mroute(test, grp_addr, src_addr, grp_addr_len,
- table_id=0):
- ip_mprefix = VppIpMPrefix(text_type(src_addr),
- text_type(grp_addr),
- grp_addr_len)
+def find_mroute(test, grp_addr, src_addr, grp_addr_len, table_id=0):
+ ip_mprefix = VppIpMPrefix(text_type(src_addr), text_type(grp_addr), grp_addr_len)
if 4 == ip_mprefix.version:
routes = test.vapi.ip_mroute_dump(table_id, False)
@@ -137,16 +130,18 @@
def find_mpls_route(test, table_id, label, eos_bit, paths=None):
dump = test.vapi.mpls_route_dump(table_id)
for e in dump:
- if label == e.mr_route.mr_label \
- and eos_bit == e.mr_route.mr_eos \
- and table_id == e.mr_route.mr_table_id:
+ if (
+ label == e.mr_route.mr_label
+ and eos_bit == e.mr_route.mr_eos
+ and table_id == e.mr_route.mr_table_id
+ ):
if not paths:
return True
else:
- if (len(paths) != len(e.mr_route.mr_paths)):
+ if len(paths) != len(e.mr_route.mr_paths):
return False
for i in range(len(paths)):
- if (paths[i] != e.mr_route.mr_paths[i]):
+ if paths[i] != e.mr_route.mr_paths[i]:
return False
return True
return False
@@ -156,54 +151,49 @@
# can't use python net here since we need the host bits in the prefix
prefix = "%s/%d" % (addr, len)
addrs = test.vapi.ip_address_dump(
- sw_if_index,
- is_ipv6=(6 == ip_address(addr).version))
+ sw_if_index, is_ipv6=(6 == ip_address(addr).version)
+ )
for a in addrs:
- if a.sw_if_index == sw_if_index and \
- str(a.prefix) == prefix:
+ if a.sw_if_index == sw_if_index and str(a.prefix) == prefix:
return True
return False
class VppIpTable(VppObject):
-
- def __init__(self,
- test,
- table_id,
- is_ip6=0,
- register=True):
+ def __init__(self, test, table_id, is_ip6=0, register=True):
self._test = test
self.table_id = table_id
self.is_ip6 = is_ip6
self.register = register
def add_vpp_config(self):
- self._test.vapi.ip_table_add_del(is_add=1,
- table={'is_ip6': self.is_ip6,
- 'table_id': self.table_id})
+ self._test.vapi.ip_table_add_del(
+ is_add=1, table={"is_ip6": self.is_ip6, "table_id": self.table_id}
+ )
if self.register:
self._test.registry.register(self, self._test.logger)
return self
def remove_vpp_config(self):
- self._test.vapi.ip_table_add_del(is_add=0,
- table={'is_ip6': self.is_ip6,
- 'table_id': self.table_id})
+ self._test.vapi.ip_table_add_del(
+ is_add=0, table={"is_ip6": self.is_ip6, "table_id": self.table_id}
+ )
def replace_begin(self):
self._test.vapi.ip_table_replace_begin(
- table={'is_ip6': self.is_ip6,
- 'table_id': self.table_id})
+ table={"is_ip6": self.is_ip6, "table_id": self.table_id}
+ )
def replace_end(self):
self._test.vapi.ip_table_replace_end(
- table={'is_ip6': self.is_ip6,
- 'table_id': self.table_id})
+ table={"is_ip6": self.is_ip6, "table_id": self.table_id}
+ )
def flush(self):
- self._test.vapi.ip_table_flush(table={'is_ip6': self.is_ip6,
- 'table_id': self.table_id})
+ self._test.vapi.ip_table_flush(
+ table={"is_ip6": self.is_ip6, "table_id": self.table_id}
+ )
def dump(self):
return self._test.vapi.ip_route_dump(self.table_id, self.is_ip6)
@@ -216,19 +206,15 @@
# the default table always exists
return False
# find the default route
- return find_route(self._test,
- "::" if self.is_ip6 else "0.0.0.0",
- 0,
- self.table_id)
+ return find_route(
+ self._test, "::" if self.is_ip6 else "0.0.0.0", 0, self.table_id
+ )
def object_id(self):
- return ("table-%s-%d" %
- ("v6" if self.is_ip6 == 1 else "v4",
- self.table_id))
+ return "table-%s-%d" % ("v6" if self.is_ip6 == 1 else "v4", self.table_id)
class VppIpInterfaceAddress(VppObject):
-
def __init__(self, test, intf, addr, len, bind=None):
self._test = test
self.intf = intf
@@ -242,55 +228,57 @@
def add_vpp_config(self):
self._test.vapi.sw_interface_add_del_address(
- sw_if_index=self.intf.sw_if_index, prefix=self.prefix,
- is_add=1)
+ sw_if_index=self.intf.sw_if_index, prefix=self.prefix, is_add=1
+ )
self._test.registry.register(self, self._test.logger)
return self
def remove_vpp_config(self):
self._test.vapi.sw_interface_add_del_address(
- sw_if_index=self.intf.sw_if_index, prefix=self.prefix,
- is_add=0)
+ sw_if_index=self.intf.sw_if_index, prefix=self.prefix, is_add=0
+ )
def query_vpp_config(self):
# search for the IP address mapping and the two expected
# FIB entries
v = ip_address(self.addr).version
- if ((v == 4 and self.len < 31) or (v == 6 and self.len < 127)):
- return (fib_interface_ip_prefix(self._test,
- self.addr,
- self.len,
- self.intf.sw_if_index) &
- find_route(self._test,
- self.addr,
- self.len,
- table_id=self.table_id,
- sw_if_index=self.intf.sw_if_index) &
- find_route(self._test,
- self.addr,
- self.host_len,
- table_id=self.table_id,
- sw_if_index=self.intf.sw_if_index))
+ if (v == 4 and self.len < 31) or (v == 6 and self.len < 127):
+ return (
+ fib_interface_ip_prefix(
+ self._test, self.addr, self.len, self.intf.sw_if_index
+ )
+ & find_route(
+ self._test,
+ self.addr,
+ self.len,
+ table_id=self.table_id,
+ sw_if_index=self.intf.sw_if_index,
+ )
+ & find_route(
+ self._test,
+ self.addr,
+ self.host_len,
+ table_id=self.table_id,
+ sw_if_index=self.intf.sw_if_index,
+ )
+ )
else:
- return (fib_interface_ip_prefix(self._test,
- self.addr,
- self.len,
- self.intf.sw_if_index) &
- find_route(self._test,
- self.addr,
- self.host_len,
- table_id=self.table_id,
- sw_if_index=self.intf.sw_if_index))
+ return fib_interface_ip_prefix(
+ self._test, self.addr, self.len, self.intf.sw_if_index
+ ) & find_route(
+ self._test,
+ self.addr,
+ self.host_len,
+ table_id=self.table_id,
+ sw_if_index=self.intf.sw_if_index,
+ )
def object_id(self):
- return "interface-ip-%s-%d-%s" % (self.intf,
- self.table_id,
- self.prefix)
+ return "interface-ip-%s-%d-%s" % (self.intf, self.table_id, self.prefix)
class VppIp6LinkLocalAddress(VppObject):
-
def __init__(self, test, intf, addr):
self._test = test
self.intf = intf
@@ -298,7 +286,8 @@
def add_vpp_config(self):
self._test.vapi.sw_interface_ip6_set_link_local_address(
- sw_if_index=self.intf.sw_if_index, ip=self.addr)
+ sw_if_index=self.intf.sw_if_index, ip=self.addr
+ )
self._test.registry.register(self, self._test.logger)
return self
@@ -315,7 +304,6 @@
class VppIpInterfaceBind(VppObject):
-
def __init__(self, test, intf, table):
self._test = test
self.intf = intf
@@ -341,9 +329,12 @@
if 0 == self.table.table_id:
return False
try:
- return self._test.vapi.sw_interface_get_table(
- self.intf.sw_if_index,
- self.table.is_ip6).vrf_id == self.table.table_id
+ return (
+ self._test.vapi.sw_interface_get_table(
+ self.intf.sw_if_index, self.table.is_ip6
+ ).vrf_id
+ == self.table.table_id
+ )
except UnexpectedApiReturnValueError as e:
if e.retval == -2: # INVALID_SW_IF_INDEX
return False
@@ -362,22 +353,28 @@
def encode(self):
is_uniform = 0 if self.mode is MplsLspMode.PIPE else 1
- return {'label': self.value,
- 'ttl': self.ttl,
- 'exp': self.exp,
- 'is_uniform': is_uniform}
+ return {
+ "label": self.value,
+ "ttl": self.ttl,
+ "exp": self.exp,
+ "is_uniform": is_uniform,
+ }
def __eq__(self, other):
if isinstance(other, self.__class__):
- return (self.value == other.value and
- self.ttl == other.ttl and
- self.exp == other.exp and
- self.mode == other.mode)
- elif hasattr(other, 'label'):
- return (self.value == other.label and
- self.ttl == other.ttl and
- self.exp == other.exp and
- (self.mode == MplsLspMode.UNIFORM) == other.is_uniform)
+ return (
+ self.value == other.value
+ and self.ttl == other.ttl
+ and self.exp == other.exp
+ and self.mode == other.mode
+ )
+ elif hasattr(other, "label"):
+ return (
+ self.value == other.label
+ and self.ttl == other.ttl
+ and self.exp == other.exp
+ and (self.mode == MplsLspMode.UNIFORM) == other.is_uniform
+ )
else:
return False
@@ -386,20 +383,18 @@
class VppFibPathNextHop:
- def __init__(self, addr,
- via_label=MPLS_LABEL_INVALID,
- next_hop_id=INVALID_INDEX):
+ def __init__(self, addr, via_label=MPLS_LABEL_INVALID, next_hop_id=INVALID_INDEX):
self.addr = VppIpAddressUnion(addr)
self.via_label = via_label
self.obj_id = next_hop_id
def encode(self):
if self.via_label is not MPLS_LABEL_INVALID:
- return {'via_label': self.via_label}
+ return {"via_label": self.via_label}
if self.obj_id is not INVALID_INDEX:
- return {'obj_id': self.obj_id}
+ return {"obj_id": self.obj_id}
else:
- return {'address': self.addr.encode()}
+ return {"address": self.addr.encode()}
def proto(self):
if self.via_label is MPLS_LABEL_INVALID:
@@ -411,25 +406,27 @@
if not isinstance(other, self.__class__):
# try the other instance's __eq__.
return NotImplemented
- return (self.addr == other.addr and
- self.via_label == other.via_label and
- self.obj_id == other.obj_id)
+ return (
+ self.addr == other.addr
+ and self.via_label == other.via_label
+ and self.obj_id == other.obj_id
+ )
class VppRoutePath:
-
def __init__(
- self,
- nh_addr,
- nh_sw_if_index,
- nh_table_id=0,
- labels=[],
- nh_via_label=MPLS_LABEL_INVALID,
- rpf_id=0,
- next_hop_id=INVALID_INDEX,
- proto=None,
- flags=FibPathFlags.FIB_PATH_FLAG_NONE,
- type=FibPathType.FIB_PATH_TYPE_NORMAL):
+ self,
+ nh_addr,
+ nh_sw_if_index,
+ nh_table_id=0,
+ labels=[],
+ nh_via_label=MPLS_LABEL_INVALID,
+ rpf_id=0,
+ next_hop_id=INVALID_INDEX,
+ proto=None,
+ flags=FibPathFlags.FIB_PATH_FLAG_NONE,
+ type=FibPathType.FIB_PATH_TYPE_NORMAL,
+ ):
self.nh_itf = nh_sw_if_index
self.nh_table_id = nh_table_id
self.nh_labels = labels
@@ -451,36 +448,37 @@
if type(l) == VppMplsLabel:
lstack.append(l.encode())
else:
- lstack.append({'label': l,
- 'ttl': 255})
- while (len(lstack) < 16):
+ lstack.append({"label": l, "ttl": 255})
+ while len(lstack) < 16:
lstack.append({})
return lstack
def encode(self):
- return {'weight': 1,
- 'preference': 0,
- 'table_id': self.nh_table_id,
- 'nh': self.nh.encode(),
- 'next_hop_id': self.next_hop_id,
- 'sw_if_index': self.nh_itf,
- 'rpf_id': self.rpf_id,
- 'proto': self.proto,
- 'type': self.type,
- 'flags': self.flags,
- 'n_labels': len(self.nh_labels),
- 'label_stack': self.encode_labels()}
+ return {
+ "weight": 1,
+ "preference": 0,
+ "table_id": self.nh_table_id,
+ "nh": self.nh.encode(),
+ "next_hop_id": self.next_hop_id,
+ "sw_if_index": self.nh_itf,
+ "rpf_id": self.rpf_id,
+ "proto": self.proto,
+ "type": self.type,
+ "flags": self.flags,
+ "n_labels": len(self.nh_labels),
+ "label_stack": self.encode_labels(),
+ }
def __eq__(self, other):
if isinstance(other, self.__class__):
return self.nh == other.nh
- elif hasattr(other, 'sw_if_index'):
+ elif hasattr(other, "sw_if_index"):
# vl_api_fib_path_t
- if (len(self.nh_labels) != other.n_labels):
+ if len(self.nh_labels) != other.n_labels:
return False
for i in range(len(self.nh_labels)):
- if (self.nh_labels[i] != other.label_stack[i]):
+ if self.nh_labels[i] != other.label_stack[i]:
return False
return self.nh_itf == other.sw_if_index
else:
@@ -491,26 +489,28 @@
class VppMRoutePath(VppRoutePath):
-
- def __init__(self, nh_sw_if_index, flags,
- nh=None,
- proto=FibPathProto.FIB_PATH_NH_PROTO_IP4,
- type=FibPathType.FIB_PATH_TYPE_NORMAL,
- bier_imp=INVALID_INDEX):
+ def __init__(
+ self,
+ nh_sw_if_index,
+ flags,
+ nh=None,
+ proto=FibPathProto.FIB_PATH_NH_PROTO_IP4,
+ type=FibPathType.FIB_PATH_TYPE_NORMAL,
+ bier_imp=INVALID_INDEX,
+ ):
if not nh:
- nh = "::" if proto is FibPathProto.FIB_PATH_NH_PROTO_IP6 \
- else "0.0.0.0"
- super(VppMRoutePath, self).__init__(nh,
- nh_sw_if_index,
- proto=proto,
- type=type,
- next_hop_id=bier_imp)
+ nh = "::" if proto is FibPathProto.FIB_PATH_NH_PROTO_IP6 else "0.0.0.0"
+ super(VppMRoutePath, self).__init__(
+ nh, nh_sw_if_index, proto=proto, type=type, next_hop_id=bier_imp
+ )
self.nh_i_flags = flags
self.bier_imp = bier_imp
def encode(self):
- return {'path': super(VppMRoutePath, self).encode(),
- 'itf_flags': self.nh_i_flags}
+ return {
+ "path": super(VppMRoutePath, self).encode(),
+ "itf_flags": self.nh_i_flags,
+ }
class VppIpRoute(VppObject):
@@ -518,8 +518,9 @@
IP Route
"""
- def __init__(self, test, dest_addr,
- dest_addr_len, paths, table_id=0, register=True):
+ def __init__(
+ self, test, dest_addr, dest_addr_len, paths, table_id=0, register=True
+ ):
self._test = test
self.paths = paths
self.table_id = table_id
@@ -533,8 +534,7 @@
self.encoded_paths.append(path.encode())
def __eq__(self, other):
- if self.table_id == other.table_id and \
- self.prefix == other.prefix:
+ if self.table_id == other.table_id and self.prefix == other.prefix:
return True
return False
@@ -545,24 +545,28 @@
self.encoded_paths.append(path.encode())
self.modified = True
- self._test.vapi.ip_route_add_del(route={'table_id': self.table_id,
- 'prefix': self.prefix,
- 'n_paths': len(
- self.encoded_paths),
- 'paths': self.encoded_paths,
- },
- is_add=1,
- is_multipath=0)
+ self._test.vapi.ip_route_add_del(
+ route={
+ "table_id": self.table_id,
+ "prefix": self.prefix,
+ "n_paths": len(self.encoded_paths),
+ "paths": self.encoded_paths,
+ },
+ is_add=1,
+ is_multipath=0,
+ )
def add_vpp_config(self):
r = self._test.vapi.ip_route_add_del(
- route={'table_id': self.table_id,
- 'prefix': self.prefix,
- 'n_paths': len(self.encoded_paths),
- 'paths': self.encoded_paths,
- },
+ route={
+ "table_id": self.table_id,
+ "prefix": self.prefix,
+ "n_paths": len(self.encoded_paths),
+ "paths": self.encoded_paths,
+ },
is_add=1,
- is_multipath=0)
+ is_multipath=0,
+ )
self.stats_index = r.stats_index
if self.register:
self._test.registry.register(self, self._test.logger)
@@ -575,32 +579,36 @@
# passing no paths and multipath=0
if self.modified:
self._test.vapi.ip_route_add_del(
- route={'table_id': self.table_id,
- 'prefix': self.prefix,
- 'n_paths': len(
- self.encoded_paths),
- 'paths': self.encoded_paths},
+ route={
+ "table_id": self.table_id,
+ "prefix": self.prefix,
+ "n_paths": len(self.encoded_paths),
+ "paths": self.encoded_paths,
+ },
is_add=0,
- is_multipath=1)
+ is_multipath=1,
+ )
else:
self._test.vapi.ip_route_add_del(
- route={'table_id': self.table_id,
- 'prefix': self.prefix,
- 'n_paths': 0},
+ route={"table_id": self.table_id, "prefix": self.prefix, "n_paths": 0},
is_add=0,
- is_multipath=0)
+ is_multipath=0,
+ )
def query_vpp_config(self):
- return find_route(self._test,
- self.prefix.network_address,
- self.prefix.prefixlen,
- self.table_id)
+ return find_route(
+ self._test,
+ self.prefix.network_address,
+ self.prefix.prefixlen,
+ self.table_id,
+ )
def object_id(self):
- return ("%s:table-%d-%s" % (
- 'ip6-route' if self.prefix.version == 6 else 'ip-route',
- self.table_id,
- self.prefix))
+ return "%s:table-%d-%s" % (
+ "ip6-route" if self.prefix.version == 6 else "ip-route",
+ self.table_id,
+ self.prefix,
+ )
def get_stats_to(self):
c = self._test.statistics.get_counter("/net/route/to")
@@ -616,9 +624,9 @@
IP Route V2
"""
- def __init__(self, test, dest_addr,
- dest_addr_len, paths, table_id=0,
- register=True, src=0):
+ def __init__(
+ self, test, dest_addr, dest_addr_len, paths, table_id=0, register=True, src=0
+ ):
self._test = test
self.paths = paths
self.table_id = table_id
@@ -633,8 +641,7 @@
self.encoded_paths.append(path.encode())
def __eq__(self, other):
- if self.table_id == other.table_id and \
- self.prefix == other.prefix:
+ if self.table_id == other.table_id and self.prefix == other.prefix:
return True
return False
@@ -645,26 +652,30 @@
self.encoded_paths.append(path.encode())
self.modified = True
- self._test.vapi.ip_route_add_del_v2(route={'table_id': self.table_id,
- 'prefix': self.prefix,
- 'src': self.src,
- 'n_paths': len(
- self.encoded_paths),
- 'paths': self.encoded_paths,
- },
- is_add=1,
- is_multipath=0)
+ self._test.vapi.ip_route_add_del_v2(
+ route={
+ "table_id": self.table_id,
+ "prefix": self.prefix,
+ "src": self.src,
+ "n_paths": len(self.encoded_paths),
+ "paths": self.encoded_paths,
+ },
+ is_add=1,
+ is_multipath=0,
+ )
def add_vpp_config(self):
r = self._test.vapi.ip_route_add_del_v2(
- route={'table_id': self.table_id,
- 'prefix': self.prefix,
- 'n_paths': len(self.encoded_paths),
- 'paths': self.encoded_paths,
- 'src': self.src,
- },
+ route={
+ "table_id": self.table_id,
+ "prefix": self.prefix,
+ "n_paths": len(self.encoded_paths),
+ "paths": self.encoded_paths,
+ "src": self.src,
+ },
is_add=1,
- is_multipath=0)
+ is_multipath=0,
+ )
self.stats_index = r.stats_index
if self.register:
self._test.registry.register(self, self._test.logger)
@@ -677,34 +688,42 @@
# passing no paths and multipath=0
if self.modified:
self._test.vapi.ip_route_add_del_v2(
- route={'table_id': self.table_id,
- 'prefix': self.prefix,
- 'src': self.src,
- 'n_paths': len(
- self.encoded_paths),
- 'paths': self.encoded_paths},
+ route={
+ "table_id": self.table_id,
+ "prefix": self.prefix,
+ "src": self.src,
+ "n_paths": len(self.encoded_paths),
+ "paths": self.encoded_paths,
+ },
is_add=0,
- is_multipath=1)
+ is_multipath=1,
+ )
else:
self._test.vapi.ip_route_add_del_v2(
- route={'table_id': self.table_id,
- 'prefix': self.prefix,
- 'src': self.src,
- 'n_paths': 0},
+ route={
+ "table_id": self.table_id,
+ "prefix": self.prefix,
+ "src": self.src,
+ "n_paths": 0,
+ },
is_add=0,
- is_multipath=0)
+ is_multipath=0,
+ )
def query_vpp_config(self):
- return find_route(self._test,
- self.prefix.network_address,
- self.prefix.prefixlen,
- self.table_id)
+ return find_route(
+ self._test,
+ self.prefix.network_address,
+ self.prefix.prefixlen,
+ self.table_id,
+ )
def object_id(self):
- return ("%s:table-%d-%s" % (
- 'ip6-route' if self.prefix.version == 6 else 'ip-route',
- self.table_id,
- self.prefix))
+ return "%s:table-%d-%s" % (
+ "ip6-route" if self.prefix.version == 6 else "ip-route",
+ self.table_id,
+ self.prefix,
+ )
def get_stats_to(self):
c = self._test.statistics.get_counter("/net/route/to")
@@ -720,9 +739,17 @@
IP Multicast Route
"""
- def __init__(self, test, src_addr, grp_addr,
- grp_addr_len, e_flags, paths, table_id=0,
- rpf_id=0):
+ def __init__(
+ self,
+ test,
+ src_addr,
+ grp_addr,
+ grp_addr_len,
+ e_flags,
+ paths,
+ table_id=0,
+ rpf_id=0,
+ ):
self._test = test
self.paths = paths
self.table_id = table_id
@@ -736,38 +763,37 @@
def encode(self, paths=None):
_paths = self.encoded_paths if paths is None else paths
- return {'table_id': self.table_id,
- 'entry_flags': self.e_flags,
- 'rpf_id': self.rpf_id,
- 'prefix': self.prefix.encode(),
- 'n_paths': len(_paths),
- 'paths': _paths,
- }
+ return {
+ "table_id": self.table_id,
+ "entry_flags": self.e_flags,
+ "rpf_id": self.rpf_id,
+ "prefix": self.prefix.encode(),
+ "n_paths": len(_paths),
+ "paths": _paths,
+ }
def add_vpp_config(self):
- r = self._test.vapi.ip_mroute_add_del(route=self.encode(),
- is_multipath=1,
- is_add=1)
+ r = self._test.vapi.ip_mroute_add_del(
+ route=self.encode(), is_multipath=1, is_add=1
+ )
self.stats_index = r.stats_index
self._test.registry.register(self, self._test.logger)
return self
def remove_vpp_config(self):
- self._test.vapi.ip_mroute_add_del(route=self.encode(),
- is_multipath=1,
- is_add=0)
+ self._test.vapi.ip_mroute_add_del(route=self.encode(), is_multipath=1, is_add=0)
def update_entry_flags(self, flags):
self.e_flags = flags
- self._test.vapi.ip_mroute_add_del(route=self.encode(paths=[]),
- is_multipath=1,
- is_add=1)
+ self._test.vapi.ip_mroute_add_del(
+ route=self.encode(paths=[]), is_multipath=1, is_add=1
+ )
def update_rpf_id(self, rpf_id):
self.rpf_id = rpf_id
- self._test.vapi.ip_mroute_add_del(route=self.encode(paths=[]),
- is_multipath=1,
- is_add=1)
+ self._test.vapi.ip_mroute_add_del(
+ route=self.encode(paths=[]), is_multipath=1, is_add=1
+ )
def update_path_flags(self, itf, flags):
for p in range(len(self.paths)):
@@ -777,23 +803,25 @@
break
self._test.vapi.ip_mroute_add_del(
- route=self.encode(
- paths=[self.encoded_paths[p]]),
- is_add=1,
- is_multipath=0)
+ route=self.encode(paths=[self.encoded_paths[p]]), is_add=1, is_multipath=0
+ )
def query_vpp_config(self):
- return find_mroute(self._test,
- self.prefix.gaddr,
- self.prefix.saddr,
- self.prefix.length,
- self.table_id)
+ return find_mroute(
+ self._test,
+ self.prefix.gaddr,
+ self.prefix.saddr,
+ self.prefix.length,
+ self.table_id,
+ )
def object_id(self):
- return ("%d:(%s,%s/%d)" % (self.table_id,
- self.prefix.saddr,
- self.prefix.gaddr,
- self.prefix.length))
+ return "%d:(%s,%s/%d)" % (
+ self.table_id,
+ self.prefix.saddr,
+ self.prefix.gaddr,
+ self.prefix.length,
+ )
def get_stats(self):
c = self._test.statistics.get_counter("/net/mroute")
@@ -818,8 +846,16 @@
MPLS to IP Binding
"""
- def __init__(self, test, local_label, dest_addr, dest_addr_len,
- table_id=0, ip_table_id=0, is_ip6=0):
+ def __init__(
+ self,
+ test,
+ local_label,
+ dest_addr,
+ dest_addr_len,
+ table_id=0,
+ ip_table_id=0,
+ is_ip6=0,
+ ):
self._test = test
self.dest_addr_len = dest_addr_len
self.dest_addr = dest_addr
@@ -830,54 +866,54 @@
self.prefix = mk_network(dest_addr, dest_addr_len)
def add_vpp_config(self):
- self._test.vapi.mpls_ip_bind_unbind(self.local_label,
- self.prefix,
- table_id=self.table_id,
- ip_table_id=self.ip_table_id)
+ self._test.vapi.mpls_ip_bind_unbind(
+ self.local_label,
+ self.prefix,
+ table_id=self.table_id,
+ ip_table_id=self.ip_table_id,
+ )
self._test.registry.register(self, self._test.logger)
def remove_vpp_config(self):
- self._test.vapi.mpls_ip_bind_unbind(self.local_label,
- self.prefix,
- table_id=self.table_id,
- ip_table_id=self.ip_table_id,
- is_bind=0)
+ self._test.vapi.mpls_ip_bind_unbind(
+ self.local_label,
+ self.prefix,
+ table_id=self.table_id,
+ ip_table_id=self.ip_table_id,
+ is_bind=0,
+ )
def query_vpp_config(self):
dump = self._test.vapi.mpls_route_dump(self.table_id)
for e in dump:
- if self.local_label == e.mr_route.mr_label \
- and self.table_id == e.mr_route.mr_table_id:
+ if (
+ self.local_label == e.mr_route.mr_label
+ and self.table_id == e.mr_route.mr_table_id
+ ):
return True
return False
def object_id(self):
- return ("%d:%s binds %d:%s/%d"
- % (self.table_id,
- self.local_label,
- self.ip_table_id,
- self.dest_addr,
- self.dest_addr_len))
+ return "%d:%s binds %d:%s/%d" % (
+ self.table_id,
+ self.local_label,
+ self.ip_table_id,
+ self.dest_addr,
+ self.dest_addr_len,
+ )
class VppMplsTable(VppObject):
-
- def __init__(self,
- test,
- table_id):
+ def __init__(self, test, table_id):
self._test = test
self.table_id = table_id
def add_vpp_config(self):
- self._test.vapi.mpls_table_add_del(
- self.table_id,
- is_add=1)
+ self._test.vapi.mpls_table_add_del(self.table_id, is_add=1)
self._test.registry.register(self, self._test.logger)
def remove_vpp_config(self):
- self._test.vapi.mpls_table_add_del(
- self.table_id,
- is_add=0)
+ self._test.vapi.mpls_table_add_del(self.table_id, is_add=0)
def query_vpp_config(self):
dump = self._test.vapi.mpls_table_dump()
@@ -887,7 +923,7 @@
return False
def object_id(self):
- return ("table-mpls-%d" % (self.table_id))
+ return "table-mpls-%d" % (self.table_id)
class VppMplsRoute(VppObject):
@@ -895,9 +931,16 @@
MPLS Route/LSP
"""
- def __init__(self, test, local_label, eos_bit, paths, table_id=0,
- is_multicast=0,
- eos_proto=FibPathProto.FIB_PATH_NH_PROTO_IP4):
+ def __init__(
+ self,
+ test,
+ local_label,
+ eos_bit,
+ paths,
+ table_id=0,
+ is_multicast=0,
+ eos_proto=FibPathProto.FIB_PATH_NH_PROTO_IP4,
+ ):
self._test = test
self.paths = paths
self.local_label = local_label
@@ -911,12 +954,16 @@
for path in self.paths:
paths.append(path.encode())
- r = self._test.vapi.mpls_route_add_del(self.table_id,
- self.local_label,
- self.eos_bit,
- self.eos_proto,
- self.is_multicast,
- paths, 1, 0)
+ r = self._test.vapi.mpls_route_add_del(
+ self.table_id,
+ self.local_label,
+ self.eos_bit,
+ self.eos_proto,
+ self.is_multicast,
+ paths,
+ 1,
+ 0,
+ )
self.stats_index = r.stats_index
self._test.registry.register(self, self._test.logger)
@@ -925,22 +972,28 @@
for path in self.paths:
paths.append(path.encode())
- self._test.vapi.mpls_route_add_del(self.table_id,
- self.local_label,
- self.eos_bit,
- self.eos_proto,
- self.is_multicast,
- paths, 0, 0)
+ self._test.vapi.mpls_route_add_del(
+ self.table_id,
+ self.local_label,
+ self.eos_bit,
+ self.eos_proto,
+ self.is_multicast,
+ paths,
+ 0,
+ 0,
+ )
def query_vpp_config(self):
- return find_mpls_route(self._test, self.table_id,
- self.local_label, self.eos_bit)
+ return find_mpls_route(
+ self._test, self.table_id, self.local_label, self.eos_bit
+ )
def object_id(self):
- return ("mpls-route-%d:%s/%d"
- % (self.table_id,
- self.local_label,
- 20 + self.eos_bit))
+ return "mpls-route-%d:%s/%d" % (
+ self.table_id,
+ self.local_label,
+ 20 + self.eos_bit,
+ )
def get_stats_to(self):
c = self._test.statistics.get_counter("/net/route/to")