vcl: use events for epoll/select/read/write
Have vcl poll and wait on the event message queues as opposed to
constantly polling the session fifos. This also adds event signaling to
cut-through sessions.
On the downside, because we can't wait on multiple condvars at once, we
do timed waits whenever cut-through registrations leave us with multiple
message queues.
Change-Id: I29ade95dba449659fe46008bb1af502276a7c5fd
Signed-off-by: Florin Coras <fcoras@cisco.com>
diff --git a/src/svm/message_queue.c b/src/svm/message_queue.c
index 0f9be9c..1b2d2e1 100644
--- a/src/svm/message_queue.c
+++ b/src/svm/message_queue.c
@@ -73,6 +73,7 @@
ring = &mq->rings[i];
ring->elsize = cfg->ring_cfgs[i].elsize;
ring->nitems = cfg->ring_cfgs[i].nitems;
+ ring->cursize = ring->head = ring->tail = 0;
if (cfg->ring_cfgs[i].data)
ring->data = cfg->ring_cfgs[i].data;
else
@@ -97,10 +98,10 @@
svm_msg_q_msg_t
svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index)
{
- svm_msg_q_msg_t msg = {.as_u64 = ~0 };
+ svm_msg_q_msg_t msg;
svm_msg_q_ring_t *ring = svm_msg_q_ring_inline (mq, ring_index);
- ASSERT (ring->cursize != ring->nitems);
+ ASSERT (ring->cursize < ring->nitems);
msg.ring_index = ring - mq->rings;
msg.elt_index = ring->tail;
ring->tail = (ring->tail + 1) % ring->nitems;
@@ -131,12 +132,9 @@
else
{
svm_msg_q_lock (mq);
+ while (svm_msg_q_ring_is_full (mq, ring_index))
+ svm_msg_q_wait (mq);
*msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
- while (svm_msg_q_msg_is_invalid (msg))
- {
- svm_msg_q_wait (mq);
- *msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
- }
}
return 0;
}
@@ -190,18 +188,20 @@
static int
svm_msq_q_msg_is_valid (svm_msg_q_t * mq, svm_msg_q_msg_t * msg)
{
+ u32 dist1, dist2, tail, head;
svm_msg_q_ring_t *ring;
- u32 dist1, dist2;
if (vec_len (mq->rings) <= msg->ring_index)
return 0;
ring = &mq->rings[msg->ring_index];
+ tail = ring->tail;
+ head = ring->head;
- dist1 = ((ring->nitems + msg->elt_index) - ring->head) % ring->nitems;
- if (ring->tail == ring->head)
+ dist1 = ((ring->nitems + msg->elt_index) - head) % ring->nitems;
+ if (tail == head)
dist2 = (ring->cursize == 0) ? 0 : ring->nitems;
else
- dist2 = ((ring->nitems + ring->tail) - ring->head) % ring->nitems;
+ dist2 = ((ring->nitems + tail) - head) % ring->nitems;
return (dist1 < dist2);
}
diff --git a/src/svm/message_queue.h b/src/svm/message_queue.h
index e4a5f07..4c16c97 100644
--- a/src/svm/message_queue.h
+++ b/src/svm/message_queue.h
@@ -22,6 +22,7 @@
#include <vppinfra/clib.h>
#include <vppinfra/error.h>
+#include <vppinfra/time.h>
#include <svm/queue.h>
typedef struct svm_msg_q_ring_
@@ -274,12 +275,6 @@
return pthread_mutex_lock (&mq->q->mutex);
}
-static inline void
-svm_msg_q_wait (svm_msg_q_t * mq)
-{
- pthread_cond_wait (&mq->q->condvar, &mq->q->mutex);
-}
-
/**
* Unlock message queue
*/
@@ -292,6 +287,37 @@
pthread_mutex_unlock (&mq->q->mutex);
}
+/**
+ * Block on the message queue's condvar until it is signaled
+ *
+ * Must be called with the queue mutex held; it is handed to pthread_cond_wait
+ */
+static inline void
+svm_msg_q_wait (svm_msg_q_t * mq)
+{
+ pthread_cond_wait (&mq->q->condvar, &mq->q->mutex);
+}
+
+/**
+ * Timed wait for message queue event
+ *
+ * Must be called with mutex held.
+ *
+ * @param mq message queue
+ * @param timeout time in seconds, relative to now (may be fractional)
+ * @return 0 if the condvar wait returned successfully, -1 if
+ *         pthread_cond_timedwait reported a timeout or any other error
+ */
+static inline int
+svm_msg_q_timedwait (svm_msg_q_t * mq, double timeout)
+{
+ struct timespec ts;
+ double deadline;
+
+ /*
+  * pthread_cond_timedwait takes an absolute deadline. Add the timeout to
+  * the full current time before splitting into sec/nsec; truncating "now"
+  * to whole seconds first would place the deadline up to one second early
+  * (possibly in the past), making sub-second waits return immediately.
+  * Assumes unix_time_now () returns fractional seconds on the same clock
+  * the condvar uses - TODO confirm.
+  */
+ deadline = unix_time_now () + timeout;
+ ts.tv_sec = (time_t) deadline;
+ ts.tv_nsec = (deadline - ts.tv_sec) * 1e9;
+ if (pthread_cond_timedwait (&mq->q->condvar, &mq->q->mutex, &ts))
+ return -1;
+ return 0;
+}
+
#endif /* SRC_SVM_MESSAGE_QUEUE_H_ */
/*
diff --git a/src/svm/svm_fifo.c b/src/svm/svm_fifo.c
index 47df225..018827e 100644
--- a/src/svm/svm_fifo.c
+++ b/src/svm/svm_fifo.c
@@ -179,7 +179,7 @@
if (verbose > 1)
s = format
- (s, " server session %d thread %d client session %d thread %d\n",
+ (s, " vpp session %d thread %d app session %d thread %d\n",
f->master_session_index, f->master_thread_index,
f->client_session_index, f->client_thread_index);