session: separate ctrl, new and old events

Type: feature

Change-Id: I5e030b23943c012d8191ff657165055d33ec87a2
Signed-off-by: Florin Coras <fcoras@cisco.com>
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index 1bb5cb2..35704b7 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -665,7 +665,7 @@
     }
 
   ctx->snd_space = transport_connection_snd_space (ctx->tc,
-						   wrk->vm->clib_time.
+						   vm->clib_time.
 						   last_cpu_time,
 						   ctx->snd_mss);
 
@@ -693,6 +693,8 @@
       if (n_bufs)
 	vlib_buffer_free (vm, wrk->tx_buffers, n_bufs);
       session_evt_add_old (wrk, elt);
+      vlib_node_increment_counter (wrk->vm, node->node_index,
+				   SESSION_QUEUE_ERROR_NO_BUFFER, 1);
       return SESSION_TX_NO_BUFFERS;
     }
 
@@ -854,41 +856,29 @@
 {
   session_main_t *smm = &session_main;
   app_worker_t *app_wrk;
+  clib_llist_index_t ei;
   void (*fp) (void *);
   session_event_t *e;
   session_t *s;
-  int rv;
 
+  ei = clib_llist_entry_index (wrk->event_elts, elt);
   e = &elt->evt;
+
   switch (e->event_type)
     {
     case SESSION_IO_EVT_TX_FLUSH:
     case SESSION_IO_EVT_TX:
-      /* Don't try to send more that one frame per dispatch cycle */
-      if (*n_tx_packets == VLIB_FRAME_SIZE)
-	{
-	  session_evt_add_postponed (wrk, elt);
-	  return;
-	}
-
       s = session_event_get_session (e, thread_index);
       if (PREDICT_FALSE (!s))
 	{
-	  clib_warning ("session was freed!");
+	  clib_warning ("session %u was freed!", e->session_index);
 	  break;
 	}
       CLIB_PREFETCH (s->tx_fifo, 2 * CLIB_CACHE_LINE_BYTES, LOAD);
       wrk->ctx.s = s;
       /* Spray packets in per session type frames, since they go to
        * different nodes */
-      rv = (smm->session_tx_fns[s->session_type]) (wrk, node, elt,
-						   n_tx_packets);
-      if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS))
-	{
-	  vlib_node_increment_counter (wrk->vm, node->node_index,
-				       SESSION_QUEUE_ERROR_NO_BUFFER, 1);
-	  break;
-	}
+      (smm->session_tx_fns[s->session_type]) (wrk, node, elt, n_tx_packets);
       break;
     case SESSION_IO_EVT_RX:
       s = session_event_get_session (e, thread_index);
@@ -941,6 +931,11 @@
     default:
       clib_warning ("unhandled event type %d", e->event_type);
     }
+
+  /* Regrab elements in case pool moved */
+  elt = pool_elt_at_index (wrk->event_elts, ei);
+  if (!clib_llist_elt_is_linked (elt, evt_list))
+    session_evt_elt_free (wrk, elt);
 }
 
 static uword
@@ -950,10 +945,11 @@
   session_main_t *smm = vnet_get_session_main ();
   u32 thread_index = vm->thread_index, n_to_dequeue;
   session_worker_t *wrk = &smm->wrk[thread_index];
-  session_evt_elt_t *elt, *new_he, *new_te, *old_he;
-  session_evt_elt_t *disconnects_he, *postponed_he;
+  session_evt_elt_t *elt, *ctrl_he, *new_he, *old_he;
   svm_msg_q_msg_t _msg, *msg = &_msg;
+  clib_llist_index_t old_ti;
   int i, n_tx_packets = 0;
+  session_event_t *evt;
   svm_msg_q_t *mq;
 
   SESSION_EVT (SESSION_EVT_DISPATCH_START, wrk);
@@ -965,12 +961,9 @@
    */
   transport_update_time (wrk->last_vlib_time, thread_index);
 
-  /* Make sure postponed events are handled first */
-  new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
-  new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he);
-
-  postponed_he = pool_elt_at_index (wrk->event_elts, wrk->postponed_head);
-  clib_llist_splice (wrk->event_elts, evt_list, new_te, postponed_he);
+  /*
+   *  Dequeue and handle new events
+   */
 
   /* Try to dequeue what is available. Don't wait for lock.
    * XXX: we may need priorities here */
@@ -980,44 +973,78 @@
     {
       for (i = 0; i < n_to_dequeue; i++)
 	{
-	  elt = session_evt_elt_alloc (wrk);
 	  svm_msg_q_sub_w_lock (mq, msg);
+	  evt = svm_msg_q_msg_data (mq, msg);
+	  if (evt->event_type > SESSION_IO_EVT_BUILTIN_TX)
+	    elt = session_evt_alloc_ctrl (wrk);
+	  else
+	    elt = session_evt_alloc_new (wrk);
 	  /* Works because reply messages are smaller than a session evt.
 	   * If we ever need to support bigger messages this needs to be
 	   * fixed */
-	  clib_memcpy_fast (&elt->evt, svm_msg_q_msg_data (mq, msg),
-			    sizeof (elt->evt));
+	  clib_memcpy_fast (&elt->evt, evt, sizeof (elt->evt));
 	  svm_msg_q_free_msg (mq, msg);
-	  new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
-	  clib_llist_add_tail (wrk->event_elts, evt_list, elt, new_he);
 	}
       svm_msg_q_unlock (mq);
     }
 
+  /*
+   * Handle control events
+   */
+
+  ctrl_he = pool_elt_at_index (wrk->event_elts, wrk->ctrl_head);
+
+  /* *INDENT-OFF* */
+  clib_llist_foreach_safe (wrk->event_elts, evt_list, ctrl_he, elt, ({
+    clib_llist_remove (wrk->event_elts, evt_list, elt);
+    session_event_dispatch (wrk, node, elt, thread_index, &n_tx_packets);
+  }));
+  /* *INDENT-ON* */
+
+  /*
+   * Handle the new io events.
+   */
+
+  new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
+
+  /* *INDENT-OFF* */
+  clib_llist_foreach_safe (wrk->event_elts, evt_list, new_he, elt, ({
+    session_evt_type_t et;
+
+    et = elt->evt.event_type;
+    clib_llist_remove (wrk->event_elts, evt_list, elt);
+
+    /* Postpone tx events if we can't handle them this dispatch cycle */
+    if (n_tx_packets >= VLIB_FRAME_SIZE
+	&& (et == SESSION_IO_EVT_TX || et == SESSION_IO_EVT_TX_FLUSH))
+      {
+	clib_llist_add (wrk->event_elts, evt_list, elt, new_he);
+	continue;
+      }
+
+    session_event_dispatch (wrk, node, elt, thread_index, &n_tx_packets);
+  }));
+  /* *INDENT-ON* */
+
+  /*
+   * Handle the old io events
+   */
+
   old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
-  disconnects_he = pool_elt_at_index (wrk->event_elts, wrk->disconnects_head);
+  old_ti = clib_llist_prev_index (old_he, evt_list);
 
-  new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he);
-  clib_llist_splice (wrk->event_elts, evt_list, new_te, old_he);
-  new_te = clib_llist_prev (wrk->event_elts, evt_list, new_he);
-  clib_llist_splice (wrk->event_elts, evt_list, new_te, disconnects_he);
-
-  while (!clib_llist_is_empty (wrk->event_elts, evt_list, new_he))
+  while (!clib_llist_is_empty (wrk->event_elts, evt_list, old_he))
     {
       clib_llist_index_t ei;
 
-      clib_llist_pop_first (wrk->event_elts, evt_list, elt, new_he);
+      clib_llist_pop_first (wrk->event_elts, evt_list, elt, old_he);
       ei = clib_llist_entry_index (wrk->event_elts, elt);
-
       session_event_dispatch (wrk, node, elt, thread_index, &n_tx_packets);
 
-      /* Regrab elements in case pool moved */
-      elt = pool_elt_at_index (wrk->event_elts, ei);
-      if (!clib_llist_elt_is_linked (elt, evt_list))
-	session_evt_elt_free (wrk, elt);
-
-      new_he = pool_elt_at_index (wrk->event_elts, wrk->new_head);
-    }
+      old_he = pool_elt_at_index (wrk->event_elts, wrk->old_head);
+      if (n_tx_packets >= VLIB_FRAME_SIZE || ei == old_ti)
+	break;
+    };
 
   vlib_node_increment_counter (vm, session_queue_node.index,
 			       SESSION_QUEUE_ERROR_TX, n_tx_packets);
@@ -1166,7 +1193,8 @@
 
   /* *INDENT-OFF* */
   clib_llist_foreach (wrk->event_elts, evt_list,
-                      session_evt_old_head (wrk), elt, ({
+                      pool_elt_at_index (wrk->event_elts, wrk->old_head),
+                      elt, ({
     found = session_node_cmp_event (&elt->evt, f);
     if (found)
       {