session: optimize ct fifo segment allocations

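Allocate per app pair (client worker, server worker) segments with space for more than one fifo pair, instead of adding a new segment for every accepted cut-through session.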

Type: feature

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: Ib96fe12b899cb14ff20c0be607814011e2c3fc6a
diff --git a/src/svm/fifo_segment.h b/src/svm/fifo_segment.h
index 74f73d4..f76798f 100644
--- a/src/svm/fifo_segment.h
+++ b/src/svm/fifo_segment.h
@@ -38,6 +38,7 @@
   FIFO_SEGMENT_F_IS_PREALLOCATED = 1 << 0,
   FIFO_SEGMENT_F_WILL_DELETE = 1 << 1,
   FIFO_SEGMENT_F_MEM_LIMIT = 1 << 2,
+  FIFO_SEGMENT_F_CUSTOM_USE = 1 << 3,
 } fifo_segment_flags_t;
 
 #define foreach_segment_mem_status	\
diff --git a/src/vnet/session/application_local.c b/src/vnet/session/application_local.c
index b5c062f..9a8fe00 100644
--- a/src/vnet/session/application_local.c
+++ b/src/vnet/session/application_local.c
@@ -16,6 +16,28 @@
 #include <vnet/session/application_local.h>
 #include <vnet/session/session.h>
 
+typedef enum ct_segment_flags_
+{
+  CT_SEGMENT_F_CLIENT_DETACHED = 1 << 0, /**< Client session count hit 0 */
+  CT_SEGMENT_F_SERVER_DETACHED = 1 << 1, /**< Server session count hit 0 */
+} ct_segment_flags_t;
+
+typedef struct ct_segment_
+{
+  u32 segment_index;		/**< Fifo segment index in segment manager */
+  u32 client_n_sessions;	/**< Client sessions using the segment */
+  u32 server_n_sessions;	/**< Server sessions using the segment */
+  ct_segment_flags_t flags;	/**< Client/server detached flags */
+} ct_segment_t;
+
+typedef struct ct_segments_
+{
+  u32 sm_index;			/**< Segment manager index */
+  u32 server_wrk;		/**< Server worker index */
+  u32 client_wrk;		/**< Client worker index */
+  ct_segment_t *segments;	/**< Pool of ct segments for this context */
+} ct_segments_ctx_t;
+
 typedef struct ct_main_
 {
   ct_connection_t **connections;	/**< Per-worker connection pools */
@@ -23,6 +45,9 @@
   u32 n_sessions;			/**< Cumulative sessions counter */
   u32 *ho_reusable;			/**< Vector of reusable ho indices */
   clib_spinlock_t ho_reuseable_lock;	/**< Lock for reusable ho indices */
+  clib_rwlock_t app_segs_lock;		/**< RW lock for seg contexts */
+  uword *app_segs_ctxs_table;		/**< App handle to segment pool map */
+  ct_segments_ctx_t *app_seg_ctxs;	/**< Pool of ct segment contexts */
 } ct_main_t;
 
 static ct_main_t ct_main;
@@ -37,6 +62,8 @@
   ct->c_thread_index = thread_index;
   ct->client_wrk = ~0;
   ct->server_wrk = ~0;
+  ct->seg_ctx_index = ~0;
+  ct->ct_seg_index = ~0;
   return ct;
 }
 
@@ -106,17 +133,133 @@
   ip_copy (&sep->ip, &ct->c_lcl_ip, ct->c_is_ip4);
 }
 
-int
-ct_session_connect_notify (session_t * ss)
+static void
+ct_session_dealloc_fifos (ct_connection_t *ct, svm_fifo_t *rx_fifo,
+			  svm_fifo_t *tx_fifo)
 {
-  u32 ss_index, opaque, thread_index;
-  ct_connection_t *sct, *cct;
-  app_worker_t *client_wrk;
+  ct_segments_ctx_t *seg_ctx;
+  ct_main_t *cm = &ct_main;
+  ct_segment_flags_t flags;
   segment_manager_t *sm;
-  fifo_segment_t *seg;
-  u64 segment_handle;
-  int err = 0;
+  app_worker_t *app_wrk;
+  ct_segment_t *ct_seg;
+  fifo_segment_t *fs;
+  u32 seg_index;
+  u32 cnt; /* same width as the u32 session counters, avoids truncation */
+
+  /*
+   * Cleanup fifos
+   */
+
+  sm = segment_manager_get (rx_fifo->segment_manager);
+  seg_index = rx_fifo->segment_index;
+
+  fs = segment_manager_get_segment_w_lock (sm, seg_index);
+  fifo_segment_free_fifo (fs, rx_fifo);
+  fifo_segment_free_fifo (fs, tx_fifo);
+  segment_manager_segment_reader_unlock (sm);
+
+  /*
+   * Update segment context
+   */
+
+  clib_rwlock_reader_lock (&cm->app_segs_lock);
+
+  seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
+  ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
+
+  if (ct->flags & CT_CONN_F_CLIENT)
+    {
+      cnt =
+	__atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
+      if (!cnt)
+	ct_seg->flags |= CT_SEGMENT_F_CLIENT_DETACHED;
+    }
+  else
+    {
+      cnt =
+	__atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
+      if (!cnt)
+	ct_seg->flags |= CT_SEGMENT_F_SERVER_DETACHED;
+    }
+
+  flags = ct_seg->flags;
+
+  clib_rwlock_reader_unlock (&cm->app_segs_lock);
+
+  /*
+   * No need to do any app updates, return
+   */
+  if (cnt)
+    return;
+
+  if (ct->flags & CT_CONN_F_CLIENT)
+    {
+      app_wrk = app_worker_get_if_valid (ct->client_wrk);
+      /* Determine if client app still needs notification, i.e., if it is
+       * still attached. If client detached and this is the last ct session
+       * on this segment, then its connects segment manager should also be
+       * detached, so do not send notification */
+      if (app_wrk)
+	{
+	  segment_manager_t *csm;
+	  csm = app_worker_get_connect_segment_manager (app_wrk);
+	  if (!segment_manager_app_detached (csm))
+	    app_worker_del_segment_notify (app_wrk, ct->segment_handle);
+	}
+    }
+  else if (!segment_manager_app_detached (sm))
+    {
+      app_wrk = app_worker_get (ct->server_wrk);
+      app_worker_del_segment_notify (app_wrk, ct->segment_handle);
+    }
+
+  if (!(flags & CT_SEGMENT_F_CLIENT_DETACHED) ||
+      !(flags & CT_SEGMENT_F_SERVER_DETACHED))
+    return;
+
+  /*
+   * Remove segment context because both client and server detached
+   */
+
+  clib_rwlock_writer_lock (&cm->app_segs_lock);
+
+  seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
+  pool_put_index (seg_ctx->segments, ct->ct_seg_index);
+
+  /*
+   * No more segment indices left, remove the segments context
+   */
+  if (!pool_elts (seg_ctx->segments))
+    {
+      u64 table_handle = seg_ctx->client_wrk << 16 | seg_ctx->server_wrk;
+      table_handle = (u64) seg_ctx->sm_index << 32 | table_handle;
+      hash_unset (cm->app_segs_ctxs_table, table_handle);
+      pool_free (seg_ctx->segments);
+      pool_put_index (cm->app_seg_ctxs, ct->seg_ctx_index);
+    }
+
+  clib_rwlock_writer_unlock (&cm->app_segs_lock);
+
+  segment_manager_lock_and_del_segment (sm, seg_index);
+
+  /* Cleanup segment manager if needed. If server detaches there's a chance
+   * the client's sessions will hold up segment removal */
+  if (segment_manager_app_detached (sm) && !segment_manager_has_fifos (sm))
+    segment_manager_free_safe (sm);
+}
+
+int
+ct_session_connect_notify (session_t *ss)
+{
+  u32 ss_index, opaque, thread_index, cnt;
+  ct_connection_t *sct, *cct;
+  ct_segments_ctx_t *seg_ctx;
+  app_worker_t *client_wrk;
+  ct_main_t *cm = &ct_main;
+  ct_segment_t *ct_seg;
   session_t *cs;
+  int err = 0;
 
   ss_index = ss->session_index;
   thread_index = ss->thread_index;
@@ -124,26 +267,6 @@
   client_wrk = app_worker_get (sct->client_wrk);
   opaque = sct->client_opaque;
 
-  sm = segment_manager_get (ss->rx_fifo->segment_manager);
-  seg = segment_manager_get_segment_w_lock (sm, ss->rx_fifo->segment_index);
-  segment_handle = segment_manager_segment_handle (sm, seg);
-
-  if ((err = app_worker_add_segment_notify (client_wrk, segment_handle)))
-    {
-      clib_warning ("failed to notify client %u of new segment",
-		    sct->client_wrk);
-      segment_manager_segment_reader_unlock (sm);
-      session_close (ss);
-      goto error;
-    }
-  else
-    {
-      segment_manager_segment_reader_unlock (sm);
-    }
-
-  /*
-   * Alloc client session
-   */
   cct = ct_connection_get (sct->peer_index, thread_index);
 
   /* Client closed while waiting for reply from server */
@@ -158,6 +281,33 @@
   session_half_open_delete_notify (&cct->connection);
   cct->flags &= ~CT_CONN_F_HALF_OPEN;
 
+  /*
+   * Update ct segment context
+   */
+
+  clib_rwlock_reader_lock (&cm->app_segs_lock);
+
+  seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, sct->seg_ctx_index);
+  ct_seg = pool_elt_at_index (seg_ctx->segments, sct->ct_seg_index);
+
+  cnt = __atomic_add_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
+  if (cnt == 1)
+    {
+      err = app_worker_add_segment_notify (client_wrk, cct->segment_handle);
+      if (err)
+	{
+	  clib_rwlock_reader_unlock (&cm->app_segs_lock);
+	  session_close (ss);
+	  goto error;
+	}
+    }
+
+  clib_rwlock_reader_unlock (&cm->app_segs_lock);
+
+  /*
+   * Alloc client session
+   */
+
   cs = session_alloc (thread_index);
   ss = session_get (ss_index, thread_index);
   cs->session_type = ss->session_type;
@@ -165,6 +315,8 @@
   cs->session_state = SESSION_STATE_CONNECTING;
   cs->app_wrk_index = client_wrk->wrk_index;
   cs->connection_index = cct->c_c_index;
+  cct->seg_ctx_index = sct->seg_ctx_index;
+  cct->ct_seg_index = sct->ct_seg_index;
 
   cct->c_s_index = cs->session_index;
   cct->client_rx_fifo = ss->tx_fifo;
@@ -188,7 +340,7 @@
   if (app_worker_connect_notify (client_wrk, cs, err, opaque))
     {
       session_close (ss);
-      segment_manager_dealloc_fifos (cs->rx_fifo, cs->tx_fifo);
+      ct_session_dealloc_fifos (cct, cs->rx_fifo, cs->tx_fifo);
       session_free (cs);
       return -1;
     }
@@ -203,75 +355,181 @@
   return -1;
 }
 
-static int
-ct_init_accepted_session (app_worker_t * server_wrk,
-			  ct_connection_t * ct, session_t * ls,
-			  session_t * ll)
+static ct_segment_t *
+ct_lookup_free_segment (segment_manager_t *sm, ct_segments_ctx_t *seg_ctx,
+			u32 pair_bytes)
 {
-  u32 round_rx_fifo_sz, round_tx_fifo_sz, sm_index, seg_size;
+  uword free_bytes, max_free_bytes;
+  ct_segment_t *ct_seg, *res = 0;
+  fifo_segment_t *fs;
+  u32 max_fifos;
+
+  max_free_bytes = pair_bytes;
+  pool_foreach (ct_seg, seg_ctx->segments)
+    {
+      /* Client or server has detached so segment cannot be used */
+      if ((ct_seg->flags & CT_SEGMENT_F_SERVER_DETACHED) ||
+	  (ct_seg->flags & CT_SEGMENT_F_CLIENT_DETACHED))
+	continue;
+      fs = segment_manager_get_segment (sm, ct_seg->segment_index);
+      free_bytes = fifo_segment_available_bytes (fs);
+      max_fifos = fifo_segment_size (fs) / pair_bytes;
+      if (free_bytes > max_free_bytes &&
+	  fifo_segment_num_fifos (fs) / 2 < max_fifos)
+	{
+	  max_free_bytes = free_bytes;
+	  res = ct_seg;
+	}
+    }
+
+  return res;
+}
+
+static int
+ct_init_accepted_session (app_worker_t *server_wrk, ct_connection_t *ct,
+			  session_t *ls, session_t *ll)
+{
+  u32 sm_index, pair_bytes, seg_ctx_index = ~0, ct_seg_index = ~0;
+  u64 seg_handle, table_handle, seg_size;
   segment_manager_props_t *props;
+  const u32 margin = 16 << 10;
+  ct_segments_ctx_t *seg_ctx;
+  ct_main_t *cm = &ct_main;
   application_t *server;
   segment_manager_t *sm;
-  u32 margin = 16 << 10;
-  fifo_segment_t *seg;
-  u64 segment_handle;
-  int seg_index, rv;
-
-  server = application_get (server_wrk->app_index);
-
-  props = application_segment_manager_properties (server);
-  round_rx_fifo_sz = 1 << max_log2 (props->rx_fifo_size);
-  round_tx_fifo_sz = 1 << max_log2 (props->tx_fifo_size);
-  /* Increase size because of inefficient chunk allocations. Depending on
-   * how data is consumed, it may happen that more chunks than needed are
-   * allocated.
-   * TODO should remove once allocations are done more efficiently */
-  seg_size = 4 * (round_rx_fifo_sz + round_tx_fifo_sz + margin);
+  ct_segment_t *ct_seg;
+  fifo_segment_t *fs;
+  int rv, fs_index;
+  uword *spp;
 
   sm = app_worker_get_listen_segment_manager (server_wrk, ll);
-  seg_index = segment_manager_add_segment (sm, seg_size, 0);
-  if (seg_index < 0)
-    {
-      clib_warning ("failed to add new cut-through segment");
-      return seg_index;
-    }
-  seg = segment_manager_get_segment_w_lock (sm, seg_index);
+  sm_index = segment_manager_index (sm);
+  server = application_get (server_wrk->app_index);
+  props = application_segment_manager_properties (server);
 
-  rv = segment_manager_try_alloc_fifos (seg, ls->thread_index,
-					props->rx_fifo_size,
-					props->tx_fifo_size, &ls->rx_fifo,
-					&ls->tx_fifo);
+  table_handle = ct->client_wrk << 16 | server_wrk->wrk_index;
+  table_handle = (u64) segment_manager_index (sm) << 32 | table_handle;
+
+  /*
+   * Check if we already have a segment that can hold the fifos
+   */
+
+  clib_rwlock_reader_lock (&cm->app_segs_lock);
+
+  spp = hash_get (cm->app_segs_ctxs_table, table_handle);
+  if (spp)
+    {
+      seg_ctx_index = *spp;
+      seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
+      pair_bytes = props->rx_fifo_size + props->tx_fifo_size + margin;
+      ct_seg = ct_lookup_free_segment (sm, seg_ctx, pair_bytes);
+      if (ct_seg)
+	{
+	  ct_seg_index = ct_seg - seg_ctx->segments;
+	  fs_index = ct_seg->segment_index;
+	  __atomic_add_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
+	}
+    }
+
+  clib_rwlock_reader_unlock (&cm->app_segs_lock);
+
+  /*
+   * No segment, try to alloc one and notify the server
+   */
+
+  if (ct_seg_index == ~0)
+    {
+      seg_size = clib_max (props->segment_size, 128 << 20);
+      fs_index = segment_manager_add_segment (sm, seg_size, 0);
+      if (fs_index < 0)
+	{
+	  rv = -1;
+	  goto failed;
+	}
+
+      /* Make sure the segment is not used for other fifos */
+      fs = segment_manager_get_segment_w_lock (sm, fs_index);
+      fifo_segment_flags (fs) |= FIFO_SEGMENT_F_CUSTOM_USE;
+      segment_manager_segment_reader_unlock (sm);
+
+      clib_rwlock_writer_lock (&cm->app_segs_lock);
+
+      if (seg_ctx_index == ~0)
+	{
+	  pool_get_zero (cm->app_seg_ctxs, seg_ctx);
+	  seg_ctx_index = seg_ctx - cm->app_seg_ctxs;
+	  hash_set (cm->app_segs_ctxs_table, table_handle, seg_ctx_index);
+	  seg_ctx->server_wrk = server_wrk->wrk_index;
+	  seg_ctx->client_wrk = ct->client_wrk;
+	  seg_ctx->sm_index = sm_index;
+	}
+      else
+	seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
+
+      pool_get_zero (seg_ctx->segments, ct_seg);
+      ct_seg->segment_index = fs_index;
+      ct_seg->server_n_sessions += 1;
+      ct_seg_index = ct_seg - seg_ctx->segments;
+
+      clib_rwlock_writer_unlock (&cm->app_segs_lock);
+
+      /* New segment, notify the server. Client notification sent after
+       * server accepts the connection */
+      seg_handle = segment_manager_make_segment_handle (sm_index, fs_index);
+      if ((rv = app_worker_add_segment_notify (server_wrk, seg_handle)))
+	{
+	  segment_manager_lock_and_del_segment (sm, fs_index);
+
+	  clib_rwlock_writer_lock (&cm->app_segs_lock);
+	  pool_put_index (seg_ctx->segments, ct_seg_index);
+	  clib_rwlock_writer_unlock (&cm->app_segs_lock);
+
+	  goto failed; /* ct_seg was just freed, no count left to fix */
+	}
+    }
+
+  /*
+   * Allocate and initialize the fifos
+   */
+  fs = segment_manager_get_segment_w_lock (sm, fs_index);
+  rv = segment_manager_try_alloc_fifos (
+    fs, ls->thread_index, props->rx_fifo_size, props->tx_fifo_size,
+    &ls->rx_fifo, &ls->tx_fifo);
   if (rv)
     {
-      clib_warning ("failed to add fifos in cut-through segment");
       segment_manager_segment_reader_unlock (sm);
-      goto failed;
+      goto failed_fix_count;
     }
 
-  sm_index = segment_manager_index (sm);
   ls->rx_fifo->shr->master_session_index = ls->session_index;
   ls->tx_fifo->shr->master_session_index = ls->session_index;
   ls->rx_fifo->master_thread_index = ls->thread_index;
   ls->tx_fifo->master_thread_index = ls->thread_index;
   ls->rx_fifo->segment_manager = sm_index;
   ls->tx_fifo->segment_manager = sm_index;
-  ls->rx_fifo->segment_index = seg_index;
-  ls->tx_fifo->segment_index = seg_index;
+  ls->rx_fifo->segment_index = fs_index;
+  ls->tx_fifo->segment_index = fs_index;
 
-  segment_handle = segment_manager_segment_handle (sm, seg);
-  if ((rv = app_worker_add_segment_notify (server_wrk, segment_handle)))
-    {
-      clib_warning ("failed to notify server of new segment");
-      segment_manager_segment_reader_unlock (sm);
-      goto failed;
-    }
+  seg_handle = segment_manager_segment_handle (sm, fs);
   segment_manager_segment_reader_unlock (sm);
-  ct->segment_handle = segment_handle;
+
+  ct->segment_handle = seg_handle;
+  ct->seg_ctx_index = seg_ctx_index;
+  ct->ct_seg_index = ct_seg_index;
 
   return 0;
 
+failed_fix_count:
+
+  clib_rwlock_reader_lock (&cm->app_segs_lock);
+
+  seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
+  ct_seg = pool_elt_at_index (seg_ctx->segments, ct_seg_index);
+  __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
+
+  clib_rwlock_reader_unlock (&cm->app_segs_lock);
+
 failed:
-  segment_manager_lock_and_del_segment (sm, seg_index);
   return rv;
 }
 
@@ -366,8 +624,8 @@
   ss->session_state = SESSION_STATE_ACCEPTING;
   if (app_worker_accept_notify (server_wrk, ss))
     {
+      ct_session_dealloc_fifos (sct, ss->rx_fifo, ss->tx_fifo);
       ct_connection_free (sct);
-      segment_manager_dealloc_fifos (ss->rx_fifo, ss->tx_fifo);
       session_free (ss);
       return;
     }
@@ -571,12 +829,26 @@
     }
 
   s = session_get (ct->c_s_index, ct->c_thread_index);
-  app_wrk = app_worker_get_if_valid (s->app_wrk_index);
-  if (app_wrk)
-    app_worker_del_segment_notify (app_wrk, ct->segment_handle);
-  session_free_w_fifos (s);
+
   if (ct->flags & CT_CONN_F_CLIENT)
-    segment_manager_dealloc_fifos (ct->client_rx_fifo, ct->client_tx_fifo);
+    {
+      /* Normal free for client session as the fifos are allocated through
+       * the connects segment manager in a segment that's not shared with
+       * the server */
+      session_free_w_fifos (s);
+      ct_session_dealloc_fifos (ct, ct->client_rx_fifo, ct->client_tx_fifo);
+    }
+  else
+    {
+      /* Manual session and fifo segment cleanup to avoid implicit
+       * segment manager cleanups and notifications */
+      app_wrk = app_worker_get_if_valid (s->app_wrk_index);
+      if (app_wrk)
+	app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_SESSION);
+
+      ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo);
+      session_free (s);
+    }
 
   ct_connection_free (ct);
 }
@@ -718,7 +990,7 @@
   cm->n_workers = vlib_num_workers ();
   vec_validate (cm->connections, cm->n_workers);
   clib_spinlock_init (&cm->ho_reuseable_lock);
-
+  clib_rwlock_init (&cm->app_segs_lock);
   return 0;
 }
 
diff --git a/src/vnet/session/application_local.h b/src/vnet/session/application_local.h
index 09c33cc..f98f469 100644
--- a/src/vnet/session/application_local.h
+++ b/src/vnet/session/application_local.h
@@ -43,12 +43,14 @@
   transport_connection_t connection;
   u32 client_wrk;
   u32 server_wrk;
-  transport_proto_t actual_tp;
   u32 client_opaque;
   u32 peer_index;
   u64 segment_handle;
+  u32 seg_ctx_index;
+  u32 ct_seg_index;
   svm_fifo_t *client_rx_fifo;
   svm_fifo_t *client_tx_fifo;
+  transport_proto_t actual_tp;
   ct_connection_flags_t flags;
 } ct_connection_t;
 
diff --git a/src/vnet/session/segment_manager.c b/src/vnet/session/segment_manager.c
index 103f89e..a7ce989 100644
--- a/src/vnet/session/segment_manager.c
+++ b/src/vnet/session/segment_manager.c
@@ -498,8 +498,8 @@
     segment_manager_free (sm);
 }
 
-static void
-segment_manager_free_safe (segment_manager_t * sm)
+void
+segment_manager_free_safe (segment_manager_t *sm)
 {
   if (!vlib_thread_is_main_w_barrier ())
     {
@@ -738,16 +738,16 @@
 
   segment_manager_segment_reader_lock (sm);
 
-  /* *INDENT-OFF* */
   pool_foreach (cur, sm->segments)  {
-    free_bytes = fifo_segment_available_bytes (cur);
-    if (free_bytes > max_free_bytes)
-      {
-        max_free_bytes = free_bytes;
-        fs = cur;
-      }
+      if (fifo_segment_flags (cur) & FIFO_SEGMENT_F_CUSTOM_USE)
+	continue;
+      free_bytes = fifo_segment_available_bytes (cur);
+      if (free_bytes > max_free_bytes)
+	{
+	  max_free_bytes = free_bytes;
+	  fs = cur;
+	}
   }
-  /* *INDENT-ON* */
 
   if (fs)
     {
diff --git a/src/vnet/session/segment_manager.h b/src/vnet/session/segment_manager.h
index 3278d8d..ef8b970 100644
--- a/src/vnet/session/segment_manager.h
+++ b/src/vnet/session/segment_manager.h
@@ -90,6 +90,7 @@
  * @param sm	segment manager to be freed
  */
 void segment_manager_free (segment_manager_t * sm);
+void segment_manager_free_safe (segment_manager_t *sm);
 
 /**
  * Initiate segment manager cleanup