TCP/session improvements
- Added svm fifo flag for tracking fifo dequeue events (replaces event
length). Updated all code to switch to the new scheme.
- More session debugging
- Fix peek index wrap
- Add a trivial socket test client
- Fast retransmit/cc fixes
- tx and rx SACK fixes and unit testing
- SRTT computation fix
- remove dupack/ack burst filters
- improve ack rx
- improved segment rx
- builtin client test code
Change-Id: Ic4eb2d5ca446eb2260ccd3ccbcdaa73c64e7f4e1
Signed-off-by: Florin Coras <fcoras@cisco.com>
Signed-off-by: Dave Barach <dbarach@cisco.com>
diff --git a/src/uri/uri_socket_test.c b/src/uri/uri_socket_test.c
new file mode 100644
index 0000000..9f049bd
--- /dev/null
+++ b/src/uri/uri_socket_test.c
@@ -0,0 +1,152 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <vppinfra/format.h>
+
+/*
+ * Trivial TCP test client: sends a known pattern and checks what comes back.
+ *
+ * Connects to the hard-coded server 6.0.1.1:1234 (argc/argv are currently
+ * ignored), then repeatedly sends the tx buffer and verifies that every
+ * buffer read back from the socket is byte-for-byte identical to it.
+ *
+ * Returns 0 once all buffers have been read back and verified. Exits with
+ * status 1 on any socket failure, if the peer closes the connection early,
+ * or on the first mismatching byte.
+ */
+int
+main (int argc, char *argv[])
+{
+  int sockfd, portno, n;
+  struct sockaddr_in serv_addr;
+  struct hostent *server;
+  u8 *rx_buffer = 0, *tx_buffer = 0;
+  u32 offset;
+  int iter, i;
+
+  /* Argument parsing is disabled: host and port are hard-coded below */
+  if (0 && argc < 3)
+    {
+      fformat (stderr, "usage %s hostname port\n", argv[0]);
+      exit (0);
+    }
+
+  portno = 1234; // atoi(argv[2]);
+  sockfd = socket (AF_INET, SOCK_STREAM, 0);
+  if (sockfd < 0)
+    {
+      clib_unix_error ("socket");
+      exit (1);
+    }
+  server = gethostbyname ("6.0.1.1" /* argv[1] */ );
+  if (server == NULL)
+    {
+      clib_unix_warning ("gethostbyname");
+      exit (1);
+    }
+  bzero ((char *) &serv_addr, sizeof (serv_addr));
+  serv_addr.sin_family = AF_INET;
+  bcopy ((char *) server->h_addr,
+         (char *) &serv_addr.sin_addr.s_addr, server->h_length);
+  serv_addr.sin_port = htons (portno);
+  if (connect (sockfd, (const void *) &serv_addr, sizeof (serv_addr)) < 0)
+    {
+      clib_unix_warning ("connect");
+      exit (1);
+    }
+
+  vec_validate (rx_buffer, 1400);
+  vec_validate (tx_buffer, 1400);
+
+  /* Fill the tx buffer with a position-dependent byte pattern */
+  for (i = 0; i < vec_len (tx_buffer); i++)
+    tx_buffer[i] = (i + 1) % 0xff;
+
+  /*
+   * Send one packet to warm up the RX pipeline
+   */
+  n = send (sockfd, tx_buffer, vec_len (tx_buffer), 0 /* flags */ );
+  if (n != vec_len (tx_buffer))
+    {
+      clib_unix_warning ("write");
+      exit (1);
+    }
+
+  for (iter = 0; iter < 100000; iter++)
+    {
+      /*
+       * The warm-up send above already put one buffer in flight, so the
+       * last iteration only reads: sends and reads stay balanced.
+       */
+      if (iter < 99999)
+        {
+          n = send (sockfd, tx_buffer, vec_len (tx_buffer), 0 /* flags */ );
+          if (n != vec_len (tx_buffer))
+            {
+              clib_unix_warning ("write");
+              exit (1);
+            }
+        }
+      offset = 0;
+
+      /* recv may return less than requested; read until the buffer is full */
+      do
+        {
+          n = recv (sockfd, rx_buffer + offset,
+                    vec_len (rx_buffer) - offset, 0 /* flags */ );
+          if (n < 0)
+            {
+              clib_unix_warning ("read");
+              exit (1);
+            }
+          /* 0 means the peer closed the connection; bail out, don't spin */
+          if (n == 0)
+            {
+              clib_warning ("connection closed by peer after %d bytes",
+                            offset);
+              exit (1);
+            }
+          offset += n;
+        }
+      while (offset < vec_len (rx_buffer));
+
+      for (i = 0; i < vec_len (rx_buffer); i++)
+        {
+          if (rx_buffer[i] != tx_buffer[i])
+            {
+              /* Three conversions: byte index, received and expected value */
+              clib_warning ("[%d] read 0x%x not 0x%x",
+                            i, rx_buffer[i], tx_buffer[i]);
+              exit (1);
+            }
+        }
+    }
+  close (sockfd);
+  return 0;
+}
+
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/uri/uri_tcp_test.c b/src/uri/uri_tcp_test.c
index 406a5f4..e283481 100644
--- a/src/uri/uri_tcp_test.c
+++ b/src/uri/uri_tcp_test.c
@@ -116,6 +116,7 @@
pthread_t client_rx_thread_handle;
u32 client_bytes_received;
u8 test_return_packets;
+ u32 bytes_to_send;
/* convenience */
svm_fifo_segment_main_t *segment_main;
@@ -313,11 +314,16 @@
rx_fifo = e->fifo;
- bytes = e->enqueue_length;
+ bytes = svm_fifo_max_dequeue (rx_fifo);
+ /* Allow enqueuing of new event */
+ svm_fifo_unset_event (rx_fifo);
+
+ /* Read the bytes */
do
{
- n_read = svm_fifo_dequeue_nowait (rx_fifo, 0, vec_len (utm->rx_buf),
- utm->rx_buf);
+ n_read = svm_fifo_dequeue_nowait (rx_fifo, 0,
+ clib_min (vec_len (utm->rx_buf),
+ bytes), utm->rx_buf);
if (n_read > 0)
{
bytes -= n_read;
@@ -333,9 +339,17 @@
}
utm->client_bytes_received += n_read;
}
+ else
+ {
+ if (n_read == -2)
+ {
+ clib_warning ("weird!");
+ break;
+ }
+ }
}
- while (n_read < 0 || bytes > 0);
+ while (bytes > 0);
}
void
@@ -479,47 +493,41 @@
}
}
-void
-client_send_data (uri_tcp_test_main_t * utm)
+static void
+send_test_chunk (uri_tcp_test_main_t * utm, svm_fifo_t * tx_fifo, int mypid,
+ u32 bytes)
{
u8 *test_data = utm->connect_test_data;
u64 bytes_sent = 0;
- int rv;
- int mypid = getpid ();
- session_t *session;
- svm_fifo_t *tx_fifo;
- int buffer_offset, bytes_to_send = 0;
+ int test_buf_offset = 0;
+ u32 bytes_to_snd;
+ u32 queue_max_chunk = 64 << 10, actual_write;
session_fifo_event_t evt;
static int serial_number = 0;
- int i;
- u32 max_chunk = 64 << 10, write;
+ int rv;
- session = pool_elt_at_index (utm->sessions, utm->connected_session_index);
- tx_fifo = session->server_tx_fifo;
+ bytes_to_snd = (bytes == 0) ? vec_len (test_data) : bytes;
+ if (bytes_to_snd > vec_len (test_data))
+ bytes_to_snd = vec_len (test_data);
- vec_validate (utm->rx_buf, vec_len (test_data) - 1);
-
- for (i = 0; i < 1; i++)
+ while (bytes_to_snd > 0)
{
- bytes_to_send = vec_len (test_data);
- buffer_offset = 0;
- while (bytes_to_send > 0)
+ actual_write =
+ bytes_to_snd > queue_max_chunk ? queue_max_chunk : bytes_to_snd;
+ rv = svm_fifo_enqueue_nowait (tx_fifo, mypid, actual_write,
+ test_data + test_buf_offset);
+
+ if (rv > 0)
{
- write = bytes_to_send > max_chunk ? max_chunk : bytes_to_send;
- rv = svm_fifo_enqueue_nowait (tx_fifo, mypid, write,
- test_data + buffer_offset);
+ bytes_to_snd -= rv;
+ test_buf_offset += rv;
+ bytes_sent += rv;
- if (rv > 0)
+ if (svm_fifo_set_event (tx_fifo))
{
- bytes_to_send -= rv;
- buffer_offset += rv;
- bytes_sent += rv;
-
/* Fabricate TX event, send to vpp */
evt.fifo = tx_fifo;
evt.event_type = FIFO_EVENT_SERVER_TX;
- /* $$$$ for event logging */
- evt.enqueue_length = rv;
evt.event_id = serial_number++;
unix_shared_memory_queue_add (utm->vpp_event_queue,
@@ -528,13 +536,40 @@
}
}
}
+}
+
+void
+client_send_data (uri_tcp_test_main_t * utm)
+{
+ u8 *test_data = utm->connect_test_data;
+ int mypid = getpid ();
+ session_t *session;
+ svm_fifo_t *tx_fifo;
+ u32 n_iterations, leftover;
+ int i;
+
+ session = pool_elt_at_index (utm->sessions, utm->connected_session_index);
+ tx_fifo = session->server_tx_fifo;
+
+ vec_validate (utm->rx_buf, vec_len (test_data) - 1);
+ n_iterations = utm->bytes_to_send / vec_len (test_data);
+
+ for (i = 0; i < n_iterations; i++)
+ {
+ send_test_chunk (utm, tx_fifo, mypid, 0);
+ }
+
+ leftover = utm->bytes_to_send % vec_len (test_data);
+ if (leftover)
+ send_test_chunk (utm, tx_fifo, mypid, leftover);
if (utm->test_return_packets)
{
f64 timeout = clib_time_now (&utm->clib_time) + 2;
/* Wait for the outstanding packets */
- while (utm->client_bytes_received < vec_len (test_data))
+ while (utm->client_bytes_received <
+ vec_len (test_data) * n_iterations + leftover)
{
if (clib_time_now (&utm->clib_time) > timeout)
{
@@ -542,9 +577,8 @@
break;
}
}
-
- utm->time_to_stop = 1;
}
+ utm->time_to_stop = 1;
}
void
@@ -599,6 +633,11 @@
/* Disconnect */
client_disconnect (utm);
+
+ if (wait_for_state_change (utm, STATE_START))
+ {
+ return;
+ }
}
static void
@@ -714,7 +753,6 @@
{
svm_fifo_t *rx_fifo, *tx_fifo;
int n_read;
-
session_fifo_event_t evt;
unix_shared_memory_queue_t *q;
int rv, bytes;
@@ -722,34 +760,46 @@
rx_fifo = e->fifo;
tx_fifo = utm->sessions[rx_fifo->client_session_index].server_tx_fifo;
- bytes = e->enqueue_length;
+ bytes = svm_fifo_max_dequeue (rx_fifo);
+ /* Allow enqueuing of a new event */
+ svm_fifo_unset_event (rx_fifo);
+
+ if (bytes == 0)
+ return;
+
+ /* Read the bytes */
do
{
n_read = svm_fifo_dequeue_nowait (rx_fifo, 0, vec_len (utm->rx_buf),
utm->rx_buf);
+ if (n_read > 0)
+ bytes -= n_read;
+
+ if (utm->drop_packets)
+ continue;
/* Reflect if a non-drop session */
- if (!utm->drop_packets && n_read > 0)
+ if (n_read > 0)
{
do
{
rv = svm_fifo_enqueue_nowait (tx_fifo, 0, n_read, utm->rx_buf);
}
- while (rv == -2 && !utm->time_to_stop);
+ while (rv <= 0 && !utm->time_to_stop);
- /* Fabricate TX event, send to vpp */
- evt.fifo = tx_fifo;
- evt.event_type = FIFO_EVENT_SERVER_TX;
- /* $$$$ for event logging */
- evt.enqueue_length = n_read;
- evt.event_id = e->event_id;
- q = utm->vpp_event_queue;
- unix_shared_memory_queue_add (q, (u8 *) & evt,
- 0 /* do wait for mutex */ );
+ /* If event wasn't set, add one */
+ if (svm_fifo_set_event (tx_fifo))
+ {
+ /* Fabricate TX event, send to vpp */
+ evt.fifo = tx_fifo;
+ evt.event_type = FIFO_EVENT_SERVER_TX;
+ evt.event_id = e->event_id;
+
+ q = utm->vpp_event_queue;
+ unix_shared_memory_queue_add (q, (u8 *) & evt,
+ 0 /* do wait for mutex */ );
+ }
}
-
- if (n_read > 0)
- bytes -= n_read;
}
while ((n_read < 0 || bytes > 0) && !utm->time_to_stop);
}
@@ -852,7 +902,10 @@
vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t *
mp)
{
+ uri_tcp_test_main_t *utm = &uri_tcp_test_main;
+
clib_warning ("retval %d", ntohl (mp->retval));
+ utm->state = STATE_START;
}
#define foreach_uri_msg \
@@ -888,6 +941,7 @@
u8 *heap, *uri = 0;
u8 *bind_uri = (u8 *) "tcp://0.0.0.0/1234";
u8 *connect_uri = (u8 *) "tcp://6.0.1.2/1234";
+ u32 bytes_to_send = 64 << 10, mbytes;
u32 tmp;
mheap_t *h;
session_t *session;
@@ -934,6 +988,10 @@
drop_packets = 1;
else if (unformat (a, "test"))
test_return_packets = 1;
+ else if (unformat (a, "mbytes %d", &mbytes))
+ {
+ bytes_to_send = mbytes << 20;
+ }
else
{
fformat (stderr, "%s: usage [master|slave]\n");
@@ -956,6 +1014,7 @@
utm->segment_main = &svm_fifo_segment_main;
utm->drop_packets = drop_packets;
utm->test_return_packets = test_return_packets;
+ utm->bytes_to_send = bytes_to_send;
setup_signal_handlers ();
uri_api_hookup (utm);
diff --git a/src/uri/uri_udp_test.c b/src/uri/uri_udp_test.c
index 54625d6..e6c239c 100644
--- a/src/uri/uri_udp_test.c
+++ b/src/uri/uri_udp_test.c
@@ -742,17 +742,20 @@
/* Fabricate TX event, send to vpp */
evt.fifo = tx_fifo;
evt.event_type = FIFO_EVENT_SERVER_TX;
- /* $$$$ for event logging */
- evt.enqueue_length = nbytes;
evt.event_id = e->event_id;
- q = utm->vpp_event_queue;
- unix_shared_memory_queue_add (q, (u8 *) & evt, 0 /* do wait for mutex */ );
+
+ if (svm_fifo_set_event (tx_fifo))
+ {
+ q = utm->vpp_event_queue;
+ unix_shared_memory_queue_add (q, (u8 *) & evt,
+ 0 /* do wait for mutex */ );
+ }
}
void
server_handle_event_queue (uri_udp_test_main_t * utm)
{
- session_fifo_event_t _e, *e = &_e;;
+ session_fifo_event_t _e, *e = &_e;
while (1)
{