Enable multi-thread receiver support

This change implements locking on the receive and free
message rings such that multiple user threads can concurrently
invoke rmr receive functions.

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: If012c5699e071f1d85f604c79baf8c4e8b77e94a
diff --git a/CHANGES b/CHANGES
index 976169f..f5d970d 100644
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,9 @@
 API and build change  and fix summaries. Doc correctsions
 and/or changes are not mentioned here; see the commit messages.
 
+2020 January 22; verison 3.0.3
+	Enable thread support for multiple receive threads.
+
 2020 January 21; verison 3.0.2
 	Fix bug in SI95 (missing reallocate payload function).
 
diff --git a/CMakeLists.txt b/CMakeLists.txt
index ef8308b..2714703 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -38,7 +38,7 @@
 
 set( major_version "3" )		# should be automatically populated from git tag later, but until CI process sets a tag we use this
 set( minor_version "0" )
-set( patch_level "2" )
+set( patch_level "3" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_inc "include/rmr" )
diff --git a/src/rmr/common/include/rmr.h b/src/rmr/common/include/rmr.h
index e3d08d9..a9805a8 100644
--- a/src/rmr/common/include/rmr.h
+++ b/src/rmr/common/include/rmr.h
@@ -47,6 +47,7 @@
 #define RMRFL_MTCALL		0x02	// set up multi-threaded call support (rmr_init)
 #define RMRFL_AUTO_ALLOC	0x03	// send auto allocates a zerocopy buffer
 #define RMRFL_NAME_ONLY		0x04	// only the hostname:ip is provided as source information for rts() calls
+#define RMRFL_NOLOCK		0x08	// disable receive ring locking (user app ensures single thread or provides collision protection)
 
 #define RMR_DEF_SIZE		0		// pass as size to have msg allocation use the default msg size
 
diff --git a/src/rmr/common/include/rmr_agnostic.h b/src/rmr/common/include/rmr_agnostic.h
index 86f4660..6ef3aee 100644
--- a/src/rmr/common/include/rmr_agnostic.h
+++ b/src/rmr/common/include/rmr_agnostic.h
@@ -238,12 +238,18 @@
 
 
 // --------------- ring things  -------------------------------------------------
+#define RING_NONE	0			// no options
+#define RING_RLOCK	0x01		// create/destroy the read lock on the ring
+#define RING_WLOCK	0x02		// create/destroy the write lockk on the ring
+
 typedef struct ring {
 	uint16_t head;				// index of the head of the ring (insert point)
 	uint16_t tail;				// index of the tail (extract point)
 	uint16_t nelements;			// number of elements in the ring
 	void**	data;				// the ring data (pointers to blobs of stuff)
 	int		pfd;				// event fd for the ring for epoll
+	pthread_mutex_t*	rgate;	// read lock if used
+	pthread_mutex_t*	wgate;	// write lock if used
 } ring_t;
 
 
@@ -273,6 +279,7 @@
 
 // --- message ring --------------------------
 static void* uta_mk_ring( int size );
+static int uta_ring_config( void* vr, int options );
 static void uta_ring_free( void* vr );
 static inline void* uta_ring_extract( void* vr );
 static inline int uta_ring_insert( void* vr, void* new_data );
diff --git a/src/rmr/common/src/ring_static.c b/src/rmr/common/src/ring_static.c
index 93dfdb7..37bb4db 100644
--- a/src/rmr/common/src/ring_static.c
+++ b/src/rmr/common/src/ring_static.c
@@ -62,7 +62,10 @@
 }
 
 /*
-	Make a new ring.
+	Make a new ring. The default is to NOT create a lock; if the user 
+	wants read locking then uta_config_ring() can be used to setup the
+	mutex. (We use several rings internally and the assumption is that
+	there is no locking for these.)
 */
 static void* uta_mk_ring( int size ) {
 	ring_t*	r;
@@ -72,6 +75,8 @@
 		return NULL;
 	}
 
+	r->rgate = NULL;
+	r->wgate = NULL;
 	r->head = r->tail = 0;
 
 	max = (r->head - 1);
@@ -91,6 +96,49 @@
 }
 
 /*
+	Allows for configuration of a ring after it has been allocated.
+	Options are RING_* options that allow for things like setting/clearing
+	read locking. Returns 0 for failure 1 on success.
+
+	Options can be ORd together and all made effective at the same time, but
+	it will be impossible to determine a specific failure if invoked this
+	way.  Control is returned on the first error, and no provision is made
+	to "undo" previously set options if an error occurs.
+*/
+static int uta_ring_config( void* vr, int options ) {
+	ring_t*	r;
+
+	if( (r = (ring_t*) vr) == NULL ) {
+		errno = EINVAL;
+		return 0;
+	}
+
+	if( options & RING_WLOCK ) {
+		if( r->wgate == NULL ) {		// don't realloc
+			r->wgate = (pthread_mutex_t *) malloc( sizeof( *r->wgate ) );
+			if( r->wgate == NULL ) {
+				return 0;
+			}
+	
+			pthread_mutex_init( r->wgate, NULL );
+		}
+	}
+
+	if( options & RING_RLOCK ) {
+		if( r->rgate == NULL ) {		// don't realloc
+			r->rgate = (pthread_mutex_t *) malloc( sizeof( *r->rgate ) );
+			if( r->rgate == NULL ) {
+				return 0;
+			}
+	
+			pthread_mutex_init( r->rgate, NULL );
+		}
+	}
+
+	return 1;
+}
+
+/*
 	Ditch the ring. The caller is responsible for extracting any remaining
 	pointers and freeing them as needed.
 */
@@ -108,6 +156,11 @@
 /*
 	Pull the next data pointer from the ring; null if there isn't
 	anything to be pulled.
+
+	If the read lock exists for the ring, then this will BLOCK until
+	it gets the lock.  There is always a chance that once the lock
+	is obtained that the ring is empty, so the caller MUST handle
+	a nil pointer as the return.
 */
 static inline void* uta_ring_extract( void* vr ) {
 	ring_t*		r;
@@ -122,10 +175,17 @@
 		r = (ring_t*) vr;
 	}
 
-	if( r->tail == r->head ) {			// empty ring
+	if( r->tail == r->head ) {						// empty ring we can bail out quickly
 		return NULL;
 	}
 
+	if( r->rgate != NULL ) {						// if lock exists we must honour it
+		pthread_mutex_lock( r->rgate );
+		if( r->tail == r->head ) {					// ensure ring didn't go empty while waiting
+			return NULL;
+		}
+	}
+
 	ti = r->tail;
 	r->tail++;
 	if( r->tail >= r->nelements ) {
@@ -138,6 +198,10 @@
 	if( r->tail == r->head ) {								// if this emptied the ring, turn off ready
 	}
 */
+
+	if( r->rgate != NULL ) {							// if locked above...
+		pthread_mutex_unlock( r->rgate );
+	}
 	return r->data[ti];
 }
 
@@ -157,7 +221,14 @@
 		r = (ring_t*) vr;
 	}
 
+	if( r->wgate != NULL ) {						// if lock exists we must honour it
+		pthread_mutex_lock( r->wgate );
+	}
+
 	if( r->head+1 == r->tail || (r->head+1 >= r->nelements && !r->tail) ) {		// ring is full
+		if( r->wgate != NULL ) {					// ensure released if needed
+			pthread_mutex_unlock( r->wgate );
+		}
 		return 0;
 	}
 
@@ -174,6 +245,9 @@
 		r->head = 0;
 	}
 
+	if( r->wgate != NULL ) {						// if lock exists we must unlock before going
+		pthread_mutex_unlock( r->wgate );
+	}
 	return 1;
 }
 
diff --git a/src/rmr/si/src/rmr_si.c b/src/rmr/si/src/rmr_si.c
index 6c08e83..c8c3b4a 100644
--- a/src/rmr/si/src/rmr_si.c
+++ b/src/rmr/si/src/rmr_si.c
@@ -570,9 +570,16 @@
 	ctx->max_ibm = max_msg_size;					// default to user supplied message size
 
 	ctx->mring = uta_mk_ring( 4096 );				// message ring is always on for si
+	ctx->zcb_mring = uta_mk_ring( 128 );			// zero copy buffer mbuf ring to reduce malloc/free calls
+
+	if( ! (flags & RMRFL_NOLOCK) ) {				// user did not specifically ask that it be off; turn it on
+		uta_ring_config( ctx->mring, RING_RLOCK );			// concurrent rcv calls require read lock
+		uta_ring_config( ctx->zcb_mring, RING_WLOCK );		// concurrent free calls from userland require write lock
+	} else {
+		fprintf( stderr, "[INFO] receive ring locking disabled by user application\n" );
+	}
 	init_mtcall( ctx );								// set up call chutes
 
-	ctx->zcb_mring = uta_mk_ring( 128 );			// zero copy buffer mbuf ring
 
 	ctx->max_plen = RMR_MAX_RCV_BYTES;				// max user payload lengh
 	if( max_msg_size > 0 ) {
diff --git a/test/ring_static_test.c b/test/ring_static_test.c
index bd2b428..b194928 100644
--- a/test/ring_static_test.c
+++ b/test/ring_static_test.c
@@ -78,6 +78,8 @@
 	int	data[20];
 	int*	dp;
 	int size = 18;
+	int	pfd = -1;					// pollable file descriptor for the ring
+	int	errors = 0;
 
 	r = uta_mk_ring( 0 );			// should return nil
 	if( r != NULL ) {
@@ -96,6 +98,19 @@
 		return 1;
 	}
 
+	pfd = uta_ring_getpfd( r );		// get pollable file descriptor
+	if( pfd < 0 ) {
+		fprintf( stderr, "<FAIL> expected a pollable file descriptor >= 0, but got: %d\n", pfd );
+		errors++;
+	}
+
+	pfd = uta_ring_config( r, 0x03 );		// turn on locking for reads and writes
+	if( pfd != 1 ) {
+		fprintf( stderr, "<FAIL> config attempt to enable locking failed\n" );
+		errors++;
+	}
+	
+
 	for( i = 0; i < 20; i++ ) {		// test to ensure it reports full when head/tail start at 0
 		data[i] = i;
 		if( ! uta_ring_insert( r, &data[i] ) ) {
@@ -161,5 +176,5 @@
 	}
 
 	fprintf( stderr, "<INFO> all ring tests pass\n" );
-	return 0;
+	return errors;
 }