vcl/session: add api for changing session app worker

For multi-process apps, after forking, the parent may decide to close
some or all of the sessions it shares with the child. Because the fifos
of those sessions are allocated in the parent's segment manager, they
must be moved to the child's segment manager.

Change-Id: I85b4c8c8545005724023ee14043647719cef61dd
Signed-off-by: Florin Coras <fcoras@cisco.com>
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index 19c8fa2..85b5f93 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -724,6 +724,77 @@
   return 0;
 }
 
+/**
+ * Move a session to a new owning app worker.
+ *
+ * Records @c app_wrk as the session's owner and, if the session has both
+ * rx and tx fifos, replaces them with a pair allocated from the worker's
+ * connects segment manager. For each non-empty old fifo the data buffer and
+ * head/tail/cursize are copied to its replacement before the old pair is
+ * released.
+ *
+ * @param app_wrk	worker that takes over the session
+ * @param s		session to be moved
+ * @return		0 on success, including when the session lacks a fifo
+ *			pair and only the owner is updated; -1 if new fifos
+ *			could not be allocated, in which case the previous
+ *			owner and fifos are restored
+ */
+int
+app_worker_own_session (app_worker_t * app_wrk, stream_session_t * s)
+{
+  u32 old_wrk_index = s->app_wrk_index;
+  svm_fifo_t *rxf, *txf;
+  segment_manager_t *sm;
+
+  s->app_wrk_index = app_wrk->wrk_index;
+
+  rxf = s->server_rx_fifo;
+  txf = s->server_tx_fifo;
+
+  /* Without both fifos there is nothing to migrate */
+  if (!rxf || !txf)
+    return 0;
+
+  s->server_rx_fifo = 0;
+  s->server_tx_fifo = 0;
+
+  sm = app_worker_get_or_alloc_connect_segment_manager (app_wrk);
+  if (!sm || session_alloc_fifos (sm, s))
+    {
+      /* Roll back instead of leaving the session without fifos and
+       * leaking the old pair.
+       * NOTE(review): assumes session_alloc_fifos leaves the session's
+       * fifo pointers untouched when it fails - confirm */
+      s->server_rx_fifo = rxf;
+      s->server_tx_fifo = txf;
+      s->app_wrk_index = old_wrk_index;
+      return -1;
+    }
+
+  /* NOTE(review): copying nitems bytes presumes the new fifo's data area is
+   * at least as large as the old one's - confirm both use the same size */
+  if (!svm_fifo_is_empty (rxf))
+    {
+      clib_memcpy_fast (s->server_rx_fifo->data, rxf->data, rxf->nitems);
+      s->server_rx_fifo->head = rxf->head;
+      s->server_rx_fifo->tail = rxf->tail;
+      s->server_rx_fifo->cursize = rxf->cursize;
+    }
+
+  if (!svm_fifo_is_empty (txf))
+    {
+      clib_memcpy_fast (s->server_tx_fifo->data, txf->data, txf->nitems);
+      s->server_tx_fifo->head = txf->head;
+      s->server_tx_fifo->tail = txf->tail;
+      s->server_tx_fifo->cursize = txf->cursize;
+    }
+
+  segment_manager_dealloc_fifos (rxf->segment_index, rxf, txf);
+
+  return 0;
+}
+
 /**
  * Start listening local transport endpoint for requested transport.
  *
@@ -890,6 +932,25 @@
 }
 
+/**
+ * Get the worker's connects segment manager, allocating it on first use.
+ *
+ * @param app_wrk	app worker whose connects segment manager is wanted
+ * @return		the segment manager, or 0 if one had to be allocated
+ *			and the allocation failed
+ */
 segment_manager_t *
+app_worker_get_or_alloc_connect_segment_manager (app_worker_t * app_wrk)
+{
+  /* (u32) ~0 is treated as "no connects segment manager yet".
+   * NOTE(review): assumes a non-zero return from the allocator means
+   * failure - confirm */
+  if (app_wrk->connects_seg_manager == (u32) ~ 0
+      && app_worker_alloc_connects_segment_manager (app_wrk))
+    return 0;
+  return segment_manager_get (app_wrk->connects_seg_manager);
+}
+
+segment_manager_t *
 app_worker_get_listen_segment_manager (app_worker_t * app,
 				       stream_session_t * listener)
 {
diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h
index e33f2ff..1d2064d 100644
--- a/src/vnet/session/application.h
+++ b/src/vnet/session/application.h
@@ -225,12 +225,15 @@
 app_worker_t *app_worker_get (u32 wrk_index);
 app_worker_t *app_worker_get_if_valid (u32 wrk_index);
 application_t *app_worker_get_app (u32 wrk_index);
+int app_worker_own_session (app_worker_t * app_wrk, stream_session_t * s);
 void app_worker_free (app_worker_t * app_wrk);
 int app_worker_open_session (app_worker_t * app, session_endpoint_t * tep,
 			     u32 api_context);
 segment_manager_t *app_worker_get_listen_segment_manager (app_worker_t *,
 							  stream_session_t *);
 segment_manager_t *app_worker_get_connect_segment_manager (app_worker_t *);
+segment_manager_t
+  * app_worker_get_or_alloc_connect_segment_manager (app_worker_t *);
 int app_worker_alloc_connects_segment_manager (app_worker_t * app);
 int app_worker_add_segment_notify (u32 app_or_wrk, u64 segment_handle);
 u32 app_worker_n_listeners (app_worker_t * app);
diff --git a/src/vnet/session/application_interface.h b/src/vnet/session/application_interface.h
index a156c82..9c48faa 100644
--- a/src/vnet/session/application_interface.h
+++ b/src/vnet/session/application_interface.h
@@ -220,9 +220,9 @@
   u8 lcl_is_ip4;
   u8 lcl_ip[16];
   u16 lcl_port;
-  u64 rx_fifo;
-  u64 tx_fifo;
-  u64 vpp_evt_q;
+  uword rx_fifo;
+  uword tx_fifo;
+  uword vpp_evt_q;
   u32 segment_size;
   u8 segment_name_length;
   u8 segment_name[128];
@@ -233,12 +233,12 @@
   u32 context;
   u64 listener_handle;
   u64 handle;
-  u64 server_rx_fifo;
-  u64 server_tx_fifo;
+  uword server_rx_fifo;
+  uword server_tx_fifo;
   u64 segment_handle;
-  u64 vpp_event_queue_address;
-  u64 server_event_queue_address;
-  u64 client_event_queue_address;
+  uword vpp_event_queue_address;
+  uword server_event_queue_address;
+  uword client_event_queue_address;
   u16 port;
   u8 is_ip4;
   u8 ip[16];
@@ -260,12 +260,12 @@
   u32 context;
   i32 retval;
   u64 handle;
-  u64 server_rx_fifo;
-  u64 server_tx_fifo;
+  uword server_rx_fifo;
+  uword server_tx_fifo;
   u64 segment_handle;
-  u64 vpp_event_queue_address;
-  u64 client_event_queue_address;
-  u64 server_event_queue_address;
+  uword vpp_event_queue_address;
+  uword client_event_queue_address;
+  uword server_event_queue_address;
   u32 segment_size;
   u8 segment_name_length;
   u8 segment_name[64];
@@ -302,6 +302,28 @@
   u64 handle;
 } __clib_packed session_reset_reply_msg_t;
 
+typedef struct session_req_worker_update_msg_
+{	/* payload of SESSION_CTRL_EVT_REQ_WORKER_UPDATE events */
+  u64 session_handle;		/**< handle of the session concerned */
+} __clib_packed session_req_worker_update_msg_t;
+
+/* NOTE: using u16 for wrk indices because message needs to fit in 18B */
+typedef struct session_worker_update_msg_
+{	/* payload of SESSION_CTRL_EVT_WORKER_UPDATE events */
+  u32 client_index;		/**< used to look up the application */
+  u16 wrk_index;		/**< app worker that should own the session */
+  u16 req_wrk_index;		/**< compared with current owner's map index */
+  u64 handle;			/**< handle of the session to move */
+} __clib_packed session_worker_update_msg_t;
+
+typedef struct session_worker_update_reply_msg_
+{	/* payload of SESSION_CTRL_EVT_WORKER_UPDATE_REPLY events */
+  u64 handle;			/**< session handle copied from the request */
+  uword rx_fifo;		/**< session rx fifo pointer, as uword */
+  uword tx_fifo;		/**< session tx fifo pointer, as uword */
+  u64 segment_handle;		/**< value of session_segment_handle () */
+} __clib_packed session_worker_update_reply_msg_t;
+
 typedef struct app_session_event_
 {
   svm_msg_q_msg_t msg;
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index e3c7300..cf1b3e9 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -49,7 +49,10 @@
   SESSION_CTRL_EVT_DISCONNECTED,
   SESSION_CTRL_EVT_DISCONNECTED_REPLY,
   SESSION_CTRL_EVT_RESET,
-  SESSION_CTRL_EVT_RESET_REPLY
+  SESSION_CTRL_EVT_RESET_REPLY,
+  SESSION_CTRL_EVT_REQ_WORKER_UPDATE,
+  SESSION_CTRL_EVT_WORKER_UPDATE,
+  SESSION_CTRL_EVT_WORKER_UPDATE_REPLY,
 } session_evt_type_t;
 
 static inline const char *
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index 98965f3..880f163 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -173,7 +173,7 @@
   svm_msg_q_unlock (app_wrk->event_queue);
   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
   clib_memset (evt, 0, sizeof (*evt));
-  evt->event_type = SESSION_CTRL_EVT_DISCONNECTED;
+  evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY;
   rmp = (session_disconnected_reply_msg_t *) evt->data;
   rmp->handle = mp->handle;
   rmp->context = mp->context;
@@ -207,6 +207,119 @@
     }
 }
 
+/**
+ * Handle a SESSION_CTRL_EVT_WORKER_UPDATE message.
+ *
+ * Moves the session named in the message to the app worker at
+ * @c wrk_index and replies on that worker's event queue with the session's
+ * fifos and segment handle. If @c req_wrk_index is the session's current
+ * owner, nothing is moved; the target worker is instead sent a
+ * SESSION_CTRL_EVT_REQ_WORKER_UPDATE carrying the session handle.
+ *
+ * @param data	pointer to a session_worker_update_msg_t
+ */
+static void
+session_mq_worker_update_handler (void *data)
+{
+  session_worker_update_msg_t *mp = (session_worker_update_msg_t *) data;
+  session_worker_update_reply_msg_t *rmp;
+  svm_msg_q_msg_t _msg, *msg = &_msg;
+  app_worker_t *app_wrk;
+  u32 owner_app_wrk_map;
+  session_event_t *evt;
+  stream_session_t *s;
+  application_t *app;
+
+  app = application_lookup (mp->client_index);
+  if (!app)
+    return;
+  if (!(s = session_get_from_handle_if_valid (mp->handle)))
+    {
+      clib_warning ("invalid handle %llu", mp->handle);
+      return;
+    }
+  app_wrk = app_worker_get (s->app_wrk_index);
+  if (app_wrk->app_index != app->app_index)
+    {
+      clib_warning ("app %u does not own session %llu", app->app_index,
+		    mp->handle);
+      return;
+    }
+  owner_app_wrk_map = app_wrk->wrk_map_index;
+
+  /* wrk_index is taken from the message, so it may not name a worker.
+   * NOTE(review): assumes application_get_worker returns 0 in that case -
+   * confirm */
+  app_wrk = application_get_worker (app, mp->wrk_index);
+  if (!app_wrk)
+    {
+      clib_warning ("invalid worker %u", mp->wrk_index);
+      return;
+    }
+
+  /* This needs to come from the new owner */
+  if (mp->req_wrk_index == owner_app_wrk_map)
+    {
+      session_req_worker_update_msg_t *wump;
+
+      svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
+					   SESSION_MQ_CTRL_EVT_RING,
+					   SVM_Q_WAIT, msg);
+      svm_msg_q_unlock (app_wrk->event_queue);
+      evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
+      clib_memset (evt, 0, sizeof (*evt));
+      evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE;
+      wump = (session_req_worker_update_msg_t *) evt->data;
+      wump->session_handle = mp->handle;
+      svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT);
+      return;
+    }
+
+  /* The reply and the retransmit checks below need both fifos */
+  if (!s->server_rx_fifo || !s->server_tx_fifo)
+    {
+      clib_warning ("session %llu has no fifos", mp->handle);
+      return;
+    }
+
+  /* Do not reply if the session could not be moved */
+  if (app_worker_own_session (app_wrk, s))
+    {
+      clib_warning ("failed to move session %llu to worker %u", mp->handle,
+		    mp->wrk_index);
+      return;
+    }
+
+  /*
+   * Send reply
+   */
+  svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
+				       SESSION_MQ_CTRL_EVT_RING,
+				       SVM_Q_WAIT, msg);
+  svm_msg_q_unlock (app_wrk->event_queue);
+  evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
+  clib_memset (evt, 0, sizeof (*evt));
+  evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
+  rmp = (session_worker_update_reply_msg_t *) evt->data;
+  rmp->handle = mp->handle;
+  rmp->rx_fifo = pointer_to_uword (s->server_rx_fifo);
+  rmp->tx_fifo = pointer_to_uword (s->server_tx_fifo);
+  rmp->segment_handle = session_segment_handle (s);
+  svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT);
+
+  /*
+   * Retransmit messages that may have been lost
+   */
+  if (!svm_fifo_is_empty (s->server_tx_fifo))
+    session_send_io_evt_to_thread (s->server_tx_fifo, FIFO_EVENT_APP_TX);
+
+  if (!svm_fifo_is_empty (s->server_rx_fifo))
+    app_worker_lock_and_send_event (app_wrk, s, FIFO_EVENT_APP_RX);
+
+  if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
+    app->cb_fns.session_disconnect_callback (s);
+}
+
 vlib_node_registration_t session_queue_node;
 
 typedef struct
@@ -936,6 +1016,9 @@
 	case SESSION_CTRL_EVT_RESET_REPLY:
 	  session_mq_reset_reply_handler (e->data);
 	  break;
+	case SESSION_CTRL_EVT_WORKER_UPDATE:
+	  session_mq_worker_update_handler (e->data);
+	  break;
 	default:
 	  clib_warning ("unhandled event type %d", e->event_type);
 	}