fix(rtable): Prevent direct send hairpins
If the current node is listed as an endpoint for a message type
that this process would add to the route table, a hairpin loop
back to the node can happen. This isn't desirable, and this
change will prevent the current node from being added as a
recipient endpoint to any route table entry.
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I5a1b2bdaad2eab499ae5a1c7430238c1da2f3256
Add latency call/receive test support
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I70be670d063adfdf6505431a4a2ce72e846df0a4
Actual mods to implement hairpin loop avoidance
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: Ieead4fbf69ade58db1f37d1949cf0f03683c64de
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
diff --git a/test/app_test/Makefile b/test/app_test/Makefile
index 07748ee..67acb9f 100644
--- a/test/app_test/Makefile
+++ b/test/app_test/Makefile
@@ -55,6 +55,9 @@
mt_receiver: receiver.c
gcc -I $${C_INCLUDE_PATH:-.} -DMTC $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
+lreceiver: lreceiver.c
+ gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
+
sender_nano: sender.c
gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr -lnanomsg -lpthread -lm
@@ -64,6 +67,10 @@
caller: caller.c
gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
+lcaller: lcaller.c
+ gcc -I $${C_INCLUDE_PATH:-.} $< -g -o $@ -lrmr_nng -lnng -lpthread -lm
+
+
# clean removes intermediates; nuke removes everything that can be built
.PHONY: clean nuke
diff --git a/test/app_test/lcaller.c b/test/app_test/lcaller.c
new file mode 100644
index 0000000..0d246d8
--- /dev/null
+++ b/test/app_test/lcaller.c
@@ -0,0 +1,458 @@
+// :vim ts=4 sw=4 noet:
+/*
+==================================================================================
+ Copyright (c) 2019 Nokia
+ Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+
+/*
+ Mnemonic: lcaller.c
+ Abstract: This is a simple sender which will send a series of messages using
+ rmr_call(). Similar to caller.c the major difference is that
+ a timestamp is placed into the message and the receiver is expected
+ to add a timestamp before executing an rts call. We can then
+ compute the total round trip latency as well as the forward send
+ latency.
+
+ Overall, N threads are started each sending the desired number
+ of messages and expecting an 'ack' for each. Each ack is examined
+ to verify that the thread id placed into the message matches (meaning
+ that the ack was delivered by RMr to the correct thread's chute.
+
+ The message format is 'binary' defined by the lc_msg struct.
+
+ Parms: argv[1] == number of msgs to send (10)
+				argv[2] == delay (mu-seconds, 100000 default)
+ argv[3] == number of threads (3)
+ argv[4] == listen port
+
+ Sender will send for at most 20 seconds, so if nmsgs and delay extend
+ beyond that period the total number of messages sent will be less
+ than n.
+
+ Date: 18 April 2019
+ Author: E. Scott Daniels
+*/
+
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/epoll.h>
+#include <time.h>
+#include <pthread.h>
+
+
+#include <rmr/rmr.h>
+
+#define TRACE_SIZE 40 // bytes in header to provide for trace junk
+#define SUCCESS (-1)
+
+/*
+ Thread data
+*/
+typedef struct tdata {
+ int id; // the id we'll pass to RMr mt-call function NOT the thread id
+ int n2send; // number of messages to send
+ int delay; // usec delay between messages (passed to usleep())
+ void* mrc; // RMr context
+ int state; // set to 1 by main before the thread starts; the thread sets SUCCESS when it finishes
+ int* in_bins; // latency count bins, receiver to caller direction
+ int* out_bins; // latency count bins, caller to receiver direction
+ int nbins; // number of bins allocated
+ long long in_max; // largest receiver to caller latency seen (ns)
+ long long out_max; // largest caller to receiver latency seen (ns)
+ int out_oor; // out of range count
+ int in_oor;
+ int in_bcount; // total messages tracked in bins
+ int out_bcount; // total messages tracked in bins
+} tdata_t;
+
+
+/*
+ The message type placed into the payload.
+*/
+typedef struct lc_msg {
+ struct timespec out_ts; // time just before call executed
+ struct timespec turn_ts; // time at the receiver, on receipt
+ struct timespec in_ts; // time received back by the caller
+ int out_retries; // number of retries the caller needed to send the message
+ int turn_retries; // number of retries the receiver needed to return the message
+} lc_msg_t;
+
+// --------------------------------------------------------------------------------
+
+// Fold str into a small value: each character plus its position is added and the total is reduced mod 255.
+static int sum( char* str ) {
+ int sum = 0;
+ int i = 0;
+
+ while( *str ) {
+ sum += *(str++) + i++;
+ }
+
+ return sum % 255;
+}
+
+/*
+	Write a one line summary of the latencies collected for a thread to stderr.
+	If out is true the caller-to-receiver (out) bins are reported, otherwise the
+	receiver-to-caller (in) bins are. When hist is true the raw bin counts are
+	written first. Each bin is 1/100th of a millisecond wide; mean is -1 when
+	nothing was binned.
+*/
+static void print_stats( tdata_t* td, int out, int hist ) {
+	int*		bins;			// the set of bins being reported on
+	int			bcount;			// number of messages counted in those bins
+	int			oor;			// messages whose latency did not map to a bin
+	long long	max;			// largest latency observed (ns); same width as the source field so nothing is truncated
+	int			sum = 0;		// sum of latencies (in bin units)
+	int			csum = 0;		// cumulative message count
+	int			i95 = 0;		// bin for the 95th count
+	int			i99 = 0;		// bin for the 99th count
+	int			mean = -1;
+	int			cutoff_95;		// 95% of total messages
+	int			cutoff_99;		// 99% of total messages
+	int			j;
+
+	if( out ) {
+		bins = td->out_bins;
+		bcount = td->out_bcount;
+		oor = td->out_oor;
+		max = td->out_max;
+	} else {
+		bins = td->in_bins;
+		bcount = td->in_bcount;
+		oor = td->in_oor;
+		max = td->in_max;
+	}
+
+	cutoff_95 = .95 * (oor + bcount);
+	cutoff_99 = .99 * (oor + bcount);		// must be .99; with .95 the 99th figure just repeats the 95th
+
+	for( j = 0; j < td->nbins; j++ ) {
+		if( csum < cutoff_95 ) {
+			i95++;
+		}
+		if( csum < cutoff_99 ) {
+			i99++;
+		}
+
+		csum += bins[j];
+		sum += bins[j] * j;
+	}
+
+	if( bcount ) {
+		mean = sum / bcount;
+	}
+
+	if( hist ) {
+		for( j = 0; j < td->nbins; j++ ) {
+			fprintf( stderr, "%3d %d\n", j, bins[j] );
+		}
+	}
+
+	fprintf( stderr, "%s: oor=%d max=%.2fms mean=%.2fms 95th=%.2fms 99th=%.2f\n",
+		out ? "out" : " in", oor, (double)max/1000000.0, (double)mean/100.0, (double) i95/100.0, i99/100.0 );
+}
+
+/*
+	Given a message, compute the in/out and round trip latencies. The in and out
+	latencies are tallied in the 1/100th ms bins of td; a latency that maps to no
+	bin (too large, or negative because the two clocks disagree) is counted as
+	out of range. With PRINT defined the values are written to stderr instead.
+*/
+static void compute_latency( tdata_t* td, lc_msg_t* lcm ) {
+	long long out;
+	long long turn;
+	long long in;
+	double rtl;				// round trip latency
+	double outl;			// caller to receiver latency (out)
+	double inl;				// receiver to caller latency (in)
+	long long bin;			// signed and wide so that a negative or huge latency cannot alias a valid index
+
+	if( lcm == NULL || td == NULL ) {
+		return;
+	}
+
+	// widen before multiplying so that the conversion to ns cannot overflow a 32 bit tv_sec
+	out = ((long long) lcm->out_ts.tv_sec * 1000000000LL) + lcm->out_ts.tv_nsec;
+	in = ((long long) lcm->in_ts.tv_sec * 1000000000LL) + lcm->in_ts.tv_nsec;
+	turn = ((long long) lcm->turn_ts.tv_sec * 1000000000LL) + lcm->turn_ts.tv_nsec;
+
+	if( in - turn > td->in_max ) {
+		td->in_max = in - turn;
+	}
+	if( turn - out > td->out_max ) {
+		td->out_max = turn - out;
+	}
+
+#ifdef PRINT
+	bin = (turn - out) / 10000;						// 100ths of ms
+	outl = ((double) turn - out) / 1000000.0;		// convert to ms
+	inl = ((double) in - turn) / 1000000.0;
+	rtl = ((double) in - out) / 1000000.0;
+	fprintf( stderr, "outl = %5.3fms inl = %5.3fms rtl = %5.3fms bin=%d\n", outl, inl, rtl, (int) bin );
+#else
+	bin = (turn - out) / 10000;						// 100ths of ms
+	if( bin >= 0 && bin < td->nbins ) {				// negative must be rejected too or we write before the array
+		td->out_bins[bin]++;
+		td->out_bcount++;
+	} else {
+		td->out_oor++;
+	}
+
+	bin = (in - turn) / 10000;						// 100ths of ms
+	if( bin >= 0 && bin < td->nbins ) {
+		td->in_bins[bin]++;
+		td->in_bcount++;
+	} else {
+		td->in_oor++;
+	}
+#endif
+}
+
+/*
+	Executed as a thread, this puppy will generate calls to ensure that we get the
+	response back to the right thread, that we can handle threads, etc. The state in
+	the tdata_t passed as data is set to SUCCESS, or 0 if a buffer cannot be allocated.
+*/
+static void* mk_calls( void* data ) {
+	lc_msg_t*	lcm;					// pointer at the payload as a struct
+	tdata_t*	control;
+	rmr_mbuf_t*	sbuf;					// send buffer
+	int		count = 0;
+	int		ack_count = 0;
+	int		rt_count = 0;				// number of messages requiring a spin retry
+	int		drops = 0;
+	int		fail_count = 0;				// # of failure sends after first successful send
+	int		successful = 0;				// set to true after we have a successful send
+	char	xbuf[1024];					// build transaction string here
+	int		xaction_id = 1;
+
+	if( (control = (tdata_t *) data) == NULL ) {
+		fprintf( stderr, "thread data was nil; bailing out\n" );
+		return NULL;					// must not continue; everything below dereferences control
+	}
+
+	sbuf = rmr_alloc_msg( control->mrc, 512 );	// alloc first send buffer; subsequent buffers allocated on send
+	usleep( rand() % 777 );					// stagger starts a bit so that they all don't pile up on the first connections
+
+	while( sbuf != NULL && count < control->n2send ) {		// stop early only if a buffer could not be allocated
+		lcm = (lc_msg_t *) sbuf->payload;
+
+		snprintf( xbuf, 200, "%31d", xaction_id );
+		xaction_id += control->id;
+		rmr_bytes2xact( sbuf, xbuf, 32 );
+
+		sbuf->mtype = 5;								// all go with the same type
+		sbuf->len = sizeof( *lcm );
+		sbuf->state = RMR_OK;
+		lcm->out_retries = 0;
+		lcm->turn_retries = 0;
+		clock_gettime( CLOCK_REALTIME, &lcm->out_ts );		// mark time out
+		sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 1000 );	// send it (send returns an empty payload on success, or the original payload on fail/retry)
+
+		if( sbuf && sbuf->state == RMR_ERR_RETRY ) {		// number of times we had to spin to send
+			rt_count++;
+		}
+		while( sbuf != NULL && sbuf->state == RMR_ERR_RETRY ) {		// send blocked; keep trying
+			lcm = (lc_msg_t *) sbuf->payload;			// count against the buffer we are about to resend
+			lcm->out_retries++;
+			sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 100 );	// call and wait up to 100ms for a response
+		}
+
+		count++;
+		if( sbuf != NULL ) {
+			switch( sbuf->state ) {
+				case RMR_OK:							// we should have a buffer back from the sender here
+					lcm = (lc_msg_t *) sbuf->payload;
+					clock_gettime( CLOCK_REALTIME, &lcm->in_ts );		// mark time back
+					successful = 1;
+					compute_latency( control, lcm );
+					ack_count++;
+					break;
+
+				default:
+					fprintf( stderr, "unexpected error: tid=%d rmr-state=%d ernro=%d\n", control->id, sbuf->state, errno );
+					sbuf = rmr_alloc_msg( control->mrc, 512 );		// allocate a sendable buffer
+					if( successful ) {
+						fail_count++;					// count failures after first successful message
+					} else {
+						// some error (not connected likely), don't count this
+						sleep( 1 );
+					}
+					break;
+			}
+		} else {
+			sbuf = rmr_alloc_msg( control->mrc, 512 );		// loop expects an sbuf
+			drops++;
+		}
+
+		if( control->delay > 0 ) {
+			usleep( control->delay );
+		}
+	}
+
+	if( sbuf == NULL ) {						// only an allocation failure leaves us without a buffer
+		fprintf( stderr, "<THRD> %d unable to allocate a message buffer\n", control->id );
+		control->state = 0;						// main treats state 0 as a failed thread
+		return NULL;
+	}
+
+	control->state = SUCCESS;
+	fprintf( stderr, "<THRD> %d finished sent %d, received %d messages\n", control->id, count, ack_count );
+	return NULL;
+}
+
+/*
+	Drive the latency test. Caller threads are started, and while they run any
+	messages arriving on the normal receive path are counted as pings. A latency
+	summary is written for each thread as it finishes.
+
+	Parms (all optional): msg-count delay-usec thread-count listen-port
+	Returns 0 when every thread finished successfully, 1 otherwise.
+*/
+int main( int argc, char** argv ) {
+	void*		mrc;						// msg router context
+	rmr_mbuf_t*	rbuf = NULL;				// received on 'normal' flow
+	struct epoll_event epe;					// event definition for event to listen to
+	int		ep_fd = -1;						// epoll's file des
+	int		rcv_fd;							// file des that NNG tickles -- give this to epoll to listen on
+	char*	listen_port = "43086";
+	long	timeout = 0;
+	int		delay = 100000;					// usec between send attempts
+	int		nmsgs = 10;						// number of messages to send
+	int		nthreads = 3;
+	tdata_t*	cvs;						// vector of control blocks
+	int		i;
+	pthread_t*	pt_info;					// thread stuff
+	int		failures = 0;
+	int		pings = 0;						// number of messages received on normal channel
+
+	if( argc > 1 ) {
+		nmsgs = atoi( argv[1] );
+	}
+	if( argc > 2 ) {
+		delay = atoi( argv[2] );
+	}
+	if( argc > 3 ) {
+		nthreads = atoi( argv[3] );
+	}
+	if( argc > 4 ) {
+		listen_port = argv[4];
+	}
+
+	fprintf( stderr, "<CALL> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
+
+	if( (mrc = rmr_init( listen_port, 1400, RMRFL_MTCALL )) == NULL ) {		// initialise with multi-threaded call enabled
+		fprintf( stderr, "<CALL> unable to initialise RMr\n" );
+		exit( 1 );
+	}
+
+	if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) {			// epoll only available from NNG -- skip receive later if not NNG
+		if( (ep_fd = epoll_create1( 0 )) < 0 ) {
+			fprintf( stderr, "<CALL> [FAIL] unable to create epoll fd: %d\n", errno );
+			exit( 1 );
+		}
+		epe.events = EPOLLIN;
+		epe.data.fd = rcv_fd;
+
+		if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) {
+			fprintf( stderr, "<CALL> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
+			exit( 1 );
+		}
+	} else {
+		rmr_set_rtimeout( mrc, 0 );			// for nano we must set the receive timeout to 0; non-blocking receive
+	}
+
+	// calloc so that the max, out of range and bin counters in each control block start at 0
+	cvs = calloc( nthreads, sizeof( tdata_t ) );
+	pt_info = malloc( sizeof( pthread_t ) * nthreads );
+	if( cvs == NULL || pt_info == NULL ) {
+		fprintf( stderr, "<CALL> unable to allocate control vector\n" );
+		exit( 1 );
+	}
+
+	timeout = time( NULL ) + 20;			// give rmr 20s to find the route table (shouldn't need that much)
+	while( ! rmr_ready( mrc ) ) {			// must have a route table before we can send; wait til RMr says it has one
+		fprintf( stderr, "<CALL> waiting for rmr to show ready\n" );
+		sleep( 1 );
+
+		if( time( NULL ) > timeout ) {
+			fprintf( stderr, "<CALL> giving up\n" );
+			exit( 1 );
+		}
+	}
+	fprintf( stderr, "<CALL> rmr is ready; starting threads\n" );
+
+	for( i = 0; i < nthreads; i++ ) {
+		cvs[i].mrc = mrc;
+		cvs[i].id = i + 2;					// we pass this as the call-id to rmr, so must be >1
+		cvs[i].delay = delay;
+		cvs[i].n2send = nmsgs;
+		cvs[i].state = 1;
+
+		cvs[i].nbins = 100;
+		cvs[i].out_bins = calloc( cvs[i].nbins, sizeof( int ) );
+		cvs[i].in_bins = calloc( cvs[i].nbins, sizeof( int ) );
+		if( cvs[i].out_bins == NULL || cvs[i].in_bins == NULL ) {
+			fprintf( stderr, "<CALL> unable to allocate latency bins\n" );
+			exit( 1 );
+		}
+
+		if( pthread_create( &pt_info[i], NULL, mk_calls, &cvs[i] ) != 0 ) {		// kick a thread
+			fprintf( stderr, "<CALL> unable to start thread %d\n", i );
+			cvs[i].state = 0;				// never started; count it as failed rather than waiting for the timeout
+		}
+	}
+
+	// threads are examined in start order; a thread with state < 1 has finished
+	timeout = time( NULL ) + 20;
+	i = 0;
+	while( nthreads > 0 ) {
+		if( cvs[i].state < 1 ) {			// states 0 or below indicate done. 0 == failure, -n == success
+			print_stats( &cvs[i], 1, 0 );
+			print_stats( &cvs[i], 0, 0 );
+
+			nthreads--;
+			if( cvs[i].state == 0 ) {
+				failures++;
+			}
+			i++;
+		} else {
+			rbuf = rmr_torcv_msg( mrc, rbuf, 1000 );
+			if( rbuf != NULL && rbuf->state != RMR_ERR_RETRY ) {
+				pings++;
+				rmr_free_msg( rbuf );
+				rbuf = NULL;
+			}
+		}
+		if( time( NULL ) > timeout ) {
+			failures += nthreads;
+			fprintf( stderr, "<CALL> timeout waiting for threads to finish; %d were not finished\n", nthreads );
+			break;
+		}
+	}
+
+	fprintf( stderr, "<CALL> [%s] failing threads=%d pings reeived=%d\n", failures == 0 ? "PASS" : "FAIL", failures, pings );
+	sleep( 2 );
+	rmr_close( mrc );
+
+	return failures > 0;
+}
+
diff --git a/test/app_test/lreceiver.c b/test/app_test/lreceiver.c
new file mode 100644
index 0000000..161abc7
--- /dev/null
+++ b/test/app_test/lreceiver.c
@@ -0,0 +1,178 @@
+// :vim ts=4 sw=4 noet:
+/*
+==================================================================================
+ Copyright (c) 2019 Nokia
+ Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+
+/*
+	Mnemonic:	lreceiver.c
+ Abstract: This is a very simple receiver that listens for messages and
+ returns each to the sender after adding a timestamp to the
+ payload. The payload is expected to be lc_msg_t (see lcaller.c)
+ and this will update the 'turn' timestamp on receipt.
+
+ Define these environment variables to have some control:
+ RMR_SEED_RT -- path to the static routing table
+ RMR_RTG_SVC -- port to listen for RTG connections
+
+ Date: 18 April 2019
+ Author: E. Scott Daniels
+*/
+
+#include <unistd.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <time.h>
+#include <string.h>
+
+#include <rmr/rmr.h>
+
+/*
+ The message type placed into the payload.
+*/
+typedef struct lc_msg {
+ struct timespec out_ts; // time just before call executed
+ struct timespec turn_ts; // time at the receiver, on receipt
+ struct timespec in_ts; // time received back by the caller
+ int out_retries; // number of retries the caller needed to send the message
+ int turn_retries; // number of retries the receiver needed to return the message
+} lc_msg_t;
+
+// ----------------------------------------------------------------------------------
+// Fold str into a small value: each character plus its position is added and the total is reduced mod 255.
+static int sum( char* str ) {
+ int sum = 0;
+ int i = 0;
+
+ while( *str ) {
+ sum += *(str++) + i++;
+ }
+
+ return sum % 255;
+}
+
+/*
+ Return a pointer to the character following the first occurrence of sep in
+ str, or NULL (after writing a complaint to stderr) when sep is not present.
+*/
+static char* split( char* str, char sep ) {
+ char* s;
+
+ s = strchr( str, sep );
+
+ if( s ) {
+ return s+1;
+ }
+
+ fprintf( stderr, "<RCVR> no pipe in message: (%s)\n", str ); // the text says pipe regardless of the sep given
+ return NULL;
+}
+// Receive messages, stamp turn_ts in each and return it to the sender; quit after 2 seconds with nothing received.
+int main( int argc, char** argv ) {
+ void* mrc; // msg router context
+ lc_msg_t* lmc; // latency message type from caller
+ rmr_mbuf_t* msg = NULL; // message received
+ int i;
+ int errors = 0;
+ char* listen_port = "4560";
+ long count = 0; // total received
+ long timeout = 0;
+ char* data;
+ int nmsgs = 10; // from argv[1]; only reported, the receive loop below ends on the idle timeout
+ int rt_count = 0; // retry count
+ time_t now;
+ int active;
+
+ data = getenv( "RMR_RTG_SVC" );
+ if( data == NULL ) {
+ setenv( "RMR_RTG_SVC", "19289", 1 ); // set one that won't collide with the sender if on same host
+ }
+
+ if( argc > 1 ) {
+ nmsgs = atoi( argv[1] );
+ }
+ if( argc > 2 ) {
+ listen_port = argv[2];
+ }
+
+
+ fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
+
+ mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL ); // start your engines!
+ //mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, 0 ); // start your engines!
+ if( mrc == NULL ) {
+ fprintf( stderr, "<RCVR> ABORT: unable to initialise RMr\n" );
+ exit( 1 );
+ }
+
+ timeout = time( NULL ) + 20;
+ while( ! rmr_ready( mrc ) ) { // wait for RMr to load a route table
+ fprintf( stderr, "<RCVR> waiting for RMr to show ready\n" );
+ sleep( 1 );
+
+ if( time( NULL ) > timeout ) {
+ fprintf( stderr, "<RCVR> giving up\n" );
+ exit( 1 );
+ }
+ }
+ fprintf( stderr, "<RCVR> rmr now shows ready, listening begins\n" );
+
+ timeout = time( NULL ) + 2; // once we start, we assume if we go 2s w/o a message that we're done
+ //while( count < nmsgs ) {
+ while( 1 ) {
+ active = 0;
+ msg = rmr_torcv_msg( mrc, msg, 1000 ); // pop every second or so to timeout if needed
+
+ if( msg ) {
+ active = 1;
+ if( msg->state == RMR_OK ) {
+ lmc = (lc_msg_t *) msg->payload;
+ clock_gettime( CLOCK_REALTIME, &lmc->turn_ts ); // mark time that we received it.
+ count++;
+
+ msg = rmr_rts_msg( mrc, msg );
+ rt_count = 1000;
+ while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) { // to work right in nano we need this :(
+ lmc->turn_retries++;
+ if( count < 1 ) { // NOTE(review): count was incremented above, so this is never true and the sleep never happens
+ sleep( 1 );
+ }
+ rt_count--;
+ msg = rmr_rts_msg( mrc, msg ); // retried until it is sent or rt_count is exhausted
+ }
+ }
+ }
+
+ now = time( NULL );
+ if( now > timeout ) {
+ break;
+ }
+
+ if( active ) {
+ timeout = now + 2;
+ }
+ }
+
+ fprintf( stderr, "<RCVR> %ld is finished got %ld messages\n", (long) getpid(), count );
+
+
+ sleep( 3 );
+ rmr_close( mrc );
+ return 0;
+}
+
diff --git a/test/app_test/run_all.ksh b/test/app_test/run_all.ksh
index ffcd3e1..7271cfd 100644
--- a/test/app_test/run_all.ksh
+++ b/test/app_test/run_all.ksh
@@ -15,7 +15,7 @@
done
set -e
-ksh run_app_test.ksh $build
+ksh run_app_test.ksh -v -i $build
ksh run_multi_test.ksh
ksh run_rr_test.ksh
ksh run_rts_test.ksh -s 20
diff --git a/test/app_test/run_app_test.ksh b/test/app_test/run_app_test.ksh
index ff0673f..964a080 100644
--- a/test/app_test/run_app_test.ksh
+++ b/test/app_test/run_app_test.ksh
@@ -27,9 +27,9 @@
# recevier then run the basic test.
#
# Example command line:
-# ksh ./run # default 10 messages at 1 msg/sec
-# ksh ./run -N # default but with nanomsg lib
-# ksh ./run -d 100 -n 10000 # send 10k messages with 100ms delay between
+# ksh ./run_app_test.ksh # default 20 messages at 2 msg/sec
+# ksh ./run_app_test.ksh -N # default but with nanomsg lib
+# ksh ./run_app_test.ksh -d 100 -n 10000 # send 10k messages with 100ms delay between
#
# Date: 22 April 2019
# Author: E. Scott Daniels
@@ -59,22 +59,61 @@
echo $? >/tmp/PID$$.rrc
}
+# write to stdout the first IPv4 address listed by 'ip addr' that is not the loopback (127.0.0.1), without the /prefix
+function snarf_ip {
+ ip addr| sed -e '/inet /b c; d' -e ':c' -e '/127.0.0.1/d; s!/.*!!; s!^.* !!; q'
+}
+
+# Drop a contrived route table in. This table should add a reference to our
+# local IP to an entry to ensure that the route table collector code in RMr
+# is removing 'hairpin' loops. If RMr isn't removing the references to our
+# hostname and IP address when it builds the endpoint lists, the sender will
+# send messages to itself some of the time, causing the receiver to come up
+# short when comparing messages received with expected count and thus failing.
+#
+# The globals my_ip and my_host must be set before this is called. Only route
+# table records are written to the file; the here document ends immediately
+# after the 'newrt | end' record.
+#
+function set_rt {
+	typeset port=4560						# port the receiver listens on by default
+
+	cat <<endKat >app_test.rt
+		newrt | start
+			mse | 0 |  0 | localhost:$port,$my_ip:43086
+			mse | 1 | 10 | localhost:$port,${my_host//.*/}:43086
+			mse | 2 | 20 | localhost:$port
+			rte | 3 | localhost:$port
+			mse | 3 | 100 | localhost:$port	# special test to ensure that this does not affect previous entry
+			rte | 4 | localhost:$port
+			rte | 5 | localhost:$port
+			rte | 6 | localhost:$port
+			rte | 7 | localhost:$port
+			rte | 8 | localhost:$port
+			rte | 9 | localhost:$port
+			rte | 10 | localhost:$port
+			rte | 11 | localhost:$port
+			rte | 12 | localhost:$port
+			rte | 13 | localhost:$port
+		newrt | end
+endKat
+}
+
# ---------------------------------------------------------
-if [[ ! -f local.rt ]] # we need the real host name in the local.rt; build one from mask if not there
-then
- hn=$(hostname)
- sed "s!%%hostname%%!$hn!" rt.mask >local.rt
-fi
-
-nmsg=10 # total number of messages to be exchanged (-n value changes)
-delay=1000000 # microsec sleep between msg 1,000,000 == 1s
+nmsg=20 # total number of messages to be exchanged (-n value changes)
+ # need two sent to each receiver to ensure hairpin entries were removed (will fail if they were not)
+delay=500000 # microsec sleep between msg 1,000,000 == 1s
nano_sender=0 # start nano version if set (-N)
nano_receiver=0
wait=1
rebuild=0
nopull="" # -b sets so that build does not pull
verbose=0
+use_installed=0
+my_ip=$(snarf_ip) # get an ip to insert into the route table
+keep=0
+
while [[ $1 == -* ]]
do
@@ -82,6 +121,8 @@
-B) rebuild=1;; # build with pull first
-b) rebuild=1; nopull="nopull";; # buld without pull
-d) delay=$2; shift;;
+ -k) keep=1;;
+ -i) use_installed=1;;
-N) nano_sender=1
nano_receiver=1
;;
@@ -89,8 +130,12 @@
-v) verbose=1;;
*) echo "unrecognised option: $1"
- echo "usage: $0 [-B] [-d micor-sec-delay] [-N] [-n num-msgs]"
+ echo "usage: $0 [-B] [-d micor-sec-delay] [-i] [-k] [-N] [-n num-msgs]"
echo " -B forces a rebuild which will use .build"
+ echo " -i causes the installd libraries (/usr/local) to be referenced; -B is ignored if supplied"
+ echo " -k keeps the route table"
+ echo ""
+ echo "total number of messages must > 20 to correctly test hairpin loop removal"
exit 1
;;
esac
@@ -98,38 +143,54 @@
shift
done
+if [[ ! -f app_test.rt ]]	# build the route table (it needs our real host name and IP) if one is not already there
+then
+ my_host=$(hostname)
+ set_rt
+ if (( verbose ))
+ then
+ cat app_test.rt
+ fi
+fi
+
if (( verbose ))
then
echo "2" >.verbose
export RMR_VCTL_FILE=".verbose"
fi
-if (( rebuild ))
-then
- set -e
- ksh ./rebuild.ksh $nopull | read build_path
- set +e
-else
- build_path=${BUILD_PATH:-"../../.build"} # we prefer .build at the root level, but allow user option
- if [[ ! -d $build_path ]]
+if (( use_installed )) # point at installed library rather than testing the build
+then
+ export LD_LIBRARY_PATH=/usr/local/lib
+ export LIBRARY_PATH=$LD_LIBRARY_PATH
+else
+ if (( rebuild ))
then
- echo "cannot find build in: $build_path"
- echo "either create, and then build RMr, or set BUILD_PATH as an evironment var before running this"
- exit 1
+ set -e
+ ksh ./rebuild.ksh $nopull | read build_path
+ set +e
+ else
+ build_path=${BUILD_PATH:-"../../.build"} # we prefer .build at the root level, but allow user option
+
+ if [[ ! -d $build_path ]]
+ then
+ echo "cannot find build in: $build_path"
+ echo "either create, and then build RMr, or set BUILD_PATH as an evironment var before running this"
+ exit 1
+ fi
fi
+
+ if [[ -d $build_path/lib64 ]]
+ then
+ export LD_LIBRARY_PATH=$build_path:$build_path/lib64
+ else
+ export LD_LIBRARY_PATH=$build_path:$build_path/lib
+ fi
+ export LIBRARY_PATH=$LD_LIBRARY_PATH
fi
-if [[ -d $build_path/lib64 ]]
-then
- export LD_LIBRARY_PATH=$build_path:$build_path/lib64
-else
- export LD_LIBRARY_PATH=$build_path:$build_path/lib
-fi
-
-export LIBRARY_PATH=$LD_LIBRARY_PATH
-export RMR_SEED_RT=${RMR_SEED_RT:-./local.rt} # allow easy testing with different rt
-
+export RMR_SEED_RT=${RMR_SEED_RT:-./app_test.rt} # allow easy testing with different rt
if [[ ! -f ./sender ]]
then
@@ -141,7 +202,7 @@
fi
run_rcvr &
-sleep 2 # if sender starts faster than rcvr we can drop, so pause a bit
+sleep 2 # if sender starts faster than rcvr we can drop msgs, so pause a bit
run_sender &
wait
@@ -155,6 +216,11 @@
echo "[PASS] sender rc=$src receiver rc=$rrc"
fi
+if (( ! keep ))
+then
+ rm app_test.rt
+fi
+
rm /tmp/PID$$.*
rm -f .verbose
diff --git a/test/app_test/run_lcall_test.ksh b/test/app_test/run_lcall_test.ksh
new file mode 100644
index 0000000..3d5d691
--- /dev/null
+++ b/test/app_test/run_lcall_test.ksh
@@ -0,0 +1,235 @@
+#!/usr/bin/env ksh
+# :vi ts=4 sw=4 noet :
+#==================================================================================
+# Copyright (c) 2019 Nokia
+# Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#==================================================================================
+#
+
+# ---------------------------------------------------------------------------------
+# Mnemonic: run_lcall_test.ksh
+# Abstract: This is a simple script to set up and run the basic send/receive
+# processes for some library validation on top of nano/nng. This
+# particular test starts the latency caller and latency receiver
+# processes such that they exchange messages and track the latency
+# from the caller's perepective (both outbound to receiver, and then
+# back. Stats are presented at the end. This test is NOT intended
+# to be used as a CI validation test.
+#
+# The sender and receiver processes all have a 20s timeout (+/-)
+# which means that all messages must be sent, and acked within that
+# time or the processes will give up and report failure. Keep in mind
+# that n messages with a delay value (-d) set will affect whether or
+# not the messages can be sent in the 20s timeout period. There is
+# currently no provision to adjust the timeout other than by changing
+# the C source. The default (100 msgs with 500 micro-sec delay) works
+# just fine for base testing.
+#
+# Example command line:
+# # run with 10 caller threads sending 10,000 meessages each,
+# # 5 receivers, and a 10 mu-s delay between each caller send
+# ksh ./run_lcall_test.ksh -d 10 -n 10000 -r 5 -c 10
+#
+# Date: 28 May 2019
+# Author: E. Scott Daniels
+# ---------------------------------------------------------------------------------
+
+
+# The sender and receivers are run asynch. Their exit statuses are captured in a
+# file in order for the 'main' to pick them up easily.
+# Uses the globals nmsg, delay and cthreads (10, 500 and 3 are used for any that are unset).
+function run_sender {
+ if (( $nano_sender ))
+ then
+ echo "nanomsg not supportded"
+ exit 1
+ else
+ ./lcaller ${nmsg:-10} ${delay:-500} ${cthreads:-3}
+ fi
+ echo $? >/tmp/PID$$.src # must communicate state back via file b/c asynch
+}
+
+# $1 is the instance number (0 if omitted) so we can keep logs separate; it selects the
+# listen port, the RMR_RTG_SVC port and the name of the file that receives the exit status.
+function run_rcvr {
+	typeset inst=${1:-0}					# one default applied everywhere the instance is used
+	typeset port=$(( 4460 + inst ))
+	export RMR_RTG_SVC=$(( 9990 + inst ))
+	if (( $nano_receiver ))
+	then
+		echo "nanomsg not supported"
+		exit 1
+	else
+		./lreceiver $(( ((nmsg * cthreads)/nrcvrs) + 10 )) $port
+	fi
+	echo $? >/tmp/PID$$.$inst.rrc
+}
+
+# Drop a contrived route table in which lists n receivers (localhost ports
+# 4460 through 4460+n-1) as the endpoints of every entry. $1 is n and
+# defaults to 3. The table is written to lcall.rt.
+function set_rt {
+	typeset port=4460
+	typeset groups="localhost:$port"		# first receiver; built from port rather than a second literal
+	typeset i								# keep the loop index out of the caller's scope
+	for (( i=1; i < ${1:-3}; i++ ))
+	do
+		groups="$groups,localhost:$((port+i))"
+	done
+	cat <<endKat >lcall.rt
+		newrt | start
+		mse |0 | 0 | $groups
+		mse |1 | 10 | $groups
+		mse |2 | 20 | $groups
+		rte |3 | $groups
+		rte |4 | $groups
+		rte |5 | $groups
+		rte |6 | $groups
+		rte |7 | $groups
+		rte |8 | $groups
+		rte |9 | $groups
+		rte |10 | $groups
+		rte |11 | $groups
+		newrt | end
+endKat
+}
+
+# ---------------------------------------------------------
+
+if [[ ! -f local.rt ]] # NOTE(review): local.rt is not referenced again in this script (RMR_SEED_RT is set to ./lcall.rt below); presumably a carry over -- confirm before removing
+then
+ hn=$(hostname)
+ sed "s!%%hostname%%!$hn!" rt.mask >local.rt
+fi
+
+cthreads=3 # number of caller threads
+nmsg=100 # total number of messages to be exchanged (-n value changes)
+delay=500 # microsec sleep between msg 1,000,000 == 1s
+nano_sender=0 # start nano version if set (-N)
+nano_receiver=0
+wait=1
+rebuild=0
+verbose=0
+nrcvrs=3 # this is sane, but -r allows it to be set up
+use_installed=0
+
+while [[ $1 == -* ]]
+do
+ case $1 in
+ -c) cthreads=$2; shift;;
+ -B) rebuild=1;;
+ -d) delay=$2; shift;;
+ -i) use_installed=1;;
+ -N) echo "abort: nanomsg does not support epoll and thus cannot be used for mt-caller"
+ echo ""
+ exit 1;
+ ;;
+ -n) nmsg=$2; shift;;
+ -r) nrcvrs=$2; shift;;
+ -v) verbose=1;;
+
+ *) echo "unrecognised option: $1"
+ echo "usage: $0 [-B] [-c caller-threads] [-d micor-sec-delay] [-i] [-n num-msgs] [-r num-receivers]"
+ echo " -B forces a rebuild which will use .build"
+ echo " -i will use installed libraries (/usr/local) and cause -B to be ignored if supplied)"
+ exit 1
+ ;;
+ esac
+
+ shift
+done
+
+if (( verbose ))
+then
+ echo "2" >.verbose
+ export RMR_VCTL_FILE=".verbose"
+fi
+
+if (( use_installed )) # point at installed library
+then
+ export LD_LIBRARY_PATH=/usr/local/lib
+ export LIBRARY_PATH=$LD_LIBRARY_PATH
+else
+ if (( rebuild ))
+ then
+ build_path=../../.build # if we rebuild we can insist that it is in .build :)
+ set -e
+ ksh ./rebuild.ksh
+ set +e
+ else
+ build_path=${BUILD_PATH:-"../../.build"} # we prefer .build at the root level, but allow user option
+
+ if [[ ! -d $build_path ]]
+ then
+ echo "cannot find build in: $build_path"
+ echo "either create, and then build RMr, or set BUILD_PATH as an evironment var before running this"
+ exit 1
+ fi
+ fi
+
+ if [[ -d $build_path/lib64 ]]
+ then
+ export LD_LIBRARY_PATH=$build_path:$build_path/lib64
+ else
+ export LD_LIBRARY_PATH=$build_path:$build_path/lib
+ fi
+fi
+
+export LIBRARY_PATH=$LD_LIBRARY_PATH
+export RMR_SEED_RT=./lcall.rt
+
+set_rt $nrcvrs # set up the rt for n receivers
+
+if (( rebuild )) || [[ ! -f ./lcaller ]]
+then
+ if ! make -B lcaller lreceiver >/dev/null 2>&1
+ then
+ echo "[FAIL] cannot find lcaller binary, and cannot make it.... humm?"
+ exit 1
+ fi
+fi
+
+for (( i=0; i < nrcvrs; i++ )) # start the receivers with an instance number
+do
+ run_rcvr $i &
+done
+
+sleep 2 # let receivers init so we don't shoot at an empty target
+run_sender &
+
+wait
+
+
+for (( i=0; i < nrcvrs; i++ )) # collect return codes
+do
+ head -1 /tmp/PID$$.$i.rrc | read x
+ (( rrc += x ))
+done
+
+head -1 /tmp/PID$$.src | read src
+
+if (( !! (src + rrc) ))
+then
+ echo "[FAIL] sender rc=$src receiver rc=$rrc"
+else
+ echo "[PASS] sender rc=$src receiver rc=$rrc"
+ rm -f lcall.rt
+fi
+
+rm /tmp/PID$$.*
+rm -f .verbose
+
+exit $(( !! (src + rrc) ))
+