session: lock app worker mq for io events

Also fixes vcl client/server stats and closing procedure.

Change-Id: I7d5a274ea0a3c8ea13062bf61bf402248dfe1a19
Signed-off-by: Florin Coras <fcoras@cisco.com>
diff --git a/src/vcl/vcl_test_client.c b/src/vcl/vcl_test_client.c
index b2431df..899d729 100644
--- a/src/vcl/vcl_test_client.c
+++ b/src/vcl/vcl_test_client.c
@@ -233,7 +233,6 @@
 	  vtwrn ("failed to register worker");
 	  return -1;
 	}
-
       vt_atomic_add (&vcm->active_workers, 1);
     }
   rv = vtc_connect_test_sessions (wrk);
@@ -259,6 +258,80 @@
   return 0;
 }
 
+/* Spinlock taken by vtc_accumulate_stats () around its whole update. */
+static int stats_lock = 0;
+
+/*
+ * Fold the stats of every test session owned by wrk into ctrl->stats.
+ *
+ * Each session's start time is first overwritten with ctrl's start time.
+ * When ctrl->cfg.verbose > 1 the per-session stats are also dumped, with
+ * the rx side shown only for BI and ECHO tests. The whole update runs
+ * under stats_lock so that concurrent callers do not interleave their
+ * updates of ctrl->stats or their use of the static header buffer.
+ */
+static void
+vtc_accumulate_stats (vcl_test_client_worker_t * wrk,
+		      sock_test_socket_t * ctrl)
+{
+  sock_test_socket_t *tsock;
+  static char buf[64];
+  int i, show_rx = 0;
+
+  while (__sync_lock_test_and_set (&stats_lock, 1))
+    ;
+
+  if (ctrl->cfg.test == SOCK_TEST_TYPE_BI
+      || ctrl->cfg.test == SOCK_TEST_TYPE_ECHO)
+    show_rx = 1;
+
+  for (i = 0; i < wrk->cfg.num_test_sockets; i++)
+    {
+      tsock = &wrk->sessions[i];
+      tsock->stats.start = ctrl->stats.start;
+
+      if (ctrl->cfg.verbose > 1)
+	{
+	  /* Bounded write so the header can never overrun buf. */
+	  snprintf (buf, sizeof (buf), "CLIENT (fd %d) RESULTS", tsock->fd);
+	  sock_test_stats_dump (buf, &tsock->stats, show_rx, 1 /* show tx */ ,
+				ctrl->cfg.verbose);
+	}
+
+      sock_test_stats_accumulate (&ctrl->stats, &tsock->stats);
+    }
+
+  __sync_lock_release (&stats_lock);
+}
+
+/*
+ * Send an exit cfg on every test session owned by wrk: each session's
+ * cfg.test is set to SOCK_TEST_TYPE_EXIT and the cfg is then written on
+ * the session's fd. The write result is intentionally discarded.
+ */
+static void
+vtc_worker_sessions_exit (vcl_test_client_worker_t * wrk)
+{
+  vcl_test_client_main_t *vcm = &vcl_client_main;
+  sock_test_socket_t *ctrl = &vcm->ctrl_socket;
+  sock_test_socket_t *tsock;
+  int i, verbose = ctrl->cfg.verbose;
+
+  for (i = 0; i < wrk->cfg.num_test_sockets; i++)
+    {
+      tsock = &wrk->sessions[i];
+      tsock->cfg.test = SOCK_TEST_TYPE_EXIT;
+
+      if (verbose)
+	{
+	  vtinf ("(fd %d): Sending exit cfg to server...", tsock->fd);
+	  sock_test_cfg_dump (&tsock->cfg, 1 /* is_client */ );
+	}
+      (void) vcl_test_write (tsock->fd, (uint8_t *) & tsock->cfg,
+			     sizeof (tsock->cfg), &tsock->stats, verbose);
+    }
+}
+
 static void *
 vtc_worker_loop (void *arg)
 {
@@ -338,40 +395,16 @@
 	}
     }
 exit:
+  vtinf ("Worker %d done ...", wrk->wrk_index);
+  vtc_accumulate_stats (wrk, ctrl);
+  sleep (1);
+  vtc_worker_sessions_exit (wrk);
   if (wrk->wrk_index)
     vt_atomic_add (&vcm->active_workers, -1);
   return 0;
 }
 
 static void
-vtc_accumulate_stats (vcl_test_client_worker_t * wrk,
-		      sock_test_socket_t * ctrl)
-{
-  sock_test_socket_t *tsock;
-  static char buf[64];
-  int i, show_rx = 0;
-
-  if (ctrl->cfg.test == SOCK_TEST_TYPE_BI
-      || ctrl->cfg.test == SOCK_TEST_TYPE_ECHO)
-    show_rx = 1;
-
-  for (i = 0; i < wrk->cfg.num_test_sockets; i++)
-    {
-      tsock = &wrk->sessions[i];
-      tsock->stats.start = ctrl->stats.start;
-
-      if (ctrl->cfg.verbose > 1)
-	{
-	  sprintf (buf, "CLIENT (fd %d) RESULTS", tsock->fd);
-	  sock_test_stats_dump (buf, &tsock->stats, show_rx, 1 /* show tx */ ,
-				ctrl->cfg.verbose);
-	}
-
-      sock_test_stats_accumulate (&ctrl->stats, &tsock->stats);
-    }
-}
-
-static void
 vtc_print_stats (sock_test_socket_t * ctrl)
 {
   int is_echo = ctrl->cfg.test == SOCK_TEST_TYPE_ECHO;
@@ -488,8 +521,6 @@
       return;
     }
 
-  for (i = 0; i < vcm->n_workers; i++)
-    vtc_accumulate_stats (&vcm->workers[i], ctrl);
   vtc_print_stats (ctrl);
 
   ctrl->cfg.test = SOCK_TEST_TYPE_ECHO;
@@ -499,42 +530,6 @@
 }
 
 static void
-vtc_client_exit (void)
-{
-  vcl_test_client_main_t *vcm = &vcl_client_main;
-  vcl_test_client_worker_t *wrk = &vcm->workers[0];
-  sock_test_socket_t *ctrl = &vcm->ctrl_socket;
-  sock_test_socket_t *tsock;
-  int i, verbose;
-
-  verbose = ctrl->cfg.verbose;
-  for (i = 0; i < wrk->cfg.num_test_sockets; i++)
-    {
-      tsock = &wrk->sessions[i];
-      tsock->cfg.test = SOCK_TEST_TYPE_EXIT;
-
-      if (verbose)
-	{
-	  vtinf ("(fd %d): Sending exit cfg to server...", tsock->fd);
-	  sock_test_cfg_dump (&tsock->cfg, 1 /* is_client */ );
-	}
-      (void) vcl_test_write (tsock->fd, (uint8_t *) & tsock->cfg,
-			     sizeof (tsock->cfg), &tsock->stats, verbose);
-    }
-
-  ctrl->cfg.test = SOCK_TEST_TYPE_EXIT;
-  if (verbose)
-    {
-      vtinf ("(fd %d): Sending exit cfg to server...", ctrl->fd);
-      sock_test_cfg_dump (&ctrl->cfg, 1 /* is_client */ );
-    }
-  (void) vcl_test_write (ctrl->fd, (uint8_t *) & ctrl->cfg,
-			 sizeof (ctrl->cfg), &ctrl->stats, verbose);
-  vtinf ("So long and thanks for all the fish!\n\n");
-  sleep (1);
-}
-
-static void
 dump_help (void)
 {
 #define INDENT "\n  "
@@ -954,6 +949,34 @@
     }
 }
 
+/*
+ * Send an exit cfg to the server over the control session: switch the
+ * control cfg to SOCK_TEST_TYPE_EXIT, write it on the control fd (result
+ * ignored), print the farewell message and sleep for one second.
+ */
+static void
+vtc_ctrl_session_exit (void)
+{
+  vcl_test_client_main_t *vcm = &vcl_client_main;
+  sock_test_socket_t *ctrl = &vcm->ctrl_socket;
+  int verbose;
+
+  verbose = ctrl->cfg.verbose;
+  ctrl->cfg.test = SOCK_TEST_TYPE_EXIT;
+
+  if (verbose)
+    {
+      vtinf ("(fd %d): Sending exit cfg to server...", ctrl->fd);
+      sock_test_cfg_dump (&ctrl->cfg, 1 /* is_client */ );
+    }
+
+  (void) vcl_test_write (ctrl->fd, (uint8_t *) & ctrl->cfg,
+			 sizeof (ctrl->cfg), &ctrl->stats, verbose);
+
+  vtinf ("So long and thanks for all the fish!\n\n");
+  sleep (1);
+}
+
 int
 main (int argc, char **argv)
 {
@@ -1046,7 +1060,7 @@
       vtc_read_user_input (ctrl);
     }
 
-  vtc_client_exit ();
+  vtc_ctrl_session_exit ();
   vppcom_session_close (ctrl->fd);
   vppcom_app_destroy ();
   free (vcm->workers);
diff --git a/src/vcl/vcl_test_server.c b/src/vcl/vcl_test_server.c
index 2fdd7ec..5c8656c 100644
--- a/src/vcl/vcl_test_server.c
+++ b/src/vcl/vcl_test_server.c
@@ -497,6 +497,7 @@
   if (rv < 0)
     vtfail ("vppcom_epoll_ctl", rv);
 
+  ssm->active_workers += 1;
   vtinf ("Waiting for a client to connect on port %d ...", ssm->cfg.port);
 }
 
@@ -532,6 +533,12 @@
 	  if (wrk->wait_events[i].events & (EPOLLHUP | EPOLLRDHUP))
 	    {
 	      vppcom_session_close (conn->fd);
+	      wrk->nfds -= 1;
+	      if (!wrk->nfds)
+		{
+		  vtinf ("All client connections closed\n");
+		  goto done;
+		}
 	      continue;
 	    }
 	  if (wrk->wait_events[i].data.u32 == ~0)
@@ -542,6 +549,7 @@
 
 	  if (EPOLLIN & wrk->wait_events[i].events)
 	    {
+	    read_again:
 	      rx_bytes = vcl_test_read (conn->fd, conn->buf,
 					conn->buf_size, &conn->stats);
 
@@ -563,7 +571,6 @@
 		  if (!wrk->nfds)
 		    {
 		      vtinf ("All client connections closed\n");
-		      vtinf ("May the force be with you!\n");
 		      goto done;
 		    }
 		  continue;
@@ -572,6 +579,9 @@
 		       || (conn->cfg.test == SOCK_TEST_TYPE_BI))
 		{
 		  vts_server_rx (conn, rx_bytes);
+		  if (vppcom_session_attr (conn->fd, VPPCOM_ATTR_GET_NREAD, 0,
+					   0) > 0)
+		    goto read_again;
 		  continue;
 		}
 	      else if (isascii (conn->buf[0]))
diff --git a/src/vcl/vppcom.c b/src/vcl/vppcom.c
index 6455508..3acd1c4 100644
--- a/src/vcl/vppcom.c
+++ b/src/vcl/vppcom.c
@@ -285,6 +285,8 @@
       vcl_wait_for_memory (session->vpp_evt_q);
       rx_fifo->master_session_index = session->session_index;
       tx_fifo->master_session_index = session->session_index;
+      rx_fifo->master_thread_index = vcl_get_worker_index ();
+      tx_fifo->master_thread_index = vcl_get_worker_index ();
       vec_validate (wrk->vpp_event_queues, 0);
       evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
       wrk->vpp_event_queues[0] = evt_q;
@@ -295,7 +297,8 @@
 					     svm_msg_q_t *);
       rx_fifo->client_session_index = session->session_index;
       tx_fifo->client_session_index = session->session_index;
-
+      rx_fifo->client_thread_index = vcl_get_worker_index ();
+      tx_fifo->client_thread_index = vcl_get_worker_index ();
       vpp_wrk_index = tx_fifo->master_thread_index;
       vec_validate (wrk->vpp_event_queues, vpp_wrk_index);
       wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
@@ -349,8 +352,7 @@
   if (mp->retval)
     {
       clib_warning ("VCL<%d>: ERROR: sid %u: connect failed! %U", getpid (),
-		    mp->handle, session_index, format_api_error,
-		    ntohl (mp->retval));
+		    session_index, format_api_error, ntohl (mp->retval));
       session->session_state = STATE_FAILED;
       session->vpp_handle = mp->handle;
       return session_index;
@@ -361,6 +363,8 @@
   vcl_wait_for_memory (rx_fifo);
   rx_fifo->client_session_index = session_index;
   tx_fifo->client_session_index = session_index;
+  rx_fifo->client_thread_index = vcl_get_worker_index ();
+  tx_fifo->client_thread_index = vcl_get_worker_index ();
 
   if (mp->client_event_queue_address)
     {
@@ -648,6 +652,9 @@
   u64 vpp_handle;
 
   session = vcl_session_get_w_handle (wrk, session_handle);
+  if (!session)
+    return VPPCOM_EBADFD;
+
   vpp_handle = session->vpp_handle;
   state = session->session_state;
 
@@ -2233,6 +2240,7 @@
       switch (e->event_type)
 	{
 	case FIFO_EVENT_APP_RX:
+	  ASSERT (e->fifo->client_thread_index == vcl_get_worker_index ());
 	  vcl_fifo_rx_evt_valid_or_break (e->fifo);
 	  sid = e->fifo->client_session_index;
 	  session = vcl_session_get (wrk, sid);