session: postpone ct cleanups

Add infra to postpone cleanups while tx events are not delivered.

Type: improvement

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: I7611ac2442116f71a229569a7e274eb58eb84546
diff --git a/src/vnet/session/application_local.c b/src/vnet/session/application_local.c
index 0abf03d..88536dc 100644
--- a/src/vnet/session/application_local.c
+++ b/src/vnet/session/application_local.c
@@ -41,9 +41,16 @@
   ct_segment_t *segments;
 } ct_segments_ctx_t;
 
+typedef struct ct_cleanup_req_
+{
+  u32 ct_index;
+} ct_cleanup_req_t;
+
 typedef struct ct_main_
 {
   ct_connection_t **connections;	/**< Per-worker connection pools */
+  ct_cleanup_req_t **pending_cleanups;
+  u8 *heave_cleanups;
   u32 n_workers;			/**< Number of vpp workers */
   u32 n_sessions;			/**< Cumulative sessions counter */
   u32 *ho_reusable;			/**< Vector of reusable ho indices */
@@ -864,10 +871,102 @@
 }
 
 static void
+ct_session_postponed_cleanup (ct_connection_t *ct)
+{
+  app_worker_t *app_wrk;
+  session_t *s;
+
+  s = session_get (ct->c_s_index, ct->c_thread_index);
+  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+
+  if (ct->flags & CT_CONN_F_CLIENT)
+    {
+      if (app_wrk)
+	app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
+
+      /* Normal free for client session as the fifos are allocated through
+       * the connects segment manager in a segment that's not shared with
+       * the server */
+      session_free_w_fifos (s);
+      ct_session_dealloc_fifos (ct, ct->client_rx_fifo, ct->client_tx_fifo);
+    }
+  else
+    {
+      /* Manual session and fifo segment cleanup to avoid implicit
+       * segment manager cleanups and notifications */
+      app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+      if (app_wrk)
+	{
+	  app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
+	  app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_SESSION);
+	}
+
+      ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo);
+      session_free (s);
+    }
+
+  ct_connection_free (ct);
+}
+
+static void
+ct_handle_cleanups (void *args)
+{
+  uword thread_index = pointer_to_uword (args);
+  const u32 max_cleanups = 100;
+  ct_main_t *cm = &ct_main;
+  ct_cleanup_req_t *req;
+  ct_connection_t *ct;
+  u32 n_to_handle = 0;
+  session_t *s;
+
+  cm->heave_cleanups[thread_index] = 0;
+  n_to_handle = clib_fifo_elts (cm->pending_cleanups[thread_index]);
+  n_to_handle = clib_min (n_to_handle, max_cleanups);
+
+  while (n_to_handle)
+    {
+      clib_fifo_sub2 (cm->pending_cleanups[thread_index], req);
+      ct = ct_connection_get (req->ct_index, thread_index);
+      s = session_get (ct->c_s_index, ct->c_thread_index);
+      if (!svm_fifo_has_event (s->tx_fifo))
+	ct_session_postponed_cleanup (ct);
+      else
+	clib_fifo_add1 (cm->pending_cleanups[thread_index], *req);
+      n_to_handle -= 1;
+    }
+
+  if (clib_fifo_elts (cm->pending_cleanups[thread_index]))
+    {
+      cm->heave_cleanups[thread_index] = 1;
+      session_send_rpc_evt_to_thread_force (
+	thread_index, ct_handle_cleanups,
+	uword_to_pointer (thread_index, void *));
+    }
+}
+
+static void
+ct_program_cleanup (ct_connection_t *ct)
+{
+  ct_main_t *cm = &ct_main;
+  ct_cleanup_req_t *req;
+  uword thread_index;
+
+  thread_index = ct->c_thread_index;
+  clib_fifo_add2 (cm->pending_cleanups[thread_index], req);
+  req->ct_index = ct->c_c_index;
+
+  if (cm->heave_cleanups[thread_index])
+    return;
+
+  cm->heave_cleanups[thread_index] = 1;
+  session_send_rpc_evt_to_thread_force (
+    thread_index, ct_handle_cleanups, uword_to_pointer (thread_index, void *));
+}
+
+static void
 ct_session_close (u32 ct_index, u32 thread_index)
 {
   ct_connection_t *ct, *peer_ct;
-  app_worker_t *app_wrk;
   session_t *s;
 
   ct = ct_connection_get (ct_index, thread_index);
@@ -884,30 +983,16 @@
       else if (peer_ct->c_s_index != ~0)
 	session_transport_closing_notify (&peer_ct->connection);
       else
-	ct_connection_free (peer_ct);
+	{
+	  /* should not happen */
+	  clib_warning ("ct peer without session");
+	  ct_connection_free (peer_ct);
+	}
     }
 
-  if (ct->flags & CT_CONN_F_CLIENT)
-    {
-      /* Normal free for client session as the fifos are allocated through
-       * the connects segment manager in a segment that's not shared with
-       * the server */
-      session_free_w_fifos (s);
-      ct_session_dealloc_fifos (ct, ct->client_rx_fifo, ct->client_tx_fifo);
-    }
-  else
-    {
-      /* Manual session and fifo segment cleanup to avoid implicit
-       * segment manager cleanups and notifications */
-      app_wrk = app_worker_get_if_valid (s->app_wrk_index);
-      if (app_wrk)
-	app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_SESSION);
-
-      ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo);
-      session_free (s);
-    }
-
-  ct_connection_free (ct);
+  /* Do not send closed notify to make sure pending tx events are
+   * still delivered and program cleanup */
+  ct_program_cleanup (ct);
 }
 
 static transport_connection_t *
@@ -1046,6 +1131,8 @@
 
   cm->n_workers = vlib_num_workers ();
   vec_validate (cm->connections, cm->n_workers);
+  vec_validate (cm->pending_cleanups, cm->n_workers);
+  vec_validate (cm->heave_cleanups, cm->n_workers);
   clib_spinlock_init (&cm->ho_reuseable_lock);
   clib_rwlock_init (&cm->app_segs_lock);
   return 0;