Implement SACK-based TCP loss recovery (RFC 6675)
- refactor existing congestion control code (RFC 6582/5681). Handling of ACK
feedback now consists of: ACK parsing, cc event detection, event handling,
congestion control update
- extend SACK scoreboard to support SACK-based retransmissions
- basic implementation of the Eifel detection algorithm (RFC 3522) for
detecting spurious retransmissions
- actually initialize the per-thread frame freelist hash tables
- increase worker stack size to 2MB
- fix session queue node out-of-buffer handling
- ensure that the local buffer cache vec_len matches reality
- avoid 2x spurious event requeues when short of buffers
- count out-of-buffer events
- make the builtin server thread-safe
- fix bihash template threading issue: need to paint -1 across uninitialized
working_copy_length vector elements (via rebase from master)
Change-Id: I646cb9f1add9a67d08f4a87badbcb117980ebfc4
Signed-off-by: Florin Coras <fcoras@cisco.com>
Signed-off-by: Dave Barach <dbarach@cisco.com>
diff --git a/src/vnet/session/node.c b/src/vnet/session/node.c
index 3053ccc..07eeae8 100644
--- a/src/vnet/session/node.c
+++ b/src/vnet/session/node.c
@@ -47,7 +47,8 @@
#define foreach_session_queue_error \
_(TX, "Packets transmitted") \
-_(TIMER, "Timer events")
+_(TIMER, "Timer events") \
+_(NO_BUFFER, "Out of buffers")
typedef enum
{
@@ -141,6 +142,7 @@
u8 *data0;
int i, n_bytes_read;
u32 n_bytes_per_buf, deq_per_buf;
+ u32 buffers_allocated, buffers_allocated_this_call;
next_index = next0 = session_type_to_next[s0->session_type];
@@ -167,9 +169,6 @@
/* Check how much we can pull. If buffering, subtract the offset */
max_dequeue0 = svm_fifo_max_dequeue (s0->server_tx_fifo) - rx_offset;
- /* Allow enqueuing of a new event */
- svm_fifo_unset_event (s0->server_tx_fifo);
-
/* Nothing to read return */
if (max_dequeue0 == 0)
return 0;
@@ -187,8 +186,8 @@
max_len_to_snd0 = snd_space0;
}
- n_bytes_per_buf = vlib_buffer_free_list_buffer_size (vm,
- VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);
+ n_bytes_per_buf = vlib_buffer_free_list_buffer_size
+ (vm, VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);
n_bytes_per_seg = MAX_HDRS_LEN + snd_mss0;
n_bufs_per_seg = ceil ((double) n_bytes_per_seg / n_bytes_per_buf);
n_bufs_per_evt = (ceil ((double) max_len_to_snd0 / n_bytes_per_seg))
@@ -205,24 +204,33 @@
if (PREDICT_FALSE (n_bufs < VLIB_FRAME_SIZE))
{
vec_validate (smm->tx_buffers[thread_index],
- n_bufs + VLIB_FRAME_SIZE - 1);
- n_bufs += vlib_buffer_alloc (vm,
- &smm->tx_buffers[thread_index][n_bufs],
- VLIB_FRAME_SIZE);
+ n_bufs + 2 * VLIB_FRAME_SIZE - 1);
- /* buffer shortage
- * XXX 0.9 because when debugging we might not get a full frame */
- if (PREDICT_FALSE (n_bufs < 0.9 * VLIB_FRAME_SIZE))
+ buffers_allocated = 0;
+ do
{
- if (svm_fifo_set_event (s0->server_tx_fifo))
- {
- vec_add1 (smm->pending_event_vector[thread_index], *e0);
- }
- return -1;
+ buffers_allocated_this_call =
+ vlib_buffer_alloc
+ (vm,
+ &smm->tx_buffers[thread_index][n_bufs + buffers_allocated],
+ 2 * VLIB_FRAME_SIZE - buffers_allocated);
+ buffers_allocated += buffers_allocated_this_call;
}
+ while (buffers_allocated_this_call > 0
+ && ((buffers_allocated + n_bufs < VLIB_FRAME_SIZE)));
+
+ n_bufs += buffers_allocated;
_vec_len (smm->tx_buffers[thread_index]) = n_bufs;
+
+ if (PREDICT_FALSE (n_bufs < VLIB_FRAME_SIZE))
+ {
+ vec_add1 (smm->pending_event_vector[thread_index], *e0);
+ return -1;
+ }
}
+ /* Allow enqueuing of a new event */
+ svm_fifo_unset_event (s0->server_tx_fifo);
vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
while (left_to_snd0 && n_left_to_next >= n_bufs_per_seg)
@@ -232,7 +240,9 @@
*/
/* Get free buffer */
+ ASSERT (n_bufs >= 1);
bi0 = smm->tx_buffers[thread_index][--n_bufs];
+ ASSERT (bi0);
_vec_len (smm->tx_buffers[thread_index]) = n_bufs;
b0 = vlib_get_buffer (vm, bi0);
@@ -545,9 +555,10 @@
my_thread_index,
&n_tx_packets);
/* Out of buffers */
- if (rv < 0)
+ if (PREDICT_FALSE (rv < 0))
{
- vec_add1 (smm->pending_event_vector[my_thread_index], *e0);
+ vlib_node_increment_counter (vm, node->node_index,
+ SESSION_QUEUE_ERROR_NO_BUFFER, 1);
continue;
}
break;
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 02b0cce..534598d 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -551,7 +551,7 @@
stream_session_no_space (transport_connection_t * tc, u32 thread_index,
u16 data_len)
{
- stream_session_t *s = stream_session_get (tc->c_index, thread_index);
+ stream_session_t *s = stream_session_get (tc->s_index, thread_index);
if (PREDICT_FALSE (s->session_state != SESSION_STATE_READY))
return 1;
@@ -563,6 +563,15 @@
}
u32
+stream_session_tx_fifo_max_dequeue (transport_connection_t * tc)
+{
+ stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
+ if (s->session_state != SESSION_STATE_READY)
+ return 0;
+ return svm_fifo_max_dequeue (s->server_tx_fifo);
+}
+
+int
stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer,
u32 offset, u32 max_bytes)
{
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index a872864..d9c38bd 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -352,16 +352,18 @@
}
always_inline u32
-stream_session_fifo_size (transport_connection_t * tc)
+stream_session_rx_fifo_size (transport_connection_t * tc)
{
stream_session_t *s = stream_session_get (tc->s_index, tc->thread_index);
return s->server_rx_fifo->nitems;
}
+u32 stream_session_tx_fifo_max_dequeue (transport_connection_t * tc);
+
int
stream_session_enqueue_data (transport_connection_t * tc, vlib_buffer_t * b,
u32 offset, u8 queue_event, u8 is_in_order);
-u32
+int
stream_session_peek_bytes (transport_connection_t * tc, u8 * buffer,
u32 offset, u32 max_bytes);
u32 stream_session_dequeue_drop (transport_connection_t * tc, u32 max_bytes);
diff --git a/src/vnet/session/session_cli.c b/src/vnet/session/session_cli.c
index 509eedb..6b8341a 100755
--- a/src/vnet/session/session_cli.c
+++ b/src/vnet/session/session_cli.c
@@ -15,6 +15,15 @@
#include <vnet/session/application.h>
#include <vnet/session/session.h>
+u8 *
+format_stream_session_fifos (u8 * s, va_list * args)
+{
+ stream_session_t *ss = va_arg (*args, stream_session_t *);
+ s = format (s, " Rx fifo: %U", format_svm_fifo, ss->server_rx_fifo, 1);
+ s = format (s, " Tx fifo: %U", format_svm_fifo, ss->server_tx_fifo, 1);
+ return s;
+}
+
/**
* Format stream session as per the following format
*
@@ -44,6 +53,8 @@
ss->thread_index, verbose);
if (verbose == 1)
s = format (s, "%v", str);
+ if (verbose > 1)
+ s = format (s, "%U", format_stream_session_fifos, ss);
}
else if (ss->session_state == SESSION_STATE_LISTENING)
{
@@ -57,8 +68,12 @@
}
else if (ss->session_state == SESSION_STATE_CLOSED)
{
- s = format (s, "[CL] %-40U%v", tp_vft->format_connection,
- ss->connection_index, ss->thread_index, verbose, str);
+ s = format (s, "[CL] %-40U", tp_vft->format_connection,
+ ss->connection_index, ss->thread_index, verbose);
+ if (verbose == 1)
+ s = format (s, "%v", str);
+ if (verbose > 1)
+ s = format (s, "%U", format_stream_session_fifos, ss);
}
else
{
@@ -124,13 +139,6 @@
({
vec_reset_length (str);
str = format (str, "%U", format_stream_session, s, verbose);
- if (verbose > 1)
- {
- str = format (str, " Rx fifo: %U", format_svm_fifo,
- s->server_rx_fifo, 1);
- str = format (str, " Tx fifo: %U", format_svm_fifo,
- s->server_tx_fifo, 1);
- }
vlib_cli_output (vm, "%v", str);
}));
/* *INDENT-ON* */