ipsec: huge anti-replay window support

Type: improvement

Since RFC4303 does not specify the anti-replay window size, VPP should
support multiple window size. It is done through a clib_bitmap.

Signed-off-by: Maxime Peim <mpeim@cisco.com>
Change-Id: I3dfe30efd20018e345418bef298ec7cec19b1cfc
diff --git a/test/template_ipsec.py b/test/template_ipsec.py
index 5d83510..c438fbb 100644
--- a/test/template_ipsec.py
+++ b/test/template_ipsec.py
@@ -51,6 +51,8 @@
         self.outer_flow_label = 0
         self.inner_flow_label = 0x12345
 
+        self.anti_replay_window_size = 64
+
         self.auth_algo_vpp_id = (
             VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
         )
@@ -98,6 +100,8 @@
         self.outer_flow_label = 0
         self.inner_flow_label = 0x12345
 
+        self.anti_replay_window_size = 64
+
         self.auth_algo_vpp_id = (
             VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
         )
@@ -625,22 +629,34 @@
     def verify_tra_anti_replay(self):
         p = self.params[socket.AF_INET]
         esn_en = p.vpp_tra_sa.esn_en
+        anti_replay_window_size = p.anti_replay_window_size
 
         seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
         replay_count = self.get_replay_counts(p)
+        initial_sa_node_replay_diff = replay_count - p.tra_sa_in.get_err("replay")
         hash_failed_count = self.get_hash_failed_counts(p)
         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
+        initial_sa_node_cycled_diff = seq_cycle_count - p.tra_sa_in.get_err(
+            "seq_cycled"
+        )
         hash_err = "integ_error"
 
         if ESP == self.encryption_type:
             undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
             undersize_count = self.statistics.get_err_counter(undersize_node_name)
+            initial_sa_node_undersize_diff = undersize_count - p.tra_sa_in.get_err(
+                "runt"
+            )
             # For AES-GCM an error in the hash is reported as a decryption failure
             if p.crypt_algo in ("AES-GCM", "AES-NULL-GMAC"):
                 hash_err = "decryption_failed"
         # In async mode, we don't report errors in the hash.
         if p.async_mode:
             hash_err = ""
+        else:
+            initial_sa_node_hash_diff = hash_failed_count - p.tra_sa_in.get_err(
+                hash_err
+            )
 
         #
         # send packets with seq numbers 1->34
@@ -666,7 +682,7 @@
         self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
         replay_count += len(pkts)
         self.assertEqual(self.get_replay_counts(p), replay_count)
-        err = p.tra_sa_in.get_err("replay")
+        err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
         self.assertEqual(err, replay_count)
 
         #
@@ -684,29 +700,31 @@
         recv_pkts = self.send_and_expect(self.tra_if, pkts * 8, self.tra_if, n_rx=1)
         replay_count += 7
         self.assertEqual(self.get_replay_counts(p), replay_count)
-        err = p.tra_sa_in.get_err("replay")
+        err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
         self.assertEqual(err, replay_count)
 
         #
-        # now move the window over to 257 (more than one byte) and into Case A
+        # now move the window over to anti_replay_window_size + 100 and into Case A
         #
         self.vapi.cli("clear error")
         pkt = Ether(
             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
         ) / p.scapy_tra_sa.encrypt(
             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
-            seq_num=257,
+            seq_num=anti_replay_window_size + 100,
         )
         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
 
+        self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
+
         # replayed packets are dropped
         self.send_and_assert_no_replies(self.tra_if, pkt * 3, timeout=0.2)
         replay_count += 3
         self.assertEqual(self.get_replay_counts(p), replay_count)
-        err = p.tra_sa_in.get_err("replay")
+        err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
         self.assertEqual(err, replay_count)
 
-        # the window size is 64 packets
+        # the window size is anti_replay_window_size packets
         # in window are still accepted
         pkt = Ether(
             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
@@ -714,7 +732,6 @@
             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
             seq_num=200,
         )
-        recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
 
         # a packet that does not decrypt does not move the window forward
         bogus_sa = SecurityAssociation(
@@ -729,14 +746,14 @@
             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
         ) / bogus_sa.encrypt(
             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
-            seq_num=350,
+            seq_num=anti_replay_window_size + 200,
         )
         self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
 
         hash_failed_count += 17
         self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
         if hash_err != "":
-            err = p.tra_sa_in.get_err(hash_err)
+            err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff
             self.assertEqual(err, hash_failed_count)
 
         # a malformed 'runt' packet
@@ -747,13 +764,13 @@
                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
             ) / bogus_sa.encrypt(
                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
-                seq_num=350,
+                seq_num=anti_replay_window_size + 200,
             )
             self.send_and_assert_no_replies(self.tra_if, pkt * 17, timeout=0.2)
 
             undersize_count += 17
             self.assert_error_counter_equal(undersize_node_name, undersize_count)
-            err = p.tra_sa_in.get_err("runt")
+            err = p.tra_sa_in.get_err("runt") + initial_sa_node_undersize_diff
             self.assertEqual(err, undersize_count)
 
         # which we can determine since this packet is still in the window
@@ -784,21 +801,21 @@
             hash_failed_count += 17
             self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
             if hash_err != "":
-                err = p.tra_sa_in.get_err(hash_err)
+                err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff
                 self.assertEqual(err, hash_failed_count)
 
         else:
             replay_count += 17
             self.assertEqual(self.get_replay_counts(p), replay_count)
-            err = p.tra_sa_in.get_err("replay")
+            err = p.tra_sa_in.get_err("replay") + initial_sa_node_replay_diff
             self.assertEqual(err, replay_count)
 
-        # valid packet moves the window over to 258
+        # valid packet moves the window over to anti_replay_window_size + 258
         pkt = Ether(
             src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
         ) / p.scapy_tra_sa.encrypt(
             IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
-            seq_num=258,
+            seq_num=anti_replay_window_size + 258,
         )
         rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
         decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
@@ -852,7 +869,7 @@
             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
 
             #
-            # A packet that has seq num between (2^32-64) and 5 is within
+            # A packet that has seq num between (2^32-anti_replay_window_size)+4 and 5 is within
             # the window
             #
             p.scapy_tra_sa.seq_num = 0xFFFFFFFD
@@ -883,19 +900,19 @@
             hash_failed_count += 1
             self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
             if hash_err != "":
-                err = p.tra_sa_in.get_err(hash_err)
+                err = p.tra_sa_in.get_err(hash_err) + initial_sa_node_hash_diff
                 self.assertEqual(err, hash_failed_count)
 
             #
             # but if we move the window forward to case B, then we can wrap
             # again
             #
-            p.scapy_tra_sa.seq_num = 0x100000555
+            p.scapy_tra_sa.seq_num = 0x100000000 + anti_replay_window_size + 0x555
             pkt = Ether(
                 src=self.tra_if.remote_mac, dst=self.tra_if.local_mac
             ) / p.scapy_tra_sa.encrypt(
                 IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
-                seq_num=0x100000555,
+                seq_num=p.scapy_tra_sa.seq_num,
             )
             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
@@ -918,7 +935,7 @@
             self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
             seq_cycle_count += len(pkts)
             self.assert_error_counter_equal(seq_cycle_node_name, seq_cycle_count)
-            err = p.tra_sa_out.get_err("seq_cycled")
+            err = p.tra_sa_out.get_err("seq_cycled") + initial_sa_node_cycled_diff
             self.assertEqual(err, seq_cycle_count)
 
         # move the security-associations seq number on to the last we used
@@ -1068,6 +1085,768 @@
         self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
         self.assert_packet_counter_equal(self.tra4_decrypt_node_name[0], count)
 
+    def _verify_tra_anti_replay_algorithm_esn(self):
+        def seq_num(seqh, seql):
+            return (seqh << 32) | (seql & 0xFFFF_FFFF)
+
+        p = self.params[socket.AF_INET]
+        anti_replay_window_size = p.anti_replay_window_size
+
+        seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
+        replay_count = self.get_replay_counts(p)
+        hash_failed_count = self.get_hash_failed_counts(p)
+        seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
+
+        if ESP == self.encryption_type:
+            undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
+            undersize_count = self.statistics.get_err_counter(undersize_node_name)
+
+        # reset the TX SA to avoid conflict with left configuration
+        self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0")
+
+        """
+        RFC 4303 Appendix A2. Case A
+
+        |: new Th marker
+        a-i: possible seq num received
+        +: Bl, Tl, Bl', Tl'
+        [BT]l(sign) = [BT]l (sign) 2^32 mod 2^32 (Th inc/dec-remented by 1)
+
+                Th - 1               Th                Th + 1
+        --|--a--+---b---+-c--|--d--+---e---+-f--|--g--+---h---+--i-|--
+                =========          =========          =========
+                Bl-     Tl-        Bl      Tl         Bl+     Tl+
+
+        Case A implies Tl >= W - 1
+        """
+
+        Th = 1
+        Tl = anti_replay_window_size + 40
+        Bl = Tl - anti_replay_window_size + 1
+
+        # move VPP's RX AR window to Case A
+        self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+        p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+
+        """
+        case a: Seql < Bl
+            - pre-crypto check: algorithm predicts that the packet wrap the window
+                -> Seqh = Th + 1
+            - integrity check: should fail
+            - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th - 1, Bl - 20), seq_num(Th - 1, Bl - 5))
+        ]
+
+        # out-of-window packets fail integrity check
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case b: Bl <= Seql <= Tl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th
+                    -> check for a replayed packet with Seql
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, Tl - 10), seq_num(Th, Tl - 5))
+        ]
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th - 1, Tl - 35), seq_num(Th - 1, Tl - 5))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # some packets are rejected by the pre-crypto check
+        replay_count += 5
+        self.assertEqual(self.get_replay_counts(p), replay_count)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts) - 5
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case c: Seql > Tl
+                - pre-crypto check: algorithm predicts that the packet does not wrap the window
+                    -> Seqh = Th
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th - 1, Tl + 5), seq_num(Th - 1, Tl + 20))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case d: Seql < Bl
+                - pre-crypto check: algorithm predicts that the packet wrap the window
+                    -> Seqh = Th + 1
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, Bl - 20), seq_num(Th, Bl - 5))
+        ]
+
+        # out-of-window packets fail integrity check
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case e: Bl <= Seql <= Tl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th
+                    -> check for a replayed packet with Seql
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> Seql is marked in the AR window
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 30))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            case f: Seql > Tl
+                - pre-crypto check: algorithm predicts that the packet does not wrap the window
+                    -> Seqh = Th
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> AR window shift (the window stays Case A)
+                    -> Seql is marked in the AR window
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, Tl + 50), seq_num(Th, Tl + 60))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            case g: Seql < Bl
+                - pre-crypto check: algorithm predicts that the packet wrap the window
+                    -> Seqh = Th + 1
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> AR window shift (may set the window in Case B)
+                    -> Seql is marked in the AR window
+        """
+        p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            # set the window in Case B (the minimum window size is 64
+            # so we are sure to overlap)
+            for seq in range(seq_num(Th + 1, 10), seq_num(Th + 1, 20))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        # reset the VPP's RX AR window to Case A
+        Th = 1
+        Tl = 2 * anti_replay_window_size + 40
+        Bl = Tl - anti_replay_window_size + 1
+
+        self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+
+        p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            # the AR will stay in Case A
+            for seq in range(
+                seq_num(Th + 1, anti_replay_window_size + 10),
+                seq_num(Th + 1, anti_replay_window_size + 20),
+            )
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            case h: Bl <= Seql <= Tl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th
+                    -> check for a replayed packet with Seql
+                - integrity check: the wrap is not detected, should fail
+                - post-crypto check: ...
+        """
+        Th += 1
+        Tl = anti_replay_window_size + 20
+        Bl = Tl - anti_replay_window_size + 1
+
+        p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th + 1, Tl - 20), seq_num(Th + 1, Tl - 5))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # some packets are rejected by the pre-crypto check
+        replay_count += 5
+        self.assertEqual(self.get_replay_counts(p), replay_count)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts) - 5
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case i: Seql > Tl
+                - pre-crypto check: algorithm predicts that the packet does not wrap the window
+                    -> Seqh = Th
+                - integrity check: the wrap is not detected, shoud fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th + 1, Tl + 5), seq_num(Th + 1, Tl + 15))
+        ]
+
+        # out-of-window packets fail integrity check
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            RFC 4303 Appendix A2. Case B
+
+                        Th - 1               Th                Th + 1
+            ----|-a-+-----b----+--c--|-d-+----e-----+--f--|-g-+--h---
+            =========          ===========          ===========
+                    Tl-        Bl       Tl          Bl+       Tl+
+
+            Case B implies Tl < W - 1
+        """
+
+        # reset the VPP's RX AR window to Case B
+        Th = 2
+        Tl = 30  # minimum window size of 64, we are sure to overlap
+        Bl = (Tl - anti_replay_window_size + 1) % (1 << 32)
+
+        self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+        p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+
+        """
+            case a: Seql <= Tl < Bl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th
+                    -> check for replayed packet
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, 5), seq_num(Th, 10))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        p.scapy_tra_sa.seq_num = seq_num(Th - 1, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th - 1, 0), seq_num(Th - 1, 15))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # some packets are rejected by the pre-crypto check
+        replay_count += 5
+        self.assertEqual(self.get_replay_counts(p), replay_count)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts) - 5
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case b: Tl < Seql < Bl
+                - pre-crypto check: algorithm predicts that the packet will shift the window
+                    -> Seqh = Th
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th - 1, Tl + 10), seq_num(Th - 1, Tl + 20))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case c: Tl < Bl <= Seql
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th - 1
+                    -> check for a replayed packet with Seql
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> Seql is marked in the AR window
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th - 1, Bl + 10), seq_num(Th - 1, Bl + 20))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            case d: Seql <= Tl < Bl
+                - pre-crypto check: algorithm predicts that the packet is the window
+                    -> Seqh = Th
+                    -> check for replayed packet
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> Seql is marked in the AR window
+        """
+        p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, 15), seq_num(Th, 25))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            case e: Tl < Seql < Bl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> Seqh = Th
+                    -> check for a replayed packet with Seql
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> AR window shift (may set the window in Case A)
+                    -> Seql is marked in the AR window
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, Tl + 5), seq_num(Th, Tl + 15))
+        ]
+
+        # the window stays in Case B
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(
+                seq_num(Th, Tl + anti_replay_window_size + 5),
+                seq_num(Th, Tl + anti_replay_window_size + 15),
+            )
+        ]
+
+        # the window moves to Case A
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        # reset the VPP's RX AR window to Case B
+        Th = 2
+        Tl = 30  # minimum window size of 64, we are sure to overlap
+        Bl = (Tl - anti_replay_window_size + 1) % (1 << 32)
+
+        self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Th, Tl):#x}")
+        p.scapy_tra_sa.seq_num = seq_num(Th, Tl)
+
+        """
+            case f: Tl < Bl <= Seql
+                - pre-crypto check: algorithm predicts that the packet is in the previous window
+                    -> Seqh = Th - 1
+                    -> check for a replayed packet with Seql
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, Bl + 10), seq_num(Th, Bl + 20))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case g: Seql <= Tl < Bl
+                - pre-crypto check: algorithm predicts that the packet is the window
+                    -> Seqh = Th
+                    -> check for replayed packet
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th, 10), seq_num(Th, 15))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        p.scapy_tra_sa.seq_num = seq_num(Th + 1, Tl)
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th + 1, 0), seq_num(Th + 1, 15))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # some packets are rejected by the pre-crypto check
+        replay_count += 5
+        self.assertEqual(self.get_replay_counts(p), replay_count)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts) - 5
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+        """
+            case h: Tl < Seql < Bl
+                - pre-crypto check: algorithm predicts that the packet will shift the window
+                    -> Seqh = Th
+                - integrity check: should fail
+                - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Th + 1, Tl + 10), seq_num(Th + 1, Tl + 20))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # out-of-window packets fail integrity check
+        hash_failed_count += len(pkts)
+        self.assertEqual(self.get_hash_failed_counts(p), hash_failed_count)
+
+    def _verify_tra_anti_replay_algorithm_no_esn(self):
+        def seq_num(seql):
+            return seql & 0xFFFF_FFFF
+
+        p = self.params[socket.AF_INET]
+        anti_replay_window_size = p.anti_replay_window_size
+
+        seq_cycle_node_name = "/err/%s/seq_cycled" % self.tra4_encrypt_node_name
+        replay_count = self.get_replay_counts(p)
+        hash_failed_count = self.get_hash_failed_counts(p)
+        seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
+
+        if ESP == self.encryption_type:
+            undersize_node_name = "/err/%s/runt" % self.tra4_decrypt_node_name[0]
+            undersize_count = self.statistics.get_err_counter(undersize_node_name)
+
+        # reset the TX SA to avoid conflict with left configuration
+        self.vapi.cli(f"test ipsec sa {p.vpp_tra_sa_id} seq 0x0")
+
+        """
+        RFC 4303 Appendix A2. Case A
+
+        a-c: possible seq num received
+        +: Bl, Tl
+
+        |--a--+---b---+-c--|
+              =========
+              Bl      Tl
+
+        No ESN implies Th = 0
+        Case A implies Tl >= W - 1
+        """
+
+        Tl = anti_replay_window_size + 40
+        Bl = Tl - anti_replay_window_size + 1
+
+        # move VPP's RX AR window to Case A
+        self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}")
+        p.scapy_tra_sa.seq_num = seq_num(Tl)
+
+        """
+        case a: Seql < Bl
+            - pre-crypto check: algorithm predicts that the packet is out of window
+                -> packet should be dropped
+            - integrity check: ...
+            - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Bl - 20), seq_num(Bl - 5))
+        ]
+
+        # out-of-window packets
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+        replay_count += len(pkts)
+        self.assertEqual(self.get_replay_counts(p), replay_count)
+
+        """
+            case b: Bl <= Seql <= Tl
+                - pre-crypto check: algorithm predicts that the packet is in the window
+                    -> check for a replayed packet with Seql
+                - integrity check: should pass
+                - post-crypto check:
+                    -> check for a replayed packet with Seql
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Tl - 50), seq_num(Tl - 30))
+        ]
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Tl - 35), seq_num(Tl - 30))
+        ]
+
+        self.send_and_assert_no_replies(self.tra_if, pkts, timeout=0.2)
+
+        # replayed packets
+        replay_count += 5
+        self.assertEqual(self.get_replay_counts(p), replay_count)
+
+        """
+            case c: Seql > Tl
+                - pre-crypto check: algorithm predicts that the packet will shift the window
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> AR window is shifted
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(Tl + 5), seq_num(Tl + 20))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            RFC 4303 Appendix A2. Case B
+
+            |-a-----+------b-----|
+            =========
+                    Tl
+
+            Case B implies Tl < W - 1
+        """
+
+        # reset the VPP's RX AR window to Case B
+        Tl = 30  # minimum window size of 64, we are sure to overlap
+        Bl = seq_num(Tl - anti_replay_window_size + 1)
+
+        self.vapi.cli(f"test ipsec sa {p.scapy_tra_sa_id} seq {seq_num(Tl):#x}")
+
+        """
+        case a: Seql <= Tl < Bl
+            - pre-crypto check: algorithm predicts that the packet is in the window
+                -> check for replayed packet
+            - integrity check: should fail
+            - post-crypto check: ...
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(5), seq_num(10))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+        """
+            case b: Tl < Seql < Bl
+                - pre-crypto check: algorithm predicts that the packet will shift the window
+                - integrity check: should pass
+                - post-crypto check: should pass
+                    -> AR window is shifted
+        """
+        pkts = [
+            (
+                Ether(src=self.tra_if.remote_mac, dst=self.tra_if.local_mac)
+                / p.scapy_tra_sa.encrypt(
+                    IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) / ICMP(),
+                    seq_num=seq,
+                )
+            )
+            for seq in range(seq_num(-50), seq_num(-20))
+        ]
+
+        self.send_and_expect(self.tra_if, pkts, self.tra_if)
+
+    def verify_tra_anti_replay_algorithm(self):
+        if self.params[socket.AF_INET].vpp_tra_sa.esn_en:
+            self._verify_tra_anti_replay_algorithm_esn()
+        else:
+            self._verify_tra_anti_replay_algorithm_no_esn()
+
 
 class IpsecTra4Tests(IpsecTra4):
     """UT test methods for Transport v4"""
@@ -1076,6 +1855,10 @@
         """ipsec v4 transport anti-replay test"""
         self.verify_tra_anti_replay()
 
+    def test_tra_anti_replay_algorithm(self):
+        """ipsec v4 transport anti-replay algorithm test"""
+        self.verify_tra_anti_replay_algorithm()
+
     def test_tra_lost(self):
         """ipsec v4 transport lost packet test"""
         self.verify_tra_lost()