session: async rx event notifications

Move from synchronous flushing of io and ctrl events from transports to
applications to an async model via a new session_input input node that
runs in interrupt mode. Events are coalesced per application worker.

On the one hand, this helps by minimizing message queue locking churn.
And on the other, it opens the possibility for further optimizations of
event message generation, obviates need for rx rescheduling rpcs and is
a first step towards a fully async data/io rx path.

Type: improvement

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: Id6bebcb65fc9feef8aa02ddf1af6d9ba6f6745ce
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index e15625e..4f2cae4 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -142,10 +142,14 @@
     session_worker_stat_error_inc (wrk, rv, 1);
 
   app_wrk = application_get_worker (app, mp->wrk_index);
-  mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
+  app_worker_listened_notify (app_wrk, a->handle, mp->context, rv);
 
   if (mp->ext_config)
     session_mq_free_ext_config (app, mp->ext_config);
+
+  /* Make sure events are flushed before releasing barrier, to avoid
+   * potential race with accept. */
+  app_wrk_flush_wrk_events (app_wrk, 0);
 }
 
 static void
@@ -170,7 +174,8 @@
   rv = vnet_bind_uri (a);
 
   app_wrk = application_get_worker (app, 0);
-  mq_send_session_bound_cb (app_wrk->wrk_index, mp->context, a->handle, rv);
+  app_worker_listened_notify (app_wrk, a->handle, mp->context, rv);
+  app_wrk_flush_wrk_events (app_wrk, 0);
 }
 
 static void
@@ -215,7 +220,7 @@
       wrk = session_main_get_worker (vlib_get_thread_index ());
       session_worker_stat_error_inc (wrk, rv, 1);
       app_wrk = application_get_worker (app, mp->wrk_index);
-      mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
+      app_worker_connect_notify (app_wrk, 0, rv, mp->context);
     }
 
   if (mp->ext_config)
@@ -324,7 +329,7 @@
     {
       session_worker_stat_error_inc (wrk, rv, 1);
       app_wrk = application_get_worker (app, 0 /* default wrk only */ );
-      mq_send_session_connected_cb (app_wrk->wrk_index, mp->context, 0, rv);
+      app_worker_connect_notify (app_wrk, 0, rv, mp->context);
     }
 }
 
@@ -410,7 +415,7 @@
   if (!app_wrk)
     return;
 
-  mq_send_unlisten_reply (app_wrk, sh, mp->context, rv);
+  app_worker_unlisten_reply (app_wrk, sh, mp->context, rv);
 }
 
 static void
@@ -466,7 +471,7 @@
   session_set_state (s, SESSION_STATE_READY);
 
   if (!svm_fifo_is_empty_prod (s->rx_fifo))
-    app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
+    app_worker_rx_notify (app_wrk, s);
 
   /* Closed while waiting for app to reply. Resend disconnect */
   if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
@@ -669,7 +674,7 @@
     session_send_io_evt_to_thread (s->tx_fifo, SESSION_IO_EVT_TX);
 
   if (s->rx_fifo && !svm_fifo_is_empty (s->rx_fifo))
-    app_worker_lock_and_send_event (app_wrk, s, SESSION_IO_EVT_RX);
+    app_worker_rx_notify (app_wrk, s);
 
   if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
     app_worker_close_notify (app_wrk, s);
@@ -1790,7 +1795,7 @@
 	break;
       svm_fifo_unset_event (s->rx_fifo);
       app_wrk = app_worker_get (s->app_wrk_index);
-      app_worker_builtin_rx (app_wrk, s);
+      app_worker_rx_notify (app_wrk, s);
       break;
     case SESSION_IO_EVT_TX_MAIN:
       s = session_get_if_valid (e->session_index, 0 /* main thread */);