IPSEC: Mutli-tunnel tests
Change-Id: I46f1db6579835c6613fdbb2b726246cc62b135fe
Signed-off-by: Neale Ranns <nranns@cisco.com>
diff --git a/test/template_ipsec.py b/test/template_ipsec.py
index 483699c..1b9a379 100644
--- a/test/template_ipsec.py
+++ b/test/template_ipsec.py
@@ -79,6 +79,45 @@
self.nat_header = None
+def config_tun_params(p, encryption_type, tun_if):
+ ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
+ p.scapy_tun_sa = SecurityAssociation(
+ encryption_type, spi=p.vpp_tun_spi,
+ crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
+ auth_algo=p.auth_algo, auth_key=p.auth_key,
+ tunnel_header=ip_class_by_addr_type[p.addr_type](
+ src=tun_if.remote_addr[p.addr_type],
+ dst=tun_if.local_addr[p.addr_type]),
+ nat_t_header=p.nat_header)
+ p.vpp_tun_sa = SecurityAssociation(
+ encryption_type, spi=p.scapy_tun_spi,
+ crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
+ auth_algo=p.auth_algo, auth_key=p.auth_key,
+ tunnel_header=ip_class_by_addr_type[p.addr_type](
+ dst=tun_if.remote_addr[p.addr_type],
+ src=tun_if.local_addr[p.addr_type]),
+ nat_t_header=p.nat_header)
+
+
+def config_tra_params(p, encryption_type):
+ p.scapy_tra_sa = SecurityAssociation(
+ encryption_type,
+ spi=p.vpp_tra_spi,
+ crypt_algo=p.crypt_algo,
+ crypt_key=p.crypt_key,
+ auth_algo=p.auth_algo,
+ auth_key=p.auth_key,
+ nat_t_header=p.nat_header)
+ p.vpp_tra_sa = SecurityAssociation(
+ encryption_type,
+ spi=p.scapy_tra_spi,
+ crypt_algo=p.crypt_algo,
+ crypt_key=p.crypt_key,
+ auth_algo=p.auth_algo,
+ auth_key=p.auth_key,
+ nat_t_header=p.nat_header)
+
+
class TemplateIpsec(VppTestCase):
"""
TRANSPORT MODE:
@@ -164,26 +203,6 @@
ICMPv6EchoRequest(id=0, seq=1, data=self.payload)
for i in range(count)]
- def configure_sa_tun(self, params):
- ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
- scapy_tun_sa = SecurityAssociation(
- self.encryption_type, spi=params.vpp_tun_spi,
- crypt_algo=params.crypt_algo, crypt_key=params.crypt_key,
- auth_algo=params.auth_algo, auth_key=params.auth_key,
- tunnel_header=ip_class_by_addr_type[params.addr_type](
- src=self.tun_if.remote_addr[params.addr_type],
- dst=self.tun_if.local_addr[params.addr_type]),
- nat_t_header=params.nat_header)
- vpp_tun_sa = SecurityAssociation(
- self.encryption_type, spi=params.scapy_tun_spi,
- crypt_algo=params.crypt_algo, crypt_key=params.crypt_key,
- auth_algo=params.auth_algo, auth_key=params.auth_key,
- tunnel_header=ip_class_by_addr_type[params.addr_type](
- dst=self.tun_if.remote_addr[params.addr_type],
- src=self.tun_if.local_addr[params.addr_type]),
- nat_t_header=params.nat_header)
- return vpp_tun_sa, scapy_tun_sa
-
def configure_sa_tra(self, params):
params.scapy_tra_sa = SecurityAssociation(
self.encryption_type,
@@ -208,15 +227,15 @@
""" verify checksum correctness for vpp generated packets """
self.vapi.cli("test http server")
p = self.params[socket.AF_INET]
- vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
+ config_tun_params(p, self.encryption_type, self.tun_if)
send = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
- scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
- dst=self.tun_if.local_ip4) /
- TCP(flags='S', dport=80)))
+ p.scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
+ dst=self.tun_if.local_ip4) /
+ TCP(flags='S', dport=80)))
self.logger.debug(ppp("Sending packet:", send))
recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
recv = recv[0]
- decrypted = vpp_tun_sa.decrypt(recv[IP])
+ decrypted = p.vpp_tun_sa.decrypt(recv[IP])
self.assert_packet_checksums_valid(decrypted)
@@ -374,14 +393,13 @@
pass
-class IpsecTun4Tests(object):
- def test_tun_basic44(self, count=1):
- """ ipsec 4o4 tunnel basic test """
+class IpsecTun4(object):
+
+ def verify_tun_44(self, p, count=1):
self.vapi.cli("clear errors")
try:
- p = self.params[socket.AF_INET]
- vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
- send_pkts = self.gen_encrypt_pkts(scapy_tun_sa, self.tun_if,
+ config_tun_params(p, self.encryption_type, self.tun_if)
+ send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
src=p.remote_tun_if_host,
dst=self.pg1.remote_ip4,
count=count)
@@ -395,7 +413,7 @@
recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
for recv_pkt in recv_pkts:
try:
- decrypt_pkt = vpp_tun_sa.decrypt(recv_pkt[IP])
+ decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
if not decrypt_pkt.haslayer(IP):
decrypt_pkt = IP(decrypt_pkt[Raw].load)
self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
@@ -432,19 +450,26 @@
self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
+
+class IpsecTun4Tests(IpsecTun4):
+
+ def test_tun_basic44(self):
+ """ ipsec 4o4 tunnel basic test """
+ self.verify_tun_44(self.params[socket.AF_INET], count=1)
+
def test_tun_burst44(self):
""" ipsec 4o4 tunnel burst test """
- self.test_tun_basic44(count=257)
+ self.verify_tun_44(self.params[socket.AF_INET], count=257)
-class IpsecTun6Tests(object):
- def test_tun_basic66(self, count=1):
+class IpsecTun6(object):
+
+ def verify_tun_66(self, p, count=1):
""" ipsec 6o6 tunnel basic test """
self.vapi.cli("clear errors")
try:
- p = self.params[socket.AF_INET6]
- vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
- send_pkts = self.gen_encrypt_pkts6(scapy_tun_sa, self.tun_if,
+ config_tun_params(p, self.encryption_type, self.tun_if)
+ send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
src=p.remote_tun_if_host,
dst=self.pg1.remote_ip6,
count=count)
@@ -459,7 +484,7 @@
recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
for recv_pkt in recv_pkts:
try:
- decrypt_pkt = vpp_tun_sa.decrypt(recv_pkt[IPv6])
+ decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
if not decrypt_pkt.haslayer(IPv6):
decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
@@ -489,9 +514,16 @@
self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
+
+class IpsecTun6Tests(IpsecTun6):
+
+ def test_tun_basic66(self):
+ """ ipsec 6o6 tunnel basic test """
+ self.verify_tun_66(self.params[socket.AF_INET6], count=1)
+
def test_tun_burst66(self):
""" ipsec 6o6 tunnel burst test """
- self.test_tun_basic66(count=257)
+ self.verify_tun_66(self.params[socket.AF_INET6], count=257)
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):