blob: ced774689593cec74c52c0b7548860331e5d7847 [file] [log] [blame]
Filip Varga603e7542020-07-21 10:27:39 +02001#!/usr/bin/env python3
2
3import socket
4import struct
5import unittest
6import scapy.compat
7from time import sleep
8from framework import VppTestCase, running_extended_tests
9from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
10from scapy.layers.inet import IP, TCP, UDP, ICMP
11from scapy.layers.inet import IPerror, UDPerror
12from scapy.layers.l2 import Ether
13from util import ppp
14
15
16class TestDET44(VppTestCase):
17 """ Deterministic NAT Test Cases """
18
19 @classmethod
20 def setUpClass(cls):
21 super(TestDET44, cls).setUpClass()
22 cls.vapi.cli("set log class det44 level debug")
23
24 cls.tcp_port_in = 6303
25 cls.tcp_external_port = 6303
26 cls.udp_port_in = 6304
27 cls.udp_external_port = 6304
28 cls.icmp_id_in = 6305
29 cls.nat_addr = '10.0.0.3'
30
31 cls.create_pg_interfaces(range(3))
32 cls.interfaces = list(cls.pg_interfaces)
33
34 for i in cls.interfaces:
35 i.admin_up()
36 i.config_ip4()
37 i.resolve_arp()
38
39 cls.pg0.generate_remote_hosts(2)
40 cls.pg0.configure_ipv4_neighbors()
41
42 @classmethod
43 def tearDownClass(cls):
44 super(TestDET44, cls).tearDownClass()
45
46 def setUp(self):
47 super(TestDET44, self).setUp()
48 self.vapi.det44_plugin_enable_disable(enable=1)
49
50 def tearDown(self):
51 super(TestDET44, self).tearDown()
52 if not self.vpp_dead:
53 self.vapi.det44_plugin_enable_disable(enable=0)
54
55 def show_commands_at_teardown(self):
56 self.logger.info(self.vapi.cli("show det44 interfaces"))
57 self.logger.info(self.vapi.cli("show det44 timeouts"))
58 self.logger.info(self.vapi.cli("show det44 mappings"))
59 self.logger.info(self.vapi.cli("show det44 sessions"))
60
61 def verify_capture_in(self, capture, in_if):
62 """
63 Verify captured packets on inside network
64
65 :param capture: Captured packets
66 :param in_if: Inside interface
67 """
68 fired = False
69 for packet in capture:
70 try:
71 self.assert_packet_checksums_valid(packet)
72 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
73 if packet.haslayer(TCP):
74 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
75 elif packet.haslayer(UDP):
76 self.assertEqual(packet[UDP].dport, self.udp_port_in)
77 else:
78 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
79 except:
80 fired = True
81 self.logger.error(ppp("Unexpected or invalid packet "
82 "(inside network):", packet))
83 if fired:
84 raise
85
86 def verify_ipfix_max_entries_per_user(self, data, limit, src_addr):
87 """
88 Verify IPFIX maximum entries per user exceeded event
89
90 :param data: Decoded IPFIX data records
91 :param limit: Number of maximum entries per user
92 :param src_addr: IPv4 source address
93 """
94 self.assertEqual(1, len(data))
95 record = data[0]
96 # natEvent
97 self.assertEqual(scapy.compat.orb(record[230]), 13)
98 # natQuotaExceededEvent
99 self.assertEqual(struct.pack("I", 3), record[466])
100 # maxEntriesPerUser
101 self.assertEqual(struct.pack("I", limit), record[473])
102 # sourceIPv4Address
103 self.assertEqual(socket.inet_pton(socket.AF_INET, src_addr), record[8])
104
105 def initiate_tcp_session(self, in_if, out_if):
106 """
107 Initiates TCP session 3 WAY HAND SHAKE
108
109 :param in_if: Inside interface
110 :param out_if: Outside interface
111 """
112
113 # SYN packet in->out
114 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
115 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
116 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
117 flags="S"))
118 in_if.add_stream(p)
119 self.pg_enable_capture(self.pg_interfaces)
120 self.pg_start()
121 capture = out_if.get_capture(1)
122 p = capture[0]
123 self.tcp_port_out = p[TCP].sport
124
125 # SYN + ACK packet out->in
126 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
127 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
128 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
129 flags="SA"))
130 out_if.add_stream(p)
131 self.pg_enable_capture(self.pg_interfaces)
132 self.pg_start()
133 in_if.get_capture(1)
134
135 # ACK packet in->out
136 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
137 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
138 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
139 flags="A"))
140 in_if.add_stream(p)
141 self.pg_enable_capture(self.pg_interfaces)
142 self.pg_start()
143 out_if.get_capture(1)
144
145 def create_stream_in(self, in_if, out_if, ttl=64):
146 """
147 Create packet stream for inside network
148
149 :param in_if: Inside interface
150 :param out_if: Outside interface
151 :param ttl: TTL of generated packets
152 """
153 pkts = []
154 # TCP
155 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
156 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
157 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
158 pkts.append(p)
159
160 # UDP
161 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
162 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
163 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
164 pkts.append(p)
165
166 # ICMP
167 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
168 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
169 ICMP(id=self.icmp_id_in, type='echo-request'))
170 pkts.append(p)
171
172 return pkts
173
174 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
175 """
176 Create packet stream for outside network
177
178 :param out_if: Outside interface
179 :param dst_ip: Destination IP address (Default use global NAT address)
180 :param ttl: TTL of generated packets
181 """
182 if dst_ip is None:
183 dst_ip = self.nat_addr
184 pkts = []
185 # TCP
186 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
187 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
188 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
189 pkts.append(p)
190
191 # UDP
192 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
193 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
194 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
195 pkts.append(p)
196
197 # ICMP
198 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
199 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
200 ICMP(id=self.icmp_external_id, type='echo-reply'))
201 pkts.append(p)
202
203 return pkts
204
205 def verify_capture_out(self, capture, nat_ip=None):
206 """
207 Verify captured packets on outside network
208
209 :param capture: Captured packets
210 :param nat_ip: Translated IP address (Default use global NAT address)
211 :param same_port: Source port number is not translated (Default False)
212 """
213 if nat_ip is None:
214 nat_ip = self.nat_addr
215 for packet in capture:
216 try:
217 self.assertEqual(packet[IP].src, nat_ip)
218 if packet.haslayer(TCP):
219 self.tcp_port_out = packet[TCP].sport
220 elif packet.haslayer(UDP):
221 self.udp_port_out = packet[UDP].sport
222 else:
223 self.icmp_external_id = packet[ICMP].id
224 except:
225 self.logger.error(ppp("Unexpected or invalid packet "
226 "(outside network):", packet))
227 raise
228
229 def test_deterministic_mode(self):
230 """ NAT plugin run deterministic mode """
231 in_addr = '172.16.255.0'
232 out_addr = '172.17.255.50'
233 in_addr_t = '172.16.255.20'
234 in_plen = 24
235 out_plen = 32
236
237 self.vapi.det44_add_del_map(is_add=1, in_addr=in_addr,
238 in_plen=in_plen, out_addr=out_addr,
239 out_plen=out_plen)
240
241 rep1 = self.vapi.det44_forward(in_addr_t)
242 self.assertEqual(str(rep1.out_addr), out_addr)
243 rep2 = self.vapi.det44_reverse(rep1.out_port_hi, out_addr)
244
245 self.assertEqual(str(rep2.in_addr), in_addr_t)
246
247 deterministic_mappings = self.vapi.det44_map_dump()
248 self.assertEqual(len(deterministic_mappings), 1)
249 dsm = deterministic_mappings[0]
250 self.assertEqual(in_addr, str(dsm.in_addr))
251 self.assertEqual(in_plen, dsm.in_plen)
252 self.assertEqual(out_addr, str(dsm.out_addr))
253 self.assertEqual(out_plen, dsm.out_plen)
254
255 def test_set_timeouts(self):
256 """ Set deterministic NAT timeouts """
257 timeouts_before = self.vapi.det44_get_timeouts()
258
259 self.vapi.det44_set_timeouts(
260 udp=timeouts_before.udp + 10,
261 tcp_established=timeouts_before.tcp_established + 10,
262 tcp_transitory=timeouts_before.tcp_transitory + 10,
263 icmp=timeouts_before.icmp + 10)
264
265 timeouts_after = self.vapi.det44_get_timeouts()
266
267 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
268 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
269 self.assertNotEqual(timeouts_before.tcp_established,
270 timeouts_after.tcp_established)
271 self.assertNotEqual(timeouts_before.tcp_transitory,
272 timeouts_after.tcp_transitory)
273
274 def test_in(self):
275 """ DET44 translation test (TCP, UDP, ICMP) """
276
277 nat_ip = "10.0.0.10"
278
279 self.vapi.det44_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4,
280 in_plen=32,
281 out_addr=socket.inet_aton(nat_ip),
282 out_plen=32)
283
284 self.vapi.det44_interface_add_del_feature(
285 sw_if_index=self.pg0.sw_if_index,
286 is_add=1, is_inside=1)
287 self.vapi.det44_interface_add_del_feature(
288 sw_if_index=self.pg1.sw_if_index,
289 is_add=1, is_inside=0)
290
291 # in2out
292 pkts = self.create_stream_in(self.pg0, self.pg1)
293 self.pg0.add_stream(pkts)
294 self.pg_enable_capture(self.pg_interfaces)
295 self.pg_start()
296 capture = self.pg1.get_capture(len(pkts))
297 self.verify_capture_out(capture, nat_ip)
298
299 # out2in
300 pkts = self.create_stream_out(self.pg1, nat_ip)
301 self.pg1.add_stream(pkts)
302 self.pg_enable_capture(self.pg_interfaces)
303 self.pg_start()
304 capture = self.pg0.get_capture(len(pkts))
305 self.verify_capture_in(capture, self.pg0)
306
307 # session dump test
308 sessions = self.vapi.det44_session_dump(self.pg0.remote_ip4)
309 self.assertEqual(len(sessions), 3)
310
311 # TCP session
312 s = sessions[0]
313 self.assertEqual(str(s.ext_addr), self.pg1.remote_ip4)
314 self.assertEqual(s.in_port, self.tcp_port_in)
315 self.assertEqual(s.out_port, self.tcp_port_out)
316 self.assertEqual(s.ext_port, self.tcp_external_port)
317
318 # UDP session
319 s = sessions[1]
320 self.assertEqual(str(s.ext_addr), self.pg1.remote_ip4)
321 self.assertEqual(s.in_port, self.udp_port_in)
322 self.assertEqual(s.out_port, self.udp_port_out)
323 self.assertEqual(s.ext_port, self.udp_external_port)
324
325 # ICMP session
326 s = sessions[2]
327 self.assertEqual(str(s.ext_addr), self.pg1.remote_ip4)
328 self.assertEqual(s.in_port, self.icmp_id_in)
329 self.assertEqual(s.out_port, self.icmp_external_id)
330
331 def test_multiple_users(self):
332 """ Deterministic NAT multiple users """
333
334 nat_ip = "10.0.0.10"
335 port_in = 80
336 external_port = 6303
337
338 host0 = self.pg0.remote_hosts[0]
339 host1 = self.pg0.remote_hosts[1]
340
341 self.vapi.det44_add_del_map(is_add=1, in_addr=host0.ip4, in_plen=24,
342 out_addr=socket.inet_aton(nat_ip),
343 out_plen=32)
344 self.vapi.det44_interface_add_del_feature(
345 sw_if_index=self.pg0.sw_if_index,
346 is_add=1, is_inside=1)
347 self.vapi.det44_interface_add_del_feature(
348 sw_if_index=self.pg1.sw_if_index,
349 is_add=1, is_inside=0)
350
351 # host0 to out
352 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
353 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
354 TCP(sport=port_in, dport=external_port))
355 self.pg0.add_stream(p)
356 self.pg_enable_capture(self.pg_interfaces)
357 self.pg_start()
358 capture = self.pg1.get_capture(1)
359 p = capture[0]
360 try:
361 ip = p[IP]
362 tcp = p[TCP]
363 self.assertEqual(ip.src, nat_ip)
364 self.assertEqual(ip.dst, self.pg1.remote_ip4)
365 self.assertEqual(tcp.dport, external_port)
366 port_out0 = tcp.sport
367 except:
368 self.logger.error(ppp("Unexpected or invalid packet:", p))
369 raise
370
371 # host1 to out
372 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
373 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
374 TCP(sport=port_in, dport=external_port))
375 self.pg0.add_stream(p)
376 self.pg_enable_capture(self.pg_interfaces)
377 self.pg_start()
378 capture = self.pg1.get_capture(1)
379 p = capture[0]
380 try:
381 ip = p[IP]
382 tcp = p[TCP]
383 self.assertEqual(ip.src, nat_ip)
384 self.assertEqual(ip.dst, self.pg1.remote_ip4)
385 self.assertEqual(tcp.dport, external_port)
386 port_out1 = tcp.sport
387 except:
388 self.logger.error(ppp("Unexpected or invalid packet:", p))
389 raise
390
391 dms = self.vapi.det44_map_dump()
392 self.assertEqual(1, len(dms))
393 self.assertEqual(2, dms[0].ses_num)
394
395 # out to host0
396 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
397 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
398 TCP(sport=external_port, dport=port_out0))
399 self.pg1.add_stream(p)
400 self.pg_enable_capture(self.pg_interfaces)
401 self.pg_start()
402 capture = self.pg0.get_capture(1)
403 p = capture[0]
404 try:
405 ip = p[IP]
406 tcp = p[TCP]
407 self.assertEqual(ip.src, self.pg1.remote_ip4)
408 self.assertEqual(ip.dst, host0.ip4)
409 self.assertEqual(tcp.dport, port_in)
410 self.assertEqual(tcp.sport, external_port)
411 except:
412 self.logger.error(ppp("Unexpected or invalid packet:", p))
413 raise
414
415 # out to host1
416 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
417 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
418 TCP(sport=external_port, dport=port_out1))
419 self.pg1.add_stream(p)
420 self.pg_enable_capture(self.pg_interfaces)
421 self.pg_start()
422 capture = self.pg0.get_capture(1)
423 p = capture[0]
424 try:
425 ip = p[IP]
426 tcp = p[TCP]
427 self.assertEqual(ip.src, self.pg1.remote_ip4)
428 self.assertEqual(ip.dst, host1.ip4)
429 self.assertEqual(tcp.dport, port_in)
430 self.assertEqual(tcp.sport, external_port)
431 except:
432 self.logger.error(ppp("Unexpected or invalid packet", p))
433 raise
434
435 # session close api test
436 self.vapi.det44_close_session_out(socket.inet_aton(nat_ip),
437 port_out1,
438 self.pg1.remote_ip4,
439 external_port)
440 dms = self.vapi.det44_map_dump()
441 self.assertEqual(dms[0].ses_num, 1)
442
443 self.vapi.det44_close_session_in(host0.ip4,
444 port_in,
445 self.pg1.remote_ip4,
446 external_port)
447 dms = self.vapi.det44_map_dump()
448 self.assertEqual(dms[0].ses_num, 0)
449
450 def test_tcp_session_close_detection_in(self):
451 """ DET44 TCP session close from inside network """
452 self.vapi.det44_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4,
453 in_plen=32,
454 out_addr=socket.inet_aton(self.nat_addr),
455 out_plen=32)
456 self.vapi.det44_interface_add_del_feature(
457 sw_if_index=self.pg0.sw_if_index,
458 is_add=1, is_inside=1)
459 self.vapi.det44_interface_add_del_feature(
460 sw_if_index=self.pg1.sw_if_index,
461 is_add=1, is_inside=0)
462
463 self.initiate_tcp_session(self.pg0, self.pg1)
464
465 # close the session from inside
466 try:
467 # FIN packet in -> out
468 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
469 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
470 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
471 flags="F"))
472 self.pg0.add_stream(p)
473 self.pg_enable_capture(self.pg_interfaces)
474 self.pg_start()
475 self.pg1.get_capture(1)
476
477 pkts = []
478
479 # ACK packet out -> in
480 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
481 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
482 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
483 flags="A"))
484 pkts.append(p)
485
486 # FIN packet out -> in
487 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
488 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
489 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
490 flags="F"))
491 pkts.append(p)
492
493 self.pg1.add_stream(pkts)
494 self.pg_enable_capture(self.pg_interfaces)
495 self.pg_start()
496 self.pg0.get_capture(2)
497
498 # ACK packet in -> out
499 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
500 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
501 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
502 flags="A"))
503 self.pg0.add_stream(p)
504 self.pg_enable_capture(self.pg_interfaces)
505 self.pg_start()
506 self.pg1.get_capture(1)
507
508 # Check if deterministic NAT44 closed the session
509 dms = self.vapi.det44_map_dump()
510 self.assertEqual(0, dms[0].ses_num)
511 except:
512 self.logger.error("TCP session termination failed")
513 raise
514
515 def test_tcp_session_close_detection_out(self):
516 """ Deterministic NAT TCP session close from outside network """
517 self.vapi.det44_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4,
518 in_plen=32,
519 out_addr=socket.inet_aton(self.nat_addr),
520 out_plen=32)
521 self.vapi.det44_interface_add_del_feature(
522 sw_if_index=self.pg0.sw_if_index,
523 is_add=1, is_inside=1)
524 self.vapi.det44_interface_add_del_feature(
525 sw_if_index=self.pg1.sw_if_index,
526 is_add=1, is_inside=0)
527
528 self.initiate_tcp_session(self.pg0, self.pg1)
529
530 # close the session from outside
531 try:
532 # FIN packet out -> in
533 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
534 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
535 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
536 flags="F"))
537 self.pg1.add_stream(p)
538 self.pg_enable_capture(self.pg_interfaces)
539 self.pg_start()
540 self.pg0.get_capture(1)
541
542 pkts = []
543
544 # ACK packet in -> out
545 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
546 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
547 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
548 flags="A"))
549 pkts.append(p)
550
551 # ACK packet in -> out
552 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
553 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
554 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
555 flags="F"))
556 pkts.append(p)
557
558 self.pg0.add_stream(pkts)
559 self.pg_enable_capture(self.pg_interfaces)
560 self.pg_start()
561 self.pg1.get_capture(2)
562
563 # ACK packet out -> in
564 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
565 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
566 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
567 flags="A"))
568 self.pg1.add_stream(p)
569 self.pg_enable_capture(self.pg_interfaces)
570 self.pg_start()
571 self.pg0.get_capture(1)
572
573 # Check if deterministic NAT44 closed the session
574 dms = self.vapi.det44_map_dump()
575 self.assertEqual(0, dms[0].ses_num)
576 except:
577 self.logger.error("TCP session termination failed")
578 raise
579
580 @unittest.skipUnless(running_extended_tests, "part of extended tests")
581 def test_session_timeout(self):
582 """ Deterministic NAT session timeouts """
583 self.vapi.det44_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4,
584 in_plen=32,
585 out_addr=socket.inet_aton(self.nat_addr),
586 out_plen=32)
587 self.vapi.det44_interface_add_del_feature(
588 sw_if_index=self.pg0.sw_if_index,
589 is_add=1, is_inside=1)
590 self.vapi.det44_interface_add_del_feature(
591 sw_if_index=self.pg1.sw_if_index,
592 is_add=1, is_inside=0)
593
594 self.initiate_tcp_session(self.pg0, self.pg1)
595 self.vapi.det44_set_timeouts(udp=5, tcp_established=5,
596 tcp_transitory=5, icmp=5)
597 pkts = self.create_stream_in(self.pg0, self.pg1)
598 self.pg0.add_stream(pkts)
599 self.pg_enable_capture(self.pg_interfaces)
600 self.pg_start()
601 self.pg1.get_capture(len(pkts))
602 sleep(15)
603
604 dms = self.vapi.det44_map_dump()
605 self.assertEqual(0, dms[0].ses_num)
606
607 # TODO: ipfix needs to be separated from NAT base plugin
608 @unittest.skipUnless(running_extended_tests, "part of extended tests")
609 def test_session_limit_per_user(self):
610 """ Deterministic NAT maximum sessions per user limit """
611 self.vapi.det44_add_del_map(is_add=1, in_addr=self.pg0.remote_ip4,
612 in_plen=32,
613 out_addr=socket.inet_aton(self.nat_addr),
614 out_plen=32)
615 self.vapi.det44_interface_add_del_feature(
616 sw_if_index=self.pg0.sw_if_index,
617 is_add=1, is_inside=1)
618 self.vapi.det44_interface_add_del_feature(
619 sw_if_index=self.pg1.sw_if_index,
620 is_add=1, is_inside=0)
621 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4,
622 src_address=self.pg2.local_ip4,
623 path_mtu=512,
624 template_interval=10)
625 self.vapi.nat_ipfix_enable_disable(domain_id=1, src_port=4739,
626 enable=1)
627
628 pkts = []
629 for port in range(1025, 2025):
630 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
631 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
632 UDP(sport=port, dport=port))
633 pkts.append(p)
634
635 self.pg0.add_stream(pkts)
636 self.pg_enable_capture(self.pg_interfaces)
637 self.pg_start()
638 self.pg1.get_capture(len(pkts))
639
640 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
641 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
642 UDP(sport=3001, dport=3002))
643 self.pg0.add_stream(p)
644 self.pg_enable_capture(self.pg_interfaces)
645 self.pg_start()
646 self.pg1.assert_nothing_captured()
647
648 # verify ICMP error packet
649 capture = self.pg0.get_capture(1)
650 p = capture[0]
651 self.assertTrue(p.haslayer(ICMP))
652 icmp = p[ICMP]
653 self.assertEqual(icmp.type, 3)
654 self.assertEqual(icmp.code, 1)
655 self.assertTrue(icmp.haslayer(IPerror))
656 inner_ip = icmp[IPerror]
657 self.assertEqual(inner_ip[UDPerror].sport, 3001)
658 self.assertEqual(inner_ip[UDPerror].dport, 3002)
659
660 dms = self.vapi.det44_map_dump()
661
662 self.assertEqual(1000, dms[0].ses_num)
663
664 # verify IPFIX logging
665 self.vapi.ipfix_flush()
666 sleep(1)
667 capture = self.pg2.get_capture(2)
668 ipfix = IPFIXDecoder()
669 # first load template
670 for p in capture:
671 self.assertTrue(p.haslayer(IPFIX))
672 if p.haslayer(Template):
673 ipfix.add_template(p.getlayer(Template))
674 # verify events in data set
675 for p in capture:
676 if p.haslayer(Data):
677 data = ipfix.decode_data_set(p.getlayer(Set))
678 self.verify_ipfix_max_entries_per_user(data,
679 1000,
680 self.pg0.remote_ip4)
681 self.vapi.nat_ipfix_enable_disable(domain_id=1, src_port=4739,
682 enable=0)