vcl/session: add api for changing session app worker

In multi-process apps, after forking, the parent may decide to close
some or all of the sessions it shares with the child. Because the fifos
of those sessions are allocated in the parent's segment manager, the
fifos must be moved to the child's segment manager.

Change-Id: I85b4c8c8545005724023ee14043647719cef61dd
Signed-off-by: Florin Coras <fcoras@cisco.com>
diff --git a/src/vcl/ldp.c b/src/vcl/ldp.c
index ee35396..a61acb9 100644
--- a/src/vcl/ldp.c
+++ b/src/vcl/ldp.c
@@ -57,9 +57,11 @@
 typedef struct ldp_fd_entry_
 {
   u32 session_index;
+  u32 worker_index;
   u32 fd;
   u32 fd_index;
   u32 flags;
+  clib_spinlock_t lock;
 } ldp_fd_entry_t;
 
 typedef struct ldp_worker_ctx_
@@ -101,7 +103,7 @@
   ldp_worker_ctx_t *workers;
   int init;
   char app_name[LDP_APP_NAME_MAX];
-  u32 sid_bit_val;
+  u32 sh_bit_val;
   u32 sid_bit_mask;
   u32 debug;
   ldp_fd_entry_t *fd_pool;
@@ -119,7 +121,7 @@
     clib_warning ("ldp<%d>: " _fmt, getpid(), ##_args)
 
 static ldp_main_t ldp_main = {
-  .sid_bit_val = (1 << LDP_SID_BIT_MIN),
+  .sh_bit_val = (1 << LDP_SID_BIT_MIN),
   .sid_bit_mask = (1 << LDP_SID_BIT_MIN) - 1,
   .debug = LDP_DEBUG_INIT,
 };
@@ -154,44 +156,73 @@
   return ldp->app_name;
 }
 
+static inline vcl_session_handle_t
+ldp_fd_entry_sh (ldp_fd_entry_t * fde)
+{
+  return vppcom_session_handle (fde->session_index);
+}
+
 static int
-ldp_fd_alloc (u32 sid)
+ldp_fd_alloc (vcl_session_handle_t sh)
 {
   ldp_fd_entry_t *fde;
 
   clib_rwlock_writer_lock (&ldp->fd_table_lock);
-  if (pool_elts (ldp->fd_pool) >= (1ULL << 32) - ldp->sid_bit_val)
+  if (pool_elts (ldp->fd_pool) >= (1ULL << 32) - ldp->sh_bit_val)
     {
       clib_rwlock_writer_unlock (&ldp->fd_table_lock);
       return -1;
     }
   pool_get (ldp->fd_pool, fde);
-  fde->session_index = vppcom_session_index (sid);
+  fde->session_index = vppcom_session_index (sh);
+  fde->worker_index = vppcom_session_worker (sh);
   fde->fd_index = fde - ldp->fd_pool;
-  fde->fd = fde->fd_index + ldp->sid_bit_val;
+  fde->fd = fde->fd_index + ldp->sh_bit_val;
   hash_set (ldp->session_index_to_fd_table, fde->session_index, fde->fd);
+  clib_spinlock_init (&fde->lock);
   clib_rwlock_writer_unlock (&ldp->fd_table_lock);
   return fde->fd;
 }
 
 static ldp_fd_entry_t *
-ldp_fd_entry_get_w_lock (u32 fd_index)
+ldp_fd_entry_get (u32 fd_index)
 {
-  clib_rwlock_reader_lock (&ldp->fd_table_lock);
   if (pool_is_free_index (ldp->fd_pool, fd_index))
     return 0;
-
   return pool_elt_at_index (ldp->fd_pool, fd_index);
 }
 
+static ldp_fd_entry_t *
+ldp_fd_entry_lock (u32 fd_index)
+{
+  ldp_fd_entry_t *fe;
+  clib_rwlock_reader_lock (&ldp->fd_table_lock);
+  if (pool_is_free_index (ldp->fd_pool, fd_index))
+    {
+      clib_rwlock_reader_unlock (&ldp->fd_table_lock);
+      return 0;
+    }
+
+  fe = pool_elt_at_index (ldp->fd_pool, fd_index);
+  clib_spinlock_lock (&fe->lock);
+  return fe;
+}
+
+static void
+ldp_fd_entry_unlock (ldp_fd_entry_t * fde)
+{
+  clib_spinlock_unlock (&fde->lock);
+  clib_rwlock_reader_unlock (&ldp->fd_table_lock);
+}
+
 static inline int
-ldp_fd_from_sid (u32 sid)
+ldp_fd_from_sh (vcl_session_handle_t sh)
 {
   uword *fdp;
   int fd;
 
   clib_rwlock_reader_lock (&ldp->fd_table_lock);
-  fdp = hash_get (ldp->session_index_to_fd_table, vppcom_session_index (sid));
+  fdp = hash_get (ldp->session_index_to_fd_table, vppcom_session_index (sh));
   fd = fdp ? *fdp : -EMFILE;
   clib_rwlock_reader_unlock (&ldp->fd_table_lock);
 
@@ -199,52 +230,63 @@
 }
 
 static inline int
-ldp_fd_is_sid (int fd)
+ldp_fd_is_sh (int fd)
 {
-  return fd >= ldp->sid_bit_val;
+  return fd >= ldp->sh_bit_val;
 }
 
 static inline u32
-ldp_sid_from_fd (int fd)
+ldp_sh_from_fd (int fd)
 {
   u32 fd_index, session_index;
   ldp_fd_entry_t *fde;
 
-  if (!ldp_fd_is_sid (fd))
+  if (!ldp_fd_is_sh (fd))
     return INVALID_SESSION_ID;
 
-  fd_index = fd - ldp->sid_bit_val;
-  fde = ldp_fd_entry_get_w_lock (fd_index);
+  fd_index = fd - ldp->sh_bit_val;
+  fde = ldp_fd_entry_lock (fd_index);
   if (!fde)
     {
       LDBG (0, "unknown fd %d", fd);
-      clib_rwlock_reader_unlock (&ldp->fd_table_lock);
       return INVALID_SESSION_ID;
     }
   session_index = fde->session_index;
-  clib_rwlock_reader_unlock (&ldp->fd_table_lock);
+  ldp_fd_entry_unlock (fde);
 
   return vppcom_session_handle (session_index);
 }
 
+static ldp_fd_entry_t *
+ldp_fd_entry_lock_w_fd (int fd)
+{
+  u32 fd_index;
+
+  if (!ldp_fd_is_sh (fd))
+    return 0;
+
+  fd_index = fd - ldp->sh_bit_val;
+  return ldp_fd_entry_lock (fd_index);
+}
+
 static void
-ldp_fd_free_w_sid (u32 sid)
+ldp_fd_free_w_sh (vcl_session_handle_t sh)
 {
   ldp_fd_entry_t *fde;
   u32 fd_index;
   int fd;
 
-  fd = ldp_fd_from_sid (sid);
+  fd = ldp_fd_from_sh (sh);
   if (!fd)
     return;
 
-  fd_index = fd - ldp->sid_bit_val;
-  fde = ldp_fd_entry_get_w_lock (fd_index);
-  if (fde)
-    {
-      hash_unset (ldp->session_index_to_fd_table, fde->session_index);
-      pool_put (ldp->fd_pool, fde);
-    }
+  fd_index = fd - ldp->sh_bit_val;
+  clib_rwlock_writer_lock (&ldp->fd_table_lock);
+  fde = ldp_fd_entry_get (fd_index);
+  ASSERT (fde != 0);
+  hash_unset (ldp->session_index_to_fd_table, fde->session_index);
+  clib_spinlock_free (&fde->lock);
+  pool_put (ldp->fd_pool, fde);
   clib_rwlock_writer_unlock (&ldp->fd_table_lock);
 }
 
@@ -307,38 +349,38 @@
 	  clib_warning ("LDP<%d>: WARNING: Invalid LDP sid bit specified in"
 			" the env var " LDP_ENV_SID_BIT " (%s)! sid bit "
 			"value %d (0x%x)", getpid (), env_var_str,
-			ldp->sid_bit_val, ldp->sid_bit_val);
+			ldp->sh_bit_val, ldp->sh_bit_val);
 	}
       else if (sb < LDP_SID_BIT_MIN)
 	{
-	  ldp->sid_bit_val = (1 << LDP_SID_BIT_MIN);
-	  ldp->sid_bit_mask = ldp->sid_bit_val - 1;
+	  ldp->sh_bit_val = (1 << LDP_SID_BIT_MIN);
+	  ldp->sid_bit_mask = ldp->sh_bit_val - 1;
 
 	  clib_warning ("LDP<%d>: WARNING: LDP sid bit (%u) specified in the"
 			" env var " LDP_ENV_SID_BIT " (%s) is too small. "
 			"Using LDP_SID_BIT_MIN (%d)! sid bit value %d (0x%x)",
 			getpid (), sb, env_var_str, LDP_SID_BIT_MIN,
-			ldp->sid_bit_val, ldp->sid_bit_val);
+			ldp->sh_bit_val, ldp->sh_bit_val);
 	}
       else if (sb > LDP_SID_BIT_MAX)
 	{
-	  ldp->sid_bit_val = (1 << LDP_SID_BIT_MAX);
-	  ldp->sid_bit_mask = ldp->sid_bit_val - 1;
+	  ldp->sh_bit_val = (1 << LDP_SID_BIT_MAX);
+	  ldp->sid_bit_mask = ldp->sh_bit_val - 1;
 
 	  clib_warning ("LDP<%d>: WARNING: LDP sid bit (%u) specified in the"
 			" env var " LDP_ENV_SID_BIT " (%s) is too big. Using"
 			" LDP_SID_BIT_MAX (%d)! sid bit value %d (0x%x)",
 			getpid (), sb, env_var_str, LDP_SID_BIT_MAX,
-			ldp->sid_bit_val, ldp->sid_bit_val);
+			ldp->sh_bit_val, ldp->sh_bit_val);
 	}
       else
 	{
-	  ldp->sid_bit_val = (1 << sb);
-	  ldp->sid_bit_mask = ldp->sid_bit_val - 1;
+	  ldp->sh_bit_val = (1 << sb);
+	  ldp->sid_bit_mask = ldp->sh_bit_val - 1;
 
 	  LDBG (0, "configured LDP sid bit (%u) from "
 		LDP_ENV_SID_BIT "!  sid bit value %d (0x%x)", sb,
-		ldp->sid_bit_val, ldp->sid_bit_val);
+		ldp->sh_bit_val, ldp->sh_bit_val);
 	}
     }
 
@@ -352,17 +394,18 @@
 int
 close (int fd)
 {
-  int rv, refcnt;
-  u32 sid = ldp_sid_from_fd (fd);
+  int rv, refcnt, epfd;
+  ldp_fd_entry_t *fde;
+  u32 sh;
 
   if ((errno = -ldp_init ()))
     return -1;
 
-  if (sid != INVALID_SESSION_ID)
+  fde = ldp_fd_entry_lock_w_fd (fd);
+  if (fde)
     {
-      int epfd;
-
-      epfd = vppcom_session_attr (sid, VPPCOM_ATTR_GET_LIBC_EPFD, 0, 0);
+      sh = ldp_fd_entry_sh (fde);
+      epfd = vppcom_session_attr (sh, VPPCOM_ATTR_GET_LIBC_EPFD, 0, 0);
       if (epfd > 0)
 	{
 	  LDBG (0, "fd %d (0x%x): calling libc_close: epfd %u (0x%x)",
@@ -374,7 +417,7 @@
 	      u32 size = sizeof (epfd);
 	      epfd = 0;
 
-	      (void) vppcom_session_attr (sid, VPPCOM_ATTR_SET_LIBC_EPFD,
+	      (void) vppcom_session_attr (sh, VPPCOM_ATTR_SET_LIBC_EPFD,
 					  &epfd, &size);
 	    }
 	}
@@ -382,21 +425,24 @@
 	{
 	  errno = -epfd;
 	  rv = -1;
+	  ldp_fd_entry_unlock (fde);
 	  goto done;
 	}
 
       LDBG (0, "fd %d (0x%x): calling vppcom_session_close: sid %u (0x%x)",
-	    fd, fd, sid, sid);
+	    fd, fd, sh, sh);
 
-      refcnt = vppcom_session_attr (sid, VPPCOM_ATTR_GET_REFCNT, 0, 0);
-      rv = vppcom_session_close (sid);
+      refcnt = vppcom_session_attr (sh, VPPCOM_ATTR_GET_REFCNT, 0, 0);
+      rv = vppcom_session_close (sh);
       if (rv != VPPCOM_OK)
 	{
 	  errno = -rv;
 	  rv = -1;
 	}
+
+      ldp_fd_entry_unlock (fde);
       if (refcnt <= 1)
-	ldp_fd_free_w_sid (sid);
+	ldp_fd_free_w_sh (sh);
     }
   else
     {
@@ -413,23 +459,27 @@
 ssize_t
 read (int fd, void *buf, size_t nbytes)
 {
+  vcl_session_handle_t sh;
+  ldp_fd_entry_t *fde;
   ssize_t size;
-  u32 sid = ldp_sid_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
 
-  if (sid != INVALID_SESSION_ID)
+  fde = ldp_fd_entry_lock_w_fd (fd);
+  if (fde)
     {
+      sh = ldp_fd_entry_sh (fde);
       LDBG (2, "fd %d (0x%x): calling vppcom_session_read(): sid %u (0x%x),"
-	    " buf %p, nbytes %u", fd, fd, sid, sid, buf, nbytes);
+	    " buf %p, nbytes %u", fd, fd, sh, sh, buf, nbytes);
 
-      size = vppcom_session_read (sid, buf, nbytes);
+      size = vppcom_session_read (sh, buf, nbytes);
       if (size < 0)
 	{
 	  errno = -size;
 	  size = -1;
 	}
+      ldp_fd_entry_unlock (fde);
     }
   else
     {
@@ -447,7 +497,7 @@
 readv (int fd, const struct iovec * iov, int iovcnt)
 {
   ssize_t size = 0;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
   int rv = 0, i, total = 0;
 
   if ((errno = -ldp_init ()))
@@ -504,23 +554,27 @@
 ssize_t
 write (int fd, const void *buf, size_t nbytes)
 {
+  vcl_session_handle_t sh;
+  ldp_fd_entry_t *fde;
   ssize_t size = 0;
-  u32 sid = ldp_sid_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
 
-  if (sid != INVALID_SESSION_ID)
+  fde = ldp_fd_entry_lock_w_fd (fd);
+  if (fde)
     {
+      sh = ldp_fd_entry_sh (fde);
       LDBG (2, "fd %d (0x%x): calling vppcom_session_write(): sid %u (0x%x), "
-	    "buf %p, nbytes %u", fd, fd, sid, sid, buf, nbytes);
+	    "buf %p, nbytes %u", fd, fd, sh, sh, buf, nbytes);
 
-      size = vppcom_session_write_msg (sid, (void *) buf, nbytes);
+      size = vppcom_session_write_msg (sh, (void *) buf, nbytes);
       if (size < 0)
 	{
 	  errno = -size;
 	  size = -1;
 	}
+      ldp_fd_entry_unlock (fde);
     }
   else
     {
@@ -538,7 +592,7 @@
 writev (int fd, const struct iovec * iov, int iovcnt)
 {
   ssize_t size = 0, total = 0;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
   int i, rv = 0;
 
   /*
@@ -590,7 +644,7 @@
   const char *func_str = __func__;
   int rv = 0;
   va_list ap;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -681,7 +735,7 @@
   const char *func_str;
   int rv;
   va_list ap;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -808,7 +862,7 @@
     time_out = -1;
 
 
-  if (nfds <= ldp->sid_bit_val)
+  if (nfds <= ldp->sh_bit_val)
     {
       func_str = "libc_pselect";
 
@@ -821,11 +875,11 @@
       goto done;
     }
 
-  if (PREDICT_FALSE (ldp->sid_bit_val > FD_SETSIZE / 2))
+  if (PREDICT_FALSE (ldp->sh_bit_val > FD_SETSIZE / 2))
     {
       clib_warning ("LDP<%d>: ERROR: LDP sid bit value %d (0x%x) > "
 		    "FD_SETSIZE/2 %d (0x%x)!", getpid (),
-		    ldp->sid_bit_val, ldp->sid_bit_val,
+		    ldp->sh_bit_val, ldp->sh_bit_val,
 		    FD_SETSIZE / 2, FD_SETSIZE / 2);
       errno = EOVERFLOW;
       return -1;
@@ -845,7 +899,7 @@
       clib_bitmap_foreach (fd, ldpw->rd_bitmap, ({
 	if (fd > nfds)
 	  break;
-        sid = ldp_sid_from_fd (fd);
+        sid = ldp_sh_from_fd (fd);
         LDBG (3, "readfds: fd %d (0x%x), sid %u (0x%x)", fd, fd, sid, sid);
         if (sid == INVALID_SESSION_ID)
           clib_bitmap_set_no_check (ldpw->libc_rd_bitmap, fd, 1);
@@ -877,7 +931,7 @@
       clib_bitmap_foreach (fd, ldpw->wr_bitmap, ({
 	if (fd > nfds)
 	  break;
-        sid = ldp_sid_from_fd (fd);
+        sid = ldp_sh_from_fd (fd);
         LDBG (3, "writefds: fd %d (0x%x), sid %u (0x%x)", fd, fd, sid, sid);
         if (sid == INVALID_SESSION_ID)
           clib_bitmap_set_no_check (ldpw->libc_wr_bitmap, fd, 1);
@@ -909,7 +963,7 @@
       clib_bitmap_foreach (fd, ldpw->ex_bitmap, ({
 	if (fd > nfds)
 	  break;
-        sid = ldp_sid_from_fd (fd);
+        sid = ldp_sh_from_fd (fd);
         LDBG (3, "exceptfds: fd %d (0x%x), sid %u (0x%x)", fd, fd, sid, sid);
         if (sid == INVALID_SESSION_ID)
           clib_bitmap_set_no_check (ldpw->libc_ex_bitmap, fd, 1);
@@ -977,7 +1031,7 @@
                       /* *INDENT-OFF* */
                       clib_bitmap_foreach (sid, ldpw->rd_bitmap,
                         ({
-                          fd = ldp_fd_from_sid (vppcom_session_handle (sid));
+                          fd = ldp_fd_from_sh (vppcom_session_handle (sid));
                           if (PREDICT_FALSE (fd < 0))
                             {
                               errno = EBADFD;
@@ -993,7 +1047,7 @@
                       /* *INDENT-OFF* */
                       clib_bitmap_foreach (sid, ldpw->wr_bitmap,
                         ({
-                          fd = ldp_fd_from_sid (vppcom_session_handle (sid));
+                          fd = ldp_fd_from_sh (vppcom_session_handle (sid));
                           if (PREDICT_FALSE (fd < 0))
                             {
                               errno = EBADFD;
@@ -1009,7 +1063,7 @@
                       /* *INDENT-OFF* */
                       clib_bitmap_foreach (sid, ldpw->ex_bitmap,
                         ({
-                          fd = ldp_fd_from_sid (vppcom_session_handle (sid));
+                          fd = ldp_fd_from_sh (vppcom_session_handle (sid));
                           if (PREDICT_FALSE (fd < 0))
                             {
                               errno = EBADFD;
@@ -1237,7 +1291,7 @@
 bind (int fd, __CONST_SOCKADDR_ARG addr, socklen_t len)
 {
   int rv;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -1361,7 +1415,7 @@
 {
   int rv;
   const char *func_str;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -1430,7 +1484,7 @@
 connect (int fd, __CONST_SOCKADDR_ARG addr, socklen_t len)
 {
   int rv;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -1516,7 +1570,7 @@
 {
   int rv;
   const char *func_str;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -1586,7 +1640,7 @@
 {
   ssize_t size;
   const char *func_str;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -1644,7 +1698,7 @@
   ldp_worker_ctx_t *ldpw = ldp_worker_get_current ();
   ssize_t size = 0;
   const char *func_str;
-  u32 sid = ldp_sid_from_fd (out_fd);
+  u32 sid = ldp_sh_from_fd (out_fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -1888,7 +1942,7 @@
   if ((errno = -ldp_init ()))
     return -1;
 
-  sid = ldp_sid_from_fd (fd);
+  sid = ldp_sh_from_fd (fd);
   if (sid != INVALID_SESSION_ID)
     {
       LDBG (2, "fd %d (0x%x): calling vcl recvfrom: sid %u (0x%x), buf %p,"
@@ -1915,7 +1969,7 @@
 {
   ssize_t size;
   const char *func_str = __func__;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -2006,7 +2060,7 @@
 {
   ssize_t size;
   const char *func_str;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -2076,7 +2130,7 @@
 {
   ssize_t size;
   const char *func_str;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -2125,7 +2179,7 @@
 {
   ssize_t size;
   const char *func_str;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -2172,7 +2226,7 @@
 {
   ssize_t size;
   const char *func_str;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -2222,7 +2276,7 @@
 {
   ssize_t size;
   const char *func_str;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -2271,7 +2325,7 @@
 {
   int rv;
   const char *func_str = __func__;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
   u32 buflen = optlen ? (u32) * optlen : 0;
 
   if ((errno = -ldp_init ()))
@@ -2499,7 +2553,7 @@
 {
   int rv;
   const char *func_str = __func__;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -2663,7 +2717,7 @@
 listen (int fd, int n)
 {
   int rv;
-  u32 sid = ldp_sid_from_fd (fd);
+  u32 sid = ldp_sh_from_fd (fd);
 
   if ((errno = -ldp_init ()))
     return -1;
@@ -2696,13 +2750,14 @@
 	     socklen_t * __restrict addr_len, int flags)
 {
   int rv;
-  u32 listen_sid = ldp_sid_from_fd (listen_fd);
-  int accept_sid;
+  u32 listen_sh;
+  int accept_sh;
 
   if ((errno = -ldp_init ()))
     return -1;
 
-  if (listen_sid != INVALID_SESSION_ID)
+  listen_sh = ldp_sh_from_fd (listen_fd);
+  if (listen_sh != INVALID_SESSION_ID)
     {
       vppcom_endpt_t ep;
       u8 src_addr[sizeof (struct sockaddr_in6)];
@@ -2711,12 +2766,12 @@
 
       LDBG (0, "listen fd %d (0x%x): calling vppcom_session_accept:"
 	    " listen sid %u (0x%x), ep %p, flags 0x%x", listen_fd,
-	    listen_fd, listen_sid, listen_sid, ep, flags);
+	    listen_fd, listen_sh, listen_sh, ep, flags);
 
-      accept_sid = vppcom_session_accept (listen_sid, &ep, flags);
-      if (accept_sid < 0)
+      accept_sh = vppcom_session_accept (listen_sh, &ep, flags);
+      if (accept_sh < 0)
 	{
-	  errno = -accept_sid;
+	  errno = -accept_sh;
 	  rv = -1;
 	}
       else
@@ -2724,16 +2779,16 @@
 	  rv = ldp_copy_ep_to_sockaddr (addr, addr_len, &ep);
 	  if (rv != VPPCOM_OK)
 	    {
-	      (void) vppcom_session_close ((u32) accept_sid);
+	      (void) vppcom_session_close ((u32) accept_sh);
 	      errno = -rv;
 	      rv = -1;
 	    }
 	  else
 	    {
-	      rv = ldp_fd_alloc ((u32) accept_sid);
+	      rv = ldp_fd_alloc ((u32) accept_sh);
 	      if (rv < 0)
 		{
-		  (void) vppcom_session_close ((u32) accept_sid);
+		  (void) vppcom_session_close ((u32) accept_sh);
 		  errno = -rv;
 		  rv = -1;
 		}
@@ -2776,15 +2831,14 @@
   if ((errno = -ldp_init ()))
     return -1;
 
-  if (ldp_fd_is_sid (fd))
+  if (ldp_fd_is_sh (fd))
     {
-      u32 fd_index = fd - ldp->sid_bit_val;
+      u32 fd_index = fd - ldp->sh_bit_val;
       ldp_fd_entry_t *fde;
 
-      fde = ldp_fd_entry_get_w_lock (fd_index);
+      fde = ldp_fd_entry_lock (fd_index);
       if (!fde)
 	{
-	  clib_rwlock_reader_unlock (&ldp->fd_table_lock);
 	  errno = ENOTCONN;
 	  return -1;
 	}
@@ -2799,7 +2853,7 @@
       if ((fde->flags & LDP_F_SHUT_RD) && (fde->flags & LDP_F_SHUT_WR))
 	rv = close (fd);
 
-      clib_rwlock_reader_unlock (&ldp->fd_table_lock);
+      ldp_fd_entry_unlock (fde);
       LDBG (0, "fd %d (0x%x): calling vcl shutdown: how %d", fd, fd, how);
     }
   else
@@ -2869,7 +2923,7 @@
 int
 epoll_ctl (int epfd, int op, int fd, struct epoll_event *event)
 {
-  u32 vep_idx = ldp_sid_from_fd (epfd), sid;
+  u32 vep_idx = ldp_sh_from_fd (epfd), sid;
   const char *func_str;
   int rv;
 
@@ -2892,7 +2946,7 @@
       goto done;
     }
 
-  sid = ldp_sid_from_fd (fd);
+  sid = ldp_sh_from_fd (fd);
 
   LDBG (0, "epfd %d (0x%x), vep_idx %d (0x%x), sid %d (0x%x)",
 	epfd, epfd, vep_idx, vep_idx, sid, sid);
@@ -2995,7 +3049,7 @@
 {
   ldp_worker_ctx_t *ldpw = ldp_worker_get_current ();
   double time_to_wait = (double) 0, time_out, now = 0;
-  u32 vep_idx = ldp_sid_from_fd (epfd);
+  u32 vep_idx = ldp_sh_from_fd (epfd);
   int libc_epfd, rv = 0;
 
   if ((errno = -ldp_init ()))
@@ -3115,7 +3169,7 @@
       LDBG (3, "fds[%d] fd %d (0x%0x) events = 0x%x revents = 0x%x",
 	    i, fds[i].fd, fds[i].fd, fds[i].events, fds[i].revents);
 
-      sid = ldp_sid_from_fd (fds[i].fd);
+      sid = ldp_sh_from_fd (fds[i].fd);
       if (sid != INVALID_SESSION_ID)
 	{
 	  fds[i].fd = -fds[i].fd;
diff --git a/src/vcl/vcl_bapi.c b/src/vcl/vcl_bapi.c
index de5e80a..5b9a9d5 100644
--- a/src/vcl/vcl_bapi.c
+++ b/src/vcl/vcl_bapi.c
@@ -610,21 +610,6 @@
 }
 
 void
-vppcom_send_accept_session_reply (u64 handle, u32 context, int retval)
-{
-  vcl_worker_t *wrk = vcl_worker_get_current ();
-  vl_api_accept_session_reply_t *rmp;
-
-  rmp = vl_msg_api_alloc (sizeof (*rmp));
-  memset (rmp, 0, sizeof (*rmp));
-  rmp->_vl_msg_id = ntohs (VL_API_ACCEPT_SESSION_REPLY);
-  rmp->retval = htonl (retval);
-  rmp->context = context;
-  rmp->handle = handle;
-  vl_msg_api_send_shmem (wrk->vl_input_queue, (u8 *) & rmp);
-}
-
-void
 vppcom_send_application_tls_cert_add (vcl_session_t * session, char *cert,
 				      u32 cert_len)
 {
diff --git a/src/vcl/vcl_private.c b/src/vcl/vcl_private.c
index d82a7ff..3266431 100644
--- a/src/vcl/vcl_private.c
+++ b/src/vcl/vcl_private.c
@@ -375,21 +375,23 @@
 			  vcl_session_t * new_s)
 {
   vcl_shared_session_t *ss;
-  vcl_session_t *s;
+  vcl_session_t *old_s;
 
-  s = vcl_session_get (parent, new_s->session_index);
-  if (s->shared_index == ~0)
+  if (new_s->shared_index == ~0)
     {
       ss = vcl_shared_session_alloc ();
+      ss->session_index = new_s->session_index;
       vec_add1 (ss->workers, parent->wrk_index);
-      s->shared_index = ss->ss_index;
+      vec_add1 (ss->workers, wrk->wrk_index);
+      new_s->shared_index = ss->ss_index;
+      old_s = vcl_session_get (parent, new_s->session_index);
+      old_s->shared_index = ss->ss_index;
     }
   else
     {
-      ss = vcl_shared_session_get (s->shared_index);
+      ss = vcl_shared_session_get (new_s->shared_index);
+      vec_add1 (ss->workers, wrk->wrk_index);
     }
-  new_s->shared_index = ss->ss_index;
-  vec_add1 (ss->workers, wrk->wrk_index);
 }
 
 int
@@ -414,6 +416,12 @@
       return 1;
     }
 
+  /* If the first worker was removed and it was not the last one, start a
+   * session worker change. The request first goes to vpp, and vpp then
+   * reflects it back to the right worker */
+  if (i == 0)
+    vcl_send_session_worker_update (wrk, s, ss->workers[0]);
+
   return 0;
 }
 
diff --git a/src/vcl/vcl_private.h b/src/vcl/vcl_private.h
index 9dce518..2ae4b72 100644
--- a/src/vcl/vcl_private.h
+++ b/src/vcl/vcl_private.h
@@ -69,7 +69,8 @@
   STATE_ACCEPT = 0x08,
   STATE_VPP_CLOSING = 0x10,
   STATE_DISCONNECT = 0x20,
-  STATE_FAILED = 0x40
+  STATE_FAILED = 0x40,
+  STATE_UPDATED = 0x80,
 } session_state_t;
 
 #define SERVER_STATE_OPEN  (STATE_ACCEPT|STATE_VPP_CLOSING)
@@ -144,6 +145,7 @@
 {
   u32 ss_index;
   u32 *workers;
+  u32 session_index;
 } vcl_shared_session_t;
 
 typedef struct
@@ -287,6 +289,8 @@
   /** Vector of unhandled events */
   session_event_t *unhandled_evts_vector;
 
+  u32 *pending_session_wrk_updates;
+
   /** Used also as a thread stop key buffer */
   pthread_t thread_id;
 
@@ -517,6 +521,7 @@
 int vcl_worker_set_bapi (void);
 void vcl_worker_share_sessions (vcl_worker_t * parent_wrk);
 int vcl_worker_unshare_session (vcl_worker_t * wrk, vcl_session_t * s);
+vcl_shared_session_t *vcl_shared_session_get (u32 ss_index);
 int vcl_session_get_refcnt (vcl_session_t * s);
 
 void vcl_segment_table_add (u64 segment_handle, u32 svm_segment_index);
@@ -543,6 +548,17 @@
   return vcl_worker_get (vcl_get_worker_index ());
 }
 
+static inline svm_msg_q_t *
+vcl_session_vpp_evt_q (vcl_worker_t * wrk, vcl_session_t * s)
+{
+  if (vcl_session_is_ct (s))
+    return wrk->vpp_event_queues[0];
+  else
+    return wrk->vpp_event_queues[s->vpp_thread_index];
+}
+
+void vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s,
+				     u32 wrk_index);
 /*
  * VCL Binary API
  */
@@ -556,12 +572,10 @@
 void vppcom_send_bind_sock (vcl_session_t * session);
 void vppcom_send_unbind_sock (u64 vpp_handle);
 void vppcom_api_hookup (void);
-void vppcom_send_accept_session_reply (u64 vpp_handle, u32 context, int rv);
 void vppcom_send_application_tls_cert_add (vcl_session_t * session,
 					   char *cert, u32 cert_len);
-void
-vppcom_send_application_tls_key_add (vcl_session_t * session, char *key,
-				     u32 key_len);
+void vppcom_send_application_tls_key_add (vcl_session_t * session, char *key,
+					  u32 key_len);
 void vcl_send_app_worker_add_del (u8 is_add);
 void vcl_send_child_worker_del (vcl_worker_t * wrk);
 
diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c
index 70afdce..6a1bf1c 100644
--- a/src/vcl/vppcom.c
+++ b/src/vcl/vppcom.c
@@ -44,6 +44,22 @@
   return 1;
 }
 
+static inline int
+vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq)
+{
+  svm_msg_q_msg_t *msg;
+  u32 n_msgs;
+  int i;
+
+  n_msgs = svm_msg_q_size (mq);
+  for (i = 0; i < n_msgs; i++)
+    {
+      vec_add2 (wrk->mq_msg_vector, msg, 1);
+      svm_msg_q_sub_w_lock (mq, msg);
+    }
+  return n_msgs;
+}
+
 const char *
 vppcom_session_state_str (session_state_t state)
 {
@@ -175,15 +191,6 @@
  */
 
 
-static svm_msg_q_t *
-vcl_session_vpp_evt_q (vcl_worker_t * wrk, vcl_session_t * s)
-{
-  if (vcl_session_is_ct (s))
-    return wrk->vpp_event_queues[0];
-  else
-    return wrk->vpp_event_queues[s->vpp_thread_index];
-}
-
 static void
 vcl_send_session_accepted_reply (svm_msg_q_t * mq, u32 context,
 				 session_handle_t handle, int retval)
@@ -227,6 +234,24 @@
   app_send_ctrl_evt_to_vpp (mq, app_evt);
 }
 
+void
+vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s,
+				u32 wrk_index)
+{
+  app_session_evt_t _app_evt, *app_evt = &_app_evt;
+  session_worker_update_msg_t *mp;
+  svm_msg_q_t *mq;
+
+  mq = vcl_session_vpp_evt_q (wrk, s);
+  app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_WORKER_UPDATE);
+  mp = (session_worker_update_msg_t *) app_evt->evt->data;
+  mp->client_index = wrk->my_client_index;
+  mp->handle = s->vpp_handle;
+  mp->req_wrk_index = wrk->vpp_wrk_index;
+  mp->wrk_index = wrk_index;
+  app_send_ctrl_evt_to_vpp (mq, app_evt);
+}
+
 static u32
 vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp)
 {
@@ -540,7 +565,6 @@
   /* Caught a disconnect before actually accepting the session */
   if (session->session_state == STATE_LISTEN)
     {
-
       if (!vcl_flag_accepted_session (session, msg->handle,
 				      VCL_ACCEPTED_F_CLOSED))
 	VDBG (0, "session was not accepted!");
@@ -551,6 +575,59 @@
   return session;
 }
 
+static void
+vcl_session_req_worker_update_handler (vcl_worker_t * wrk, void *data)
+{
+  session_req_worker_update_msg_t *msg;
+  vcl_session_t *s;
+
+  msg = (session_req_worker_update_msg_t *) data;
+  s = vcl_session_get_w_vpp_handle (wrk, msg->session_handle);
+  if (!s)
+    return;
+
+  vec_add1 (wrk->pending_session_wrk_updates, s->session_index);
+}
+
+static void
+vcl_session_worker_update_reply_handler (vcl_worker_t * wrk, void *data)
+{
+  session_worker_update_reply_msg_t *msg;
+  vcl_session_t *s;
+
+  msg = (session_worker_update_reply_msg_t *) data;
+  s = vcl_session_get_w_vpp_handle (wrk, msg->handle);
+  if (!s)
+    {
+      VDBG (0, "unknown handle 0x%llx", msg->handle);
+      return;
+    }
+  if (vcl_wait_for_segment (msg->segment_handle))
+    {
+      clib_warning ("segment for session %u couldn't be mounted!",
+		    s->session_index);
+      return;
+    }
+  s->rx_fifo = uword_to_pointer (msg->rx_fifo, svm_fifo_t *);
+  s->tx_fifo = uword_to_pointer (msg->tx_fifo, svm_fifo_t *);
+
+  s->rx_fifo->client_session_index = s->session_index;
+  s->tx_fifo->client_session_index = s->session_index;
+  s->rx_fifo->client_thread_index = wrk->wrk_index;
+  s->tx_fifo->client_thread_index = wrk->wrk_index;
+  s->session_state = STATE_UPDATED;
+
+  if (s->shared_index != VCL_INVALID_SESSION_INDEX)
+    {
+      vcl_shared_session_t *ss;
+      ss = vcl_shared_session_get (s->shared_index);
+      if (vec_len (ss->workers) > 1)
+	VDBG (0, "workers need to be updated");
+    }
+  VDBG (0, "session %u[0x%llx] moved to worker %u", s->session_index,
+	s->vpp_handle, wrk->wrk_index);
+}
+
 static int
 vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
 {
@@ -587,13 +664,19 @@
     case SESSION_CTRL_EVT_BOUND:
       vcl_session_bound_handler (wrk, (session_bound_msg_t *) e->data);
       break;
+    case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
+      vcl_session_req_worker_update_handler (wrk, e->data);
+      break;
+    case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
+      vcl_session_worker_update_reply_handler (wrk, e->data);
+      break;
     default:
       clib_warning ("unhandled %u", e->event_type);
     }
   return VPPCOM_OK;
 }
 
-static inline int
+static int
 vppcom_wait_for_session_state_change (u32 session_index,
 				      session_state_t state,
 				      f64 wait_for_time)
@@ -638,6 +721,52 @@
   return VPPCOM_ETIMEDOUT;
 }
 
+static void
+vcl_handle_pending_wrk_updates (vcl_worker_t * wrk)
+{
+  session_state_t state;
+  vcl_session_t *s;
+  u32 *sip;
+
+  if (PREDICT_TRUE (vec_len (wrk->pending_session_wrk_updates) == 0))
+    return;
+
+  vec_foreach (sip, wrk->pending_session_wrk_updates)
+  {
+    s = vcl_session_get (wrk, *sip);
+    vcl_send_session_worker_update (wrk, s, wrk->wrk_index);
+    state = s->session_state;
+    vppcom_wait_for_session_state_change (s->session_index, STATE_UPDATED, 5);
+    s->session_state = state;
+  }
+  vec_reset_length (wrk->pending_session_wrk_updates);
+}
+
+static void
+vcl_flush_mq_events (void)
+{
+  vcl_worker_t *wrk = vcl_worker_get_current ();
+  svm_msg_q_msg_t *msg;
+  session_event_t *e;
+  svm_msg_q_t *mq;
+  int i;
+
+  mq = wrk->app_event_queue;
+  svm_msg_q_lock (mq);
+  vcl_mq_dequeue_batch (wrk, mq);
+  svm_msg_q_unlock (mq);
+
+  for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
+    {
+      msg = vec_elt_at_index (wrk->mq_msg_vector, i);
+      e = svm_msg_q_msg_data (mq, msg);
+      vcl_handle_mq_event (wrk, e);
+      svm_msg_q_free_msg (mq, msg);
+    }
+  vec_reset_length (wrk->mq_msg_vector);
+  vcl_handle_pending_wrk_updates (wrk);
+}
+
 static int
 vppcom_app_session_enable (void)
 {
@@ -845,13 +974,14 @@
 vcl_app_pre_fork (void)
 {
   vcl_incercept_sigchld ();
+  vcl_flush_mq_events ();
 }
 
 static void
 vcl_app_fork_child_handler (void)
 {
+  vcl_worker_t *parent_wrk, *wrk;
   int rv, parent_wrk_index;
-  vcl_worker_t *parent_wrk;
   u8 *child_name;
 
   parent_wrk_index = vcl_get_worker_index ();
@@ -884,6 +1014,8 @@
    */
   vcl_worker_register_with_vpp ();
   parent_wrk = vcl_worker_get (parent_wrk_index);
+  wrk = vcl_worker_get_current ();
+  wrk->vpp_event_queues = vec_dup (parent_wrk->vpp_event_queues);
   vcl_worker_share_sessions (parent_wrk);
   parent_wrk->forked_child = vcl_get_worker_index ();
 
@@ -1097,7 +1229,11 @@
 	}
 
       if (!do_disconnect)
-	goto cleanup;
+	{
+	  VDBG (0, "session handle %u [0x%llx] disconnect skipped",
+		session_handle, vpp_handle);
+	  goto cleanup;
+	}
 
       if (state & STATE_LISTEN)
 	{
@@ -1143,10 +1279,7 @@
       vcl_ct_registration_unlock (wrk);
     }
 
-  if (vpp_handle != ~0)
-    {
-      vcl_session_table_del_vpp_handle (wrk, vpp_handle);
-    }
+  vcl_session_table_del_vpp_handle (wrk, vpp_handle);
   vcl_session_free (wrk, session);
 
   VDBG (0, "session handle %u [0x%llx] removed", session_handle, vpp_handle);
@@ -1948,22 +2081,6 @@
   return svm_fifo_max_enqueue (session->tx_fifo);
 }
 
-static inline int
-vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq)
-{
-  svm_msg_q_msg_t *msg;
-  u32 n_msgs;
-  int i;
-
-  n_msgs = svm_msg_q_size (mq);
-  for (i = 0; i < n_msgs; i++)
-    {
-      vec_add2 (wrk->mq_msg_vector, msg, 1);
-      svm_msg_q_sub_w_lock (mq, msg);
-    }
-  return n_msgs;
-}
-
 #define vcl_fifo_rx_evt_valid_or_break(_fifo)			\
 if (PREDICT_FALSE (svm_fifo_is_empty (_fifo)))			\
   {								\
@@ -2067,6 +2184,12 @@
 	  *bits_set += 1;
 	}
       break;
+    case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
+      vcl_session_worker_update_reply_handler (wrk, e->data);
+      break;
+    case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
+      vcl_session_req_worker_update_handler (wrk, e->data);
+      break;
     default:
       clib_warning ("unhandled: %u", e->event_type);
       break;
@@ -2122,6 +2245,7 @@
       svm_msg_q_free_msg (mq, msg);
     }
   vec_reset_length (wrk->mq_msg_vector);
+  vcl_handle_pending_wrk_updates (wrk);
   return *bits_set;
 }
 
@@ -2676,6 +2800,12 @@
       session_evt_data = session->vep.ev.data.u64;
       session_events = session->vep.ev.events;
       break;
+    case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
+      vcl_session_req_worker_update_handler (wrk, e->data);
+      break;
+    case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
+      vcl_session_worker_update_reply_handler (wrk, e->data);
+      break;
     default:
       VDBG (0, "unhandled: %u", e->event_type);
       break;
@@ -2741,7 +2871,7 @@
       svm_msg_q_free_msg (mq, msg);
     }
   vec_reset_length (wrk->mq_msg_vector);
-
+  vcl_handle_pending_wrk_updates (wrk);
   return *num_ev;
 }
 
@@ -3580,12 +3710,18 @@
 }
 
 int
-vppcom_session_index (uint32_t session_handle)
+vppcom_session_index (vcl_session_handle_t session_handle)
 {
   return session_handle & 0xFFFFFF;
 }
 
 int
+vppcom_session_worker (vcl_session_handle_t session_handle)
+{
+  return session_handle >> 24;
+}
+
+int
 vppcom_session_handle (uint32_t session_index)
 {
   return (vcl_get_worker_index () << 24) | session_index;
diff --git a/src/vcl/vppcom.h b/src/vcl/vppcom.h
index 641946b..f2fca09 100644
--- a/src/vcl/vppcom.h
+++ b/src/vcl/vppcom.h
@@ -97,6 +97,8 @@
   uint16_t port;
 } vppcom_endpt_t;
 
+typedef uint32_t vcl_session_handle_t;
+
 typedef enum
 {
   VPPCOM_OK = 0,
@@ -277,7 +279,8 @@
 extern int vppcom_poll (vcl_poll_t * vp, uint32_t n_sids,
 			double wait_for_time);
 extern int vppcom_mq_epoll_fd (void);
-extern int vppcom_session_index (uint32_t session_handle);
+extern int vppcom_session_index (vcl_session_handle_t session_handle);
+extern int vppcom_session_worker (vcl_session_handle_t session_handle);
 extern int vppcom_session_handle (uint32_t session_index);
 
 extern int vppcom_session_read_segments (uint32_t session_handle,