blob: 645939c98a9d89b3bb7da2267c3461fef40c1615 [file] [log] [blame]
Fan Zhangab0bf0c2022-05-25 20:03:40 +00001import socket
2import unittest
3import ipaddress
4
5from util import ppp
6from framework import VppTestRunner
7from template_ipsec import IPSecIPv4Fwd
8
9
class SpdFastPathOutbound(IPSecIPv4Fwd):
    # Common base for the test cases in this file: it only adjusts the VPP
    # command line so that the IPv4 outbound SPD fast path is switched on.
    @classmethod
    def setUpConstants(cls):
        """Extend the inherited VPP command line with the fast-path stanza.

        Calls the parent implementation first, then appends an ``ipsec``
        section containing ``ipv4-outbound-spd-fast-path on`` to
        ``cls.vpp_cmdline`` and logs the resulting command line.
        """
        super().setUpConstants()
        fast_path_stanza = ["ipsec", "{", "ipv4-outbound-spd-fast-path on", "}"]
        cls.vpp_cmdline.extend(fast_path_stanza)
        cmdline_text = " ".join(cls.vpp_cmdline)
        cls.logger.info("VPP modified cmdline is %s", cmdline_text)
17
18
class IPSec4SpdTestCaseAdd(SpdFastPathOutbound):
    """ IPSec/IPv4 outbound: Policy mode test case with fast path \
        (add rule)"""

    def test_ipsec_spd_outbound_add(self):
        # Scenario: IPv4 forwarded packets go through the outbound SPD
        # policy lookup.  Two rules matching the same UDP flow are added:
        #   * priority 10 -> BYPASS
        #   * priority 5  -> DISCARD
        # Packets injected on pg0 are expected to hit the higher-priority
        # BYPASS rule and leave through pg1.
        self.create_interfaces(2)
        n_pkts = 5
        src_port = 1111
        dst_port = 2222
        # Both rules pin exactly one local and one remote port
        # (start == stop).
        port_match = {
            "local_port_start": src_port,
            "local_port_stop": src_port,
            "remote_port_start": dst_port,
            "remote_port_stop": dst_port,
        }

        self.spd_create_and_intf_add(1, [self.pg1])
        bypass_rule = self.spd_add_rem_policy(
            1,
            self.pg0,
            self.pg1,
            socket.IPPROTO_UDP,
            is_out=1,
            priority=10,
            policy_type="bypass",
            **port_match,
        )
        discard_rule = self.spd_add_rem_policy(
            1,
            self.pg0,
            self.pg1,
            socket.IPPROTO_UDP,
            is_out=1,
            priority=5,
            policy_type="discard",
            **port_match,
        )

        # Build the stream, queue it on pg0, arm capture on both
        # interfaces and run the packet generator.
        stream = self.create_stream(self.pg0, self.pg1, n_pkts, src_port, dst_port)
        self.pg0.add_stream(stream)
        for intf in (self.pg0, self.pg1):
            intf.enable_capture()
        self.pg_start()

        # Collect what arrived on pg1 and dump every packet to the log.
        received = self.pg1.get_capture()
        for pkt in received:
            try:
                self.logger.debug(ppp("SPD - Got packet:", pkt))
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", pkt))
                raise
        self.logger.debug("SPD: Num packets: %s", len(received.res))

        # Nothing may come back on the ingress interface.
        self.pg0.assert_nothing_captured()
        # The forwarded packets must be the ones that were sent.
        self.verify_capture(self.pg0, self.pg1, received)
        # Every packet matched BYPASS; DISCARD was never hit.
        self.verify_policy_match(n_pkts, bypass_rule)
        self.verify_policy_match(0, discard_rule)
90
91
class IPSec4SpdTestCaseAddPortRange(SpdFastPathOutbound):
    """ IPSec/IPv4 outbound: Policy mode test case with fast path \
        (add all ips port range rule)"""

    def test_ipsec_spd_outbound_add(self):
        # Scenario: IPv4 forwarded packets go through the outbound SPD
        # policy lookup.  Two rules are added with all_ips=True and the
        # same local/remote port ranges:
        #   * priority 10 -> BYPASS
        #   * priority 5  -> DISCARD
        # Packets injected on pg0 are expected to hit the higher-priority
        # BYPASS rule and leave through pg1.
        self.create_interfaces(2)
        n_pkts = 5
        # Port ranges shared by both rules: local 1000-2023,
        # remote 5000-6023.
        port_ranges = {
            "local_port_start": 1000,
            "local_port_stop": 2023,
            "remote_port_start": 5000,
            "remote_port_stop": 6023,
        }

        self.spd_create_and_intf_add(1, [self.pg1])
        bypass_rule = self.spd_add_rem_policy(
            1,
            self.pg0,
            self.pg1,
            socket.IPPROTO_UDP,
            is_out=1,
            priority=10,
            policy_type="bypass",
            all_ips=True,
            **port_ranges,
        )
        discard_rule = self.spd_add_rem_policy(
            1,
            self.pg0,
            self.pg1,
            socket.IPPROTO_UDP,
            is_out=1,
            priority=5,
            policy_type="discard",
            all_ips=True,
            **port_ranges,
        )

        # The stream uses ports 1333 -> 5444, which lie inside the
        # configured ranges.
        stream = self.create_stream(self.pg0, self.pg1, n_pkts, 1333, 5444)
        self.pg0.add_stream(stream)
        for intf in (self.pg0, self.pg1):
            intf.enable_capture()
        self.pg_start()

        # Collect what arrived on pg1 and dump every packet to the log.
        received = self.pg1.get_capture()
        for pkt in received:
            try:
                self.logger.debug(ppp("SPD - Got packet:", pkt))
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", pkt))
                raise
        self.logger.debug("SPD: Num packets: %s", len(received.res))

        # Nothing may come back on the ingress interface.
        self.pg0.assert_nothing_captured()
        # The forwarded packets must be the ones that were sent.
        self.verify_capture(self.pg0, self.pg1, received)
        # Every packet matched BYPASS; DISCARD was never hit.
        self.verify_policy_match(n_pkts, bypass_rule)
        self.verify_policy_match(0, discard_rule)
165
166
class IPSec4SpdTestCaseAddIPRange(SpdFastPathOutbound):
    """ IPSec/IPv4 outbound: Policy mode test case with fast path \
        (add ips range with any port rule)"""

    def test_ipsec_spd_outbound_add(self):
        # Scenario: IPv4 forwarded packets go through the outbound SPD
        # policy lookup.  Two rules are added with ip_range=True and the
        # same address ranges (no port arguments are passed):
        #   * priority 10 -> BYPASS
        #   * priority 5  -> DISCARD
        # Packets injected on pg0 are expected to hit the higher-priority
        # BYPASS rule and leave through pg1.
        self.create_interfaces(2)
        n_pkts = 5

        # Local range: pg0's remote address up to that address + 5.
        local_first = ipaddress.ip_address(self.pg0.remote_ip4)
        local_last = ipaddress.ip_address(int(local_first) + 5)
        # Remote range: exactly pg1's remote address (start == stop).
        remote_first = ipaddress.ip_address(self.pg1.remote_ip4)
        remote_last = ipaddress.ip_address(int(remote_first))
        ip_ranges = {
            "local_ip_start": local_first,
            "local_ip_stop": local_last,
            "remote_ip_start": remote_first,
            "remote_ip_stop": remote_last,
        }

        self.spd_create_and_intf_add(1, [self.pg1])
        bypass_rule = self.spd_add_rem_policy(
            1,
            self.pg0,
            self.pg1,
            socket.IPPROTO_UDP,
            is_out=1,
            priority=10,
            policy_type="bypass",
            ip_range=True,
            **ip_ranges,
        )
        discard_rule = self.spd_add_rem_policy(
            1,
            self.pg0,
            self.pg1,
            socket.IPPROTO_UDP,
            is_out=1,
            priority=5,
            policy_type="discard",
            ip_range=True,
            **ip_ranges,
        )

        # Build the stream, queue it on pg0, arm capture on both
        # interfaces and run the packet generator.
        stream = self.create_stream(self.pg0, self.pg1, n_pkts)
        self.pg0.add_stream(stream)
        for intf in (self.pg0, self.pg1):
            intf.enable_capture()
        self.pg_start()

        # Collect what arrived on pg1 and dump every packet to the log.
        received = self.pg1.get_capture()
        for pkt in received:
            try:
                self.logger.debug(ppp("SPD - Got packet:", pkt))
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", pkt))
                raise
        self.logger.debug("SPD: Num packets: %s", len(received.res))

        # Nothing may come back on the ingress interface.
        self.pg0.assert_nothing_captured()
        # The forwarded packets must be the ones that were sent.
        self.verify_capture(self.pg0, self.pg1, received)
        # Every packet matched BYPASS; DISCARD was never hit.
        self.verify_policy_match(n_pkts, bypass_rule)
        self.verify_policy_match(0, discard_rule)
240
241
class IPSec4SpdTestCaseAddIPAndPortRange(SpdFastPathOutbound):
    """ IPSec/IPv4 outbound: Policy mode test case with fast path \
        (add all ips range rule)"""

    def test_ipsec_spd_outbound_add(self):
        # Scenario: IPv4 forwarded packets go through the outbound SPD
        # policy lookup.  Two rules are added that combine address ranges
        # AND port ranges:
        #   * priority 10 -> BYPASS
        #   * priority 5  -> DISCARD
        # Packets injected on pg0 are expected to hit the higher-priority
        # BYPASS rule and leave through pg1.
        self.create_interfaces(2)
        n_pkts = 5

        # Port ranges: 1024 consecutive ports starting at 1000 (local)
        # and at 5000 (remote).
        port_span = 1023
        local_port_first = 1000
        remote_port_first = 5000

        # Local addresses: a 256-address window that starts 24 below
        # pg0's remote address.
        pg0_remote = ipaddress.ip_address(self.pg0.remote_ip4)
        local_first = ipaddress.ip_address(int(pg0_remote) - 24)
        local_last = ipaddress.ip_address(int(local_first) + 255)
        # Remote addresses: a 256-address window starting at pg1's
        # remote address.
        remote_first = ipaddress.ip_address(self.pg1.remote_ip4)
        remote_last = ipaddress.ip_address(int(remote_first) + 255)

        match = {
            "local_ip_start": local_first,
            "local_ip_stop": local_last,
            "remote_ip_start": remote_first,
            "remote_ip_stop": remote_last,
            "local_port_start": local_port_first,
            "local_port_stop": local_port_first + port_span,
            "remote_port_start": remote_port_first,
            "remote_port_stop": remote_port_first + port_span,
        }

        self.spd_create_and_intf_add(1, [self.pg1])
        bypass_rule = self.spd_add_rem_policy(
            1,
            self.pg0,
            self.pg1,
            socket.IPPROTO_UDP,
            is_out=1,
            priority=10,
            policy_type="bypass",
            ip_range=True,
            **match,
        )
        discard_rule = self.spd_add_rem_policy(
            1,
            self.pg0,
            self.pg1,
            socket.IPPROTO_UDP,
            is_out=1,
            priority=5,
            policy_type="discard",
            ip_range=True,
            **match,
        )

        # The stream is built with create_stream's default ports.
        # NOTE(review): presumably those defaults fall inside the port
        # ranges configured above - confirm against the helper.
        stream = self.create_stream(self.pg0, self.pg1, n_pkts)
        self.pg0.add_stream(stream)
        for intf in (self.pg0, self.pg1):
            intf.enable_capture()
        self.pg_start()

        # Collect what arrived on pg1 and dump every packet to the log.
        received = self.pg1.get_capture()
        for pkt in received:
            try:
                self.logger.debug(ppp("SPD - Got packet:", pkt))
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", pkt))
                raise
        self.logger.debug("SPD: Num packets: %s", len(received.res))

        # Nothing may come back on the ingress interface.
        self.pg0.assert_nothing_captured()
        # The forwarded packets must be the ones that were sent.
        self.verify_capture(self.pg0, self.pg1, received)
        # Every packet matched BYPASS; DISCARD was never hit.
        self.verify_policy_match(n_pkts, bypass_rule)
        self.verify_policy_match(0, discard_rule)
331
332
class IPSec4SpdTestCaseAddAll(SpdFastPathOutbound):
    """ IPSec/IPv4 outbound: Policy mode test case with fast path \
        (add all ips ports rule)"""

    def _fp_inject(self, stream):
        """Queue ``stream`` on pg0, arm capture on pg0/pg1 and start traffic."""
        self.pg0.add_stream(stream)
        self.pg0.enable_capture()
        self.pg1.enable_capture()
        self.pg_start()

    def test_ipsec_spd_outbound_add(self):
        # Scenario: IPv4 forwarded packets go through the outbound SPD
        # policy lookup, using two all_ips rules added one after another:
        #   phase 1: only a priority-10 BYPASS rule exists -> traffic sent
        #            on pg0 must come out of pg1.
        #   phase 2: a priority-20 DISCARD rule is added on top -> the
        #            same kind of traffic must now be dropped.
        self.create_interfaces(2)
        n_pkts = 5
        self.spd_create_and_intf_add(1, [self.pg1])
        # Positional arguments shared by both rule additions.
        rule_args = (1, self.pg0, self.pg1, socket.IPPROTO_UDP)

        # ---- phase 1: BYPASS only -------------------------------------
        bypass_rule = self.spd_add_rem_policy(
            *rule_args,
            is_out=1,
            priority=10,
            policy_type="bypass",
            all_ips=True,
        )

        first_stream = self.create_stream(self.pg0, self.pg1, n_pkts)
        self._fp_inject(first_stream)

        # Collect what arrived on pg1 and dump every packet to the log.
        received = self.pg1.get_capture()
        for pkt in received:
            try:
                self.logger.debug(ppp("SPD - Got packet:", pkt))
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", pkt))
                raise
        self.logger.debug("SPD: Num packets: %s", len(received.res))

        # Nothing may come back on the ingress interface.
        self.pg0.assert_nothing_captured()
        # The forwarded packets must be the ones that were sent.
        self.verify_capture(self.pg0, self.pg1, received)
        # Every packet matched the BYPASS rule.
        self.verify_policy_match(n_pkts, bypass_rule)

        # ---- phase 2: higher-priority DISCARD added -------------------
        # The returned entry is not inspected afterwards; this phase only
        # asserts that no traffic shows up on either interface.
        discard_rule = self.spd_add_rem_policy(
            *rule_args,
            is_out=1,
            priority=20,
            policy_type="discard",
            all_ips=True,
        )

        second_stream = self.create_stream(self.pg0, self.pg1, n_pkts)
        self._fp_inject(second_stream)

        # Nothing is expected on pg0 or on pg1 any more.
        self.pg0.assert_nothing_captured()
        self.pg1.assert_nothing_captured()
408
409
class IPSec4SpdTestCaseRemove(SpdFastPathOutbound):
    """ IPSec/IPv4 outbound: Policy mode test case with fast path \
        (remove rule)"""

    def _fp_inject(self, stream):
        """Queue ``stream`` on pg0, arm capture on pg0/pg1 and start traffic.

        Re-arming capture also flushes whatever was captured before.
        """
        self.pg0.add_stream(stream)
        self.pg0.enable_capture()
        self.pg1.enable_capture()
        self.pg_start()

    def test_ipsec_spd_outbound_remove(self):
        # Scenario: IPv4 forwarded packets go through the outbound SPD
        # policy lookup.  Two rules are added:
        #   * priority 10 -> BYPASS
        #   * priority 5  -> DISCARD
        # Round 1: traffic on pg0 matches BYPASS and leaves through pg1.
        # The BYPASS rule is then removed.
        # Round 2: the same traffic matches DISCARD and is dropped.
        self.create_interfaces(2)
        n_pkts = 5
        self.spd_create_and_intf_add(1, [self.pg1])
        # Positional arguments shared by every rule add/remove below.
        rule_args = (1, self.pg0, self.pg1, socket.IPPROTO_UDP)

        bypass_rule = self.spd_add_rem_policy(
            *rule_args,
            is_out=1,
            priority=10,
            policy_type="bypass",
        )
        discard_rule = self.spd_add_rem_policy(
            *rule_args,
            is_out=1,
            priority=5,
            policy_type="discard",
        )

        # ---- round 1: BYPASS wins -------------------------------------
        stream = self.create_stream(self.pg0, self.pg1, n_pkts)
        self._fp_inject(stream)

        # Collect what arrived on pg1 and dump every packet to the log.
        received = self.pg1.get_capture()
        for pkt in received:
            try:
                self.logger.debug(ppp("SPD - Got packet:", pkt))
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", pkt))
                raise

        # Nothing may come back on the ingress interface.
        self.pg0.assert_nothing_captured()
        self.logger.debug("SPD: Num packets: %s", len(received.res))
        # The forwarded packets must be the ones that were sent.
        self.verify_capture(self.pg0, self.pg1, received)
        # Every packet matched BYPASS; DISCARD was never hit.
        self.verify_policy_match(n_pkts, bypass_rule)
        self.verify_policy_match(0, discard_rule)

        # ---- remove the BYPASS rule, leaving only DISCARD -------------
        self.spd_add_rem_policy(
            *rule_args,
            is_out=1,
            priority=10,
            policy_type="bypass",
            remove=True,
        )

        # ---- round 2: resend the very same packets --------------------
        self._fp_inject(stream)
        # Nothing on the ingress side, and nothing forwarded either:
        # every packet is dropped by the DISCARD rule.
        self.pg0.assert_nothing_captured()
        self.pg1.assert_nothing_captured()
        # The BYPASS entry is still expected to report the round-1 count,
        # while DISCARD has now matched all round-2 packets.
        self.verify_policy_match(n_pkts, bypass_rule)
        self.verify_policy_match(n_pkts, discard_rule)
496
497
class IPSec4SpdTestCaseReadd(SpdFastPathOutbound):
    """ IPSec/IPv4 outbound: Policy mode test case with fast path \
        (add, remove, re-add)"""

    def _fp_inject(self, stream):
        """Queue ``stream`` on pg0, arm capture on pg0/pg1 and start traffic.

        Re-arming capture also flushes whatever was captured before.
        """
        self.pg0.add_stream(stream)
        self.pg0.enable_capture()
        self.pg1.enable_capture()
        self.pg_start()

    def _fp_log_capture(self, received):
        """Write every captured packet, then the packet count, to the debug log."""
        for pkt in received:
            try:
                self.logger.debug(ppp("SPD - Got packet:", pkt))
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", pkt))
                raise
        self.logger.debug("SPD: Num packets: %s", len(received.res))

    def test_ipsec_spd_outbound_readd(self):
        # Scenario: IPv4 forwarded packets go through the outbound SPD
        # policy lookup.  Two rules are added:
        #   * priority 10 -> BYPASS
        #   * priority 5  -> DISCARD
        # Round 1: traffic on pg0 matches BYPASS and leaves through pg1.
        # The BYPASS rule is removed.
        # Round 2: the same traffic matches DISCARD and is dropped.
        # The BYPASS rule is added again.
        # Round 3: the same traffic matches BYPASS again and leaves
        #          through pg1.
        self.create_interfaces(2)
        n_pkts = 5
        self.spd_create_and_intf_add(1, [self.pg1])
        # Positional arguments shared by every rule add/remove below.
        rule_args = (1, self.pg0, self.pg1, socket.IPPROTO_UDP)

        bypass_rule = self.spd_add_rem_policy(
            *rule_args,
            is_out=1,
            priority=10,
            policy_type="bypass",
        )
        discard_rule = self.spd_add_rem_policy(
            *rule_args,
            is_out=1,
            priority=5,
            policy_type="discard",
        )

        # ---- round 1: BYPASS wins -------------------------------------
        stream = self.create_stream(self.pg0, self.pg1, n_pkts)
        self._fp_inject(stream)
        received = self.pg1.get_capture()
        self._fp_log_capture(received)

        # Nothing may come back on the ingress interface.
        self.pg0.assert_nothing_captured()
        # The forwarded packets must be the ones that were sent.
        self.verify_capture(self.pg0, self.pg1, received)
        # Every packet matched BYPASS; DISCARD was never hit.
        self.verify_policy_match(n_pkts, bypass_rule)
        self.verify_policy_match(0, discard_rule)

        # ---- remove the BYPASS rule, leaving only DISCARD -------------
        self.spd_add_rem_policy(
            *rule_args,
            is_out=1,
            priority=10,
            policy_type="bypass",
            remove=True,
        )

        # ---- round 2: resend the very same packets --------------------
        self._fp_inject(stream)

        # Nothing on the ingress side, and nothing forwarded either:
        # every packet is dropped by the DISCARD rule.
        self.pg0.assert_nothing_captured()
        self.pg1.assert_nothing_captured()
        # The BYPASS entry is still expected to report the round-1 count,
        # while DISCARD has now matched all round-2 packets.
        self.verify_policy_match(n_pkts, bypass_rule)
        self.verify_policy_match(n_pkts, discard_rule)

        # ---- add the BYPASS rule back ---------------------------------
        # From here on the freshly returned entry is the one checked.
        bypass_rule = self.spd_add_rem_policy(
            *rule_args,
            is_out=1,
            priority=10,
            policy_type="bypass",
        )

        # ---- round 3: resend the very same packets --------------------
        self._fp_inject(stream)

        # This time the expected packet count is passed explicitly.
        received = self.pg1.get_capture(n_pkts)
        self._fp_log_capture(received)

        # Nothing may come back on the ingress interface.
        self.pg0.assert_nothing_captured()
        # The forwarded packets must be the ones that were sent.
        self.verify_capture(self.pg0, self.pg1, received)
        # The re-added BYPASS entry matched all round-3 packets; DISCARD
        # stays at its round-2 count.
        self.verify_policy_match(n_pkts, bypass_rule)
        self.verify_policy_match(n_pkts, discard_rule)
626
627
class IPSec4SpdTestCaseMultiple(SpdFastPathOutbound):
    """ IPSec/IPv4 outbound: Policy mode test case with fast path \
        (multiple interfaces, multiple rules)"""

    def test_ipsec_spd_outbound_multiple(self):
        # Scenario: IPv4 forwarded packets go through the outbound SPD
        # policy lookup, with several rules on several interfaces at once.
        # Three interfaces are created and the same SPD is bound to each.
        # For every traffic direction one BYPASS and one DISCARD rule is
        # added:
        #   pg0 -> pg1 : BYPASS has the higher priority
        #   pg1 -> pg2 : BYPASS has the higher priority
        #   pg2 -> pg0 : DISCARD has the higher priority
        # So packets are expected to arrive on pg1 and pg2, while the
        # pg2 -> pg0 traffic is dropped and nothing shows up on pg0.
        self.create_interfaces(3)
        n_pkts = 5
        # Bind the SPD to all interfaces.
        self.spd_create_and_intf_add(1, self.pg_interfaces)

        # (source, destination, BYPASS priority, DISCARD priority)
        directions = [
            (self.pg0, self.pg1, 10, 5),
            (self.pg1, self.pg2, 10, 5),
            (self.pg2, self.pg0, 5, 10),
        ]

        # Add the BYPASS rule, then the DISCARD rule, direction by direction.
        rule_pairs = []
        for src_if, dst_if, bypass_prio, discard_prio in directions:
            bypass_rule = self.spd_add_rem_policy(
                1,
                src_if,
                dst_if,
                socket.IPPROTO_UDP,
                is_out=1,
                priority=bypass_prio,
                policy_type="bypass",
            )
            discard_rule = self.spd_add_rem_policy(
                1,
                src_if,
                dst_if,
                socket.IPPROTO_UDP,
                is_out=1,
                priority=discard_prio,
                policy_type="discard",
            )
            rule_pairs.append((bypass_rule, discard_rule))

        # Interfaces bound to an SPD drop inbound traffic that matches no
        # policy by default, so add a catch-all inbound BYPASS rule.
        self.spd_add_rem_policy(
            1,
            None,
            None,
            socket.IPPROTO_UDP,
            is_out=0,
            priority=10,
            policy_type="bypass",
            all_ips=True,
        )

        # Create all three streams first, then queue each one on its
        # source interface.
        streams = [
            self.create_stream(src_if, dst_if, n_pkts)
            for src_if, dst_if, _, _ in directions
        ]
        for (src_if, _, _, _), stream in zip(directions, streams):
            src_if.add_stream(stream)
        # Arm capture everywhere and run the packet generator.
        for intf in self.pg_interfaces:
            intf.enable_capture()
        self.pg_start()

        # Captures are expected on pg1 and pg2 only.
        captures = []
        for sink in (self.pg1, self.pg2):
            received = sink.get_capture()
            captures.append(received)
            for pkt in received:
                try:
                    self.logger.debug(ppp("SPD - Got packet:", pkt))
                except Exception:
                    self.logger.error(ppp("Unexpected or invalid packet:", pkt))
                    raise
        for received in captures:
            self.logger.debug("SPD: Num packets: %s", len(received.res))

        # Check the traffic that went through the BYPASS rules.
        on_pg1, on_pg2 = captures
        self.verify_capture(self.pg0, self.pg1, on_pg1)
        self.verify_capture(self.pg1, self.pg2, on_pg2)
        # Traffic towards pg0 matched DISCARD and must have been dropped.
        self.pg0.assert_nothing_captured()

        # For each direction the higher-priority rule saw every packet
        # and the other rule saw none.
        for (_, _, bypass_prio, discard_prio), (bypass_rule, discard_rule) in zip(
            directions, rule_pairs
        ):
            bypass_wins = bypass_prio > discard_prio
            self.verify_policy_match(n_pkts if bypass_wins else 0, bypass_rule)
            self.verify_policy_match(0 if bypass_wins else n_pkts, discard_rule)
759
760
# When this module is executed directly (rather than imported), run its
# test cases through unittest using VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)