#!/usr/bin/env python
# blob: 38808a9e1b7e8a959d82ba3cb3dcf1b27f1a6747
# blame: Damjan Marion, f56b77a, 2016-10-03 19:44:57 +0200
2
3import logging
4logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
5
6import unittest
7from framework import VppTestCase, VppTestRunner
8from util import Util
9
10from scapy.packet import Raw
11from scapy.layers.l2 import Ether, Dot1Q
12from scapy.layers.inet6 import (IPv6, UDP,
13 ICMPv6ND_NS, ICMPv6NDOptSrcLLAddr,
14 ICMPv6ND_NA, ICMPv6NDOptDstLLAddr)
15
16
@unittest.skip('Not finished yet.\n')
class TestIPv6(Util, VppTestCase):
    """ IPv6 Test Case """

    @classmethod
    def setUpClass(cls):
        """Create interfaces, configure IPv6 on them and populate the FIB.

        If any setup step raises, the parent ``tearDownClass`` is called
        before the exception is re-raised.
        """
        super(TestIPv6, cls).setUpClass()

        try:
            cls.create_interfaces_and_subinterfaces()

            # configure IPv6 on hardware interfaces
            cls.config_ip6(cls.interfaces)

            cls.config_ip6_on_software_interfaces(cls.interfaces)

            # resolve ICMPv6 ND using hardware interfaces
            cls.resolve_icmpv6_nd(cls.interfaces)

            # let VPP know MAC addresses of peer (sub)interfaces
            # cls.resolve_icmpv6_nd_on_software_interfaces(cls.interfaces)
            cls.send_neighbour_advertisement_on_software_interfaces(
                cls.interfaces)

            # config 1M FIB entries (the 2M variant is kept for reference)
            # cls.config_fib_entries(2000000)
            cls.config_fib_entries(1000000)

        except Exception:
            super(TestIPv6, cls).tearDownClass()
            raise

    def tearDown(self):
        """Issue a series of "show" CLI commands after each test."""
        self.cli(2, "show int")
        self.cli(2, "show trace")
        self.cli(2, "show hardware")
        # NOTE(review): this is an IPv6 test but the ARP table is shown;
        # confirm whether an IPv6 neighbour command was intended.
        self.cli(2, "show ip arp")
        # self.cli(2, "show ip fib")  # 2M entries
        self.cli(2, "show error")
        self.cli(2, "show run")

    @classmethod
    def create_vlan_subif(cls, pg_index, vlan):
        """Send a ``create_vlan_subif`` API command for ``pg<pg_index>``.

        :param pg_index: index of the pg interface.
        :param vlan: VLAN id passed to the command.
        """
        cls.api("create_vlan_subif pg%u vlan %u" % (pg_index, vlan))

    @classmethod
    def create_dot1ad_subif(cls, pg_index, sub_id, outer_vlan_id,
                            inner_vlan_id):
        """Send a ``create_subif ... dot1ad`` API command.

        :param pg_index: index of the pg interface.
        :param sub_id: sub-interface id passed to the command.
        :param outer_vlan_id: outer VLAN id passed to the command.
        :param inner_vlan_id: inner VLAN id passed to the command.
        """
        cls.api("create_subif pg%u sub_id %u outer_vlan_id %u inner_vlan_id %u dot1ad"
                % (pg_index, sub_id, outer_vlan_id, inner_vlan_id))

    class SoftInt(object):
        """Base marker class for the interface descriptors below."""
        pass

    class HardInt(SoftInt):
        """Descriptor for an interface without a sub-interface."""
        pass

    class Subint(SoftInt):
        """Descriptor for a sub-interface identified by ``sub_id``."""

        def __init__(self, sub_id):
            self.sub_id = sub_id

    class Dot1QSubint(Subint):
        """Sub-interface descriptor carrying a single VLAN id."""

        def __init__(self, sub_id, vlan=None):
            # The VLAN id defaults to the sub-interface id when not given.
            if vlan is None:
                vlan = sub_id
            super(TestIPv6.Dot1QSubint, self).__init__(sub_id)
            self.vlan = vlan

    class Dot1ADSubint(Subint):
        """Sub-interface descriptor carrying outer and inner VLAN ids."""

        def __init__(self, sub_id, outer_vlan, inner_vlan):
            super(TestIPv6.Dot1ADSubint, self).__init__(sub_id)
            self.outer_vlan = outer_vlan
            self.inner_vlan = inner_vlan

    @classmethod
    def create_interfaces_and_subinterfaces(cls):
        """Create three pg interfaces and VLAN sub-interfaces on pg1/pg2.

        Fills ``cls.interfaces`` and ``cls.INT_DETAILS`` and sets every
        created sub-interface admin-up.
        """
        # A real list (not a lazy range) so it behaves the same on any
        # Python version when handed to helpers.
        cls.interfaces = list(range(3))

        cls.create_interfaces(cls.interfaces)

        # Make vpp_api_test see interfaces created using debug CLI
        # (in function create_interfaces)
        cls.api("sw_interface_dump")

        cls.INT_DETAILS = dict()

        cls.INT_DETAILS[0] = cls.HardInt()

        cls.INT_DETAILS[1] = cls.Dot1QSubint(100)
        cls.create_vlan_subif(1, cls.INT_DETAILS[1].vlan)

        # FIXME: Wrong packet format/wrong layer on output of interface 2
        # cls.INT_DETAILS[2] = cls.Dot1ADSubint(10, 200, 300)
        # cls.create_dot1ad_subif(2, cls.INT_DETAILS[2].sub_id,
        #                         cls.INT_DETAILS[2].outer_vlan,
        #                         cls.INT_DETAILS[2].inner_vlan)

        # Use dot1q for now
        cls.INT_DETAILS[2] = cls.Dot1QSubint(200)
        cls.create_vlan_subif(2, cls.INT_DETAILS[2].vlan)

        for i in cls.interfaces:
            det = cls.INT_DETAILS[i]
            if isinstance(det, cls.Subint):
                cls.api("sw_interface_set_flags pg%u.%u admin-up"
                        % (i, det.sub_id))

    # IP addresses on subinterfaces, keyed by interface index
    MY_SOFT_IP6S = {}
    VPP_SOFT_IP6S = {}

    @classmethod
    def config_ip6_on_software_interfaces(cls, args):
        """Assign an IPv6 address to each (sub)interface listed in *args*.

        Records the peer address in ``MY_SOFT_IP6S`` and the VPP-side
        address in ``VPP_SOFT_IP6S`` for every index.

        :param args: iterable of interface indexes.
        """
        for i in args:
            cls.MY_SOFT_IP6S[i] = "fd01:%u::2" % i
            cls.VPP_SOFT_IP6S[i] = "fd01:%u::1" % i
            if isinstance(cls.INT_DETAILS[i], cls.Subint):
                interface = "pg%u.%u" % (i, cls.INT_DETAILS[i].sub_id)
            else:
                interface = "pg%u" % i
            cls.api("sw_interface_add_del_address %s %s/32"
                    % (interface, cls.VPP_SOFT_IP6S[i]))
            cls.log("My subinterface IPv6 address is %s"
                    % (cls.MY_SOFT_IP6S[i]))

    # let VPP know MAC addresses of peer (sub)interfaces
    @classmethod
    def resolve_icmpv6_nd_on_software_interfaces(cls, args):
        """Send an ICMPv6 neighbour solicitation on each interface in *args*.

        :param args: iterable of interface indexes.
        """
        for i in args:
            ip = cls.VPP_SOFT_IP6S[i]
            cls.log("Sending ICMPv6ND_NS request for %s on port %u" % (ip, i))
            nd_req = (Ether(dst="ff:ff:ff:ff:ff:ff", src=cls.MY_MACS[i]) /
                      IPv6(src=cls.MY_SOFT_IP6S[i], dst=ip) /
                      ICMPv6ND_NS(tgt=ip) /
                      ICMPv6NDOptSrcLLAddr(lladdr=cls.MY_MACS[i]))
            cls.pg_add_stream(i, nd_req)
            cls.pg_enable_capture([i])

            cls.cli(2, "trace add pg-input 1")
            cls.pg_start()

            # We don't need to read output

    # let VPP know MAC addresses of peer (sub)interfaces
    @classmethod
    def send_neighbour_advertisement_on_software_interfaces(cls, args):
        """Send an ICMPv6 neighbour advertisement on each interface in *args*.

        :param args: iterable of interface indexes.
        """
        for i in args:
            ip = cls.VPP_SOFT_IP6S[i]
            cls.log("Sending ICMPv6ND_NA message for %s on port %u" % (ip, i))
            pkt = (Ether(dst="ff:ff:ff:ff:ff:ff", src=cls.MY_MACS[i]) /
                   IPv6(src=cls.MY_SOFT_IP6S[i], dst=ip) /
                   ICMPv6ND_NA(tgt=ip, R=0, S=0) /
                   ICMPv6NDOptDstLLAddr(lladdr=cls.MY_MACS[i]))
            cls.pg_add_stream(i, pkt)
            cls.pg_enable_capture([i])

            cls.cli(2, "trace add pg-input 1")
            cls.pg_start()

    @classmethod
    def config_fib_entries(cls, count):
        """Issue one ``ip_add_del_route`` command per interface.

        *count* is split evenly between the interfaces.

        :param count: total number of routes requested.
        """
        n_int = len(cls.interfaces)
        # Floor division keeps the per-interface count an integer on any
        # Python version.
        per_interface = count // n_int
        for i in cls.interfaces:
            cls.api("ip_add_del_route fd02::1/128 via %s count %u"
                    % (cls.VPP_SOFT_IP6S[i], per_interface))

    @classmethod
    def add_dot1_layers(cls, i, packet):
        """Insert the VLAN tag(s) configured for interface *i* into *packet*.

        The packet is modified in place; it is left untouched when the
        interface has no sub-interface descriptor.

        :param i: interface index used to look up ``INT_DETAILS``.
        :param packet: packet whose outermost layer must be ``Ether``.
        """
        assert(type(packet) is Ether)
        payload = packet.payload
        det = cls.INT_DETAILS[i]
        if isinstance(det, cls.Dot1QSubint):
            packet.remove_payload()
            # Tag with the descriptor's VLAN id (not its sub_id): this is
            # the value the sub-interface was created with and the value
            # remove_dot1_layers() checks against.
            packet.add_payload(Dot1Q(vlan=det.vlan) / payload)
        elif isinstance(det, cls.Dot1ADSubint):
            packet.remove_payload()
            packet.add_payload(Dot1Q(vlan=det.outer_vlan) /
                               Dot1Q(vlan=det.inner_vlan) / payload)
            packet.type = 0x88A8

    def remove_dot1_layers(self, i, packet):
        """Check and strip the VLAN tag(s) expected on interface *i*.

        Asserts that *packet* starts with ``Ether`` and that the VLAN
        layers match the descriptor in ``INT_DETAILS[i]``, then re-attaches
        the inner payload directly to the Ethernet header (in place).

        :param i: interface index used to look up ``INT_DETAILS``.
        :param packet: captured packet to verify and modify.
        """
        self.assertEqual(type(packet), Ether)
        payload = packet.payload
        det = self.INT_DETAILS[i]
        if isinstance(det, self.Dot1QSubint):
            self.assertEqual(type(payload), Dot1Q)
            self.assertEqual(payload.vlan, det.vlan)
            payload = payload.payload
        elif isinstance(det, self.Dot1ADSubint):  # TODO: change 88A8 type
            self.assertEqual(type(payload), Dot1Q)
            self.assertEqual(payload.vlan, det.outer_vlan)
            payload = payload.payload
            self.assertEqual(type(payload), Dot1Q)
            self.assertEqual(payload.vlan, det.inner_vlan)
            payload = payload.payload
        packet.remove_payload()
        packet.add_payload(payload)

    def create_stream(self, pg_id):
        """Build 257 UDP/IPv6 packets to be sent from interface *pg_id*.

        Destinations alternate between the two other interfaces and the
        requested size cycles through a fixed list, changing every second
        packet.

        :param pg_id: index of the sending interface (0, 1 or 2).
        :returns: list of packets.
        """
        pg_targets = [None] * 3
        pg_targets[0] = [1, 2]
        pg_targets[1] = [0, 2]
        pg_targets[2] = [0, 1]
        pkts = []
        for i in range(0, 257):
            target_pg_id = pg_targets[pg_id][i % 2]
            info = self.create_packet_info(pg_id, target_pg_id)
            payload = self.info_to_payload(info)
            p = (Ether(dst=self.VPP_MACS[pg_id], src=self.MY_MACS[pg_id]) /
                 IPv6(src=self.MY_SOFT_IP6S[pg_id],
                      dst=self.MY_SOFT_IP6S[target_pg_id]) /
                 UDP(sport=1234, dport=1234) /
                 Raw(payload))
            # Keep an untagged copy for comparison in verify_capture().
            info.data = p.copy()
            self.add_dot1_layers(pg_id, p)
            if not isinstance(self.INT_DETAILS[pg_id], self.Subint):
                packet_sizes = [76, 512, 1518, 9018]
            else:
                # presumably +4 accounts for the 802.1Q tag - TODO confirm
                packet_sizes = [76, 512, 1518+4, 9018+4]
            # Floor division so the list index is an int on any Python
            # version.
            size = packet_sizes[(i // 2) % len(packet_sizes)]
            self.extend_packet(p, size)
            pkts.append(p)
        return pkts

    def verify_capture(self, o, capture):
        """Verify the packets captured on output interface *o*.

        Each packet must carry the expected VLAN tag(s), be destined to
        *o*, arrive in order per source interface and match the saved
        packet's IPv6 addresses and UDP ports. Finally, no expected packet
        from any source may be outstanding.

        :param o: index of the interface the capture was taken on.
        :param capture: iterable of captured packets.
        """
        last_info = {}
        for i in self.interfaces:
            last_info[i] = None
        for packet in capture:
            # Check VLAN tags and Ethernet header
            self.remove_dot1_layers(o, packet)
            self.assertTrue(Dot1Q not in packet)
            try:
                ip = packet[IPv6]
                udp = packet[UDP]
                payload_info = self.payload_to_info(str(packet[Raw]))
                packet_index = payload_info.index
                src_pg = payload_info.src
                dst_pg = payload_info.dst
                self.assertEqual(dst_pg, o)
                self.log("Got packet on port %u: src=%u (id=%u)"
                         % (o, src_pg, packet_index), 2)
                next_info = self.get_next_packet_info_for_interface2(
                    src_pg, dst_pg, last_info[src_pg])
                last_info[src_pg] = next_info
                self.assertTrue(next_info is not None)
                self.assertEqual(packet_index, next_info.index)
                saved_packet = next_info.data
                # Check standard fields
                self.assertEqual(ip.src, saved_packet[IPv6].src)
                self.assertEqual(ip.dst, saved_packet[IPv6].dst)
                self.assertEqual(udp.sport, saved_packet[UDP].sport)
                self.assertEqual(udp.dport, saved_packet[UDP].dport)
            except:
                # Dump the offending packet, then let the failure propagate.
                self.log("Unexpected or invalid packet:")
                packet.show()
                raise
        for i in self.interfaces:
            remaining_packet = self.get_next_packet_info_for_interface2(
                i, o, last_info[i])
            self.assertTrue(
                remaining_packet is None,
                "Port %u: Packet expected from source %u didn't arrive"
                % (o, i))

    def test_fib(self):
        """ IPv6 FIB test """

        for i in self.interfaces:
            pkts = self.create_stream(i)
            self.pg_add_stream(i, pkts)

        self.pg_enable_capture(self.interfaces)
        self.pg_start()

        for i in self.interfaces:
            out = self.pg_get_capture(i)
            self.log("Verifying capture %u" % i)
            self.verify_capture(i, out)
278
279
if __name__ == '__main__':
    # Run this module's tests with the VPP-specific runner when it is
    # executed directly as a script.
    unittest.main(testRunner=VppTestRunner)