session tcp: track half open in app wrk
Type: improvement
Do extra checks when establishing an active connect and clean up pending
connects if the application detaches.
Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: Ibe9349db57b313ba2aa5ea3960ef5cf755f5098a
diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h
index 00b60c3..377ea1e 100644
--- a/src/vnet/session/application.h
+++ b/src/vnet/session/application.h
@@ -63,6 +63,9 @@
u32 api_client_index;
u8 app_is_builtin;
+
+ /** Per transport proto hash tables of half-open connection handles */
+ uword **half_open_table;
} app_worker_t;
typedef struct app_worker_map_
@@ -251,6 +254,13 @@
int app_worker_init_connected (app_worker_t * app_wrk, session_t * s);
int app_worker_connect_notify (app_worker_t * app_wrk, session_t * s,
session_error_t err, u32 opaque);
+int app_worker_add_half_open (app_worker_t * app_wrk, transport_proto_t tp,
+ session_handle_t ho_handle,
+ session_handle_t wrk_handle);
+int app_worker_del_half_open (app_worker_t * app_wrk, transport_proto_t tp,
+ session_handle_t ho_handle);
+u64 app_worker_lookup_half_open (app_worker_t * app_wrk, transport_proto_t tp,
+ session_handle_t ho_handle);
int app_worker_close_notify (app_worker_t * app_wrk, session_t * s);
int app_worker_transport_closed_notify (app_worker_t * app_wrk,
session_t * s);
diff --git a/src/vnet/session/application_worker.c b/src/vnet/session/application_worker.c
index 47b1567..c67aa88 100644
--- a/src/vnet/session/application_worker.c
+++ b/src/vnet/session/application_worker.c
@@ -75,6 +75,8 @@
}));
/* *INDENT-ON* */
+ hash_free (app_wrk->listeners_table);
+
for (i = 0; i < vec_len (handles); i++)
{
a->app_index = app->app_index;
@@ -83,6 +85,7 @@
/* seg manager is removed when unbind completes */
(void) vnet_unlisten (a);
}
+ vec_reset_length (handles);
/*
* Connects segment manager cleanup
@@ -96,6 +99,31 @@
segment_manager_init_free (sm);
}
+ /*
+ * Half-open cleanup
+ */
+
+  /* Walk the per-transport-proto half-open tables. The outer index i is
+   * also the transport proto handed to session_cleanup_half_open(). */
+  for (i = 0; i < vec_len (app_wrk->half_open_table); i++)
+    {
+      u32 j;
+
+      if (!app_wrk->half_open_table[i])
+        continue;
+
+      /* Collect the handles first and clean up afterwards, rather than
+       * cleaning up while iterating the hash.
+       * NOTE(review): presumably because transport cleanup can call back
+       * into app_worker_del_half_open() - confirm. */
+      /* *INDENT-OFF* */
+      hash_foreach (handle, sm_index, app_wrk->half_open_table[i], ({
+        vec_add1 (handles, handle);
+      }));
+      /* *INDENT-ON* */
+
+      /* Use a dedicated index for the handle vector. Reusing i here would
+       * clobber the proto index: the wrong proto would be passed to the
+       * cleanup call and the wrong table freed below. */
+      for (j = 0; j < vec_len (handles); j++)
+        session_cleanup_half_open (i, handles[j]);
+
+      hash_free (app_wrk->half_open_table[i]);
+      vec_reset_length (handles);
+    }
+
+ vec_free (app_wrk->half_open_table);
+ vec_free (handles);
+
/* If first segment manager is used by a listener */
if (app_wrk->first_segment_manager != APP_INVALID_SEGMENT_MANAGER_INDEX
&& app_wrk->first_segment_manager != app_wrk->connects_seg_manager)
@@ -335,6 +363,40 @@
}
+/**
+ * Record a half-open connection in the worker's table for transport @p tp.
+ *
+ * @param app_wrk    worker that owns the half-open tables
+ * @param tp         transport proto, used as index into the table vector
+ * @param ho_handle  key under which the entry is stored
+ * @param wrk_handle value stored for @p ho_handle
+ * @return always 0
+ */
int
+app_worker_add_half_open (app_worker_t * app_wrk, transport_proto_t tp,
+                          session_handle_t ho_handle,
+                          session_handle_t wrk_handle)
+{
+  uword **ho_table;
+
+  /* Table updates are only expected on thread 0 */
+  ASSERT (vlib_get_thread_index () == 0);
+
+  /* Grow the vector of tables so that slot tp exists */
+  vec_validate (app_wrk->half_open_table, tp);
+  ho_table = &app_wrk->half_open_table[tp];
+  hash_set (*ho_table, ho_handle, wrk_handle);
+
+  return 0;
+}
+
+/**
+ * Remove a half-open connection from the worker's table for transport @p tp.
+ *
+ * Does nothing if the worker has no table slot for @p tp.
+ *
+ * @param app_wrk   worker that owns the half-open tables
+ * @param tp        transport proto, used as index into the table vector
+ * @param ho_handle key of the entry to remove
+ * @return always 0
+ */
+int
+app_worker_del_half_open (app_worker_t * app_wrk, transport_proto_t tp,
+                          session_handle_t ho_handle)
+{
+  ASSERT (vlib_get_thread_index () == 0);
+
+  /* Only app_worker_add_half_open() grows the vector, so a proto that was
+   * never added has no slot; do not index past the end. */
+  if (tp >= vec_len (app_wrk->half_open_table))
+    return 0;
+
+  hash_unset (app_wrk->half_open_table[tp], ho_handle);
+  return 0;
+}
+
+/**
+ * Look up the value stored for a half-open connection handle.
+ *
+ * @param app_wrk   worker that owns the half-open tables
+ * @param tp        transport proto, used as index into the table vector
+ * @param ho_handle key to look up
+ * @return the stored worker handle, or SESSION_INVALID_HANDLE if the worker
+ *         has no table slot for @p tp or the key is not present
+ */
+u64
+app_worker_lookup_half_open (app_worker_t * app_wrk, transport_proto_t tp,
+                             session_handle_t ho_handle)
+{
+  u64 *ho_wrk_handlep;
+
+  /* No slot for this proto means nothing was ever tracked for it; report
+   * a miss instead of indexing past the end of the vector. */
+  if (tp >= vec_len (app_wrk->half_open_table))
+    return SESSION_INVALID_HANDLE;
+
+  /* No locking because all updates are done from main thread */
+  ho_wrk_handlep = hash_get (app_wrk->half_open_table[tp], ho_handle);
+  if (!ho_wrk_handlep)
+    return SESSION_INVALID_HANDLE;
+
+  return *ho_wrk_handlep;
+}
+
+int
app_worker_close_notify (app_worker_t * app_wrk, session_t * s)
{
application_t *app = application_get (app_wrk->app_index);
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index ed940d5..a709302 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -282,6 +282,20 @@
session_free_w_fifos (s);
}
+/**
+ * Ask transport @p tp to clean up the half-open connection whose index is
+ * carried in the lower half of @p ho_handle.
+ */
+void
+session_cleanup_half_open (transport_proto_t tp, session_handle_t ho_handle)
+{
+  u32 ho_index = session_handle_index (ho_handle);
+
+  transport_cleanup_half_open (tp, ho_index);
+}
+
+/**
+ * Notification that a half-open connection is being deleted: drop it from
+ * the half-open table of the app worker whose index is carried in the upper
+ * half of @p ho_handle.
+ */
+void
+session_half_open_delete_notify (transport_proto_t tp,
+                                 session_handle_t ho_handle)
+{
+  u32 app_wrk_index = session_handle_data (ho_handle);
+  app_worker_t *app_wrk;
+
+  app_wrk = app_worker_get (app_wrk_index);
+  app_worker_del_half_open (app_wrk, tp, ho_handle);
+}
+
session_t *
session_alloc_for_connection (transport_connection_t * tc)
{
@@ -738,10 +752,10 @@
session_stream_connect_notify (transport_connection_t * tc,
session_error_t err)
{
+ session_handle_t ho_handle, wrk_handle;
u32 opaque = 0, new_ti, new_si;
app_worker_t *app_wrk;
session_t *s = 0;
- u64 ho_handle;
/*
* Find connection handle and cleanup half-open table
@@ -757,11 +771,19 @@
/* Get the app's index from the handle we stored when opening connection
* and the opaque (api_context for external apps) from transport session
* index */
- app_wrk = app_worker_get_if_valid (ho_handle >> 32);
+ app_wrk = app_worker_get_if_valid (session_handle_data (ho_handle));
if (!app_wrk)
return -1;
- opaque = tc->s_index;
+ wrk_handle = app_worker_lookup_half_open (app_wrk, tc->proto, ho_handle);
+ if (wrk_handle == SESSION_INVALID_HANDLE)
+ return -1;
+
+ /* Make sure this is the same half-open index */
+ if (session_handle_index (wrk_handle) != session_handle_index (ho_handle))
+ return -1;
+
+ opaque = session_handle_data (wrk_handle);
if (err)
return app_worker_connect_notify (app_wrk, s, err, opaque);
@@ -1183,7 +1205,7 @@
{
transport_connection_t *tc;
transport_endpoint_cfg_t *tep;
- u64 handle;
+ u64 handle, wrk_handle;
int rv;
tep = session_endpoint_to_transport_cfg (rmt);
@@ -1203,13 +1225,20 @@
* is needed when the connect notify comes and we have to notify the
* external app
*/
- handle = (((u64) app_wrk_index) << 32) | (u64) tc->c_index;
+ handle = session_make_handle (tc->c_index, app_wrk_index);
session_lookup_add_half_open (tc, handle);
- /* Store api_context (opaque) for when the reply comes. Not the nicest
- * thing but better than allocating a separate half-open pool.
+ /* Store the half-open handle in the connection. Transport will use it
+ * when cleaning up @ref session_half_open_delete_notify
*/
- tc->s_index = opaque;
+ tc->s_ho_handle = handle;
+
+ /* Track the half-open connections in case we want to forcefully
+ * clean them up @ref session_cleanup_half_open
+ */
+ wrk_handle = session_make_handle (tc->c_index, opaque);
+ app_worker_add_half_open (app_worker_get (app_wrk_index),
+ rmt->transport_proto, handle, wrk_handle);
return 0;
}
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index 956bff0..5e0a832 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -293,6 +293,8 @@
session_t *session_alloc (u32 thread_index);
void session_free (session_t * s);
void session_free_w_fifos (session_t * s);
+void session_cleanup_half_open (transport_proto_t tp,
+ session_handle_t ho_handle);
u8 session_is_valid (u32 si, u8 thread_index);
always_inline session_t *
@@ -462,6 +464,8 @@
int session_stream_accept_notify (transport_connection_t * tc);
void session_transport_closing_notify (transport_connection_t * tc);
void session_transport_delete_notify (transport_connection_t * tc);
+void session_half_open_delete_notify (transport_proto_t tp,
+ session_handle_t ho_handle);
void session_transport_closed_notify (transport_connection_t * tc);
void session_transport_reset_notify (transport_connection_t * tc);
int session_stream_accept (transport_connection_t * tc, u32 listener_index,
diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h
index cdaaad0..784312d 100644
--- a/src/vnet/session/session_types.h
+++ b/src/vnet/session/session_types.h
@@ -309,9 +309,21 @@
}
static inline session_handle_t
-session_make_handle (u32 session_index, u32 thread_index)
+session_make_handle (u32 session_index, u32 data) /* second argument renamed from thread_index to the generic "data" */
{
- return (((u64) thread_index << 32) | (u64) session_index);
+ return (((u64) data << 32) | (u64) session_index); /* data -> bits 63..32, session_index -> bits 31..0 */
+}
+
+/** Extract the index stored in the lower 32 bits of a handle. */
+always_inline u32
+session_handle_index (session_handle_t ho_handle)
+{
+  return (u32) (ho_handle & 0xffffffffULL);
+}
+
+/** Extract the data word stored in the upper 32 bits of a handle. */
+always_inline u32
+session_handle_data (session_handle_t ho_handle)
+{
+  return (u32) (ho_handle >> 32);
}
typedef enum
diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c
index d2c6594..9dd495c 100644
--- a/src/vnet/session/transport.c
+++ b/src/vnet/session/transport.c
@@ -303,6 +303,13 @@
tp_vfts[tp].cleanup (conn_index, thread_index);
}
+/**
+ * Ask transport @p tp to clean up the half-open connection @p conn_index.
+ *
+ * Does nothing when the transport does not register a cleanup_ho handler.
+ *
+ * @param tp         transport proto, index into the transport vft table
+ * @param conn_index half-open connection index passed to the handler
+ */
+void
+transport_cleanup_half_open (transport_proto_t tp, u32 conn_index)
+{
+  /* Check the callback that is actually invoked. Testing the unrelated
+   * cleanup hook would call a NULL cleanup_ho on transports that set
+   * only cleanup. */
+  if (tp_vfts[tp].cleanup_ho)
+    tp_vfts[tp].cleanup_ho (conn_index);
+}
+
int
transport_connect (transport_proto_t tp, transport_endpoint_cfg_t * tep)
{
diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h
index 954db49..9c873b1 100644
--- a/src/vnet/session/transport.h
+++ b/src/vnet/session/transport.h
@@ -76,6 +76,7 @@
void (*close) (u32 conn_index, u32 thread_index);
void (*reset) (u32 conn_index, u32 thread_index);
void (*cleanup) (u32 conn_index, u32 thread_index);
+ void (*cleanup_ho) (u32 conn_index);
clib_error_t *(*enable) (vlib_main_t * vm, u8 is_en);
/*
@@ -137,6 +138,7 @@
u32 transport_stop_listen (transport_proto_t tp, u32 conn_index);
void transport_cleanup (transport_proto_t tp, u32 conn_index,
u8 thread_index);
+void transport_cleanup_half_open (transport_proto_t tp, u32 conn_index);
void transport_get_endpoint (transport_proto_t tp, u32 conn_index,
u32 thread_index, transport_endpoint_t * tep,
u8 is_lcl);
diff --git a/src/vnet/session/transport_types.h b/src/vnet/session/transport_types.h
index 28043b5..d76970f 100644
--- a/src/vnet/session/transport_types.h
+++ b/src/vnet/session/transport_types.h
@@ -144,6 +144,8 @@
#define c_stats connection.stats
#define c_pacer connection.pacer
#define c_flags connection.flags
+#define s_ho_handle pacer.bucket
+#define c_s_ho_handle connection.pacer.bucket
} transport_connection_t;
STATIC_ASSERT (STRUCT_OFFSET_OF (transport_connection_t, s_index)