linux-cp: Add tests for tun devices
Type: test
Signed-off-by: Neale Ranns <neale@graphiant.com>
Change-Id: Iec69d8624b15766ed65e7d09777819d2242dee17
diff --git a/test/test_linux_cp.py b/test/test_linux_cp.py
index df38681..4cbcf6c 100644
--- a/test/test_linux_cp.py
+++ b/test/test_linux_cp.py
@@ -6,8 +6,16 @@
from scapy.layers.inet6 import IPv6, Raw
from scapy.layers.l2 import Ether, ARP, Dot1Q
+from util import reassemble4
from vpp_object import VppObject
from framework import VppTestCase, VppTestRunner
+from vpp_ipip_tun_interface import VppIpIpTunInterface
+from template_ipsec import TemplateIpsec, IpsecTun4Tests, \
+ IpsecTun4, mk_scapy_crypt_key, config_tun_params
+from template_ipsec import TemplateIpsec, IpsecTun4Tests, \
+ IpsecTun4, mk_scapy_crypt_key, config_tun_params
+from test_ipsec_tun_if_esp import TemplateIpsecItf4
+from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
class VppLcpPair(VppObject):
@@ -62,10 +70,13 @@
def setUp(self):
super(TestLinuxCP, self).setUp()
- # create 4 pg interfaces so there are a few addresses
- # in the FIB
+ # create 4 pg interfaces so we can create two pairs
self.create_pg_interfaces(range(4))
+ # create on ip4 and one ip6 pg tun
+ self.pg_interfaces += self.create_pg_ip4_interfaces(range(4, 5))
+ self.pg_interfaces += self.create_pg_ip6_interfaces(range(5, 6))
+
for i in self.pg_interfaces:
i.admin_up()
@@ -169,6 +180,237 @@
for phy in phys:
phy.unconfig_ip4()
+ def test_linux_cp_tun(self):
+ """ Linux CP TUN """
+
+ #
+ # Setup
+ #
+ N_PKTS = 31
+
+ # create two pairs, wihch a bunch of hots on the phys
+ hosts = [self.pg4, self.pg5]
+ phy = self.pg2
+
+ phy.config_ip4()
+ phy.config_ip6()
+ phy.resolve_arp()
+ phy.resolve_ndp()
+
+ tun4 = VppIpIpTunInterface(
+ self,
+ phy,
+ phy.local_ip4,
+ phy.remote_ip4).add_vpp_config()
+ tun6 = VppIpIpTunInterface(
+ self,
+ phy,
+ phy.local_ip6,
+ phy.remote_ip6).add_vpp_config()
+ tuns = [tun4, tun6]
+
+ tun4.admin_up()
+ tun4.config_ip4()
+ tun6.admin_up()
+ tun6.config_ip6()
+
+ pair1 = VppLcpPair(self, tuns[0], hosts[0]).add_vpp_config()
+ pair2 = VppLcpPair(self, tuns[1], hosts[1]).add_vpp_config()
+
+ self.logger.info(self.vapi.cli("sh lcp adj verbose"))
+ self.logger.info(self.vapi.cli("sh lcp"))
+ self.logger.info(self.vapi.cli("sh ip punt redirect"))
+
+ #
+ # Traffic Tests
+ #
+
+ # host to phy for v4
+ p = (IP(src=tun4.local_ip4, dst="2.2.2.2") /
+ UDP(sport=1234, dport=1234) /
+ Raw())
+
+ rxs = self.send_and_expect(self.pg4, p * N_PKTS, phy)
+
+ # verify inner packet is unchanged and has the tunnel encap
+ for rx in rxs:
+ self.assertEqual(rx[Ether].dst, phy.remote_mac)
+ self.assertEqual(rx[IP].dst, phy.remote_ip4)
+ self.assertEqual(rx[IP].src, phy.local_ip4)
+ inner = IP(rx[IP].payload)
+ self.assertEqual(inner.src, tun4.local_ip4)
+ self.assertEqual(inner.dst, "2.2.2.2")
+
+ # host to phy for v6
+ p = (IPv6(src=tun6.local_ip6, dst="2::2") /
+ UDP(sport=1234, dport=1234) /
+ Raw())
+
+ rxs = self.send_and_expect(self.pg5, p * N_PKTS, phy)
+
+ # verify inner packet is unchanged and has the tunnel encap
+ for rx in rxs:
+ self.assertEqual(rx[IPv6].dst, phy.remote_ip6)
+ self.assertEqual(rx[IPv6].src, phy.local_ip6)
+ inner = IPv6(rx[IPv6].payload)
+ self.assertEqual(inner.src, tun6.local_ip6)
+ self.assertEqual(inner.dst, "2::2")
+
+ # phy to host v4
+ p = (Ether(dst=phy.local_mac, src=phy.remote_mac) /
+ IP(dst=phy.local_ip4, src=phy.remote_ip4) /
+ IP(dst=tun4.local_ip4, src=tun4.remote_ip4) /
+ UDP(sport=1234, dport=1234) /
+ Raw())
+
+ rxs = self.send_and_expect(phy, p * N_PKTS, self.pg4)
+ for rx in rxs:
+ rx = IP(rx)
+ self.assertEqual(rx[IP].dst, tun4.local_ip4)
+ self.assertEqual(rx[IP].src, tun4.remote_ip4)
+
+ # phy to host v6
+ p = (Ether(dst=phy.local_mac, src=phy.remote_mac) /
+ IPv6(dst=phy.local_ip6, src=phy.remote_ip6) /
+ IPv6(dst=tun6.local_ip6, src=tun6.remote_ip6) /
+ UDP(sport=1234, dport=1234) /
+ Raw())
+
+ rxs = self.send_and_expect(phy, p * N_PKTS, self.pg5)
+ for rx in rxs:
+ rx = IPv6(rx)
+ self.assertEqual(rx[IPv6].dst, tun6.local_ip6)
+ self.assertEqual(rx[IPv6].src, tun6.remote_ip6)
+
+ # cleanup
+ phy.unconfig_ip4()
+ phy.unconfig_ip6()
+
+ tun4.unconfig_ip4()
+ tun6.unconfig_ip6()
+
+
+class TestLinuxCPIpsec(TemplateIpsec,
+ TemplateIpsecItf4,
+ IpsecTun4):
+ """ IPsec Interface IPv4 """
+
+ extra_vpp_plugin_config = ["plugin",
+ "linux_cp_plugin.so",
+ "{", "enable", "}",
+ "plugin",
+ "linux_cp_unittest_plugin.so",
+ "{", "enable", "}"]
+
+ def setUp(self):
+ super(TestLinuxCPIpsec, self).setUp()
+
+ self.tun_if = self.pg0
+ self.pg_interfaces += self.create_pg_ip4_interfaces(range(3, 4))
+ self.pg_interfaces += self.create_pg_ip6_interfaces(range(4, 5))
+
+ def tearDown(self):
+ super(TestLinuxCPIpsec, self).tearDown()
+
+ def verify_encrypted(self, p, sa, rxs):
+ decrypt_pkts = []
+ for rx in rxs:
+ if p.nat_header:
+ self.assertEqual(rx[UDP].dport, 4500)
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
+ try:
+ rx_ip = rx[IP]
+ decrypt_pkt = p.vpp_tun_sa.decrypt(rx_ip)
+ if not decrypt_pkt.haslayer(IP):
+ decrypt_pkt = IP(decrypt_pkt[Raw].load)
+ if rx_ip.proto == socket.IPPROTO_ESP:
+ self.verify_esp_padding(sa, rx_ip[ESP].data, decrypt_pkt)
+ decrypt_pkts.append(decrypt_pkt)
+ self.assert_equal(decrypt_pkt.src, p.tun_if.local_ip4)
+ self.assert_equal(decrypt_pkt.dst, p.tun_if.remote_ip4)
+ except:
+ self.logger.debug(ppp("Unexpected packet:", rx))
+ try:
+ self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
+ except:
+ pass
+ raise
+ pkts = reassemble4(decrypt_pkts)
+ for pkt in pkts:
+ self.assert_packet_checksums_valid(pkt)
+
+ def verify_decrypted(self, p, rxs):
+ for rx in rxs:
+ rx = IP(rx)
+ self.assert_equal(rx[IP].src, p.tun_if.remote_ip4)
+ self.assert_equal(rx[IP].dst, p.tun_if.local_ip4)
+ self.assert_packet_checksums_valid(rx)
+
+ def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
+ payload_size=54):
+ return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
+ sa.encrypt(IP(src=src, dst=dst) /
+ UDP(sport=1111, dport=2222) /
+ Raw(b'X' * payload_size))
+ for i in range(count)]
+
+ def test_linux_cp_ipsec4_tun(self):
+ """ Linux CP Ipsec TUN """
+
+ #
+ # Setup
+ #
+ N_PKTS = 31
+
+ # the pg that paris with the tunnel
+ self.host = self.pg3
+
+ # tunnel and protection setup
+ p = self.ipv4_params
+
+ self.config_network(p)
+ self.config_sa_tun(p,
+ self.pg0.local_ip4,
+ self.pg0.remote_ip4)
+ self.config_protect(p)
+
+ pair = VppLcpPair(self, p.tun_if, self.host).add_vpp_config()
+
+ self.logger.error(self.vapi.cli("sh int addr"))
+ self.logger.info(self.vapi.cli("sh lcp"))
+ self.logger.info(self.vapi.cli("sh ip punt redirect"))
+
+ #
+ # Traffic Tests
+ #
+
+ # host to phy for v4
+ pkt = (IP(src=p.tun_if.local_ip4,
+ dst=p.tun_if.remote_ip4) /
+ UDP(sport=1234, dport=1234) /
+ Raw())
+
+ rxs = self.send_and_expect(self.host, pkt * N_PKTS, self.tun_if)
+ self.verify_encrypted(p, p.vpp_tun_sa, rxs)
+
+ # phy to host for v4
+ pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
+ src=p.tun_if.remote_ip4,
+ dst=p.tun_if.local_ip4,
+ count=N_PKTS)
+ try:
+ rxs = self.send_and_expect(self.tun_if, pkts, self.host)
+ self.verify_decrypted(p, rxs)
+ finally:
+ self.logger.error(self.vapi.cli("sh trace"))
+
+ # cleanup
+ pair.remove_vpp_config()
+ self.unconfig_protect(p)
+ self.unconfig_sa(p)
+ self.unconfig_network(p)
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)