session: improve disconnect handling

If the caller is the session owning thread or the main thread with a
worker barrier sync (cli/api) add an event to the pending disconnects
vector in the session node and entirely avoid using the event queue.
Useful for bursts of disconnects (like an app detach).

If disconnects come from a processes, be willing to retry enqueueing the
disconnect to the event queue multiple times.

Change-Id: Ieece1f1091b713f94c41c703b6e805bc8498816a
Signed-off-by: Florin Coras <fcoras@cisco.com>
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index c016325..ee02526 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -31,9 +31,9 @@
 session_send_evt_to_thread (u64 session_handle, fifo_event_type_t evt_type,
 			    u32 thread_index, void *fp, void *rpc_args)
 {
-  u32 tries = 0;
   session_fifo_event_t evt = { {0}, };
   svm_queue_t *q;
+  u32 tries = 0, max_tries;
 
   evt.event_type = evt_type;
   if (evt_type == FIFO_EVENT_RPC)
@@ -47,7 +47,8 @@
   q = session_manager_get_vpp_event_queue (thread_index);
   while (svm_queue_add (q, (u8 *) & evt, 1))
     {
-      if (tries++ == 3)
+      max_tries = vlib_get_current_process (vlib_get_main ())? 1e6 : 3;
+      if (tries++ == max_tries)
 	{
 	  SESSION_DBG ("failed to enqueue evt");
 	  break;
@@ -450,7 +451,7 @@
   session_fifo_event_t evt;
   svm_queue_t *q;
 
-  if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
+  if (PREDICT_FALSE (s->session_state >= SESSION_STATE_CLOSING))
     {
       /* Session is closed so app will never clean up. Flush rx fifo */
       u32 to_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
@@ -1059,11 +1060,28 @@
 void
 stream_session_disconnect (stream_session_t * s)
 {
-  if (!s || s->session_state == SESSION_STATE_CLOSED)
+  u32 thread_index = vlib_get_thread_index ();
+  session_manager_main_t *smm = &session_manager_main;
+  session_fifo_event_t *evt;
+
+  if (!s || s->session_state >= SESSION_STATE_CLOSING)
     return;
-  s->session_state = SESSION_STATE_CLOSED;
-  session_send_session_evt_to_thread (session_handle (s),
-				      FIFO_EVENT_DISCONNECT, s->thread_index);
+  s->session_state = SESSION_STATE_CLOSING;
+
+  /* If we are in the handler thread, or being called with the worker barrier
+   * held (api/cli), just append a new event pending disconnects vector. */
+  if (thread_index > 0 || !vlib_get_current_process (vlib_get_main ()))
+    {
+      ASSERT (s->thread_index == thread_index || thread_index == 0);
+      vec_add2 (smm->pending_disconnects[s->thread_index], evt, 1);
+      memset (evt, 0, sizeof (*evt));
+      evt->session_handle = session_handle (s);
+      evt->event_type = FIFO_EVENT_DISCONNECT;
+    }
+  else
+    session_send_session_evt_to_thread (session_handle (s),
+					FIFO_EVENT_DISCONNECT,
+					s->thread_index);
 }
 
 /**
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index e046efb..b6c1b2f 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -692,7 +692,7 @@
 		       vlib_frame_t * frame)
 {
   session_manager_main_t *smm = vnet_get_session_manager_main ();
-  session_fifo_event_t *my_pending_event_vector, *pending_disconnects, *e;
+  session_fifo_event_t *my_pending_event_vector, *e;
   session_fifo_event_t *my_fifo_events;
   u32 n_to_dequeue, n_events;
   svm_queue_t *q;
@@ -722,10 +722,9 @@
   /* min number of events we can dequeue without blocking */
   n_to_dequeue = q->cursize;
   my_pending_event_vector = smm->pending_event_vector[thread_index];
-  pending_disconnects = smm->pending_disconnects[thread_index];
 
   if (!n_to_dequeue && !vec_len (my_pending_event_vector)
-      && !vec_len (pending_disconnects))
+      && !vec_len (smm->pending_disconnects[thread_index]))
     return 0;
 
   SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0);
@@ -802,12 +801,20 @@
 	case FIFO_EVENT_DISCONNECT:
 	  /* Make sure stream disconnects run after the pending list is drained */
 	  s0 = session_get_from_handle (e0->session_handle);
-	  if (!e0->postponed || svm_fifo_max_dequeue (s0->server_tx_fifo))
+	  if (!e0->postponed)
 	    {
 	      e0->postponed = 1;
 	      vec_add1 (smm->pending_disconnects[thread_index], *e0);
 	      continue;
 	    }
+	  /* If tx queue is still not empty, wait a bit */
+	  if (svm_fifo_max_dequeue (s0->server_tx_fifo)
+	      && e0->postponed < 200)
+	    {
+	      e0->postponed += 1;
+	      vec_add1 (smm->pending_disconnects[thread_index], *e0);
+	      continue;
+	    }
 
 	  stream_session_disconnect_transport (s0);
 	  break;
diff --git a/src/vnet/session/stream_session.h b/src/vnet/session/stream_session.h
index 9e0e4d9..1673356 100644
--- a/src/vnet/session/stream_session.h
+++ b/src/vnet/session/stream_session.h
@@ -32,6 +32,7 @@
   SESSION_STATE_ACCEPTING,
   SESSION_STATE_READY,
   SESSION_STATE_OPENED,
+  SESSION_STATE_CLOSING,
   SESSION_STATE_CLOSED,
   SESSION_STATE_N_STATES,
 } stream_session_state_t;