session: switch ct to vc and track half-opens

Type: improvement

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: I7f9c4b9b6e523ab549087ad21724f34f08fca793
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index e0c87b9..8a304f9 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -1345,9 +1345,9 @@
       a->sep_ext.original_tp = a->sep_ext.transport_proto;
       a->sep_ext.transport_proto = TRANSPORT_PROTO_NONE;
       rv = app_worker_connect_session (client_wrk, &a->sep_ext, &a->sh);
-      if (rv <= 0)
-	return rv;
       a->sep_ext.transport_proto = a->sep_ext.original_tp;
+      if (!rv || rv != SESSION_E_LOCAL_CONNECT)
+	return rv;
     }
   /*
    * Not connecting to a local server, propagate to transport
diff --git a/src/vnet/session/application_local.c b/src/vnet/session/application_local.c
index 1220952..e061983 100644
--- a/src/vnet/session/application_local.c
+++ b/src/vnet/session/application_local.c
@@ -21,6 +21,8 @@
   ct_connection_t **connections;	/**< Per-worker connection pools */
   u32 n_workers;			/**< Number of vpp workers */
   u32 n_sessions;			/**< Cumulative sessions counter */
+  u32 *ho_reusable;			/**< Vector of reusable ho indices */
+  clib_spinlock_t ho_reuseable_lock;	/**< Lock for reusable ho indices */
 } ct_main_t;
 
 static ct_main_t ct_main;
@@ -59,6 +61,31 @@
   pool_put (ct_main.connections[ct->c_thread_index], ct);
 }
 
+static ct_connection_t *
+ct_half_open_alloc (void)
+{
+  ct_main_t *cm = &ct_main;
+  u32 *hip;
+
+  clib_spinlock_lock (&cm->ho_reuseable_lock);
+  vec_foreach (hip, cm->ho_reusable)
+    pool_put_index (cm->connections[0], *hip);
+  vec_reset_length (cm->ho_reusable);
+  clib_spinlock_unlock (&cm->ho_reuseable_lock);
+
+  return ct_connection_alloc (0);
+}
+
+void
+ct_half_open_add_reusable (u32 ho_index)
+{
+  ct_main_t *cm = &ct_main;
+
+  clib_spinlock_lock (&cm->ho_reuseable_lock);
+  vec_add1 (cm->ho_reusable, ho_index);
+  clib_spinlock_unlock (&cm->ho_reuseable_lock);
+}
+
 session_t *
 ct_session_get_peer (session_t * s)
 {
@@ -114,9 +141,23 @@
       segment_manager_segment_reader_unlock (sm);
     }
 
-  /* Alloc client session */
+  /*
+   * Alloc client session
+   */
   cct = ct_connection_get (sct->peer_index, thread_index);
 
+  /* Client closed while waiting for reply from server */
+  if (!cct)
+    {
+      session_transport_closing_notify (&sct->connection);
+      session_transport_delete_notify (&sct->connection);
+      ct_connection_free (sct);
+      return 0;
+    }
+
+  session_half_open_delete_notify (&cct->connection);
+  cct->flags &= ~CT_CONN_F_HALF_OPEN;
+
   cs = session_alloc (thread_index);
   ss = session_get (ss_index, thread_index);
   cs->session_type = ss->session_type;
@@ -234,55 +275,63 @@
   return rv;
 }
 
-typedef struct ct_accept_rpc_args
-{
-  u32 ll_s_index;
-  u32 thread_index;
-  ip46_address_t ip;
-  u16 port;
-  u8 is_ip4;
-  u32 opaque;
-  u32 client_wrk_index;
-} ct_accept_rpc_args_t;
-
 static void
 ct_accept_rpc_wrk_handler (void *accept_args)
 {
-  ct_accept_rpc_args_t *args = (ct_accept_rpc_args_t *) accept_args;
-  ct_connection_t *sct, *cct, *ll_ct;
+  u32 cct_index, ho_index, thread_index, ll_index;
+  ct_connection_t *sct, *cct, *ll_ct, *ho;
   app_worker_t *server_wrk;
   session_t *ss, *ll;
-  u32 cct_index;
 
-  ll = listen_session_get (args->ll_s_index);
-
-  cct = ct_connection_alloc (args->thread_index);
+  /*
+   * Alloc client ct and initialize from ho
+   */
+  thread_index = vlib_get_thread_index ();
+  cct = ct_connection_alloc (thread_index);
   cct_index = cct->c_c_index;
-  sct = ct_connection_alloc (args->thread_index);
-  ll_ct = ct_connection_get (ll->connection_index, 0 /* listener thread */ );
+
+  ho_index = pointer_to_uword (accept_args);
+  ho = ct_connection_get (ho_index, 0);
+
+  /* Unlikely but half-open session and transport could have been freed */
+  if (PREDICT_FALSE (!ho))
+    {
+      ct_connection_free (cct);
+      return;
+    }
+
+  clib_memcpy (cct, ho, sizeof (*ho));
+  cct->c_c_index = cct_index;
+  cct->c_thread_index = thread_index;
+  cct->flags |= CT_CONN_F_HALF_OPEN;
+
+  /* Notify session layer that half-open is on a different thread
+   * and mark ho connection index reusable. Avoids another rpc
+   */
+  session_half_open_migrate_notify (&cct->connection);
+  session_half_open_migrated_notify (&cct->connection);
+  ct_half_open_add_reusable (ho_index);
 
   /*
-   * Alloc and init client transport
+   * Alloc and init server transport
    */
-  cct = ct_connection_get (cct_index, args->thread_index);
-  cct->c_rmt_port = args->port;
-  cct->c_lcl_port = 0;
-  cct->c_is_ip4 = args->is_ip4;
-  clib_memcpy (&cct->c_rmt_ip, &args->ip, sizeof (args->ip));
+
+  ll_index = cct->peer_index;
+  ll = listen_session_get (ll_index);
+  sct = ct_connection_alloc (thread_index);
+  ll_ct = ct_connection_get (ll->connection_index, 0 /* listener thread */);
+
+  /* Make sure cct is valid after sct alloc */
+  cct = ct_connection_get (cct_index, thread_index);
   cct->actual_tp = ll_ct->actual_tp;
-  cct->is_client = 1;
-  cct->c_s_index = ~0;
 
-  /*
-   * Init server transport
-   */
   sct->c_rmt_port = 0;
   sct->c_lcl_port = ll_ct->c_lcl_port;
-  sct->c_is_ip4 = args->is_ip4;
+  sct->c_is_ip4 = cct->c_is_ip4;
   clib_memcpy (&sct->c_lcl_ip, &ll_ct->c_lcl_ip, sizeof (ll_ct->c_lcl_ip));
-  sct->client_wrk = args->client_wrk_index;
+  sct->client_wrk = cct->client_wrk;
   sct->c_proto = TRANSPORT_PROTO_NONE;
-  sct->client_opaque = args->opaque;
+  sct->client_opaque = cct->client_opaque;
   sct->actual_tp = ll_ct->actual_tp;
 
   sct->peer_index = cct->c_c_index;
@@ -292,8 +341,8 @@
    * Accept server session. Client session is created only after
    * server confirms accept.
    */
-  ss = session_alloc (args->thread_index);
-  ll = listen_session_get (args->ll_s_index);
+  ss = session_alloc (thread_index);
+  ll = listen_session_get (ll_index);
   ss->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE,
 						     sct->c_is_ip4);
   ss->connection_index = sct->c_c_index;
@@ -323,17 +372,15 @@
     }
 
   cct->segment_handle = sct->segment_handle;
-
-  clib_mem_free (args);
 }
 
 static int
 ct_connect (app_worker_t * client_wrk, session_t * ll,
 	    session_endpoint_cfg_t * sep)
 {
-  ct_accept_rpc_args_t *args;
+  u32 thread_index, ho_index;
   ct_main_t *cm = &ct_main;
-  u32 thread_index;
+  ct_connection_t *ho;
 
   /* Simple round-robin policy for spreading sessions over workers. We skip
    * thread index 0, i.e., offset the index by 1, when we have workers as it
@@ -342,18 +389,34 @@
   cm->n_sessions += 1;
   thread_index = cm->n_workers ? (cm->n_sessions % cm->n_workers) + 1 : 0;
 
-  args = clib_mem_alloc (sizeof (*args));
-  args->ll_s_index = ll->session_index;
-  args->thread_index = thread_index;
-  clib_memcpy_fast (&args->ip, &sep->ip, sizeof (ip46_address_t));
-  args->port = sep->port;
-  args->is_ip4 = sep->is_ip4;
-  args->opaque = sep->opaque;
-  args->client_wrk_index = client_wrk->wrk_index;
+  /*
+   * Alloc and init client half-open transport
+   */
 
-  session_send_rpc_evt_to_thread (thread_index, ct_accept_rpc_wrk_handler,
-				  args);
-  return 0;
+  ho = ct_half_open_alloc ();
+  ho_index = ho->c_c_index;
+  ho->c_rmt_port = sep->port;
+  ho->c_lcl_port = 0;
+  ho->c_is_ip4 = sep->is_ip4;
+  ho->client_opaque = sep->opaque;
+  ho->client_wrk = client_wrk->wrk_index;
+  ho->peer_index = ll->session_index;
+  ho->c_proto = TRANSPORT_PROTO_NONE;
+  ho->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP;
+  clib_memcpy (&ho->c_rmt_ip, &sep->ip, sizeof (&sep->ip));
+  ho->flags |= CT_CONN_F_CLIENT;
+  ho->c_s_index = ~0;
+
+  /*
+   * Accept connection on thread selected above. Connected reply comes
+   * after server accepts the connection.
+   */
+
+  session_send_rpc_evt_to_thread_force (thread_index,
+					ct_accept_rpc_wrk_handler,
+					uword_to_pointer (ho_index, void *));
+
+  return ho_index;
 }
 
 static u32
@@ -388,11 +451,39 @@
   return (transport_connection_t *) ct_connection_get (ct_index, 0);
 }
 
+static transport_connection_t *
+ct_half_open_get (u32 ct_index)
+{
+  return (transport_connection_t *) ct_connection_get (ct_index, 0);
+}
+
+static void
+ct_session_cleanup (u32 conn_index, u32 thread_index)
+{
+  ct_connection_t *ct, *peer_ct;
+
+  ct = ct_connection_get (conn_index, thread_index);
+  if (!ct)
+    return;
+
+  peer_ct = ct_connection_get (ct->peer_index, thread_index);
+  if (peer_ct)
+    peer_ct->peer_index = ~0;
+
+  ct_connection_free (ct);
+}
+
+static void
+ct_cleanup_ho (u32 ho_index)
+{
+  ct_connection_free (ct_connection_get (ho_index, 0));
+}
+
 static int
 ct_session_connect (transport_endpoint_cfg_t * tep)
 {
   session_endpoint_cfg_t *sep_ext;
-  session_endpoint_t *sep;
+  session_endpoint_t _sep, *sep = &_sep;
   app_worker_t *app_wrk;
   session_handle_t lh;
   application_t *app;
@@ -402,7 +493,7 @@
   u8 fib_proto;
 
   sep_ext = (session_endpoint_cfg_t *) tep;
-  sep = (session_endpoint_t *) tep;
+  _sep = *(session_endpoint_t *) tep;
   app_wrk = app_worker_get (sep_ext->app_wrk_index);
   app = application_get (app_wrk->app_index);
 
@@ -448,7 +539,7 @@
     return ct_connect (app_wrk, ll, sep_ext);
 
   /* Failed to connect but no error */
-  return 1;
+  return SESSION_E_LOCAL_CONNECT;
 }
 
 static void
@@ -464,7 +555,13 @@
     {
       peer_ct->peer_index = ~0;
       /* Make sure session was allocated */
-      if (peer_ct->c_s_index != ~0)
+      if (peer_ct->flags & CT_CONN_F_HALF_OPEN)
+	{
+	  app_wrk = app_worker_get (peer_ct->client_wrk);
+	  app_worker_connect_notify (app_wrk, 0, SESSION_E_REFUSED,
+				     peer_ct->client_opaque);
+	}
+      else if (peer_ct->c_s_index != ~0)
 	session_transport_closing_notify (&peer_ct->connection);
       else
 	ct_connection_free (peer_ct);
@@ -475,7 +572,7 @@
   if (app_wrk)
     app_worker_del_segment_notify (app_wrk, ct->segment_handle);
   session_free_w_fifos (s);
-  if (ct->is_client)
+  if (ct->flags & CT_CONN_F_CLIENT)
     segment_manager_dealloc_fifos (ct->client_rx_fifo, ct->client_tx_fifo);
 
   ct_connection_free (ct);
@@ -560,6 +657,18 @@
 }
 
 static u8 *
+format_ct_half_open (u8 *s, va_list *args)
+{
+  u32 ho_index = va_arg (*args, u32);
+  u32 verbose = va_arg (*args, u32);
+  ct_connection_t *ct = ct_connection_get (ho_index, 0);
+  s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
+  if (verbose)
+    s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "HALF-OPEN");
+  return s;
+}
+
+static u8 *
 format_ct_connection (u8 * s, va_list * args)
 {
   ct_connection_t *ct = va_arg (*args, ct_connection_t *);
@@ -601,8 +710,12 @@
 clib_error_t *
 ct_enable_disable (vlib_main_t * vm, u8 is_en)
 {
-  ct_main.n_workers = vlib_num_workers ();
-  vec_validate (ct_main.connections, ct_main.n_workers);
+  ct_main_t *cm = &ct_main;
+
+  cm->n_workers = vlib_num_workers ();
+  vec_validate (cm->connections, cm->n_workers);
+  clib_spinlock_init (&cm->ho_reuseable_lock);
+
   return 0;
 }
 
@@ -611,19 +724,23 @@
   .enable = ct_enable_disable,
   .start_listen = ct_start_listen,
   .stop_listen = ct_stop_listen,
+  .get_connection = ct_session_get,
   .get_listener = ct_listener_get,
+  .get_half_open = ct_half_open_get,
+  .cleanup = ct_session_cleanup,
+  .cleanup_ho = ct_cleanup_ho,
   .connect = ct_session_connect,
   .close = ct_session_close,
-  .get_connection = ct_session_get,
   .custom_tx = ct_custom_tx,
   .app_rx_evt = ct_app_rx_evt,
   .format_listener = format_ct_listener,
+  .format_half_open = format_ct_half_open,
   .format_connection = format_ct_session,
   .transport_options = {
     .name = "ct",
     .short_name = "C",
     .tx_type = TRANSPORT_TX_INTERNAL,
-    .service_type = TRANSPORT_SERVICE_APP,
+    .service_type = TRANSPORT_SERVICE_VC,
   },
 };
 /* *INDENT-ON* */
diff --git a/src/vnet/session/application_local.h b/src/vnet/session/application_local.h
index 7b937d3..09c33cc 100644
--- a/src/vnet/session/application_local.h
+++ b/src/vnet/session/application_local.h
@@ -20,19 +20,36 @@
 #include <vnet/session/application.h>
 #include <vnet/session/transport.h>
 
+#define foreach_ct_flags                                                      \
+  _ (CLIENT, "client")                                                        \
+  _ (HALF_OPEN, "half-open")
+
+enum
+{
+#define _(sym, str) CT_CONN_BIT_F_##sym,
+  foreach_ct_flags
+#undef _
+};
+
+typedef enum
+{
+#define _(sym, str) CT_CONN_F_##sym = 1 << CT_CONN_BIT_F_##sym,
+  foreach_ct_flags
+#undef _
+} ct_connection_flags_t;
+
 typedef struct ct_connection_
 {
   transport_connection_t connection;
   u32 client_wrk;
   u32 server_wrk;
-  u32 transport_listener_index;
   transport_proto_t actual_tp;
   u32 client_opaque;
   u32 peer_index;
   u64 segment_handle;
   svm_fifo_t *client_rx_fifo;
   svm_fifo_t *client_tx_fifo;
-  u8 is_client;
+  ct_connection_flags_t flags;
 } ct_connection_t;
 
 session_t *ct_session_get_peer (session_t * s);
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 7aadb21..f6b61ab 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -318,20 +318,19 @@
       /* Migrated transports are no longer half-opens */
       transport_cleanup (session_get_transport_proto (ho),
 			 ho->connection_index, ho->app_index /* overloaded */);
-      return;
     }
-  transport_cleanup_half_open (session_get_transport_proto (ho),
-			       ho->connection_index);
+  else
+    transport_cleanup_half_open (session_get_transport_proto (ho),
+				 ho->connection_index);
+  session_free (ho);
 }
 
 static void
-session_half_open_free (u32 ho_index)
+session_half_open_free (session_t *ho)
 {
   app_worker_t *app_wrk;
-  session_t *ho;
 
   ASSERT (vlib_get_thread_index () <= 1);
-  ho = ho_session_get (ho_index);
   app_wrk = app_worker_get (ho->app_wrk_index);
   app_worker_del_half_open (app_wrk, ho);
   session_free (ho);
@@ -340,16 +339,25 @@
 static void
 session_half_open_free_rpc (void *args)
 {
-  session_half_open_free (pointer_to_uword (args));
+  session_t *ho = ho_session_get (pointer_to_uword (args));
+  session_half_open_free (ho);
 }
 
 void
 session_half_open_delete_notify (transport_connection_t *tc)
 {
-  void *args = uword_to_pointer ((uword) tc->s_index, void *);
-  u32 ctrl_thread = vlib_num_workers () ? 1 : 0;
-  session_send_rpc_evt_to_thread (ctrl_thread, session_half_open_free_rpc,
-				  args);
+  /* Notification from ctrl thread accepted without rpc */
+  if (tc->thread_index <= 1)
+    {
+      session_half_open_free (ho_session_get (tc->s_index));
+    }
+  else
+    {
+      void *args = uword_to_pointer ((uword) tc->s_index, void *);
+      u32 ctrl_thread = vlib_num_workers () ? 1 : 0;
+      session_send_rpc_evt_to_thread (ctrl_thread, session_half_open_free_rpc,
+				      args);
+    }
 }
 
 void
diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h
index 821aac9..246978e 100644
--- a/src/vnet/session/session_types.h
+++ b/src/vnet/session/session_types.h
@@ -485,7 +485,8 @@
   _ (EVENTFD_ALLOC, "failed to alloc eventfd")                                \
   _ (NOEXTCFG, "no extended transport config")                                \
   _ (NOCRYPTOENG, "no crypto engine")                                         \
-  _ (NOCRYPTOCKP, "cert key pair not found ")
+  _ (NOCRYPTOCKP, "cert key pair not found ")                                 \
+  _ (LOCAL_CONNECT, "could not connect with local scope")
 
 typedef enum session_error_p_
 {