Eliminate the SI receive buffer length requirement

This change eliminates the need to enforce a receive
buffer maximum length as specified by the user application.
The length supplied will be assumed to be the "normal"
maximum and used as the default receive buffer size,
but when messages larger than this are received RMR will
now allocate a larger buffer to use.

Issue-ID: RIC-309

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I9dacadea40b982fde604b78789568b8118a89b35
diff --git a/CHANGES_CORE.txt b/CHANGES_CORE.txt
index 1e00418..e7c461a 100644
--- a/CHANGES_CORE.txt
+++ b/CHANGES_CORE.txt
@@ -5,6 +5,12 @@
 # API and build change  and fix summaries. Doc correctsions
 # and/or changes are not mentioned here; see the commit messages.
 
+2020 April 9; version 3.7.1
+	The max length restriction for receiving messages when using SI95 has
+	been removed.  The length supplied during initialisation is used as
+	the "normal maximum" and default buffer allocation size, but messages
+	arriving which are larger are accepted. (RIC-309)
+
 2020 April 7; version 3.7.0
 	The health check support programme was renamed to rmr_probe (RIC-308).
 
diff --git a/CMakeLists.txt b/CMakeLists.txt
index c73b5dc..4aa5466 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -40,7 +40,7 @@
 
 set( major_version "3" )		# should be automatically populated from git tag later, but until CI process sets a tag we use this
 set( minor_version "7" )
-set( patch_level "0" )
+set( patch_level "1" )
 
 set( install_root "${CMAKE_INSTALL_PREFIX}" )
 set( install_inc "include/rmr" )
diff --git a/doc/src/man/rmr_alloc_msg.3.xfm b/doc/src/man/rmr_alloc_msg.3.xfm
index d892915..cbd3dc2 100644
--- a/doc/src/man/rmr_alloc_msg.3.xfm
+++ b/doc/src/man/rmr_alloc_msg.3.xfm
@@ -45,8 +45,14 @@
 programme can write into and then send through the RMR library.
 The buffer is allocated such that sending it requires no additional copying
 out of the buffer.
-If the value passed in &cw(size) is 0, then the default size supplied on the
-&ital(rmr_init) call will be used.
+If the value passed in &cw(size) is less than or equal to 0, then the
+&ital(normal maximum size) supplied on the &ital(rmr_init) call will be used.
+When &ital(size) is greater than zero, the message allocated will have at least
+the indicated number of bytes in the payload.
+There is no maximum size imposed by RMR, however the underlying system memory
+managerment (e.g. malloc) functions may impose a limit.
+
+&space
 The &ital(ctx) parameter is the void context pointer that was returned by
 the &ital(rmr_init) function.
 
diff --git a/doc/src/man/rmr_init.3.xfm b/doc/src/man/rmr_init.3.xfm
index 99d28bc..acace5a 100644
--- a/doc/src/man/rmr_init.3.xfm
+++ b/doc/src/man/rmr_init.3.xfm
@@ -38,7 +38,7 @@
 &ex_start
 #include <rmr/rmr.h>
 
-void* rmr_init( char* proto_port, int max_msg_size, int flags );
+void* rmr_init( char* proto_port, int norm_msg_size, int flags );
 &ex_end
 
 &uindent
@@ -51,15 +51,26 @@
 
 &space
 &ital(Port) is used to listen for connection requests from other RMR based applications.
-The &ital(max_msg_size) parameter is used to allocate receive buffers and is the
-maximum message size which the application expects to receive.
-This value is the sum of &bold(both) the maximum payload size &bold(and) the maximum
-trace data size.
-This value is also used as the default message size when allocating message buffers.
-Messages arriving which are longer than the given maximum will be dropped without
-notification to the application.
-A warning is written to standard error for the first message which is too large on
-each connection.
+The &ital(norm_msg_size) parameter is used to allocate receive buffers and should be
+set to what the user application expects to be a size which will hold the vast majority
+of expected messages.
+When computing the size, the application should consider the usual payload size &bold(and)
+the maximum trace data size that will be used.
+This value is also used as the default message size when allocating message buffers (when
+a zero size is given to rmr_alloc_msg(); see the rmr_alloc_msg() manual page).
+Messages arriving which are longer than the given normal size will cause RMR to allocate
+a new buffer which is large enough for the arriving message.
+
+&space
+Starting with version 3.8.0 RMR no longer places a maximum buffer size for received
+messages.
+The underlying system memory manager might impose such a limit and the attempt to
+allocate a buffer larger than that limit will likely result in an application abort.
+Other than the potential performance impact from extra memory allocation and release,
+there is no penality to the user programme for specifyning a normal buffer size which
+is usually smaller than received buffers.
+Similarly, the only penality to the application for over specifying the normal buffer
+size might be a larger memory footprint.
 
 &space
 &ital(Flags) allows for selection of some RMr options at the time of initialisation.
diff --git a/src/rmr/common/include/rmr_agnostic.h b/src/rmr/common/include/rmr_agnostic.h
index 43faf4a..27e1de8 100644
--- a/src/rmr/common/include/rmr_agnostic.h
+++ b/src/rmr/common/include/rmr_agnostic.h
@@ -87,6 +87,7 @@
 #define MFL_NOALLOC		0x02		// send should NOT allocate a new buffer before returning
 #define MFL_ADDSRC		0x04		// source must be added on send
 #define MFL_RAW			0x08		// message is 'raw' and not from an RMr based sender (no header)
+#define MFL_HUGE		0x10		// buffer was larger than applications indicated usual max; don't cache
 
 #define MAX_EP_GROUP	32			// max number of endpoints in a group
 #define MAX_RTG_MSG_SZ	2048		// max expected message size from route generator
diff --git a/src/rmr/si/include/rmr_si_private.h b/src/rmr/si/include/rmr_si_private.h
index ddc9c3f..6c6f74a 100644
--- a/src/rmr/si/include/rmr_si_private.h
+++ b/src/rmr/si/include/rmr_si_private.h
@@ -34,8 +34,8 @@
 #define _uta_private_h
 
 // if pmode is off we don't compile in some checks in hopes of speeding things up
-#ifndef PARINOID_CHECKS
-#	define PARINOID_CHECKS 0
+#ifndef PARANOID_CHECKS
+#	define PARANOID_CHECKS 0
 #endif
 
 
@@ -51,6 +51,10 @@
 #define RF_NOTIFIED	0x01	// notification made about river issue
 #define RF_DROP		0x02	// this message is large and being dropped
 
+#define	TP_SZFIELD_LEN	((sizeof(uint32_t)*2)+1)	// number of bytes needed for msg size in transport header
+#define	TP_SZ_MARKER	'$'							// marker indicating net byte order used
+
+
 #define SI_MAX_ADDR_LEN		512
 
 /*
diff --git a/src/rmr/si/src/mt_call_si_static.c b/src/rmr/si/src/mt_call_si_static.c
index d18a8e0..2d422bb 100644
--- a/src/rmr/si/src/mt_call_si_static.c
+++ b/src/rmr/si/src/mt_call_si_static.c
@@ -1,7 +1,7 @@
 // : vi ts=4 sw=4 noet:
 /*
 ==================================================================================
-	Copyright (c) 2019 Nokia
+	Copyright (c) 2020 Nokia
 	Copyright (c) 2018-2020 AT&T Intellectual Property.
 
    Licensed under the Apache License, Version 2.0 (the "License");
@@ -21,7 +21,10 @@
 /*
 	Mnemonic:	mt_call_si static.c
 	Abstract:	Static funcitons related to the multi-threaded call feature
-				which are SI specific.
+				which are SI specific. The functions here also provide the
+				message construction functions which build a message that
+				might be split across multiple "datagrams" received from the
+				underlying transport.
 
 	Author:		E. Scott Daniels
 	Date:		20 May 2019
@@ -62,7 +65,7 @@
 	chute_t*		chute;
 	unsigned int	call_id;	// the id assigned to the call generated message
 
-	if( PARINOID_CHECKS ) {									// PARINOID mode is slower; off by default
+	if( PARANOID_CHECKS ) {									// PARANOID mode is slower; off by default
 		if( raw_msg == NULL || msg_size <= 0 ) {
 			return;
 		}
@@ -71,6 +74,9 @@
 	if( (mbuf = alloc_mbuf( ctx, RMR_ERR_UNSET )) != NULL ) {
 		mbuf->tp_buf = raw_msg;
 		mbuf->rts_fd = sender_fd;
+		if( msg_size > ctx->max_ibm + 1024 ) {
+			mbuf->flags |= MFL_HUGE;				// prevent caching of oversized buffers
+		}
 
 		ref_tpbuf( mbuf, msg_size );				// point mbuf at bits in the datagram
 		hdr = mbuf->header;							// convenience
@@ -94,27 +100,52 @@
 }
 
 /*
+	Given a buffer, extract the size. We assume the buffer contains one of:
+		<int1><int2><mark>
+		<int1>
+
+	where <int1> is the size in native storage order (v1) and <int2>
+	is the size in network order. If <mark> is present then we assume
+	that <int2> is present and we use that after translating from net
+	byte order. If <mark> is not present, we use <int1>. This allows
+	old versions of RMR to continue to work with new versions that now
+	do the right thing with byte ordering.
+*/
+static inline uint32_t extract_mlen( unsigned char* buf ) {
+	uint32_t	size;		// adjusted (if needed) size for return
+	uint32_t*	blen;		// length in the buffer to extract
+
+	blen = (uint32_t *) buf;
+	if( *(buf + sizeof( int ) * 2 ) == TP_SZ_MARKER ) {
+		size = ntohl( *(blen+1) );				// pick up the second integer
+		if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len converted from net order to: %d\n", size );
+	} else {
+		size = *blen;							// old sender didn't encode size
+		if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "extract msg len no conversion: %d\n", size );
+	}
+
+	return size;
+}
+
+/*
 	This is the callback invoked when tcp data is received. It adds the data
 	to the buffer for the connection and if a complete message is received
 	then the message is queued onto the receive ring.
 
 	Return value indicates only that we handled the buffer and SI should continue
 	or that SI should terminate, so on error it's NOT wrong to return "ok".
-
-
-	FUTURE: to do this better, SI needs to support a 'ready to read' callback
-	which allows us to to the actual receive directly into our buffer.
 */
 static int mt_data_cb( void* vctx, int fd, char* buf, int buflen ) {
 	uta_ctx_t*		ctx;
 	river_t*		river;			// river associated with the fd passed in
+	unsigned char*	old_accum;		// old accumulator reference should we need to realloc
 	int				bidx = 0;		// transport buffer index
 	int				remain;			// bytes in transport buf that need to be moved
 	int* 			mlen;			// pointer to spot in buffer for conversion to int
 	int				need;			// bytes needed for something
 	int				i;
 
-	if( PARINOID_CHECKS ) {									// PARINOID mode is slower; off by default
+	if( PARANOID_CHECKS ) {									// PARANOID mode is slower; off by default
 		if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
 			return SI_RET_OK;
 		}
@@ -123,6 +154,8 @@
 			if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "callback fd is out of range: %d nrivers=%d\n", fd, ctx->nrivers );
 			return SI_RET_OK;
 		}
+	} else {
+		ctx = (uta_ctx_t *) vctx;
 	}
 
 	if( buflen <= 0 ) {
@@ -133,7 +166,6 @@
 	if( river->state != RS_GOOD ) {				// all states which aren't good require reset first
 		if( river->state == RS_NEW ) {
 			memset( river, 0, sizeof( *river ) );
-			//river->nbytes = sizeof( char ) * (8 * 1024);
 			river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);		// max inbound message size
 			river->accum = (char *) malloc( river->nbytes );
 			river->ipt = 0;
@@ -148,10 +180,9 @@
 	while( remain > 0 ) {								// until we've done something with all bytes passed in
 		if( DEBUG )  rmr_vlog( RMR_VL_DEBUG, "====== data callback top of loop bidx=%d msize=%d ipt=%d remain=%d\n", bidx, river->msg_size, river->ipt, remain );
 
-		// FIX ME: size in the message  needs to be network byte order
 		if( river->msg_size <= 0 ) {				// don't have a size yet
 													// FIX ME: we need a frame indicator to ensure alignment
-			need = sizeof( int ) - river->ipt;							// what we need from transport buffer
+			need = TP_SZFIELD_LEN - river->ipt;		// what we need to compute length
 			if( need > remain ) {										// the whole size isn't there
 				if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "need more for size than we have: need=%d rmain=%d ipt=%d\n", need, remain, river->ipt );
 				memcpy( &river->accum[river->ipt], buf+bidx, remain );			// grab what we can and depart
@@ -165,7 +196,7 @@
 				river->ipt += need;
 				bidx += need;
 				remain -= need;
-				river->msg_size = *((int *) river->accum);		
+				river->msg_size = extract_mlen( river->accum );
 				if( DEBUG ) {
 					rmr_vlog( RMR_VL_DEBUG, "size from accumulator =%d\n", river->msg_size );
 					if( DEBUG > 1 ) {
@@ -176,12 +207,21 @@
 					}
 				}
 			} else {
-				river->msg_size = *((int *) &buf[bidx]);					// snarf directly and copy with rest later
+				river->msg_size = extract_mlen( &buf[bidx] );			// pull from buf as it's all there; it will copy later
 			}
 			if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "data callback setting msg size: %d\n", river->msg_size );
 
-			if( river->msg_size > river->nbytes ) {				// message is too big, we will drop it
-				river->flags |= RF_DROP;
+			if( river->msg_size > river->nbytes ) {						// message bigger than app max size; grab huge buffer
+				//river->flags |= RF_DROP;
+				if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "received message is huge (%d) reallocating buffer\n", river->msg_size );
+				old_accum = river->accum;					// need to copy any bytes we snarfed getting the size, so hold
+				river->nbytes = river->msg_size + 128;					// buffer large enough with a bit of fudge room
+				river->accum = (char *) malloc( river->nbytes );
+				if( river->ipt > 0 ) {
+					memcpy( river->accum, old_accum, river->ipt + 1 );		// copy anything snarfed in getting the sie
+				}
+
+				free( old_accum );
 			}
 		}
 
@@ -198,6 +238,7 @@
 			if( (river->flags & RF_DROP) == 0  ) {
 				memcpy( &river->accum[river->ipt], buf+bidx, need );				// grab just what is needed (might be more)
 				buf2mbuf( ctx, river->accum, river->nbytes, fd );					// build an RMR mbuf and queue
+				river->nbytes = sizeof( char ) * (ctx->max_ibm + 1024);				// prevent huge size from persisting
 				river->accum = (char *) malloc( sizeof( char ) *  river->nbytes );	// fresh accumulator
 			} else {
 				if( !(river->flags & RF_NOTIFIED) ) {
diff --git a/src/rmr/si/src/rmr_si.c b/src/rmr/si/src/rmr_si.c
index 17014a6..78a90ff 100644
--- a/src/rmr/si/src/rmr_si.c
+++ b/src/rmr/si/src/rmr_si.c
@@ -82,10 +82,8 @@
 	Clean up a context.
 */
 static void free_ctx( uta_ctx_t* ctx ) {
-	if( ctx ) {
-		if( ctx->rtg_addr ) {
-			free( ctx->rtg_addr );
-		}
+	if( ctx && ctx->rtg_addr ) {
+		free( ctx->rtg_addr );
 	}
 }
 
@@ -175,14 +173,14 @@
 	Return the message to the available pool, or free it outright.
 */
 extern void rmr_free_msg( rmr_mbuf_t* mbuf ) {
-	//fprintf( stderr, "SKIPPING FREE: %p\n", mbuf );
-	//return;
-
 	if( mbuf == NULL ) {
 		return;
 	}
 
-	if( !mbuf->ring || ! uta_ring_insert( mbuf->ring, mbuf ) ) {			// just queue, free if ring is full
+	if( mbuf->flags & MFL_HUGE || 							// don't cache oversized messages
+		! mbuf->ring || 									// cant cache if no ring
+		! uta_ring_insert( mbuf->ring, mbuf ) ) {			// or ring is full
+
 		if( mbuf->tp_buf ) {
 			free( mbuf->tp_buf );
 			mbuf->tp_buf = NULL;		// just in case user tries to reuse this mbuf; this will be an NPE
@@ -263,7 +261,6 @@
 extern rmr_mbuf_t*  rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
 	int			nn_sock;			// endpoint socket for send
 	uta_ctx_t*	ctx;
-	int			state;
 	char*		hold_src;			// we need the original source if send fails
 	char*		hold_ip;			// also must hold original ip
 	int			sock_ok = 0;		// true if we found a valid endpoint socket
@@ -626,10 +623,8 @@
 		port = proto_port;			// assume something like "1234" was passed
 	}
 
-	if( (tok = getenv( "ENV_RTG_PORT" )) != NULL ) {				// must check port here -- if < 1 then we just start static file 'listener'
-		if( atoi( tok ) < 1 ) {
-			static_rtc = 1;
-		}
+	if( (tok = getenv( "ENV_RTG_PORT" )) != NULL && atoi( tok ) < 1 ) { 	// must check here -- if < 1 then we just start static file 'listener'
+		static_rtc = 1;
 	}
 
 	if( (tok = getenv( ENV_SRC_ID )) != NULL ) {							// env var overrides what we dig from system
@@ -840,7 +835,6 @@
 */
 extern rmr_mbuf_t* rmr_mt_rcv( void* vctx, rmr_mbuf_t* mbuf, int max_wait ) {
 	uta_ctx_t*	ctx;
-	uta_mhdr_t*	hdr;			// header in the transport buffer
 	chute_t*	chute;
 	struct timespec	ts;			// time info if we have a timeout
 	long	new_ms;				// adjusted mu-sec
diff --git a/src/rmr/si/src/rtable_si_static.c b/src/rmr/si/src/rtable_si_static.c
index 86f50fe..c389f0f 100644
--- a/src/rmr/si/src/rtable_si_static.c
+++ b/src/rmr/si/src/rtable_si_static.c
@@ -223,7 +223,7 @@
 	endpoint_t*		ep;
 	int				state = FALSE;
 
-	if( PARINOID_CHECKS ) {
+	if( PARANOID_CHECKS ) {
 		if( ctx == NULL || (rt = ctx->rtable) == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
 			if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_byname: parinoia check pop ctx=%p rt=%p\n", ctx, rt );
 			return FALSE;
@@ -299,7 +299,7 @@
 	rrgroup_t* rrg;
 	int	idx;
 
-	if( PARINOID_CHECKS ) {
+	if( PARANOID_CHECKS ) {
 		if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
 			return FALSE;
 		}
@@ -403,7 +403,7 @@
 	char*	meid;
 	si_ctx_t*	si_ctx;
 
-	if( PARINOID_CHECKS ) {
+	if( PARANOID_CHECKS ) {
 		if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL  ) {
 			return FALSE;
 		}
diff --git a/src/rmr/si/src/si95/siconnect.c b/src/rmr/si/src/si95/siconnect.c
index 114e873..a48cb43 100644
--- a/src/rmr/si/src/si95/siconnect.c
+++ b/src/rmr/si/src/si95/siconnect.c
@@ -57,7 +57,7 @@
 	int alen = 0;					//  len of address struct 
 	int fd = SI_ERROR;             	//  file descriptor to return to caller 
 
-	if( PARINOID_CHECKS ) {
+	if( PARANOID_CHECKS ) {
 		if( gptr == NULL ) {
 			return SI_ERROR;
 		}
diff --git a/src/rmr/si/src/si95/silisten.c b/src/rmr/si/src/si95/silisten.c
index 8cd484a..3ce0e18 100644
--- a/src/rmr/si/src/si95/silisten.c
+++ b/src/rmr/si/src/si95/silisten.c
@@ -45,7 +45,7 @@
 	struct tp_blk *tpptr;      		//  pointer into tp list
 	int status = SI_ERROR;     		//  status of processing
 
-	if( PARINOID_CHECKS ) {
+	if( PARANOID_CHECKS ) {
 		if( gptr == NULL ) {
 			return status;
 		}
diff --git a/src/rmr/si/src/si95/sipoll.c b/src/rmr/si/src/si95/sipoll.c
index c47fb97..8c28977 100644
--- a/src/rmr/si/src/si95/sipoll.c
+++ b/src/rmr/si/src/si95/sipoll.c
@@ -53,7 +53,7 @@
  char ibuf[1025];
  int i;                        //  loop index
  struct tp_blk *tpptr;         //  pointer at tp stuff
- struct tp_blk *nextone;        //  pointer at next block to process
+ struct tp_blk *nextone = NULL;	//  pointer at next block to process
  int pstat;                    //  poll status
  int kstat;                    //  keyboard status
  struct timeval  delay;        //  delay to use on select call
@@ -120,7 +120,6 @@
       }
 
      // for( tpptr = gptr->tplist; tpptr != NULL; tpptr = tpptr->next )
-     for( tpptr = gptr->tplist; tpptr != NULL; tpptr = nextone )
 	tpptr = gptr->tplist; 
 	while( tpptr != NULL ) {
 		nextone = tpptr->next;					//  allow for a delete in loop
diff --git a/src/rmr/si/src/si95/socket_if.h b/src/rmr/si/src/si95/socket_if.h
index 5546800..f7311fc 100644
--- a/src/rmr/si/src/si95/socket_if.h
+++ b/src/rmr/si/src/si95/socket_if.h
@@ -24,7 +24,7 @@
 *  Mnemonic:	socket_if.h
 *  Abstract:	Main set of definitions needed by the SI native functions and
 *				any user code.
-*  Date:	    26 March 1995	(original) 
+*  Date:	    26 March 1995	(original)
 *				3 January 2020  (revised)
 *  Author:		E. Scott Daniels
 *
@@ -34,81 +34,81 @@
 #ifndef _SOCKET_IF_H
 #define _SOCKET_IF_H
 
-#ifndef PARINOID_CHECKS
-#	define PARINOID_CHECKS 1
+#ifndef PARANOID_CHECKS
+#	define PARANOID_CHECKS 0
 #endif
 
-#define TCP_DEVICE	0     	//  device type of socket 
-#define UDP_DEVICE	1	
+#define TCP_DEVICE	0     	//  device type of socket
+#define UDP_DEVICE	1
 
-//  these are for SIclose, must be negative so as to be distinguished from real fd values 
-#define TCP_LISTEN_PORT	(-1)	//  close first listen port found 
-#define UDP_PORT	(-2)	//  close first udp port found 
+//  these are for SIclose, must be negative so as to be distinguished from real fd values
+#define TCP_LISTEN_PORT	(-1)	//  close first listen port found
+#define UDP_PORT	(-2)	//  close first udp port found
 
 #define SI_BAD_HANDLE  ((void *) 0)
 
 #define SI_OPT_NONE		0		// initialisation options
 #define SI_OPT_FORK    0x01      //  fork new sessions
-#define SI_OPT_FG      0x02      //  keep process in the "foreground" 
-#define SI_OPT_TTY     0x04      //  processes keyboard interrupts if fg 
-#define SI_OPT_ALRM    0x08      //  cause setsig to be called with alarm flg 
+#define SI_OPT_FG      0x02      //  keep process in the "foreground"
+#define SI_OPT_TTY     0x04      //  processes keyboard interrupts if fg
+#define SI_OPT_ALRM    0x08      //  cause setsig to be called with alarm flg
 
-                                 //  offsets of callbacks in table 
-                                 //  used to indentify cb in SIcbreg 
-#define SI_CB_SIGNAL   0         //  usr signal/alarm received 
-#define SI_CB_RDATA    1         //  handles data arrival on raw interface 
-#define SI_CB_CDATA    2         //  handles data arrival on cooked interface 
-#define SI_CB_KDATA    3         //  handles data arrival from keyboard 
-#define SI_CB_SECURITY 4         //  authorizes acceptance of a conect req 
-#define SI_CB_CONN     5         //  called when a session is accepted 
-#define SI_CB_DISC     6         //  called when a session is lost 
+                                 //  offsets of callbacks in table
+                                 //  used to indentify cb in SIcbreg
+#define SI_CB_SIGNAL   0         //  usr signal/alarm received
+#define SI_CB_RDATA    1         //  handles data arrival on raw interface
+#define SI_CB_CDATA    2         //  handles data arrival on cooked interface
+#define SI_CB_KDATA    3         //  handles data arrival from keyboard
+#define SI_CB_SECURITY 4         //  authorizes acceptance of a conect req
+#define SI_CB_CONN     5         //  called when a session is accepted
+#define SI_CB_DISC     6         //  called when a session is lost
 #define SI_CB_POLL     7
 
                                  //  return values callbacks are expected to produce
 #define SI_RET_OK      0         //  processing ok -- continue
 #define SI_RET_ERROR   (-1)      //  processing not ok -- reject/disallow
-#define SI_RET_UNREG   1         //  unregester (never call again) the cb 
+#define SI_RET_UNREG   1         //  unregester (never call again) the cb
 #define SI_RET_QUIT    2         //  set the shutdown flag  and terminate the SI environment
 
-                                 //  values returned to user by SI routines 
-#define SI_ERROR       (-1)      //  unable to process 
-#define SI_OK          0         //  processing completed successfully 
-#define SI_QUEUED      1         //  send messages was queued 
+                                 //  values returned to user by SI routines
+#define SI_ERROR       (-1)      //  unable to process
+#define SI_OK          0         //  processing completed successfully
+#define SI_QUEUED      1         //  send messages was queued
 
-                                  //  flags passed to signal callback 
-#define SI_SF_QUIT     0x01      //  program should terminate 
-#define SI_SF_USR1     0x02      //  user 1 signal received 
-#define SI_SF_USR2     0x04      //  user 2 signal received 
-#define SI_SF_ALRM     0x08      //  alarm clock signal received 
+                                  //  flags passed to signal callback
+#define SI_SF_QUIT     0x01      //  program should terminate
+#define SI_SF_USR1     0x02      //  user 1 signal received
+#define SI_SF_USR2     0x04      //  user 2 signal received
+#define SI_SF_ALRM     0x08      //  alarm clock signal received
 
-                                 //  signal bitmasks for the setsig routine 
-#define SI_SIG_QUIT    0x01      //  please catch quit signal 
-#define SI_SIG_HUP     0x02      //  catch hangup signal 
-#define SI_SIG_TERM    0x04      //  catch the term signal 
-#define SI_SIG_USR1    0x08      //  catch user signals 
+                                 //  signal bitmasks for the setsig routine
+#define SI_SIG_QUIT    0x01      //  please catch quit signal
+#define SI_SIG_HUP     0x02      //  catch hangup signal
+#define SI_SIG_TERM    0x04      //  catch the term signal
+#define SI_SIG_USR1    0x08      //  catch user signals
 #define SI_SIG_USR2    0x10
-#define SI_SIG_ALRM    0x20      //  catch alarm signals 
-#define SI_DEF_SIGS    0x1F      //  default signals to catch 
+#define SI_SIG_ALRM    0x20      //  catch alarm signals
+#define SI_DEF_SIGS    0x1F      //  default signals to catch
 
-                                 //  SIerrno values set in public rtns 
-#define SI_ERR_NONE     0        //  no error as far as we can tell 
-#define SI_ERR_QUEUED   1	//  must be same as queued 
-#define SI_ERR_TPORT    2        //  could not bind to requested tcp port 
-#define SI_ERR_UPORT    3        //  could not bind to requested udp port 
-#define SI_ERR_FORK     4        //  could not fork to become daemon 
-#define SI_ERR_HANDLE   5        //  global information pointer went off 
-#define SI_ERR_SESSID   6        //  invalid session id 
-#define SI_ERR_TP       7        //  error occured in transport provider 
-#define SI_ERR_SHUTD    8        //  cannot process because in shutdown mode 
-#define SI_ERR_NOFDS    9        //  no file descriptors are open 
-#define SI_ERR_SIGUSR1  10       //  signal received data not read 
-#define SI_ERR_SIGUSR2  11       //  signal received data not read 
-#define SI_ERR_DISC     12       //  session disconnected 
-#define SI_ERR_TIMEOUT  13       //  poll attempt timed out - no data 
-#define SI_ERR_ORDREL   14       //  orderly release received 
-#define SI_ERR_SIGALRM  15       //  alarm signal received 
-#define SI_ERR_NOMEM    16       //  could not allocate needed memory 
-#define SI_ERR_ADDR    	17       //  address conversion failed 
+                                 //  SIerrno values set in public rtns
+#define SI_ERR_NONE     0        //  no error as far as we can tell
+#define SI_ERR_QUEUED   1	//  must be same as queued
+#define SI_ERR_TPORT    2        //  could not bind to requested tcp port
+#define SI_ERR_UPORT    3        //  could not bind to requested udp port
+#define SI_ERR_FORK     4        //  could not fork to become daemon
+#define SI_ERR_HANDLE   5        //  global information pointer went off
+#define SI_ERR_SESSID   6        //  invalid session id
+#define SI_ERR_TP       7        //  error occured in transport provider
+#define SI_ERR_SHUTD    8        //  cannot process because in shutdown mode
+#define SI_ERR_NOFDS    9        //  no file descriptors are open
+#define SI_ERR_SIGUSR1  10       //  signal received data not read
+#define SI_ERR_SIGUSR2  11       //  signal received data not read
+#define SI_ERR_DISC     12       //  session disconnected
+#define SI_ERR_TIMEOUT  13       //  poll attempt timed out - no data
+#define SI_ERR_ORDREL   14       //  orderly release received
+#define SI_ERR_SIGALRM  15       //  alarm signal received
+#define SI_ERR_NOMEM    16       //  could not allocate needed memory
+#define SI_ERR_ADDR    	17       //  address conversion failed
 #define SI_ERR_BLOCKED	18		// operation would block
 
 #define SI_TF_NONE		0		// tcp flags in the global info applied to each session
@@ -116,7 +116,7 @@
 #define SI_TF_FASTACK	0x02	// set fast ack on for each connection
 
 #ifndef _SI_ERRNO
-extern int SIerrno;               //  error number set by public routines 
+extern int SIerrno;               //  error number set by public routines
 #define _SI_ERRNO
 #endif
 
diff --git a/src/rmr/si/src/sr_si_static.c b/src/rmr/si/src/sr_si_static.c
index 40ae61b..3919621 100644
--- a/src/rmr/si/src/sr_si_static.c
+++ b/src/rmr/si/src/sr_si_static.c
@@ -41,9 +41,9 @@
 	if( label ) {
 		fprintf( stderr, ">>>>> %s p=%p %d bytes\n", label, p, n );
 	}
-	
+
 	rows = (n/16) + ((n % 16) ? 1 : 0);
-	
+
 	for( j = 0; j < rows; j++ ) {
 		fprintf( stderr, "%04x: ", j * 16 );
 
@@ -59,7 +59,7 @@
 	backwards compatability.
 */
 static void dump_40( char *p, char* label ) {
-	dump_n( p, label, 40 ); 
+	dump_n( p, label, 40 );
 }
 
 /*
@@ -70,7 +70,7 @@
 
 	The addition of the connection shut error code to the switch requires
 	that the NNG version at commit e618abf8f3db2a94269a (or after) be
-	used for compiling RMR. 
+	used for compiling RMR.
 */
 static inline int xlate_si_state( int state, int def_state ) {
 
@@ -111,6 +111,25 @@
 }
 
 /*
+	Given a message size and a buffer (assumed to be TP_SZFIELD_LEN or larger)
+	this will put in the size such that it is compatable with old versions
+	of RMR (that expect the message size to not be in network byte order)
+	and with new versions that do. See extract function in mt_call_si_static.c
+	for details on what ends up in the buffer.
+*/
+static inline void insert_mlen( uint32_t len, char* buf ) {
+	uint32_t* blen;							// pointer into buffer where we'll add the len
+
+	blen = (uint32_t *) buf;				// old systems expect an unconverted integer
+	*blen = len;
+
+	blen++;
+	*blen = htonl( len );					// new systems want a converted integer
+
+	buf[TP_SZFIELD_LEN-1] = TP_SZ_MARKER;	// marker to flag this is generated by a new message
+}
+
+/*
 	Alloc a new nano zero copy buffer and put into msg. If msg is nil, then we will alloc
 	a new message struct as well. Size is the size of the zc buffer to allocate (not
 	including our header). If size is 0, then the buffer allocated is the size previously
@@ -161,13 +180,13 @@
 		abort( );											// toss out a core file for this
 	}
 
-#ifdef DEBUG
-	memset( msg->tp_buf, 0, mlen );    // NOT for production (debug only)	valgrind will complain about uninitalised use if we don't set
-	memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", TP_HDR_LEN );		// NOT for production -- debugging eyecatcher
-#endif
+	if( DEBUG ) {
+		// for speed we don't do this in production; for testing valgrind will complain about uninitialised use if not set
+		memset( msg->tp_buf, 0, mlen );
+		memcpy( msg->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!==", 34 );		// do NOT use a $ in this string!
+	}
 
-	alen = (int *) msg->tp_buf;
-	*alen = mlen;						// FIX ME: need a stuct to go in these first bytes, not just dummy len
+	insert_mlen( (uint32_t) mlen, msg->tp_buf );			// this will likely be overwriten on send to shirnk
 
 	msg->header = ((char *) msg->tp_buf) + TP_HDR_LEN;
 	memset( msg->header, 0, sizeof( uta_mhdr_t ) );				// ensure no junk in the header area
@@ -391,10 +410,12 @@
 		rmr_vlog( RMR_VL_CRIT, "rmr_clone: cannot get memory for zero copy buffer: %d\n", ENOMEM );
 		exit( 1 );
 	}
-	memset( nm->tp_buf, 0, tpb_len );
-	memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!**", 34 );		// DEBUGGING
-	alen = (int *) nm->tp_buf;
-	*alen = tpb_len;						// FIX ME: need a stuct to go in these first bytes, not just dummy len
+	if( DEBUG ) {
+		memset( nm->tp_buf, 0, tpb_len );
+		memcpy( nm->tp_buf, "@@!!@@!!@@!!@@!!@@!!@@!!@@!!@@!!==", 34 );		// DEBUGGING do NOT use $ in this string!!
+	}
+
+	insert_mlen( (uint32_t) tpb_len, nm->tp_buf );			// this len will likely be reset on send to shrink
 
 	nm->header = ((char *) nm->tp_buf) + TP_HDR_LEN;
 
@@ -436,7 +457,7 @@
 }
 
 /*
-	Realloc the message such that the payload is at least payload_len bytes.  
+	Realloc the message such that the payload is at least payload_len bytes.
 	The clone and copy options affect what portion of the original payload is copied to
 	the reallocated message, and whether or not the original payload is lost after the
 	reallocation process has finished.
@@ -459,7 +480,7 @@
 		clone == false
 		The old payload will be lost after reallocation. The message buffer pointer which
 		is returned will likely reference the same structure (don't depend on that).
-		
+
 
 	CAUTION:
 	If the message is not a message which was received, the mtype, sub-id, length values in the
@@ -520,7 +541,7 @@
 	omhdr = old_msg->header;
 	mlen = hdr_len + (payload_len > old_psize ? payload_len : old_psize);		// must have larger in case copy is true
 
-	if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen );	
+	if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "reallocate for payload increase. new message size: %d\n", (int) mlen );
 	if( (nm->tp_buf = (char *) malloc( sizeof( char ) * mlen )) == NULL ) {
 		rmr_vlog( RMR_VL_CRIT, "rmr_realloc_payload: cannot get memory for zero copy buffer. bytes requested: %d\n", (int) mlen );
 		free( nm );
@@ -580,7 +601,7 @@
 	Called by rmr_send_msg() and rmr_rts_msg(), etc. and thus we assume that all pointer
 	validation has been done prior.
 
-	When msg->state is not ok, this function must set tp_state in the message as some API 
+	When msg->state is not ok, this function must set tp_state in the message as some API
 	fucntions return the message directly and do not propigate errno into the message.
 */
 static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock, int retries ) {
@@ -615,7 +636,7 @@
 		if( tot_len > msg->alloc_len ) {
 			tot_len = msg->alloc_len;									// likely bad length from user :(
 		}
-		*((int*) msg->tp_buf) = tot_len;
+		insert_mlen( tot_len, msg->tp_buf );	// shrink to fit
 
 		if( DEBUG > 1 ) rmr_vlog( RMR_VL_DEBUG, "send_msg: ending %d (%x) bytes  usr_len=%d alloc=%d retries=%d\n", tot_len, tot_len, msg->len, msg->alloc_len, retries );
 		if( DEBUG > 2 ) dump_40( msg->tp_buf, "sending" );
@@ -679,8 +700,8 @@
 	message type is used.  If the initial lookup, with a subid, fails, then a
 	second lookup using just the mtype is tried.
 
-	When msg->state is not OK, this function must set tp_state in the message as 
-	some API fucntions return the message directly and do not propigate errno into 
+	When msg->state is not OK, this function must set tp_state in the message as
+	some API fucntions return the message directly and do not propigate errno into
 	the message.
 
 	CAUTION: this is a non-blocking send.  If the message cannot be sent, then
@@ -761,7 +782,7 @@
 				if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
 				msg->flags |= MFL_NOALLOC;								// keep send from allocating a new message; we have a clone to use
 				msg = send_msg( ctx, msg, nn_sock, max_to );			// do the hard work, msg should be nil on success
-	
+
 				if( msg != NULL ) {										// returned message indicates send error of some sort
 					rmr_free_msg( msg );								// must ditchone; pick msg so we don't have to unfiddle flags
 					msg = clone_m;
@@ -773,7 +794,7 @@
 				msg = send_msg( ctx, msg, nn_sock, max_to );			// send the last, and allocate a new buffer; drops the clone if it was
 				if( DEBUG ) {
 					if( msg == NULL ) {
-						rmr_vlog( RMR_VL_DEBUG, "mtosend_msg:  send returned nil message!\n" );		
+						rmr_vlog( RMR_VL_DEBUG, "mtosend_msg:  send returned nil message!\n" );
 					}
 				}
 			}
@@ -783,7 +804,7 @@
 					case RMR_OK:
 						ep->scounts[EPSC_GOOD]++;
 						break;
-				
+	
 					case RMR_ERR_RETRY:
 						ep->scounts[EPSC_TRANS]++;
 						break;
@@ -806,9 +827,9 @@
 		if( ok_sends ) {				// multiple rr-groups and one was successful; report ok
 			msg->state = RMR_OK;
 		}
-	
+
 		if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "final send stats: ok=%d group=%d state=%d\n", ok_sends, group, msg->state );
-	
+
 		msg->tp_state = errno;
 	}
 
diff --git a/test/app_test/ex_rts_receiver.c b/test/app_test/ex_rts_receiver.c
index e2a73e6..287143f 100644
--- a/test/app_test/ex_rts_receiver.c
+++ b/test/app_test/ex_rts_receiver.c
@@ -100,6 +100,8 @@
 	char	osrc[128];					// src strings to test when cloning
 	char	nsrc[128];
 	int		verbose = 0;
+	int		old_len;
+	int		dmb_size = 2048;			// default message buffer size
 
 	data = getenv( "RMR_RTG_SVC" );
 	if( data == NULL ) {
@@ -135,15 +137,20 @@
 
 	memset( count_bins, 0, sizeof( count_bins ) );
 
-	fprintf( stderr, "<EXRCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
+	fprintf( stderr, "<EXRCVR> listening on port %s for a max of 20 seconds\n", listen_port );
 	fprintf( stderr, "<EXRCVR> copy=%d clone=%d\n", copy, clone );
 
+	if( ! clone ) {
+		dmb_size = 128;			// not cloning, force small buffer to test larger buffer receives
+	}
+
+	fprintf( stderr, "<EXRCVR> default receive message buffer size: %d\n", dmb_size );
 #ifdef MTC
 	fprintf( stderr, "<EXRCVR> starting in multi-threaded mode\n" );
-	mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL ); // start RMr in mt-receive mode
+	mrc = rmr_init( listen_port, dmb_size, RMRFL_MTCALL ); // start RMr in mt-receive mode
 #else
 	fprintf( stderr, "<EXRCVR> starting in direct receive mode\n" );
-	mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE );	// start your engines!
+	mrc = rmr_init( listen_port, dmb_size, RMRFL_NONE );	// start your engines!
 #endif
 	if( mrc == NULL ) {
 		fprintf( stderr, "<EXRCVR> ABORT:  unable to initialise RMr\n" );
@@ -180,18 +187,22 @@
 				}
 
 				if( clone ) {
-					need = rmr_payload_size( msg );
+					need = msg->len;							// clone what was sent, not entire payload
 				} else {
 					need = generate_payload( ack_header, ack_data, 0, 0 );		// create an ack w/ random payload in payload, and set data in header
 				}
 
 				if( clone || rmr_payload_size( msg ) < need ) {					// received message too small or we want to clone the original for test
+					old_len = msg->len;
 					resized++;
 					omsg = msg;
 					rmr_get_src( omsg, osrc );
 					msg = rmr_realloc_payload( msg, need, copy, clone );		// reallocate the message with a payload big enough or clone if set
 					rmr_get_src( msg, nsrc );
 
+					if( msg->len != old_len ) {
+						fprintf( stderr, "[ERR] after realloc len=%d didn't match old len=%d\n", msg->len, old_len );
+					}
 					if( strcmp( osrc, nsrc ) != 0 ) {
 						fprintf( stderr, "[ERR] realloc source strings don't match (%s) new=(%s)\n", osrc, nsrc );
 					} else {
diff --git a/test/app_test/run_all.sh b/test/app_test/run_all.sh
index 51747fb..4fef41d 100644
--- a/test/app_test/run_all.sh
+++ b/test/app_test/run_all.sh
@@ -186,5 +186,5 @@
 fi
 
 
-fm -fr goober_dir
+rm -fr goober_dir
 exit $(( !! errors ))
diff --git a/test/app_test/run_exrts_test.sh b/test/app_test/run_exrts_test.sh
index 9f0a436..5cb6e8c 100644
--- a/test/app_test/run_exrts_test.sh
+++ b/test/app_test/run_exrts_test.sh
@@ -38,11 +38,22 @@
 
 ulimit -c unlimited
 
+# driven with -L sender or -L receiver on the command line
+# run something though valgrind to check for leaks; requires valgind
+function leak_anal {
+	valgrind  -v --leak-resolution=high --leak-check=yes $opt "$@"
+}
+
 # The sender and receivers are run asynch. Their exit statuses are captured in a
 # file in order for the 'main' to pick them up easily.
 #
 function run_sender {
-	./v_sender${si} ${nmsg:-10} ${delay:-100000} ${mtype_start_stop:-0:1} 
+	if (( la_sender ))
+	then
+		leak_anal ./v_sender${si} ${nmsg:-10} ${delay:-100000} ${mtype_start_stop:-0:1}  >/tmp/la.log 2>&1
+	else
+		./v_sender${si} ${nmsg:-10} ${delay:-100000} ${mtype_start_stop:-0:1}
+	fi
 	echo $? >/tmp/PID$$.src		# must communicate state back via file b/c asynch
 }
 
@@ -52,7 +63,12 @@
 
 	port=$(( 4460 + ${1:-0} ))
 	export RMR_RTG_SVC=$(( 9990 + $1 ))
-	./ex_rts_receiver${si} $copyclone -p $port
+	if (( la_receiver ))
+	then
+		leak_anal ./ex_rts_receiver${si} $copyclone -p $port >/tmp/la.log 2>&1
+	else
+		./ex_rts_receiver${si} $copyclone -p $port
+	fi
 	echo $? >/tmp/PID$$.$1.rrc
 }
 
@@ -114,6 +130,7 @@
 		-B)	rebuild=1;;
 		-d)	delay=${2//,/}; shift;;				# delay in micro seconds allow 1,000 to make it easier on user
 		-i)	use_installed=1;;
+		-L)	leak_anal=$2; shift;;
 		-m)	mtype_start_stop="$2"; shift;;
 		-M)	mt_call_flag="EX_CFLAGS=-DMTC=1";;	# turn on mt-call receiver option
 		-N)	si="";;								# enable nng based testing
@@ -126,6 +143,7 @@
 			echo "usage: $0 [-B] [-c caller-threads] [-d micor-sec-delay] [-i] [-M] [-m mtype] [-n num-msgs] [-r num-receivers] [-S] [-v]"
 			echo "  -B forces a rebuild which will use .build"
 			echo "  -i will use installed libraries (/usr/local) and cause -B to be ignored if supplied)"
+			echo "  -L {sender|receiver} run the sender or recevier code under valgrind for leak analysis (output to /tmp/la.log)"
 			echo "  -m mtype  will set the stopping (max) message type; sender will loop through 0 through mtype-1"
 			echo "  -m start:stop  will set the starting and stopping mtypes; start through stop -1"
 			echo "  -M enables mt-call receive processing to test the RMR asynch receive pthread"
@@ -141,6 +159,11 @@
 	shift
 done
 
+# set leak analysis (do not do this from any automated tests)
+case $leak_anal in 
+	s*)	la_sender=1;;
+	r*)	la_receiver=1;;
+esac
 
 if (( verbose ))
 then
diff --git a/test/lg_buf_static_test.c b/test/lg_buf_static_test.c
new file mode 100644
index 0000000..a74e080
--- /dev/null
+++ b/test/lg_buf_static_test.c
@@ -0,0 +1,75 @@
+// : vi ts=4 sw=4 noet :
+/*
+==================================================================================
+	    Copyright (c) 2020 Nokia
+	    Copyright (c) 2020 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+	   http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+	Mmemonic:	rmr_si_lgb_static_test.c
+	Abstract:	These are specific tests to exercise the ability to receive
+				a buffer larger than what was specified as the normal max
+				on the init call.
+
+	Author:		E. Scott Daniels
+	Date:		9 April 2020
+*/
+
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <strings.h>
+#include <errno.h>
+#include <string.h>
+#include <stdint.h>
+#include <pthread.h>
+#include <semaphore.h>
+
+#include "rmr.h"
+#include "rmr_agnostic.h"
+
+static int rmr_lgbuf_test( ) {
+	int		errors = 0;
+	void*	rmc;				// route manager context
+	rmr_mbuf_t*	msg;			// message buffers
+
+	if( (rmc = rmr_init( "4560", 128, FL_NOTHREAD )) == NULL ) {		// use a short "normal max" value
+		fail_if_nil( rmc, "rmr_init returned a nil pointer when trying to initialise a short normal max"  );
+		return 1;
+	}
+
+	gen_rt( rmc );							// dummy route table so the send works
+	msg = rmr_alloc_msg( rmc, 4024 );		// payload size, and the msg len must be larger than 128
+	msg->len = 4000;
+	msg->mtype = 1;
+	msg->state = 999;
+	snprintf( msg->payload, msg->len, "Rambling Wreck from Georgia Tech!\n\n" );
+	msg = rmr_send_msg( rmc, msg );
+	if( msg && msg->state != RMR_OK ) {
+		printf( stderr, "[ERR] lg buf test: send failed? %d\n", msg->state );
+	}
+	rmr_free_msg( msg );
+
+	msg = rmr_torcv_msg( rmc, NULL, 2000 );
+	errors += fail_if( msg == NULL, "rmr_rcv_msg returned nil msg when given nil msg "  );
+	if( msg ) {
+		errors += fail_not_equal( msg->state, RMR_OK, "receive given nil message did not return msg with good state"  );
+
+		errors += fail_if_false( msg->len > 128, "expected larger message than the small buffer" );
+	}
+
+	return errors;
+}
diff --git a/test/rmr_si_test.c b/test/rmr_si_test.c
index 9f49530..aa1b6ab 100644
--- a/test/rmr_si_test.c
+++ b/test/rmr_si_test.c
@@ -55,6 +55,7 @@
 #include <semaphore.h>
 
 #define DEBUG 1
+#define PARANOID_CHECKS	1					// must have parinoid testing on to not fail on nil pointer tests
 
 											// specific test tools in this directory
 #undef NNG_UNDER_TEST 
@@ -73,6 +74,7 @@
 #include "rmr_si.c"
 #include "mbuf_api.c"
 
+
 static void gen_rt( uta_ctx_t* ctx );		// defined in sr_si_static_test, but used by a few others (eliminate order requirement below)
 
 											// and finally....
@@ -83,6 +85,7 @@
 #include "wormhole_static_test.c"
 #include "mbuf_api_static_test.c"
 #include "sr_si_static_test.c"
+#include "lg_buf_static_test.c"
 
 #include "rmr_si_api_static_test.c"
 
@@ -95,6 +98,10 @@
 
 	rmr_set_vlevel( 5 );			// enable all debugging
 
+	fprintf( stderr, "\n<INFO> starting lg buffer tests (%d)\n", errors );
+	errors += rmr_lgbuf_test();
+	fprintf( stderr, "<INFO> error count: %d\n", errors );
+
 	fprintf( stderr, "\n<INFO> starting ring tests (%d)\n", errors );
 	errors += ring_test();
 	fprintf( stderr, "<INFO> error count: %d\n", errors );
diff --git a/test/test_si95_em.c b/test/test_si95_em.c
index 6dc33cc..c2cf4cb 100644
--- a/test/test_si95_em.c
+++ b/test/test_si95_em.c
@@ -96,7 +96,7 @@
 void *em_cb_data = NULL;
 static void em_sicbreg( struct ginfo_blk *gptr, int type, int ((*fptr)()), void * dptr ) {
 	if( em_cb_data == NULL ) {
-		fprintf( stderr, "<SIEM> calldback dptr saved for type %d\n", type );
+		fprintf( stderr, "<SIEM> calldback dptr %p saved for type %d\n", dptr, type );
 		em_cb_data = dptr;
 	}
 	return;