interface: rx queue infra rework, part one

Type: improvement
Change-Id: I4008cadfd5141f921afbdc09a3ebcd1dcf88eb29
Signed-off-by: Damjan Marion <damarion@cisco.com>
diff --git a/src/vlib/main.c b/src/vlib/main.c
index 6369f39..c76d874 100644
--- a/src/vlib/main.c
+++ b/src/vlib/main.c
@@ -1708,27 +1708,6 @@
 {
 }
 
-static_always_inline u64
-dispatch_pending_interrupts (vlib_main_t * vm, vlib_node_main_t * nm,
-			     u64 cpu_time_now,
-			     vlib_node_interrupt_t * interrupts)
-{
-  vlib_node_runtime_t *n;
-
-  for (int i = 0; i < _vec_len (interrupts); i++)
-    {
-      vlib_node_interrupt_t *in;
-      in = vec_elt_at_index (interrupts, i);
-      n = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
-			    in->node_runtime_index);
-      n->interrupt_data = in->data;
-      cpu_time_now = dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT,
-				    VLIB_NODE_STATE_INTERRUPT, /* frame */ 0,
-				    cpu_time_now);
-    }
-  return cpu_time_now;
-}
-
 static inline void
 pcap_postmortem_reset (vlib_main_t * vm)
 {
@@ -1752,7 +1731,6 @@
   f64 now;
   vlib_frame_queue_main_t *fqm;
   u32 frame_queue_check_counter = 0;
-  vlib_node_interrupt_t *empty_int_list = 0;
 
   /* Initialize pending node vector. */
   if (is_main)
@@ -1771,12 +1749,7 @@
     cpu_time_now = clib_cpu_time_now ();
 
   /* Pre-allocate interupt runtime indices and lock. */
-  vec_alloc (nm->pending_local_interrupts, 32);
-  vec_alloc (nm->pending_remote_interrupts, 32);
-  vec_alloc (empty_int_list, 32);
-  vec_alloc_aligned (nm->pending_remote_interrupts_notify, 1,
-		     CLIB_CACHE_LINE_BYTES);
-  clib_spinlock_init (&nm->pending_interrupt_lock);
+  vec_alloc_aligned (nm->pending_interrupts, 1, CLIB_CACHE_LINE_BYTES);
 
   /* Pre-allocate expired nodes. */
   if (!nm->polling_threshold_vector_length)
@@ -1874,35 +1847,22 @@
       if (PREDICT_TRUE (is_main && vm->queue_signal_pending == 0))
 	vm->queue_signal_callback (vm);
 
-      /* handle local interruots */
-      if (_vec_len (nm->pending_local_interrupts))
+      if (__atomic_load_n (nm->pending_interrupts, __ATOMIC_ACQUIRE))
 	{
-	  vlib_node_interrupt_t *interrupts = nm->pending_local_interrupts;
-	  nm->pending_local_interrupts = empty_int_list;
-	  cpu_time_now = dispatch_pending_interrupts (vm, nm, cpu_time_now,
-						      interrupts);
-	  empty_int_list = interrupts;
-	  vec_reset_length (empty_int_list);
-	}
+	  int int_num = -1;
+	  *nm->pending_interrupts = 0;
 
-      /* handle remote interruots */
-      if (PREDICT_FALSE (_vec_len (nm->pending_remote_interrupts)))
-	{
-	  vlib_node_interrupt_t *interrupts;
-
-	  /* at this point it is known that
-	   * vec_len (nm->pending_local_interrupts) is zero so we quickly swap
-	   * local and remote vector under the spinlock */
-	  clib_spinlock_lock (&nm->pending_interrupt_lock);
-	  interrupts = nm->pending_remote_interrupts;
-	  nm->pending_remote_interrupts = empty_int_list;
-	  *nm->pending_remote_interrupts_notify = 0;
-	  clib_spinlock_unlock (&nm->pending_interrupt_lock);
-
-	  cpu_time_now = dispatch_pending_interrupts (vm, nm, cpu_time_now,
-						      interrupts);
-	  empty_int_list = interrupts;
-	  vec_reset_length (empty_int_list);
+	  while ((int_num =
+		    clib_interrupt_get_next (nm->interrupts, int_num)) != -1)
+	    {
+	      vlib_node_runtime_t *n;
+	      clib_interrupt_clear (nm->interrupts, int_num);
+	      n = vec_elt_at_index (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
+				    int_num);
+	      cpu_time_now = dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT,
+					    VLIB_NODE_STATE_INTERRUPT,
+					    /* frame */ 0, cpu_time_now);
+	    }
 	}
 
       /* Input nodes may have added work to the pending vector.
diff --git a/src/vlib/node.c b/src/vlib/node.c
index cf65f29..13889d1 100644
--- a/src/vlib/node.c
+++ b/src/vlib/node.c
@@ -447,6 +447,9 @@
       {
 	vec_add2_aligned (nm->nodes_by_type[n->type], rt, 1,
 			  /* align */ CLIB_CACHE_LINE_BYTES);
+	if (n->type == VLIB_NODE_TYPE_INPUT)
+	  clib_interrupt_resize (&nm->interrupts,
+				 vec_len (nm->nodes_by_type[n->type]));
 	n->runtime_index = rt - nm->nodes_by_type[n->type];
       }
 
diff --git a/src/vlib/node.h b/src/vlib/node.h
index 6b9a2df..1ec5a7a 100644
--- a/src/vlib/node.h
+++ b/src/vlib/node.h
@@ -502,10 +502,6 @@
 
   u16 state;				/**< Input node state. */
 
-  u32 interrupt_data;			/**< Data passed together with interrupt.
-					  Valid only when state is
-					  VLIB_NODE_STATE_INTERRUPT */
-
   u16 n_next_nodes;
 
   u16 cached_next_index;		/**< Next frame index that vector
@@ -670,12 +666,6 @@
 
 typedef struct
 {
-  u32 node_runtime_index;
-  u32 data;
-} vlib_node_interrupt_t;
-
-typedef struct
-{
   /* Public nodes. */
   vlib_node_t **nodes;
 
@@ -690,10 +680,8 @@
   vlib_node_runtime_t *nodes_by_type[VLIB_N_NODE_TYPE];
 
   /* Node runtime indices for input nodes with pending interrupts. */
-  vlib_node_interrupt_t *pending_local_interrupts;
-  vlib_node_interrupt_t *pending_remote_interrupts;
-  volatile u32 *pending_remote_interrupts_notify;
-  clib_spinlock_t pending_interrupt_lock;
+  void *interrupts;
+  volatile u32 *pending_interrupts;
 
   /* Input nodes are switched from/to interrupt to/from polling mode
      when average vector length goes above/below polling/interrupt
diff --git a/src/vlib/node_funcs.h b/src/vlib/node_funcs.h
index b33f496..a12aea4 100644
--- a/src/vlib/node_funcs.h
+++ b/src/vlib/node_funcs.h
@@ -47,6 +47,7 @@
 
 #include <vppinfra/fifo.h>
 #include <vppinfra/tw_timer_1t_3w_1024sl_ov.h>
+#include <vppinfra/interrupt.h>
 
 #ifdef CLIB_SANITIZE_ADDR
 #include <sanitizer/asan_interface.h>
@@ -224,37 +225,19 @@
 }
 
 always_inline void
-vlib_node_set_interrupt_pending_with_data (vlib_main_t * vm, u32 node_index,
-					   u32 data)
+vlib_node_set_interrupt_pending (vlib_main_t *vm, u32 node_index)
 {
   vlib_node_main_t *nm = &vm->node_main;
   vlib_node_t *n = vec_elt (nm->nodes, node_index);
-  vlib_node_interrupt_t *i;
+  /* Set node's interrupt bit, then raise the flag that main.c checks. */
   ASSERT (n->type == VLIB_NODE_TYPE_INPUT);
 
-  if (vm == vlib_get_main ())
-    {
-      /* local thread */
-      vec_add2 (nm->pending_local_interrupts, i, 1);
-      i->node_runtime_index = n->runtime_index;
-      i->data = data;
-    }
+  if (vm != vlib_get_main ())
+    clib_interrupt_set_atomic (nm->interrupts, n->runtime_index); /* remote */
   else
-    {
-      /* remote thread */
-      clib_spinlock_lock (&nm->pending_interrupt_lock);
-      vec_add2 (nm->pending_remote_interrupts, i, 1);
-      i->node_runtime_index = n->runtime_index;
-      i->data = data;
-      *nm->pending_remote_interrupts_notify = 1;
-      clib_spinlock_unlock (&nm->pending_interrupt_lock);
-    }
-}
+    clib_interrupt_set (nm->interrupts, n->runtime_index); /* local thread */
 
-always_inline void
-vlib_node_set_interrupt_pending (vlib_main_t * vm, u32 node_index)
-{
-  vlib_node_set_interrupt_pending_with_data (vm, node_index, 0);
+  __atomic_store_n (nm->pending_interrupts, 1, __ATOMIC_RELEASE);
 }
 
 always_inline vlib_process_t *
diff --git a/src/vlib/threads.c b/src/vlib/threads.c
index 7efddff..ea63653 100644
--- a/src/vlib/threads.c
+++ b/src/vlib/threads.c
@@ -18,6 +18,7 @@
 #include <math.h>
 #include <vppinfra/format.h>
 #include <vppinfra/time_range.h>
+#include <vppinfra/interrupt.h>
 #include <vppinfra/linux/sysfs.h>
 #include <vlib/vlib.h>
 
@@ -863,6 +864,9 @@
 	      nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
 		vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
 				 CLIB_CACHE_LINE_BYTES);
+	      clib_interrupt_init (
+		&nm_clone->interrupts,
+		vec_len (nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT]));
 	      vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
 	      {
 		vlib_node_t *n = vlib_get_node (vm, rt->node_index);
@@ -1178,6 +1182,9 @@
   nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
     vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
 		     CLIB_CACHE_LINE_BYTES);
+  clib_interrupt_resize (
+    &nm_clone->interrupts,
+    vec_len (nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT]));
 
   vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
   {
diff --git a/src/vlib/unix/input.c b/src/vlib/unix/input.c
index 7531dd1..6398148 100644
--- a/src/vlib/unix/input.c
+++ b/src/vlib/unix/input.c
@@ -249,8 +249,8 @@
 
 		while (nanosleep (&ts, &tsrem) < 0)
 		  ts = tsrem;
-		if (*vlib_worker_threads->wait_at_barrier
-		    || *nm->pending_remote_interrupts_notify)
+		if (*vlib_worker_threads->wait_at_barrier ||
+		    *nm->pending_interrupts)
 		  goto done;
 	      }
 	  }