blob: 6c203868673989fd14336e4b96629ec96df89323 [file] [log] [blame]
Juraj Sloboda81119e82018-05-25 14:02:20 +02001from scapy.layers.dhcp6 import DHCP6_Advertise, DHCP6OptClientId, \
2 DHCP6OptStatusCode, DHCP6OptPref, DHCP6OptIA_PD, DHCP6OptIAPrefix, \
3 DHCP6OptServerId, DHCP6_Solicit, DHCP6_Reply, DHCP6_Request, DHCP6_Renew, \
Juraj Slobodadd3b8f72018-05-04 14:20:06 +02004 DHCP6_Rebind, DUID_LL, DHCP6_Release, DHCP6OptElapsedTime, DHCP6OptIA_NA, \
5 DHCP6OptIAAddress
Juraj Sloboda81119e82018-05-25 14:02:20 +02006from scapy.layers.inet6 import IPv6, Ether, UDP
7from scapy.utils6 import in6_mactoifaceid
8from scapy.utils import inet_ntop, inet_pton
9from socket import AF_INET6
10from framework import VppTestCase
Juraj Sloboda81119e82018-05-25 14:02:20 +020011
12
def ip6_normalize(ip6):
    """Return the canonical textual form of the IPv6 address *ip6*.

    The address is converted to its packed binary form and back again, so
    two different spellings of the same address yield the same string.
    """
    packed = inet_pton(AF_INET6, ip6)
    return inet_ntop(AF_INET6, packed)
15
16
def mk_ll_addr(mac):
    """Build a link-local IPv6 address string from the MAC address *mac*.

    The interface identifier returned by scapy's ``in6_mactoifaceid`` is
    appended to the ``fe80::`` prefix.
    """
    return "fe80::" + in6_mactoifaceid(mac)
21
22
class TestDHCPv6DataPlane(VppTestCase):
    """ DHCPv6 Data Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6DataPlane, cls).setUpClass()

    def setUp(self):
        """Create one pg interface with IPv6 and derive the server DUID."""
        super(TestDHCPv6DataPlane, self).setUp()

        self.create_pg_interfaces(range(1))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()
            i.config_ip6()

        # The emulated server identifies itself by pg0's remote MAC.
        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)

    def tearDown(self):
        """Undo the interface configuration done in setUp."""
        for i in self.interfaces:
            i.unconfig_ip6()
            i.admin_down()
        super(TestDHCPv6DataPlane, self).tearDown()

    def _capture_solicit(self):
        """Capture the Solicit sent on pg0 and check its addressing.

        Asserts that exactly one packet was captured, that it is an IPv6
        packet carrying a DHCPv6 Solicit, that it is addressed to
        ff02::1:2 and that it is sourced from pg0's link-local address.

        :returns: the captured packet
        """
        rx_list = self.pg0.get_capture(1)
        self.assertEqual(len(rx_list), 1)
        packet = rx_list[0]

        self.assertEqual(packet.haslayer(IPv6), 1)
        self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1)

        self.assert_equal(ip6_normalize(packet[IPv6].dst),
                          ip6_normalize("ff02::1:2"))
        self.assert_equal(ip6_normalize(packet[IPv6].src),
                          ip6_normalize(self.pg0.local_ip6_ll))
        return packet

    def _send_advertise(self, trid, client_duid, ia_option):
        """Inject an Advertise carrying *ia_option* into pg0.

        The message is built with preference 7 and status code 1, which
        are the values _check_reply_event() looks for afterwards.

        :param trid: transaction id taken from the captured Solicit
        :param client_duid: client DUID taken from the captured Solicit
        :param ia_option: IA_NA or IA_PD option to append to the message
        """
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=mk_ll_addr(self.pg0.remote_mac),
                  dst=self.pg0.local_ip6_ll) /
             UDP(sport=547, dport=546) /
             DHCP6_Advertise(trid=trid) /
             DHCP6OptServerId(duid=self.server_duid) /
             DHCP6OptClientId(duid=client_duid) /
             DHCP6OptPref(prefval=7) /
             DHCP6OptStatusCode(statuscode=1) /
             ia_option
             )
        self.pg0.add_stream([p])
        self.pg_start()

    def _check_reply_event(self, ev):
        """Assert the preference, status code and timers reported in *ev*."""
        self.assert_equal(ev.preference, 7)
        self.assert_equal(ev.status_code, 1)
        self.assert_equal(ev.T1, 20)
        self.assert_equal(ev.T2, 40)

    def test_dhcp_ia_na_send_solicit_receive_advertise(self):
        """ Verify DHCPv6 IA NA Solicit packet and Advertise event """

        self.vapi.dhcp6_clients_enable_disable()

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        address_bin = '\00\01\00\02\00\03' + '\00' * 8 + '\00\05'
        requested = {'address': address_bin,
                     'preferred_time': 60,
                     'valid_time': 120}
        self.vapi.dhcp6_send_client_message(1, self.pg0.sw_if_index,
                                            T1=20, T2=40,
                                            addresses=[requested])
        packet = self._capture_solicit()
        client_duid = packet[DHCP6OptClientId].duid
        trid = packet[DHCP6_Solicit].trid

        # The Solicit must carry exactly the address that was requested.
        ia_na = packet[DHCP6OptIA_NA]
        self.assert_equal(ia_na.T1, 20)
        self.assert_equal(ia_na.T2, 40)
        self.assert_equal(len(ia_na.ianaopts), 1)
        solicited = ia_na.ianaopts[0]
        self.assert_equal(solicited.addr, '1:2:3::5')
        self.assert_equal(solicited.preflft, 60)
        self.assert_equal(solicited.validlft, 120)

        self.vapi.want_dhcp6_reply_events()

        try:
            ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=60,
                                           validlft=120)
            self._send_advertise(
                trid, client_duid,
                DHCP6OptIA_NA(iaid=1, T1=20, T2=40, ianaopts=ia_na_opts))

            ev = self.vapi.wait_for_event(1, "dhcp6_reply_event")
            self._check_reply_event(ev)

            reported_address = ev.addresses[0]
            expected = inet_pton(AF_INET6, ia_na_opts.getfieldval("addr"))
            self.assert_equal(reported_address.address, expected)
            self.assert_equal(reported_address.preferred_time,
                              ia_na_opts.getfieldval("preflft"))
            self.assert_equal(reported_address.valid_time,
                              ia_na_opts.getfieldval("validlft"))

        finally:
            # Always unsubscribe so later tests do not see stray events.
            self.vapi.want_dhcp6_reply_events(enable_disable=0)

    def test_dhcp_pd_send_solicit_receive_advertise(self):
        """ Verify DHCPv6 PD Solicit packet and Advertise event """

        self.vapi.dhcp6_clients_enable_disable()

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        prefix_bin = '\00\01\00\02\00\03' + '\00' * 10
        requested = {'prefix': prefix_bin,
                     'prefix_length': 50,
                     'preferred_time': 60,
                     'valid_time': 120}
        self.vapi.dhcp6_pd_send_client_message(1, self.pg0.sw_if_index,
                                               T1=20, T2=40,
                                               prefixes=[requested])
        packet = self._capture_solicit()
        client_duid = packet[DHCP6OptClientId].duid
        trid = packet[DHCP6_Solicit].trid

        # The Solicit must carry exactly the prefix that was requested.
        ia_pd = packet[DHCP6OptIA_PD]
        self.assert_equal(ia_pd.T1, 20)
        self.assert_equal(ia_pd.T2, 40)
        self.assert_equal(len(ia_pd.iapdopt), 1)
        solicited = ia_pd.iapdopt[0]
        self.assert_equal(solicited.prefix, '1:2:3::')
        self.assert_equal(solicited.plen, 50)
        self.assert_equal(solicited.preflft, 60)
        self.assert_equal(solicited.validlft, 120)

        self.vapi.want_dhcp6_pd_reply_events()

        try:
            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=60,
                                          validlft=120)
            self._send_advertise(
                trid, client_duid,
                DHCP6OptIA_PD(iaid=1, T1=20, T2=40, iapdopt=ia_pd_opts))

            ev = self.vapi.wait_for_event(1, "dhcp6_pd_reply_event")
            self._check_reply_event(ev)

            reported_prefix = ev.prefixes[0]
            expected = inet_pton(AF_INET6, ia_pd_opts.getfieldval("prefix"))
            self.assert_equal(reported_prefix.prefix, expected)
            self.assert_equal(reported_prefix.prefix_length,
                              ia_pd_opts.getfieldval("plen"))
            self.assert_equal(reported_prefix.preferred_time,
                              ia_pd_opts.getfieldval("preflft"))
            self.assert_equal(reported_prefix.valid_time,
                              ia_pd_opts.getfieldval("validlft"))

        finally:
            # Always unsubscribe so later tests do not see stray events.
            self.vapi.want_dhcp6_pd_reply_events(enable_disable=0)
200
201
class TestDHCPv6IANAControlPlane(VppTestCase):
    """ DHCPv6 IA NA Control Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6IANAControlPlane, cls).setUpClass()

    def setUp(self):
        """Record the baseline FIB and enable the DHCPv6 client on pg0."""
        super(TestDHCPv6IANAControlPlane, self).setUp()

        self.create_pg_interfaces(range(1))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()

        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
        # Both values are learned from the first captured client message.
        # trid is initialised here so that a resend check made before any
        # message was seen fails as an assertion, not an AttributeError.
        self.client_duid = None
        self.trid = None
        self.T1 = 1
        self.T2 = 2

        fib = self.vapi.ip6_fib_dump()
        self.initial_addresses = set(self.get_interface_addresses(fib,
                                                                  self.pg0))

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        self.vapi.dhcp6_client_enable_disable(self.pg0.sw_if_index)

    def tearDown(self):
        """Disable the DHCPv6 client and bring the interfaces down."""
        self.vapi.dhcp6_client_enable_disable(self.pg0.sw_if_index, enable=0)

        for i in self.interfaces:
            i.admin_down()

        super(TestDHCPv6IANAControlPlane, self).tearDown()

    @staticmethod
    def get_interface_addresses(fib, pg):
        """Return the /128 FIB entry addresses whose first path is on *pg*.

        :param fib: iterable of FIB entries (as returned by ip6_fib_dump)
        :param pg: interface whose sw_if_index selects the entries
        :returns: list of the matching entries' ``address`` values
        """
        return [entry.address for entry in fib
                if entry.address_length == 128 and
                entry.path[0].sw_if_index == pg.sw_if_index]

    def get_addresses(self):
        """Return the pg0 addresses that were not present during setUp."""
        fib = self.vapi.ip6_fib_dump()
        addresses = set(self.get_interface_addresses(fib, self.pg0))
        return addresses.difference(self.initial_addresses)

    def validate_duid_ll(self, duid):
        """Construct a DUID_LL from *duid*.

        NOTE(review): nothing is asserted here; presumably this relies on
        the constructor raising for malformed input - confirm.
        """
        DUID_LL(duid)

    def validate_packet(self, packet, msg_type, is_resend=False):
        """Check a captured client message and remember its identifiers.

        Verifies the message type, the client DUID (stored on first
        sight, compared afterwards), the server DUID for every type except
        Solicit and Rebind, the transaction id (stored for a new message,
        compared for a resend), the destination ff02::1:2, UDP ports
        546 -> 547 and the elapsed-time option (zero for a new message,
        non-zero for a resend).  The packet is dumped before any failure
        is re-raised.

        :param packet: captured scapy packet
        :param msg_type: expected scapy DHCPv6 message class
        :param is_resend: True when a retransmission is expected
        """
        try:
            self.assertEqual(packet.haslayer(msg_type), 1)
            client_duid = packet[DHCP6OptClientId].duid
            if self.client_duid is None:
                self.client_duid = client_duid
                self.validate_duid_ll(client_duid)
            else:
                self.assertEqual(self.client_duid, client_duid)
            if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind:
                server_duid = packet[DHCP6OptServerId].duid
                self.assertEqual(server_duid, self.server_duid)
            if is_resend:
                self.assertEqual(self.trid, packet[msg_type].trid)
            else:
                self.trid = packet[msg_type].trid
            ip = packet[IPv6]
            udp = packet[UDP]
            self.assertEqual(ip.dst, 'ff02::1:2')
            self.assertEqual(udp.sport, 546)
            self.assertEqual(udp.dport, 547)
            dhcpv6 = packet[msg_type]
            elapsed_time = dhcpv6[DHCP6OptElapsedTime]
            if is_resend:
                self.assertNotEqual(elapsed_time.elapsedtime, 0)
            else:
                self.assertEqual(elapsed_time.elapsedtime, 0)
        except Exception:
            # Show the offending packet to make the failure diagnosable.
            packet.show()
            raise

    def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
        """Capture one packet on pg0 and validate it as *msg_type*.

        :param timeout: capture timeout; 3 is used when None
        """
        if timeout is None:
            timeout = 3
        rx_list = self.pg0.get_capture(1, timeout=timeout)
        packet = rx_list[0]
        self.validate_packet(packet, msg_type, is_resend=is_resend)

    def wait_for_solicit(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)

    def wait_for_request(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)

    def wait_for_renew(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)

    def wait_for_rebind(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)

    def wait_for_release(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)

    def send_packet(self, msg_type, t1=None, t2=None, ianaopts=None):
        """Inject a DHCPv6 message of *msg_type* into pg0.

        The message reuses the last recorded transaction id and client
        DUID and carries an IA_NA option with iaid 1.

        :param t1: T1 for the IA_NA option; self.T1 when None
        :param t2: T2 for the IA_NA option; self.T2 when None
        :param ianaopts: options nested in the IA_NA option, if any
        """
        if t1 is None:
            t1 = self.T1
        if t2 is None:
            t2 = self.T2
        if ianaopts is None:
            opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2)
        else:
            opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2, ianaopts=ianaopts)
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=mk_ll_addr(self.pg0.remote_mac),
                  dst=self.pg0.local_ip6_ll) /
             UDP(sport=547, dport=546) /
             msg_type(trid=self.trid) /
             DHCP6OptServerId(duid=self.server_duid) /
             DHCP6OptClientId(duid=self.client_duid) /
             opt_ia_na
             )
        self.pg0.add_stream([p])
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_advertise(self, t1=None, t2=None, ianaopts=None):
        self.send_packet(DHCP6_Advertise, t1, t2, ianaopts)

    def send_reply(self, t1=None, t2=None, ianaopts=None):
        self.send_packet(DHCP6_Reply, t1, t2, ianaopts)

    def test_T1_and_T2_timeouts(self):
        """ Test T1 and T2 timeouts """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        self.send_reply()

        self.sleep(1)

        self.wait_for_renew()

        self.pg_enable_capture(self.pg_interfaces)

        self.sleep(1)

        self.wait_for_rebind()

    def test_addresses(self):
        """ Test handling of addresses """

        ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=1,
                                       validlft=2)

        self.wait_for_solicit()
        self.send_advertise(t1=20, t2=40, ianaopts=ia_na_opts)
        self.wait_for_request()
        self.send_reply(t1=20, t2=40, ianaopts=ia_na_opts)
        self.sleep(0.1)

        # check FIB for new address
        new_addresses = self.get_addresses()
        self.assertEqual(len(new_addresses), 1)
        addr = list(new_addresses)[0]
        self.assertEqual(inet_ntop(AF_INET6, addr), '7:8::2')

        self.sleep(2)

        # check that the address is deleted
        self.assertEqual(len(self.get_addresses()), 0)

    def test_sending_client_messages_solicit(self):
        """ VPP receives messages from DHCPv6 client """

        self.wait_for_solicit()
        self.send_packet(DHCP6_Solicit)
        self.send_packet(DHCP6_Request)
        self.send_packet(DHCP6_Renew)
        self.send_packet(DHCP6_Rebind)
        self.sleep(1)
        self.wait_for_solicit(is_resend=True)

    def test_sending_inappropriate_packets(self):
        """ Server sends messages with inappropriate message types """

        self.wait_for_solicit()
        self.send_reply()
        self.wait_for_solicit(is_resend=True)
        self.send_advertise()
        self.wait_for_request()
        self.send_advertise()
        self.wait_for_request(is_resend=True)
        self.send_reply()
        self.wait_for_renew()

    def test_no_address_available_in_advertise(self):
        """ Advertise message contains NoAddrsAvail status code """

        self.wait_for_solicit()
        noavail = DHCP6OptStatusCode(statuscode=2)  # NoAddrsAvail
        self.send_advertise(ianaopts=noavail)
        self.wait_for_solicit(is_resend=True)

    def test_preferred_greater_than_valit_lifetime(self):
        """ Preferred lifetime is greater than valid lifetime """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=3)
        self.send_reply(ianaopts=ia_na_opts)

        self.sleep(0.5)

        # check FIB contains no addresses
        self.assertEqual(len(self.get_addresses()), 0)

    def test_T1_greater_than_T2(self):
        """ T1 is greater than T2 """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=8)
        self.send_reply(t1=80, t2=40, ianaopts=ia_na_opts)

        self.sleep(0.5)

        # check FIB contains no addresses
        self.assertEqual(len(self.get_addresses()), 0)
Juraj Sloboda81119e82018-05-25 14:02:20 +0200447
448
class TestDHCPv6PDControlPlane(VppTestCase):
    """ DHCPv6 PD Control Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6PDControlPlane, cls).setUpClass()

    def setUp(self):
        """Record pg1's baseline FIB and enable the PD client on pg0."""
        super(TestDHCPv6PDControlPlane, self).setUp()

        self.create_pg_interfaces(range(2))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()

        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
        # Both values are learned from the first captured client message.
        # trid is initialised here so that a resend check made before any
        # message was seen fails as an assertion, not an AttributeError.
        self.client_duid = None
        self.trid = None
        self.T1 = 1
        self.T2 = 2

        fib = self.vapi.ip6_fib_dump()
        self.initial_addresses = set(self.get_interface_addresses(fib,
                                                                  self.pg1))

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        self.prefix_group = 'my-pd-prefix-group'

        self.vapi.dhcp6_pd_client_enable_disable(
            self.pg0.sw_if_index,
            prefix_group=self.prefix_group)

    def tearDown(self):
        """Disable the PD client and bring the interfaces down."""
        self.vapi.dhcp6_pd_client_enable_disable(self.pg0.sw_if_index,
                                                 enable=0)

        for i in self.interfaces:
            i.admin_down()

        super(TestDHCPv6PDControlPlane, self).tearDown()

    @staticmethod
    def get_interface_addresses(fib, pg):
        """Return the /128 FIB entry addresses whose first path is on *pg*.

        :param fib: iterable of FIB entries (as returned by ip6_fib_dump)
        :param pg: interface whose sw_if_index selects the entries
        :returns: list of the matching entries' ``address`` values
        """
        return [entry.address for entry in fib
                if entry.address_length == 128 and
                entry.path[0].sw_if_index == pg.sw_if_index]

    def get_addresses(self):
        """Return the pg1 addresses that were not present during setUp."""
        fib = self.vapi.ip6_fib_dump()
        addresses = set(self.get_interface_addresses(fib, self.pg1))
        return addresses.difference(self.initial_addresses)

    def validate_duid_ll(self, duid):
        """Construct a DUID_LL from *duid*.

        NOTE(review): nothing is asserted here; presumably this relies on
        the constructor raising for malformed input - confirm.
        """
        DUID_LL(duid)

    def validate_packet(self, packet, msg_type, is_resend=False):
        """Check a captured client message and remember its identifiers.

        Verifies the message type, the client DUID (stored on first
        sight, compared afterwards), the server DUID for every type except
        Solicit and Rebind, the transaction id (stored for a new message,
        compared for a resend), the destination ff02::1:2, UDP ports
        546 -> 547 and the elapsed-time option (zero for a new message,
        non-zero for a resend).  The packet is dumped before any failure
        is re-raised.

        :param packet: captured scapy packet
        :param msg_type: expected scapy DHCPv6 message class
        :param is_resend: True when a retransmission is expected
        """
        try:
            self.assertEqual(packet.haslayer(msg_type), 1)
            client_duid = packet[DHCP6OptClientId].duid
            if self.client_duid is None:
                self.client_duid = client_duid
                self.validate_duid_ll(client_duid)
            else:
                self.assertEqual(self.client_duid, client_duid)
            if msg_type != DHCP6_Solicit and msg_type != DHCP6_Rebind:
                server_duid = packet[DHCP6OptServerId].duid
                self.assertEqual(server_duid, self.server_duid)
            if is_resend:
                self.assertEqual(self.trid, packet[msg_type].trid)
            else:
                self.trid = packet[msg_type].trid
            ip = packet[IPv6]
            udp = packet[UDP]
            self.assertEqual(ip.dst, 'ff02::1:2')
            self.assertEqual(udp.sport, 546)
            self.assertEqual(udp.dport, 547)
            dhcpv6 = packet[msg_type]
            elapsed_time = dhcpv6[DHCP6OptElapsedTime]
            if is_resend:
                self.assertNotEqual(elapsed_time.elapsedtime, 0)
            else:
                self.assertEqual(elapsed_time.elapsedtime, 0)
        except Exception:
            # Show the offending packet to make the failure diagnosable.
            packet.show()
            raise

    def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
        """Capture one packet on pg0 and validate it as *msg_type*.

        :param timeout: capture timeout; 3 is used when None
        """
        if timeout is None:
            timeout = 3
        rx_list = self.pg0.get_capture(1, timeout=timeout)
        packet = rx_list[0]
        self.validate_packet(packet, msg_type, is_resend=is_resend)

    def wait_for_solicit(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)

    def wait_for_request(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)

    def wait_for_renew(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)

    def wait_for_rebind(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)

    def wait_for_release(self, timeout=None, is_resend=False):
        self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)

    def send_packet(self, msg_type, t1=None, t2=None, iapdopt=None):
        """Inject a DHCPv6 message of *msg_type* into pg0.

        The message reuses the last recorded transaction id and client
        DUID and carries an IA_PD option with iaid 1.

        :param t1: T1 for the IA_PD option; self.T1 when None
        :param t2: T2 for the IA_PD option; self.T2 when None
        :param iapdopt: options nested in the IA_PD option, if any
        """
        if t1 is None:
            t1 = self.T1
        if t2 is None:
            t2 = self.T2
        if iapdopt is None:
            opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2)
        else:
            opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2, iapdopt=iapdopt)
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=mk_ll_addr(self.pg0.remote_mac),
                  dst=self.pg0.local_ip6_ll) /
             UDP(sport=547, dport=546) /
             msg_type(trid=self.trid) /
             DHCP6OptServerId(duid=self.server_duid) /
             DHCP6OptClientId(duid=self.client_duid) /
             opt_ia_pd
             )
        self.pg0.add_stream([p])
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_advertise(self, t1=None, t2=None, iapdopt=None):
        self.send_packet(DHCP6_Advertise, t1, t2, iapdopt)

    def send_reply(self, t1=None, t2=None, iapdopt=None):
        self.send_packet(DHCP6_Reply, t1, t2, iapdopt)

    def _add_address_using_prefix(self, address_bin, prefix_length):
        """Configure an address on pg1 derived from the test prefix group."""
        self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
                                                   address_bin,
                                                   prefix_length,
                                                   self.prefix_group)

    def _del_address_using_prefix(self, address_bin, prefix_length):
        """Remove an address added by _add_address_using_prefix()."""
        self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
                                                   address_bin,
                                                   prefix_length,
                                                   self.prefix_group,
                                                   is_add=0)

    def test_T1_and_T2_timeouts(self):
        """ Test T1 and T2 timeouts """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        self.send_reply()

        self.sleep(1)

        self.wait_for_renew()

        self.pg_enable_capture(self.pg_interfaces)

        self.sleep(1)

        self.wait_for_rebind()

    def test_prefixes(self):
        """ Test handling of prefixes """

        address_bin_1 = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
        address_prefix_length_1 = 60
        address_bin_2 = '\x00' * 6 + '\x00\x76' + '\x00' * 6 + '\x04\x06'
        address_prefix_length_2 = 62

        # Only addresses whose add call succeeded are removed again, so a
        # failing add is not masked by a failing delete during cleanup.
        configured = []
        try:
            self._add_address_using_prefix(address_bin_1,
                                           address_prefix_length_1)
            configured.append((address_bin_1, address_prefix_length_1))

            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=2,
                                          validlft=3)

            self.wait_for_solicit()
            self.send_advertise(t1=20, t2=40, iapdopt=ia_pd_opts)
            self.wait_for_request()
            self.send_reply(t1=20, t2=40, iapdopt=ia_pd_opts)
            self.sleep(0.1)

            # check FIB for new address
            new_addresses = self.get_addresses()
            self.assertEqual(len(new_addresses), 1)
            addr = list(new_addresses)[0]
            self.assertEqual(inet_ntop(AF_INET6, addr), '7:8:0:2::405')

            self.sleep(1)

            self._add_address_using_prefix(address_bin_2,
                                           address_prefix_length_2)
            configured.append((address_bin_2, address_prefix_length_2))

            self.sleep(1)

            # check FIB contains 2 addresses (in any order)
            new_addresses = self.get_addresses()
            self.assertEqual(len(new_addresses), 2)
            self.assertEqual(
                set(inet_ntop(AF_INET6, a) for a in new_addresses),
                set(['7:8:0:2::405', '7:8:0:76::406']))

            self.sleep(1)

            # check that the addresses are deleted
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            for address_bin, prefix_length in configured:
                self._del_address_using_prefix(address_bin, prefix_length)

    def test_sending_client_messages_solicit(self):
        """ VPP receives messages from DHCPv6 client """

        self.wait_for_solicit()
        self.send_packet(DHCP6_Solicit)
        self.send_packet(DHCP6_Request)
        self.send_packet(DHCP6_Renew)
        self.send_packet(DHCP6_Rebind)
        self.sleep(1)
        self.wait_for_solicit(is_resend=True)

    def test_sending_inappropriate_packets(self):
        """ Server sends messages with inappropriate message types """

        self.wait_for_solicit()
        self.send_reply()
        self.wait_for_solicit(is_resend=True)
        self.send_advertise()
        self.wait_for_request()
        self.send_advertise()
        self.wait_for_request(is_resend=True)
        self.send_reply()
        self.wait_for_renew()

    def test_no_prefix_available_in_advertise(self):
        """ Advertise message contains NoPrefixAvail status code """

        self.wait_for_solicit()
        noavail = DHCP6OptStatusCode(statuscode=6)  # NoPrefixAvail
        self.send_advertise(iapdopt=noavail)
        self.wait_for_solicit(is_resend=True)

    def test_preferred_greater_than_valit_lifetime(self):
        """ Preferred lifetime is greater than valid lifetime """

        address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
        address_prefix_length = 60
        # Configure before entering try: cleanup must only run once the
        # address really exists.
        self._add_address_using_prefix(address_bin, address_prefix_length)

        try:
            self.wait_for_solicit()
            self.send_advertise()
            self.wait_for_request()
            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
                                          validlft=3)
            self.send_reply(iapdopt=ia_pd_opts)

            self.sleep(0.5)

            # check FIB contains no addresses
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            self._del_address_using_prefix(address_bin,
                                           address_prefix_length)

    def test_T1_greater_than_T2(self):
        """ T1 is greater than T2 """

        address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
        address_prefix_length = 60
        # Configure before entering try: cleanup must only run once the
        # address really exists.
        self._add_address_using_prefix(address_bin, address_prefix_length)

        try:
            self.wait_for_solicit()
            self.send_advertise()
            self.wait_for_request()
            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
                                          validlft=8)
            self.send_reply(t1=80, t2=40, iapdopt=ia_pd_opts)

            self.sleep(0.5)

            # check FIB contains no addresses
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            self._del_address_using_prefix(address_bin,
                                           address_prefix_length)