vcl/session: add api for changing session app worker
In the case of multi-process apps, after forking, the parent may decide to
close some or all of the sessions it shares with the child. Because the
sessions' fifos are allocated in the parent's segment manager, the fifos
must be moved to the child's segment manager.
Change-Id: I85b4c8c8545005724023ee14043647719cef61dd
Signed-off-by: Florin Coras <fcoras@cisco.com>
diff --git a/src/vcl/ldp.c b/src/vcl/ldp.c
index ee35396..a61acb9 100644
--- a/src/vcl/ldp.c
+++ b/src/vcl/ldp.c
@@ -57,9 +57,11 @@
typedef struct ldp_fd_entry_
{
u32 session_index;
+ u32 worker_index;	/* worker part of the session handle passed to ldp_fd_alloc */
u32 fd;
u32 fd_index;
u32 flags;
+ clib_spinlock_t lock;	/* per-entry lock: taken by ldp_fd_entry_lock, dropped by ldp_fd_entry_unlock */
} ldp_fd_entry_t;
typedef struct ldp_worker_ctx_
@@ -101,7 +103,7 @@
ldp_worker_ctx_t *workers;
int init;
char app_name[LDP_APP_NAME_MAX];
- u32 sid_bit_val;
+ u32 sh_bit_val;
u32 sid_bit_mask;
u32 debug;
ldp_fd_entry_t *fd_pool;
@@ -119,7 +121,7 @@
clib_warning ("ldp<%d>: " _fmt, getpid(), ##_args)
static ldp_main_t ldp_main = {
- .sid_bit_val = (1 << LDP_SID_BIT_MIN),
+ .sh_bit_val = (1 << LDP_SID_BIT_MIN),
.sid_bit_mask = (1 << LDP_SID_BIT_MIN) - 1,
.debug = LDP_DEBUG_INIT,
};
@@ -154,44 +156,73 @@
return ldp->app_name;
}
+static inline vcl_session_handle_t
+ldp_fd_entry_sh (ldp_fd_entry_t * fde)	/* session handle for an fd entry */
+{
+ return vppcom_session_handle (fde->session_index);	/* NOTE(review): handle is built from the calling worker's index, fde->worker_index is not used - confirm intended */
+}
+
static int
-ldp_fd_alloc (u32 sid)
+ldp_fd_alloc (vcl_session_handle_t sh)
{
ldp_fd_entry_t *fde;
clib_rwlock_writer_lock (&ldp->fd_table_lock);
- if (pool_elts (ldp->fd_pool) >= (1ULL << 32) - ldp->sid_bit_val)
+ if (pool_elts (ldp->fd_pool) >= (1ULL << 32) - ldp->sh_bit_val)
{
clib_rwlock_writer_unlock (&ldp->fd_table_lock);
return -1;
}
pool_get (ldp->fd_pool, fde);
- fde->session_index = vppcom_session_index (sid);
+ fde->session_index = vppcom_session_index (sh);
+ fde->worker_index = vppcom_session_worker (sh);
fde->fd_index = fde - ldp->fd_pool;
- fde->fd = fde->fd_index + ldp->sid_bit_val;
+ fde->fd = fde->fd_index + ldp->sh_bit_val;
hash_set (ldp->session_index_to_fd_table, fde->session_index, fde->fd);
+ clib_spinlock_init (&fde->lock);
clib_rwlock_writer_unlock (&ldp->fd_table_lock);
return fde->fd;
}
static ldp_fd_entry_t *
-ldp_fd_entry_get_w_lock (u32 fd_index)
+ldp_fd_entry_get (u32 fd_index)
{
- clib_rwlock_reader_lock (&ldp->fd_table_lock);
if (pool_is_free_index (ldp->fd_pool, fd_index))
return 0;
-
return pool_elt_at_index (ldp->fd_pool, fd_index);
}
+/*
+ * Look up the fd entry at fd_index and lock it.
+ *
+ * On success the entry is returned with both the fd table reader lock and
+ * the entry's own spinlock held; release them with ldp_fd_entry_unlock.
+ * Returns 0, with no lock held, if fd_index is a free pool index.
+ */
+static ldp_fd_entry_t *
+ldp_fd_entry_lock (u32 fd_index)
+{
+  ldp_fd_entry_t *fde = 0;
+
+  clib_rwlock_reader_lock (&ldp->fd_table_lock);
+  if (!pool_is_free_index (ldp->fd_pool, fd_index))
+    {
+      fde = pool_elt_at_index (ldp->fd_pool, fd_index);
+      clib_spinlock_lock (&fde->lock);
+    }
+  else
+    {
+      /* Nothing to hand out, so do not keep the table locked */
+      clib_rwlock_reader_unlock (&ldp->fd_table_lock);
+    }
+  return fde;
+}
+
+static void
+ldp_fd_entry_unlock (ldp_fd_entry_t * fde)	/* releases both locks taken by ldp_fd_entry_lock */
+{
+ clib_spinlock_unlock (&fde->lock);	/* entry lock first: reverse of the acquisition order */
+ clib_rwlock_reader_unlock (&ldp->fd_table_lock);	/* then the fd table reader lock */
+}
+
static inline int
-ldp_fd_from_sid (u32 sid)
+ldp_fd_from_sh (vcl_session_handle_t sh)
{
uword *fdp;
int fd;
clib_rwlock_reader_lock (&ldp->fd_table_lock);
- fdp = hash_get (ldp->session_index_to_fd_table, vppcom_session_index (sid));
+ fdp = hash_get (ldp->session_index_to_fd_table, vppcom_session_index (sh));
fd = fdp ? *fdp : -EMFILE;
clib_rwlock_reader_unlock (&ldp->fd_table_lock);
@@ -199,52 +230,63 @@
}
static inline int
-ldp_fd_is_sid (int fd)
+ldp_fd_is_sh (int fd)
{
- return fd >= ldp->sid_bit_val;
+ return fd >= ldp->sh_bit_val;
}
static inline u32
-ldp_sid_from_fd (int fd)
+ldp_sh_from_fd (int fd)
{
u32 fd_index, session_index;
ldp_fd_entry_t *fde;
- if (!ldp_fd_is_sid (fd))
+ if (!ldp_fd_is_sh (fd))
return INVALID_SESSION_ID;
- fd_index = fd - ldp->sid_bit_val;
- fde = ldp_fd_entry_get_w_lock (fd_index);
+ fd_index = fd - ldp->sh_bit_val;
+ fde = ldp_fd_entry_lock (fd_index);
if (!fde)
{
LDBG (0, "unknown fd %d", fd);
- clib_rwlock_reader_unlock (&ldp->fd_table_lock);
return INVALID_SESSION_ID;
}
session_index = fde->session_index;
- clib_rwlock_reader_unlock (&ldp->fd_table_lock);
+ ldp_fd_entry_unlock (fde);
return vppcom_session_handle (session_index);
}
+/*
+ * Lock the fd entry behind an ldp-managed fd.
+ *
+ * Returns 0 if fd is below sh_bit_val (i.e. not an ldp session fd) or if no
+ * entry exists for it; otherwise returns the entry locked as described for
+ * ldp_fd_entry_lock.
+ */
+static ldp_fd_entry_t *
+ldp_fd_entry_lock_w_fd (int fd)
+{
+  if (ldp_fd_is_sh (fd))
+    return ldp_fd_entry_lock (fd - ldp->sh_bit_val);
+
+  return 0;
+}
+
static void
-ldp_fd_free_w_sid (u32 sid)
+ldp_fd_free_w_sh (vcl_session_handle_t sh)
{
ldp_fd_entry_t *fde;
u32 fd_index;
int fd;
- fd = ldp_fd_from_sid (sid);
+ fd = ldp_fd_from_sh (sh);
if (!fd)
return;
- fd_index = fd - ldp->sid_bit_val;
- fde = ldp_fd_entry_get_w_lock (fd_index);
- if (fde)
- {
- hash_unset (ldp->session_index_to_fd_table, fde->session_index);
- pool_put (ldp->fd_pool, fde);
- }
+ fd_index = fd - ldp->sh_bit_val;
+ clib_rwlock_writer_lock (&ldp->fd_table_lock);
+ fde = ldp_fd_entry_get (fd_index);
+ ASSERT (fde != 0);
+ hash_unset (ldp->session_index_to_fd_table, fde->session_index);
+ clib_spinlock_free (&fde->lock);
+ pool_put (ldp->fd_pool, fde);
clib_rwlock_writer_unlock (&ldp->fd_table_lock);
}
@@ -307,38 +349,38 @@
clib_warning ("LDP<%d>: WARNING: Invalid LDP sid bit specified in"
" the env var " LDP_ENV_SID_BIT " (%s)! sid bit "
"value %d (0x%x)", getpid (), env_var_str,
- ldp->sid_bit_val, ldp->sid_bit_val);
+ ldp->sh_bit_val, ldp->sh_bit_val);
}
else if (sb < LDP_SID_BIT_MIN)
{
- ldp->sid_bit_val = (1 << LDP_SID_BIT_MIN);
- ldp->sid_bit_mask = ldp->sid_bit_val - 1;
+ ldp->sh_bit_val = (1 << LDP_SID_BIT_MIN);
+ ldp->sid_bit_mask = ldp->sh_bit_val - 1;
clib_warning ("LDP<%d>: WARNING: LDP sid bit (%u) specified in the"
" env var " LDP_ENV_SID_BIT " (%s) is too small. "
"Using LDP_SID_BIT_MIN (%d)! sid bit value %d (0x%x)",
getpid (), sb, env_var_str, LDP_SID_BIT_MIN,
- ldp->sid_bit_val, ldp->sid_bit_val);
+ ldp->sh_bit_val, ldp->sh_bit_val);
}
else if (sb > LDP_SID_BIT_MAX)
{
- ldp->sid_bit_val = (1 << LDP_SID_BIT_MAX);
- ldp->sid_bit_mask = ldp->sid_bit_val - 1;
+ ldp->sh_bit_val = (1 << LDP_SID_BIT_MAX);
+ ldp->sid_bit_mask = ldp->sh_bit_val - 1;
clib_warning ("LDP<%d>: WARNING: LDP sid bit (%u) specified in the"
" env var " LDP_ENV_SID_BIT " (%s) is too big. Using"
" LDP_SID_BIT_MAX (%d)! sid bit value %d (0x%x)",
getpid (), sb, env_var_str, LDP_SID_BIT_MAX,
- ldp->sid_bit_val, ldp->sid_bit_val);
+ ldp->sh_bit_val, ldp->sh_bit_val);
}
else
{
- ldp->sid_bit_val = (1 << sb);
- ldp->sid_bit_mask = ldp->sid_bit_val - 1;
+ ldp->sh_bit_val = (1 << sb);
+ ldp->sid_bit_mask = ldp->sh_bit_val - 1;
LDBG (0, "configured LDP sid bit (%u) from "
LDP_ENV_SID_BIT "! sid bit value %d (0x%x)", sb,
- ldp->sid_bit_val, ldp->sid_bit_val);
+ ldp->sh_bit_val, ldp->sh_bit_val);
}
}
@@ -352,17 +394,18 @@
int
close (int fd)
{
- int rv, refcnt;
- u32 sid = ldp_sid_from_fd (fd);
+ int rv, refcnt, epfd;
+ ldp_fd_entry_t *fde;
+ u32 sh;
if ((errno = -ldp_init ()))
return -1;
- if (sid != INVALID_SESSION_ID)
+ fde = ldp_fd_entry_lock_w_fd (fd);
+ if (fde)
{
- int epfd;
-
- epfd = vppcom_session_attr (sid, VPPCOM_ATTR_GET_LIBC_EPFD, 0, 0);
+ sh = ldp_fd_entry_sh (fde);
+ epfd = vppcom_session_attr (sh, VPPCOM_ATTR_GET_LIBC_EPFD, 0, 0);
if (epfd > 0)
{
LDBG (0, "fd %d (0x%x): calling libc_close: epfd %u (0x%x)",
@@ -374,7 +417,7 @@
u32 size = sizeof (epfd);
epfd = 0;
- (void) vppcom_session_attr (sid, VPPCOM_ATTR_SET_LIBC_EPFD,
+ (void) vppcom_session_attr (sh, VPPCOM_ATTR_SET_LIBC_EPFD,
&epfd, &size);
}
}
@@ -382,21 +425,24 @@
{
errno = -epfd;
rv = -1;
+ ldp_fd_entry_unlock (fde);
goto done;
}
LDBG (0, "fd %d (0x%x): calling vppcom_session_close: sid %u (0x%x)",
- fd, fd, sid, sid);
+ fd, fd, sh, sh);
- refcnt = vppcom_session_attr (sid, VPPCOM_ATTR_GET_REFCNT, 0, 0);
- rv = vppcom_session_close (sid);
+ refcnt = vppcom_session_attr (sh, VPPCOM_ATTR_GET_REFCNT, 0, 0);
+ rv = vppcom_session_close (sh);
if (rv != VPPCOM_OK)
{
errno = -rv;
rv = -1;
}
+
+ ldp_fd_entry_unlock (fde);
if (refcnt <= 1)
- ldp_fd_free_w_sid (sid);
+ ldp_fd_free_w_sh (sh);
}
else
{
@@ -413,23 +459,27 @@
ssize_t
read (int fd, void *buf, size_t nbytes)
{
+ vcl_session_handle_t sh;
+ ldp_fd_entry_t *fde;
ssize_t size;
- u32 sid = ldp_sid_from_fd (fd);
if ((errno = -ldp_init ()))
return -1;
- if (sid != INVALID_SESSION_ID)
+ fde = ldp_fd_entry_lock_w_fd (fd);
+ if (fde)
{
+ sh = ldp_fd_entry_sh (fde);
LDBG (2, "fd %d (0x%x): calling vppcom_session_read(): sid %u (0x%x),"
- " buf %p, nbytes %u", fd, fd, sid, sid, buf, nbytes);
+ " buf %p, nbytes %u", fd, fd, sh, sh, buf, nbytes);
- size = vppcom_session_read (sid, buf, nbytes);
+ size = vppcom_session_read (sh, buf, nbytes);
if (size < 0)
{
errno = -size;
size = -1;
}
+ ldp_fd_entry_unlock (fde);
}
else
{
@@ -447,7 +497,7 @@
readv (int fd, const struct iovec * iov, int iovcnt)
{
ssize_t size = 0;
- u32 sid = ldp_sid_from_fd (fd);
+ u32 sid = ldp_sh_from_fd (fd);
int rv = 0, i, total = 0;
if ((errno = -ldp_init ()))
@@ -504,23 +554,27 @@
ssize_t
write (int fd, const void *buf, size_t nbytes)
{
+ vcl_session_handle_t sh;
+ ldp_fd_entry_t *fde;
ssize_t size = 0;
- u32 sid = ldp_sid_from_fd (fd);
if ((errno = -ldp_init ()))
return -1;
- if (sid != INVALID_SESSION_ID)
+ fde = ldp_fd_entry_lock_w_fd (fd);
+ if (fde)
{
+ sh = ldp_fd_entry_sh (fde);
LDBG (2, "fd %d (0x%x): calling vppcom_session_write(): sid %u (0x%x), "
- "buf %p, nbytes %u", fd, fd, sid, sid, buf, nbytes);
+ "buf %p, nbytes %u", fd, fd, sh, sh, buf, nbytes);
- size = vppcom_session_write_msg (sid, (void *) buf, nbytes);
+ size = vppcom_session_write_msg (sh, (void *) buf, nbytes);
if (size < 0)
{
errno = -size;
size = -1;
}
+ ldp_fd_entry_unlock (fde);
}
else
{
@@ -538,7 +592,7 @@
writev (int fd, const struct iovec * iov, int iovcnt)
{
ssize_t size = 0, total = 0;
- u32 sid = ldp_sid_from_fd (fd);
+ u32 sid = ldp_sh_from_fd (fd);
int i, rv = 0;
/*
@@ -590,7 +644,7 @@
const char *func_str = __func__;
int rv = 0;
va_list ap;
- u32 sid = ldp_sid_from_fd (fd);
+ u32 sid = ldp_sh_from_fd (fd);
if ((errno = -ldp_init ()))
return -1;
@@ -681,7 +735,7 @@
const char *func_str;
int rv;
va_list ap;
- u32 sid = ldp_sid_from_fd (fd);
+ u32 sid = ldp_sh_from_fd (fd);
if ((errno = -ldp_init ()))
return -1;
@@ -808,7 +862,7 @@
time_out = -1;
- if (nfds <= ldp->sid_bit_val)
+ if (nfds <= ldp->sh_bit_val)
{
func_str = "libc_pselect";
@@ -821,11 +875,11 @@
goto done;
}
- if (PREDICT_FALSE (ldp->sid_bit_val > FD_SETSIZE / 2))
+ if (PREDICT_FALSE (ldp->sh_bit_val > FD_SETSIZE / 2))
{
clib_warning ("LDP<%d>: ERROR: LDP sid bit value %d (0x%x) > "
"FD_SETSIZE/2 %d (0x%x)!", getpid (),
- ldp->sid_bit_val, ldp->sid_bit_val,
+ ldp->sh_bit_val, ldp->sh_bit_val,
FD_SETSIZE / 2, FD_SETSIZE / 2);
errno = EOVERFLOW;
return -1;
@@ -845,7 +899,7 @@
clib_bitmap_foreach (fd, ldpw->rd_bitmap, ({
if (fd > nfds)
break;
- sid = ldp_sid_from_fd (fd);
+ sid = ldp_sh_from_fd (fd);
LDBG (3, "readfds: fd %d (0x%x), sid %u (0x%x)", fd, fd, sid, sid);
if (sid == INVALID_SESSION_ID)
clib_bitmap_set_no_check (ldpw->libc_rd_bitmap, fd, 1);
@@ -877,7 +931,7 @@
clib_bitmap_foreach (fd, ldpw->wr_bitmap, ({
if (fd > nfds)
break;
- sid = ldp_sid_from_fd (fd);
+ sid = ldp_sh_from_fd (fd);
LDBG (3, "writefds: fd %d (0x%x), sid %u (0x%x)", fd, fd, sid, sid);
if (sid == INVALID_SESSION_ID)
clib_bitmap_set_no_check (ldpw->libc_wr_bitmap, fd, 1);
@@ -909,7 +963,7 @@
clib_bitmap_foreach (fd, ldpw->ex_bitmap, ({
if (fd > nfds)
break;
- sid = ldp_sid_from_fd (fd);
+ sid = ldp_sh_from_fd (fd);
LDBG (3, "exceptfds: fd %d (0x%x), sid %u (0x%x)", fd, fd, sid, sid);
if (sid == INVALID_SESSION_ID)
clib_bitmap_set_no_check (ldpw->libc_ex_bitmap, fd, 1);
@@ -977,7 +1031,7 @@
/* *INDENT-OFF* */
clib_bitmap_foreach (sid, ldpw->rd_bitmap,
({
- fd = ldp_fd_from_sid (vppcom_session_handle (sid));
+ fd = ldp_fd_from_sh (vppcom_session_handle (sid));
if (PREDICT_FALSE (fd < 0))
{
errno = EBADFD;
@@ -993,7 +1047,7 @@
/* *INDENT-OFF* */
clib_bitmap_foreach (sid, ldpw->wr_bitmap,
({
- fd = ldp_fd_from_sid (vppcom_session_handle (sid));
+ fd = ldp_fd_from_sh (vppcom_session_handle (sid));
if (PREDICT_FALSE (fd < 0))
{
errno = EBADFD;
@@ -1009,7 +1063,7 @@
/* *INDENT-OFF* */
clib_bitmap_foreach (sid, ldpw->ex_bitmap,
({
- fd = ldp_fd_from_sid (vppcom_session_handle (sid));
+ fd = ldp_fd_from_sh (vppcom_session_handle (sid));
if (PREDICT_FALSE (fd < 0))
{
errno = EBADFD;
@@ -1237,7 +1291,7 @@
bind (int fd, __CONST_SOCKADDR_ARG addr, socklen_t len)
{
int rv;
- u32 sid = ldp_sid_from_fd (fd);
+ u32 sid = ldp_sh_from_fd (fd);
if ((errno = -ldp_init ()))
return -1;
@@ -1361,7 +1415,7 @@
{
int rv;
const char *func_str;
- u32 sid = ldp_sid_from_fd (fd);
+ u32 sid = ldp_sh_from_fd (fd);
if ((errno = -ldp_init ()))
return -1;
@@ -1430,7 +1484,7 @@
connect (int fd, __CONST_SOCKADDR_ARG addr, socklen_t len)
{
int rv;
- u32 sid = ldp_sid_from_fd (fd);
+ u32 sid = ldp_sh_from_fd (fd);
if ((errno = -ldp_init ()))
return -1;
@@ -1516,7 +1570,7 @@
{
int rv;
const char *func_str;
- u32 sid = ldp_sid_from_fd (fd);
+ u32 sid = ldp_sh_from_fd (fd);
if ((errno = -ldp_init ()))
return -1;
@@ -1586,7 +1640,7 @@
{
ssize_t size;
const char *func_str;
- u32 sid = ldp_sid_from_fd (fd);
+ u32 sid = ldp_sh_from_fd (fd);
if ((errno = -ldp_init ()))
return -1;
@@ -1644,7 +1698,7 @@
ldp_worker_ctx_t *ldpw = ldp_worker_get_current ();
ssize_t size = 0;
const char *func_str;
- u32 sid = ldp_sid_from_fd (out_fd);
+ u32 sid = ldp_sh_from_fd (out_fd);
if ((errno = -ldp_init ()))
return -1;
@@ -1888,7 +1942,7 @@
if ((errno = -ldp_init ()))
return -1;
- sid = ldp_sid_from_fd (fd);
+ sid = ldp_sh_from_fd (fd);
if (sid != INVALID_SESSION_ID)
{
LDBG (2, "fd %d (0x%x): calling vcl recvfrom: sid %u (0x%x), buf %p,"
@@ -1915,7 +1969,7 @@
{
ssize_t size;
const char *func_str = __func__;
- u32 sid = ldp_sid_from_fd (fd);
+ u32 sid = ldp_sh_from_fd (fd);
if ((errno = -ldp_init ()))
return -1;
@@ -2006,7 +2060,7 @@
{
ssize_t size;
const char *func_str;
- u32 sid = ldp_sid_from_fd (fd);
+ u32 sid = ldp_sh_from_fd (fd);
if ((errno = -ldp_init ()))
return -1;
@@ -2076,7 +2130,7 @@
{
ssize_t size;
const char *func_str;
- u32 sid = ldp_sid_from_fd (fd);
+ u32 sid = ldp_sh_from_fd (fd);
if ((errno = -ldp_init ()))
return -1;
@@ -2125,7 +2179,7 @@
{
ssize_t size;
const char *func_str;
- u32 sid = ldp_sid_from_fd (fd);
+ u32 sid = ldp_sh_from_fd (fd);
if ((errno = -ldp_init ()))
return -1;
@@ -2172,7 +2226,7 @@
{
ssize_t size;
const char *func_str;
- u32 sid = ldp_sid_from_fd (fd);
+ u32 sid = ldp_sh_from_fd (fd);
if ((errno = -ldp_init ()))
return -1;
@@ -2222,7 +2276,7 @@
{
ssize_t size;
const char *func_str;
- u32 sid = ldp_sid_from_fd (fd);
+ u32 sid = ldp_sh_from_fd (fd);
if ((errno = -ldp_init ()))
return -1;
@@ -2271,7 +2325,7 @@
{
int rv;
const char *func_str = __func__;
- u32 sid = ldp_sid_from_fd (fd);
+ u32 sid = ldp_sh_from_fd (fd);
u32 buflen = optlen ? (u32) * optlen : 0;
if ((errno = -ldp_init ()))
@@ -2499,7 +2553,7 @@
{
int rv;
const char *func_str = __func__;
- u32 sid = ldp_sid_from_fd (fd);
+ u32 sid = ldp_sh_from_fd (fd);
if ((errno = -ldp_init ()))
return -1;
@@ -2663,7 +2717,7 @@
listen (int fd, int n)
{
int rv;
- u32 sid = ldp_sid_from_fd (fd);
+ u32 sid = ldp_sh_from_fd (fd);
if ((errno = -ldp_init ()))
return -1;
@@ -2696,13 +2750,14 @@
socklen_t * __restrict addr_len, int flags)
{
int rv;
- u32 listen_sid = ldp_sid_from_fd (listen_fd);
- int accept_sid;
+ u32 listen_sh;
+ int accept_sh;
if ((errno = -ldp_init ()))
return -1;
- if (listen_sid != INVALID_SESSION_ID)
+ listen_sh = ldp_sh_from_fd (listen_fd);
+ if (listen_sh != INVALID_SESSION_ID)
{
vppcom_endpt_t ep;
u8 src_addr[sizeof (struct sockaddr_in6)];
@@ -2711,12 +2766,12 @@
LDBG (0, "listen fd %d (0x%x): calling vppcom_session_accept:"
" listen sid %u (0x%x), ep %p, flags 0x%x", listen_fd,
- listen_fd, listen_sid, listen_sid, ep, flags);
+ listen_fd, listen_sh, listen_sh, ep, flags);
- accept_sid = vppcom_session_accept (listen_sid, &ep, flags);
- if (accept_sid < 0)
+ accept_sh = vppcom_session_accept (listen_sh, &ep, flags);
+ if (accept_sh < 0)
{
- errno = -accept_sid;
+ errno = -accept_sh;
rv = -1;
}
else
@@ -2724,16 +2779,16 @@
rv = ldp_copy_ep_to_sockaddr (addr, addr_len, &ep);
if (rv != VPPCOM_OK)
{
- (void) vppcom_session_close ((u32) accept_sid);
+ (void) vppcom_session_close ((u32) accept_sh);
errno = -rv;
rv = -1;
}
else
{
- rv = ldp_fd_alloc ((u32) accept_sid);
+ rv = ldp_fd_alloc ((u32) accept_sh);
if (rv < 0)
{
- (void) vppcom_session_close ((u32) accept_sid);
+ (void) vppcom_session_close ((u32) accept_sh);
errno = -rv;
rv = -1;
}
@@ -2776,15 +2831,14 @@
if ((errno = -ldp_init ()))
return -1;
- if (ldp_fd_is_sid (fd))
+ if (ldp_fd_is_sh (fd))
{
- u32 fd_index = fd - ldp->sid_bit_val;
+ u32 fd_index = fd - ldp->sh_bit_val;
ldp_fd_entry_t *fde;
- fde = ldp_fd_entry_get_w_lock (fd_index);
+ fde = ldp_fd_entry_lock (fd_index);
if (!fde)
{
- clib_rwlock_reader_unlock (&ldp->fd_table_lock);
errno = ENOTCONN;
return -1;
}
@@ -2799,7 +2853,7 @@
if ((fde->flags & LDP_F_SHUT_RD) && (fde->flags & LDP_F_SHUT_WR))
rv = close (fd);
- clib_rwlock_reader_unlock (&ldp->fd_table_lock);
+ ldp_fd_entry_unlock (fde);
LDBG (0, "fd %d (0x%x): calling vcl shutdown: how %d", fd, fd, how);
}
else
@@ -2869,7 +2923,7 @@
int
epoll_ctl (int epfd, int op, int fd, struct epoll_event *event)
{
- u32 vep_idx = ldp_sid_from_fd (epfd), sid;
+ u32 vep_idx = ldp_sh_from_fd (epfd), sid;
const char *func_str;
int rv;
@@ -2892,7 +2946,7 @@
goto done;
}
- sid = ldp_sid_from_fd (fd);
+ sid = ldp_sh_from_fd (fd);
LDBG (0, "epfd %d (0x%x), vep_idx %d (0x%x), sid %d (0x%x)",
epfd, epfd, vep_idx, vep_idx, sid, sid);
@@ -2995,7 +3049,7 @@
{
ldp_worker_ctx_t *ldpw = ldp_worker_get_current ();
double time_to_wait = (double) 0, time_out, now = 0;
- u32 vep_idx = ldp_sid_from_fd (epfd);
+ u32 vep_idx = ldp_sh_from_fd (epfd);
int libc_epfd, rv = 0;
if ((errno = -ldp_init ()))
@@ -3115,7 +3169,7 @@
LDBG (3, "fds[%d] fd %d (0x%0x) events = 0x%x revents = 0x%x",
i, fds[i].fd, fds[i].fd, fds[i].events, fds[i].revents);
- sid = ldp_sid_from_fd (fds[i].fd);
+ sid = ldp_sh_from_fd (fds[i].fd);
if (sid != INVALID_SESSION_ID)
{
fds[i].fd = -fds[i].fd;
diff --git a/src/vcl/vcl_bapi.c b/src/vcl/vcl_bapi.c
index de5e80a..5b9a9d5 100644
--- a/src/vcl/vcl_bapi.c
+++ b/src/vcl/vcl_bapi.c
@@ -610,21 +610,6 @@
}
void
-vppcom_send_accept_session_reply (u64 handle, u32 context, int retval)
-{
- vcl_worker_t *wrk = vcl_worker_get_current ();
- vl_api_accept_session_reply_t *rmp;
-
- rmp = vl_msg_api_alloc (sizeof (*rmp));
- memset (rmp, 0, sizeof (*rmp));
- rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY);
- rmp->retval = htonl (retval);
- rmp->context = context;
- rmp->handle = handle;
- vl_msg_api_send_shmem (wrk->vl_input_queue, (u8 *) & rmp);
-}
-
-void
vppcom_send_application_tls_cert_add (vcl_session_t * session, char *cert,
u32 cert_len)
{
diff --git a/src/vcl/vcl_private.c b/src/vcl/vcl_private.c
index d82a7ff..3266431 100644
--- a/src/vcl/vcl_private.c
+++ b/src/vcl/vcl_private.c
@@ -375,21 +375,23 @@
vcl_session_t * new_s)
{
vcl_shared_session_t *ss;
- vcl_session_t *s;
+ vcl_session_t *old_s;
- s = vcl_session_get (parent, new_s->session_index);
- if (s->shared_index == ~0)
+ if (new_s->shared_index == ~0)
{
ss = vcl_shared_session_alloc ();
+ ss->session_index = new_s->session_index;
vec_add1 (ss->workers, parent->wrk_index);
- s->shared_index = ss->ss_index;
+ vec_add1 (ss->workers, wrk->wrk_index);
+ new_s->shared_index = ss->ss_index;
+ old_s = vcl_session_get (parent, new_s->session_index);
+ old_s->shared_index = ss->ss_index;
}
else
{
- ss = vcl_shared_session_get (s->shared_index);
+ ss = vcl_shared_session_get (new_s->shared_index);
+ vec_add1 (ss->workers, wrk->wrk_index);
}
- new_s->shared_index = ss->ss_index;
- vec_add1 (ss->workers, wrk->wrk_index);
}
int
@@ -414,6 +416,12 @@
return 1;
}
+ /* If the first removed and not last, start session worker change.
+ * First request goes to vpp and vpp reflects it back to the right
+ * worker */
+ if (i == 0)
+ vcl_send_session_worker_update (wrk, s, ss->workers[0]);
+
return 0;
}
diff --git a/src/vcl/vcl_private.h b/src/vcl/vcl_private.h
index 9dce518..2ae4b72 100644
--- a/src/vcl/vcl_private.h
+++ b/src/vcl/vcl_private.h
@@ -69,7 +69,8 @@
STATE_ACCEPT = 0x08,
STATE_VPP_CLOSING = 0x10,
STATE_DISCONNECT = 0x20,
- STATE_FAILED = 0x40
+ STATE_FAILED = 0x40,
+ STATE_UPDATED = 0x80,
} session_state_t;
#define SERVER_STATE_OPEN (STATE_ACCEPT|STATE_VPP_CLOSING)
@@ -144,6 +145,7 @@
{
u32 ss_index;
u32 *workers;
+ u32 session_index;
} vcl_shared_session_t;
typedef struct
@@ -287,6 +289,8 @@
/** Vector of unhandled events */
session_event_t *unhandled_evts_vector;
+ u32 *pending_session_wrk_updates;
+
/** Used also as a thread stop key buffer */
pthread_t thread_id;
@@ -517,6 +521,7 @@
int vcl_worker_set_bapi (void);
void vcl_worker_share_sessions (vcl_worker_t * parent_wrk);
int vcl_worker_unshare_session (vcl_worker_t * wrk, vcl_session_t * s);
+vcl_shared_session_t *vcl_shared_session_get (u32 ss_index);
int vcl_session_get_refcnt (vcl_session_t * s);
void vcl_segment_table_add (u64 segment_handle, u32 svm_segment_index);
@@ -543,6 +548,17 @@
return vcl_worker_get (vcl_get_worker_index ());
}
+/*
+ * Pick the vpp event queue used to reach vpp for session s: queue 0 for
+ * sessions flagged by vcl_session_is_ct (presumably cut-through), otherwise
+ * the queue of the session's vpp thread.
+ */
+static inline svm_msg_q_t *
+vcl_session_vpp_evt_q (vcl_worker_t * wrk, vcl_session_t * s)
+{
+  return wrk->vpp_event_queues[vcl_session_is_ct (s) ?
+			       0 : s->vpp_thread_index];
+}
+
+void vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s,
+ u32 wrk_index);
/*
* VCL Binary API
*/
@@ -556,12 +572,10 @@
void vppcom_send_bind_sock (vcl_session_t * session);
void vppcom_send_unbind_sock (u64 vpp_handle);
void vppcom_api_hookup (void);
-void vppcom_send_accept_session_reply (u64 vpp_handle, u32 context, int rv);
void vppcom_send_application_tls_cert_add (vcl_session_t * session,
char *cert, u32 cert_len);
-void
-vppcom_send_application_tls_key_add (vcl_session_t * session, char *key,
- u32 key_len);
+void vppcom_send_application_tls_key_add (vcl_session_t * session, char *key,
+ u32 key_len);
void vcl_send_app_worker_add_del (u8 is_add);
void vcl_send_child_worker_del (vcl_worker_t * wrk);
diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c
index 70afdce..6a1bf1c 100644
--- a/src/vcl/vppcom.c
+++ b/src/vcl/vppcom.c
@@ -44,6 +44,22 @@
return 1;
}
+/*
+ * Move every message currently in mq to the tail of wrk->mq_msg_vector.
+ *
+ * The queue size is sampled once, so messages arriving afterwards are left
+ * for the next call. Uses the _w_lock dequeue variant; the visible caller
+ * holds the queue lock around this call.
+ * Returns the number of messages dequeued.
+ */
+static inline int
+vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq)
+{
+  u32 n_msgs = svm_msg_q_size (mq);
+  u32 left;
+
+  for (left = n_msgs; left > 0; left--)
+    {
+      svm_msg_q_msg_t *slot;
+
+      /* Grow the vector by one element and dequeue straight into it */
+      vec_add2 (wrk->mq_msg_vector, slot, 1);
+      svm_msg_q_sub_w_lock (mq, slot);
+    }
+  return n_msgs;
+}
+
const char *
vppcom_session_state_str (session_state_t state)
{
@@ -175,15 +191,6 @@
*/
-static svm_msg_q_t *
-vcl_session_vpp_evt_q (vcl_worker_t * wrk, vcl_session_t * s)
-{
- if (vcl_session_is_ct (s))
- return wrk->vpp_event_queues[0];
- else
- return wrk->vpp_event_queues[s->vpp_thread_index];
-}
-
static void
vcl_send_session_accepted_reply (svm_msg_q_t * mq, u32 context,
session_handle_t handle, int retval)
@@ -227,6 +234,24 @@
app_send_ctrl_evt_to_vpp (mq, app_evt);
}
+/*
+ * Send a SESSION_CTRL_EVT_WORKER_UPDATE control event to vpp asking that
+ * session s be owned by worker wrk_index. The request carries the sending
+ * worker's vpp worker index (req_wrk_index) and its client index.
+ */
+void
+vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s,
+				u32 wrk_index)
+{
+  app_session_evt_t evt_ctx, *app_evt = &evt_ctx;
+  svm_msg_q_t *vpp_mq = vcl_session_vpp_evt_q (wrk, s);
+  session_worker_update_msg_t *req;
+
+  app_alloc_ctrl_evt_to_vpp (vpp_mq, app_evt, SESSION_CTRL_EVT_WORKER_UPDATE);
+  req = (session_worker_update_msg_t *) app_evt->evt->data;
+  req->handle = s->vpp_handle;
+  req->wrk_index = wrk_index;
+  req->req_wrk_index = wrk->vpp_wrk_index;
+  req->client_index = wrk->my_client_index;
+  app_send_ctrl_evt_to_vpp (vpp_mq, app_evt);
+}
+
static u32
vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp)
{
@@ -540,7 +565,6 @@
/* Caught a disconnect before actually accepting the session */
if (session->session_state == STATE_LISTEN)
{
-
if (!vcl_flag_accepted_session (session, msg->handle,
VCL_ACCEPTED_F_CLOSED))
VDBG (0, "session was not accepted!");
@@ -551,6 +575,59 @@
return session;
}
+/*
+ * Handle a SESSION_CTRL_EVT_REQ_WORKER_UPDATE message: queue the referenced
+ * session on wrk->pending_session_wrk_updates. The update itself is issued
+ * later by vcl_handle_pending_wrk_updates. Unknown handles are ignored.
+ */
+static void
+vcl_session_req_worker_update_handler (vcl_worker_t * wrk, void *data)
+{
+  session_req_worker_update_msg_t *req = data;
+  vcl_session_t *session;
+
+  session = vcl_session_get_w_vpp_handle (wrk, req->session_handle);
+  if (session)
+    {
+      vec_add1 (wrk->pending_session_wrk_updates, session->session_index);
+    }
+}
+
+/*
+ * Handle a SESSION_CTRL_EVT_WORKER_UPDATE_REPLY message: attach the session
+ * to the rx/tx fifos named in the reply, point the fifos back at this
+ * worker and session, and mark the session STATE_UPDATED.
+ *
+ * The session is left untouched if the handle is unknown or the fifo
+ * segment cannot be mounted.
+ */
+static void
+vcl_session_worker_update_reply_handler (vcl_worker_t * wrk, void *data)
+{
+  session_worker_update_reply_msg_t *reply = data;
+  vcl_shared_session_t *shared;
+  vcl_session_t *session;
+
+  session = vcl_session_get_w_vpp_handle (wrk, reply->handle);
+  if (!session)
+    {
+      VDBG (0, "unknown handle 0x%llx", reply->handle);
+      return;
+    }
+  if (vcl_wait_for_segment (reply->segment_handle))
+    {
+      clib_warning ("segment for session %u couldn't be mounted!",
+		    session->session_index);
+      return;
+    }
+
+  session->rx_fifo = uword_to_pointer (reply->rx_fifo, svm_fifo_t *);
+  session->rx_fifo->client_session_index = session->session_index;
+  session->rx_fifo->client_thread_index = wrk->wrk_index;
+
+  session->tx_fifo = uword_to_pointer (reply->tx_fifo, svm_fifo_t *);
+  session->tx_fifo->client_session_index = session->session_index;
+  session->tx_fifo->client_thread_index = wrk->wrk_index;
+
+  /* vcl_handle_pending_wrk_updates waits for this state */
+  session->session_state = STATE_UPDATED;
+
+  if (session->shared_index != VCL_INVALID_SESSION_INDEX)
+    {
+      shared = vcl_shared_session_get (session->shared_index);
+      if (vec_len (shared->workers) > 1)
+	VDBG (0, "workers need to be updated");
+    }
+  VDBG (0, "session %u[0x%llx] moved to worker %u", session->session_index,
+	session->vpp_handle, wrk->wrk_index);
+}
+
static int
vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
{
@@ -587,13 +664,19 @@
case SESSION_CTRL_EVT_BOUND:
vcl_session_bound_handler (wrk, (session_bound_msg_t *) e->data);
break;
+ case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
+ vcl_session_req_worker_update_handler (wrk, e->data);
+ break;
+ case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
+ vcl_session_worker_update_reply_handler (wrk, e->data);
+ break;
default:
clib_warning ("unhandled %u", e->event_type);
}
return VPPCOM_OK;
}
-static inline int
+static int
vppcom_wait_for_session_state_change (u32 session_index,
session_state_t state,
f64 wait_for_time)
@@ -638,6 +721,52 @@
return VPPCOM_ETIMEDOUT;
}
+/*
+ * Ask vpp to move every session queued on wrk->pending_session_wrk_updates
+ * to this worker, waiting up to 5 seconds per session for the reply handler
+ * to flag STATE_UPDATED. The state each session had before the request is
+ * restored afterwards and the pending vector is emptied.
+ */
+static void
+vcl_handle_pending_wrk_updates (vcl_worker_t * wrk)
+{
+  session_state_t state;
+  vcl_session_t *s;
+  u32 i, session_index;
+
+  if (PREDICT_TRUE (vec_len (wrk->pending_session_wrk_updates) == 0))
+    return;
+
+  /*
+   * Walk the vector by index and re-read its length on every pass.
+   * STATE_UPDATED is only set by the reply handler, which is dispatched
+   * from the mq event handler, so the wait below has to process mq events.
+   * A REQ_WORKER_UPDATE processed there appends to this same vector and may
+   * reallocate it, which would leave a vec_foreach element pointer dangling.
+   */
+  for (i = 0; i < vec_len (wrk->pending_session_wrk_updates); i++)
+    {
+      session_index = wrk->pending_session_wrk_updates[i];
+      s = vcl_session_get (wrk, session_index);
+      /* NOTE(review): assumes vcl_session_get returns 0 for a freed index */
+      if (!s)
+	continue;
+
+      vcl_send_session_worker_update (wrk, s, wrk->wrk_index);
+      state = s->session_state;
+      vppcom_wait_for_session_state_change (s->session_index, STATE_UPDATED,
+					    5);
+
+      /* Events handled during the wait may have grown the session pool or
+       * freed the session, so look it up again before writing to it */
+      s = vcl_session_get (wrk, session_index);
+      if (s)
+	s->session_state = state;
+    }
+  vec_reset_length (wrk->pending_session_wrk_updates);
+}
+
+/*
+ * Drain the current worker's app event queue: dequeue everything pending
+ * under the queue lock, dispatch each event through vcl_handle_mq_event,
+ * then process any session worker updates those events queued.
+ */
+static void
+vcl_flush_mq_events (void)
+{
+  vcl_worker_t *wrk = vcl_worker_get_current ();
+  svm_msg_q_t *app_mq = wrk->app_event_queue;
+  svm_msg_q_msg_t *m;
+  int k = 0;
+
+  svm_msg_q_lock (app_mq);
+  vcl_mq_dequeue_batch (wrk, app_mq);
+  svm_msg_q_unlock (app_mq);
+
+  while (k < vec_len (wrk->mq_msg_vector))
+    {
+      m = vec_elt_at_index (wrk->mq_msg_vector, k);
+      vcl_handle_mq_event (wrk, svm_msg_q_msg_data (app_mq, m));
+      svm_msg_q_free_msg (app_mq, m);
+      k++;
+    }
+  vec_reset_length (wrk->mq_msg_vector);
+  vcl_handle_pending_wrk_updates (wrk);
+}
+
static int
vppcom_app_session_enable (void)
{
@@ -845,13 +974,14 @@
vcl_app_pre_fork (void)
{
vcl_incercept_sigchld ();
+ vcl_flush_mq_events ();
}
static void
vcl_app_fork_child_handler (void)
{
+ vcl_worker_t *parent_wrk, *wrk;
int rv, parent_wrk_index;
- vcl_worker_t *parent_wrk;
u8 *child_name;
parent_wrk_index = vcl_get_worker_index ();
@@ -884,6 +1014,8 @@
*/
vcl_worker_register_with_vpp ();
parent_wrk = vcl_worker_get (parent_wrk_index);
+ wrk = vcl_worker_get_current ();
+ wrk->vpp_event_queues = vec_dup (parent_wrk->vpp_event_queues);
vcl_worker_share_sessions (parent_wrk);
parent_wrk->forked_child = vcl_get_worker_index ();
@@ -1097,7 +1229,11 @@
}
if (!do_disconnect)
- goto cleanup;
+ {
+ VDBG (0, "session handle %u [0x%llx] disconnect skipped",
+ session_handle, vpp_handle);
+ goto cleanup;
+ }
if (state & STATE_LISTEN)
{
@@ -1143,10 +1279,7 @@
vcl_ct_registration_unlock (wrk);
}
- if (vpp_handle != ~0)
- {
- vcl_session_table_del_vpp_handle (wrk, vpp_handle);
- }
+ vcl_session_table_del_vpp_handle (wrk, vpp_handle);
vcl_session_free (wrk, session);
VDBG (0, "session handle %u [0x%llx] removed", session_handle, vpp_handle);
@@ -1948,22 +2081,6 @@
return svm_fifo_max_enqueue (session->tx_fifo);
}
-static inline int
-vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq)
-{
- svm_msg_q_msg_t *msg;
- u32 n_msgs;
- int i;
-
- n_msgs = svm_msg_q_size (mq);
- for (i = 0; i < n_msgs; i++)
- {
- vec_add2 (wrk->mq_msg_vector, msg, 1);
- svm_msg_q_sub_w_lock (mq, msg);
- }
- return n_msgs;
-}
-
#define vcl_fifo_rx_evt_valid_or_break(_fifo) \
if (PREDICT_FALSE (svm_fifo_is_empty (_fifo))) \
{ \
@@ -2067,6 +2184,12 @@
*bits_set += 1;
}
break;
+ case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
+ vcl_session_worker_update_reply_handler (wrk, e->data);
+ break;
+ case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
+ vcl_session_req_worker_update_handler (wrk, e->data);
+ break;
default:
clib_warning ("unhandled: %u", e->event_type);
break;
@@ -2122,6 +2245,7 @@
svm_msg_q_free_msg (mq, msg);
}
vec_reset_length (wrk->mq_msg_vector);
+ vcl_handle_pending_wrk_updates (wrk);
return *bits_set;
}
@@ -2676,6 +2800,12 @@
session_evt_data = session->vep.ev.data.u64;
session_events = session->vep.ev.events;
break;
+ case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
+ vcl_session_req_worker_update_handler (wrk, e->data);
+ break;
+ case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
+ vcl_session_worker_update_reply_handler (wrk, e->data);
+ break;
default:
VDBG (0, "unhandled: %u", e->event_type);
break;
@@ -2741,7 +2871,7 @@
svm_msg_q_free_msg (mq, msg);
}
vec_reset_length (wrk->mq_msg_vector);
-
+ vcl_handle_pending_wrk_updates (wrk);
return *num_ev;
}
@@ -3580,12 +3710,18 @@
}
int
-vppcom_session_index (uint32_t session_handle)
+vppcom_session_index (vcl_session_handle_t session_handle)
{
return session_handle & 0xFFFFFF;
}
int
+vppcom_session_worker (vcl_session_handle_t session_handle)	/* worker index encoded in a session handle */
+{
+ return session_handle >> 24;	/* bits above the 24-bit session index (vppcom_session_index masks 0xFFFFFF) */
+}
+
+int
vppcom_session_handle (uint32_t session_index)
{
return (vcl_get_worker_index () << 24) | session_index;
diff --git a/src/vcl/vppcom.h b/src/vcl/vppcom.h
index 641946b..f2fca09 100644
--- a/src/vcl/vppcom.h
+++ b/src/vcl/vppcom.h
@@ -97,6 +97,8 @@
uint16_t port;
} vppcom_endpt_t;
+typedef uint32_t vcl_session_handle_t;
+
typedef enum
{
VPPCOM_OK = 0,
@@ -277,7 +279,8 @@
extern int vppcom_poll (vcl_poll_t * vp, uint32_t n_sids,
double wait_for_time);
extern int vppcom_mq_epoll_fd (void);
-extern int vppcom_session_index (uint32_t session_handle);
+extern int vppcom_session_index (vcl_session_handle_t session_handle);
+extern int vppcom_session_worker (vcl_session_handle_t session_handle);
extern int vppcom_session_handle (uint32_t session_index);
extern int vppcom_session_read_segments (uint32_t session_handle,
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index 19c8fa2..85b5f93 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -724,6 +724,48 @@
return 0;
}
+/*
+ * Make app_wrk the owner of session s.
+ *
+ * If the session has both fifos, new ones are allocated from app_wrk's
+ * connect segment manager, the contents and head/tail/cursize of any
+ * non-empty old fifo are copied over, and the old pair is deallocated.
+ *
+ * Returns 0 on success (including when the session has no fifos to move)
+ * and -1 if the new fifos cannot be allocated, in which case the session
+ * keeps its previous owner and fifos.
+ */
+int
+app_worker_own_session (app_worker_t * app_wrk, stream_session_t * s)
+{
+  u32 old_wrk_index = s->app_wrk_index;
+  segment_manager_t *sm;
+  svm_fifo_t *rxf, *txf;
+
+  s->app_wrk_index = app_wrk->wrk_index;
+
+  rxf = s->server_rx_fifo;
+  txf = s->server_tx_fifo;
+
+  /* Without a complete fifo pair there is nothing to migrate */
+  if (!rxf || !txf)
+    return 0;
+
+  s->server_rx_fifo = 0;
+  s->server_tx_fifo = 0;
+
+  sm = app_worker_get_or_alloc_connect_segment_manager (app_wrk);
+  if (session_alloc_fifos (sm, s))
+    {
+      /* Roll back so the session is not left without fifos and the old
+       * pair is not leaked.
+       * NOTE(review): assumes session_alloc_fifos leaves s untouched on
+       * failure - confirm */
+      s->server_rx_fifo = rxf;
+      s->server_tx_fifo = txf;
+      s->app_wrk_index = old_wrk_index;
+      return -1;
+    }
+
+  /* NOTE(review): the copies below assume the new fifos are at least as
+   * large as the old ones (nitems bytes are copied) - confirm that both
+   * segment managers use the same fifo sizes */
+  if (!svm_fifo_is_empty (rxf))
+    {
+      clib_memcpy_fast (s->server_rx_fifo->data, rxf->data, rxf->nitems);
+      s->server_rx_fifo->head = rxf->head;
+      s->server_rx_fifo->tail = rxf->tail;
+      s->server_rx_fifo->cursize = rxf->cursize;
+    }
+
+  if (!svm_fifo_is_empty (txf))
+    {
+      clib_memcpy_fast (s->server_tx_fifo->data, txf->data, txf->nitems);
+      s->server_tx_fifo->head = txf->head;
+      s->server_tx_fifo->tail = txf->tail;
+      s->server_tx_fifo->cursize = txf->cursize;
+    }
+
+  segment_manager_dealloc_fifos (rxf->segment_index, rxf, txf);
+
+  return 0;
+}
+
/**
* Start listening local transport endpoint for requested transport.
*
@@ -890,6 +932,23 @@
}
+/**
+ * Get the worker's connect segment manager, allocating it on first use.
+ *
+ * @param app_wrk worker whose connect segment manager is requested
+ * @return the segment manager, or 0 if it had to be allocated and the
+ *         allocation failed
+ */
segment_manager_t *
+app_worker_get_or_alloc_connect_segment_manager (app_worker_t * app_wrk)
+{
+  /* NOTE(review): assumes a non-zero return from the alloc means failure */
+  if (app_wrk->connects_seg_manager == (u32) ~ 0
+      && app_worker_alloc_connects_segment_manager (app_wrk))
+    return 0;
+  return segment_manager_get (app_wrk->connects_seg_manager);
+}
+
+segment_manager_t *
app_worker_get_listen_segment_manager (app_worker_t * app,
stream_session_t * listener)
{
diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h
index e33f2ff..1d2064d 100644
--- a/src/vnet/session/application.h
+++ b/src/vnet/session/application.h
@@ -225,12 +225,15 @@
app_worker_t *app_worker_get (u32 wrk_index);
app_worker_t *app_worker_get_if_valid (u32 wrk_index);
application_t *app_worker_get_app (u32 wrk_index);
+int app_worker_own_session (app_worker_t * app_wrk, stream_session_t * s);
void app_worker_free (app_worker_t * app_wrk);
int app_worker_open_session (app_worker_t * app, session_endpoint_t * tep,
u32 api_context);
segment_manager_t *app_worker_get_listen_segment_manager (app_worker_t *,
stream_session_t *);
segment_manager_t *app_worker_get_connect_segment_manager (app_worker_t *);
+segment_manager_t
+ * app_worker_get_or_alloc_connect_segment_manager (app_worker_t *);
int app_worker_alloc_connects_segment_manager (app_worker_t * app);
int app_worker_add_segment_notify (u32 app_or_wrk, u64 segment_handle);
u32 app_worker_n_listeners (app_worker_t * app);
diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h
index a156c82..9c48faa 100644
--- a/src/vnet/session/application_interface.h
+++ b/src/vnet/session/application_interface.h
@@ -220,9 +220,9 @@
u8 lcl_is_ip4;
u8 lcl_ip[16];
u16 lcl_port;
- u64 rx_fifo;
- u64 tx_fifo;
- u64 vpp_evt_q;
+ uword rx_fifo;
+ uword tx_fifo;
+ uword vpp_evt_q;
u32 segment_size;
u8 segment_name_length;
u8 segment_name[128];
@@ -233,12 +233,12 @@
u32 context;
u64 listener_handle;
u64 handle;
- u64 server_rx_fifo;
- u64 server_tx_fifo;
+ uword server_rx_fifo;
+ uword server_tx_fifo;
u64 segment_handle;
- u64 vpp_event_queue_address;
- u64 server_event_queue_address;
- u64 client_event_queue_address;
+ uword vpp_event_queue_address;
+ uword server_event_queue_address;
+ uword client_event_queue_address;
u16 port;
u8 is_ip4;
u8 ip[16];
@@ -260,12 +260,12 @@
u32 context;
i32 retval;
u64 handle;
- u64 server_rx_fifo;
- u64 server_tx_fifo;
+ uword server_rx_fifo;
+ uword server_tx_fifo;
u64 segment_handle;
- u64 vpp_event_queue_address;
- u64 client_event_queue_address;
- u64 server_event_queue_address;
+ uword vpp_event_queue_address;
+ uword client_event_queue_address;
+ uword server_event_queue_address;
u32 segment_size;
u8 segment_name_length;
u8 segment_name[64];
@@ -302,6 +302,28 @@
u64 handle;
} __clib_packed session_reset_reply_msg_t;
+typedef struct session_req_worker_update_msg_ /* SESSION_CTRL_EVT_REQ_WORKER_UPDATE payload */
+{
+ u64 session_handle; /* handle of the session the update is requested for */
+} __clib_packed session_req_worker_update_msg_t;
+
+/* SESSION_CTRL_EVT_WORKER_UPDATE payload. NOTE: u16 wrk indices because message needs to fit in 18B */
+typedef struct session_worker_update_msg_
+{
+ u32 client_index; /* used to look up the application */
+ u16 wrk_index; /* app worker that should take over the session */
+ u16 req_wrk_index; /* compared with the current owner's wrk_map_index */
+ u64 handle; /* handle of the session to move */
+} __clib_packed session_worker_update_msg_t;
+
+typedef struct session_worker_update_reply_msg_ /* SESSION_CTRL_EVT_WORKER_UPDATE_REPLY payload */
+{
+ u64 handle; /* session handle copied from the update message */
+ uword rx_fifo; /* address of the session's rx fifo */
+ uword tx_fifo; /* address of the session's tx fifo */
+ u64 segment_handle; /* value of session_segment_handle () for the session */
+} __clib_packed session_worker_update_reply_msg_t;
+
typedef struct app_session_event_
{
svm_msg_q_msg_t msg;
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index e3c7300..cf1b3e9 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -49,7 +49,10 @@
SESSION_CTRL_EVT_DISCONNECTED,
SESSION_CTRL_EVT_DISCONNECTED_REPLY,
SESSION_CTRL_EVT_RESET,
- SESSION_CTRL_EVT_RESET_REPLY
+ SESSION_CTRL_EVT_RESET_REPLY,
+ SESSION_CTRL_EVT_REQ_WORKER_UPDATE,
+ SESSION_CTRL_EVT_WORKER_UPDATE,
+ SESSION_CTRL_EVT_WORKER_UPDATE_REPLY,
} session_evt_type_t;
static inline const char *
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index 98965f3..880f163 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -173,7 +173,7 @@
svm_msg_q_unlock (app_wrk->event_queue);
evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
clib_memset (evt, 0, sizeof (*evt));
- evt->event_type = SESSION_CTRL_EVT_DISCONNECTED;
+ evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY;
rmp = (session_disconnected_reply_msg_t *) evt->data;
rmp->handle = mp->handle;
rmp->context = mp->context;
@@ -207,6 +207,118 @@
}
}
+/**
+ * Handle a SESSION_CTRL_EVT_WORKER_UPDATE message.
+ *
+ * Checks that the session named in the message belongs to the
+ * application identified by client_index. If the request was sent by the
+ * worker that currently owns the session, a
+ * SESSION_CTRL_EVT_REQ_WORKER_UPDATE is posted to the target worker's
+ * event queue and nothing else is done. Otherwise the session is handed
+ * to the target worker, a SESSION_CTRL_EVT_WORKER_UPDATE_REPLY with the
+ * session's fifo addresses and segment handle is sent to that worker, and
+ * tx/rx/disconnect notifications are regenerated where applicable.
+ *
+ * @param data pointer to a session_worker_update_msg_t
+ */
+static void
+session_mq_worker_update_handler (void *data)
+{
+  session_worker_update_msg_t *mp = (session_worker_update_msg_t *) data;
+  session_worker_update_reply_msg_t *rmp;
+  svm_msg_q_msg_t _msg, *msg = &_msg;
+  app_worker_t *app_wrk;
+  u32 owner_app_wrk_map;
+  session_event_t *evt;
+  stream_session_t *s;
+  application_t *app;
+
+  app = application_lookup (mp->client_index);
+  if (!app)
+    return;
+  if (!(s = session_get_from_handle_if_valid (mp->handle)))
+    {
+      clib_warning ("invalid handle %llu", mp->handle);
+      return;
+    }
+  app_wrk = app_worker_get (s->app_wrk_index);
+  if (app_wrk->app_index != app->app_index)
+    {
+      clib_warning ("app %u does not own session %llu", app->app_index,
+                    mp->handle);
+      return;
+    }
+  owner_app_wrk_map = app_wrk->wrk_map_index;
+
+  /* wrk_index is supplied by the app; don't dereference a missing worker.
+   * NOTE(review): assumes application_get_worker returns 0 for an unknown
+   * index -- confirm */
+  app_wrk = application_get_worker (app, mp->wrk_index);
+  if (!app_wrk)
+    {
+      clib_warning ("app %u has no worker %u", app->app_index,
+                    mp->wrk_index);
+      return;
+    }
+
+  /* Request came from the current owner, but the update has to be issued
+   * by the new owner: ask the target worker to send it */
+  if (mp->req_wrk_index == owner_app_wrk_map)
+    {
+      session_req_worker_update_msg_t *wump;
+
+      svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
+                                           SESSION_MQ_CTRL_EVT_RING,
+                                           SVM_Q_WAIT, msg);
+      svm_msg_q_unlock (app_wrk->event_queue);
+      evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
+      clib_memset (evt, 0, sizeof (*evt));
+      evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE;
+      wump = (session_req_worker_update_msg_t *) evt->data;
+      wump->session_handle = mp->handle;
+      svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT);
+      return;
+    }
+
+  /* Without this check a failed move would be reported as a success and
+   * the fifo accesses below could operate on a session with no fifos */
+  if (app_worker_own_session (app_wrk, s))
+    {
+      clib_warning ("failed to move session %llu to worker %u", mp->handle,
+                    mp->wrk_index);
+      return;
+    }
+
+  /*
+   * Send reply
+   */
+  svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
+                                       SESSION_MQ_CTRL_EVT_RING,
+                                       SVM_Q_WAIT, msg);
+  svm_msg_q_unlock (app_wrk->event_queue);
+  evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
+  clib_memset (evt, 0, sizeof (*evt));
+  evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
+  rmp = (session_worker_update_reply_msg_t *) evt->data;
+  rmp->handle = mp->handle;
+  rmp->rx_fifo = pointer_to_uword (s->server_rx_fifo);
+  rmp->tx_fifo = pointer_to_uword (s->server_tx_fifo);
+  rmp->segment_handle = session_segment_handle (s);
+  svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT);
+
+  /*
+   * Retransmit messages that may have been lost
+   */
+  if (!svm_fifo_is_empty (s->server_tx_fifo))
+    session_send_io_evt_to_thread (s->server_tx_fifo, FIFO_EVENT_APP_TX);
+
+  if (!svm_fifo_is_empty (s->server_rx_fifo))
+    app_worker_lock_and_send_event (app_wrk, s, FIFO_EVENT_APP_RX);
+
+  if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
+    app->cb_fns.session_disconnect_callback (s);
+}
+
vlib_node_registration_t session_queue_node;
typedef struct
@@ -936,6 +1016,9 @@
case SESSION_CTRL_EVT_RESET_REPLY:
session_mq_reset_reply_handler (e->data);
break;
+ case SESSION_CTRL_EVT_WORKER_UPDATE:
+ session_mq_worker_update_handler (e->data);
+ break;
default:
clib_warning ("unhandled event type %d", e->event_type);
}