tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is a
much faster, stable, PEP8-compliant code style checker that also offers
automatic formatting. It aims to be very stable and to produce the
smallest diffs. It's used by many small and big projects.
Running checkstyle with black takes a few seconds and produces terse output.
Thus, test-checkstyle-diff is no longer necessary.
Expand scope of checkstyle to all python files in the repo, replacing
test-checkstyle with checkstyle-python.
Also, fixstyle-python is now available for automatic style formatting.
Note: the python virtualenv has been consolidated in test/Makefile and
test/requirements*.txt, which will eventually be moved to a central
location. This is required to simplify the automated generation of
docker executor images in the CI.
Type: improvement
Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
diff --git a/test/test_wireguard.py b/test/test_wireguard.py
index e844b1d..1a955b1 100644
--- a/test/test_wireguard.py
+++ b/test/test_wireguard.py
@@ -11,12 +11,22 @@
from scapy.layers.l2 import Ether, ARP
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6
-from scapy.contrib.wireguard import Wireguard, WireguardResponse, \
- WireguardInitiation, WireguardTransport
-from cryptography.hazmat.primitives.asymmetric.x25519 import \
- X25519PrivateKey, X25519PublicKey
-from cryptography.hazmat.primitives.serialization import Encoding, \
- PrivateFormat, PublicFormat, NoEncryption
+from scapy.contrib.wireguard import (
+ Wireguard,
+ WireguardResponse,
+ WireguardInitiation,
+ WireguardTransport,
+)
+from cryptography.hazmat.primitives.asymmetric.x25519 import (
+ X25519PrivateKey,
+ X25519PublicKey,
+)
+from cryptography.hazmat.primitives.serialization import (
+ Encoding,
+ PrivateFormat,
+ PublicFormat,
+ NoEncryption,
+)
from cryptography.hazmat.primitives.hashes import BLAKE2s, Hash
from cryptography.hazmat.primitives.hmac import HMAC
from cryptography.hazmat.backends import default_backend
@@ -39,14 +49,11 @@
def private_key_bytes(k):
- return k.private_bytes(Encoding.Raw,
- PrivateFormat.Raw,
- NoEncryption())
+ return k.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption())
def public_key_bytes(k):
- return k.public_bytes(Encoding.Raw,
- PublicFormat.Raw)
+ return k.public_bytes(Encoding.Raw, PublicFormat.Raw)
class VppWgInterface(VppInterface):
@@ -69,37 +76,41 @@
return private_key_bytes(self.private_key)
def add_vpp_config(self):
- r = self.test.vapi.wireguard_interface_create(interface={
- 'user_instance': 0xffffffff,
- 'port': self.port,
- 'src_ip': self.src,
- 'private_key': private_key_bytes(self.private_key),
- 'generate_key': False
- })
+ r = self.test.vapi.wireguard_interface_create(
+ interface={
+ "user_instance": 0xFFFFFFFF,
+ "port": self.port,
+ "src_ip": self.src,
+ "private_key": private_key_bytes(self.private_key),
+ "generate_key": False,
+ }
+ )
self.set_sw_if_index(r.sw_if_index)
self.test.registry.register(self, self.test.logger)
return self
def remove_vpp_config(self):
- self.test.vapi.wireguard_interface_delete(
- sw_if_index=self._sw_if_index)
+ self.test.vapi.wireguard_interface_delete(sw_if_index=self._sw_if_index)
def query_vpp_config(self):
- ts = self.test.vapi.wireguard_interface_dump(sw_if_index=0xffffffff)
+ ts = self.test.vapi.wireguard_interface_dump(sw_if_index=0xFFFFFFFF)
for t in ts:
- if t.interface.sw_if_index == self._sw_if_index and \
- str(t.interface.src_ip) == self.src and \
- t.interface.port == self.port and \
- t.interface.private_key == private_key_bytes(self.private_key):
+ if (
+ t.interface.sw_if_index == self._sw_if_index
+ and str(t.interface.src_ip) == self.src
+ and t.interface.port == self.port
+ and t.interface.private_key == private_key_bytes(self.private_key)
+ ):
return True
return False
- def want_events(self, peer_index=0xffffffff):
+ def want_events(self, peer_index=0xFFFFFFFF):
self.test.vapi.want_wireguard_peer_events(
enable_disable=1,
pid=os.getpid(),
sw_if_index=self._sw_if_index,
- peer_index=peer_index)
+ peer_index=peer_index,
+ )
def wait_events(self, expect, peers, timeout=5):
for i in range(len(peers)):
@@ -118,8 +129,7 @@
routes = test.vapi.ip_route_dump(table_id, is_ip6)
for e in routes:
- if table_id == e.route.table_id \
- and str(e.route.prefix) == str(prefix):
+ if table_id == e.route.table_id and str(e.route.prefix) == str(prefix):
return True
return False
@@ -129,14 +139,7 @@
class VppWgPeer(VppObject):
-
- def __init__(self,
- test,
- itf,
- endpoint,
- port,
- allowed_ips,
- persistent_keepalive=15):
+ def __init__(self, test, itf, endpoint, port, allowed_ips, persistent_keepalive=15):
self._test = test
self.itf = itf
self.endpoint = endpoint
@@ -153,13 +156,15 @@
def add_vpp_config(self, is_ip6=False):
rv = self._test.vapi.wireguard_peer_add(
peer={
- 'public_key': self.public_key_bytes(),
- 'port': self.port,
- 'endpoint': self.endpoint,
- 'n_allowed_ips': len(self.allowed_ips),
- 'allowed_ips': self.allowed_ips,
- 'sw_if_index': self.itf.sw_if_index,
- 'persistent_keepalive': self.persistent_keepalive})
+ "public_key": self.public_key_bytes(),
+ "port": self.port,
+ "endpoint": self.endpoint,
+ "n_allowed_ips": len(self.allowed_ips),
+ "allowed_ips": self.allowed_ips,
+ "sw_if_index": self.itf.sw_if_index,
+ "persistent_keepalive": self.persistent_keepalive,
+ }
+ )
self.index = rv.peer_index
self.receiver_index = self.index + 1
self._test.registry.register(self, self._test.logger)
@@ -169,7 +174,7 @@
self._test.vapi.wireguard_peer_remove(peer_index=self.index)
def object_id(self):
- return ("wireguard-peer-%s" % self.index)
+ return "wireguard-peer-%s" % self.index
def public_key_bytes(self):
return public_key_bytes(self.public_key)
@@ -178,11 +183,13 @@
peers = self._test.vapi.wireguard_peers_dump()
for p in peers:
- if p.peer.public_key == self.public_key_bytes() and \
- p.peer.port == self.port and \
- str(p.peer.endpoint) == self.endpoint and \
- p.peer.sw_if_index == self.itf.sw_if_index and \
- len(self.allowed_ips) == p.peer.n_allowed_ips:
+ if (
+ p.peer.public_key == self.public_key_bytes()
+ and p.peer.port == self.port
+ and str(p.peer.endpoint) == self.endpoint
+ and p.peer.sw_if_index == self.itf.sw_if_index
+ and len(self.allowed_ips) == p.peer.n_allowed_ips
+ ):
self.allowed_ips.sort()
p.peer.allowed_ips.sort()
@@ -197,13 +204,17 @@
def mk_tunnel_header(self, tx_itf, is_ip6=False):
if is_ip6 is False:
- return (Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac) /
- IP(src=self.endpoint, dst=self.itf.src) /
- UDP(sport=self.port, dport=self.itf.port))
+ return (
+ Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac)
+ / IP(src=self.endpoint, dst=self.itf.src)
+ / UDP(sport=self.port, dport=self.itf.port)
+ )
else:
- return (Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac) /
- IPv6(src=self.endpoint, dst=self.itf.src) /
- UDP(sport=self.port, dport=self.itf.port))
+ return (
+ Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac)
+ / IPv6(src=self.endpoint, dst=self.itf.src)
+ / UDP(sport=self.port, dport=self.itf.port)
+ )
def noise_init(self, public_key=None):
self.noise.set_prologue(NOISE_IDENTIFIER_NAME)
@@ -214,12 +225,12 @@
# local/this private
self.noise.set_keypair_from_private_bytes(
- Keypair.STATIC,
- private_key_bytes(self.private_key))
+ Keypair.STATIC, private_key_bytes(self.private_key)
+ )
# remote's public
self.noise.set_keypair_from_public_bytes(
- Keypair.REMOTE_STATIC,
- public_key_bytes(public_key))
+ Keypair.REMOTE_STATIC, public_key_bytes(public_key)
+ )
self.noise.start_handshake()
@@ -227,7 +238,7 @@
self.noise.set_as_initiator()
self.noise_init(public_key)
- p = (Wireguard() / WireguardInitiation())
+ p = Wireguard() / WireguardInitiation()
p[Wireguard].message_type = 1
p[Wireguard].reserved_zero = 0
@@ -236,8 +247,11 @@
# some random data for the message
# lifted from the noise protocol's wireguard example
now = datetime.datetime.now()
- tai = struct.pack('!qi', 4611686018427387914 + int(now.timestamp()),
- int(now.microsecond * 1e3))
+ tai = struct.pack(
+ "!qi",
+ 4611686018427387914 + int(now.timestamp()),
+ int(now.microsecond * 1e3),
+ )
b = self.noise.write_message(payload=tai)
# load noise into init message
@@ -246,14 +260,13 @@
p[WireguardInitiation].encrypted_timestamp = b[80:108]
# generate the mac1 hash
- mac_key = blake2s(b'mac1----' +
- self.itf.public_key_bytes()).digest()
- p[WireguardInitiation].mac1 = blake2s(bytes(p)[0:116],
- digest_size=16,
- key=mac_key).digest()
+ mac_key = blake2s(b"mac1----" + self.itf.public_key_bytes()).digest()
+ p[WireguardInitiation].mac1 = blake2s(
+ bytes(p)[0:116], digest_size=16, key=mac_key
+ ).digest()
p[WireguardInitiation].mac2 = bytearray(16)
- p = (self.mk_tunnel_header(tx_itf, is_ip6) / p)
+ p = self.mk_tunnel_header(tx_itf, is_ip6) / p
return p
@@ -281,11 +294,8 @@
self.sender = init[WireguardInitiation].sender_index
# validate the hash
- mac_key = blake2s(b'mac1----' +
- public_key_bytes(self.public_key)).digest()
- mac1 = blake2s(bytes(init)[0:-32],
- digest_size=16,
- key=mac_key).digest()
+ mac_key = blake2s(b"mac1----" + public_key_bytes(self.public_key)).digest()
+ mac1 = blake2s(bytes(init)[0:-32], digest_size=16, key=mac_key).digest()
self._test.assertEqual(init[WireguardInitiation].mac1, mac1)
# this passes only unencrypted_ephemeral, encrypted_static,
@@ -294,19 +304,17 @@
# build the response
b = self.noise.write_message()
- mac_key = blake2s(b'mac1----' +
- public_key_bytes(self.itf.public_key)).digest()
- resp = (Wireguard(message_type=2, reserved_zero=0) /
- WireguardResponse(sender_index=self.receiver_index,
- receiver_index=self.sender,
- unencrypted_ephemeral=b[0:32],
- encrypted_nothing=b[32:]))
- mac1 = blake2s(bytes(resp)[:-32],
- digest_size=16,
- key=mac_key).digest()
+ mac_key = blake2s(b"mac1----" + public_key_bytes(self.itf.public_key)).digest()
+ resp = Wireguard(message_type=2, reserved_zero=0) / WireguardResponse(
+ sender_index=self.receiver_index,
+ receiver_index=self.sender,
+ unencrypted_ephemeral=b[0:32],
+ encrypted_nothing=b[32:],
+ )
+ mac1 = blake2s(bytes(resp)[:-32], digest_size=16, key=mac_key).digest()
resp[WireguardResponse].mac1 = mac1
- resp = (self.mk_tunnel_header(tx_itf, is_ip6) / resp)
+ resp = self.mk_tunnel_header(tx_itf, is_ip6) / resp
self._test.assertTrue(self.noise.handshake_finished)
return resp
@@ -318,13 +326,14 @@
self._test.assertEqual(resp[Wireguard].message_type, 2)
self._test.assertEqual(resp[Wireguard].reserved_zero, 0)
- self._test.assertEqual(resp[WireguardResponse].receiver_index,
- self.receiver_index)
+ self._test.assertEqual(
+ resp[WireguardResponse].receiver_index, self.receiver_index
+ )
self.sender = resp[Wireguard].sender_index
payload = self.noise.read_message(bytes(resp)[12:60])
- self._test.assertEqual(payload, b'')
+ self._test.assertEqual(payload, b"")
self._test.assertTrue(self.noise.handshake_finished)
def decrypt_transport(self, p, is_ip6=False):
@@ -333,11 +342,11 @@
p = Wireguard(p[Raw])
self._test.assertEqual(p[Wireguard].message_type, 4)
self._test.assertEqual(p[Wireguard].reserved_zero, 0)
- self._test.assertEqual(p[WireguardTransport].receiver_index,
- self.receiver_index)
+ self._test.assertEqual(
+ p[WireguardTransport].receiver_index, self.receiver_index
+ )
- d = self.noise.decrypt(
- p[WireguardTransport].encrypted_encapsulated_packet)
+ d = self.noise.decrypt(p[WireguardTransport].encrypted_encapsulated_packet)
return d
def encrypt_transport(self, p):
@@ -350,20 +359,21 @@
# chech the oringial packet is present
self._test.assertEqual(rx[IP].dst, tx[IP].dst)
- self._test.assertEqual(rx[IP].ttl, tx[IP].ttl-1)
+ self._test.assertEqual(rx[IP].ttl, tx[IP].ttl - 1)
else:
rx = IPv6(self.decrypt_transport(rx))
# chech the oringial packet is present
self._test.assertEqual(rx[IPv6].dst, tx[IPv6].dst)
- self._test.assertEqual(rx[IPv6].ttl, tx[IPv6].ttl-1)
+ self._test.assertEqual(rx[IPv6].ttl, tx[IPv6].ttl - 1)
def want_events(self):
self._test.vapi.want_wireguard_peer_events(
enable_disable=1,
pid=os.getpid(),
peer_index=self.index,
- sw_if_index=self.itf.sw_if_index)
+ sw_if_index=self.itf.sw_if_index,
+ )
def wait_event(self, expect, timeout=5):
rv = self._test.vapi.wait_for_event(timeout, "wireguard_peer_event")
@@ -372,14 +382,14 @@
class TestWg(VppTestCase):
- """ Wireguard Test Case """
+ """Wireguard Test Case"""
error_str = compile(r"Error")
- wg4_output_node_name = '/err/wg4-output-tun/'
- wg4_input_node_name = '/err/wg4-input/'
- wg6_output_node_name = '/err/wg6-output-tun/'
- wg6_input_node_name = '/err/wg6-input/'
+ wg4_output_node_name = "/err/wg4-output-tun/"
+ wg4_input_node_name = "/err/wg4-input/"
+ wg6_output_node_name = "/err/wg6-output-tun/"
+ wg6_input_node_name = "/err/wg6-input/"
kp4_error = wg4_output_node_name + "Keypair error"
mac4_error = wg4_input_node_name + "Invalid MAC handshake"
peer4_error = wg4_input_node_name + "Peer error"
@@ -417,13 +427,11 @@
self.base_peer6_err = self.statistics.get_err_counter(self.peer6_error)
def test_wg_interface(self):
- """ Simple interface creation """
+ """Simple interface creation"""
port = 12312
# Create interface
- wg0 = VppWgInterface(self,
- self.pg1.local_ip4,
- port).add_vpp_config()
+ wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
self.logger.info(self.vapi.cli("sh int"))
@@ -431,27 +439,29 @@
wg0.remove_vpp_config()
def test_handshake_hash(self):
- """ test hashing an init message """
+ """test hashing an init message"""
# a init packet generated by linux given the key below
- h = "0100000098b9032b" \
- "55cc4b39e73c3d24" \
- "a2a1ab884b524a81" \
- "1808bb86640fb70d" \
- "e93154fec1879125" \
- "ab012624a27f0b75" \
- "c0a2582f438ddb5f" \
- "8e768af40b4ab444" \
- "02f9ff473e1b797e" \
- "80d39d93c5480c82" \
- "a3d4510f70396976" \
- "586fb67300a5167b" \
- "ae6ca3ff3dfd00eb" \
- "59be198810f5aa03" \
- "6abc243d2155ee4f" \
- "2336483900aef801" \
- "08752cd700000000" \
- "0000000000000000" \
+ h = (
+ "0100000098b9032b"
+ "55cc4b39e73c3d24"
+ "a2a1ab884b524a81"
+ "1808bb86640fb70d"
+ "e93154fec1879125"
+ "ab012624a27f0b75"
+ "c0a2582f438ddb5f"
+ "8e768af40b4ab444"
+ "02f9ff473e1b797e"
+ "80d39d93c5480c82"
+ "a3d4510f70396976"
+ "586fb67300a5167b"
+ "ae6ca3ff3dfd00eb"
+ "59be198810f5aa03"
+ "6abc243d2155ee4f"
+ "2336483900aef801"
+ "08752cd700000000"
+ "0000000000000000"
"00000000"
+ )
b = bytearray.fromhex(h)
tgt = Wireguard(b)
@@ -463,40 +473,34 @@
# strip the macs and build a new packet
init = b[0:-32]
- mac_key = blake2s(b'mac1----' + public_key_bytes(pub)).digest()
- init += blake2s(init,
- digest_size=16,
- key=mac_key).digest()
- init += b'\x00' * 16
+ mac_key = blake2s(b"mac1----" + public_key_bytes(pub)).digest()
+ init += blake2s(init, digest_size=16, key=mac_key).digest()
+ init += b"\x00" * 16
act = Wireguard(init)
self.assertEqual(tgt, act)
def test_wg_peer_resp(self):
- """ Send handshake response """
+ """Send handshake response"""
port = 12323
# Create interfaces
- wg0 = VppWgInterface(self,
- self.pg1.local_ip4,
- port).add_vpp_config()
+ wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
wg0.admin_up()
wg0.config_ip4()
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- peer_1 = VppWgPeer(self,
- wg0,
- self.pg1.remote_ip4,
- port+1,
- ["10.11.3.0/24"]).add_vpp_config()
+ peer_1 = VppWgPeer(
+ self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
+ ).add_vpp_config()
self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
- r1 = VppIpRoute(self, "10.11.3.0", 24,
- [VppRoutePath("10.11.3.1",
- wg0.sw_if_index)]).add_vpp_config()
+ r1 = VppIpRoute(
+ self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
+ ).add_vpp_config()
# wait for the peer to send a handshake
rx = self.pg1.get_capture(1, timeout=2)
@@ -513,10 +517,12 @@
self.assertEqual(0, len(b))
# send a packets that are routed into the tunnel
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst="10.11.3.2") /
- UDP(sport=555, dport=556) /
- Raw(b'\x00' * 80))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
+ / UDP(sport=555, dport=556)
+ / Raw(b"\x00" * 80)
+ )
rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
@@ -524,15 +530,24 @@
# send packets into the tunnel, expect to receive them on
# the other side
- p = [(peer_1.mk_tunnel_header(self.pg1) /
- Wireguard(message_type=4, reserved_zero=0) /
- WireguardTransport(
- receiver_index=peer_1.sender,
- counter=ii,
- encrypted_encapsulated_packet=peer_1.encrypt_transport(
- (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) /
- UDP(sport=222, dport=223) /
- Raw())))) for ii in range(255)]
+ p = [
+ (
+ peer_1.mk_tunnel_header(self.pg1)
+ / Wireguard(message_type=4, reserved_zero=0)
+ / WireguardTransport(
+ receiver_index=peer_1.sender,
+ counter=ii,
+ encrypted_encapsulated_packet=peer_1.encrypt_transport(
+ (
+ IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
+ / UDP(sport=222, dport=223)
+ / Raw()
+ )
+ ),
+ )
+ )
+ for ii in range(255)
+ ]
rxs = self.send_and_expect(self.pg1, p, self.pg0)
@@ -545,53 +560,54 @@
wg0.remove_vpp_config()
def test_wg_peer_v4o4(self):
- """ Test v4o4"""
+ """Test v4o4"""
port = 12333
# Create interfaces
- wg0 = VppWgInterface(self,
- self.pg1.local_ip4,
- port).add_vpp_config()
+ wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
wg0.admin_up()
wg0.config_ip4()
- peer_1 = VppWgPeer(self,
- wg0,
- self.pg1.remote_ip4,
- port+1,
- ["10.11.3.0/24"]).add_vpp_config()
+ peer_1 = VppWgPeer(
+ self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"]
+ ).add_vpp_config()
self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
- r1 = VppIpRoute(self, "10.11.3.0", 24,
- [VppRoutePath("10.11.3.1",
- wg0.sw_if_index)]).add_vpp_config()
+ r1 = VppIpRoute(
+ self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
+ ).add_vpp_config()
# route a packet into the wg interface
# use the allowed-ip prefix
# this is dropped because the peer is not initiated
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst="10.11.3.2") /
- UDP(sport=555, dport=556) /
- Raw())
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
+ / UDP(sport=555, dport=556)
+ / Raw()
+ )
self.send_and_assert_no_replies(self.pg0, [p])
- self.assertEqual(self.base_kp4_err + 1,
- self.statistics.get_err_counter(self.kp4_error))
+ self.assertEqual(
+ self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error)
+ )
# send a handsake from the peer with an invalid MAC
p = peer_1.mk_handshake(self.pg1)
- p[WireguardInitiation].mac1 = b'foobar'
+ p[WireguardInitiation].mac1 = b"foobar"
self.send_and_assert_no_replies(self.pg1, [p])
- self.assertEqual(self.base_mac4_err + 1,
- self.statistics.get_err_counter(self.mac4_error))
+ self.assertEqual(
+ self.base_mac4_err + 1, self.statistics.get_err_counter(self.mac4_error)
+ )
# send a handsake from the peer but signed by the wrong key.
- p = peer_1.mk_handshake(self.pg1,
- False,
- X25519PrivateKey.generate().public_key())
+ p = peer_1.mk_handshake(
+ self.pg1, False, X25519PrivateKey.generate().public_key()
+ )
self.send_and_assert_no_replies(self.pg1, [p])
- self.assertEqual(self.base_peer4_err + 1,
- self.statistics.get_err_counter(self.peer4_error))
+ self.assertEqual(
+ self.base_peer4_err + 1, self.statistics.get_err_counter(self.peer4_error)
+ )
# send a valid handsake init for which we expect a response
p = peer_1.mk_handshake(self.pg1)
@@ -602,25 +618,31 @@
# route a packet into the wg interface
# this is dropped because the peer is still not initiated
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst="10.11.3.2") /
- UDP(sport=555, dport=556) /
- Raw())
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
+ / UDP(sport=555, dport=556)
+ / Raw()
+ )
self.send_and_assert_no_replies(self.pg0, [p])
- self.assertEqual(self.base_kp4_err + 2,
- self.statistics.get_err_counter(self.kp4_error))
+ self.assertEqual(
+ self.base_kp4_err + 2, self.statistics.get_err_counter(self.kp4_error)
+ )
# send a data packet from the peer through the tunnel
# this completes the handshake
- p = (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) /
- UDP(sport=222, dport=223) /
- Raw())
+ p = (
+ IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
+ / UDP(sport=222, dport=223)
+ / Raw()
+ )
d = peer_1.encrypt_transport(p)
- p = (peer_1.mk_tunnel_header(self.pg1) /
- (Wireguard(message_type=4, reserved_zero=0) /
- WireguardTransport(receiver_index=peer_1.sender,
- counter=0,
- encrypted_encapsulated_packet=d)))
+ p = peer_1.mk_tunnel_header(self.pg1) / (
+ Wireguard(message_type=4, reserved_zero=0)
+ / WireguardTransport(
+ receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
+ )
+ )
rxs = self.send_and_expect(self.pg1, [p], self.pg0)
for rx in rxs:
@@ -628,10 +650,12 @@
self.assertEqual(rx[IP].ttl, 19)
# send a packets that are routed into the tunnel
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst="10.11.3.2") /
- UDP(sport=555, dport=556) /
- Raw(b'\x00' * 80))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
+ / UDP(sport=555, dport=556)
+ / Raw(b"\x00" * 80)
+ )
rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
@@ -640,19 +664,28 @@
# chech the oringial packet is present
self.assertEqual(rx[IP].dst, p[IP].dst)
- self.assertEqual(rx[IP].ttl, p[IP].ttl-1)
+ self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
# send packets into the tunnel, expect to receive them on
# the other side
- p = [(peer_1.mk_tunnel_header(self.pg1) /
- Wireguard(message_type=4, reserved_zero=0) /
- WireguardTransport(
- receiver_index=peer_1.sender,
- counter=ii+1,
- encrypted_encapsulated_packet=peer_1.encrypt_transport(
- (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) /
- UDP(sport=222, dport=223) /
- Raw())))) for ii in range(255)]
+ p = [
+ (
+ peer_1.mk_tunnel_header(self.pg1)
+ / Wireguard(message_type=4, reserved_zero=0)
+ / WireguardTransport(
+ receiver_index=peer_1.sender,
+ counter=ii + 1,
+ encrypted_encapsulated_packet=peer_1.encrypt_transport(
+ (
+ IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
+ / UDP(sport=222, dport=223)
+ / Raw()
+ )
+ ),
+ )
+ )
+ for ii in range(255)
+ ]
rxs = self.send_and_expect(self.pg1, p, self.pg0)
@@ -665,56 +698,57 @@
wg0.remove_vpp_config()
def test_wg_peer_v6o6(self):
- """ Test v6o6"""
+ """Test v6o6"""
port = 12343
# Create interfaces
- wg0 = VppWgInterface(self,
- self.pg1.local_ip6,
- port).add_vpp_config()
+ wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
wg0.admin_up()
wg0.config_ip6()
- peer_1 = VppWgPeer(self,
- wg0,
- self.pg1.remote_ip6,
- port+1,
- ["1::3:0/112"]).add_vpp_config(True)
+ peer_1 = VppWgPeer(
+ self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
+ ).add_vpp_config(True)
self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
- r1 = VppIpRoute(self, "1::3:0", 112,
- [VppRoutePath("1::3:1",
- wg0.sw_if_index)]).add_vpp_config()
+ r1 = VppIpRoute(
+ self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
+ ).add_vpp_config()
# route a packet into the wg interface
# use the allowed-ip prefix
# this is dropped because the peer is not initiated
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IPv6(src=self.pg0.remote_ip6, dst="1::3:2") /
- UDP(sport=555, dport=556) /
- Raw())
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
+ / UDP(sport=555, dport=556)
+ / Raw()
+ )
self.send_and_assert_no_replies(self.pg0, [p])
- self.assertEqual(self.base_kp6_err + 1,
- self.statistics.get_err_counter(self.kp6_error))
+ self.assertEqual(
+ self.base_kp6_err + 1, self.statistics.get_err_counter(self.kp6_error)
+ )
# send a handsake from the peer with an invalid MAC
p = peer_1.mk_handshake(self.pg1, True)
- p[WireguardInitiation].mac1 = b'foobar'
+ p[WireguardInitiation].mac1 = b"foobar"
self.send_and_assert_no_replies(self.pg1, [p])
- self.assertEqual(self.base_mac6_err + 1,
- self.statistics.get_err_counter(self.mac6_error))
+ self.assertEqual(
+ self.base_mac6_err + 1, self.statistics.get_err_counter(self.mac6_error)
+ )
# send a handsake from the peer but signed by the wrong key.
- p = peer_1.mk_handshake(self.pg1,
- True,
- X25519PrivateKey.generate().public_key())
+ p = peer_1.mk_handshake(
+ self.pg1, True, X25519PrivateKey.generate().public_key()
+ )
self.send_and_assert_no_replies(self.pg1, [p])
- self.assertEqual(self.base_peer6_err + 1,
- self.statistics.get_err_counter(self.peer6_error))
+ self.assertEqual(
+ self.base_peer6_err + 1, self.statistics.get_err_counter(self.peer6_error)
+ )
# send a valid handsake init for which we expect a response
p = peer_1.mk_handshake(self.pg1, True)
@@ -725,25 +759,31 @@
# route a packet into the wg interface
# this is dropped because the peer is still not initiated
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IPv6(src=self.pg0.remote_ip6, dst="1::3:2") /
- UDP(sport=555, dport=556) /
- Raw())
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
+ / UDP(sport=555, dport=556)
+ / Raw()
+ )
self.send_and_assert_no_replies(self.pg0, [p])
- self.assertEqual(self.base_kp6_err + 2,
- self.statistics.get_err_counter(self.kp6_error))
+ self.assertEqual(
+ self.base_kp6_err + 2, self.statistics.get_err_counter(self.kp6_error)
+ )
# send a data packet from the peer through the tunnel
# this completes the handshake
- p = (IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) /
- UDP(sport=222, dport=223) /
- Raw())
+ p = (
+ IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
+ / UDP(sport=222, dport=223)
+ / Raw()
+ )
d = peer_1.encrypt_transport(p)
- p = (peer_1.mk_tunnel_header(self.pg1, True) /
- (Wireguard(message_type=4, reserved_zero=0) /
- WireguardTransport(receiver_index=peer_1.sender,
- counter=0,
- encrypted_encapsulated_packet=d)))
+ p = peer_1.mk_tunnel_header(self.pg1, True) / (
+ Wireguard(message_type=4, reserved_zero=0)
+ / WireguardTransport(
+ receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
+ )
+ )
rxs = self.send_and_expect(self.pg1, [p], self.pg0)
for rx in rxs:
@@ -751,10 +791,12 @@
self.assertEqual(rx[IPv6].hlim, 19)
# send a packets that are routed into the tunnel
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IPv6(src=self.pg0.remote_ip6, dst="1::3:2") /
- UDP(sport=555, dport=556) /
- Raw(b'\x00' * 80))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
+ / UDP(sport=555, dport=556)
+ / Raw(b"\x00" * 80)
+ )
rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
@@ -763,19 +805,28 @@
# chech the oringial packet is present
self.assertEqual(rx[IPv6].dst, p[IPv6].dst)
- self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim-1)
+ self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim - 1)
# send packets into the tunnel, expect to receive them on
# the other side
- p = [(peer_1.mk_tunnel_header(self.pg1, True) /
- Wireguard(message_type=4, reserved_zero=0) /
- WireguardTransport(
- receiver_index=peer_1.sender,
- counter=ii+1,
- encrypted_encapsulated_packet=peer_1.encrypt_transport(
- (IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) /
- UDP(sport=222, dport=223) /
- Raw())))) for ii in range(255)]
+ p = [
+ (
+ peer_1.mk_tunnel_header(self.pg1, True)
+ / Wireguard(message_type=4, reserved_zero=0)
+ / WireguardTransport(
+ receiver_index=peer_1.sender,
+ counter=ii + 1,
+ encrypted_encapsulated_packet=peer_1.encrypt_transport(
+ (
+ IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
+ / UDP(sport=222, dport=223)
+ / Raw()
+ )
+ ),
+ )
+ )
+ for ii in range(255)
+ ]
rxs = self.send_and_expect(self.pg1, p, self.pg0)
@@ -788,54 +839,55 @@
wg0.remove_vpp_config()
def test_wg_peer_v6o4(self):
- """ Test v6o4"""
+ """Test v6o4"""
port = 12353
# Create interfaces
- wg0 = VppWgInterface(self,
- self.pg1.local_ip4,
- port).add_vpp_config()
+ wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
wg0.admin_up()
wg0.config_ip6()
- peer_1 = VppWgPeer(self,
- wg0,
- self.pg1.remote_ip4,
- port+1,
- ["1::3:0/112"]).add_vpp_config(True)
+ peer_1 = VppWgPeer(
+ self, wg0, self.pg1.remote_ip4, port + 1, ["1::3:0/112"]
+ ).add_vpp_config(True)
self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
- r1 = VppIpRoute(self, "1::3:0", 112,
- [VppRoutePath("1::3:1",
- wg0.sw_if_index)]).add_vpp_config()
+ r1 = VppIpRoute(
+ self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)]
+ ).add_vpp_config()
# route a packet into the wg interface
# use the allowed-ip prefix
# this is dropped because the peer is not initiated
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IPv6(src=self.pg0.remote_ip6, dst="1::3:2") /
- UDP(sport=555, dport=556) /
- Raw())
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
+ / UDP(sport=555, dport=556)
+ / Raw()
+ )
self.send_and_assert_no_replies(self.pg0, [p])
- self.assertEqual(self.base_kp6_err + 1,
- self.statistics.get_err_counter(self.kp6_error))
+ self.assertEqual(
+ self.base_kp6_err + 1, self.statistics.get_err_counter(self.kp6_error)
+ )
# send a handsake from the peer with an invalid MAC
p = peer_1.mk_handshake(self.pg1)
- p[WireguardInitiation].mac1 = b'foobar'
+ p[WireguardInitiation].mac1 = b"foobar"
self.send_and_assert_no_replies(self.pg1, [p])
- self.assertEqual(self.base_mac4_err + 1,
- self.statistics.get_err_counter(self.mac4_error))
+ self.assertEqual(
+ self.base_mac4_err + 1, self.statistics.get_err_counter(self.mac4_error)
+ )
# send a handsake from the peer but signed by the wrong key.
- p = peer_1.mk_handshake(self.pg1,
- False,
- X25519PrivateKey.generate().public_key())
+ p = peer_1.mk_handshake(
+ self.pg1, False, X25519PrivateKey.generate().public_key()
+ )
self.send_and_assert_no_replies(self.pg1, [p])
- self.assertEqual(self.base_peer4_err + 1,
- self.statistics.get_err_counter(self.peer4_error))
+ self.assertEqual(
+ self.base_peer4_err + 1, self.statistics.get_err_counter(self.peer4_error)
+ )
# send a valid handsake init for which we expect a response
p = peer_1.mk_handshake(self.pg1)
@@ -846,25 +898,31 @@
# route a packet into the wg interface
# this is dropped because the peer is still not initiated
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IPv6(src=self.pg0.remote_ip6, dst="1::3:2") /
- UDP(sport=555, dport=556) /
- Raw())
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
+ / UDP(sport=555, dport=556)
+ / Raw()
+ )
self.send_and_assert_no_replies(self.pg0, [p])
- self.assertEqual(self.base_kp6_err + 2,
- self.statistics.get_err_counter(self.kp6_error))
+ self.assertEqual(
+ self.base_kp6_err + 2, self.statistics.get_err_counter(self.kp6_error)
+ )
# send a data packet from the peer through the tunnel
# this completes the handshake
- p = (IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) /
- UDP(sport=222, dport=223) /
- Raw())
+ p = (
+ IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
+ / UDP(sport=222, dport=223)
+ / Raw()
+ )
d = peer_1.encrypt_transport(p)
- p = (peer_1.mk_tunnel_header(self.pg1) /
- (Wireguard(message_type=4, reserved_zero=0) /
- WireguardTransport(receiver_index=peer_1.sender,
- counter=0,
- encrypted_encapsulated_packet=d)))
+ p = peer_1.mk_tunnel_header(self.pg1) / (
+ Wireguard(message_type=4, reserved_zero=0)
+ / WireguardTransport(
+ receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
+ )
+ )
rxs = self.send_and_expect(self.pg1, [p], self.pg0)
for rx in rxs:
@@ -872,10 +930,12 @@
self.assertEqual(rx[IPv6].hlim, 19)
# send a packets that are routed into the tunnel
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IPv6(src=self.pg0.remote_ip6, dst="1::3:2") /
- UDP(sport=555, dport=556) /
- Raw(b'\x00' * 80))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst="1::3:2")
+ / UDP(sport=555, dport=556)
+ / Raw(b"\x00" * 80)
+ )
rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
@@ -884,19 +944,28 @@
# chech the oringial packet is present
self.assertEqual(rx[IPv6].dst, p[IPv6].dst)
- self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim-1)
+ self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim - 1)
# send packets into the tunnel, expect to receive them on
# the other side
- p = [(peer_1.mk_tunnel_header(self.pg1) /
- Wireguard(message_type=4, reserved_zero=0) /
- WireguardTransport(
- receiver_index=peer_1.sender,
- counter=ii+1,
- encrypted_encapsulated_packet=peer_1.encrypt_transport(
- (IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) /
- UDP(sport=222, dport=223) /
- Raw())))) for ii in range(255)]
+ p = [
+ (
+ peer_1.mk_tunnel_header(self.pg1)
+ / Wireguard(message_type=4, reserved_zero=0)
+ / WireguardTransport(
+ receiver_index=peer_1.sender,
+ counter=ii + 1,
+ encrypted_encapsulated_packet=peer_1.encrypt_transport(
+ (
+ IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20)
+ / UDP(sport=222, dport=223)
+ / Raw()
+ )
+ ),
+ )
+ )
+ for ii in range(255)
+ ]
rxs = self.send_and_expect(self.pg1, p, self.pg0)
@@ -909,53 +978,54 @@
wg0.remove_vpp_config()
def test_wg_peer_v4o6(self):
- """ Test v4o6"""
+ """Test v4o6"""
port = 12363
# Create interfaces
- wg0 = VppWgInterface(self,
- self.pg1.local_ip6,
- port).add_vpp_config()
+ wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config()
wg0.admin_up()
wg0.config_ip4()
- peer_1 = VppWgPeer(self,
- wg0,
- self.pg1.remote_ip6,
- port+1,
- ["10.11.3.0/24"]).add_vpp_config()
+ peer_1 = VppWgPeer(
+ self, wg0, self.pg1.remote_ip6, port + 1, ["10.11.3.0/24"]
+ ).add_vpp_config()
self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
- r1 = VppIpRoute(self, "10.11.3.0", 24,
- [VppRoutePath("10.11.3.1",
- wg0.sw_if_index)]).add_vpp_config()
+ r1 = VppIpRoute(
+ self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
+ ).add_vpp_config()
# route a packet into the wg interface
# use the allowed-ip prefix
# this is dropped because the peer is not initiated
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst="10.11.3.2") /
- UDP(sport=555, dport=556) /
- Raw())
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
+ / UDP(sport=555, dport=556)
+ / Raw()
+ )
self.send_and_assert_no_replies(self.pg0, [p])
- self.assertEqual(self.base_kp4_err + 1,
- self.statistics.get_err_counter(self.kp4_error))
+ self.assertEqual(
+ self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error)
+ )
# send a handsake from the peer with an invalid MAC
p = peer_1.mk_handshake(self.pg1, True)
- p[WireguardInitiation].mac1 = b'foobar'
+ p[WireguardInitiation].mac1 = b"foobar"
self.send_and_assert_no_replies(self.pg1, [p])
- self.assertEqual(self.base_mac6_err + 1,
- self.statistics.get_err_counter(self.mac6_error))
+ self.assertEqual(
+ self.base_mac6_err + 1, self.statistics.get_err_counter(self.mac6_error)
+ )
# send a handsake from the peer but signed by the wrong key.
- p = peer_1.mk_handshake(self.pg1,
- True,
- X25519PrivateKey.generate().public_key())
+ p = peer_1.mk_handshake(
+ self.pg1, True, X25519PrivateKey.generate().public_key()
+ )
self.send_and_assert_no_replies(self.pg1, [p])
- self.assertEqual(self.base_peer6_err + 1,
- self.statistics.get_err_counter(self.peer6_error))
+ self.assertEqual(
+ self.base_peer6_err + 1, self.statistics.get_err_counter(self.peer6_error)
+ )
# send a valid handsake init for which we expect a response
p = peer_1.mk_handshake(self.pg1, True)
@@ -966,25 +1036,31 @@
# route a packet into the wg interface
# this is dropped because the peer is still not initiated
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst="10.11.3.2") /
- UDP(sport=555, dport=556) /
- Raw())
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
+ / UDP(sport=555, dport=556)
+ / Raw()
+ )
self.send_and_assert_no_replies(self.pg0, [p])
- self.assertEqual(self.base_kp4_err + 2,
- self.statistics.get_err_counter(self.kp4_error))
+ self.assertEqual(
+ self.base_kp4_err + 2, self.statistics.get_err_counter(self.kp4_error)
+ )
# send a data packet from the peer through the tunnel
# this completes the handshake
- p = (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) /
- UDP(sport=222, dport=223) /
- Raw())
+ p = (
+ IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
+ / UDP(sport=222, dport=223)
+ / Raw()
+ )
d = peer_1.encrypt_transport(p)
- p = (peer_1.mk_tunnel_header(self.pg1, True) /
- (Wireguard(message_type=4, reserved_zero=0) /
- WireguardTransport(receiver_index=peer_1.sender,
- counter=0,
- encrypted_encapsulated_packet=d)))
+ p = peer_1.mk_tunnel_header(self.pg1, True) / (
+ Wireguard(message_type=4, reserved_zero=0)
+ / WireguardTransport(
+ receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
+ )
+ )
rxs = self.send_and_expect(self.pg1, [p], self.pg0)
for rx in rxs:
@@ -992,10 +1068,12 @@
self.assertEqual(rx[IP].ttl, 19)
# send a packets that are routed into the tunnel
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst="10.11.3.2") /
- UDP(sport=555, dport=556) /
- Raw(b'\x00' * 80))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
+ / UDP(sport=555, dport=556)
+ / Raw(b"\x00" * 80)
+ )
rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
@@ -1004,19 +1082,28 @@
# chech the oringial packet is present
self.assertEqual(rx[IP].dst, p[IP].dst)
- self.assertEqual(rx[IP].ttl, p[IP].ttl-1)
+ self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
# send packets into the tunnel, expect to receive them on
# the other side
- p = [(peer_1.mk_tunnel_header(self.pg1, True) /
- Wireguard(message_type=4, reserved_zero=0) /
- WireguardTransport(
- receiver_index=peer_1.sender,
- counter=ii+1,
- encrypted_encapsulated_packet=peer_1.encrypt_transport(
- (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) /
- UDP(sport=222, dport=223) /
- Raw())))) for ii in range(255)]
+ p = [
+ (
+ peer_1.mk_tunnel_header(self.pg1, True)
+ / Wireguard(message_type=4, reserved_zero=0)
+ / WireguardTransport(
+ receiver_index=peer_1.sender,
+ counter=ii + 1,
+ encrypted_encapsulated_packet=peer_1.encrypt_transport(
+ (
+ IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
+ / UDP(sport=222, dport=223)
+ / Raw()
+ )
+ ),
+ )
+ )
+ for ii in range(255)
+ ]
rxs = self.send_and_expect(self.pg1, p, self.pg0)
@@ -1029,16 +1116,12 @@
wg0.remove_vpp_config()
def test_wg_multi_peer(self):
- """ multiple peer setup """
+ """multiple peer setup"""
port = 12373
# Create interfaces
- wg0 = VppWgInterface(self,
- self.pg1.local_ip4,
- port).add_vpp_config()
- wg1 = VppWgInterface(self,
- self.pg2.local_ip4,
- port+1).add_vpp_config()
+ wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
+ wg1 = VppWgInterface(self, self.pg2.local_ip4, port + 1).add_vpp_config()
wg0.admin_up()
wg1.admin_up()
@@ -1060,25 +1143,43 @@
routes_1 = []
routes_2 = []
for i in range(NUM_PEERS):
- peers_1.append(VppWgPeer(self,
- wg0,
- self.pg1.remote_hosts[i].ip4,
- port+1+i,
- ["10.0.%d.4/32" % i]).add_vpp_config())
- routes_1.append(VppIpRoute(self, "10.0.%d.4" % i, 32,
- [VppRoutePath(self.pg1.remote_hosts[i].ip4,
- wg0.sw_if_index)]).add_vpp_config())
+ peers_1.append(
+ VppWgPeer(
+ self,
+ wg0,
+ self.pg1.remote_hosts[i].ip4,
+ port + 1 + i,
+ ["10.0.%d.4/32" % i],
+ ).add_vpp_config()
+ )
+ routes_1.append(
+ VppIpRoute(
+ self,
+ "10.0.%d.4" % i,
+ 32,
+ [VppRoutePath(self.pg1.remote_hosts[i].ip4, wg0.sw_if_index)],
+ ).add_vpp_config()
+ )
- peers_2.append(VppWgPeer(self,
- wg1,
- self.pg2.remote_hosts[i].ip4,
- port+100+i,
- ["10.100.%d.4/32" % i]).add_vpp_config())
- routes_2.append(VppIpRoute(self, "10.100.%d.4" % i, 32,
- [VppRoutePath(self.pg2.remote_hosts[i].ip4,
- wg1.sw_if_index)]).add_vpp_config())
+ peers_2.append(
+ VppWgPeer(
+ self,
+ wg1,
+ self.pg2.remote_hosts[i].ip4,
+ port + 100 + i,
+ ["10.100.%d.4/32" % i],
+ ).add_vpp_config()
+ )
+ routes_2.append(
+ VppIpRoute(
+ self,
+ "10.100.%d.4" % i,
+ 32,
+ [VppRoutePath(self.pg2.remote_hosts[i].ip4, wg1.sw_if_index)],
+ ).add_vpp_config()
+ )
- self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS*2)
+ self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS * 2)
self.logger.info(self.vapi.cli("show wireguard peer"))
self.logger.info(self.vapi.cli("show wireguard interface"))
@@ -1104,7 +1205,7 @@
wg1.remove_vpp_config()
def test_wg_multi_interface(self):
- """ Multi-tunnel on the same port """
+ """Multi-tunnel on the same port"""
port = 12500
# Create many wireguard interfaces
@@ -1120,21 +1221,28 @@
wg_ifs = []
for i in range(NUM_IFS):
# Use the same port for each interface
- wg0 = VppWgInterface(self,
- self.pg1.local_ip4,
- port).add_vpp_config()
+ wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
wg0.admin_up()
wg0.config_ip4()
wg_ifs.append(wg0)
- peers.append(VppWgPeer(self,
- wg0,
- self.pg1.remote_hosts[i].ip4,
- port+1+i,
- ["10.0.%d.0/24" % i]).add_vpp_config())
+ peers.append(
+ VppWgPeer(
+ self,
+ wg0,
+ self.pg1.remote_hosts[i].ip4,
+ port + 1 + i,
+ ["10.0.%d.0/24" % i],
+ ).add_vpp_config()
+ )
- routes.append(VppIpRoute(self, "10.0.%d.0" % i, 24,
- [VppRoutePath("10.0.%d.4" % i,
- wg0.sw_if_index)]).add_vpp_config())
+ routes.append(
+ VppIpRoute(
+ self,
+ "10.0.%d.0" % i,
+ 24,
+ [VppRoutePath("10.0.%d.4" % i, wg0.sw_if_index)],
+ ).add_vpp_config()
+ )
self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_IFS)
@@ -1146,16 +1254,20 @@
# send a data packet from the peer through the tunnel
# this completes the handshake
- p = (IP(src="10.0.%d.4" % i,
- dst=self.pg0.remote_hosts[i].ip4, ttl=20) /
- UDP(sport=222, dport=223) /
- Raw())
+ p = (
+ IP(src="10.0.%d.4" % i, dst=self.pg0.remote_hosts[i].ip4, ttl=20)
+ / UDP(sport=222, dport=223)
+ / Raw()
+ )
d = peers[i].encrypt_transport(p)
- p = (peers[i].mk_tunnel_header(self.pg1) /
- (Wireguard(message_type=4, reserved_zero=0) /
- WireguardTransport(receiver_index=peers[i].sender,
- counter=0,
- encrypted_encapsulated_packet=d)))
+ p = peers[i].mk_tunnel_header(self.pg1) / (
+ Wireguard(message_type=4, reserved_zero=0)
+ / WireguardTransport(
+ receiver_index=peers[i].sender,
+ counter=0,
+ encrypted_encapsulated_packet=d,
+ )
+ )
rxs = self.send_and_expect(self.pg1, [p], self.pg0)
for rx in rxs:
self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[i].ip4)
@@ -1163,10 +1275,12 @@
# send a packets that are routed into the tunnel
for i in range(NUM_IFS):
- p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_hosts[i].ip4, dst="10.0.%d.4" % i) /
- UDP(sport=555, dport=556) /
- Raw(b'\x00' * 80))
+ p = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_hosts[i].ip4, dst="10.0.%d.4" % i)
+ / UDP(sport=555, dport=556)
+ / Raw(b"\x00" * 80)
+ )
rxs = self.send_and_expect(self.pg0, p * 64, self.pg1)
@@ -1175,20 +1289,32 @@
# check the oringial packet is present
self.assertEqual(rx[IP].dst, p[IP].dst)
- self.assertEqual(rx[IP].ttl, p[IP].ttl-1)
+ self.assertEqual(rx[IP].ttl, p[IP].ttl - 1)
# send packets into the tunnel
for i in range(NUM_IFS):
- p = [(peers[i].mk_tunnel_header(self.pg1) /
- Wireguard(message_type=4, reserved_zero=0) /
- WireguardTransport(
- receiver_index=peers[i].sender,
- counter=ii+1,
- encrypted_encapsulated_packet=peers[i].encrypt_transport(
- (IP(src="10.0.%d.4" % i,
- dst=self.pg0.remote_hosts[i].ip4, ttl=20) /
- UDP(sport=222, dport=223) /
- Raw())))) for ii in range(64)]
+ p = [
+ (
+ peers[i].mk_tunnel_header(self.pg1)
+ / Wireguard(message_type=4, reserved_zero=0)
+ / WireguardTransport(
+ receiver_index=peers[i].sender,
+ counter=ii + 1,
+ encrypted_encapsulated_packet=peers[i].encrypt_transport(
+ (
+ IP(
+ src="10.0.%d.4" % i,
+ dst=self.pg0.remote_hosts[i].ip4,
+ ttl=20,
+ )
+ / UDP(sport=222, dport=223)
+ / Raw()
+ )
+ ),
+ )
+ )
+ for ii in range(64)
+ ]
rxs = self.send_and_expect(self.pg1, p, self.pg0)
@@ -1204,22 +1330,16 @@
i.remove_vpp_config()
def test_wg_event(self):
- """ Test events """
+ """Test events"""
port = 12600
- ESTABLISHED_FLAG = VppEnum.\
- vl_api_wireguard_peer_flags_t.\
- WIREGUARD_PEER_ESTABLISHED
- DEAD_FLAG = VppEnum.\
- vl_api_wireguard_peer_flags_t.\
- WIREGUARD_PEER_STATUS_DEAD
+ ESTABLISHED_FLAG = (
+ VppEnum.vl_api_wireguard_peer_flags_t.WIREGUARD_PEER_ESTABLISHED
+ )
+ DEAD_FLAG = VppEnum.vl_api_wireguard_peer_flags_t.WIREGUARD_PEER_STATUS_DEAD
# Create interfaces
- wg0 = VppWgInterface(self,
- self.pg1.local_ip4,
- port).add_vpp_config()
- wg1 = VppWgInterface(self,
- self.pg2.local_ip4,
- port+1).add_vpp_config()
+ wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
+ wg1 = VppWgInterface(self, self.pg2.local_ip4, port + 1).add_vpp_config()
wg0.admin_up()
wg1.admin_up()
@@ -1241,25 +1361,43 @@
routes_0 = []
routes_1 = []
for i in range(NUM_PEERS):
- peers_0.append(VppWgPeer(self,
- wg0,
- self.pg1.remote_hosts[i].ip4,
- port+1+i,
- ["10.0.%d.4/32" % i]).add_vpp_config())
- routes_0.append(VppIpRoute(self, "10.0.%d.4" % i, 32,
- [VppRoutePath(self.pg1.remote_hosts[i].ip4,
- wg0.sw_if_index)]).add_vpp_config())
+ peers_0.append(
+ VppWgPeer(
+ self,
+ wg0,
+ self.pg1.remote_hosts[i].ip4,
+ port + 1 + i,
+ ["10.0.%d.4/32" % i],
+ ).add_vpp_config()
+ )
+ routes_0.append(
+ VppIpRoute(
+ self,
+ "10.0.%d.4" % i,
+ 32,
+ [VppRoutePath(self.pg1.remote_hosts[i].ip4, wg0.sw_if_index)],
+ ).add_vpp_config()
+ )
- peers_1.append(VppWgPeer(self,
- wg1,
- self.pg2.remote_hosts[i].ip4,
- port+100+i,
- ["10.100.%d.4/32" % i]).add_vpp_config())
- routes_1.append(VppIpRoute(self, "10.100.%d.4" % i, 32,
- [VppRoutePath(self.pg2.remote_hosts[i].ip4,
- wg1.sw_if_index)]).add_vpp_config())
+ peers_1.append(
+ VppWgPeer(
+ self,
+ wg1,
+ self.pg2.remote_hosts[i].ip4,
+ port + 100 + i,
+ ["10.100.%d.4/32" % i],
+ ).add_vpp_config()
+ )
+ routes_1.append(
+ VppIpRoute(
+ self,
+ "10.100.%d.4" % i,
+ 32,
+ [VppRoutePath(self.pg2.remote_hosts[i].ip4, wg1.sw_if_index)],
+ ).add_vpp_config()
+ )
- self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS*2)
+ self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS * 2)
# Want events from the first perr of wg0
# and from all wg1 peers
@@ -1271,16 +1409,14 @@
p = peers_0[i].mk_handshake(self.pg1)
rx = self.send_and_expect(self.pg1, [p], self.pg1)
peers_0[i].consume_response(rx[0])
- if (i == 0):
+ if i == 0:
peers_0[0].wait_event(ESTABLISHED_FLAG)
p = peers_1[i].mk_handshake(self.pg2)
rx = self.send_and_expect(self.pg2, [p], self.pg2)
peers_1[i].consume_response(rx[0])
- wg1.wait_events(
- ESTABLISHED_FLAG,
- [peers_1[0].index, peers_1[1].index])
+ wg1.wait_events(ESTABLISHED_FLAG, [peers_1[0].index, peers_1[1].index])
# remove routes
for r in routes_0:
@@ -1292,7 +1428,7 @@
for i in range(NUM_PEERS):
self.assertTrue(peers_0[i].query_vpp_config())
peers_0[i].remove_vpp_config()
- if (i == 0):
+ if i == 0:
peers_0[i].wait_event(0)
peers_0[i].wait_event(DEAD_FLAG)
for p in peers_1:
@@ -1306,32 +1442,28 @@
class WireguardHandoffTests(TestWg):
- """ Wireguard Tests in multi worker setup """
+ """Wireguard Tests in multi worker setup"""
+
vpp_worker_count = 2
def test_wg_peer_init(self):
- """ Handoff """
+ """Handoff"""
port = 12383
# Create interfaces
- wg0 = VppWgInterface(self,
- self.pg1.local_ip4,
- port).add_vpp_config()
+ wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config()
wg0.admin_up()
wg0.config_ip4()
- peer_1 = VppWgPeer(self,
- wg0,
- self.pg1.remote_ip4,
- port+1,
- ["10.11.2.0/24",
- "10.11.3.0/24"]).add_vpp_config()
+ peer_1 = VppWgPeer(
+ self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.2.0/24", "10.11.3.0/24"]
+ ).add_vpp_config()
self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
- r1 = VppIpRoute(self, "10.11.3.0", 24,
- [VppRoutePath("10.11.3.1",
- wg0.sw_if_index)]).add_vpp_config()
+ r1 = VppIpRoute(
+ self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)]
+ ).add_vpp_config()
# send a valid handsake init for which we expect a response
p = peer_1.mk_handshake(self.pg1)
@@ -1342,17 +1474,19 @@
# send a data packet from the peer through the tunnel
# this completes the handshake and pins the peer to worker 0
- p = (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) /
- UDP(sport=222, dport=223) /
- Raw())
+ p = (
+ IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
+ / UDP(sport=222, dport=223)
+ / Raw()
+ )
d = peer_1.encrypt_transport(p)
- p = (peer_1.mk_tunnel_header(self.pg1) /
- (Wireguard(message_type=4, reserved_zero=0) /
- WireguardTransport(receiver_index=peer_1.sender,
- counter=0,
- encrypted_encapsulated_packet=d)))
- rxs = self.send_and_expect(self.pg1, [p], self.pg0,
- worker=0)
+ p = peer_1.mk_tunnel_header(self.pg1) / (
+ Wireguard(message_type=4, reserved_zero=0)
+ / WireguardTransport(
+ receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d
+ )
+ )
+ rxs = self.send_and_expect(self.pg1, [p], self.pg0, worker=0)
for rx in rxs:
self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
@@ -1360,23 +1494,34 @@
# send a packets that are routed into the tunnel
# and pins the peer tp worker 1
- pe = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IP(src=self.pg0.remote_ip4, dst="10.11.3.2") /
- UDP(sport=555, dport=556) /
- Raw(b'\x00' * 80))
+ pe = (
+ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
+ / IP(src=self.pg0.remote_ip4, dst="10.11.3.2")
+ / UDP(sport=555, dport=556)
+ / Raw(b"\x00" * 80)
+ )
rxs = self.send_and_expect(self.pg0, pe * 255, self.pg1, worker=1)
peer_1.validate_encapped(rxs, pe)
# send packets into the tunnel, from the other worker
- p = [(peer_1.mk_tunnel_header(self.pg1) /
- Wireguard(message_type=4, reserved_zero=0) /
- WireguardTransport(
+ p = [
+ (
+ peer_1.mk_tunnel_header(self.pg1)
+ / Wireguard(message_type=4, reserved_zero=0)
+ / WireguardTransport(
receiver_index=peer_1.sender,
- counter=ii+1,
+ counter=ii + 1,
encrypted_encapsulated_packet=peer_1.encrypt_transport(
- (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) /
- UDP(sport=222, dport=223) /
- Raw())))) for ii in range(255)]
+ (
+ IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20)
+ / UDP(sport=222, dport=223)
+ / Raw()
+ )
+ ),
+ )
+ )
+ for ii in range(255)
+ ]
rxs = self.send_and_expect(self.pg1, p, self.pg0, worker=1)
@@ -1396,4 +1541,4 @@
@unittest.skip("test disabled")
def test_wg_multi_interface(self):
- """ Multi-tunnel on the same port """
+ """Multi-tunnel on the same port"""