TCP/session improvements

- Added svm fifo flag for tracking fifo dequeue events (replaces event
  length). Updated all code to switch to the new scheme.
- More session debugging
- Fix peek index wrap
- Add a trivial socket test client
- Fast retransmit/cc fixes
- tx and rx SACK fixes and unit testing
- SRTT computation fix
- Remove dupack/ack burst filters
- Improve ack rx
- Improve segment rx
- Add builtin client test code

Change-Id: Ic4eb2d5ca446eb2260ccd3ccbcdaa73c64e7f4e1
Signed-off-by: Florin Coras <fcoras@cisco.com>
Signed-off-by: Dave Barach <dbarach@cisco.com>
diff --git a/src/uri/uri_socket_test.c b/src/uri/uri_socket_test.c
new file mode 100644
index 0000000..9f049bd
--- /dev/null
+++ b/src/uri/uri_socket_test.c
@@ -0,0 +1,126 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <vppinfra/format.h>
+
+/*
+ * Trivial TCP test client: sends a byte pattern to a hard-coded peer and
+ * checks each buffer read back against it. Exits non-zero on failure.
+ */
+int
+main (int argc, char *argv[])
+{
+  int sockfd, portno, n, iter, i;
+  struct sockaddr_in serv_addr;
+  struct hostent *server;
+  u8 *rx_buffer = 0, *tx_buffer = 0;
+  u32 offset;
+  if (0 && argc < 3)
+    {
+      fformat (stderr, "usage %s hostname port\n", argv[0]);
+      exit (1);
+    }
+  portno = 1234;		// atoi(argv[2]);
+  sockfd = socket (AF_INET, SOCK_STREAM, 0);
+  if (sockfd < 0)
+    {
+      clib_unix_error ("socket");
+      exit (1);
+    }
+  server = gethostbyname ("6.0.1.1" /* argv[1] */ );
+  if (server == NULL)
+    {
+      clib_unix_warning ("gethostbyname");
+      exit (1);
+    }
+  bzero ((char *) &serv_addr, sizeof (serv_addr));
+  serv_addr.sin_family = AF_INET;
+  bcopy ((char *) server->h_addr,
+	 (char *) &serv_addr.sin_addr.s_addr, server->h_length);
+  serv_addr.sin_port = htons (portno);
+  if (connect (sockfd, (const void *) &serv_addr, sizeof (serv_addr)) < 0)
+    {
+      clib_unix_warning ("connect");
+      exit (1);
+    }
+
+  vec_validate (rx_buffer, 1400);
+  vec_validate (tx_buffer, 1400);
+  for (i = 0; i < vec_len (tx_buffer); i++)
+    tx_buffer[i] = (i + 1) % 0xff;
+
+  /* Send one packet to warm up the RX pipeline */
+  n = send (sockfd, tx_buffer, vec_len (tx_buffer), 0 /* flags */ );
+  if (n != vec_len (tx_buffer))
+    {
+      clib_unix_warning ("write");
+      exit (1);
+    }
+
+  for (iter = 0; iter < 100000; iter++)
+    {
+      /* Last pass only reads: the warm-up send is one buffer ahead */
+      if (iter < 99999)
+	{
+	  n = send (sockfd, tx_buffer, vec_len (tx_buffer), 0 /* flags */ );
+	  if (n != vec_len (tx_buffer))
+	    {
+	      clib_unix_warning ("write");
+	      exit (1);
+	    }
+	}
+      /* Read until one full buffer has been received */
+      offset = 0;
+      do
+	{
+	  n = recv (sockfd, rx_buffer + offset,
+		    vec_len (rx_buffer) - offset, 0 /* flags */ );
+	  if (n <= 0)
+	    {
+	      /* n == 0: peer closed, offset would never advance */
+	      clib_unix_warning ("read returned %d", n);
+	      exit (1);
+	    }
+	  offset += n;
+	}
+      while (offset < vec_len (rx_buffer));
+
+      for (i = 0; i < vec_len (rx_buffer); i++)
+	{
+	  if (rx_buffer[i] != tx_buffer[i])
+	    {
+	      clib_warning ("[%d] read 0x%x not 0x%x",
+			    i, rx_buffer[i], tx_buffer[i]);
+	      exit (1);
+	    }
+	}
+    }
+  close (sockfd);
+  return 0;
+}
+
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/uri/uri_tcp_test.c b/src/uri/uri_tcp_test.c
index 406a5f4..e283481 100644
--- a/src/uri/uri_tcp_test.c
+++ b/src/uri/uri_tcp_test.c
@@ -116,6 +116,7 @@
   pthread_t client_rx_thread_handle;
   u32 client_bytes_received;
   u8 test_return_packets;
+  u32 bytes_to_send;
 
   /* convenience */
   svm_fifo_segment_main_t *segment_main;
@@ -313,11 +314,16 @@
 
   rx_fifo = e->fifo;
 
-  bytes = e->enqueue_length;
+  bytes = svm_fifo_max_dequeue (rx_fifo);
+  /* Allow enqueuing of new event */
+  svm_fifo_unset_event (rx_fifo);
+
+  /* Read the bytes */
   do
     {
-      n_read = svm_fifo_dequeue_nowait (rx_fifo, 0, vec_len (utm->rx_buf),
-					utm->rx_buf);
+      n_read = svm_fifo_dequeue_nowait (rx_fifo, 0,
+					clib_min (vec_len (utm->rx_buf),
+						  bytes), utm->rx_buf);
       if (n_read > 0)
 	{
 	  bytes -= n_read;
@@ -333,9 +339,17 @@
 	    }
 	  utm->client_bytes_received += n_read;
 	}
+      else
+	{
+	  if (n_read == -2)
+	    {
+	      clib_warning ("weird!");
+	      break;
+	    }
+	}
 
     }
-  while (n_read < 0 || bytes > 0);
+  while (bytes > 0);
 }
 
 void
@@ -479,47 +493,41 @@
     }
 }
 
-void
-client_send_data (uri_tcp_test_main_t * utm)
+static void
+send_test_chunk (uri_tcp_test_main_t * utm, svm_fifo_t * tx_fifo, int mypid,
+		 u32 bytes)
 {
   u8 *test_data = utm->connect_test_data;
   u64 bytes_sent = 0;
-  int rv;
-  int mypid = getpid ();
-  session_t *session;
-  svm_fifo_t *tx_fifo;
-  int buffer_offset, bytes_to_send = 0;
+  int test_buf_offset = 0;
+  u32 bytes_to_snd;
+  u32 queue_max_chunk = 64 << 10, actual_write;
   session_fifo_event_t evt;
   static int serial_number = 0;
-  int i;
-  u32 max_chunk = 64 << 10, write;
+  int rv;
 
-  session = pool_elt_at_index (utm->sessions, utm->connected_session_index);
-  tx_fifo = session->server_tx_fifo;
+  bytes_to_snd = (bytes == 0) ? vec_len (test_data) : bytes;
+  if (bytes_to_snd > vec_len (test_data))
+    bytes_to_snd = vec_len (test_data);
 
-  vec_validate (utm->rx_buf, vec_len (test_data) - 1);
-
-  for (i = 0; i < 1; i++)
+  while (bytes_to_snd > 0)
     {
-      bytes_to_send = vec_len (test_data);
-      buffer_offset = 0;
-      while (bytes_to_send > 0)
+      actual_write =
+	bytes_to_snd > queue_max_chunk ? queue_max_chunk : bytes_to_snd;
+      rv = svm_fifo_enqueue_nowait (tx_fifo, mypid, actual_write,
+				    test_data + test_buf_offset);
+
+      if (rv > 0)
 	{
-	  write = bytes_to_send > max_chunk ? max_chunk : bytes_to_send;
-	  rv = svm_fifo_enqueue_nowait (tx_fifo, mypid, write,
-					test_data + buffer_offset);
+	  bytes_to_snd -= rv;
+	  test_buf_offset += rv;
+	  bytes_sent += rv;
 
-	  if (rv > 0)
+	  if (svm_fifo_set_event (tx_fifo))
 	    {
-	      bytes_to_send -= rv;
-	      buffer_offset += rv;
-	      bytes_sent += rv;
-
 	      /* Fabricate TX event, send to vpp */
 	      evt.fifo = tx_fifo;
 	      evt.event_type = FIFO_EVENT_SERVER_TX;
-	      /* $$$$ for event logging */
-	      evt.enqueue_length = rv;
 	      evt.event_id = serial_number++;
 
 	      unix_shared_memory_queue_add (utm->vpp_event_queue,
@@ -528,13 +536,40 @@
 	    }
 	}
     }
+}
+
+void
+client_send_data (uri_tcp_test_main_t * utm)
+{
+  u8 *test_data = utm->connect_test_data;
+  int mypid = getpid ();
+  session_t *session;
+  svm_fifo_t *tx_fifo;
+  u32 n_iterations, leftover;
+  int i;
+
+  session = pool_elt_at_index (utm->sessions, utm->connected_session_index);
+  tx_fifo = session->server_tx_fifo;
+
+  vec_validate (utm->rx_buf, vec_len (test_data) - 1);
+  n_iterations = utm->bytes_to_send / vec_len (test_data);
+
+  for (i = 0; i < n_iterations; i++)
+    {
+      send_test_chunk (utm, tx_fifo, mypid, 0);
+    }
+
+  leftover = utm->bytes_to_send % vec_len (test_data);
+  if (leftover)
+    send_test_chunk (utm, tx_fifo, mypid, leftover);
 
   if (utm->test_return_packets)
     {
       f64 timeout = clib_time_now (&utm->clib_time) + 2;
 
       /* Wait for the outstanding packets */
-      while (utm->client_bytes_received < vec_len (test_data))
+      while (utm->client_bytes_received <
+	     vec_len (test_data) * n_iterations + leftover)
 	{
 	  if (clib_time_now (&utm->clib_time) > timeout)
 	    {
@@ -542,9 +577,8 @@
 	      break;
 	    }
 	}
-
-      utm->time_to_stop = 1;
     }
+  utm->time_to_stop = 1;
 }
 
 void
@@ -599,6 +633,11 @@
 
   /* Disconnect */
   client_disconnect (utm);
+
+  if (wait_for_state_change (utm, STATE_START))
+    {
+      return;
+    }
 }
 
 static void
@@ -714,7 +753,6 @@
 {
   svm_fifo_t *rx_fifo, *tx_fifo;
   int n_read;
-
   session_fifo_event_t evt;
   unix_shared_memory_queue_t *q;
   int rv, bytes;
@@ -722,34 +760,46 @@
   rx_fifo = e->fifo;
   tx_fifo = utm->sessions[rx_fifo->client_session_index].server_tx_fifo;
 
-  bytes = e->enqueue_length;
+  bytes = svm_fifo_max_dequeue (rx_fifo);
+  /* Allow enqueuing of a new event */
+  svm_fifo_unset_event (rx_fifo);
+
+  if (bytes == 0)
+    return;
+
+  /* Read the bytes */
   do
     {
       n_read = svm_fifo_dequeue_nowait (rx_fifo, 0, vec_len (utm->rx_buf),
 					utm->rx_buf);
+      if (n_read > 0)
+	bytes -= n_read;
+
+      if (utm->drop_packets)
+	continue;
 
       /* Reflect if a non-drop session */
-      if (!utm->drop_packets && n_read > 0)
+      if (n_read > 0)
 	{
 	  do
 	    {
 	      rv = svm_fifo_enqueue_nowait (tx_fifo, 0, n_read, utm->rx_buf);
 	    }
-	  while (rv == -2 && !utm->time_to_stop);
+	  while (rv <= 0 && !utm->time_to_stop);
 
-	  /* Fabricate TX event, send to vpp */
-	  evt.fifo = tx_fifo;
-	  evt.event_type = FIFO_EVENT_SERVER_TX;
-	  /* $$$$ for event logging */
-	  evt.enqueue_length = n_read;
-	  evt.event_id = e->event_id;
-	  q = utm->vpp_event_queue;
-	  unix_shared_memory_queue_add (q, (u8 *) & evt,
-					0 /* do wait for mutex */ );
+	  /* If event wasn't set, add one */
+	  if (svm_fifo_set_event (tx_fifo))
+	    {
+	      /* Fabricate TX event, send to vpp */
+	      evt.fifo = tx_fifo;
+	      evt.event_type = FIFO_EVENT_SERVER_TX;
+	      evt.event_id = e->event_id;
+
+	      q = utm->vpp_event_queue;
+	      unix_shared_memory_queue_add (q, (u8 *) & evt,
+					    0 /* do wait for mutex */ );
+	    }
 	}
-
-      if (n_read > 0)
-	bytes -= n_read;
     }
   while ((n_read < 0 || bytes > 0) && !utm->time_to_stop);
 }
@@ -852,7 +902,10 @@
 vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t *
 					   mp)
 {
+  uri_tcp_test_main_t *utm = &uri_tcp_test_main;
+
   clib_warning ("retval %d", ntohl (mp->retval));
+  utm->state = STATE_START;
 }
 
 #define foreach_uri_msg                                 \
@@ -888,6 +941,7 @@
   u8 *heap, *uri = 0;
   u8 *bind_uri = (u8 *) "tcp://0.0.0.0/1234";
   u8 *connect_uri = (u8 *) "tcp://6.0.1.2/1234";
+  u32 bytes_to_send = 64 << 10, mbytes;
   u32 tmp;
   mheap_t *h;
   session_t *session;
@@ -934,6 +988,10 @@
 	drop_packets = 1;
       else if (unformat (a, "test"))
 	test_return_packets = 1;
+      else if (unformat (a, "mbytes %d", &mbytes))
+	{
+	  bytes_to_send = mbytes << 20;
+	}
       else
 	{
 	  fformat (stderr, "%s: usage [master|slave]\n");
@@ -956,6 +1014,7 @@
   utm->segment_main = &svm_fifo_segment_main;
   utm->drop_packets = drop_packets;
   utm->test_return_packets = test_return_packets;
+  utm->bytes_to_send = bytes_to_send;
 
   setup_signal_handlers ();
   uri_api_hookup (utm);
diff --git a/src/uri/uri_udp_test.c b/src/uri/uri_udp_test.c
index 54625d6..e6c239c 100644
--- a/src/uri/uri_udp_test.c
+++ b/src/uri/uri_udp_test.c
@@ -742,17 +742,20 @@
   /* Fabricate TX event, send to vpp */
   evt.fifo = tx_fifo;
   evt.event_type = FIFO_EVENT_SERVER_TX;
-  /* $$$$ for event logging */
-  evt.enqueue_length = nbytes;
   evt.event_id = e->event_id;
-  q = utm->vpp_event_queue;
-  unix_shared_memory_queue_add (q, (u8 *) & evt, 0 /* do wait for mutex */ );
+
+  if (svm_fifo_set_event (tx_fifo))
+    {
+      q = utm->vpp_event_queue;
+      unix_shared_memory_queue_add (q, (u8 *) & evt,
+				    0 /* do wait for mutex */ );
+    }
 }
 
 void
 server_handle_event_queue (uri_udp_test_main_t * utm)
 {
-  session_fifo_event_t _e, *e = &_e;;
+  session_fifo_event_t _e, *e = &_e;
 
   while (1)
     {