blob: e3236216bea5cb56f8b1e06e440b08174b6710ef [file] [log] [blame]
Ole Troan96068d62020-07-01 13:16:16 +02001#!/usr/bin/env python3
2
3import ipaddress
4import random
5import socket
6import struct
7import unittest
8from io import BytesIO
Ole Troan96068d62020-07-01 13:16:16 +02009
10import scapy.compat
11from framework import VppTestCase, VppTestRunner, running_extended_tests
12from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
13from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
14 IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
15 PacketListField
16from scapy.data import IP_PROTOS
17from scapy.layers.inet import IP, TCP, UDP, ICMP
18from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
19from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
20from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
21 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
22from scapy.layers.l2 import Ether, ARP, GRE
23from scapy.packet import Raw
24from syslog_rfc5424_parser import SyslogMessage, ParseError
25from syslog_rfc5424_parser.constants import SyslogSeverity
26from util import ip4_range
27from util import ppc, ppp
28from vpp_acl import AclRule, VppAcl, VppAclInterface
29from vpp_ip_route import VppIpRoute, VppRoutePath
30from vpp_neighbor import VppNeighbor
31from vpp_papi import VppEnum
32
33
class TestNAT66(VppTestCase):
    """ NAT66 Test Cases """

    @classmethod
    def setUpClass(cls):
        """Create two packet-generator interfaces and bring them up for IPv6."""
        super(TestNAT66, cls).setUpClass()

        # External address used by the static mappings installed in the tests.
        cls.nat_addr = 'fd01:ff::2'
        cls.create_pg_interfaces(range(2))
        cls.interfaces = list(cls.pg_interfaces)

        for i in cls.interfaces:
            i.admin_up()
            i.config_ip6()
            i.configure_ipv6_neighbors()

    @property
    def config_flags(self):
        """Shortcut to the ``vl_api_nat_config_flags_t`` API enum."""
        return VppEnum.vl_api_nat_config_flags_t

    def plugin_enable(self):
        """Enable the NAT66 plugin through the API."""
        self.vapi.nat66_plugin_enable_disable(enable=1)

    def plugin_disable(self):
        """Disable the NAT66 plugin through the API."""
        self.vapi.nat66_plugin_enable_disable(enable=0)

    def setUp(self):
        """Run the base-class set-up, then enable the NAT66 plugin."""
        super(TestNAT66, self).setUp()
        self.plugin_enable()

    def tearDown(self):
        """Run the base-class tear-down, then disable the NAT66 plugin."""
        super(TestNAT66, self).tearDown()
        # Skip the API call when VPP is flagged dead.
        if not self.vpp_dead:
            self.plugin_disable()

    def _add_static_mapping(self):
        """Install a static mapping from pg0's remote address to nat_addr."""
        self.vapi.nat66_add_del_static_mapping(
            local_ip_address=self.pg0.remote_ip6,
            external_ip_address=self.nat_addr,
            is_add=1)

    def _build_stream(self, ingress, ip6_src, ip6_dst, icmp_layer):
        """Build one TCP, UDP, ICMPv6 and GRE-in-IPv6 packet.

        :param ingress: pg interface the packets will be injected on; its
            MAC addresses are used for the Ethernet header.
        :param ip6_src: IPv6 source address of every packet.
        :param ip6_dst: IPv6 destination address of every packet.
        :param icmp_layer: ICMPv6 layer instance used for the third packet.
        :returns: list of four scapy packets, in the order TCP, UDP,
            ICMPv6, GRE/IP/TCP.
        """
        payloads = (
            TCP(),
            UDP(),
            icmp_layer,
            GRE() / IP() / TCP(),
        )
        return [
            Ether(dst=ingress.local_mac, src=ingress.remote_mac) /
            IPv6(src=ip6_src, dst=ip6_dst) /
            payload
            for payload in payloads
        ]

    def _send_and_capture(self, ingress, egress, pkts):
        """Inject pkts on ingress and return the capture taken on egress.

        The number of packets requested from the egress capture equals
        ``len(pkts)``.
        """
        ingress.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return egress.get_capture(len(pkts))

    def _verify_capture(self, capture, ip6_src, ip6_dst,
                        verify_checksums=True):
        """Assert the IPv6 addresses (and optionally checksums) of a capture.

        :param capture: iterable of captured scapy packets.
        :param ip6_src: expected IPv6 source address.
        :param ip6_dst: expected IPv6 destination address.
        :param verify_checksums: when true, also call
            ``assert_packet_checksums_valid`` on each packet.
        :raises Exception: whatever the failing check raised, after the
            offending packet has been logged.
        """
        for packet in capture:
            try:
                self.assertEqual(packet[IPv6].src, ip6_src)
                self.assertEqual(packet[IPv6].dst, ip6_dst)
                if verify_checksums:
                    self.assert_packet_checksums_valid(packet)
            except Exception:
                # Log the packet for diagnosis, then let the failure
                # propagate. KeyboardInterrupt/SystemExit are deliberately
                # not caught: they are not packet errors.
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

    def test_static(self):
        """ 1:1 NAT66 test """
        # pg0 is added with the NAT_IS_INSIDE flag, pg1 without any flags.
        flags = self.config_flags.NAT_IS_INSIDE
        self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
                                          sw_if_index=self.pg0.sw_if_index)
        self.vapi.nat66_add_del_interface(is_add=1,
                                          sw_if_index=self.pg1.sw_if_index)
        self._add_static_mapping()

        # in2out: the source address must be rewritten to nat_addr.
        pkts = self._build_stream(self.pg0,
                                  self.pg0.remote_ip6,
                                  self.pg1.remote_ip6,
                                  ICMPv6EchoRequest())
        capture = self._send_and_capture(self.pg0, self.pg1, pkts)
        self._verify_capture(capture, self.nat_addr, self.pg1.remote_ip6)

        # out2in: the destination nat_addr must be rewritten back to the
        # pg0 remote address.
        pkts = self._build_stream(self.pg1,
                                  self.pg1.remote_ip6,
                                  self.nat_addr,
                                  ICMPv6EchoReply())
        capture = self._send_and_capture(self.pg1, self.pg0, pkts)
        self._verify_capture(capture,
                             self.pg1.remote_ip6,
                             self.pg0.remote_ip6)

        sm = self.vapi.nat66_static_mapping_dump()
        self.assertEqual(len(sm), 1)
        # Four packets were sent in each direction.
        self.assertEqual(sm[0].total_pkts, 8)

    def test_check_no_translate(self):
        """ NAT66 translate only when egress interface is outside interface """
        # Both interfaces are added with NAT_IS_INSIDE here.
        flags = self.config_flags.NAT_IS_INSIDE
        self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
                                          sw_if_index=self.pg0.sw_if_index)
        self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
                                          sw_if_index=self.pg1.sw_if_index)
        self._add_static_mapping()

        # in2out: the packet must arrive with its original source address.
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
             UDP())
        capture = self._send_and_capture(self.pg0, self.pg1, [p])
        # Checksums are not verified in this test.
        self._verify_capture(capture[:1],
                             self.pg0.remote_ip6,
                             self.pg1.remote_ip6,
                             verify_checksums=False)
175
Ole Troan96068d62020-07-01 13:16:16 +0200176
# Allow running this test module directly, using the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)