blob: 2bf34052325ec46074c934595649130554c89a91 [file] [log] [blame]
#!/usr/bin/env python3
"""Policy 1:1 NAT functional tests"""
import unittest
from scapy.layers.inet import Ether, IP, UDP, ICMP
from framework import VppTestCase, VppTestRunner
from vpp_papi import VppEnum
class TestPNAT(VppTestCase):
"""PNAT Test Case"""
maxDiff = None
@classmethod
def setUpClass(cls):
super(TestPNAT, cls).setUpClass()
cls.create_pg_interfaces(range(2))
cls.interfaces = list(cls.pg_interfaces)
@classmethod
def tearDownClass(cls):
super(TestPNAT, cls).tearDownClass()
def setUp(self):
super(TestPNAT, self).setUp()
for i in self.interfaces:
i.admin_up()
i.config_ip4()
i.resolve_arp()
def tearDown(self):
super(TestPNAT, self).tearDown()
if not self.vpp_dead:
for i in self.pg_interfaces:
i.unconfig_ip4()
i.admin_down()
def validate(self, rx, expected):
self.assertEqual(rx, expected.__class__(expected))
def validate_bytes(self, rx, expected):
self.assertEqual(rx, expected)
def ping_check(self):
"""Verify non matching traffic works."""
p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
icmpecho = IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / ICMP()
reply = IP(src=self.pg0.local_ip4, dst=self.pg0.remote_ip4) / ICMP(
type="echo-reply"
)
rx = self.send_and_expect(self.pg0, p_ether / icmpecho * 1, self.pg0)
for p in rx:
reply[IP].id = p[IP].id
self.validate(p[1], reply)
def test_pnat(self):
"""PNAT test"""
PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
tests = [
{
"input": PNAT_IP4_INPUT,
"sw_if_index": self.pg0.sw_if_index,
"match": {
"mask": 0xA,
"dst": "10.10.10.10",
"proto": 17,
"dport": 6871,
},
"rewrite": {"mask": 0x2, "dst": self.pg1.remote_ip4},
"send": (
IP(src=self.pg0.remote_ip4, dst="10.10.10.10") / UDP(dport=6871)
),
"reply": (
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
/ UDP(dport=6871)
),
},
{
"input": PNAT_IP4_OUTPUT,
"sw_if_index": self.pg1.sw_if_index,
"match": {
"mask": 0x9,
"src": self.pg0.remote_ip4,
"proto": 17,
"dport": 6871,
},
"rewrite": {"mask": 0x1, "src": "11.11.11.11"},
"send": (
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
/ UDP(dport=6871)
),
"reply": (
IP(src="11.11.11.11", dst=self.pg1.remote_ip4) / UDP(dport=6871)
),
},
{
"input": PNAT_IP4_INPUT,
"sw_if_index": self.pg0.sw_if_index,
"match": {
"mask": 0xA,
"dst": "10.10.10.10",
"proto": 17,
"dport": 6871,
},
"rewrite": {"mask": 0xA, "dst": self.pg1.remote_ip4, "dport": 5555},
"send": (
IP(src=self.pg0.remote_ip4, dst="10.10.10.10")
/ UDP(sport=65530, dport=6871)
),
"reply": (
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
/ UDP(sport=65530, dport=5555)
),
},
{
"input": PNAT_IP4_INPUT,
"sw_if_index": self.pg0.sw_if_index,
"match": {
"mask": 0xA,
"dst": self.pg1.remote_ip4,
"proto": 17,
"dport": 6871,
},
"rewrite": {"mask": 0x8, "dport": 5555},
"send": (
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
/ UDP(dport=6871, chksum=0)
),
"reply": (
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
/ UDP(dport=5555, chksum=0)
),
},
{
"input": PNAT_IP4_INPUT,
"sw_if_index": self.pg0.sw_if_index,
"match": {"mask": 0x2, "dst": self.pg1.remote_ip4, "proto": 1},
"rewrite": {"mask": 0x1, "src": "8.8.8.8"},
"send": (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / ICMP()),
"reply": IP(src="8.8.8.8", dst=self.pg1.remote_ip4) / ICMP(),
},
]
p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
for t in tests:
rv = self.vapi.pnat_binding_add(match=t["match"], rewrite=t["rewrite"])
self.vapi.pnat_binding_attach(
sw_if_index=t["sw_if_index"],
attachment=t["input"],
binding_index=rv.binding_index,
)
reply = t["reply"]
reply[IP].ttl -= 1
rx = self.send_and_expect(self.pg0, p_ether / t["send"] * 1, self.pg1)
for p in rx:
# p.show2()
self.validate(p[1], reply)
self.ping_check()
self.vapi.pnat_binding_detach(
sw_if_index=t["sw_if_index"],
attachment=t["input"],
binding_index=rv.binding_index,
)
self.vapi.pnat_binding_del(binding_index=rv.binding_index)
def test_pnat_show(self):
"""PNAT show tests"""
PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
PNAT_IP4_OUTPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
tests = [
{
"input": PNAT_IP4_INPUT,
"sw_if_index": self.pg0.sw_if_index,
"match": {
"mask": 0xA,
"dst": "10.10.10.10",
"proto": 17,
"dport": 6871,
},
"rewrite": {"mask": 0x2, "dst": self.pg1.remote_ip4},
"send": (
IP(src=self.pg0.remote_ip4, dst="10.10.10.10") / UDP(dport=6871)
),
"reply": (
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
/ UDP(dport=6871)
),
},
{
"input": PNAT_IP4_OUTPUT,
"sw_if_index": self.pg1.sw_if_index,
"match": {
"mask": 0x9,
"src": self.pg0.remote_ip4,
"proto": 17,
"dport": 6871,
},
"rewrite": {"mask": 0x1, "src": "11.11.11.11"},
"send": (
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
/ UDP(dport=6871)
),
"reply": (
IP(src="11.11.11.11", dst=self.pg1.remote_ip4) / UDP(dport=6871)
),
},
]
binding_index = []
for t in tests:
rv = self.vapi.pnat_binding_add(match=t["match"], rewrite=t["rewrite"])
binding_index.append(rv.binding_index)
self.vapi.pnat_binding_attach(
sw_if_index=t["sw_if_index"],
attachment=t["input"],
binding_index=rv.binding_index,
)
rv, l = self.vapi.pnat_bindings_get()
self.assertEqual(len(l), len(tests))
rv, l = self.vapi.pnat_interfaces_get()
self.assertEqual(len(l), 2)
self.logger.info(self.vapi.cli("show pnat translations"))
self.logger.info(self.vapi.cli("show pnat interfaces"))
for i, t in enumerate(tests):
self.vapi.pnat_binding_detach(
sw_if_index=t["sw_if_index"],
attachment=t["input"],
binding_index=binding_index[i],
)
self.vapi.pnat_binding_del(binding_index=binding_index[i])
def test_pnat_wildcard_proto(self):
"""
PNAT test wildcard IP protocol, PNAT_PROTO for mask should be set by
handler
"""
PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
PNAT_IP4_OUTPUT = \
VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
tests = [
{
'input': PNAT_IP4_INPUT,
'sw_if_index': self.pg0.sw_if_index,
'match': {'mask': 0x2, 'dst': '10.10.10.10'},
'rewrite': {'mask': 0x2, 'dst': self.pg1.remote_ip4},
'send': (IP(src=self.pg0.remote_ip4, dst='10.10.10.10')),
'reply': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4))
},
{
'input': PNAT_IP4_OUTPUT,
'sw_if_index': self.pg1.sw_if_index,
'match': {'mask': 0x1, 'src': self.pg0.remote_ip4},
'rewrite': {'mask': 0x1, 'src': '11.11.11.11'},
'send': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
'reply': (IP(src='11.11.11.11', dst=self.pg1.remote_ip4))
},
]
p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
for t in tests:
rv = self.vapi.pnat_binding_add(match=t['match'],
rewrite=t['rewrite'])
self.vapi.pnat_binding_attach(sw_if_index=t['sw_if_index'],
attachment=t['input'],
binding_index=rv.binding_index)
reply = t['reply']
reply[IP].ttl -= 1
rx = self.send_and_expect(self.pg0, p_ether / t['send'] * 1,
self.pg1)
for p in rx:
self.validate(p[1], reply)
self.ping_check()
self.vapi.pnat_binding_detach(sw_if_index=t['sw_if_index'],
attachment=t['input'],
binding_index=rv.binding_index)
self.vapi.pnat_binding_del(binding_index=rv.binding_index)
def test_pnat_wildcard_proto_v2(self):
""" PNAT test wildcard IP protocol using pnat_binding_add_v2"""
PNAT_IP4_INPUT = VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_INPUT
PNAT_IP4_OUTPUT = \
VppEnum.vl_api_pnat_attachment_point_t.PNAT_IP4_OUTPUT
tests = [
{
'input': PNAT_IP4_INPUT,
'sw_if_index': self.pg0.sw_if_index,
'match': {'mask': 0x42, 'dst': '10.10.10.10'},
'rewrite': {'mask': 0x42, 'dst': self.pg1.remote_ip4},
'send': (IP(src=self.pg0.remote_ip4, dst='10.10.10.10')),
'reply': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4))
},
{
'input': PNAT_IP4_OUTPUT,
'sw_if_index': self.pg1.sw_if_index,
'match': {'mask': 0x41, 'src': self.pg0.remote_ip4},
'rewrite': {'mask': 0x41, 'src': '11.11.11.11'},
'send': (IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)),
'reply': (IP(src='11.11.11.11', dst=self.pg1.remote_ip4))
},
]
p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
for t in tests:
rv = self.vapi.pnat_binding_add_v2(match=t['match'],
rewrite=t['rewrite'])
self.vapi.pnat_binding_attach(sw_if_index=t['sw_if_index'],
attachment=t['input'],
binding_index=rv.binding_index)
reply = t['reply']
reply[IP].ttl -= 1
rx = self.send_and_expect(self.pg0, p_ether / t['send'] * 1,
self.pg1)
for p in rx:
self.validate(p[1], reply)
self.ping_check()
self.vapi.pnat_binding_detach(sw_if_index=t['sw_if_index'],
attachment=t['input'],
binding_index=rv.binding_index)
self.vapi.pnat_binding_del(binding_index=rv.binding_index)
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)