session: send tx notification to app

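Send a FIFO_EVENT_APP_TX event to the session's (non-builtin) app whenever a
session tx function reports SESSION_TX_OK. Also clean up the session queue
node: give the tx functions named return codes, rename the e0/s0 locals to
e/s, and move the node function and its registration up, right after
session_event_get_session ().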

Change-Id: Ifd52b07b28ba4dec1f6f729476decc76eb963837
Signed-off-by: Florin Coras <fcoras@cisco.com>
diff --git a/src/vnet/session/session.c b/src/vnet/session/session.c
index 6559e31..e486c2b 100644
--- a/src/vnet/session/session.c
+++ b/src/vnet/session/session.c
@@ -503,6 +503,38 @@
   return 0;
 }
 
+/**
+ * Queue a FIFO_EVENT_APP_TX event on the event queue of the session's app.
+ * Builtin apps are skipped. Returns 0 on success or for builtin apps, and
+ * -1 if the app cannot be found or its event queue is full, in which case
+ * no event is queued.
+ */
+int
+session_dequeue_notify (stream_session_t * s)
+{
+  session_fifo_event_t evt = {
+    .event_type = FIFO_EVENT_APP_TX,
+    .fifo = s->server_tx_fifo
+  };
+  application_t *app;
+  svm_queue_t *q;
+
+  app = application_get_if_valid (s->app_index);
+  if (PREDICT_FALSE (!app))
+    return -1;
+  if (application_is_builtin (app))
+    return 0;
+
+  /* cursize is read without taking the queue lock, so the add below may
+   * still wait if the queue fills up in the meantime */
+  q = app->event_queue;
+  if (PREDICT_FALSE (q->cursize >= q->maxsize))
+    return -1;
+
+  svm_queue_add (q, (u8 *) & evt, SVM_Q_WAIT);
+  return 0;
+}
+
 /**
  * Flushes queue of sessions that are to be notified of new data
  * enqueued events.
diff --git a/src/vnet/session/session.h b/src/vnet/session/session.h
index b57053c..fe3477b 100644
--- a/src/vnet/session/session.h
+++ b/src/vnet/session/session.h
@@ -517,6 +517,7 @@
 int session_dgram_connect_notify (transport_connection_t * tc,
 				  u32 old_thread_index,
 				  stream_session_t ** new_session);
+int session_dequeue_notify (stream_session_t * s);
 void stream_session_init_fifos_pointers (transport_connection_t * tc,
 					 u32 rx_pointer, u32 tx_pointer);
 
diff --git a/src/vnet/session/session_node.c b/src/vnet/session/session_node.c
index 4fd8d0e..0087463 100644
--- a/src/vnet/session/session_node.c
+++ b/src/vnet/session/session_node.c
@@ -65,6 +65,14 @@
 #undef _
 };
 
+/* Return codes of the session tx functions (smm->session_tx_fns) */
+enum
+{
+  SESSION_TX_NO_BUFFERS = -2,	/* out of buffers, event re-queued */
+  SESSION_TX_NO_DATA = -1,	/* nothing was sent */
+  SESSION_TX_OK = 0		/* success */
+};
+
 static void
 session_tx_trace_frame (vlib_main_t * vm, vlib_node_runtime_t * node,
 			u32 next_index, u32 * to_next, u16 n_segs,
@@ -378,7 +386,7 @@
     {
       if (rv < 2)
 	vec_add1 (smm->pending_event_vector[thread_index], *e);
-      return 0;
+      return SESSION_TX_NO_DATA;
     }
 
   next_index = smm->session_type_to_next[s->session_type];
@@ -393,7 +401,7 @@
   if (ctx->snd_space == 0 || ctx->snd_mss == 0)
     {
       vec_add1 (smm->pending_event_vector[thread_index], *e);
-      return 0;
+      return SESSION_TX_NO_DATA;
     }
 
   /* Allow enqueuing of a new event */
@@ -404,7 +412,7 @@
 				 peek_data);
 
   if (PREDICT_FALSE (!ctx->max_len_to_snd))
-    return 0;
+    return SESSION_TX_NO_DATA;
 
   n_bufs = vec_len (smm->tx_buffers[thread_index]);
   n_bufs_needed = ctx->n_segs_per_evt * ctx->n_bufs_per_seg;
@@ -419,7 +427,7 @@
       if (PREDICT_FALSE (n_bufs < n_bufs_needed))
 	{
 	  vec_add1 (smm->pending_event_vector[thread_index], *e);
-	  return -1;
+	  return SESSION_TX_NO_BUFFERS;
 	}
     }
 
@@ -519,35 +527,35 @@
 	if (svm_fifo_set_event (s->server_tx_fifo))
 	  vec_add1 (smm->pending_event_vector[thread_index], *e);
     }
-  return 0;
+  return SESSION_TX_OK;
 }
 
 int
 session_tx_fifo_peek_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
-			      session_fifo_event_t * e0,
-			      stream_session_t * s0, int *n_tx_pkts)
+			      session_fifo_event_t * e,
+			      stream_session_t * s, int *n_tx_pkts)
 {
-  return session_tx_fifo_read_and_snd_i (vm, node, e0, s0, n_tx_pkts, 1);
+  return session_tx_fifo_read_and_snd_i (vm, node, e, s, n_tx_pkts, 1);
 }
 
 int
 session_tx_fifo_dequeue_and_snd (vlib_main_t * vm, vlib_node_runtime_t * node,
-				 session_fifo_event_t * e0,
-				 stream_session_t * s0, int *n_tx_pkts)
+				 session_fifo_event_t * e,
+				 stream_session_t * s, int *n_tx_pkts)
 {
-  return session_tx_fifo_read_and_snd_i (vm, node, e0, s0, n_tx_pkts, 0);
+  return session_tx_fifo_read_and_snd_i (vm, node, e, s, n_tx_pkts, 0);
 }
 
 int
 session_tx_fifo_dequeue_internal (vlib_main_t * vm,
 				  vlib_node_runtime_t * node,
-				  session_fifo_event_t * e0,
-				  stream_session_t * s0, int *n_tx_pkts)
+				  session_fifo_event_t * e,
+				  stream_session_t * s, int *n_tx_pkts)
 {
   application_t *app;
-  app = application_get (s0->opaque);
-  svm_fifo_unset_event (s0->server_tx_fifo);
-  return app->cb_fns.builtin_app_tx_callback (s0);
+  app = application_get (s->opaque);
+  svm_fifo_unset_event (s->server_tx_fifo);
+  return app->cb_fns.builtin_app_tx_callback (s);
 }
 
 always_inline stream_session_t *
@@ -556,6 +564,184 @@
   return session_get_if_valid (e->fifo->master_session_index, thread_index);
 }
 
+/**
+ * Input node that services this thread's session events: events taken
+ * from the vpp event queue, tx events left pending by earlier dispatches
+ * and postponed disconnects. Tx events are deferred once a frame's worth
+ * of packets has been sent. Returns the number of tx packets.
+ */
+static uword
+session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
+		       vlib_frame_t * frame)
+{
+  session_manager_main_t *smm = vnet_get_session_manager_main ();
+  session_fifo_event_t *pending_events, *e;
+  session_fifo_event_t *fifo_events;
+  u32 n_to_dequeue, n_events;
+  svm_queue_t *q;
+  application_t *app;
+  int n_tx_packets = 0;
+  u32 thread_index = vm->thread_index;
+  int i, rv;
+  f64 now = vlib_time_now (vm);
+  void (*fp) (void *);
+
+  SESSION_EVT_DBG (SESSION_EVT_POLL_GAP_TRACK, smm, thread_index);
+
+  /* Update transport time */
+  transport_update_time (now, thread_index);
+
+  /* Get vpp queue events */
+  q = smm->vpp_event_queues[thread_index];
+  if (PREDICT_FALSE (q == 0))
+    return 0;
+
+  fifo_events = smm->free_event_vector[thread_index];
+
+  /* min number of events we can dequeue without blocking */
+  n_to_dequeue = q->cursize;
+  pending_events = smm->pending_event_vector[thread_index];
+
+  if (!n_to_dequeue && !vec_len (pending_events)
+      && !vec_len (smm->pending_disconnects[thread_index]))
+    return 0;
+
+  SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0);
+
+  /*
+   * If we didn't manage to process previous events try going
+   * over them again without dequeuing new ones.
+   * XXX: Handle senders to sessions that can't keep up
+   */
+  if (0 && vec_len (pending_events) >= 100)
+    {
+      clib_warning ("too many fifo events unsolved");
+      goto skip_dequeue;
+    }
+
+  /* See you in the next life, don't be late
+   * XXX: we may need priorities here
+   */
+  if (pthread_mutex_trylock (&q->mutex))
+    return 0;
+
+  for (i = 0; i < n_to_dequeue; i++)
+    {
+      vec_add2 (fifo_events, e, 1);
+      svm_queue_sub_raw (q, (u8 *) e);
+    }
+
+  /* The other side of the connection is not polling */
+  if (q->cursize < (q->maxsize / 8))
+    (void) pthread_cond_broadcast (&q->condvar);
+  pthread_mutex_unlock (&q->mutex);
+
+  vec_append (fifo_events, pending_events);
+  vec_append (fifo_events, smm->pending_disconnects[thread_index]);
+
+  _vec_len (pending_events) = 0;
+  smm->pending_event_vector[thread_index] = pending_events;
+  _vec_len (smm->pending_disconnects[thread_index]) = 0;
+
+skip_dequeue:
+  n_events = vec_len (fifo_events);
+  for (i = 0; i < n_events; i++)
+    {
+      stream_session_t *s;	/* $$$ prefetch 1 ahead maybe */
+
+      e = &fifo_events[i];
+      switch (e->event_type)
+	{
+	case FIFO_EVENT_APP_TX:
+	  /* Don't try to send more than one frame per dispatch cycle */
+	  if (n_tx_packets >= VLIB_FRAME_SIZE)
+	    {
+	      vec_add1 (smm->pending_event_vector[thread_index], *e);
+	      break;
+	    }
+
+	  s = session_event_get_session (e, thread_index);
+	  if (PREDICT_FALSE (!s))
+	    {
+	      clib_warning ("It's dead, Jim!");
+	      continue;
+	    }
+
+	  /* Spray packets in per session type frames, since they go to
+	   * different nodes */
+	  rv = (smm->session_tx_fns[s->session_type]) (vm, node, e, s,
+						       &n_tx_packets);
+	  /* Notify the app that its tx fifo was serviced */
+	  if (PREDICT_TRUE (rv == SESSION_TX_OK))
+	    session_dequeue_notify (s);
+	  else if (PREDICT_FALSE (rv == SESSION_TX_NO_BUFFERS))
+	    {
+	      vlib_node_increment_counter (vm, node->node_index,
+					   SESSION_QUEUE_ERROR_NO_BUFFER, 1);
+	      continue;
+	    }
+	  break;
+	case FIFO_EVENT_DISCONNECT:
+	  /* Make sure stream disconnects run after the pending list is
+	   * drained */
+	  s = session_get_from_handle (e->session_handle);
+	  if (!e->postponed)
+	    {
+	      e->postponed = 1;
+	      vec_add1 (smm->pending_disconnects[thread_index], *e);
+	      continue;
+	    }
+	  /* If tx queue is still not empty, wait */
+	  if (svm_fifo_max_dequeue (s->server_tx_fifo))
+	    {
+	      vec_add1 (smm->pending_disconnects[thread_index], *e);
+	      continue;
+	    }
+
+	  stream_session_disconnect_transport (s);
+	  break;
+	case FIFO_EVENT_BUILTIN_RX:
+	  s = session_event_get_session (e, thread_index);
+	  if (PREDICT_FALSE (!s))
+	    continue;
+	  svm_fifo_unset_event (s->server_rx_fifo);
+	  app = application_get (s->app_index);
+	  app->cb_fns.builtin_app_rx_callback (s);
+	  break;
+	case FIFO_EVENT_RPC:
+	  fp = e->rpc_args.fp;
+	  (*fp) (e->rpc_args.arg);
+	  break;
+
+	default:
+	  clib_warning ("unhandled event type %d", e->event_type);
+	}
+    }
+
+  _vec_len (fifo_events) = 0;
+  smm->free_event_vector[thread_index] = fifo_events;
+
+  vlib_node_increment_counter (vm, session_queue_node.index,
+			       SESSION_QUEUE_ERROR_TX, n_tx_packets);
+
+  SESSION_EVT_DBG (SESSION_EVT_DISPATCH_END, smm, thread_index);
+
+  return n_tx_packets;
+}
+
+/* *INDENT-OFF* */
+VLIB_REGISTER_NODE (session_queue_node) =
+{
+  .name = "session-queue",
+  .type = VLIB_NODE_TYPE_INPUT,
+  .state = VLIB_NODE_STATE_DISABLED,
+  .function = session_queue_node_fn,
+  .format_trace = format_session_queue_trace,
+  .error_strings = session_queue_error_strings,
+  .n_errors = ARRAY_LEN (session_queue_error_strings),
+};
+/* *INDENT-ON* */
+
 void
 dump_thread_0_event_queue (void)
 {
@@ -686,178 +872,6 @@
   return found;
 }
 
-static uword
-session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
-		       vlib_frame_t * frame)
-{
-  session_manager_main_t *smm = vnet_get_session_manager_main ();
-  session_fifo_event_t *my_pending_event_vector, *e;
-  session_fifo_event_t *my_fifo_events;
-  u32 n_to_dequeue, n_events;
-  svm_queue_t *q;
-  application_t *app;
-  int n_tx_packets = 0;
-  u32 thread_index = vm->thread_index;
-  int i, rv;
-  f64 now = vlib_time_now (vm);
-  void (*fp) (void *);
-
-  SESSION_EVT_DBG (SESSION_EVT_POLL_GAP_TRACK, smm, thread_index);
-
-  /*
-   *  Update transport time
-   */
-  transport_update_time (now, thread_index);
-
-  /*
-   * Get vpp queue events
-   */
-  q = smm->vpp_event_queues[thread_index];
-  if (PREDICT_FALSE (q == 0))
-    return 0;
-
-  my_fifo_events = smm->free_event_vector[thread_index];
-
-  /* min number of events we can dequeue without blocking */
-  n_to_dequeue = q->cursize;
-  my_pending_event_vector = smm->pending_event_vector[thread_index];
-
-  if (!n_to_dequeue && !vec_len (my_pending_event_vector)
-      && !vec_len (smm->pending_disconnects[thread_index]))
-    return 0;
-
-  SESSION_EVT_DBG (SESSION_EVT_DEQ_NODE, 0);
-
-  /*
-   * If we didn't manage to process previous events try going
-   * over them again without dequeuing new ones.
-   */
-  /* XXX: Block senders to sessions that can't keep up */
-  if (0 && vec_len (my_pending_event_vector) >= 100)
-    {
-      clib_warning ("too many fifo events unsolved");
-      goto skip_dequeue;
-    }
-
-  /* See you in the next life, don't be late */
-  if (pthread_mutex_trylock (&q->mutex))
-    return 0;
-
-  for (i = 0; i < n_to_dequeue; i++)
-    {
-      vec_add2 (my_fifo_events, e, 1);
-      svm_queue_sub_raw (q, (u8 *) e);
-    }
-
-  /* The other side of the connection is not polling */
-  if (q->cursize < (q->maxsize / 8))
-    (void) pthread_cond_broadcast (&q->condvar);
-  pthread_mutex_unlock (&q->mutex);
-
-  vec_append (my_fifo_events, my_pending_event_vector);
-  vec_append (my_fifo_events, smm->pending_disconnects[thread_index]);
-
-  _vec_len (my_pending_event_vector) = 0;
-  smm->pending_event_vector[thread_index] = my_pending_event_vector;
-  _vec_len (smm->pending_disconnects[thread_index]) = 0;
-
-skip_dequeue:
-  n_events = vec_len (my_fifo_events);
-  for (i = 0; i < n_events; i++)
-    {
-      stream_session_t *s0;	/* $$$ prefetch 1 ahead maybe */
-      session_fifo_event_t *e0;
-
-      e0 = &my_fifo_events[i];
-      switch (e0->event_type)
-	{
-	case FIFO_EVENT_APP_TX:
-	  if (n_tx_packets == VLIB_FRAME_SIZE)
-	    {
-	      vec_add1 (smm->pending_event_vector[thread_index], *e0);
-	      break;
-	    }
-
-	  s0 = session_event_get_session (e0, thread_index);
-	  if (PREDICT_FALSE (!s0))
-	    {
-	      clib_warning ("It's dead, Jim!");
-	      continue;
-	    }
-
-	  /* Spray packets in per session type frames, since they go to
-	   * different nodes */
-	  rv = (smm->session_tx_fns[s0->session_type]) (vm, node, e0, s0,
-							&n_tx_packets);
-	  /* Out of buffers */
-	  if (PREDICT_FALSE (rv < 0))
-	    {
-	      vlib_node_increment_counter (vm, node->node_index,
-					   SESSION_QUEUE_ERROR_NO_BUFFER, 1);
-	      continue;
-	    }
-	  break;
-	case FIFO_EVENT_DISCONNECT:
-	  /* Make sure stream disconnects run after the pending list is
-	   * drained */
-	  s0 = session_get_from_handle (e0->session_handle);
-	  if (!e0->postponed)
-	    {
-	      e0->postponed = 1;
-	      vec_add1 (smm->pending_disconnects[thread_index], *e0);
-	      continue;
-	    }
-	  /* If tx queue is still not empty, wait */
-	  if (svm_fifo_max_dequeue (s0->server_tx_fifo))
-	    {
-	      vec_add1 (smm->pending_disconnects[thread_index], *e0);
-	      continue;
-	    }
-
-	  stream_session_disconnect_transport (s0);
-	  break;
-	case FIFO_EVENT_BUILTIN_RX:
-	  s0 = session_event_get_session (e0, thread_index);
-	  if (PREDICT_FALSE (!s0))
-	    continue;
-	  svm_fifo_unset_event (s0->server_rx_fifo);
-	  app = application_get (s0->app_index);
-	  app->cb_fns.builtin_app_rx_callback (s0);
-	  break;
-	case FIFO_EVENT_RPC:
-	  fp = e0->rpc_args.fp;
-	  (*fp) (e0->rpc_args.arg);
-	  break;
-
-	default:
-	  clib_warning ("unhandled event type %d", e0->event_type);
-	}
-    }
-
-  _vec_len (my_fifo_events) = 0;
-  smm->free_event_vector[thread_index] = my_fifo_events;
-
-  vlib_node_increment_counter (vm, session_queue_node.index,
-			       SESSION_QUEUE_ERROR_TX, n_tx_packets);
-
-  SESSION_EVT_DBG (SESSION_EVT_DISPATCH_END, smm, thread_index);
-
-  return n_tx_packets;
-}
-
-/* *INDENT-OFF* */
-VLIB_REGISTER_NODE (session_queue_node) =
-{
-  .function = session_queue_node_fn,
-  .name = "session-queue",
-  .format_trace = format_session_queue_trace,
-  .type = VLIB_NODE_TYPE_INPUT,
-  .n_errors = ARRAY_LEN (session_queue_error_strings),
-  .error_strings = session_queue_error_strings,
-  .state = VLIB_NODE_STATE_DISABLED,
-};
-/* *INDENT-ON* */
-
 static clib_error_t *
 session_queue_exit (vlib_main_t * vm)
 {