blob: ef5b3a107dcfe28dea7e12bb53e12890aa1887d1 [file] [log] [blame]
#!/usr/bin/env python3
"""Policy 1:1 NAT functional tests"""
import unittest
from scapy.layers.inet import Ether, IP, UDP, ICMP
from framework import VppTestCase
from asfframework import VppTestRunner
from vpp_papi import VppEnum
from config import config
@unittest.skipIf("nat" in config.excluded_plugins, "Exclude NAT plugin tests")
class TestPNAT(VppTestCase):
    """PNAT Test Case

    Functional tests for the policy 1:1 NAT (pnat) bindings.  Each test
    describes a set of cases as dictionaries with these keys:

    * ``input``       -- attachment point (PNAT_IP4_INPUT / PNAT_IP4_OUTPUT)
    * ``sw_if_index`` -- interface the binding is attached to
    * ``match``       -- match tuple handed to the binding-add API
    * ``rewrite``     -- rewrite tuple handed to the binding-add API
    * ``send``        -- packet (without Ethernet header) injected on pg0
    * ``reply``       -- packet expected on pg1, before the TTL adjustment

    Mask bits as they are used in this file: 0x1 pairs with ``src``, 0x2
    with ``dst`` and 0x8 with ``dport``.  0x40 is only used by the v2 test
    and appears to be the IP protocol bit (PNAT_PROTO) -- confirm against
    the pnat API definition.
    """

    maxDiff = None

    @classmethod
    def setUpClass(cls):
        """Create the two packet-generator interfaces used by every test."""
        super().setUpClass()
        cls.create_pg_interfaces(range(2))
        cls.interfaces = list(cls.pg_interfaces)

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level cleanup to the framework."""
        super().tearDownClass()

    def setUp(self):
        """Bring the interfaces up, assign IPv4 addresses and resolve ARP."""
        super().setUp()
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()

    def tearDown(self):
        """Remove the IPv4 configuration unless VPP has already died."""
        super().tearDown()
        if not self.vpp_dead:
            for intf in self.pg_interfaces:
                intf.unconfig_ip4()
                intf.admin_down()

    def validate(self, rx, expected):
        """Assert that packet ``rx`` serializes to the same bytes as ``expected``.

        The previous implementation used ``assertTrue(a, b)``, where ``b`` is
        only the failure message, so it never compared the two packets.
        """
        self.assertEqual(bytes(rx), bytes(expected))

    def validate_bytes(self, rx, expected):
        """Assert that two already-serialized packets are equal."""
        self.assertEqual(rx, expected)

    def ping_check(self):
        """Verify non matching traffic works."""
        p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        icmpecho = IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / ICMP()
        reply = IP(src=self.pg0.local_ip4, dst=self.pg0.remote_ip4) / ICMP(
            type="echo-reply"
        )
        rx = self.send_and_expect(self.pg0, p_ether / icmpecho * 1, self.pg0)
        for p in rx:
            # The IP id of the reply is not known in advance; copy it from
            # the received packet so that only the other fields are compared.
            reply[IP].id = p[IP].id
            self.validate(p[1], reply)

    def _check_translations(self, tests, add_binding):
        """Run every case in ``tests`` through add/attach/verify/cleanup.

        :param tests: list of case dictionaries (see the class docstring).
        :param add_binding: API callable that creates the binding; it is
            called with ``match=`` and ``rewrite=`` and must return an
            object exposing ``binding_index``.
        """
        p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        for t in tests:
            binding_index = add_binding(
                match=t["match"], rewrite=t["rewrite"]
            ).binding_index
            self.vapi.pnat_binding_attach(
                sw_if_index=t["sw_if_index"],
                attachment=t["input"],
                binding_index=binding_index,
            )
            try:
                # The packet is sent on pg0 and captured on pg1, so the
                # expected TTL is one lower than the one that was sent.
                reply = t["reply"]
                reply[IP].ttl -= 1
                rx = self.send_and_expect(self.pg0, p_ether / t["send"] * 1, self.pg1)
                for p in rx:
                    self.validate(p[1], reply)
                self.ping_check()
            finally:
                # Always remove the binding so that a failing case cannot
                # influence the cases (or tests) that run after it.
                self.vapi.pnat_binding_detach(
                    sw_if_index=t["sw_if_index"],
                    attachment=t["input"],
                    binding_index=binding_index,
                )
                self.vapi.pnat_binding_del(binding_index=binding_index)

    def test_pnat(self):
        """PNAT test"""
        PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
        PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT

        tests = [
            # Rewrite the destination address on input.
            {
                "input": PNAT_IP4_INPUT,
                "sw_if_index": self.pg0.sw_if_index,
                "match": {
                    "mask": 0xA,
                    "dst": "10.10.10.10",
                    "proto": 17,
                    "dport": 6871,
                },
                "rewrite": {"mask": 0x2, "dst": self.pg1.remote_ip4},
                "send": (
                    IP(src=self.pg0.remote_ip4, dst="10.10.10.10") / UDP(dport=6871)
                ),
                "reply": (
                    IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
                    / UDP(dport=6871)
                ),
            },
            # Rewrite the source address on output.
            {
                "input": PNAT_IP4_OUTPUT,
                "sw_if_index": self.pg1.sw_if_index,
                "match": {
                    "mask": 0x9,
                    "src": self.pg0.remote_ip4,
                    "proto": 17,
                    "dport": 6871,
                },
                "rewrite": {"mask": 0x1, "src": "11.11.11.11"},
                "send": (
                    IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
                    / UDP(dport=6871)
                ),
                "reply": (
                    IP(src="11.11.11.11", dst=self.pg1.remote_ip4) / UDP(dport=6871)
                ),
            },
            # Rewrite both the destination address and destination port.
            {
                "input": PNAT_IP4_INPUT,
                "sw_if_index": self.pg0.sw_if_index,
                "match": {
                    "mask": 0xA,
                    "dst": "10.10.10.10",
                    "proto": 17,
                    "dport": 6871,
                },
                "rewrite": {"mask": 0xA, "dst": self.pg1.remote_ip4, "dport": 5555},
                "send": (
                    IP(src=self.pg0.remote_ip4, dst="10.10.10.10")
                    / UDP(sport=65530, dport=6871)
                ),
                "reply": (
                    IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
                    / UDP(sport=65530, dport=5555)
                ),
            },
            # Rewrite only the destination port; the UDP checksum is sent as
            # zero and is expected to still be zero on the way out.
            {
                "input": PNAT_IP4_INPUT,
                "sw_if_index": self.pg0.sw_if_index,
                "match": {
                    "mask": 0xA,
                    "dst": self.pg1.remote_ip4,
                    "proto": 17,
                    "dport": 6871,
                },
                "rewrite": {"mask": 0x8, "dport": 5555},
                "send": (
                    IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
                    / UDP(dport=6871, chksum=0)
                ),
                "reply": (
                    IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
                    / UDP(dport=5555, chksum=0)
                ),
            },
            # Rewrite the source address of an ICMP packet.
            {
                "input": PNAT_IP4_INPUT,
                "sw_if_index": self.pg0.sw_if_index,
                "match": {"mask": 0x2, "dst": self.pg1.remote_ip4, "proto": 1},
                "rewrite": {"mask": 0x1, "src": "8.8.8.8"},
                "send": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / ICMP()),
                "reply": IP(src="8.8.8.8", dst=self.pg1.remote_ip4) / ICMP(),
            },
        ]

        self._check_translations(tests, self.vapi.pnat_binding_add)

    def test_pnat_show(self):
        """PNAT show tests"""
        PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
        PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT

        # No traffic is sent in this test, so only the binding definition
        # and its attachment point are needed.
        tests = [
            {
                "input": PNAT_IP4_INPUT,
                "sw_if_index": self.pg0.sw_if_index,
                "match": {
                    "mask": 0xA,
                    "dst": "10.10.10.10",
                    "proto": 17,
                    "dport": 6871,
                },
                "rewrite": {"mask": 0x2, "dst": self.pg1.remote_ip4},
            },
            {
                "input": PNAT_IP4_OUTPUT,
                "sw_if_index": self.pg1.sw_if_index,
                "match": {
                    "mask": 0x9,
                    "src": self.pg0.remote_ip4,
                    "proto": 17,
                    "dport": 6871,
                },
                "rewrite": {"mask": 0x1, "src": "11.11.11.11"},
            },
        ]

        binding_indexes = []
        for t in tests:
            rv = self.vapi.pnat_binding_add(match=t["match"], rewrite=t["rewrite"])
            binding_indexes.append(rv.binding_index)
            self.vapi.pnat_binding_attach(
                sw_if_index=t["sw_if_index"],
                attachment=t["input"],
                binding_index=rv.binding_index,
            )

        try:
            _, bindings = self.vapi.pnat_bindings_get()
            self.assertEqual(len(bindings), len(tests))

            _, interfaces = self.vapi.pnat_interfaces_get()
            self.assertEqual(len(interfaces), 2)

            self.logger.info(self.vapi.cli("show pnat translations"))
            self.logger.info(self.vapi.cli("show pnat interfaces"))
        finally:
            # Remove the bindings even when one of the checks above fails.
            for binding_index, t in zip(binding_indexes, tests):
                self.vapi.pnat_binding_detach(
                    sw_if_index=t["sw_if_index"],
                    attachment=t["input"],
                    binding_index=binding_index,
                )
                self.vapi.pnat_binding_del(binding_index=binding_index)

    def test_pnat_wildcard_proto(self):
        """
        PNAT test wildcard IP protocol, PNAT_PROTO for mask should be set by
        handler
        """
        PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
        PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT

        # Neither the match tuples nor the packets name an IP protocol.
        tests = [
            {
                "input": PNAT_IP4_INPUT,
                "sw_if_index": self.pg0.sw_if_index,
                "match": {"mask": 0x2, "dst": "10.10.10.10"},
                "rewrite": {"mask": 0x2, "dst": self.pg1.remote_ip4},
                "send": (IP(src=self.pg0.remote_ip4, dst="10.10.10.10")),
                "reply": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
            },
            {
                "input": PNAT_IP4_OUTPUT,
                "sw_if_index": self.pg1.sw_if_index,
                "match": {"mask": 0x1, "src": self.pg0.remote_ip4},
                "rewrite": {"mask": 0x1, "src": "11.11.11.11"},
                "send": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
                "reply": (IP(src="11.11.11.11", dst=self.pg1.remote_ip4)),
            },
        ]

        self._check_translations(tests, self.vapi.pnat_binding_add)

    def test_pnat_wildcard_proto_v2(self):
        """PNAT test wildcard IP protocol using pnat_binding_add_v2"""
        PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
        PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT

        # Same cases as the v1 wildcard test, with 0x40 added to every mask.
        tests = [
            {
                "input": PNAT_IP4_INPUT,
                "sw_if_index": self.pg0.sw_if_index,
                "match": {"mask": 0x42, "dst": "10.10.10.10"},
                "rewrite": {"mask": 0x42, "dst": self.pg1.remote_ip4},
                "send": (IP(src=self.pg0.remote_ip4, dst="10.10.10.10")),
                "reply": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
            },
            {
                "input": PNAT_IP4_OUTPUT,
                "sw_if_index": self.pg1.sw_if_index,
                "match": {"mask": 0x41, "src": self.pg0.remote_ip4},
                "rewrite": {"mask": 0x41, "src": "11.11.11.11"},
                "send": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
                "reply": (IP(src="11.11.11.11", dst=self.pg1.remote_ip4)),
            },
        ]

        self._check_translations(tests, self.vapi.pnat_binding_add_v2)


if __name__ == "__main__":
    # Run directly as a script: let unittest discover the tests in this
    # module and execute them with the VPP-specific runner.
    unittest.main(testRunner=VppTestRunner)