Enable multi-thread receiver support
This change implements locking on the receive and free
message rings such that multiple user threads can concurrently
invoke rmr receive functions.
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: If012c5699e071f1d85f604c79baf8c4e8b77e94a
diff --git a/CHANGES b/CHANGES
index 976169f..f5d970d 100644
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,9 @@
API and build change and fix summaries. Doc correctsions
and/or changes are not mentioned here; see the commit messages.
+2020 January 22; verison 3.0.3
+ Enable thread support for multiple receive threads.
+
2020 January 21; verison 3.0.2
Fix bug in SI95 (missing reallocate payload function).
diff --git a/CMakeLists.txt b/CMakeLists.txt
index ef8308b..2714703 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -38,7 +38,7 @@
set( major_version "3" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this
set( minor_version "0" )
-set( patch_level "2" )
+set( patch_level "3" )
set( install_root "${CMAKE_INSTALL_PREFIX}" )
set( install_inc "include/rmr" )
diff --git a/src/rmr/common/include/rmr.h b/src/rmr/common/include/rmr.h
index e3d08d9..a9805a8 100644
--- a/src/rmr/common/include/rmr.h
+++ b/src/rmr/common/include/rmr.h
@@ -47,6 +47,7 @@
#define RMRFL_MTCALL 0x02 // set up multi-threaded call support (rmr_init)
#define RMRFL_AUTO_ALLOC 0x03 // send auto allocates a zerocopy buffer
#define RMRFL_NAME_ONLY 0x04 // only the hostname:ip is provided as source information for rts() calls
+#define RMRFL_NOLOCK 0x08 // disable receive ring locking (user app ensures single thread or provides collision protection)
#define RMR_DEF_SIZE 0 // pass as size to have msg allocation use the default msg size
diff --git a/src/rmr/common/include/rmr_agnostic.h b/src/rmr/common/include/rmr_agnostic.h
index 86f4660..6ef3aee 100644
--- a/src/rmr/common/include/rmr_agnostic.h
+++ b/src/rmr/common/include/rmr_agnostic.h
@@ -238,12 +238,18 @@
// --------------- ring things -------------------------------------------------
+#define RING_NONE 0 // no options
+#define RING_RLOCK 0x01 // create/destroy the read lock on the ring
+#define RING_WLOCK 0x02 // create/destroy the write lockk on the ring
+
typedef struct ring {
uint16_t head; // index of the head of the ring (insert point)
uint16_t tail; // index of the tail (extract point)
uint16_t nelements; // number of elements in the ring
void** data; // the ring data (pointers to blobs of stuff)
int pfd; // event fd for the ring for epoll
+ pthread_mutex_t* rgate; // read lock if used
+ pthread_mutex_t* wgate; // write lock if used
} ring_t;
@@ -273,6 +279,7 @@
// --- message ring --------------------------
static void* uta_mk_ring( int size );
+static int uta_ring_config( void* vr, int options );
static void uta_ring_free( void* vr );
static inline void* uta_ring_extract( void* vr );
static inline int uta_ring_insert( void* vr, void* new_data );
diff --git a/src/rmr/common/src/ring_static.c b/src/rmr/common/src/ring_static.c
index 93dfdb7..37bb4db 100644
--- a/src/rmr/common/src/ring_static.c
+++ b/src/rmr/common/src/ring_static.c
@@ -62,7 +62,10 @@
}
/*
- Make a new ring.
+ Make a new ring. The default is to NOT create a lock; if the user
+ wants read locking then uta_config_ring() can be used to setup the
+ mutex. (We use several rings internally and the assumption is that
+ there is no locking for these.)
*/
static void* uta_mk_ring( int size ) {
ring_t* r;
@@ -72,6 +75,8 @@
return NULL;
}
+ r->rgate = NULL;
+ r->wgate = NULL;
r->head = r->tail = 0;
max = (r->head - 1);
@@ -91,6 +96,49 @@
}
/*
+ Allows for configuration of a ring after it has been allocated.
+ Options are RING_* options that allow for things like setting/clearing
+ read locking. Returns 0 for failure 1 on success.
+
+ Options can be ORd together and all made effective at the same time, but
+ it will be impossible to determine a specific failure if invoked this
+ way. Control is returned on the first error, and no provision is made
+ to "undo" previously set options if an error occurs.
+*/
+static int uta_ring_config( void* vr, int options ) {
+ ring_t* r;
+
+ if( (r = (ring_t*) vr) == NULL ) {
+ errno = EINVAL;
+ return 0;
+ }
+
+ if( options & RING_WLOCK ) {
+ if( r->wgate == NULL ) { // don't realloc
+ r->wgate = (pthread_mutex_t *) malloc( sizeof( *r->wgate ) );
+ if( r->wgate == NULL ) {
+ return 0;
+ }
+
+ pthread_mutex_init( r->wgate, NULL );
+ }
+ }
+
+ if( options & RING_RLOCK ) {
+ if( r->rgate == NULL ) { // don't realloc
+ r->rgate = (pthread_mutex_t *) malloc( sizeof( *r->rgate ) );
+ if( r->rgate == NULL ) {
+ return 0;
+ }
+
+ pthread_mutex_init( r->rgate, NULL );
+ }
+ }
+
+ return 1;
+}
+
+/*
Ditch the ring. The caller is responsible for extracting any remaining
pointers and freeing them as needed.
*/
@@ -108,6 +156,11 @@
/*
Pull the next data pointer from the ring; null if there isn't
anything to be pulled.
+
+ If the read lock exists for the ring, then this will BLOCK until
+ it gets the lock. There is always a chance that once the lock
+ is obtained that the ring is empty, so the caller MUST handle
+ a nil pointer as the return.
*/
static inline void* uta_ring_extract( void* vr ) {
ring_t* r;
@@ -122,10 +175,17 @@
r = (ring_t*) vr;
}
- if( r->tail == r->head ) { // empty ring
+ if( r->tail == r->head ) { // empty ring we can bail out quickly
return NULL;
}
+ if( r->rgate != NULL ) { // if lock exists we must honour it
+ pthread_mutex_lock( r->rgate );
+ if( r->tail == r->head ) { // ensure ring didn't go empty while waiting
+ return NULL;
+ }
+ }
+
ti = r->tail;
r->tail++;
if( r->tail >= r->nelements ) {
@@ -138,6 +198,10 @@
if( r->tail == r->head ) { // if this emptied the ring, turn off ready
}
*/
+
+ if( r->rgate != NULL ) { // if locked above...
+ pthread_mutex_unlock( r->rgate );
+ }
return r->data[ti];
}
@@ -157,7 +221,14 @@
r = (ring_t*) vr;
}
+ if( r->wgate != NULL ) { // if lock exists we must honour it
+ pthread_mutex_lock( r->wgate );
+ }
+
if( r->head+1 == r->tail || (r->head+1 >= r->nelements && !r->tail) ) { // ring is full
+ if( r->wgate != NULL ) { // ensure released if needed
+ pthread_mutex_unlock( r->wgate );
+ }
return 0;
}
@@ -174,6 +245,9 @@
r->head = 0;
}
+ if( r->wgate != NULL ) { // if lock exists we must unlock before going
+ pthread_mutex_unlock( r->wgate );
+ }
return 1;
}
diff --git a/src/rmr/si/src/rmr_si.c b/src/rmr/si/src/rmr_si.c
index 6c08e83..c8c3b4a 100644
--- a/src/rmr/si/src/rmr_si.c
+++ b/src/rmr/si/src/rmr_si.c
@@ -570,9 +570,16 @@
ctx->max_ibm = max_msg_size; // default to user supplied message size
ctx->mring = uta_mk_ring( 4096 ); // message ring is always on for si
+ ctx->zcb_mring = uta_mk_ring( 128 ); // zero copy buffer mbuf ring to reduce malloc/free calls
+
+ if( ! (flags & RMRFL_NOLOCK) ) { // user did not specifically ask that it be off; turn it on
+ uta_ring_config( ctx->mring, RING_RLOCK ); // concurrent rcv calls require read lock
+ uta_ring_config( ctx->zcb_mring, RING_WLOCK ); // concurrent free calls from userland require write lock
+ } else {
+ fprintf( stderr, "[INFO] receive ring locking disabled by user application\n" );
+ }
init_mtcall( ctx ); // set up call chutes
- ctx->zcb_mring = uta_mk_ring( 128 ); // zero copy buffer mbuf ring
ctx->max_plen = RMR_MAX_RCV_BYTES; // max user payload lengh
if( max_msg_size > 0 ) {
diff --git a/test/ring_static_test.c b/test/ring_static_test.c
index bd2b428..b194928 100644
--- a/test/ring_static_test.c
+++ b/test/ring_static_test.c
@@ -78,6 +78,8 @@
int data[20];
int* dp;
int size = 18;
+ int pfd = -1; // pollable file descriptor for the ring
+ int errors = 0;
r = uta_mk_ring( 0 ); // should return nil
if( r != NULL ) {
@@ -96,6 +98,19 @@
return 1;
}
+ pfd = uta_ring_getpfd( r ); // get pollable file descriptor
+ if( pfd < 0 ) {
+ fprintf( stderr, "<FAIL> expected a pollable file descriptor >= 0, but got: %d\n", pfd );
+ errors++;
+ }
+
+ pfd = uta_ring_config( r, 0x03 ); // turn on locking for reads and writes
+ if( pfd != 1 ) {
+ fprintf( stderr, "<FAIL> config attempt to enable locking failed\n" );
+ errors++;
+ }
+
+
for( i = 0; i < 20; i++ ) { // test to ensure it reports full when head/tail start at 0
data[i] = i;
if( ! uta_ring_insert( r, &data[i] ) ) {
@@ -161,5 +176,5 @@
}
fprintf( stderr, "<INFO> all ring tests pass\n" );
- return 0;
+ return errors;
}