session tcp udp: consolidate transport snd apis

Type: improvement

Use only one api to retrieve transport send parameters. Additionally,
allow transports to request postponing and descheduling of events.

With this, tcp now requests descheduling of sessions when the
connections are stuck probing for zero snd_wnd.

Signed-off-by: Florin Coras <fcoras@cisco.com>
Change-Id: I722c974f3e68fa15424c519a1fffacda43af050c
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index e9cda36..15d949c 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -145,6 +145,19 @@
     }
 }
 
+void
+sesssion_reschedule_tx (transport_connection_t * tc)
+{
+  session_worker_t *wrk = session_main_get_worker (tc->thread_index);
+  session_evt_elt_t *elt;
+
+  ASSERT (tc->thread_index == vlib_get_thread_index ());
+
+  elt = session_evt_alloc_new (wrk);
+  elt->evt.session_index = tc->s_index;
+  elt->evt.event_type = SESSION_IO_EVT_TX;
+}
+
 static void
 session_program_transport_ctrl_evt (session_t * s, session_evt_type_t evt)
 {
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index e856372..777984b 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -48,14 +48,12 @@
   session_t *s;
   transport_proto_vft_t *transport_vft;
   transport_connection_t *tc;
+  transport_send_params_t sp;
   u32 max_dequeue;
-  u32 snd_space;
   u32 left_to_snd;
-  u32 tx_offset;
   u32 max_len_to_snd;
   u16 deq_per_first_buf;
   u16 deq_per_buf;
-  u16 snd_mss;
   u16 n_segs_per_evt;
   u8 n_bufs_per_seg;
     CLIB_CACHE_LINE_ALIGN_MARK (cacheline1);
@@ -429,6 +427,7 @@
 					   void *rpc_args);
 void session_add_self_custom_tx_evt (transport_connection_t * tc,
 				     u8 has_prio);
+void sesssion_reschedule_tx (transport_connection_t * tc);
 transport_connection_t *session_get_transport (session_t * s);
 void session_get_endpoint (session_t * s, transport_endpoint_t * tep,
 			   u8 is_lcl);
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index ad24f42..b1c2428 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -567,7 +567,7 @@
   b->total_length_not_including_first_buffer = 0;
 
   chain_b = b;
-  left_from_seg = clib_min (ctx->snd_mss - b->current_length,
+  left_from_seg = clib_min (ctx->sp.snd_mss - b->current_length,
 			    ctx->left_to_snd);
   to_deq = left_from_seg;
   for (j = 1; j < ctx->n_bufs_per_seg; j++)
@@ -583,8 +583,8 @@
       if (peek_data)
 	{
 	  n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo,
-					ctx->tx_offset, len_to_deq, data);
-	  ctx->tx_offset += n_bytes_read;
+					ctx->sp.tx_offset, len_to_deq, data);
+	  ctx->sp.tx_offset += n_bytes_read;
 	}
       else
 	{
@@ -651,12 +651,12 @@
 
   if (peek_data)
     {
-      n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->tx_offset,
+      n_bytes_read = svm_fifo_peek (ctx->s->tx_fifo, ctx->sp.tx_offset,
 				    len_to_deq, data0);
       ASSERT (n_bytes_read > 0);
       /* Keep track of progress locally, transport is also supposed to
        * increment it independently when pushing the header */
-      ctx->tx_offset += n_bytes_read;
+      ctx->sp.tx_offset += n_bytes_read;
     }
   else
     {
@@ -756,13 +756,12 @@
   if (peek_data)
     {
       /* Offset in rx fifo from where to peek data */
-      ctx->tx_offset = ctx->transport_vft->tx_fifo_offset (ctx->tc);
-      if (PREDICT_FALSE (ctx->tx_offset >= ctx->max_dequeue))
+      if (PREDICT_FALSE (ctx->sp.tx_offset >= ctx->max_dequeue))
 	{
 	  ctx->max_len_to_snd = 0;
 	  return;
 	}
-      ctx->max_dequeue -= ctx->tx_offset;
+      ctx->max_dequeue -= ctx->sp.tx_offset;
     }
   else
     {
@@ -782,34 +781,34 @@
   ASSERT (ctx->max_dequeue > 0);
 
   /* Ensure we're not writing more than transport window allows */
-  if (ctx->max_dequeue < ctx->snd_space)
+  if (ctx->max_dequeue < ctx->sp.snd_space)
     {
       /* Constrained by tx queue. Try to send only fully formed segments */
-      ctx->max_len_to_snd =
-	(ctx->max_dequeue > ctx->snd_mss) ?
-	ctx->max_dequeue - ctx->max_dequeue % ctx->snd_mss : ctx->max_dequeue;
+      ctx->max_len_to_snd = (ctx->max_dequeue > ctx->sp.snd_mss) ?
+	(ctx->max_dequeue - (ctx->max_dequeue % ctx->sp.snd_mss)) :
+	ctx->max_dequeue;
       /* TODO Nagle ? */
     }
   else
     {
       /* Expectation is that snd_space0 is already a multiple of snd_mss */
-      ctx->max_len_to_snd = ctx->snd_space;
+      ctx->max_len_to_snd = ctx->sp.snd_space;
     }
 
   /* Check if we're tx constrained by the node */
-  ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->snd_mss);
+  ctx->n_segs_per_evt = ceil ((f64) ctx->max_len_to_snd / ctx->sp.snd_mss);
   if (ctx->n_segs_per_evt > max_segs)
     {
       ctx->n_segs_per_evt = max_segs;
-      ctx->max_len_to_snd = max_segs * ctx->snd_mss;
+      ctx->max_len_to_snd = max_segs * ctx->sp.snd_mss;
     }
 
   n_bytes_per_buf = vlib_buffer_get_default_data_size (vm);
   ASSERT (n_bytes_per_buf > TRANSPORT_MAX_HDRS_LEN);
-  n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->snd_mss;
+  n_bytes_per_seg = TRANSPORT_MAX_HDRS_LEN + ctx->sp.snd_mss;
   ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
-  ctx->deq_per_buf = clib_min (ctx->snd_mss, n_bytes_per_buf);
-  ctx->deq_per_first_buf = clib_min (ctx->snd_mss,
+  ctx->deq_per_buf = clib_min (ctx->sp.snd_mss, n_bytes_per_buf);
+  ctx->deq_per_first_buf = clib_min (ctx->sp.snd_mss,
 				     n_bytes_per_buf -
 				     TRANSPORT_MAX_HDRS_LEN);
 }
@@ -817,12 +816,12 @@
 always_inline void
 session_tx_maybe_reschedule (session_worker_t * wrk,
 			     session_tx_context_t * ctx,
-			     session_evt_elt_t * elt, u8 is_peek)
+			     session_evt_elt_t * elt)
 {
   session_t *s = ctx->s;
 
   svm_fifo_unset_event (s->tx_fifo);
-  if (svm_fifo_max_dequeue_cons (s->tx_fifo) > (is_peek ? ctx->tx_offset : 0))
+  if (svm_fifo_max_dequeue_cons (s->tx_fifo) > ctx->sp.tx_offset)
     if (svm_fifo_set_event (s->tx_fifo))
       session_evt_add_head_old (wrk, elt);
 }
@@ -880,20 +879,23 @@
 	}
     }
 
-  ctx->snd_mss = ctx->transport_vft->send_mss (ctx->tc);
-  if (PREDICT_FALSE (ctx->snd_mss == 0))
-    {
-      session_evt_add_old (wrk, elt);
-      return SESSION_TX_NO_DATA;
-    }
+  transport_connection_snd_params (ctx->tc, &ctx->sp);
 
-  ctx->snd_space = transport_connection_snd_space (ctx->tc);
-
-  /* This flow queue is "empty" so it should be re-evaluated before
-   * the ones that have data to send. */
-  if (!ctx->snd_space)
+  if (!ctx->sp.snd_space)
     {
-      session_evt_add_head_old (wrk, elt);
+      /* This flow queue is "empty" so it should be re-evaluated before
+       * the ones that have data to send. */
+      if (PREDICT_TRUE (!ctx->sp.flags))
+	session_evt_add_head_old (wrk, elt);
+      /* Request to postpone the session, e.g., zero-wnd and transport
+       * is not currently probing */
+      else if (ctx->sp.flags & TRANSPORT_SND_F_POSTPONE)
+	session_evt_add_old (wrk, elt);
+      /* If the deschedule flag was set, remove session from scheduler.
+       * Transport is responsible for rescheduling this session. */
+      else
+	transport_connection_deschedule (ctx->tc);
+
       return SESSION_TX_NO_DATA;
     }
 
@@ -905,9 +907,9 @@
 	  session_evt_add_head_old (wrk, elt);
 	  return SESSION_TX_NO_DATA;
 	}
-      snd_space = clib_min (ctx->snd_space, snd_space);
-      ctx->snd_space = snd_space >= ctx->snd_mss ?
-	snd_space - snd_space % ctx->snd_mss : snd_space;
+      snd_space = clib_min (ctx->sp.snd_space, snd_space);
+      ctx->sp.snd_space = snd_space >= ctx->sp.snd_mss ?
+	snd_space - snd_space % ctx->sp.snd_mss : snd_space;
     }
 
   /* Check how much we can pull. */
@@ -916,7 +918,7 @@
   if (PREDICT_FALSE (!ctx->max_len_to_snd))
     {
       transport_connection_tx_pacer_reset_bucket (ctx->tc, 0);
-      session_tx_maybe_reschedule (wrk, ctx, elt, peek_data);
+      session_tx_maybe_reschedule (wrk, ctx, elt);
       return SESSION_TX_NO_DATA;
     }
 
@@ -1019,7 +1021,7 @@
   if (ctx->max_len_to_snd < ctx->max_dequeue)
     session_evt_add_old (wrk, elt);
   else
-    session_tx_maybe_reschedule (wrk, ctx, elt, peek_data);
+    session_tx_maybe_reschedule (wrk, ctx, elt);
 
   if (!peek_data
       && ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
diff --git a/src/vnet/session/transport.c b/src/vnet/session/transport.c
index c8c5835..e27aaf3 100644
--- a/src/vnet/session/transport.c
+++ b/src/vnet/session/transport.c
@@ -103,6 +103,8 @@
       indent = format_get_indent (s) + 1;
       s = format (s, "%Upacer: %U\n", format_white_space, indent,
 		  format_transport_pacer, &tc->pacer, tc->thread_index);
+      s = format (s, "%Utransport: flags 0x%x\n", format_white_space, indent,
+		  tc->flags);
     }
   return s;
 }
@@ -720,6 +722,22 @@
 }
 
 void
+transport_connection_reschedule (transport_connection_t * tc)
+{
+  tc->flags &= ~TRANSPORT_CONNECTION_F_DESCHED;
+  if (transport_max_tx_dequeue (tc))
+    sesssion_reschedule_tx (tc);
+  else
+    {
+      session_t *s = session_get (tc->s_index, tc->thread_index);
+      svm_fifo_unset_event (s->tx_fifo);
+      if (svm_fifo_max_dequeue_cons (s->tx_fifo))
+	if (svm_fifo_set_event (s->tx_fifo))
+	  sesssion_reschedule_tx (tc);
+    }
+}
+
+void
 transport_update_time (clib_time_type_t time_now, u8 thread_index)
 {
   transport_proto_vft_t *vft;
diff --git a/src/vnet/session/transport.h b/src/vnet/session/transport.h
index adc695f..b2be990 100644
--- a/src/vnet/session/transport.h
+++ b/src/vnet/session/transport.h
@@ -32,6 +32,21 @@
   u8 half_open_has_fifos;
 } transport_options_t;
 
+typedef enum transport_snd_flags_
+{
+  TRANSPORT_SND_F_DESCHED = 1 << 0,
+  TRANSPORT_SND_F_POSTPONE = 1 << 1,
+  TRANSPORT_SND_N_FLAGS
+} __clib_packed transport_snd_flags_t;
+
+typedef struct transport_send_params_
+{
+  u32 snd_space;
+  u32 tx_offset;
+  u16 snd_mss;
+  transport_snd_flags_t flags;
+} transport_send_params_t;
+
 /*
  * Transport protocol virtual function table
  */
@@ -54,9 +69,8 @@
    */
 
   u32 (*push_header) (transport_connection_t * tconn, vlib_buffer_t * b);
-  u16 (*send_mss) (transport_connection_t * tc);
-  u32 (*send_space) (transport_connection_t * tc);
-  u32 (*tx_fifo_offset) (transport_connection_t * tc);
+  int (*send_params) (transport_connection_t * tconn,
+		      transport_send_params_t *sp);
   void (*update_time) (f64 time_now, u8 thread_index);
   void (*flush_data) (transport_connection_t *tconn);
   int (*custom_tx) (void *session, u32 max_burst_size);
@@ -151,16 +165,38 @@
 }
 
 /**
- * Get maximum tx burst allowed for transport connection
+ * Get send parameters for transport connection
+ *
+ * These include maximum tx burst, mss, tx offset and other flags
+ * transport might want to provide to the session layer
  *
  * @param tc		transport connection
+ * @param sp		send parameters
+ *
  */
 static inline u32
-transport_connection_snd_space (transport_connection_t * tc)
+transport_connection_snd_params (transport_connection_t * tc,
+				 transport_send_params_t * sp)
 {
-  return tp_vfts[tc->proto].send_space (tc);
+  return tp_vfts[tc->proto].send_params (tc, sp);
 }
 
+static inline u8
+transport_connection_is_descheduled (transport_connection_t * tc)
+{
+  if (tc->flags & TRANSPORT_CONNECTION_F_DESCHED)
+    return 1;
+  return 0;
+}
+
+static inline void
+transport_connection_deschedule (transport_connection_t * tc)
+{
+  tc->flags |= TRANSPORT_CONNECTION_F_DESCHED;
+}
+
+void transport_connection_reschedule (transport_connection_t * tc);
+
 void transport_register_protocol (transport_proto_t transport_proto,
 				  const transport_proto_vft_t * vft,
 				  fib_protocol_t fib_proto, u32 output_node);
diff --git a/src/vnet/session/transport_types.h b/src/vnet/session/transport_types.h
index 459fb0c..323d261 100644
--- a/src/vnet/session/transport_types.h
+++ b/src/vnet/session/transport_types.h
@@ -43,9 +43,15 @@
 typedef enum transport_connection_flags_
 {
   TRANSPORT_CONNECTION_F_IS_TX_PACED = 1 << 0,
-  TRANSPORT_CONNECTION_F_NO_LOOKUP = 1 << 1, /**< Don't register connection in lookup
-						  Does not apply to local apps and
-						  transports using the network layer (udp/tcp) */
+  /**
+   * Don't register connection in lookup. Does not apply to local apps
+   * and transports using the network layer (udp/tcp)
+   */
+  TRANSPORT_CONNECTION_F_NO_LOOKUP = 1 << 1,
+  /**
+   * Connection descheduled by the session layer.
+   */
+  TRANSPORT_CONNECTION_F_DESCHED = 1 << 2,
 } transport_connection_flags_t;
 
 typedef struct _spacer