blob: 242d30dcbab9f6a2230eef5f3956ff767386d7cb [file] [log] [blame]
Fan Zhangab0bf0c2022-05-25 20:03:40 +00001import socket
2import unittest
3import ipaddress
4
5from util import ppp
6from framework import VppTestRunner
7from template_ipsec import IPSecIPv4Fwd
Piotr Bronowski651cc012022-07-08 12:45:51 +00008from template_ipsec import IPSecIPv6Fwd
Fan Zhangab0bf0c2022-05-25 20:03:40 +00009
10
11class SpdFastPathOutbound(IPSecIPv4Fwd):
12 # Override setUpConstants to enable outbound fast path in config
13 @classmethod
14 def setUpConstants(cls):
15 super(SpdFastPathOutbound, cls).setUpConstants()
16 cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-outbound-spd-fast-path on", "}"])
17 cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
18
19
Piotr Bronowski651cc012022-07-08 12:45:51 +000020class SpdFastPathIPv6Outbound(IPSecIPv6Fwd):
21 # Override setUpConstants to enable outbound fast path in config
22 @classmethod
23 def setUpConstants(cls):
24 super(SpdFastPathIPv6Outbound, cls).setUpConstants()
25 cls.vpp_cmdline.extend(["ipsec", "{", "ipv6-outbound-spd-fast-path on", "}"])
26 cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
27
28
Fan Zhangab0bf0c2022-05-25 20:03:40 +000029class IPSec4SpdTestCaseAdd(SpdFastPathOutbound):
30 """ IPSec/IPv4 outbound: Policy mode test case with fast path \
31 (add rule)"""
32
33 def test_ipsec_spd_outbound_add(self):
34 # In this test case, packets in IPv4 FWD path are configured
35 # to go through IPSec outbound SPD policy lookup.
36 # 2 SPD rules (1 HIGH and 1 LOW) are added.
37 # High priority rule action is set to BYPASS.
38 # Low priority rule action is set to DISCARD.
39 # Traffic sent on pg0 interface should match high priority
40 # rule and should be sent out on pg1 interface.
41 self.create_interfaces(2)
42 pkt_count = 5
43 s_port_s = 1111
44 s_port_e = 1111
45 d_port_s = 2222
46 d_port_e = 2222
47 self.spd_create_and_intf_add(1, [self.pg1])
48 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
49 1,
50 self.pg0,
51 self.pg1,
52 socket.IPPROTO_UDP,
53 is_out=1,
54 priority=10,
55 policy_type="bypass",
56 local_port_start=s_port_s,
57 local_port_stop=s_port_e,
58 remote_port_start=d_port_s,
59 remote_port_stop=d_port_e,
60 )
61 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
62 1,
63 self.pg0,
64 self.pg1,
65 socket.IPPROTO_UDP,
66 is_out=1,
67 priority=5,
68 policy_type="discard",
69 local_port_start=s_port_s,
70 local_port_stop=s_port_e,
71 remote_port_start=d_port_s,
72 remote_port_stop=d_port_e,
73 )
74
75 # create the packet stream
76 packets = self.create_stream(self.pg0, self.pg1, pkt_count, s_port_s, d_port_s)
77 # add the stream to the source interface + enable capture
78 self.pg0.add_stream(packets)
79 self.pg0.enable_capture()
80 self.pg1.enable_capture()
81 # start the packet generator
82 self.pg_start()
83 # get capture
84 capture = self.pg1.get_capture()
85 for packet in capture:
86 try:
87 self.logger.debug(ppp("SPD - Got packet:", packet))
88 except Exception:
89 self.logger.error(ppp("Unexpected or invalid packet:", packet))
90 raise
91 self.logger.debug("SPD: Num packets: %s", len(capture.res))
92
93 # assert nothing captured on pg0
94 self.pg0.assert_nothing_captured()
95 # verify captured packets
96 self.verify_capture(self.pg0, self.pg1, capture)
97 # verify all policies matched the expected number of times
98 self.verify_policy_match(pkt_count, policy_0)
99 self.verify_policy_match(0, policy_1)
100
101
102class IPSec4SpdTestCaseAddPortRange(SpdFastPathOutbound):
103 """ IPSec/IPv4 outbound: Policy mode test case with fast path \
104 (add all ips port range rule)"""
105
106 def test_ipsec_spd_outbound_add(self):
107 # In this test case, packets in IPv4 FWD path are configured
108 # to go through IPSec outbound SPD policy lookup.
109 # 2 SPD rules (1 HIGH and 1 LOW) are added.
110 # High priority rule action is set to BYPASS.
111 # Low priority rule action is set to DISCARD.
112 # Traffic sent on pg0 interface should match high priority
113 # rule and should be sent out on pg1 interface.
114 self.create_interfaces(2)
115 pkt_count = 5
116 s_port_s = 1000
117 s_port_e = 2023
118 d_port_s = 5000
119 d_port_e = 6023
120 self.spd_create_and_intf_add(1, [self.pg1])
121 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
122 1,
123 self.pg0,
124 self.pg1,
125 socket.IPPROTO_UDP,
126 is_out=1,
127 priority=10,
128 policy_type="bypass",
129 all_ips=True,
130 local_port_start=s_port_s,
131 local_port_stop=s_port_e,
132 remote_port_start=d_port_s,
133 remote_port_stop=d_port_e,
134 )
135 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
136 1,
137 self.pg0,
138 self.pg1,
139 socket.IPPROTO_UDP,
140 is_out=1,
141 priority=5,
142 policy_type="discard",
143 all_ips=True,
144 local_port_start=s_port_s,
145 local_port_stop=s_port_e,
146 remote_port_start=d_port_s,
147 remote_port_stop=d_port_e,
148 )
149
150 # create the packet stream
151 packets = self.create_stream(self.pg0, self.pg1, pkt_count, 1333, 5444)
152 # add the stream to the source interface + enable capture
153 self.pg0.add_stream(packets)
154 self.pg0.enable_capture()
155 self.pg1.enable_capture()
156 # start the packet generator
157 self.pg_start()
158 # get capture
159 capture = self.pg1.get_capture()
160 for packet in capture:
161 try:
162 self.logger.debug(ppp("SPD - Got packet:", packet))
163 except Exception:
164 self.logger.error(ppp("Unexpected or invalid packet:", packet))
165 raise
166 self.logger.debug("SPD: Num packets: %s", len(capture.res))
167
168 # assert nothing captured on pg0
169 self.pg0.assert_nothing_captured()
170 # verify captured packets
171 self.verify_capture(self.pg0, self.pg1, capture)
172 # verify all policies matched the expected number of times
173 self.verify_policy_match(pkt_count, policy_0)
174 self.verify_policy_match(0, policy_1)
175
176
177class IPSec4SpdTestCaseAddIPRange(SpdFastPathOutbound):
178 """ IPSec/IPv4 outbound: Policy mode test case with fast path \
179 (add ips range with any port rule)"""
180
181 def test_ipsec_spd_outbound_add(self):
182 # In this test case, packets in IPv4 FWD path are configured
183 # to go through IPSec outbound SPD policy lookup.
184 # 2 SPD rules (1 HIGH and 1 LOW) are added.
185 # High priority rule action is set to BYPASS.
186 # Low priority rule action is set to DISCARD.
187 # Traffic sent on pg0 interface should match high priority
188 # rule and should be sent out on pg1 interface.
189 self.create_interfaces(2)
190 pkt_count = 5
191 s_ip_s = ipaddress.ip_address(self.pg0.remote_ip4)
192 s_ip_e = ipaddress.ip_address(int(s_ip_s) + 5)
193 d_ip_s = ipaddress.ip_address(self.pg1.remote_ip4)
194 d_ip_e = ipaddress.ip_address(int(d_ip_s) + 0)
195 self.spd_create_and_intf_add(1, [self.pg1])
196 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
197 1,
198 self.pg0,
199 self.pg1,
200 socket.IPPROTO_UDP,
201 is_out=1,
202 priority=10,
203 policy_type="bypass",
204 ip_range=True,
205 local_ip_start=s_ip_s,
206 local_ip_stop=s_ip_e,
207 remote_ip_start=d_ip_s,
208 remote_ip_stop=d_ip_e,
209 )
210 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
211 1,
212 self.pg0,
213 self.pg1,
214 socket.IPPROTO_UDP,
215 is_out=1,
216 priority=5,
217 policy_type="discard",
218 ip_range=True,
219 local_ip_start=s_ip_s,
220 local_ip_stop=s_ip_e,
221 remote_ip_start=d_ip_s,
222 remote_ip_stop=d_ip_e,
223 )
224
225 # create the packet stream
226 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
227 # add the stream to the source interface + enable capture
228 self.pg0.add_stream(packets)
229 self.pg0.enable_capture()
230 self.pg1.enable_capture()
231 # start the packet generator
232 self.pg_start()
233 # get capture
234 capture = self.pg1.get_capture()
235 for packet in capture:
236 try:
237 self.logger.debug(ppp("SPD - Got packet:", packet))
238 except Exception:
239 self.logger.error(ppp("Unexpected or invalid packet:", packet))
240 raise
241 self.logger.debug("SPD: Num packets: %s", len(capture.res))
242
243 # assert nothing captured on pg0
244 self.pg0.assert_nothing_captured()
245 # verify captured packets
246 self.verify_capture(self.pg0, self.pg1, capture)
247 # verify all policies matched the expected number of times
248 self.verify_policy_match(pkt_count, policy_0)
249 self.verify_policy_match(0, policy_1)
250
251
252class IPSec4SpdTestCaseAddIPAndPortRange(SpdFastPathOutbound):
253 """ IPSec/IPv4 outbound: Policy mode test case with fast path \
254 (add all ips range rule)"""
255
256 def test_ipsec_spd_outbound_add(self):
257 # In this test case, packets in IPv4 FWD path are configured
258 # to go through IPSec outbound SPD policy lookup.
259 # 2 SPD rules (1 HIGH and 1 LOW) are added.
260 # High priority rule action is set to BYPASS.
261 # Low priority rule action is set to DISCARD.
262 # Traffic sent on pg0 interface should match high priority
263 # rule and should be sent out on pg1 interface.
264 # in this test we define ranges of ports and ip addresses.
265 self.create_interfaces(2)
266 pkt_count = 5
267 s_port_s = 1000
268 s_port_e = 1000 + 1023
269 d_port_s = 5000
270 d_port_e = 5000 + 1023
271
272 s_ip_s = ipaddress.ip_address(
273 int(ipaddress.ip_address(self.pg0.remote_ip4)) - 24
274 )
275 s_ip_e = ipaddress.ip_address(int(s_ip_s) + 255)
276 d_ip_s = ipaddress.ip_address(self.pg1.remote_ip4)
277 d_ip_e = ipaddress.ip_address(int(d_ip_s) + 255)
278 self.spd_create_and_intf_add(1, [self.pg1])
279 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
280 1,
281 self.pg0,
282 self.pg1,
283 socket.IPPROTO_UDP,
284 is_out=1,
285 priority=10,
286 policy_type="bypass",
287 ip_range=True,
288 local_ip_start=s_ip_s,
289 local_ip_stop=s_ip_e,
290 remote_ip_start=d_ip_s,
291 remote_ip_stop=d_ip_e,
292 local_port_start=s_port_s,
293 local_port_stop=s_port_e,
294 remote_port_start=d_port_s,
295 remote_port_stop=d_port_e,
296 )
297 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
298 1,
299 self.pg0,
300 self.pg1,
301 socket.IPPROTO_UDP,
302 is_out=1,
303 priority=5,
304 policy_type="discard",
305 ip_range=True,
306 local_ip_start=s_ip_s,
307 local_ip_stop=s_ip_e,
308 remote_ip_start=d_ip_s,
309 remote_ip_stop=d_ip_e,
310 local_port_start=s_port_s,
311 local_port_stop=s_port_e,
312 remote_port_start=d_port_s,
313 remote_port_stop=d_port_e,
314 )
315
316 # create the packet stream
317 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
318 # add the stream to the source interface + enable capture
319 self.pg0.add_stream(packets)
320 self.pg0.enable_capture()
321 self.pg1.enable_capture()
322 # start the packet generator
323 self.pg_start()
324 # get capture
325 capture = self.pg1.get_capture()
326 for packet in capture:
327 try:
328 self.logger.debug(ppp("SPD - Got packet:", packet))
329 except Exception:
330 self.logger.error(ppp("Unexpected or invalid packet:", packet))
331 raise
332 self.logger.debug("SPD: Num packets: %s", len(capture.res))
333
334 # assert nothing captured on pg0
335 self.pg0.assert_nothing_captured()
336 # verify captured packets
337 self.verify_capture(self.pg0, self.pg1, capture)
338 # verify all policies matched the expected number of times
339 self.verify_policy_match(pkt_count, policy_0)
340 self.verify_policy_match(0, policy_1)
341
342
343class IPSec4SpdTestCaseAddAll(SpdFastPathOutbound):
344 """ IPSec/IPv4 outbound: Policy mode test case with fast path \
345 (add all ips ports rule)"""
346
347 def test_ipsec_spd_outbound_add(self):
348 # In this test case, packets in IPv4 FWD path are configured
349 # to go through IPSec outbound SPD policy lookup.
350 # 2 SPD rules (1 HIGH and 1 LOW) are added.
351 # Low priority rule action is set to BYPASS all ips.
352 # High priority rule action is set to DISCARD all ips.
353 # Traffic sent on pg0 interface when LOW priority rule is added,
354 # expect the packet is being sent out to pg1. Then HIGH priority
355 # rule is added and send the same traffic to pg0, this time expect
356 # the traffic is dropped.
357 self.create_interfaces(2)
358 pkt_count = 5
359 self.spd_create_and_intf_add(1, [self.pg1])
360 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
361 1,
362 self.pg0,
363 self.pg1,
364 socket.IPPROTO_UDP,
365 is_out=1,
366 priority=10,
367 policy_type="bypass",
368 all_ips=True,
369 )
370
371 # create the packet stream
372 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
373 # add the stream to the source interface + enable capture
374 self.pg0.add_stream(packets)
375 self.pg0.enable_capture()
376 self.pg1.enable_capture()
377 # start the packet generator
378 self.pg_start()
379 # get capture
380 capture = self.pg1.get_capture()
381 for packet in capture:
382 try:
383 self.logger.debug(ppp("SPD - Got packet:", packet))
384 except Exception:
385 self.logger.error(ppp("Unexpected or invalid packet:", packet))
386 raise
387 self.logger.debug("SPD: Num packets: %s", len(capture.res))
388
389 # assert nothing captured on pg0
390 self.pg0.assert_nothing_captured()
391 # verify captured packets
392 self.verify_capture(self.pg0, self.pg1, capture)
393 # verify all policies matched the expected number of times
394 self.verify_policy_match(pkt_count, policy_0)
395
396 policy_1 = self.spd_add_rem_policy( # outbound, priority 20
397 1,
398 self.pg0,
399 self.pg1,
400 socket.IPPROTO_UDP,
401 is_out=1,
402 priority=20,
403 policy_type="discard",
404 all_ips=True,
405 )
406
407 # create the packet stream
408 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
409 # add the stream to the source interface + enable capture
410 self.pg0.add_stream(packets)
411 self.pg0.enable_capture()
412 self.pg1.enable_capture()
413 # start the packet generator
414 self.pg_start()
415 # assert nothing captured on pg0 and pg1
416 self.pg0.assert_nothing_captured()
417 self.pg1.assert_nothing_captured()
418
419
420class IPSec4SpdTestCaseRemove(SpdFastPathOutbound):
421 """ IPSec/IPv4 outbound: Policy mode test case with fast path \
422 (remove rule)"""
423
424 def test_ipsec_spd_outbound_remove(self):
425 # In this test case, packets in IPv4 FWD path are configured
426 # to go through IPSec outbound SPD policy lookup.
427 # 2 SPD rules (1 HIGH and 1 LOW) are added.
428 # High priority rule action is set to BYPASS.
429 # Low priority rule action is set to DISCARD.
430 # High priority rule is then removed.
431 # Traffic sent on pg0 interface should match low priority
432 # rule and should be discarded after SPD lookup.
433 self.create_interfaces(2)
434 pkt_count = 5
435 self.spd_create_and_intf_add(1, [self.pg1])
436 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
437 1,
438 self.pg0,
439 self.pg1,
440 socket.IPPROTO_UDP,
441 is_out=1,
442 priority=10,
443 policy_type="bypass",
444 )
445 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
446 1,
447 self.pg0,
448 self.pg1,
449 socket.IPPROTO_UDP,
450 is_out=1,
451 priority=5,
452 policy_type="discard",
453 )
454
455 # create the packet stream
456 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
457 # add the stream to the source interface + enable capture
458 self.pg0.add_stream(packets)
459 self.pg0.enable_capture()
460 self.pg1.enable_capture()
461 # start the packet generator
462 self.pg_start()
463 # get capture
464 capture = self.pg1.get_capture()
465 for packet in capture:
466 try:
467 self.logger.debug(ppp("SPD - Got packet:", packet))
468 except Exception:
469 self.logger.error(ppp("Unexpected or invalid packet:", packet))
470 raise
471
472 # assert nothing captured on pg0
473 self.pg0.assert_nothing_captured()
474 # verify capture on pg1
475 self.logger.debug("SPD: Num packets: %s", len(capture.res))
476 self.verify_capture(self.pg0, self.pg1, capture)
477 # verify all policies matched the expected number of times
478 self.verify_policy_match(pkt_count, policy_0)
479 self.verify_policy_match(0, policy_1)
Fan Zhangab0bf0c2022-05-25 20:03:40 +0000480 # now remove the bypass rule
481 self.spd_add_rem_policy( # outbound, priority 10
482 1,
483 self.pg0,
484 self.pg1,
485 socket.IPPROTO_UDP,
486 is_out=1,
487 priority=10,
488 policy_type="bypass",
489 remove=True,
490 )
491
492 # resend the same packets
493 self.pg0.add_stream(packets)
494 self.pg0.enable_capture() # flush the old captures
495 self.pg1.enable_capture()
496 self.pg_start()
497 # assert nothing captured on pg0
498 self.pg0.assert_nothing_captured()
499 # all packets will be dropped by SPD rule
500 self.pg1.assert_nothing_captured()
501 # verify all policies matched the expected number of times
502 self.verify_policy_match(pkt_count, policy_0)
503 self.verify_policy_match(pkt_count, policy_1)
504
505
506class IPSec4SpdTestCaseReadd(SpdFastPathOutbound):
507 """ IPSec/IPv4 outbound: Policy mode test case with fast path \
508 (add, remove, re-add)"""
509
510 def test_ipsec_spd_outbound_readd(self):
511 # In this test case, packets in IPv4 FWD path are configured
512 # to go through IPSec outbound SPD policy lookup.
513 # 2 SPD rules (1 HIGH and 1 LOW) are added.
514 # High priority rule action is set to BYPASS.
515 # Low priority rule action is set to DISCARD.
516 # Traffic sent on pg0 interface should match high priority
517 # rule and should be sent out on pg1 interface.
518 # High priority rule is then removed.
519 # Traffic sent on pg0 interface should match low priority
520 # rule and should be discarded after SPD lookup.
521 # Readd high priority rule.
522 # Traffic sent on pg0 interface should match high priority
523 # rule and should be sent out on pg1 interface.
524 self.create_interfaces(2)
525 pkt_count = 5
526 self.spd_create_and_intf_add(1, [self.pg1])
527 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
528 1,
529 self.pg0,
530 self.pg1,
531 socket.IPPROTO_UDP,
532 is_out=1,
533 priority=10,
534 policy_type="bypass",
535 )
536 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
537 1,
538 self.pg0,
539 self.pg1,
540 socket.IPPROTO_UDP,
541 is_out=1,
542 priority=5,
543 policy_type="discard",
544 )
545
546 # create the packet stream
547 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
548 # add the stream to the source interface + enable capture
549 self.pg0.add_stream(packets)
550 self.pg0.enable_capture()
551 self.pg1.enable_capture()
552 # start the packet generator
553 self.pg_start()
554 # get capture
555 capture = self.pg1.get_capture()
556 for packet in capture:
557 try:
558 self.logger.debug(ppp("SPD - Got packet:", packet))
559 except Exception:
560 self.logger.error(ppp("Unexpected or invalid packet:", packet))
561 raise
562 self.logger.debug("SPD: Num packets: %s", len(capture.res))
563
564 # assert nothing captured on pg0
565 self.pg0.assert_nothing_captured()
566 # verify capture on pg1
567 self.verify_capture(self.pg0, self.pg1, capture)
568 # verify all policies matched the expected number of times
569 self.verify_policy_match(pkt_count, policy_0)
570 self.verify_policy_match(0, policy_1)
Piotr Bronowski651cc012022-07-08 12:45:51 +0000571 # remove the bypass rule, leaving only the discard rule
Fan Zhangab0bf0c2022-05-25 20:03:40 +0000572 self.spd_add_rem_policy( # outbound, priority 10
573 1,
574 self.pg0,
575 self.pg1,
576 socket.IPPROTO_UDP,
577 is_out=1,
578 priority=10,
579 policy_type="bypass",
580 remove=True,
581 )
582
583 # resend the same packets
584 self.pg0.add_stream(packets)
585 self.pg0.enable_capture() # flush the old captures
586 self.pg1.enable_capture()
587 self.pg_start()
588
589 # assert nothing captured on pg0
590 self.pg0.assert_nothing_captured()
591 # all packets will be dropped by SPD rule
592 self.pg1.assert_nothing_captured()
593 # verify all policies matched the expected number of times
594 self.verify_policy_match(pkt_count, policy_0)
595 self.verify_policy_match(pkt_count, policy_1)
596
597 # now readd the bypass rule
598 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
599 1,
600 self.pg0,
601 self.pg1,
602 socket.IPPROTO_UDP,
603 is_out=1,
604 priority=10,
605 policy_type="bypass",
606 )
607
608 # resend the same packets
609 self.pg0.add_stream(packets)
610 self.pg0.enable_capture() # flush the old captures
611 self.pg1.enable_capture()
612 self.pg_start()
613
614 # get capture
615 capture = self.pg1.get_capture(pkt_count)
616 for packet in capture:
617 try:
618 self.logger.debug(ppp("SPD - Got packet:", packet))
619 except Exception:
620 self.logger.error(ppp("Unexpected or invalid packet:", packet))
621 raise
622 self.logger.debug("SPD: Num packets: %s", len(capture.res))
623
624 # assert nothing captured on pg0
625 self.pg0.assert_nothing_captured()
626 # verify captured packets
627 self.verify_capture(self.pg0, self.pg1, capture)
628 # verify all policies matched the expected number of times
629 self.verify_policy_match(pkt_count, policy_0)
630 self.verify_policy_match(pkt_count, policy_1)
631
632
633class IPSec4SpdTestCaseMultiple(SpdFastPathOutbound):
634 """ IPSec/IPv4 outbound: Policy mode test case with fast path \
635 (multiple interfaces, multiple rules)"""
636
637 def test_ipsec_spd_outbound_multiple(self):
638 # In this test case, packets in IPv4 FWD path are configured to go
639 # through IPSec outbound SPD policy lookup.
640 # Multiples rules on multiple interfaces are tested at the same time.
641 # 3x interfaces are configured, binding the same SPD to each.
642 # Each interface has 2 SPD rules (1 BYPASS and 1 DISCARD).
643 # On pg0 & pg1, the BYPASS rule is HIGH priority
644 # On pg2, the DISCARD rule is HIGH priority
645 # Traffic should be received on pg0 & pg1 and dropped on pg2.
646 self.create_interfaces(3)
647 pkt_count = 5
648 # bind SPD to all interfaces
649 self.spd_create_and_intf_add(1, self.pg_interfaces)
650 # add rules on all interfaces
651 policy_01 = self.spd_add_rem_policy( # outbound, priority 10
652 1,
653 self.pg0,
654 self.pg1,
655 socket.IPPROTO_UDP,
656 is_out=1,
657 priority=10,
658 policy_type="bypass",
659 )
660 policy_02 = self.spd_add_rem_policy( # outbound, priority 5
661 1,
662 self.pg0,
663 self.pg1,
664 socket.IPPROTO_UDP,
665 is_out=1,
666 priority=5,
667 policy_type="discard",
668 )
669
670 policy_11 = self.spd_add_rem_policy( # outbound, priority 10
671 1,
672 self.pg1,
673 self.pg2,
674 socket.IPPROTO_UDP,
675 is_out=1,
676 priority=10,
677 policy_type="bypass",
678 )
679 policy_12 = self.spd_add_rem_policy( # outbound, priority 5
680 1,
681 self.pg1,
682 self.pg2,
683 socket.IPPROTO_UDP,
684 is_out=1,
685 priority=5,
686 policy_type="discard",
687 )
688
689 policy_21 = self.spd_add_rem_policy( # outbound, priority 5
690 1,
691 self.pg2,
692 self.pg0,
693 socket.IPPROTO_UDP,
694 is_out=1,
695 priority=5,
696 policy_type="bypass",
697 )
698 policy_22 = self.spd_add_rem_policy( # outbound, priority 10
699 1,
700 self.pg2,
701 self.pg0,
702 socket.IPPROTO_UDP,
703 is_out=1,
704 priority=10,
705 policy_type="discard",
706 )
707
708 # interfaces bound to an SPD, will by default drop inbound
709 # traffic with no matching policies. add catch-all inbound
710 # bypass rule to SPD:
711 self.spd_add_rem_policy( # inbound, all interfaces
712 1,
713 None,
714 None,
715 socket.IPPROTO_UDP,
716 is_out=0,
717 priority=10,
718 policy_type="bypass",
719 all_ips=True,
720 )
721
722 # create the packet streams
723 packets0 = self.create_stream(self.pg0, self.pg1, pkt_count)
724 packets1 = self.create_stream(self.pg1, self.pg2, pkt_count)
725 packets2 = self.create_stream(self.pg2, self.pg0, pkt_count)
726 # add the streams to the source interfaces
727 self.pg0.add_stream(packets0)
728 self.pg1.add_stream(packets1)
729 self.pg2.add_stream(packets2)
730 # enable capture on all interfaces
731 for pg in self.pg_interfaces:
732 pg.enable_capture()
733 # start the packet generator
734 self.pg_start()
735
736 # get captures
737 if_caps = []
738 for pg in [self.pg1, self.pg2]: # we are expecting captures on pg1/pg2
739 if_caps.append(pg.get_capture())
740 for packet in if_caps[-1]:
741 try:
742 self.logger.debug(ppp("SPD - Got packet:", packet))
743 except Exception:
744 self.logger.error(ppp("Unexpected or invalid packet:", packet))
745 raise
746 self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res))
747 self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res))
748
749 # verify captures that matched BYPASS rule
750 self.verify_capture(self.pg0, self.pg1, if_caps[0])
751 self.verify_capture(self.pg1, self.pg2, if_caps[1])
752 # verify that traffic to pg0 matched DISCARD rule and was dropped
753 self.pg0.assert_nothing_captured()
754 # verify all packets that were expected to match rules, matched
755 # pg0 -> pg1
756 self.verify_policy_match(pkt_count, policy_01)
757 self.verify_policy_match(0, policy_02)
758 # pg1 -> pg2
759 self.verify_policy_match(pkt_count, policy_11)
760 self.verify_policy_match(0, policy_12)
761 # pg2 -> pg0
762 self.verify_policy_match(0, policy_21)
763 self.verify_policy_match(pkt_count, policy_22)
764
765
Piotr Bronowski651cc012022-07-08 12:45:51 +0000766class IPSec6SpdTestCaseAdd(SpdFastPathIPv6Outbound):
767 """ IPSec/IPv6 outbound: Policy mode test case with fast path \
768 (add rule)"""
769
770 def test_ipsec_spd_outbound_add(self):
771 # In this test case, packets in IPv4 FWD path are configured
772 # to go through IPSec outbound SPD policy lookup.
773 # 2 SPD rules (1 HIGH and 1 LOW) are added.
774 # High priority rule action is set to BYPASS.
775 # Low priority rule action is set to DISCARD.
776 # Traffic sent on pg0 interface should match high priority
777 # rule and should be sent out on pg1 interface.
778 self.create_interfaces(2)
779 pkt_count = 5
780 s_port_s = 1111
781 s_port_e = 1111
782 d_port_s = 2222
783 d_port_e = 2222
784 self.spd_create_and_intf_add(1, [self.pg1])
785 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
786 1,
787 self.pg0,
788 self.pg1,
789 socket.IPPROTO_UDP,
790 is_out=1,
791 priority=10,
792 policy_type="bypass",
793 local_port_start=s_port_s,
794 local_port_stop=s_port_e,
795 remote_port_start=d_port_s,
796 remote_port_stop=d_port_e,
797 )
798 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
799 1,
800 self.pg0,
801 self.pg1,
802 socket.IPPROTO_UDP,
803 is_out=1,
804 priority=5,
805 policy_type="discard",
806 local_port_start=s_port_s,
807 local_port_stop=s_port_e,
808 remote_port_start=d_port_s,
809 remote_port_stop=d_port_e,
810 )
811
812 # create the packet stream
813 packets = self.create_stream(self.pg0, self.pg1, pkt_count, s_port_s, d_port_s)
814 # add the stream to the source interface + enable capture
815 self.pg0.add_stream(packets)
816 self.pg0.enable_capture()
817 self.pg1.enable_capture()
818 # start the packet generator
819 self.pg_start()
820 # get capture
821 capture = self.pg1.get_capture()
822 for packet in capture:
823 try:
824 self.logger.debug(ppp("SPD - Got packet:", packet))
825 except Exception:
826 self.logger.error(ppp("Unexpected or invalid packet:", packet))
827 raise
828 self.logger.debug("SPD: Num packets: %s", len(capture.res))
829
830 # assert nothing captured on pg0
831 self.pg0.assert_nothing_captured()
832 # verify captured packets
833 self.verify_capture(self.pg0, self.pg1, capture)
834 # verify all policies matched the expected number of times
835 self.verify_policy_match(pkt_count, policy_0)
836 self.verify_policy_match(0, policy_1)
837
838
839class IPSec6SpdTestCaseAddAll(SpdFastPathIPv6Outbound):
840 """ IPSec/IPv6 outbound: Policy mode test case with fast path \
841 (add all ips ports rule)"""
842
843 def test_ipsec_spd_outbound_add(self):
844 # In this test case, packets in IPv4 FWD path are configured
845 # to go through IPSec outbound SPD policy lookup.
846 # 2 SPD rules (1 HIGH and 1 LOW) are added.
847 # Low priority rule action is set to BYPASS all ips.
848 # High priority rule action is set to DISCARD all ips.
849 # Traffic sent on pg0 interface when LOW priority rule is added,
850 # expect the packet is being sent out to pg1. Then HIGH priority
851 # rule is added and send the same traffic to pg0, this time expect
852 # the traffic is dropped.
853 self.create_interfaces(2)
854 pkt_count = 5
855 self.spd_create_and_intf_add(1, [self.pg1])
856 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
857 1,
858 self.pg0,
859 self.pg1,
860 socket.IPPROTO_UDP,
861 is_out=1,
862 priority=10,
863 policy_type="bypass",
864 all_ips=True,
865 )
866
867 # create the packet stream
868 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
869 # add the stream to the source interface + enable capture
870 self.pg0.add_stream(packets)
871 self.pg0.enable_capture()
872 self.pg1.enable_capture()
873 # start the packet generator
874 self.pg_start()
875 # get capture
876 capture = self.pg1.get_capture()
877 for packet in capture:
878 try:
879 self.logger.debug(ppp("SPD - Got packet:", packet))
880 except Exception:
881 self.logger.error(ppp("Unexpected or invalid packet:", packet))
882 raise
883 self.logger.debug("SPD: Num packets: %s", len(capture.res))
884
885 # assert nothing captured on pg0
886 self.pg0.assert_nothing_captured()
887 # verify captured packets
888 self.verify_capture(self.pg0, self.pg1, capture)
889 # verify all policies matched the expected number of times
890 self.verify_policy_match(pkt_count, policy_0)
891
892 policy_1 = self.spd_add_rem_policy( # outbound, priority 20
893 1,
894 self.pg0,
895 self.pg1,
896 socket.IPPROTO_UDP,
897 is_out=1,
898 priority=20,
899 policy_type="discard",
900 all_ips=True,
901 )
902
903 # create the packet stream
904 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
905 # add the stream to the source interface + enable capture
906 self.pg0.add_stream(packets)
907 self.pg0.enable_capture()
908 self.pg1.enable_capture()
909 # start the packet generator
910 self.pg_start()
911 # assert nothing captured on pg0 and pg1
912 self.pg0.assert_nothing_captured()
913 self.pg1.assert_nothing_captured()
914
915
916class IPSec6SpdTestCaseAddPortRange(SpdFastPathIPv6Outbound):
917 """ IPSec/IPv6 outbound: Policy mode test case with fast path \
918 (add all ips port range rule)"""
919
920 def test_ipsec_spd_outbound_add(self):
921 # In this test case, packets in IPv4 FWD path are configured
922 # to go through IPSec outbound SPD policy lookup.
923 # 2 SPD rules (1 HIGH and 1 LOW) are added.
924 # High priority rule action is set to BYPASS.
925 # Low priority rule action is set to DISCARD.
926 # Traffic sent on pg0 interface should match high priority
927 # rule and should be sent out on pg1 interface.
928 self.create_interfaces(2)
929 pkt_count = 5
930 s_port_s = 1000
931 s_port_e = 2023
932 d_port_s = 5000
933 d_port_e = 6023
934 self.spd_create_and_intf_add(1, [self.pg1])
935 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
936 1,
937 self.pg0,
938 self.pg1,
939 socket.IPPROTO_UDP,
940 is_out=1,
941 priority=10,
942 policy_type="bypass",
943 all_ips=True,
944 local_port_start=s_port_s,
945 local_port_stop=s_port_e,
946 remote_port_start=d_port_s,
947 remote_port_stop=d_port_e,
948 )
949 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
950 1,
951 self.pg0,
952 self.pg1,
953 socket.IPPROTO_UDP,
954 is_out=1,
955 priority=5,
956 policy_type="discard",
957 all_ips=True,
958 local_port_start=s_port_s,
959 local_port_stop=s_port_e,
960 remote_port_start=d_port_s,
961 remote_port_stop=d_port_e,
962 )
963
964 # create the packet stream
965 packets = self.create_stream(self.pg0, self.pg1, pkt_count, 1333, 5444)
966 # add the stream to the source interface + enable capture
967 self.pg0.add_stream(packets)
968 self.pg0.enable_capture()
969 self.pg1.enable_capture()
970 # start the packet generator
971 self.pg_start()
972 # get capture
973 capture = self.pg1.get_capture()
974 for packet in capture:
975 try:
976 self.logger.debug(ppp("SPD - Got packet:", packet))
977 except Exception:
978 self.logger.error(ppp("Unexpected or invalid packet:", packet))
979 raise
980 self.logger.debug("SPD: Num packets: %s", len(capture.res))
981
982 # assert nothing captured on pg0
983 self.pg0.assert_nothing_captured()
984 # verify captured packets
985 self.verify_capture(self.pg0, self.pg1, capture)
986 # verify all policies matched the expected number of times
987 self.verify_policy_match(pkt_count, policy_0)
988 self.verify_policy_match(0, policy_1)
989
990
991class IPSec6SpdTestCaseAddIPRange(SpdFastPathIPv6Outbound):
992 """ IPSec/IPv6 outbound: Policy mode test case with fast path \
993 (add ips range with any port rule)"""
994
995 def test_ipsec_spd_outbound_add(self):
996 # In this test case, packets in IPv4 FWD path are configured
997 # to go through IPSec outbound SPD policy lookup.
998 # 2 SPD rules (1 HIGH and 1 LOW) are added.
999 # High priority rule action is set to BYPASS.
1000 # Low priority rule action is set to DISCARD.
1001 # Traffic sent on pg0 interface should match high priority
1002 # rule and should be sent out on pg1 interface.
1003 self.create_interfaces(2)
1004 pkt_count = 5
1005 s_ip_s = ipaddress.ip_address(self.pg0.remote_ip6)
1006 s_ip_e = ipaddress.ip_address(int(s_ip_s) + 5)
1007 d_ip_s = ipaddress.ip_address(self.pg1.remote_ip6)
1008 d_ip_e = ipaddress.ip_address(int(d_ip_s) + 0)
1009 self.spd_create_and_intf_add(1, [self.pg1])
1010 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
1011 1,
1012 self.pg0,
1013 self.pg1,
1014 socket.IPPROTO_UDP,
1015 is_out=1,
1016 priority=10,
1017 policy_type="bypass",
1018 ip_range=True,
1019 local_ip_start=s_ip_s,
1020 local_ip_stop=s_ip_e,
1021 remote_ip_start=d_ip_s,
1022 remote_ip_stop=d_ip_e,
1023 )
1024 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
1025 1,
1026 self.pg0,
1027 self.pg1,
1028 socket.IPPROTO_UDP,
1029 is_out=1,
1030 priority=5,
1031 policy_type="discard",
1032 ip_range=True,
1033 local_ip_start=s_ip_s,
1034 local_ip_stop=s_ip_e,
1035 remote_ip_start=d_ip_s,
1036 remote_ip_stop=d_ip_e,
1037 )
1038
1039 # create the packet stream
1040 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
1041 # add the stream to the source interface + enable capture
1042 self.pg0.add_stream(packets)
1043 self.pg0.enable_capture()
1044 self.pg1.enable_capture()
1045 # start the packet generator
1046 self.pg_start()
1047 # get capture
1048 capture = self.pg1.get_capture()
1049 for packet in capture:
1050 try:
1051 self.logger.debug(ppp("SPD - Got packet:", packet))
1052 except Exception:
1053 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1054 raise
1055 self.logger.debug("SPD: Num packets: %s", len(capture.res))
1056
1057 # assert nothing captured on pg0
1058 self.pg0.assert_nothing_captured()
1059 # verify captured packets
1060 self.verify_capture(self.pg0, self.pg1, capture)
1061 # verify all policies matched the expected number of times
1062 self.verify_policy_match(pkt_count, policy_0)
1063 self.verify_policy_match(0, policy_1)
1064
1065
1066class IPSec6SpdTestCaseAddIPAndPortRange(SpdFastPathIPv6Outbound):
1067 """ IPSec/IPvr6 outbound: Policy mode test case with fast path \
1068 (add all ips range rule)"""
1069
1070 def test_ipsec_spd_outbound_add(self):
1071 # In this test case, packets in IPv4 FWD path are configured
1072 # to go through IPSec outbound SPD policy lookup.
1073 # 2 SPD rules (1 HIGH and 1 LOW) are added.
1074 # High priority rule action is set to BYPASS.
1075 # Low priority rule action is set to DISCARD.
1076 # Traffic sent on pg0 interface should match high priority
1077 # rule and should be sent out on pg1 interface.
1078 # in this test we define ranges of ports and ip addresses.
1079 self.create_interfaces(2)
1080 pkt_count = 5
1081 s_port_s = 1000
1082 s_port_e = 1000 + 1023
1083 d_port_s = 5000
1084 d_port_e = 5000 + 1023
1085
1086 s_ip_s = ipaddress.ip_address(
1087 int(ipaddress.ip_address(self.pg0.remote_ip6)) - 24
1088 )
1089 s_ip_e = ipaddress.ip_address(int(s_ip_s) + 255)
1090 d_ip_s = ipaddress.ip_address(self.pg1.remote_ip6)
1091 d_ip_e = ipaddress.ip_address(int(d_ip_s) + 255)
1092 self.spd_create_and_intf_add(1, [self.pg1])
1093 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
1094 1,
1095 self.pg0,
1096 self.pg1,
1097 socket.IPPROTO_UDP,
1098 is_out=1,
1099 priority=10,
1100 policy_type="bypass",
1101 ip_range=True,
1102 local_ip_start=s_ip_s,
1103 local_ip_stop=s_ip_e,
1104 remote_ip_start=d_ip_s,
1105 remote_ip_stop=d_ip_e,
1106 local_port_start=s_port_s,
1107 local_port_stop=s_port_e,
1108 remote_port_start=d_port_s,
1109 remote_port_stop=d_port_e,
1110 )
1111 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
1112 1,
1113 self.pg0,
1114 self.pg1,
1115 socket.IPPROTO_UDP,
1116 is_out=1,
1117 priority=5,
1118 policy_type="discard",
1119 ip_range=True,
1120 local_ip_start=s_ip_s,
1121 local_ip_stop=s_ip_e,
1122 remote_ip_start=d_ip_s,
1123 remote_ip_stop=d_ip_e,
1124 local_port_start=s_port_s,
1125 local_port_stop=s_port_e,
1126 remote_port_start=d_port_s,
1127 remote_port_stop=d_port_e,
1128 )
1129
1130 # create the packet stream
1131 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
1132 # add the stream to the source interface + enable capture
1133 self.pg0.add_stream(packets)
1134 self.pg0.enable_capture()
1135 self.pg1.enable_capture()
1136 # start the packet generator
1137 self.pg_start()
1138 # get capture
1139 capture = self.pg1.get_capture()
1140 for packet in capture:
1141 try:
1142 self.logger.debug(ppp("SPD - Got packet:", packet))
1143 except Exception:
1144 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1145 raise
1146 self.logger.debug("SPD: Num packets: %s", len(capture.res))
1147
1148 # assert nothing captured on pg0
1149 self.pg0.assert_nothing_captured()
1150 # verify captured packets
1151 self.verify_capture(self.pg0, self.pg1, capture)
1152 # verify all policies matched the expected number of times
1153 self.verify_policy_match(pkt_count, policy_0)
1154 self.verify_policy_match(0, policy_1)
1155
1156
1157class IPSec6SpdTestCaseReadd(SpdFastPathIPv6Outbound):
1158 """ IPSec/IPv6 outbound: Policy mode test case with fast path \
1159 (add, remove, re-add)"""
1160
1161 def test_ipsec_spd_outbound_readd(self):
1162 # In this test case, packets in IPv4 FWD path are configured
1163 # to go through IPSec outbound SPD policy lookup.
1164 # 2 SPD rules (1 HIGH and 1 LOW) are added.
1165 # High priority rule action is set to BYPASS.
1166 # Low priority rule action is set to DISCARD.
1167 # Traffic sent on pg0 interface should match high priority
1168 # rule and should be sent out on pg1 interface.
1169 # High priority rule is then removed.
1170 # Traffic sent on pg0 interface should match low priority
1171 # rule and should be discarded after SPD lookup.
1172 # Readd high priority rule.
1173 # Traffic sent on pg0 interface should match high priority
1174 # rule and should be sent out on pg1 interface.
1175 self.create_interfaces(2)
1176 pkt_count = 5
1177 self.spd_create_and_intf_add(1, [self.pg1])
1178 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
1179 1,
1180 self.pg0,
1181 self.pg1,
1182 socket.IPPROTO_UDP,
1183 is_out=1,
1184 priority=10,
1185 policy_type="bypass",
1186 )
1187 policy_1 = self.spd_add_rem_policy( # outbound, priority 5
1188 1,
1189 self.pg0,
1190 self.pg1,
1191 socket.IPPROTO_UDP,
1192 is_out=1,
1193 priority=5,
1194 policy_type="discard",
1195 )
1196
1197 # create the packet stream
1198 packets = self.create_stream(self.pg0, self.pg1, pkt_count)
1199 # add the stream to the source interface + enable capture
1200 self.pg0.add_stream(packets)
1201 self.pg0.enable_capture()
1202 self.pg1.enable_capture()
1203 # start the packet generator
1204 self.pg_start()
1205 # get capture
1206 capture = self.pg1.get_capture()
1207 for packet in capture:
1208 try:
1209 self.logger.debug(ppp("SPD - Got packet:", packet))
1210 except Exception:
1211 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1212 raise
1213 self.logger.debug("SPD: Num packets: %s", len(capture.res))
1214
1215 # assert nothing captured on pg0
1216 self.pg0.assert_nothing_captured()
1217 # verify capture on pg1
1218 self.verify_capture(self.pg0, self.pg1, capture)
1219 # verify all policies matched the expected number of times
1220 self.verify_policy_match(pkt_count, policy_0)
1221 self.verify_policy_match(0, policy_1)
1222 # remove the bypass rule, leaving only the discard rule
1223 self.spd_add_rem_policy( # outbound, priority 10
1224 1,
1225 self.pg0,
1226 self.pg1,
1227 socket.IPPROTO_UDP,
1228 is_out=1,
1229 priority=10,
1230 policy_type="bypass",
1231 remove=True,
1232 )
1233
1234 # resend the same packets
1235 self.pg0.add_stream(packets)
1236 self.pg0.enable_capture() # flush the old captures
1237 self.pg1.enable_capture()
1238 self.pg_start()
1239
1240 # assert nothing captured on pg0
1241 self.pg0.assert_nothing_captured()
1242 # all packets will be dropped by SPD rule
1243 self.pg1.assert_nothing_captured()
1244 # verify all policies matched the expected number of times
1245 self.verify_policy_match(pkt_count, policy_0)
1246 self.verify_policy_match(pkt_count, policy_1)
1247
1248 # now readd the bypass rule
1249 policy_0 = self.spd_add_rem_policy( # outbound, priority 10
1250 1,
1251 self.pg0,
1252 self.pg1,
1253 socket.IPPROTO_UDP,
1254 is_out=1,
1255 priority=10,
1256 policy_type="bypass",
1257 )
1258
1259 # resend the same packets
1260 self.pg0.add_stream(packets)
1261 self.pg0.enable_capture() # flush the old captures
1262 self.pg1.enable_capture()
1263 self.pg_start()
1264
1265 # get capture
1266 capture = self.pg1.get_capture(pkt_count)
1267 for packet in capture:
1268 try:
1269 self.logger.debug(ppp("SPD - Got packet:", packet))
1270 except Exception:
1271 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1272 raise
1273 self.logger.debug("SPD: Num packets: %s", len(capture.res))
1274
1275 # assert nothing captured on pg0
1276 self.pg0.assert_nothing_captured()
1277 # verify captured packets
1278 self.verify_capture(self.pg0, self.pg1, capture)
1279 # verify all policies matched the expected number of times
1280 self.verify_policy_match(pkt_count, policy_0)
1281 self.verify_policy_match(pkt_count, policy_1)
1282
1283
1284class IPSec6SpdTestCaseMultiple(SpdFastPathIPv6Outbound):
1285 """ IPSec/IPv6 outbound: Policy mode test case with fast path \
1286 (multiple interfaces, multiple rules)"""
1287
1288 def test_ipsec_spd_outbound_multiple(self):
1289 # In this test case, packets in IPv4 FWD path are configured to go
1290 # through IPSec outbound SPD policy lookup.
1291 # Multiples rules on multiple interfaces are tested at the same time.
1292 # 3x interfaces are configured, binding the same SPD to each.
1293 # Each interface has 2 SPD rules (1 BYPASS and 1 DISCARD).
1294 # On pg0 & pg1, the BYPASS rule is HIGH priority
1295 # On pg2, the DISCARD rule is HIGH priority
1296 # Traffic should be received on pg0 & pg1 and dropped on pg2.
1297 self.create_interfaces(3)
1298 pkt_count = 5
1299 # bind SPD to all interfaces
1300 self.spd_create_and_intf_add(1, self.pg_interfaces)
1301 # add rules on all interfaces
1302 policy_01 = self.spd_add_rem_policy( # outbound, priority 10
1303 1,
1304 self.pg0,
1305 self.pg1,
1306 socket.IPPROTO_UDP,
1307 is_out=1,
1308 priority=10,
1309 policy_type="bypass",
1310 )
1311 policy_02 = self.spd_add_rem_policy( # outbound, priority 5
1312 1,
1313 self.pg0,
1314 self.pg1,
1315 socket.IPPROTO_UDP,
1316 is_out=1,
1317 priority=5,
1318 policy_type="discard",
1319 )
1320
1321 policy_11 = self.spd_add_rem_policy( # outbound, priority 10
1322 1,
1323 self.pg1,
1324 self.pg2,
1325 socket.IPPROTO_UDP,
1326 is_out=1,
1327 priority=10,
1328 policy_type="bypass",
1329 )
1330 policy_12 = self.spd_add_rem_policy( # outbound, priority 5
1331 1,
1332 self.pg1,
1333 self.pg2,
1334 socket.IPPROTO_UDP,
1335 is_out=1,
1336 priority=5,
1337 policy_type="discard",
1338 )
1339
1340 policy_21 = self.spd_add_rem_policy( # outbound, priority 5
1341 1,
1342 self.pg2,
1343 self.pg0,
1344 socket.IPPROTO_UDP,
1345 is_out=1,
1346 priority=5,
1347 policy_type="bypass",
1348 )
1349 policy_22 = self.spd_add_rem_policy( # outbound, priority 10
1350 1,
1351 self.pg2,
1352 self.pg0,
1353 socket.IPPROTO_UDP,
1354 is_out=1,
1355 priority=10,
1356 policy_type="discard",
1357 )
1358
1359 # interfaces bound to an SPD, will by default drop inbound
1360 # traffic with no matching policies. add catch-all inbound
1361 # bypass rule to SPD:
1362 self.spd_add_rem_policy( # inbound, all interfaces
1363 1,
1364 None,
1365 None,
1366 socket.IPPROTO_UDP,
1367 is_out=0,
1368 priority=10,
1369 policy_type="bypass",
1370 all_ips=True,
1371 )
1372
1373 # create the packet streams
1374 packets0 = self.create_stream(self.pg0, self.pg1, pkt_count)
1375 packets1 = self.create_stream(self.pg1, self.pg2, pkt_count)
1376 packets2 = self.create_stream(self.pg2, self.pg0, pkt_count)
1377 # add the streams to the source interfaces
1378 self.pg0.add_stream(packets0)
1379 self.pg1.add_stream(packets1)
1380 self.pg2.add_stream(packets2)
1381 # enable capture on all interfaces
1382 for pg in self.pg_interfaces:
1383 pg.enable_capture()
1384 # start the packet generator
1385 self.pg_start()
1386
1387 # get captures
1388 if_caps = []
1389 for pg in [self.pg1, self.pg2]: # we are expecting captures on pg1/pg2
1390 if_caps.append(pg.get_capture())
1391 for packet in if_caps[-1]:
1392 try:
1393 self.logger.debug(ppp("SPD - Got packet:", packet))
1394 except Exception:
1395 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1396 raise
1397 self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res))
1398 self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res))
1399
1400 # verify captures that matched BYPASS rule
1401 self.verify_capture(self.pg0, self.pg1, if_caps[0])
1402 self.verify_capture(self.pg1, self.pg2, if_caps[1])
1403 # verify that traffic to pg0 matched DISCARD rule and was dropped
1404 self.pg0.assert_nothing_captured()
1405 # verify all packets that were expected to match rules, matched
1406 # pg0 -> pg1
1407 self.verify_policy_match(pkt_count, policy_01)
1408 self.verify_policy_match(0, policy_02)
1409 # pg1 -> pg2
1410 self.verify_policy_match(pkt_count, policy_11)
1411 self.verify_policy_match(0, policy_12)
1412 # pg2 -> pg0
1413 self.verify_policy_match(0, policy_21)
1414 self.verify_policy_match(pkt_count, policy_22)
1415
1416
Fan Zhangab0bf0c2022-05-25 20:03:40 +00001417if __name__ == "__main__":
1418 unittest.main(testRunner=VppTestRunner)