session: use half-open sessions for vc establishment
Use half-open sessions to track virtual circuit connection
establishment. These sessions can only be allocated and freed by the
thread that allocates half-open connections (main). Consequently, they
can only be freed on half-open cleanup notifications from transports.
The goal is to simplify state tracking within the session layer, but
it's also a first step towards allowing builtin apps to track and
clean up outstanding connects.
Type: improvement
Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: I8a535906d13eb7f8966deb82333839de80f8049f
diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h
index a8ddfec..5beb813 100644
--- a/src/vnet/session/application.h
+++ b/src/vnet/session/application.h
@@ -61,8 +61,8 @@
u8 app_is_builtin;
- /** Per transport proto hash tables of half-open connection handles */
- uword **half_open_table;
+ /** Pool of half-open session handles. Tracked in case worker detaches */
+ session_handle_t *half_open_table;
/** Protects detached seg managers */
clib_spinlock_t detached_seg_managers_lock;
@@ -313,13 +313,8 @@
int app_worker_init_connected (app_worker_t * app_wrk, session_t * s);
int app_worker_connect_notify (app_worker_t * app_wrk, session_t * s,
session_error_t err, u32 opaque);
-int app_worker_add_half_open (app_worker_t * app_wrk, transport_proto_t tp,
- session_handle_t ho_handle,
- session_handle_t wrk_handle);
-int app_worker_del_half_open (app_worker_t * app_wrk, transport_proto_t tp,
- session_handle_t ho_handle);
-u64 app_worker_lookup_half_open (app_worker_t * app_wrk, transport_proto_t tp,
- session_handle_t ho_handle);
+int app_worker_add_half_open (app_worker_t *app_wrk, session_handle_t sh);
+int app_worker_del_half_open (app_worker_t *app_wrk, u32 ho_index);
int app_worker_close_notify (app_worker_t * app_wrk, session_t * s);
int app_worker_transport_closed_notify (app_worker_t * app_wrk,
session_t * s);
diff --git a/src/vnet/session/application_worker.c b/src/vnet/session/application_worker.c
index 7e03171..13a3ede 100644
--- a/src/vnet/session/application_worker.c
+++ b/src/vnet/session/application_worker.c
@@ -58,9 +58,10 @@
vnet_unlisten_args_t _a, *a = &_a;
u64 handle, *handles = 0, *sm_indices = 0;
segment_manager_t *sm;
+ session_handle_t *sh;
session_t *ls;
u32 sm_index;
- int i, j;
+ int i;
/*
* Listener cleanup
@@ -110,26 +111,10 @@
* Half-open cleanup
*/
- for (i = 0; i < vec_len (app_wrk->half_open_table); i++)
- {
- if (!app_wrk->half_open_table[i])
- continue;
+ pool_foreach (sh, app_wrk->half_open_table)
+ session_cleanup_half_open (*sh);
- /* *INDENT-OFF* */
- hash_foreach (handle, sm_index, app_wrk->half_open_table[i], ({
- vec_add1 (handles, handle);
- }));
- /* *INDENT-ON* */
-
- for (j = 0; j < vec_len (handles); j++)
- session_cleanup_half_open (i, handles[j]);
-
- hash_free (app_wrk->half_open_table[i]);
- vec_reset_length (handles);
- }
-
- vec_free (app_wrk->half_open_table);
- vec_free (handles);
+ pool_free (app_wrk->half_open_table);
/*
* Detached listener segment managers cleanup
@@ -396,39 +381,25 @@
}
int
-app_worker_add_half_open (app_worker_t * app_wrk, transport_proto_t tp,
- session_handle_t ho_handle,
- session_handle_t wrk_handle)
+app_worker_add_half_open (app_worker_t *app_wrk, session_handle_t sh)
{
+ session_handle_t *shp;
+
ASSERT (vlib_get_thread_index () == 0);
- vec_validate (app_wrk->half_open_table, tp);
- hash_set (app_wrk->half_open_table[tp], ho_handle, wrk_handle);
- return 0;
+ pool_get (app_wrk->half_open_table, shp);
+ *shp = sh;
+
+ return (shp - app_wrk->half_open_table);
}
int
-app_worker_del_half_open (app_worker_t * app_wrk, transport_proto_t tp,
- session_handle_t ho_handle)
+app_worker_del_half_open (app_worker_t *app_wrk, u32 ho_index)
{
ASSERT (vlib_get_thread_index () == 0);
- hash_unset (app_wrk->half_open_table[tp], ho_handle);
+ pool_put_index (app_wrk->half_open_table, ho_index);
return 0;
}
-u64
-app_worker_lookup_half_open (app_worker_t * app_wrk, transport_proto_t tp,
- session_handle_t ho_handle)
-{
- u64 *ho_wrk_handlep;
-
- /* No locking because all updates are done from main thread */
- ho_wrk_handlep = hash_get (app_wrk->half_open_table[tp], ho_handle);
- if (!ho_wrk_handlep)
- return SESSION_INVALID_HANDLE;
-
- return *ho_wrk_handlep;
-}
-
int
app_worker_close_notify (app_worker_t * app_wrk, session_t * s)
{
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 7fe35c2..1fac5ed 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -251,6 +251,10 @@
|| s->session_state <= SESSION_STATE_LISTENING)
return 1;
+ if (s->session_state == SESSION_STATE_CONNECTING &&
+ (s->flags & SESSION_F_HALF_OPEN))
+ return 1;
+
tc = session_get_transport (s);
if (s->connection_index != tc->c_index
|| s->thread_index != tc->thread_index || tc->s_index != si)
@@ -296,17 +300,23 @@
}
void
-session_cleanup_half_open (transport_proto_t tp, session_handle_t ho_handle)
+session_cleanup_half_open (session_handle_t ho_handle)
{
- transport_cleanup_half_open (tp, session_handle_index (ho_handle));
+ session_t *s = session_get_from_handle (ho_handle);
+ transport_cleanup_half_open (session_get_transport_proto (s),
+ s->connection_index);
}
void
-session_half_open_delete_notify (transport_proto_t tp,
- session_handle_t ho_handle)
+session_half_open_delete_notify (transport_connection_t *tc)
{
- app_worker_t *app_wrk = app_worker_get (session_handle_data (ho_handle));
- app_worker_del_half_open (app_wrk, tp, ho_handle);
+ app_worker_t *app_wrk;
+ session_t *s;
+
+ s = session_get (tc->s_index, tc->thread_index);
+ app_wrk = app_worker_get (s->app_wrk_index);
+ app_worker_del_half_open (app_wrk, s->ho_index);
+ session_free (s);
}
session_t *
@@ -328,6 +338,18 @@
return s;
}
+static session_t *
+session_alloc_for_half_open (transport_connection_t *tc)
+{
+ session_t *s;
+
+ s = ho_session_alloc ();
+ s->session_type = session_type_from_proto_and_ip (tc->proto, tc->is_ip4);
+ s->connection_index = tc->c_index;
+ tc->s_index = s->session_index;
+ return s;
+}
+
/**
* Discards bytes from buffer chain
*
@@ -797,39 +819,21 @@
session_stream_connect_notify (transport_connection_t * tc,
session_error_t err)
{
- session_handle_t ho_handle, wrk_handle;
u32 opaque = 0, new_ti, new_si;
app_worker_t *app_wrk;
- session_t *s = 0;
+ session_t *s = 0, *ho;
/*
- * Find connection handle and cleanup half-open table
+ * Cleanup half-open table
*/
- ho_handle = session_lookup_half_open_handle (tc);
- if (ho_handle == HALF_OPEN_LOOKUP_INVALID_VALUE)
- {
- SESSION_DBG ("half-open was removed!");
- return -1;
- }
session_lookup_del_half_open (tc);
- /* Get the app's index from the handle we stored when opening connection
- * and the opaque (api_context for external apps) from transport session
- * index */
- app_wrk = app_worker_get_if_valid (session_handle_data (ho_handle));
+ ho = ho_session_get (tc->s_index);
+ opaque = ho->opaque;
+ app_wrk = app_worker_get_if_valid (ho->app_wrk_index);
if (!app_wrk)
return -1;
- wrk_handle = app_worker_lookup_half_open (app_wrk, tc->proto, ho_handle);
- if (wrk_handle == SESSION_INVALID_HANDLE)
- return -1;
-
- /* Make sure this is the same half-open index */
- if (session_handle_index (wrk_handle) != session_handle_index (ho_handle))
- return -1;
-
- opaque = session_handle_data (wrk_handle);
-
if (err)
return app_worker_connect_notify (app_wrk, s, err, opaque);
@@ -1258,7 +1262,8 @@
{
transport_connection_t *tc;
transport_endpoint_cfg_t *tep;
- u64 handle, wrk_handle;
+ app_worker_t *app_wrk;
+ session_t *s;
int rv;
tep = session_endpoint_to_transport_cfg (rmt);
@@ -1271,27 +1276,22 @@
tc = transport_get_half_open (rmt->transport_proto, (u32) rv);
- /* If transport offers a stream service, only allocate session once the
- * connection has been established.
- * Add connection to half-open table and save app and tc index. The
- * latter is needed to help establish the connection while the former
- * is needed when the connect notify comes and we have to notify the
- * external app
- */
- handle = session_make_handle (tc->c_index, app_wrk_index);
- session_lookup_add_half_open (tc, handle);
+ app_wrk = app_worker_get (app_wrk_index);
- /* Store the half-open handle in the connection. Transport will use it
- * when cleaning up @ref session_half_open_delete_notify
+ /* If transport offers a vc service, only allocate established
+ * session once the connection has been established.
+ * In the meantime allocate half-open session for tracking purposes,
+ * associate half-open connection to it and add session to app-worker
+ * half-open table. These are needed to allocate the established
+ * session on transport notification, and to cleanup the half-open
+ * session if the app detaches before connection establishment.
*/
- tc->s_ho_handle = handle;
+ s = session_alloc_for_half_open (tc);
+ s->app_wrk_index = app_wrk->wrk_index;
+ s->ho_index = app_worker_add_half_open (app_wrk, session_handle (s));
+ s->opaque = opaque;
- /* Track the half-open connections in case we want to forcefully
- * clean them up @ref session_cleanup_half_open
- */
- wrk_handle = session_make_handle (tc->c_index, opaque);
- app_worker_add_half_open (app_worker_get (app_wrk_index),
- rmt->transport_proto, handle, wrk_handle);
+ session_lookup_add_half_open (tc, tc->c_index);
return 0;
}
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index 8b59133..1a59d7d 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -331,8 +331,7 @@
session_t *session_alloc (u32 thread_index);
void session_free (session_t * s);
void session_free_w_fifos (session_t * s);
-void session_cleanup_half_open (transport_proto_t tp,
- session_handle_t ho_handle);
+void session_cleanup_half_open (session_handle_t ho_handle);
u8 session_is_valid (u32 si, u8 thread_index);
always_inline session_t *
@@ -504,8 +503,7 @@
int session_stream_accept_notify (transport_connection_t * tc);
void session_transport_closing_notify (transport_connection_t * tc);
void session_transport_delete_notify (transport_connection_t * tc);
-void session_half_open_delete_notify (transport_proto_t tp,
- session_handle_t ho_handle);
+void session_half_open_delete_notify (transport_connection_t *tc);
void session_transport_closed_notify (transport_connection_t * tc);
void session_transport_reset_notify (transport_connection_t * tc);
int session_stream_accept (transport_connection_t * tc, u32 listener_index,
@@ -656,6 +654,30 @@
session_free (s);
}
+always_inline session_t *
+ho_session_alloc (void)
+{
+ session_t *s;
+ ASSERT (vlib_get_thread_index () == 0);
+ s = session_alloc (0);
+ s->session_state = SESSION_STATE_CONNECTING;
+ s->flags |= SESSION_F_HALF_OPEN;
+ return s;
+}
+
+always_inline session_t *
+ho_session_get (u32 ho_index)
+{
+ return session_get (ho_index, 0 /* half-open thread */);
+}
+
+always_inline void
+ho_session_free (session_t *s)
+{
+ ASSERT (!s->rx_fifo && s->thread_index == 0);
+ session_free (s);
+}
+
transport_connection_t *listen_session_get_transport (session_t * s);
/*
diff --git a/src/vnet/session/session_cli.c b/src/vnet/session/session_cli.c
index 2ca9630..1505a95 100644
--- a/src/vnet/session/session_cli.c
+++ b/src/vnet/session/session_cli.c
@@ -144,8 +144,12 @@
}
else if (ss->session_state == SESSION_STATE_CONNECTING)
{
- s = format (s, "%-40U%v", format_transport_half_open_connection,
- tp, ss->connection_index, ss->thread_index, str);
+ if (ss->flags & SESSION_F_HALF_OPEN)
+ s = format (s, "%U%v", format_transport_half_open_connection, tp,
+ ss->connection_index, ss->thread_index, verbose, str);
+ else
+ s = format (s, "%U", format_transport_connection, tp,
+ ss->connection_index, ss->thread_index, verbose);
}
else
{
diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h
index c8b1d2e..2c3db5f 100644
--- a/src/vnet/session/session_types.h
+++ b/src/vnet/session/session_types.h
@@ -148,13 +148,14 @@
SESSION_N_STATES,
} session_state_t;
-#define foreach_session_flag \
- _(RX_EVT, "rx-event") \
- _(PROXY, "proxy") \
- _(CUSTOM_TX, "custom-tx") \
- _(IS_MIGRATING, "migrating") \
- _(UNIDIRECTIONAL, "unidirectional") \
- _(CUSTOM_FIFO_TUNING, "custom-fifo-tuning") \
+#define foreach_session_flag \
+ _ (RX_EVT, "rx-event") \
+ _ (PROXY, "proxy") \
+ _ (CUSTOM_TX, "custom-tx") \
+ _ (IS_MIGRATING, "migrating") \
+ _ (UNIDIRECTIONAL, "unidirectional") \
+ _ (CUSTOM_FIFO_TUNING, "custom-fifo-tuning") \
+ _ (HALF_OPEN, "half-open")
typedef enum session_flags_bits_
{
@@ -208,6 +209,9 @@
/** App listener index in app's listener pool if a listener */
u32 al_index;
+
+ /** Index in app worker's half-open table if a half-open */
+ u32 ho_index;
};
/** Opaque, for general use */
diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c
index a420dcd..18ae3ca 100644
--- a/src/vnet/session/transport.c
+++ b/src/vnet/session/transport.c
@@ -124,14 +124,14 @@
format_transport_half_open_connection (u8 * s, va_list * args)
{
u32 transport_proto = va_arg (*args, u32);
- u32 listen_index = va_arg (*args, u32);
+ u32 ho_index = va_arg (*args, u32);
transport_proto_vft_t *tp_vft;
tp_vft = transport_protocol_get_vft (transport_proto);
if (!tp_vft)
return s;
- s = format (s, "%U", tp_vft->format_half_open, listen_index);
+ s = format (s, "%U", tp_vft->format_half_open, ho_index);
return s;
}
diff --git a/src/vnet/session/transport_types.h b/src/vnet/session/transport_types.h
index 7ea8d5f..75fd1b8 100644
--- a/src/vnet/session/transport_types.h
+++ b/src/vnet/session/transport_types.h
@@ -147,7 +147,6 @@
#define c_pacer connection.pacer
#define c_flags connection.flags
#define s_ho_handle pacer.bytes_per_sec
-#define c_s_ho_handle connection.pacer.bytes_per_sec
} transport_connection_t;
STATIC_ASSERT (STRUCT_OFFSET_OF (transport_connection_t, s_index)
diff --git a/src/vnet/tcp/tcp.c b/src/vnet/tcp/tcp.c
index a6c1e21..90b832c 100644
--- a/src/vnet/tcp/tcp.c
+++ b/src/vnet/tcp/tcp.c
@@ -214,7 +214,7 @@
if (tc->c_thread_index != vlib_get_thread_index ())
return 1;
- session_half_open_delete_notify (TRANSPORT_PROTO_TCP, tc->c_s_ho_handle);
+ session_half_open_delete_notify (&tc->connection);
wrk = tcp_get_worker (tc->c_thread_index);
tcp_timer_reset (&wrk->timer_wheel, tc, TCP_TIMER_RETRANSMIT_SYN);
tcp_half_open_connection_free (tc);
@@ -853,8 +853,20 @@
{
u32 tci = va_arg (*args, u32);
u32 __clib_unused thread_index = va_arg (*args, u32);
- tcp_connection_t *tc = tcp_half_open_connection_get (tci);
- return format (s, "%U", format_tcp_connection_id, tc);
+ u32 verbose = va_arg (*args, u32);
+ tcp_connection_t *tc;
+ u8 *state = 0;
+
+ tc = tcp_half_open_connection_get (tci);
+ if (tc->flags & TCP_CONN_HALF_OPEN_DONE)
+ state = format (state, "%s", "CLOSED");
+ else
+ state = format (state, "%U", format_tcp_state, tc->state);
+ s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_tcp_connection_id, tc);
+ if (verbose)
+ s = format (s, "%-" SESSION_CLI_STATE_LEN "v", state);
+ vec_free (state);
+ return s;
}
static transport_connection_t *