blob: acda72bcdf644775465e489f079282d90999d7d8 [file] [log] [blame]
Ole Troan96068d62020-07-01 13:16:16 +02001#!/usr/bin/env python3
2
3import ipaddress
4import random
5import socket
6import struct
7import unittest
8from io import BytesIO
9from time import sleep
10
11import scapy.compat
12from framework import VppTestCase, VppTestRunner, running_extended_tests
13from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
15 IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
16 PacketListField
17from scapy.data import IP_PROTOS
18from scapy.layers.inet import IP, TCP, UDP, ICMP
19from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
20from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
21from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
22 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
23from scapy.layers.l2 import Ether, ARP, GRE
24from scapy.packet import Raw
25from syslog_rfc5424_parser import SyslogMessage, ParseError
26from syslog_rfc5424_parser.constants import SyslogSeverity
27from util import ip4_range
28from util import ppc, ppp
29from vpp_acl import AclRule, VppAcl, VppAclInterface
30from vpp_ip_route import VppIpRoute, VppRoutePath
31from vpp_neighbor import VppNeighbor
32from vpp_papi import VppEnum
33
34
Filip Vargaed2ee5e2021-03-23 12:57:58 +010035class TestNAT66(VppTestCase):
Ole Troan96068d62020-07-01 13:16:16 +020036 """ NAT66 Test Cases """
37
38 @classmethod
39 def setUpClass(cls):
40 super(TestNAT66, cls).setUpClass()
41
42 cls.nat_addr = 'fd01:ff::2'
Ole Troan96068d62020-07-01 13:16:16 +020043 cls.create_pg_interfaces(range(2))
44 cls.interfaces = list(cls.pg_interfaces)
45
46 for i in cls.interfaces:
47 i.admin_up()
48 i.config_ip6()
49 i.configure_ipv6_neighbors()
50
Filip Vargaed2ee5e2021-03-23 12:57:58 +010051 @property
52 def config_flags(self):
53 return VppEnum.vl_api_nat_config_flags_t
54
55 def plugin_enable(self):
56 self.vapi.nat66_plugin_enable_disable(enable=1)
57
58 def plugin_disable(self):
59 self.vapi.nat66_plugin_enable_disable(enable=0)
60
61 def setUp(self):
62 super(TestNAT66, self).setUp()
63 self.plugin_enable()
64
65 def tearDown(self):
66 super(TestNAT66, self).tearDown()
67 if not self.vpp_dead:
68 self.plugin_disable()
Ole Troan96068d62020-07-01 13:16:16 +020069
70 def test_static(self):
71 """ 1:1 NAT66 test """
72 flags = self.config_flags.NAT_IS_INSIDE
73 self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
74 sw_if_index=self.pg0.sw_if_index)
75 self.vapi.nat66_add_del_interface(is_add=1,
76 sw_if_index=self.pg1.sw_if_index)
77 self.vapi.nat66_add_del_static_mapping(
78 local_ip_address=self.pg0.remote_ip6,
79 external_ip_address=self.nat_addr,
80 is_add=1)
81
82 # in2out
83 pkts = []
84 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
85 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
86 TCP())
87 pkts.append(p)
88 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
89 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
90 UDP())
91 pkts.append(p)
92 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
93 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
94 ICMPv6EchoRequest())
95 pkts.append(p)
96 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
97 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
98 GRE() / IP() / TCP())
99 pkts.append(p)
100 self.pg0.add_stream(pkts)
101 self.pg_enable_capture(self.pg_interfaces)
102 self.pg_start()
103 capture = self.pg1.get_capture(len(pkts))
104
105 for packet in capture:
106 try:
107 self.assertEqual(packet[IPv6].src, self.nat_addr)
108 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
109 self.assert_packet_checksums_valid(packet)
110 except:
111 self.logger.error(ppp("Unexpected or invalid packet:", packet))
112 raise
113
114 # out2in
115 pkts = []
116 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
117 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
118 TCP())
119 pkts.append(p)
120 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
121 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
122 UDP())
123 pkts.append(p)
124 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
125 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
126 ICMPv6EchoReply())
127 pkts.append(p)
128 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
129 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
130 GRE() / IP() / TCP())
131 pkts.append(p)
132 self.pg1.add_stream(pkts)
133 self.pg_enable_capture(self.pg_interfaces)
134 self.pg_start()
135 capture = self.pg0.get_capture(len(pkts))
136 for packet in capture:
137 try:
138 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
139 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
140 self.assert_packet_checksums_valid(packet)
141 except:
142 self.logger.error(ppp("Unexpected or invalid packet:", packet))
143 raise
144
145 sm = self.vapi.nat66_static_mapping_dump()
146 self.assertEqual(len(sm), 1)
147 self.assertEqual(sm[0].total_pkts, 8)
148
149 def test_check_no_translate(self):
150 """ NAT66 translate only when egress interface is outside interface """
151 flags = self.config_flags.NAT_IS_INSIDE
152 self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
153 sw_if_index=self.pg0.sw_if_index)
154 self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
155 sw_if_index=self.pg1.sw_if_index)
156 self.vapi.nat66_add_del_static_mapping(
157 local_ip_address=self.pg0.remote_ip6,
158 external_ip_address=self.nat_addr,
159 is_add=1)
160
161 # in2out
162 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
163 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
164 UDP())
165 self.pg0.add_stream([p])
166 self.pg_enable_capture(self.pg_interfaces)
167 self.pg_start()
168 capture = self.pg1.get_capture(1)
169 packet = capture[0]
170 try:
171 self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
172 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
173 except:
174 self.logger.error(ppp("Unexpected or invalid packet:", packet))
175 raise
176
Ole Troan96068d62020-07-01 13:16:16 +0200177
178if __name__ == '__main__':
179 unittest.main(testRunner=VppTestRunner)