session tcp: handle rxt and acks as custom events
Type: feature
Control ack generation and retransmissions with session layer scheduler.
Change-Id: Iacdf9f84ab81f44851980aa45a83e75f29be2b7b
Signed-off-by: Florin Coras <fcoras@cisco.com>
diff --git a/src/plugins/quic/quic.c b/src/plugins/quic/quic.c
index 8bb9422..435113a 100644
--- a/src/plugins/quic/quic.c
+++ b/src/plugins/quic/quic.c
@@ -2100,7 +2100,7 @@
}
static int
-quic_custom_tx_callback (void *s)
+quic_custom_tx_callback (void *s, u32 max_burst_size)
{
session_t *stream_session = (session_t *) s;
quicly_stream_t *stream;
diff --git a/src/plugins/unittest/session_test.c b/src/plugins/unittest/session_test.c
index 00a70b8..819c426 100644
--- a/src/plugins/unittest/session_test.c
+++ b/src/plugins/unittest/session_test.c
@@ -375,8 +375,7 @@
SESSION_TEST ((error == 0), "connect should work");
/* wait for stuff to happen */
- while ((connected_session_index == ~0
- || vec_len (tcp_main.wrk_ctx[0].pending_acks)) && ++tries < 100)
+ while (connected_session_index == ~0 && ++tries < 100)
vlib_process_suspend (vm, 100e-3);
clib_warning ("waited %.1f seconds for connections", tries / 10.0);
SESSION_TEST ((connected_session_index != ~0), "session should exist");
diff --git a/src/vnet/session/application_local.c b/src/vnet/session/application_local.c
index 4a268c7..7d8fb46 100644
--- a/src/vnet/session/application_local.c
+++ b/src/vnet/session/application_local.c
@@ -467,7 +467,7 @@
}
static int
-ct_custom_tx (void *session)
+ct_custom_tx (void *session, u32 max_burst_size)
{
session_t *s = (session_t *) session;
if (session_has_transport (s))
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 477215e..1c8b7fb 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -118,6 +118,31 @@
}
}
+void
+session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
+{
+ session_t *s;
+
+ s = session_get (tc->s_index, tc->thread_index);
+ ASSERT (s->thread_index == vlib_get_thread_index ());
+ if (!(s->flags & SESSION_F_CUSTOM_TX))
+ {
+ s->flags |= SESSION_F_CUSTOM_TX;
+ if (svm_fifo_set_event (s->tx_fifo))
+ {
+ session_worker_t *wrk;
+ session_evt_elt_t *elt;
+ wrk = session_main_get_worker (tc->thread_index);
+ if (has_prio)
+ elt = session_evt_alloc_new (wrk);
+ else
+ elt = session_evt_alloc_old (wrk);
+ elt->evt.session_index = tc->s_index;
+ elt->evt.event_type = SESSION_IO_EVT_TX;
+ }
+ }
+}
+
static void
session_program_transport_close (session_t * s)
{
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index a3f2a01..8f7bd6c 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -243,6 +243,26 @@
session_evt_pending_disconnects_head (wrk));
}
+static inline session_evt_elt_t *
+session_evt_alloc_new (session_worker_t * wrk)
+{
+ session_evt_elt_t *elt;
+ elt = session_evt_elt_alloc (wrk);
+ clib_llist_add_tail (wrk->event_elts, evt_list, elt,
+ pool_elt_at_index (wrk->event_elts, wrk->new_head));
+ return elt;
+}
+
+static inline session_evt_elt_t *
+session_evt_alloc_old (session_worker_t * wrk)
+{
+ session_evt_elt_t *elt;
+ elt = session_evt_elt_alloc (wrk);
+ clib_llist_add_tail (wrk->event_elts, evt_list, elt,
+ pool_elt_at_index (wrk->event_elts, wrk->old_head));
+ return elt;
+}
+
always_inline u8
session_is_valid (u32 si, u8 thread_index)
{
@@ -394,6 +414,8 @@
void *rpc_args);
void session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
void *rpc_args);
+void session_add_self_custom_tx_evt (transport_connection_t * tc,
+ u8 has_prio);
transport_connection_t *session_get_transport (session_t * s);
void session_get_endpoint (session_t * s, transport_endpoint_t * tep,
u8 is_lcl);
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index 999776f..1bb5cb2 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -618,7 +618,7 @@
session_evt_elt_t * elt,
int *n_tx_packets, u8 peek_data)
{
- u32 next_index, next0, next1, *to_next, n_left_to_next;
+ u32 next_index, next0, next1, *to_next, n_left_to_next, max_burst;
u32 n_trace, n_bufs_needed = 0, n_left, pbi;
session_tx_context_t *ctx = &wrk->ctx;
session_main_t *smm = &session_main;
@@ -637,6 +637,7 @@
next_index = smm->session_type_to_next[ctx->s->session_type];
next0 = next1 = next_index;
+ max_burst = VLIB_FRAME_SIZE - *n_tx_packets;
tp = session_get_transport_proto (ctx->s);
ctx->transport_vft = transport_protocol_get_vft (tp);
@@ -649,6 +650,20 @@
ctx->transport_vft->flush_data (ctx->tc);
}
+ if (ctx->s->flags & SESSION_F_CUSTOM_TX)
+ {
+ u32 n_custom_tx;
+ ctx->s->flags &= ~SESSION_F_CUSTOM_TX;
+ n_custom_tx = ctx->transport_vft->custom_tx (ctx->tc, max_burst);
+ *n_tx_packets += n_custom_tx;
+ max_burst -= n_custom_tx;
+ if (!max_burst)
+ {
+ session_evt_add_old (wrk, elt);
+ return SESSION_TX_OK;
+ }
+ }
+
ctx->snd_space = transport_connection_snd_space (ctx->tc,
wrk->vm->clib_time.
last_cpu_time,
@@ -664,8 +679,7 @@
svm_fifo_unset_event (ctx->s->tx_fifo);
/* Check how much we can pull. */
- session_tx_set_dequeue_params (vm, ctx, VLIB_FRAME_SIZE - *n_tx_packets,
- peek_data);
+ session_tx_set_dequeue_params (vm, ctx, max_burst, peek_data);
if (PREDICT_FALSE (!ctx->max_len_to_snd))
return SESSION_TX_NO_DATA;
@@ -823,7 +837,8 @@
if (PREDICT_FALSE (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
return 0;
svm_fifo_unset_event (s->tx_fifo);
- return transport_custom_tx (session_get_transport_proto (s), s);
+ return transport_custom_tx (session_get_transport_proto (s), s,
+ VLIB_FRAME_SIZE - *n_tx_packets);
}
always_inline session_t *
diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h
index 25b6116..9a5bc76 100644
--- a/src/vnet/session/session_types.h
+++ b/src/vnet/session/session_types.h
@@ -142,6 +142,7 @@
{
SESSION_F_RX_EVT = 1,
SESSION_F_PROXY = (1 << 1),
+ SESSION_F_CUSTOM_TX = (1 << 2),
} session_flags_t;
typedef struct session_
diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h
index 6e2feb0..058a9ae 100644
--- a/src/vnet/session/transport.h
+++ b/src/vnet/session/transport.h
@@ -52,7 +52,7 @@
u32 (*tx_fifo_offset) (transport_connection_t * tc);
void (*update_time) (f64 time_now, u8 thread_index);
void (*flush_data) (transport_connection_t *tconn);
- int (*custom_tx) (void *session);
+ int (*custom_tx) (void *session, u32 max_burst_size);
int (*app_rx_evt) (transport_connection_t *tconn);
/*
@@ -127,9 +127,9 @@
}
static inline int
-transport_custom_tx (transport_proto_t tp, void *s)
+transport_custom_tx (transport_proto_t tp, void *s, u32 max_burst_size)
{
- return tp_vfts[tp].custom_tx (s);
+ return tp_vfts[tp].custom_tx (s, max_burst_size);
}
static inline int
diff --git a/src/vnet/tcp/tcp.c b/src/vnet/tcp/tcp.c
index 928d1ba..5a215b6 100644
--- a/src/vnet/tcp/tcp.c
+++ b/src/vnet/tcp/tcp.c
@@ -1196,8 +1196,6 @@
tcp_set_time_now (wrk);
tw_timer_expire_timers_16t_2w_512sl (&wrk->timer_wheel, now);
- tcp_do_fastretransmits (wrk);
- tcp_send_acks (wrk);
tcp_flush_frames_to_output (wrk);
}
@@ -1228,6 +1226,7 @@
.update_time = tcp_update_time,
.tx_fifo_offset = tcp_session_tx_fifo_offset,
.flush_data = tcp_session_flush_data,
+ .custom_tx = tcp_session_custom_tx,
.format_connection = format_tcp_session,
.format_listener = format_tcp_listener_session,
.format_half_open = format_tcp_half_open_session,
@@ -1476,17 +1475,9 @@
for (thread = 0; thread < num_threads; thread++)
{
- vec_validate (tm->wrk_ctx[thread].pending_fast_rxt, 255);
- vec_validate (tm->wrk_ctx[thread].ongoing_fast_rxt, 255);
- vec_validate (tm->wrk_ctx[thread].postponed_fast_rxt, 255);
vec_validate (tm->wrk_ctx[thread].pending_deq_acked, 255);
- vec_validate (tm->wrk_ctx[thread].pending_acks, 255);
vec_validate (tm->wrk_ctx[thread].pending_disconnects, 255);
- vec_reset_length (tm->wrk_ctx[thread].pending_fast_rxt);
- vec_reset_length (tm->wrk_ctx[thread].ongoing_fast_rxt);
- vec_reset_length (tm->wrk_ctx[thread].postponed_fast_rxt);
vec_reset_length (tm->wrk_ctx[thread].pending_deq_acked);
- vec_reset_length (tm->wrk_ctx[thread].pending_acks);
vec_reset_length (tm->wrk_ctx[thread].pending_disconnects);
tm->wrk_ctx[thread].vm = vlib_mains[thread];
diff --git a/src/vnet/tcp/tcp.h b/src/vnet/tcp/tcp.h
index 7ccc06a..8f2665d 100644
--- a/src/vnet/tcp/tcp.h
+++ b/src/vnet/tcp/tcp.h
@@ -464,21 +464,9 @@
/** tx frames for ip 4/6 lookup nodes */
vlib_frame_t *ip_lookup_tx_frames[2];
- /** vector of connections needing fast rxt */
- u32 *pending_fast_rxt;
-
- /** vector of connections now doing fast rxt */
- u32 *ongoing_fast_rxt;
-
- /** vector of connections that will do fast rxt */
- u32 *postponed_fast_rxt;
-
/** vector of pending ack dequeues */
u32 *pending_deq_acked;
- /** vector of pending acks */
- u32 *pending_acks;
-
/** vector of pending disconnect notifications */
u32 *pending_disconnects;
@@ -700,15 +688,12 @@
void tcp_update_rto (tcp_connection_t * tc);
void tcp_flush_frame_to_output (tcp_worker_ctx_t * wrk, u8 is_ip4);
void tcp_flush_frames_to_output (tcp_worker_ctx_t * wrk);
-void tcp_program_fastretransmit (tcp_worker_ctx_t * wrk,
- tcp_connection_t * tc);
-void tcp_do_fastretransmits (tcp_worker_ctx_t * wrk);
-
-void tcp_program_ack (tcp_worker_ctx_t * wrk, tcp_connection_t * tc);
-void tcp_program_dupack (tcp_worker_ctx_t * wrk, tcp_connection_t * tc);
-void tcp_send_acks (tcp_worker_ctx_t * wrk);
void tcp_send_window_update_ack (tcp_connection_t * tc);
+void tcp_program_ack (tcp_connection_t * tc);
+void tcp_program_dupack (tcp_connection_t * tc);
+void tcp_program_fastretransmit (tcp_connection_t * tc);
+
/*
* Rate estimation
*/
@@ -961,6 +946,7 @@
u32 tcp_session_push_header (transport_connection_t * tconn,
vlib_buffer_t * b);
+int tcp_session_custom_tx (void *conn, u32 max_burst_size);
void tcp_connection_timers_init (tcp_connection_t * tc);
void tcp_connection_timers_reset (tcp_connection_t * tc);
diff --git a/src/vnet/tcp/tcp_input.c b/src/vnet/tcp/tcp_input.c
index 5f37648..4695fbb 100755
--- a/src/vnet/tcp/tcp_input.c
+++ b/src/vnet/tcp/tcp_input.c
@@ -317,7 +317,7 @@
* SEG.TSval */
else if (!tcp_rst (th0))
{
- tcp_program_ack (wrk, tc0);
+ tcp_program_ack (tc0);
TCP_EVT_DBG (TCP_EVT_DUPACK_SENT, tc0, vnet_buffer (b0)->tcp);
goto error;
}
@@ -340,7 +340,7 @@
}
else
{
- tcp_program_ack (wrk, tc0);
+ tcp_program_ack (tc0);
TCP_EVT_DBG (TCP_EVT_SYNACK_RCVD, tc0);
*error0 = TCP_ERROR_SYN_ACKS_RCVD;
}
@@ -368,7 +368,7 @@
/* If not RST, send dup ack */
if (!tcp_rst (th0))
{
- tcp_program_dupack (wrk, tc0);
+ tcp_program_dupack (tc0);
TCP_EVT_DBG (TCP_EVT_DUPACK_SENT, tc0, vnet_buffer (b0)->tcp);
}
goto error;
@@ -391,7 +391,7 @@
if (PREDICT_FALSE (tcp_syn (th0)))
{
/* As per RFC5961 send challenge ack instead of reset */
- tcp_program_ack (wrk, tc0);
+ tcp_program_ack (tc0);
*error0 = TCP_ERROR_SPURIOUS_SYN;
goto error;
}
@@ -1199,6 +1199,7 @@
tcp_fastrecovery_off (tc);
tcp_fastrecovery_first_off (tc);
+ tc->flags &= ~TCP_CONN_FRXT_PENDING;
TCP_EVT_DBG (TCP_EVT_CC_EVT, tc, 3);
}
@@ -1305,81 +1306,6 @@
|| tcp_should_fastrecover_sack (tc));
}
-#ifndef CLIB_MARCH_VARIANT
-void
-tcp_program_fastretransmit (tcp_worker_ctx_t * wrk, tcp_connection_t * tc)
-{
- if (!(tc->flags & TCP_CONN_FRXT_PENDING))
- {
- vec_add1 (wrk->pending_fast_rxt, tc->c_c_index);
- tc->flags |= TCP_CONN_FRXT_PENDING;
- }
-}
-
-void
-tcp_do_fastretransmits (tcp_worker_ctx_t * wrk)
-{
- u32 *ongoing_fast_rxt, burst_bytes, sent_bytes, thread_index;
- u32 max_burst_size, burst_size, n_segs = 0, n_segs_now;
- tcp_connection_t *tc;
- u64 last_cpu_time;
- int i;
-
- if (vec_len (wrk->pending_fast_rxt) == 0
- && vec_len (wrk->postponed_fast_rxt) == 0)
- return;
-
- thread_index = wrk->vm->thread_index;
- last_cpu_time = wrk->vm->clib_time.last_cpu_time;
- ongoing_fast_rxt = wrk->ongoing_fast_rxt;
- vec_append (ongoing_fast_rxt, wrk->postponed_fast_rxt);
- vec_append (ongoing_fast_rxt, wrk->pending_fast_rxt);
-
- _vec_len (wrk->postponed_fast_rxt) = 0;
- _vec_len (wrk->pending_fast_rxt) = 0;
-
- max_burst_size = VLIB_FRAME_SIZE / vec_len (ongoing_fast_rxt);
- max_burst_size = clib_max (max_burst_size, 1);
-
- for (i = 0; i < vec_len (ongoing_fast_rxt); i++)
- {
- tc = tcp_connection_get (ongoing_fast_rxt[i], thread_index);
- if (!tc)
- continue;
- if (!tcp_in_fastrecovery (tc))
- {
- tc->flags &= ~TCP_CONN_FRXT_PENDING;
- continue;
- }
-
- if (n_segs >= VLIB_FRAME_SIZE)
- {
- vec_add1 (wrk->postponed_fast_rxt, ongoing_fast_rxt[i]);
- continue;
- }
-
- tc->flags &= ~TCP_CONN_FRXT_PENDING;
- burst_size = clib_min (max_burst_size, VLIB_FRAME_SIZE - n_segs);
- burst_bytes = transport_connection_tx_pacer_burst (&tc->connection,
- last_cpu_time);
- burst_size = clib_min (burst_size, burst_bytes / tc->snd_mss);
- if (!burst_size)
- {
- tcp_program_fastretransmit (wrk, tc);
- continue;
- }
-
- n_segs_now = tcp_fast_retransmit (wrk, tc, burst_size);
- sent_bytes = clib_min (n_segs_now * tc->snd_mss, burst_bytes);
- transport_connection_tx_pacer_update_bytes (&tc->connection,
- sent_bytes);
- n_segs += n_segs_now;
- }
- _vec_len (ongoing_fast_rxt) = 0;
- wrk->ongoing_fast_rxt = ongoing_fast_rxt;
-}
-#endif /* CLIB_MARCH_VARIANT */
-
/**
* One function to rule them all ... and in the darkness bind them
*/
@@ -1393,7 +1319,7 @@
{
if (tc->bytes_acked)
goto partial_ack;
- tcp_program_fastretransmit (tcp_get_worker (tc->c_thread_index), tc);
+ tcp_program_fastretransmit (tc);
return;
}
/*
@@ -1449,8 +1375,7 @@
pacer_wnd = clib_max (0.1 * tc->cwnd, 2 * tc->snd_mss);
tcp_connection_tx_pacer_reset (tc, pacer_wnd,
0 /* start bucket */ );
- tcp_program_fastretransmit (tcp_get_worker (tc->c_thread_index),
- tc);
+ tcp_program_fastretransmit (tc);
return;
}
else if (!tc->bytes_acked
@@ -1571,7 +1496,7 @@
/*
* Since this was a partial ack, try to retransmit some more data
*/
- tcp_program_fastretransmit (tcp_get_worker (tc->c_thread_index), tc);
+ tcp_program_fastretransmit (tc);
}
/**
@@ -1712,7 +1637,7 @@
/* Account for the FIN and send ack */
tc->rcv_nxt += 1;
- tcp_program_ack (wrk, tc);
+ tcp_program_ack (tc);
/* Enter CLOSE-WAIT and notify session. To avoid lingering
* in CLOSE-WAIT, set timer (reuse WAITCLOSE). */
tcp_connection_set_state (tc, TCP_STATE_CLOSE_WAIT);
@@ -1976,7 +1901,7 @@
* retransmissions since we may not have any data to send */
if (seq_leq (vnet_buffer (b)->tcp.seq_end, tc->rcv_nxt))
{
- tcp_program_ack (wrk, tc);
+ tcp_program_ack (tc);
error = TCP_ERROR_SEGMENT_OLD;
goto done;
}
@@ -1996,7 +1921,7 @@
/* RFC2581: Enqueue and send DUPACK for fast retransmit */
error = tcp_session_enqueue_ooo (tc, b, n_data_bytes);
- tcp_program_dupack (wrk, tc);
+ tcp_program_dupack (tc);
TCP_EVT_DBG (TCP_EVT_DUPACK_SENT, tc, vnet_buffer (b)->tcp);
goto done;
}
@@ -2013,7 +1938,7 @@
goto done;
}
- tcp_program_ack (wrk, tc);
+ tcp_program_ack (tc);
done:
return error;
@@ -2591,7 +2516,7 @@
}
else
{
- tcp_program_ack (wrk, new_tc0);
+ tcp_program_ack (new_tc0);
}
drop:
@@ -2921,7 +2846,7 @@
if (!is_fin0)
goto drop;
- tcp_program_ack (wrk, tc0);
+ tcp_program_ack (tc0);
tcp_timer_update (tc0, TCP_TIMER_WAITCLOSE, TCP_TIMEWAIT_TIME);
goto drop;
@@ -2961,7 +2886,7 @@
case TCP_STATE_ESTABLISHED:
/* Account for the FIN and send ack */
tc0->rcv_nxt += 1;
- tcp_program_ack (wrk, tc0);
+ tcp_program_ack (tc0);
tcp_connection_set_state (tc0, TCP_STATE_CLOSE_WAIT);
tcp_program_disconnect (wrk, tc0);
tcp_timer_update (tc0, TCP_TIMER_WAITCLOSE, TCP_CLOSEWAIT_TIME);
@@ -2995,7 +2920,7 @@
else
{
tcp_connection_set_state (tc0, TCP_STATE_CLOSING);
- tcp_program_ack (wrk, tc0);
+ tcp_program_ack (tc0);
/* Wait for ACK for our FIN but not forever */
tcp_timer_update (tc0, TCP_TIMER_WAITCLOSE, TCP_2MSL_TIME);
}
@@ -3006,7 +2931,7 @@
tcp_connection_set_state (tc0, TCP_STATE_TIME_WAIT);
tcp_connection_timers_reset (tc0);
tcp_timer_set (tc0, TCP_TIMER_WAITCLOSE, TCP_TIMEWAIT_TIME);
- tcp_program_ack (wrk, tc0);
+ tcp_program_ack (tc0);
session_transport_closed_notify (&tc0->connection);
break;
case TCP_STATE_TIME_WAIT:
diff --git a/src/vnet/tcp/tcp_output.c b/src/vnet/tcp/tcp_output.c
index 7b0303f..79cc95e 100644
--- a/src/vnet/tcp/tcp_output.c
+++ b/src/vnet/tcp/tcp_output.c
@@ -1188,21 +1188,21 @@
}
void
-tcp_program_ack (tcp_worker_ctx_t * wrk, tcp_connection_t * tc)
+tcp_program_ack (tcp_connection_t * tc)
{
if (!(tc->flags & TCP_CONN_SNDACK))
{
- vec_add1 (wrk->pending_acks, tc->c_c_index);
+ session_add_self_custom_tx_evt (&tc->connection, 1);
tc->flags |= TCP_CONN_SNDACK;
}
}
void
-tcp_program_dupack (tcp_worker_ctx_t * wrk, tcp_connection_t * tc)
+tcp_program_dupack (tcp_connection_t * tc)
{
if (!(tc->flags & TCP_CONN_SNDACK))
{
- vec_add1 (wrk->pending_acks, tc->c_c_index);
+ session_add_self_custom_tx_evt (&tc->connection, 1);
tc->flags |= TCP_CONN_SNDACK;
}
if (tc->pending_dupacks < 255)
@@ -1210,51 +1210,13 @@
}
void
-tcp_send_acks (tcp_worker_ctx_t * wrk)
+tcp_program_fastretransmit (tcp_connection_t * tc)
{
- u32 thread_index, *pending_acks;
- tcp_connection_t *tc;
- int i, j, n_acks;
-
- if (!vec_len (wrk->pending_acks))
- return;
-
- thread_index = wrk->vm->thread_index;
- pending_acks = wrk->pending_acks;
- for (i = 0; i < vec_len (pending_acks); i++)
+ if (!(tc->flags & TCP_CONN_FRXT_PENDING))
{
- tc = tcp_connection_get (pending_acks[i], thread_index);
- tc->flags &= ~TCP_CONN_SNDACK;
- if (!tc->pending_dupacks)
- {
- tcp_send_ack (tc);
- continue;
- }
-
- /* If we're supposed to send dupacks but have no ooo data
- * send only one ack */
- if (!vec_len (tc->snd_sacks))
- {
- tcp_send_ack (tc);
- continue;
- }
-
- /* Start with first sack block */
- tc->snd_sack_pos = 0;
-
- /* Generate enough dupacks to cover all sack blocks. Do not generate
- * more sacks than the number of packets received. But do generate at
- * least 3, i.e., the number needed to signal congestion, if needed. */
- n_acks = vec_len (tc->snd_sacks) / TCP_OPTS_MAX_SACK_BLOCKS;
- n_acks = clib_min (n_acks, tc->pending_dupacks);
- n_acks = clib_max (n_acks, clib_min (tc->pending_dupacks, 3));
- for (j = 0; j < n_acks; j++)
- tcp_send_ack (tc);
-
- tc->pending_dupacks = 0;
- tc->snd_sack_pos = 0;
+ session_add_self_custom_tx_evt (&tc->connection, 0);
+ tc->flags |= TCP_CONN_FRXT_PENDING;
}
- _vec_len (wrk->pending_acks) = 0;
}
/**
@@ -1281,7 +1243,6 @@
void
tcp_send_window_update_ack (tcp_connection_t * tc)
{
- tcp_worker_ctx_t *wrk = tcp_get_worker (tc->c_thread_index);
u32 win;
if (tcp_zero_rwnd_sent (tc))
@@ -1290,7 +1251,7 @@
if (win > 0)
{
tcp_zero_rwnd_sent_off (tc);
- tcp_program_ack (wrk, tc);
+ tcp_program_ack (tc);
}
}
}
@@ -1853,7 +1814,7 @@
snd_space = tcp_available_cc_snd_space (tc);
if (snd_space < tc->snd_mss)
{
- tcp_program_fastretransmit (wrk, tc);
+ tcp_program_fastretransmit (tc);
return 0;
}
@@ -1877,7 +1838,7 @@
snd_space / tc->snd_mss);
n_segs_now = tcp_fast_retransmit_unsent (wrk, tc, burst_size);
if (max_deq > n_segs_now * tc->snd_mss)
- tcp_program_fastretransmit (wrk, tc);
+ tcp_program_fastretransmit (tc);
n_segs += n_segs_now;
goto done;
}
@@ -1929,7 +1890,7 @@
}
if (hole)
- tcp_program_fastretransmit (wrk, tc);
+ tcp_program_fastretransmit (tc);
done:
return n_segs;
@@ -1990,7 +1951,7 @@
burst_size = clib_min (burst_size - n_segs, snd_space / tc->snd_mss);
n_segs_now = tcp_fast_retransmit_unsent (wrk, tc, burst_size);
if (max_deq > n_segs_now * tc->snd_mss)
- tcp_program_fastretransmit (wrk, tc);
+ tcp_program_fastretransmit (tc);
n_segs += n_segs_now;
}
@@ -2011,6 +1972,108 @@
else
return tcp_fast_retransmit_no_sack (wrk, tc, burst_size);
}
+
+static int
+tcp_send_acks (tcp_connection_t * tc, u32 max_burst_size)
+{
+ int j, n_acks;
+
+ if (!tc->pending_dupacks)
+ {
+ tcp_send_ack (tc);
+ return 1;
+ }
+
+ /* If we're supposed to send dupacks but have no ooo data
+ * send only one ack */
+ if (!vec_len (tc->snd_sacks))
+ {
+ tcp_send_ack (tc);
+ return 1;
+ }
+
+ /* Start with first sack block */
+ tc->snd_sack_pos = 0;
+
+ /* Generate enough dupacks to cover all sack blocks. Do not generate
+ * more sacks than the number of packets received. But do generate at
+ * least 3, i.e., the number needed to signal congestion, if needed. */
+ n_acks = vec_len (tc->snd_sacks) / TCP_OPTS_MAX_SACK_BLOCKS;
+ n_acks = clib_min (n_acks, tc->pending_dupacks);
+ n_acks = clib_max (n_acks, clib_min (tc->pending_dupacks, 3));
+ for (j = 0; j < clib_min (n_acks, max_burst_size); j++)
+ tcp_send_ack (tc);
+
+ if (n_acks < max_burst_size)
+ {
+ tc->pending_dupacks = 0;
+ tc->snd_sack_pos = 0;
+ return n_acks;
+ }
+ else
+ {
+ TCP_DBG ("constrained by burst size");
+ tc->pending_dupacks = n_acks - max_burst_size;
+ tcp_program_dupack (tc);
+ return max_burst_size;
+ }
+}
+
+static int
+tcp_do_fastretransmit (tcp_connection_t * tc, u32 max_burst_size)
+{
+ u32 n_segs = 0, burst_size, sent_bytes, burst_bytes;
+ tcp_worker_ctx_t *wrk;
+
+ wrk = tcp_get_worker (tc->c_thread_index);
+ burst_bytes = transport_connection_tx_pacer_burst (&tc->connection,
+ wrk->vm->
+ clib_time.last_cpu_time);
+ burst_size = clib_min (max_burst_size, burst_bytes / tc->snd_mss);
+ if (!burst_size)
+ {
+ tcp_program_fastretransmit (tc);
+ return 0;
+ }
+
+ n_segs = tcp_fast_retransmit (wrk, tc, burst_size);
+ sent_bytes = clib_min (n_segs * tc->snd_mss, burst_bytes);
+ transport_connection_tx_pacer_update_bytes (&tc->connection, sent_bytes);
+ return n_segs;
+}
+
+int
+tcp_session_custom_tx (void *conn, u32 max_burst_size)
+{
+ tcp_connection_t *tc = (tcp_connection_t *) conn;
+ u32 n_segs = 0;
+
+ if (tcp_in_fastrecovery (tc) && (tc->flags & TCP_CONN_FRXT_PENDING))
+ {
+ tc->flags &= ~TCP_CONN_FRXT_PENDING;
+ n_segs = tcp_do_fastretransmit (tc, max_burst_size);
+ max_burst_size -= n_segs;
+ }
+
+ if (!(tc->flags & TCP_CONN_SNDACK))
+ return n_segs;
+
+ tc->flags &= ~TCP_CONN_SNDACK;
+
+ /* We have retransmitted packets and no dupack */
+ if (n_segs && !tc->pending_dupacks)
+ return n_segs;
+
+ if (!max_burst_size)
+ {
+ tcp_program_ack (tc);
+ return max_burst_size;
+ }
+
+ n_segs += tcp_send_acks (tc, max_burst_size);
+
+ return n_segs;
+}
#endif /* CLIB_MARCH_VARIANT */
static void
diff --git a/src/vnet/tls/tls.c b/src/vnet/tls/tls.c
index 4a9ec4e..f7780fe 100644
--- a/src/vnet/tls/tls.c
+++ b/src/vnet/tls/tls.c
@@ -660,7 +660,7 @@
}
int
-tls_custom_tx_callback (void *session)
+tls_custom_tx_callback (void *session, u32 max_burst_size)
{
session_t *app_session = (session_t *) session;
tls_ctx_t *ctx;