virtio: add packet buffering on tx

Type: feature

This patch adds packet buffering on tx for
slow backend which have some jitter/delays
in freeing the vrings.

There are some limitations to the current design:
1) It only works in poll mode.
2) Atleast 1 rx queue of an interface (with buffering
   enabled) should be placed on each worker and main thread.

Signed-off-by: Mohsin Kazmi <sykazmi@cisco.com>
Change-Id: Ib93c350298b228e80426e58ac77f3bbc93b8be27
diff --git a/src/vnet/devices/virtio/cli.c b/src/vnet/devices/virtio/cli.c
index 6d47bad..8a9c97b 100644
--- a/src/vnet/devices/virtio/cli.c
+++ b/src/vnet/devices/virtio/cli.c
@@ -30,6 +30,7 @@
   unformat_input_t _line_input, *line_input = &_line_input;
   virtio_pci_create_if_args_t args;
   u64 feature_mask = (u64) ~ (0ULL);
+  u32 buffering_size = 0;
 
   /* Get a line of input. */
   if (!unformat_user (input, unformat_line_input, line_input))
@@ -46,6 +47,12 @@
 	args.gso_enabled = 1;
       else if (unformat (line_input, "csum-enabled"))
 	args.checksum_offload_enabled = 1;
+      else if (unformat (line_input, "buffering"))
+	{
+	  args.virtio_flags = VIRTIO_FLAG_BUFFERING;
+	  if (unformat (line_input, "size %u", &buffering_size))
+	    args.buffering_size = buffering_size;
+	}
       else
 	return clib_error_return (0, "unknown input `%U'",
 				  format_unformat_error, input);
@@ -61,7 +68,8 @@
 VLIB_CLI_COMMAND (virtio_pci_create_command, static) = {
   .path = "create interface virtio",
   .short_help = "create interface virtio <pci-address> "
-                "[feature-mask <hex-mask>] [gso-enabled] [csum-enabled]",
+                "[feature-mask <hex-mask>] [gso-enabled] [csum-enabled] "
+		"[buffering [size <buffering-szie>]]",
   .function = virtio_pci_create_command_fn,
 };
 /* *INDENT-ON* */
diff --git a/src/vnet/devices/virtio/device.c b/src/vnet/devices/virtio/device.c
index dcd565d..a414f82 100644
--- a/src/vnet/devices/virtio/device.c
+++ b/src/vnet/devices/virtio/device.c
@@ -546,7 +546,7 @@
   virtio_vring_t *vring;
   u16 qid = vm->thread_index % vif->num_txqs;
   vring = vec_elt_at_index (vif->txq_vrings, qid);
-  u16 used, next, avail;
+  u16 used, next, avail, n_buffers = 0, n_buffers_left = 0;
   u16 sz = vring->size;
   u16 mask = sz - 1;
   u16 retry_count = 2;
@@ -588,6 +588,42 @@
   else
     free_desc_count = sz - used;
 
+  if (vif->packet_buffering)
+    {
+      n_buffers = n_buffers_left = virtio_vring_n_buffers (vring->buffering);
+
+      while (n_buffers_left && free_desc_count)
+	{
+	  u16 n_added = 0;
+
+	  u32 bi = virtio_vring_buffering_read_from_front (vring->buffering);
+	  if (bi == ~0)
+	    break;
+	  vlib_buffer_t *b0 = vlib_get_buffer (vm, bi);
+	  if (b0->flags & VLIB_BUFFER_IS_TRACED)
+	    {
+	      virtio_tx_trace (vm, node, type, b0, buffers[0]);
+	    }
+	  n_added =
+	    add_buffer_to_slot (vm, vif, type, vring, bi, free_desc_count,
+				avail, next, mask, do_gso, csum_offload,
+				node->node_index);
+	  if (PREDICT_FALSE (n_added == 0))
+	    {
+	      n_buffers_left--;
+	      continue;
+	    }
+	  else if (PREDICT_FALSE (n_added > free_desc_count))
+	    break;
+
+	  avail++;
+	  next = (next + n_added) & mask;
+	  used += n_added;
+	  n_buffers_left--;
+	  free_desc_count -= n_added;
+	}
+    }
+
   while (n_left && free_desc_count)
     {
       u16 n_added = 0;
@@ -619,7 +655,7 @@
       free_desc_count -= n_added;
     }
 
-  if (n_left != frame->n_vectors)
+  if (n_left != frame->n_vectors || n_buffers != n_buffers_left)
     {
       CLIB_MEMORY_STORE_BARRIER ();
       vring->avail->idx = avail;
@@ -634,9 +670,19 @@
       if (retry_count--)
 	goto retry;
 
-      virtio_interface_drop_inline (vm, node->node_index,
-				    buffers, n_left,
-				    VIRTIO_TX_ERROR_NO_FREE_SLOTS);
+      if (vif->packet_buffering)
+	{
+
+	  u16 n_buffered =
+	    virtio_vring_buffering_store_packets (vring->buffering, buffers,
+						  n_left);
+	  buffers += n_buffered;
+	  n_left -= n_buffered;
+	}
+      if (n_left)
+	virtio_interface_drop_inline (vm, node->node_index,
+				      buffers, n_left,
+				      VIRTIO_TX_ERROR_NO_FREE_SLOTS);
     }
 
   clib_spinlock_unlock_if_init (&vring->lockp);
@@ -740,20 +786,23 @@
       {
 	/* only enable packet coalesce in poll mode */
 	gro_flow_table_set_is_enable (tx_vring->flow_table, 1);
+	/* only enable packet buffering in poll mode */
+	virtio_vring_buffering_set_is_enable (tx_vring->buffering, 1);
       }
       rx_vring->avail->flags |= VRING_AVAIL_F_NO_INTERRUPT;
     }
   else
     {
-      if (vif->packet_coalesce)
+      if (vif->packet_coalesce || vif->packet_buffering)
 	{
 	  virtio_log_warning (vif,
-			      "interface %U is in interrupt mode, disabling packet coalescing",
+			      "interface %U is in interrupt mode, disabling packet coalescing or buffering",
 			      format_vnet_sw_if_index_name, vnet_get_main (),
 			      vif->sw_if_index);
 	  vec_foreach (tx_vring, vif->txq_vrings)
 	  {
 	    gro_flow_table_set_is_enable (tx_vring->flow_table, 0);
+	    virtio_vring_buffering_set_is_enable (tx_vring->buffering, 0);
 	  }
 	}
       rx_vring->avail->flags &= ~VRING_AVAIL_F_NO_INTERRUPT;
diff --git a/src/vnet/devices/virtio/node.c b/src/vnet/devices/virtio/node.c
index 1c9cfd0..42b2590 100644
--- a/src/vnet/devices/virtio/node.c
+++ b/src/vnet/devices/virtio/node.c
@@ -279,11 +279,14 @@
   u16 last = vring->last_used_idx;
   u16 n_left = vring->used->idx - last;
 
-  if (vif->packet_coalesce
-      && clib_spinlock_trylock_if_init (&txq_vring->lockp))
+  if (clib_spinlock_trylock_if_init (&txq_vring->lockp))
     {
-      vnet_gro_flow_table_schedule_node_on_dispatcher (vm,
-						       txq_vring->flow_table);
+      if (vif->packet_coalesce)
+	vnet_gro_flow_table_schedule_node_on_dispatcher (vm,
+							 txq_vring->flow_table);
+      else if (vif->packet_buffering)
+	virtio_vring_buffering_schedule_node_on_dispatcher (vm,
+							    txq_vring->buffering);
       clib_spinlock_unlock_if_init (&txq_vring->lockp);
     }
 
diff --git a/src/vnet/devices/virtio/pci.c b/src/vnet/devices/virtio/pci.c
index df8e2bd..71d813a 100644
--- a/src/vnet/devices/virtio/pci.c
+++ b/src/vnet/devices/virtio/pci.c
@@ -1154,6 +1154,19 @@
 
   vnet_hw_interface_t *hw = vnet_get_hw_interface (vnm, vif->hw_if_index);
   hw->flags |= VNET_HW_INTERFACE_FLAG_SUPPORTS_INT_MODE;
+
+  if (args->virtio_flags & VIRTIO_FLAG_BUFFERING)
+    {
+      error = virtio_set_packet_buffering (vif, args->buffering_size);
+      if (error)
+	{
+	  args->rv = VNET_API_ERROR_INIT_FAILED;
+	  virtio_log_error (vif,
+			    "error encountered during packet buffering init");
+	  goto error;
+	}
+    }
+
   vnet_hw_interface_set_input_node (vnm, vif->hw_if_index,
 				    virtio_input_node.index);
   u32 i = 0;
diff --git a/src/vnet/devices/virtio/pci.h b/src/vnet/devices/virtio/pci.h
index ab5c6f1..10df49c 100644
--- a/src/vnet/devices/virtio/pci.h
+++ b/src/vnet/devices/virtio/pci.h
@@ -214,7 +214,8 @@
   _ (CSUM_OFFLOAD, 1)         \
   _ (GRO_COALESCE, 2)         \
   _ (PACKED, 3)               \
-  _ (IN_ORDER, 4)
+  _ (IN_ORDER, 4)	      \
+  _ (BUFFERING, 5)
 
 typedef enum
 {
@@ -234,6 +235,7 @@
   u64 features;
   u8 gso_enabled;
   u8 checksum_offload_enabled;
+  u32 buffering_size;
   u32 virtio_flags;
   clib_error_t *error;
 } virtio_pci_create_if_args_t;
diff --git a/src/vnet/devices/virtio/virtio.api b/src/vnet/devices/virtio/virtio.api
index 3c8aed2..bbe2341 100644
--- a/src/vnet/devices/virtio/virtio.api
+++ b/src/vnet/devices/virtio/virtio.api
@@ -62,6 +62,7 @@
         VIRTIO_API_FLAG_GRO_COALESCE = 4, /* enable packet coalescing on tx side, provided gso enabled */
         VIRTIO_API_FLAG_PACKED = 8, /* enable packed ring support, provided it is available from backend */
         VIRTIO_API_FLAG_IN_ORDER = 16, /* enable in order support, provided it is available from backend */
+        VIRTIO_API_FLAG_BUFFERING = 32 [backwards_compatible], /* enable buffering to handle backend jitter/delays */
 };
 
 /** \brief Initialize a new virtio pci interface with the given parameters
diff --git a/src/vnet/devices/virtio/virtio.c b/src/vnet/devices/virtio/virtio.c
index e74f378..8209a46 100644
--- a/src/vnet/devices/virtio/virtio.c
+++ b/src/vnet/devices/virtio/virtio.c
@@ -216,6 +216,7 @@
     clib_mem_free (vring->avail);
   vec_free (vring->buffers);
   gro_flow_table_free (vring->flow_table);
+  virtio_vring_buffering_free (vm, vring->buffering);
   clib_spinlock_free (&vring->lockp);
   return 0;
 }
@@ -235,6 +236,28 @@
   }
 }
 
+clib_error_t *
+virtio_set_packet_buffering (virtio_if_t * vif, u16 buffering_size)
+{
+  vnet_main_t *vnm = vnet_get_main ();
+  vnet_hw_interface_t *hw = vnet_get_hw_interface (vnm, vif->hw_if_index);
+  virtio_vring_t *vring;
+  clib_error_t *error = 0;
+  vif->packet_buffering = 1;
+
+  vec_foreach (vring, vif->txq_vrings)
+  {
+    if ((error =
+	 virtio_vring_buffering_init (&vring->buffering, hw->tx_node_index,
+				      buffering_size)))
+      {
+	break;
+      }
+  }
+
+  return error;
+}
+
 void
 virtio_vring_set_numa_node (vlib_main_t * vm, virtio_if_t * vif, u32 idx)
 {
@@ -335,6 +358,7 @@
       vlib_cli_output (vm, "  gso-enabled %d", vif->gso_enabled);
       vlib_cli_output (vm, "  csum-enabled %d", vif->csum_offload_enabled);
       vlib_cli_output (vm, "  packet-coalesce %d", vif->packet_coalesce);
+      vlib_cli_output (vm, "  packet-buffering %d", vif->packet_buffering);
       if (type & (VIRTIO_IF_TYPE_TAP | VIRTIO_IF_TYPE_PCI))
 	vlib_cli_output (vm, "  Mac Address: %U", format_ethernet_address,
 			 vif->mac_addr);
@@ -432,6 +456,11 @@
 	    vlib_cli_output (vm, "    %U", gro_flow_table_format,
 			     vring->flow_table);
 	  }
+	if (vif->packet_buffering)
+	  {
+	    vlib_cli_output (vm, "    %U", virtio_vring_buffering_format,
+			     vring->buffering);
+	  }
 	if (show_descr)
 	  {
 	    vlib_cli_output (vm, "\n  descriptor table:\n");
diff --git a/src/vnet/devices/virtio/virtio.h b/src/vnet/devices/virtio/virtio.h
index b00e1ec..acefc38 100644
--- a/src/vnet/devices/virtio/virtio.h
+++ b/src/vnet/devices/virtio/virtio.h
@@ -20,6 +20,7 @@
 
 #include <vnet/devices/virtio/virtio_std.h>
 #include <vnet/devices/virtio/vhost_std.h>
+#include <vnet/devices/virtio/virtio_buffering.h>
 #include <vnet/gso/gro.h>
 
 #define foreach_virtio_if_flag		\
@@ -76,6 +77,7 @@
   u16 last_used_idx;
   u16 last_kick_avail_idx;
   u32 call_file_index;
+  virtio_vring_buffering_t *buffering;
   gro_flow_table_t *flow_table;
 } virtio_vring_t;
 
@@ -123,6 +125,7 @@
 
     CLIB_CACHE_LINE_ALIGN_MARK (cacheline1);
   int packet_coalesce;
+  int packet_buffering;
   u32 dev_instance;
   u32 numa_node;
   u64 remote_features;
@@ -203,6 +206,7 @@
 extern void virtio_show (vlib_main_t * vm, u32 * hw_if_indices, u8 show_descr,
 			 u32 type);
 extern void virtio_set_packet_coalesce (virtio_if_t * vif);
+clib_error_t *virtio_set_packet_buffering (virtio_if_t * vif, u16 size);
 extern void virtio_pci_legacy_notify_queue (vlib_main_t * vm,
 					    virtio_if_t * vif, u16 queue_id);
 extern void virtio_pci_modern_notify_queue (vlib_main_t * vm,
diff --git a/src/vnet/devices/virtio/virtio_api.c b/src/vnet/devices/virtio/virtio_api.c
index 9a145d9..d993104 100644
--- a/src/vnet/devices/virtio/virtio_api.c
+++ b/src/vnet/devices/virtio/virtio_api.c
@@ -126,15 +126,18 @@
   STATIC_ASSERT (((int) VIRTIO_API_FLAG_IN_ORDER ==
 		  (int) VIRTIO_FLAG_IN_ORDER),
 		 "virtio in-order api flag mismatch");
+  STATIC_ASSERT (((int) VIRTIO_API_FLAG_BUFFERING ==
+		  (int) VIRTIO_FLAG_BUFFERING),
+		 "virtio buffering api flag mismatch");
 
   ap->virtio_flags = clib_net_to_host_u32 (mp->virtio_flags);
   ap->features = clib_net_to_host_u64 (mp->features);
 
-  if (ap->virtio_flags & VIRTIO_FLAG_GSO)
+  if (ap->virtio_flags & VIRTIO_API_FLAG_GSO)
     ap->gso_enabled = 1;
   else
     ap->gso_enabled = 0;
-  if (ap->virtio_flags & VIRTIO_FLAG_CSUM_OFFLOAD)
+  if (ap->virtio_flags & VIRTIO_API_FLAG_CSUM_OFFLOAD)
     ap->checksum_offload_enabled = 1;
   else
     ap->checksum_offload_enabled = 0;
diff --git a/src/vnet/devices/virtio/virtio_buffering.h b/src/vnet/devices/virtio/virtio_buffering.h
new file mode 100644
index 0000000..ef3d9d2
--- /dev/null
+++ b/src/vnet/devices/virtio/virtio_buffering.h
@@ -0,0 +1,259 @@
+/*
+ *------------------------------------------------------------------
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *------------------------------------------------------------------
+ */
+
+#ifndef _VNET_DEVICES_VIRTIO_VIRTIO_BUFFERING_H_
+#define _VNET_DEVICES_VIRTIO_VIRTIO_BUFFERING_H_
+
+#define VIRTIO_BUFFERING_DEFAULT_SIZE 1024
+#define VIRTIO_BUFFERING_TIMEOUT 1e-5
+
+typedef struct
+{
+  f64 timeout_ts;
+  u32 *buffers;
+  u32 node_index;
+  u16 size;
+  u16 free_size;
+  u16 front;
+  u16 back;
+  u8 is_enable;
+} virtio_vring_buffering_t;
+
+static_always_inline clib_error_t *
+virtio_vring_buffering_init (virtio_vring_buffering_t ** buffering,
+			     u32 node_index, u16 size)
+{
+  if (*buffering)
+    return clib_error_return (0, "buffering: already initialized");
+
+  if (!is_pow2 (size))
+    return clib_error_return (0, "buffering: size must be power of 2");
+
+  if (size > 32768)
+    return clib_error_return (0, "buffering: size must be 32768 or lower");
+
+  if (size == 0)
+    size = VIRTIO_BUFFERING_DEFAULT_SIZE;
+
+  virtio_vring_buffering_t *b_temp = 0;
+  b_temp =
+    (virtio_vring_buffering_t *)
+    clib_mem_alloc (sizeof (virtio_vring_buffering_t));
+  if (!b_temp)
+    return clib_error_return (0, "buffering: memory allocation failed");
+
+  clib_memset (b_temp, 0, sizeof (virtio_vring_buffering_t));
+
+  b_temp->node_index = node_index;
+  b_temp->free_size = size;
+  b_temp->size = size;
+
+  vec_validate_aligned (b_temp->buffers, size, CLIB_CACHE_LINE_BYTES);
+  b_temp->is_enable = 1;
+
+  *buffering = b_temp;
+  return 0;
+}
+
+static_always_inline void
+virtio_vring_buffering_buffers_free (vlib_main_t * vm,
+				     virtio_vring_buffering_t * buffering)
+{
+  u16 n_buffers = buffering->size - buffering->free_size;
+  if (n_buffers)
+    {
+      vlib_buffer_free_from_ring (vm, buffering->buffers, buffering->front,
+				  buffering->size, n_buffers);
+      buffering->free_size += n_buffers;
+    }
+}
+
+static_always_inline void
+virtio_vring_buffering_free (vlib_main_t * vm,
+			     virtio_vring_buffering_t * buffering)
+{
+  if (buffering)
+    {
+      virtio_vring_buffering_buffers_free (vm, buffering);
+      vec_free (buffering->buffers);
+      clib_mem_free (buffering);
+    }
+}
+
+static_always_inline u8
+virtio_vring_buffering_is_enable (virtio_vring_buffering_t * buffering)
+{
+  if (buffering)
+    return buffering->is_enable;
+
+  return 0;
+}
+
+static_always_inline void
+virtio_vring_buffering_set_is_enable (virtio_vring_buffering_t * buffering,
+				      u8 is_enable)
+{
+  if (buffering)
+    buffering->is_enable = is_enable;
+}
+
+static_always_inline void
+virtio_vring_buffering_set_timeout (vlib_main_t * vm,
+				    virtio_vring_buffering_t * buffering,
+				    f64 timeout_expire)
+{
+  if (buffering)
+    buffering->timeout_ts = vlib_time_now (vm) + timeout_expire;
+}
+
+static_always_inline u8
+virtio_vring_buffering_is_timeout (vlib_main_t * vm,
+				   virtio_vring_buffering_t * buffering)
+{
+  if (buffering && (buffering->timeout_ts < vlib_time_now (vm)))
+    return 1;
+  return 0;
+}
+
+static_always_inline u8
+virtio_vring_buffering_is_empty (virtio_vring_buffering_t * buffering)
+{
+  if (buffering->size == buffering->free_size)
+    return 1;
+  return 0;
+}
+
+static_always_inline u8
+virtio_vring_buffering_is_full (virtio_vring_buffering_t * buffering)
+{
+  if (buffering->free_size == 0)
+    return 1;
+  return 0;
+}
+
+static_always_inline u16
+virtio_vring_n_buffers (virtio_vring_buffering_t * buffering)
+{
+  return (buffering->size - buffering->free_size);
+}
+
+static_always_inline u16
+virtio_vring_buffering_store_packets (virtio_vring_buffering_t * buffering,
+				      u32 * bi, u16 n_store)
+{
+  u16 mask, n_s = 0, i = 0;
+
+  if (!virtio_vring_buffering_is_enable (buffering)
+      || virtio_vring_buffering_is_full (buffering))
+    return 0;
+
+  mask = buffering->size - 1;
+  n_s = clib_min (n_store, buffering->free_size);
+
+  while (i < n_s)
+    {
+      buffering->buffers[buffering->back] = bi[i];
+      buffering->back = (buffering->back + 1) & mask;
+      buffering->free_size--;
+      i++;
+    }
+  return n_s;
+}
+
+static_always_inline u32
+virtio_vring_buffering_read_from_front (virtio_vring_buffering_t * buffering)
+{
+  u32 bi = ~0;
+  u16 mask = buffering->size - 1;
+  if (virtio_vring_buffering_is_empty (buffering))
+    return bi;
+
+  bi = buffering->buffers[buffering->front];
+  buffering->buffers[buffering->front] = ~0;
+  buffering->front = (buffering->front + 1) & mask;
+  buffering->free_size++;
+  return bi;
+}
+
+static_always_inline u32
+virtio_vring_buffering_read_from_back (virtio_vring_buffering_t * buffering)
+{
+  u32 bi = ~0;
+  u16 mask = buffering->size - 1;
+  if (virtio_vring_buffering_is_empty (buffering))
+    return bi;
+
+  buffering->back = (buffering->back - 1) & mask;
+  bi = buffering->buffers[buffering->back];
+  buffering->buffers[buffering->back] = ~0;
+  buffering->free_size++;
+  return bi;
+}
+
+static_always_inline void
+virtio_vring_buffering_schedule_node_on_dispatcher (vlib_main_t * vm,
+						    virtio_vring_buffering_t *
+						    buffering)
+{
+  if (buffering && virtio_vring_buffering_is_timeout (vm, buffering)
+      && virtio_vring_n_buffers (buffering))
+    {
+      vlib_frame_t *f = vlib_get_frame_to_node (vm, buffering->node_index);
+      u32 *f_to = vlib_frame_vector_args (f);
+      f_to[f->n_vectors] = virtio_vring_buffering_read_from_back (buffering);
+      f->n_vectors++;
+      vlib_put_frame_to_node (vm, buffering->node_index, f);
+      virtio_vring_buffering_set_timeout (vm, buffering,
+					  VIRTIO_BUFFERING_TIMEOUT);
+    }
+}
+
+static_always_inline u8 *
+virtio_vring_buffering_format (u8 * s, va_list * args)
+{
+  virtio_vring_buffering_t *buffering =
+    va_arg (*args, virtio_vring_buffering_t *);
+  u32 indent = format_get_indent (s);
+
+  if (!buffering)
+    return s;
+
+  indent += 2;
+
+  if (buffering->is_enable)
+    s = format (s, "packet-buffering: enable\n");
+  else
+    s = format (s, "packet-buffering: disable\n");
+  s =
+    format (s,
+	    "%Usize %u n_buffers %u front %u back %u",
+	    format_white_space, indent, buffering->size,
+	    virtio_vring_n_buffers (buffering), buffering->front,
+	    buffering->back);
+
+  return s;
+}
+
+#endif /* _VNET_DEVICES_VIRTIO_VIRTIO_BUFFERING_H_ */
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */