Hash and hand off reassembly fragments

in the following two scenarios:
    1. When fragments arrive on multiple interfaces and end up on different worker threads
    2. When fragments arrive on the same interface but in different queues, because RSS cannot steer all fragments of a datagram to the same queue (non-first fragments lack the L4 header that RSS hashes on)
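The change keys each reassembly on a 16-byte bihash entry (fib index, source and destination address, fragment id, protocol) whose value records both the reassembly's pool index and the worker thread that owns it; a fragment that looks up a reassembly owned by another worker sets do_handoff instead of processing it in place. A condensed, standalone model of that decision follows — standard C types stand in for VPP's u32/u64 and clib_bihash_16_8_t, and make_key and the canned lookup result are illustrative only:

    #include <stdint.h>
    #include <stdio.h>

    typedef union
    {
      struct
      {
        uint32_t reass_index;  /* pool index on the owning worker */
        uint32_t thread_index; /* worker that owns this reassembly */
      };
      uint64_t as_u64;
    } reass_val_t;

    /* 16-byte key covering everything that identifies one datagram's
     * fragments, packed as in the patch below. */
    static void
    make_key (uint64_t key[2], uint32_t fib_index, uint32_t src,
              uint32_t dst, uint16_t fragment_id, uint8_t protocol)
    {
      key[0] = (uint64_t) fib_index | (uint64_t) src << 32;
      key[1] = (uint64_t) dst | (uint64_t) fragment_id << 32
        | (uint64_t) protocol << 48;
    }

    int
    main (void)
    {
      uint64_t key[2];
      make_key (key, 0, 0x0a000001, 0x0a000002, 0x1234, 17);
      printf ("key: %llx %llx\n", (unsigned long long) key[0],
              (unsigned long long) key[1]);

      /* Pretend the bihash lookup returned this value for our key... */
      reass_val_t v = { .reass_index = 7, .thread_index = 2 };
      uint32_t my_thread_index = 1;

      /* ...then the core decision: a fragment whose reassembly is
       * owned by another worker is handed off, not processed here. */
      if (v.thread_index != my_thread_index)
        printf ("handoff to worker %u\n", v.thread_index);
      else
        printf ("process locally, reass %u\n", v.reass_index);
      return 0;
    }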

Change-Id: I9f9a8a4085692055ef6823d634c8e19ff3daea05
Signed-off-by: Vijayabhaskar Katamreddy <vkatamre@cisco.com>
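
The new ip4-reassembly-handoff and ip4-reass-feature-hoff nodes then move such fragments: the inline function in the diff below gathers each buffer's owner thread index and enqueues the whole frame to per-worker frame queues via vlib_buffer_enqueue_to_thread, counting any shortfall as a congestion drop. A minimal model of that loop, with enqueue_to_threads as a hypothetical stub for the frame-queue enqueue:

    #include <stdint.h>
    #include <stdio.h>

    #define FRAME_SIZE 256 /* stand-in for VLIB_FRAME_SIZE */

    /* Hypothetical stub: pretend busy_worker's queue is full and
     * drops everything sent to it, returning how many buffers were
     * actually enqueued. */
    static uint32_t
    enqueue_to_threads (const uint16_t *owners, uint32_t n,
                        uint16_t busy_worker)
    {
      uint32_t enq = 0;
      for (uint32_t i = 0; i < n; i++)
        if (owners[i] != busy_worker)
          enq++;
      return enq;
    }

    int
    main (void)
    {
      uint16_t owner_thread[FRAME_SIZE];
      uint32_t n_vectors = 4;

      /* In the node this comes from vnet_buffer (b)->ip.reass
       * .owner_thread_index (or owner_feature_thread_index on the
       * feature path), filled in during lookup. */
      for (uint32_t i = 0; i < n_vectors; i++)
        owner_thread[i] = (uint16_t) (1 + i % 2);

      uint32_t n_enq = enqueue_to_threads (owner_thread, n_vectors, 2);
      if (n_enq < n_vectors)
        printf ("congestion drop: %u buffers\n", n_vectors - n_enq);
      return 0;
    }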
diff --git a/src/vnet/ip/ip4_reassembly.c b/src/vnet/ip/ip4_reassembly.c
index b54279c..86e5e39 100644
--- a/src/vnet/ip/ip4_reassembly.c
+++ b/src/vnet/ip/ip4_reassembly.c
@@ -78,6 +78,26 @@
   };
 } ip4_reass_key_t;
 
+typedef union
+{
+  struct
+  {
+    u32 reass_index;
+    u32 thread_index;
+  };
+  u64 as_u64;
+} ip4_reass_val_t;
+
+typedef union
+{
+  struct
+  {
+    ip4_reass_key_t k;
+    ip4_reass_val_t v;
+  };
+  clib_bihash_kv_16_8_t kv;
+} ip4_reass_kv_t;
+
 always_inline u32
 ip4_reass_buffer_get_data_offset (vlib_buffer_t * b)
 {
@@ -113,6 +133,7 @@
   u8 next_index;
   // minimum fragment length for this reassembly - used to estimate MTU
   u16 min_fragment_length;
+
 } ip4_reass_t;
 
 typedef struct
@@ -143,6 +164,11 @@
   // node index of ip4-drop node
   u32 ip4_drop_idx;
   u32 ip4_reass_expire_node_idx;
+
+  /** Worker handoff */
+  u32 fq_index;
+  u32 fq_feature_index;
+
 } ip4_reass_main_t;
 
 ip4_reass_main_t ip4_reass_main;
@@ -151,6 +177,7 @@
 {
   IP4_REASSEMBLY_NEXT_INPUT,
   IP4_REASSEMBLY_NEXT_DROP,
+  IP4_REASSEMBLY_NEXT_HANDOFF,
   IP4_REASSEMBLY_N_NEXT,
 } ip4_reass_next_t;
 
@@ -278,6 +305,7 @@
 #endif
 }
 
+
 always_inline void
 ip4_reass_free (ip4_reass_main_t * rm, ip4_reass_per_thread_t * rt,
 		ip4_reass_t * reass)
@@ -325,17 +353,22 @@
 
 ip4_reass_t *
 ip4_reass_find_or_create (vlib_main_t * vm, ip4_reass_main_t * rm,
-			  ip4_reass_per_thread_t * rt, ip4_reass_key_t * k)
+			  ip4_reass_per_thread_t * rt, ip4_reass_kv_t * kv,
+			  u8 * do_handoff)
 {
   ip4_reass_t *reass = NULL;
   f64 now = vlib_time_now (rm->vlib_main);
-  clib_bihash_kv_16_8_t kv, value;
-  kv.key[0] = k->as_u64[0];
-  kv.key[1] = k->as_u64[1];
 
-  if (!clib_bihash_search_16_8 (&rm->hash, &kv, &value))
+  if (!clib_bihash_search_16_8
+      (&rm->hash, (clib_bihash_kv_16_8_t *) kv, (clib_bihash_kv_16_8_t *) kv))
     {
-      reass = pool_elt_at_index (rt->pool, value.value);
+      if (vm->thread_index != kv->v.thread_index)
+	{
+	  *do_handoff = 1;
+	  return NULL;
+	}
+      reass = pool_elt_at_index (rt->pool, kv->v.reass_index);
+
       if (now > reass->last_heard + rm->timeout)
 	{
 	  ip4_reass_on_timeout (vm, rm, reass);
@@ -359,8 +392,7 @@
     {
       pool_get (rt->pool, reass);
       clib_memset (reass, 0, sizeof (*reass));
-      reass->id =
-	((u64) os_get_thread_index () * 1000000000) + rt->id_counter;
+      reass->id = ((u64) vm->thread_index * 1000000000) + rt->id_counter;
       ++rt->id_counter;
       reass->first_bi = ~0;
       reass->last_packet_octet = ~0;
@@ -368,12 +400,13 @@
       ++rt->reass_n;
     }
 
-  reass->key.as_u64[0] = kv.key[0] = k->as_u64[0];
-  reass->key.as_u64[1] = kv.key[1] = k->as_u64[1];
-  kv.value = reass - rt->pool;
+  reass->key.as_u64[0] = ((clib_bihash_kv_16_8_t *) kv)->key[0];
+  reass->key.as_u64[1] = ((clib_bihash_kv_16_8_t *) kv)->key[1];
+  kv->v.reass_index = (reass - rt->pool);
+  kv->v.thread_index = vm->thread_index;
   reass->last_heard = now;
 
-  if (clib_bihash_add_del_16_8 (&rm->hash, &kv, 1))
+  if (clib_bihash_add_del_16_8 (&rm->hash, (clib_bihash_kv_16_8_t *) kv, 1))
     {
       ip4_reass_free (rm, rt, reass);
       reass = NULL;
@@ -889,7 +922,7 @@
   u32 *from = vlib_frame_vector_args (frame);
   u32 n_left_from, n_left_to_next, *to_next, next_index;
   ip4_reass_main_t *rm = &ip4_reass_main;
-  ip4_reass_per_thread_t *rt = &rm->per_thread_data[os_get_thread_index ()];
+  ip4_reass_per_thread_t *rt = &rm->per_thread_data[vm->thread_index];
   clib_spinlock_lock (&rt->lock);
 
   n_left_from = frame->n_vectors;
@@ -934,22 +967,36 @@
 		}
 	      else
 		{
-		  ip4_reass_key_t k;
-		  k.as_u64[0] =
-		    (u64) vnet_buffer (b0)->sw_if_index[VLIB_RX] |
+		  ip4_reass_kv_t kv;
+		  u8 do_handoff = 0;
+
+		  kv.k.as_u64[0] =
+		    (u64) vec_elt (ip4_main.fib_index_by_sw_if_index,
+				   vnet_buffer (b0)->sw_if_index[VLIB_RX]) |
 		    (u64) ip0->src_address.as_u32 << 32;
-		  k.as_u64[1] =
+		  kv.k.as_u64[1] =
 		    (u64) ip0->dst_address.as_u32 |
 		    (u64) ip0->fragment_id << 32 | (u64) ip0->protocol << 48;
 
 		  ip4_reass_t *reass =
-		    ip4_reass_find_or_create (vm, rm, rt, &k);
+		    ip4_reass_find_or_create (vm, rm, rt, &kv, &do_handoff);
 
-		  if (reass)
+		  if (PREDICT_FALSE (do_handoff))
+		    {
+		      next0 = IP4_REASSEMBLY_NEXT_HANDOFF;
+		      if (is_feature)
+			vnet_buffer (b0)->ip.
+			  reass.owner_feature_thread_index =
+			  kv.v.thread_index;
+		      else
+			vnet_buffer (b0)->ip.reass.owner_thread_index =
+			  kv.v.thread_index;
+		    }
+		  else if (reass)
 		    {
 		      switch (ip4_reass_update
-			      (vm, node, rm, rt, reass, &bi0, &next0, &error0,
-			       is_feature))
+			      (vm, node, rm, rt, reass, &bi0, &next0,
+			       &error0, is_feature))
 			{
 			case IP4_REASS_RC_OK:
 			  /* nothing to do here */
@@ -984,8 +1031,9 @@
 		  b0 = vlib_get_buffer (vm, bi0);
 		  vnet_feature_next (&next0, b0);
 		}
-	      vlib_validate_buffer_enqueue_x1 (vm, node, next_index, to_next,
-					       n_left_to_next, bi0, next0);
+	      vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
+					       to_next, n_left_to_next,
+					       bi0, next0);
 	      IP4_REASS_DEBUG_BUFFER (bi0, enqueue_next);
 	    }
 
@@ -1027,6 +1075,8 @@
         {
                 [IP4_REASSEMBLY_NEXT_INPUT] = "ip4-input",
                 [IP4_REASSEMBLY_NEXT_DROP] = "ip4-drop",
+                [IP4_REASSEMBLY_NEXT_HANDOFF] = "ip4-reassembly-handoff",
+
         },
 };
 /* *INDENT-ON* */
@@ -1053,6 +1103,7 @@
         {
                 [IP4_REASSEMBLY_NEXT_INPUT] = "ip4-input",
                 [IP4_REASSEMBLY_NEXT_DROP] = "ip4-drop",
+                [IP4_REASSEMBLY_NEXT_HANDOFF] = "ip4-reass-feature-hoff",
         },
 };
 /* *INDENT-ON* */
@@ -1197,6 +1248,11 @@
   ASSERT (node);
   rm->ip4_drop_idx = node->index;
 
+  rm->fq_index = vlib_frame_queue_main_init (ip4_reass_node.index, 0);
+  rm->fq_feature_index =
+    vlib_frame_queue_main_init (ip4_reass_node_feature.index, 0);
+
+
   return error;
 }
 
@@ -1212,8 +1268,9 @@
   while (true)
     {
       vlib_process_wait_for_event_or_clock (vm,
-					    (f64) rm->expire_walk_interval_ms
-					    / (f64) MSEC_PER_SEC);
+					    (f64)
+					    rm->expire_walk_interval_ms /
+					    (f64) MSEC_PER_SEC);
       event_type = vlib_process_get_events (vm, &event_data);
 
       switch (event_type)
@@ -1332,7 +1389,8 @@
 }
 
 static clib_error_t *
-show_ip4_reass (vlib_main_t * vm, unformat_input_t * input,
+show_ip4_reass (vlib_main_t * vm,
+		unformat_input_t * input,
 		CLIB_UNUSED (vlib_cli_command_t * lmd))
 {
   ip4_reass_main_t *rm = &ip4_reass_main;
@@ -1385,10 +1443,156 @@
 vnet_api_error_t
 ip4_reass_enable_disable (u32 sw_if_index, u8 enable_disable)
 {
-  return vnet_feature_enable_disable ("ip4-unicast", "ip4-reassembly-feature",
-				      sw_if_index, enable_disable, 0, 0);
+  return vnet_feature_enable_disable ("ip4-unicast",
+				      "ip4-reassembly-feature", sw_if_index,
+				      enable_disable, 0, 0);
 }
 
+
+#define foreach_ip4_reassembly_handoff_error                       \
+_(CONGESTION_DROP, "congestion drop")
+
+
+typedef enum
+{
+#define _(sym,str) IP4_REASSEMBLY_HANDOFF_ERROR_##sym,
+  foreach_ip4_reassembly_handoff_error
+#undef _
+    IP4_REASSEMBLY_HANDOFF_N_ERROR,
+} ip4_reassembly_handoff_error_t;
+
+static char *ip4_reassembly_handoff_error_strings[] = {
+#define _(sym,string) string,
+  foreach_ip4_reassembly_handoff_error
+#undef _
+};
+
+typedef struct
+{
+  u32 next_worker_index;
+} ip4_reassembly_handoff_trace_t;
+
+static u8 *
+format_ip4_reassembly_handoff_trace (u8 * s, va_list * args)
+{
+  CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
+  CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
+  ip4_reassembly_handoff_trace_t *t =
+    va_arg (*args, ip4_reassembly_handoff_trace_t *);
+
+  s =
+    format (s, "ip4-reassembly-handoff: next-worker %d",
+	    t->next_worker_index);
+
+  return s;
+}
+
+always_inline uword
+ip4_reassembly_handoff_node_inline (vlib_main_t * vm,
+				    vlib_node_runtime_t * node,
+				    vlib_frame_t * frame, bool is_feature)
+{
+  ip4_reass_main_t *rm = &ip4_reass_main;
+
+  vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **b;
+  u32 n_enq, n_left_from, *from;
+  u16 thread_indices[VLIB_FRAME_SIZE], *ti;
+  u32 fq_index;
+
+  from = vlib_frame_vector_args (frame);
+  n_left_from = frame->n_vectors;
+  vlib_get_buffers (vm, from, bufs, n_left_from);
+
+  b = bufs;
+  ti = thread_indices;
+
+  fq_index = (is_feature) ? rm->fq_feature_index : rm->fq_index;
+
+  while (n_left_from > 0)
+    {
+      ti[0] =
+	(is_feature) ? vnet_buffer (b[0])->ip.
+	reass.owner_feature_thread_index : vnet_buffer (b[0])->ip.
+	reass.owner_thread_index;
+
+      if (PREDICT_FALSE
+	  ((node->flags & VLIB_NODE_FLAG_TRACE)
+	   && (b[0]->flags & VLIB_BUFFER_IS_TRACED)))
+	{
+	  ip4_reassembly_handoff_trace_t *t =
+	    vlib_add_trace (vm, node, b[0], sizeof (*t));
+	  t->next_worker_index = ti[0];
+	}
+
+      n_left_from -= 1;
+      ti += 1;
+      b += 1;
+    }
+  n_enq =
+    vlib_buffer_enqueue_to_thread (vm, fq_index, from, thread_indices,
+				   frame->n_vectors, 1);
+
+  if (n_enq < frame->n_vectors)
+    vlib_node_increment_counter (vm, node->node_index,
+				 IP4_REASSEMBLY_HANDOFF_ERROR_CONGESTION_DROP,
+				 frame->n_vectors - n_enq);
+  return frame->n_vectors;
+}
+
+VLIB_NODE_FN (ip4_reassembly_handoff_node) (vlib_main_t * vm,
+					    vlib_node_runtime_t * node,
+					    vlib_frame_t * frame)
+{
+  return ip4_reassembly_handoff_node_inline (vm, node, frame,
+					     false /* is_feature */ );
+}
+
+
+/* *INDENT-OFF* */
+VLIB_REGISTER_NODE (ip4_reassembly_handoff_node) = {
+  .name = "ip4-reassembly-handoff",
+  .vector_size = sizeof (u32),
+  .n_errors = ARRAY_LEN(ip4_reassembly_handoff_error_strings),
+  .error_strings = ip4_reassembly_handoff_error_strings,
+  .format_trace = format_ip4_reassembly_handoff_trace,
+
+  .n_next_nodes = 1,
+
+  .next_nodes = {
+    [0] = "error-drop",
+  },
+};
+/* *INDENT-ON* */
+
+
+/* *INDENT-OFF* */
+VLIB_NODE_FN (ip4_reassembly_feature_handoff_node) (vlib_main_t * vm,
+						    vlib_node_runtime_t *
+						    node,
+						    vlib_frame_t * frame)
+{
+  return ip4_reassembly_handoff_node_inline (vm, node, frame,
+					     true /* is_feature */ );
+}
+/* *INDENT-ON* */
+
+
+/* *INDENT-OFF* */
+VLIB_REGISTER_NODE (ip4_reassembly_feature_handoff_node) = {
+  .name = "ip4-reass-feature-hoff",
+  .vector_size = sizeof (u32),
+  .n_errors = ARRAY_LEN(ip4_reassembly_handoff_error_strings),
+  .error_strings = ip4_reassembly_handoff_error_strings,
+  .format_trace = format_ip4_reassembly_handoff_trace,
+
+  .n_next_nodes = 1,
+
+  .next_nodes = {
+    [0] = "error-drop",
+  },
+};
+/* *INDENT-ON* */
+
 /*
  * fd.io coding-style-patch-verification: ON
  *