fix(rtable): Prevent direct send hairpins

If the current node is listed as an endpoint for a message type
that this process would add to the route table, a hairpin loop
back to the node can happen. This isn't desirable, and this
change prevents the current node from being added as a
recipient endpoint to any route table entry.

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I5a1b2bdaad2eab499ae5a1c7430238c1da2f3256

Add latency call/receive test support

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I70be670d063adfdf6505431a4a2ce72e846df0a4

Actual mods to implement hairpin loop avoidance

Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: Ieead4fbf69ade58db1f37d1949cf0f03683c64de
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
diff --git a/test/app_test/Makefile b/test/app_test/Makefile
index 07748ee..67acb9f 100644
--- a/test/app_test/Makefile
+++ b/test/app_test/Makefile
@@ -55,6 +55,9 @@
 mt_receiver: receiver.c
 	gcc -I $${C_INCLUDE_PATH:-.} -DMTC $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
 
+lreceiver: lreceiver.c
+	gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
+
 sender_nano: sender.c
 	gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr -lnanomsg -lpthread -lm
 
@@ -64,6 +67,10 @@
 caller: caller.c
 	gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
 
+lcaller: lcaller.c
+	gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@  -lrmr_nng -lnng -lpthread -lm
+
+
 
 # clean removes intermediates; nuke removes everything that can be built
 .PHONY: clean nuke
diff --git a/test/app_test/lcaller.c b/test/app_test/lcaller.c
new file mode 100644
index 0000000..0d246d8
--- /dev/null
+++ b/test/app_test/lcaller.c
@@ -0,0 +1,458 @@
+// :vim ts=4 sw=4 noet:
+/*
+==================================================================================
+	Copyright (c) 2019 Nokia
+	Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+	   http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+	Mnemonic:	lcaller.c
+	Abstract:	This is a simple sender which will send a series of messages using
+				rmr_call().   Similar to caller.c the major difference is that
+				a timestamp is placed into the message and the receiver is expected
+				to add a timestamp before executing an rts call.  We can then
+				compute the total round trip latency as well as the forward send
+				latency.
+
+				Overall, N threads are started each sending the desired number
+				of messages and expecting an 'ack' for each. Each ack is examined
+				to verify that the thread id placed into the message matches (meaning
+				that the ack was delivered by RMr to the correct thread's chute).
+
+				The message format is 'binary' defined by the lc_msg struct.
+
+				Parms:	argv[1] == number of msgs each thread sends (default 10)
+						argv[2] == delay between sends (microseconds, default 100000)
+						argv[3] == number of threads (default 3)
+						argv[4] == listen port (default 43086)
+
+				Sender will send for at most 20 seconds, so if nmsgs and delay extend
+				beyond that period the total number of messages sent will be less
+				than n.
+
+	Date:		18 April 2019
+	Author:		E. Scott Daniels
+*/
+
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/epoll.h>
+#include <time.h>
+#include <pthread.h>
+
+
+#include <rmr/rmr.h>
+
+#define TRACE_SIZE 40		// bytes in header to provide for trace junk
+#define	SUCCESS		(-1)
+
+/*
+	Thread data
+*/
+typedef struct tdata {
+	int		id;					// the id we'll pass to RMr mt-call function NOT the thread id
+	int		n2send;				// number of messages to send
+	int		delay;				// microsecond delay between messages (given to usleep)
+	void*	mrc;				// RMr context
+	int		state;				// 1 while the thread runs; the thread sets SUCCESS when it finishes
+	int*	in_bins;			// latency count bins, receiver back to caller
+	int*	out_bins;			// latency count bins, caller out to receiver
+	int		nbins;				// number of bins allocated
+	long long in_max;			// largest inbound latency seen (nanoseconds)
+	long long out_max;			// largest outbound latency seen (nanoseconds)
+	int		out_oor;			// out of range count
+	int		in_oor;
+	int		in_bcount;				// total messages tracked in bins
+	int		out_bcount;				// total messages tracked in bins
+} tdata_t;
+
+
+/*
+	The message type placed into the payload.
+*/
+typedef struct lc_msg {
+	struct timespec out_ts;			// time just before call executed
+	struct timespec turn_ts;		// time at the receiver,  on receipt
+	struct timespec in_ts;			// time received back by the caller
+	int		out_retries;			// number of retries the caller needed to send
+	int		turn_retries;			// number of retries the receiver needed to return the message
+} lc_msg_t;
+
+// --------------------------------------------------------------------------------
+
+
+static int sum( char* str ) {
+	int total = 0;				// each char value plus its offset is added in
+	int	pos;
+
+	for( pos = 0; str[pos]; pos++ ) {
+		total += str[pos] + pos;
+	}
+
+	return total % 255;
+}
+
+/*
+	Write a one line latency summary to stderr for the outbound (out != 0) or
+	inbound (out == 0) bins in td. Each bin is a 100th of a ms wide. When hist
+	is set, the count in every bin is listed ahead of the summary.
+*/
+static void print_stats( tdata_t* td, int out, int hist ) {
+	int sum = 0;				// sum of latencies (bin number weighted by the count in the bin)
+	int csum = 0;				// cutoff sum
+	int i95 = 0;				// bin for the 95th count
+	int i99 = 0;				// bin for the 99th count
+	int mean = -1;				// stays -1 when no message landed in a bin
+	int cutoff_95;				// 95% of total messages
+	int cutoff_99;				// 99% of total messages
+	int	oor;					// messages with a latency beyond the last bin
+	int bcount;					// messages which were counted in a bin
+	int* bins;					// the bins being reported on
+	long long max;				// nanoseconds; same width as the tdata field so nothing is lost
+	int j;
+
+	if( out ) {
+		bins = td->out_bins;
+		bcount = td->out_bcount;
+		oor = td->out_oor;
+		max = td->out_max;
+	} else {
+		bins = td->in_bins;
+		bcount = td->in_bcount;
+		oor = td->in_oor;
+		max = td->in_max;
+	}
+
+	cutoff_95 = .95 * (oor + bcount);
+	cutoff_99 = .99 * (oor + bcount);		// had been .95, which made the 99th always equal the 95th
+
+	// i95/i99 count the bins visited before the running total reaches each cutoff
+	for( j = 0; j < td->nbins; j++ ) {
+		if( csum < cutoff_95 ) {
+			i95++;
+		}
+		if( csum < cutoff_99 ) {
+			i99++;
+		}
+
+		csum += bins[j];
+		sum += bins[j] * j;
+	}
+
+	if( bcount ) {
+		mean = sum / bcount;
+	}
+
+	if( hist ) {
+		for( j = 0; j < td->nbins; j++ ) {
+			fprintf( stderr, "%3d %d\n", j, bins[j] );
+		}
+	}
+
+	// max is in nanoseconds; mean and the percentile bins are in 100ths of a ms
+	fprintf( stderr, "%s: oor=%d max=%.2fms  mean=%.2fms  95th=%.2fms 99th=%.2f\n", 
+		out ? "out" : " in", oor, (double)max/1000000.0, (double)mean/100.0, (double) i95/100.0, i99/100.0 );
+}
+
+/*
+	Given a message, compute the out (caller to receiver) and in (receiver to
+	caller) latencies from its timestamps, track the max of each in td, and
+	count each in its bin (100ths of ms). A latency which is negative (e.g. the
+	clocks are out of step) or past the last bin is counted as out of range. When
+	PRINT is defined the latencies are written to stderr instead of being binned.
+*/
+static void compute_latency( tdata_t* td, lc_msg_t* lcm ) {
+	long long out;		// the three timestamps as nanoseconds
+	long long turn;
+	long long in;
+	double rtl;		// round trip latency
+	double outl;	// caller to receiver latency (out)
+	double inl;		// receiver to caller latency (in)
+	long long bin;	// kept wide so a huge difference cannot wrap into a valid index
+
+	if( lcm == NULL || td == NULL ) {
+		return;
+	}
+
+	out = ((long long) lcm->out_ts.tv_sec * 1000000000) + lcm->out_ts.tv_nsec;		// widen first; tv_sec may be 32 bits
+	in = ((long long) lcm->in_ts.tv_sec * 1000000000) + lcm->in_ts.tv_nsec;
+	turn = ((long long) lcm->turn_ts.tv_sec * 1000000000) + lcm->turn_ts.tv_nsec;
+
+	if( in - turn > td->in_max ) {
+		td->in_max = in - turn;
+	}
+	if( turn - out > td->out_max ) {
+		td->out_max = turn-out;
+	}
+
+	bin = (turn-out) / 10000;			// 100ths of ms
+
+#ifdef PRINT
+	outl = ((double) turn - out) / 1000000.0;		// convert to ms
+	inl = ((double) in - turn) / 1000000.0;
+	rtl = ((double) in - out) / 1000000.0;
+	fprintf( stderr, "outl = %5.3fms   inl = %5.3fms  rtl = %5.3fms bin=%d\n", outl, inl, rtl, (int) bin );
+#else
+	if( bin >= 0 && bin < td->nbins ) {		// a negative bin would write ahead of the array
+		td->out_bins[bin]++;
+		td->out_bcount++;
+	} else {
+		td->out_oor++;
+	}
+
+	bin = (in - turn) / 10000;			// 100ths of ms
+	if( bin >= 0 && bin < td->nbins ) {
+		td->in_bins[bin]++;
+		td->in_bcount++;
+	} else {
+		td->in_oor++;
+	}
+#endif
+}
+
+/*
+	Executed as a thread, this puppy will generate calls to ensure that we get the
+	response back to the right thread, that we can handle threads, etc.
+
+	data must point at this thread's tdata_t. One timestamped lc_msg_t is sent with
+	rmr_mt_call() per loop, up to n2send times, and each good response is handed to
+	compute_latency(). On exit control->state is SUCCESS, or 0 if the thread was
+	left without a send buffer. Always returns NULL.
+*/
+static void* mk_calls( void* data ) {
+	lc_msg_t*	lcm;						// pointer at the payload as a struct
+	tdata_t*	control;
+	rmr_mbuf_t*		sbuf;					// send buffer
+	int		count = 0;
+	int		ack_count = 0;
+	int		rt_count = 0;					// number of messages requiring a spin retry
+	int		drops = 0;
+	int		fail_count = 0;					// # of failure sends after first successful send
+	int		successful = 0;					// set to true after we have a successful send
+	char	xbuf[1024];						// build transaction string here
+	int		xaction_id = 1;
+
+	if( (control  = (tdata_t *) data) == NULL ) {
+		fprintf( stderr, "thread data was nil; bailing out\n" );
+		return NULL;						// nothing to send with and no state to update
+	}
+
+	sbuf = rmr_alloc_msg( control->mrc, 512 );	// alloc first send buffer; subsequent buffers allcoated on send
+
+	usleep( rand() % 777 );					// stagger starts a bit so that they all don't pile up on the first connections
+
+	while( sbuf != NULL && count < control->n2send ) {				// quit early if we are ever left without a buffer
+		lcm = (lc_msg_t *) sbuf->payload;
+
+		snprintf( xbuf, 200, "%31d", xaction_id );
+		xaction_id += control->id;
+		rmr_bytes2xact( sbuf, xbuf, 32 );
+
+		sbuf->mtype = 5;												// all go with the same type
+		sbuf->len =  sizeof( *lcm );
+		sbuf->state = RMR_OK;
+		lcm->out_retries = 0;
+		lcm->turn_retries = 0;
+		clock_gettime( CLOCK_REALTIME, &lcm->out_ts );					// mark time out
+		sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 1000 );	// send it (send returns an empty payload on success, or the original payload on fail/retry)
+
+		if( sbuf && sbuf->state == RMR_ERR_RETRY ) {					// number of times we had to spin to send
+			rt_count++;
+		}
+		while( sbuf != NULL && sbuf->state == RMR_ERR_RETRY ) {				// send blocked; keep trying
+			lcm = (lc_msg_t *) sbuf->payload;								// count in the buffer that is about to be sent
+			lcm->out_retries++;
+			sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 100 );		// call and wait up to 100ms for a response
+		}
+
+		count++;
+		if( sbuf != NULL ) {
+			switch( sbuf->state ) {
+				case RMR_OK:														// we should have a buffer back from the sender here
+					lcm = (lc_msg_t *) sbuf->payload;
+					clock_gettime( CLOCK_REALTIME, &lcm->in_ts );					// mark time back
+					successful = 1;
+					compute_latency( control, lcm );
+					ack_count++;
+					break;
+
+				default:
+					fprintf( stderr, "unexpected error: tid=%d rmr-state=%d ernro=%d\n", control->id, sbuf->state, errno );
+					rmr_free_msg( sbuf );								// release the failed buffer before replacing it
+					sbuf = rmr_alloc_msg( control->mrc, 512 );			// allocate a sendable buffer
+					if( successful ) {
+						fail_count++;							// count failures after first successful message
+					} else {
+						// some error (not connected likely), don't count this
+						sleep( 1 );
+					}
+					break;
+			}
+		} else {
+			sbuf = rmr_alloc_msg( control->mrc, 512 );				// loop expects an subf
+			drops++;
+		}
+
+		if( control->delay > 0 ) {
+			usleep( control->delay );
+		}
+	}
+
+	control->state = sbuf != NULL ? SUCCESS : 0;		// main treats 0 as a failed thread
+	fprintf( stderr, "<THRD> %d finished sent %d, received %d  messages\n", control->id, count, ack_count );
+	return NULL;
+}
+
+/*
+	Parse the command line (see the flower box at the top), initialise RMr with
+	multi-threaded call support, start the caller threads, and then wait up to
+	20 seconds for them to finish while draining messages which arrive on the
+	normal receive path. Latency stats are written for each finished thread.
+	Returns 0 when no thread failed or timed out, 1 otherwise.
+*/
+int main( int argc, char** argv ) {
+	void* mrc;      						// msg router context
+	rmr_mbuf_t*	rbuf = NULL;				// received on 'normal' flow
+	struct	epoll_event epe;				// event definition for event to listen to
+	int     ep_fd = -1;						// epoll's file des (given to epoll_wait)
+	int		rcv_fd;    						// file des that NNG tickles -- give this to epoll to listen on
+	char*	listen_port = "43086";
+	long	timeout = 0;
+	int		delay = 100000;					// usec between send attempts
+	int		nmsgs = 10;						// number of messages to send
+	int		nthreads = 3;
+	tdata_t*	cvs;						// vector of control blocks
+	int			i;
+	pthread_t*	pt_info;					// thread stuff
+	int 	failures = 0;
+	int		pings = 0;						// number of messages received on normal channel
+
+	if( argc > 1 ) {
+		nmsgs = atoi( argv[1] );
+	}
+	if( argc > 2 ) {
+		delay = atoi( argv[2] );
+	}
+	if( argc > 3 ) {
+		nthreads = atoi( argv[3] );
+	}
+	if( argc > 4 ) {
+		listen_port = argv[4];
+	}
+
+	fprintf( stderr, "<CALL> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
+
+	if( (mrc = rmr_init( listen_port, 1400, RMRFL_MTCALL )) == NULL ) {		// initialise with multi-threaded call enabled
+		fprintf( stderr, "<CALL> unable to initialise RMr\n" );
+		exit( 1 );
+	}
+
+	// a usable receive fd is registered with epoll; without one the receive timeout is set to 0
+	if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) {							// epoll only available from NNG -- skip receive later if not NNG
+		if( (ep_fd = epoll_create1( 0 )) < 0 ) {
+			fprintf( stderr, "<CALL> [FAIL] unable to create epoll fd: %d\n", errno );
+			exit( 1 );
+		}
+		epe.events = EPOLLIN;
+		epe.data.fd = rcv_fd;
+
+		if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 )  {
+			fprintf( stderr, "<CALL> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
+			exit( 1 );
+		}
+	} else {
+		rmr_set_rtimeout( mrc, 0 );			// for nano we must set the receive timeout to 0; non-blocking receive
+	}
+
+
+	cvs = calloc( nthreads, sizeof( tdata_t ) );		// zeroed so the max, oor and bin counters start at 0
+	pt_info = malloc( sizeof( pthread_t ) * nthreads );
+	if( cvs == NULL || pt_info == NULL ) {
+		fprintf( stderr, "<CALL> unable to allocate control vector\n" );
+		exit( 1 );
+	}
+
+
+	timeout = time( NULL ) + 20;		// give rmr 20s to find the route table (shouldn't need that much)
+	while( ! rmr_ready( mrc ) ) {		// must have a route table before we can send; wait til RMr says it has one
+		fprintf( stderr, "<CALL> waiting for rmr to show ready\n" );
+		sleep( 1 );
+
+		if( time( NULL ) > timeout ) {
+			fprintf( stderr, "<CALL> giving up\n" );
+			exit( 1 );
+		}
+	}
+	fprintf( stderr, "<CALL> rmr is ready; starting threads\n" );
+
+	for( i = 0; i < nthreads; i++ ) {
+		cvs[i].mrc = mrc;
+		cvs[i].id = i + 2;				// we pass this as the call-id to rmr, so must be >1
+		cvs[i].delay = delay;
+		cvs[i].n2send = nmsgs;
+		cvs[i].state = 1;
+
+		cvs[i].nbins = 100;
+		cvs[i].out_bins = calloc( cvs[i].nbins, sizeof( int ) );
+		cvs[i].in_bins = calloc( cvs[i].nbins, sizeof( int ) );
+		if( cvs[i].out_bins == NULL || cvs[i].in_bins == NULL ) {
+			fprintf( stderr, "<CALL> unable to allocate latency bins\n" );
+			exit( 1 );
+		}
+
+		if( pthread_create( &pt_info[i], NULL, mk_calls, &cvs[i] ) != 0 ) {		// kick a thread
+			fprintf( stderr, "<CALL> unable to start thread %d\n", i );
+			cvs[i].state = 0;				// the wait loop counts state 0 as a failure
+		}
+	}
+
+	timeout = time( NULL ) + 20;
+	i = 0;
+	while( nthreads > 0 ) {
+		if( cvs[i].state < 1 ) {			// states 0 or below indicate done. 0 == failure, -n == success
+			print_stats( &cvs[i], 1, 0 );
+			print_stats( &cvs[i], 0, 0 );
+
+			nthreads--;
+			if( cvs[i].state == 0 ) {
+				failures++;
+			}
+			i++;
+		} else {
+			rbuf = rmr_torcv_msg( mrc, rbuf, 1000 );
+			if( rbuf != NULL && rbuf->state != RMR_ERR_RETRY ) {
+				pings++;
+				rmr_free_msg( rbuf );
+				rbuf = NULL;
+			}
+		}
+		if( time( NULL ) > timeout ) {
+			failures += nthreads;
+			fprintf( stderr, "<CALL> timeout waiting for threads to finish; %d were not finished\n", nthreads );
+			break;
+		}
+	}
+
+	fprintf( stderr, "<CALL> [%s] failing threads=%d  pings reeived=%d\n", failures == 0 ? "PASS" : "FAIL",  failures, pings );
+	sleep( 2 );
+	rmr_close( mrc );
+
+	return failures > 0;
+}
+
diff --git a/test/app_test/lreceiver.c b/test/app_test/lreceiver.c
new file mode 100644
index 0000000..161abc7
--- /dev/null
+++ b/test/app_test/lreceiver.c
@@ -0,0 +1,178 @@
+// :vim ts=4 sw=4 noet:
+/*
+==================================================================================
+	Copyright (c) 2019 Nokia
+	Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+	   http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+/*
+	Mnemonic:	lreceiver.c
+	Abstract:	This is a very simple receiver that listens for messages and
+				returns each to the sender after adding a timestamp to the 
+				payload.  The payload is expected to be lc_msg_t (see lcaller.c)
+				and this will update the 'turn' timestamp on receipt.
+
+				Define these environment variables to have some control:
+					RMR_SEED_RT -- path to the static routing table
+					RMR_RTG_SVC -- port to listen for RTG connections
+
+	Date:		18 April 2019
+	Author:		E. Scott Daniels
+*/
+
+#include <unistd.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <time.h>
+#include <string.h>
+
+#include <rmr/rmr.h>
+
+/*
+	The message type placed into the payload.
+*/
+typedef struct lc_msg {
+	struct timespec out_ts;			// time just before call executed
+	struct timespec turn_ts;		// time at the receiver,  on receipt
+	struct timespec in_ts;			// time received back by the caller
+	int		out_retries;			// number of retries the caller needed to send
+	int		turn_retries;			// number of retries the receiver needed to return the message
+} lc_msg_t;
+
+// ----------------------------------------------------------------------------------
+
+static int sum( char* str ) {
+	int acc = 0;				// char values, each with its offset added
+	int	off;
+
+	for( off = 0; str[off] != 0; off++ ) {
+		acc += str[off] + off;
+	}
+
+	return acc % 255;
+}
+
+/*
+	Find the first sep in str and return a pointer to the character which
+	follows it. When str has no sep, a complaint is written to stderr and
+	NULL is returned. The string itself is not modified.
+*/
+static char* split( char* str, char sep ) {
+	char*	hit;						// first separator, nil if there is none
+
+	hit = strchr( str, sep );
+	if( hit == NULL ) {
+		fprintf( stderr, "<RCVR> no pipe in message: (%s)\n", str );
+		return NULL;
+	}
+
+	return hit + 1;
+}
+
+/*
+	Listen for lc_msg_t messages, stamp each with the time of receipt and return
+	it to the sender. Gives up once 2 seconds pass without a good message.
+*/
+int main( int argc, char** argv ) {
+	void* mrc;      					// msg router context
+	lc_msg_t*	lmc;					// latency message type from caller
+	rmr_mbuf_t* msg = NULL;				// message received
+	char*	listen_port = "4560";
+	long	count = 0;						// total received
+	long	timeout = 0;
+	char*	data;
+	int		nmsgs = 10;					// number of messages to stop after (argv[1] overrides)
+	int		rt_count = 0;				// retry count
+	time_t	now;
+	int		active;
+
+	data = getenv( "RMR_RTG_SVC" );
+	if( data == NULL ) {
+		setenv( "RMR_RTG_SVC", "19289", 1 );		// set one that won't collide with the sender if on same host
+	}
+
+	if( argc > 1 ) {
+		nmsgs = atoi( argv[1] );
+	}
+	if( argc > 2 ) {
+		listen_port = argv[2];
+	}
+
+
+	fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
+
+	mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL );	// start your engines!
+	if( mrc == NULL ) {
+		fprintf( stderr, "<RCVR> ABORT:  unable to initialise RMr\n" );
+		exit( 1 );
+	}
+
+	timeout = time( NULL ) + 20;
+	while( ! rmr_ready( mrc ) ) {								// wait for RMr to load a route table
+		fprintf( stderr, "<RCVR> waiting for RMr to show ready\n" );
+		sleep( 1 );
+
+		if( time( NULL ) > timeout ) {
+			fprintf( stderr, "<RCVR> giving up\n" );
+			exit( 1 );
+		}
+	}
+	fprintf( stderr, "<RCVR> rmr now shows ready, listening begins\n" );
+
+	timeout = time( NULL ) + 2;			// once we start, we assume if we go 2s w/o a message that we're done
+	while( 1 ) {
+		active = 0;
+		msg = rmr_torcv_msg( mrc, msg, 1000 );				// pop every second or so to timeout if needed
+
+		if( msg ) {
+			if( msg->state == RMR_OK ) {
+				active = 1;											// only a real message pushes the idle timeout out
+				lmc = (lc_msg_t *) msg->payload;
+				clock_gettime( CLOCK_REALTIME, &lmc->turn_ts );		// mark time that we received it.
+				
+				msg = rmr_rts_msg( mrc, msg );
+				rt_count = 1000;
+				while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) {		// to work right in nano we need this :(
+					lmc = (lc_msg_t *) msg->payload;						// rts may have handed back a different buffer
+					lmc->turn_retries++;
+					if( count < 1 ) {										// 1st msg, so we need to connect, and we'll wait for that
+						sleep( 1 );
+					}
+					rt_count--;
+					msg = rmr_rts_msg( mrc, msg );							// we don't try to resend if this returns retry
+				}
+				count++;										// bumped after the retries so the 1st msg test above can be true
+			}
+		}
+
+		now = time( NULL );
+		if( now > timeout ) {
+			break;
+		}
+
+		if( active ) {
+			timeout = now + 2;
+		}
+	}
+
+	fprintf( stderr, "<RCVR> %ld is finished got %ld messages\n", (long) getpid(), count );
+
+	sleep( 3 );
+	rmr_close( mrc );
+	return 0;
+}
+
diff --git a/test/app_test/run_all.ksh b/test/app_test/run_all.ksh
index ffcd3e1..7271cfd 100644
--- a/test/app_test/run_all.ksh
+++ b/test/app_test/run_all.ksh
@@ -15,7 +15,7 @@
 done
 
 set -e
-ksh run_app_test.ksh $build
+ksh run_app_test.ksh -v -i $build
 ksh run_multi_test.ksh
 ksh run_rr_test.ksh
 ksh run_rts_test.ksh -s 20
diff --git a/test/app_test/run_app_test.ksh b/test/app_test/run_app_test.ksh
index ff0673f..964a080 100644
--- a/test/app_test/run_app_test.ksh
+++ b/test/app_test/run_app_test.ksh
@@ -27,9 +27,9 @@
 #				recevier then  run the basic test.
 #
 #				Example command line:
-#					ksh ./run		# default 10 messages at 1 msg/sec
-#					ksh ./run -N    # default but with nanomsg lib
-#					ksh ./run -d 100 -n 10000 # send 10k messages with 100ms delay between
+#					ksh ./run_app_test.ksh		# default 20 messages at 2 msg/sec
+#					ksh ./run_app_test.ksh -N    # default but with nanomsg lib
+#					ksh ./run_app_test.ksh -d 100 -n 10000 # send 10k messages with 100ms delay between
 #
 #	Date:		22 April 2019
 #	Author:		E. Scott Daniels
@@ -59,22 +59,61 @@
 	echo $? >/tmp/PID$$.rrc
 }
 
+# write the first v4 address listed by 'ip addr' which is not 127.0.0.1 (address only; the /prefix is cut off)
+function snarf_ip {
+	ip addr| sed -e '/inet /b c; d' -e ':c' -e '/127.0.0.1/d; s!/.*!!; s!^.* !!; q'
+}
+
+#	Drop a contrived route table in. This table should add a reference to our
+#	local IP to an entry to ensure that the route table collector code in RMr
+#	is removing 'hairpin' loops. If RMr isn't removing the references to our
+#	hostname and IP address when it builds the endpoint lists, the sender will
+#	send messages to itself some of the time, causing the receiver to come up
+#	short when comparing messages received with expected count and thus failing.
+#	Uses the global my_ip and my_host values; writes app_test.rt.
+function set_rt {
+	typeset port=4560			# port the receiver listens on by default
+
+	cat <<endKat >app_test.rt
+		newrt | start
+			mse | 0 |  0 | localhost:$port,$my_ip:43086
+			mse | 1 | 10 | localhost:$port,${my_host//.*/}:43086
+			mse | 2 | 20 | localhost:$port
+			rte | 3 | localhost:$port
+			mse | 3 | 100 | localhost:$port	# special test to ensure that this does not affect previous entry
+			rte | 4 | localhost:$port
+			rte | 5 | localhost:$port
+			rte | 6 | localhost:$port
+			rte | 7 | localhost:$port
+			rte | 8 | localhost:$port
+			rte | 9 | localhost:$port
+			rte | 10 | localhost:$port
+			rte | 11 | localhost:$port
+			rte | 12 | localhost:$port
+			rte | 13 | localhost:$port
+		newrt | end
+endKat
+
+	# nothing but route table records may sit between the cat and endKat; a
+	# stray command there is copied into the table rather than being run
+
+}
+
 # ---------------------------------------------------------
 
-if [[ ! -f local.rt ]]		# we need the real host name in the local.rt; build one from mask if not there
-then
-	hn=$(hostname)
-	sed "s!%%hostname%%!$hn!" rt.mask >local.rt
-fi
-
-nmsg=10						# total number of messages to be exchanged (-n value changes)
-delay=1000000				# microsec sleep between msg 1,000,000 == 1s
+nmsg=20						# total number of messages to be exchanged (-n value changes)
+							# need two sent to each receiver to ensure hairpin entries were removed (will fail if they were not)
+delay=500000				# microsec sleep between msg 1,000,000 == 1s
 nano_sender=0				# start nano version if set (-N)
 nano_receiver=0
 wait=1
 rebuild=0
 nopull=""					# -b sets so that build does not pull
 verbose=0
+use_installed=0
+my_ip=$(snarf_ip)			# get an ip to insert into the route table
+keep=0
+
 
 while [[ $1 == -* ]]
 do
@@ -82,6 +121,8 @@
 		-B)	rebuild=1;;						# build with pull first
 		-b)	rebuild=1; nopull="nopull";;	# buld without pull
 		-d)	delay=$2; shift;;
+		-k) keep=1;;
+		-i) use_installed=1;;
 		-N)	nano_sender=1
 			nano_receiver=1
 			;;
@@ -89,8 +130,12 @@
 		-v)	verbose=1;;
 
 		*)	echo "unrecognised option: $1"
-			echo "usage: $0 [-B] [-d micor-sec-delay] [-N] [-n num-msgs]"
+			echo "usage: $0 [-B] [-d micor-sec-delay] [-i] [-k] [-N] [-n num-msgs]"
 			echo "  -B forces a rebuild which will use .build"
+			echo "  -i causes the installd libraries (/usr/local) to be referenced; -B is ignored if supplied"
+			echo "  -k keeps the route table"
+			echo ""
+			echo "total number of messages must > 20 to correctly test hairpin loop removal"
 			exit 1
 			;;
 	esac
@@ -98,38 +143,54 @@
 	shift
 done
 
+if [[ ! -f app_test.rt ]]		# build the contrived route table (it needs our host name) unless one is already here
+then
+	my_host=$(hostname)
+	set_rt
+	if (( verbose ))
+	then
+		cat app_test.rt
+	fi
+fi
+
 if (( verbose ))
 then
 	echo "2" >.verbose
 	export RMR_VCTL_FILE=".verbose"
 fi
 
-if (( rebuild ))
-then
-	set -e
-	ksh ./rebuild.ksh $nopull | read build_path
-	set +e
-else
-	build_path=${BUILD_PATH:-"../../.build"}	# we prefer .build at the root level, but allow user option
 
-	if [[ ! -d $build_path ]]
+if (( use_installed ))			# point at installed library rather than testing the build
+then
+	export LD_LIBRARY_PATH=/usr/local/lib
+	export LIBRARY_PATH=$LD_LIBRARY_PATH
+else
+	if (( rebuild ))
 	then
-		echo "cannot find build in: $build_path"
-		echo "either create, and then build RMr, or set BUILD_PATH as an evironment var before running this"
-		exit 1
+		set -e
+		ksh ./rebuild.ksh $nopull | read build_path
+		set +e
+	else
+		build_path=${BUILD_PATH:-"../../.build"}	# we prefer .build at the root level, but allow user option
+	
+		if [[ ! -d $build_path ]]
+		then
+			echo "cannot find build in: $build_path"
+			echo "either create, and then build RMr, or set BUILD_PATH as an evironment var before running this"
+			exit 1
+		fi
 	fi
+
+	if [[ -d $build_path/lib64 ]]
+	then
+		export LD_LIBRARY_PATH=$build_path:$build_path/lib64
+	else
+		export LD_LIBRARY_PATH=$build_path:$build_path/lib
+	fi
+	export LIBRARY_PATH=$LD_LIBRARY_PATH
 fi
 
-if [[ -d $build_path/lib64 ]]
-then
-	export LD_LIBRARY_PATH=$build_path:$build_path/lib64
-else
-	export LD_LIBRARY_PATH=$build_path:$build_path/lib
-fi
-
-export LIBRARY_PATH=$LD_LIBRARY_PATH
-export RMR_SEED_RT=${RMR_SEED_RT:-./local.rt}		# allow easy testing with different rt
-
+export RMR_SEED_RT=${RMR_SEED_RT:-./app_test.rt}		# allow easy testing with different rt
 
 if [[ ! -f ./sender ]]
 then
@@ -141,7 +202,7 @@
 fi
 
 run_rcvr &
-sleep 2				# if sender starts faster than rcvr we can drop, so pause a bit
+sleep 2				# if sender starts faster than rcvr we can drop msgs, so pause a bit
 run_sender &
 
 wait
@@ -155,6 +216,11 @@
 	echo "[PASS] sender rc=$src  receiver rc=$rrc"
 fi
 
+if (( ! keep )) 
+then
+	rm app_test.rt
+fi
+
 rm /tmp/PID$$.*
 rm -f .verbose
 
diff --git a/test/app_test/run_lcall_test.ksh b/test/app_test/run_lcall_test.ksh
new file mode 100644
index 0000000..3d5d691
--- /dev/null
+++ b/test/app_test/run_lcall_test.ksh
@@ -0,0 +1,235 @@
+#!/usr/bin/env ksh
+# :vi ts=4 sw=4 noet :
+#==================================================================================
+#    Copyright (c) 2019 Nokia
+#    Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#==================================================================================
+#
+
+# ---------------------------------------------------------------------------------
+#	Mnemonic:	run_lcall_test.ksh
+#	Abstract:	This is a simple script to set up and run the basic send/receive
+#				processes for some library validation on top of nano/nng. This
+#				particular test starts the latency caller and latency receiver
+#				processes such that they exchange messages and track the latency
+#				from the caller's perepective (both outbound to receiver, and then
+#				back.  Stats are presented at the end.   This test is NOT intended
+#				to be used as a CI validation test.
+#
+#				The sender and receiver processes all have a 20s timeout (+/-)
+#				which means that all messages must be sent, and acked within that
+#				time or the processes will give up and report failure.  Keep in mind
+#				that n messages with a delay value (-d) set will affect whether or
+#				not the messages can be sent in the 20s timeout period.  There is
+#				currently no provision to adjust the timeout other than by changing
+#				the C source.  The default (100 msgs with 500 micro-sec delay) works
+#				just fine for base testing.
+#
+#				Example command line:
+#					# run with 10 caller threads sending 10,000 meessages each, 
+#					# 5 receivers, and a 10 mu-s delay between each caller send
+#					ksh ./run_lcall_test.ksh -d 10 -n 10000 -r 5 -c 10
+#
+#	Date:		28 May 2019
+#	Author:		E. Scott Daniels
+# ---------------------------------------------------------------------------------
+
+
+# Start the latency caller and leave its exit status in /tmp/PID$$.src. The
+# function is run asynchronously, so a file is the way the status gets back
+# to the main line code.
+function run_sender {
+	if (( ! $nano_sender ))
+	then
+		./lcaller ${nmsg:-10} ${delay:-500} ${cthreads:-3}
+	else
+		echo "nanomsg not supportded"
+		exit 1
+	fi
+	echo $? >/tmp/PID$$.src		# status of the caller; the only way back to main
+}
+
+# $1 is the receiver instance (default 0); it picks the listen port, the RTG
+# port, and the name of the file which carries the exit status back to main.
+function run_rcvr {
+	typeset inst=${1:-0}
+	typeset port=$(( 4460 + inst ))
+	export RMR_RTG_SVC=$(( 9990 + inst ))
+	if (( $nano_receiver ))
+	then
+		echo "nanomsg not supported"
+		exit 1
+	else
+		./lreceiver $(( ((nmsg * cthreads)/nrcvrs) + 10 )) $port
+	fi
+	echo $? >/tmp/PID$$.$inst.rrc
+}
+
+#	Drop a contrived route table (lcall.rt) in which lists $1 receivers
+#	(default 3), on consecutive ports starting at 4460, for every message type.
+function set_rt {
+	typeset port=4460
+	typeset i					# keep the loop index out of the caller's scope
+	typeset groups="localhost:$port"
+	for (( i=1; i < ${1:-3}; i++ ))
+	do
+		groups="$groups,localhost:$((port+i))"
+	done
+
+	cat <<endKat >lcall.rt
+		newrt | start
+		mse |0 | 0 | $groups
+		mse |1 | 10 | $groups
+		mse |2 | 20 | $groups
+		rte |3 | $groups
+		rte |4 | $groups
+		rte |5 | $groups
+		rte |6 | $groups
+		rte |7 | $groups
+		rte |8 | $groups
+		rte |9 | $groups
+		rte |10 | $groups
+		rte |11 | $groups
+		newrt | end
+endKat
+}
+
+# ---------------------------------------------------------
+
+# This test writes its own route table (lcall.rt, see set_rt) and points
+# RMR_SEED_RT at it, so local.rt is neither needed nor created here. Building
+# it from rt.mask would leave an empty local.rt behind if the mask is missing.
+#
+# defaults; most can be changed from the command line
+
+cthreads=3					# number of caller threads
+nmsg=100					# total number of messages to be exchanged (-n value changes)
+delay=500					# microsec sleep between msg 1,000,000 == 1s
+nano_sender=0				# start nano version if set (-N)
+nano_receiver=0
+wait=1
+rebuild=0
+verbose=0
+nrcvrs=3					# this is sane, but -r allows it to be set up
+use_installed=0
+
+while [[ $1 == -* ]]				# eat leading options; stops at the first arg which doesn't start with a dash
+do
+	case $1 in
+		-c)	cthreads=$2; shift;;		# number of caller threads
+		-B)	rebuild=1;;
+		-d)	delay=$2; shift;;			# microseconds between sends
+		-i)	use_installed=1;;
+		-N)	echo "abort: nanomsg does not support epoll and thus cannot be used for mt-caller"
+			echo ""
+			exit 1;
+			;;
+		-n)	nmsg=$2; shift;;			# messages per caller thread
+		-r)	nrcvrs=$2; shift;;			# number of receiver processes
+		-v)	verbose=1;;
+
+		*)	echo "unrecognised option: $1"
+			echo "usage: $0 [-B] [-c caller-threads] [-d micor-sec-delay] [-i] [-n num-msgs] [-r num-receivers]"
+			echo "  -B forces a rebuild which will use .build"
+			echo "  -i will use installed libraries (/usr/local) and cause -B to be ignored if supplied)"
+			exit 1
+			;;
+	esac
+
+	shift
+done
+
+if (( verbose ))				# presumably RMr reads its verbosity level from the named file -- confirm
+then
+	echo "2" >.verbose
+	export RMR_VCTL_FILE=".verbose"
+fi
+
+if (( use_installed ))			# point at installed library
+then
+	export LD_LIBRARY_PATH=/usr/local/lib
+	export LIBRARY_PATH=$LD_LIBRARY_PATH
+else
+	if (( rebuild ))
+	then
+		build_path=../../.build		# if we rebuild we can insist that it is in .build :)
+		set -e						# give up if the rebuild fails
+		ksh ./rebuild.ksh
+		set +e
+	else
+		build_path=${BUILD_PATH:-"../../.build"}	# we prefer .build at the root level, but allow user option
+
+		if [[ ! -d $build_path ]]
+		then
+			echo "cannot find build in: $build_path"
+			echo "either create, and then build RMr, or set BUILD_PATH as an evironment var before running this"
+			exit 1
+		fi
+	fi
+
+	if [[ -d $build_path/lib64 ]]		# use lib64 when the build made one, else lib
+	then
+		export LD_LIBRARY_PATH=$build_path:$build_path/lib64
+	else
+		export LD_LIBRARY_PATH=$build_path:$build_path/lib
+	fi
+fi
+
+export LIBRARY_PATH=$LD_LIBRARY_PATH
+export RMR_SEED_RT=./lcall.rt			# the table which set_rt writes
+
+set_rt $nrcvrs						# set up the rt for n receivers
+
+if (( rebuild )) || [[ ! -f ./lcaller ]] || [[ ! -f ./lreceiver ]]		# both binaries are needed
+then
+	if ! make -B lcaller lreceiver >/dev/null 2>&1
+	then
+		echo "[FAIL] cannot find lcaller binary, and cannot make it.... humm?"
+		exit 1
+	fi
+fi
+
+for (( i=0; i < nrcvrs; i++ ))		# start the receivers with an instance number
+do
+	run_rcvr $i &
+done
+
+sleep 2				# let receivers init so we don't shoot at an empty target
+run_sender &
+
+wait
+
+rrc=0								# sum of the receiver exit codes
+for (( i=0; i < nrcvrs; i++ ))		# collect return codes
+do
+	head -1 /tmp/PID$$.$i.rrc | read x
+	(( rrc += x ))
+done
+
+head -1 /tmp/PID$$.src | read src
+
+if (( !! (src + rrc) ))
+then
+	echo "[FAIL] sender rc=$src  receiver rc=$rrc"
+else
+	echo "[PASS] sender rc=$src  receiver rc=$rrc"
+	rm -f lcall.rt
+fi
+
+rm /tmp/PID$$.*
+rm -f .verbose
+
+exit $(( !! (src + rrc) ))
+