session: linked list of events to be handled by main

Minimize amount of rpcs from first worker to main

Type: improvement

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: I3794ff028a17d18b7bff69ede2b62e1e2d45ae77
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 60991cd..5d07024 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -1896,6 +1896,8 @@
       wrk->new_head = clib_llist_make_head (wrk->event_elts, evt_list);
       wrk->old_head = clib_llist_make_head (wrk->event_elts, evt_list);
       wrk->pending_connects = clib_llist_make_head (wrk->event_elts, evt_list);
+      wrk->evts_pending_main =
+	clib_llist_make_head (wrk->event_elts, evt_list);
       wrk->vm = vlib_get_main_by_index (i);
       wrk->last_vlib_time = vlib_time_now (vm);
       wrk->last_vlib_us_time = wrk->last_vlib_time * CLIB_US_TIME_FREQ;
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index 0ea621a..71f6d35 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -155,6 +155,9 @@
   /** Main thread loops in poll mode without a connect */
   u32 no_connect_loops;
 
+  /** List head for first worker evts pending handling on main */
+  clib_llist_index_t evts_pending_main;
+
 #if SESSION_DEBUG
   /** last event poll time by thread */
   clib_time_type_t last_event_poll;
@@ -786,6 +789,7 @@
 fifo_segment_t *session_main_get_wrk_mqs_segment (void);
 void session_node_enable_disable (u8 is_en);
 clib_error_t *vnet_session_enable_disable (vlib_main_t * vm, u8 is_en);
+void session_wrk_handle_evts_main_rpc ();
 
 session_t *session_alloc_for_connection (transport_connection_t * tc);
 session_t *session_alloc_for_half_open (transport_connection_t *tc);
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index 2916c64..89c8ab0 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -26,12 +26,28 @@
 #include <svm/queue.h>
 #include <sys/timerfd.h>
 
-#define app_check_thread_and_barrier(_fn, _arg)				\
-  if (!vlib_thread_is_main_w_barrier ())				\
-    {									\
-     vlib_rpc_call_main_thread (_fn, (u8 *) _arg, sizeof(*_arg));	\
-      return;								\
-   }
+static inline void
+session_wrk_send_evt_to_main (session_worker_t *wrk, session_evt_elt_t *elt)
+{
+  session_evt_elt_t *he;
+  u32 thread_index;
+  u8 is_empty;
+
+  thread_index = wrk->vm->thread_index;
+  he = clib_llist_elt (wrk->event_elts, wrk->evts_pending_main);
+  is_empty = clib_llist_is_empty (wrk->event_elts, evt_list, he);
+  clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
+  if (is_empty)
+    vlib_rpc_call_main_thread (session_wrk_handle_evts_main_rpc,
+			       (u8 *) &thread_index, sizeof (thread_index));
+}
+
+#define app_check_thread_and_barrier(_wrk, _elt)                              \
+  if (!vlib_thread_is_main_w_barrier ())                                      \
+    {                                                                         \
+      session_wrk_send_evt_to_main (wrk, elt);                                \
+      return;                                                                 \
+    }
 
 static void
 session_wrk_timerfd_update (session_worker_t *wrk, u64 time_ns)
@@ -93,16 +109,17 @@
 }
 
 static void
-session_mq_listen_handler (void *data)
+session_mq_listen_handler (session_worker_t *wrk, session_evt_elt_t *elt)
 {
-  session_listen_msg_t *mp = (session_listen_msg_t *) data;
   vnet_listen_args_t _a, *a = &_a;
+  session_listen_msg_t *mp;
   app_worker_t *app_wrk;
   application_t *app;
   int rv;
 
-  app_check_thread_and_barrier (session_mq_listen_handler, mp);
+  app_check_thread_and_barrier (wrk, elt);
 
+  mp = session_evt_ctrl_data (wrk, elt);
   app = application_lookup (mp->client_index);
   if (!app)
     return;
@@ -132,16 +149,17 @@
 }
 
 static void
-session_mq_listen_uri_handler (void *data)
+session_mq_listen_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt)
 {
-  session_listen_uri_msg_t *mp = (session_listen_uri_msg_t *) data;
   vnet_listen_args_t _a, *a = &_a;
+  session_listen_uri_msg_t *mp;
   app_worker_t *app_wrk;
   application_t *app;
   int rv;
 
-  app_check_thread_and_barrier (session_mq_listen_uri_handler, mp);
+  app_check_thread_and_barrier (wrk, elt);
 
+  mp = session_evt_ctrl_data (wrk, elt);
   app = application_lookup (mp->client_index);
   if (!app)
     return;
@@ -312,16 +330,17 @@
 }
 
 static void
-session_mq_connect_uri_handler (void *data)
+session_mq_connect_uri_handler (session_worker_t *wrk, session_evt_elt_t *elt)
 {
-  session_connect_uri_msg_t *mp = (session_connect_uri_msg_t *) data;
   vnet_connect_args_t _a, *a = &_a;
+  session_connect_uri_msg_t *mp;
   app_worker_t *app_wrk;
   application_t *app;
   int rv;
 
-  app_check_thread_and_barrier (session_mq_connect_uri_handler, mp);
+  app_check_thread_and_barrier (wrk, elt);
 
+  mp = session_evt_ctrl_data (wrk, elt);
   app = application_lookup (mp->client_index);
   if (!app)
     return;
@@ -371,14 +390,15 @@
 }
 
 static void
-app_mq_detach_handler (void *data)
+app_mq_detach_handler (session_worker_t *wrk, session_evt_elt_t *elt)
 {
-  session_app_detach_msg_t *mp = (session_app_detach_msg_t *) data;
   vnet_app_detach_args_t _a, *a = &_a;
+  session_app_detach_msg_t *mp;
   application_t *app;
 
-  app_check_thread_and_barrier (app_mq_detach_handler, mp);
+  app_check_thread_and_barrier (wrk, elt);
 
+  mp = session_evt_ctrl_data (wrk, elt);
   app = application_lookup (mp->client_index);
   if (!app)
     return;
@@ -389,18 +409,19 @@
 }
 
 static void
-session_mq_unlisten_rpc (session_unlisten_msg_t *mp)
+session_mq_unlisten_handler (session_worker_t *wrk, session_evt_elt_t *elt)
 {
-  vlib_main_t *vm = vlib_get_main ();
   vnet_unlisten_args_t _a, *a = &_a;
+  session_unlisten_msg_t *mp;
   app_worker_t *app_wrk;
   session_handle_t sh;
   application_t *app;
-  u32 context;
   int rv;
 
+  app_check_thread_and_barrier (wrk, elt);
+
+  mp = session_evt_ctrl_data (wrk, elt);
   sh = mp->handle;
-  context = mp->context;
 
   app = application_lookup (mp->client_index);
   if (!app)
@@ -411,56 +432,34 @@
   a->handle = sh;
   a->wrk_map_index = mp->wrk_index;
 
-  vlib_worker_thread_barrier_sync (vm);
-
   if ((rv = vnet_unlisten (a)))
     clib_warning ("unlisten returned: %d", rv);
 
-  vlib_worker_thread_barrier_release (vm);
-
   app_wrk = application_get_worker (app, a->wrk_map_index);
   if (!app_wrk)
     return;
 
-  mq_send_unlisten_reply (app_wrk, sh, context, rv);
-  clib_mem_free (mp);
+  mq_send_unlisten_reply (app_wrk, sh, mp->context, rv);
 }
 
 static void
-session_mq_unlisten_handler (session_worker_t *wrk, session_evt_elt_t *elt)
+session_mq_accepted_reply_handler (session_worker_t *wrk,
+				   session_evt_elt_t *elt)
 {
-  u32 thread_index = wrk - session_main.wrk;
-  session_unlisten_msg_t *mp, *arg;
-
-  mp = session_evt_ctrl_data (wrk, elt);
-  arg = clib_mem_alloc (sizeof (session_unlisten_msg_t));
-  clib_memcpy_fast (arg, mp, sizeof (*arg));
-
-  if (PREDICT_FALSE (!thread_index))
-    {
-      session_mq_unlisten_rpc (arg);
-      return;
-    }
-
-  session_send_rpc_evt_to_thread_force (0, session_mq_unlisten_rpc, arg);
-}
-
-static void
-session_mq_accepted_reply_handler (void *data)
-{
-  session_accepted_reply_msg_t *mp = (session_accepted_reply_msg_t *) data;
   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
+  session_accepted_reply_msg_t *mp;
   session_state_t old_state;
   app_worker_t *app_wrk;
   session_t *s;
 
+  mp = session_evt_ctrl_data (wrk, elt);
+
   /* Mail this back from the main thread. We're not polling in main
    * thread so we're using other workers for notifications. */
   if (session_thread_from_handle (mp->handle) == 0 && vlib_num_workers () &&
       vlib_get_thread_index () != 0)
     {
-      vlib_rpc_call_main_thread (session_mq_accepted_reply_handler,
-				 (u8 *) mp, sizeof (*mp));
+      session_wrk_send_evt_to_main (wrk, elt);
       return;
     }
 
@@ -774,6 +773,52 @@
   svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
 }
 
+void
+session_wrk_handle_evts_main_rpc (void *args)
+{
+  session_evt_elt_t *he, *elt, *next;
+  session_worker_t *fwrk;
+  u32 thread_index;
+
+  ASSERT (vlib_thread_is_main_w_barrier ());
+  thread_index = *(u32 *) args;
+  fwrk = session_main_get_worker (thread_index);
+
+  he = clib_llist_elt (fwrk->event_elts, fwrk->evts_pending_main);
+  elt = clib_llist_next (fwrk->event_elts, evt_list, he);
+
+  while (elt != he)
+    {
+      next = clib_llist_next (fwrk->event_elts, evt_list, elt);
+      clib_llist_remove (fwrk->event_elts, evt_list, elt);
+      switch (elt->evt.event_type)
+	{
+	case SESSION_CTRL_EVT_LISTEN:
+	  session_mq_listen_handler (fwrk, elt);
+	  break;
+	case SESSION_CTRL_EVT_UNLISTEN:
+	  session_mq_unlisten_handler (fwrk, elt);
+	  break;
+	case SESSION_CTRL_EVT_APP_DETACH:
+	  app_mq_detach_handler (fwrk, elt);
+	  break;
+	case SESSION_CTRL_EVT_CONNECT_URI:
+	  session_mq_connect_uri_handler (fwrk, elt);
+	  break;
+	case SESSION_CTRL_EVT_ACCEPTED_REPLY:
+	  session_mq_accepted_reply_handler (fwrk, elt);
+	  break;
+	default:
+	  clib_warning ("unhandled %u", elt->evt.event_type);
+	  ALWAYS_ASSERT (0);
+	  break;
+	}
+      session_evt_ctrl_data_free (fwrk, elt);
+      clib_llist_put (fwrk->event_elts, elt);
+      elt = next;
+    }
+}
+
 vlib_node_registration_t session_queue_node;
 
 typedef struct
@@ -1514,10 +1559,10 @@
       session_transport_reset (s);
       break;
     case SESSION_CTRL_EVT_LISTEN:
-      session_mq_listen_handler (session_evt_ctrl_data (wrk, elt));
+      session_mq_listen_handler (wrk, elt);
       break;
     case SESSION_CTRL_EVT_LISTEN_URI:
-      session_mq_listen_uri_handler (session_evt_ctrl_data (wrk, elt));
+      session_mq_listen_uri_handler (wrk, elt);
       break;
     case SESSION_CTRL_EVT_UNLISTEN:
       session_mq_unlisten_handler (wrk, elt);
@@ -1526,7 +1571,7 @@
       session_mq_connect_handler (wrk, elt);
       break;
     case SESSION_CTRL_EVT_CONNECT_URI:
-      session_mq_connect_uri_handler (session_evt_ctrl_data (wrk, elt));
+      session_mq_connect_uri_handler (wrk, elt);
       break;
     case SESSION_CTRL_EVT_SHUTDOWN:
       session_mq_shutdown_handler (session_evt_ctrl_data (wrk, elt));
@@ -1538,7 +1583,7 @@
       session_mq_disconnected_handler (session_evt_ctrl_data (wrk, elt));
       break;
     case SESSION_CTRL_EVT_ACCEPTED_REPLY:
-      session_mq_accepted_reply_handler (session_evt_ctrl_data (wrk, elt));
+      session_mq_accepted_reply_handler (wrk, elt);
       break;
     case SESSION_CTRL_EVT_DISCONNECTED_REPLY:
       session_mq_disconnected_reply_handler (session_evt_ctrl_data (wrk,
@@ -1551,7 +1596,7 @@
       session_mq_worker_update_handler (session_evt_ctrl_data (wrk, elt));
       break;
     case SESSION_CTRL_EVT_APP_DETACH:
-      app_mq_detach_handler (session_evt_ctrl_data (wrk, elt));
+      app_mq_detach_handler (wrk, elt);
       break;
     case SESSION_CTRL_EVT_APP_WRK_RPC:
       session_mq_app_wrk_rpc_handler (session_evt_ctrl_data (wrk, elt));