session: basic support for interrupt mode

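Experimental support for session layer interrupt mode. When enabled
(use-private-rx-mqs must be set), the session queue node switches to
interrupt state when lightly loaded, i.e., when there are no pending
events and fewer than 1 vector per main loop. Adaptive mode is enabled
by default together with use-private-rx-mqs and can be turned off with
the new no-adaptive session config option.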

Because transport protocols require periodic time updates, each adaptive
session queue node worker registers a timerfd with the unix-epoll-input
node which, when triggered, signals, i.e., wakes up, the queue node. In
interrupt state the timer fires every 1ms, whereas if no session is
allocated the worker moves to idle state and the period is raised to
100ms. When workers are configured, the main thread uses a 500ms period
in both states. The timer is disarmed while the worker is polling.

Type: feature

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: I905b00777fbc025faf9c4074fce4c516cd139387
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index 16acc9c..a93e4b9 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -497,6 +497,9 @@
   if (aw->pending_rx_mqs)
     vlib_node_set_interrupt_pending (vm, appsl_rx_mqs_input_node.index);
 
+  if (n_msgs && wrk->state == SESSION_WRK_INTERRUPT)
+    vlib_node_set_interrupt_pending (vm, session_queue_node.index);
+
   return n_msgs;
 }
 
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index c24a95f..7513aa3 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -28,11 +28,12 @@
 session_send_evt_to_thread (void *data, void *args, u32 thread_index,
 			    session_evt_type_t evt_type)
 {
+  session_worker_t *wrk = session_main_get_worker (thread_index);
   session_event_t *evt;
   svm_msg_q_msg_t msg;
   svm_msg_q_t *mq;
 
-  mq = session_main_get_vpp_event_queue (thread_index);
+  mq = wrk->vpp_event_queue;
   if (PREDICT_FALSE (svm_msg_q_lock (mq)))
     return -1;
   if (PREDICT_FALSE (svm_msg_q_is_full (mq)
@@ -72,6 +73,10 @@
   evt->event_type = evt_type;
 
   svm_msg_q_add_and_unlock (mq, &msg);
+
+  if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
+    vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
+
   return 0;
 }
 
@@ -121,19 +126,20 @@
 void
 session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
 {
-  session_t *s;
+  session_t *s = session_get (tc->s_index, tc->thread_index);
 
-  s = session_get (tc->s_index, tc->thread_index);
   ASSERT (s->thread_index == vlib_get_thread_index ());
   ASSERT (s->session_state != SESSION_STATE_TRANSPORT_DELETED);
+
   if (!(s->flags & SESSION_F_CUSTOM_TX))
     {
       s->flags |= SESSION_F_CUSTOM_TX;
       if (svm_fifo_set_event (s->tx_fifo)
 	  || transport_connection_is_descheduled (tc))
 	{
-	  session_worker_t *wrk;
 	  session_evt_elt_t *elt;
+	  session_worker_t *wrk;
+
 	  wrk = session_main_get_worker (tc->thread_index);
 	  if (has_prio)
 	    elt = session_evt_alloc_new (wrk);
@@ -142,6 +148,10 @@
 	  elt->evt.session_index = tc->s_index;
 	  elt->evt.event_type = SESSION_IO_EVT_TX;
 	  tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;
+
+	  if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
+	    vlib_node_set_interrupt_pending (wrk->vm,
+					     session_queue_node.index);
 	}
     }
 }
@@ -157,6 +167,9 @@
   elt = session_evt_alloc_new (wrk);
   elt->evt.session_index = tc->s_index;
   elt->evt.event_type = SESSION_IO_EVT_TX;
+
+  if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
+    vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
 }
 
 static void
@@ -175,6 +188,9 @@
       clib_memset (&elt->evt, 0, sizeof (session_event_t));
       elt->evt.session_handle = session_handle (s);
       elt->evt.event_type = evt;
+
+      if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
+	vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
     }
   else
     session_send_ctrl_evt_to_thread (s, evt);
@@ -1693,6 +1709,9 @@
 
       if (num_threads > 1)
 	clib_rwlock_init (&smm->wrk[i].peekers_rw_locks);
+
+      if (!smm->no_adaptive && smm->use_private_rx_mqs)
+	session_wrk_enable_adaptive_mode (wrk);
     }
 
   /* Allocate vpp event queues segment and queue */
@@ -1817,6 +1836,7 @@
   smm->session_enable_asap = 0;
   smm->poll_main = 0;
   smm->use_private_rx_mqs = 0;
+  smm->no_adaptive = 0;
   smm->session_baseva = HIGH_SEGMENT_BASEVA;
 
 #if (HIGH_SEGMENT_BASEVA > (4ULL << 30))
@@ -1938,6 +1958,8 @@
 	smm->poll_main = 1;
       else if (unformat (input, "use-private-rx-mqs"))
 	smm->use_private_rx_mqs = 1;
+      else if (unformat (input, "no-adaptive"))
+	smm->no_adaptive = 1;
       else
 	return clib_error_return (0, "unknown input `%U'",
 				  format_unformat_error, input);
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index 5586316..93278d6 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -69,6 +69,18 @@
   u8 data[SESSION_CTRL_MSG_MAX_SIZE];
 } session_evt_ctrl_data_t;
 
+typedef enum session_wrk_state_
+{
+  SESSION_WRK_POLLING,   /* queue node polling, timerfd disarmed */
+  SESSION_WRK_INTERRUPT, /* queue node in interrupt mode, periodic timerfd */
+  SESSION_WRK_IDLE,      /* interrupt mode with no sessions, idle period */
+} __clib_packed session_wrk_state_t;
+
+typedef enum session_wrk_flags_
+{
+  SESSION_WRK_F_ADAPTIVE = 1 << 0, /* worker runs adaptive state machine */
+} __clib_packed session_wrk_flag_t;
+
 typedef struct session_worker_
 {
   CLIB_CACHE_LINE_ALIGN_MARK (cacheline0);
@@ -91,6 +103,15 @@
   /** Per-proto vector of sessions to enqueue */
   u32 **session_to_enqueue;
 
+  /** Timerfd used to periodically signal wrk session queue node */
+  u32 timerfd;
+
+  /** Worker flags */
+  session_wrk_flag_t flags;
+
+  /** Worker state */
+  session_wrk_state_t state;
+
   /** Context for session tx */
   session_tx_context_t ctx;
 
@@ -121,6 +142,9 @@
   /** Vector of nexts for the pending tx buffers */
   u16 *pending_tx_nexts;
 
+  /** Clib file for timerfd. Used only if adaptive mode is on */
+  uword timerfd_file;
+
 #if SESSION_DEBUG
   /** last event poll time by thread */
   clib_time_type_t last_event_poll;
@@ -177,6 +201,9 @@
   /** Allocate private rx mqs for external apps */
   u8 use_private_rx_mqs;
 
+  /** Do not enable session queue node adaptive mode */
+  u8 no_adaptive;
+
   /** vpp fifo event queue configured length */
   u32 configured_event_queue_length;
 
@@ -682,6 +709,8 @@
   session_worker_t *wrk = session_main_get_worker (thread_index);
   vec_add1 (wrk->pending_tx_buffers, bi);
   vec_add1 (wrk->pending_tx_nexts, next_node);
+  if (PREDICT_FALSE (wrk->state == SESSION_WRK_INTERRUPT))
+    vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
 }
 
 always_inline void
@@ -691,6 +720,7 @@
   wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
 }
 
+void session_wrk_enable_adaptive_mode (session_worker_t *wrk);
 fifo_segment_t *session_main_get_evt_q_segment (void);
 void session_node_enable_disable (u8 is_en);
 clib_error_t *vnet_session_enable_disable (vlib_main_t * vm, u8 is_en);
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index f8157cc..d7adbb5 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -24,6 +24,7 @@
 #include <vnet/session/application_local.h>
 #include <vnet/session/session_debug.h>
 #include <svm/queue.h>
+#include <sys/timerfd.h>
 
 #define app_check_thread_and_barrier(_fn, _arg)				\
   if (!vlib_thread_is_main_w_barrier ())				\
@@ -1418,6 +1419,118 @@
   return n_to_dequeue;
 }
 
+/* Nanoseconds per second, for splitting timerfd periods into timespecs */
+#define SESSION_WRK_NSEC_PER_SEC 1000000000ULL
+
+/* Size of wrk->event_elts when no events are pending. The adaptive state
+ * machine below uses this as its "no events" baseline. */
+#define SESSION_WRK_N_EVT_ELTS_IDLE 3
+
+/**
+ * Arm or disarm the worker's periodic timerfd.
+ *
+ * @param wrk     session worker owning the timerfd
+ * @param time_ns period in nanoseconds, used for both the first expiration
+ *                and the interval; 0 disarms the timer
+ */
+static void
+session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns)
+{
+  struct itimerspec its;
+
+  /* timerfd_settime() rejects tv_nsec >= 1e9, so split the period */
+  its.it_value.tv_sec = time_ns / SESSION_WRK_NSEC_PER_SEC;
+  its.it_value.tv_nsec = time_ns % SESSION_WRK_NSEC_PER_SEC;
+  its.it_interval = its.it_value;
+
+  if (timerfd_settime (wrk->timerfd, 0, &its, NULL) == -1)
+    clib_unix_warning ("timerfd_settime");
+}
+
+/**
+ * Timerfd period, in nanoseconds, for a worker entering @a state.
+ *
+ * Workers (thread_index != 0), and the main thread when no workers are
+ * configured, use 1ms in interrupt state and 100ms in idle state. With
+ * workers configured, the main thread uses 500ms in both states.
+ * Polling state returns 0, i.e., the timer is disarmed.
+ */
+always_inline u64
+session_wrk_tfd_timeout (session_wrk_state_t state, u32 thread_index)
+{
+  if (state == SESSION_WRK_INTERRUPT)
+    return thread_index ? 1e6 : vlib_num_workers () ? 5e8 : 1e6;
+  else if (state == SESSION_WRK_IDLE)
+    return thread_index ? 1e8 : vlib_num_workers () ? 5e8 : 1e8;
+  else
+    return 0;
+}
+
+/* Record the new worker state and reprogram the timerfd period for it */
+static inline void
+session_wrk_state_update (session_worker_t *wrk, session_wrk_state_t state)
+{
+  u64 time_ns;
+
+  wrk->state = state;
+  time_ns = session_wrk_tfd_timeout (state, wrk->vm->thread_index);
+  session_wrk_timerfd_update (wrk, time_ns);
+}
+
+/**
+ * Adaptive mode state machine, run after each queue node dispatch.
+ *
+ * - polling -> interrupt: no pending events and < 1 vector per main loop
+ * - interrupt -> polling: pending events or > 1 vector per main loop
+ * - interrupt -> idle: no sessions allocated on this worker
+ * - idle -> interrupt: pending events or sessions allocated again
+ *
+ * The two vector thresholds differ (< 1 vs > 1), presumably to keep the
+ * node from flapping between polling and interrupt.
+ */
+static void
+session_wrk_update_state (session_worker_t *wrk)
+{
+  vlib_main_t *vm = wrk->vm;
+
+  if (wrk->state == SESSION_WRK_POLLING)
+    {
+      if (pool_elts (wrk->event_elts) == SESSION_WRK_N_EVT_ELTS_IDLE &&
+	  vlib_last_vectors_per_main_loop (vm) < 1)
+	{
+	  session_wrk_state_update (wrk, SESSION_WRK_INTERRUPT);
+	  vlib_node_set_state (vm, session_queue_node.index,
+			       VLIB_NODE_STATE_INTERRUPT);
+	}
+    }
+  else if (wrk->state == SESSION_WRK_INTERRUPT)
+    {
+      if (pool_elts (wrk->event_elts) > SESSION_WRK_N_EVT_ELTS_IDLE ||
+	  vlib_last_vectors_per_main_loop (vm) > 1)
+	{
+	  session_wrk_state_update (wrk, SESSION_WRK_POLLING);
+	  vlib_node_set_state (vm, session_queue_node.index,
+			       VLIB_NODE_STATE_POLLING);
+	}
+      else if (PREDICT_FALSE (!pool_elts (wrk->sessions)))
+	{
+	  /* vlib node state stays interrupt; only the timerfd changes */
+	  session_wrk_state_update (wrk, SESSION_WRK_IDLE);
+	}
+    }
+  else
+    {
+      /* With no events pending event_elts still holds the baseline
+       * elements, so a plain non-zero test would leave idle state on every
+       * dispatch. Wake up only for pending events or new sessions. */
+      if (pool_elts (wrk->event_elts) > SESSION_WRK_N_EVT_ELTS_IDLE ||
+	  pool_elts (wrk->sessions))
+	{
+	  session_wrk_state_update (wrk, SESSION_WRK_INTERRUPT);
+	}
+    }
+}
+
 static uword
 session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
 		       vlib_frame_t * frame)
@@ -1513,6 +1587,9 @@
 
   SESSION_EVT (SESSION_EVT_DISPATCH_END, wrk, n_tx_packets);
 
+  if (wrk->flags & SESSION_WRK_F_ADAPTIVE)
+    session_wrk_update_state (wrk);
+
   return n_tx_packets;
 }
 
@@ -1531,6 +1608,80 @@
 /* *INDENT-ON* */
 
+/**
+ * Read handler for a worker's timerfd, registered with the worker's
+ * thread as polling thread.
+ *
+ * Signals the session queue node, which is how the node is woken up in
+ * interrupt and idle states, then reads the 8-byte expiration counter
+ * so the fd stops reporting readable.
+ */
+static clib_error_t *
+session_wrk_tfd_read_ready (clib_file_t *cf)
+{
+  session_worker_t *wrk = session_main_get_worker (cf->private_data);
+  ssize_t rv;
+  u64 buf;
+
+  vlib_node_set_interrupt_pending (wrk->vm, session_queue_node.index);
+  rv = read (wrk->timerfd, &buf, sizeof (buf));
+  /* fd is non-blocking: EAGAIN just means no expirations to consume */
+  if (rv < 0 && errno != EAGAIN)
+    clib_unix_warning ("failed");
+  return 0;
+}
+
+/**
+ * Write handler for the worker timerfd. Nothing is ever written to the
+ * timerfd, so there is nothing to do.
+ */
+static clib_error_t *
+session_wrk_tfd_write_ready (clib_file_t *cf)
+{
+  return 0;
+}
+
+/**
+ * Enable adaptive (polling/interrupt/idle) mode for a session worker.
+ *
+ * Creates a non-blocking CLOCK_MONOTONIC timerfd, registers it as a clib
+ * file polled by the worker's thread and flags the worker adaptive so
+ * session_queue_node_fn runs the state machine after each dispatch. A new
+ * timerfd is disarmed; it is armed when the worker leaves polling state.
+ *
+ * If the timerfd cannot be created nothing is registered and the worker
+ * is not flagged adaptive, so its state is never updated.
+ *
+ * @param wrk session worker; wrk->vm must already be set
+ */
+void
+session_wrk_enable_adaptive_mode (session_worker_t *wrk)
+{
+  u32 thread_index = wrk->vm->thread_index;
+  clib_file_t template = { 0 };
+  int fd;
+
+  /* Check the result on a signed local: wrk->timerfd is unsigned, so a
+   * failed (-1) timerfd_create () would never compare below zero there */
+  fd = timerfd_create (CLOCK_MONOTONIC, TFD_NONBLOCK);
+  if (fd < 0)
+    {
+      clib_unix_warning ("timerfd_create");
+      return;
+    }
+  wrk->timerfd = fd;
+
+  template.read_function = session_wrk_tfd_read_ready;
+  template.write_function = session_wrk_tfd_write_ready;
+  template.file_descriptor = wrk->timerfd;
+  template.private_data = thread_index;
+  template.polling_thread_index = thread_index;
+  template.description = format (0, "session-wrk-tfd-%u", thread_index);
+
+  wrk->timerfd_file = clib_file_add (&file_main, &template);
+  wrk->flags |= SESSION_WRK_F_ADAPTIVE;
+}
+
 static clib_error_t *
 session_queue_exit (vlib_main_t * vm)
 {
   if (vlib_get_n_threads () < 2)