Zachary Leaf | 26fec71 | 2021-10-26 10:05:58 -0500 | [diff] [blame] | 1 | import socket |
| 2 | import unittest |
| 3 | |
| 4 | from util import ppp |
| 5 | from framework import VppTestRunner |
| 6 | from template_ipsec import IPSecIPv4Fwd |
| 7 | |
| 8 | """ |
| 9 | When an IPSec SPD is configured on an interface, any inbound packets |
| 10 | not matching inbound policies, or outbound packets not matching outbound |
| 11 | policies, must be dropped by default as per RFC4301. |
| 12 | |
| 13 | This test uses simple IPv4 forwarding on interfaces with IPSec enabled |
| 14 | to check if packets with no matching rules are dropped by default. |
| 15 | |
| 16 | The basic setup is a single SPD bound to two interfaces, pg0 and pg1. |
| 17 | |
| 18 | ┌────┐ ┌────┐ |
| 19 | │SPD1│ │SPD1│ |
| 20 | ├────┤ ─────> ├────┤ |
| 21 | │PG0 │ │PG1 │ |
| 22 | └────┘ └────┘ |
| 23 | |
| 24 | First, both inbound and outbound BYPASS policies are configured allowing |
| 25 | traffic to pass from pg0 -> pg1. |
| 26 | |
| 27 | Packets are captured and verified at pg1. |
| 28 | |
| 29 | Then either the inbound or outbound policies are removed and we verify |
| 30 | packets are dropped as expected. |
| 31 | |
| 32 | """ |
| 33 | |
| 34 | |
class IPSecInboundDefaultDrop(IPSecIPv4Fwd):
    """ IPSec: inbound packets drop by default with no matching rule """

    # Scenario: with an inbound and an outbound BYPASS policy configured,
    # UDP traffic sent pg0 -> pg1 is forwarded. Once the inbound policy is
    # removed, the same traffic must no longer show up on pg1.
    def test_ipsec_inbound_default_drop(self):
        # configure two interfaces and bind the same SPD to both
        self.create_interfaces(2)
        self.spd_create_and_intf_add(1, self.pg_interfaces)
        pkt_count = 5

        # catch-all inbound BYPASS policy, all interfaces
        inbound_policy = self.spd_add_rem_policy(
            1, None, None, socket.IPPROTO_UDP, is_out=0, priority=10,
            policy_type="bypass", all_ips=True)

        # outbound BYPASS policy allowing traffic from pg0->pg1
        outbound_policy = self.spd_add_rem_policy(
            1, self.pg0, self.pg1, socket.IPPROTO_UDP,
            is_out=1, priority=10, policy_type="bypass")

        # create a packet stream pg0->pg1 + add to pg0
        packets0 = self.create_stream(self.pg0, self.pg1, pkt_count)
        self.pg0.add_stream(packets0)

        # with inbound BYPASS rule at pg0, we expect to see forwarded
        # packets on pg1
        self.pg_interfaces[1].enable_capture()
        self.pg_start()
        cap1 = self.pg1.get_capture()
        for packet in cap1:
            try:
                self.logger.debug(ppp("SPD - Got packet:", packet))
            except Exception:
                # log the offending packet before propagating the failure
                self.logger.error(
                    ppp("Unexpected or invalid packet:", packet))
                raise
        self.logger.debug("SPD: Num packets: %s", len(cap1.res))

        # verify captures on pg1
        self.verify_capture(self.pg0, self.pg1, cap1)
        # verify policies matched correct number of times
        self.verify_policy_match(pkt_count, inbound_policy)
        self.verify_policy_match(pkt_count, outbound_policy)

        # remove inbound catch-all BYPASS rule, traffic should now be dropped
        self.spd_add_rem_policy(  # inbound, all interfaces
            1, None, None, socket.IPPROTO_UDP, is_out=0, priority=10,
            policy_type="bypass", all_ips=True, remove=True)

        # create another packet stream pg0->pg1 + add to pg0
        packets1 = self.create_stream(self.pg0, self.pg1, pkt_count)
        self.pg0.add_stream(packets1)
        self.pg_interfaces[1].enable_capture()
        self.pg_start()

        # confirm traffic has now been dropped.
        # The message is built from adjacent literals rather than a
        # backslash continuation inside the string, so the source
        # indentation does not leak into the failure text.
        self.pg1.assert_nothing_captured(
            "inbound pkts with no matching "
            "rules NOT dropped by default")

        # both policies should not have matched any further packets
        # since we've dropped at input stage
        self.verify_policy_match(pkt_count, outbound_policy)
        self.verify_policy_match(pkt_count, inbound_policy)
| 94 | |
class IPSecOutboundDefaultDrop(IPSecIPv4Fwd):
    """ IPSec: outbound packets drop by default with no matching rule """

    # Scenario: with an inbound and an outbound BYPASS policy configured,
    # UDP traffic sent pg0 -> pg1 is forwarded. Once the outbound policy is
    # removed, the traffic still matches the inbound policy but must not be
    # forwarded out of pg1.
    def test_ipsec_outbound_default_drop(self):
        # configure two interfaces and bind the same SPD to both
        self.create_interfaces(2)
        self.spd_create_and_intf_add(1, self.pg_interfaces)
        pkt_count = 5

        # catch-all inbound BYPASS policy, all interfaces
        inbound_policy = self.spd_add_rem_policy(
            1, None, None, socket.IPPROTO_UDP, is_out=0, priority=10,
            policy_type="bypass", all_ips=True)

        # outbound BYPASS policy allowing traffic from pg0->pg1
        outbound_policy = self.spd_add_rem_policy(
            1, self.pg0, self.pg1, socket.IPPROTO_UDP,
            is_out=1, priority=10, policy_type="bypass")

        # create a packet stream pg0->pg1 + add to pg0
        packets0 = self.create_stream(self.pg0, self.pg1, pkt_count)
        self.pg0.add_stream(packets0)

        # with outbound BYPASS rule allowing pg0->pg1, we expect to see
        # forwarded packets on pg1
        self.pg_interfaces[1].enable_capture()
        self.pg_start()
        cap1 = self.pg1.get_capture()
        for packet in cap1:
            try:
                self.logger.debug(ppp("SPD - Got packet:", packet))
            except Exception:
                # log the offending packet before propagating the failure
                self.logger.error(
                    ppp("Unexpected or invalid packet:", packet))
                raise
        self.logger.debug("SPD: Num packets: %s", len(cap1.res))

        # verify captures on pg1
        self.verify_capture(self.pg0, self.pg1, cap1)
        # verify policies matched correct number of times
        self.verify_policy_match(pkt_count, inbound_policy)
        self.verify_policy_match(pkt_count, outbound_policy)

        # remove outbound rule
        self.spd_add_rem_policy(
            1, self.pg0, self.pg1, socket.IPPROTO_UDP,
            is_out=1, priority=10, policy_type="bypass",
            remove=True)

        # create another packet stream pg0->pg1 + add to pg0
        packets1 = self.create_stream(self.pg0, self.pg1, pkt_count)
        self.pg0.add_stream(packets1)
        self.pg_interfaces[1].enable_capture()
        self.pg_start()

        # confirm traffic was dropped and not forwarded.
        # The message is built from adjacent literals rather than a
        # backslash continuation inside the string, so the source
        # indentation does not leak into the failure text.
        self.pg1.assert_nothing_captured(
            "outbound pkts with no matching "
            "rules NOT dropped by default")

        # inbound rule should have matched twice the # of pkts now
        self.verify_policy_match(pkt_count * 2, inbound_policy)
        # as dropped at outbound, outbound policy is the same
        self.verify_policy_match(pkt_count, outbound_policy)
# Allow running this test module directly with the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)