blob: 418e03adb527eadab24331b936523655b1cf60f2 [file] [log] [blame]
import socket
import unittest
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether
from scapy.packet import Raw
from framework import VppTestCase, running_extended_tests
from util import ppp
""" TestKP is a subclass of VPPTestCase classes.
TestKP class defines Four NAT test case for:
- IP4 to IP4 NAT
- IP4 to IP6 NAT
- IP6 to IP4 NAT
- IP6 to IP6 NAT
"""
class TestKP(VppTestCase):
""" Kube-proxy Test Case """
@classmethod
def setUpClass(cls):
super(TestKP, cls).setUpClass()
cls.pods = range(5)
cls.packets = range(5)
try:
cls.create_pg_interfaces(range(2))
cls.interfaces = list(cls.pg_interfaces)
for i in cls.interfaces:
i.admin_up()
i.config_ip4()
i.config_ip6()
i.disable_ipv6_ra()
i.resolve_arp()
i.resolve_ndp()
dst4 = socket.inet_pton(socket.AF_INET, "10.0.0.0")
dst6 = socket.inet_pton(socket.AF_INET6, "2002::")
cls.vapi.ip_add_del_route(dst4, 24, cls.pg1.remote_ip4n)
cls.vapi.ip_add_del_route(dst6, 16, cls.pg1.remote_ip6n, is_ipv6=1)
except Exception:
super(TestKP, cls).tearDownClass()
raise
def tearDown(self):
super(TestKP, self).tearDown()
if not self.vpp_dead:
self.logger.info(self.vapi.cli("show ku vip verbose"))
def getIPv4Flow(self, id):
return (IP(dst="90.0.%u.%u" % (id / 255, id % 255),
src="40.0.%u.%u" % (id / 255, id % 255)) /
UDP(sport=10000 + id, dport=3306))
def getIPv6Flow(self, id):
return (IPv6(dst="2001::%u" % (id), src="fd00:f00d:ffff::%u" % (id)) /
UDP(sport=10000 + id, dport=3306))
def generatePackets(self, src_if, isv4):
self.reset_packet_infos()
pkts = []
for pktid in self.packets:
info = self.create_packet_info(src_if, self.pg1)
payload = self.info_to_payload(info)
ip = self.getIPv4Flow(pktid) if isv4 else self.getIPv6Flow(pktid)
packet = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
ip /
Raw(payload))
self.extend_packet(packet, 128)
info.data = packet.copy()
pkts.append(packet)
return pkts
def checkInner(self, udp):
self.assertEqual(udp.dport, 3307)
def checkCapture(self, nat4, isv4):
self.pg0.assert_nothing_captured()
out = self.pg1.get_capture(len(self.packets))
load = [0] * len(self.pods)
self.info = None
for p in out:
try:
podid = 0
udp = None
if nat4:
ip = p[IP]
podid = int(ip.dst.split(".")[3])
self.assertEqual(ip.version, 4)
self.assertEqual(ip.flags, 0)
self.assertEqual(ip.dst, "10.0.0.%u" % podid)
self.assertEqual(ip.proto, 17)
self.assertEqual(len(ip.options), 0)
self.assertGreaterEqual(ip.ttl, 63)
udp = p[UDP]
else:
ip = p[IPv6]
podid = ip.dst.split(":")
podid = podid[len(podid) - 1]
podid = 0 if podid == "" else int(podid)
self.assertEqual(ip.version, 6)
self.assertEqual(ip.tc, 0)
self.assertEqual(ip.fl, 0)
self.assertEqual(
socket.inet_pton(socket.AF_INET6, ip.dst),
socket.inet_pton(socket.AF_INET6, "2002::%u" % podid)
)
self.assertEqual(ip.nh, 17)
self.assertGreaterEqual(ip.hlim, 63)
udp = UDP(str(p[IPv6].payload))
# self.assertEqual(len(ip.options), 0)
self.checkInner(udp)
load[podid] += 1
except:
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
# This is just to roughly check that the balancing algorithm
# is not completly biased.
for podid in self.pods:
if load[podid] < len(self.packets) / (len(self.pods) * 2):
self.log(
"Pod isn't balanced: load[%d] = %d" % (podid, load[podid]))
raise Exception("Kube-proxy algorithm is biased")
def test_kp_ip4_nat4(self):
""" Kube-proxy NAT44 """
try:
self.vapi.cli("ku vip 90.0.0.0/8 port 3306 target_port 3307 nat4")
for podid in self.pods:
self.vapi.cli("ku pod 90.0.0.0/8 10.0.0.%u" % (podid))
self.pg0.add_stream(self.generatePackets(self.pg0, isv4=True))
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.checkCapture(nat4=True, isv4=True)
finally:
for podid in self.pods:
self.vapi.cli("ku pod 90.0.0.0/8 10.0.0.%u del" % (podid))
self.vapi.cli("ku vip 90.0.0.0/8 nat4 del")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_kp_ip6_nat4(self):
""" Kube-proxy NAT64 """
try:
self.vapi.cli("ku vip 90.0.0.0/8 port 3306 target_port 3307 nat4")
for podid in self.pods:
self.vapi.cli("ku pod 2001::/16 10.0.0.%u" % (podid))
self.pg0.add_stream(self.generatePackets(self.pg0, isv4=False))
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.checkCapture(nat4=True, isv4=False)
finally:
for podid in self.pods:
self.vapi.cli("ku pod 2001::/16 10.0.0.%u del" % (podid))
self.vapi.cli("ku vip 2001::/16 nat4 del")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_kp_ip4_nat6(self):
""" Kube-proxy NAT46 """
try:
self.vapi.cli("ku vip 90.0.0.0/8 port 3306 target_port 3307 nat6")
for podid in self.pods:
self.vapi.cli("ku pod 90.0.0.0/8 2002::%u" % (podid))
self.pg0.add_stream(self.generatePackets(self.pg0, isv4=True))
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.checkCapture(nat4=False, isv4=True)
finally:
for podid in self.pods:
self.vapi.cli("ku pod 90.0.0.0/8 2002::%u" % (podid))
self.vapi.cli("ku vip 90.0.0.0/8 nat6 del")
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_kp_ip6_nat6(self):
""" Kube-proxy NAT66 """
try:
self.vapi.cli("ku vip 90.0.0.0/8 port 3306 target_port 3307 nat6")
for podid in self.pods:
self.vapi.cli("ku pod 2001::/16 2002::%u" % (podid))
self.pg0.add_stream(self.generatePackets(self.pg0, isv4=False))
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.checkCapture(nat4=False, isv4=False)
finally:
for podid in self.pods:
self.vapi.cli("ku pod 2001::/16 2002::%u del" % (podid))
self.vapi.cli("ku vip 2001::/16 nat6 del")