vlib: epoll on worker threads

This patch teaches worker threads to sleep and to be woken up by the
kernel if there is activity on file descriptors assigned to that thread.

It also adds counters to epoll file descriptors and a new
debug CLI 'show unix file'.

Change-Id: Iaf67869f4aa88ff5b0a08982e1c08474013107c4
Signed-off-by: Damjan Marion <damarion@cisco.com>
diff --git a/src/vlib/unix/input.c b/src/vlib/unix/input.c
index ecf659b..0c29844 100644
--- a/src/vlib/unix/input.c
+++ b/src/vlib/unix/input.c
@@ -40,6 +40,7 @@
 #include <vlib/vlib.h>
 #include <vlib/unix/unix.h>
 #include <signal.h>
+#include <unistd.h>
 #include <vppinfra/tw_timer_1t_3w_1024sl_ov.h>
 
 /* FIXME autoconf */
@@ -53,23 +54,23 @@
 {
   int epoll_fd;
   struct epoll_event *epoll_events;
+  int n_epoll_fds;
 
   /* Statistics. */
   u64 epoll_files_ready;
   u64 epoll_waits;
 } linux_epoll_main_t;
 
-static linux_epoll_main_t linux_epoll_main;
+static linux_epoll_main_t *linux_epoll_mains = 0;
 
 static void
 linux_epoll_file_update (clib_file_t * f, clib_file_update_type_t update_type)
 {
   clib_file_main_t *fm = &file_main;
-  linux_epoll_main_t *em = &linux_epoll_main;
-  struct epoll_event e;
-  int op;
-
-  memset (&e, 0, sizeof (e));
+  linux_epoll_main_t *em = vec_elt_at_index (linux_epoll_mains,
+					     f->polling_thread_index);
+  struct epoll_event e = { 0 };
+  int op, add_del = 0;
 
   e.events = EPOLLIN;
   if (f->flags & UNIX_FILE_DATA_AVAILABLE_TO_WRITE)
@@ -84,6 +85,7 @@
     {
     case UNIX_FILE_UPDATE_ADD:
       op = EPOLL_CTL_ADD;
+      add_del = 1;
       break;
 
     case UNIX_FILE_UPDATE_MODIFY:
@@ -92,6 +94,7 @@
 
     case UNIX_FILE_UPDATE_DELETE:
       op = EPOLL_CTL_DEL;
+      add_del = -1;
       break;
 
     default:
@@ -99,19 +102,43 @@
       return;
     }
 
+  /* worker threads open epoll fd only if needed */
+  if (update_type == UNIX_FILE_UPDATE_ADD && em->epoll_fd == -1)
+    {
+      em->epoll_fd = epoll_create (1);
+      if (em->epoll_fd < 0)
+	{
+	  clib_unix_warning ("epoll_create");
+	  return;
+	}
+      em->n_epoll_fds = 0;
+    }
+
   if (epoll_ctl (em->epoll_fd, op, f->file_descriptor, &e) < 0)
-    clib_unix_warning ("epoll_ctl");
+    {
+      clib_unix_warning ("epoll_ctl");
+      return;
+    }
+
+  em->n_epoll_fds += add_del;
+
+  if (em->n_epoll_fds == 0)
+    {
+      close (em->epoll_fd);
+      em->epoll_fd = -1;
+    }
 }
 
-static uword
-linux_epoll_input (vlib_main_t * vm,
-		   vlib_node_runtime_t * node, vlib_frame_t * frame)
+static_always_inline uword
+linux_epoll_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
+			  vlib_frame_t * frame, u32 thread_index)
 {
   unix_main_t *um = &unix_main;
   clib_file_main_t *fm = &file_main;
-  linux_epoll_main_t *em = &linux_epoll_main;
+  linux_epoll_main_t *em = vec_elt_at_index (linux_epoll_mains, thread_index);
   struct epoll_event *e;
   int n_fds_ready;
+  int is_main = (thread_index == 0);
 
   {
     vlib_node_main_t *nm = &vm->node_main;
@@ -121,7 +148,7 @@
     f64 vector_rate = vlib_last_vectors_per_main_loop (vm);
 
     /* If we're not working very hard, decide how long to sleep */
-    if (vector_rate < 2 && vm->api_queue_nonempty == 0
+    if (is_main && vector_rate < 2 && vm->api_queue_nonempty == 0
 	&& nm->input_node_counts_by_state[VLIB_NODE_STATE_POLLING] == 0)
       {
 	ticks_until_expiration = TW (tw_timer_first_expires_in_ticks)
@@ -148,6 +175,13 @@
 	  }
 	node->input_main_loops_per_call = 0;
       }
+    else if (is_main == 0 && vector_rate < 2 &&
+	     nm->input_node_counts_by_state[VLIB_NODE_STATE_POLLING] == 0)
+      {
+	timeout = 10e-3;
+	timeout_ms = max_timeout_ms;
+	node->input_main_loops_per_call = 0;
+      }
     else			/* busy */
       {
 	/* Don't come back for a respectable number of dispatch cycles */
@@ -155,21 +189,28 @@
       }
 
     /* Allow any signal to wakeup our sleep. */
-    {
-      static sigset_t unblock_all_signals;
-      n_fds_ready = epoll_pwait (em->epoll_fd,
-				 em->epoll_events,
-				 vec_len (em->epoll_events),
-				 timeout_ms, &unblock_all_signals);
+    if (is_main || em->epoll_fd != -1)
+      {
+	static sigset_t unblock_all_signals;
+	n_fds_ready = epoll_pwait (em->epoll_fd,
+				   em->epoll_events,
+				   vec_len (em->epoll_events),
+				   timeout_ms, &unblock_all_signals);
 
-      /* This kludge is necessary to run over absurdly old kernels */
-      if (n_fds_ready < 0 && errno == ENOSYS)
-	{
-	  n_fds_ready = epoll_wait (em->epoll_fd,
-				    em->epoll_events,
-				    vec_len (em->epoll_events), timeout_ms);
-	}
-    }
+	/* This kludge is necessary to run over absurdly old kernels */
+	if (n_fds_ready < 0 && errno == ENOSYS)
+	  {
+	    n_fds_ready = epoll_wait (em->epoll_fd,
+				      em->epoll_events,
+				      vec_len (em->epoll_events), timeout_ms);
+	  }
+      }
+    else
+      {
+	if (timeout_ms)
+	  usleep (timeout_ms * 1000);
+	return 0;
+      }
   }
 
   if (n_fds_ready < 0)
@@ -196,11 +237,13 @@
 	  if (e->events & EPOLLIN)
 	    {
 	      errors[n_errors] = f->read_function (f);
+	      f->read_events++;
 	      n_errors += errors[n_errors] != 0;
 	    }
 	  if (e->events & EPOLLOUT)
 	    {
 	      errors[n_errors] = f->write_function (f);
+	      f->write_events++;
 	      n_errors += errors[n_errors] != 0;
 	    }
 	}
@@ -209,6 +252,7 @@
 	  if (f->error_function)
 	    {
 	      errors[n_errors] = f->error_function (f);
+	      f->error_events++;
 	      n_errors += errors[n_errors] != 0;
 	    }
 	  else
@@ -225,6 +269,18 @@
   return 0;
 }
 
+static uword
+linux_epoll_input (vlib_main_t * vm,
+		   vlib_node_runtime_t * node, vlib_frame_t * frame)
+{
+  u32 thread_index = vlib_get_thread_index ();
+
+  if (thread_index == 0)
+    return linux_epoll_input_inline (vm, node, frame, 0);
+  else
+    return linux_epoll_input_inline (vm, node, frame, thread_index);
+}
+
 /* *INDENT-OFF* */
 VLIB_REGISTER_NODE (linux_epoll_input_node,static) = {
   .function = linux_epoll_input,
@@ -236,15 +292,28 @@
 clib_error_t *
 linux_epoll_input_init (vlib_main_t * vm)
 {
-  linux_epoll_main_t *em = &linux_epoll_main;
+  linux_epoll_main_t *em;
   clib_file_main_t *fm = &file_main;
+  vlib_thread_main_t *tm = vlib_get_thread_main ();
 
-  /* Allocate some events. */
-  vec_resize (em->epoll_events, VLIB_FRAME_SIZE);
 
-  em->epoll_fd = epoll_create (vec_len (em->epoll_events));
-  if (em->epoll_fd < 0)
-    return clib_error_return_unix (0, "epoll_create");
+  vec_validate_aligned (linux_epoll_mains, tm->n_vlib_mains,
+			CLIB_CACHE_LINE_BYTES);
+
+  vec_foreach (em, linux_epoll_mains)
+  {
+    /* Allocate some events. */
+    vec_resize (em->epoll_events, VLIB_FRAME_SIZE);
+
+    if (linux_epoll_mains == em)
+      {
+	em->epoll_fd = epoll_create (1);
+	if (em->epoll_fd < 0)
+	  return clib_error_return_unix (0, "epoll_create");
+      }
+    else
+      em->epoll_fd = -1;
+  }
 
   fm->file_update = linux_epoll_file_update;