TCP/session improvements
- Added svm fifo flag for tracking fifo dequeue events (replaces event
length). Updated all code to switch to the new scheme.
- More session debugging
- Fix peek index wrap
- Add a trivial socket test client
- Fast retransmit/cc fixes
- tx and rx SACK fixes and unit testing
- SRTT computation fix
- remove dupack/ack burst filters
- improve ack rx
- improved segment rx
- builtin client test code
Change-Id: Ic4eb2d5ca446eb2260ccd3ccbcdaa73c64e7f4e1
Signed-off-by: Florin Coras <fcoras@cisco.com>
Signed-off-by: Dave Barach <dbarach@cisco.com>
diff --git a/src/svm/svm_fifo.c b/src/svm/svm_fifo.c
index e3f534b..07b0d2d 100644
--- a/src/svm/svm_fifo.c
+++ b/src/svm/svm_fifo.c
@@ -13,7 +13,7 @@
* limitations under the License.
*/
-#include "svm_fifo.h"
+#include <svm/svm_fifo.h>
/** create an svm fifo, in the current heap. Fails vs blow up the process */
svm_fifo_t *
@@ -362,18 +362,19 @@
return svm_fifo_enqueue_internal (f, pid, max_bytes, copy_from_here);
}
-/** Enqueue a future segment.
+/**
+ * Enqueue a future segment.
+ *
* Two choices: either copies the entire segment, or copies nothing
* Returns 0 of the entire segment was copied
* Returns -1 if none of the segment was copied due to lack of space
*/
-
static int
-svm_fifo_enqueue_with_offset_internal2 (svm_fifo_t * f,
- int pid,
- u32 offset,
- u32 required_bytes,
- u8 * copy_from_here)
+svm_fifo_enqueue_with_offset_internal (svm_fifo_t * f,
+ int pid,
+ u32 offset,
+ u32 required_bytes,
+ u8 * copy_from_here)
{
u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
u32 cursize, nitems;
@@ -424,14 +425,14 @@
u32 offset,
u32 required_bytes, u8 * copy_from_here)
{
- return svm_fifo_enqueue_with_offset_internal2
+ return svm_fifo_enqueue_with_offset_internal
(f, pid, offset, required_bytes, copy_from_here);
}
static int
-svm_fifo_dequeue_internal2 (svm_fifo_t * f,
- int pid, u32 max_bytes, u8 * copy_here)
+svm_fifo_dequeue_internal (svm_fifo_t * f,
+ int pid, u32 max_bytes, u8 * copy_here)
{
u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
u32 cursize, nitems;
@@ -484,7 +485,7 @@
svm_fifo_dequeue_nowait (svm_fifo_t * f,
int pid, u32 max_bytes, u8 * copy_here)
{
- return svm_fifo_dequeue_internal2 (f, pid, max_bytes, copy_here);
+ return svm_fifo_dequeue_internal (f, pid, max_bytes, copy_here);
}
int
@@ -492,7 +493,7 @@
u8 * copy_here)
{
u32 total_copy_bytes, first_copy_bytes, second_copy_bytes;
- u32 cursize, nitems;
+ u32 cursize, nitems, real_head;
if (PREDICT_FALSE (f->cursize == 0))
return -2; /* nothing in the fifo */
@@ -500,6 +501,8 @@
/* read cursize, which can only increase while we're working */
cursize = f->cursize;
nitems = f->nitems;
+ real_head = f->head + offset;
+ real_head = real_head >= nitems ? real_head - nitems : real_head;
/* Number of bytes we're going to copy */
total_copy_bytes = (cursize < max_bytes) ? cursize : max_bytes;
@@ -508,9 +511,9 @@
{
/* Number of bytes in first copy segment */
first_copy_bytes =
- ((nitems - f->head + offset) < total_copy_bytes) ?
- (nitems - f->head + offset) : total_copy_bytes;
- clib_memcpy (copy_here, &f->data[f->head + offset], first_copy_bytes);
+ ((nitems - real_head) < total_copy_bytes) ?
+ (nitems - real_head) : total_copy_bytes;
+ clib_memcpy (copy_here, &f->data[real_head], first_copy_bytes);
/* Number of bytes in second copy segment, if any */
second_copy_bytes = total_copy_bytes - first_copy_bytes;
diff --git a/src/svm/svm_fifo.h b/src/svm/svm_fifo.h
index 70624b7..3955617 100644
--- a/src/svm/svm_fifo.h
+++ b/src/svm/svm_fifo.h
@@ -46,9 +46,11 @@
{
pthread_mutex_t mutex; /* 8 bytes */
pthread_cond_t condvar; /* 8 bytes */
- u32 owner_pid;
svm_lock_tag_t tag;
- volatile u32 cursize;
+
+ volatile u32 cursize; /**< current fifo size */
+ volatile u8 has_event; /**< non-zero if deq event exists */
+ u32 owner_pid;
u32 nitems;
/* Backpointers */
@@ -112,6 +114,28 @@
return f->ooos_list_head != OOO_SEGMENT_INVALID_INDEX;
}
+/**
+ * Sets fifo event flag.
+ *
+ * @return 1 if flag was not set.
+ */
+always_inline u8
+svm_fifo_set_event (svm_fifo_t * f)
+{
+ /* Probably doesn't need to be atomic. Still, better avoid surprises */
+ return __sync_lock_test_and_set (&f->has_event, 1) == 0;
+}
+
+/**
+ * Unsets fifo event flag.
+ */
+always_inline void
+svm_fifo_unset_event (svm_fifo_t * f)
+{
+ /* Probably doesn't need to be atomic. Still, better avoid surprises */
+ __sync_lock_test_and_set (&f->has_event, 0);
+}
+
svm_fifo_t *svm_fifo_create (u32 data_size_in_bytes);
int svm_fifo_enqueue_nowait (svm_fifo_t * f, int pid, u32 max_bytes,
diff --git a/src/svm/svm_fifo_segment.h b/src/svm/svm_fifo_segment.h
index 793fa7c..ecb5653 100644
--- a/src/svm/svm_fifo_segment.h
+++ b/src/svm/svm_fifo_segment.h
@@ -15,8 +15,8 @@
#ifndef __included_ssvm_fifo_segment_h__
#define __included_ssvm_fifo_segment_h__
-#include "svm_fifo.h"
-#include "ssvm.h"
+#include <svm/svm_fifo.h>
+#include <svm/ssvm.h>
typedef struct
{
diff --git a/src/uri.am b/src/uri.am
index 09b5b15..ad4d65d 100644
--- a/src/uri.am
+++ b/src/uri.am
@@ -11,7 +11,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-noinst_PROGRAMS += uri_udp_test uri_tcp_test
+noinst_PROGRAMS += uri_udp_test uri_tcp_test uri_socket_test
uri_udp_test_SOURCES = uri/uri_udp_test.c
uri_udp_test_LDADD = libvlibmemoryclient.la libvlibapi.la libsvm.la \
@@ -20,3 +20,6 @@
uri_tcp_test_SOURCES = uri/uri_tcp_test.c
uri_tcp_test_LDADD = libvlibmemoryclient.la libvlibapi.la libsvm.la \
libvppinfra.la -lpthread -lm -lrt
+
+uri_socket_test_SOURCES = uri/uri_socket_test.c
+uri_socket_test_LDADD = libvppinfra.la -lpthread -lm -lrt
diff --git a/src/uri/uri_socket_test.c b/src/uri/uri_socket_test.c
new file mode 100644
index 0000000..9f049bd
--- /dev/null
+++ b/src/uri/uri_socket_test.c
@@ -0,0 +1,126 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <vppinfra/format.h>
+
+int
+main (int argc, char *argv[])
+{
+ int sockfd, portno, n;
+ struct sockaddr_in serv_addr;
+ struct hostent *server;
+ u8 *rx_buffer = 0, *tx_buffer = 0;
+ u32 offset;
+ int iter, i;
+ if (0 && argc < 3)
+ {
+ fformat (stderr, "usage %s hostname port\n", argv[0]);
+ exit (0);
+ }
+
+ portno = 1234; // atoi(argv[2]);
+ sockfd = socket (AF_INET, SOCK_STREAM, 0);
+ if (sockfd < 0)
+ {
+ clib_unix_error ("socket");
+ exit (1);
+ }
+ server = gethostbyname ("6.0.1.1" /* argv[1] */ );
+ if (server == NULL)
+ {
+ clib_unix_warning ("gethostbyname");
+ exit (1);
+ }
+ bzero ((char *) &serv_addr, sizeof (serv_addr));
+ serv_addr.sin_family = AF_INET;
+ bcopy ((char *) server->h_addr,
+ (char *) &serv_addr.sin_addr.s_addr, server->h_length);
+ serv_addr.sin_port = htons (portno);
+ if (connect (sockfd, (const void *) &serv_addr, sizeof (serv_addr)) < 0)
+ {
+ clib_unix_warning ("connect");
+ exit (1);
+ }
+
+ vec_validate (rx_buffer, 1400);
+ vec_validate (tx_buffer, 1400);
+
+ for (i = 0; i < vec_len (tx_buffer); i++)
+ tx_buffer[i] = (i + 1) % 0xff;
+
+ /*
+ * Send one packet to warm up the RX pipeline
+ */
+ n = send (sockfd, tx_buffer, vec_len (tx_buffer), 0 /* flags */ );
+ if (n != vec_len (tx_buffer))
+ {
+ clib_unix_warning ("write");
+ exit (0);
+ }
+
+ for (iter = 0; iter < 100000; iter++)
+ {
+ if (iter < 99999)
+ {
+ n = send (sockfd, tx_buffer, vec_len (tx_buffer), 0 /* flags */ );
+ if (n != vec_len (tx_buffer))
+ {
+ clib_unix_warning ("write");
+ exit (0);
+ }
+ }
+ offset = 0;
+
+ do
+ {
+ n = recv (sockfd, rx_buffer + offset,
+ vec_len (rx_buffer) - offset, 0 /* flags */ );
+ if (n < 0)
+ {
+ clib_unix_warning ("read");
+ exit (0);
+ }
+ offset += n;
+ }
+ while (offset < vec_len (rx_buffer));
+
+ for (i = 0; i < vec_len (rx_buffer); i++)
+ {
+ if (rx_buffer[i] != tx_buffer[i])
+ {
+ clib_warning ("[%d] read 0x%x not 0x%x",
+ rx_buffer[i], tx_buffer[i]);
+ exit (1);
+ }
+ }
+
+ }
+ close (sockfd);
+ return 0;
+}
+
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/uri/uri_tcp_test.c b/src/uri/uri_tcp_test.c
index 406a5f4..e283481 100644
--- a/src/uri/uri_tcp_test.c
+++ b/src/uri/uri_tcp_test.c
@@ -116,6 +116,7 @@
pthread_t client_rx_thread_handle;
u32 client_bytes_received;
u8 test_return_packets;
+ u32 bytes_to_send;
/* convenience */
svm_fifo_segment_main_t *segment_main;
@@ -313,11 +314,16 @@
rx_fifo = e->fifo;
- bytes = e->enqueue_length;
+ bytes = svm_fifo_max_dequeue (rx_fifo);
+ /* Allow enqueuing of new event */
+ svm_fifo_unset_event (rx_fifo);
+
+ /* Read the bytes */
do
{
- n_read = svm_fifo_dequeue_nowait (rx_fifo, 0, vec_len (utm->rx_buf),
- utm->rx_buf);
+ n_read = svm_fifo_dequeue_nowait (rx_fifo, 0,
+ clib_min (vec_len (utm->rx_buf),
+ bytes), utm->rx_buf);
if (n_read > 0)
{
bytes -= n_read;
@@ -333,9 +339,17 @@
}
utm->client_bytes_received += n_read;
}
+ else
+ {
+ if (n_read == -2)
+ {
+ clib_warning ("weird!");
+ break;
+ }
+ }
}
- while (n_read < 0 || bytes > 0);
+ while (bytes > 0);
}
void
@@ -479,47 +493,41 @@
}
}
-void
-client_send_data (uri_tcp_test_main_t * utm)
+static void
+send_test_chunk (uri_tcp_test_main_t * utm, svm_fifo_t * tx_fifo, int mypid,
+ u32 bytes)
{
u8 *test_data = utm->connect_test_data;
u64 bytes_sent = 0;
- int rv;
- int mypid = getpid ();
- session_t *session;
- svm_fifo_t *tx_fifo;
- int buffer_offset, bytes_to_send = 0;
+ int test_buf_offset = 0;
+ u32 bytes_to_snd;
+ u32 queue_max_chunk = 64 << 10, actual_write;
session_fifo_event_t evt;
static int serial_number = 0;
- int i;
- u32 max_chunk = 64 << 10, write;
+ int rv;
- session = pool_elt_at_index (utm->sessions, utm->connected_session_index);
- tx_fifo = session->server_tx_fifo;
+ bytes_to_snd = (bytes == 0) ? vec_len (test_data) : bytes;
+ if (bytes_to_snd > vec_len (test_data))
+ bytes_to_snd = vec_len (test_data);
- vec_validate (utm->rx_buf, vec_len (test_data) - 1);
-
- for (i = 0; i < 1; i++)
+ while (bytes_to_snd > 0)
{
- bytes_to_send = vec_len (test_data);
- buffer_offset = 0;
- while (bytes_to_send > 0)
+ actual_write =
+ bytes_to_snd > queue_max_chunk ? queue_max_chunk : bytes_to_snd;
+ rv = svm_fifo_enqueue_nowait (tx_fifo, mypid, actual_write,
+ test_data + test_buf_offset);
+
+ if (rv > 0)
{
- write = bytes_to_send > max_chunk ? max_chunk : bytes_to_send;
- rv = svm_fifo_enqueue_nowait (tx_fifo, mypid, write,
- test_data + buffer_offset);
+ bytes_to_snd -= rv;
+ test_buf_offset += rv;
+ bytes_sent += rv;
- if (rv > 0)
+ if (svm_fifo_set_event (tx_fifo))
{
- bytes_to_send -= rv;
- buffer_offset += rv;
- bytes_sent += rv;
-
/* Fabricate TX event, send to vpp */
evt.fifo = tx_fifo;
evt.event_type = FIFO_EVENT_SERVER_TX;
- /* $$$$ for event logging */
- evt.enqueue_length = rv;
evt.event_id = serial_number++;
unix_shared_memory_queue_add (utm->vpp_event_queue,
@@ -528,13 +536,40 @@
}
}
}
+}
+
+void
+client_send_data (uri_tcp_test_main_t * utm)
+{
+ u8 *test_data = utm->connect_test_data;
+ int mypid = getpid ();
+ session_t *session;
+ svm_fifo_t *tx_fifo;
+ u32 n_iterations, leftover;
+ int i;
+
+ session = pool_elt_at_index (utm->sessions, utm->connected_session_index);
+ tx_fifo = session->server_tx_fifo;
+
+ vec_validate (utm->rx_buf, vec_len (test_data) - 1);
+ n_iterations = utm->bytes_to_send / vec_len (test_data);
+
+ for (i = 0; i < n_iterations; i++)
+ {
+ send_test_chunk (utm, tx_fifo, mypid, 0);
+ }
+
+ leftover = utm->bytes_to_send % vec_len (test_data);
+ if (leftover)
+ send_test_chunk (utm, tx_fifo, mypid, leftover);
if (utm->test_return_packets)
{
f64 timeout = clib_time_now (&utm->clib_time) + 2;
/* Wait for the outstanding packets */
- while (utm->client_bytes_received < vec_len (test_data))
+ while (utm->client_bytes_received <
+ vec_len (test_data) * n_iterations + leftover)
{
if (clib_time_now (&utm->clib_time) > timeout)
{
@@ -542,9 +577,8 @@
break;
}
}
-
- utm->time_to_stop = 1;
}
+ utm->time_to_stop = 1;
}
void
@@ -599,6 +633,11 @@
/* Disconnect */
client_disconnect (utm);
+
+ if (wait_for_state_change (utm, STATE_START))
+ {
+ return;
+ }
}
static void
@@ -714,7 +753,6 @@
{
svm_fifo_t *rx_fifo, *tx_fifo;
int n_read;
-
session_fifo_event_t evt;
unix_shared_memory_queue_t *q;
int rv, bytes;
@@ -722,34 +760,46 @@
rx_fifo = e->fifo;
tx_fifo = utm->sessions[rx_fifo->client_session_index].server_tx_fifo;
- bytes = e->enqueue_length;
+ bytes = svm_fifo_max_dequeue (rx_fifo);
+ /* Allow enqueuing of a new event */
+ svm_fifo_unset_event (rx_fifo);
+
+ if (bytes == 0)
+ return;
+
+ /* Read the bytes */
do
{
n_read = svm_fifo_dequeue_nowait (rx_fifo, 0, vec_len (utm->rx_buf),
utm->rx_buf);
+ if (n_read > 0)
+ bytes -= n_read;
+
+ if (utm->drop_packets)
+ continue;
/* Reflect if a non-drop session */
- if (!utm->drop_packets && n_read > 0)
+ if (n_read > 0)
{
do
{
rv = svm_fifo_enqueue_nowait (tx_fifo, 0, n_read, utm->rx_buf);
}
- while (rv == -2 && !utm->time_to_stop);
+ while (rv <= 0 && !utm->time_to_stop);
- /* Fabricate TX event, send to vpp */
- evt.fifo = tx_fifo;
- evt.event_type = FIFO_EVENT_SERVER_TX;
- /* $$$$ for event logging */
- evt.enqueue_length = n_read;
- evt.event_id = e->event_id;
- q = utm->vpp_event_queue;
- unix_shared_memory_queue_add (q, (u8 *) & evt,
- 0 /* do wait for mutex */ );
+ /* If event wasn't set, add one */
+ if (svm_fifo_set_event (tx_fifo))
+ {
+ /* Fabricate TX event, send to vpp */
+ evt.fifo = tx_fifo;
+ evt.event_type = FIFO_EVENT_SERVER_TX;
+ evt.event_id = e->event_id;
+
+ q = utm->vpp_event_queue;
+ unix_shared_memory_queue_add (q, (u8 *) & evt,
+ 0 /* do wait for mutex */ );
+ }
}
-
- if (n_read > 0)
- bytes -= n_read;
}
while ((n_read < 0 || bytes > 0) && !utm->time_to_stop);
}
@@ -852,7 +902,10 @@
vl_api_disconnect_session_reply_t_handler (vl_api_disconnect_session_reply_t *
mp)
{
+ uri_tcp_test_main_t *utm = &uri_tcp_test_main;
+
clib_warning ("retval %d", ntohl (mp->retval));
+ utm->state = STATE_START;
}
#define foreach_uri_msg \
@@ -888,6 +941,7 @@
u8 *heap, *uri = 0;
u8 *bind_uri = (u8 *) "tcp://0.0.0.0/1234";
u8 *connect_uri = (u8 *) "tcp://6.0.1.2/1234";
+ u32 bytes_to_send = 64 << 10, mbytes;
u32 tmp;
mheap_t *h;
session_t *session;
@@ -934,6 +988,10 @@
drop_packets = 1;
else if (unformat (a, "test"))
test_return_packets = 1;
+ else if (unformat (a, "mbytes %d", &mbytes))
+ {
+ bytes_to_send = mbytes << 20;
+ }
else
{
fformat (stderr, "%s: usage [master|slave]\n");
@@ -956,6 +1014,7 @@
utm->segment_main = &svm_fifo_segment_main;
utm->drop_packets = drop_packets;
utm->test_return_packets = test_return_packets;
+ utm->bytes_to_send = bytes_to_send;
setup_signal_handlers ();
uri_api_hookup (utm);
diff --git a/src/uri/uri_udp_test.c b/src/uri/uri_udp_test.c
index 54625d6..e6c239c 100644
--- a/src/uri/uri_udp_test.c
+++ b/src/uri/uri_udp_test.c
@@ -742,17 +742,20 @@
/* Fabricate TX event, send to vpp */
evt.fifo = tx_fifo;
evt.event_type = FIFO_EVENT_SERVER_TX;
- /* $$$$ for event logging */
- evt.enqueue_length = nbytes;
evt.event_id = e->event_id;
- q = utm->vpp_event_queue;
- unix_shared_memory_queue_add (q, (u8 *) & evt, 0 /* do wait for mutex */ );
+
+ if (svm_fifo_set_event (tx_fifo))
+ {
+ q = utm->vpp_event_queue;
+ unix_shared_memory_queue_add (q, (u8 *) & evt,
+ 0 /* do wait for mutex */ );
+ }
}
void
server_handle_event_queue (uri_udp_test_main_t * utm)
{
- session_fifo_event_t _e, *e = &_e;;
+ session_fifo_event_t _e, *e = &_e;
while (1)
{
diff --git a/src/vnet.am b/src/vnet.am
index 3e73de8..9c55e33 100644
--- a/src/vnet.am
+++ b/src/vnet.am
@@ -462,7 +462,9 @@
vnet/tcp/tcp_output.c \
vnet/tcp/tcp_input.c \
vnet/tcp/tcp_newreno.c \
+ vnet/tcp/builtin_client.c \
vnet/tcp/builtin_server.c \
+ vnet/tcp/tcp_test.c \
vnet/tcp/tcp.c
nobase_include_HEADERS += \
diff --git a/src/vnet/session/application.h b/src/vnet/session/application.h
index a60a8b8..480828f 100644
--- a/src/vnet/session/application.h
+++ b/src/vnet/session/application.h
@@ -45,8 +45,7 @@
void (*session_reset_callback) (stream_session_t * s);
/* Direct RX callback, for built-in servers */
- int (*builtin_server_rx_callback) (stream_session_t * session,
- session_fifo_event_t * ep);
+ int (*builtin_server_rx_callback) (stream_session_t * session);
/* Redirect connection to local server */
int (*redirect_connect_callback) (u32 api_client_index, void *mp);
diff --git a/src/vnet/session/node.c b/src/vnet/session/node.c
index 822afeb..8681105 100644
--- a/src/vnet/session/node.c
+++ b/src/vnet/session/node.c
@@ -13,21 +13,14 @@
* limitations under the License.
*/
+#include <math.h>
#include <vlib/vlib.h>
#include <vnet/vnet.h>
-#include <vnet/pg/pg.h>
-#include <vnet/ip/ip.h>
-
#include <vnet/tcp/tcp.h>
-
-#include <vppinfra/hash.h>
-#include <vppinfra/error.h>
#include <vppinfra/elog.h>
-#include <vlibmemory/unix_shared_memory_queue.h>
-
-#include <vnet/udp/udp_packet.h>
-#include <math.h>
+#include <vnet/session/application.h>
#include <vnet/session/session_debug.h>
+#include <vlibmemory/unix_shared_memory_queue.h>
vlib_node_registration_t session_queue_node;
@@ -52,8 +45,8 @@
vlib_node_registration_t session_queue_node;
-#define foreach_session_queue_error \
-_(TX, "Packets transmitted") \
+#define foreach_session_queue_error \
+_(TX, "Packets transmitted") \
_(TIMER, "Timer events")
typedef enum
@@ -91,10 +84,10 @@
transport_proto_vft_t *transport_vft;
u32 next_index, next0, *to_next, n_left_to_next, bi0;
vlib_buffer_t *b0;
- u32 rx_offset;
+ u32 rx_offset = 0, max_dequeue0;
u16 snd_mss0;
u8 *data0;
- int i;
+ int i, n_bytes_read;
next_index = next0 = session_type_to_next[s0->session_type];
@@ -106,24 +99,33 @@
snd_mss0 = transport_vft->send_mss (tc0);
/* Can't make any progress */
- if (snd_space0 == 0 || svm_fifo_max_dequeue (s0->server_tx_fifo) == 0
- || snd_mss0 == 0)
+ if (snd_space0 == 0 || snd_mss0 == 0)
{
vec_add1 (smm->evts_partially_read[thread_index], *e0);
return 0;
}
- ASSERT (e0->enqueue_length > 0);
-
- /* Ensure we're not writing more than transport window allows */
- max_len_to_snd0 = clib_min (e0->enqueue_length, snd_space0);
-
if (peek_data)
{
/* Offset in rx fifo from where to peek data */
rx_offset = transport_vft->tx_fifo_offset (tc0);
}
+ /* Check how much we can pull. If buffering, subtract the offset */
+ max_dequeue0 = svm_fifo_max_dequeue (s0->server_tx_fifo) - rx_offset;
+
+ /* Allow enqueuing of a new event */
+ svm_fifo_unset_event (s0->server_tx_fifo);
+
+ /* Nothing to read return */
+ if (max_dequeue0 == 0)
+ {
+ return 0;
+ }
+
+ /* Ensure we're not writing more than transport window allows */
+ max_len_to_snd0 = clib_min (max_dequeue0, snd_space0);
+
/* TODO check if transport is willing to send len_to_snd0
* bytes (Nagle) */
@@ -147,13 +149,10 @@
* XXX 0.9 because when debugging we might not get a full frame */
if (PREDICT_FALSE (n_bufs < 0.9 * VLIB_FRAME_SIZE))
{
- /* Keep track of how much we've dequeued and exit */
- if (left_to_snd0 != max_len_to_snd0)
+ if (svm_fifo_set_event (s0->server_tx_fifo))
{
- e0->enqueue_length -= max_len_to_snd0 - left_to_snd0;
vec_add1 (smm->evts_partially_read[thread_index], *e0);
}
-
return -1;
}
@@ -198,9 +197,9 @@
len_to_deq0 = (left_to_snd0 < snd_mss0) ? left_to_snd0 : snd_mss0;
/* *INDENT-OFF* */
- SESSION_EVT_DBG(s0, SESSION_EVT_DEQ, ({
+ SESSION_EVT_DBG(SESSION_EVT_DEQ, s0, ({
ed->data[0] = e0->event_id;
- ed->data[1] = e0->enqueue_length;
+ ed->data[1] = max_dequeue0;
ed->data[2] = len_to_deq0;
ed->data[3] = left_to_snd0;
}));
@@ -214,29 +213,30 @@
* 2) buffer chains */
if (peek_data)
{
- int n_bytes_read;
n_bytes_read = svm_fifo_peek (s0->server_tx_fifo, s0->pid,
rx_offset, len_to_deq0, data0);
- if (n_bytes_read < 0)
+ if (n_bytes_read <= 0)
goto dequeue_fail;
/* Keep track of progress locally, transport is also supposed to
- * increment it independently when pushing header */
+ * increment it independently when pushing the header */
rx_offset += n_bytes_read;
}
else
{
- if (svm_fifo_dequeue_nowait (s0->server_tx_fifo, s0->pid,
- len_to_deq0, data0) < 0)
+ n_bytes_read = svm_fifo_dequeue_nowait (s0->server_tx_fifo,
+ s0->pid, len_to_deq0,
+ data0);
+ if (n_bytes_read <= 0)
goto dequeue_fail;
}
- b0->current_length = len_to_deq0;
+ b0->current_length = n_bytes_read;
/* Ask transport to push header */
transport_vft->push_header (tc0, b0);
- left_to_snd0 -= len_to_deq0;
+ left_to_snd0 -= n_bytes_read;
*n_tx_packets = *n_tx_packets + 1;
vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
@@ -246,25 +246,31 @@
vlib_put_next_frame (vm, node, next_index, n_left_to_next);
}
- /* If we couldn't dequeue all bytes store progress */
- if (max_len_to_snd0 < e0->enqueue_length)
+ /* If we couldn't dequeue all bytes mark as partially read */
+ if (max_len_to_snd0 < max_dequeue0)
{
- e0->enqueue_length -= max_len_to_snd0;
- vec_add1 (smm->evts_partially_read[thread_index], *e0);
+ /* If we don't already have new event */
+ if (svm_fifo_set_event (s0->server_tx_fifo))
+ {
+ vec_add1 (smm->evts_partially_read[thread_index], *e0);
+ }
}
return 0;
dequeue_fail:
- /* Can't read from fifo. Store event rx progress, save as partially read,
- * return buff to free list and return */
- e0->enqueue_length -= max_len_to_snd0 - left_to_snd0;
- vec_add1 (smm->evts_partially_read[thread_index], *e0);
+ /*
+ * Can't read from fifo. If we don't already have an event, save as partially
+ * read, return buff to free list and return
+ */
+ clib_warning ("dequeue fail");
- to_next -= 1;
- n_left_to_next += 1;
+ if (svm_fifo_set_event (s0->server_tx_fifo))
+ {
+ vec_add1 (smm->evts_partially_read[thread_index], *e0);
+ }
+ vlib_put_next_frame (vm, node, next_index, n_left_to_next + 1);
_vec_len (smm->tx_buffers[thread_index]) += 1;
- clib_warning ("dequeue fail");
return 0;
}
@@ -298,6 +304,7 @@
session_fifo_event_t *my_fifo_events, *e;
u32 n_to_dequeue, n_events;
unix_shared_memory_queue_t *q;
+ application_t *app;
int n_tx_packets = 0;
u32 my_thread_index = vm->cpu_index;
int i, rv;
@@ -321,13 +328,18 @@
if (n_to_dequeue == 0 && vec_len (my_fifo_events) == 0)
return 0;
+ SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0);
+
/*
* If we didn't manage to process previous events try going
* over them again without dequeuing new ones.
*/
/* XXX: Block senders to sessions that can't keep up */
if (vec_len (my_fifo_events) >= 100)
- goto skip_dequeue;
+ {
+ clib_warning ("too many fifo events unsolved");
+ goto skip_dequeue;
+ }
/* See you in the next life, don't be late */
if (pthread_mutex_trylock (&q->mutex))
@@ -352,19 +364,17 @@
{
svm_fifo_t *f0; /* $$$ prefetch 1 ahead maybe */
stream_session_t *s0;
- u32 server_session_index0, server_thread_index0;
+ u32 session_index0;
session_fifo_event_t *e0;
e0 = &my_fifo_events[i];
f0 = e0->fifo;
- server_session_index0 = f0->server_session_index;
- server_thread_index0 = f0->server_thread_index;
+ session_index0 = f0->server_session_index;
/* $$$ add multiple event queues, per vpp worker thread */
- ASSERT (server_thread_index0 == my_thread_index);
+ ASSERT (f0->server_thread_index == my_thread_index);
- s0 = stream_session_get_if_valid (server_session_index0,
- my_thread_index);
+ s0 = stream_session_get_if_valid (session_index0, my_thread_index);
if (CLIB_DEBUG && !s0)
{
@@ -385,11 +395,20 @@
rv = (smm->session_tx_fns[s0->session_type]) (vm, node, smm, e0, s0,
my_thread_index,
&n_tx_packets);
+ /* Out of buffers */
if (rv < 0)
goto done;
break;
-
+ case FIFO_EVENT_SERVER_EXIT:
+ stream_session_disconnect (s0);
+ break;
+ case FIFO_EVENT_BUILTIN_RX:
+ svm_fifo_unset_event (s0->server_rx_fifo);
+ /* Get session's server */
+ app = application_get (s0->app_index);
+ app->cb_fns.builtin_server_rx_callback (s0);
+ break;
default:
clib_warning ("unhandled event type %d", e0->event_type);
}
@@ -418,6 +437,8 @@
vlib_node_increment_counter (vm, session_queue_node.index,
SESSION_QUEUE_ERROR_TX, n_tx_packets);
+ SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 1);
+
return n_tx_packets;
}
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 06e2a09..f10918a 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -804,30 +804,36 @@
/* Get session's server */
app = application_get (s->app_index);
- /* Fabricate event */
- evt.fifo = s->server_rx_fifo;
- evt.event_type = FIFO_EVENT_SERVER_RX;
- evt.event_id = serial_number++;
- evt.enqueue_length = svm_fifo_max_dequeue (s->server_rx_fifo);
-
/* Built-in server? Hand event to the callback... */
if (app->cb_fns.builtin_server_rx_callback)
- return app->cb_fns.builtin_server_rx_callback (s, &evt);
+ return app->cb_fns.builtin_server_rx_callback (s);
- /* Add event to server's event queue */
- q = app->event_queue;
+ /* If no event, send one */
+ if (svm_fifo_set_event (s->server_rx_fifo))
+ {
+ /* Fabricate event */
+ evt.fifo = s->server_rx_fifo;
+ evt.event_type = FIFO_EVENT_SERVER_RX;
+ evt.event_id = serial_number++;
- /* Based on request block (or not) for lack of space */
- if (block || PREDICT_TRUE (q->cursize < q->maxsize))
- unix_shared_memory_queue_add (app->event_queue, (u8 *) & evt,
- 0 /* do wait for mutex */ );
- else
- return -1;
+ /* Add event to server's event queue */
+ q = app->event_queue;
+
+ /* Based on request block (or not) for lack of space */
+ if (block || PREDICT_TRUE (q->cursize < q->maxsize))
+ unix_shared_memory_queue_add (app->event_queue, (u8 *) & evt,
+ 0 /* do wait for mutex */ );
+ else
+ {
+ clib_warning ("fifo full");
+ return -1;
+ }
+ }
/* *INDENT-OFF* */
- SESSION_EVT_DBG(s, SESSION_EVT_ENQ, ({
+ SESSION_EVT_DBG(SESSION_EVT_ENQ, s, ({
ed->data[0] = evt.event_id;
- ed->data[1] = evt.enqueue_length;
+ ed->data[1] = svm_fifo_max_dequeue (s->server_rx_fifo);
}));
/* *INDENT-ON* */
@@ -1192,8 +1198,29 @@
void
stream_session_disconnect (stream_session_t * s)
{
+// session_fifo_event_t evt;
+
s->session_state = SESSION_STATE_CLOSED;
+ /* RPC to vpp evt queue in the right thread */
+
tp_vfts[s->session_type].close (s->connection_index, s->thread_index);
+
+// {
+// /* Fabricate event */
+// evt.fifo = s->server_rx_fifo;
+// evt.event_type = FIFO_EVENT_SERVER_RX;
+// evt.event_id = serial_number++;
+//
+// /* Based on request block (or not) for lack of space */
+// if (PREDICT_TRUE(q->cursize < q->maxsize))
+// unix_shared_memory_queue_add (app->event_queue, (u8 *) &evt,
+// 0 /* do wait for mutex */);
+// else
+// {
+// clib_warning("fifo full");
+// return -1;
+// }
+// }
}
/**
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index 96c00d8..a39bc06 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -33,6 +33,7 @@
FIFO_EVENT_SERVER_TX,
FIFO_EVENT_TIMEOUT,
FIFO_EVENT_SERVER_EXIT,
+ FIFO_EVENT_BUILTIN_RX
} fifo_event_type_t;
#define foreach_session_input_error \
@@ -91,14 +92,13 @@
SESSION_STATE_N_STATES,
} stream_session_state_t;
-typedef CLIB_PACKED (struct
- {
- svm_fifo_t * fifo;
- u8 event_type;
- /* $$$$ for event logging */
- u16 event_id;
- u32 enqueue_length;
- }) session_fifo_event_t;
+/* *INDENT-OFF* */
+typedef CLIB_PACKED (struct {
+ svm_fifo_t * fifo;
+ u8 event_type;
+ u16 event_id;
+}) session_fifo_event_t;
+/* *INDENT-ON* */
typedef struct _stream_session_t
{
@@ -333,7 +333,7 @@
}
always_inline u32
-stream_session_max_enqueue (transport_connection_t * tc)
+stream_session_max_rx_enqueue (transport_connection_t * tc)
{
stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
return svm_fifo_max_enqueue (s->server_rx_fifo);
@@ -346,7 +346,6 @@
return s->server_rx_fifo->nitems;
}
-
int
stream_session_enqueue_data (transport_connection_t * tc, u8 * data, u16 len,
u8 queue_event);
diff --git a/src/vnet/session/session_cli.c b/src/vnet/session/session_cli.c
index b029ee6..38762af 100644
--- a/src/vnet/session/session_cli.c
+++ b/src/vnet/session/session_cli.c
@@ -107,7 +107,7 @@
{
if (once_per_pool)
{
- str = format (str, "%-40s%-20s%-20s%-15s",
+ str = format (str, "%-50s%-20s%-20s%-15s",
"Connection", "Rx fifo", "Tx fifo",
"Session Index");
vlib_cli_output (vm, "%v", str);
diff --git a/src/vnet/session/session_debug.h b/src/vnet/session/session_debug.h
index 858f12e..80a97cd 100644
--- a/src/vnet/session/session_debug.h
+++ b/src/vnet/session/session_debug.h
@@ -21,7 +21,8 @@
#define foreach_session_dbg_evt \
_(ENQ, "enqueue") \
- _(DEQ, "dequeue")
+ _(DEQ, "dequeue") \
+ _(DEQ_NODE, "dequeue")
typedef enum _session_evt_dbg
{
@@ -30,7 +31,10 @@
#undef _
} session_evt_dbg_e;
-#if TRANSPORT_DEBUG
+#define SESSION_DBG (0)
+#define SESSION_DEQ_NODE_EVTS (0)
+
+#if TRANSPORT_DEBUG && SESSION_DBG
#define DEC_SESSION_ETD(_s, _e, _size) \
struct \
@@ -44,6 +48,12 @@
ed = ELOG_TRACK_DATA (&vlib_global_main.elog_main, \
_e, _tc->elog_track)
+#define DEC_SESSION_ED(_e, _size) \
+ struct \
+ { \
+ u32 data[_size]; \
+ } * ed; \
+ ed = ELOG_DATA (&vlib_global_main.elog_main, _e)
#define SESSION_EVT_DEQ_HANDLER(_s, _body) \
{ \
@@ -67,13 +77,33 @@
do { _body; } while (0); \
}
+#if SESSION_DEQ_NODE_EVTS
+#define SESSION_EVT_DEQ_NODE_HANDLER(_node_evt) \
+{ \
+ ELOG_TYPE_DECLARE (_e) = \
+ { \
+ .format = "deq-node: %s", \
+ .format_args = "t4", \
+ .n_enum_strings = 2, \
+ .enum_strings = { \
+ "start", \
+ "end", \
+ }, \
+ }; \
+ DEC_SESSION_ED(_e, 1); \
+ ed->data[0] = _node_evt; \
+}
+#else
+#define SESSION_EVT_DEQ_NODE_HANDLER(_node_evt)
+#endif
+
#define CONCAT_HELPER(_a, _b) _a##_b
#define CC(_a, _b) CONCAT_HELPER(_a, _b)
-#define SESSION_EVT_DBG(_s, _evt, _body) CC(_evt, _HANDLER)(_s, _body)
+#define SESSION_EVT_DBG(_evt, _args...) CC(_evt, _HANDLER)(_args)
#else
-#define SESSION_EVT_DBG(_s, _evt, _body)
+#define SESSION_EVT_DBG(_evt, _args...)
#endif
#endif /* SRC_VNET_SESSION_SESSION_DEBUG_H_ */
diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h
index 421121d..2f912cb 100644
--- a/src/vnet/session/transport.h
+++ b/src/vnet/session/transport.h
@@ -38,7 +38,7 @@
u32 thread_index; /**< Worker-thread index */
#if TRANSPORT_DEBUG
- elog_track_t elog_track; /**< Debug purposes */
+ elog_track_t elog_track; /**< Event logging */
#endif
/** Macros for 'derived classes' where base is named "connection" */
diff --git a/src/vnet/tcp/builtin_client.c b/src/vnet/tcp/builtin_client.c
new file mode 100644
index 0000000..a6eeb77
--- /dev/null
+++ b/src/vnet/tcp/builtin_client.c
@@ -0,0 +1,411 @@
+/*
+ * builtin_client.c - vpp built-in tcp client/connect code
+ *
+ * Copyright (c) 2017 by Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <vnet/vnet.h>
+#include <vnet/plugin/plugin.h>
+#include <vnet/tcp/builtin_client.h>
+
+#include <vlibapi/api.h>
+#include <vlibmemory/api.h>
+#include <vlibsocket/api.h>
+#include <vpp/app/version.h>
+
+/* define message IDs */
+#include <vpp/api/vpe_msg_enum.h>
+
+/* define message structures */
+#define vl_typedefs
+#include <vpp/api/vpe_all_api_h.h>
+#undef vl_typedefs
+
+/* define generated endian-swappers */
+#define vl_endianfun
+#include <vpp/api/vpe_all_api_h.h>
+#undef vl_endianfun
+
+/* instantiate all the print functions we know about */
+#define vl_print(handle, ...) vlib_cli_output (handle, __VA_ARGS__)
+#define vl_printfun
+#include <vpp/api/vpe_all_api_h.h>
+#undef vl_printfun
+
+static void
+send_test_chunk (tclient_main_t * tm, session_t * s)
+{
+ u8 *test_data = tm->connect_test_data;
+ int test_buf_offset = 0;
+ u32 bytes_this_chunk;
+ session_fifo_event_t evt;
+ static int serial_number = 0;
+ int rv;
+
+ while (s->bytes_to_send > 0)
+ {
+ bytes_this_chunk = vec_len (test_data) < s->bytes_to_send
+ ? vec_len (test_data) : s->bytes_to_send;
+
+ rv = svm_fifo_enqueue_nowait (s->server_tx_fifo, 0 /*pid */ ,
+ bytes_this_chunk,
+ test_data + test_buf_offset);
+
+ if (rv > 0)
+ {
+ s->bytes_to_send -= rv;
+ test_buf_offset += rv;
+
+ if (svm_fifo_set_event (s->server_tx_fifo))
+ {
+ /* Fabricate TX event, send to vpp */
+ evt.fifo = s->server_tx_fifo;
+ evt.event_type = FIFO_EVENT_SERVER_TX;
+ evt.event_id = serial_number++;
+
+ unix_shared_memory_queue_add (tm->vpp_event_queue, (u8 *) & evt,
+ 0 /* do wait for mutex */ );
+ }
+ }
+ }
+}
+
+static void
+receive_test_chunk (tclient_main_t * tm, session_t * s)
+{
+ svm_fifo_t *rx_fifo = s->server_rx_fifo;
+ int n_read, bytes, i;
+
+ bytes = svm_fifo_max_dequeue (rx_fifo);
+ /* Allow enqueuing of new event */
+ svm_fifo_unset_event (rx_fifo);
+
+ /* Read the bytes */
+ do
+ {
+ n_read = svm_fifo_dequeue_nowait (rx_fifo, 0, vec_len (tm->rx_buf),
+ tm->rx_buf);
+ if (n_read > 0)
+ {
+ bytes -= n_read;
+ for (i = 0; i < n_read; i++)
+ {
+ if (tm->rx_buf[i] != ((s->bytes_received + i) & 0xff))
+ {
+ clib_warning ("read %d error at byte %lld, 0x%x not 0x%x",
+ n_read, s->bytes_received + i,
+ tm->rx_buf[i],
+ ((s->bytes_received + i) & 0xff));
+ }
+ }
+ s->bytes_to_receive -= n_read;
+ s->bytes_received += n_read;
+ }
+
+ }
+ while (n_read < 0 || bytes > 0);
+}
+
+static void *
+tclient_thread_fn (void *arg)
+{
+ tclient_main_t *tm = &tclient_main;
+ vl_api_disconnect_session_t *dmp;
+ session_t *sp;
+ struct timespec ts, tsrem;
+ int i;
+ int try_tx, try_rx;
+ u32 *session_indices = 0;
+
+ /* stats thread wants no signals. */
+ {
+ sigset_t s;
+ sigfillset (&s);
+ pthread_sigmask (SIG_SETMASK, &s, 0);
+ }
+
+ while (1)
+ {
+ /* Wait until we're told to get busy */
+ while (tm->run_test == 0
+ || (tm->ready_connections != tm->expected_connections))
+ {
+ ts.tv_sec = 0;
+ ts.tv_nsec = 100000000;
+ while (nanosleep (&ts, &tsrem) < 0)
+ ts = tsrem;
+ }
+ tm->run_test = 0;
+
+ clib_warning ("Run %d iterations", tm->n_iterations);
+
+ for (i = 0; i < tm->n_iterations; i++)
+ {
+ session_t *sp;
+
+ do
+ {
+ try_tx = try_rx = 0;
+
+ /* *INDENT-OFF* */
+ pool_foreach (sp, tm->sessions, ({
+ if (sp->bytes_to_send > 0)
+ {
+ send_test_chunk (tm, sp);
+ try_tx = 1;
+ }
+ }));
+ pool_foreach (sp, tm->sessions, ({
+ if (sp->bytes_to_receive > 0)
+ {
+ receive_test_chunk (tm, sp);
+ try_rx = 1;
+ }
+ }));
+ /* *INDENT-ON* */
+
+ }
+ while (try_tx || try_rx);
+ }
+ clib_warning ("Done %d iterations", tm->n_iterations);
+
+ /* Disconnect sessions... */
+ vec_reset_length (session_indices);
+ pool_foreach (sp, tm->sessions, (
+ {
+ vec_add1 (session_indices,
+ sp - tm->sessions);
+ }
+ ));
+
+ for (i = 0; i < vec_len (session_indices); i++)
+ {
+ sp = pool_elt_at_index (tm->sessions, session_indices[i]);
+ dmp = vl_msg_api_alloc_as_if_client (sizeof (*dmp));
+ memset (dmp, 0, sizeof (*dmp));
+ dmp->_vl_msg_id = ntohs (VL_API_DISCONNECT_SESSION);
+ dmp->client_index = tm->my_client_index;
+ dmp->session_index = sp->vpp_session_index;
+ dmp->session_thread_index = sp->vpp_session_thread;
+ vl_msg_api_send_shmem (tm->vl_input_queue, (u8 *) & dmp);
+ pool_put (tm->sessions, sp);
+ }
+ }
+ /* NOTREACHED */
+ return 0;
+}
+
+/* So we don't get "no handler for... " msgs */
+static void
+vl_api_memclnt_create_reply_t_handler (vl_api_memclnt_create_reply_t * mp)
+{
+ tclient_main_t *tm = &tclient_main;
+
+ tm->my_client_index = mp->index;
+}
+
+static void
+vl_api_connect_uri_reply_t_handler (vl_api_connect_uri_reply_t * mp)
+{
+ tclient_main_t *tm = &tclient_main;
+ session_t *session;
+ u32 session_index;
+ u64 key;
+ i32 retval = /* clib_net_to_host_u32 ( */ mp->retval /*) */ ;
+
+ if (retval < 0)
+ {
+ clib_warning ("connection failed: retval %d", retval);
+ return;
+ }
+
+ tm->our_event_queue = (unix_shared_memory_queue_t *)
+ mp->vpp_event_queue_address;
+
+ tm->vpp_event_queue = (unix_shared_memory_queue_t *)
+ mp->vpp_event_queue_address;
+
+ /*
+ * Setup session
+ */
+ pool_get (tm->sessions, session);
+ memset (session, 0, sizeof (*session));
+ session_index = session - tm->sessions;
+ session->bytes_to_receive = session->bytes_to_send = tm->bytes_to_send;
+
+ session->server_rx_fifo = (svm_fifo_t *) mp->server_rx_fifo;
+ session->server_rx_fifo->client_session_index = session_index;
+ session->server_tx_fifo = (svm_fifo_t *) mp->server_tx_fifo;
+ session->server_tx_fifo->client_session_index = session_index;
+
+ session->vpp_session_index = mp->session_index;
+ session->vpp_session_thread = mp->session_thread_index;
+
+ /* Add it to the session lookup table */
+ key = (((u64) mp->session_thread_index) << 32) | (u64) mp->session_index;
+ hash_set (tm->session_index_by_vpp_handles, key, session_index);
+
+ tm->ready_connections++;
+}
+
+static void
+create_api_loopback (tclient_main_t * tm)
+{
+ vl_api_memclnt_create_t _m, *mp = &_m;
+ extern void vl_api_memclnt_create_t_handler (vl_api_memclnt_create_t *);
+ api_main_t *am = &api_main;
+ vl_shmem_hdr_t *shmem_hdr;
+
+ /*
+ * Create a "loopback" API client connection
+ * Don't do things like this unless you know what you're doing...
+ */
+
+ shmem_hdr = am->shmem_hdr;
+ tm->vl_input_queue = shmem_hdr->vl_input_queue;
+ memset (mp, 0, sizeof (*mp));
+ mp->_vl_msg_id = VL_API_MEMCLNT_CREATE;
+ mp->context = 0xFEEDFACE;
+ mp->input_queue = (u64) tm->vl_input_queue;
+ strncpy ((char *) mp->name, "tcp_tester", sizeof (mp->name) - 1);
+
+ vl_api_memclnt_create_t_handler (mp);
+}
+
+#define foreach_tclient_static_api_msg \
+_(MEMCLNT_CREATE_REPLY, memclnt_create_reply) \
+_(CONNECT_URI_REPLY, connect_uri_reply)
+
+static clib_error_t *
+tclient_api_hookup (vlib_main_t * vm)
+{
+ tclient_main_t *tm = &tclient_main;
+ vl_msg_api_msg_config_t _c, *c = &_c;
+ int i;
+
+ /* Init test data */
+ vec_validate (tm->connect_test_data, 64 * 1024 - 1);
+ for (i = 0; i < vec_len (tm->connect_test_data); i++)
+ tm->connect_test_data[i] = i & 0xff;
+
+ tm->session_index_by_vpp_handles = hash_create (0, sizeof (uword));
+ vec_validate (tm->rx_buf, vec_len (tm->connect_test_data) - 1);
+
+ /* Hook up client-side static APIs to our handlers */
+#define _(N,n) do { \
+ c->id = VL_API_##N; \
+ c->name = #n; \
+ c->handler = vl_api_##n##_t_handler; \
+ c->cleanup = vl_noop_handler; \
+ c->endian = vl_api_##n##_t_endian; \
+ c->print = vl_api_##n##_t_print; \
+ c->size = sizeof(vl_api_##n##_t); \
+ c->traced = 1; /* trace, so these msgs print */ \
+ c->replay = 0; /* don't replay client create/delete msgs */ \
+ c->message_bounce = 0; /* don't bounce this message */ \
+ vl_msg_api_config(c);} while (0);
+
+ foreach_tclient_static_api_msg;
+#undef _
+
+ return 0;
+}
+
+VLIB_API_INIT_FUNCTION (tclient_api_hookup);
+
+static clib_error_t *
+test_tcp_clients_command_fn (vlib_main_t * vm,
+ unformat_input_t * input,
+ vlib_cli_command_t * cmd)
+{
+ u8 *connect_uri = (u8 *) "tcp://6.0.1.2/1234";
+ u8 *uri;
+ tclient_main_t *tm = &tclient_main;
+ int i;
+ u32 n_clients = 1;
+
+ tm->bytes_to_send = 8192;
+ tm->n_iterations = 1;
+ vec_free (tm->connect_uri);
+
+ while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
+ {
+ if (unformat (input, "nclients %d", &n_clients))
+ ;
+ else if (unformat (input, "iterations %d", &tm->n_iterations))
+ ;
+ else if (unformat (input, "bytes %d", &tm->bytes_to_send))
+ ;
+ else if (unformat (input, "uri %s", &tm->connect_uri))
+ ;
+ else
+ return clib_error_return (0, "unknown input `%U'",
+ format_unformat_error, input);
+ }
+
+ tm->ready_connections = 0;
+ tm->expected_connections = n_clients;
+ uri = connect_uri;
+ if (tm->connect_uri)
+ uri = tm->connect_uri;
+
+ create_api_loopback (tm);
+
+ /* Start a transmit thread */
+ if (tm->client_thread_handle == 0)
+ {
+ int rv = pthread_create (&tm->client_thread_handle,
+ NULL /*attr */ , tclient_thread_fn, 0);
+ if (rv)
+ {
+ tm->client_thread_handle = 0;
+ return clib_error_return (0, "pthread_create returned %d", rv);
+ }
+ }
+
+ /* Fire off connect requests, in something approaching a normal manner */
+ for (i = 0; i < n_clients; i++)
+ {
+ vl_api_connect_uri_t *cmp;
+ cmp = vl_msg_api_alloc_as_if_client (sizeof (*cmp));
+ memset (cmp, 0, sizeof (*cmp));
+
+ cmp->_vl_msg_id = ntohs (VL_API_CONNECT_URI);
+ cmp->client_index = tm->my_client_index;
+ cmp->context = ntohl (0xfeedface);
+ memcpy (cmp->uri, uri, strlen ((char *) uri) + 1);
+ vl_msg_api_send_shmem (tm->vl_input_queue, (u8 *) & cmp);
+ }
+
+ tm->run_test = 1;
+
+ return 0;
+}
+
+/* *INDENT-OFF* */
+VLIB_CLI_COMMAND (test_clients_command, static) =
+{
+ .path = "test tcp clients",
+ .short_help = "test tcp clients",
+ .function = test_tcp_clients_command_fn,
+};
+/* *INDENT-ON* */
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/vnet/tcp/builtin_client.h b/src/vnet/tcp/builtin_client.h
new file mode 100644
index 0000000..6403030
--- /dev/null
+++ b/src/vnet/tcp/builtin_client.h
@@ -0,0 +1,131 @@
+
+/*
+ * tclient.h - skeleton vpp engine plug-in header file
+ *
+ * Copyright (c) <current-year> <your-organization>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __included_tclient_h__
+#define __included_tclient_h__
+
+#include <vnet/vnet.h>
+#include <vnet/ip/ip.h>
+#include <vnet/ethernet/ethernet.h>
+
+#include <vppinfra/hash.h>
+#include <vppinfra/error.h>
+#include <vlibmemory/unix_shared_memory_queue.h>
+#include <svm/svm_fifo_segment.h>
+#include <vnet/session/session.h>
+#include <vnet/session/application_interface.h>
+
+typedef struct
+{
+ u32 bytes_to_send;
+ u32 bytes_sent;
+ u32 bytes_to_receive;
+ u32 bytes_received;
+
+ svm_fifo_t *server_rx_fifo;
+ svm_fifo_t *server_tx_fifo;
+
+ u32 vpp_session_index;
+ u32 vpp_session_thread;
+} session_t;
+
+typedef struct
+{
+ /* API message ID base */
+ u16 msg_id_base;
+
+ /* vpe input queue */
+ unix_shared_memory_queue_t *vl_input_queue;
+
+ /* API client handle */
+ u32 my_client_index;
+
+ /* The URI we're playing with */
+ u8 *uri;
+
+ /* Session pool */
+ session_t *sessions;
+
+ /* Hash table for disconnect processing */
+ uword *session_index_by_vpp_handles;
+
+ /* intermediate rx buffer */
+ u8 *rx_buf;
+
+ /* URI for slave's connect */
+ u8 *connect_uri;
+
+ u32 connected_session_index;
+
+ int i_am_master;
+
+ /* drop all packets */
+ int drop_packets;
+
+ /* Our event queue */
+ unix_shared_memory_queue_t *our_event_queue;
+
+ /* $$$ single thread only for the moment */
+ unix_shared_memory_queue_t *vpp_event_queue;
+
+ pid_t my_pid;
+
+ /* For deadman timers */
+ clib_time_t clib_time;
+
+ /* Connection counts */
+ u32 expected_connections;
+ volatile u32 ready_connections;
+
+ /* Signal variables */
+ volatile int run_test;
+
+ /* Number of iterations */
+ int n_iterations;
+
+ /* Bytes to send */
+ u32 bytes_to_send;
+
+ u32 configured_segment_size;
+
+ /* VNET_API_ERROR_FOO -> "Foo" hash table */
+ uword *error_string_by_error_number;
+
+ u8 *connect_test_data;
+ pthread_t client_thread_handle;
+ u32 client_bytes_received;
+ u8 test_return_packets;
+
+ /* convenience */
+ vlib_main_t *vlib_main;
+ vnet_main_t *vnet_main;
+ ethernet_main_t *ethernet_main;
+} tclient_main_t;
+
+tclient_main_t tclient_main;
+
+vlib_node_registration_t tclient_node;
+
+#endif /* __included_tclient_h__ */
+
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/vnet/tcp/builtin_server.c b/src/vnet/tcp/builtin_server.c
index dd6759c..efd26e9 100644
--- a/src/vnet/tcp/builtin_server.c
+++ b/src/vnet/tcp/builtin_server.c
@@ -22,6 +22,7 @@
{
u8 *rx_buf;
unix_shared_memory_queue_t **vpp_queue;
+ u32 byte_index;
vlib_main_t *vlib_main;
} builtin_server_main_t;
@@ -37,6 +38,7 @@
bsm->vpp_queue[s->thread_index] =
session_manager_get_vpp_event_queue (s->thread_index);
s->session_state = SESSION_STATE_READY;
+ bsm->byte_index = 0;
return 0;
}
@@ -80,57 +82,94 @@
return -1;
}
-int
-builtin_server_rx_callback (stream_session_t * s, session_fifo_event_t * e)
+void
+test_bytes (builtin_server_main_t * bsm, int actual_transfer)
{
- int n_written, bytes, total_copy_bytes;
- int n_read;
- svm_fifo_t *tx_fifo;
+ int i;
+
+ for (i = 0; i < actual_transfer; i++)
+ {
+ if (bsm->rx_buf[i] != ((bsm->byte_index + i) & 0xff))
+ {
+ clib_warning ("at %d expected %d got %d", bsm->byte_index + i,
+ (bsm->byte_index + i) & 0xff, bsm->rx_buf[i]);
+ }
+ }
+ bsm->byte_index += actual_transfer;
+}
+
+int
+builtin_server_rx_callback (stream_session_t * s)
+{
+ u32 n_written, max_dequeue, max_enqueue, max_transfer;
+ int actual_transfer;
+ svm_fifo_t *tx_fifo, *rx_fifo;
builtin_server_main_t *bsm = &builtin_server_main;
session_fifo_event_t evt;
static int serial_number = 0;
- bytes = e->enqueue_length;
- if (PREDICT_FALSE (bytes <= 0))
+ max_dequeue = svm_fifo_max_dequeue (s->server_rx_fifo);
+ max_enqueue = svm_fifo_max_enqueue (s->server_tx_fifo);
+
+ if (PREDICT_FALSE (max_dequeue == 0))
{
- clib_warning ("bizarre rx callback: bytes %d", bytes);
return 0;
}
tx_fifo = s->server_tx_fifo;
+ rx_fifo = s->server_rx_fifo;
/* Number of bytes we're going to copy */
- total_copy_bytes = (bytes < (tx_fifo->nitems - tx_fifo->cursize)) ? bytes :
- tx_fifo->nitems - tx_fifo->cursize;
+ max_transfer = (max_dequeue < max_enqueue) ? max_dequeue : max_enqueue;
- if (PREDICT_FALSE (total_copy_bytes <= 0))
+ /* No space in tx fifo */
+ if (PREDICT_FALSE (max_transfer == 0))
{
- clib_warning ("no space in tx fifo, event had %d bytes", bytes);
+ /* XXX timeout for session that are stuck */
+
+ /* Program self-tap to retry */
+ if (svm_fifo_set_event (rx_fifo))
+ {
+ evt.fifo = rx_fifo;
+ evt.event_type = FIFO_EVENT_BUILTIN_RX;
+ evt.event_id = 0;
+ unix_shared_memory_queue_add (bsm->vpp_queue[s->thread_index],
+ (u8 *) & evt,
+ 0 /* do wait for mutex */ );
+ }
+
return 0;
}
- vec_validate (bsm->rx_buf, total_copy_bytes - 1);
- _vec_len (bsm->rx_buf) = total_copy_bytes;
+ svm_fifo_unset_event (rx_fifo);
- n_read = svm_fifo_dequeue_nowait (s->server_rx_fifo, 0, total_copy_bytes,
- bsm->rx_buf);
- ASSERT (n_read == total_copy_bytes);
+ vec_validate (bsm->rx_buf, max_transfer - 1);
+ _vec_len (bsm->rx_buf) = max_transfer;
+
+ actual_transfer = svm_fifo_dequeue_nowait (rx_fifo, 0, max_transfer,
+ bsm->rx_buf);
+ ASSERT (actual_transfer == max_transfer);
+
+// test_bytes (bsm, actual_transfer);
/*
* Echo back
*/
- n_written = svm_fifo_enqueue_nowait (tx_fifo, 0, n_read, bsm->rx_buf);
- ASSERT (n_written == total_copy_bytes);
+ n_written =
+ svm_fifo_enqueue_nowait (tx_fifo, 0, actual_transfer, bsm->rx_buf);
+ ASSERT (n_written == max_transfer);
- /* Fabricate TX event, send to vpp */
- evt.fifo = tx_fifo;
- evt.event_type = FIFO_EVENT_SERVER_TX;
- evt.enqueue_length = total_copy_bytes;
- evt.event_id = serial_number++;
+ if (svm_fifo_set_event (tx_fifo))
+ {
+ /* Fabricate TX event, send to vpp */
+ evt.fifo = tx_fifo;
+ evt.event_type = FIFO_EVENT_SERVER_TX;
+ evt.event_id = serial_number++;
- unix_shared_memory_queue_add (bsm->vpp_queue[s->thread_index], (u8 *) & evt,
- 0 /* do wait for mutex */ );
+ unix_shared_memory_queue_add (bsm->vpp_queue[s->thread_index],
+ (u8 *) & evt, 0 /* do wait for mutex */ );
+ }
return 0;
}
@@ -164,7 +203,7 @@
a->api_client_index = ~0;
a->session_cb_vft = &builtin_session_cb_vft;
a->options = options;
- a->options[SESSION_OPTIONS_SEGMENT_SIZE] = 256 << 10;
+ a->options[SESSION_OPTIONS_SEGMENT_SIZE] = 128 << 20;
a->options[SESSION_OPTIONS_RX_FIFO_SIZE] = 64 << 10;
a->options[SESSION_OPTIONS_TX_FIFO_SIZE] = 64 << 10;
a->segment_name = segment_name;
diff --git a/src/vnet/tcp/tcp.c b/src/vnet/tcp/tcp.c
index 0d2e6d0..c3df5bc 100644
--- a/src/vnet/tcp/tcp.c
+++ b/src/vnet/tcp/tcp.c
@@ -328,7 +328,7 @@
{
tcp_connection_timers_init (tc);
tcp_set_snd_mss (tc);
- tc->sack_sb.head = TCP_INVALID_SACK_HOLE_INDEX;
+ scoreboard_init (&tc->sack_sb);
tcp_cc_init (tc);
}
@@ -558,17 +558,48 @@
return tc->snd_mss;
}
+/**
+ * Compute tx window session is allowed to fill.
+ */
u32
tcp_session_send_space (transport_connection_t * trans_conn)
{
+ u32 snd_space;
tcp_connection_t *tc = (tcp_connection_t *) trans_conn;
- return tcp_available_snd_space (tc);
+
+ /* If we haven't gotten dupacks or if we did and have gotten sacked bytes
+ * then we can still send */
+ if (PREDICT_TRUE (tcp_in_fastrecovery (tc) == 0
+ && (tc->rcv_dupacks == 0
+ || tc->sack_sb.last_sacked_bytes)))
+ {
+ snd_space = tcp_available_snd_space (tc);
+
+ /* If we can't write at least a segment, don't try at all */
+ if (snd_space < tc->snd_mss)
+ return 0;
+ return snd_space;
+ }
+
+ /* If in fast recovery, send 1 SMSS if wnd allows */
+ if (tcp_in_fastrecovery (tc) && tcp_available_snd_space (tc)
+ && tcp_fastrecovery_sent_1_smss (tc))
+ {
+ tcp_fastrecovery_1_smss_on (tc);
+ return tc->snd_mss;
+ }
+
+ return 0;
}
u32
tcp_session_tx_fifo_offset (transport_connection_t * trans_conn)
{
tcp_connection_t *tc = (tcp_connection_t *) trans_conn;
+
+ ASSERT (seq_geq (tc->snd_nxt, tc->snd_una));
+
+ /* This still works if fast retransmit is on */
return (tc->snd_nxt - tc->snd_una);
}
@@ -762,7 +793,7 @@
vec_validate (tm->timer_wheels, num_threads - 1);
tcp_initialize_timer_wheels (tm);
- vec_validate (tm->delack_connections, num_threads - 1);
+// vec_validate (tm->delack_connections, num_threads - 1);
/* Initialize clocks per tick for TCP timestamp. Used to compute
* monotonically increasing timestamps. */
diff --git a/src/vnet/tcp/tcp.h b/src/vnet/tcp/tcp.h
index 082ab1d..b4286bc 100644
--- a/src/vnet/tcp/tcp.h
+++ b/src/vnet/tcp/tcp.h
@@ -30,9 +30,10 @@
#define TCP_PAWS_IDLE 24 * 24 * 60 * 60 * THZ /**< 24 days */
#define TCP_MAX_OPTION_SPACE 40
-#define TCP_DUPACK_THRESHOLD 3
-#define TCP_MAX_RX_FIFO_SIZE 2 << 20
-#define TCP_IW_N_SEGMENTS 10
+#define TCP_DUPACK_THRESHOLD 3
+#define TCP_MAX_RX_FIFO_SIZE 2 << 20
+#define TCP_IW_N_SEGMENTS 10
+#define TCP_ALWAYS_ACK 0 /**< If on, we always ack */
/** TCP FSM state definitions as per RFC793. */
#define foreach_tcp_fsm_state \
@@ -102,13 +103,12 @@
/** TCP connection flags */
#define foreach_tcp_connection_flag \
- _(DELACK, "Delay ACK") \
_(SNDACK, "Send ACK") \
- _(BURSTACK, "Burst ACK set") \
_(FINSNT, "FIN sent") \
_(SENT_RCV_WND0, "Sent 0 receive window") \
_(RECOVERY, "Recovery on") \
- _(FAST_RECOVERY, "Fast Recovery on")
+ _(FAST_RECOVERY, "Fast Recovery on") \
+ _(FR_1_SMSS, "Sent 1 SMSS")
typedef enum _tcp_connection_flag_bits
{
@@ -160,8 +160,12 @@
typedef struct _sack_scoreboard
{
sack_scoreboard_hole_t *holes; /**< Pool of holes */
- u32 head; /**< Index to first entry */
+ u32 head; /**< Index of first entry */
+ u32 tail; /**< Index of last entry */
u32 sacked_bytes; /**< Number of bytes sacked in sb */
+ u32 last_sacked_bytes; /**< Number of bytes last sacked */
+ u32 snd_una_adv; /**< Bytes to add to snd_una */
+ u32 max_byte_sacked; /**< Highest byte acked */
} sack_scoreboard_t;
typedef enum _tcp_cc_algorithm_type
@@ -214,7 +218,7 @@
sack_block_t *snd_sacks; /**< Vector of SACKs to send. XXX Fixed size? */
sack_scoreboard_t sack_sb; /**< SACK "scoreboard" that tracks holes */
- u8 rcv_dupacks; /**< Number of DUPACKs received */
+ u16 rcv_dupacks; /**< Number of DUPACKs received */
u8 snt_dupacks; /**< Number of DUPACKs sent in a burst */
/* Congestion control */
@@ -224,6 +228,7 @@
u32 bytes_acked; /**< Bytes acknowledged by current segment */
u32 rtx_bytes; /**< Retransmitted bytes */
u32 tsecr_last_ack; /**< Timestamp echoed to us in last healthy ACK */
+ u32 snd_congestion; /**< snd_una_max when congestion is detected */
tcp_cc_algorithm_t *cc_algo; /**< Congestion control algorithm */
/* RTT and RTO */
@@ -250,8 +255,10 @@
#define tcp_fastrecovery_off(tc) (tc)->flags &= ~TCP_CONN_FAST_RECOVERY
#define tcp_in_fastrecovery(tc) ((tc)->flags & TCP_CONN_FAST_RECOVERY)
#define tcp_in_recovery(tc) ((tc)->flags & (TCP_CONN_FAST_RECOVERY | TCP_CONN_RECOVERY))
-#define tcp_recovery_off(tc) ((tc)->flags &= ~(TCP_CONN_FAST_RECOVERY | TCP_CONN_RECOVERY))
#define tcp_in_slowstart(tc) (tc->cwnd < tc->ssthresh)
+#define tcp_fastrecovery_sent_1_smss(tc) ((tc)->flags & TCP_CONN_FR_1_SMSS)
+#define tcp_fastrecovery_1_smss_on(tc) ((tc)->flags |= TCP_CONN_FR_1_SMSS)
+#define tcp_fastrecovery_1_smss_off(tc) ((tc)->flags &= ~TCP_CONN_FR_1_SMSS)
typedef enum
{
@@ -293,8 +300,8 @@
/* Per worker-thread timer wheel for connections timers */
tw_timer_wheel_16t_2w_512sl_t *timer_wheels;
- /* Convenience per worker-thread vector of connections to DELACK */
- u32 **delack_connections;
+// /* Convenience per worker-thread vector of connections to DELACK */
+// u32 **delack_connections;
/* Pool of half-open connections on which we've sent a SYN */
tcp_connection_t *half_open_connections;
@@ -397,8 +404,16 @@
always_inline u32
tcp_flight_size (const tcp_connection_t * tc)
{
- return tc->snd_una_max - tc->snd_una - tc->sack_sb.sacked_bytes
- + tc->rtx_bytes;
+ int flight_size;
+
+ flight_size = (int) ((tc->snd_una_max - tc->snd_una) + tc->rtx_bytes)
+ - (tc->rcv_dupacks * tc->snd_mss) /* - tc->sack_sb.sacked_bytes */ ;
+
+ /* Happens if we don't clear sacked bytes */
+ if (flight_size < 0)
+ return 0;
+
+ return flight_size;
}
/**
@@ -439,9 +454,13 @@
return available_wnd - flight_size;
}
+void tcp_update_rcv_wnd (tcp_connection_t * tc);
+
void tcp_retransmit_first_unacked (tcp_connection_t * tc);
void tcp_fast_retransmit (tcp_connection_t * tc);
+void tcp_cc_congestion (tcp_connection_t * tc);
+void tcp_cc_recover (tcp_connection_t * tc);
always_inline u32
tcp_time_now (void)
@@ -453,7 +472,7 @@
u32
tcp_prepare_retransmit_segment (tcp_connection_t * tc, vlib_buffer_t * b,
- u32 max_bytes);
+ u32 offset, u32 max_bytes);
void tcp_connection_timers_init (tcp_connection_t * tc);
void tcp_connection_timers_reset (tcp_connection_t * tc);
@@ -477,14 +496,6 @@
}
always_inline void
-tcp_retransmit_timer_set (tcp_connection_t * tc)
-{
- /* XXX Switch to faster TW */
- tcp_timer_set (tc, TCP_TIMER_RETRANSMIT,
- clib_max (tc->rto * TCP_TO_TIMER_TICK, 1));
-}
-
-always_inline void
tcp_timer_reset (tcp_connection_t * tc, u8 timer_id)
{
if (tc->timers[timer_id] == TCP_TIMER_HANDLE_INVALID)
@@ -506,6 +517,27 @@
tc->c_c_index, timer_id, interval);
}
+/* XXX Switch retransmit to faster TW */
+always_inline void
+tcp_retransmit_timer_set (tcp_connection_t * tc)
+{
+ tcp_timer_set (tc, TCP_TIMER_RETRANSMIT,
+ clib_max (tc->rto * TCP_TO_TIMER_TICK, 1));
+}
+
+always_inline void
+tcp_retransmit_timer_update (tcp_connection_t * tc)
+{
+ tcp_timer_update (tc, TCP_TIMER_RETRANSMIT,
+ clib_max (tc->rto * TCP_TO_TIMER_TICK, 1));
+}
+
+always_inline void
+tcp_retransmit_timer_reset (tcp_connection_t * tc)
+{
+ tcp_timer_reset (tc, TCP_TIMER_RETRANSMIT);
+}
+
always_inline u8
tcp_timer_is_active (tcp_connection_t * tc, tcp_timers_e timer)
{
@@ -517,6 +549,14 @@
sack_scoreboard_hole_t * hole);
always_inline sack_scoreboard_hole_t *
+scoreboard_get_hole (sack_scoreboard_t * sb, u32 index)
+{
+ if (index != TCP_INVALID_SACK_HOLE_INDEX)
+ return pool_elt_at_index (sb->holes, index);
+ return 0;
+}
+
+always_inline sack_scoreboard_hole_t *
scoreboard_next_hole (sack_scoreboard_t * sb, sack_scoreboard_hole_t * hole)
{
if (hole->next != TCP_INVALID_SACK_HOLE_INDEX)
@@ -532,6 +572,14 @@
return 0;
}
+always_inline sack_scoreboard_hole_t *
+scoreboard_last_hole (sack_scoreboard_t * sb)
+{
+ if (sb->tail != TCP_INVALID_SACK_HOLE_INDEX)
+ return pool_elt_at_index (sb->holes, sb->tail);
+ return 0;
+}
+
always_inline void
scoreboard_clear (sack_scoreboard_t * sb)
{
@@ -540,6 +588,10 @@
{
scoreboard_remove_hole (sb, hole);
}
+ sb->sacked_bytes = 0;
+ sb->last_sacked_bytes = 0;
+ sb->snd_una_adv = 0;
+ sb->max_byte_sacked = 0;
}
always_inline u32
@@ -548,6 +600,21 @@
return hole->end - hole->start;
}
+always_inline u32
+scoreboard_hole_index (sack_scoreboard_t * sb, sack_scoreboard_hole_t * hole)
+{
+ return hole - sb->holes;
+}
+
+always_inline void
+scoreboard_init (sack_scoreboard_t * sb)
+{
+ sb->head = TCP_INVALID_SACK_HOLE_INDEX;
+ sb->tail = TCP_INVALID_SACK_HOLE_INDEX;
+}
+
+void tcp_rcv_sacks (tcp_connection_t * tc, u32 ack);
+
always_inline void
tcp_cc_algo_register (tcp_cc_algorithm_type_e type,
const tcp_cc_algorithm_t * vft)
diff --git a/src/vnet/tcp/tcp_debug.h b/src/vnet/tcp/tcp_debug.h
index 069c512..5a71694 100644
--- a/src/vnet/tcp/tcp_debug.h
+++ b/src/vnet/tcp/tcp_debug.h
@@ -19,6 +19,8 @@
#include <vlib/vlib.h>
#define TCP_DEBUG (1)
+#define TCP_DEBUG_CC (1)
+#define TCP_DEBUG_VERBOSE (0)
#define foreach_tcp_dbg_evt \
_(INIT, "") \
@@ -30,14 +32,24 @@
_(DELETE, "delete") \
_(SYN_SENT, "SYN sent") \
_(FIN_SENT, "FIN sent") \
+ _(ACK_SENT, "ACK sent") \
+ _(DUPACK_SENT, "DUPACK sent") \
_(RST_SENT, "RST sent") \
_(SYN_RCVD, "SYN rcvd") \
_(ACK_RCVD, "ACK rcvd") \
+ _(DUPACK_RCVD, "DUPACK rcvd") \
_(FIN_RCVD, "FIN rcvd") \
_(RST_RCVD, "RST rcvd") \
_(PKTIZE, "packetize") \
_(INPUT, "in") \
- _(TIMER_POP, "timer pop")
+ _(SND_WND, "snd_wnd update") \
+ _(OUTPUT, "output") \
+ _(TIMER_POP, "timer pop") \
+ _(CC_RTX, "retransmit") \
+ _(CC_EVT, "cc event") \
+ _(CC_PACK, "cc partial ack") \
+ _(SEG_INVALID, "invalid segment") \
+ _(ACK_RCV_ERR, "invalid ack") \
typedef enum _tcp_dbg
{
@@ -73,10 +85,10 @@
ed = ELOG_TRACK_DATA (&vlib_global_main.elog_main, \
_e, _tc->c_elog_track)
-#define TCP_EVT_INIT_HANDLER(_tc, ...) \
+#define TCP_EVT_INIT_HANDLER(_tc, _fmt, ...) \
{ \
_tc->c_elog_track.name = \
- (char *) format (0, "%d%c", _tc->c_c_index, 0); \
+ (char *) format (0, _fmt, _tc->c_c_index, 0); \
elog_track_register (&vlib_global_main.elog_main, &_tc->c_elog_track);\
}
@@ -87,7 +99,7 @@
#define TCP_EVT_OPEN_HANDLER(_tc, ...) \
{ \
- TCP_EVT_INIT_HANDLER(_tc); \
+ TCP_EVT_INIT_HANDLER(_tc, "s%d%c"); \
ELOG_TYPE_DECLARE (_e) = \
{ \
.format = "open: index %d", \
@@ -110,7 +122,7 @@
#define TCP_EVT_BIND_HANDLER(_tc, ...) \
{ \
- TCP_EVT_INIT_HANDLER(_tc); \
+ TCP_EVT_INIT_HANDLER(_tc, "l%d%c"); \
ELOG_TYPE_DECLARE (_e) = \
{ \
.format = "bind: listener %d", \
@@ -138,16 +150,44 @@
.format = "delete: %d", \
.format_args = "i4", \
}; \
- DECLARE_ETD(_tc, _e, 0); \
+ DECLARE_ETD(_tc, _e, 1); \
ed->data[0] = _tc->c_c_index; \
TCP_EVT_DEALLOC_HANDLER(_tc); \
}
+#define TCP_EVT_ACK_SENT_HANDLER(_tc, ...) \
+{ \
+ ELOG_TYPE_DECLARE (_e) = \
+ { \
+ .format = "ack_prep: acked %u rcv_nxt %u rcv_wnd %u snd_nxt %u", \
+ .format_args = "i4i4i4i4", \
+ }; \
+ DECLARE_ETD(_tc, _e, 4); \
+ ed->data[0] = _tc->rcv_nxt - _tc->rcv_las; \
+ ed->data[1] = _tc->rcv_nxt - _tc->irs; \
+ ed->data[2] = _tc->rcv_wnd; \
+ ed->data[3] = _tc->snd_nxt - _tc->iss; \
+}
+
+#define TCP_EVT_DUPACK_SENT_HANDLER(_tc, ...) \
+{ \
+ ELOG_TYPE_DECLARE (_e) = \
+ { \
+ .format = "dack_tx: rcv_nxt %u rcv_wnd %u snd_nxt %u av-wnd %u", \
+ .format_args = "i4i4i4i4", \
+ }; \
+ DECLARE_ETD(_tc, _e, 4); \
+ ed->data[0] = _tc->rcv_nxt - _tc->irs; \
+ ed->data[1] = _tc->rcv_wnd; \
+ ed->data[2] = _tc->snd_nxt - _tc->iss; \
+ ed->data[3] = tcp_available_wnd(_tc); \
+}
+
#define TCP_EVT_SYN_SENT_HANDLER(_tc, ...) \
{ \
ELOG_TYPE_DECLARE (_e) = \
{ \
- .format = "SYN: iss %d", \
+ .format = "SYNtx: iss %u", \
.format_args = "i4", \
}; \
DECLARE_ETD(_tc, _e, 1); \
@@ -158,7 +198,7 @@
{ \
ELOG_TYPE_DECLARE (_e) = \
{ \
- .format = "FIN: snd_nxt %d rcv_nxt %d", \
+ .format = "FINtx: snd_nxt %d rcv_nxt %d", \
.format_args = "i4i4", \
}; \
DECLARE_ETD(_tc, _e, 2); \
@@ -170,7 +210,7 @@
{ \
ELOG_TYPE_DECLARE (_e) = \
{ \
- .format = "RST: snd_nxt %d rcv_nxt %d", \
+ .format = "RSTtx: snd_nxt %d rcv_nxt %d", \
.format_args = "i4i4", \
}; \
DECLARE_ETD(_tc, _e, 2); \
@@ -180,10 +220,10 @@
#define TCP_EVT_SYN_RCVD_HANDLER(_tc, ...) \
{ \
- TCP_EVT_INIT_HANDLER(_tc); \
+ TCP_EVT_INIT_HANDLER(_tc, "s%d%c"); \
ELOG_TYPE_DECLARE (_e) = \
{ \
- .format = "SYN rcvd: irs %d", \
+ .format = "SYNrx: irs %u", \
.format_args = "i4", \
}; \
DECLARE_ETD(_tc, _e, 1); \
@@ -194,7 +234,7 @@
{ \
ELOG_TYPE_DECLARE (_e) = \
{ \
- .format = "FIN rcvd: snd_nxt %d rcv_nxt %d", \
+ .format = "FINrx: snd_nxt %d rcv_nxt %d", \
.format_args = "i4i4", \
}; \
DECLARE_ETD(_tc, _e, 2); \
@@ -206,7 +246,7 @@
{ \
ELOG_TYPE_DECLARE (_e) = \
{ \
- .format = "RST rcvd: snd_nxt %d rcv_nxt %d", \
+ .format = "RSTrx: snd_nxt %d rcv_nxt %d", \
.format_args = "i4i4", \
}; \
DECLARE_ETD(_tc, _e, 2); \
@@ -214,54 +254,68 @@
ed->data[1] = _tc->rcv_nxt - _tc->irs; \
}
-#define TCP_EVT_ACK_RCVD_HANDLER(_tc, ...) \
+#define TCP_EVT_ACK_RCVD_HANDLER(_tc, _ack, ...) \
{ \
ELOG_TYPE_DECLARE (_e) = \
{ \
- .format = "ACK: acked %u cwnd %u inflight %u", \
- .format_args = "i4i4i4", \
+ .format = "acked: %u snd_una %u ack %u cwnd %u inflight %u", \
+ .format_args = "i4i4i4i4i4", \
}; \
- DECLARE_ETD(_tc, _e, 3); \
+ DECLARE_ETD(_tc, _e, 5); \
ed->data[0] = _tc->bytes_acked; \
+ ed->data[1] = _tc->snd_una - _tc->iss; \
+ ed->data[2] = _ack - _tc->iss; \
+ ed->data[3] = _tc->cwnd; \
+ ed->data[4] = tcp_flight_size(_tc); \
+}
+
+#define TCP_EVT_DUPACK_RCVD_HANDLER(_tc, ...) \
+{ \
+ ELOG_TYPE_DECLARE (_e) = \
+ { \
+ .format = "dack_rx: snd_una %u cwnd %u snd_wnd %u inflight %u", \
+ .format_args = "i4i4i4i4", \
+ }; \
+ DECLARE_ETD(_tc, _e, 4); \
+ ed->data[0] = _tc->snd_una - _tc->iss; \
ed->data[1] = _tc->cwnd; \
- ed->data[2] = tcp_flight_size(_tc); \
+ ed->data[2] = _tc->snd_wnd; \
+ ed->data[3] = tcp_flight_size(_tc); \
}
#define TCP_EVT_PKTIZE_HANDLER(_tc, ...) \
{ \
ELOG_TYPE_DECLARE (_e) = \
{ \
- .format = "pktize: snd_una %u snd_nxt %u una_max %u", \
- .format_args = "i4i4i4", \
+ .format = "pktize: una %u snd_nxt %u space %u flight %u rcv_wnd %u",\
+ .format_args = "i4i4i4i4i4", \
}; \
- DECLARE_ETD(_tc, _e, 3); \
+ DECLARE_ETD(_tc, _e, 5); \
ed->data[0] = _tc->snd_una - _tc->iss; \
ed->data[1] = _tc->snd_nxt - _tc->iss; \
- ed->data[2] = _tc->snd_una_max - _tc->iss; \
+ ed->data[2] = tcp_available_snd_space (_tc); \
+ ed->data[3] = tcp_flight_size (_tc); \
+ ed->data[4] = _tc->rcv_wnd; \
}
-#define TCP_EVT_OUTPUT_HANDLER(_tc, flags, n_bytes,...) \
+#define TCP_EVT_INPUT_HANDLER(_tc, _type, _len, _written, ...) \
{ \
ELOG_TYPE_DECLARE (_e) = \
{ \
- .format = "out: flags %x, bytes %u", \
- .format_args = "i4i4", \
+ .format = "in: %s len %u written %d rcv_nxt %u free wnd %d", \
+ .format_args = "t4i4i4i4i4", \
+ .n_enum_strings = 2, \
+ .enum_strings = { \
+ "order", \
+ "ooo", \
+ }, \
}; \
- DECLARE_ETD(_tc, _e, 2); \
- ed->data[0] = flags; \
- ed->data[1] = n_bytes; \
-}
-
-#define TCP_EVT_INPUT_HANDLER(_tc, n_bytes, ...) \
-{ \
- ELOG_TYPE_DECLARE (_e) = \
- { \
- .format = "in: bytes %u rcv_nxt %u", \
- .format_args = "i4i4", \
- }; \
- DECLARE_ETD(_tc, _e, 2); \
- ed->data[0] = n_bytes; \
- ed->data[1] = _tc->rcv_nxt - _tc->irs; \
+ DECLARE_ETD(_tc, _e, 5); \
+ ed->data[0] = _type; \
+ ed->data[1] = _len; \
+ ed->data[2] = _written; \
+ ed->data[3] = (_tc->rcv_nxt - _tc->irs) + _written; \
+ ed->data[4] = _tc->rcv_wnd - (_tc->rcv_nxt - _tc->rcv_las); \
}
#define TCP_EVT_TIMER_POP_HANDLER(_tc_index, _timer_id, ...) \
@@ -296,9 +350,131 @@
ed->data[1] = _timer_id; \
}
+#define TCP_EVT_SEG_INVALID_HANDLER(_tc, _seq, _end, ...) \
+{ \
+ ELOG_TYPE_DECLARE (_e) = \
+ { \
+ .format = "seg-inv: seq %u end %u rcv_las %u rcv_nxt %u wnd %u", \
+ .format_args = "i4i4i4i4i4", \
+ }; \
+ DECLARE_ETD(_tc, _e, 5); \
+ ed->data[0] = _seq - _tc->irs; \
+ ed->data[1] = _end - _tc->irs; \
+ ed->data[2] = _tc->rcv_las - _tc->irs; \
+ ed->data[3] = _tc->rcv_nxt - _tc->irs; \
+ ed->data[4] = _tc->rcv_wnd; \
+}
+
+#define TCP_EVT_ACK_RCV_ERR_HANDLER(_tc, _type, _ack, ...) \
+{ \
+ ELOG_TYPE_DECLARE (_e) = \
+ { \
+ .format = "ack-err: %s ack %u snd_una %u snd_nxt %u una_max %u", \
+ .format_args = "t4i4i4i4i4", \
+ .n_enum_strings = 3, \
+ .enum_strings = { \
+ "invalid", \
+ "old", \
+ "future", \
+ }, \
+ }; \
+ DECLARE_ETD(_tc, _e, 5); \
+ ed->data[0] = _type; \
+ ed->data[1] = _ack - _tc->iss; \
+ ed->data[2] = _tc->snd_una - _tc->iss; \
+ ed->data[3] = _tc->snd_nxt - _tc->iss; \
+ ed->data[4] = _tc->snd_una_max - _tc->iss; \
+}
+
+/*
+ * Congestion Control
+ */
+
+#if TCP_DEBUG_CC
+#define TCP_EVT_CC_RTX_HANDLER(_tc, offset, n_bytes, ...) \
+{ \
+ ELOG_TYPE_DECLARE (_e) = \
+ { \
+ .format = "rtx: snd_nxt %u offset %u snd %u rtx %u", \
+ .format_args = "i4i4i4i4", \
+ }; \
+ DECLARE_ETD(_tc, _e, 4); \
+ ed->data[0] = _tc->snd_nxt - _tc->iss; \
+ ed->data[1] = offset; \
+ ed->data[2] = n_bytes; \
+ ed->data[3] = _tc->rtx_bytes; \
+}
+
+#define TCP_EVT_CC_EVT_HANDLER(_tc, _sub_evt, ...) \
+{ \
+ ELOG_TYPE_DECLARE (_e) = \
+ { \
+ .format = "cc: %s wnd %u snd_cong %u rtx_bytes %u", \
+ .format_args = "t4i4i4i4", \
+ .n_enum_strings = 5, \
+ .enum_strings = { \
+ "fast-rtx", \
+ "rtx-timeout", \
+ "first-rtx", \
+ "recovered", \
+ "congestion", \
+ }, \
+ }; \
+ DECLARE_ETD(_tc, _e, 4); \
+ ed->data[0] = _sub_evt; \
+ ed->data[1] = tcp_available_snd_space (_tc); \
+ ed->data[2] = _tc->snd_congestion - _tc->iss; \
+ ed->data[3] = _tc->rtx_bytes; \
+}
+
+#define TCP_EVT_CC_PACK_HANDLER(_tc, ...) \
+{ \
+ ELOG_TYPE_DECLARE (_e) = \
+ { \
+ .format = "pack: snd_una %u snd_una_max %u", \
+ .format_args = "i4i4", \
+ }; \
+ DECLARE_ETD(_tc, _e, 2); \
+ ed->data[0] = _tc->snd_una - _tc->iss; \
+ ed->data[1] = _tc->snd_una_max - _tc->iss; \
+}
+
+#else
+#define TCP_EVT_CC_RTX_HANDLER(_tc, offset, n_bytes, ...)
+#define TCP_EVT_CC_EVT_HANDLER(_tc, _sub_evt, _snd_space, ...)
+#define TCP_EVT_CC_PACK_HANDLER(_tc, ...)
+#endif
+
+#if TCP_DBG_VERBOSE
+#define TCP_EVT_SND_WND_HANDLER(_tc, ...) \
+{ \
+ ELOG_TYPE_DECLARE (_e) = \
+ { \
+ .format = "snd_wnd update: %u ", \
+ .format_args = "i4", \
+ }; \
+ DECLARE_ETD(_tc, _e, 1); \
+ ed->data[0] = _tc->snd_wnd; \
+}
+
+#define TCP_EVT_OUTPUT_HANDLER(_tc, flags, n_bytes,...) \
+{ \
+ ELOG_TYPE_DECLARE (_e) = \
+ { \
+ .format = "out: flags %x, bytes %u", \
+ .format_args = "i4i4", \
+ }; \
+ DECLARE_ETD(_tc, _e, 2); \
+ ed->data[0] = flags; \
+ ed->data[1] = n_bytes; \
+}
+#else
+#define TCP_EVT_SND_WND_HANDLER(_tc, ...)
+#define TCP_EVT_OUTPUT_HANDLER(_tc, flags, n_bytes,...)
+#endif
+
#define CONCAT_HELPER(_a, _b) _a##_b
#define CC(_a, _b) CONCAT_HELPER(_a, _b)
-
#define TCP_EVT_DBG(_evt, _args...) CC(_evt, _HANDLER)(_args)
#else
diff --git a/src/vnet/tcp/tcp_error.def b/src/vnet/tcp/tcp_error.def
index 2dbdd9b..b91a08c 100644
--- a/src/vnet/tcp/tcp_error.def
+++ b/src/vnet/tcp/tcp_error.def
@@ -12,12 +12,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
tcp_error (NONE, "no error")
tcp_error (NO_LISTENER, "no listener for dst port")
tcp_error (LOOKUP_DROPS, "lookup drops")
tcp_error (DISPATCH, "Dispatch error")
tcp_error (ENQUEUED, "Packets pushed into rx fifo")
+tcp_error (PARTIALLY_ENQUEUED, "Packets partially pushed into rx fifo")
tcp_error (PURE_ACK, "Pure acks")
tcp_error (SYNS_RCVD, "SYNs received")
tcp_error (SYN_ACKS_RCVD, "SYN-ACKs received")
@@ -26,11 +26,14 @@
tcp_error (EVENT_FIFO_FULL, "Events not sent for lack of event fifo space")
tcp_error (API_QUEUE_FULL, "Sessions not created for lack of API queue space")
tcp_error (CREATE_SESSION_FAIL, "Sessions couldn't be allocated")
-tcp_error (SEGMENT_INVALID, "Invalid segment")
+tcp_error (SEGMENT_INVALID, "Invalid segments")
+tcp_error (SEGMENT_OLD, "Old segment")
tcp_error (ACK_INVALID, "Invalid ACK")
tcp_error (ACK_DUP, "Duplicate ACK")
tcp_error (ACK_OLD, "Old ACK")
+tcp_error (ACK_FUTURE, "Future ACK")
tcp_error (PKTS_SENT, "Packets sent")
tcp_error (FILTERED_DUPACKS, "Filtered duplicate ACKs")
tcp_error (RST_SENT, "Resets sent")
tcp_error (INVALID_CONNECTION, "Invalid connection")
+tcp_error (NO_WND, "No window")
\ No newline at end of file
diff --git a/src/vnet/tcp/tcp_input.c b/src/vnet/tcp/tcp_input.c
index 67af432..5d11985f 100644
--- a/src/vnet/tcp/tcp_input.c
+++ b/src/vnet/tcp/tcp_input.c
@@ -95,13 +95,21 @@
* or the rcv_nxt at last ack sent instead of rcv_nxt since that's the
* peer's reference when computing our receive window.
*
- * This accepts only segments within the window.
+ * This:
+ * seq_leq (end_seq, tc->rcv_las + tc->rcv_wnd) && seq_geq (seq, tc->rcv_las)
+ * however, is too strict when we have retransmits. Instead we just check that
+ * the seq is not beyond the right edge and that the end of the segment is not
+ * less than the left edge.
+ *
+ * N.B. rcv_nxt and rcv_wnd are both updated in this node if acks are sent, so
+ * use rcv_nxt in the right edge window test instead of rcv_las.
+ *
*/
always_inline u8
tcp_segment_in_rcv_wnd (tcp_connection_t * tc, u32 seq, u32 end_seq)
{
- return seq_leq (end_seq, tc->rcv_las + tc->rcv_wnd)
- && seq_geq (seq, tc->rcv_nxt);
+ return (seq_geq (end_seq, tc->rcv_las)
+ && seq_leq (seq, tc->rcv_nxt + tc->rcv_wnd));
}
void
@@ -253,6 +261,7 @@
{
tcp_make_ack (tc0, b0);
*next0 = tcp_next_output (tc0->c_is_ip4);
+ TCP_EVT_DBG (TCP_EVT_DUPACK_SENT, tc0);
return -1;
}
}
@@ -262,13 +271,25 @@
if (!tcp_segment_in_rcv_wnd (tc0, vnet_buffer (b0)->tcp.seq_number,
vnet_buffer (b0)->tcp.seq_end))
{
- if (!tcp_rst (th0))
+ /* If our window is 0 and the packet is in sequence, let it pass
+ * through for ack processing. It should be dropped later.*/
+ if (tc0->rcv_wnd == 0
+ && tc0->rcv_nxt == vnet_buffer (b0)->tcp.seq_number)
{
- /* Send dup ack */
- tcp_make_ack (tc0, b0);
- *next0 = tcp_next_output (tc0->c_is_ip4);
+ /* Make it look as if there's nothing to dequeue */
+ vnet_buffer (b0)->tcp.seq_end = vnet_buffer (b0)->tcp.seq_number;
}
- return -1;
+ else
+ {
+ /* If not RST, send dup ack */
+ if (!tcp_rst (th0))
+ {
+ tcp_make_ack (tc0, b0);
+ *next0 = tcp_next_output (tc0->c_is_ip4);
+ TCP_EVT_DBG (TCP_EVT_DUPACK_SENT, tc0);
+ }
+ return -1;
+ }
}
/* 2nd: check the RST bit */
@@ -326,13 +347,13 @@
/* XXX Drop in RTT results in RTTVAR increase and bigger RTO.
* The increase should be bound */
- tc->rttvar += (clib_abs (err) - tc->rttvar) >> 2;
+ tc->rttvar += ((int) clib_abs (err) - (int) tc->rttvar) >> 2;
}
else
{
/* First measurement. */
tc->srtt = mrtt;
- tc->rttvar = mrtt << 1;
+ tc->rttvar = mrtt >> 1;
}
}
@@ -394,7 +415,11 @@
}
}
-/** Check if dupack as per RFC5681 Sec. 2 */
+/**
+ * Check if dupack as per RFC5681 Sec. 2
+ *
+ * This works only if called before updating snd_wnd.
+ * */
always_inline u8
tcp_ack_is_dupack (tcp_connection_t * tc, vlib_buffer_t * b, u32 new_snd_wnd)
{
@@ -429,10 +454,10 @@
}
sack_scoreboard_hole_t *
-scoreboard_insert_hole (sack_scoreboard_t * sb, sack_scoreboard_hole_t * prev,
+scoreboard_insert_hole (sack_scoreboard_t * sb, u32 prev_index,
u32 start, u32 end)
{
- sack_scoreboard_hole_t *hole, *next;
+ sack_scoreboard_hole_t *hole, *next, *prev;
u32 hole_index;
pool_get (sb->holes, hole);
@@ -442,6 +467,7 @@
hole->end = end;
hole_index = hole - sb->holes;
+ prev = scoreboard_get_hole (sb, prev_index);
if (prev)
{
hole->prev = prev - sb->holes;
@@ -462,28 +488,35 @@
return hole;
}
-static void
+void
tcp_rcv_sacks (tcp_connection_t * tc, u32 ack)
{
sack_scoreboard_t *sb = &tc->sack_sb;
sack_block_t *blk, tmp;
- sack_scoreboard_hole_t *hole, *next_hole;
- u32 blk_index = 0;
+ sack_scoreboard_hole_t *hole, *next_hole, *last_hole, *new_hole;
+ u32 blk_index = 0, old_sacked_bytes, hole_index;
int i, j;
- if (!tcp_opts_sack (tc) && sb->head == TCP_INVALID_SACK_HOLE_INDEX)
+ sb->last_sacked_bytes = 0;
+ sb->snd_una_adv = 0;
+ old_sacked_bytes = sb->sacked_bytes;
+
+ if (!tcp_opts_sack (&tc->opt) && sb->head == TCP_INVALID_SACK_HOLE_INDEX)
return;
/* Remove invalid blocks */
- vec_foreach (blk, tc->opt.sacks)
- {
- if (seq_lt (blk->start, blk->end)
- && seq_gt (blk->start, tc->snd_una)
- && seq_gt (blk->start, ack) && seq_lt (blk->end, tc->snd_nxt))
- continue;
-
- vec_del1 (tc->opt.sacks, blk - tc->opt.sacks);
- }
+ blk = tc->opt.sacks;
+ while (blk < vec_end (tc->opt.sacks))
+ {
+ if (seq_lt (blk->start, blk->end)
+ && seq_gt (blk->start, tc->snd_una)
+ && seq_gt (blk->start, ack) && seq_leq (blk->end, tc->snd_nxt))
+ {
+ blk++;
+ continue;
+ }
+ vec_del1 (tc->opt.sacks, blk - tc->opt.sacks);
+ }
/* Add block for cumulative ack */
if (seq_gt (ack, tc->snd_una))
@@ -498,7 +531,7 @@
/* Make sure blocks are ordered */
for (i = 0; i < vec_len (tc->opt.sacks); i++)
- for (j = i; j < vec_len (tc->opt.sacks); j++)
+ for (j = i + 1; j < vec_len (tc->opt.sacks); j++)
if (seq_lt (tc->opt.sacks[j].start, tc->opt.sacks[i].start))
{
tmp = tc->opt.sacks[i];
@@ -506,10 +539,22 @@
tc->opt.sacks[j] = tmp;
}
- /* If no holes, insert the first that covers all outstanding bytes */
if (sb->head == TCP_INVALID_SACK_HOLE_INDEX)
{
- scoreboard_insert_hole (sb, 0, tc->snd_una, tc->snd_una_max);
+ /* If no holes, insert the first that covers all outstanding bytes */
+ last_hole = scoreboard_insert_hole (sb, TCP_INVALID_SACK_HOLE_INDEX,
+ tc->snd_una, tc->snd_una_max);
+ sb->tail = scoreboard_hole_index (sb, last_hole);
+ }
+ else
+ {
+ /* If we have holes but snd_una_max is beyond the last hole, update
+ * last hole end */
+ tmp = tc->opt.sacks[vec_len (tc->opt.sacks) - 1];
+ last_hole = scoreboard_last_hole (sb);
+ if (seq_gt (tc->snd_una_max, sb->max_byte_sacked)
+ && seq_gt (tc->snd_una_max, last_hole->end))
+ last_hole->end = tc->snd_una_max;
}
/* Walk the holes with the SACK blocks */
@@ -526,10 +571,10 @@
next_hole = scoreboard_next_hole (sb, hole);
/* Byte accounting */
- if (seq_lt (hole->end, ack))
+ if (seq_leq (hole->end, ack))
{
- /* Bytes lost because snd wnd left edge advances */
- if (seq_lt (next_hole->start, ack))
+ /* Bytes lost because snd_wnd left edge advances */
+ if (next_hole && seq_leq (next_hole->start, ack))
sb->sacked_bytes -= next_hole->start - hole->end;
else
sb->sacked_bytes -= ack - hole->end;
@@ -539,35 +584,78 @@
sb->sacked_bytes += scoreboard_hole_bytes (hole);
}
+ /* snd_una needs to be advanced */
+ if (seq_geq (ack, hole->end))
+ {
+ if (next_hole && seq_lt (ack, next_hole->start))
+ sb->snd_una_adv = next_hole->start - ack;
+ else
+ sb->snd_una_adv = sb->max_byte_sacked - ack;
+
+ /* all these can be delivered */
+ sb->sacked_bytes -= sb->snd_una_adv;
+ }
+
+ /* About to remove last hole */
+ if (hole == last_hole)
+ {
+ sb->tail = hole->prev;
+ last_hole = scoreboard_last_hole (sb);
+ /* keep track of max byte sacked in case the last hole
+ * is acked */
+ if (seq_gt (hole->end, sb->max_byte_sacked))
+ sb->max_byte_sacked = hole->end;
+ }
scoreboard_remove_hole (sb, hole);
hole = next_hole;
}
- /* Partial overlap */
+ /* Partial 'head' overlap */
else
{
- sb->sacked_bytes += blk->end - hole->start;
- hole->start = blk->end;
+ if (seq_gt (blk->end, hole->start))
+ {
+ sb->sacked_bytes += blk->end - hole->start;
+ hole->start = blk->end;
+ }
blk_index++;
}
}
else
{
/* Hole must be split */
- if (seq_leq (blk->end, hole->end))
+ if (seq_lt (blk->end, hole->end))
{
sb->sacked_bytes += blk->end - blk->start;
- scoreboard_insert_hole (sb, hole, blk->end, hole->end);
- hole->end = blk->start - 1;
+ hole_index = scoreboard_hole_index (sb, hole);
+ new_hole = scoreboard_insert_hole (sb, hole_index, blk->end,
+ hole->end);
+
+ /* Pool might've moved */
+ hole = scoreboard_get_hole (sb, hole_index);
+ hole->end = blk->start;
+
+ /* New or split of tail */
+ if ((last_hole->end == new_hole->end)
+ || seq_lt (last_hole->end, new_hole->start))
+ {
+ last_hole = new_hole;
+ sb->tail = scoreboard_hole_index (sb, new_hole);
+ }
+
blk_index++;
+ hole = scoreboard_next_hole (sb, hole);
}
else
{
- sb->sacked_bytes += hole->end - blk->start + 1;
- hole->end = blk->start - 1;
+ sb->sacked_bytes += hole->end - blk->start;
+ hole->end = blk->start;
hole = scoreboard_next_hole (sb, hole);
}
}
}
+
+ sb->last_sacked_bytes = sb->sacked_bytes + sb->snd_una_adv
+ - old_sacked_bytes;
}
/** Update snd_wnd
@@ -577,72 +665,94 @@
static void
tcp_update_snd_wnd (tcp_connection_t * tc, u32 seq, u32 ack, u32 snd_wnd)
{
- if (tc->snd_wl1 < seq || (tc->snd_wl1 == seq && tc->snd_wl2 <= ack))
+ if (seq_lt (tc->snd_wl1, seq)
+ || (tc->snd_wl1 == seq && seq_leq (tc->snd_wl2, ack)))
{
tc->snd_wnd = snd_wnd;
tc->snd_wl1 = seq;
tc->snd_wl2 = ack;
+ TCP_EVT_DBG (TCP_EVT_SND_WND, tc);
}
}
-static void
+void
tcp_cc_congestion (tcp_connection_t * tc)
{
+ tc->snd_congestion = tc->snd_nxt;
tc->cc_algo->congestion (tc);
+ TCP_EVT_DBG (TCP_EVT_CC_EVT, tc, 4);
}
-static void
+void
tcp_cc_recover (tcp_connection_t * tc)
{
- if (tcp_in_fastrecovery (tc))
- {
- tc->cc_algo->recovered (tc);
- tcp_recovery_off (tc);
- }
- else if (tcp_in_recovery (tc))
- {
- tcp_recovery_off (tc);
- tc->cwnd = tcp_loss_wnd (tc);
- }
+ tc->cc_algo->recovered (tc);
+
+ tc->rtx_bytes = 0;
+ tc->rcv_dupacks = 0;
+ tc->snd_nxt = tc->snd_una;
+
+ tc->cc_algo->rcv_ack (tc);
+ tc->tsecr_last_ack = tc->opt.tsecr;
+
+ tcp_fastrecovery_1_smss_off (tc);
+ tcp_fastrecovery_off (tc);
+
+ TCP_EVT_DBG (TCP_EVT_CC_EVT, tc, 3);
}
static void
-tcp_cc_rcv_ack (tcp_connection_t * tc)
+tcp_cc_rcv_ack (tcp_connection_t * tc, vlib_buffer_t * b)
{
u8 partial_ack;
if (tcp_in_recovery (tc))
{
- partial_ack = seq_lt (tc->snd_una, tc->snd_una_max);
+ partial_ack = seq_lt (tc->snd_una, tc->snd_congestion);
if (!partial_ack)
{
/* Clear retransmitted bytes. */
- tc->rtx_bytes = 0;
tcp_cc_recover (tc);
}
else
{
+ TCP_EVT_DBG (TCP_EVT_CC_PACK, tc);
+
/* Clear retransmitted bytes. XXX should we clear all? */
tc->rtx_bytes = 0;
tc->cc_algo->rcv_cong_ack (tc, TCP_CC_PARTIALACK);
- /* Retransmit first unacked segment */
- tcp_retransmit_first_unacked (tc);
+ /* In case snd_nxt is still in the past and output tries to
+ * shove some new bytes */
+ tc->snd_nxt = tc->snd_una;
+
+ /* XXX need proper RFC6675 support */
+ if (tc->sack_sb.last_sacked_bytes)
+ {
+ tcp_fast_retransmit (tc);
+ }
+ else
+ {
+ /* Retransmit first unacked segment */
+ tcp_retransmit_first_unacked (tc);
+ /* If window allows, send 1 SMSS of new data */
+ if (seq_lt (tc->snd_nxt, tc->snd_congestion))
+ tc->snd_nxt = tc->snd_congestion;
+ }
}
}
else
{
tc->cc_algo->rcv_ack (tc);
+ tc->tsecr_last_ack = tc->opt.tsecr;
+ tc->rcv_dupacks = 0;
}
-
- tc->rcv_dupacks = 0;
- tc->tsecr_last_ack = tc->opt.tsecr;
}
static void
tcp_cc_rcv_dupack (tcp_connection_t * tc, u32 ack)
{
- ASSERT (tc->snd_una == ack);
+// ASSERT (seq_geq(tc->snd_una, ack));
tc->rcv_dupacks++;
if (tc->rcv_dupacks == TCP_DUPACK_THRESHOLD)
@@ -688,20 +798,39 @@
{
u32 new_snd_wnd;
- /* If the ACK acks something not yet sent (SEG.ACK > SND.NXT) then send an
- * ACK, drop the segment, and return */
+ /* If the ACK acks something not yet sent (SEG.ACK > SND.NXT) */
if (seq_gt (vnet_buffer (b)->tcp.ack_number, tc->snd_nxt))
{
- tcp_make_ack (tc, b);
- *next = tcp_next_output (tc->c_is_ip4);
- *error = TCP_ERROR_ACK_INVALID;
- return -1;
+ /* If we have outstanding data and this is within the window, accept it,
+ * probably retransmit has timed out. Otherwise ACK segment and then
+ * drop it */
+ if (seq_gt (vnet_buffer (b)->tcp.ack_number, tc->snd_una_max))
+ {
+ tcp_make_ack (tc, b);
+ *next = tcp_next_output (tc->c_is_ip4);
+ *error = TCP_ERROR_ACK_INVALID;
+ TCP_EVT_DBG (TCP_EVT_ACK_RCV_ERR, tc, 0,
+ vnet_buffer (b)->tcp.ack_number);
+ return -1;
+ }
+
+ tc->snd_nxt = vnet_buffer (b)->tcp.ack_number;
+ *error = TCP_ERROR_ACK_FUTURE;
+ TCP_EVT_DBG (TCP_EVT_ACK_RCV_ERR, tc, 2,
+ vnet_buffer (b)->tcp.ack_number);
}
- /* If old ACK, discard */
+ /* If old ACK, probably it's an old dupack */
if (seq_lt (vnet_buffer (b)->tcp.ack_number, tc->snd_una))
{
*error = TCP_ERROR_ACK_OLD;
+ TCP_EVT_DBG (TCP_EVT_ACK_RCV_ERR, tc, 1,
+ vnet_buffer (b)->tcp.ack_number);
+ if (tcp_in_fastrecovery (tc) && tc->rcv_dupacks == TCP_DUPACK_THRESHOLD)
+ {
+ TCP_EVT_DBG (TCP_EVT_DUPACK_RCVD, tc);
+ tcp_cc_rcv_dupack (tc, vnet_buffer (b)->tcp.ack_number);
+ }
return -1;
}
@@ -712,32 +841,40 @@
if (tcp_ack_is_dupack (tc, b, new_snd_wnd))
{
+ TCP_EVT_DBG (TCP_EVT_DUPACK_RCVD, tc, 1);
tcp_cc_rcv_dupack (tc, vnet_buffer (b)->tcp.ack_number);
*error = TCP_ERROR_ACK_DUP;
return -1;
}
- /* Valid ACK */
+ /*
+ * Valid ACK
+ */
+
tc->bytes_acked = vnet_buffer (b)->tcp.ack_number - tc->snd_una;
- tc->snd_una = vnet_buffer (b)->tcp.ack_number;
+ tc->snd_una = vnet_buffer (b)->tcp.ack_number + tc->sack_sb.snd_una_adv;
- /* Dequeue ACKed packet and update RTT */
+ /* Dequeue ACKed data and update RTT */
tcp_dequeue_acked (tc, vnet_buffer (b)->tcp.ack_number);
-
tcp_update_snd_wnd (tc, vnet_buffer (b)->tcp.seq_number,
vnet_buffer (b)->tcp.ack_number, new_snd_wnd);
- /* Updates congestion control (slow start/congestion avoidance) */
- tcp_cc_rcv_ack (tc);
+ /* If some of our sent bytes have been acked, update cc and retransmit
+ * timer. */
+ if (tc->bytes_acked)
+ {
+ TCP_EVT_DBG (TCP_EVT_ACK_RCVD, tc, vnet_buffer (b)->tcp.ack_number);
- TCP_EVT_DBG (TCP_EVT_ACK_RCVD, tc);
+ /* Updates congestion control (slow start/congestion avoidance) */
+ tcp_cc_rcv_ack (tc, b);
- /* If everything has been acked, stop retransmit timer
- * otherwise update */
- if (tc->snd_una == tc->snd_una_max)
- tcp_timer_reset (tc, TCP_TIMER_RETRANSMIT);
- else
- tcp_timer_update (tc, TCP_TIMER_RETRANSMIT, tc->rto);
+ /* If everything has been acked, stop retransmit timer
+ * otherwise update */
+ if (tc->snd_una == tc->snd_una_max)
+ tcp_retransmit_timer_reset (tc);
+ else
+ tcp_retransmit_timer_update (tc);
+ }
return 0;
}
@@ -757,9 +894,7 @@
tcp_update_sack_list (tcp_connection_t * tc, u32 start, u32 end)
{
sack_block_t *new_list = 0, block;
- u32 n_elts;
int i;
- u8 new_head = 0;
/* If the first segment is ooo add it to the list. Last write might've moved
* rcv_nxt over the first segment. */
@@ -768,7 +903,6 @@
block.start = start;
block.end = end;
vec_add1 (new_list, block);
- new_head = 1;
}
/* Find the blocks still worth keeping. */
@@ -782,20 +916,19 @@
|| seq_leq (tc->snd_sacks[i].start, end))
continue;
- /* Save subsequent segments to new SACK list. */
- n_elts = clib_min (vec_len (tc->snd_sacks) - i,
- TCP_MAX_SACK_BLOCKS - new_head);
- vec_insert_elts (new_list, &tc->snd_sacks[i], n_elts, new_head);
- break;
+ /* Save to new SACK list. */
+ vec_add1 (new_list, tc->snd_sacks[i]);
}
+ ASSERT (vec_len (new_list) < TCP_MAX_SACK_BLOCKS);
+
/* Replace old vector with new one */
vec_free (tc->snd_sacks);
tc->snd_sacks = new_list;
}
/** Enqueue data for delivery to application */
-always_inline u32
+always_inline int
tcp_session_enqueue_data (tcp_connection_t * tc, vlib_buffer_t * b,
u16 data_len)
{
@@ -812,6 +945,8 @@
vlib_buffer_get_current (b),
data_len, 1 /* queue event */ );
+ TCP_EVT_DBG (TCP_EVT_INPUT, tc, 0, data_len, written);
+
/* Update rcv_nxt */
if (PREDICT_TRUE (written == data_len))
{
@@ -824,38 +959,61 @@
/* Send ACK confirming the update */
tc->flags |= TCP_CONN_SNDACK;
+ }
+ else if (written > 0)
+ {
+ /* We've written something but FIFO is probably full now */
+ tc->rcv_nxt += written;
- /* Update SACK list if need be */
- if (tcp_opts_sack_permitted (&tc->opt))
- {
- /* Remove SACK blocks that have been delivered */
- tcp_update_sack_list (tc, tc->rcv_nxt, tc->rcv_nxt);
- }
+ /* Depending on how fast the app is, all remaining buffers in burst will
+ * not be enqueued. Should we inform peer of the damage? XXX */
+ return TCP_ERROR_PARTIALLY_ENQUEUED;
}
else
{
- ASSERT (0);
return TCP_ERROR_FIFO_FULL;
}
+ /* Update SACK list if need be */
+ if (tcp_opts_sack_permitted (&tc->opt))
+ {
+ /* Remove SACK blocks that have been delivered */
+ tcp_update_sack_list (tc, tc->rcv_nxt, tc->rcv_nxt);
+ }
+
return TCP_ERROR_ENQUEUED;
}
/** Enqueue out-of-order data */
-always_inline u32
+always_inline int
tcp_session_enqueue_ooo (tcp_connection_t * tc, vlib_buffer_t * b,
u16 data_len)
{
stream_session_t *s0;
u32 offset, seq;
+ int rv;
+
+ /* Pure ACK. Do nothing */
+ if (PREDICT_FALSE (data_len == 0))
+ {
+ return TCP_ERROR_PURE_ACK;
+ }
s0 = stream_session_get (tc->c_s_index, tc->c_thread_index);
seq = vnet_buffer (b)->tcp.seq_number;
offset = seq - tc->rcv_nxt;
- if (svm_fifo_enqueue_with_offset (s0->server_rx_fifo, s0->pid, offset,
- data_len, vlib_buffer_get_current (b)))
- return TCP_ERROR_FIFO_FULL;
+ rv = svm_fifo_enqueue_with_offset (s0->server_rx_fifo, s0->pid, offset,
+ data_len, vlib_buffer_get_current (b));
+
+ /* Nothing written */
+ if (rv)
+ {
+ TCP_EVT_DBG (TCP_EVT_INPUT, tc, 1, data_len, 0);
+ return TCP_ERROR_FIFO_FULL;
+ }
+
+ TCP_EVT_DBG (TCP_EVT_INPUT, tc, 1, data_len, data_len);
/* Update SACK list if in use */
if (tcp_opts_sack_permitted (&tc->opt))
@@ -875,20 +1033,23 @@
}
/**
- * Check if ACK could be delayed. DELACK timer is set only after frame is
- * processed so this can return true for a full bursts of packets.
+ * Check if ACK could be delayed. If ack can be delayed, it should return
+ * true for a full frame. If we're always acking return 0.
*/
always_inline int
tcp_can_delack (tcp_connection_t * tc)
{
- /* If there's no DELACK timer set and the last window sent wasn't 0 we
- * can safely delay. */
- if (!tcp_timer_is_active (tc, TCP_TIMER_DELACK)
- && (tc->flags & TCP_CONN_SENT_RCV_WND0) == 0
- && (tc->flags & TCP_CONN_SNDACK) == 0)
- return 1;
+ /* Send ack if ... */
+ if (TCP_ALWAYS_ACK
+ /* just sent a rcv wnd 0 */
+ || (tc->flags & TCP_CONN_SENT_RCV_WND0) != 0
+ /* constrained to send ack */
+ || (tc->flags & TCP_CONN_SNDACK) != 0
+ /* we're almost out of tx wnd */
+ || tcp_available_snd_space (tc) < 2 * tc->snd_mss)
+ return 0;
- return 0;
+ return 1;
}
static int
@@ -900,23 +1061,33 @@
/* Handle out-of-order data */
if (PREDICT_FALSE (vnet_buffer (b)->tcp.seq_number != tc->rcv_nxt))
{
- error = tcp_session_enqueue_ooo (tc, b, n_data_bytes);
-
- /* Don't send more than 3 dupacks per burst
- * XXX decide if this is good */
- if (tc->snt_dupacks < 3)
+ /* Old sequence numbers allowed through because they overlapped
+ * the rx window */
+ if (seq_lt (vnet_buffer (b)->tcp.seq_number, tc->rcv_nxt))
{
- /* RFC2581: Send DUPACK for fast retransmit */
- tcp_make_ack (tc, b);
- *next0 = tcp_next_output (tc->c_is_ip4);
-
- /* Mark as DUPACK. We may filter these in output if
- * the burst fills the holes. */
- vnet_buffer (b)->tcp.flags = TCP_BUF_FLAG_DUPACK;
-
- tc->snt_dupacks++;
+ error = TCP_ERROR_SEGMENT_OLD;
+ *next0 = TCP_NEXT_DROP;
+ goto done;
}
+ error = tcp_session_enqueue_ooo (tc, b, n_data_bytes);
+
+ /* N.B. Should not filter burst of dupacks. Two issues 1) dupacks open
+ * cwnd on remote peer when congested 2) acks leaving should have the
+ * latest rcv_wnd since the burst may eaten up all of it, so only the
+ * old ones could be filtered.
+ */
+
+ /* RFC2581: Send DUPACK for fast retransmit */
+ tcp_make_ack (tc, b);
+ *next0 = tcp_next_output (tc->c_is_ip4);
+
+ /* Mark as DUPACK. We may filter these in output if
+ * the burst fills the holes. */
+ if (n_data_bytes)
+ vnet_buffer (b)->tcp.flags = TCP_BUF_FLAG_DUPACK;
+
+ TCP_EVT_DBG (TCP_EVT_DUPACK_SENT, tc);
goto done;
}
@@ -924,63 +1095,45 @@
* segments can be enqueued after fifo tail offset changes. */
error = tcp_session_enqueue_data (tc, b, n_data_bytes);
- TCP_EVT_DBG (TCP_EVT_INPUT, tc, n_data_bytes);
+ if (n_data_bytes == 0)
+ {
+ *next0 = TCP_NEXT_DROP;
+ goto done;
+ }
+
+ if (PREDICT_FALSE (error == TCP_ERROR_FIFO_FULL))
+ *next0 = TCP_NEXT_DROP;
/* Check if ACK can be delayed */
- if (tcp_can_delack (tc))
+ if (!tcp_can_delack (tc))
{
- /* Nothing to do for pure ACKs */
+ /* Nothing to do for pure ACKs XXX */
if (n_data_bytes == 0)
goto done;
- /* If connection has not been previously marked for delay ack
- * add it to the list and flag it */
- if (!tc->flags & TCP_CONN_DELACK)
- {
- vec_add1 (tm->delack_connections[tc->c_thread_index],
- tc->c_c_index);
- tc->flags |= TCP_CONN_DELACK;
- }
+ *next0 = tcp_next_output (tc->c_is_ip4);
+ tcp_make_ack (tc, b);
}
else
{
- /* Check if a packet has already been enqueued to output for burst.
- * If yes, then drop this one, otherwise, let it pass through to
- * output */
- if ((tc->flags & TCP_CONN_BURSTACK) == 0)
- {
- *next0 = tcp_next_output (tc->c_is_ip4);
- tcp_make_ack (tc, b);
- error = TCP_ERROR_ENQUEUED;
-
- /* TODO: maybe add counter to ensure N acks will be sent/burst */
- tc->flags |= TCP_CONN_BURSTACK;
- }
+ if (!tcp_timer_is_active (tc, TCP_TIMER_DELACK))
+ tcp_timer_set (tc, TCP_TIMER_DELACK, TCP_DELACK_TIME);
}
done:
return error;
}
-void
-delack_timers_init (tcp_main_t * tm, u32 thread_index)
+always_inline void
+tcp_established_inc_counter (vlib_main_t * vm, u8 is_ip4, u8 evt, u8 val)
{
- tcp_connection_t *tc;
- u32 i, *conns;
- tw_timer_wheel_16t_2w_512sl_t *tw;
+ if (PREDICT_TRUE (!val))
+ return;
- tw = &tm->timer_wheels[thread_index];
- conns = tm->delack_connections[thread_index];
- for (i = 0; i < vec_len (conns); i++)
- {
- tc = pool_elt_at_index (tm->connections[thread_index], conns[i]);
- ASSERT (0 != tc);
-
- tc->timers[TCP_TIMER_DELACK]
- = tw_timer_start_16t_2w_512sl (tw, conns[i],
- TCP_TIMER_DELACK, TCP_DELACK_TIME);
- }
- vec_reset_length (tm->delack_connections[thread_index]);
+ if (is_ip4)
+ vlib_node_increment_counter (vm, tcp4_established_node.index, evt, val);
+ else
+ vlib_node_increment_counter (vm, tcp6_established_node.index, evt, val);
}
always_inline uword
@@ -1027,7 +1180,7 @@
if (PREDICT_FALSE (tc0 == 0))
{
error0 = TCP_ERROR_INVALID_CONNECTION;
- goto drop;
+ goto done;
}
/* Checksum computed by ipx_local no need to compute again */
@@ -1061,18 +1214,22 @@
if (PREDICT_FALSE (tcp_segment_validate (vm, tc0, b0, th0, &next0)))
{
error0 = TCP_ERROR_SEGMENT_INVALID;
- goto drop;
+ TCP_EVT_DBG (TCP_EVT_SEG_INVALID, tc0,
+ vnet_buffer (b0)->tcp.seq_number,
+ vnet_buffer (b0)->tcp.seq_end);
+ goto done;
}
/* 5: check the ACK field */
if (tcp_rcv_ack (tc0, b0, th0, &next0, &error0))
{
- goto drop;
+ goto done;
}
/* 6: check the URG bit TODO */
/* 7: process the segment text */
+
vlib_buffer_advance (b0, n_advance_bytes0);
error0 = tcp_segment_rcv (tm, tc0, b0, n_data_bytes0, &next0);
@@ -1088,7 +1245,7 @@
tcp_timer_set (tc0, TCP_TIMER_WAITCLOSE, TCP_CLOSEWAIT_TIME);
}
- drop:
+ done:
b0->error = node->errors[error0];
if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED))
{
@@ -1103,17 +1260,7 @@
}
errors = session_manager_flush_enqueue_events (my_thread_index);
- if (errors)
- {
- if (is_ip4)
- vlib_node_increment_counter (vm, tcp4_established_node.index,
- TCP_ERROR_EVENT_FIFO_FULL, errors);
- else
- vlib_node_increment_counter (vm, tcp6_established_node.index,
- TCP_ERROR_EVENT_FIFO_FULL, errors);
- }
-
- delack_timers_init (tm, my_thread_index);
+ tcp_established_inc_counter (vm, is_ip4, TCP_ERROR_EVENT_FIFO_FULL, errors);
return from_frame->n_vectors;
}
@@ -1602,7 +1749,7 @@
stream_session_accept_notify (&tc0->connection);
/* Reset SYN-ACK retransmit timer */
- tcp_timer_reset (tc0, TCP_TIMER_RETRANSMIT);
+ tcp_retransmit_timer_reset (tc0);
break;
case TCP_STATE_ESTABLISHED:
/* We can get packets in established state here because they
@@ -1668,7 +1815,7 @@
tcp_timer_update (tc0, TCP_TIMER_WAITCLOSE, TCP_CLEANUP_TIME);
/* Stop retransmit */
- tcp_timer_reset (tc0, TCP_TIMER_RETRANSMIT);
+ tcp_retransmit_timer_reset (tc0);
goto drop;
diff --git a/src/vnet/tcp/tcp_output.c b/src/vnet/tcp/tcp_output.c
index 114a5b9..a671f72 100644
--- a/src/vnet/tcp/tcp_output.c
+++ b/src/vnet/tcp/tcp_output.c
@@ -125,15 +125,33 @@
u32
tcp_window_to_advertise (tcp_connection_t * tc, tcp_state_t state)
{
- u32 available_space, max_fifo, observed_wnd;
-
if (state < TCP_STATE_ESTABLISHED)
return tcp_initial_window_to_advertise (tc);
+ tcp_update_rcv_wnd (tc);
+
+ if (tc->rcv_wnd == 0)
+ {
+ tc->flags |= TCP_CONN_SENT_RCV_WND0;
+ }
+ else
+ {
+ tc->flags &= ~TCP_CONN_SENT_RCV_WND0;
+ }
+
+ return tc->rcv_wnd >> tc->rcv_wscale;
+}
+
+void
+tcp_update_rcv_wnd (tcp_connection_t * tc)
+{
+ i32 observed_wnd;
+ u32 available_space, max_fifo, wnd;
+
/*
* Figure out how much space we have available
*/
- available_space = stream_session_max_enqueue (&tc->connection);
+ available_space = stream_session_max_rx_enqueue (&tc->connection);
max_fifo = stream_session_fifo_size (&tc->connection);
ASSERT (tc->opt.mss < max_fifo);
@@ -145,23 +163,25 @@
* Use the above and what we know about what we've previously advertised
* to compute the new window
*/
- observed_wnd = tc->rcv_wnd - (tc->rcv_nxt - tc->rcv_las);
+ observed_wnd = (i32) tc->rcv_wnd - (tc->rcv_nxt - tc->rcv_las);
+ if (observed_wnd < 0)
+ observed_wnd = 0;
/* Bad. Thou shalt not shrink */
if (available_space < observed_wnd)
{
- if (available_space == 0)
- clib_warning ("Didn't shrink rcv window despite not having space");
+ /* Does happen! */
+ wnd = observed_wnd;
}
-
- tc->rcv_wnd = clib_min (available_space, TCP_WND_MAX << tc->rcv_wscale);
-
- if (tc->rcv_wnd == 0)
+ else
{
- tc->flags |= TCP_CONN_SENT_RCV_WND0;
+ wnd = available_space;
}
- return tc->rcv_wnd >> tc->rcv_wscale;
+ if (wnd && ((wnd << tc->rcv_wscale) >> tc->rcv_wscale != wnd))
+ wnd += 1 << tc->rcv_wscale;
+
+ tc->rcv_wnd = clib_min (wnd, TCP_WND_MAX << tc->rcv_wscale);
}
/**
@@ -363,7 +383,7 @@
#define tcp_get_free_buffer_index(tm, bidx) \
do { \
u32 *my_tx_buffers, n_free_buffers; \
- u32 cpu_index = tm->vlib_main->cpu_index; \
+ u32 cpu_index = os_get_cpu_number(); \
my_tx_buffers = tm->tx_buffers[cpu_index]; \
if (PREDICT_FALSE(vec_len (my_tx_buffers) == 0)) \
{ \
@@ -381,6 +401,14 @@
_vec_len (my_tx_buffers) -= 1; \
} while (0)
+#define tcp_return_buffer(tm) \
+do { \
+ u32 *my_tx_buffers; \
+ u32 cpu_index = os_get_cpu_number(); \
+ my_tx_buffers = tm->tx_buffers[cpu_index]; \
+ _vec_len (my_tx_buffers) +=1; \
+} while (0)
+
always_inline void
tcp_reuse_buffer (vlib_main_t * vm, vlib_buffer_t * b)
{
@@ -421,8 +449,6 @@
tc->rcv_nxt, tcp_hdr_opts_len, flags, wnd);
tcp_options_write ((u8 *) (th + 1), snd_opts);
-
- /* Mark as ACK */
vnet_buffer (b)->tcp.connection_index = tc->c_c_index;
}
@@ -432,12 +458,12 @@
void
tcp_make_ack (tcp_connection_t * tc, vlib_buffer_t * b)
{
- tcp_main_t *tm = vnet_get_tcp_main ();
- vlib_main_t *vm = tm->vlib_main;
+ vlib_main_t *vm = vlib_get_main ();
tcp_reuse_buffer (vm, b);
tcp_make_ack_i (tc, b, TCP_STATE_ESTABLISHED, TCP_FLAG_ACK);
vnet_buffer (b)->tcp.flags = TCP_BUF_FLAG_ACK;
+ TCP_EVT_DBG (TCP_EVT_ACK_SENT, tc);
}
/**
@@ -446,8 +472,7 @@
void
tcp_make_fin (tcp_connection_t * tc, vlib_buffer_t * b)
{
- tcp_main_t *tm = vnet_get_tcp_main ();
- vlib_main_t *vm = tm->vlib_main;
+ vlib_main_t *vm = vlib_get_main ();
u8 flags = 0;
tcp_reuse_buffer (vm, b);
@@ -467,8 +492,7 @@
void
tcp_make_synack (tcp_connection_t * tc, vlib_buffer_t * b)
{
- tcp_main_t *tm = vnet_get_tcp_main ();
- vlib_main_t *vm = tm->vlib_main;
+ vlib_main_t *vm = vlib_get_main ();
tcp_options_t _snd_opts, *snd_opts = &_snd_opts;
u8 tcp_opts_len, tcp_hdr_opts_len;
tcp_header_t *th;
@@ -631,7 +655,7 @@
vlib_buffer_t *b;
u32 bi;
tcp_main_t *tm = vnet_get_tcp_main ();
- vlib_main_t *vm = tm->vlib_main;
+ vlib_main_t *vm = vlib_get_main ();
u8 tcp_hdr_len, flags = 0;
tcp_header_t *th, *pkt_th;
u32 seq, ack;
@@ -736,7 +760,7 @@
vlib_buffer_t *b;
u32 bi;
tcp_main_t *tm = vnet_get_tcp_main ();
- vlib_main_t *vm = tm->vlib_main;
+ vlib_main_t *vm = vlib_get_main ();
u8 tcp_hdr_opts_len, tcp_opts_len;
tcp_header_t *th;
u32 time_now;
@@ -795,9 +819,9 @@
/* Decide where to send the packet */
next_index = is_ip4 ? tcp4_output_node.index : tcp6_output_node.index;
- f = vlib_get_frame_to_node (vm, next_index);
/* Enqueue the packet */
+ f = vlib_get_frame_to_node (vm, next_index);
to_next = vlib_frame_vector_args (f);
to_next[0] = bi;
f->n_vectors = 1;
@@ -813,7 +837,7 @@
vlib_buffer_t *b;
u32 bi;
tcp_main_t *tm = vnet_get_tcp_main ();
- vlib_main_t *vm = tm->vlib_main;
+ vlib_main_t *vm = vlib_get_main ();
tcp_get_free_buffer_index (tm, &bi);
b = vlib_get_buffer (vm, bi);
@@ -884,22 +908,21 @@
vnet_buffer (b)->tcp.connection_index = tc->c_c_index;
tc->snd_nxt += data_len;
+ /* TODO this is updated in output as well ... */
+ if (tc->snd_nxt > tc->snd_una_max)
+ tc->snd_una_max = tc->snd_nxt;
TCP_EVT_DBG (TCP_EVT_PKTIZE, tc);
}
-/* Send delayed ACK when timer expires */
void
-tcp_timer_delack_handler (u32 index)
+tcp_send_ack (tcp_connection_t * tc)
{
tcp_main_t *tm = vnet_get_tcp_main ();
- vlib_main_t *vm = tm->vlib_main;
- u32 thread_index = os_get_cpu_number ();
- tcp_connection_t *tc;
+ vlib_main_t *vm = vlib_get_main ();
+
vlib_buffer_t *b;
u32 bi;
- tc = tcp_connection_get (index, thread_index);
-
/* Get buffer */
tcp_get_free_buffer_index (tm, &bi);
b = vlib_get_buffer (vm, bi);
@@ -907,12 +930,22 @@
/* Fill in the ACK */
tcp_make_ack (tc, b);
- tc->timers[TCP_TIMER_DELACK] = TCP_TIMER_HANDLE_INVALID;
- tc->flags &= ~TCP_CONN_DELACK;
-
tcp_enqueue_to_output (vm, b, bi, tc->c_is_ip4);
}
+/* Send delayed ACK when timer expires */
+void
+tcp_timer_delack_handler (u32 index)
+{
+ u32 thread_index = os_get_cpu_number ();
+ tcp_connection_t *tc;
+
+ tc = tcp_connection_get (index, thread_index);
+ tc->timers[TCP_TIMER_DELACK] = TCP_TIMER_HANDLE_INVALID;
+// tc->flags &= ~TCP_CONN_DELACK;
+ tcp_send_ack (tc);
+}
+
/** Build a retransmit segment
*
* @return the number of bytes in the segment or 0 if there's nothing to
@@ -920,59 +953,74 @@
* */
u32
tcp_prepare_retransmit_segment (tcp_connection_t * tc, vlib_buffer_t * b,
- u32 max_bytes)
+ u32 offset, u32 max_bytes)
{
- tcp_main_t *tm = vnet_get_tcp_main ();
- vlib_main_t *vm = tm->vlib_main;
- u32 n_bytes, offset = 0;
- sack_scoreboard_hole_t *hole;
- u32 hole_size;
+ vlib_main_t *vm = vlib_get_main ();
+ u32 n_bytes = 0;
tcp_reuse_buffer (vm, b);
ASSERT (tc->state >= TCP_STATE_ESTABLISHED);
ASSERT (max_bytes != 0);
- if (tcp_opts_sack_permitted (&tc->opt))
+ max_bytes = clib_min (tc->snd_mss, max_bytes);
+
+ /* Start is beyond snd_congestion */
+ if (seq_geq (tc->snd_una + offset, tc->snd_congestion))
+ goto done;
+
+ /* Don't overshoot snd_congestion */
+ if (seq_gt (tc->snd_nxt + max_bytes, tc->snd_congestion))
{
- /* XXX get first hole not retransmitted yet */
- hole = scoreboard_first_hole (&tc->sack_sb);
- if (!hole)
- return 0;
-
- offset = hole->start - tc->snd_una;
- hole_size = hole->end - hole->start;
-
- ASSERT (hole_size);
-
- if (hole_size < max_bytes)
- max_bytes = hole_size;
+ max_bytes = tc->snd_congestion - tc->snd_nxt;
+ if (max_bytes == 0)
+ goto done;
}
- else
- {
- if (seq_geq (tc->snd_nxt, tc->snd_una_max))
- return 0;
- }
+
+ ASSERT (max_bytes <= tc->snd_mss);
n_bytes = stream_session_peek_bytes (&tc->connection,
vlib_buffer_get_current (b), offset,
max_bytes);
ASSERT (n_bytes != 0);
-
+ b->current_length = n_bytes;
tcp_push_hdr_i (tc, b, tc->state);
+done:
+ TCP_EVT_DBG (TCP_EVT_CC_RTX, tc, offset, n_bytes);
return n_bytes;
}
+/**
+ * Reset congestion control, switch cwnd to loss window and try again.
+ */
+static void
+tcp_rtx_timeout_cc_recover (tcp_connection_t * tc)
+{
+ /* Cleanly recover cc (also clears up fast retransmit) */
+ if (tcp_in_fastrecovery (tc))
+ {
+ tcp_cc_recover (tc);
+ }
+ else
+ {
+ tc->ssthresh = clib_max (tcp_flight_size (tc) / 2, 2 * tc->snd_mss);
+ }
+
+ /* Start again from the beginning */
+ tc->cwnd = tcp_loss_wnd (tc);
+ tc->snd_congestion = tc->snd_una_max;
+}
+
static void
tcp_timer_retransmit_handler_i (u32 index, u8 is_syn)
{
tcp_main_t *tm = vnet_get_tcp_main ();
- vlib_main_t *vm = tm->vlib_main;
+ vlib_main_t *vm = vlib_get_main ();
u32 thread_index = os_get_cpu_number ();
tcp_connection_t *tc;
vlib_buffer_t *b;
- u32 bi, max_bytes, snd_space;
+ u32 bi, snd_space, n_bytes;
if (is_syn)
{
@@ -998,26 +1046,43 @@
if (tc->state >= TCP_STATE_ESTABLISHED)
{
- tcp_fastrecovery_off (tc);
+ /* First retransmit timeout */
+ if (tc->rto_boff == 1)
+ tcp_rtx_timeout_cc_recover (tc);
/* Exponential backoff */
tc->rto = clib_min (tc->rto << 1, TCP_RTO_MAX);
/* Figure out what and how many bytes we can send */
snd_space = tcp_available_snd_space (tc);
- max_bytes = clib_min (tc->snd_mss, snd_space);
- if (max_bytes == 0)
+ TCP_EVT_DBG (TCP_EVT_CC_EVT, tc, 1);
+
+ if (snd_space == 0)
{
clib_warning ("no wnd to retransmit");
+ tcp_return_buffer (tm);
+
+ /* Force one segment */
+ tcp_retransmit_first_unacked (tc);
+
+ /* Re-enable retransmit timer. Output may be unwilling
+ * to do it for us */
+ tcp_retransmit_timer_set (tc);
+
return;
}
- tcp_prepare_retransmit_segment (tc, b, max_bytes);
+ else
+ {
+ /* No fancy recovery for now! */
+ n_bytes = tcp_prepare_retransmit_segment (tc, b, 0, snd_space);
+ scoreboard_clear (&tc->sack_sb);
- tc->rtx_bytes += max_bytes;
+ if (n_bytes == 0)
+ return;
- /* No fancy recovery for now! */
- scoreboard_clear (&tc->sack_sb);
+ tc->rtx_bytes += n_bytes;
+ }
}
else
{
@@ -1072,63 +1137,110 @@
}
/**
- * Retansmit first unacked segment */
+ * Retransmit first unacked segment
+ */
void
tcp_retransmit_first_unacked (tcp_connection_t * tc)
{
tcp_main_t *tm = vnet_get_tcp_main ();
- u32 snd_nxt = tc->snd_nxt;
+ vlib_main_t *vm = vlib_get_main ();
vlib_buffer_t *b;
- u32 bi;
+ u32 bi, n_bytes;
tc->snd_nxt = tc->snd_una;
/* Get buffer */
tcp_get_free_buffer_index (tm, &bi);
- b = vlib_get_buffer (tm->vlib_main, bi);
+ b = vlib_get_buffer (vm, bi);
- tcp_prepare_retransmit_segment (tc, b, tc->snd_mss);
- tcp_enqueue_to_output (tm->vlib_main, b, bi, tc->c_is_ip4);
+ TCP_EVT_DBG (TCP_EVT_CC_EVT, tc, 2);
- tc->snd_nxt = snd_nxt;
- tc->rtx_bytes += tc->snd_mss;
+ n_bytes = tcp_prepare_retransmit_segment (tc, b, 0, tc->snd_mss);
+ if (n_bytes == 0)
+ return;
+
+ tcp_enqueue_to_output (vm, b, bi, tc->c_is_ip4);
+ tc->rtx_bytes += n_bytes;
}
+sack_scoreboard_hole_t *
+scoreboard_first_rtx_hole (sack_scoreboard_t * sb)
+{
+ sack_scoreboard_hole_t *hole = 0;
+
+// hole = scoreboard_first_hole (&tc->sack_sb);
+// if (hole)
+// {
+//
+// offset = hole->start - tc->snd_una;
+// hole_size = hole->end - hole->start;
+//
+// ASSERT(hole_size);
+//
+// if (hole_size < max_bytes)
+// max_bytes = hole_size;
+// }
+ return hole;
+}
+
+/**
+ * Do fast retransmit.
+ */
void
tcp_fast_retransmit (tcp_connection_t * tc)
{
tcp_main_t *tm = vnet_get_tcp_main ();
- u32 snd_space, max_bytes, n_bytes, bi;
+ vlib_main_t *vm = vlib_get_main ();
+ u32 bi;
+ int snd_space;
+ u32 n_written = 0, offset = 0;
vlib_buffer_t *b;
+ u8 use_sacks = 0;
ASSERT (tcp_in_fastrecovery (tc));
- clib_warning ("fast retransmit!");
-
/* Start resending from first un-acked segment */
tc->snd_nxt = tc->snd_una;
snd_space = tcp_available_snd_space (tc);
+ TCP_EVT_DBG (TCP_EVT_CC_EVT, tc, 0);
- while (snd_space)
+ /* If we have SACKs use them */
+ if (tcp_opts_sack_permitted (&tc->opt)
+ && scoreboard_first_hole (&tc->sack_sb))
+ use_sacks = 0;
+
+ while (snd_space > 0)
{
tcp_get_free_buffer_index (tm, &bi);
- b = vlib_get_buffer (tm->vlib_main, bi);
+ b = vlib_get_buffer (vm, bi);
- max_bytes = clib_min (tc->snd_mss, snd_space);
- n_bytes = tcp_prepare_retransmit_segment (tc, b, max_bytes);
+ if (use_sacks)
+ {
+ scoreboard_first_rtx_hole (&tc->sack_sb);
+ }
+ else
+ {
+ offset += n_written;
+ }
+
+ n_written = tcp_prepare_retransmit_segment (tc, b, offset, snd_space);
/* Nothing left to retransmit */
- if (n_bytes == 0)
- return;
+ if (n_written == 0)
+ {
+ tcp_return_buffer (tm);
+ break;
+ }
- tcp_enqueue_to_output (tm->vlib_main, b, bi, tc->c_is_ip4);
-
- snd_space -= n_bytes;
+ tcp_enqueue_to_output (vm, b, bi, tc->c_is_ip4);
+ tc->rtx_bytes += n_written;
+ snd_space -= n_written;
}
- /* If window allows, send new data */
- tc->snd_nxt = tc->snd_una_max;
+ /* If window allows, send 1 SMSS of new data */
+ if (seq_lt (tc->snd_nxt, tc->snd_congestion))
+ tc->snd_nxt = tc->snd_congestion;
}
always_inline u32
@@ -1209,8 +1321,6 @@
if (PREDICT_FALSE
(vnet_buffer (b0)->tcp.flags & TCP_BUF_FLAG_DUPACK))
{
- ASSERT (tc0->snt_dupacks > 0);
- tc0->snt_dupacks--;
if (!tcp_session_has_ooo_data (tc0))
{
error0 = TCP_ERROR_FILTERED_DUPACKS;
@@ -1223,8 +1333,7 @@
tc0->rcv_las = tc0->rcv_nxt;
/* Stop DELACK timer and fix flags */
- tc0->flags &=
- ~(TCP_CONN_SNDACK | TCP_CONN_DELACK | TCP_CONN_BURSTACK);
+ tc0->flags &= ~(TCP_CONN_SNDACK);
if (tcp_timer_is_active (tc0, TCP_TIMER_DELACK))
{
tcp_timer_reset (tc0, TCP_TIMER_DELACK);
diff --git a/src/vnet/tcp/tcp_packet.h b/src/vnet/tcp/tcp_packet.h
index 866c5fd..4f28cf3 100644
--- a/src/vnet/tcp/tcp_packet.h
+++ b/src/vnet/tcp/tcp_packet.h
@@ -137,7 +137,7 @@
typedef struct _sack_block
{
u32 start; /**< Start sequence number */
- u32 end; /**< End sequence number */
+ u32 end; /**< End sequence number (first outside) */
} sack_block_t;
typedef struct
diff --git a/src/vnet/tcp/tcp_test.c b/src/vnet/tcp/tcp_test.c
new file mode 100644
index 0000000..0725bb0
--- /dev/null
+++ b/src/vnet/tcp/tcp_test.c
@@ -0,0 +1,216 @@
+/*
+ * Copyright (c) 2017 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <vnet/tcp/tcp.h>
+
+#define TCP_TEST_I(_cond, _comment, _args...) \
+({ \
+ int _evald = (_cond); \
+ if (!(_evald)) { \
+ fformat(stderr, "FAIL:%d: " _comment "\n", \
+ __LINE__, ##_args); \
+ } else { \
+ fformat(stderr, "PASS:%d: " _comment "\n", \
+ __LINE__, ##_args); \
+ } \
+ _evald; \
+})
+
+#define TCP_TEST(_cond, _comment, _args...) \
+{ \
+ if (!TCP_TEST_I(_cond, _comment, ##_args)) { \
+ return 1; \
+ } \
+}
+
+static int
+tcp_test_sack ()
+{
+ tcp_connection_t _tc, *tc = &_tc;
+ sack_scoreboard_t *sb = &tc->sack_sb;
+ sack_block_t *sacks = 0, block;
+ sack_scoreboard_hole_t *hole;
+ int i;
+
+ memset (tc, 0, sizeof (*tc));
+
+ tc->snd_una = 0;
+ tc->snd_una_max = 1000;
+ tc->snd_nxt = 1000;
+ tc->opt.flags |= TCP_OPTS_FLAG_SACK;
+ scoreboard_init (&tc->sack_sb);
+
+ for (i = 0; i < 1000 / 100; i++)
+ {
+ block.start = i * 100;
+ block.end = (i + 1) * 100;
+ vec_add1 (sacks, block);
+ }
+
+ /*
+ * Inject even blocks
+ */
+
+ for (i = 0; i < 1000 / 200; i++)
+ {
+ vec_add1 (tc->opt.sacks, sacks[i * 2]);
+ }
+ tc->opt.n_sack_blocks = vec_len (tc->opt.sacks);
+ tcp_rcv_sacks (tc, 0);
+
+ TCP_TEST ((pool_elts (sb->holes) == 5),
+ "scoreboard has %d elements", pool_elts (sb->holes));
+
+ /* First SACK block should be rejected */
+ hole = scoreboard_first_hole (sb);
+ TCP_TEST ((hole->start == 0 && hole->end == 200),
+ "first hole start %u end %u", hole->start, hole->end);
+ hole = scoreboard_last_hole (sb);
+ TCP_TEST ((hole->start == 900 && hole->end == 1000),
+ "last hole start %u end %u", hole->start, hole->end);
+ TCP_TEST ((sb->sacked_bytes == 400), "sacked bytes %d", sb->sacked_bytes);
+ TCP_TEST ((sb->snd_una_adv == 0), "snd_una_adv %u", sb->snd_una_adv);
+ TCP_TEST ((sb->last_sacked_bytes == 400),
+ "last sacked bytes %d", sb->last_sacked_bytes);
+
+ /*
+ * Inject odd blocks
+ */
+
+ vec_reset_length (tc->opt.sacks);
+ for (i = 0; i < 1000 / 200; i++)
+ {
+ vec_add1 (tc->opt.sacks, sacks[i * 2 + 1]);
+ }
+ tc->opt.n_sack_blocks = vec_len (tc->opt.sacks);
+ tcp_rcv_sacks (tc, 0);
+
+ hole = scoreboard_first_hole (sb);
+ TCP_TEST ((pool_elts (sb->holes) == 1),
+ "scoreboard has %d holes", pool_elts (sb->holes));
+ TCP_TEST ((hole->start == 0 && hole->end == 100),
+ "first hole start %u end %u", hole->start, hole->end);
+ TCP_TEST ((sb->sacked_bytes == 900), "sacked bytes %d", sb->sacked_bytes);
+ TCP_TEST ((sb->snd_una_adv == 0), "snd_una_adv %u", sb->snd_una_adv);
+ TCP_TEST ((sb->max_byte_sacked == 1000),
+ "max sacked byte %u", sb->max_byte_sacked);
+ TCP_TEST ((sb->last_sacked_bytes == 500),
+ "last sacked bytes %d", sb->last_sacked_bytes);
+
+ /*
+ * Ack until byte 100, all bytes are now acked + sacked
+ */
+ tcp_rcv_sacks (tc, 100);
+
+ TCP_TEST ((pool_elts (sb->holes) == 0),
+ "scoreboard has %d elements", pool_elts (sb->holes));
+ TCP_TEST ((sb->snd_una_adv == 900),
+ "snd_una_adv after ack %u", sb->snd_una_adv);
+ TCP_TEST ((sb->max_byte_sacked == 1000),
+ "max sacked byte %u", sb->max_byte_sacked);
+ TCP_TEST ((sb->sacked_bytes == 0), "sacked bytes %d", sb->sacked_bytes);
+ TCP_TEST ((sb->last_sacked_bytes == 0),
+ "last sacked bytes %d", sb->last_sacked_bytes);
+
+ /*
+ * Add new block
+ */
+
+ vec_reset_length (tc->opt.sacks);
+
+ block.start = 1200;
+ block.end = 1300;
+ vec_add1 (tc->opt.sacks, block);
+
+ tc->snd_una_max = 1500;
+ tc->snd_una = 1000;
+ tc->snd_nxt = 1500;
+ tcp_rcv_sacks (tc, 1000);
+
+ TCP_TEST ((sb->snd_una_adv == 0),
+ "snd_una_adv after ack %u", sb->snd_una_adv);
+ TCP_TEST ((pool_elts (sb->holes) == 2),
+ "scoreboard has %d holes", pool_elts (sb->holes));
+ hole = scoreboard_first_hole (sb);
+ TCP_TEST ((hole->start == 1000 && hole->end == 1200),
+ "first hole start %u end %u", hole->start, hole->end);
+ hole = scoreboard_last_hole (sb);
+ TCP_TEST ((hole->start == 1300 && hole->end == 1500),
+ "last hole start %u end %u", hole->start, hole->end);
+ TCP_TEST ((sb->sacked_bytes == 100), "sacked bytes %d", sb->sacked_bytes);
+
+ /*
+ * Ack first hole
+ */
+
+ vec_reset_length (tc->opt.sacks);
+ tcp_rcv_sacks (tc, 1200);
+
+ TCP_TEST ((sb->snd_una_adv == 100),
+ "snd_una_adv after ack %u", sb->snd_una_adv);
+ TCP_TEST ((sb->sacked_bytes == 0), "sacked bytes %d", sb->sacked_bytes);
+ TCP_TEST ((pool_elts (sb->holes) == 1),
+ "scoreboard has %d elements", pool_elts (sb->holes));
+
+ /*
+ * Remove all
+ */
+
+ scoreboard_clear (sb);
+ TCP_TEST ((pool_elts (sb->holes) == 0),
+ "number of holes %d", pool_elts (sb->holes));
+ return 0;
+}
+
+static clib_error_t *
+tcp_test (vlib_main_t * vm,
+ unformat_input_t * input, vlib_cli_command_t * cmd_arg)
+{
+ int res = 0;
+
+ while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
+ {
+ if (unformat (input, "sack"))
+ {
+ res = tcp_test_sack ();
+ }
+ else
+ {
+ return clib_error_return (0, "unknown input `%U'",
+ format_unformat_error, input);
+ }
+ }
+
+ if (res)
+ {
+ return clib_error_return (0, "TCP unit test failed");
+ }
+ else
+ {
+ return 0;
+ }
+}
+
+VLIB_CLI_COMMAND (tcp_test_command, static) =
+{
+.path = "test tcp",.short_help = "internal tcp unit tests",.function =
+ tcp_test,};
+/*
+ * fd.io coding-style-patch-verification: ON
+ *
+ * Local Variables:
+ * eval: (c-set-style "gnu")
+ * End:
+ */
diff --git a/src/vnet/udp/builtin_server.c b/src/vnet/udp/builtin_server.c
index 46c8e73..57f774c 100644
--- a/src/vnet/udp/builtin_server.c
+++ b/src/vnet/udp/builtin_server.c
@@ -39,10 +39,10 @@
}
static int
-builtin_server_rx_callback (stream_session_t * s, session_fifo_event_t * ep)
+builtin_server_rx_callback (stream_session_t * s)
{
svm_fifo_t *rx_fifo, *tx_fifo;
- u32 this_transfer;
+ u32 this_transfer, max_deq, max_enq;
int actual_transfer;
u8 *my_copy_buffer;
session_fifo_event_t evt;
@@ -52,9 +52,9 @@
rx_fifo = s->server_rx_fifo;
tx_fifo = s->server_tx_fifo;
- this_transfer = svm_fifo_max_enqueue (tx_fifo)
- < svm_fifo_max_dequeue (rx_fifo) ?
- svm_fifo_max_enqueue (tx_fifo) : svm_fifo_max_dequeue (rx_fifo);
+ max_deq = svm_fifo_max_dequeue (rx_fifo);
+ max_enq = svm_fifo_max_enqueue (tx_fifo);
+ this_transfer = max_enq < max_deq ? max_enq : max_deq;
vec_validate (my_copy_buffer, this_transfer - 1);
_vec_len (my_copy_buffer) = this_transfer;
@@ -64,17 +64,20 @@
ASSERT (actual_transfer == this_transfer);
actual_transfer = svm_fifo_enqueue_nowait (tx_fifo, 0, this_transfer,
my_copy_buffer);
+ ASSERT (actual_transfer == this_transfer);
copy_buffers[s->thread_index] = my_copy_buffer;
- /* Fabricate TX event, send to ourselves */
- evt.fifo = tx_fifo;
- evt.event_type = FIFO_EVENT_SERVER_TX;
- /* $$$$ for event logging */
- evt.enqueue_length = actual_transfer;
- evt.event_id = 0;
- q = session_manager_get_vpp_event_queue (s->thread_index);
- unix_shared_memory_queue_add (q, (u8 *) & evt, 0 /* do wait for mutex */ );
+ if (svm_fifo_set_event (tx_fifo))
+ {
+ /* Fabricate TX event, send to ourselves */
+ evt.fifo = tx_fifo;
+ evt.event_type = FIFO_EVENT_SERVER_TX;
+ evt.event_id = 0;
+ q = session_manager_get_vpp_event_queue (s->thread_index);
+ unix_shared_memory_queue_add (q, (u8 *) & evt,
+ 0 /* do wait for mutex */ );
+ }
return 0;
}
diff --git a/src/vnet/udp/udp_input.c b/src/vnet/udp/udp_input.c
index 8827873..4b22109 100644
--- a/src/vnet/udp/udp_input.c
+++ b/src/vnet/udp/udp_input.c
@@ -244,44 +244,53 @@
/* Get session's server */
server0 = application_get (s0->app_index);
- /* Fabricate event */
- evt.fifo = s0->server_rx_fifo;
- evt.event_type = FIFO_EVENT_SERVER_RX;
- evt.event_id = serial_number++;
- evt.enqueue_length = svm_fifo_max_dequeue (s0->server_rx_fifo);
-
/* Built-in server? Deliver the goods... */
if (server0->cb_fns.builtin_server_rx_callback)
{
- server0->cb_fns.builtin_server_rx_callback (s0, &evt);
+ server0->cb_fns.builtin_server_rx_callback (s0);
continue;
}
- /* Add event to server's event queue */
- q = server0->event_queue;
-
- /* Don't block for lack of space */
- if (PREDICT_TRUE (q->cursize < q->maxsize))
- unix_shared_memory_queue_add (server0->event_queue, (u8 *) & evt,
- 0 /* do wait for mutex */ );
- else
+ if (svm_fifo_set_event (s0->server_rx_fifo))
{
- vlib_node_increment_counter (vm, udp4_uri_input_node.index,
- SESSION_ERROR_FIFO_FULL, 1);
+ /* Fabricate event */
+ evt.fifo = s0->server_rx_fifo;
+ evt.event_type = FIFO_EVENT_SERVER_RX;
+ evt.event_id = serial_number++;
+
+ /* Add event to server's event queue */
+ q = server0->event_queue;
+
+ /* Don't block for lack of space */
+ if (PREDICT_TRUE (q->cursize < q->maxsize))
+ {
+ unix_shared_memory_queue_add (server0->event_queue,
+ (u8 *) & evt,
+ 0 /* do wait for mutex */ );
+ }
+ else
+ {
+ vlib_node_increment_counter (vm, udp4_uri_input_node.index,
+ SESSION_ERROR_FIFO_FULL, 1);
+ }
}
+ /* *INDENT-OFF* */
if (1)
{
ELOG_TYPE_DECLARE (e) =
{
- .format = "evt-enqueue: id %d length %d",.format_args = "i4i4",};
+ .format = "evt-enqueue: id %d length %d",
+ .format_args = "i4i4",};
struct
{
u32 data[2];
} *ed;
ed = ELOG_DATA (&vlib_global_main.elog_main, e);
ed->data[0] = evt.event_id;
- ed->data[1] = evt.enqueue_length;
+ ed->data[1] = svm_fifo_max_dequeue (s0->server_rx_fifo);
}
+ /* *INDENT-ON* */
+
}
vec_reset_length (session_indices_to_enqueue);