blob: e017feebffe8196d8c226e1dcb046468918a3fc6 [file] [log] [blame]
Paul Vinciguerrae8fece82019-02-28 15:34:00 -08001from socket import AF_INET6
2
Juraj Sloboda81119e82018-05-25 14:02:20 +02003from scapy.layers.dhcp6 import DHCP6_Advertise, DHCP6OptClientId, \
4 DHCP6OptStatusCode, DHCP6OptPref, DHCP6OptIA_PD, DHCP6OptIAPrefix, \
5 DHCP6OptServerId, DHCP6_Solicit, DHCP6_Reply, DHCP6_Request, DHCP6_Renew, \
Juraj Slobodadd3b8f72018-05-04 14:20:06 +02006 DHCP6_Rebind, DUID_LL, DHCP6_Release, DHCP6OptElapsedTime, DHCP6OptIA_NA, \
7 DHCP6OptIAAddress
Juraj Sloboda81119e82018-05-25 14:02:20 +02008from scapy.layers.inet6 import IPv6, Ether, UDP
9from scapy.utils6 import in6_mactoifaceid
10from scapy.utils import inet_ntop, inet_pton
Paul Vinciguerrae8fece82019-02-28 15:34:00 -080011
Juraj Sloboda81119e82018-05-25 14:02:20 +020012from framework import VppTestCase
Paul Vinciguerrae8fece82019-02-28 15:34:00 -080013import util
Juraj Sloboda81119e82018-05-25 14:02:20 +020014
15
16def ip6_normalize(ip6):
17 return inet_ntop(AF_INET6, inet_pton(AF_INET6, ip6))
18
19
Juraj Slobodadd3b8f72018-05-04 14:20:06 +020020class TestDHCPv6DataPlane(VppTestCase):
21 """ DHCPv6 Data Plane Test Case """
Juraj Sloboda81119e82018-05-25 14:02:20 +020022
23 @classmethod
24 def setUpClass(cls):
Juraj Slobodadd3b8f72018-05-04 14:20:06 +020025 super(TestDHCPv6DataPlane, cls).setUpClass()
Juraj Sloboda81119e82018-05-25 14:02:20 +020026
Paul Vinciguerra7f9b7f92019-03-12 19:23:27 -070027 @classmethod
28 def tearDownClass(cls):
29 super(TestDHCPv6DataPlane, cls).tearDownClass()
30
Juraj Sloboda81119e82018-05-25 14:02:20 +020031 def setUp(self):
Juraj Slobodadd3b8f72018-05-04 14:20:06 +020032 super(TestDHCPv6DataPlane, self).setUp()
Juraj Sloboda81119e82018-05-25 14:02:20 +020033
34 self.create_pg_interfaces(range(1))
35 self.interfaces = list(self.pg_interfaces)
36 for i in self.interfaces:
37 i.admin_up()
38 i.config_ip6()
39
Juraj Slobodad9778c22018-06-12 10:21:05 +020040 self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
Juraj Sloboda81119e82018-05-25 14:02:20 +020041
42 def tearDown(self):
43 for i in self.interfaces:
44 i.unconfig_ip6()
45 i.admin_down()
Juraj Slobodadd3b8f72018-05-04 14:20:06 +020046 super(TestDHCPv6DataPlane, self).tearDown()
Juraj Sloboda81119e82018-05-25 14:02:20 +020047
Juraj Slobodadd3b8f72018-05-04 14:20:06 +020048 def test_dhcp_ia_na_send_solicit_receive_advertise(self):
Paul Vinciguerra8feeaff2019-03-27 11:25:48 -070049 """ Verify DHCPv6 IA NA Solicit packet and Advertise event """
Juraj Slobodadd3b8f72018-05-04 14:20:06 +020050
51 self.vapi.dhcp6_clients_enable_disable()
52
53 self.pg_enable_capture(self.pg_interfaces)
54 self.pg_start()
55 address_bin = '\00\01\00\02\00\03' + '\00' * 8 + '\00\05'
56 address = {'address': address_bin,
57 'preferred_time': 60,
58 'valid_time': 120}
Ole Troana5b2eec2019-03-11 19:23:25 +010059 self.vapi.dhcp6_send_client_message(msg_type=1,
60 sw_if_index=self.pg0.sw_if_index,
61 T1=20, T2=40, addresses=[address],
62 n_addresses=len([address]))
Juraj Slobodadd3b8f72018-05-04 14:20:06 +020063 rx_list = self.pg0.get_capture(1)
64 self.assertEqual(len(rx_list), 1)
65 packet = rx_list[0]
66
Paul Vinciguerra978aa642018-11-24 22:19:12 -080067 self.assertEqual(packet.haslayer(IPv6), 1)
68 self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1)
Juraj Slobodadd3b8f72018-05-04 14:20:06 +020069
70 client_duid = packet[DHCP6OptClientId].duid
71 trid = packet[DHCP6_Solicit].trid
72
73 dst = ip6_normalize(packet[IPv6].dst)
74 dst2 = ip6_normalize("ff02::1:2")
75 self.assert_equal(dst, dst2)
76 src = ip6_normalize(packet[IPv6].src)
77 src2 = ip6_normalize(self.pg0.local_ip6_ll)
78 self.assert_equal(src, src2)
79 ia_na = packet[DHCP6OptIA_NA]
80 self.assert_equal(ia_na.T1, 20)
81 self.assert_equal(ia_na.T2, 40)
82 self.assert_equal(len(ia_na.ianaopts), 1)
83 address = ia_na.ianaopts[0]
84 self.assert_equal(address.addr, '1:2:3::5')
85 self.assert_equal(address.preflft, 60)
86 self.assert_equal(address.validlft, 120)
87
88 self.vapi.want_dhcp6_reply_events()
89
90 try:
91 ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=60,
92 validlft=120)
93 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
Paul Vinciguerrae8fece82019-02-28 15:34:00 -080094 IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
Juraj Slobodadd3b8f72018-05-04 14:20:06 +020095 dst=self.pg0.local_ip6_ll) /
96 UDP(sport=547, dport=546) /
97 DHCP6_Advertise(trid=trid) /
98 DHCP6OptServerId(duid=self.server_duid) /
99 DHCP6OptClientId(duid=client_duid) /
100 DHCP6OptPref(prefval=7) /
101 DHCP6OptStatusCode(statuscode=1) /
102 DHCP6OptIA_NA(iaid=1, T1=20, T2=40, ianaopts=ia_na_opts)
103 )
104 self.pg0.add_stream([p])
105 self.pg_start()
106
107 ev = self.vapi.wait_for_event(1, "dhcp6_reply_event")
108
109 self.assert_equal(ev.preference, 7)
110 self.assert_equal(ev.status_code, 1)
111 self.assert_equal(ev.T1, 20)
112 self.assert_equal(ev.T2, 40)
113
114 reported_address = ev.addresses[0]
115 address = inet_pton(AF_INET6, ia_na_opts.getfieldval("addr"))
116 self.assert_equal(reported_address.address, address)
117 self.assert_equal(reported_address.preferred_time,
118 ia_na_opts.getfieldval("preflft"))
119 self.assert_equal(reported_address.valid_time,
120 ia_na_opts.getfieldval("validlft"))
121
122 finally:
123 self.vapi.want_dhcp6_reply_events(enable_disable=0)
124
125 def test_dhcp_pd_send_solicit_receive_advertise(self):
Paul Vinciguerra8feeaff2019-03-27 11:25:48 -0700126 """ Verify DHCPv6 PD Solicit packet and Advertise event """
Juraj Sloboda81119e82018-05-25 14:02:20 +0200127
Juraj Slobodad9778c22018-06-12 10:21:05 +0200128 self.vapi.dhcp6_clients_enable_disable()
129
Juraj Sloboda81119e82018-05-25 14:02:20 +0200130 self.pg_enable_capture(self.pg_interfaces)
131 self.pg_start()
132 prefix_bin = '\00\01\00\02\00\03' + '\00' * 10
133 prefix = {'prefix': prefix_bin,
134 'prefix_length': 50,
135 'preferred_time': 60,
136 'valid_time': 120}
137 self.vapi.dhcp6_pd_send_client_message(1, self.pg0.sw_if_index,
138 T1=20, T2=40, prefixes=[prefix])
139 rx_list = self.pg0.get_capture(1)
140 self.assertEqual(len(rx_list), 1)
141 packet = rx_list[0]
142
Paul Vinciguerra978aa642018-11-24 22:19:12 -0800143 self.assertEqual(packet.haslayer(IPv6), 1)
144 self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1)
Juraj Sloboda81119e82018-05-25 14:02:20 +0200145
146 client_duid = packet[DHCP6OptClientId].duid
147 trid = packet[DHCP6_Solicit].trid
148
149 dst = ip6_normalize(packet[IPv6].dst)
150 dst2 = ip6_normalize("ff02::1:2")
151 self.assert_equal(dst, dst2)
152 src = ip6_normalize(packet[IPv6].src)
153 src2 = ip6_normalize(self.pg0.local_ip6_ll)
154 self.assert_equal(src, src2)
155 ia_pd = packet[DHCP6OptIA_PD]
156 self.assert_equal(ia_pd.T1, 20)
157 self.assert_equal(ia_pd.T2, 40)
158 self.assert_equal(len(ia_pd.iapdopt), 1)
159 prefix = ia_pd.iapdopt[0]
160 self.assert_equal(prefix.prefix, '1:2:3::')
161 self.assert_equal(prefix.plen, 50)
162 self.assert_equal(prefix.preflft, 60)
163 self.assert_equal(prefix.validlft, 120)
164
165 self.vapi.want_dhcp6_pd_reply_events()
Juraj Sloboda81119e82018-05-25 14:02:20 +0200166
Juraj Slobodadd3b8f72018-05-04 14:20:06 +0200167 try:
168 ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=60,
169 validlft=120)
170 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
Paul Vinciguerrae8fece82019-02-28 15:34:00 -0800171 IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
Juraj Slobodadd3b8f72018-05-04 14:20:06 +0200172 dst=self.pg0.local_ip6_ll) /
173 UDP(sport=547, dport=546) /
174 DHCP6_Advertise(trid=trid) /
175 DHCP6OptServerId(duid=self.server_duid) /
176 DHCP6OptClientId(duid=client_duid) /
177 DHCP6OptPref(prefval=7) /
178 DHCP6OptStatusCode(statuscode=1) /
179 DHCP6OptIA_PD(iaid=1, T1=20, T2=40, iapdopt=ia_pd_opts)
180 )
181 self.pg0.add_stream([p])
182 self.pg_start()
183
184 ev = self.vapi.wait_for_event(1, "dhcp6_pd_reply_event")
185
186 self.assert_equal(ev.preference, 7)
187 self.assert_equal(ev.status_code, 1)
188 self.assert_equal(ev.T1, 20)
189 self.assert_equal(ev.T2, 40)
190
191 reported_prefix = ev.prefixes[0]
192 prefix = inet_pton(AF_INET6, ia_pd_opts.getfieldval("prefix"))
193 self.assert_equal(reported_prefix.prefix, prefix)
194 self.assert_equal(reported_prefix.prefix_length,
195 ia_pd_opts.getfieldval("plen"))
196 self.assert_equal(reported_prefix.preferred_time,
197 ia_pd_opts.getfieldval("preflft"))
198 self.assert_equal(reported_prefix.valid_time,
199 ia_pd_opts.getfieldval("validlft"))
200
201 finally:
202 self.vapi.want_dhcp6_pd_reply_events(enable_disable=0)
203
204
205class TestDHCPv6IANAControlPlane(VppTestCase):
206 """ DHCPv6 IA NA Control Plane Test Case """
207
208 @classmethod
209 def setUpClass(cls):
210 super(TestDHCPv6IANAControlPlane, cls).setUpClass()
211
Paul Vinciguerra7f9b7f92019-03-12 19:23:27 -0700212 @classmethod
213 def tearDownClass(cls):
214 super(TestDHCPv6IANAControlPlane, cls).tearDownClass()
215
Juraj Slobodadd3b8f72018-05-04 14:20:06 +0200216 def setUp(self):
217 super(TestDHCPv6IANAControlPlane, self).setUp()
218
219 self.create_pg_interfaces(range(1))
220 self.interfaces = list(self.pg_interfaces)
221 for i in self.interfaces:
222 i.admin_up()
223
224 self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
225 self.client_duid = None
226 self.T1 = 1
227 self.T2 = 2
228
229 fib = self.vapi.ip6_fib_dump()
230 self.initial_addresses = set(self.get_interface_addresses(fib,
231 self.pg0))
232
233 self.pg_enable_capture(self.pg_interfaces)
234 self.pg_start()
235
236 self.vapi.dhcp6_client_enable_disable(self.pg0.sw_if_index)
237
238 def tearDown(self):
239 self.vapi.dhcp6_client_enable_disable(self.pg0.sw_if_index, enable=0)
240
241 for i in self.interfaces:
242 i.admin_down()
243
244 super(TestDHCPv6IANAControlPlane, self).tearDown()
245
246 @staticmethod
247 def get_interface_addresses(fib, pg):
248 lst = []
249 for entry in fib:
250 if entry.address_length == 128:
251 path = entry.path[0]
252 if path.sw_if_index == pg.sw_if_index:
253 lst.append(entry.address)
254 return lst
255
256 def get_addresses(self):
257 fib = self.vapi.ip6_fib_dump()
258 addresses = set(self.get_interface_addresses(fib, self.pg0))
259 return addresses.difference(self.initial_addresses)
260
261 def validate_duid_ll(self, duid):
262 DUID_LL(duid)
263
264 def validate_packet(self, packet, msg_type, is_resend=False):
265 try:
Paul Vinciguerra978aa642018-11-24 22:19:12 -0800266 self.assertEqual(packet.haslayer(msg_type), 1)
Juraj Slobodadd3b8f72018-05-04 14:20:06 +0200267 client_duid = packet[DHCP6OptClientId].duid
268 if self.client_duid is None:
269 self.client_duid = client_duid
270 self.validate_duid_ll(client_duid)
271 else:
272 self.assertEqual(self.client_duid, client_duid)
273 if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind:
274 server_duid = packet[DHCP6OptServerId].duid
275 self.assertEqual(server_duid, self.server_duid)
276 if is_resend:
277 self.assertEqual(self.trid, packet[msg_type].trid)
278 else:
279 self.trid = packet[msg_type].trid
280 ip = packet[IPv6]
281 udp = packet[UDP]
282 self.assertEqual(ip.dst, 'ff02::1:2')
283 self.assertEqual(udp.sport, 546)
284 self.assertEqual(udp.dport, 547)
285 dhcpv6 = packet[msg_type]
286 elapsed_time = dhcpv6[DHCP6OptElapsedTime]
287 if (is_resend):
288 self.assertNotEqual(elapsed_time.elapsedtime, 0)
289 else:
290 self.assertEqual(elapsed_time.elapsedtime, 0)
291 except:
292 packet.show()
293 raise
294
295 def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
296 if timeout is None:
297 timeout = 3
298 rx_list = self.pg0.get_capture(1, timeout=timeout)
299 packet = rx_list[0]
300 self.validate_packet(packet, msg_type, is_resend=is_resend)
301
302 def wait_for_solicit(self, timeout=None, is_resend=False):
303 self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)
304
305 def wait_for_request(self, timeout=None, is_resend=False):
306 self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)
307
308 def wait_for_renew(self, timeout=None, is_resend=False):
309 self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)
310
311 def wait_for_rebind(self, timeout=None, is_resend=False):
312 self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)
313
314 def wait_for_release(self, timeout=None, is_resend=False):
315 self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)
316
317 def send_packet(self, msg_type, t1=None, t2=None, ianaopts=None):
318 if t1 is None:
319 t1 = self.T1
320 if t2 is None:
321 t2 = self.T2
322 if ianaopts is None:
323 opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2)
324 else:
325 opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2, ianaopts=ianaopts)
Juraj Sloboda81119e82018-05-25 14:02:20 +0200326 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
Paul Vinciguerrae8fece82019-02-28 15:34:00 -0800327 IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
Juraj Sloboda81119e82018-05-25 14:02:20 +0200328 dst=self.pg0.local_ip6_ll) /
329 UDP(sport=547, dport=546) /
Juraj Slobodadd3b8f72018-05-04 14:20:06 +0200330 msg_type(trid=self.trid) /
Juraj Sloboda81119e82018-05-25 14:02:20 +0200331 DHCP6OptServerId(duid=self.server_duid) /
Juraj Slobodadd3b8f72018-05-04 14:20:06 +0200332 DHCP6OptClientId(duid=self.client_duid) /
333 opt_ia_na
Juraj Sloboda81119e82018-05-25 14:02:20 +0200334 )
335 self.pg0.add_stream([p])
Juraj Slobodadd3b8f72018-05-04 14:20:06 +0200336 self.pg_enable_capture(self.pg_interfaces)
Juraj Sloboda81119e82018-05-25 14:02:20 +0200337 self.pg_start()
338
Juraj Slobodadd3b8f72018-05-04 14:20:06 +0200339 def send_advertise(self, t1=None, t2=None, ianaopts=None):
340 self.send_packet(DHCP6_Advertise, t1, t2, ianaopts)
Juraj Sloboda81119e82018-05-25 14:02:20 +0200341
Juraj Slobodadd3b8f72018-05-04 14:20:06 +0200342 def send_reply(self, t1=None, t2=None, ianaopts=None):
343 self.send_packet(DHCP6_Reply, t1, t2, ianaopts)
Juraj Sloboda81119e82018-05-25 14:02:20 +0200344
Juraj Slobodadd3b8f72018-05-04 14:20:06 +0200345 def test_T1_and_T2_timeouts(self):
346 """ Test T1 and T2 timeouts """
347
348 self.wait_for_solicit()
349 self.send_advertise()
350 self.wait_for_request()
351 self.send_reply()
352
353 self.sleep(1)
354
355 self.wait_for_renew()
356
357 self.pg_enable_capture(self.pg_interfaces)
358
359 self.sleep(1)
360
361 self.wait_for_rebind()
362
363 def test_addresses(self):
364 """ Test handling of addresses """
365
366 ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=1,
367 validlft=2)
368
369 self.wait_for_solicit()
370 self.send_advertise(t1=20, t2=40, ianaopts=ia_na_opts)
371 self.wait_for_request()
372 self.send_reply(t1=20, t2=40, ianaopts=ia_na_opts)
373 self.sleep(0.1)
374
375 # check FIB for new address
376 new_addresses = self.get_addresses()
377 self.assertEqual(len(new_addresses), 1)
378 addr = list(new_addresses)[0]
379 self.assertEqual(inet_ntop(AF_INET6, addr), '7:8::2')
380
381 self.sleep(2)
382
383 # check that the address is deleted
384 fib = self.vapi.ip6_fib_dump()
385 addresses = set(self.get_interface_addresses(fib, self.pg0))
386 new_addresses = addresses.difference(self.initial_addresses)
387 self.assertEqual(len(new_addresses), 0)
388
389 def test_sending_client_messages_solicit(self):
390 """ VPP receives messages from DHCPv6 client """
391
392 self.wait_for_solicit()
393 self.send_packet(DHCP6_Solicit)
394 self.send_packet(DHCP6_Request)
395 self.send_packet(DHCP6_Renew)
396 self.send_packet(DHCP6_Rebind)
397 self.sleep(1)
398 self.wait_for_solicit(is_resend=True)
399
400 def test_sending_inappropriate_packets(self):
401 """ Server sends messages with inappropriate message types """
402
403 self.wait_for_solicit()
404 self.send_reply()
405 self.wait_for_solicit(is_resend=True)
406 self.send_advertise()
407 self.wait_for_request()
408 self.send_advertise()
409 self.wait_for_request(is_resend=True)
410 self.send_reply()
411 self.wait_for_renew()
412
413 def test_no_address_available_in_advertise(self):
414 """ Advertise message contains NoAddrsAvail status code """
415
416 self.wait_for_solicit()
417 noavail = DHCP6OptStatusCode(statuscode=2) # NoAddrsAvail
418 self.send_advertise(ianaopts=noavail)
419 self.wait_for_solicit(is_resend=True)
420
Paul Vinciguerra8feeaff2019-03-27 11:25:48 -0700421 def test_preferred_greater_than_valid_lifetime(self):
Juraj Slobodadd3b8f72018-05-04 14:20:06 +0200422 """ Preferred lifetime is greater than valid lifetime """
423
424 self.wait_for_solicit()
425 self.send_advertise()
426 self.wait_for_request()
427 ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=3)
428 self.send_reply(ianaopts=ia_na_opts)
429
430 self.sleep(0.5)
431
432 # check FIB contains no addresses
433 fib = self.vapi.ip6_fib_dump()
434 addresses = set(self.get_interface_addresses(fib, self.pg0))
435 new_addresses = addresses.difference(self.initial_addresses)
436 self.assertEqual(len(new_addresses), 0)
437
438 def test_T1_greater_than_T2(self):
439 """ T1 is greater than T2 """
440
441 self.wait_for_solicit()
442 self.send_advertise()
443 self.wait_for_request()
444 ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=8)
445 self.send_reply(t1=80, t2=40, ianaopts=ia_na_opts)
446
447 self.sleep(0.5)
448
449 # check FIB contains no addresses
450 fib = self.vapi.ip6_fib_dump()
451 addresses = set(self.get_interface_addresses(fib, self.pg0))
452 new_addresses = addresses.difference(self.initial_addresses)
453 self.assertEqual(len(new_addresses), 0)
Juraj Sloboda81119e82018-05-25 14:02:20 +0200454
455
456class TestDHCPv6PDControlPlane(VppTestCase):
457 """ DHCPv6 PD Control Plane Test Case """
458
459 @classmethod
460 def setUpClass(cls):
461 super(TestDHCPv6PDControlPlane, cls).setUpClass()
462
Paul Vinciguerra7f9b7f92019-03-12 19:23:27 -0700463 @classmethod
464 def tearDownClass(cls):
465 super(TestDHCPv6PDControlPlane, cls).tearDownClass()
466
Juraj Sloboda81119e82018-05-25 14:02:20 +0200467 def setUp(self):
468 super(TestDHCPv6PDControlPlane, self).setUp()
469
470 self.create_pg_interfaces(range(2))
471 self.interfaces = list(self.pg_interfaces)
472 for i in self.interfaces:
473 i.admin_up()
474
Juraj Slobodad9778c22018-06-12 10:21:05 +0200475 self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
Juraj Sloboda81119e82018-05-25 14:02:20 +0200476 self.client_duid = None
477 self.T1 = 1
478 self.T2 = 2
479
480 fib = self.vapi.ip6_fib_dump()
481 self.initial_addresses = set(self.get_interface_addresses(fib,
482 self.pg1))
483
484 self.pg_enable_capture(self.pg_interfaces)
485 self.pg_start()
486
487 self.prefix_group = 'my-pd-prefix-group'
488
489 self.vapi.dhcp6_pd_client_enable_disable(
490 self.pg0.sw_if_index,
491 prefix_group=self.prefix_group)
492
493 def tearDown(self):
494 self.vapi.dhcp6_pd_client_enable_disable(self.pg0.sw_if_index,
495 enable=0)
496
497 for i in self.interfaces:
498 i.admin_down()
499
500 super(TestDHCPv6PDControlPlane, self).tearDown()
501
502 @staticmethod
503 def get_interface_addresses(fib, pg):
504 lst = []
505 for entry in fib:
506 if entry.address_length == 128:
507 path = entry.path[0]
508 if path.sw_if_index == pg.sw_if_index:
509 lst.append(entry.address)
510 return lst
511
512 def get_addresses(self):
513 fib = self.vapi.ip6_fib_dump()
514 addresses = set(self.get_interface_addresses(fib, self.pg1))
515 return addresses.difference(self.initial_addresses)
516
Juraj Slobodad9778c22018-06-12 10:21:05 +0200517 def validate_duid_ll(self, duid):
518 DUID_LL(duid)
Juraj Sloboda81119e82018-05-25 14:02:20 +0200519
520 def validate_packet(self, packet, msg_type, is_resend=False):
521 try:
Paul Vinciguerra978aa642018-11-24 22:19:12 -0800522 self.assertEqual(packet.haslayer(msg_type), 1)
Juraj Sloboda81119e82018-05-25 14:02:20 +0200523 client_duid = packet[DHCP6OptClientId].duid
524 if self.client_duid is None:
525 self.client_duid = client_duid
Juraj Slobodad9778c22018-06-12 10:21:05 +0200526 self.validate_duid_ll(client_duid)
Juraj Sloboda81119e82018-05-25 14:02:20 +0200527 else:
528 self.assertEqual(self.client_duid, client_duid)
529 if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind:
530 server_duid = packet[DHCP6OptServerId].duid
531 self.assertEqual(server_duid, self.server_duid)
532 if is_resend:
533 self.assertEqual(self.trid, packet[msg_type].trid)
534 else:
535 self.trid = packet[msg_type].trid
536 ip = packet[IPv6]
537 udp = packet[UDP]
538 self.assertEqual(ip.dst, 'ff02::1:2')
539 self.assertEqual(udp.sport, 546)
540 self.assertEqual(udp.dport, 547)
541 dhcpv6 = packet[msg_type]
542 elapsed_time = dhcpv6[DHCP6OptElapsedTime]
543 if (is_resend):
544 self.assertNotEqual(elapsed_time.elapsedtime, 0)
545 else:
546 self.assertEqual(elapsed_time.elapsedtime, 0)
547 except:
548 packet.show()
549 raise
550
551 def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
552 if timeout is None:
553 timeout = 3
554 rx_list = self.pg0.get_capture(1, timeout=timeout)
555 packet = rx_list[0]
556 self.validate_packet(packet, msg_type, is_resend=is_resend)
557
558 def wait_for_solicit(self, timeout=None, is_resend=False):
559 self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)
560
561 def wait_for_request(self, timeout=None, is_resend=False):
562 self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)
563
564 def wait_for_renew(self, timeout=None, is_resend=False):
565 self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)
566
567 def wait_for_rebind(self, timeout=None, is_resend=False):
568 self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)
569
570 def wait_for_release(self, timeout=None, is_resend=False):
571 self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)
572
573 def send_packet(self, msg_type, t1=None, t2=None, iapdopt=None):
574 if t1 is None:
575 t1 = self.T1
576 if t2 is None:
577 t2 = self.T2
578 if iapdopt is None:
579 opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2)
580 else:
581 opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2, iapdopt=iapdopt)
582 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
Paul Vinciguerrae8fece82019-02-28 15:34:00 -0800583 IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
Juraj Sloboda81119e82018-05-25 14:02:20 +0200584 dst=self.pg0.local_ip6_ll) /
585 UDP(sport=547, dport=546) /
586 msg_type(trid=self.trid) /
587 DHCP6OptServerId(duid=self.server_duid) /
588 DHCP6OptClientId(duid=self.client_duid) /
589 opt_ia_pd
590 )
591 self.pg0.add_stream([p])
592 self.pg_enable_capture(self.pg_interfaces)
593 self.pg_start()
594
595 def send_advertise(self, t1=None, t2=None, iapdopt=None):
596 self.send_packet(DHCP6_Advertise, t1, t2, iapdopt)
597
598 def send_reply(self, t1=None, t2=None, iapdopt=None):
599 self.send_packet(DHCP6_Reply, t1, t2, iapdopt)
600
601 def test_T1_and_T2_timeouts(self):
602 """ Test T1 and T2 timeouts """
603
604 self.wait_for_solicit()
605 self.send_advertise()
606 self.wait_for_request()
607 self.send_reply()
608
609 self.sleep(1)
610
611 self.wait_for_renew()
612
613 self.pg_enable_capture(self.pg_interfaces)
614
615 self.sleep(1)
616
617 self.wait_for_rebind()
618
619 def test_prefixes(self):
620 """ Test handling of prefixes """
Juraj Slobodadd3b8f72018-05-04 14:20:06 +0200621
Juraj Sloboda81119e82018-05-25 14:02:20 +0200622 address_bin_1 = None
623 address_bin_2 = None
624 try:
625 address_bin_1 = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
626 address_prefix_length_1 = 60
627 self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
628 address_bin_1,
629 address_prefix_length_1,
630 self.prefix_group)
631
632 ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=2,
633 validlft=3)
634
635 self.wait_for_solicit()
636 self.send_advertise(t1=20, t2=40, iapdopt=ia_pd_opts)
637 self.wait_for_request()
638 self.send_reply(t1=20, t2=40, iapdopt=ia_pd_opts)
639 self.sleep(0.1)
640
641 # check FIB for new address
642 new_addresses = self.get_addresses()
643 self.assertEqual(len(new_addresses), 1)
644 addr = list(new_addresses)[0]
645 self.assertEqual(inet_ntop(AF_INET6, addr), '7:8:0:2::405')
646
647 self.sleep(1)
648
649 address_bin_2 = '\x00' * 6 + '\x00\x76' + '\x00' * 6 + '\x04\x06'
650 address_prefix_length_2 = 62
651 self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
652 address_bin_2,
653 address_prefix_length_2,
654 self.prefix_group)
655
656 self.sleep(1)
657
658 # check FIB contains 2 addresses
659 fib = self.vapi.ip6_fib_dump()
660 addresses = set(self.get_interface_addresses(fib, self.pg1))
661 new_addresses = addresses.difference(self.initial_addresses)
662 self.assertEqual(len(new_addresses), 2)
663 addr1 = list(new_addresses)[0]
664 addr2 = list(new_addresses)[1]
665 if inet_ntop(AF_INET6, addr1) == '7:8:0:76::406':
666 addr1, addr2 = addr2, addr1
667 self.assertEqual(inet_ntop(AF_INET6, addr1), '7:8:0:2::405')
668 self.assertEqual(inet_ntop(AF_INET6, addr2), '7:8:0:76::406')
669
670 self.sleep(1)
671
672 # check that the addresses are deleted
673 fib = self.vapi.ip6_fib_dump()
674 addresses = set(self.get_interface_addresses(fib, self.pg1))
675 new_addresses = addresses.difference(self.initial_addresses)
676 self.assertEqual(len(new_addresses), 0)
677
678 finally:
679 if address_bin_1 is not None:
680 self.vapi.ip6_add_del_address_using_prefix(
681 self.pg1.sw_if_index, address_bin_1,
682 address_prefix_length_1, self.prefix_group, is_add=0)
683 if address_bin_2 is not None:
684 self.vapi.ip6_add_del_address_using_prefix(
685 self.pg1.sw_if_index, address_bin_2,
686 address_prefix_length_2, self.prefix_group, is_add=0)
687
688 def test_sending_client_messages_solicit(self):
689 """ VPP receives messages from DHCPv6 client """
690
691 self.wait_for_solicit()
692 self.send_packet(DHCP6_Solicit)
693 self.send_packet(DHCP6_Request)
694 self.send_packet(DHCP6_Renew)
695 self.send_packet(DHCP6_Rebind)
696 self.sleep(1)
697 self.wait_for_solicit(is_resend=True)
698
Juraj Slobodadd3b8f72018-05-04 14:20:06 +0200699 def test_sending_inappropriate_packets(self):
700 """ Server sends messages with inappropriate message types """
Juraj Sloboda81119e82018-05-25 14:02:20 +0200701
702 self.wait_for_solicit()
703 self.send_reply()
704 self.wait_for_solicit(is_resend=True)
705 self.send_advertise()
706 self.wait_for_request()
707 self.send_advertise()
708 self.wait_for_request(is_resend=True)
709 self.send_reply()
710 self.wait_for_renew()
711
712 def test_no_prefix_available_in_advertise(self):
713 """ Advertise message contains NoPrefixAvail status code """
714
715 self.wait_for_solicit()
716 noavail = DHCP6OptStatusCode(statuscode=6) # NoPrefixAvail
717 self.send_advertise(iapdopt=noavail)
718 self.wait_for_solicit(is_resend=True)
719
Paul Vinciguerra8feeaff2019-03-27 11:25:48 -0700720 def test_preferred_greater_than_valid_lifetime(self):
Juraj Sloboda81119e82018-05-25 14:02:20 +0200721 """ Preferred lifetime is greater than valid lifetime """
722
723 try:
724 address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
725 address_prefix_length = 60
726 self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
727 address_bin,
728 address_prefix_length,
729 self.prefix_group)
730
731 self.wait_for_solicit()
732 self.send_advertise()
733 self.wait_for_request()
734 ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
735 validlft=3)
736 self.send_reply(iapdopt=ia_pd_opts)
737
738 self.sleep(0.5)
739
740 # check FIB contains no addresses
741 fib = self.vapi.ip6_fib_dump()
742 addresses = set(self.get_interface_addresses(fib, self.pg1))
743 new_addresses = addresses.difference(self.initial_addresses)
744 self.assertEqual(len(new_addresses), 0)
745
746 finally:
747 self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
748 address_bin,
749 address_prefix_length,
750 self.prefix_group,
751 is_add=0)
752
753 def test_T1_greater_than_T2(self):
754 """ T1 is greater than T2 """
755
756 try:
757 address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
758 address_prefix_length = 60
759 self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
760 address_bin,
761 address_prefix_length,
762 self.prefix_group)
763
764 self.wait_for_solicit()
765 self.send_advertise()
766 self.wait_for_request()
767 ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
768 validlft=8)
769 self.send_reply(t1=80, t2=40, iapdopt=ia_pd_opts)
770
771 self.sleep(0.5)
772
773 # check FIB contains no addresses
774 fib = self.vapi.ip6_fib_dump()
775 addresses = set(self.get_interface_addresses(fib, self.pg1))
776 new_addresses = addresses.difference(self.initial_addresses)
777 self.assertEqual(len(new_addresses), 0)
778
779 finally:
780 self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
781 address_bin,
782 address_prefix_length,
783 self.prefix_group,
784 is_add=0)