blob: 771033d7d8ac110eca1df68d7db09d044bbfba58 [file] [log] [blame]
Dave Barach371e4e12016-07-08 09:38:52 -04001/*
Ed Warnickecb9cada2015-12-08 15:45:58 -07002 *------------------------------------------------------------------
Florin Corase86a8ed2018-01-05 03:20:25 -08003 * svm_queue.c - unidirectional shared-memory queues
Ed Warnickecb9cada2015-12-08 15:45:58 -07004 *
5 * Copyright (c) 2009 Cisco and/or its affiliates.
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at:
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *------------------------------------------------------------------
18 */
19
Florin Corase86a8ed2018-01-05 03:20:25 -080020
Ed Warnickecb9cada2015-12-08 15:45:58 -070021#include <stdio.h>
22#include <stdlib.h>
23#include <string.h>
24#include <pthread.h>
25#include <vppinfra/mem.h>
26#include <vppinfra/format.h>
27#include <vppinfra/cache.h>
Florin Corase86a8ed2018-01-05 03:20:25 -080028#include <svm/queue.h>
Mohsin Kazmi3fca5672018-01-04 18:57:26 +010029#include <vppinfra/time.h>
Florin Coras99368312018-08-02 10:45:44 -070030#include <vppinfra/lock.h>
Ed Warnickecb9cada2015-12-08 15:45:58 -070031
Florin Corase86a8ed2018-01-05 03:20:25 -080032svm_queue_t *
Florin Corasc470e222018-08-01 07:53:18 -070033svm_queue_init (void *base, int nels, int elsize)
Ed Warnickecb9cada2015-12-08 15:45:58 -070034{
Florin Corase86a8ed2018-01-05 03:20:25 -080035 svm_queue_t *q;
Dave Barach371e4e12016-07-08 09:38:52 -040036 pthread_mutexattr_t attr;
37 pthread_condattr_t cattr;
Ed Warnickecb9cada2015-12-08 15:45:58 -070038
Florin Corasc470e222018-08-01 07:53:18 -070039 q = (svm_queue_t *) base;
Dave Barach371e4e12016-07-08 09:38:52 -040040 memset (q, 0, sizeof (*q));
Ed Warnickecb9cada2015-12-08 15:45:58 -070041
Dave Barach371e4e12016-07-08 09:38:52 -040042 q->elsize = elsize;
43 q->maxsize = nels;
Florin Corasc470e222018-08-01 07:53:18 -070044 q->producer_evtfd = -1;
45 q->consumer_evtfd = -1;
Ed Warnickecb9cada2015-12-08 15:45:58 -070046
Dave Barach371e4e12016-07-08 09:38:52 -040047 memset (&attr, 0, sizeof (attr));
Dave Barach68b0fb02017-02-28 15:15:56 -050048 memset (&cattr, 0, sizeof (cattr));
Ed Warnickecb9cada2015-12-08 15:45:58 -070049
Dave Barach371e4e12016-07-08 09:38:52 -040050 if (pthread_mutexattr_init (&attr))
51 clib_unix_warning ("mutexattr_init");
52 if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED))
53 clib_unix_warning ("pthread_mutexattr_setpshared");
54 if (pthread_mutex_init (&q->mutex, &attr))
55 clib_unix_warning ("mutex_init");
56 if (pthread_mutexattr_destroy (&attr))
57 clib_unix_warning ("mutexattr_destroy");
58 if (pthread_condattr_init (&cattr))
59 clib_unix_warning ("condattr_init");
60 /* prints funny-looking messages in the Linux target */
61 if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED))
62 clib_unix_warning ("condattr_setpshared");
63 if (pthread_cond_init (&q->condvar, &cattr))
64 clib_unix_warning ("cond_init1");
65 if (pthread_condattr_destroy (&cattr))
66 clib_unix_warning ("cond_init2");
67
68 return (q);
Ed Warnickecb9cada2015-12-08 15:45:58 -070069}
70
Florin Corasc470e222018-08-01 07:53:18 -070071svm_queue_t *
72svm_queue_alloc_and_init (int nels, int elsize, int consumer_pid)
73{
74 svm_queue_t *q;
75
76 q = clib_mem_alloc_aligned (sizeof (svm_queue_t)
77 + nels * elsize, CLIB_CACHE_LINE_BYTES);
78 memset (q, 0, sizeof (*q));
79 q = svm_queue_init (q, nels, elsize);
80 q->consumer_pid = consumer_pid;
81
82 return q;
83}
84
Ed Warnickecb9cada2015-12-08 15:45:58 -070085/*
Florin Corase86a8ed2018-01-05 03:20:25 -080086 * svm_queue_free
Ed Warnickecb9cada2015-12-08 15:45:58 -070087 */
Dave Barach371e4e12016-07-08 09:38:52 -040088void
Florin Corase86a8ed2018-01-05 03:20:25 -080089svm_queue_free (svm_queue_t * q)
Ed Warnickecb9cada2015-12-08 15:45:58 -070090{
Dave Barach371e4e12016-07-08 09:38:52 -040091 (void) pthread_mutex_destroy (&q->mutex);
92 (void) pthread_cond_destroy (&q->condvar);
93 clib_mem_free (q);
Ed Warnickecb9cada2015-12-08 15:45:58 -070094}
95
Dave Barach371e4e12016-07-08 09:38:52 -040096void
Florin Corase86a8ed2018-01-05 03:20:25 -080097svm_queue_lock (svm_queue_t * q)
Ed Warnickecb9cada2015-12-08 15:45:58 -070098{
Dave Barach371e4e12016-07-08 09:38:52 -040099 pthread_mutex_lock (&q->mutex);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700100}
101
Dave Barach371e4e12016-07-08 09:38:52 -0400102void
Florin Corase86a8ed2018-01-05 03:20:25 -0800103svm_queue_unlock (svm_queue_t * q)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700104{
Dave Barach371e4e12016-07-08 09:38:52 -0400105 pthread_mutex_unlock (&q->mutex);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700106}
107
Dave Barach371e4e12016-07-08 09:38:52 -0400108int
Florin Corase86a8ed2018-01-05 03:20:25 -0800109svm_queue_is_full (svm_queue_t * q)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700110{
Dave Barach371e4e12016-07-08 09:38:52 -0400111 return q->cursize == q->maxsize;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700112}
113
Florin Corasc470e222018-08-01 07:53:18 -0700114static inline void
115svm_queue_send_signal (svm_queue_t * q, u8 is_prod)
116{
117 if (q->producer_evtfd == -1)
118 {
119 (void) pthread_cond_broadcast (&q->condvar);
120 }
121 else
122 {
123 int __clib_unused rv, fd;
124 u64 data = 1;
125 ASSERT (q->consumer_evtfd != -1);
126 fd = is_prod ? q->producer_evtfd : q->consumer_evtfd;
127 rv = write (fd, &data, sizeof (data));
128 }
129}
130
Florin Coras99368312018-08-02 10:45:44 -0700131static inline void
132svm_queue_wait_inline (svm_queue_t * q)
133{
134 if (q->producer_evtfd == -1)
135 {
136 pthread_cond_wait (&q->condvar, &q->mutex);
137 }
138 else
139 {
140 /* Fake a wait for event. We could use epoll but that would mean
141 * using yet another fd. Should do for now */
142 u32 cursize = q->cursize;
143 pthread_mutex_unlock (&q->mutex);
144 while (q->cursize == cursize)
145 CLIB_PAUSE ();
146 pthread_mutex_lock (&q->mutex);
147 }
148}
149
150void
151svm_queue_wait (svm_queue_t * q)
152{
153 svm_queue_wait_inline (q);
154}
155
156static inline int
157svm_queue_timedwait_inline (svm_queue_t * q, double timeout)
158{
159 struct timespec ts;
160 ts.tv_sec = unix_time_now () + (u32) timeout;
161 ts.tv_nsec = (timeout - (u32) timeout) * 1e9;
162
163 if (q->producer_evtfd == -1)
164 {
165 return pthread_cond_timedwait (&q->condvar, &q->mutex, &ts);
166 }
167 else
168 {
169 double max_time = unix_time_now () + timeout;
170 u32 cursize = q->cursize;
171 int rv;
172
173 pthread_mutex_unlock (&q->mutex);
174 while (q->cursize == cursize && unix_time_now () < max_time)
175 CLIB_PAUSE ();
176 rv = unix_time_now () < max_time ? 0 : ETIMEDOUT;
177 pthread_mutex_lock (&q->mutex);
178 return rv;
179 }
180}
181
182int
183svm_queue_timedwait (svm_queue_t * q, double timeout)
184{
185 return svm_queue_timedwait_inline (q, timeout);
186}
187
Ed Warnickecb9cada2015-12-08 15:45:58 -0700188/*
Florin Corase86a8ed2018-01-05 03:20:25 -0800189 * svm_queue_add_nolock
Ed Warnickecb9cada2015-12-08 15:45:58 -0700190 */
Dave Barach371e4e12016-07-08 09:38:52 -0400191int
Florin Corase86a8ed2018-01-05 03:20:25 -0800192svm_queue_add_nolock (svm_queue_t * q, u8 * elem)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700193{
Dave Barach371e4e12016-07-08 09:38:52 -0400194 i8 *tailp;
195 int need_broadcast = 0;
196
197 if (PREDICT_FALSE (q->cursize == q->maxsize))
198 {
199 while (q->cursize == q->maxsize)
Florin Coras99368312018-08-02 10:45:44 -0700200 svm_queue_wait_inline (q);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700201 }
Ed Warnickecb9cada2015-12-08 15:45:58 -0700202
Dave Barach371e4e12016-07-08 09:38:52 -0400203 tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
204 clib_memcpy (tailp, elem, q->elsize);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700205
Dave Barach371e4e12016-07-08 09:38:52 -0400206 q->tail++;
207 q->cursize++;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700208
Dave Barach371e4e12016-07-08 09:38:52 -0400209 need_broadcast = (q->cursize == 1);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700210
Dave Barach371e4e12016-07-08 09:38:52 -0400211 if (q->tail == q->maxsize)
212 q->tail = 0;
213
214 if (need_broadcast)
Florin Corasc470e222018-08-01 07:53:18 -0700215 svm_queue_send_signal (q, 1);
Dave Barach371e4e12016-07-08 09:38:52 -0400216 return 0;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700217}
218
Florin Coras3c2fed52018-07-04 04:15:05 -0700219void
Florin Corase86a8ed2018-01-05 03:20:25 -0800220svm_queue_add_raw (svm_queue_t * q, u8 * elem)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700221{
Dave Barach371e4e12016-07-08 09:38:52 -0400222 i8 *tailp;
223
Dave Barach371e4e12016-07-08 09:38:52 -0400224 tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
225 clib_memcpy (tailp, elem, q->elsize);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700226
Florin Coras3c2fed52018-07-04 04:15:05 -0700227 q->tail = (q->tail + 1) % q->maxsize;
Dave Barach371e4e12016-07-08 09:38:52 -0400228 q->cursize++;
Florin Coras99368312018-08-02 10:45:44 -0700229
230 if (q->cursize == 1)
231 svm_queue_send_signal (q, 1);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700232}
233
234
235/*
Florin Corase86a8ed2018-01-05 03:20:25 -0800236 * svm_queue_add
Ed Warnickecb9cada2015-12-08 15:45:58 -0700237 */
Dave Barach371e4e12016-07-08 09:38:52 -0400238int
Florin Corase86a8ed2018-01-05 03:20:25 -0800239svm_queue_add (svm_queue_t * q, u8 * elem, int nowait)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700240{
Dave Barach371e4e12016-07-08 09:38:52 -0400241 i8 *tailp;
242 int need_broadcast = 0;
243
244 if (nowait)
245 {
246 /* zero on success */
247 if (pthread_mutex_trylock (&q->mutex))
248 {
249 return (-1);
250 }
Ed Warnickecb9cada2015-12-08 15:45:58 -0700251 }
Dave Barach371e4e12016-07-08 09:38:52 -0400252 else
253 pthread_mutex_lock (&q->mutex);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700254
Dave Barach371e4e12016-07-08 09:38:52 -0400255 if (PREDICT_FALSE (q->cursize == q->maxsize))
256 {
257 if (nowait)
258 {
259 pthread_mutex_unlock (&q->mutex);
260 return (-2);
261 }
262 while (q->cursize == q->maxsize)
Florin Coras99368312018-08-02 10:45:44 -0700263 svm_queue_wait_inline (q);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700264 }
Ed Warnickecb9cada2015-12-08 15:45:58 -0700265
Dave Barach371e4e12016-07-08 09:38:52 -0400266 tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
267 clib_memcpy (tailp, elem, q->elsize);
268
269 q->tail++;
270 q->cursize++;
271
272 need_broadcast = (q->cursize == 1);
273
274 if (q->tail == q->maxsize)
275 q->tail = 0;
276
277 if (need_broadcast)
Florin Corasc470e222018-08-01 07:53:18 -0700278 svm_queue_send_signal (q, 1);
279
Dave Barach371e4e12016-07-08 09:38:52 -0400280 pthread_mutex_unlock (&q->mutex);
281
282 return 0;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700283}
284
285/*
Florin Corase86a8ed2018-01-05 03:20:25 -0800286 * svm_queue_add2
Klement Sekera8f2a4ea2017-05-04 06:15:18 +0200287 */
288int
Florin Corase86a8ed2018-01-05 03:20:25 -0800289svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait)
Klement Sekera8f2a4ea2017-05-04 06:15:18 +0200290{
291 i8 *tailp;
292 int need_broadcast = 0;
293
294 if (nowait)
295 {
296 /* zero on success */
297 if (pthread_mutex_trylock (&q->mutex))
298 {
299 return (-1);
300 }
301 }
302 else
303 pthread_mutex_lock (&q->mutex);
304
305 if (PREDICT_FALSE (q->cursize + 1 == q->maxsize))
306 {
307 if (nowait)
308 {
309 pthread_mutex_unlock (&q->mutex);
310 return (-2);
311 }
312 while (q->cursize + 1 == q->maxsize)
Florin Coras99368312018-08-02 10:45:44 -0700313 svm_queue_wait_inline (q);
Klement Sekera8f2a4ea2017-05-04 06:15:18 +0200314 }
315
316 tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
317 clib_memcpy (tailp, elem, q->elsize);
318
319 q->tail++;
320 q->cursize++;
321
322 if (q->tail == q->maxsize)
323 q->tail = 0;
324
325 need_broadcast = (q->cursize == 1);
326
327 tailp = (i8 *) (&q->data[0] + q->elsize * q->tail);
328 clib_memcpy (tailp, elem2, q->elsize);
329
330 q->tail++;
331 q->cursize++;
332
333 if (q->tail == q->maxsize)
334 q->tail = 0;
335
336 if (need_broadcast)
Florin Corasc470e222018-08-01 07:53:18 -0700337 svm_queue_send_signal (q, 1);
338
Klement Sekera8f2a4ea2017-05-04 06:15:18 +0200339 pthread_mutex_unlock (&q->mutex);
340
341 return 0;
342}
343
344/*
Florin Corase86a8ed2018-01-05 03:20:25 -0800345 * svm_queue_sub
Ed Warnickecb9cada2015-12-08 15:45:58 -0700346 */
Dave Barach371e4e12016-07-08 09:38:52 -0400347int
Mohsin Kazmi3fca5672018-01-04 18:57:26 +0100348svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond,
349 u32 time)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700350{
Dave Barach371e4e12016-07-08 09:38:52 -0400351 i8 *headp;
352 int need_broadcast = 0;
Mohsin Kazmi3fca5672018-01-04 18:57:26 +0100353 int rc = 0;
Dave Barach371e4e12016-07-08 09:38:52 -0400354
Mohsin Kazmi3fca5672018-01-04 18:57:26 +0100355 if (cond == SVM_Q_NOWAIT)
Dave Barach371e4e12016-07-08 09:38:52 -0400356 {
357 /* zero on success */
358 if (pthread_mutex_trylock (&q->mutex))
359 {
360 return (-1);
361 }
Ed Warnickecb9cada2015-12-08 15:45:58 -0700362 }
Dave Barach371e4e12016-07-08 09:38:52 -0400363 else
364 pthread_mutex_lock (&q->mutex);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700365
Dave Barach371e4e12016-07-08 09:38:52 -0400366 if (PREDICT_FALSE (q->cursize == 0))
367 {
Mohsin Kazmi3fca5672018-01-04 18:57:26 +0100368 if (cond == SVM_Q_NOWAIT)
Dave Barach371e4e12016-07-08 09:38:52 -0400369 {
370 pthread_mutex_unlock (&q->mutex);
371 return (-2);
372 }
Mohsin Kazmi3fca5672018-01-04 18:57:26 +0100373 else if (cond == SVM_Q_TIMEDWAIT)
Dave Barach371e4e12016-07-08 09:38:52 -0400374 {
Mohsin Kazmi3fca5672018-01-04 18:57:26 +0100375 while (q->cursize == 0 && rc == 0)
Florin Coras99368312018-08-02 10:45:44 -0700376 rc = svm_queue_timedwait_inline (q, time);
377
Mohsin Kazmi3fca5672018-01-04 18:57:26 +0100378 if (rc == ETIMEDOUT)
379 {
380 pthread_mutex_unlock (&q->mutex);
381 return ETIMEDOUT;
382 }
383 }
384 else
385 {
386 while (q->cursize == 0)
Florin Coras99368312018-08-02 10:45:44 -0700387 svm_queue_wait_inline (q);
Dave Barach371e4e12016-07-08 09:38:52 -0400388 }
389 }
Ed Warnickecb9cada2015-12-08 15:45:58 -0700390
Dave Barach371e4e12016-07-08 09:38:52 -0400391 headp = (i8 *) (&q->data[0] + q->elsize * q->head);
392 clib_memcpy (elem, headp, q->elsize);
Ed Warnickecb9cada2015-12-08 15:45:58 -0700393
Dave Barach371e4e12016-07-08 09:38:52 -0400394 q->head++;
Dave Barach68b0fb02017-02-28 15:15:56 -0500395 /* $$$$ JFC shouldn't this be == 0? */
Dave Barach371e4e12016-07-08 09:38:52 -0400396 if (q->cursize == q->maxsize)
397 need_broadcast = 1;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700398
Dave Barach371e4e12016-07-08 09:38:52 -0400399 q->cursize--;
400
401 if (q->head == q->maxsize)
402 q->head = 0;
403
404 if (need_broadcast)
Florin Corasc470e222018-08-01 07:53:18 -0700405 svm_queue_send_signal (q, 0);
Dave Barach371e4e12016-07-08 09:38:52 -0400406
407 pthread_mutex_unlock (&q->mutex);
408
409 return 0;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700410}
411
Dave Barach371e4e12016-07-08 09:38:52 -0400412int
Florin Corase86a8ed2018-01-05 03:20:25 -0800413svm_queue_sub2 (svm_queue_t * q, u8 * elem)
414{
415 int need_broadcast;
416 i8 *headp;
417
418 pthread_mutex_lock (&q->mutex);
419 if (q->cursize == 0)
420 {
421 pthread_mutex_unlock (&q->mutex);
422 return -1;
423 }
424
425 headp = (i8 *) (&q->data[0] + q->elsize * q->head);
426 clib_memcpy (elem, headp, q->elsize);
427
428 q->head++;
429 need_broadcast = (q->cursize == q->maxsize / 2);
430 q->cursize--;
431
432 if (PREDICT_FALSE (q->head == q->maxsize))
433 q->head = 0;
434 pthread_mutex_unlock (&q->mutex);
435
436 if (need_broadcast)
Florin Corasc470e222018-08-01 07:53:18 -0700437 svm_queue_send_signal (q, 0);
Florin Corase86a8ed2018-01-05 03:20:25 -0800438
439 return 0;
440}
441
442int
443svm_queue_sub_raw (svm_queue_t * q, u8 * elem)
Ed Warnickecb9cada2015-12-08 15:45:58 -0700444{
Dave Barach371e4e12016-07-08 09:38:52 -0400445 i8 *headp;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700446
Dave Barach371e4e12016-07-08 09:38:52 -0400447 if (PREDICT_FALSE (q->cursize == 0))
448 {
449 while (q->cursize == 0)
450 ;
451 }
452
453 headp = (i8 *) (&q->data[0] + q->elsize * q->head);
454 clib_memcpy (elem, headp, q->elsize);
455
Florin Coras3c2fed52018-07-04 04:15:05 -0700456 q->head = (q->head + 1) % q->maxsize;
Dave Barach371e4e12016-07-08 09:38:52 -0400457 q->cursize--;
458
Dave Barach371e4e12016-07-08 09:38:52 -0400459 return 0;
Ed Warnickecb9cada2015-12-08 15:45:58 -0700460}
Dave Barach371e4e12016-07-08 09:38:52 -0400461
Florin Corasc470e222018-08-01 07:53:18 -0700462void
463svm_queue_set_producer_event_fd (svm_queue_t * q, int fd)
464{
465 q->producer_evtfd = fd;
466}
467
468void
469svm_queue_set_consumer_event_fd (svm_queue_t * q, int fd)
470{
471 q->consumer_evtfd = fd;
472}
473
Dave Barach371e4e12016-07-08 09:38:52 -0400474/*
475 * fd.io coding-style-patch-verification: ON
476 *
477 * Local Variables:
478 * eval: (c-set-style "gnu")
479 * End:
480 */