Keep RPC traffic off the shared-memory API queue

Change-Id: Ib5c346641463768cf33eaf8cb5fab5b63171398d
Signed-off-by: Dave Barach <dave@barachs.net>
diff --git a/src/vlib/main.h b/src/vlib/main.h
index 7c34fb6..64f2859 100644
--- a/src/vlib/main.h
+++ b/src/vlib/main.h
@@ -209,6 +209,7 @@
 
   /* Vector of pending RPC requests */
   uword *pending_rpc_requests;
+  clib_spinlock_t pending_rpc_lock;
 
 } vlib_main_t;
 
diff --git a/src/vlib/threads.c b/src/vlib/threads.c
index c99458d..7ecfa30 100644
--- a/src/vlib/threads.c
+++ b/src/vlib/threads.c
@@ -699,6 +699,9 @@
       vlib_worker_threads->node_reforks_required =
 	clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
 
+      /* We'll need the rpc vector lock... */
+      clib_spinlock_init (&vm->pending_rpc_lock);
+
       /* Ask for an initial barrier sync */
       *vlib_worker_threads->workers_at_barrier = 0;
       *vlib_worker_threads->wait_at_barrier = 1;
diff --git a/src/vlibmemory/memory_api.c b/src/vlibmemory/memory_api.c
index a444ec7..aa0e25b 100644
--- a/src/vlibmemory/memory_api.c
+++ b/src/vlibmemory/memory_api.c
@@ -704,6 +704,42 @@
 }

+/*
+ * Run every RPC request currently queued on vm->pending_rpc_requests.
+ *
+ * The pending vector is detached while holding pending_rpc_lock and the
+ * handlers are then invoked with the lock released. A handler which
+ * queues a further RPC takes pending_rpc_lock itself and appends to
+ * vm->pending_rpc_requests; dispatching under the lock would deadlock
+ * on the spinlock and could reallocate the vector being walked.
+ * Requests queued during dispatch are picked up by the next call.
+ *
+ * Always returns 0.
+ */
 int
+vl_mem_api_handle_rpc (vlib_main_t * vm, vlib_node_runtime_t * node)
+{
+  api_main_t *am = &api_main;
+  int i;
+  uword *rpc_requests, mp;
+
+  /* Take ownership of the queued requests; leave an empty vector behind */
+  clib_spinlock_lock_if_init (&vm->pending_rpc_lock);
+  rpc_requests = vm->pending_rpc_requests;
+  vm->pending_rpc_requests = 0;
+  clib_spinlock_unlock_if_init (&vm->pending_rpc_lock);
+
+  for (i = 0; i < vec_len (rpc_requests); i++)
+    {
+      mp = rpc_requests[i];
+      vl_msg_api_handler_with_vm_node (am, (void *) mp, vm, node);
+    }
+
+  /* The detached vector is private to this call; release it */
+  vec_free (rpc_requests);
+  return 0;
+}
+
+int
 vl_mem_api_handle_msg_private (vlib_main_t * vm, vlib_node_runtime_t * node,
 			       u32 reg_index)
 {
diff --git a/src/vlibmemory/memory_api.h b/src/vlibmemory/memory_api.h
index 4cda04b..f658006 100644
--- a/src/vlibmemory/memory_api.h
+++ b/src/vlibmemory/memory_api.h
@@ -32,6 +32,8 @@
 int vl_mem_api_handle_msg_main (vlib_main_t * vm, vlib_node_runtime_t * node);
 int vl_mem_api_handle_msg_private (vlib_main_t * vm,
 				   vlib_node_runtime_t * node, u32 reg_index);
+int vl_mem_api_handle_rpc (vlib_main_t * vm, vlib_node_runtime_t * node);
+
 vl_api_registration_t *vl_mem_api_client_index_to_registration (u32 handle);
 void vl_mem_api_enable_disable (vlib_main_t * vm, int yesno);
 u32 vl_api_memclnt_create_internal (char *, svm_queue_t *);
diff --git a/src/vlibmemory/vlib_api.c b/src/vlibmemory/vlib_api.c
index b72f133..21f112b 100644
--- a/src/vlibmemory/vlib_api.c
+++ b/src/vlibmemory/vlib_api.c
@@ -346,7 +346,8 @@
       start_time = vlib_time_now (vm);
       while (1)
 	{
-	  if (vl_mem_api_handle_msg_main (vm, node))
+	  if (vl_mem_api_handle_rpc (vm, node)
+	      || vl_mem_api_handle_msg_main (vm, node))
 	    {
 	      vm->api_queue_nonempty = 0;
 	      VL_MEM_API_LOG_Q_LEN ("q-underflow: len %d", 0);
@@ -564,36 +565,16 @@
 void
 vl_api_send_pending_rpc_requests (vlib_main_t * vm)
 {
-  api_main_t *am = &api_main;
-  vl_shmem_hdr_t *shmem_hdr = am->shmem_hdr;
-  svm_queue_t *q;
-  int i;
+  vlib_main_t *vm_global = &vlib_global_main;
 
-  /*
-   * Use the "normal" control-plane mechanism for the main thread.
-   * Well, almost. if the main input queue is full, we cannot
-   * block. Otherwise, we can expect a barrier sync timeout.
-   */
-  q = shmem_hdr->vl_input_queue;
+  /* Thread 0 queues straight onto the global vector: nothing to move */
+  if (vm == vm_global)
+    return;
 
-  for (i = 0; i < vec_len (vm->pending_rpc_requests); i++)
-    {
-      while (pthread_mutex_trylock (&q->mutex))
-	vlib_worker_thread_barrier_check ();
-
-      while (PREDICT_FALSE (svm_queue_is_full (q)))
-	{
-	  pthread_mutex_unlock (&q->mutex);
-	  vlib_worker_thread_barrier_check ();
-	  while (pthread_mutex_trylock (&q->mutex))
-	    vlib_worker_thread_barrier_check ();
-	}
-
-      vl_msg_api_send_shmem_nolock (q, (u8 *) (vm->pending_rpc_requests + i));
-
-      pthread_mutex_unlock (&q->mutex);
-    }
-  _vec_len (vm->pending_rpc_requests) = 0;
+  clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock);
+  vec_append (vm_global->pending_rpc_requests, vm->pending_rpc_requests);
+  vec_reset_length (vm->pending_rpc_requests);
+  clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock);
 }
 
 always_inline void
@@ -601,6 +582,7 @@
 				    u8 force_rpc)
 {
   vl_api_rpc_call_t *mp;
+  vlib_main_t *vm_global = &vlib_global_main;
   vlib_main_t *vm = vlib_get_main ();
 
   /* Main thread and not a forced RPC: call the function directly */
@@ -626,7 +608,12 @@
   mp->function = pointer_to_uword (fp);
   mp->need_barrier_sync = 1;
 
+  /* Add to the pending vector. Thread 0 requires locking. */
+  if (vm == vm_global)
+    clib_spinlock_lock_if_init (&vm_global->pending_rpc_lock);
   vec_add1 (vm->pending_rpc_requests, (uword) mp);
+  if (vm == vm_global)
+    clib_spinlock_unlock_if_init (&vm_global->pending_rpc_lock);
 }
 
 /*