ipsec: huge anti-replay window support

Type: improvement

Since RFC4303 does not specify the anti-replay window size, VPP should
support multiple window sizes. This is done through a clib_bitmap.

Signed-off-by: Maxime Peim <mpeim@cisco.com>
Change-Id: I3dfe30efd20018e345418bef298ec7cec19b1cfc
diff --git a/test/template_ipsec.py b/test/template_ipsec.py
index 5d83510..c438fbb 100644
--- a/test/template_ipsec.py
+++ b/test/template_ipsec.py
@@ -51,6 +51,8 @@
         self.outer_flow_label = 0
         self.inner_flow_label = 0x12345
 
+        self.anti_replay_window_size = 64
+
         self.auth_algo_vpp_id = (
             VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
         )
@@ -98,6 +100,8 @@
         self.outer_flow_label = 0
         self.inner_flow_label = 0x12345
 
+        self.anti_replay_window_size = 64
+
         self.auth_algo_vpp_id = (
             VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
         )
@@ -625,22 +629,34 @@
     def verify_tra_anti_replay(self):
         p = self.params[socket.AF_INET]
         esn_en = p.vpp_tra_sa.esn_en
+        anti_replay_window_size = p.anti_replay_window_size
 
         seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
         replay_count = self.get_replay_counts(p)
+        initial_sa_node_replay_diff = replay_count - p.tra_sa_in.get_err("replay")
         hash_failed_count = self.get_hash_failed_counts(p)
         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
+        initial_sa_node_cycled_diff = seq_cycle_count - p.tra_sa_in.get_err(
+            "seq_cycled"
+        )
         hash_err = "integ_error"
 
         if ESP == self.encryption_type:
             undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
             undersize_count = self.statistics.get_err_counter(undersize_node_name)
+            initial_sa_node_undersize_diff = undersize_count - p.tra_sa_in.get_err(
+                "runt"
+            )
             # For AES-GCM an error in the hash is reported as a decryption failure
             if p.crypt_algo in ("AES-GCM", "AES-NULL-GMAC"):
                 hash_err = "decryption_failed"
         # In async mode, we don't report errors in the hash.
         if p.async_mode:
             hash_err = ""
+        else:
+            initial_sa_node_hash_diff = hash_failed_count - p.tra_sa_in.get_err(
+                hash_err
+            )
 
         #
         # send packets with seq numbers 1->34
@@ -666,7 +682,7 @@
         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
         replay_count += len(pkts)
         self.assertEqual(self.get_replay_counts(p), replay_count)
-        err = p.tra_sa_in.get_err("replay")
+        err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
         self.assertEqual(err, replay_count)
 
         #
@@ -684,29 +700,31 @@
         recv_pkts = self.send_and_expect(self.tra_if, pkts * 8, self.tra_if, n_rx=1)
         replay_count += 7
         self.assertEqual(self.get_replay_counts(p), replay_count)
-        err = p.tra_sa_in.get_err("replay")
+        err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
         self.assertEqual(err, replay_count)
 
         #
-        # now move the window over to 257 (more than one byte) and into Case A
+        # now move the window over to anti_replay_window_size + 100 and into Case A
         #
         self.vapi.cli("clear error")
         pkt = Ether(
             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
         ) / p.scapy_tra_sa.encrypt(
             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
-            seq_num=257,
+            seq_num=anti_replay_window_size + 100,
         )
         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
 
+        self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
+
         # replayed packets are dropped
         self.send_and_assert_no_replies(self.tra_if, pkt * 3, timeout=0.2)
         replay_count += 3
         self.assertEqual(self.get_replay_counts(p), replay_count)
-        err = p.tra_sa_in.get_err("replay")
+        err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
         self.assertEqual(err, replay_count)
 
-        # the window size is 64 packets
+        # the window size is anti_replay_window_size packets
         # in window are still accepted
         pkt = Ether(
             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
@@ -714,7 +732,6 @@
             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
             seq_num=200,
         )
-        recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
 
         # a packet that does not decrypt does not move the window forward
         bogus_sa = SecurityAssociation(
@@ -729,14 +746,14 @@
             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
         ) / bogus_sa.encrypt(
             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
-            seq_num=350,
+            seq_num=anti_replay_window_size + 200,
         )
         self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
 
         hash_failed_count += 17
         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
         if hash_err != "":
-            err = p.tra_sa_in.get_err(hash_err)
+            err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff
             self.assertEqual(err, hash_failed_count)
 
         # a malformed 'runt' packet
@@ -747,13 +764,13 @@
                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
             ) / bogus_sa.encrypt(
                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
-                seq_num=350,
+                seq_num=anti_replay_window_size + 200,
             )
             self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
 
             undersize_count += 17
             self.assert_error_counter_equal(undersize_node_name, undersize_count)
-            err = p.tra_sa_in.get_err("runt")
+            err = p.tra_sa_in.get_err("runt") + initial_sa_node_undersize_diff
             self.assertEqual(err, undersize_count)
 
         # which we can determine since this packet is still in the window
@@ -784,21 +801,21 @@
             hash_failed_count += 17
             self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
             if hash_err != "":
-                err = p.tra_sa_in.get_err(hash_err)
+                err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff
                 self.assertEqual(err, hash_failed_count)
 
         else:
             replay_count += 17
             self.assertEqual(self.get_replay_counts(p), replay_count)
-            err = p.tra_sa_in.get_err("replay")
+            err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
             self.assertEqual(err, replay_count)
 
-        # valid packet moves the window over to 258
+        # valid packet moves the window over to anti_replay_window_size + 258
         pkt = Ether(
             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
         ) / p.scapy_tra_sa.encrypt(
             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
-            seq_num=258,
+            seq_num=anti_replay_window_size + 258,
         )
         rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
         decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
@@ -852,7 +869,7 @@
             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
 
             #
-            # A packet that has seq num between (2^32-64) and 5 is within
+            # A packet that has seq num between (2^32-anti_replay_window_size)+4 and 5 is within
             # the window
             #
             p.scapy_tra_sa.seq_num = 0xFFFFFFFD
@@ -883,19 +900,19 @@
             hash_failed_count += 1
             self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
             if hash_err != "":
-                err = p.tra_sa_in.get_err(hash_err)
+                err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff
                 self.assertEqual(err, hash_failed_count)
 
             #
             # but if we move the window forward to case B, then we can wrap
             # again
             #
-            p.scapy_tra_sa.seq_num = 0x100000555
+            p.scapy_tra_sa.seq_num = 0x100000000 + anti_replay_window_size + 0x555
             pkt = Ether(
                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
             ) / p.scapy_tra_sa.encrypt(
                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
-                seq_num=0x100000555,
+                seq_num=p.scapy_tra_sa.seq_num,
             )
             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
@@ -918,7 +935,7 @@
             self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
             seq_cycle_count += len(pkts)
             self.assert_error_counter_equal(seq_cycle_node_name, seq_cycle_count)
-            err = p.tra_sa_out.get_err("seq_cycled")
+            err = p.tra_sa_out.get_err("seq_cycled") + initial_sa_node_cycled_diff
             self.assertEqual(err, seq_cycle_count)
 
         # move the security-associations seq number on to the last we used
@@ -1068,6 +1085,768 @@
         self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count)
 
+    def _verify_tra_anti_replay_algorithm_esn(self):
+        def seq_num(seqh, seql):
+            return (seqh << 32) | (seql & 0xFFFF_FFFF)
+
+        p = self.params[socket.AF_INET]
+        anti_replay_window_size = p.anti_replay_window_size
+
+        seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
+        replay_count = self.get_replay_counts(p)
+        hash_failed_count = self.get_hash_failed_counts(p)
+        seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
+
+        if ESP == self.encryption_type:
+            undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
+            undersize_count = self.statistics.get_err_counter(undersize_node_name)
+
+        # reset the TX SA to avoid conflicts with leftover configuration
+        self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0")
+
+        """
+        RFC 4303 Appendix A2. Case A
+
+        |: new Th marker
+        a-i: possible seq num received
+        +: Bl, Tl, Bl', Tl'
+        [BT]l(sign) = [BT]l (sign) 2^32 mod 2^32 (Th inc/dec-remented by 1)
+
+                Th - 1               Th                Th + 1
+        --|--a--+---b---+-c--|--d--+---e---+-f--|--g--+---h---+--i-|--
+                =========          =========          =========
+                Bl-     Tl-        Bl      Tl         Bl+     Tl+
+
+        Case A implies Tl >= W - 1
+        """
+
+        Th = 1
+        Tl = anti_replay_window_size + 40
+        Bl = Tl - anti_replay_window_size + 1
+
+        # move VPP's RX AR window to Case A
+        self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+        p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+
+        """
+        case a: Seql < Bl
+            - pre-crypto check: algorithm predicts that the packet wraps the window
+                -> Seqh = Th + 1
+            - integrity check: should fail
+            - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th - 1, Bl - 20), seq_num(Th - 1, Bl - 5))
+        ]
+
+        # out-of-window packets fail integrity check
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case b: Bl <= Seql <= Tl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th
+                    -> check for a replayed packet with Seql
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, Tl - 10), seq_num(Th, Tl - 5))
+        ]
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th - 1, Tl - 35), seq_num(Th - 1, Tl - 5))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # some packets are rejected by the pre-crypto check
+        replay_count += 5
+        self.assertEqual(self.get_replay_counts(p), replay_count)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts) - 5
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case c: Seql > Tl
+                - pre-crypto check: algorithm predicts that the packet does not wrap the window
+                    -> Seqh = Th
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th - 1, Tl + 5), seq_num(Th - 1, Tl + 20))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case d: Seql < Bl
+                - pre-crypto check: algorithm predicts that the packet wraps the window
+                    -> Seqh = Th + 1
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, Bl - 20), seq_num(Th, Bl - 5))
+        ]
+
+        # out-of-window packets fail integrity check
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case e: Bl <= Seql <= Tl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th
+                    -> check for a replayed packet with Seql
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> Seql is marked in the AR window
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 30))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            case f: Seql > Tl
+                - pre-crypto check: algorithm predicts that the packet does not wrap the window
+                    -> Seqh = Th
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> AR window shift (the window stays Case A)
+                    -> Seql is marked in the AR window
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, Tl + 50), seq_num(Th, Tl + 60))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            case g: Seql < Bl
+                - pre-crypto check: algorithm predicts that the packet wraps the window
+                    -> Seqh = Th + 1
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> AR window shift (may set the window in Case B)
+                    -> Seql is marked in the AR window
+        """
+        p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            # set the window in Case B (the minimum window size is 64
+            # so we are sure to overlap)
+            for seq in range(seq_num(Th + 1, 10), seq_num(Th + 1, 20))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        # reset the VPP's RX AR window to Case A
+        Th = 1
+        Tl = 2 * anti_replay_window_size + 40
+        Bl = Tl - anti_replay_window_size + 1
+
+        self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+
+        p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            # the AR will stay in Case A
+            for seq in range(
+                seq_num(Th + 1, anti_replay_window_size + 10),
+                seq_num(Th + 1, anti_replay_window_size + 20),
+            )
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            case h: Bl <= Seql <= Tl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th
+                    -> check for a replayed packet with Seql
+                - integrity check: the wrap is not detected, should fail
+                - post-crypto check: ...
+        """
+        Th += 1
+        Tl = anti_replay_window_size + 20
+        Bl = Tl - anti_replay_window_size + 1
+
+        p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th + 1, Tl - 20), seq_num(Th + 1, Tl - 5))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # some packets are rejected by the pre-crypto check
+        replay_count += 5
+        self.assertEqual(self.get_replay_counts(p), replay_count)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts) - 5
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case i: Seql > Tl
+                - pre-crypto check: algorithm predicts that the packet does not wrap the window
+                    -> Seqh = Th
+                - integrity check: the wrap is not detected, should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th + 1, Tl + 5), seq_num(Th + 1, Tl + 15))
+        ]
+
+        # out-of-window packets fail integrity check
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            RFC 4303 Appendix A2. Case B
+
+                        Th - 1               Th                Th + 1
+            ----|-a-+-----b----+--c--|-d-+----e-----+--f--|-g-+--h---
+            =========          ===========          ===========
+                    Tl-        Bl       Tl          Bl+       Tl+
+
+            Case B implies Tl < W - 1
+        """
+
+        # reset the VPP's RX AR window to Case B
+        Th = 2
+        Tl = 30  # minimum window size of 64, we are sure to overlap
+        Bl = (Tl - anti_replay_window_size + 1) % (1 << 32)
+
+        self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+        p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+
+        """
+            case a: Seql <= Tl < Bl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th
+                    -> check for replayed packet
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, 5), seq_num(Th, 10))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th - 1, 0), seq_num(Th - 1, 15))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # some packets are rejected by the pre-crypto check
+        replay_count += 5
+        self.assertEqual(self.get_replay_counts(p), replay_count)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts) - 5
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case b: Tl < Seql < Bl
+                - pre-crypto check: algorithm predicts that the packet will shift the window
+                    -> Seqh = Th
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th - 1, Tl + 10), seq_num(Th - 1, Tl + 20))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case c: Tl < Bl <= Seql
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th - 1
+                    -> check for a replayed packet with Seql
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> Seql is marked in the AR window
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th - 1, Bl + 10), seq_num(Th - 1, Bl + 20))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            case d: Seql <= Tl < Bl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th
+                    -> check for replayed packet
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> Seql is marked in the AR window
+        """
+        p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, 15), seq_num(Th, 25))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            case e: Tl < Seql < Bl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th
+                    -> check for a replayed packet with Seql
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> AR window shift (may set the window in Case A)
+                    -> Seql is marked in the AR window
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, Tl + 5), seq_num(Th, Tl + 15))
+        ]
+
+        # the window stays in Case B
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(
+                seq_num(Th, Tl + anti_replay_window_size + 5),
+                seq_num(Th, Tl + anti_replay_window_size + 15),
+            )
+        ]
+
+        # the window moves to Case A
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        # reset the VPP's RX AR window to Case B
+        Th = 2
+        Tl = 30  # minimum window size of 64, we are sure to overlap
+        Bl = (Tl - anti_replay_window_size + 1) % (1 << 32)
+
+        self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+        p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+
+        """
+            case f: Tl < Bl <= Seql
+                - pre-crypto check: algorithm predicts that the packet is in the previous window
+                    -> Seqh = Th - 1
+                    -> check for a replayed packet with Seql
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 20))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case g: Seql <= Tl < Bl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th
+                    -> check for replayed packet
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, 10), seq_num(Th, 15))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th + 1, 0), seq_num(Th + 1, 15))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # some packets are rejected by the pre-crypto check
+        replay_count += 5
+        self.assertEqual(self.get_replay_counts(p), replay_count)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts) - 5
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case h: Tl < Seql < Bl
+                - pre-crypto check: algorithm predicts that the packet will shift the window
+                    -> Seqh = Th
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th + 1, Tl + 10), seq_num(Th + 1, Tl + 20))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+    def _verify_tra_anti_replay_algorithm_no_esn(self):
+        def seq_num(seql):
+            return seql & 0xFFFF_FFFF
+
+        p = self.params[socket.AF_INET]
+        anti_replay_window_size = p.anti_replay_window_size
+
+        seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
+        replay_count = self.get_replay_counts(p)
+        hash_failed_count = self.get_hash_failed_counts(p)
+        seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
+
+        if ESP == self.encryption_type:
+            undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
+            undersize_count = self.statistics.get_err_counter(undersize_node_name)
+
+        # reset the TX SA to avoid conflicts with leftover configuration
+        self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0")
+
+        """
+        RFC 4303 Appendix A2. Case A
+
+        a-c: possible seq num received
+        +: Bl, Tl
+
+        |--a--+---b---+-c--|
+              =========
+              Bl      Tl
+
+        No ESN implies Th = 0
+        Case A implies Tl >= W - 1
+        """
+
+        Tl = anti_replay_window_size + 40
+        Bl = Tl - anti_replay_window_size + 1
+
+        # move VPP's RX AR window to Case A
+        self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}")
+        p.scapy_tra_sa.seq_num = seq_num(Tl)
+
+        """
+        case a: Seql < Bl
+            - pre-crypto check: algorithm predicts that the packet is out of window
+                -> packet should be dropped
+            - integrity check: ...
+            - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Bl - 20), seq_num(Bl - 5))
+        ]
+
+        # out-of-window packets
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+        replay_count += len(pkts)
+        self.assertEqual(self.get_replay_counts(p), replay_count)
+
+        """
+            case b: Bl <= Seql <= Tl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> check for a replayed packet with Seql
+                - integrity check: should pass
+                - post-crypto check:
+                    -> check for a replayed packet with Seql
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Tl - 50), seq_num(Tl - 30))
+        ]
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Tl - 35), seq_num(Tl - 30))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # replayed packets
+        replay_count += 5
+        self.assertEqual(self.get_replay_counts(p), replay_count)
+
+        """
+            case c: Seql > Tl
+                - pre-crypto check: algorithm predicts that the packet will shift the window
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> AR window is shifted
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Tl + 5), seq_num(Tl + 20))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            RFC 4303 Appendix A2. Case B
+
+            |-a-----+------b-----|
+            =========
+                    Tl
+
+            Case B implies Tl < W - 1
+        """
+
+        # reset the VPP's RX AR window to Case B
+        Tl = 30  # minimum window size of 64, we are sure to overlap
+        Bl = seq_num(Tl - anti_replay_window_size + 1)
+
+        self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}")
+
+        """
+        case a: Seql <= Tl < Bl
+            - pre-crypto check: algorithm predicts that the packet is in the window
+                -> check for replayed packet
+            - integrity check: should pass
+            - post-crypto check: should pass (packet is not a replay)
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(5), seq_num(10))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            case b: Tl < Seql < Bl
+                - pre-crypto check: algorithm predicts that the packet will shift the window
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> AR window is shifted
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(-50), seq_num(-20))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+    def verify_tra_anti_replay_algorithm(self):
+        if self.params[socket.AF_INET].vpp_tra_sa.esn_en:
+            self._verify_tra_anti_replay_algorithm_esn()
+        else:
+            self._verify_tra_anti_replay_algorithm_no_esn()
+
 
 class IpsecTra4Tests(IpsecTra4):
     """UT test methods for Transport v4"""
@@ -1076,6 +1855,10 @@
         """ipsec v4 transport anti-replay test"""
         self.verify_tra_anti_replay()
 
+    def test_tra_anti_replay_algorithm(self):
+        """ipsec v4 transport anti-replay algorithm test"""
+        self.verify_tra_anti_replay_algorithm()
+
     def test_tra_lost(self):
         """ipsec v4 transport lost packet test"""
         self.verify_tra_lost()
diff --git a/test/test_ipsec_api.py b/test/test_ipsec_api.py
index 521762b..6e246f6 100644
--- a/test/test_ipsec_api.py
+++ b/test/test_ipsec_api.py
@@ -121,7 +121,7 @@
 
     def __check_sa_binding(self, sa_id, thread_index):
         found_sa = False
-        sa_dumps = self.vapi.ipsec_sa_v4_dump()
+        sa_dumps = self.vapi.ipsec_sa_v5_dump()
         for dump in sa_dumps:
             if dump.entry.sad_id == sa_id:
                 self.assertEqual(dump.thread_index, thread_index)
diff --git a/test/test_ipsec_esp.py b/test/test_ipsec_esp.py
index 927863c..fdd7eb8 100644
--- a/test/test_ipsec_esp.py
+++ b/test/test_ipsec_esp.py
@@ -62,10 +62,11 @@
     def tearDown(self):
         super(ConfigIpsecESP, self).tearDown()
 
-    def config_anti_replay(self, params):
+    def config_anti_replay(self, params, anti_replay_window_size=64):
         saf = VppEnum.vl_api_ipsec_sad_flags_t
         for p in params:
             p.flags |= saf.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
+            p.anti_replay_window_size = anti_replay_window_size
 
     def config_network(self, params):
         self.net_objs = []
@@ -134,6 +135,7 @@
         flags = params.flags
         tun_flags = params.tun_flags
         salt = params.salt
+        anti_replay_window_size = params.anti_replay_window_size
         objs = []
 
         params.tun_sa_in = VppIpsecSA(
@@ -152,6 +154,7 @@
             flags=flags,
             salt=salt,
             hop_limit=params.outer_hop_limit,
+            anti_replay_window_size=anti_replay_window_size,
         )
         params.tun_sa_out = VppIpsecSA(
             self,
@@ -169,6 +172,7 @@
             flags=flags,
             salt=salt,
             hop_limit=params.outer_hop_limit,
+            anti_replay_window_size=anti_replay_window_size,
         )
         objs.append(params.tun_sa_in)
         objs.append(params.tun_sa_out)
@@ -274,6 +278,7 @@
         e = VppEnum.vl_api_ipsec_spd_action_t
         flags = params.flags
         salt = params.salt
+        anti_replay_window_size = params.anti_replay_window_size
         objs = []
 
         params.tra_sa_in = VppIpsecSA(
@@ -287,6 +292,7 @@
             self.vpp_esp_protocol,
             flags=flags,
             salt=salt,
+            anti_replay_window_size=anti_replay_window_size,
         )
         params.tra_sa_out = VppIpsecSA(
             self,
@@ -299,6 +305,7 @@
             self.vpp_esp_protocol,
             flags=flags,
             salt=salt,
+            anti_replay_window_size=anti_replay_window_size,
         )
         objs.append(params.tra_sa_in)
         objs.append(params.tra_sa_out)
@@ -1184,9 +1191,16 @@
         #
         saf = VppEnum.vl_api_ipsec_sad_flags_t
         if flag & saf.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY:
-            self.unconfig_network()
-            self.config_network(self.params.values())
-            self.verify_tra_anti_replay()
+            for anti_replay_window_size in (
+                64,
+                131072,
+            ):
+                self.unconfig_network()
+                self.config_anti_replay(self.params.values(), anti_replay_window_size)
+                self.config_network(self.params.values())
+                self.verify_tra_anti_replay()
+                self.verify_tra_anti_replay_algorithm()
+            self.config_anti_replay(self.params.values())
 
         self.unconfig_network()
         self.config_network(self.params.values())
diff --git a/test/vpp_ipsec.py b/test/vpp_ipsec.py
index 7a5a95a..e354cfc 100644
--- a/test/vpp_ipsec.py
+++ b/test/vpp_ipsec.py
@@ -218,6 +218,7 @@
         udp_src=None,
         udp_dst=None,
         hop_limit=None,
+        anti_replay_window_size=0,
     ):
         e = VppEnum.vl_api_ipsec_sad_flags_t
         self.test = test
@@ -229,6 +230,7 @@
         self.crypto_key = crypto_key
         self.proto = proto
         self.salt = salt
+        self.anti_replay_window_size = anti_replay_window_size
 
         self.table_id = 0
         self.tun_src = tun_src
@@ -284,13 +286,14 @@
             "tunnel": self.tunnel_encode(),
             "flags": self.flags,
             "salt": self.salt,
+            "anti_replay_window_size": self.anti_replay_window_size,
         }
         # don't explicitly send the defaults, let papi fill them in
         if self.udp_src:
             entry["udp_src_port"] = self.udp_src
         if self.udp_dst:
             entry["udp_dst_port"] = self.udp_dst
-        r = self.test.vapi.ipsec_sad_entry_add(entry=entry)
+        r = self.test.vapi.ipsec_sad_entry_add_v2(entry=entry)
         self.stat_index = r.stat_index
         self.test.registry.register(self, self.test.logger)
         return self
@@ -324,7 +327,7 @@
     def query_vpp_config(self):
         e = VppEnum.vl_api_ipsec_sad_flags_t
 
-        bs = self.test.vapi.ipsec_sa_v3_dump()
+        bs = self.test.vapi.ipsec_sa_v5_dump()
         for b in bs:
             if b.entry.sad_id == self.id:
                 # if udp encap is configured then the ports should match