/*
 * Copyright (c) 2018-2019 Cisco and/or its affiliates.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/**
 * @file
 * @brief Unidirectional shared-memory multi-ring message queue
 */

#ifndef SRC_SVM_MESSAGE_QUEUE_H_
#define SRC_SVM_MESSAGE_QUEUE_H_

#include <vppinfra/clib.h>
#include <vppinfra/error.h>
#include <vppinfra/lock.h>
#include <svm/queue.h>

Florin Coras86f12322021-01-22 15:05:14 -080028typedef struct svm_msg_q_shr_queue_
29{
30 pthread_mutex_t mutex; /* 8 bytes */
31 pthread_cond_t condvar; /* 8 bytes */
32 u32 head;
33 u32 tail;
34 volatile u32 cursize;
35 u32 maxsize;
36 u32 elsize;
37 u32 pad;
38 u8 data[0];
39} svm_msg_q_shared_queue_t;
40
41typedef struct svm_msg_q_queue_
42{
43 svm_msg_q_shared_queue_t *shr; /**< pointer to shared queue */
44 int evtfd; /**< producer/consumer eventfd */
Florin Coras5398dfb2021-01-25 20:31:27 -080045 clib_spinlock_t lock; /**< private lock for multi-producer */
Florin Coras86f12322021-01-22 15:05:14 -080046} svm_msg_q_queue_t;
47
Florin Corasb4624182020-12-11 13:58:12 -080048typedef struct svm_msg_q_ring_shared_
Florin Coras65784c12018-07-04 04:17:41 -070049{
50 volatile u32 cursize; /**< current size of the ring */
51 u32 nitems; /**< max size of the ring */
Florin Coras3c2fed52018-07-04 04:15:05 -070052 volatile u32 head; /**< current head (for dequeue) */
53 volatile u32 tail; /**< current tail (for enqueue) */
Florin Coras65784c12018-07-04 04:17:41 -070054 u32 elsize; /**< size of an element */
Florin Corasb4624182020-12-11 13:58:12 -080055 u8 data[0]; /**< chunk of memory for msg data */
56} svm_msg_q_ring_shared_t;
57
58typedef struct svm_msg_q_ring_
59{
60 u32 nitems; /**< max size of the ring */
61 u32 elsize; /**< size of an element */
62 svm_msg_q_ring_shared_t *shr; /**< ring in shared memory */
Florin Corase91bdb32018-07-11 16:35:38 -070063} __clib_packed svm_msg_q_ring_t;
Florin Coras65784c12018-07-04 04:17:41 -070064
Florin Corasb4624182020-12-11 13:58:12 -080065typedef struct svm_msg_q_shared_
66{
Florin Coras86f12322021-01-22 15:05:14 -080067 u32 n_rings; /**< number of rings after q */
68 u32 pad; /**< 8 byte alignment for q */
69 svm_msg_q_shared_queue_t q[0]; /**< queue for exchanging messages */
Florin Corasb4624182020-12-11 13:58:12 -080070} __clib_packed svm_msg_q_shared_t;
71
Florin Coras65784c12018-07-04 04:17:41 -070072typedef struct svm_msg_q_
73{
Florin Coras86f12322021-01-22 15:05:14 -080074 svm_msg_q_queue_t q; /**< queue for exchanging messages */
Florin Coras65784c12018-07-04 04:17:41 -070075 svm_msg_q_ring_t *rings; /**< rings with message data*/
Florin Corase91bdb32018-07-11 16:35:38 -070076} __clib_packed svm_msg_q_t;
Florin Coras65784c12018-07-04 04:17:41 -070077
78typedef struct svm_msg_q_ring_cfg_
79{
80 u32 nitems;
81 u32 elsize;
82 void *data;
83} svm_msg_q_ring_cfg_t;
84
85typedef struct svm_msg_q_cfg_
86{
87 int consumer_pid; /**< pid of msg consumer */
88 u32 q_nitems; /**< msg queue size (not rings) */
89 u32 n_rings; /**< number of msg rings */
90 svm_msg_q_ring_cfg_t *ring_cfgs; /**< array of ring cfgs */
91} svm_msg_q_cfg_t;
92
93typedef union
94{
95 struct
96 {
97 u32 ring_index; /**< ring index, could be u8 */
98 u32 elt_index; /**< index in ring */
99 };
100 u64 as_u64;
101} svm_msg_q_msg_t;
102
/** Initializer for an invalid message handle (all bits of as_u64 set) */
#define SVM_MQ_INVALID_MSG { .as_u64 = ~0 }

/**
 * Wait types accepted by svm_msg_q_wait
 *
 * NOTE(review): presumably the queue condition being waited on (queue
 * empty vs. queue full); confirm against the implementation.
 */
typedef enum svm_msg_q_wait_type_
{
  SVM_MQ_WAIT_EMPTY,
  SVM_MQ_WAIT_FULL
} svm_msg_q_wait_type_t;

Florin Coras65784c12018-07-04 04:17:41 -0700111/**
112 * Allocate message queue
113 *
114 * Allocates a message queue on the heap. Based on the configuration options,
115 * apart from the message queue this also allocates (one or multiple)
116 * shared-memory rings for the messages.
117 *
118 * @param cfg configuration options: queue len, consumer pid,
119 * ring configs
120 * @return message queue
121 */
Florin Corasb4624182020-12-11 13:58:12 -0800122svm_msg_q_shared_t *svm_msg_q_alloc (svm_msg_q_cfg_t *cfg);
123svm_msg_q_shared_t *svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg);
Florin Coras213b1bb2020-12-07 14:33:58 -0800124uword svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg);
Florin Coras65784c12018-07-04 04:17:41 -0700125
Florin Corasb4624182020-12-11 13:58:12 -0800126void svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base);
127
Florin Coras65784c12018-07-04 04:17:41 -0700128/**
Florin Coras8c517c82021-03-30 00:23:54 -0700129 * Cleanup mq's private data
130 */
131void svm_msg_q_cleanup (svm_msg_q_t *mq);
132
133/**
Florin Coras65784c12018-07-04 04:17:41 -0700134 * Free message queue
135 *
136 * @param mq message queue to be freed
137 */
138void svm_msg_q_free (svm_msg_q_t * mq);
139
140/**
141 * Allocate message buffer
142 *
143 * Message is allocated on the first available ring capable of holding
144 * the requested number of bytes.
145 *
146 * @param mq message queue
147 * @param nbytes number of bytes needed for message
148 * @return message structure pointing to the ring and position
149 * allocated
150 */
151svm_msg_q_msg_t svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes);
152
153/**
Florin Coras3c2fed52018-07-04 04:15:05 -0700154 * Allocate message buffer on ring
155 *
156 * Message is allocated, on requested ring. The caller MUST check that
157 * the ring is not full.
158 *
159 * @param mq message queue
160 * @param ring_index ring on which the allocation should occur
161 * @return message structure pointing to the ring and position
162 * allocated
163 */
164svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index);
165
166/**
167 * Lock message queue and allocate message buffer on ring
168 *
169 * This should be used when multiple writers/readers are expected to
170 * compete for the rings/queue. Message should be enqueued by calling
171 * @ref svm_msg_q_add_w_lock and the caller MUST unlock the queue once
172 * the message in enqueued.
173 *
174 * @param mq message queue
175 * @param ring_index ring on which the allocation should occur
176 * @param noblock flag that indicates if request should block
177 * @param msg pointer to message to be filled in
178 * @return 0 on success, negative number otherwise
179 */
180int svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
181 u8 noblock, svm_msg_q_msg_t * msg);
182
183/**
Florin Coras65784c12018-07-04 04:17:41 -0700184 * Free message buffer
185 *
186 * Marks message buffer on ring as free.
187 *
188 * @param mq message queue
189 * @param msg message to be freed
190 */
191void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
Florin Coras3c2fed52018-07-04 04:15:05 -0700192
Florin Coras65784c12018-07-04 04:17:41 -0700193/**
194 * Producer enqueue one message to queue
195 *
196 * Prior to calling this, the producer should've obtained a message buffer
197 * from one of the rings by calling @ref svm_msg_q_alloc_msg.
198 *
199 * @param mq message queue
200 * @param msg message (pointer to ring position) to be enqueued
201 * @param nowait flag to indicate if request is blocking or not
202 * @return success status
203 */
Florin Coras3c2fed52018-07-04 04:15:05 -0700204int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait);
205
206/**
207 * Producer enqueue one message to queue with mutex held
208 *
209 * Prior to calling this, the producer should've obtained a message buffer
210 * from one of the rings by calling @ref svm_msg_q_alloc_msg. It assumes
211 * the queue mutex is held.
212 *
213 * @param mq message queue
214 * @param msg message (pointer to ring position) to be enqueued
215 * @return success status
216 */
Florin Coras52207f12018-07-12 14:48:06 -0700217void svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
Florin Coras65784c12018-07-04 04:17:41 -0700218
219/**
220 * Consumer dequeue one message from queue
221 *
222 * This returns the message pointing to the data in the message rings.
Florin Coras5398dfb2021-01-25 20:31:27 -0800223 * Should only be used in single consumer scenarios as no locks are grabbed.
Florin Coras65784c12018-07-04 04:17:41 -0700224 * The consumer is expected to call @ref svm_msg_q_free_msg once it
225 * finishes processing/copies the message data.
226 *
227 * @param mq message queue
228 * @param msg pointer to structure where message is to be received
229 * @param cond flag that indicates if request should block or not
Florin Coras3c2fed52018-07-04 04:15:05 -0700230 * @param time time to wait if condition it SVM_Q_TIMEDWAIT
Florin Coras65784c12018-07-04 04:17:41 -0700231 * @return success status
232 */
233int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
234 svm_q_conditional_wait_t cond, u32 time);
235
236/**
Florin Coras5398dfb2021-01-25 20:31:27 -0800237 * Consumer dequeue one message from queue
Florin Coras3c2fed52018-07-04 04:15:05 -0700238 *
Florin Coras5398dfb2021-01-25 20:31:27 -0800239 * Returns the message pointing to the data in the message rings. Should only
240 * be used in single consumer scenarios as no locks are grabbed. The consumer
241 * is expected to call @ref svm_msg_q_free_msg once it finishes
Florin Coras3c2fed52018-07-04 04:15:05 -0700242 * processing/copies the message data.
243 *
244 * @param mq message queue
245 * @param msg pointer to structure where message is to be received
246 * @return success status
247 */
Florin Coras5398dfb2021-01-25 20:31:27 -0800248int svm_msg_q_sub_raw (svm_msg_q_t *mq, svm_msg_q_msg_t *elem);
249
250/**
251 * Consumer dequeue multiple messages from queue
252 *
253 * Returns the message pointing to the data in the message rings. Should only
254 * be used in single consumer scenarios as no locks are grabbed. The consumer
255 * is expected to call @ref svm_msg_q_free_msg once it finishes
256 * processing/copies the message data.
257 *
258 * @param mq message queue
259 * @param msg_buf pointer to array of messages to received
260 * @param n_msgs lengt of msg_buf array
261 * @return number of messages dequeued
262 */
263int svm_msg_q_sub_raw_batch (svm_msg_q_t *mq, svm_msg_q_msg_t *msg_buf,
264 u32 n_msgs);
Florin Coras3c2fed52018-07-04 04:15:05 -0700265
266/**
267 * Get data for message in queue
Florin Coras65784c12018-07-04 04:17:41 -0700268 *
269 * @param mq message queue
270 * @param msg message for which the data is requested
271 * @return pointer to data
272 */
273void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);
274
Florin Coras3c2fed52018-07-04 04:15:05 -0700275/**
276 * Get message queue ring
277 *
278 * @param mq message queue
279 * @param ring_index index of ring
280 * @return pointer to ring
281 */
282svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);
283
284/**
Florin Coras86f12322021-01-22 15:05:14 -0800285 * Set event fd for queue
Florin Coras99368312018-08-02 10:45:44 -0700286 *
287 * If set, queue will exclusively use eventfds for signaling. Moreover,
288 * afterwards, the queue should only be used in non-blocking mode. Waiting
289 * for events should be done externally using something like epoll.
290 *
291 * @param mq message queue
292 * @param fd consumer eventfd
293 */
Florin Coras86f12322021-01-22 15:05:14 -0800294void svm_msg_q_set_eventfd (svm_msg_q_t *mq, int fd);
Florin Coras99368312018-08-02 10:45:44 -0700295
296/**
Florin Coras86f12322021-01-22 15:05:14 -0800297 * Allocate event fd for queue
Florin Coras99368312018-08-02 10:45:44 -0700298 */
Florin Coras86f12322021-01-22 15:05:14 -0800299int svm_msg_q_alloc_eventfd (svm_msg_q_t *mq);
Nathan Skrzypczakcfdb1092019-12-02 16:44:42 +0100300
301/**
302 * Format message queue, shows msg count for each ring
303 */
Florin Coras86f12322021-01-22 15:05:14 -0800304u8 *format_svm_msg_q (u8 *s, va_list *args);
305
306/**
307 * Check length of message queue
308 */
309static inline u32
310svm_msg_q_size (svm_msg_q_t *mq)
311{
312 return clib_atomic_load_relax_n (&mq->q.shr->cursize);
313}
Nathan Skrzypczakcfdb1092019-12-02 16:44:42 +0100314
Florin Coras99368312018-08-02 10:45:44 -0700315/**
Florin Coras3c2fed52018-07-04 04:15:05 -0700316 * Check if message queue is full
317 */
318static inline u8
319svm_msg_q_is_full (svm_msg_q_t * mq)
320{
Florin Coras86f12322021-01-22 15:05:14 -0800321 return (svm_msg_q_size (mq) == mq->q.shr->maxsize);
Florin Coras3c2fed52018-07-04 04:15:05 -0700322}
323
324static inline u8
325svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
326{
Florin Corasb4624182020-12-11 13:58:12 -0800327 svm_msg_q_ring_t *ring = vec_elt_at_index (mq->rings, ring_index);
Florin Coras86f12322021-01-22 15:05:14 -0800328 return (clib_atomic_load_relax_n (&ring->shr->cursize) >= ring->nitems);
Florin Coras3c2fed52018-07-04 04:15:05 -0700329}
330
331/**
332 * Check if message queue is empty
333 */
334static inline u8
335svm_msg_q_is_empty (svm_msg_q_t * mq)
336{
Florin Coras86f12322021-01-22 15:05:14 -0800337 return (svm_msg_q_size (mq) == 0);
Florin Coras3c2fed52018-07-04 04:15:05 -0700338}
339
340/**
341 * Check if message is invalid
342 */
343static inline u8
344svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg)
345{
346 return (msg->as_u64 == (u64) ~ 0);
347}
348
349/**
350 * Try locking message queue
351 */
352static inline int
353svm_msg_q_try_lock (svm_msg_q_t * mq)
354{
Florin Coras5398dfb2021-01-25 20:31:27 -0800355 if (mq->q.evtfd == -1)
356 {
357 int rv = pthread_mutex_trylock (&mq->q.shr->mutex);
358 if (PREDICT_FALSE (rv == EOWNERDEAD))
359 rv = pthread_mutex_consistent (&mq->q.shr->mutex);
360 return rv;
361 }
362 else
363 {
364 return !clib_spinlock_trylock (&mq->q.lock);
365 }
Florin Coras3c2fed52018-07-04 04:15:05 -0700366}
367
368/**
369 * Lock, or block trying, the message queue
370 */
371static inline int
372svm_msg_q_lock (svm_msg_q_t * mq)
373{
Florin Coras5398dfb2021-01-25 20:31:27 -0800374 if (mq->q.evtfd == -1)
375 {
376 int rv = pthread_mutex_lock (&mq->q.shr->mutex);
377 if (PREDICT_FALSE (rv == EOWNERDEAD))
378 rv = pthread_mutex_consistent (&mq->q.shr->mutex);
379 return rv;
380 }
381 else
382 {
383 clib_spinlock_lock (&mq->q.lock);
384 return 0;
385 }
Florin Coras3c2fed52018-07-04 04:15:05 -0700386}
387
Florin Coras3c2fed52018-07-04 04:15:05 -0700388/**
389 * Unlock message queue
390 */
391static inline void
392svm_msg_q_unlock (svm_msg_q_t * mq)
393{
Florin Coras5398dfb2021-01-25 20:31:27 -0800394 if (mq->q.evtfd == -1)
395 {
396 pthread_mutex_unlock (&mq->q.shr->mutex);
397 }
398 else
399 {
400 clib_spinlock_unlock (&mq->q.lock);
401 }
Florin Coras3c2fed52018-07-04 04:15:05 -0700402}
403
Florin Coras54693d22018-07-17 10:46:29 -0700404/**
405 * Wait for message queue event
406 *
Florin Coras89c98a42021-03-25 11:24:33 -0700407 * When eventfds are not configured, the shared memory mutex is locked
408 * before waiting on the condvar. Typically called by consumers.
Florin Coras54693d22018-07-17 10:46:29 -0700409 */
Florin Coras5398dfb2021-01-25 20:31:27 -0800410int svm_msg_q_wait (svm_msg_q_t *mq, svm_msg_q_wait_type_t type);
Florin Coras54693d22018-07-17 10:46:29 -0700411
412/**
Florin Coras89c98a42021-03-25 11:24:33 -0700413 * Wait for message queue event as producer
414 *
415 * Similar to @ref svm_msg_q_wait but lock (mutex or spinlock) must
416 * be held. Should only be called by producers.
417 */
418int svm_msg_q_wait_prod (svm_msg_q_t *mq);
419
420/**
Florin Coras54693d22018-07-17 10:46:29 -0700421 * Timed wait for message queue event
422 *
423 * Must be called with mutex held.
424 *
425 * @param mq message queue
426 * @param timeout time in seconds
427 */
Florin Coras86f12322021-01-22 15:05:14 -0800428int svm_msg_q_timedwait (svm_msg_q_t *mq, double timeout);
Florin Coras54693d22018-07-17 10:46:29 -0700429
Florin Coras99368312018-08-02 10:45:44 -0700430static inline int
Florin Coras86f12322021-01-22 15:05:14 -0800431svm_msg_q_get_eventfd (svm_msg_q_t *mq)
Florin Coras99368312018-08-02 10:45:44 -0700432{
Florin Coras86f12322021-01-22 15:05:14 -0800433 return mq->q.evtfd;
Florin Coras54693d22018-07-17 10:46:29 -0700434}
435
#endif /* SRC_SVM_MESSAGE_QUEUE_H_ */

/*
 * fd.io coding-style-patch-verification: ON
 *
 * Local Variables:
 * eval: (c-set-style "gnu")
 * End:
 */