vls: support passive listeners
If an application worker calls listen on a session, vpp registers the
worker with the listener's load-balance group and, as new connections
are accepted, it may potentially push accept notifications to that worker.
There are, however, applications like nginx whose workers may
never accept new connections on a session they have started listening on.
To avoid accumulating accept events on such workers, this patch adds
support for passive listeners. That is, workers that have started
listening on a session but then never call accept or epoll/select on
that listener.
Change-Id: I007e6dcb54fc88a0e3aab3c6e2a3d1ef135cbd58
Signed-off-by: Florin Coras <fcoras@cisco.com>
diff --git a/src/vcl/vcl_bapi.c b/src/vcl/vcl_bapi.c
index 5b9a9d5..debfb8f 100644
--- a/src/vcl/vcl_bapi.c
+++ b/src/vcl/vcl_bapi.c
@@ -594,9 +594,8 @@
}
void
-vppcom_send_unbind_sock (u64 vpp_handle)
+vppcom_send_unbind_sock (vcl_worker_t * wrk, u64 vpp_handle)
{
- vcl_worker_t *wrk = vcl_worker_get_current ();
vl_api_unbind_sock_t *ump;
ump = vl_msg_api_alloc (sizeof (*ump));
diff --git a/src/vcl/vcl_locked.c b/src/vcl/vcl_locked.c
index 8a8d7d9..f5892c1 100644
--- a/src/vcl/vcl_locked.c
+++ b/src/vcl/vcl_locked.c
@@ -22,19 +22,31 @@
u32 session_index;
u32 worker_index;
u32 vls_index;
- u32 flags;
u32 *workers_subscribed;
+ clib_bitmap_t *listeners;
} vcl_locked_session_t;
-typedef struct vcl_main_
+typedef struct vls_local_
+{
+ int vls_wrk_index;
+ volatile int vls_mt_n_threads;
+ pthread_mutex_t vls_mt_mq_mlock;
+ pthread_mutex_t vls_mt_spool_mlock;
+ volatile u8 select_mp_check;
+ volatile u8 epoll_mp_check;
+} vls_process_local_t;
+
+static vls_process_local_t vls_local;
+static vls_process_local_t *vlsl = &vls_local;
+
+typedef struct vls_main_
{
vcl_locked_session_t *vls_pool;
clib_rwlock_t vls_table_lock;
uword *session_index_to_vlsh_table;
} vls_main_t;
-vls_main_t vls_main;
-vls_main_t *vlsm = &vls_main;
+vls_main_t *vlsm;
static inline void
vls_table_rlock (void)
@@ -74,40 +86,42 @@
VLS_MT_LOCK_SPOOL = 1 << 1
} vls_mt_lock_type_t;
-static int vls_wrk_index = ~0;
-static volatile int vls_mt_n_threads;
-static pthread_mutex_t vls_mt_mq_mlock = PTHREAD_MUTEX_INITIALIZER;
-static pthread_mutex_t vls_mt_spool_mlock = PTHREAD_MUTEX_INITIALIZER;
-
static void
vls_mt_add (void)
{
- vls_mt_n_threads += 1;
- vcl_set_worker_index (vls_wrk_index);
+ vlsl->vls_mt_n_threads += 1;
+ vcl_set_worker_index (vlsl->vls_wrk_index);
}
static inline void
vls_mt_mq_lock (void)
{
- pthread_mutex_lock (&vls_mt_mq_mlock);
+ pthread_mutex_lock (&vlsl->vls_mt_mq_mlock);
}
static inline void
vls_mt_mq_unlock (void)
{
- pthread_mutex_unlock (&vls_mt_mq_mlock);
+ pthread_mutex_unlock (&vlsl->vls_mt_mq_mlock);
}
static inline void
vls_mt_spool_lock (void)
{
- pthread_mutex_lock (&vls_mt_spool_mlock);
+ pthread_mutex_lock (&vlsl->vls_mt_spool_mlock);
}
static inline void
vls_mt_create_unlock (void)
{
- pthread_mutex_unlock (&vls_mt_spool_mlock);
+ pthread_mutex_unlock (&vlsl->vls_mt_spool_mlock);
+}
+
+static void
+vls_mt_locks_init (void)
+{
+ pthread_mutex_init (&vlsl->vls_mt_mq_mlock, NULL);
+ pthread_mutex_init (&vlsl->vls_mt_spool_mlock, NULL);
}
static inline vcl_session_handle_t
@@ -182,6 +196,12 @@
}
static inline void
+vls_lock (vcl_locked_session_t * vls)
+{
+ clib_spinlock_lock (&vls->lock);
+}
+
+static inline void
vls_unlock (vcl_locked_session_t * vls)
{
clib_spinlock_unlock (&vls->lock);
@@ -204,6 +224,48 @@
vls_table_runlock ();
}
+vcl_session_handle_t
+vlsh_to_sh (vls_handle_t vlsh)
+{
+ vcl_locked_session_t *vls;
+ int rv;
+
+ vls = vls_get_w_dlock (vlsh);
+ if (!vls)
+ return INVALID_SESSION_ID;
+ rv = vls_to_sh (vls);
+ vls_dunlock (vls);
+ return rv;
+}
+
+vcl_session_handle_t
+vlsh_to_session_index (vls_handle_t vlsh)
+{
+ vcl_session_handle_t sh;
+ sh = vlsh_to_sh (vlsh);
+ return vppcom_session_index (sh);
+}
+
+vls_handle_t
+vls_si_to_vlsh (u32 session_index)
+{
+ uword *vlshp;
+ vlshp = hash_get (vlsm->session_index_to_vlsh_table, session_index);
+ return vlshp ? *vlshp : VLS_INVALID_HANDLE;
+}
+
+vls_handle_t
+vls_session_index_to_vlsh (uint32_t session_index)
+{
+ vls_handle_t vlsh;
+
+ vls_table_rlock ();
+ vlsh = vls_si_to_vlsh (session_index);
+ vls_table_runlock ();
+
+ return vlsh;
+}
+
u8
vls_is_shared (vcl_locked_session_t * vls)
{
@@ -220,26 +282,63 @@
return 0;
}
+static void
+vls_listener_wrk_set (vcl_locked_session_t * vls, u32 wrk_index, u8 is_active)
+{
+ clib_bitmap_set (vls->listeners, wrk_index, is_active);
+}
+
+static u8
+vls_listener_wrk_is_active (vcl_locked_session_t * vls, u32 wrk_index)
+{
+ return (clib_bitmap_get (vls->listeners, wrk_index) == 1);
+}
+
+static void
+vls_listener_wrk_start_listen (vcl_locked_session_t * vls, u32 wrk_index)
+{
+ vppcom_session_listen (vls_to_sh (vls), ~0);
+ vls_listener_wrk_set (vls, wrk_index, 1 /* is_active */ );
+}
+
+static void
+vls_listener_wrk_stop_listen (vcl_locked_session_t * vls, u32 wrk_index)
+{
+ vcl_worker_t *wrk;
+ vcl_session_t *s;
+
+ wrk = vcl_worker_get (wrk_index);
+ s = vcl_session_get (wrk, vls->session_index);
+ if (s->session_state != STATE_LISTEN)
+ return;
+ vppcom_send_unbind_sock (wrk, s->vpp_handle);
+ s->session_state = STATE_LISTEN_NO_MQ;
+ vls_listener_wrk_set (vls, wrk_index, 0 /* is_active */ );
+}
+
int
vls_unshare_session (vcl_locked_session_t * vls, vcl_worker_t * wrk)
{
+ int i, do_disconnect;
vcl_session_t *s;
- int i;
+
+ s = vcl_session_get (wrk, vls->session_index);
+ if (s->session_state == STATE_LISTEN)
+ vls_listener_wrk_set (vls, wrk->wrk_index, 0 /* is_active */ );
for (i = 0; i < vec_len (vls->workers_subscribed); i++)
{
if (vls->workers_subscribed[i] != wrk->wrk_index)
continue;
- s = vcl_session_get (wrk, vls->session_index);
if (s->rx_fifo)
{
svm_fifo_del_subscriber (s->rx_fifo, wrk->vpp_wrk_index);
svm_fifo_del_subscriber (s->tx_fifo, wrk->vpp_wrk_index);
}
vec_del1 (vls->workers_subscribed, i);
- vcl_session_cleanup (wrk, s, vcl_session_handle (s),
- 0 /* do_disconnect */ );
+ do_disconnect = s->session_state == STATE_LISTEN;
+ vcl_session_cleanup (wrk, s, vcl_session_handle (s), do_disconnect);
return 0;
}
@@ -247,8 +346,6 @@
if (vls->worker_index != wrk->wrk_index)
return 0;
- s = vcl_session_get (wrk, vls->session_index);
-
/* Check if we can change owner or close */
if (vec_len (vls->workers_subscribed))
{
@@ -272,16 +369,22 @@
{
vcl_locked_session_t *vls;
- vls = vls_get_w_dlock (vls_session_index_to_vlsh (s->session_index));
+ vls = vls_get (vls_si_to_vlsh (s->session_index));
if (!vls)
return;
+ vls_lock (vls);
vec_add1 (vls->workers_subscribed, wrk->wrk_index);
if (s->rx_fifo)
{
svm_fifo_add_subscriber (s->rx_fifo, wrk->vpp_wrk_index);
svm_fifo_add_subscriber (s->tx_fifo, wrk->vpp_wrk_index);
}
- vls_dunlock (vls);
+ else if (s->session_state == STATE_LISTEN)
+ {
+ s->session_state = STATE_LISTEN_NO_MQ;
+ }
+
+ vls_unlock (vls);
}
void
@@ -294,12 +397,15 @@
wrk->sessions = pool_dup (parent_wrk->sessions);
wrk->session_index_by_vpp_handles =
hash_dup (parent_wrk->session_index_by_vpp_handles);
+ vls_table_wlock ();
/* *INDENT-OFF* */
pool_foreach (s, wrk->sessions, ({
vls_share_vcl_session (wrk, s);
}));
/* *INDENT-ON* */
+
+ vls_table_wunlock ();
}
static void
@@ -363,7 +469,7 @@
int _locks_acq = 0; \
if (PREDICT_FALSE (vcl_get_worker_index () == ~0)); \
vls_mt_add (); \
- if (PREDICT_FALSE (vls_mt_n_threads > 1)) \
+ if (PREDICT_FALSE (vlsl->vls_mt_n_threads > 1)) \
vls_mt_acq_locks (_vls, _op, &_locks_acq); \
#define vls_mt_unguard() \
@@ -505,6 +611,46 @@
return rv;
}
+static inline void
+vls_mp_checks (vcl_locked_session_t * vls, int is_add)
+{
+ vcl_worker_t *wrk = vcl_worker_get_current ();
+ vcl_session_t *s;
+
+ s = vcl_session_get (wrk, vls->session_index);
+ switch (s->session_state)
+ {
+ case STATE_LISTEN:
+ if (is_add)
+ {
+ if (vls->worker_index == wrk->wrk_index)
+ vls_listener_wrk_set (vls, wrk->wrk_index, 1 /* is_active */ );
+ break;
+ }
+ vls_listener_wrk_stop_listen (vls, vls->worker_index);
+ break;
+ case STATE_LISTEN_NO_MQ:
+ if (!is_add)
+ break;
+
+ /* Register worker as listener */
+ vls_listener_wrk_start_listen (vls, wrk->wrk_index);
+
+ /* If owner worker did not attempt to accept/xpoll on the session,
+ * force a listen stop for it, since it may not be interested in
+ * accepting new sessions.
+ * This is pretty much a hack done to give app workers the illusion
+ * that it is fine to listen and not accept new sessions for a
+ * given listener. Without it, we would accumulate unhandled
+ * accepts on the passive worker message queue. */
+ if (!vls_listener_wrk_is_active (vls, vls->worker_index))
+ vls_listener_wrk_stop_listen (vls, vls->worker_index);
+ break;
+ default:
+ break;
+ }
+}
+
vls_handle_t
vls_accept (vls_handle_t listener_vlsh, vppcom_endpt_t * ep, int flags)
{
@@ -514,6 +660,8 @@
if (!(vls = vls_get_w_dlock (listener_vlsh)))
return VPPCOM_EBADFD;
+ if (vcl_n_workers () > 1)
+ vls_mp_checks (vls, 1 /* is_add */ );
vls_mt_guard (vls, VLS_MT_OP_SPOOL);
sh = vppcom_session_accept (vls_to_sh_tu (vls), ep, flags);
vls_mt_unguard ();
@@ -597,6 +745,22 @@
return vlsh;
}
+static void
+vls_epoll_ctl_mp_checks (vcl_locked_session_t * vls, int op)
+{
+ if (vcl_n_workers () <= 1)
+ {
+ vlsl->epoll_mp_check = 1;
+ return;
+ }
+
+ if (op == EPOLL_CTL_MOD)
+ return;
+
+ vlsl->epoll_mp_check = 1;
+ vls_mp_checks (vls, op == EPOLL_CTL_ADD);
+}
+
int
vls_epoll_ctl (vls_handle_t ep_vlsh, int op, vls_handle_t vlsh,
struct epoll_event *event)
@@ -610,6 +774,10 @@
vls = vls_get_and_lock (vlsh);
ep_sh = vls_to_sh (ep_vls);
sh = vls_to_sh (vls);
+
+ if (PREDICT_FALSE (!vlsl->epoll_mp_check))
+ vls_epoll_ctl_mp_checks (vls, op);
+
vls_table_runlock ();
rv = vppcom_epoll_ctl (ep_sh, op, sh, event);
@@ -640,59 +808,52 @@
return rv;
}
+static void
+vls_select_mp_checks (vcl_si_set * read_map)
+{
+ vcl_locked_session_t *vls;
+ vcl_worker_t *wrk;
+ vcl_session_t *s;
+ u32 si;
+
+ if (vcl_n_workers () <= 1)
+ {
+ vlsl->select_mp_check = 1;
+ return;
+ }
+
+ if (!read_map)
+ return;
+
+ vlsl->select_mp_check = 1;
+ wrk = vcl_worker_get_current ();
+
+ /* *INDENT-OFF* */
+ clib_bitmap_foreach (si, read_map, ({
+ s = vcl_session_get (wrk, si);
+ if (s->session_state == STATE_LISTEN)
+ {
+ vls = vls_get (vls_session_index_to_vlsh (si));
+ vls_mp_checks (vls, 1 /* is_add */);
+ }
+ }));
+ /* *INDENT-ON* */
+}
+
int
vls_select (int n_bits, vcl_si_set * read_map, vcl_si_set * write_map,
vcl_si_set * except_map, double wait_for_time)
{
int rv;
+
vls_mt_guard (0, VLS_MT_OP_XPOLL);
+ if (PREDICT_FALSE (!vlsl->select_mp_check))
+ vls_select_mp_checks (read_map);
rv = vppcom_select (n_bits, read_map, write_map, except_map, wait_for_time);
vls_mt_unguard ();
return rv;
}
-vcl_session_handle_t
-vlsh_to_sh (vls_handle_t vlsh)
-{
- vcl_locked_session_t *vls;
- int rv;
-
- vls = vls_get_w_dlock (vlsh);
- if (!vls)
- return INVALID_SESSION_ID;
- rv = vls_to_sh (vls);
- vls_dunlock (vls);
- return rv;
-}
-
-vcl_session_handle_t
-vlsh_to_session_index (vls_handle_t vlsh)
-{
- vcl_session_handle_t sh;
- sh = vlsh_to_sh (vlsh);
- return vppcom_session_index (sh);
-}
-
-vls_handle_t
-vls_si_to_vlsh (u32 session_index)
-{
- uword *vlshp;
- vlshp = hash_get (vlsm->session_index_to_vlsh_table, session_index);
- return vlshp ? *vlshp : VLS_INVALID_HANDLE;
-}
-
-vls_handle_t
-vls_session_index_to_vlsh (uint32_t session_index)
-{
- vls_handle_t vlsh;
-
- vls_table_rlock ();
- vlsh = vls_si_to_vlsh (session_index);
- vls_table_runlock ();
-
- return vlsh;
-}
-
static void
vls_unshare_vcl_worker_sessions (vcl_worker_t * wrk)
{
@@ -855,8 +1016,11 @@
parent_wrk->forked_child = vcl_get_worker_index ();
/* Reset number of threads and set wrk index */
- vls_mt_n_threads = 0;
- vls_wrk_index = vcl_get_worker_index ();
+ vlsl->vls_mt_n_threads = 0;
+ vlsl->vls_wrk_index = vcl_get_worker_index ();
+ vlsl->select_mp_check = 0;
+ vlsl->epoll_mp_check = 0;
+ vls_mt_locks_init ();
VDBG (0, "forked child main worker initialized");
vcm->forking = 0;
@@ -884,12 +1048,14 @@
if ((rv = vppcom_app_create (app_name)))
return rv;
-
+ vlsm = clib_mem_alloc (sizeof (vls_main_t));
+ clib_memset (vlsm, 0, sizeof (*vlsm));
clib_rwlock_init (&vlsm->vls_table_lock);
pthread_atfork (vls_app_pre_fork, vls_app_fork_parent_handler,
vls_app_fork_child_handler);
atexit (vls_app_exit);
- vls_wrk_index = vcl_get_worker_index ();
+ vlsl->vls_wrk_index = vcl_get_worker_index ();
+ vls_mt_locks_init ();
return VPPCOM_OK;
}
diff --git a/src/vcl/vcl_private.h b/src/vcl/vcl_private.h
index 1cde850..c61bb0b 100644
--- a/src/vcl/vcl_private.h
+++ b/src/vcl/vcl_private.h
@@ -63,14 +63,15 @@
typedef enum
{
- STATE_START = 0x01,
- STATE_CONNECT = 0x02,
- STATE_LISTEN = 0x04,
- STATE_ACCEPT = 0x08,
- STATE_VPP_CLOSING = 0x10,
- STATE_DISCONNECT = 0x20,
- STATE_FAILED = 0x40,
- STATE_UPDATED = 0x80,
+ STATE_START = 0,
+ STATE_CONNECT = 0x01,
+ STATE_LISTEN = 0x02,
+ STATE_ACCEPT = 0x04,
+ STATE_VPP_CLOSING = 0x08,
+ STATE_DISCONNECT = 0x10,
+ STATE_FAILED = 0x20,
+ STATE_UPDATED = 0x40,
+ STATE_LISTEN_NO_MQ = 0x80,
} session_state_t;
#define SERVER_STATE_OPEN (STATE_ACCEPT|STATE_VPP_CLOSING)
@@ -491,7 +492,7 @@
}
session = pool_elt_at_index (wrk->sessions, p[0]);
- ASSERT (session->session_state & STATE_LISTEN);
+ ASSERT (session->session_state & (STATE_LISTEN | STATE_LISTEN_NO_MQ));
return session;
}
@@ -566,6 +567,12 @@
return vcl_worker_get (vcl_get_worker_index ());
}
+static inline u8
+vcl_n_workers (void)
+{
+ return pool_elts (vcm->workers);
+}
+
static inline svm_msg_q_t *
vcl_session_vpp_evt_q (vcl_worker_t * wrk, vcl_session_t * s)
{
@@ -588,7 +595,7 @@
void vppcom_send_connect_sock (vcl_session_t * session);
void vppcom_send_disconnect_session (u64 vpp_handle);
void vppcom_send_bind_sock (vcl_session_t * session);
-void vppcom_send_unbind_sock (u64 vpp_handle);
+void vppcom_send_unbind_sock (vcl_worker_t * wrk, u64 vpp_handle);
void vppcom_api_hookup (void);
void vppcom_send_application_tls_cert_add (vcl_session_t * session,
char *cert, u32 cert_len);
diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c
index fc7d194..570e323 100644
--- a/src/vcl/vppcom.c
+++ b/src/vcl/vppcom.c
@@ -94,6 +94,14 @@
st = "STATE_FAILED";
break;
+ case STATE_UPDATED:
+ st = "STATE_UPDATED";
+ break;
+
+ case STATE_LISTEN_NO_MQ:
+ st = "STATE_LISTEN_NO_MQ";
+ break;
+
default:
st = "UNKNOWN_STATE";
break;
@@ -818,11 +826,11 @@
session->vpp_handle = ~0;
session->session_state = STATE_DISCONNECT;
- VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending unbind msg! new state"
- " 0x%x (%s)", getpid (), vpp_handle, session_handle, STATE_DISCONNECT,
+ VDBG (1, "vpp handle 0x%llx, sid %u: sending unbind msg! new state"
+ " 0x%x (%s)", vpp_handle, session_handle, STATE_DISCONNECT,
vppcom_session_state_str (STATE_DISCONNECT));
vcl_evt (VCL_EVT_UNBIND, session);
- vppcom_send_unbind_sock (vpp_handle);
+ vppcom_send_unbind_sock (wrk, vpp_handle);
return VPPCOM_OK;
}