vhost: VPP stalls with vhost performing control plane actions

Symptom
-------
With NDR traffic blasting at VPP, bringing up a new VM with vhost
connection to VPP causes packet drops. I am able to recreate this
problem easily using a simple setup like this.

TREX-------------- switch ---- VPP
    |---------------|  |-------|

Cause
-----
The reason for the packet drops is due to vhost holding onto the worker
barrier lock for too long in vhost_user_socket_read(). There are quite a
few of system calls inside the routine. At the end of the routine, it
unconditionally calls vhost_user_update_iface_state() for all message
types. vhost_user_update_iface_state() also unconditionally calls
vhost_user_rx_thread_placement() and vhost_user_tx_thread_placement().
vhost_user_rx_thread_placement scraps out all existing cpu/queue mappings
for the interface and creates brand new cpu/queue mappings for the
interface. This process is very disruptive and very expensive. In my
opinion, this area of code needs a makeover.

Fixes
-----
* vhost_user_socket_read() is rewritten that it should not hold
  onto the worker barrier lock for system calls, or at least minimize the
  need for doing it.
* Remove the call to vhost_user_update_iface_state as a default route at
  the end of vhost_user_socket_read(). There is only a couple of message
  types which really need to call vhost_user_update_iface_state(). We put
  the call to those message types which need it.
* Remove vhost_user_rx_thread_placement() and
  vhost_user_tx_thread_placement from vhost_user_update_iface_state().
  There is no need to repetatively change the cpu/queue mappings.
* vhost_user_rx_thread_placement() is actually quite expensive. It should
  be called only once per queue for the interface. There is no need to
  scrap the existing cpu/queue mappings and create new cpu/queue mappings
  when the additional queues becomes active/enable.
* Change to create the cpu/queue mappings for the first RX when the
  interface is created. Dont remove the cpu/queue mapping when the
  interface is disconnected. Remove the cpu/queue mapping only when the
  interface is deleted.

The create vhost user interface CLI also has some very expensive system
calls if the command is entered with the optional keyword "server"

As a bonus, This patch makes the create vhost user interface binary-api and
CLI thread safe. Do the protection for the small amount of code which is
thread unsafe.

Change-Id: I4a19cbf7e9cc37ea01286169882e5603e6d7eb77
Signed-off-by: Steven Luong <sluong@cisco.com>
diff --git a/src/vnet/devices/virtio/vhost_user.c b/src/vnet/devices/virtio/vhost_user.c
index d13ea3b..1d52b42 100644
--- a/src/vnet/devices/virtio/vhost_user.c
+++ b/src/vnet/devices/virtio/vhost_user.c
@@ -116,12 +116,13 @@
     }
 }
 
-static void
+static_always_inline void
 vhost_user_tx_thread_placement (vhost_user_intf_t * vui)
 {
   //Let's try to assign one queue to each thread
-  u32 qid = 0;
+  u32 qid;
   u32 thread_index = 0;
+
   vui->use_tx_spinlock = 0;
   while (1)
     {
@@ -156,67 +157,27 @@
  * @brief Unassign existing interface/queue to thread mappings and re-assign
  * new interface/queue to thread mappings
  */
-static void
-vhost_user_rx_thread_placement ()
+static_always_inline void
+vhost_user_rx_thread_placement (vhost_user_intf_t * vui, u32 qid)
 {
-  vhost_user_main_t *vum = &vhost_user_main;
-  vhost_user_intf_t *vui;
-  vhost_user_vring_t *txvq;
+  vhost_user_vring_t *txvq = &vui->vrings[qid];
   vnet_main_t *vnm = vnet_get_main ();
-  u32 qid;
   int rv;
-  u16 *queue;
+  u32 q = qid >> 1;
 
-  // Scrap all existing mappings for all interfaces/queues
-  /* *INDENT-OFF* */
-  pool_foreach (vui, vum->vhost_user_interfaces, {
-      vec_foreach (queue, vui->rx_queues)
-	{
-	  rv = vnet_hw_interface_unassign_rx_thread (vnm, vui->hw_if_index,
-						     *queue);
-	  if (rv)
-	    vu_log_warn (vui, "unable to unassign interface %d, "
-			 "queue %d: rc=%d", vui->hw_if_index, *queue, rv);
-	}
-      vec_reset_length (vui->rx_queues);
-  });
-  /* *INDENT-ON* */
-
-  // Create the rx_queues for all interfaces
-  /* *INDENT-OFF* */
-  pool_foreach (vui, vum->vhost_user_interfaces, {
-      for (qid = 0; qid < VHOST_VRING_MAX_N / 2; qid++)
-	{
-	  txvq = &vui->vrings[VHOST_VRING_IDX_TX (qid)];
-	  if (txvq->started)
-	    {
-	      if (txvq->mode == VNET_HW_INTERFACE_RX_MODE_UNKNOWN)
-		/* Set polling as the default */
-		txvq->mode = VNET_HW_INTERFACE_RX_MODE_POLLING;
-	      vec_add1 (vui->rx_queues, qid);
-	    }
-	}
-  });
-  /* *INDENT-ON* */
-
-  // Assign new mappings for all interfaces/queues
-  /* *INDENT-OFF* */
-  pool_foreach (vui, vum->vhost_user_interfaces, {
-      vnet_hw_interface_set_input_node (vnm, vui->hw_if_index,
-					vhost_user_input_node.index);
-      vec_foreach (queue, vui->rx_queues)
-	{
-	  vnet_hw_interface_assign_rx_thread (vnm, vui->hw_if_index, *queue,
-					      ~0);
-	  txvq = &vui->vrings[VHOST_VRING_IDX_TX (*queue)];
-	  rv = vnet_hw_interface_set_rx_mode (vnm, vui->hw_if_index, *queue,
-					      txvq->mode);
-	  if (rv)
-	    vu_log_warn (vui, "unable to set rx mode for interface %d, "
-			 "queue %d: rc=%d", vui->hw_if_index, *queue, rv);
-	}
-  });
-  /* *INDENT-ON* */
+  ASSERT ((qid & 1) == 1);	// should be odd
+  // Assign new queue mappings for the interface
+  vnet_hw_interface_set_input_node (vnm, vui->hw_if_index,
+				    vhost_user_input_node.index);
+  vnet_hw_interface_assign_rx_thread (vnm, vui->hw_if_index, q, ~0);
+  if (txvq->mode == VNET_HW_INTERFACE_RX_MODE_UNKNOWN)
+    /* Set polling as the default */
+    txvq->mode = VNET_HW_INTERFACE_RX_MODE_POLLING;
+  txvq->qid = q;
+  rv = vnet_hw_interface_set_rx_mode (vnm, vui->hw_if_index, q, txvq->mode);
+  if (rv)
+    vu_log_warn (vui, "unable to set rx mode for interface %d, "
+		 "queue %d: rc=%d", vui->hw_if_index, q, rv);
 }
 
 /** @brief Returns whether at least one TX and one RX vring are enabled */
@@ -232,7 +193,7 @@
   return found[0] && found[1];
 }
 
-static void
+static_always_inline void
 vhost_user_update_iface_state (vhost_user_intf_t * vui)
 {
   /* if we have pointers to descriptor table, go up */
@@ -247,8 +208,6 @@
 				     : 0);
       vui->is_ready = is_ready;
     }
-  vhost_user_rx_thread_placement ();
-  vhost_user_tx_thread_placement (vui);
 }
 
 static void
@@ -278,6 +237,18 @@
   return 0;
 }
 
+static_always_inline void
+vhost_user_thread_placement (vhost_user_intf_t * vui, u32 qid)
+{
+  if (qid & 1)			// RX is odd, TX is even
+    {
+      if (vui->vrings[qid].qid == -1)
+	vhost_user_rx_thread_placement (vui, qid);
+    }
+  else
+    vhost_user_tx_thread_placement (vui);
+}
+
 static clib_error_t *
 vhost_user_kickfd_read_ready (clib_file_t * uf)
 {
@@ -293,10 +264,12 @@
   if (!vui->vrings[qid].started ||
       (vhost_user_intf_ready (vui) != vui->is_ready))
     {
-      vlib_worker_thread_barrier_sync (vlib_get_main ());
-      vui->vrings[qid].started = 1;
-      vhost_user_update_iface_state (vui);
-      vlib_worker_thread_barrier_release (vlib_get_main ());
+      if (vui->vrings[qid].started == 0)
+	{
+	  vui->vrings[qid].started = 1;
+	  vhost_user_thread_placement (vui, qid);
+	  vhost_user_update_iface_state (vui);
+	}
     }
 
   vhost_user_set_interrupt_pending (vui, uf->private_data);
@@ -311,6 +284,7 @@
   vring->kickfd_idx = ~0;
   vring->callfd_idx = ~0;
   vring->errfd = -1;
+  vring->qid = -1;
 
   /*
    * We have a bug with some qemu 2.5, and this may be a fix.
@@ -329,6 +303,7 @@
 vhost_user_vring_close (vhost_user_intf_t * vui, u32 qid)
 {
   vhost_user_vring_t *vring = &vui->vrings[qid];
+
   if (vring->kickfd_idx != ~0)
     {
       clib_file_t *uf = pool_elt_at_index (file_main.file_pool,
@@ -348,7 +323,12 @@
       close (vring->errfd);
       vring->errfd = -1;
     }
+
+  // save the qid so that we don't need to unassign and assign_rx_thread
+  // when the interface comes back up. They are expensive calls.
+  u16 q = vui->vrings[qid].qid;
   vhost_user_vring_init (vui, qid);
+  vui->vrings[qid].qid = q;
 }
 
 static_always_inline void
@@ -377,7 +357,7 @@
 static clib_error_t *
 vhost_user_socket_read (clib_file_t * uf)
 {
-  int n, i;
+  int n, i, j;
   int fd, number_of_fds = 0;
   int fds[VHOST_MEMORY_MAX_NREGIONS];
   vhost_user_msg_t msg;
@@ -389,6 +369,7 @@
   u8 q;
   clib_file_t template = { 0 };
   vnet_main_t *vnm = vnet_get_main ();
+  vlib_main_t *vm = vlib_get_main ();
 
   vui = pool_elt_at_index (vum->vhost_user_interfaces, uf->private_data);
 
@@ -411,9 +392,6 @@
 
   n = recvmsg (uf->file_descriptor, &mh, 0);
 
-  /* Stop workers to avoid end of the world */
-  vlib_worker_thread_barrier_sync (vlib_get_main ());
-
   if (n != VHOST_USER_MSG_HDR_SZ)
     {
       if (n == -1)
@@ -488,6 +466,13 @@
       msg.size = sizeof (msg.u64);
       vu_log_debug (vui, "if %d msg VHOST_USER_GET_FEATURES - reply "
 		    "0x%016llx", vui->hw_if_index, msg.u64);
+      n =
+	send (uf->file_descriptor, &msg, VHOST_USER_MSG_HDR_SZ + msg.size, 0);
+      if (n != (msg.size + VHOST_USER_MSG_HDR_SZ))
+	{
+	  vu_log_debug (vui, "could not send message response");
+	  goto close_socket;
+	}
       break;
 
     case VHOST_USER_SET_FEATURES:
@@ -509,10 +494,6 @@
       ASSERT (vui->virtio_net_hdr_sz < VLIB_BUFFER_PRE_DATA_SIZE);
       vnet_hw_interface_set_flags (vnm, vui->hw_if_index, 0);
       vui->is_ready = 0;
-
-      /*for (q = 0; q < VHOST_VRING_MAX_N; q++)
-         vhost_user_vring_close(&vui->vrings[q]); */
-
       break;
 
     case VHOST_USER_SET_MEM_TABLE:
@@ -522,10 +503,8 @@
       if ((msg.memory.nregions < 1) ||
 	  (msg.memory.nregions > VHOST_MEMORY_MAX_NREGIONS))
 	{
-
 	  vu_log_debug (vui, "number of mem regions must be between 1 and %i",
 			VHOST_MEMORY_MAX_NREGIONS);
-
 	  goto close_socket;
 	}
 
@@ -534,39 +513,50 @@
 	  vu_log_debug (vui, "each memory region must have FD");
 	  goto close_socket;
 	}
+
+      /* Do the mmap without barrier sync */
+      void *region_mmap_addr[VHOST_MEMORY_MAX_NREGIONS];
+      for (i = 0; i < msg.memory.nregions; i++)
+	{
+	  long page_sz = get_huge_page_size (fds[i]);
+
+	  /* align size to page */
+	  ssize_t map_sz = (msg.memory.regions[i].memory_size +
+			    msg.memory.regions[i].mmap_offset +
+			    page_sz - 1) & ~(page_sz - 1);
+
+	  region_mmap_addr[i] = mmap (0, map_sz, PROT_READ | PROT_WRITE,
+				      MAP_SHARED, fds[i], 0);
+	  if (region_mmap_addr[i] == MAP_FAILED)
+	    {
+	      vu_log_err (vui, "failed to map memory. errno is %d", errno);
+	      for (j = 0; j < i; j++)
+		munmap (region_mmap_addr[j], map_sz);
+	      goto close_socket;
+	    }
+	  vu_log_debug (vui, "map memory region %d addr 0 len 0x%lx fd %d "
+			"mapped 0x%lx page_sz 0x%x", i, map_sz, fds[i],
+			region_mmap_addr[i], page_sz);
+	}
+
+      vlib_worker_thread_barrier_sync (vm);
       unmap_all_mem_regions (vui);
       for (i = 0; i < msg.memory.nregions; i++)
 	{
 	  clib_memcpy_fast (&(vui->regions[i]), &msg.memory.regions[i],
 			    sizeof (vhost_user_memory_region_t));
 
-	  long page_sz = get_huge_page_size (fds[i]);
-
-	  /* align size to page */
-	  ssize_t map_sz = (vui->regions[i].memory_size +
-			    vui->regions[i].mmap_offset +
-			    page_sz - 1) & ~(page_sz - 1);
-
-	  vui->region_mmap_addr[i] = mmap (0, map_sz, PROT_READ | PROT_WRITE,
-					   MAP_SHARED, fds[i], 0);
+	  vui->region_mmap_addr[i] = region_mmap_addr[i];
 	  vui->region_guest_addr_lo[i] = vui->regions[i].guest_phys_addr;
 	  vui->region_guest_addr_hi[i] = vui->regions[i].guest_phys_addr +
 	    vui->regions[i].memory_size;
 
-	  vu_log_debug (vui, "map memory region %d addr 0 len 0x%lx fd %d "
-			"mapped 0x%lx page_sz 0x%x", i, map_sz, fds[i],
-			vui->region_mmap_addr[i], page_sz);
-
-	  if (vui->region_mmap_addr[i] == MAP_FAILED)
-	    {
-	      vu_log_err (vui, "failed to map memory. errno is %d", errno);
-	      goto close_socket;
-	    }
 	  vui->region_mmap_addr[i] += vui->regions[i].mmap_offset;
 	  vui->region_mmap_fd[i] = fds[i];
 
 	  vui->nregions++;
 	}
+      vlib_worker_thread_barrier_release (vm);
       break;
 
     case VHOST_USER_SET_VRING_NUM:
@@ -598,22 +588,22 @@
 	  goto close_socket;
 	}
 
-      vui->vrings[msg.state.index].desc = (vring_desc_t *)
-	map_user_mem (vui, msg.addr.desc_user_addr);
-      vui->vrings[msg.state.index].used = (vring_used_t *)
-	map_user_mem (vui, msg.addr.used_user_addr);
-      vui->vrings[msg.state.index].avail = (vring_avail_t *)
-	map_user_mem (vui, msg.addr.avail_user_addr);
+      vring_desc_t *desc = map_user_mem (vui, msg.addr.desc_user_addr);
+      vring_used_t *used = map_user_mem (vui, msg.addr.used_user_addr);
+      vring_avail_t *avail = map_user_mem (vui, msg.addr.avail_user_addr);
 
-      if ((vui->vrings[msg.state.index].desc == NULL) ||
-	  (vui->vrings[msg.state.index].used == NULL) ||
-	  (vui->vrings[msg.state.index].avail == NULL))
+      if ((desc == NULL) || (used == NULL) || (avail == NULL))
 	{
 	  vu_log_debug (vui, "failed to map user memory for hw_if_index %d",
 			vui->hw_if_index);
 	  goto close_socket;
 	}
 
+      vlib_worker_thread_barrier_sync (vm);
+      vui->vrings[msg.state.index].desc = desc;
+      vui->vrings[msg.state.index].used = used;
+      vui->vrings[msg.state.index].avail = avail;
+
       vui->vrings[msg.state.index].log_guest_addr = msg.addr.log_guest_addr;
       vui->vrings[msg.state.index].log_used =
 	(msg.addr.flags & (1 << VHOST_VRING_F_LOG)) ? 1 : 0;
@@ -621,9 +611,7 @@
       /* Spec says: If VHOST_USER_F_PROTOCOL_FEATURES has not been negotiated,
          the ring is initialized in an enabled state. */
       if (!(vui->features & (1 << FEAT_VHOST_USER_F_PROTOCOL_FEATURES)))
-	{
-	  vui->vrings[msg.state.index].enabled = 1;
-	}
+	vui->vrings[msg.state.index].enabled = 1;
 
       vui->vrings[msg.state.index].last_used_idx =
 	vui->vrings[msg.state.index].last_avail_idx =
@@ -631,6 +619,8 @@
 
       /* tell driver that we don't want interrupts */
       vui->vrings[msg.state.index].used->flags = VRING_USED_F_NO_NOTIFY;
+      vlib_worker_thread_barrier_release (vm);
+      vhost_user_update_iface_state (vui);
       break;
 
     case VHOST_USER_SET_OWNER:
@@ -709,8 +699,9 @@
 	  //When no kickfd is set, the queue is initialized as started
 	  vui->vrings[q].kickfd_idx = ~0;
 	  vui->vrings[q].started = 1;
+	  vhost_user_thread_placement (vui, q);
 	}
-
+      vhost_user_update_iface_state (vui);
       break;
 
     case VHOST_USER_SET_VRING_ERR:
@@ -731,14 +722,14 @@
 	}
       else
 	vui->vrings[q].errfd = -1;
-
       break;
 
     case VHOST_USER_SET_VRING_BASE:
       vu_log_debug (vui, "if %d msg VHOST_USER_SET_VRING_BASE idx %d num %d",
 		    vui->hw_if_index, msg.state.index, msg.state.num);
-
+      vlib_worker_thread_barrier_sync (vm);
       vui->vrings[msg.state.index].last_avail_idx = msg.state.num;
+      vlib_worker_thread_barrier_release (vm);
       break;
 
     case VHOST_USER_GET_VRING_BASE:
@@ -749,6 +740,8 @@
 	  goto close_socket;
 	}
 
+      /* protection is needed to prevent rx/tx from changing last_avail_idx */
+      vlib_worker_thread_barrier_sync (vm);
       /*
        * Copy last_avail_idx from the vring before closing it because
        * closing the vring also initializes the vring last_avail_idx
@@ -757,68 +750,84 @@
       msg.flags |= 4;
       msg.size = sizeof (msg.state);
 
-      /* Spec says: Client must [...] stop ring upon receiving VHOST_USER_GET_VRING_BASE. */
+      /*
+       * Spec says: Client must [...] stop ring upon receiving
+       * VHOST_USER_GET_VRING_BASE
+       */
       vhost_user_vring_close (vui, msg.state.index);
+      vlib_worker_thread_barrier_release (vm);
       vu_log_debug (vui, "if %d msg VHOST_USER_GET_VRING_BASE idx %d num %d",
 		    vui->hw_if_index, msg.state.index, msg.state.num);
+      n =
+	send (uf->file_descriptor, &msg, VHOST_USER_MSG_HDR_SZ + msg.size, 0);
+      if (n != (msg.size + VHOST_USER_MSG_HDR_SZ))
+	{
+	  vu_log_debug (vui, "could not send message response");
+	  goto close_socket;
+	}
+      vhost_user_update_iface_state (vui);
       break;
 
     case VHOST_USER_NONE:
       vu_log_debug (vui, "if %d msg VHOST_USER_NONE", vui->hw_if_index);
-
       break;
 
     case VHOST_USER_SET_LOG_BASE:
-      {
-	vu_log_debug (vui, "if %d msg VHOST_USER_SET_LOG_BASE",
-		      vui->hw_if_index);
+      vu_log_debug (vui, "if %d msg VHOST_USER_SET_LOG_BASE",
+		    vui->hw_if_index);
 
-	if (msg.size != sizeof (msg.log))
-	  {
-	    vu_log_debug (vui, "invalid msg size for VHOST_USER_SET_LOG_BASE:"
-			  " %d instead of %d", msg.size, sizeof (msg.log));
-	    goto close_socket;
-	  }
+      if (msg.size != sizeof (msg.log))
+	{
+	  vu_log_debug (vui, "invalid msg size for VHOST_USER_SET_LOG_BASE:"
+			" %d instead of %d", msg.size, sizeof (msg.log));
+	  goto close_socket;
+	}
 
-	if (!
-	    (vui->protocol_features & (1 << VHOST_USER_PROTOCOL_F_LOG_SHMFD)))
-	  {
-	    vu_log_debug (vui, "VHOST_USER_PROTOCOL_F_LOG_SHMFD not set but "
-			  "VHOST_USER_SET_LOG_BASE received");
-	    goto close_socket;
-	  }
+      if (!(vui->protocol_features & (1 << VHOST_USER_PROTOCOL_F_LOG_SHMFD)))
+	{
+	  vu_log_debug (vui, "VHOST_USER_PROTOCOL_F_LOG_SHMFD not set but "
+			"VHOST_USER_SET_LOG_BASE received");
+	  goto close_socket;
+	}
 
-	fd = fds[0];
-	/* align size to page */
-	long page_sz = get_huge_page_size (fd);
-	ssize_t map_sz =
-	  (msg.log.size + msg.log.offset + page_sz - 1) & ~(page_sz - 1);
+      fd = fds[0];
+      /* align size to page */
+      long page_sz = get_huge_page_size (fd);
+      ssize_t map_sz =
+	(msg.log.size + msg.log.offset + page_sz - 1) & ~(page_sz - 1);
 
-	vui->log_base_addr = mmap (0, map_sz, PROT_READ | PROT_WRITE,
-				   MAP_SHARED, fd, 0);
+      void *log_base_addr = mmap (0, map_sz, PROT_READ | PROT_WRITE,
+				  MAP_SHARED, fd, 0);
 
-	vu_log_debug (vui, "map log region addr 0 len 0x%lx off 0x%lx fd %d "
-		      "mapped 0x%lx", map_sz, msg.log.offset, fd,
-		      vui->log_base_addr);
+      vu_log_debug (vui, "map log region addr 0 len 0x%lx off 0x%lx fd %d "
+		    "mapped 0x%lx", map_sz, msg.log.offset, fd,
+		    log_base_addr);
 
-	if (vui->log_base_addr == MAP_FAILED)
-	  {
-	    vu_log_err (vui, "failed to map memory. errno is %d", errno);
-	    goto close_socket;
-	  }
+      if (log_base_addr == MAP_FAILED)
+	{
+	  vu_log_err (vui, "failed to map memory. errno is %d", errno);
+	  goto close_socket;
+	}
 
-	vui->log_base_addr += msg.log.offset;
-	vui->log_size = msg.log.size;
+      vlib_worker_thread_barrier_sync (vm);
+      vui->log_base_addr = log_base_addr;
+      vui->log_base_addr += msg.log.offset;
+      vui->log_size = msg.log.size;
+      vlib_worker_thread_barrier_release (vm);
 
-	msg.flags |= 4;
-	msg.size = sizeof (msg.u64);
-
-	break;
-      }
+      msg.flags |= 4;
+      msg.size = sizeof (msg.u64);
+      n =
+	send (uf->file_descriptor, &msg, VHOST_USER_MSG_HDR_SZ + msg.size, 0);
+      if (n != (msg.size + VHOST_USER_MSG_HDR_SZ))
+	{
+	  vu_log_debug (vui, "could not send message response");
+	  goto close_socket;
+	}
+      break;
 
     case VHOST_USER_SET_LOG_FD:
       vu_log_debug (vui, "if %d msg VHOST_USER_SET_LOG_FD", vui->hw_if_index);
-
       break;
 
     case VHOST_USER_GET_PROTOCOL_FEATURES:
@@ -828,14 +837,19 @@
       msg.size = sizeof (msg.u64);
       vu_log_debug (vui, "if %d msg VHOST_USER_GET_PROTOCOL_FEATURES - "
 		    "reply 0x%016llx", vui->hw_if_index, msg.u64);
+      n =
+	send (uf->file_descriptor, &msg, VHOST_USER_MSG_HDR_SZ + msg.size, 0);
+      if (n != (msg.size + VHOST_USER_MSG_HDR_SZ))
+	{
+	  vu_log_debug (vui, "could not send message response");
+	  goto close_socket;
+	}
       break;
 
     case VHOST_USER_SET_PROTOCOL_FEATURES:
       vu_log_debug (vui, "if %d msg VHOST_USER_SET_PROTOCOL_FEATURES "
 		    "features 0x%016llx", vui->hw_if_index, msg.u64);
-
       vui->protocol_features = msg.u64;
-
       break;
 
     case VHOST_USER_GET_QUEUE_NUM:
@@ -844,6 +858,13 @@
       msg.size = sizeof (msg.u64);
       vu_log_debug (vui, "if %d msg VHOST_USER_GET_QUEUE_NUM - reply %d",
 		    vui->hw_if_index, msg.u64);
+      n =
+	send (uf->file_descriptor, &msg, VHOST_USER_MSG_HDR_SZ + msg.size, 0);
+      if (n != (msg.size + VHOST_USER_MSG_HDR_SZ))
+	{
+	  vu_log_debug (vui, "could not send message response");
+	  goto close_socket;
+	}
       break;
 
     case VHOST_USER_SET_VRING_ENABLE:
@@ -858,6 +879,8 @@
 	}
 
       vui->vrings[msg.state.index].enabled = msg.state.num;
+      vhost_user_thread_placement (vui, msg.state.index);
+      vhost_user_update_iface_state (vui);
       break;
 
     default:
@@ -866,26 +889,13 @@
       goto close_socket;
     }
 
-  /* if we need to reply */
-  if (msg.flags & 4)
-    {
-      n =
-	send (uf->file_descriptor, &msg, VHOST_USER_MSG_HDR_SZ + msg.size, 0);
-      if (n != (msg.size + VHOST_USER_MSG_HDR_SZ))
-	{
-	  vu_log_debug (vui, "could not send message response");
-	  goto close_socket;
-	}
-    }
-
-  vhost_user_update_iface_state (vui);
-  vlib_worker_thread_barrier_release (vlib_get_main ());
   return 0;
 
 close_socket:
+  vlib_worker_thread_barrier_sync (vm);
   vhost_user_if_disconnect (vui);
+  vlib_worker_thread_barrier_release (vm);
   vhost_user_update_iface_state (vui);
-  vlib_worker_thread_barrier_release (vlib_get_main ());
   return 0;
 }
 
@@ -900,7 +910,6 @@
   vu_log_debug (vui, "socket error on if %d", vui->sw_if_index);
   vlib_worker_thread_barrier_sync (vm);
   vhost_user_if_disconnect (vui);
-  vhost_user_rx_thread_placement ();
   vlib_worker_thread_barrier_release (vm);
   return 0;
 }
@@ -984,7 +993,7 @@
   f64 timeout = 3153600000.0 /* 100 years */ ;
   uword event_type, *event_data = 0;
   vhost_user_main_t *vum = &vhost_user_main;
-  u16 *queue;
+  u16 qid;
   f64 now, poll_time_remaining;
   f64 next_timeout;
   u8 stop_timer = 0;
@@ -1022,13 +1031,13 @@
 	  /* *INDENT-OFF* */
 	  pool_foreach (vui, vum->vhost_user_interfaces, {
 	      next_timeout = timeout;
-	      vec_foreach (queue, vui->rx_queues)
+	      for (qid = 0; qid < VHOST_VRING_MAX_N / 2; qid += 2)
 		{
-		  vhost_user_vring_t *rxvq =
-		    &vui->vrings[VHOST_VRING_IDX_RX (*queue)];
-		  vhost_user_vring_t *txvq =
-		    &vui->vrings[VHOST_VRING_IDX_TX (*queue)];
+		  vhost_user_vring_t *rxvq = &vui->vrings[qid];
+		  vhost_user_vring_t *txvq = &vui->vrings[qid + 1];
 
+		  if (txvq->qid == -1)
+		    continue;
 		  if (txvq->n_since_last_int)
 		    {
 		      if (now >= txvq->int_deadline)
@@ -1196,6 +1205,24 @@
 
   for (q = 0; q < VHOST_VRING_MAX_N; q++)
     {
+      // Remove existing queue mapping for the interface
+      if (q & 1)
+	{
+	  int rv;
+	  vnet_main_t *vnm = vnet_get_main ();
+	  vhost_user_vring_t *txvq = &vui->vrings[q];
+
+	  if (txvq->qid != -1)
+	    {
+	      rv = vnet_hw_interface_unassign_rx_thread (vnm,
+							 vui->hw_if_index,
+							 q >> 1);
+	      if (rv)
+		vu_log_warn (vui, "unable to unassign interface %d, "
+			     "queue %d: rc=%d", vui->hw_if_index, q >> 1, rv);
+	    }
+	}
+
       clib_mem_free ((void *) vui->vring_locks[q]);
     }
 
@@ -1220,7 +1247,7 @@
   vhost_user_intf_t *vui;
   int rv = 0;
   vnet_hw_interface_t *hwif;
-  u16 *queue;
+  u16 qid;
 
   if (!(hwif = vnet_get_sup_hw_interface (vnm, sw_if_index)) ||
       hwif->dev_class_index != vhost_user_device_class.index)
@@ -1231,27 +1258,28 @@
   vu_log_debug (vui, "Deleting vhost-user interface %s (instance %d)",
 		hwif->name, hwif->dev_instance);
 
-  vec_foreach (queue, vui->rx_queues)
-  {
-    vhost_user_vring_t *txvq;
+  for (qid = 1; qid < VHOST_VRING_MAX_N / 2; qid += 2)
+    {
+      vhost_user_vring_t *txvq = &vui->vrings[qid];
 
-    txvq = &vui->vrings[VHOST_VRING_IDX_TX (*queue)];
-    if ((vum->ifq_count > 0) &&
-	((txvq->mode == VNET_HW_INTERFACE_RX_MODE_INTERRUPT) ||
-	 (txvq->mode == VNET_HW_INTERFACE_RX_MODE_ADAPTIVE)))
-      {
-	vum->ifq_count--;
-	// Stop the timer if there is no more interrupt interface/queue
-	if ((vum->ifq_count == 0) &&
-	    (vum->coalesce_time > 0.0) && (vum->coalesce_frames > 0))
-	  {
-	    vlib_process_signal_event (vm,
-				       vhost_user_send_interrupt_node.index,
-				       VHOST_USER_EVENT_STOP_TIMER, 0);
-	    break;
-	  }
-      }
-  }
+      if (txvq->qid == -1)
+	continue;
+      if ((vum->ifq_count > 0) &&
+	  ((txvq->mode == VNET_HW_INTERFACE_RX_MODE_INTERRUPT) ||
+	   (txvq->mode == VNET_HW_INTERFACE_RX_MODE_ADAPTIVE)))
+	{
+	  vum->ifq_count--;
+	  // Stop the timer if there is no more interrupt interface/queue
+	  if ((vum->ifq_count == 0) &&
+	      (vum->coalesce_time > 0.0) && (vum->coalesce_frames > 0))
+	    {
+	      vlib_process_signal_event (vm,
+					 vhost_user_send_interrupt_node.index,
+					 VHOST_USER_EVENT_STOP_TIMER, 0);
+	      break;
+	    }
+	}
+    }
 
   // Disable and reset interface
   vhost_user_term_if (vui);
@@ -1467,12 +1495,16 @@
 	}
     }
 
+  /* Protect the uninitialized vui from being dispatched by rx/tx */
+  vlib_worker_thread_barrier_sync (vm);
   pool_get (vhost_user_main.vhost_user_interfaces, vui);
-
   vhost_user_create_ethernet (vnm, vm, vui, hwaddr);
+  vlib_worker_thread_barrier_release (vm);
+
   vhost_user_vui_init (vnm, vui, server_sock_fd, sock_filename,
 		       feature_mask, &sw_if_idx);
   vnet_sw_interface_set_mtu (vnm, vui->sw_if_index, 9000);
+  vhost_user_rx_thread_placement (vui, 1);
 
   if (renumber)
     vnet_interface_name_renumber (sw_if_idx, custom_dev_instance);
@@ -1711,7 +1743,7 @@
   vhost_user_intf_t *vui;
   u32 hw_if_index, *hw_if_indices = 0;
   vnet_hw_interface_t *hi;
-  u16 *queue;
+  u16 qid;
   u32 ci;
   int i, j, q;
   int show_descr = 0;
@@ -1818,20 +1850,24 @@
 
       vlib_cli_output (vm, " rx placement: ");
 
-      vec_foreach (queue, vui->rx_queues)
-      {
-	vnet_main_t *vnm = vnet_get_main ();
-	uword thread_index;
-	vnet_hw_interface_rx_mode mode;
+      for (qid = 1; qid < VHOST_VRING_MAX_N / 2; qid += 2)
+	{
+	  vnet_main_t *vnm = vnet_get_main ();
+	  uword thread_index;
+	  vnet_hw_interface_rx_mode mode;
+	  vhost_user_vring_t *txvq = &vui->vrings[qid];
 
-	thread_index = vnet_get_device_input_thread_index (vnm,
-							   vui->hw_if_index,
-							   *queue);
-	vnet_hw_interface_get_rx_mode (vnm, vui->hw_if_index, *queue, &mode);
-	vlib_cli_output (vm, "   thread %d on vring %d, %U\n",
-			 thread_index, VHOST_VRING_IDX_TX (*queue),
-			 format_vnet_hw_interface_rx_mode, mode);
-      }
+	  if (txvq->qid == -1)
+	    continue;
+	  thread_index =
+	    vnet_get_device_input_thread_index (vnm, vui->hw_if_index,
+						qid >> 1);
+	  vnet_hw_interface_get_rx_mode (vnm, vui->hw_if_index, qid >> 1,
+					 &mode);
+	  vlib_cli_output (vm, "   thread %d on vring %d, %U\n",
+			   thread_index, qid,
+			   format_vnet_hw_interface_rx_mode, mode);
+	}
 
       vlib_cli_output (vm, " tx placement: %s\n",
 		       vui->use_tx_spinlock ? "spin-lock" : "lock-free");
@@ -1986,6 +2022,7 @@
     .short_help = "create vhost-user socket <socket-filename> [server] "
     "[feature-mask <hex>] [hwaddr <mac-addr>] [renumber <dev_instance>] ",
     .function = vhost_user_connect_command_fn,
+    .is_mp_safe = 1,
 };
 /* *INDENT-ON* */
 
diff --git a/src/vnet/devices/virtio/vhost_user.h b/src/vnet/devices/virtio/vhost_user.h
index f2ed2df..7dadfed 100644
--- a/src/vnet/devices/virtio/vhost_user.h
+++ b/src/vnet/devices/virtio/vhost_user.h
@@ -250,6 +250,14 @@
 
   /* The rx queue policy (interrupt/adaptive/polling) for this queue */
   u32 mode;
+
+  /*
+   * It contains the device queue number. -1 if it does not. The idea is
+   * to not invoke vnet_hw_interface_assign_rx_thread and
+   * vnet_hw_interface_unassign_rx_thread more than once for the duration of
+   * the interface even if it is disconnected and reconnected.
+   */
+  i16 qid;
 } vhost_user_vring_t;
 
 #define VHOST_USER_EVENT_START_TIMER 1
@@ -293,9 +301,6 @@
   /* Whether to use spinlock or per_cpu_tx_qid assignment */
   u8 use_tx_spinlock;
   u16 *per_cpu_tx_qid;
-
-  /* Vector of active rx queues for this interface */
-  u16 *rx_queues;
 } vhost_user_intf_t;
 
 typedef struct
diff --git a/src/vnet/devices/virtio/vhost_user_api.c b/src/vnet/devices/virtio/vhost_user_api.c
index b8d89a0..acfb3e0 100644
--- a/src/vnet/devices/virtio/vhost_user_api.c
+++ b/src/vnet/devices/virtio/vhost_user_api.c
@@ -244,6 +244,9 @@
   foreach_vpe_api_msg;
 #undef _
 
+  /* Mark CREATE_VHOST_USER_IF as mp safe */
+  am->is_mp_safe[VL_API_CREATE_VHOST_USER_IF] = 1;
+
   /*
    * Set up the (msg_name, crc, message-id) table
    */