session: use msg queue for events

Change-Id: I3c58367eec2243fe19b75be78a175c5261863e9e
Signed-off-by: Florin Coras <fcoras@cisco.com>
diff --git a/src/vnet/session/application.c b/src/vnet/session/application.c
index 60019dd..6041b49 100644
--- a/src/vnet/session/application.c
+++ b/src/vnet/session/application.c
@@ -807,6 +807,143 @@
   return &app->sm_properties;
 }
 
+static inline int
+app_enqueue_evt (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, u8 lock)
+{
+  if (PREDICT_TRUE (!svm_msg_q_is_full (mq)))
+    {
+      if (lock)
+	{
+	  svm_msg_q_add_w_lock (mq, msg);
+	  svm_msg_q_unlock (mq);
+	}
+      else if (svm_msg_q_add (mq, msg, SVM_Q_WAIT))
+	{
+	  clib_warning ("msg q add returned");
+	  if (lock)
+	    svm_msg_q_unlock (mq);
+	  return -1;
+	}
+    }
+  else
+    {
+      clib_warning ("evt q full");
+      svm_msg_q_free_msg (mq, msg);
+      if (lock)
+	svm_msg_q_unlock (mq);
+      return -1;
+    }
+  return 0;
+}
+
+static inline int
+app_send_io_evt_rx (application_t * app, stream_session_t * s, u8 lock)
+{
+  session_fifo_event_t *evt;
+  svm_msg_q_msg_t msg;
+  svm_msg_q_t *mq;
+
+  if (PREDICT_FALSE (s->session_state == SESSION_STATE_CLOSED))
+    {
+      /* Session is closed so app will never clean up. Flush rx fifo */
+      svm_fifo_dequeue_drop_all (s->server_rx_fifo);
+      return 0;
+    }
+
+  /* Built-in app? Hand event to the callback... */
+  if (app->cb_fns.builtin_app_rx_callback)
+    return app->cb_fns.builtin_app_rx_callback (s);
+
+  /* If no need for event, return */
+  if (!svm_fifo_set_event (s->server_rx_fifo))
+    return 0;
+
+  mq = app->event_queue;
+  if (lock)
+    svm_msg_q_lock (mq);
+
+  if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
+    {
+      clib_warning ("evt q rings full");
+      if (lock)
+	svm_msg_q_unlock (mq);
+      return -1;
+    }
+
+  msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+  ASSERT (!svm_msg_q_msg_is_invalid (&msg));
+
+  evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+  evt->fifo = s->server_rx_fifo;
+  evt->event_type = FIFO_EVENT_APP_RX;
+
+  return app_enqueue_evt (mq, &msg, lock);
+}
+
+static inline int
+app_send_io_evt_tx (application_t * app, stream_session_t * s, u8 lock)
+{
+  svm_msg_q_t *mq;
+  session_fifo_event_t *evt;
+  svm_msg_q_msg_t msg;
+
+  if (application_is_builtin (app))
+    return 0;
+
+  mq = app->event_queue;
+  if (lock)
+    svm_msg_q_lock (mq);
+
+  if (PREDICT_FALSE (svm_msg_q_ring_is_full (mq, SESSION_MQ_IO_EVT_RING)))
+    {
+      clib_warning ("evt q rings full");
+      if (lock)
+	svm_msg_q_unlock (mq);
+      return -1;
+    }
+
+  msg = svm_msg_q_alloc_msg_w_ring (mq, SESSION_MQ_IO_EVT_RING);
+  ASSERT (!svm_msg_q_msg_is_invalid (&msg));
+
+  evt = (session_fifo_event_t *) svm_msg_q_msg_data (mq, &msg);
+  evt->event_type = FIFO_EVENT_APP_TX;
+  evt->fifo = s->server_tx_fifo;
+
+  return app_enqueue_evt (mq, &msg, lock);
+}
+
+/* *INDENT-OFF* */
+typedef int (app_send_evt_handler_fn) (application_t *app,
+				       stream_session_t *s,
+				       u8 lock);
+static app_send_evt_handler_fn * const app_send_evt_handler_fns[2] = {
+    app_send_io_evt_rx,
+    app_send_io_evt_tx,
+};
+/* *INDENT-ON* */
+
+/**
+ * Send event to application
+ *
+ * Logic from queue perspective is non-blocking. That is, if there's
+ * not enough space to enqueue a message, we return. However, if the lock
+ * flag is set, we do wait for queue mutex.
+ */
+int
+application_send_event (application_t * app, stream_session_t * s,
+			u8 evt_type)
+{
+  ASSERT (app && evt_type <= FIFO_EVENT_APP_TX);
+  return app_send_evt_handler_fns[evt_type] (app, s, 0 /* lock */ );
+}
+
+int
+application_lock_and_send_event (application_t * app, stream_session_t * s,
+				 u8 evt_type)
+{
+  return app_send_evt_handler_fns[evt_type] (app, s, 1 /* lock */ );
+}
+
 local_session_t *
 application_alloc_local_session (application_t * app)
 {
@@ -949,14 +1086,14 @@
   svm_fifo_segment_private_t *seg;
   segment_manager_t *sm;
   local_session_t *ls;
-  svm_queue_t *sq, *cq;
+  svm_msg_q_t *sq, *cq;
 
   ls = application_alloc_local_session (server);
 
   props = application_segment_manager_properties (server);
   cprops = application_segment_manager_properties (client);
   evt_q_elts = props->evt_q_size + cprops->evt_q_size;
-  evt_q_sz = evt_q_elts * sizeof (session_fifo_event_t);
+  evt_q_sz = segment_manager_evt_q_expected_size (evt_q_elts);
   seg_size = props->rx_fifo_size + props->tx_fifo_size + evt_q_sz + margin;
 
   has_transport = session_has_transport ((stream_session_t *) ll);