session: add infra for safe pool reallocs

This is not to be used lightly.

The idea is to forces pool reallocs to be done only on main thread with
a barrier to make sure pools are always reallocated without
peekers/readers. If rpcs are delayed and the pool runs out of elements,
workers will block waiting for barrier and force the realloc.

Consumers of this api should be session layer and transports.

Type: feature

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: I533272a29534338935a3fcf7027c0e7af2ca948c
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 2a5d13d..60991cd 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -1887,6 +1887,7 @@
 
   /* Allocate cache line aligned worker contexts */
   vec_validate_aligned (smm->wrk, num_threads - 1, CLIB_CACHE_LINE_BYTES);
+  clib_spinlock_init (&session_main.pool_realloc_lock);
 
   for (i = 0; i < num_threads; i++)
     {
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index fe8a85c..0ea621a 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -200,6 +200,12 @@
 
   transport_proto_t last_transport_proto_type;
 
+  /** Number of workers at pool realloc barrier */
+  u32 pool_realloc_at_barrier;
+
+  /** Lock to synchronize parallel forced reallocs */
+  clib_spinlock_t pool_realloc_lock;
+
   /*
    * Config parameters
    */
@@ -784,6 +790,147 @@
 session_t *session_alloc_for_connection (transport_connection_t * tc);
 session_t *session_alloc_for_half_open (transport_connection_t *tc);
 
+typedef void (pool_safe_realloc_rpc_fn) (void *rpc_args);
+
+typedef struct
+{
+  u8 ph[STRUCT_OFFSET_OF (pool_header_t, max_elts) + 4];
+  u32 flag;
+} pool_safe_realloc_header_t;
+
+STATIC_ASSERT_SIZEOF (pool_safe_realloc_header_t, sizeof (pool_header_t));
+
+#define POOL_REALLOC_SAFE_ELT_THRESH 32
+
+#define pool_realloc_flag(PH)                                                 \
+  ((pool_safe_realloc_header_t *) pool_header (PH))->flag
+
+#define pool_realloc_safe_aligned(P, align)                                   \
+  do                                                                          \
+    {                                                                         \
+      vlib_main_t *vm = vlib_get_main ();                                     \
+      u32 free_elts, max_elts, n_alloc;                                       \
+      ASSERT (vlib_get_thread_index () == 0);                                 \
+      vlib_worker_thread_barrier_sync (vm);                                   \
+      free_elts = pool_free_elts (P);                                         \
+      max_elts = pool_max_len (P);                                            \
+      n_alloc = clib_max (2 * max_elts, POOL_REALLOC_SAFE_ELT_THRESH);        \
+      pool_alloc_aligned (P, free_elts + n_alloc, align);                     \
+      clib_bitmap_validate (pool_header (P)->free_bitmap,                     \
+			    max_elts + n_alloc);                              \
+      pool_realloc_flag (P) = 0;                                              \
+      vlib_worker_thread_barrier_release (vm);                                \
+    }                                                                         \
+  while (0)
+
+always_inline void
+pool_program_safe_realloc (void *p, u32 thread_index,
+			   pool_safe_realloc_rpc_fn *rpc_fn)
+{
+  /* Reuse pad as a realloc flag */
+  if (pool_realloc_flag (p))
+    return;
+
+  pool_realloc_flag (p) = 1;
+  session_send_rpc_evt_to_thread (0 /* thread index */, rpc_fn,
+				  uword_to_pointer (thread_index, void *));
+}
+
+always_inline void
+pool_realloc_maybe_wait_at_barrier (void)
+{
+  if (!(*vlib_worker_threads->wait_at_barrier))
+    return;
+
+  /* Node refork required. Don't stop at the barrier from within a node */
+  if (*vlib_worker_threads->node_reforks_required)
+    return;
+
+  clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, 1);
+
+  while (*vlib_worker_threads->wait_at_barrier)
+    ;
+
+  clib_atomic_fetch_add (vlib_worker_threads->workers_at_barrier, -1);
+}
+
+#define pool_realloc_all_at_barrier(_not)                                     \
+  (*vlib_worker_threads->workers_at_barrier >= (vlib_num_workers () - _not))
+
+#define pool_realloc_safe_force(P)                                            \
+  do                                                                          \
+    {                                                                         \
+      ALWAYS_ASSERT (*vlib_worker_threads->node_reforks_required);            \
+      if (pool_realloc_all_at_barrier (1))                                    \
+	{                                                                     \
+	  pool_alloc (P, pool_max_len (P));                                   \
+	}                                                                     \
+      else                                                                    \
+	{                                                                     \
+	  session_main_t *sm = &session_main;                                 \
+	  clib_warning ("forced pool realloc");                               \
+	  clib_atomic_fetch_add (&sm->pool_realloc_at_barrier, 1);            \
+	  while (!pool_realloc_all_at_barrier (sm->pool_realloc_at_barrier))  \
+	    ;                                                                 \
+	  clib_spinlock_lock (&sm->pool_realloc_lock);                        \
+	  pool_alloc (P, pool_max_len (P));                                   \
+	  clib_spinlock_unlock (&sm->pool_realloc_lock);                      \
+	  clib_atomic_fetch_add (&sm->pool_realloc_at_barrier, -1);           \
+	}                                                                     \
+    }                                                                         \
+  while (0)
+
+#define pool_needs_realloc(P)                                                 \
+  ((!P) ||                                                                    \
+   (vec_len (pool_header (P)->free_indices) < POOL_REALLOC_SAFE_ELT_THRESH && \
+    pool_free_elts (P) < POOL_REALLOC_SAFE_ELT_THRESH))
+
+#define pool_get_aligned_safe(P, E, thread_index, rpc_fn, align)              \
+  do                                                                          \
+    {                                                                         \
+      ASSERT (vlib_get_thread_index () == thread_index ||                     \
+	      vlib_thread_is_main_w_barrier ());                              \
+      if (PREDICT_FALSE (pool_needs_realloc (P)))                             \
+	{                                                                     \
+	  if (PREDICT_FALSE (!(P)))                                           \
+	    {                                                                 \
+	      pool_alloc_aligned (P, 2 * POOL_REALLOC_SAFE_ELT_THRESH,        \
+				  align);                                     \
+	    }                                                                 \
+	  else if (PREDICT_FALSE (pool_free_elts (P) <                        \
+				  POOL_REALLOC_SAFE_ELT_THRESH / 2))          \
+	    {                                                                 \
+	      volatile typeof (P) *PP = &(P);                                 \
+	      pool_program_safe_realloc (P, thread_index, rpc_fn);            \
+	      if (thread_index)                                               \
+		{                                                             \
+		  while (pool_realloc_flag (P))                               \
+		    {                                                         \
+		      /* If refork required abort and consume existing elt */ \
+		      if (*vlib_worker_threads->node_reforks_required)        \
+			{                                                     \
+			  /* All workers at barrier realloc now */            \
+			  if (pool_realloc_all_at_barrier (1))                \
+			    pool_alloc_aligned (P, pool_max_len (P), align);  \
+			  break;                                              \
+			}                                                     \
+		      pool_realloc_maybe_wait_at_barrier ();                  \
+		    }                                                         \
+		  if (pool_free_elts (P) == 0)                                \
+		    pool_realloc_safe_force (P);                              \
+		  ALWAYS_ASSERT (pool_free_elts (P) > 0);                     \
+		}                                                             \
+	      (P) = *PP;                                                      \
+	    }                                                                 \
+	  else                                                                \
+	    {                                                                 \
+	      pool_program_safe_realloc (P, thread_index, rpc_fn);            \
+	    }                                                                 \
+	}                                                                     \
+      pool_get_aligned (P, E, align);                                         \
+    }                                                                         \
+  while (0)
+
 #endif /* __included_session_h__ */
 
 /*