| /* |
| *------------------------------------------------------------------ |
| * svm_queue.c - unidirectional shared-memory queues |
| * |
| * Copyright (c) 2009 Cisco and/or its affiliates. |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at: |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| *------------------------------------------------------------------ |
| */ |
| |
| |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <string.h> |
| #include <pthread.h> |
| #include <vppinfra/mem.h> |
| #include <vppinfra/format.h> |
| #include <vppinfra/cache.h> |
| #include <svm/queue.h> |
| #include <vppinfra/time.h> |
| #include <vppinfra/lock.h> |
| |
| svm_queue_t * |
| svm_queue_init (void *base, int nels, int elsize) |
| { |
| svm_queue_t *q; |
| pthread_mutexattr_t attr; |
| pthread_condattr_t cattr; |
| |
| q = (svm_queue_t *) base; |
| clib_memset (q, 0, sizeof (*q)); |
| |
| q->elsize = elsize; |
| q->maxsize = nels; |
| q->producer_evtfd = -1; |
| q->consumer_evtfd = -1; |
| |
| clib_memset (&attr, 0, sizeof (attr)); |
| clib_memset (&cattr, 0, sizeof (cattr)); |
| |
| if (pthread_mutexattr_init (&attr)) |
| clib_unix_warning ("mutexattr_init"); |
| if (pthread_mutexattr_setpshared (&attr, PTHREAD_PROCESS_SHARED)) |
| clib_unix_warning ("pthread_mutexattr_setpshared"); |
| if (pthread_mutex_init (&q->mutex, &attr)) |
| clib_unix_warning ("mutex_init"); |
| if (pthread_mutexattr_destroy (&attr)) |
| clib_unix_warning ("mutexattr_destroy"); |
| if (pthread_condattr_init (&cattr)) |
| clib_unix_warning ("condattr_init"); |
| /* prints funny-looking messages in the Linux target */ |
| if (pthread_condattr_setpshared (&cattr, PTHREAD_PROCESS_SHARED)) |
| clib_unix_warning ("condattr_setpshared"); |
| if (pthread_cond_init (&q->condvar, &cattr)) |
| clib_unix_warning ("cond_init1"); |
| if (pthread_condattr_destroy (&cattr)) |
| clib_unix_warning ("cond_init2"); |
| |
| return (q); |
| } |
| |
| svm_queue_t * |
| svm_queue_alloc_and_init (int nels, int elsize, int consumer_pid) |
| { |
| svm_queue_t *q; |
| |
| q = clib_mem_alloc_aligned (sizeof (svm_queue_t) |
| + nels * elsize, CLIB_CACHE_LINE_BYTES); |
| clib_memset (q, 0, sizeof (*q)); |
| q = svm_queue_init (q, nels, elsize); |
| q->consumer_pid = consumer_pid; |
| |
| return q; |
| } |
| |
| /* |
| * svm_queue_free |
| */ |
| void |
| svm_queue_free (svm_queue_t * q) |
| { |
| (void) pthread_mutex_destroy (&q->mutex); |
| (void) pthread_cond_destroy (&q->condvar); |
| clib_mem_free (q); |
| } |
| |
| void |
| svm_queue_lock (svm_queue_t * q) |
| { |
| pthread_mutex_lock (&q->mutex); |
| } |
| |
| void |
| svm_queue_unlock (svm_queue_t * q) |
| { |
| pthread_mutex_unlock (&q->mutex); |
| } |
| |
| int |
| svm_queue_is_full (svm_queue_t * q) |
| { |
| return q->cursize == q->maxsize; |
| } |
| |
| static inline void |
| svm_queue_send_signal (svm_queue_t * q, u8 is_prod) |
| { |
| if (q->producer_evtfd == -1) |
| { |
| (void) pthread_cond_broadcast (&q->condvar); |
| } |
| else |
| { |
| int __clib_unused rv, fd; |
| u64 data = 1; |
| ASSERT (q->consumer_evtfd > 0 && q->producer_evtfd > 0); |
| fd = is_prod ? q->producer_evtfd : q->consumer_evtfd; |
| rv = write (fd, &data, sizeof (data)); |
| } |
| } |
| |
| static inline void |
| svm_queue_wait_inline (svm_queue_t * q) |
| { |
| if (q->producer_evtfd == -1) |
| { |
| pthread_cond_wait (&q->condvar, &q->mutex); |
| } |
| else |
| { |
| /* Fake a wait for event. We could use epoll but that would mean |
| * using yet another fd. Should do for now */ |
| u32 cursize = q->cursize; |
| pthread_mutex_unlock (&q->mutex); |
| while (q->cursize == cursize) |
| CLIB_PAUSE (); |
| pthread_mutex_lock (&q->mutex); |
| } |
| } |
| |
| void |
| svm_queue_wait (svm_queue_t * q) |
| { |
| svm_queue_wait_inline (q); |
| } |
| |
| static inline int |
| svm_queue_timedwait_inline (svm_queue_t * q, double timeout) |
| { |
| struct timespec ts; |
| ts.tv_sec = unix_time_now () + (u32) timeout; |
| ts.tv_nsec = (timeout - (u32) timeout) * 1e9; |
| |
| if (q->producer_evtfd == -1) |
| { |
| return pthread_cond_timedwait (&q->condvar, &q->mutex, &ts); |
| } |
| else |
| { |
| double max_time = unix_time_now () + timeout; |
| u32 cursize = q->cursize; |
| int rv; |
| |
| pthread_mutex_unlock (&q->mutex); |
| while (q->cursize == cursize && unix_time_now () < max_time) |
| CLIB_PAUSE (); |
| rv = unix_time_now () < max_time ? 0 : ETIMEDOUT; |
| pthread_mutex_lock (&q->mutex); |
| return rv; |
| } |
| } |
| |
| int |
| svm_queue_timedwait (svm_queue_t * q, double timeout) |
| { |
| return svm_queue_timedwait_inline (q, timeout); |
| } |
| |
| /* |
| * svm_queue_add_nolock |
| */ |
| int |
| svm_queue_add_nolock (svm_queue_t * q, u8 * elem) |
| { |
| i8 *tailp; |
| int need_broadcast = 0; |
| |
| if (PREDICT_FALSE (q->cursize == q->maxsize)) |
| { |
| while (q->cursize == q->maxsize) |
| svm_queue_wait_inline (q); |
| } |
| |
| tailp = (i8 *) (&q->data[0] + q->elsize * q->tail); |
| clib_memcpy (tailp, elem, q->elsize); |
| |
| q->tail++; |
| q->cursize++; |
| |
| need_broadcast = (q->cursize == 1); |
| |
| if (q->tail == q->maxsize) |
| q->tail = 0; |
| |
| if (need_broadcast) |
| svm_queue_send_signal (q, 1); |
| return 0; |
| } |
| |
| void |
| svm_queue_add_raw (svm_queue_t * q, u8 * elem) |
| { |
| i8 *tailp; |
| |
| tailp = (i8 *) (&q->data[0] + q->elsize * q->tail); |
| clib_memcpy (tailp, elem, q->elsize); |
| |
| q->tail = (q->tail + 1) % q->maxsize; |
| q->cursize++; |
| |
| if (q->cursize == 1) |
| svm_queue_send_signal (q, 1); |
| } |
| |
| |
| /* |
| * svm_queue_add |
| */ |
| int |
| svm_queue_add (svm_queue_t * q, u8 * elem, int nowait) |
| { |
| i8 *tailp; |
| int need_broadcast = 0; |
| |
| if (nowait) |
| { |
| /* zero on success */ |
| if (pthread_mutex_trylock (&q->mutex)) |
| { |
| return (-1); |
| } |
| } |
| else |
| pthread_mutex_lock (&q->mutex); |
| |
| if (PREDICT_FALSE (q->cursize == q->maxsize)) |
| { |
| if (nowait) |
| { |
| pthread_mutex_unlock (&q->mutex); |
| return (-2); |
| } |
| while (q->cursize == q->maxsize) |
| svm_queue_wait_inline (q); |
| } |
| |
| tailp = (i8 *) (&q->data[0] + q->elsize * q->tail); |
| clib_memcpy (tailp, elem, q->elsize); |
| |
| q->tail++; |
| q->cursize++; |
| |
| need_broadcast = (q->cursize == 1); |
| |
| if (q->tail == q->maxsize) |
| q->tail = 0; |
| |
| if (need_broadcast) |
| svm_queue_send_signal (q, 1); |
| |
| pthread_mutex_unlock (&q->mutex); |
| |
| return 0; |
| } |
| |
| /* |
| * svm_queue_add2 |
| */ |
| int |
| svm_queue_add2 (svm_queue_t * q, u8 * elem, u8 * elem2, int nowait) |
| { |
| i8 *tailp; |
| int need_broadcast = 0; |
| |
| if (nowait) |
| { |
| /* zero on success */ |
| if (pthread_mutex_trylock (&q->mutex)) |
| { |
| return (-1); |
| } |
| } |
| else |
| pthread_mutex_lock (&q->mutex); |
| |
| if (PREDICT_FALSE (q->cursize + 1 == q->maxsize)) |
| { |
| if (nowait) |
| { |
| pthread_mutex_unlock (&q->mutex); |
| return (-2); |
| } |
| while (q->cursize + 1 == q->maxsize) |
| svm_queue_wait_inline (q); |
| } |
| |
| tailp = (i8 *) (&q->data[0] + q->elsize * q->tail); |
| clib_memcpy (tailp, elem, q->elsize); |
| |
| q->tail++; |
| q->cursize++; |
| |
| if (q->tail == q->maxsize) |
| q->tail = 0; |
| |
| need_broadcast = (q->cursize == 1); |
| |
| tailp = (i8 *) (&q->data[0] + q->elsize * q->tail); |
| clib_memcpy (tailp, elem2, q->elsize); |
| |
| q->tail++; |
| q->cursize++; |
| |
| if (q->tail == q->maxsize) |
| q->tail = 0; |
| |
| if (need_broadcast) |
| svm_queue_send_signal (q, 1); |
| |
| pthread_mutex_unlock (&q->mutex); |
| |
| return 0; |
| } |
| |
| /* |
| * svm_queue_sub |
| */ |
| int |
| svm_queue_sub (svm_queue_t * q, u8 * elem, svm_q_conditional_wait_t cond, |
| u32 time) |
| { |
| i8 *headp; |
| int need_broadcast = 0; |
| int rc = 0; |
| |
| if (cond == SVM_Q_NOWAIT) |
| { |
| /* zero on success */ |
| if (pthread_mutex_trylock (&q->mutex)) |
| { |
| return (-1); |
| } |
| } |
| else |
| pthread_mutex_lock (&q->mutex); |
| |
| if (PREDICT_FALSE (q->cursize == 0)) |
| { |
| if (cond == SVM_Q_NOWAIT) |
| { |
| pthread_mutex_unlock (&q->mutex); |
| return (-2); |
| } |
| else if (cond == SVM_Q_TIMEDWAIT) |
| { |
| while (q->cursize == 0 && rc == 0) |
| rc = svm_queue_timedwait_inline (q, time); |
| |
| if (rc == ETIMEDOUT) |
| { |
| pthread_mutex_unlock (&q->mutex); |
| return ETIMEDOUT; |
| } |
| } |
| else |
| { |
| while (q->cursize == 0) |
| svm_queue_wait_inline (q); |
| } |
| } |
| |
| headp = (i8 *) (&q->data[0] + q->elsize * q->head); |
| clib_memcpy (elem, headp, q->elsize); |
| |
| q->head++; |
| /* $$$$ JFC shouldn't this be == 0? */ |
| if (q->cursize == q->maxsize) |
| need_broadcast = 1; |
| |
| q->cursize--; |
| |
| if (q->head == q->maxsize) |
| q->head = 0; |
| |
| if (need_broadcast) |
| svm_queue_send_signal (q, 0); |
| |
| pthread_mutex_unlock (&q->mutex); |
| |
| return 0; |
| } |
| |
| int |
| svm_queue_sub2 (svm_queue_t * q, u8 * elem) |
| { |
| int need_broadcast; |
| i8 *headp; |
| |
| pthread_mutex_lock (&q->mutex); |
| if (q->cursize == 0) |
| { |
| pthread_mutex_unlock (&q->mutex); |
| return -1; |
| } |
| |
| headp = (i8 *) (&q->data[0] + q->elsize * q->head); |
| clib_memcpy (elem, headp, q->elsize); |
| |
| q->head++; |
| need_broadcast = (q->cursize == q->maxsize / 2); |
| q->cursize--; |
| |
| if (PREDICT_FALSE (q->head == q->maxsize)) |
| q->head = 0; |
| pthread_mutex_unlock (&q->mutex); |
| |
| if (need_broadcast) |
| svm_queue_send_signal (q, 0); |
| |
| return 0; |
| } |
| |
| int |
| svm_queue_sub_raw (svm_queue_t * q, u8 * elem) |
| { |
| i8 *headp; |
| |
| if (PREDICT_FALSE (q->cursize == 0)) |
| { |
| while (q->cursize == 0) |
| ; |
| } |
| |
| headp = (i8 *) (&q->data[0] + q->elsize * q->head); |
| clib_memcpy (elem, headp, q->elsize); |
| |
| q->head = (q->head + 1) % q->maxsize; |
| q->cursize--; |
| |
| return 0; |
| } |
| |
| void |
| svm_queue_set_producer_event_fd (svm_queue_t * q, int fd) |
| { |
| q->producer_evtfd = fd; |
| } |
| |
| void |
| svm_queue_set_consumer_event_fd (svm_queue_t * q, int fd) |
| { |
| q->consumer_evtfd = fd; |
| } |
| |
| /* |
| * fd.io coding-style-patch-verification: ON |
| * |
| * Local Variables: |
| * eval: (c-set-style "gnu") |
| * End: |
| */ |