af_xdp: integrate with new tx infra

Type: improvement

Signed-off-by: arikachen <eaglesora@gmail.com>
Signed-off-by: BenoƮt Ganne <bganne@cisco.com>
Change-Id: If8d57bcf033864935bd5e3a9912b2c1a7c712f44
diff --git a/src/plugins/af_xdp/af_xdp.h b/src/plugins/af_xdp/af_xdp.h
index 85c8fee..84fc65f 100644
--- a/src/plugins/af_xdp/af_xdp.h
+++ b/src/plugins/af_xdp/af_xdp.h
@@ -86,6 +86,10 @@
   struct xsk_ring_prod tx;
   struct xsk_ring_cons cq;
   int xsk_fd;
+
+  /* fields below are accessed in control-plane only (cold) */
+
+  u32 queue_index;
 } af_xdp_txq_t;
 
 typedef struct
diff --git a/src/plugins/af_xdp/device.c b/src/plugins/af_xdp/device.c
index 0b39280..d27901b 100644
--- a/src/plugins/af_xdp/device.c
+++ b/src/plugins/af_xdp/device.c
@@ -30,6 +30,7 @@
 #include <vppinfra/unix.h>
 #include <vnet/ethernet/ethernet.h>
 #include <vnet/interface/rx_queue_funcs.h>
+#include <vnet/interface/tx_queue_funcs.h>
 #include "af_xdp.h"
 
 af_xdp_main_t af_xdp_main;
@@ -315,6 +316,7 @@
   if (is_tx)
     {
       txq->xsk_fd = fd;
+      clib_spinlock_init (&txq->lock);
       if (is_rx && (ad->flags & AF_XDP_DEVICE_F_SYSCALL_LOCK))
 	{
 	  /* This is a shared rx+tx queue and we need to lock before syscalls.
@@ -432,6 +434,72 @@
   return 0;
 }
 
+static u32
+af_xdp_find_rxq_for_thread (vnet_main_t *vnm, const af_xdp_device_t *ad,
+			    const u32 thread)
+{
+  u32 i;
+  for (i = 0; i < ad->rxq_num; i++)
+    {
+      const u32 qid = vec_elt (ad->rxqs, i).queue_index;
+      const u32 tid = vnet_hw_if_get_rx_queue (vnm, qid)->thread_index;
+      if (tid == thread)
+	return i;
+    }
+  return ~0;
+}
+
+static clib_error_t *
+af_xdp_finalize_queues (vnet_main_t *vnm, af_xdp_device_t *ad,
+			const int n_vlib_mains)
+{
+  clib_error_t *err = 0;
+  int i;
+
+  for (i = 0; i < ad->rxq_num; i++)
+    {
+      af_xdp_rxq_t *rxq = vec_elt_at_index (ad->rxqs, i);
+      rxq->queue_index = vnet_hw_if_register_rx_queue (
+	vnm, ad->hw_if_index, i, VNET_HW_IF_RXQ_THREAD_ANY);
+      u8 *desc = format (0, "%U rxq %d", format_af_xdp_device_name,
+			 ad->dev_instance, i);
+      clib_file_t f = {
+	.file_descriptor = rxq->xsk_fd,
+	.private_data = rxq->queue_index,
+	.read_function = af_xdp_device_rxq_read_ready,
+	.description = desc,
+      };
+      rxq->file_index = clib_file_add (&file_main, &f);
+      vnet_hw_if_set_rx_queue_file_index (vnm, rxq->queue_index,
+					  rxq->file_index);
+      err = af_xdp_device_set_rxq_mode (ad, rxq, AF_XDP_RXQ_MODE_POLLING);
+      if (err)
+	return err;
+    }
+
+  for (i = 0; i < ad->txq_num; i++)
+    vec_elt (ad->txqs, i).queue_index =
+      vnet_hw_if_register_tx_queue (vnm, ad->hw_if_index, i);
+
+  /* We set the rxq and txq of the same queue pair on the same thread
+   * by default to avoid locking because of the syscall lock. */
+  int last_qid = clib_min (ad->rxq_num, ad->txq_num - 1);
+  for (i = 0; i < n_vlib_mains; i++)
+    {
+      /* search for the 1st rxq assigned on this thread, if any */
+      u32 qid = af_xdp_find_rxq_for_thread (vnm, ad, i);
+      /* if this rxq is combined with a txq, use it. Otherwise, we'll
+       * assign txq in a round-robin fashion. We start from the 1st txq
+       * not shared with a rxq if possible... */
+      qid = qid < ad->txq_num ? qid : (last_qid++ % ad->txq_num);
+      vnet_hw_if_tx_queue_assign_thread (
+	vnm, vec_elt (ad->txqs, qid).queue_index, i);
+    }
+
+  vnet_hw_if_update_runtime_data (vnm, ad->hw_if_index);
+  return 0;
+}
+
 void
 af_xdp_create_if (vlib_main_t * vm, af_xdp_create_if_args_t * args)
 {
@@ -556,13 +624,6 @@
       goto err2;
     }
 
-  if (ad->txq_num < tm->n_vlib_mains)
-    {
-      /* initialize lock for shared txq */
-      for (i = 0; i < ad->txq_num; i++)
-	clib_spinlock_init (&vec_elt (ad->txqs, i).lock);
-    }
-
   ad->dev_instance = ad - am->devices;
   ad->per_interface_next_index = VNET_DEVICE_INPUT_NEXT_ETHERNET_INPUT;
   ad->pool =
@@ -603,28 +664,13 @@
 
   vnet_hw_if_set_input_node (vnm, ad->hw_if_index, af_xdp_input_node.index);
 
-  for (i = 0; i < ad->rxq_num; i++)
+  args->error = af_xdp_finalize_queues (vnm, ad, tm->n_vlib_mains);
+  if (args->error)
     {
-      af_xdp_rxq_t *rxq = vec_elt_at_index (ad->rxqs, i);
-      rxq->queue_index = vnet_hw_if_register_rx_queue (
-	vnm, ad->hw_if_index, i, VNET_HW_IF_RXQ_THREAD_ANY);
-      u8 *desc = format (0, "%U rxq %d", format_af_xdp_device_name,
-			 ad->dev_instance, i);
-      clib_file_t f = {
-	.file_descriptor = rxq->xsk_fd,
-	.private_data = rxq->queue_index,
-	.read_function = af_xdp_device_rxq_read_ready,
-	.description = desc,
-      };
-      rxq->file_index = clib_file_add (&file_main, &f);
-      vnet_hw_if_set_rx_queue_file_index (vnm, rxq->queue_index,
-					  rxq->file_index);
-      if (af_xdp_device_set_rxq_mode (ad, rxq, AF_XDP_RXQ_MODE_POLLING))
-	goto err2;
+      args->rv = VNET_API_ERROR_SYSCALL_ERROR_7;
+      goto err2;
     }
 
-  vnet_hw_if_update_runtime_data (vnm, ad->hw_if_index);
-
   /* buffer template */
   vec_validate_aligned (ad->buffer_template, 1, CLIB_CACHE_LINE_BYTES);
   ad->buffer_template->flags = VLIB_BUFFER_TOTAL_LENGTH_VALID;
diff --git a/src/plugins/af_xdp/output.c b/src/plugins/af_xdp/output.c
index c5b73f9..d1500a6 100644
--- a/src/plugins/af_xdp/output.c
+++ b/src/plugins/af_xdp/output.c
@@ -215,9 +215,9 @@
   af_xdp_main_t *rm = &af_xdp_main;
   vnet_interface_output_runtime_t *ord = (void *) node->runtime_data;
   af_xdp_device_t *ad = pool_elt_at_index (rm->devices, ord->dev_instance);
-  u32 thread_index = vm->thread_index;
-  af_xdp_txq_t *txq = vec_elt_at_index (
-    ad->txqs, (thread_index - 1 + ad->txq_num) % ad->txq_num);
+  const vnet_hw_if_tx_frame_t *tf = vlib_frame_scalar_args (frame);
+  const int shared_queue = tf->shared_queue;
+  af_xdp_txq_t *txq = vec_elt_at_index (ad->txqs, tf->queue_id);
   u32 *from;
   u32 n, n_tx;
   int i;
@@ -225,7 +225,8 @@
   from = vlib_frame_vector_args (frame);
   n_tx = frame->n_vectors;
 
-  clib_spinlock_lock_if_init (&txq->lock);
+  if (shared_queue)
+    clib_spinlock_lock (&txq->lock);
 
   for (i = 0, n = 0; i < AF_XDP_TX_RETRIES && n < n_tx; i++)
     {
@@ -238,7 +239,8 @@
 
   af_xdp_device_output_tx_db (vm, node, ad, txq, n);
 
-  clib_spinlock_unlock_if_init (&txq->lock);
+  if (shared_queue)
+    clib_spinlock_unlock (&txq->lock);
 
   if (PREDICT_FALSE (n != n_tx))
     {