session: move connects to first worker

Type: improvement

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: I035e3fdbb52eca010ad7b2c20ca2930cb1645978
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index 8f6503d..be00925 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -224,23 +224,20 @@
 session_mq_handle_connects_rpc (void *arg)
 {
   u32 max_connects = 32, n_connects = 0;
-  vlib_main_t *vm = vlib_get_main ();
   session_evt_elt_t *he, *elt, *next;
-  session_worker_t *fwrk, *wrk;
+  session_worker_t *fwrk;
 
-  ASSERT (vlib_get_thread_index () == 0);
+  ASSERT (session_vlib_thread_is_cl_thread ());
 
   /* Pending connects on linked list pertaining to first worker */
-  fwrk = session_main_get_worker (1);
+  fwrk = session_main_get_worker (transport_cl_thread ());
   if (!fwrk->n_pending_connects)
-    goto update_state;
-
-  vlib_worker_thread_barrier_sync (vm);
+    return;
 
   he = clib_llist_elt (fwrk->event_elts, fwrk->pending_connects);
   elt = clib_llist_next (fwrk->event_elts, evt_list, he);
 
-  /* Avoid holding the barrier for too long */
+  /* Avoid holding the worker for too long */
   while (n_connects < max_connects && elt != he)
     {
       next = clib_llist_next (fwrk->event_elts, evt_list, elt);
@@ -254,45 +251,10 @@
 
   /* Decrement with worker barrier */
   fwrk->n_pending_connects -= n_connects;
-
-  vlib_worker_thread_barrier_release (vm);
-
-update_state:
-
-  /* Switch worker to poll mode if it was in interrupt mode and had work or
-   * back to interrupt if threshold of loops without a connect is passed.
-   * While in poll mode, reprogram connects rpc */
-  wrk = session_main_get_worker (0);
-  if (wrk->state != SESSION_WRK_POLLING)
+  if (fwrk->n_pending_connects > 0)
     {
-      if (n_connects)
-	{
-	  session_wrk_set_state (wrk, SESSION_WRK_POLLING);
-	  vlib_node_set_state (vm, session_queue_node.index,
-			       VLIB_NODE_STATE_POLLING);
-	  wrk->no_connect_loops = 0;
-	}
-    }
-  else
-    {
-      if (!n_connects)
-	{
-	  if (++wrk->no_connect_loops > 1e5)
-	    {
-	      session_wrk_set_state (wrk, SESSION_WRK_INTERRUPT);
-	      vlib_node_set_state (vm, session_queue_node.index,
-				   VLIB_NODE_STATE_INTERRUPT);
-	    }
-	}
-      else
-	wrk->no_connect_loops = 0;
-    }
-
-  if (wrk->state == SESSION_WRK_POLLING)
-    {
-      elt = session_evt_alloc_ctrl (wrk);
-      elt->evt.event_type = SESSION_CTRL_EVT_RPC;
-      elt->evt.rpc_args.fp = session_mq_handle_connects_rpc;
+      session_send_rpc_evt_to_thread_force (fwrk->vm->thread_index,
+					    session_mq_handle_connects_rpc, 0);
     }
 }
 
@@ -302,20 +264,28 @@
   u32 thread_index = wrk - session_main.wrk;
   session_evt_elt_t *he;
 
-  /* No workers, so just deal with the connect now */
-  if (PREDICT_FALSE (!thread_index))
-    {
-      session_mq_connect_one (session_evt_ctrl_data (wrk, elt));
-      return;
-    }
-
-  if (PREDICT_FALSE (thread_index != 1))
+  if (PREDICT_FALSE (thread_index > transport_cl_thread ()))
     {
       clib_warning ("Connect on wrong thread. Dropping");
       return;
     }
 
-  /* Add to pending list to be handled by main thread */
+  /* If on worker, check if main has any pending messages. Avoids reordering
+   * with other control messages that need to be handled by main
+   */
+  if (thread_index)
+    {
+      he = clib_llist_elt (wrk->event_elts, wrk->evts_pending_main);
+
+      /* Events pending on main, postpone to avoid reordering */
+      if (!clib_llist_is_empty (wrk->event_elts, evt_list, he))
+	{
+	  clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
+	  return;
+	}
+    }
+
+  /* Add to pending list to be handled by first worker */
   he = clib_llist_elt (wrk->event_elts, wrk->pending_connects);
   clib_llist_add_tail (wrk->event_elts, evt_list, elt, he);
 
@@ -323,9 +293,8 @@
   wrk->n_pending_connects += 1;
   if (wrk->n_pending_connects == 1)
     {
-      vlib_node_set_interrupt_pending (vlib_get_main_by_index (0),
-				       session_queue_node.index);
-      session_send_rpc_evt_to_thread (0, session_mq_handle_connects_rpc, 0);
+      session_send_rpc_evt_to_thread_force (thread_index,
+					    session_mq_handle_connects_rpc, 0);
     }
 }
 
@@ -812,6 +781,9 @@
 	case SESSION_CTRL_EVT_ACCEPTED_REPLY:
 	  session_mq_accepted_reply_handler (fwrk, elt);
 	  break;
+	case SESSION_CTRL_EVT_CONNECT:
+	  session_mq_connect_handler (fwrk, elt);
+	  break;
 	default:
 	  clib_warning ("unhandled %u", elt->evt.event_type);
 	  ALWAYS_ASSERT (0);
@@ -820,8 +792,11 @@
 
       /* Regrab element in case pool moved */
       elt = clib_llist_elt (fwrk->event_elts, ei);
-      session_evt_ctrl_data_free (fwrk, elt);
-      clib_llist_put (fwrk->event_elts, elt);
+      if (!clib_llist_elt_is_linked (elt, evt_list))
+	{
+	  session_evt_ctrl_data_free (fwrk, elt);
+	  clib_llist_put (fwrk->event_elts, elt);
+	}
       ei = next_ei;
     }