blob: 7510543af8eb7b58c9171a8090a918bb25c9b91f [file] [log] [blame]
Paul Vinciguerrae8fece82019-02-28 15:34:00 -08001from socket import AF_INET6
2
Juraj Sloboda81119e82018-05-25 14:02:20 +02003from scapy.layers.dhcp6 import DHCP6_Advertise, DHCP6OptClientId, \
4 DHCP6OptStatusCode, DHCP6OptPref, DHCP6OptIA_PD, DHCP6OptIAPrefix, \
5 DHCP6OptServerId, DHCP6_Solicit, DHCP6_Reply, DHCP6_Request, DHCP6_Renew, \
Juraj Slobodadd3b8f72018-05-04 14:20:06 +02006 DHCP6_Rebind, DUID_LL, DHCP6_Release, DHCP6OptElapsedTime, DHCP6OptIA_NA, \
7 DHCP6OptIAAddress
Juraj Sloboda81119e82018-05-25 14:02:20 +02008from scapy.layers.inet6 import IPv6, Ether, UDP
9from scapy.utils6 import in6_mactoifaceid
10from scapy.utils import inet_ntop, inet_pton
Paul Vinciguerrae8fece82019-02-28 15:34:00 -080011
Juraj Sloboda81119e82018-05-25 14:02:20 +020012from framework import VppTestCase
Paul Vinciguerrae8fece82019-02-28 15:34:00 -080013import util
Juraj Sloboda81119e82018-05-25 14:02:20 +020014
15
def ip6_normalize(ip6):
    """Return the canonical textual form of the IPv6 address *ip6*.

    The address string is packed to binary and unpacked again, so two
    different spellings of the same address yield the same string and can
    be compared directly.

    :param ip6: IPv6 address in any textual form accepted by inet_pton
    :returns: the address as rendered by inet_ntop
    """
    packed = inet_pton(AF_INET6, ip6)
    canonical = inet_ntop(AF_INET6, packed)
    return canonical
18
19
class TestDHCPv6DataPlane(VppTestCase):
    """ DHCPv6 Data Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6DataPlane, cls).setUpClass()

    def setUp(self):
        """Create pg0, bring it up with IPv6 and build the server DUID."""
        super(TestDHCPv6DataPlane, self).setUp()

        self.create_pg_interfaces(range(1))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip6()

        # The emulated server identifies itself by the MAC on the far
        # side of pg0.
        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)

    def tearDown(self):
        """Undo the interface configuration done in setUp."""
        for intf in self.interfaces:
            intf.unconfig_ip6()
            intf.admin_down()
        super(TestDHCPv6DataPlane, self).tearDown()

    def _verify_solicit(self, packet):
        """Check the generic part of a captured Solicit.

        Asserts that *packet* carries IPv6 and a DHCPv6 Solicit, that its
        destination is ff02::1:2 and that its source is the link-local
        address of pg0.

        :param packet: the captured scapy packet
        :returns: tuple (client DUID, transaction id) taken from the packet
        """
        self.assertEqual(packet.haslayer(IPv6), 1)
        self.assertEqual(packet[IPv6].haslayer(DHCP6_Solicit), 1)

        client_duid = packet[DHCP6OptClientId].duid
        trid = packet[DHCP6_Solicit].trid

        # Compare normalized spellings so formatting differences between
        # scapy and the test framework cannot cause a false mismatch.
        actual_dst = ip6_normalize(packet[IPv6].dst)
        expected_dst = ip6_normalize("ff02::1:2")
        self.assert_equal(actual_dst, expected_dst)
        actual_src = ip6_normalize(packet[IPv6].src)
        expected_src = ip6_normalize(self.pg0.local_ip6_ll)
        self.assert_equal(actual_src, expected_src)

        return client_duid, trid

    def _build_advertise(self, trid, client_duid, ia_option):
        """Assemble an Advertise sent from the remote side of pg0.

        The message carries preference 7 and status code 1; the event
        checks in the tests expect exactly these values back.

        :param trid: transaction id copied from the Solicit
        :param client_duid: client DUID copied from the Solicit
        :param ia_option: the IA_NA / IA_PD option appended last
        :returns: the complete scapy packet
        """
        ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        ip6 = IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
                   dst=self.pg0.local_ip6_ll)
        udp = UDP(sport=547, dport=546)
        return (ether / ip6 / udp /
                DHCP6_Advertise(trid=trid) /
                DHCP6OptServerId(duid=self.server_duid) /
                DHCP6OptClientId(duid=client_duid) /
                DHCP6OptPref(prefval=7) /
                DHCP6OptStatusCode(statuscode=1) /
                ia_option)

    def _verify_reply_event_header(self, ev):
        """Check the fields every reply event shares with the Advertise."""
        self.assert_equal(ev.preference, 7)
        self.assert_equal(ev.status_code, 1)
        self.assert_equal(ev.T1, 20)
        self.assert_equal(ev.T2, 40)

    def test_dhcp_ia_na_send_solicit_receive_advertise(self):
        """ Verify DHCPv6 IA NA Solicit packet and Advertise event """

        self.vapi.dhcp6_clients_enable_disable()

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        # Binary form of 1:2:3::5, the address asserted on further down.
        address_bin = '\00\01\00\02\00\03' + '\00' * 8 + '\00\05'
        requested = {'address': address_bin,
                     'preferred_time': 60,
                     'valid_time': 120}
        # NOTE(review): the leading 1 presumably selects the Solicit
        # message type - confirm against the API definition.
        self.vapi.dhcp6_send_client_message(1, self.pg0.sw_if_index,
                                            T1=20, T2=40,
                                            addresses=[requested])
        captured = self.pg0.get_capture(1)
        self.assertEqual(len(captured), 1)
        solicit = captured[0]

        client_duid, trid = self._verify_solicit(solicit)

        ia_na = solicit[DHCP6OptIA_NA]
        self.assert_equal(ia_na.T1, 20)
        self.assert_equal(ia_na.T2, 40)
        self.assert_equal(len(ia_na.ianaopts), 1)
        sent_address = ia_na.ianaopts[0]
        self.assert_equal(sent_address.addr, '1:2:3::5')
        self.assert_equal(sent_address.preflft, 60)
        self.assert_equal(sent_address.validlft, 120)

        self.vapi.want_dhcp6_reply_events()

        try:
            offered = DHCP6OptIAAddress(addr='7:8::2', preflft=60,
                                        validlft=120)
            advertise = self._build_advertise(
                trid, client_duid,
                DHCP6OptIA_NA(iaid=1, T1=20, T2=40, ianaopts=offered))
            self.pg0.add_stream([advertise])
            self.pg_start()

            ev = self.vapi.wait_for_event(1, "dhcp6_reply_event")

            self._verify_reply_event_header(ev)

            reported = ev.addresses[0]
            expected_bin = inet_pton(AF_INET6, offered.getfieldval("addr"))
            self.assert_equal(reported.address, expected_bin)
            self.assert_equal(reported.preferred_time,
                              offered.getfieldval("preflft"))
            self.assert_equal(reported.valid_time,
                              offered.getfieldval("validlft"))

        finally:
            # Always unsubscribe so later tests do not receive stray events.
            self.vapi.want_dhcp6_reply_events(enable_disable=0)

    def test_dhcp_pd_send_solicit_receive_advertise(self):
        """ Verify DHCPv6 PD Solicit packet and Advertise event """

        self.vapi.dhcp6_clients_enable_disable()

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        # Binary form of 1:2:3::, the prefix asserted on further down.
        prefix_bin = '\00\01\00\02\00\03' + '\00' * 10
        requested = {'prefix': prefix_bin,
                     'prefix_length': 50,
                     'preferred_time': 60,
                     'valid_time': 120}
        # NOTE(review): the leading 1 presumably selects the Solicit
        # message type - confirm against the API definition.
        self.vapi.dhcp6_pd_send_client_message(1, self.pg0.sw_if_index,
                                               T1=20, T2=40,
                                               prefixes=[requested])
        captured = self.pg0.get_capture(1)
        self.assertEqual(len(captured), 1)
        solicit = captured[0]

        client_duid, trid = self._verify_solicit(solicit)

        ia_pd = solicit[DHCP6OptIA_PD]
        self.assert_equal(ia_pd.T1, 20)
        self.assert_equal(ia_pd.T2, 40)
        self.assert_equal(len(ia_pd.iapdopt), 1)
        sent_prefix = ia_pd.iapdopt[0]
        self.assert_equal(sent_prefix.prefix, '1:2:3::')
        self.assert_equal(sent_prefix.plen, 50)
        self.assert_equal(sent_prefix.preflft, 60)
        self.assert_equal(sent_prefix.validlft, 120)

        self.vapi.want_dhcp6_pd_reply_events()

        try:
            offered = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=60,
                                       validlft=120)
            advertise = self._build_advertise(
                trid, client_duid,
                DHCP6OptIA_PD(iaid=1, T1=20, T2=40, iapdopt=offered))
            self.pg0.add_stream([advertise])
            self.pg_start()

            ev = self.vapi.wait_for_event(1, "dhcp6_pd_reply_event")

            self._verify_reply_event_header(ev)

            reported = ev.prefixes[0]
            expected_bin = inet_pton(AF_INET6,
                                     offered.getfieldval("prefix"))
            self.assert_equal(reported.prefix, expected_bin)
            self.assert_equal(reported.prefix_length,
                              offered.getfieldval("plen"))
            self.assert_equal(reported.preferred_time,
                              offered.getfieldval("preflft"))
            self.assert_equal(reported.valid_time,
                              offered.getfieldval("validlft"))

        finally:
            # Always unsubscribe so later tests do not receive stray events.
            self.vapi.want_dhcp6_pd_reply_events(enable_disable=0)
197
198
class TestDHCPv6IANAControlPlane(VppTestCase):
    """ DHCPv6 IA NA Control Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6IANAControlPlane, cls).setUpClass()

    def setUp(self):
        """Bring up pg0, snapshot its /128 FIB entries, start the client.

        The snapshot in ``self.initial_addresses`` lets the tests tell the
        addresses added during a test apart from those already present.
        """
        super(TestDHCPv6IANAControlPlane, self).setUp()

        self.create_pg_interfaces(range(1))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()

        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
        # Learned from the first packet seen by validate_packet().
        self.client_duid = None
        # Transaction id of the last non-resent client message; set here so
        # a premature resend check fails as an assertion, not AttributeError.
        self.trid = None
        # Default T1/T2 placed in server messages built by send_packet().
        self.T1 = 1
        self.T2 = 2

        fib = self.vapi.ip6_fib_dump()
        self.initial_addresses = set(self.get_interface_addresses(fib,
                                                                  self.pg0))

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        self.vapi.dhcp6_client_enable_disable(self.pg0.sw_if_index)

    def tearDown(self):
        """Disable the DHCPv6 client on pg0 and take the interface down."""
        self.vapi.dhcp6_client_enable_disable(self.pg0.sw_if_index, enable=0)

        for i in self.interfaces:
            i.admin_down()

        super(TestDHCPv6IANAControlPlane, self).tearDown()

    @staticmethod
    def get_interface_addresses(fib, pg):
        """Return the addresses of all /128 FIB entries that point at *pg*.

        Only the first path of each entry is inspected.

        :param fib: iterable of entries as returned by ip6_fib_dump()
        :param pg: interface whose sw_if_index the first path must match
        :returns: list of ``entry.address`` values
        """
        return [entry.address for entry in fib
                if entry.address_length == 128 and
                entry.path[0].sw_if_index == pg.sw_if_index]

    def get_addresses(self):
        """Return the set of pg0 addresses added since setUp()."""
        fib = self.vapi.ip6_fib_dump()
        addresses = set(self.get_interface_addresses(fib, self.pg0))
        return addresses.difference(self.initial_addresses)

    def validate_duid_ll(self, duid):
        """Build a scapy DUID_LL from *duid*.

        NOTE(review): nothing is asserted on the result; the check relies
        solely on the construction not raising.
        """
        DUID_LL(duid)

    def validate_packet(self, packet, msg_type, is_resend=False):
        """Check a client message captured on pg0.

        Verifies the message type, the client DUID (which must stay the
        same for the whole test), the server DUID for every message type
        except Solicit and Rebind, the transaction id, the destination
        address ff02::1:2, UDP ports 546 -> 547 and the elapsed-time option.

        :param packet: captured scapy packet
        :param msg_type: expected scapy DHCPv6 message class
        :param is_resend: True when the packet must be a retransmission,
            i.e. carry the previously seen transaction id and a non-zero
            elapsed time; False stores the transaction id and expects an
            elapsed time of 0
        :raises: whatever the failed check raised, after dumping the packet
        """
        try:
            self.assertEqual(packet.haslayer(msg_type), 1)
            client_duid = packet[DHCP6OptClientId].duid
            if self.client_duid is None:
                self.client_duid = client_duid
                self.validate_duid_ll(client_duid)
            else:
                self.assertEqual(self.client_duid, client_duid)
            if msg_type not in (DHCP6_Solicit, DHCP6_Rebind):
                server_duid = packet[DHCP6OptServerId].duid
                self.assertEqual(server_duid, self.server_duid)
            if is_resend:
                self.assertEqual(self.trid, packet[msg_type].trid)
            else:
                self.trid = packet[msg_type].trid
            ip = packet[IPv6]
            udp = packet[UDP]
            self.assertEqual(ip.dst, 'ff02::1:2')
            self.assertEqual(udp.sport, 546)
            self.assertEqual(udp.dport, 547)
            dhcpv6 = packet[msg_type]
            elapsed_time = dhcpv6[DHCP6OptElapsedTime]
            if is_resend:
                self.assertNotEqual(elapsed_time.elapsedtime, 0)
            else:
                self.assertEqual(elapsed_time.elapsedtime, 0)
        except Exception:
            # Dump the offending packet for diagnosis, then let the failure
            # propagate. KeyboardInterrupt/SystemExit pass straight through.
            packet.show()
            raise

    def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
        """Capture one packet on pg0 and validate it as *msg_type*.

        :param timeout: capture timeout passed to get_capture(); 3 is used
            when None
        """
        if timeout is None:
            timeout = 3
        rx_list = self.pg0.get_capture(1, timeout=timeout)
        packet = rx_list[0]
        self.validate_packet(packet, msg_type, is_resend=is_resend)

    def wait_for_solicit(self, timeout=None, is_resend=False):
        """Wait for and validate a Solicit."""
        self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)

    def wait_for_request(self, timeout=None, is_resend=False):
        """Wait for and validate a Request."""
        self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)

    def wait_for_renew(self, timeout=None, is_resend=False):
        """Wait for and validate a Renew."""
        self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)

    def wait_for_rebind(self, timeout=None, is_resend=False):
        """Wait for and validate a Rebind."""
        self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)

    def wait_for_release(self, timeout=None, is_resend=False):
        """Wait for and validate a Release."""
        self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)

    def send_packet(self, msg_type, t1=None, t2=None, ianaopts=None):
        """Send a DHCPv6 message of *msg_type* into pg0.

        The message reuses the stored transaction id and client DUID and
        carries one IA_NA option with iaid 1.

        :param msg_type: scapy DHCPv6 message class to instantiate
        :param t1: T1 of the IA_NA option; self.T1 when None
        :param t2: T2 of the IA_NA option; self.T2 when None
        :param ianaopts: options nested in the IA_NA; omitted when None
        """
        if t1 is None:
            t1 = self.T1
        if t2 is None:
            t2 = self.T2
        if ianaopts is None:
            opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2)
        else:
            opt_ia_na = DHCP6OptIA_NA(iaid=1, T1=t1, T2=t2, ianaopts=ianaopts)
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
                  dst=self.pg0.local_ip6_ll) /
             UDP(sport=547, dport=546) /
             msg_type(trid=self.trid) /
             DHCP6OptServerId(duid=self.server_duid) /
             DHCP6OptClientId(duid=self.client_duid) /
             opt_ia_na
             )
        self.pg0.add_stream([p])
        # Re-arm the capture so the client's reaction can be collected.
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_advertise(self, t1=None, t2=None, ianaopts=None):
        """Send an Advertise; see send_packet() for the parameters."""
        self.send_packet(DHCP6_Advertise, t1, t2, ianaopts)

    def send_reply(self, t1=None, t2=None, ianaopts=None):
        """Send a Reply; see send_packet() for the parameters."""
        self.send_packet(DHCP6_Reply, t1, t2, ianaopts)

    def test_T1_and_T2_timeouts(self):
        """ Test T1 and T2 timeouts """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        self.send_reply()

        # The Reply carried the default T1=1: a Renew is expected next.
        self.sleep(1)

        self.wait_for_renew()

        self.pg_enable_capture(self.pg_interfaces)

        # The Renew is left unanswered; with T2=2 a Rebind must follow.
        self.sleep(1)

        self.wait_for_rebind()

    def test_addresses(self):
        """ Test handling of addresses """

        ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=1,
                                       validlft=2)

        self.wait_for_solicit()
        self.send_advertise(t1=20, t2=40, ianaopts=ia_na_opts)
        self.wait_for_request()
        self.send_reply(t1=20, t2=40, ianaopts=ia_na_opts)
        self.sleep(0.1)

        # check FIB for new address
        new_addresses = self.get_addresses()
        self.assertEqual(len(new_addresses), 1)
        addr = list(new_addresses)[0]
        self.assertEqual(inet_ntop(AF_INET6, addr), '7:8::2')

        # Sleep as long as the valid lifetime handed out above (validlft=2).
        self.sleep(2)

        # check that the address is deleted
        self.assertEqual(len(self.get_addresses()), 0)

    def test_sending_client_messages_solicit(self):
        """ VPP receives messages from DHCPv6 client """

        self.wait_for_solicit()
        # Client-side message types must not disturb the client: it is
        # expected to simply retransmit its Solicit.
        self.send_packet(DHCP6_Solicit)
        self.send_packet(DHCP6_Request)
        self.send_packet(DHCP6_Renew)
        self.send_packet(DHCP6_Rebind)
        self.sleep(1)
        self.wait_for_solicit(is_resend=True)

    def test_sending_inappropriate_packets(self):
        """ Server sends messages with inappropriate message types """

        self.wait_for_solicit()
        # Reply while soliciting: the Solicit is expected to be resent.
        self.send_reply()
        self.wait_for_solicit(is_resend=True)
        self.send_advertise()
        self.wait_for_request()
        # Advertise while requesting: the Request is expected to be resent.
        self.send_advertise()
        self.wait_for_request(is_resend=True)
        self.send_reply()
        self.wait_for_renew()

    def test_no_address_available_in_advertise(self):
        """ Advertise message contains NoAddrsAvail status code """

        self.wait_for_solicit()
        noavail = DHCP6OptStatusCode(statuscode=2)  # NoAddrsAvail
        self.send_advertise(ianaopts=noavail)
        self.wait_for_solicit(is_resend=True)

    def test_preferred_greater_than_valit_lifetime(self):
        """ Preferred lifetime is greater than valid lifetime """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=3)
        self.send_reply(ianaopts=ia_na_opts)

        self.sleep(0.5)

        # check FIB contains no addresses
        self.assertEqual(len(self.get_addresses()), 0)

    def test_T1_greater_than_T2(self):
        """ T1 is greater than T2 """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        ia_na_opts = DHCP6OptIAAddress(addr='7:8::2', preflft=4, validlft=8)
        self.send_reply(t1=80, t2=40, ianaopts=ia_na_opts)

        self.sleep(0.5)

        # check FIB contains no addresses
        self.assertEqual(len(self.get_addresses()), 0)
Juraj Sloboda81119e82018-05-25 14:02:20 +0200444
445
class TestDHCPv6PDControlPlane(VppTestCase):
    """ DHCPv6 PD Control Plane Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestDHCPv6PDControlPlane, cls).setUpClass()

    def setUp(self):
        """Bring up pg0/pg1, snapshot pg1's /128 FIB entries, start PD.

        The PD client is enabled on pg0 and bound to ``self.prefix_group``.
        The tests configure addresses from that prefix group on pg1 and
        look for them in the FIB, so the snapshot is taken for pg1.
        """
        super(TestDHCPv6PDControlPlane, self).setUp()

        self.create_pg_interfaces(range(2))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()

        self.server_duid = DUID_LL(lladdr=self.pg0.remote_mac)
        # Learned from the first packet seen by validate_packet().
        self.client_duid = None
        # Transaction id of the last non-resent client message; set here so
        # a premature resend check fails as an assertion, not AttributeError.
        self.trid = None
        # Default T1/T2 placed in server messages built by send_packet().
        self.T1 = 1
        self.T2 = 2

        fib = self.vapi.ip6_fib_dump()
        self.initial_addresses = set(self.get_interface_addresses(fib,
                                                                  self.pg1))

        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        self.prefix_group = 'my-pd-prefix-group'

        self.vapi.dhcp6_pd_client_enable_disable(
            self.pg0.sw_if_index,
            prefix_group=self.prefix_group)

    def tearDown(self):
        """Disable the PD client on pg0 and take the interfaces down."""
        self.vapi.dhcp6_pd_client_enable_disable(self.pg0.sw_if_index,
                                                 enable=0)

        for i in self.interfaces:
            i.admin_down()

        super(TestDHCPv6PDControlPlane, self).tearDown()

    @staticmethod
    def get_interface_addresses(fib, pg):
        """Return the addresses of all /128 FIB entries that point at *pg*.

        Only the first path of each entry is inspected.

        :param fib: iterable of entries as returned by ip6_fib_dump()
        :param pg: interface whose sw_if_index the first path must match
        :returns: list of ``entry.address`` values
        """
        return [entry.address for entry in fib
                if entry.address_length == 128 and
                entry.path[0].sw_if_index == pg.sw_if_index]

    def get_addresses(self):
        """Return the set of pg1 addresses added since setUp()."""
        fib = self.vapi.ip6_fib_dump()
        addresses = set(self.get_interface_addresses(fib, self.pg1))
        return addresses.difference(self.initial_addresses)

    def validate_duid_ll(self, duid):
        """Build a scapy DUID_LL from *duid*.

        NOTE(review): nothing is asserted on the result; the check relies
        solely on the construction not raising.
        """
        DUID_LL(duid)

    def validate_packet(self, packet, msg_type, is_resend=False):
        """Check a client message captured on pg0.

        Verifies the message type, the client DUID (which must stay the
        same for the whole test), the server DUID for every message type
        except Solicit and Rebind, the transaction id, the destination
        address ff02::1:2, UDP ports 546 -> 547 and the elapsed-time option.

        :param packet: captured scapy packet
        :param msg_type: expected scapy DHCPv6 message class
        :param is_resend: True when the packet must be a retransmission,
            i.e. carry the previously seen transaction id and a non-zero
            elapsed time; False stores the transaction id and expects an
            elapsed time of 0
        :raises: whatever the failed check raised, after dumping the packet
        """
        try:
            self.assertEqual(packet.haslayer(msg_type), 1)
            client_duid = packet[DHCP6OptClientId].duid
            if self.client_duid is None:
                self.client_duid = client_duid
                self.validate_duid_ll(client_duid)
            else:
                self.assertEqual(self.client_duid, client_duid)
            if msg_type not in (DHCP6_Solicit, DHCP6_Rebind):
                server_duid = packet[DHCP6OptServerId].duid
                self.assertEqual(server_duid, self.server_duid)
            if is_resend:
                self.assertEqual(self.trid, packet[msg_type].trid)
            else:
                self.trid = packet[msg_type].trid
            ip = packet[IPv6]
            udp = packet[UDP]
            self.assertEqual(ip.dst, 'ff02::1:2')
            self.assertEqual(udp.sport, 546)
            self.assertEqual(udp.dport, 547)
            dhcpv6 = packet[msg_type]
            elapsed_time = dhcpv6[DHCP6OptElapsedTime]
            if is_resend:
                self.assertNotEqual(elapsed_time.elapsedtime, 0)
            else:
                self.assertEqual(elapsed_time.elapsedtime, 0)
        except Exception:
            # Dump the offending packet for diagnosis, then let the failure
            # propagate. KeyboardInterrupt/SystemExit pass straight through.
            packet.show()
            raise

    def wait_for_packet(self, msg_type, timeout=None, is_resend=False):
        """Capture one packet on pg0 and validate it as *msg_type*.

        :param timeout: capture timeout passed to get_capture(); 3 is used
            when None
        """
        if timeout is None:
            timeout = 3
        rx_list = self.pg0.get_capture(1, timeout=timeout)
        packet = rx_list[0]
        self.validate_packet(packet, msg_type, is_resend=is_resend)

    def wait_for_solicit(self, timeout=None, is_resend=False):
        """Wait for and validate a Solicit."""
        self.wait_for_packet(DHCP6_Solicit, timeout, is_resend=is_resend)

    def wait_for_request(self, timeout=None, is_resend=False):
        """Wait for and validate a Request."""
        self.wait_for_packet(DHCP6_Request, timeout, is_resend=is_resend)

    def wait_for_renew(self, timeout=None, is_resend=False):
        """Wait for and validate a Renew."""
        self.wait_for_packet(DHCP6_Renew, timeout, is_resend=is_resend)

    def wait_for_rebind(self, timeout=None, is_resend=False):
        """Wait for and validate a Rebind."""
        self.wait_for_packet(DHCP6_Rebind, timeout, is_resend=is_resend)

    def wait_for_release(self, timeout=None, is_resend=False):
        """Wait for and validate a Release."""
        self.wait_for_packet(DHCP6_Release, timeout, is_resend=is_resend)

    def send_packet(self, msg_type, t1=None, t2=None, iapdopt=None):
        """Send a DHCPv6 message of *msg_type* into pg0.

        The message reuses the stored transaction id and client DUID and
        carries one IA_PD option with iaid 1.

        :param msg_type: scapy DHCPv6 message class to instantiate
        :param t1: T1 of the IA_PD option; self.T1 when None
        :param t2: T2 of the IA_PD option; self.T2 when None
        :param iapdopt: options nested in the IA_PD; omitted when None
        """
        if t1 is None:
            t1 = self.T1
        if t2 is None:
            t2 = self.T2
        if iapdopt is None:
            opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2)
        else:
            opt_ia_pd = DHCP6OptIA_PD(iaid=1, T1=t1, T2=t2, iapdopt=iapdopt)
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IPv6(src=util.mk_ll_addr(self.pg0.remote_mac),
                  dst=self.pg0.local_ip6_ll) /
             UDP(sport=547, dport=546) /
             msg_type(trid=self.trid) /
             DHCP6OptServerId(duid=self.server_duid) /
             DHCP6OptClientId(duid=self.client_duid) /
             opt_ia_pd
             )
        self.pg0.add_stream([p])
        # Re-arm the capture so the client's reaction can be collected.
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def send_advertise(self, t1=None, t2=None, iapdopt=None):
        """Send an Advertise; see send_packet() for the parameters."""
        self.send_packet(DHCP6_Advertise, t1, t2, iapdopt)

    def send_reply(self, t1=None, t2=None, iapdopt=None):
        """Send a Reply; see send_packet() for the parameters."""
        self.send_packet(DHCP6_Reply, t1, t2, iapdopt)

    def test_T1_and_T2_timeouts(self):
        """ Test T1 and T2 timeouts """

        self.wait_for_solicit()
        self.send_advertise()
        self.wait_for_request()
        self.send_reply()

        # The Reply carried the default T1=1: a Renew is expected next.
        self.sleep(1)

        self.wait_for_renew()

        self.pg_enable_capture(self.pg_interfaces)

        # The Renew is left unanswered; with T2=2 a Rebind must follow.
        self.sleep(1)

        self.wait_for_rebind()

    def test_prefixes(self):
        """ Test handling of prefixes """

        # Stay None until the matching address has been requested, so the
        # cleanup below only removes what this test tried to add.
        address_bin_1 = None
        address_bin_2 = None
        try:
            address_bin_1 = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
            address_prefix_length_1 = 60
            self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
                                                       address_bin_1,
                                                       address_prefix_length_1,
                                                       self.prefix_group)

            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=2,
                                          validlft=3)

            self.wait_for_solicit()
            self.send_advertise(t1=20, t2=40, iapdopt=ia_pd_opts)
            self.wait_for_request()
            self.send_reply(t1=20, t2=40, iapdopt=ia_pd_opts)
            self.sleep(0.1)

            # check FIB for new address
            new_addresses = self.get_addresses()
            self.assertEqual(len(new_addresses), 1)
            addr = list(new_addresses)[0]
            self.assertEqual(inet_ntop(AF_INET6, addr), '7:8:0:2::405')

            self.sleep(1)

            address_bin_2 = '\x00' * 6 + '\x00\x76' + '\x00' * 6 + '\x04\x06'
            address_prefix_length_2 = 62
            self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
                                                       address_bin_2,
                                                       address_prefix_length_2,
                                                       self.prefix_group)

            self.sleep(1)

            # check FIB contains 2 addresses
            new_addresses = self.get_addresses()
            self.assertEqual(len(new_addresses), 2)
            # Compare as a set: the FIB dump order is not relied upon.
            found = set(inet_ntop(AF_INET6, a) for a in new_addresses)
            self.assertEqual(found, set(['7:8:0:2::405', '7:8:0:76::406']))

            # About 3 s have passed since the Reply, matching validlft=3.
            self.sleep(1)

            # check that the addresses are deleted
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            if address_bin_1 is not None:
                self.vapi.ip6_add_del_address_using_prefix(
                    self.pg1.sw_if_index, address_bin_1,
                    address_prefix_length_1, self.prefix_group, is_add=0)
            if address_bin_2 is not None:
                self.vapi.ip6_add_del_address_using_prefix(
                    self.pg1.sw_if_index, address_bin_2,
                    address_prefix_length_2, self.prefix_group, is_add=0)

    def test_sending_client_messages_solicit(self):
        """ VPP receives messages from DHCPv6 client """

        self.wait_for_solicit()
        # Client-side message types must not disturb the client: it is
        # expected to simply retransmit its Solicit.
        self.send_packet(DHCP6_Solicit)
        self.send_packet(DHCP6_Request)
        self.send_packet(DHCP6_Renew)
        self.send_packet(DHCP6_Rebind)
        self.sleep(1)
        self.wait_for_solicit(is_resend=True)

    def test_sending_inappropriate_packets(self):
        """ Server sends messages with inappropriate message types """

        self.wait_for_solicit()
        # Reply while soliciting: the Solicit is expected to be resent.
        self.send_reply()
        self.wait_for_solicit(is_resend=True)
        self.send_advertise()
        self.wait_for_request()
        # Advertise while requesting: the Request is expected to be resent.
        self.send_advertise()
        self.wait_for_request(is_resend=True)
        self.send_reply()
        self.wait_for_renew()

    def test_no_prefix_available_in_advertise(self):
        """ Advertise message contains NoPrefixAvail status code """

        self.wait_for_solicit()
        noavail = DHCP6OptStatusCode(statuscode=6)  # NoPrefixAvail
        self.send_advertise(iapdopt=noavail)
        self.wait_for_solicit(is_resend=True)

    def test_preferred_greater_than_valit_lifetime(self):
        """ Preferred lifetime is greater than valid lifetime """

        # Defined ahead of the try block so the cleanup in ``finally`` can
        # always refer to them.
        address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
        address_prefix_length = 60
        try:
            self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
                                                       address_bin,
                                                       address_prefix_length,
                                                       self.prefix_group)

            self.wait_for_solicit()
            self.send_advertise()
            self.wait_for_request()
            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
                                          validlft=3)
            self.send_reply(iapdopt=ia_pd_opts)

            self.sleep(0.5)

            # check FIB contains no addresses
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
                                                       address_bin,
                                                       address_prefix_length,
                                                       self.prefix_group,
                                                       is_add=0)

    def test_T1_greater_than_T2(self):
        """ T1 is greater than T2 """

        # Defined ahead of the try block so the cleanup in ``finally`` can
        # always refer to them.
        address_bin = '\x00' * 6 + '\x00\x02' + '\x00' * 6 + '\x04\x05'
        address_prefix_length = 60
        try:
            self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
                                                       address_bin,
                                                       address_prefix_length,
                                                       self.prefix_group)

            self.wait_for_solicit()
            self.send_advertise()
            self.wait_for_request()
            ia_pd_opts = DHCP6OptIAPrefix(prefix='7:8::', plen=56, preflft=4,
                                          validlft=8)
            self.send_reply(t1=80, t2=40, iapdopt=ia_pd_opts)

            self.sleep(0.5)

            # check FIB contains no addresses
            self.assertEqual(len(self.get_addresses()), 0)

        finally:
            self.vapi.ip6_add_del_address_using_prefix(self.pg1.sw_if_index,
                                                       address_bin,
                                                       address_prefix_length,
                                                       self.prefix_group,
                                                       is_add=0)