session: try to coalesce ct accept rpcs

Type: improvement

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: I11de851949afd90a37c102ed0c00969a4cc73df4
diff --git a/src/vnet/session/application_local.c b/src/vnet/session/application_local.c
index 3a1d48b..db8d721 100644
--- a/src/vnet/session/application_local.c
+++ b/src/vnet/session/application_local.c
@@ -49,7 +49,9 @@
 typedef struct ct_worker_
 {
   ct_connection_t *connections;	      /**< Per-worker connection pools */
+  u32 *pending_connects;	      /**< Fifo of pending ho indices */
   ct_cleanup_req_t *pending_cleanups; /**< Fifo of pending indices */
+  u8 have_connects;		      /**< Set if connect rpc pending */
   u8 have_cleanups;		      /**< Set if cleanup rpc pending */
 } ct_worker_t;
 
@@ -631,22 +633,20 @@
 }
 
 static void
-ct_accept_rpc_wrk_handler (void *accept_args)
+ct_accept_one (u32 thread_index, u32 ho_index)
 {
-  u32 cct_index, ho_index, thread_index, ll_index;
   ct_connection_t *sct, *cct, *ho;
   transport_connection_t *ll_ct;
   app_worker_t *server_wrk;
+  u32 cct_index, ll_index;
   session_t *ss, *ll;
 
   /*
    * Alloc client ct and initialize from ho
    */
-  thread_index = vlib_get_thread_index ();
   cct = ct_connection_alloc (thread_index);
   cct_index = cct->c_c_index;
 
-  ho_index = pointer_to_uword (accept_args);
   ho = ct_connection_get (ho_index, 0);
 
   /* Unlikely but half-open session and transport could have been freed */
@@ -738,6 +738,34 @@
     }
 }
 
+static void
+ct_accept_rpc_wrk_handler (void *rpc_args)
+{
+  u32 thread_index, ho_index, n_connects, i, n_pending;
+  const u32 max_connects = 32;
+  ct_worker_t *wrk;
+
+  thread_index = pointer_to_uword (rpc_args);
+  wrk = ct_worker_get (thread_index);
+
+  /* Sub without lock as main enqueues with worker barrier */
+  n_pending = clib_fifo_elts (wrk->pending_connects);
+  n_connects = clib_min (n_pending, max_connects);
+
+  for (i = 0; i < n_connects; i++)
+    {
+      clib_fifo_sub1 (wrk->pending_connects, ho_index);
+      ct_accept_one (thread_index, ho_index);
+    }
+
+  if (n_pending == n_connects)
+    wrk->have_connects = 0;
+  else
+    session_send_rpc_evt_to_thread_force (
+      thread_index, ct_accept_rpc_wrk_handler,
+      uword_to_pointer (thread_index, void *));
+}
+
 static int
 ct_connect (app_worker_t * client_wrk, session_t * ll,
 	    session_endpoint_cfg_t * sep)
@@ -745,6 +773,7 @@
   u32 thread_index, ho_index;
   ct_main_t *cm = &ct_main;
   ct_connection_t *ho;
+  ct_worker_t *wrk;
 
   /* Simple round-robin policy for spreading sessions over workers. We skip
    * thread index 0, i.e., offset the index by 1, when we have workers as it
@@ -777,9 +806,17 @@
    * after server accepts the connection.
    */
 
-  session_send_rpc_evt_to_thread_force (thread_index,
-					ct_accept_rpc_wrk_handler,
-					uword_to_pointer (ho_index, void *));
+  wrk = ct_worker_get (thread_index);
+
+  /* Worker barrier held, add without additional lock */
+  clib_fifo_add1 (wrk->pending_connects, ho_index);
+  if (!wrk->have_connects)
+    {
+      wrk->have_connects = 1;
+      session_send_rpc_evt_to_thread_force (
+	thread_index, ct_accept_rpc_wrk_handler,
+	uword_to_pointer (thread_index, void *));
+    }
 
   return ho_index;
 }