fib: fix multiple dpo pool expand cases

Add dpo_pool_barrier_sync/release, use them to clean up
thread-unsafe pool expansion cases.

Type: fix

Signed-off-by: Dave Barach <dave@barachs.net>
Change-Id: I09299124a25f8d541e3bb4b75375568990e9b911
diff --git a/src/vnet/dpo/classify_dpo.c b/src/vnet/dpo/classify_dpo.c
index 08fab83..bc536a1 100644
--- a/src/vnet/dpo/classify_dpo.c
+++ b/src/vnet/dpo/classify_dpo.c
@@ -26,8 +26,13 @@
 classify_dpo_alloc (void)
 {
     classify_dpo_t *cd;
+    vlib_main_t *vm;
+    u8 did_barrier_sync;
 
+    dpo_pool_barrier_sync (vm, classify_dpo_pool, did_barrier_sync);
     pool_get_aligned(classify_dpo_pool, cd, CLIB_CACHE_LINE_BYTES);
+    dpo_pool_barrier_release (vm, did_barrier_sync);
+
     clib_memset(cd, 0, sizeof(*cd));
 
     return (cd);
diff --git a/src/vnet/dpo/dpo.h b/src/vnet/dpo/dpo.h
index d3c2371..e5a9bdc 100644
--- a/src/vnet/dpo/dpo.h
+++ b/src/vnet/dpo/dpo.h
@@ -16,7 +16,7 @@
  * @brief
  * A Data-Path Object is an object that represents actions that are
  * applied to packets are they are switched through VPP's data-path.
- * 
+ *
  * The DPO can be considered to be like is a base class that is specialised
  * by other objects to provide concreate actions
  *
@@ -328,7 +328,7 @@
                       const dpo_id_t *parent_dpo);
 
 /**
- * @brief 
+ * @brief
  *  Set and stack a DPO.
  *  The DPO passed is set to the parent DPO and the necessary
  *  VLIB graph arcs are created, from the child_node passed.
@@ -341,7 +341,7 @@
  *
  * @param parent_dpo
  *  The parent DPO to stack onto.
- */ 
+ */
 extern void dpo_stack_from_node(u32 child_node,
                                 dpo_id_t *dpo,
                                 const dpo_id_t *parent);
@@ -443,7 +443,7 @@
  *     (see above).
  *
  * @param type
- *  The type being registered. 
+ *  The type being registered.
  *
  * @param vft
  *  The virtual function table to register for the type.
@@ -497,4 +497,49 @@
                                      dpo_proto_t  child_proto,
                                      dpo_type_t   parent_type,
                                      dpo_proto_t  parent_proto);
+
+
+/**
+ * @brief Barrier sync if a dpo pool is about to expand
+ *
+ * Expands to one statement; the caller supplies the trailing ';'.
+ *
+ * @param VM (output)
+ *  vlib_main_t *, assigned only when YESNO is set; asserted to be thread 0
+ *
+ * @param P
+ *  pool pointer
+ *
+ * @param YESNO (output)
+ *  typically a u8, 1 => expand will occur, worker barrier held
+ *                  0 => no expand, barrier not held
+ */
+
+#define dpo_pool_barrier_sync(VM,P,YESNO)                               \
+do {                                                                    \
+    pool_get_aligned_will_expand ((P), YESNO, CLIB_CACHE_LINE_BYTES);   \
+                                                                        \
+    if (YESNO)                                                          \
+    {                                                                   \
+        VM = vlib_get_main();                                           \
+        ASSERT ((VM)->thread_index == 0);                               \
+        vlib_worker_thread_barrier_sync((VM));                          \
+    }                                                                   \
+} while(0)
+
+/**
+ * @brief Release barrier sync after dpo pool expansion
+ *
+ * @param VM
+ *  vlib_main_t pointer set by dpo_pool_barrier_sync; read only if YESNO
+ *
+ * @param YESNO
+ *  typically a u8, 1 => release required
+ *                  0 => no release required
+ * @return none; expands to a single statement, caller adds the ';'
+ */
+
+#define dpo_pool_barrier_release(VM,YESNO)                              \
+do { if ((YESNO)) vlib_worker_thread_barrier_release((VM)); } while(0)
+
 #endif
diff --git a/src/vnet/dpo/load_balance_map.c b/src/vnet/dpo/load_balance_map.c
index 7da360b..c03acaf 100644
--- a/src/vnet/dpo/load_balance_map.c
+++ b/src/vnet/dpo/load_balance_map.c
@@ -387,8 +387,13 @@
 {
     load_balance_map_t *lbm;
     u32 ii;
+    vlib_main_t *vm;
+    u8 did_barrier_sync;
 
+    dpo_pool_barrier_sync (vm, load_balance_map_pool, did_barrier_sync);
     pool_get_aligned(load_balance_map_pool, lbm, CLIB_CACHE_LINE_BYTES);
+    dpo_pool_barrier_release (vm, did_barrier_sync);
+
     clib_memset(lbm, 0, sizeof(*lbm));
 
     vec_validate(lbm->lbm_paths, vec_len(paths)-1);
diff --git a/src/vnet/dpo/lookup_dpo.c b/src/vnet/dpo/lookup_dpo.c
index daa2352..677a19f 100644
--- a/src/vnet/dpo/lookup_dpo.c
+++ b/src/vnet/dpo/lookup_dpo.c
@@ -63,8 +63,12 @@
 lookup_dpo_alloc (void)
 {
     lookup_dpo_t *lkd;
+    vlib_main_t *vm;
+    u8 did_barrier_sync;
 
+    dpo_pool_barrier_sync (vm, lookup_dpo_pool, did_barrier_sync);
     pool_get_aligned(lookup_dpo_pool, lkd, CLIB_CACHE_LINE_BYTES);
+    dpo_pool_barrier_release (vm, did_barrier_sync);
 
     return (lkd);
 }
@@ -1076,7 +1080,7 @@
              */
             if (table_from_interface)
             {
-                fib_index0 = 
+                fib_index0 =
                     mpls_fib_table_get_index_for_sw_if_index(
                         vnet_buffer(b0)->sw_if_index[VLIB_RX]);
             }
@@ -1142,9 +1146,9 @@
             if (PREDICT_FALSE(vnet_buffer2(b0)->loop_counter > MAX_LUKPS_PER_PACKET))
                 next0 = MPLS_LOOKUP_NEXT_DROP;
 
-	    if (PREDICT_FALSE(b0->flags & VLIB_BUFFER_IS_TRACED)) 
+	    if (PREDICT_FALSE(b0->flags & VLIB_BUFFER_IS_TRACED))
             {
-                lookup_trace_t *tr = vlib_add_trace (vm, node, 
+                lookup_trace_t *tr = vlib_add_trace (vm, node,
                                                      b0, sizeof (*tr));
                 tr->fib_index = fib_index0;
                 tr->lbi = lbi0;
diff --git a/src/vnet/dpo/mpls_label_dpo.c b/src/vnet/dpo/mpls_label_dpo.c
index 9d147f9..683b544 100644
--- a/src/vnet/dpo/mpls_label_dpo.c
+++ b/src/vnet/dpo/mpls_label_dpo.c
@@ -39,8 +39,13 @@
 mpls_label_dpo_alloc (void)
 {
     mpls_label_dpo_t *mld;
+    vlib_main_t *vm;
+    u8 did_barrier_sync;
 
+    dpo_pool_barrier_sync (vm, mpls_label_dpo_pool, did_barrier_sync);
     pool_get_aligned(mpls_label_dpo_pool, mld, CLIB_CACHE_LINE_BYTES);
+    dpo_pool_barrier_release (vm, did_barrier_sync);
+
     clib_memset(mld, 0, sizeof(*mld));
 
     dpo_reset(&mld->mld_dpo);
diff --git a/src/vnet/dpo/receive_dpo.c b/src/vnet/dpo/receive_dpo.c
index 949dbfa..b12b382 100644
--- a/src/vnet/dpo/receive_dpo.c
+++ b/src/vnet/dpo/receive_dpo.c
@@ -35,8 +35,13 @@
 receive_dpo_alloc (void)
 {
     receive_dpo_t *rd;
+    vlib_main_t *vm;
+    u8 did_barrier_sync;
 
+    dpo_pool_barrier_sync (vm, receive_dpo_pool, did_barrier_sync);
     pool_get_aligned(receive_dpo_pool, rd, CLIB_CACHE_LINE_BYTES);
+    dpo_pool_barrier_release (vm, did_barrier_sync);
+
     clib_memset(rd, 0, sizeof(*rd));
 
     return (rd);