/*
 * Copyright (c) 2018-2019 Cisco and/or its affiliates.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
/**
 * @file
 * @brief Unidirectional shared-memory multi-ring message queue
 */

#ifndef SRC_SVM_MESSAGE_QUEUE_H_
#define SRC_SVM_MESSAGE_QUEUE_H_

#include <vppinfra/clib.h>
#include <vppinfra/error.h>
#include <svm/queue.h>

typedef struct svm_msg_q_ring_shared_
{
  volatile u32 cursize;			/**< current size of the ring */
  u32 nitems;				/**< max size of the ring */
  volatile u32 head;			/**< current head (for dequeue) */
  volatile u32 tail;			/**< current tail (for enqueue) */
  u32 elsize;				/**< size of an element */
  u8 data[0];				/**< chunk of memory for msg data */
} svm_msg_q_ring_shared_t;

typedef struct svm_msg_q_ring_
{
  u32 nitems;				/**< max size of the ring */
  u32 elsize;				/**< size of an element */
  svm_msg_q_ring_shared_t *shr;		/**< ring in shared memory */
} __clib_packed svm_msg_q_ring_t;

typedef struct svm_msg_q_shared_
{
  u32 n_rings;				/**< number of rings after q */
  u32 pad;				/**< 8 byte alignment for q */
  svm_queue_t q[0];			/**< queue for exchanging messages */
} __clib_packed svm_msg_q_shared_t;

typedef struct svm_msg_q_
{
  svm_queue_t *q;			/**< queue for exchanging messages */
  svm_msg_q_ring_t *rings;		/**< rings with message data */
} __clib_packed svm_msg_q_t;

typedef struct svm_msg_q_ring_cfg_
{
  u32 nitems;
  u32 elsize;
  void *data;
} svm_msg_q_ring_cfg_t;

typedef struct svm_msg_q_cfg_
{
  int consumer_pid;			/**< pid of msg consumer */
  u32 q_nitems;				/**< msg queue size (not rings) */
  u32 n_rings;				/**< number of msg rings */
  svm_msg_q_ring_cfg_t *ring_cfgs;	/**< array of ring cfgs */
} svm_msg_q_cfg_t;

typedef union
{
  struct
  {
    u32 ring_index;			/**< ring index, could be u8 */
    u32 elt_index;			/**< index in ring */
  };
  u64 as_u64;
} svm_msg_q_msg_t;

#define SVM_MQ_INVALID_MSG { .as_u64 = ~0 }
/**
 * Allocate message queue
 *
 * Allocates a message queue on the heap. Based on the configuration
 * options, this also allocates one or more shared-memory rings for the
 * message data.
 *
 * @param cfg		configuration options: queue len, consumer pid,
 *			ring configs
 * @return		message queue
 */
svm_msg_q_shared_t *svm_msg_q_alloc (svm_msg_q_cfg_t *cfg);

/**
 * Initialize message queue in memory provided at @a base
 */
svm_msg_q_shared_t *svm_msg_q_init (void *base, svm_msg_q_cfg_t *cfg);

/**
 * Compute memory size needed for the queue and rings described by @a cfg
 */
uword svm_msg_q_size_to_alloc (svm_msg_q_cfg_t *cfg);

/**
 * Attach local queue handle to shared memory queue at @a smq_base
 */
void svm_msg_q_attach (svm_msg_q_t *mq, void *smq_base);

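/*
 * Setup sketch (illustrative only; ring/queue sizes are assumed example
 * values): allocate a queue with one ring of 64-byte elements on the heap
 * and attach a local handle to it.
 *
 *   svm_msg_q_ring_cfg_t rc = { .nitems = 32, .elsize = 64, .data = 0 };
 *   svm_msg_q_cfg_t cfg = { .consumer_pid = 0, .q_nitems = 32,
 *                           .n_rings = 1, .ring_cfgs = &rc };
 *   svm_msg_q_shared_t *smq = svm_msg_q_alloc (&cfg);
 *   svm_msg_q_t mq;
 *   svm_msg_q_attach (&mq, smq);
 */
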
/**
 * Free message queue
 *
 * @param mq		message queue to be freed
 */
void svm_msg_q_free (svm_msg_q_t * mq);

/**
 * Allocate message buffer
 *
 * Message is allocated on the first available ring capable of holding
 * the requested number of bytes.
 *
 * @param mq		message queue
 * @param nbytes	number of bytes needed for message
 * @return		message structure pointing to the ring and position
 *			allocated
 */
svm_msg_q_msg_t svm_msg_q_alloc_msg (svm_msg_q_t * mq, u32 nbytes);

/**
 * Allocate message buffer on ring
 *
 * Message is allocated on the requested ring. The caller MUST check that
 * the ring is not full.
 *
 * @param mq		message queue
 * @param ring_index	ring on which the allocation should occur
 * @return		message structure pointing to the ring and position
 *			allocated
 */
svm_msg_q_msg_t svm_msg_q_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index);

/**
 * Lock message queue and allocate message buffer on ring
 *
 * This should be used when multiple writers/readers are expected to
 * compete for the rings/queue. Message should be enqueued by calling
 * @ref svm_msg_q_add_and_unlock and the caller MUST unlock the queue if
 * the message is not enqueued.
 *
 * @param mq		message queue
 * @param ring_index	ring on which the allocation should occur
 * @param noblock	flag that indicates if request should not block
 * @param msg		pointer to message to be filled in
 * @return		0 on success, negative number otherwise
 */
int svm_msg_q_lock_and_alloc_msg_w_ring (svm_msg_q_t * mq, u32 ring_index,
					 u8 noblock, svm_msg_q_msg_t * msg);

/**
 * Free message buffer
 *
 * Marks message buffer on ring as free.
 *
 * @param mq		message queue
 * @param msg		message to be freed
 */
void svm_msg_q_free_msg (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);

/**
 * Producer enqueue one message to queue
 *
 * Prior to calling this, the producer should've obtained a message buffer
 * from one of the rings by calling @ref svm_msg_q_alloc_msg.
 *
 * @param mq		message queue
 * @param msg		message (pointer to ring position) to be enqueued
 * @param nowait	flag to indicate if request is blocking or not
 * @return		success status
 */
int svm_msg_q_add (svm_msg_q_t * mq, svm_msg_q_msg_t * msg, int nowait);

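/*
 * Producer sketch (illustrative; my_msg_t and MY_MSG_HELLO are hypothetical
 * application definitions): allocate a buffer, fill it, enqueue blocking.
 *
 *   svm_msg_q_msg_t msg = svm_msg_q_alloc_msg (mq, sizeof (my_msg_t));
 *   if (svm_msg_q_msg_is_invalid (&msg))
 *     return -1;				// no ring could hold the msg
 *   my_msg_t *m = (my_msg_t *) svm_msg_q_msg_data (mq, &msg);
 *   m->opcode = MY_MSG_HELLO;
 *   svm_msg_q_add (mq, &msg, 0);		// nowait = 0, block if full
 */
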
/**
 * Producer enqueue one message to queue with mutex held, then unlock
 *
 * Prior to calling this, the producer should've obtained a message buffer
 * from one of the rings by calling @ref svm_msg_q_alloc_msg or
 * @ref svm_msg_q_lock_and_alloc_msg_w_ring. It assumes the queue mutex
 * is held.
 *
 * @param mq		message queue
 * @param msg		message (pointer to ring position) to be enqueued
 */
void svm_msg_q_add_and_unlock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);

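/*
 * Locked producer sketch (illustrative; my_msg_t is a hypothetical type and
 * error handling is elided): allocate on ring 0 under the queue lock and
 * enqueue before releasing it.
 *
 *   svm_msg_q_msg_t msg;
 *   u8 noblock = 1;			// fail instead of waiting for space
 *   if (svm_msg_q_lock_and_alloc_msg_w_ring (mq, 0, noblock, &msg))
 *     return -1;
 *   my_msg_t *m = (my_msg_t *) svm_msg_q_msg_data (mq, &msg);
 *   m->opcode = MY_MSG_HELLO;
 *   svm_msg_q_add_and_unlock (mq, &msg);
 */
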
/**
 * Consumer dequeue one message from queue
 *
 * This returns the message pointing to the data in the message rings.
 * The consumer is expected to call @ref svm_msg_q_free_msg once it
 * finishes processing/copying the message data.
 *
 * @param mq		message queue
 * @param msg		pointer to structure where message is to be received
 * @param cond		flag that indicates if request should block or not
 * @param time		time to wait if condition is SVM_Q_TIMEDWAIT
 * @return		success status
 */
int svm_msg_q_sub (svm_msg_q_t * mq, svm_msg_q_msg_t * msg,
		   svm_q_conditional_wait_t cond, u32 time);

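/*
 * Consumer sketch (illustrative; process_msg is a hypothetical handler):
 * dequeue one message, use its data, then return the buffer to its ring.
 *
 *   svm_msg_q_msg_t msg;
 *   if (svm_msg_q_sub (mq, &msg, SVM_Q_WAIT, 0))
 *     return -1;
 *   process_msg (svm_msg_q_msg_data (mq, &msg));
 *   svm_msg_q_free_msg (mq, &msg);
 */
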
/**
 * Consumer dequeue one message from queue with mutex held
 *
 * Returns the message pointing to the data in the message rings under the
 * assumption that the message queue lock is already held. The consumer is
 * expected to call @ref svm_msg_q_free_msg once it finishes
 * processing/copying the message data.
 *
 * @param mq		message queue
 * @param msg		pointer to structure where message is to be received
 */
void svm_msg_q_sub_w_lock (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);

/**
 * Get data for message in queue
 *
 * @param mq		message queue
 * @param msg		message for which the data is requested
 * @return		pointer to data
 */
void *svm_msg_q_msg_data (svm_msg_q_t * mq, svm_msg_q_msg_t * msg);

/**
 * Get message queue ring
 *
 * @param mq		message queue
 * @param ring_index	index of ring
 * @return		pointer to ring
 */
svm_msg_q_ring_t *svm_msg_q_ring (svm_msg_q_t * mq, u32 ring_index);

/**
 * Set event fd for queue consumer
 *
 * If set, the queue will exclusively use eventfds for signaling. Moreover,
 * afterwards, the queue should only be used in non-blocking mode. Waiting
 * for events should be done externally using something like epoll.
 *
 * @param mq		message queue
 * @param fd		consumer eventfd
 */
void svm_msg_q_set_consumer_eventfd (svm_msg_q_t * mq, int fd);

/**
 * Set event fd for queue producer
 *
 * If set, the queue will exclusively use eventfds for signaling. Moreover,
 * afterwards, the queue should only be used in non-blocking mode. Waiting
 * for events should be done externally using something like epoll.
 *
 * @param mq		message queue
 * @param fd		producer eventfd
 */
void svm_msg_q_set_producer_eventfd (svm_msg_q_t * mq, int fd);

/**
 * Allocate event fd for queue consumer
 */
int svm_msg_q_alloc_consumer_eventfd (svm_msg_q_t * mq);

/**
 * Allocate event fd for queue producer
 */
int svm_msg_q_alloc_producer_eventfd (svm_msg_q_t * mq);

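/*
 * Eventfd sketch (illustrative; epfd is an assumed existing epoll instance
 * and return values are assumed to follow the 0-on-success convention):
 * with a consumer eventfd allocated, wait for readiness externally and use
 * the queue in non-blocking mode only.
 *
 *   if (svm_msg_q_alloc_consumer_eventfd (mq))
 *     return -1;
 *   int fd = svm_msg_q_get_consumer_eventfd (mq);
 *   struct epoll_event ev = { .events = EPOLLIN, .data.fd = fd };
 *   epoll_ctl (epfd, EPOLL_CTL_ADD, fd, &ev);
 */
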
/**
 * Format message queue, shows msg count for each ring
 */
u8 *format_svm_msg_q (u8 * s, va_list * args);

/**
 * Check if message queue is full
 */
static inline u8
svm_msg_q_is_full (svm_msg_q_t * mq)
{
  return (mq->q->cursize == mq->q->maxsize);
}

/**
 * Check if message ring is full
 */
static inline u8
svm_msg_q_ring_is_full (svm_msg_q_t * mq, u32 ring_index)
{
  svm_msg_q_ring_t *ring = vec_elt_at_index (mq->rings, ring_index);
  return (ring->shr->cursize >= ring->nitems);
}

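/*
 * Per-ring allocation sketch (illustrative): svm_msg_q_alloc_msg_w_ring
 * expects a free slot, so callers check the ring first.
 *
 *   svm_msg_q_msg_t msg = SVM_MQ_INVALID_MSG;
 *   if (!svm_msg_q_ring_is_full (mq, ring_index))
 *     msg = svm_msg_q_alloc_msg_w_ring (mq, ring_index);
 */
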
/**
 * Check if message queue is empty
 */
static inline u8
svm_msg_q_is_empty (svm_msg_q_t * mq)
{
  return (mq->q->cursize == 0);
}

/**
 * Check length of message queue
 */
static inline u32
svm_msg_q_size (svm_msg_q_t * mq)
{
  return mq->q->cursize;
}

/**
 * Check if message is invalid
 */
static inline u8
svm_msg_q_msg_is_invalid (svm_msg_q_msg_t * msg)
{
  return (msg->as_u64 == (u64) ~ 0);
}

/**
 * Try locking message queue
 */
static inline int
svm_msg_q_try_lock (svm_msg_q_t * mq)
{
  int rv = pthread_mutex_trylock (&mq->q->mutex);
  if (PREDICT_FALSE (rv == EOWNERDEAD))
    rv = pthread_mutex_consistent (&mq->q->mutex);
  return rv;
}

/**
 * Lock, or block waiting to lock, the message queue
 */
static inline int
svm_msg_q_lock (svm_msg_q_t * mq)
{
  int rv = pthread_mutex_lock (&mq->q->mutex);
  if (PREDICT_FALSE (rv == EOWNERDEAD))
    rv = pthread_mutex_consistent (&mq->q->mutex);
  return rv;
}

/**
 * Unlock message queue
 */
static inline void
svm_msg_q_unlock (svm_msg_q_t * mq)
{
  pthread_mutex_unlock (&mq->q->mutex);
}

/**
 * Wait for message queue event
 *
 * Must be called with mutex held. The queue only works non-blocking
 * with eventfds, so handle blocking calls as an exception here.
 */
static inline void
svm_msg_q_wait (svm_msg_q_t * mq)
{
  svm_queue_wait (mq->q);
}

/**
 * Timed wait for message queue event
 *
 * Must be called with mutex held.
 *
 * @param mq		message queue
 * @param timeout	time in seconds
 */
static inline int
svm_msg_q_timedwait (svm_msg_q_t * mq, double timeout)
{
  return svm_queue_timedwait (mq->q, timeout);
}

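/*
 * Mutex-held consumer sketch (illustrative): take the lock, wait until a
 * message is available, dequeue it without re-taking the lock, then unlock.
 *
 *   svm_msg_q_msg_t msg;
 *   svm_msg_q_lock (mq);
 *   while (svm_msg_q_is_empty (mq))
 *     svm_msg_q_wait (mq);
 *   svm_msg_q_sub_w_lock (mq, &msg);
 *   svm_msg_q_unlock (mq);
 */
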
static inline int
svm_msg_q_get_consumer_eventfd (svm_msg_q_t * mq)
{
  return mq->q->consumer_evtfd;
}

static inline int
svm_msg_q_get_producer_eventfd (svm_msg_q_t * mq)
{
  return mq->q->producer_evtfd;
}

#endif /* SRC_SVM_MESSAGE_QUEUE_H_ */

/*
 * fd.io coding-style-patch-verification: ON
 *
 * Local Variables:
 * eval: (c-set-style "gnu")
 * End:
 */