vcl: use events for epoll/select/read/write

Have vcl poll and wait on the event message queues as opposed to
constantly polling the session fifos. This also adds event signaling to
cut-through sessions.

On the downside, we can't wait on multiple condvars at once. So when we
have multiple message queues, because of cut-through registrations, we
fall back to timed waits.

Change-Id: I29ade95dba449659fe46008bb1af502276a7c5fd
Signed-off-by: Florin Coras <fcoras@cisco.com>
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index 5f18bd2..806e390 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -849,12 +849,11 @@
       return 0;
     }
 
-  /* Built-in app? Hand event to the callback... */
   if (app->cb_fns.builtin_app_rx_callback)
     return app->cb_fns.builtin_app_rx_callback (s);
 
-  /* If no need for event, return */
-  if (!svm_fifo_set_event (s->server_rx_fifo))
+  if (svm_fifo_has_event (s->server_rx_fifo)
+      || svm_fifo_is_empty (s->server_rx_fifo))
     return 0;
 
   mq = app->event_queue;
@@ -876,7 +875,10 @@
   evt->fifo = s->server_rx_fifo;
   evt->event_type = FIFO_EVENT_APP_RX;
 
-  return app_enqueue_evt (mq, &msg, lock);
+  if (app_enqueue_evt (mq, &msg, lock))
+    return -1;
+  svm_fifo_set_event (s->server_rx_fifo);
+  return 0;
 }
 
 static inline int
@@ -1081,6 +1083,7 @@
 {
   u32 seg_size, evt_q_sz, evt_q_elts, margin = 16 << 10;
   segment_manager_properties_t *props, *cprops;
+  u32 round_rx_fifo_sz, round_tx_fifo_sz;
   int rv, has_transport, seg_index;
   svm_fifo_segment_private_t *seg;
   segment_manager_t *sm;
@@ -1093,7 +1096,9 @@
   cprops = application_segment_manager_properties (client);
   evt_q_elts = props->evt_q_size + cprops->evt_q_size;
   evt_q_sz = segment_manager_evt_q_expected_size (evt_q_elts);
-  seg_size = props->rx_fifo_size + props->tx_fifo_size + evt_q_sz + margin;
+  round_rx_fifo_sz = 1 << max_log2 (props->rx_fifo_size);
+  round_tx_fifo_sz = 1 << max_log2 (props->tx_fifo_size);
+  seg_size = round_rx_fifo_sz + round_tx_fifo_sz + evt_q_sz + margin;
 
   has_transport = session_has_transport ((stream_session_t *) ll);
   if (!has_transport)
diff --git a/src/vnet/session/application_interface.c b/src/vnet/session/application_interface.c
index 9d82a18..c8fa37f 100644
--- a/src/vnet/session/application_interface.c
+++ b/src/vnet/session/application_interface.c
@@ -598,7 +598,7 @@
 {
   int rv;
   if ((rv = vnet_bind_i (a->app_index, &a->sep, &a->handle)))
-    return clib_error_return_code (0, rv, 0, "bind failed");
+    return clib_error_return_code (0, rv, 0, "bind failed: %d", rv);
   return 0;
 }
 
@@ -607,7 +607,7 @@
 {
   int rv;
   if ((rv = vnet_unbind_i (a->app_index, a->handle)))
-    return clib_error_return_code (0, rv, 0, "unbind failed");
+    return clib_error_return_code (0, rv, 0, "unbind failed: %d", rv);
   return 0;
 }
 
@@ -618,7 +618,7 @@
   int rv;
 
   if ((rv = application_connect (a->app_index, a->api_context, sep)))
-    return clib_error_return_code (0, rv, 0, "connect failed");
+    return clib_error_return_code (0, rv, 0, "connect failed: %d", rv);
   return 0;
 }
 
diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h
index 0aabd38..ffe2a64 100644
--- a/src/vnet/session/application_interface.h
+++ b/src/vnet/session/application_interface.h
@@ -207,17 +207,18 @@
   u64 server_tx_fifo;
   u64 vpp_event_queue_address;
   u64 server_event_queue_address;
+  u64 client_event_queue_address;
   u16 port;
   u8 is_ip4;
   u8 ip[16];
-} session_accepted_msg_t;
+} __clib_packed session_accepted_msg_t;
 
 typedef struct session_accepted_reply_msg_
 {
   u32 context;
   i32 retval;
   u64 handle;
-} session_accepted_reply_msg_t;
+} __clib_packed session_accepted_reply_msg_t;
 
 /* Make sure this is not too large, otherwise it won't fit when dequeued in
  * the session queue node */
@@ -232,34 +233,35 @@
   u64 server_tx_fifo;
   u64 vpp_event_queue_address;
   u64 client_event_queue_address;
+  u64 server_event_queue_address;
   u32 segment_size;
   u8 segment_name_length;
   u8 segment_name[64];
   u8 lcl_ip[16];
   u8 is_ip4;
   u16 lcl_port;
-} session_connected_msg_t;
+} __clib_packed session_connected_msg_t;
 
 typedef struct session_disconnected_msg_
 {
   u32 client_index;
   u32 context;
   u64 handle;
-} session_disconnected_msg_t;
+} __clib_packed session_disconnected_msg_t;
 
 typedef struct session_disconnected_reply_msg_
 {
   u32 context;
   i32 retval;
   u64 handle;
-} session_disconnected_reply_msg_t;
+} __clib_packed session_disconnected_reply_msg_t;
 
 typedef struct session_reset_msg_
 {
   u32 client_index;
   u32 context;
   u64 handle;
-} session_reset_msg_t;
+} __clib_packed session_reset_msg_t;
 
 typedef struct session_reset_reply_msg_
 {
@@ -267,13 +269,13 @@
   u32 context;
   i32 retval;
   u64 handle;
-} session_reset_reply_msg_t;
+} __clib_packed session_reset_reply_msg_t;
 
 typedef struct app_session_event_
 {
   svm_msg_q_msg_t msg;
   session_event_t *evt;
-} app_session_evt_t;
+} __clib_packed app_session_evt_t;
 
 static inline void
 app_alloc_ctrl_evt_to_vpp (svm_msg_q_t * mq, app_session_evt_t * app_evt,
@@ -337,12 +339,9 @@
   else
     {
       svm_msg_q_lock (mq);
+      while (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING))
+	svm_msg_q_wait (mq);
       msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
-      while (svm_msg_q_msg_is_invalid (&msg))
-	{
-	  svm_msg_q_wait (mq);
-	  msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
-	}
       evt = (session_event_t *) svm_msg_q_msg_data (mq, &msg);
       evt->fifo = f;
       evt->event_type = evt_type;
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 897cb1a..56f885b 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -753,9 +753,10 @@
   stream_session_t *s;
 
   s = session_get (tc->s_index, tc->thread_index);
-  server = application_get (s->app_index);
-  server->cb_fns.session_disconnect_callback (s);
   s->session_state = SESSION_STATE_CLOSING;
+  server = application_get_if_valid (s->app_index);
+  if (server)
+    server->cb_fns.session_disconnect_callback (s);
 }
 
 /**
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index 1917616..99546cb 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -37,6 +37,8 @@
   FIFO_EVENT_DISCONNECT,
   FIFO_EVENT_BUILTIN_RX,
   FIFO_EVENT_RPC,
+  SESSION_IO_EVT_CT_TX,
+  SESSION_IO_EVT_CT_RX,
   SESSION_CTRL_EVT_ACCEPTED,
   SESSION_CTRL_EVT_ACCEPTED_REPLY,
   SESSION_CTRL_EVT_CONNECTED,
diff --git a/src/vnet/session/session_api.c b/src/vnet/session/session_api.c
index 724aff1..8585b57 100755
--- a/src/vnet/session/session_api.c
+++ b/src/vnet/session/session_api.c
@@ -435,7 +435,9 @@
 	}
       mp->handle = application_local_session_handle (ls);
       mp->port = ls->port;
-      mp->vpp_event_queue_address = ls->client_evt_q;
+      vpp_queue = session_manager_get_vpp_event_queue (0);
+      mp->vpp_event_queue_address = pointer_to_uword (vpp_queue);
+      mp->client_event_queue_address = ls->client_evt_q;
       mp->server_event_queue_address = ls->server_evt_q;
     }
   svm_msg_q_add (app_mq, msg, SVM_Q_WAIT);
@@ -541,8 +543,10 @@
       local_session_t *ls = (local_session_t *) s;
       mp->handle = application_local_session_handle (ls);
       mp->lcl_port = ls->port;
-      mp->vpp_event_queue_address = ls->server_evt_q;
+      vpp_mq = session_manager_get_vpp_event_queue (0);
+      mp->vpp_event_queue_address = pointer_to_uword (vpp_mq);
       mp->client_event_queue_address = ls->client_evt_q;
+      mp->server_event_queue_address = ls->server_evt_q;
       mp->server_rx_fifo = pointer_to_uword (s->server_tx_fifo);
       mp->server_tx_fifo = pointer_to_uword (s->server_rx_fifo);
     }
diff --git a/src/vnet/session/session_cli.c b/src/vnet/session/session_cli.c
index 06d98ae..3588bbc 100755
--- a/src/vnet/session/session_cli.c
+++ b/src/vnet/session/session_cli.c
@@ -26,14 +26,16 @@
   if (!ss->server_rx_fifo || !ss->server_tx_fifo)
     return s;
 
-  s = format (s, " Rx fifo: %U", format_svm_fifo, ss->server_rx_fifo, 1);
+  s = format (s, " Rx fifo: %U", format_svm_fifo, ss->server_rx_fifo,
+	      verbose);
   if (verbose > 2 && ss->server_rx_fifo->has_event)
     {
       found = session_node_lookup_fifo_event (ss->server_rx_fifo, e);
       s = format (s, " session node event: %s\n",
 		  found ? "found" : "not found");
     }
-  s = format (s, " Tx fifo: %U", format_svm_fifo, ss->server_tx_fifo, 1);
+  s = format (s, " Tx fifo: %U", format_svm_fifo, ss->server_tx_fifo,
+	      verbose);
   if (verbose > 2 && ss->server_tx_fifo->has_event)
     {
       found = session_node_lookup_fifo_event (ss->server_tx_fifo, e);
diff --git a/src/vnet/session/session_lookup.c b/src/vnet/session/session_lookup.c
index 3a31352..37fccd9 100644
--- a/src/vnet/session/session_lookup.c
+++ b/src/vnet/session/session_lookup.c
@@ -1299,7 +1299,7 @@
 format_ip4_session_lookup_kvp (u8 * s, va_list * args)
 {
   clib_bihash_kv_16_8_t *kvp = va_arg (*args, clib_bihash_kv_16_8_t *);
-  u32 is_local = va_arg (*args, u32);
+  u32 is_local = va_arg (*args, u32), app_index, session_index;
   u8 *app_name, *str = 0;
   stream_session_t *session;
   v4_connection_key_t *key = (v4_connection_key_t *) kvp->key;
@@ -1316,7 +1316,8 @@
     }
   else
     {
-      app_name = application_name_from_index (kvp->value);
+      local_session_parse_handle (kvp->value, &app_index, &session_index);
+      app_name = application_name_from_index (app_index);
       str = format (0, "[%U] %U:%d", format_transport_proto_short, key->proto,
 		    format_ip4_address, &key->src,
 		    clib_net_to_host_u16 (key->src_port));
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index baabb05..30cd5ae 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -795,7 +795,7 @@
     {
       stream_session_t *s;	/* $$$ prefetch 1 ahead maybe */
       session_event_t *e;
-      u32 to_dequeue;
+      u8 is_full;
 
       e = &fifo_events[i];
       switch (e->event_type)
@@ -814,7 +814,7 @@
 	      clib_warning ("It's dead, Jim!");
 	      continue;
 	    }
-	  to_dequeue = svm_fifo_max_dequeue (s->server_tx_fifo);
+	  is_full = svm_fifo_is_full (s->server_tx_fifo);
 
 	  /* Spray packets in per session type frames, since they go to
 	   * different nodes */
@@ -823,7 +823,7 @@
 	  if (PREDICT_TRUE (rv == SESSION_TX_OK))
 	    {
 	      /* Notify app there's tx space if not polling */
-	      if (PREDICT_FALSE (to_dequeue == s->server_tx_fifo->nitems
+	      if (PREDICT_FALSE (is_full
 				 && !svm_fifo_has_event (s->server_tx_fifo)))
 		session_dequeue_notify (s);
 	    }