memif: multi-queues support

- Add rx-queues and tx-queues options to the "create memif" CLI (slave only).
- Add vlib_worker_thread_barrier_sync () to memif_conn_fd_read_ready (),
  because the latter may disconnect the ring and clean up the shared memory.
- On transmit, write the rid (queue number) to the socket.
- On receive, read the rid and trigger the interrupt for the corresponding
thread.

Change-Id: If1c7e26c7124174678f047909cbc33e931eaac8c
Signed-off-by: Steven <sluong@cisco.com>
diff --git a/src/plugins/memif/cli.c b/src/plugins/memif/cli.c
index ef73693..88c09e9 100644
--- a/src/plugins/memif/cli.c
+++ b/src/plugins/memif/cli.c
@@ -25,6 +25,20 @@
 
 #include <memif/memif.h>
 
+/* %U helper: parse optional "rx-queues <n>" and "tx-queues <n>", in any
+   order, into the two u32 * varargs.  Always succeeds (both optional). */
+static uword
+unformat_memif_queues (unformat_input_t * input, va_list * args)
+{
+  u32 *rx_queues = va_arg (*args, u32 *);
+  u32 *tx_queues = va_arg (*args, u32 *);
+
+  while (unformat (input, "rx-queues %u", rx_queues)
+	 || unformat (input, "tx-queues %u", tx_queues))
+    ;				/* keep consuming queue options */
+  return 1;
+}
+
 static clib_error_t *
 memif_create_command_fn (vlib_main_t * vm, unformat_input_t * input,
 			 vlib_cli_command_t * cmd)
@@ -34,6 +48,8 @@
   u32 ring_size = MEMIF_DEFAULT_RING_SIZE;
   memif_create_if_args_t args = { 0 };
   args.buffer_size = MEMIF_DEFAULT_BUFFER_SIZE;
+  u32 rx_queues = MEMIF_DEFAULT_RX_QUEUES;
+  u32 tx_queues = MEMIF_DEFAULT_TX_QUEUES;
 
   /* Get a line of input. */
   if (!unformat_user (input, unformat_line_input, line_input))
@@ -51,7 +67,8 @@
 	;
       else if (unformat (line_input, "master"))
 	args.is_master = 1;
-      else if (unformat (line_input, "slave"))
+      else if (unformat (line_input, "slave %U",
+			 unformat_memif_queues, &rx_queues, &tx_queues))
 	args.is_master = 0;
       else if (unformat (line_input, "hw-addr %U",
 			 unformat_ethernet_address, args.hw_addr))
@@ -67,6 +84,14 @@
 
   args.log2_ring_size = min_log2 (ring_size);
 
+  if (rx_queues > 255 || rx_queues < 1)
+    return clib_error_return (0, "rx queue must be between 1 - 255");
+  if (tx_queues > 255 || tx_queues < 1)
+    return clib_error_return (0, "tx queue must be between 1 - 255");
+
+  args.rx_queues = rx_queues;
+  args.tx_queues = tx_queues;
+
   r = memif_create_if (vm, &args);
 
   if (r <= VNET_API_ERROR_SYSCALL_ERROR_1
@@ -87,7 +112,7 @@
   .path = "create memif",
   .short_help = "create memif [key <key>] [socket <path>] "
                 "[ring-size <size>] [buffer-size <size>] [hw-addr <mac-address>] "
-		"<master|slave>",
+		"<master|slave [rx-queues <number>] [tx-queues <number>]>",
   .function = memif_create_command_fn,
 };
 /* *INDENT-ON* */
@@ -148,7 +173,7 @@
 			mif->socket_filename);
        vlib_cli_output (vm, "  listener %d conn-fd %d int-fd %d", mif->listener_index,
 			mif->connection.fd, mif->interrupt_line.fd);
-       vlib_cli_output (vm, "  ring-size %u num-c2s-rings %u num-s2c-rings %u buffer_size %u",
+       vlib_cli_output (vm, "  ring-size %u num-s2m-rings %u num-m2s-rings %u buffer_size %u",
 			(1 << mif->log2_ring_size),
 			mif->num_s2m_rings,
 			mif->num_m2s_rings,
diff --git a/src/plugins/memif/device.c b/src/plugins/memif/device.c
index 70bdb48..f496b17 100644
--- a/src/plugins/memif/device.c
+++ b/src/plugins/memif/device.c
@@ -91,16 +91,27 @@
 			   vlib_frame_t * frame, memif_if_t * mif,
 			   memif_ring_type_t type)
 {
-  u8 rid = 0;
-  memif_ring_t *ring = memif_get_ring (mif, type, rid);
+  u8 rid;
+  memif_ring_t *ring;
   u32 *buffers = vlib_frame_args (frame);
   u32 n_left = frame->n_vectors;
   u16 ring_size = 1 << mif->log2_ring_size;
   u16 mask = ring_size - 1;
   u16 head, tail;
   u16 free_slots;
+  u32 thread_index = vlib_get_thread_index ();
+  u8 tx_queues = memif_get_tx_queues (mif);
 
-  clib_spinlock_lock_if_init (&mif->lockp);
+  if (tx_queues < vec_len (vlib_mains))
+    {
+      rid = thread_index % tx_queues;
+      clib_spinlock_lock_if_init (&mif->lockp);
+    }
+  else
+    {
+      rid = thread_index;
+    }
+  ring = memif_get_ring (mif, type, rid);
 
   /* free consumed buffers */
 
diff --git a/src/plugins/memif/memif.api b/src/plugins/memif/memif.api
index 95e016c..b0a351a 100644
--- a/src/plugins/memif/memif.api
+++ b/src/plugins/memif/memif.api
@@ -17,6 +17,8 @@
     @param client_index - opaque cookie to identify the sender
     @param context - sender context, to match reply w/ request
     @param role - role of the interface in the connection (master/slave)
+    @param rx_queues - number of rx queues (only valid for slave)
+    @param tx_queues - number of tx queues (only valid for slave)
     @param key - 64bit integer used to authenticate and match opposite sides
            of the connection
     @param socket_filename - filename of the socket to be used for connection
@@ -31,6 +33,8 @@
   u32 context;
 
   u8 role; /* 0 = master, 1 = slave */
+  u8 rx_queues; /* optional, default is 1 */
+  u8 tx_queues; /* optional, default is 1 */
   u64 key; /* optional, default is 0 */
   u8 socket_filename[128]; /* optional, default is "/var/vpp/memif.sock"
                               and can be changed in VPP startup config */
diff --git a/src/plugins/memif/memif.c b/src/plugins/memif/memif.c
index 44c5012..a671bda 100644
--- a/src/plugins/memif/memif.c
+++ b/src/plugins/memif/memif.c
@@ -72,6 +72,9 @@
   vnet_main_t *vnm = vnet_get_main ();
   int num_rings = mif->num_s2m_rings + mif->num_m2s_rings;
   memif_ring_data_t *rd = NULL;
+  vnet_hw_interface_t *hw;
+  u8 rid, rx_queues;
+  int ret;
 
   vec_validate_aligned (mif->ring_data, num_rings - 1, CLIB_CACHE_LINE_BYTES);
   vec_foreach (rd, mif->ring_data)
@@ -83,12 +86,30 @@
   mif->flags |= MEMIF_IF_FLAG_CONNECTED;
   vnet_hw_interface_set_flags (vnm, mif->hw_if_index,
 			       VNET_HW_INTERFACE_FLAG_LINK_UP);
+
+  hw = vnet_get_hw_interface (vnm, mif->hw_if_index);
+  hw->flags |= VNET_HW_INTERFACE_FLAG_SUPPORTS_INT_MODE;
+  vnet_hw_interface_set_input_node (vnm, mif->hw_if_index,
+				    memif_input_node.index);
+  rx_queues = memif_get_rx_queues (mif);
+  for (rid = 0; rid < rx_queues; rid++)
+    {
+      vnet_hw_interface_assign_rx_thread (vnm, mif->hw_if_index, rid, ~0);
+      ret = vnet_hw_interface_set_rx_mode (vnm, mif->hw_if_index, rid,
+					   VNET_HW_INTERFACE_RX_MODE_INTERRUPT);
+      if (ret)
+	DEBUG_LOG ("Warning: unable to set rx mode for interface %d "
+		   "queue %d: rc=%d", mif->hw_if_index, rid, ret);
+    }
 }
 
 static void
 memif_disconnect_do (vlib_main_t * vm, memif_if_t * mif)
 {
   vnet_main_t *vnm = vnet_get_main ();
+  u8 rid, rx_queues;
+  int rv;
+  memif_shm_t **shm;
 
   mif->flags &= ~(MEMIF_IF_FLAG_CONNECTED | MEMIF_IF_FLAG_CONNECTING);
   if (mif->hw_if_index != ~0)
@@ -101,7 +122,20 @@
       mif->connection.fd = -1;	/* closed in unix_file_del */
     }
 
-  // TODO: properly munmap + close memif-owned shared memory segments
+  rx_queues = memif_get_rx_queues (mif);
+  for (rid = 0; rid < rx_queues; rid++)
+    {
+      rv = vnet_hw_interface_unassign_rx_thread (vnm, mif->hw_if_index, rid);
+      if (rv)
+	DEBUG_LOG ("Warning: unable to unassign interface %d, "
+		   "queue %d: rc=%d", mif->hw_if_index, rid, rv);
+    }
+
+  shm = (memif_shm_t **) mif->regions;
+  rv = munmap ((void *) *shm, mif->shared_mem_size);
+  if (rv)
+    DEBUG_UNIX_LOG ("Error: failed munmap call");
+
   vec_free (mif->regions);
 }
 
@@ -228,6 +262,7 @@
       goto response;
     }
 
+  mif->shared_mem_size = req->shared_mem_size;
   mif->log2_ring_size = req->log2_ring_size;
   mif->num_s2m_rings = req->num_s2m_rings;
   mif->num_m2s_rings = req->num_m2s_rings;
@@ -332,6 +367,9 @@
   else
     mif = vec_elt_at_index (mm->interfaces, uf->private_data >> 1);
 
+  /* Stop workers: this handler may disconnect the ring and unmap its memory */
+  vlib_worker_thread_barrier_sync (vlib_get_main ());
+
   /* receive the incoming message */
   size = recvmsg (uf->file_descriptor, &mh, 0);
   if (size != sizeof (memif_msg_t))
@@ -342,7 +380,7 @@
 	    memif_remove_pending_conn (pending_conn);
 	  else
 	    memif_disconnect_do (vm, mif);
-	  return error;
+	  goto return_ok;
 	}
 
       DEBUG_UNIX_LOG ("Malformed message received on fd %d",
@@ -364,38 +402,36 @@
     {
     case MEMIF_MSG_TYPE_CONNECT_REQ:
       if (pending_conn == 0)
+	DEBUG_LOG ("Received unexpected connection request");
+      else
 	{
-	  DEBUG_LOG ("Received unexpected connection request");
-	  return 0;
-	}
-
-      /* Read anciliary data */
-      cmsg = CMSG_FIRSTHDR (&mh);
-      while (cmsg)
-	{
-	  if (cmsg->cmsg_level == SOL_SOCKET
-	      && cmsg->cmsg_type == SCM_CREDENTIALS)
+	  /* Read ancillary data */
+	  cmsg = CMSG_FIRSTHDR (&mh);
+	  while (cmsg)
 	    {
-	      cr = (struct ucred *) CMSG_DATA (cmsg);
+	      if (cmsg->cmsg_level == SOL_SOCKET
+		  && cmsg->cmsg_type == SCM_CREDENTIALS)
+		{
+		  cr = (struct ucred *) CMSG_DATA (cmsg);
+		}
+	      else if (cmsg->cmsg_level == SOL_SOCKET
+		       && cmsg->cmsg_type == SCM_RIGHTS)
+		{
+		  memcpy (fd_array, CMSG_DATA (cmsg), sizeof (fd_array));
+		}
+	      cmsg = CMSG_NXTHDR (&mh, cmsg);
 	    }
-	  else if (cmsg->cmsg_level == SOL_SOCKET
-		   && cmsg->cmsg_type == SCM_RIGHTS)
-	    {
-	      memcpy (fd_array, CMSG_DATA (cmsg), sizeof (fd_array));
-	    }
-	  cmsg = CMSG_NXTHDR (&mh, cmsg);
+	  error = memif_process_connect_req (pending_conn, &msg, cr,
+					     fd_array[0], fd_array[1]);
 	}
-
-      return memif_process_connect_req (pending_conn, &msg, cr,
-					fd_array[0], fd_array[1]);
+      break;
 
     case MEMIF_MSG_TYPE_CONNECT_RESP:
       if (mif == 0)
-	{
-	  DEBUG_LOG ("Received unexpected connection response");
-	  return 0;
-	}
-      return memif_process_connect_resp (mif, &msg);
+	DEBUG_LOG ("Received unexpected connection response");
+      else
+	error = memif_process_connect_resp (mif, &msg);
+      break;
 
     case MEMIF_MSG_TYPE_DISCONNECT:
       goto disconnect;
@@ -405,13 +441,16 @@
       goto disconnect;
     }
 
-  return 0;
+return_ok:
+  vlib_worker_thread_barrier_release (vlib_get_main ());
+  return error;
 
 disconnect:
   if (pending_conn)
     memif_remove_pending_conn (pending_conn);
   else
     memif_disconnect (vm, mif);
+  vlib_worker_thread_barrier_release (vlib_get_main ());
   return error;
 }
 
@@ -434,7 +473,8 @@
       mif->interrupt_line.fd = -1;
     }
   else
-    vnet_device_input_set_interrupt_pending (vnm, mif->hw_if_index, 0);
+    vnet_device_input_set_interrupt_pending (vnm, mif->hw_if_index, b);
+
   return 0;
 }
 
@@ -530,6 +570,7 @@
       goto error;
     }
 
+  mif->shared_mem_size = msg.shared_mem_size;
   vec_add1 (mif->regions, shm);
   ((memif_shm_t *) mif->regions[0])->cookie = 0xdeadbeef;
 
@@ -803,7 +844,6 @@
   clib_error_t *error = 0;
   int ret = 0;
   uword *p;
-  vnet_hw_interface_t *hw;
 
   p = mhash_get (&mm->if_index_by_key, &args->key);
   if (p)
@@ -851,9 +891,8 @@
   mif->log2_ring_size = args->log2_ring_size;
   mif->buffer_size = args->buffer_size;
 
-  /* TODO: make configurable */
-  mif->num_s2m_rings = 1;
-  mif->num_m2s_rings = 1;
+  mif->num_s2m_rings = args->tx_queues;	/* slave transmits on s2m */
+  mif->num_m2s_rings = args->rx_queues;	/* slave receives on m2s */
 
   mhash_set_mem (&mm->if_index_by_key, &args->key, &mif->if_index, 0);
 
@@ -952,17 +991,6 @@
       mif->flags |= MEMIF_IF_FLAG_IS_SLAVE;
     }
 
-  hw = vnet_get_hw_interface (vnm, mif->hw_if_index);
-  hw->flags |= VNET_HW_INTERFACE_FLAG_SUPPORTS_INT_MODE;
-  vnet_hw_interface_set_input_node (vnm, mif->hw_if_index,
-				    memif_input_node.index);
-  vnet_hw_interface_assign_rx_thread (vnm, mif->hw_if_index, 0, ~0);
-  ret = vnet_hw_interface_set_rx_mode (vnm, mif->hw_if_index, 0,
-				       VNET_HW_INTERFACE_RX_MODE_INTERRUPT);
-  if (ret)
-    clib_warning ("Warning: unable to set rx mode for interface %d: "
-		  "rc=%d", mif->hw_if_index, ret);
-
 #if 0
   /* use configured or generate random MAC address */
   if (!args->hw_addr_set &&
@@ -995,32 +1023,26 @@
   memif_main_t *mm = &memif_main;
   memif_if_t *mif;
   uword *p;
-  int ret;
+  u32 hw_if_index;
 
   p = mhash_get (&mm->if_index_by_key, &key);
   if (p == NULL)
     {
-      clib_warning ("Memory interface with key 0x%" PRIx64 " does not exist",
-		    key);
+      DEBUG_LOG ("Memory interface with key 0x%" PRIx64 " does not exist",
+		 key);
       return VNET_API_ERROR_SYSCALL_ERROR_1;
     }
   mif = pool_elt_at_index (mm->interfaces, p[0]);
   mif->flags |= MEMIF_IF_FLAG_DELETING;
 
-  ret = vnet_hw_interface_unassign_rx_thread (vnm, mif->hw_if_index, 0);
-  if (ret)
-    clib_warning ("Warning: unable to unassign interface %d: rc=%d",
-		  mif->hw_if_index, ret);
-
   /* bring down the interface */
-  vnet_hw_interface_set_flags (vnm, mif->hw_if_index, 0);
   vnet_sw_interface_set_flags (vnm, mif->sw_if_index, 0);
 
-  /* remove the interface */
-  ethernet_delete_interface (vnm, mif->hw_if_index);
-  mif->hw_if_index = ~0;
+  hw_if_index = mif->hw_if_index;
   memif_close_if (mm, mif);
 
+  /* remove the interface */
+  ethernet_delete_interface (vnm, hw_if_index);
   if (pool_elts (mm->interfaces) == 0)
     {
       vlib_process_signal_event (vm, memif_process_node.index,
diff --git a/src/plugins/memif/memif.h b/src/plugins/memif/memif.h
index ea5b350..56028a2 100644
--- a/src/plugins/memif/memif.h
+++ b/src/plugins/memif/memif.h
@@ -33,7 +33,9 @@
   u8 log2_ring_size;
 #define MEMIF_DEFAULT_RING_SIZE 1024
   u16 num_s2m_rings;
+#define MEMIF_DEFAULT_RX_QUEUES 1
   u16 num_m2s_rings;
+#define MEMIF_DEFAULT_TX_QUEUES 1
   u16 buffer_size;
 #define MEMIF_DEFAULT_BUFFER_SIZE 2048
   u32 shared_mem_size;
@@ -126,6 +128,7 @@
   u8 num_s2m_rings;
   u8 num_m2s_rings;
   u16 buffer_size;
+  u32 shared_mem_size;
 
   memif_ring_data_t *ring_data;
 
@@ -189,6 +192,8 @@
   u16 buffer_size;
   u8 hw_addr_set;
   u8 hw_addr[6];
+  u8 rx_queues;
+  u8 tx_queues;
 
   /* return */
   u32 sw_if_index;
@@ -211,6 +216,32 @@
 #endif
 #endif
 
+/* Number of receive queues: the m2s ring count when the interface has
+   MEMIF_IF_FLAG_IS_SLAVE set, otherwise the s2m ring count. */
+static_always_inline u8
+memif_get_rx_queues (memif_if_t * mif)
+{
+  int is_slave = (mif->flags & MEMIF_IF_FLAG_IS_SLAVE) != 0;
+
+  if (is_slave)
+    return mif->num_m2s_rings;
+
+  return mif->num_s2m_rings;
+}
+
+/* Number of transmit queues: the s2m ring count when the interface has
+   MEMIF_IF_FLAG_IS_SLAVE set, otherwise the m2s ring count. */
+static_always_inline u8
+memif_get_tx_queues (memif_if_t * mif)
+{
+  int is_slave = (mif->flags & MEMIF_IF_FLAG_IS_SLAVE) != 0;
+
+  if (is_slave)
+    return mif->num_s2m_rings;
+
+  return mif->num_m2s_rings;
+}
+
 static inline int
 memfd_create (const char *name, unsigned int flags)
 {
diff --git a/src/plugins/memif/memif_api.c b/src/plugins/memif/memif_api.c
index 1ade317..1470f94 100644
--- a/src/plugins/memif/memif_api.c
+++ b/src/plugins/memif/memif_api.c
@@ -122,6 +122,11 @@
 
   /* role */
   args.is_master = (mp->role == 0);
+  if (args.is_master == 0)
+    {
+      args.rx_queues = mp->rx_queues;
+      args.tx_queues = mp->tx_queues;
+    }
 
   /* ring size */
   if (mp->ring_size)
diff --git a/src/plugins/memif/node.c b/src/plugins/memif/node.c
index 2690dc4..fd7baa3 100644
--- a/src/plugins/memif/node.c
+++ b/src/plugins/memif/node.c
@@ -78,15 +78,12 @@
 static_always_inline uword
 memif_device_input_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
 			   vlib_frame_t * frame, memif_if_t * mif,
-			   memif_ring_type_t type)
+			   memif_ring_type_t type, u16 rid)
 {
   vnet_main_t *vnm = vnet_get_main ();
-  u8 rid = 0;			/* Ring id */
   memif_ring_t *ring = memif_get_ring (mif, type, rid);
-  memif_ring_data_t *rd =
-    vec_elt_at_index (mif->ring_data, rid + type * mif->num_s2m_rings);
+  memif_ring_data_t *rd;
   u16 head;
-
   u32 next_index = VNET_DEVICE_INPUT_NEXT_ETHERNET_INPUT;
   uword n_trace = vlib_get_trace_count (vm, node);
   memif_main_t *nm = &memif_main;
@@ -102,6 +99,7 @@
   u16 num_slots;
   void *mb0, *mb1;
 
+  rd = vec_elt_at_index (mif->ring_data, rid + type * mif->num_s2m_rings);
   if (mif->per_interface_next_index != ~0)
     next_index = mif->per_interface_next_index;
 
@@ -328,7 +326,6 @@
 		vlib_frame_t * frame)
 {
   u32 n_rx_packets = 0;
-  u32 thread_index = vlib_get_thread_index ();
   memif_main_t *nm = &memif_main;
   memif_if_t *mif;
   vnet_device_input_runtime_t *rt = (void *) node->runtime_data;
@@ -338,17 +335,16 @@
   foreach_device_and_queue (dq, rt->devices_and_queues)
   {
     mif = vec_elt_at_index (nm->interfaces, dq->dev_instance);
-    if (mif->flags & MEMIF_IF_FLAG_ADMIN_UP &&
-	mif->flags & MEMIF_IF_FLAG_CONNECTED &&
-	(mif->if_index % nm->input_cpu_count) ==
-	(thread_index - nm->input_cpu_first_index))
+    if ((mif->flags & MEMIF_IF_FLAG_ADMIN_UP) &&
+	(mif->flags & MEMIF_IF_FLAG_CONNECTED))
       {
 	if (mif->flags & MEMIF_IF_FLAG_IS_SLAVE)
 	  type = MEMIF_RING_M2S;
 	else
 	  type = MEMIF_RING_S2M;
 	n_rx_packets +=
-	  memif_device_input_inline (vm, node, frame, mif, type);
+	  memif_device_input_inline (vm, node, frame, mif, type,
+				     dq->queue_id);
       }
   }