svm: fifo ooo reads/writes with multiple chunks

Change-Id: If23a04623a7138c9f6c98ee9ecfa587396618a60
Signed-off-by: Florin Coras <fcoras@cisco.com>
diff --git a/src/svm/svm_fifo.c b/src/svm/svm_fifo.c
index f457128..a335519 100644
--- a/src/svm/svm_fifo.c
+++ b/src/svm/svm_fifo.c
@@ -22,7 +22,7 @@
 
 CLIB_MARCH_FN (svm_fifo_copy_to_chunk, void, svm_fifo_t * f,
 	       svm_fifo_chunk_t * c, u32 tail_idx, const u8 * src, u32 len,
-	       u8 update_tail)
+	       svm_fifo_chunk_t ** last)
 {
   u32 n_chunk;
 
@@ -30,18 +30,19 @@
 
   tail_idx -= c->start_byte;
   n_chunk = c->length - tail_idx;
-  if (n_chunk < len)
+  if (n_chunk <= len)
     {
       u32 to_copy = len;
       clib_memcpy_fast (&c->data[tail_idx], src, n_chunk);
+      c = c->next;
       while ((to_copy -= n_chunk))
 	{
-	  c = c->next;
 	  n_chunk = clib_min (c->length, to_copy);
 	  clib_memcpy_fast (&c->data[0], src + (len - to_copy), n_chunk);
+	  c = c->length <= to_copy ? c->next : c;
 	}
-      if (update_tail)
-	f->tail_chunk = c;
+      if (*last)
+	*last = c;
     }
   else
     {
@@ -51,24 +52,27 @@
 
 CLIB_MARCH_FN (svm_fifo_copy_from_chunk, void, svm_fifo_t * f,
 	       svm_fifo_chunk_t * c, u32 head_idx, u8 * dst, u32 len,
-	       u8 update_head)
+	       svm_fifo_chunk_t ** last)
 {
   u32 n_chunk;
 
+  ASSERT (head_idx >= c->start_byte && head_idx < c->start_byte + c->length);
+
   head_idx -= c->start_byte;
   n_chunk = c->length - head_idx;
-  if (n_chunk < len)
+  if (n_chunk <= len)
     {
       u32 to_copy = len;
       clib_memcpy_fast (dst, &c->data[head_idx], n_chunk);
+      c = c->next;
       while ((to_copy -= n_chunk))
 	{
-	  c = c->next;
 	  n_chunk = clib_min (c->length, to_copy);
 	  clib_memcpy_fast (dst + (len - to_copy), &c->data[0], n_chunk);
+	  c = c->length <= to_copy ? c->next : c;
 	}
-      if (update_head)
-	f->head_chunk = c;
+      if (*last)
+	*last = c;
     }
   else
     {
@@ -80,18 +84,18 @@
 
 static inline void
 svm_fifo_copy_to_chunk (svm_fifo_t * f, svm_fifo_chunk_t * c, u32 tail_idx,
-			const u8 * src, u32 len, u8 update_tail)
+			const u8 * src, u32 len, svm_fifo_chunk_t ** last)
 {
   CLIB_MARCH_FN_SELECT (svm_fifo_copy_to_chunk) (f, c, tail_idx, src, len,
-						 update_tail);
+						 last);	/* all args forwarded unchanged */
 }
 
 static inline void
 svm_fifo_copy_from_chunk (svm_fifo_t * f, svm_fifo_chunk_t * c, u32 head_idx,
-			  u8 * dst, u32 len, u8 update_head)
+			  u8 * dst, u32 len, svm_fifo_chunk_t ** last)
 {
   CLIB_MARCH_FN_SELECT (svm_fifo_copy_from_chunk) (f, c, head_idx, dst, len,
-						   update_head);
+						   last);	/* all args forwarded unchanged */
 }
 
 static inline u8
@@ -290,8 +294,8 @@
   f->refcnt = 1;
   f->default_chunk.start_byte = 0;
   f->default_chunk.length = f->size;
-  f->default_chunk.next = f->start_chunk = &f->default_chunk;
-  f->end_chunk = f->head_chunk = f->tail_chunk = f->start_chunk;
+  f->default_chunk.next = f->start_chunk = f->end_chunk = &f->default_chunk;
+  f->head_chunk = f->tail_chunk = f->ooo_enq = f->ooo_deq = f->start_chunk;
 }
 
 /** create an svm fifo, in the current heap. Fails vs blow up the process */
@@ -319,12 +323,13 @@
   svm_fifo_chunk_t *prev;
   u32 add_bytes = 0;
 
-  prev = f->end_chunk;
+  if (!c)
+    return;
+
+  f->end_chunk->next = c;
   while (c)
     {
-      c->start_byte = prev->start_byte + prev->length;
       add_bytes += c->length;
-      prev->next = c;
       prev = c;
       c = c->next;
     }
@@ -348,26 +353,112 @@
 void
 svm_fifo_add_chunk (svm_fifo_t * f, svm_fifo_chunk_t * c)
 {
-  if (svm_fifo_is_wrapped (f))
-    {
-      if (f->new_chunks)
-	{
-	  svm_fifo_chunk_t *prev;
+  svm_fifo_chunk_t *cur, *prev;	/* cur walks c's list, prev trails it */
 
-	  prev = f->new_chunks;
-	  while (prev->next)
-	    prev = prev->next;
-	  prev->next = c;
-	}
-      else
-	{
-	  f->new_chunks = c;
-	}
-      f->flags |= SVM_FIFO_F_SIZE_UPDATE;
+  /* Only while SVM_FIFO_F_MULTI_CHUNK is unset: init rbtree, add start chunk */
+  if (!(f->flags & SVM_FIFO_F_MULTI_CHUNK))
+    {
+      rb_tree_init (&f->chunk_lookup);
+      rb_tree_add2 (&f->chunk_lookup, 0, pointer_to_uword (f->start_chunk));
+      f->flags |= SVM_FIFO_F_MULTI_CHUNK;
+    }
+
+  /* Give every chunk in c's list a start_byte and index it in the rbtree.
+   * Expectation is that the heap holding the rbtree's pool is pushed. */
+  cur = c;
+  if (f->new_chunks)
+    {
+      prev = f->new_chunks;
+      while (prev->next)
+	prev = prev->next;
+      prev->next = c;	/* append c behind the pending chunks */
+    }
+  else
+    prev = f->end_chunk;	/* nothing pending: c follows end_chunk */
+
+  while (cur)
+    {
+      cur->start_byte = prev->start_byte + prev->length;
+      rb_tree_add2 (&f->chunk_lookup, cur->start_byte,
+		    pointer_to_uword (cur));
+      prev = cur;
+      cur = cur->next;
+    }
+
+  /* Not wrapped: apply c immediately through svm_fifo_size_update */
+  if (!svm_fifo_is_wrapped (f))
+    {
+      ASSERT (!f->new_chunks);
+      svm_fifo_size_update (f, c);
       return;
     }
 
-  svm_fifo_size_update (f, c);
+  /* Wrapped: defer. If chunks were already pending, c was appended above */
+  if (!f->new_chunks)
+    {
+      f->new_chunks = c;
+      f->flags |= SVM_FIFO_F_SIZE_UPDATE;
+    }
+}
+
+static inline u8	/* nonzero iff start_byte <= pos < start_byte + length */
+svm_fifo_chunk_includes_pos (svm_fifo_chunk_t * c, u32 pos)
+{
+  return (pos >= c->start_byte && pos < c->start_byte + c->length);
+}
+
+/**
+ * Find chunk for given byte position by descending f->chunk_lookup
+ *
+ * @param f	fifo; tree nodes carry a chunk pointer in their opaque field
+ * @param pos	normalized position in fifo, compared against node keys
+ *
+ * @return chunk that includes given position or 0
+ */
+static svm_fifo_chunk_t *
+svm_fifo_find_chunk (svm_fifo_t * f, u32 pos)
+{
+  rb_tree_t *rt = &f->chunk_lookup;
+  rb_node_t *cur, *prev;
+  svm_fifo_chunk_t *c;
+
+  cur = rb_node (rt, rt->root);
+  while (pos != cur->key)	/* stop as soon as a node's key equals pos */
+    {
+      prev = cur;	/* last node visited before stepping down */
+      if (pos < cur->key)
+	cur = rb_node_left (rt, cur);
+      else
+	cur = rb_node_right (rt, cur);
+
+      if (rb_node_is_tnil (rt, cur))
+	{
+	  /* Hit tnil as a left child: pos is below prev's key, try predecessor */
+	  if (pos < prev->key)
+	    {
+	      cur = rb_tree_predecessor (rt, prev);	/* NOTE(review): tnil unchecked */
+	      c = uword_to_pointer (cur->opaque, svm_fifo_chunk_t *);
+	      if (svm_fifo_chunk_includes_pos (c, pos))
+		return c;
+	      return 0;
+	    }
+	  /* Hit tnil as a right child: pos is above prev's key. prev's chunk may
+	   * still cover pos; if not, fall back to prev's successor */
+	  c = uword_to_pointer (prev->opaque, svm_fifo_chunk_t *);
+	  if (svm_fifo_chunk_includes_pos (c, pos))
+	    return c;
+
+	  cur = rb_tree_successor (rt, prev);	/* NOTE(review): tnil unchecked */
+	  c = uword_to_pointer (cur->opaque, svm_fifo_chunk_t *);
+	  if (svm_fifo_chunk_includes_pos (c, pos))
+	    return c;
+	  return 0;
+	}
+    }
+
+  if (!rb_node_is_tnil (rt, cur))	/* loop ended with cur->key == pos */
+    return uword_to_pointer (cur->opaque, svm_fifo_chunk_t *);
+  return 0;
 }
 
 void
@@ -650,7 +741,7 @@
   len = clib_min (free_count, len);
 
   svm_fifo_copy_to_chunk (f, f->tail_chunk, tail % f->size, src, len,
-			  1 /* update tail */ );
+			  &f->tail_chunk);
   tail += len;
 
   svm_fifo_trace_add (f, head, n_total, 2);
@@ -675,7 +766,7 @@
 int
 svm_fifo_enqueue_with_offset (svm_fifo_t * f, u32 offset, u32 len, u8 * src)
 {
-  u32 tail, head, free_count;
+  u32 tail, head, free_count, tail_idx;
 
   f_load_head_tail_prod (f, &head, &tail);
 
@@ -692,8 +783,12 @@
 
   ooo_segment_add (f, offset, head, tail, len);
 
-  svm_fifo_copy_to_chunk (f, f->tail_chunk, (tail + offset) % f->size, src,
-			  len, 0 /* update tail */ );
+  tail_idx = (tail + offset) % f->size;
+
+  if (!svm_fifo_chunk_includes_pos (f->ooo_enq, tail_idx))
+    f->ooo_enq = svm_fifo_find_chunk (f, tail_idx);
+
+  svm_fifo_copy_to_chunk (f, f->ooo_enq, tail_idx, src, len, &f->ooo_enq);
 
   return 0;
 }
@@ -712,10 +807,9 @@
     return -2;			/* nothing in the fifo */
 
   len = clib_min (cursize, len);
-  ASSERT (cursize >= len);
 
   svm_fifo_copy_from_chunk (f, f->head_chunk, head % f->size, dst, len,
-			    1 /* update head */ );
+			    &f->head_chunk);
   head += len;
 
   if (PREDICT_FALSE (f->flags & SVM_FIFO_F_SIZE_UPDATE))
@@ -728,23 +822,24 @@
 }
 
 int
-svm_fifo_peek (svm_fifo_t * f, u32 relative_offset, u32 len, u8 * dst)
+svm_fifo_peek (svm_fifo_t * f, u32 offset, u32 len, u8 * dst)
 {
-  u32 tail, head, cursize;
+  u32 tail, head, cursize, head_idx;
 
   f_load_head_tail_cons (f, &head, &tail);
 
   /* current size of fifo can only increase during peek: SPSC */
   cursize = f_cursize (f, head, tail);
 
-  if (PREDICT_FALSE (cursize < relative_offset))
+  if (PREDICT_FALSE (cursize < offset))
     return -2;			/* nothing in the fifo */
 
-  len = clib_min (cursize - relative_offset, len);
+  len = clib_min (cursize - offset, len);	/* clamp to bytes past offset */
+  head_idx = (head + offset) % f->size;	/* peek start, modulo fifo size */
+  if (!svm_fifo_chunk_includes_pos (f->ooo_deq, head_idx))
+    f->ooo_deq = svm_fifo_find_chunk (f, head_idx);	/* cached chunk missed */
 
-  svm_fifo_copy_from_chunk (f, f->head_chunk,
-			    (head + relative_offset) % f->size, dst, len,
-			    0 /* update head */ );
+  svm_fifo_copy_from_chunk (f, f->ooo_deq, head_idx, dst, len, &f->ooo_deq);
   return len;
 }
 
@@ -861,10 +956,20 @@
  * Set fifo pointers to requested offset
  */
 void
-svm_fifo_init_pointers (svm_fifo_t * f, u32 pointer)
+svm_fifo_init_pointers (svm_fifo_t * f, u32 head, u32 tail)
 {
-  clib_atomic_store_rel_n (&f->head, pointer);
-  clib_atomic_store_rel_n (&f->tail, pointer);
+  clib_atomic_store_rel_n (&f->head, head);
+  clib_atomic_store_rel_n (&f->tail, tail);
+  if (f->flags & SVM_FIFO_F_MULTI_CHUNK)	/* flag set by svm_fifo_add_chunk */
+    {
+      svm_fifo_chunk_t *c;
+      c = svm_fifo_find_chunk (f, head % f->size);
+      ASSERT (c != 0);
+      f->head_chunk = f->ooo_deq = c;	/* both read cursors: head's chunk */
+      c = svm_fifo_find_chunk (f, tail % f->size);
+      ASSERT (c != 0);
+      f->tail_chunk = f->ooo_enq = c;	/* both write cursors: tail's chunk */
+    }
 }
 
 void