nat: add ipfix rate-limiter for nat44-ed, nat44-ei and nat64

This prevents ipfix flood with the repeating events and allows
to enable nat64 max_session and max_bibs events. Also fix wrong
endian for det44 and nat64 ipfix tests, now should be fine with
extended tests enabled.
Max session per user event @ nat44-ei requires more precise rate
limiter per user address, probably with sparse vec, not handled.

Type: improvement
Signed-off-by: Vladislav Grishenko <themiron@yandex-team.ru>
Change-Id: Ib20cc1ee3f81e7acc88a415fe83b4e2deae2a836
diff --git a/src/plugins/nat/lib/ipfix_logging.c b/src/plugins/nat/lib/ipfix_logging.c
index 50dbb2b..5f40db8 100644
--- a/src/plugins/nat/lib/ipfix_logging.c
+++ b/src/plugins/nat/lib/ipfix_logging.c
@@ -1319,9 +1319,23 @@
 void
 nat_ipfix_logging_addresses_exhausted (u32 thread_index, u32 pool_id)
 {
-  //TODO: This event SHOULD be rate limited
+  nat_ipfix_logging_main_t *silm = &nat_ipfix_logging_main;
+  static f64 *last_sent = 0;
+
   skip_if_disabled ();
 
+  /* TODO: make rate configurable, use 1pps so far */
+  clib_spinlock_lock_if_init (&silm->addr_exhausted_lock);
+  f64 now = vlib_time_now (vlib_get_main ());
+  vec_validate (last_sent, pool_id);
+  if (now < last_sent[pool_id] + 1.0)
+    {
+      clib_spinlock_unlock_if_init (&silm->addr_exhausted_lock);
+      return;
+    }
+  last_sent[pool_id] = now;
+  clib_spinlock_unlock_if_init (&silm->addr_exhausted_lock);
+
   nat_ipfix_logging_addr_exhausted (thread_index, pool_id, 0);
 }
 
@@ -1362,9 +1376,22 @@
 void
 nat_ipfix_logging_max_sessions (u32 thread_index, u32 limit)
 {
-  //TODO: This event SHOULD be rate limited
+  nat_ipfix_logging_main_t *silm = &nat_ipfix_logging_main;
+  static f64 last_sent = 0;
+
   skip_if_disabled ();
 
+  /* TODO: make rate configurable, use 1pps so far */
+  clib_spinlock_lock_if_init (&silm->max_sessions_lock);
+  f64 now = vlib_time_now (vlib_get_main ());
+  if (now < last_sent + 1.0)
+    {
+      clib_spinlock_unlock_if_init (&silm->max_sessions_lock);
+      return;
+    }
+  last_sent = now;
+  clib_spinlock_unlock_if_init (&silm->max_sessions_lock);
+
   nat_ipfix_logging_max_ses (thread_index, limit, 0);
 }
 
@@ -1377,9 +1404,22 @@
 void
 nat_ipfix_logging_max_bibs (u32 thread_index, u32 limit)
 {
-  //TODO: This event SHOULD be rate limited
+  nat_ipfix_logging_main_t *silm = &nat_ipfix_logging_main;
+  static f64 last_sent = 0;
+
   skip_if_disabled ();
 
+  /* TODO: make rate configurable, use 1pps so far */
+  clib_spinlock_lock_if_init (&silm->max_bibs_lock);
+  f64 now = vlib_time_now (vlib_get_main ());
+  if (now < last_sent + 1.0)
+    {
+      clib_spinlock_unlock_if_init (&silm->max_bibs_lock);
+      return;
+    }
+  last_sent = now;
+  clib_spinlock_unlock_if_init (&silm->max_bibs_lock);
+
   nat_ipfix_logging_max_bib (thread_index, limit, 0);
 }
 
@@ -1574,6 +1614,11 @@
   silm->milisecond_time_0 = unix_time_now_nsec () * 1e-6;
 
   vec_validate (silm->per_thread_data, tm->n_vlib_mains - 1);
+
+  /* Set up rate-limit */
+  clib_spinlock_init (&silm->addr_exhausted_lock);
+  clib_spinlock_init (&silm->max_sessions_lock);
+  clib_spinlock_init (&silm->max_bibs_lock);
 }
 
 static uword
diff --git a/src/plugins/nat/lib/ipfix_logging.h b/src/plugins/nat/lib/ipfix_logging.h
index 0b8f568..dc7927a 100644
--- a/src/plugins/nat/lib/ipfix_logging.h
+++ b/src/plugins/nat/lib/ipfix_logging.h
@@ -108,6 +108,10 @@
   /** nat data callbacks call counter */
   u16 call_counter;
 
+  /** rate-limit locks */
+  clib_spinlock_t addr_exhausted_lock;
+  clib_spinlock_t max_sessions_lock;
+  clib_spinlock_t max_bibs_lock;
 } nat_ipfix_logging_main_t;
 
 extern nat_ipfix_logging_main_t nat_ipfix_logging_main;
diff --git a/src/plugins/nat/nat64/nat64_db.c b/src/plugins/nat/nat64/nat64_db.c
index 82ef70d..24f7f57 100644
--- a/src/plugins/nat/nat64/nat64_db.c
+++ b/src/plugins/nat/nat64/nat64_db.c
@@ -82,7 +82,7 @@
   if (db->bib.bib_entries_num >= db->bib.limit)
     {
       db->free_addr_port_cb (db, out_addr, out_port, proto);
-      //nat_ipfix_logging_max_bibs (thread_index, db->bib.limit);
+      nat_ipfix_logging_max_bibs (thread_index, db->bib.limit);
       return 0;
     }
 
@@ -401,7 +401,7 @@
 
   if (db->st.st_entries_num >= db->st.limit)
     {
-      //nat_ipfix_logging_max_sessions (thread_index, db->st.limit);
+      nat_ipfix_logging_max_sessions (thread_index, db->st.limit);
       return 0;
     }
 
diff --git a/test/test_det44.py b/test/test_det44.py
index ecd57a6..ede8098 100644
--- a/test/test_det44.py
+++ b/test/test_det44.py
@@ -98,9 +98,9 @@
         # natEvent
         self.assertEqual(scapy.compat.orb(record[230]), 13)
         # natQuotaExceededEvent
-        self.assertEqual(struct.pack("I", 3), record[466])
+        self.assertEqual(struct.pack("!I", 3), record[466])
         # maxEntriesPerUser
-        self.assertEqual(struct.pack("I", limit), record[473])
+        self.assertEqual(struct.pack("!I", limit), record[473])
         # sourceIPv4Address
         self.assertEqual(socket.inet_pton(socket.AF_INET, src_addr), record[8])
 
@@ -724,7 +724,7 @@
 
         # verify IPFIX logging
         self.vapi.ipfix_flush()
-        capture = self.pg2.get_capture(2)
+        capture = self.pg2.get_capture(7)
         ipfix = IPFIXDecoder()
         # first load template
         for p in capture:
diff --git a/test/test_nat44_ei.py b/test/test_nat44_ei.py
index 15eb70a..dc74d03 100644
--- a/test/test_nat44_ei.py
+++ b/test/test_nat44_ei.py
@@ -663,6 +663,7 @@
         self.assertEqual(scapy.compat.orb(record[230]), 3)
         # natPoolID
         self.assertEqual(struct.pack("!I", 0), record[283])
+        return len(data)
 
     def verify_ipfix_max_sessions(self, data, limit):
         self.assertEqual(1, len(data))
@@ -673,6 +674,7 @@
         self.assertEqual(struct.pack("!I", 1), record[466])
         # maxSessionEntries
         self.assertEqual(struct.pack("!I", limit), record[471])
+        return len(data)
 
     def verify_no_nat44_user(self):
         """Verify that there is no NAT44EI user"""
@@ -2463,7 +2465,7 @@
             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
             / TCP(sport=3025)
-        )
+        ) * 3
         self.pg0.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
@@ -2482,10 +2484,12 @@
             if p.haslayer(Template):
                 ipfix.add_template(p.getlayer(Template))
         # verify events in data set
+        event_count = 0
         for p in capture:
             if p.haslayer(Data):
                 data = ipfix.decode_data_set(p.getlayer(Set))
-                self.verify_ipfix_addr_exhausted(data)
+                event_count += self.verify_ipfix_addr_exhausted(data)
+        self.assertEqual(event_count, 1)
 
     def test_ipfix_max_sessions(self):
         """NAT44EI IPFIX logging maximum session entries exceeded"""
@@ -2529,7 +2533,7 @@
             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
             / TCP(sport=1025)
-        )
+        ) * 3
         self.pg0.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
@@ -2548,10 +2552,14 @@
             if p.haslayer(Template):
                 ipfix.add_template(p.getlayer(Template))
         # verify events in data set
+        event_count = 0
         for p in capture:
             if p.haslayer(Data):
                 data = ipfix.decode_data_set(p.getlayer(Set))
-                self.verify_ipfix_max_sessions(data, max_sessions_per_thread)
+                event_count += self.verify_ipfix_max_sessions(
+                    data, max_sessions_per_thread
+                )
+        self.assertEqual(event_count, 1)
 
     def test_syslog_apmap(self):
         """NAT44EI syslog address and port mapping creation and deletion"""
diff --git a/test/test_nat64.py b/test/test_nat64.py
index 05a2031..4ae9f66 100644
--- a/test/test_nat64.py
+++ b/test/test_nat64.py
@@ -462,9 +462,10 @@
         # natEvent
         self.assertEqual(scapy.compat.orb(record[230]), 13)
         # natQuotaExceededEvent
-        self.assertEqual(struct.pack("I", 2), record[466])
+        self.assertEqual(struct.pack("!I", 2), record[466])
         # maxBIBEntries
-        self.assertEqual(struct.pack("I", limit), record[472])
+        self.assertEqual(struct.pack("!I", limit), record[472])
+        return len(data)
 
     def verify_ipfix_bib(self, data, is_create, src_addr):
         """
@@ -622,9 +623,10 @@
         # natEvent
         self.assertEqual(scapy.compat.orb(record[230]), 13)
         # natQuotaExceededEvent
-        self.assertEqual(struct.pack("I", 1), record[466])
+        self.assertEqual(struct.pack("!I", 1), record[466])
         # maxSessionEntries
-        self.assertEqual(struct.pack("I", limit), record[471])
+        self.assertEqual(struct.pack("!I", limit), record[471])
+        return len(data)
 
     def test_nat64_inside_interface_handles_neighbor_advertisement(self):
         """NAT64 inside interface handles Neighbor Advertisement"""
@@ -1859,7 +1861,7 @@
             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
             / IPv6(src=src, dst=remote_host_ip6)
             / TCP(sport=12345, dport=25)
-        )
+        ) * 3
         self.pg0.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
@@ -1878,16 +1880,18 @@
             if p.haslayer(Template):
                 ipfix.add_template(p.getlayer(Template))
         # verify events in data set
+        event_count = 0
         for p in capture:
             if p.haslayer(Data):
                 data = ipfix.decode_data_set(p.getlayer(Set))
-                self.verify_ipfix_max_sessions(data, max_sessions)
+                event_count += self.verify_ipfix_max_sessions(data, max_sessions)
+        self.assertEqual(event_count, 1)
 
         p = (
             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
             / IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6)
             / TCP(sport=12345, dport=80)
-        )
+        ) * 3
         self.pg0.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
@@ -1895,6 +1899,7 @@
         self.vapi.ipfix_flush()
         capture = self.pg3.get_capture(1)
         # verify events in data set
+        event_count = 0
         for p in capture:
             self.assertTrue(p.haslayer(IPFIX))
             self.assertEqual(p[IP].src, self.pg3.local_ip4)
@@ -1904,7 +1909,8 @@
             self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id)
             if p.haslayer(Data):
                 data = ipfix.decode_data_set(p.getlayer(Set))
-                self.verify_ipfix_max_bibs(data, max_bibs)
+                event_count += self.verify_ipfix_max_bibs(data, max_bibs)
+        self.assertEqual(event_count, 1)
 
     def test_ipfix_bib_ses(self):
         """IPFIX logging NAT64 BIB/session create and delete events"""