session tcp: handle rxt and acks as custom events

Type: feature

Control ack generation and retransmissions with session layer scheduler.

Change-Id: Iacdf9f84ab81f44851980aa45a83e75f29be2b7b
Signed-off-by: Florin Coras <fcoras@cisco.com>
diff --git a/src/plugins/quic/quic.c b/src/plugins/quic/quic.c
index 8bb9422..435113a 100644
--- a/src/plugins/quic/quic.c
+++ b/src/plugins/quic/quic.c
@@ -2100,7 +2100,7 @@
 }
 
 static int
-quic_custom_tx_callback (void *s)
+quic_custom_tx_callback (void *s, u32 max_burst_size)
 {
   session_t *stream_session = (session_t *) s;
   quicly_stream_t *stream;
diff --git a/src/plugins/unittest/session_test.c b/src/plugins/unittest/session_test.c
index 00a70b8..819c426 100644
--- a/src/plugins/unittest/session_test.c
+++ b/src/plugins/unittest/session_test.c
@@ -375,8 +375,7 @@
   SESSION_TEST ((error == 0), "connect should work");
 
   /* wait for stuff to happen */
-  while ((connected_session_index == ~0
-	  || vec_len (tcp_main.wrk_ctx[0].pending_acks)) && ++tries < 100)
+  while (connected_session_index == ~0 && ++tries < 100)
     vlib_process_suspend (vm, 100e-3);
   clib_warning ("waited %.1f seconds for connections", tries / 10.0);
   SESSION_TEST ((connected_session_index != ~0), "session should exist");
diff --git a/src/vnet/session/application_local.c b/src/vnet/session/application_local.c
index 4a268c7..7d8fb46 100644
--- a/src/vnet/session/application_local.c
+++ b/src/vnet/session/application_local.c
@@ -467,7 +467,7 @@
 }
 
 static int
-ct_custom_tx (void *session)
+ct_custom_tx (void *session, u32 max_burst_size)
 {
   session_t *s = (session_t *) session;
   if (session_has_transport (s))
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 477215e..1c8b7fb 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -118,6 +118,31 @@
     }
 }
 
+void
+session_add_self_custom_tx_evt (transport_connection_t * tc, u8 has_prio)
+{
+  session_t *s;
+
+  s = session_get (tc->s_index, tc->thread_index);
+  ASSERT (s->thread_index == vlib_get_thread_index ());
+  if (!(s->flags & SESSION_F_CUSTOM_TX))
+    {
+      s->flags |= SESSION_F_CUSTOM_TX;
+      if (svm_fifo_set_event (s->tx_fifo))
+	{
+	  session_worker_t *wrk;
+	  session_evt_elt_t *elt;
+	  wrk = session_main_get_worker (tc->thread_index);
+	  if (has_prio)
+	    elt = session_evt_alloc_new (wrk);
+	  else
+	    elt = session_evt_alloc_old (wrk);
+	  elt->evt.session_index = tc->s_index;
+	  elt->evt.event_type = SESSION_IO_EVT_TX;
+	}
+    }
+}
+
 static void
 session_program_transport_close (session_t * s)
 {
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index a3f2a01..8f7bd6c 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -243,6 +243,26 @@
 		       session_evt_pending_disconnects_head (wrk));
 }
 
+static inline session_evt_elt_t *
+session_evt_alloc_new (session_worker_t * wrk)
+{
+  session_evt_elt_t *elt;
+  elt = session_evt_elt_alloc (wrk);
+  clib_llist_add_tail (wrk->event_elts, evt_list, elt,
+		       pool_elt_at_index (wrk->event_elts, wrk->new_head));
+  return elt;
+}
+
+static inline session_evt_elt_t *
+session_evt_alloc_old (session_worker_t * wrk)
+{
+  session_evt_elt_t *elt;
+  elt = session_evt_elt_alloc (wrk);
+  clib_llist_add_tail (wrk->event_elts, evt_list, elt,
+		       pool_elt_at_index (wrk->event_elts, wrk->old_head));
+  return elt;
+}
+
 always_inline u8
 session_is_valid (u32 si, u8 thread_index)
 {
@@ -394,6 +414,8 @@
 				     void *rpc_args);
 void session_send_rpc_evt_to_thread_force (u32 thread_index, void *fp,
 					   void *rpc_args);
+void session_add_self_custom_tx_evt (transport_connection_t * tc,
+				     u8 has_prio);
 transport_connection_t *session_get_transport (session_t * s);
 void session_get_endpoint (session_t * s, transport_endpoint_t * tep,
 			   u8 is_lcl);
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index 999776f..1bb5cb2 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -618,7 +618,7 @@
 				session_evt_elt_t * elt,
 				int *n_tx_packets, u8 peek_data)
 {
-  u32 next_index, next0, next1, *to_next, n_left_to_next;
+  u32 next_index, next0, next1, *to_next, n_left_to_next, max_burst;
   u32 n_trace, n_bufs_needed = 0, n_left, pbi;
   session_tx_context_t *ctx = &wrk->ctx;
   session_main_t *smm = &session_main;
@@ -637,6 +637,7 @@
 
   next_index = smm->session_type_to_next[ctx->s->session_type];
   next0 = next1 = next_index;
+  max_burst = VLIB_FRAME_SIZE - *n_tx_packets;
 
   tp = session_get_transport_proto (ctx->s);
   ctx->transport_vft = transport_protocol_get_vft (tp);
@@ -649,6 +650,20 @@
 	ctx->transport_vft->flush_data (ctx->tc);
     }
 
+  if (ctx->s->flags & SESSION_F_CUSTOM_TX)
+    {
+      u32 n_custom_tx;
+      ctx->s->flags &= ~SESSION_F_CUSTOM_TX;
+      n_custom_tx = ctx->transport_vft->custom_tx (ctx->tc, max_burst);
+      *n_tx_packets += n_custom_tx;
+      max_burst -= n_custom_tx;
+      if (!max_burst)
+	{
+	  session_evt_add_old (wrk, elt);
+	  return SESSION_TX_OK;
+	}
+    }
+
   ctx->snd_space = transport_connection_snd_space (ctx->tc,
 						   wrk->vm->clib_time.
 						   last_cpu_time,
@@ -664,8 +679,7 @@
   svm_fifo_unset_event (ctx->s->tx_fifo);
 
   /* Check how much we can pull. */
-  session_tx_set_dequeue_params (vm, ctx, VLIB_FRAME_SIZE - *n_tx_packets,
-				 peek_data);
+  session_tx_set_dequeue_params (vm, ctx, max_burst, peek_data);
 
   if (PREDICT_FALSE (!ctx->max_len_to_snd))
     return SESSION_TX_NO_DATA;
@@ -823,7 +837,8 @@
   if (PREDICT_FALSE (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED))
     return 0;
   svm_fifo_unset_event (s->tx_fifo);
-  return transport_custom_tx (session_get_transport_proto (s), s);
+  return transport_custom_tx (session_get_transport_proto (s), s,
+			      VLIB_FRAME_SIZE - *n_tx_packets);
 }
 
 always_inline session_t *
diff --git a/src/vnet/session/session_types.h b/src/vnet/session/session_types.h
index 25b6116..9a5bc76 100644
--- a/src/vnet/session/session_types.h
+++ b/src/vnet/session/session_types.h
@@ -142,6 +142,7 @@
 {
   SESSION_F_RX_EVT = 1,
   SESSION_F_PROXY = (1 << 1),
+  SESSION_F_CUSTOM_TX = (1 << 2),
 } session_flags_t;
 
 typedef struct session_
diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h
index 6e2feb0..058a9ae 100644
--- a/src/vnet/session/transport.h
+++ b/src/vnet/session/transport.h
@@ -52,7 +52,7 @@
   u32 (*tx_fifo_offset) (transport_connection_t * tc);
   void (*update_time) (f64 time_now, u8 thread_index);
   void (*flush_data) (transport_connection_t *tconn);
-  int (*custom_tx) (void *session);
+  int (*custom_tx) (void *session, u32 max_burst_size);
   int (*app_rx_evt) (transport_connection_t *tconn);
 
   /*
@@ -127,9 +127,9 @@
 }
 
 static inline int
-transport_custom_tx (transport_proto_t tp, void *s)
+transport_custom_tx (transport_proto_t tp, void *s, u32 max_burst_size)
 {
-  return tp_vfts[tp].custom_tx (s);
+  return tp_vfts[tp].custom_tx (s, max_burst_size);
 }
 
 static inline int
diff --git a/src/vnet/tcp/tcp.c b/src/vnet/tcp/tcp.c
index 928d1ba..5a215b6 100644
--- a/src/vnet/tcp/tcp.c
+++ b/src/vnet/tcp/tcp.c
@@ -1196,8 +1196,6 @@
 
   tcp_set_time_now (wrk);
   tw_timer_expire_timers_16t_2w_512sl (&wrk->timer_wheel, now);
-  tcp_do_fastretransmits (wrk);
-  tcp_send_acks (wrk);
   tcp_flush_frames_to_output (wrk);
 }
 
@@ -1228,6 +1226,7 @@
   .update_time = tcp_update_time,
   .tx_fifo_offset = tcp_session_tx_fifo_offset,
   .flush_data = tcp_session_flush_data,
+  .custom_tx = tcp_session_custom_tx,
   .format_connection = format_tcp_session,
   .format_listener = format_tcp_listener_session,
   .format_half_open = format_tcp_half_open_session,
@@ -1476,17 +1475,9 @@
 
   for (thread = 0; thread < num_threads; thread++)
     {
-      vec_validate (tm->wrk_ctx[thread].pending_fast_rxt, 255);
-      vec_validate (tm->wrk_ctx[thread].ongoing_fast_rxt, 255);
-      vec_validate (tm->wrk_ctx[thread].postponed_fast_rxt, 255);
       vec_validate (tm->wrk_ctx[thread].pending_deq_acked, 255);
-      vec_validate (tm->wrk_ctx[thread].pending_acks, 255);
       vec_validate (tm->wrk_ctx[thread].pending_disconnects, 255);
-      vec_reset_length (tm->wrk_ctx[thread].pending_fast_rxt);
-      vec_reset_length (tm->wrk_ctx[thread].ongoing_fast_rxt);
-      vec_reset_length (tm->wrk_ctx[thread].postponed_fast_rxt);
       vec_reset_length (tm->wrk_ctx[thread].pending_deq_acked);
-      vec_reset_length (tm->wrk_ctx[thread].pending_acks);
       vec_reset_length (tm->wrk_ctx[thread].pending_disconnects);
       tm->wrk_ctx[thread].vm = vlib_mains[thread];
 
diff --git a/src/vnet/tcp/tcp.h b/src/vnet/tcp/tcp.h
index 7ccc06a..8f2665d 100644
--- a/src/vnet/tcp/tcp.h
+++ b/src/vnet/tcp/tcp.h
@@ -464,21 +464,9 @@
   /** tx frames for ip 4/6 lookup nodes */
   vlib_frame_t *ip_lookup_tx_frames[2];
 
-  /** vector of connections needing fast rxt */
-  u32 *pending_fast_rxt;
-
-  /** vector of connections now doing fast rxt */
-  u32 *ongoing_fast_rxt;
-
-  /** vector of connections that will do fast rxt */
-  u32 *postponed_fast_rxt;
-
   /** vector of pending ack dequeues */
   u32 *pending_deq_acked;
 
-  /** vector of pending acks */
-  u32 *pending_acks;
-
   /** vector of pending disconnect notifications */
   u32 *pending_disconnects;
 
@@ -700,15 +688,12 @@
 void tcp_update_rto (tcp_connection_t * tc);
 void tcp_flush_frame_to_output (tcp_worker_ctx_t * wrk, u8 is_ip4);
 void tcp_flush_frames_to_output (tcp_worker_ctx_t * wrk);
-void tcp_program_fastretransmit (tcp_worker_ctx_t * wrk,
-				 tcp_connection_t * tc);
-void tcp_do_fastretransmits (tcp_worker_ctx_t * wrk);
-
-void tcp_program_ack (tcp_worker_ctx_t * wrk, tcp_connection_t * tc);
-void tcp_program_dupack (tcp_worker_ctx_t * wrk, tcp_connection_t * tc);
-void tcp_send_acks (tcp_worker_ctx_t * wrk);
 void tcp_send_window_update_ack (tcp_connection_t * tc);
 
+void tcp_program_ack (tcp_connection_t * tc);
+void tcp_program_dupack (tcp_connection_t * tc);
+void tcp_program_fastretransmit (tcp_connection_t * tc);
+
 /*
  * Rate estimation
  */
@@ -961,6 +946,7 @@
 
 u32 tcp_session_push_header (transport_connection_t * tconn,
 			     vlib_buffer_t * b);
+int tcp_session_custom_tx (void *conn, u32 max_burst_size);
 
 void tcp_connection_timers_init (tcp_connection_t * tc);
 void tcp_connection_timers_reset (tcp_connection_t * tc);
diff --git a/src/vnet/tcp/tcp_input.c b/src/vnet/tcp/tcp_input.c
index 5f37648..4695fbb 100755
--- a/src/vnet/tcp/tcp_input.c
+++ b/src/vnet/tcp/tcp_input.c
@@ -317,7 +317,7 @@
        * SEG.TSval */
       else if (!tcp_rst (th0))
 	{
-	  tcp_program_ack (wrk, tc0);
+	  tcp_program_ack (tc0);
 	  TCP_EVT_DBG (TCP_EVT_DUPACK_SENT, tc0, vnet_buffer (b0)->tcp);
 	  goto error;
 	}
@@ -340,7 +340,7 @@
 	    }
 	  else
 	    {
-	      tcp_program_ack (wrk, tc0);
+	      tcp_program_ack (tc0);
 	      TCP_EVT_DBG (TCP_EVT_SYNACK_RCVD, tc0);
 	      *error0 = TCP_ERROR_SYN_ACKS_RCVD;
 	    }
@@ -368,7 +368,7 @@
       /* If not RST, send dup ack */
       if (!tcp_rst (th0))
 	{
-	  tcp_program_dupack (wrk, tc0);
+	  tcp_program_dupack (tc0);
 	  TCP_EVT_DBG (TCP_EVT_DUPACK_SENT, tc0, vnet_buffer (b0)->tcp);
 	}
       goto error;
@@ -391,7 +391,7 @@
   if (PREDICT_FALSE (tcp_syn (th0)))
     {
       /* As per RFC5961 send challenge ack instead of reset */
-      tcp_program_ack (wrk, tc0);
+      tcp_program_ack (tc0);
       *error0 = TCP_ERROR_SPURIOUS_SYN;
       goto error;
     }
@@ -1199,6 +1199,7 @@
 
   tcp_fastrecovery_off (tc);
   tcp_fastrecovery_first_off (tc);
+  tc->flags &= ~TCP_CONN_FRXT_PENDING;
 
   TCP_EVT_DBG (TCP_EVT_CC_EVT, tc, 3);
 }
@@ -1305,81 +1306,6 @@
 	  || tcp_should_fastrecover_sack (tc));
 }
 
-#ifndef CLIB_MARCH_VARIANT
-void
-tcp_program_fastretransmit (tcp_worker_ctx_t * wrk, tcp_connection_t * tc)
-{
-  if (!(tc->flags & TCP_CONN_FRXT_PENDING))
-    {
-      vec_add1 (wrk->pending_fast_rxt, tc->c_c_index);
-      tc->flags |= TCP_CONN_FRXT_PENDING;
-    }
-}
-
-void
-tcp_do_fastretransmits (tcp_worker_ctx_t * wrk)
-{
-  u32 *ongoing_fast_rxt, burst_bytes, sent_bytes, thread_index;
-  u32 max_burst_size, burst_size, n_segs = 0, n_segs_now;
-  tcp_connection_t *tc;
-  u64 last_cpu_time;
-  int i;
-
-  if (vec_len (wrk->pending_fast_rxt) == 0
-      && vec_len (wrk->postponed_fast_rxt) == 0)
-    return;
-
-  thread_index = wrk->vm->thread_index;
-  last_cpu_time = wrk->vm->clib_time.last_cpu_time;
-  ongoing_fast_rxt = wrk->ongoing_fast_rxt;
-  vec_append (ongoing_fast_rxt, wrk->postponed_fast_rxt);
-  vec_append (ongoing_fast_rxt, wrk->pending_fast_rxt);
-
-  _vec_len (wrk->postponed_fast_rxt) = 0;
-  _vec_len (wrk->pending_fast_rxt) = 0;
-
-  max_burst_size = VLIB_FRAME_SIZE / vec_len (ongoing_fast_rxt);
-  max_burst_size = clib_max (max_burst_size, 1);
-
-  for (i = 0; i < vec_len (ongoing_fast_rxt); i++)
-    {
-      tc = tcp_connection_get (ongoing_fast_rxt[i], thread_index);
-      if (!tc)
-	continue;
-      if (!tcp_in_fastrecovery (tc))
-	{
-	  tc->flags &= ~TCP_CONN_FRXT_PENDING;
-	  continue;
-	}
-
-      if (n_segs >= VLIB_FRAME_SIZE)
-	{
-	  vec_add1 (wrk->postponed_fast_rxt, ongoing_fast_rxt[i]);
-	  continue;
-	}
-
-      tc->flags &= ~TCP_CONN_FRXT_PENDING;
-      burst_size = clib_min (max_burst_size, VLIB_FRAME_SIZE - n_segs);
-      burst_bytes = transport_connection_tx_pacer_burst (&tc->connection,
-							 last_cpu_time);
-      burst_size = clib_min (burst_size, burst_bytes / tc->snd_mss);
-      if (!burst_size)
-	{
-	  tcp_program_fastretransmit (wrk, tc);
-	  continue;
-	}
-
-      n_segs_now = tcp_fast_retransmit (wrk, tc, burst_size);
-      sent_bytes = clib_min (n_segs_now * tc->snd_mss, burst_bytes);
-      transport_connection_tx_pacer_update_bytes (&tc->connection,
-						  sent_bytes);
-      n_segs += n_segs_now;
-    }
-  _vec_len (ongoing_fast_rxt) = 0;
-  wrk->ongoing_fast_rxt = ongoing_fast_rxt;
-}
-#endif /* CLIB_MARCH_VARIANT */
-
 /**
  * One function to rule them all ... and in the darkness bind them
  */
@@ -1393,7 +1319,7 @@
     {
       if (tc->bytes_acked)
 	goto partial_ack;
-      tcp_program_fastretransmit (tcp_get_worker (tc->c_thread_index), tc);
+      tcp_program_fastretransmit (tc);
       return;
     }
   /*
@@ -1449,8 +1375,7 @@
 	  pacer_wnd = clib_max (0.1 * tc->cwnd, 2 * tc->snd_mss);
 	  tcp_connection_tx_pacer_reset (tc, pacer_wnd,
 					 0 /* start bucket */ );
-	  tcp_program_fastretransmit (tcp_get_worker (tc->c_thread_index),
-				      tc);
+	  tcp_program_fastretransmit (tc);
 	  return;
 	}
       else if (!tc->bytes_acked
@@ -1571,7 +1496,7 @@
   /*
    * Since this was a partial ack, try to retransmit some more data
    */
-  tcp_program_fastretransmit (tcp_get_worker (tc->c_thread_index), tc);
+  tcp_program_fastretransmit (tc);
 }
 
 /**
@@ -1712,7 +1637,7 @@
 
   /* Account for the FIN and send ack */
   tc->rcv_nxt += 1;
-  tcp_program_ack (wrk, tc);
+  tcp_program_ack (tc);
   /* Enter CLOSE-WAIT and notify session. To avoid lingering
    * in CLOSE-WAIT, set timer (reuse WAITCLOSE). */
   tcp_connection_set_state (tc, TCP_STATE_CLOSE_WAIT);
@@ -1976,7 +1901,7 @@
 	   * retransmissions since we may not have any data to send */
 	  if (seq_leq (vnet_buffer (b)->tcp.seq_end, tc->rcv_nxt))
 	    {
-	      tcp_program_ack (wrk, tc);
+	      tcp_program_ack (tc);
 	      error = TCP_ERROR_SEGMENT_OLD;
 	      goto done;
 	    }
@@ -1996,7 +1921,7 @@
 
       /* RFC2581: Enqueue and send DUPACK for fast retransmit */
       error = tcp_session_enqueue_ooo (tc, b, n_data_bytes);
-      tcp_program_dupack (wrk, tc);
+      tcp_program_dupack (tc);
       TCP_EVT_DBG (TCP_EVT_DUPACK_SENT, tc, vnet_buffer (b)->tcp);
       goto done;
     }
@@ -2013,7 +1938,7 @@
       goto done;
     }
 
-  tcp_program_ack (wrk, tc);
+  tcp_program_ack (tc);
 
 done:
   return error;
@@ -2591,7 +2516,7 @@
 	}
       else
 	{
-	  tcp_program_ack (wrk, new_tc0);
+	  tcp_program_ack (new_tc0);
 	}
 
     drop:
@@ -2921,7 +2846,7 @@
 	  if (!is_fin0)
 	    goto drop;
 
-	  tcp_program_ack (wrk, tc0);
+	  tcp_program_ack (tc0);
 	  tcp_timer_update (tc0, TCP_TIMER_WAITCLOSE, TCP_TIMEWAIT_TIME);
 	  goto drop;
 
@@ -2961,7 +2886,7 @@
 	case TCP_STATE_ESTABLISHED:
 	  /* Account for the FIN and send ack */
 	  tc0->rcv_nxt += 1;
-	  tcp_program_ack (wrk, tc0);
+	  tcp_program_ack (tc0);
 	  tcp_connection_set_state (tc0, TCP_STATE_CLOSE_WAIT);
 	  tcp_program_disconnect (wrk, tc0);
 	  tcp_timer_update (tc0, TCP_TIMER_WAITCLOSE, TCP_CLOSEWAIT_TIME);
@@ -2995,7 +2920,7 @@
 	  else
 	    {
 	      tcp_connection_set_state (tc0, TCP_STATE_CLOSING);
-	      tcp_program_ack (wrk, tc0);
+	      tcp_program_ack (tc0);
 	      /* Wait for ACK for our FIN but not forever */
 	      tcp_timer_update (tc0, TCP_TIMER_WAITCLOSE, TCP_2MSL_TIME);
 	    }
@@ -3006,7 +2931,7 @@
 	  tcp_connection_set_state (tc0, TCP_STATE_TIME_WAIT);
 	  tcp_connection_timers_reset (tc0);
 	  tcp_timer_set (tc0, TCP_TIMER_WAITCLOSE, TCP_TIMEWAIT_TIME);
-	  tcp_program_ack (wrk, tc0);
+	  tcp_program_ack (tc0);
 	  session_transport_closed_notify (&tc0->connection);
 	  break;
 	case TCP_STATE_TIME_WAIT:
diff --git a/src/vnet/tcp/tcp_output.c b/src/vnet/tcp/tcp_output.c
index 7b0303f..79cc95e 100644
--- a/src/vnet/tcp/tcp_output.c
+++ b/src/vnet/tcp/tcp_output.c
@@ -1188,21 +1188,21 @@
 }
 
 void
-tcp_program_ack (tcp_worker_ctx_t * wrk, tcp_connection_t * tc)
+tcp_program_ack (tcp_connection_t * tc)
 {
   if (!(tc->flags & TCP_CONN_SNDACK))
     {
-      vec_add1 (wrk->pending_acks, tc->c_c_index);
+      session_add_self_custom_tx_evt (&tc->connection, 1);
       tc->flags |= TCP_CONN_SNDACK;
     }
 }
 
 void
-tcp_program_dupack (tcp_worker_ctx_t * wrk, tcp_connection_t * tc)
+tcp_program_dupack (tcp_connection_t * tc)
 {
   if (!(tc->flags & TCP_CONN_SNDACK))
     {
-      vec_add1 (wrk->pending_acks, tc->c_c_index);
+      session_add_self_custom_tx_evt (&tc->connection, 1);
       tc->flags |= TCP_CONN_SNDACK;
     }
   if (tc->pending_dupacks < 255)
@@ -1210,51 +1210,13 @@
 }
 
 void
-tcp_send_acks (tcp_worker_ctx_t * wrk)
+tcp_program_fastretransmit (tcp_connection_t * tc)
 {
-  u32 thread_index, *pending_acks;
-  tcp_connection_t *tc;
-  int i, j, n_acks;
-
-  if (!vec_len (wrk->pending_acks))
-    return;
-
-  thread_index = wrk->vm->thread_index;
-  pending_acks = wrk->pending_acks;
-  for (i = 0; i < vec_len (pending_acks); i++)
+  if (!(tc->flags & TCP_CONN_FRXT_PENDING))
     {
-      tc = tcp_connection_get (pending_acks[i], thread_index);
-      tc->flags &= ~TCP_CONN_SNDACK;
-      if (!tc->pending_dupacks)
-	{
-	  tcp_send_ack (tc);
-	  continue;
-	}
-
-      /* If we're supposed to send dupacks but have no ooo data
-       * send only one ack */
-      if (!vec_len (tc->snd_sacks))
-	{
-	  tcp_send_ack (tc);
-	  continue;
-	}
-
-      /* Start with first sack block */
-      tc->snd_sack_pos = 0;
-
-      /* Generate enough dupacks to cover all sack blocks. Do not generate
-       * more sacks than the number of packets received. But do generate at
-       * least 3, i.e., the number needed to signal congestion, if needed. */
-      n_acks = vec_len (tc->snd_sacks) / TCP_OPTS_MAX_SACK_BLOCKS;
-      n_acks = clib_min (n_acks, tc->pending_dupacks);
-      n_acks = clib_max (n_acks, clib_min (tc->pending_dupacks, 3));
-      for (j = 0; j < n_acks; j++)
-	tcp_send_ack (tc);
-
-      tc->pending_dupacks = 0;
-      tc->snd_sack_pos = 0;
+      session_add_self_custom_tx_evt (&tc->connection, 0);
+      tc->flags |= TCP_CONN_FRXT_PENDING;
     }
-  _vec_len (wrk->pending_acks) = 0;
 }
 
 /**
@@ -1281,7 +1243,6 @@
 void
 tcp_send_window_update_ack (tcp_connection_t * tc)
 {
-  tcp_worker_ctx_t *wrk = tcp_get_worker (tc->c_thread_index);
   u32 win;
 
   if (tcp_zero_rwnd_sent (tc))
@@ -1290,7 +1251,7 @@
       if (win > 0)
 	{
 	  tcp_zero_rwnd_sent_off (tc);
-	  tcp_program_ack (wrk, tc);
+	  tcp_program_ack (tc);
 	}
     }
 }
@@ -1853,7 +1814,7 @@
   snd_space = tcp_available_cc_snd_space (tc);
   if (snd_space < tc->snd_mss)
     {
-      tcp_program_fastretransmit (wrk, tc);
+      tcp_program_fastretransmit (tc);
       return 0;
     }
 
@@ -1877,7 +1838,7 @@
 				     snd_space / tc->snd_mss);
 	      n_segs_now = tcp_fast_retransmit_unsent (wrk, tc, burst_size);
 	      if (max_deq > n_segs_now * tc->snd_mss)
-		tcp_program_fastretransmit (wrk, tc);
+		tcp_program_fastretransmit (tc);
 	      n_segs += n_segs_now;
 	      goto done;
 	    }
@@ -1929,7 +1890,7 @@
     }
 
   if (hole)
-    tcp_program_fastretransmit (wrk, tc);
+    tcp_program_fastretransmit (tc);
 
 done:
   return n_segs;
@@ -1990,7 +1951,7 @@
       burst_size = clib_min (burst_size - n_segs, snd_space / tc->snd_mss);
       n_segs_now = tcp_fast_retransmit_unsent (wrk, tc, burst_size);
       if (max_deq > n_segs_now * tc->snd_mss)
-	tcp_program_fastretransmit (wrk, tc);
+	tcp_program_fastretransmit (tc);
       n_segs += n_segs_now;
     }
 
@@ -2011,6 +1972,108 @@
   else
     return tcp_fast_retransmit_no_sack (wrk, tc, burst_size);
 }
+
+static int
+tcp_send_acks (tcp_connection_t * tc, u32 max_burst_size)
+{
+  int j, n_acks;
+
+  if (!tc->pending_dupacks)
+    {
+      tcp_send_ack (tc);
+      return 1;
+    }
+
+  /* If we're supposed to send dupacks but have no ooo data
+   * send only one ack */
+  if (!vec_len (tc->snd_sacks))
+    {
+      tcp_send_ack (tc);
+      return 1;
+    }
+
+  /* Start with first sack block */
+  tc->snd_sack_pos = 0;
+
+  /* Generate enough dupacks to cover all sack blocks. Do not generate
+   * more sacks than the number of packets received. But do generate at
+   * least 3, i.e., the number needed to signal congestion, if needed. */
+  n_acks = vec_len (tc->snd_sacks) / TCP_OPTS_MAX_SACK_BLOCKS;
+  n_acks = clib_min (n_acks, tc->pending_dupacks);
+  n_acks = clib_max (n_acks, clib_min (tc->pending_dupacks, 3));
+  for (j = 0; j < clib_min (n_acks, max_burst_size); j++)
+    tcp_send_ack (tc);
+
+  if (n_acks < max_burst_size)
+    {
+      tc->pending_dupacks = 0;
+      tc->snd_sack_pos = 0;
+      return n_acks;
+    }
+  else
+    {
+      TCP_DBG ("constrained by burst size");
+      tc->pending_dupacks = n_acks - max_burst_size;
+      tcp_program_dupack (tc);
+      return max_burst_size;
+    }
+}
+
+static int
+tcp_do_fastretransmit (tcp_connection_t * tc, u32 max_burst_size)
+{
+  u32 n_segs = 0, burst_size, sent_bytes, burst_bytes;
+  tcp_worker_ctx_t *wrk;
+
+  wrk = tcp_get_worker (tc->c_thread_index);
+  burst_bytes = transport_connection_tx_pacer_burst (&tc->connection,
+						     wrk->vm->
+						     clib_time.last_cpu_time);
+  burst_size = clib_min (max_burst_size, burst_bytes / tc->snd_mss);
+  if (!burst_size)
+    {
+      tcp_program_fastretransmit (tc);
+      return 0;
+    }
+
+  n_segs = tcp_fast_retransmit (wrk, tc, burst_size);
+  sent_bytes = clib_min (n_segs * tc->snd_mss, burst_bytes);
+  transport_connection_tx_pacer_update_bytes (&tc->connection, sent_bytes);
+  return n_segs;
+}
+
+int
+tcp_session_custom_tx (void *conn, u32 max_burst_size)
+{
+  tcp_connection_t *tc = (tcp_connection_t *) conn;
+  u32 n_segs = 0;
+
+  if (tcp_in_fastrecovery (tc) && (tc->flags & TCP_CONN_FRXT_PENDING))
+    {
+      tc->flags &= ~TCP_CONN_FRXT_PENDING;
+      n_segs = tcp_do_fastretransmit (tc, max_burst_size);
+      max_burst_size -= n_segs;
+    }
+
+  if (!(tc->flags & TCP_CONN_SNDACK))
+    return n_segs;
+
+  tc->flags &= ~TCP_CONN_SNDACK;
+
+  /* We have retransmitted packets and no dupack */
+  if (n_segs && !tc->pending_dupacks)
+    return n_segs;
+
+  if (!max_burst_size)
+    {
+      tcp_program_ack (tc);
+      return max_burst_size;
+    }
+
+  n_segs += tcp_send_acks (tc, max_burst_size);
+
+  return n_segs;
+}
 #endif /* CLIB_MARCH_VARIANT */
 
 static void
diff --git a/src/vnet/tls/tls.c b/src/vnet/tls/tls.c
index 4a9ec4e..f7780fe 100644
--- a/src/vnet/tls/tls.c
+++ b/src/vnet/tls/tls.c
@@ -660,7 +660,7 @@
 }
 
 int
-tls_custom_tx_callback (void *session)
+tls_custom_tx_callback (void *session, u32 max_burst_size)
 {
   session_t *app_session = (session_t *) session;
   tls_ctx_t *ctx;