feat(routing): Support session based routing
The session id field in a message buffer is now used
directly for routing.
Change-Id: I3634c97588b11172db964b2d06c96c317d8b8ae3
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Routing table entry changes to pick up subid
Change-Id: If08dc21aae4acaab350ba75a8854ad2f24007b03
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Fix unit test for rmr_call
It was not properly setting message type and now that RMr
ensures that invalid message type is set by default on a newly
created message this was causing unit test to fail.
Change-Id: I50f08d1038ea7fca2a070cdd949657bfbc25f3fd
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Add application level tests
Added round robin and multi group application level
test scripts.
Change-Id: Ic6aebaf3bc1edb763decc7fd0aebb09df116f20c
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
NNG based sub-id support added
Change-Id: I0d36b55bb90a315ba94c9476df88e2c7eac6c383
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Correct bug in app test script
Change-Id: I5b4a9f32aa1bc2907f320b8ad4628e0948062904
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Nano sub-id changes and unit test updates
Change-Id: Ia69f2fb33de3bbee2f33f9a4c5def779c872e52c
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change nil-sub_id key to high order key
If there is no sub-id, then the key is based only on the
message type, but to allow for a sub-id == 0 the key
when there is no subscription id must be set to
0xffffffff00000000 + msg type.
New version for deb is 1.0.19
Change-Id: I55f89d368466a0137fdea99410c76ba72e1923ab
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2782147..365d370 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -24,7 +24,7 @@
set( major_version "1" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this
set( minor_version "0" )
-set( patch_level "18" )
+set( patch_level "19" )
set( install_root "${CMAKE_INSTALL_PREFIX}" )
set( install_lib "lib" )
diff --git a/src/common/include/rmr.h b/src/common/include/rmr.h
index 6bfe075..e6c1660 100644
--- a/src/common/include/rmr.h
+++ b/src/common/include/rmr.h
@@ -47,6 +47,8 @@
#define RMR_DEF_SIZE 0 // pass as size to have msg allocation use the default msg size
+#define RMR_VOID_MSGTYPE (-1) // unset/invalid message type and sub id
+#define RMR_VOID_SUBID (-1)
#define RMR_OK 0 // state is good
#define RMR_ERR_BADARG 1 // argument passd to function was unusable
diff --git a/src/common/include/rmr_agnostic.h b/src/common/include/rmr_agnostic.h
index a23cff7..06da060 100644
--- a/src/common/include/rmr_agnostic.h
+++ b/src/common/include/rmr_agnostic.h
@@ -230,6 +230,7 @@
// ----- route table generic static things ---------
+static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype );
static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list );
static void del_rte( void* st, void* entry, char const* name, void* thing, void* data );
static char* uta_fib( char* fname );
@@ -237,7 +238,7 @@
static route_table_t* uta_rt_clone( route_table_t* srt );
static void uta_rt_drop( route_table_t* rt );
static endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group );
-static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups );
+static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups );
static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name );
static void read_static_rt( uta_ctx_t* ctx, int vlevel );
static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel );
diff --git a/src/common/include/rmr_symtab.h b/src/common/include/rmr_symtab.h
index 7d4d18c..74c5aa3 100644
--- a/src/common/include/rmr_symtab.h
+++ b/src/common/include/rmr_symtab.h
@@ -28,6 +28,8 @@
#ifndef _rmr_symtab_h
#define _rmr_symtab_h
+#include <netdb.h>
+
/* --------- symtab ---------------- */
#define UT_FL_NOCOPY 0x00 /* use user pointer */
#define UT_FL_COPY 0x01 /* make a copy of the string data */
@@ -39,12 +41,12 @@
extern void rmr_sym_dump( void *s );
extern void *rmr_sym_alloc( int size );
extern void rmr_sym_del( void *s, const char *name, unsigned int class );
-extern void *rmr_sym_ndel( void *vtable, int key );
+extern void *rmr_sym_ndel( void *vtable, uint64_t key );
extern void rmr_sym_free( void *vtable );
extern void *rmr_sym_get( void *s, const char *name, unsigned int class );
extern int rmr_sym_put( void *s, const char *name, unsigned int class, void *val );
-extern int rmr_sym_map( void *s, unsigned int key, void *val );
-extern void *rmr_sym_pull( void *vtable, int key );
+extern int rmr_sym_map( void *s, uint64_t key, void *val );
+extern void *rmr_sym_pull( void *vtable, uint64_t key );
extern void rmr_sym_stats( void *s, int level );
extern void rmr_sym_foreach_class( void *vst, unsigned int class, void (* user_fun)( void*, void*, const char*, void*, void* ), void *user_data );
diff --git a/src/common/src/rt_generic_static.c b/src/common/src/rt_generic_static.c
index 4dd9344..5bbacbd 100644
--- a/src/common/src/rt_generic_static.c
+++ b/src/common/src/rt_generic_static.c
@@ -44,6 +44,7 @@
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
+#include <netdb.h>
/*
@@ -56,13 +57,43 @@
void** things;
} thing_list_t;
+// ------------------------------------------------------------------------------------------------
+
+
+/*
+ Little diddy to trim whitespace and trailing comments. Like shell, trailing comments
+ must be at the start of a word (i.e. must be immediatly preceeded by whitespace).
+*/
+static char* clip( char* buf ) {
+ char* tok;
+
+ while( *buf && isspace( *buf ) ) { // skip leading whitespace
+ buf++;
+ }
+
+ if( (tok = strchr( buf, '#' )) != NULL ) {
+ if( tok == buf ) {
+ return buf; // just push back; leading comment sym handled there
+ }
+
+ if( isspace( *(tok-1) ) ) {
+ *tok = 0;
+ }
+ }
+
+ for( tok = buf + (strlen( buf ) - 1); tok > buf && isspace( *tok ); tok-- ); // trim trailing spaces too
+ *(tok+1) = 0;
+
+ return buf;
+}
+
/*
Given a message type create a route table entry and add to the hash keyed on the
message type. Once in the hash, endpoints can be added with uta_add_ep. Size
is the number of group slots to allocate in the entry.
*/
-static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups ) {
+static rtable_ent_t* uta_add_rte( route_table_t* rt, uint64_t key, int nrrgroups ) {
rtable_ent_t* rte;
if( rt == NULL ) {
@@ -87,19 +118,79 @@
memset( rte->rrgroups, 0, sizeof( rrgroup_t *) * nrrgroups );
rte->nrrgroups = nrrgroups;
- rmr_sym_map( rt->hash, mtype, rte ); // add to hash using numeric mtype as key
+ rmr_sym_map( rt->hash, key, rte ); // add to hash using numeric mtype as key
- if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: mt=%d groups=%d\n", mtype, nrrgroups );
+ if( DEBUG ) fprintf( stderr, "[DBUG] route table entry created: k=%lu groups=%d\n", key, nrrgroups );
return rte;
}
/*
+ This accepts partially parsed information from a record sent by route manager or read from
+ a file such that:
+ ts_field is the msg-type,sender field
+ subid is the integer subscription id
+ rr_field is the endpoint information for round robening message over
+
+ If all goes well, this will add an RTE to the table under construction.
+
+ The ts_field is checked to see if we should ingest this record. We ingest if one of
+ these is true:
+ there is no sender info (a generic entry for all)
+ there is sender and our host:port matches one of the senders
+ the sender info is an IP address that matches one of our IP addresses
+*/
+static void build_entry( uta_ctx_t* ctx, char* ts_field, uint32_t subid, char* rr_field, int vlevel ) {
+ rtable_ent_t* rte; // route table entry added
+ char* tok;
+ int ntoks;
+ uint64_t key = 0; // the symtab key will be mtype or sub_id+mtype
+ char* tokens[128];
+ char* gtokens[64];
+ int i;
+ int ngtoks; // number of tokens in the group list
+ int grp; // index into group list
+
+ ts_field = clip( ts_field ); // ditch extra whitespace and trailing comments
+ rr_field = clip( rr_field );
+
+ if( ((tok = strchr( ts_field, ',' )) == NULL ) || // no sender names (generic entry for all)
+ (uta_has_str( ts_field, ctx->my_name, ',', 127) >= 0) || // our name is in the list
+ has_myip( ts_field, ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
+
+ key = build_rt_key( subid, atoi( ts_field ) );
+
+ if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s subid=%d key=%lu\n", ts_field, subid, key );
+
+ if( (ngtoks = uta_tokenise( rr_field, gtokens, 64, ';' )) > 0 ) { // split round robin groups
+ rte = uta_add_rte( ctx->new_rtable, key, ngtoks ); // get/create entry for this key
+
+ for( grp = 0; grp < ngtoks; grp++ ) {
+ if( (ntoks = uta_tokenise( gtokens[grp], tokens, 64, ',' )) > 0 ) {
+ for( i = 0; i < ntoks; i++ ) {
+ if( DEBUG > 1 || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint %s\n", ts_field );
+ uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
+ }
+ }
+ }
+ }
+ } else {
+ if( DEBUG || (vlevel > 2) )
+ fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
+ }
+}
+
+/*
Parse a single record recevied from the route table generator, or read
from a static route table file. Start records cause a new table to
be started (if a partial table was received it is discarded. Table
entry records are added to the currenly 'in progress' table, and an
end record causes the in progress table to be finalised and the
currently active table is replaced.
+
+ We expect one of several types:
+ newrt|{start|end}
+ rte|<mtype>[,sender]|<endpoint-grp>[;<endpoint-grp>,...]
+ mse|<mtype>[,sender]|<sub-id>|<endpoint-grp>[;<endpoint-grp>,...]
*/
static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel ) {
int i;
@@ -129,6 +220,7 @@
break;
case 'n': // newrt|{start|end}
+ tokens[1] = clip( tokens[1] );
if( strcmp( tokens[1], "end" ) == 0 ) { // wrap up the table we were building
if( ctx->new_rtable ) {
uta_rt_drop( ctx->old_rtable ); // time to drop one that was previously replaced
@@ -155,33 +247,29 @@
}
break;
+ case 'm': // assume mse entry
+ if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
+ break;
+ }
+
+ if( ntoks < 4 ) {
+ if( DEBUG ) fprintf( stderr, "[WRN] rmr_rtc: mse record had too few fields: %d instead of 4\n", ntoks );
+ break;
+ }
+
+ build_entry( ctx, tokens[1], atoi( tokens[2] ), tokens[3], vlevel );
+ break;
+
case 'r': // assume rt entry
if( ! ctx->new_rtable ) { // bad sequence, or malloc issue earlier; ignore siliently
break;
}
- if( ((tok = strchr( tokens[1], ',' )) == NULL ) || // no sender names
- (uta_has_str( tokens[1], ctx->my_name, ',', 127) >= 0) || // our name isn't in the list
- has_myip( tokens[1], ctx->ip_list, ',', 127 ) ) { // the list has one of our IP addresses
-
- if( DEBUG > 1 || (vlevel > 1) ) fprintf( stderr, "[DBUG] create rte for mtype=%s\n", tokens[1] );
-
- if( (ngtoks = uta_tokenise( tokens[2], gtokens, 64, ';' )) > 0 ) { // split last field by groups first
- rte = uta_add_rte( ctx->new_rtable, atoi( tokens[1] ), ngtoks ); // get/create entry for message type
- for( grp = 0; grp < ngtoks; grp++ ) {
- if( (ntoks = uta_tokenise( gtokens[grp], tokens, 64, ',' )) > 0 ) {
- for( i = 0; i < ntoks; i++ ) {
- if( DEBUG > 1 || (vlevel > 1)) fprintf( stderr, "[DBUG] add endpoint %s\n", tokens[i] );
- uta_add_ep( ctx->new_rtable, rte, tokens[i], grp );
- }
- }
- }
- }
+ if( ntoks > 3 ) { // assume new entry with subid last
+ build_entry( ctx, tokens[1], atoi( tokens[3] ), tokens[2], vlevel );
} else {
- if( DEBUG || (vlevel > 2) )
- fprintf( stderr, "entry not included, sender not matched: %s\n", tokens[1] );
+ build_entry( ctx, tokens[1], UNSET_SUBID, tokens[2], vlevel ); // old school entry has no sub id
}
-
break;
default:
@@ -463,4 +551,21 @@
}
+/*
+ Given a session id and message type build a key that can be used to look up the rte in the route
+ table hash. Sub_id is expected to be -1 if there is no session id associated with the entry.
+*/
+static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype ) {
+ uint64_t key;
+
+ if( sub_id == UNSET_SUBID ) {
+ key = 0xffffffff00000000 | mtype;
+ } else {
+ key = (((uint64_t) sub_id) << 32) | (mtype & 0xffffffff);
+ }
+
+ return key;
+}
+
+
#endif
diff --git a/src/common/src/symtab.c b/src/common/src/symtab.c
index a1a792d..31f150f 100644
--- a/src/common/src/symtab.c
+++ b/src/common/src/symtab.c
@@ -28,6 +28,7 @@
Things changed for the Ric Msg implemention (Nov 2018):
- no concept of copy/free of the user data (functions removed)
- add ability to support an integer key (class 0)
+ Numeric key is an unsigned, 64bit key.
- externally visible names given a rmr_ extension as it's being
incorporated into the RIC msg routing library and will be
available to user applications.
@@ -46,6 +47,7 @@
#include <string.h>
#include <stdlib.h>
#include <memory.h>
+#include <netdb.h>
#include "rmr_symtab.h"
@@ -56,7 +58,7 @@
struct Sym_ele *next; /* pointer at next element in list */
struct Sym_ele *prev; /* larger table, easier deletes */
const char *name; /* symbol name */
- unsigned int nkey; // the numeric key
+ uint64_t nkey; // the numeric key
void *val; /* user data associated with name */
unsigned long mcount; /* modificaitons to value */
unsigned long rcount; /* references to symbol */
@@ -136,11 +138,11 @@
much the same.
*/
static int putin( Sym_tab *table, const char *name, unsigned int class, void *val ) {
- Sym_ele *eptr; /* pointer into hash table */
+ Sym_ele *eptr; /* pointer into hash table */
Sym_ele **sym_tab; /* pointer into hash table */
- int hv; /* hash value */
- int rc = 0; /* assume it existed */
- unsigned int nkey = 0; // numeric key if class == 0
+ int hv; /* hash value */
+ int rc = 0; /* assume it existed */
+ uint64_t nkey = 0; // numeric key if class == 0
sym_tab = table->symlist;
@@ -148,7 +150,7 @@
hv = sym_hash( name, table->size ); // hash it
for( eptr=sym_tab[hv]; eptr && ! same( class, eptr->class, eptr->name, name); eptr=eptr->next );
} else {
- nkey = *((int *) name);
+ nkey = *((uint64_t *) name);
hv = nkey % table->size; // just hash the number
for( eptr=sym_tab[hv]; eptr && eptr->nkey != nkey; eptr=eptr->next );
}
@@ -237,7 +239,7 @@
if( eptr->val && eptr->class ) {
fprintf( stderr, "key=%s val@=%p\n", eptr->name, eptr->val );
} else {
- fprintf( stderr, "nkey=%d val@=%p\n", eptr->nkey, eptr->val );
+ fprintf( stderr, "nkey=%lu val@=%p\n", (unsigned long) eptr->nkey, eptr->val );
}
}
}
@@ -284,9 +286,9 @@
{
Sym_tab *table;
Sym_ele **sym_tab;
- Sym_ele *eptr; /* pointer into hash table */
- int hv; /* hash value */
- unsigned int nkey; // class 0, name points to integer not string
+ Sym_ele *eptr; /* pointer into hash table */
+ int hv; /* hash value */
+ uint64_t nkey; // class 0, name points to integer not string
table = (Sym_tab *) vtable;
sym_tab = table->symlist;
@@ -306,7 +308,7 @@
/*
Delete element by numberic key.
*/
-extern void *rmr_sym_ndel( void *vtable, int key ) {
+extern void *rmr_sym_ndel( void *vtable, uint64_t key ) {
rmr_sym_del( vtable, (const char *) &key, 0 );
}
@@ -317,7 +319,7 @@
Sym_ele **sym_tab;
Sym_ele *eptr; // element from table
int hv; // hash value of key
- unsigned int nkey; // numeric key if class 0
+ uint64_t nkey; // numeric key if class 0
table = (Sym_tab *) vtable;
sym_tab = table->symlist;
@@ -326,7 +328,7 @@
hv = sym_hash( name, table->size );
for(eptr=sym_tab[hv]; eptr && ! same(class, eptr->class, eptr->name, name); eptr=eptr->next );
} else {
- nkey = *((int *) name);
+ nkey = *((uint64_t *) name);
hv = nkey % table->size; // just hash the number
for( eptr=sym_tab[hv]; eptr && eptr->nkey != nkey; eptr=eptr->next );
}
@@ -343,7 +345,7 @@
/*
Retrieve the data referenced by a numerical key.
*/
-extern void *rmr_sym_pull( void *vtable, int key ) {
+extern void *rmr_sym_pull( void *vtable, uint64_t key ) {
return rmr_sym_get( vtable, (const char *) &key, 0 );
}
@@ -366,11 +368,11 @@
}
/*
- Add a new entry assuming that the key is an unsigned integer.
+ Add a new entry assuming that the key is an unsigned, 64 bit, integer.
Returns 1 if new, 0 if existed
*/
-extern int rmr_sym_map( void *vtable, unsigned int key, void *val ) {
+extern int rmr_sym_map( void *vtable, uint64_t key, void *val ) {
Sym_tab *table;
table = (Sym_tab *) vtable;
@@ -407,7 +409,7 @@
if( eptr->class ) { // a string key
fprintf( stderr, "sym: (%d) key=%s val@=%p ref=%ld mod=%lu\n", i, eptr->name, eptr->val, eptr->rcount, eptr->mcount );
} else {
- fprintf( stderr, "sym: (%d) key=%d val@=%p ref=%ld mod=%lu\n", i, eptr->nkey, eptr->val, eptr->rcount, eptr->mcount );
+ fprintf( stderr, "sym: (%d) key=%lu val@=%p ref=%ld mod=%lu\n", i, (unsigned long) eptr->nkey, eptr->val, eptr->rcount, eptr->mcount );
}
}
}
@@ -434,7 +436,7 @@
if( eptr->class ) {
fprintf( stderr, "\t%s\n", eptr->name );
} else {
- fprintf( stderr, "\t%d (numeric key)\n", eptr->nkey );
+ fprintf( stderr, "\t%lu (numeric key)\n", (unsigned long) eptr->nkey );
}
}
}
diff --git a/src/nanomsg/include/rmr_private.h b/src/nanomsg/include/rmr_private.h
index dde00ae..7d78382 100644
--- a/src/nanomsg/include/rmr_private.h
+++ b/src/nanomsg/include/rmr_private.h
@@ -97,7 +97,7 @@
static int rt_link2_ep( endpoint_t* ep );
static endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group );
static int uta_epsock_byname( route_table_t* rt, char* ep_name );
-static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more );
+static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more );
// ------ msg ------------------------------------------------
static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state, int tr_size );
@@ -107,48 +107,6 @@
static rmr_mbuf_t* send_msg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int nn_sock );
static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg );
-
-
-/*
-// --- message ring --------------------------
-static void* uta_mk_ring( int size );
-static void uta_ring_free( void* vr );
-static inline void* uta_ring_extract( void* vr );
-static inline int uta_ring_insert( void* vr, void* new_data );
-
-// --- message and context management --------
-static int ie_test( void* r, int i_factor, long inserts );
-static void free_ctx( uta_ctx_t* ctx );
-static rmr_mbuf_t* alloc_zcmsg( uta_ctx_t* ctx, rmr_mbuf_t* msg, int size, int state );
-static inline rmr_mbuf_t* clone_msg( rmr_mbuf_t* old_msg );
-static rmr_mbuf_t* rcv_msg( uta_ctx_t* ctx, rmr_mbuf_t* old_msg );
-
-// ----- route table static things ---------
-static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list );
-static void del_rte( void* st, void* entry, char const* name, void* thing, void* data );
-static char* uta_fib( char* fname );
-static route_table_t* uta_rt_init( );
-static route_table_t* uta_rt_clone( route_table_t* srt );
-static void uta_rt_drop( route_table_t* rt );
-static endpoint_t* uta_add_ep( route_table_t* rt, rtable_ent_t* rte, char* ep_name, int group );
-static rtable_ent_t* uta_add_rte( route_table_t* rt, int mtype, int nrrgroups );
-static endpoint_t* uta_get_ep( route_table_t* rt, char const* ep_name );
-static int uta_epsock_byname( route_table_t* rt, char* ep_name );
-static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more );
-static void read_static_rt( uta_ctx_t* ctx, int vlevel );
-static void parse_rt_rec( uta_ctx_t* ctx, char* buf, int vlevel );
-static void* rtc( void* vctx );
-static endpoint_t* rt_ensure_ep( route_table_t* rt, char const* ep_name );
-
-
-// --- tools_static protos ------------------
-static int uta_tokenise( char* buf, char** tokens, int max, char sep );
-static char* uta_h2ip( char const* hname );
-static int uta_link2( char* target );
-static int uta_lookup_rtg( uta_ctx_t* ctx );
-static int uta_has_str( char const* buf, char const* str, char sep, int max );
-*/
-
static int rt_link2_ep( endpoint_t* ep );
static rmr_mbuf_t* send2ep( uta_ctx_t* ctx, endpoint_t* ep, rmr_mbuf_t* msg );
diff --git a/src/nanomsg/src/rmr.c b/src/nanomsg/src/rmr.c
index dcf7e67..e67402f 100644
--- a/src/nanomsg/src/rmr.c
+++ b/src/nanomsg/src/rmr.c
@@ -242,6 +242,8 @@
int group; // selected group to get socket for
int send_again; // true if the message must be sent again
rmr_mbuf_t* clone_m; // cloned message for an nth send
+ uint64_t key; // lookup key is now subid and mtype
+ int max_rt = 1000;
if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
errno = EINVAL; // if msg is null, this is their clue
@@ -263,8 +265,10 @@
send_again = 1; // force loop entry
group = 0; // always start with group 0
+ key = build_rt_key( msg->sub_id, msg->mtype ); // what we need to find the route table entry
while( send_again ) {
- nn_sock = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again ); // round robin select endpoint; again set if mult groups
+ max_rt = 1000;
+ nn_sock = uta_epsock_rr( ctx->rtable, key, group, &send_again ); // round robin select endpoint; again set if mult groups
if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d socket=%d len=%d\n",
msg->mtype, send_again, group, nn_sock, msg->len );
group++;
@@ -277,18 +281,21 @@
if( send_again ) {
clone_m = clone_msg( msg ); // must make a copy as once we send this message is not available
- if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d len=%d\n", msg->mtype, msg->len );
+ if( DEBUG ) fprintf( stderr, "[DBUG] msg cloned: type=%d sub_id=%d len=%d\n", msg->mtype, msg->sub_id, msg->len );
msg->flags |= MFL_NOALLOC; // send should not allocate a new buffer
msg = send_msg( ctx, msg, nn_sock ); // do the hard work, msg should be nil on success
- /*
- if( msg ) {
- // error do we need to count successes/errors, how to report some success, esp if last fails?
+ while( max_rt > 0 && msg && msg->state == RMR_ERR_RETRY ) {
+ msg = send_msg( ctx, msg, nn_sock );
+ max_rt--;
}
- */
msg = clone_m; // clone will be the next to send
} else {
msg = send_msg( ctx, msg, nn_sock ); // send the last, and allocate a new buffer; drops the clone if it was
+ while( max_rt > 0 && msg && msg->state == RMR_ERR_RETRY ) {
+ msg = send_msg( ctx, msg, nn_sock );
+ max_rt--;
+ }
}
}
diff --git a/src/nanomsg/src/rtable_static.c b/src/nanomsg/src/rtable_static.c
index 3ebad2d..cfcb27e 100644
--- a/src/nanomsg/src/rtable_static.c
+++ b/src/nanomsg/src/rtable_static.c
@@ -58,13 +58,13 @@
nn_sock = nn_socket( AF_SP, NN_PUSH ); // the socket we'll use to connect to the target
if( nn_sock < 0 ) {
- fprintf( stderr, "[WARN] rmr: link2: unable to create socket for link to target: %s: %d\n", target, errno );
+ fprintf( stderr, "[WARN] rmr: link2: unable to create socket for link to target: %s: %d\n\n\n", target, errno );
return -1;
}
snprintf( conn_info, sizeof( conn_info ), "tcp://%s", target );
if( nn_connect( nn_sock, conn_info ) < 0 ) { // connect failed
- fprintf( stderr, "[WARN] rmr: link2: unable to create link to target: %s: %d\n", target, errno );
+ fprintf( stderr, "[WARN] rmr: link2: unable to create link to target: %s: %d\n\n\n", target, errno );
nn_close( nn_sock );
return -1;
}
@@ -138,6 +138,7 @@
}
ep->nn_sock = -1; // not connected
+ ep->open = 0;
ep->addr = uta_h2ip( ep_name );
ep->name = strdup( ep_name );
@@ -203,14 +204,15 @@
invoke this function again to make a selection against that group. If there
are no more groups, more is set to 0.
*/
-static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more ) {
+static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more ) {
rtable_ent_t* rte; // matching rt entry
endpoint_t* ep; // seected end point
- int nn_sock = -1;
+ int nn_sock = -2;
int dummy;
rrgroup_t* rrg;
- if( ! more ) { // eliminate cheks each time we need to user
+
+ if( ! more ) { // eliminate checks each time we need to use
more = &dummy;
}
@@ -219,20 +221,20 @@
return -1;
}
- if( (rte = rmr_sym_pull( rt->hash, mtype )) == NULL ) {
+ if( (rte = rmr_sym_pull( rt->hash, key )) == NULL ) {
*more = 0;
- //if( DEBUG ) fprintf( stderr, ">>>> rte not found for type = %d\n", mtype );
+ //if( DEBUG ) fprintf( stderr, "#### >>> rte not found for type key=%lu\n", key );
return -1;
}
if( group < 0 || group >= rte->nrrgroups ) {
- //if( DEBUG ) fprintf( stderr, ">>>> group out of range: mtype=%d group=%d max=%d\n", mtype, group, rte->nrrgroups );
+ //if( DEBUG ) fprintf( stderr, ">>>> group out of range: key=%lu group=%d max=%d\n", key, group, rte->nrrgroups );
*more = 0;
return -1;
}
if( (rrg = rte->rrgroups[group]) == NULL ) {
- //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type = %d\n", mtype );
+ //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type = %lu\n", key );
*more = 0; // groups are inserted contig, so nothing should be after a nil pointer
return -1;
}
@@ -241,10 +243,10 @@
switch( rrg->nused ) {
case 0: // nothing allocated, just punt
- //if( DEBUG ) fprintf( stderr, ">>>> nothing allocated for the rrg\n" );
+ //if( DEBUG )
return -1;
- case 1: // exactly one, no rr to deal with and more is not possible even if fanout > 1
+ case 1: // exactly one, no rr to deal with
nn_sock = rrg->epts[0]->nn_sock;
ep = rrg->epts[0];
break;
@@ -258,7 +260,7 @@
break;
}
- if( ! ep->open ) { // not connected
+ if( ep && ! ep->open ) { // not connected
if( ep->addr == NULL ) { // name didn't resolve before, try again
ep->addr = uta_h2ip( ep->name );
}
diff --git a/src/nanomsg/src/sr_static.c b/src/nanomsg/src/sr_static.c
index 77a0e64..18acdda 100644
--- a/src/nanomsg/src/sr_static.c
+++ b/src/nanomsg/src/sr_static.c
@@ -98,6 +98,8 @@
exit( 1 );
}
+ memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // must ensure that header portion of tpbuf is 0
+ msg->tp_buf = msg->header;
hdr = (uta_mhdr_t *) msg->header;
hdr->rmr_ver = htonl( RMR_MSG_VER ); // current version
hdr->sub_id = htonl( UNSET_SUBID );
@@ -108,6 +110,8 @@
msg->len = 0; // length of data in the payload
msg->alloc_len = mlen; // length of allocated payload
+ msg->sub_id = UNSET_SUBID;
+ msg->mtype = UNSET_MSGTYPE;
msg->payload = PAYLOAD_ADDR( hdr ); // point at the payload in transport
msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
msg->state = state; // fill in caller's state (likely the state of the last operation)
@@ -323,6 +327,7 @@
// future: ensure that application did not overrun the XID buffer; last byte must be 0
+ //fprintf( stderr, ">>>>>> sending to %d %d\n", nn_sock, msg->mtype );
hdr = (uta_mhdr_t *) msg->header;
hdr->mtype = htonl( msg->mtype ); // stash type/len/sub-id in network byte order for transport
hdr->sub_id = htonl( msg->sub_id );
diff --git a/src/nng/include/rmr_nng_private.h b/src/nng/include/rmr_nng_private.h
index b810bdb..6bca91f 100644
--- a/src/nng/include/rmr_nng_private.h
+++ b/src/nng/include/rmr_nng_private.h
@@ -104,7 +104,7 @@
static int uta_link2( char* target, nng_socket* nn_sock, nng_dialer* dialer );
static int rt_link2_ep( endpoint_t* ep );
static int uta_epsock_byname( route_table_t* rt, char* ep_name, nng_socket* nn_sock );
-static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more, nng_socket* nn_sock );
+static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock );
static inline int xlate_nng_state( int state, int def_state );
diff --git a/src/nng/src/rmr_nng.c b/src/nng/src/rmr_nng.c
index 70542f8..ba7c852 100644
--- a/src/nng/src/rmr_nng.c
+++ b/src/nng/src/rmr_nng.c
@@ -204,6 +204,7 @@
int send_again; // true if the message must be sent again
rmr_mbuf_t* clone_m; // cloned message for an nth send
int sock_ok; // got a valid socket from round robin select
+ uint64_t key; // mtype or sub-id/mtype sym table key
if( (ctx = (uta_ctx_t *) vctx) == NULL || msg == NULL ) { // bad stuff, bail fast
errno = EINVAL; // if msg is null, this is their clue
@@ -229,8 +230,9 @@
send_again = 1; // force loop entry
group = 0; // always start with group 0
+ key = build_rt_key( msg->sub_id, msg->mtype ); // route table key to find the entry
while( send_again ) {
- sock_ok = uta_epsock_rr( ctx->rtable, msg->mtype, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups
+ sock_ok = uta_epsock_rr( ctx->rtable, key, group, &send_again, &nn_sock ); // round robin sel epoint; again set if mult groups
if( DEBUG ) fprintf( stderr, "[DBUG] send msg: type=%d again=%d group=%d len=%d sock_ok=%d\n",
msg->mtype, send_again, group, msg->len, sock_ok );
group++;
diff --git a/src/nng/src/rtable_nng_static.c b/src/nng/src/rtable_nng_static.c
index d6d3490..6122f69 100644
--- a/src/nng/src/rtable_nng_static.c
+++ b/src/nng/src/rtable_nng_static.c
@@ -231,7 +231,7 @@
during test that different entries are being seleted; we cannot depend on the nng
socket being different as we could with nano.
*/
-static int uta_epsock_rr( route_table_t *rt, int mtype, int group, int* more, nng_socket* nn_sock ) {
+static int uta_epsock_rr( route_table_t *rt, uint64_t key, int group, int* more, nng_socket* nn_sock ) {
rtable_ent_t* rte; // matching rt entry
endpoint_t* ep; // seected end point
int state = FALSE; // processing state
@@ -254,20 +254,20 @@
return FALSE;
}
- if( (rte = rmr_sym_pull( rt->hash, mtype )) == NULL ) {
+ if( (rte = rmr_sym_pull( rt->hash, key )) == NULL ) {
*more = 0;
- //if( DEBUG ) fprintf( stderr, ">>>> rte not found for type = %d\n", mtype );
+ //if( DEBUG ) fprintf( stderr, ">>>> rte not found for type = %lu\n", key );
return FALSE;
}
if( group < 0 || group >= rte->nrrgroups ) {
- //if( DEBUG ) fprintf( stderr, ">>>> group out of range: mtype=%d group=%d max=%d\n", mtype, group, rte->nrrgroups );
+ //if( DEBUG ) fprintf( stderr, ">>>> group out of range: key=%lu group=%d max=%d\n", key, group, rte->nrrgroups );
*more = 0;
return FALSE;
}
if( (rrg = rte->rrgroups[group]) == NULL ) {
- //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type = %d\n", mtype );
+ //if( DEBUG ) fprintf( stderr, ">>>> rrg not found for type key=%lu\n", key );
*more = 0; // groups are inserted contig, so nothing should be after a nil pointer
return FALSE;
}
diff --git a/src/nng/src/sr_nng_static.c b/src/nng/src/sr_nng_static.c
index e1a2fa5..7c17589 100644
--- a/src/nng/src/sr_nng_static.c
+++ b/src/nng/src/sr_nng_static.c
@@ -147,6 +147,8 @@
}
msg->len = 0; // length of data in the payload
msg->alloc_len = mlen; // length of allocated transport buffer
+ msg->sub_id = UNSET_SUBID;
+ msg->mtype = UNSET_MSGTYPE;
msg->payload = PAYLOAD_ADDR( hdr ); // point to payload (past all header junk)
msg->xaction = ((uta_mhdr_t *)msg->header)->xid; // point at transaction id in header area
msg->state = state; // fill in caller's state (likely the state of the last operation)
@@ -175,6 +177,8 @@
memset( msg, 0, sizeof( *msg ) );
+ msg->sub_id = UNSET_SUBID;
+ msg->mtype = UNSET_MSGTYPE;
msg->tp_buf = NULL;
msg->header = NULL;
msg->len = -1; // no payload; invalid len
diff --git a/test/app_test/rebuild.ksh b/test/app_test/rebuild.ksh
new file mode 100644
index 0000000..04367ef
--- /dev/null
+++ b/test/app_test/rebuild.ksh
@@ -0,0 +1,46 @@
+#!/usr/bin/env ksh
+# :vi ts=4 sw=4 noet :
+#==================================================================================
+# Copyright (c) 2019 Nokia
+# Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#==================================================================================
+#
+
+# ---------------------------------------------------------------------------------
+# Mnemonic: rebuild.ksh
+# Abstract: This is a simple script that will cause RMr to be rebuilt. It
+# may be invoked by any of the run_* scripts in this directory.
+#
+# Date: 24 April 2019
+# Author: E. Scott Daniels
+# ---------------------------------------------------------------------------------
+
+
+build_path=../../.build
+
+(
+ set -e
+ mkdir -p $build_path
+ cd ${build_path%/*} # cd barfs on ../../.build, so we do this
+ cd ${build_path##*/}
+ cmake ..
+ make package
+)
+if (( $? != 0 ))
+then
+ echo "build failed"
+ exit 1
+fi
+
diff --git a/test/app_test/receiver.c b/test/app_test/receiver.c
index dd639ce..ed2450b 100644
--- a/test/app_test/receiver.c
+++ b/test/app_test/receiver.c
@@ -99,12 +99,15 @@
long good = 0; // good palyload buffers
long bad = 0; // payload buffers which were not correct
long bad_tr = 0; // trace buffers that were not correct
+ long bad_sid = 0; // bad subscription ids
long timeout = 0;
char* data;
- char wbuf[1024]; // we'll pull trace data into here
int nmsgs = 10; // number of messages to stop after (argv[1] overrides)
int rt_count = 0; // retry count
long ack_count = 0; // number of acks sent
+ int count_bins[11]; // histogram bins based on msg type (0-10)
+ char wbuf[1024]; // we'll pull trace data into here, and use as general working buffer
+ char sbuf[128]; // short buffer
data = getenv( "RMR_RTG_SVC" );
if( data == NULL ) {
@@ -118,6 +121,8 @@
listen_port = argv[2];
}
+ memset( count_bins, 0, sizeof( count_bins ) );
+
fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines!
@@ -165,11 +170,21 @@
}
count++; // messages received for stats output
- if( msg->mtype == 5 ) { // send an ack; sender will count but not process, so data in message is moot
- msg = rmr_rts_msg( mrc, msg ); // we don't try to resend if this returns retry
+ if( msg->mtype < 3 ) { // count number of properly set subscription id
+ if( msg->sub_id != msg->mtype * 10 ) {
+ bad_sid++;
+ }
+ }
+
+ if( msg->mtype >= 0 && msg->mtype <= 10 ) {
+ count_bins[msg->mtype]++;
+ }
+
+ if( msg->mtype == 5 ) { // send an ack; sender will count but not process, so data in message is moot
+ msg = rmr_rts_msg( mrc, msg );
rt_count = 1000;
while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) { // to work right in nano we need this :(
- if( ack_count < 1 ) { // need to connect, so hard wait
+ if( ack_count < 1 ) { // 1st ack, so we need to connect, and we'll wait for that
sleep( 1 );
}
rt_count--;
@@ -189,7 +204,16 @@
}
}
- fprintf( stderr, "<RCVR> [%s] %ld messages; good=%ld acked=%ld bad=%ld bad-trace=%ld\n", !!(errors + bad + bad_tr) ? "FAIL" : "PASS", count, good, ack_count, bad, bad_tr );
+ wbuf[0] = 0;
+ for( i = 0; i < 11; i++ ) {
+ snprintf( sbuf, sizeof( sbuf ), "%6d ", count_bins[i] );
+ strcat( wbuf, sbuf );
+ }
+
+ fprintf( stderr, "<RCVR> mtype histogram: %s\n", wbuf );
+ fprintf( stderr, "<RCVR> [%s] %ld messages; good=%ld acked=%ld bad=%ld bad-trace=%ld bad-sub_id=%ld\n",
+ !!(errors + bad + bad_tr) ? "FAIL" : "PASS", count, good, ack_count, bad, bad_tr, bad_sid );
+
sleep( 2 ); // let any outbound acks flow before closing
rmr_close( mrc );
diff --git a/test/app_test/rt.mask b/test/app_test/rt.mask
index 80d139f..5fab235 100644
--- a/test/app_test/rt.mask
+++ b/test/app_test/rt.mask
@@ -1,21 +1,24 @@
# This is a 'mask' such that the run command can generate with the
-# host name for the sender.
+# host name for the sender. (not needed after RMr 1.0.18)
-newrt|start
-rte|0|localhost:4560
-rte|1|localhost:4560
-rte|2|localhost:4560
-rte|3|localhost:4560
-rte|4|localhost:4560
-rte|5|localhost:4560
-rte|6|localhost:4560
-rte|7|localhost:4560
-rte|8|localhost:4560
-rte|9|localhost:4560
-rte|10|localhost:4560
-rte|11|localhost:4560
-rte|12|localhost:4560
-rte|13|localhost:4560
-rte|999|%%hostname%%:43086
-newrt|end
+newrt | start
+mse | 0 | 0 | localhost:4560
+mse | 1 | 10 | localhost:4560
+mse | 2 | 20 | localhost:4560
+rte | 3 | localhost:4560
+mse | 3 | 100 | localhost:4560 # special test to ensure that this does not affect previous entry
+rte | 4 | localhost:4560
+rte | 5 | localhost:4560
+rte | 6 | localhost:4560
+rte | 7 | localhost:4560
+rte | 8 | localhost:4560
+rte | 9 | localhost:4560
+rte | 10 | localhost:4560
+rte | 11 | localhost:4560
+rte | 12 | localhost:4560
+rte | 13 | localhost:4560
+
+# this entry isn't needed after RMr 1.0.18
+rte | 999 | %%hostname%%:43086
+newrt | end
diff --git a/test/app_test/run_app_test.ksh b/test/app_test/run_app_test.ksh
index 7ad4934..3c4f20e 100644
--- a/test/app_test/run_app_test.ksh
+++ b/test/app_test/run_app_test.ksh
@@ -73,6 +73,7 @@
nano_receiver=0
wait=1
rebuild=0
+verbose=0
while [[ $1 == -* ]]
do
@@ -83,6 +84,7 @@
nano_receiver=1
;;
-n) nmsg=$2; shift;;
+ -v) verbose=1;;
*) echo "unrecognised option: $1"
echo "usage: $0 [-B] [-d micor-sec-delay] [-N] [-n num-msgs]"
@@ -94,23 +96,18 @@
shift
done
+if (( verbose ))
+then
+ echo "2" >.verbose
+ export RMR_VCTL_FILE=".verbose"
+fi
+
if (( rebuild ))
then
build_path=../../.build
-
- (
- set -e
- mkdir -p $build_path
- cd ${build_path%/*} # cd barfs on ../../.build, so we do this
- cd ${build_path##*/}
- cmake ..
- make package
- )
- if (( $? != 0 ))
- then
- echo "build failed"
- exit 1
- fi
+ set -e
+ ksh ./rebuild.ksh
+ set +e
else
build_path=${BUILD_PATH:-"../../.build"} # we prefer .build at the root level, but allow user option
@@ -137,6 +134,7 @@
fi
run_rcvr &
+sleep 2 # if sender starts faster than rcvr we can drop, so pause a bit
run_sender &
wait
@@ -151,6 +149,7 @@
fi
rm /tmp/PID$$.*
+rm -f .verbose
exit $(( !! (src + rrc) ))
diff --git a/test/app_test/run_multi_test.ksh b/test/app_test/run_multi_test.ksh
new file mode 100644
index 0000000..9bfc59b
--- /dev/null
+++ b/test/app_test/run_multi_test.ksh
@@ -0,0 +1,208 @@
+#!/usr/bin/env ksh
+# :vi ts=4 sw=4 noet :
+#==================================================================================
+# Copyright (c) 2019 Nokia
+# Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#==================================================================================
+#
+
+# ---------------------------------------------------------------------------------
+# Mnemonic: run_multi_test.ksh
+# Abstract: This is a simple script to set up and run the basic send/receive
+# processes for some library validation on top of nano/nng. This
+# particular tests starts several receivers and creates a route table
+# which causes messages to be sent to all receivers in parallel
+# (forcing message cloning internally in RMr).
+# It should be possible to clone the repo, switch to this directory
+# and execute 'ksh run -B' which will build RMr, make the sender and
+# recevier then run the basic test.
+#
+# Example command line:
+# ksh ./run_multi_test.ksh # default 10 messages at 1 msg/sec
+# ksh ./run_multi_test.ksh -N # default but with nanomsg lib
+# ksh ./run_multi_test.ksh -d 100 -n 10000 # send 10k messages with 100ms delay between
+#
+# Date: 24 April 2019
+# Author: E. Scott Daniels
+# ---------------------------------------------------------------------------------
+
+
+# The sender and receivers are run asynch. Their exit statuses are captured in a
+# file in order for the 'main' to pick them up easily.
+#
+function run_sender {
+ if (( $nano_sender ))
+ then
+ ./sender_nano $nmsg $delay
+ else
+ ./sender $nmsg $delay
+ fi
+ echo $? >/tmp/PID$$.src # must communicate state back via file b/c asynch
+}
+
+# $1 is the instance so we can keep logs separate
+function run_rcvr {
+ typeset port
+
+ port=$(( 4460 + ${1:-0} ))
+ export RMR_RTG_SVC=$(( 9990 + $1 ))
+ if (( $nano_receiver ))
+ then
+ ./receiver_nano $nmsg $port
+ else
+ ./receiver $nmsg $port
+ fi
+ echo $? >/tmp/PID$$.$1.rrc
+}
+
+# Drop a contrived route table in such that the sender sends each message to n
+# receivers.
+#
+function set_rt {
+ typeset port=4460
+ typeset groups="localhost:4460"
+ for (( i=1; i < ${1:-3}; i++ ))
+ do
+ groups="$groups;localhost:$((port+i))"
+ done
+
+ cat <<endKat >multi.rt
+ newrt | start
+ mse |0 | 0 | $groups
+ mse |1 | 10 | $groups
+ mse |2 | 20 | $groups
+ rte |3 | $groups
+ rte |4 | $groups
+ rte |5 | $groups
+ rte |6 | $groups
+ rte |7 | $groups
+ rte |8 | $groups
+ rte |9 | $groups
+ rte |10 | $groups
+ rte |11 | $groups
+ newrt | end
+endKat
+
+}
+
+# ---------------------------------------------------------
+
+if [[ ! -f local.rt ]] # we need the real host name in the local.rt; build one from mask if not there
+then
+ hn=$(hostname)
+ sed "s!%%hostname%%!$hn!" rt.mask >local.rt
+fi
+
+nmsg=10 # total number of messages to be exchanged (-n value changes)
+delay=1000000 # microsec sleep between msg 1,000,000 == 1s
+nano_sender=0 # start nano version if set (-N)
+nano_receiver=0
+wait=1
+rebuild=0
+verbose=0
+nrcvrs=3 # this is sane, but -r allows it to be set up
+
+while [[ $1 == -* ]]
+do
+ case $1 in
+ -B) rebuild=1;;
+ -d) delay=$2; shift;;
+ -N) nano_sender=1
+ nano_receiver=1
+ ;;
+ -n) nmsg=$2; shift;;
+ -r) nrcvrs=$2; shift;;
+ -v) verbose=1;;
+
+ *) echo "unrecognised option: $1"
+ echo "usage: $0 [-B] [-d micor-sec-delay] [-N] [-n num-msgs]"
+ echo " -B forces a rebuild which will use .build"
+ exit 1
+ ;;
+ esac
+
+ shift
+done
+
+if (( verbose ))
+then
+ echo "2" >.verbose
+ export RMR_VCTL_FILE=".verbose"
+fi
+
+if (( rebuild ))
+then
+ build_path=../../.build # if we rebuild we can insist that it is in .build :)
+ set -e
+ ksh ./rebuild.ksh
+ set +e
+else
+ build_path=${BUILD_PATH:-"../../.build"} # we prefer .build at the root level, but allow user option
+
+ if [[ ! -d $build_path ]]
+ then
+ echo "cannot find build in: $build_path"
+ echo "either create, and then build RMr, or set BUILD_PATH as an evironment var before running this"
+ exit 1
+ fi
+fi
+
+export LD_LIBRARY_PATH=$build_path:$build_path/lib
+export LIBRARY_PATH=$LD_LIBRARY_PATH
+export RMR_SEED_RT=./multi.rt
+
+set_rt $nrcvrs # set up the rt for n receivers
+
+if [[ ! -f ./sender ]]
+then
+ if ! make >/dev/null 2>&1
+ then
+ echo "[FAIL] cannot find sender binary, and cannot make it.... humm?"
+ exit 1
+ fi
+fi
+
+for (( i=0; i < nrcvrs; i++ )) # start the receivers with an instance number
+do
+ run_rcvr $i &
+done
+
+sleep 2 # let receivers init so we don't shoot at an empty target
+run_sender &
+
+wait
+
+
+for (( i=0; i < nrcvrs; i++ )) # collect return codes
+do
+ head -1 /tmp/PID$$.$i.rrc | read x
+ (( rrc += x ))
+done
+
+head -1 /tmp/PID$$.src | read src
+
+if (( !! (src + rrc) ))
+then
+ echo "[FAIL] sender rc=$src receiver rc=$rrc"
+else
+ echo "[PASS] sender rc=$src receiver rc=$rrc"
+ rm -f multi.rt
+fi
+
+rm /tmp/PID$$.*
+rm -f .verbose
+
+exit $(( !! (src + rrc) ))
+
diff --git a/test/app_test/run_rr_test.ksh b/test/app_test/run_rr_test.ksh
new file mode 100644
index 0000000..2e52aa6
--- /dev/null
+++ b/test/app_test/run_rr_test.ksh
@@ -0,0 +1,211 @@
+#!/usr/bin/env ksh
+# :vi ts=4 sw=4 noet :
+#==================================================================================
+# Copyright (c) 2019 Nokia
+# Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#==================================================================================
+#
+
+# ---------------------------------------------------------------------------------
+# Mnemonic: run_multi_test.ksh
+# Abstract: This is a simple script to set up and run the basic send/receive
+# processes for some library validation on top of nano/nng. This
+# particular tests starts several receivers and creates a route table
+# which causes messages to be sent round robin to all of the receivers.
+# The number of messages command line parameter (-n) will be the number
+# of messages that each receiver should expect; the sender will be asked
+# to send r times that many messages so that as they are round robbined
+# each receiver should get the same number of messages.
+#
+# Example command line:
+# ksh ./run_rr_test.ksh # default 10 messages at 1 msg/sec
+# ksh ./run_rr_test.ksh -N # default but with nanomsg lib
+# ksh ./run_rr_test.ksh -d 100 -n 10000 # send 10k messages with 100ms delay between
+#
+# Date: 24 April 2019
+# Author: E. Scott Daniels
+# ---------------------------------------------------------------------------------
+
+
+# The sender and receivers are run asynch. Their exit statuses are captured in a
+# file in order for the 'main' to pick them up easily.
+#
+function run_sender {
+ export RMR_RTG_SVC=8990
+ if (( $nano_sender ))
+ then
+ ./sender_nano $(( nmsg * nrcvrs )) $delay 1
+ else
+ ./sender $(( nmsg * nrcvrs )) $delay 1
+ fi
+ echo $? >/tmp/PID$$.src # must communicate state back via file b/c asynch
+}
+
+# $1 is the instance so we can keep logs separate
+function run_rcvr {
+ typeset port
+
+ port=$(( 4560 + ${1:-0} ))
+ export RMR_RTG_SVC=$(( 9990 + $1 ))
+ if (( $nano_receiver ))
+ then
+ ./receiver_nano $nmsg $port
+ else
+ ./receiver $nmsg $port
+ fi
+ echo $? >/tmp/PID$$.$1.rrc
+}
+
+#
+# Drop a contrived route table in such that the sender sends each message to n
+# receivers.
+#
+function set_rt {
+ typeset port=4560
+ typeset endpoints="localhost:4560"
+ for (( i=1; i < ${1:-3}; i++ ))
+ do
+ endpoints="$endpoints,localhost:$((port+i))"
+ done
+
+ cat <<endKat >rr.rt
+ newrt |start
+ rte |0 | $endpoints |0
+ rte |1 | $endpoints |10
+ mse |2 | 20 | $endpoints # new style mtype/subid entry
+ rte |3 | $endpoints |0
+ rte |4 | $endpoints |0
+ rte |5 | $endpoints |0
+ rte |6 | $endpoints |0
+ rte |7 | $endpoints |0
+ rte |8 | $endpoints |0
+ rte |9 | $endpoints |0
+ rte |10 | $endpoints |0
+ rte |11 | $endpoints |0
+ newrt |end
+endKat
+
+}
+
+# ---------------------------------------------------------
+
+if [[ ! -f local.rt ]] # we need the real host name in the local.rt; build one from mask if not there
+then
+ hn=$(hostname)
+ sed "s!%%hostname%%!$hn!" rt.mask >local.rt
+fi
+
+nmsg=10 # total number of messages to be exchanged (-n value changes)
+delay=1000 # microsec sleep between msg 1,000,000 == 1s (shorter than others b/c/ we are sending to multiple)
+nano_sender=0 # start nano version if set (-N)
+nano_receiver=0
+wait=1
+rebuild=0
+verbose=0
+nrcvrs=3 # this is sane, but -r allows it to be set up
+
+while [[ $1 == -* ]]
+do
+ case $1 in
+ -B) rebuild=1;;
+ -d) delay=$2; shift;;
+ -N) nano_sender=1
+ nano_receiver=1
+ ;;
+ -n) nmsg=$2; shift;;
+ -r) nrcvrs=$2; shift;;
+ -v) verbose=1;;
+
+ *) echo "unrecognised option: $1"
+ echo "usage: $0 [-B] [-d micor-sec-delay] [-N] [-n num-msgs]"
+ echo " -B forces a rebuild which will use .build"
+ exit 1
+ ;;
+ esac
+
+ shift
+done
+
+
+if (( verbose ))
+then
+ echo "2" >.verbose
+ export RMR_VCTL_FILE=".verbose"
+fi
+
+if (( rebuild ))
+then
+ build_path=../../.build
+ set -e
+ ksh ./rebuild.ksh
+ set +e
+else
+ build_path=${BUILD_PATH:-"../../.build"} # we prefer .build at the root level, but allow user option
+
+ if [[ ! -d $build_path ]]
+ then
+ echo "cannot find build in: $build_path"
+ echo "either create, and then build RMr, or set BUILD_PATH as an evironment var before running this"
+ exit 1
+ fi
+fi
+
+export LD_LIBRARY_PATH=$build_path:$build_path/lib
+export LIBRARY_PATH=$LD_LIBRARY_PATH
+export RMR_SEED_RT=./rr.rt
+
+set_rt $nrcvrs
+
+if [[ ! -f ./sender ]]
+then
+ if ! make >/dev/null 2>&1
+ then
+ echo "[FAIL] cannot find sender binary, and cannot make it.... humm?"
+ exit 1
+ fi
+fi
+
+for (( i=0; i < nrcvrs; i++ )) # start the receivers with an instance number
+do
+ run_rcvr $i &
+done
+
+sleep 2 # wait to start sender else we might send before receivers up and drop messages
+run_sender &
+
+wait
+
+
+for (( i=0; i < nrcvrs; i++ )) # collect return codes
+do
+ head -1 /tmp/PID$$.$i.rrc | read x
+ (( rrc += x ))
+done
+
+head -1 /tmp/PID$$.src | read src
+
+if (( !! (src + rrc) ))
+then
+ echo "[FAIL] sender rc=$src receiver rc=$rrc"
+else
+ echo "[PASS] sender rc=$src receiver rc=$rrc"
+fi
+
+rm /tmp/PID$$.*
+rm -f .verbose
+rm -f rr.rt
+
+exit $(( !! (src + rrc) ))
+
diff --git a/test/app_test/sender.c b/test/app_test/sender.c
index 3602d4f..ac7d869 100644
--- a/test/app_test/sender.c
+++ b/test/app_test/sender.c
@@ -32,8 +32,11 @@
will give up and fail.
- Message types will vary between 1 and 10, so the route table must
- be set up to support those message types.
+ Message types will vary between 0 and 9, so the route table must
+ be set up to support those message types. Further, for message types
+ 0, 1 and 2, the subscription ID will be set to type x 10, so the route
+ table must be set to include the sub-id for those types in order for
+ the messages to reach their destination.
Message format is:
ck1 ck2|<msg-txt><nil>
@@ -42,7 +45,10 @@
Ck2 is the simple check sum of the trace data which is a nil terminated
series of bytes.
- Parms: argv[1] == nmsgs; argv[2] == delay; argv[3] == listen port
+ Parms: argv[1] == number of msgs to send (10)
+ argv[2] == delay (mu-seconds, 1000000 default)
+ argv[3] == max msg type (not inclusive; default 10)
+ argv[4] == listen port
Sender will send for at most 20 seconds, so if nmsgs and delay extend
beyond that period the total number of messages sent will be less
@@ -75,16 +81,17 @@
int main( int argc, char** argv ) {
void* mrc; // msg router context
- struct epoll_event events[1]; // list of events to give to epoll
- struct epoll_event epe; // event definition for event to listen to
+ struct epoll_event events[1]; // list of events to give to epoll
+ struct epoll_event epe; // event definition for event to listen to
int ep_fd = -1; // epoll's file des (given to epoll_wait)
- int rcv_fd; // file des that NNG tickles -- give this to epoll to listen on
- int nready; // number of events ready for receive
+ int rcv_fd; // file des that NNG tickles -- give this to epoll to listen on
+ int nready; // number of events ready for receive
rmr_mbuf_t* sbuf; // send buffer
rmr_mbuf_t* rbuf; // received buffer
- int count = 0;
- int rt_count = 0; // number of messages requiring a spin retry
- int rcvd_count = 0;
+ int count = 0;
+ int rt_count = 0; // number of messages requiring a spin retry
+ int rcvd_count = 0;
+ int fail_count = 0; // # of failure sends after first successful send
char* listen_port = "43086";
int mtype = 0;
int stats_freq = 100;
@@ -94,6 +101,7 @@
long timeout = 0;
int delay = 100000; // usec between send attempts
int nmsgs = 10; // number of messages to send
+ int max_mt = 10; // reset point for message type
if( argc > 1 ) {
nmsgs = atoi( argv[1] );
@@ -102,7 +110,10 @@
delay = atoi( argv[2] );
}
if( argc > 3 ) {
- listen_port = argv[3];
+ max_mt = atoi( argv[3] );
+ }
+ if( argc > 4 ) {
+ listen_port = argv[4];
}
fprintf( stderr, "<SNDR> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
@@ -150,13 +161,19 @@
timeout = time( NULL ) + 20;
- while( count < nmsgs ) { // we send 10 messages after the first message is successful
+ while( count < nmsgs ) { // we send n messages after the first message is successful
snprintf( trace, 100, "%lld", (long long) time( NULL ) );
rmr_set_trace( sbuf, trace, strlen( trace ) + 1 );
snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer!", count, trace, rand() );
snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf );
sbuf->mtype = mtype; // fill in the message bits
+ if( mtype < 3 ) {
+ sbuf->sub_id = mtype * 10;
+ } else {
+ sbuf->sub_id = -1;
+ }
+
sbuf->len = strlen( sbuf->payload ) + 1; // our receiver likely wants a nice acsii-z string
sbuf->state = 0;
sbuf = rmr_send_msg( mrc, sbuf ); // send it (send returns an empty payload on success, or the original payload on fail/retry)
@@ -167,7 +184,13 @@
while( sbuf->state == RMR_ERR_RETRY ) { // soft failure (device busy?) retry
sbuf = rmr_send_msg( mrc, sbuf ); // retry send until it's good (simple test; real programmes should do better)
}
- successful = 1;
+ if( sbuf->state == RMR_OK ) {
+ successful = 1; // indicates only that we sent one successful message, not the current state
+ } else {
+ if( successful ) {
+ fail_count++; // count failures after first successful message
+ }
+ }
break;
case RMR_OK:
@@ -175,15 +198,19 @@
break;
default:
+ if( successful ) {
+ fail_count++; // count failures after first successful message
+ }
// some error (not connected likely), don't count this
+ //sleep( 1 );
break;
}
if( successful ) { // once we have a message that was sent, start to increase things
count++;
mtype++;
- if( mtype > 10 ) { // if large number of sends don't require infinite rt entries :)
- mtype = 1;
+ if( mtype >= max_mt ) { // if large number of sends don't require infinite rt entries :)
+ mtype = 0;
}
}
@@ -237,7 +264,7 @@
}
}
- fprintf( stderr, "<SNDR> [%s] sent %d messages received %d acks retries=%d\n", count == nmsgs ? "PASS" : "FAIL", count, rcvd_count, rt_count );
+ fprintf( stderr, "<SNDR> [%s] sent=%d rcvd-acks=%d failures=%d retries=%d\n", count == nmsgs ? "PASS" : "FAIL", count, rcvd_count, fail_count, rt_count );
rmr_close( mrc );
return !( count == nmsgs );
diff --git a/test/rmr_nng_api_static_test.c b/test/rmr_nng_api_static_test.c
index e86c6f5..cefc09f 100644
--- a/test/rmr_nng_api_static_test.c
+++ b/test/rmr_nng_api_static_test.c
@@ -139,6 +139,10 @@
msg2 = rmr_send_msg( NULL, NULL ); // drive for coverage
errors += fail_not_nil( msg2, "send_msg returned msg pointer when given a nil message and context" );
+ msg->state = 0;
+ msg = rmr_send_msg( NULL, msg );
+ errors += fail_if( msg->state == 0, "rmr_send_msg did not set msg state when msg given with nil context" );
+
// --- sends will fail with a no endpoint error until a dummy route table is set, so we test fail case first.
msg->len = 100;
msg->mtype = 1;
@@ -194,17 +198,28 @@
rmr_rts_msg( rmc, NULL );
errors += fail_if( errno == 0, "rmr_rts_msg did not set errno when given a nil message" );
+ msg->state = 0;
+ msg = rmr_rts_msg( NULL, msg ); // should set state in msg
+ errors += fail_if_equal( msg->state, 0, "rmr_rts_msg did not set state when given valid message but no context" );
+
+
msg = rmr_rts_msg( rmc, msg ); // return the buffer to the sender
errors += fail_if_nil( msg, "rmr_rts_msg did not return a message pointer" );
errors += fail_if( errno != 0, "rmr_rts_msg did not reset errno" );
snprintf( msg->xaction, 17, "%015d", 16 ); // dummy transaction id (emulation generates, this should arrive after a few calls to recv)
+
+ msg->state = 0;
+ msg = rmr_call( NULL, msg );
+ errors += fail_if( msg->state == 0, "rmr_call did not set message state when given message with nil context" );
+
+ msg->mtype = 0;
msg = rmr_call( rmc, msg ); // this call should return a message as we can anticipate a dummy message in
errors += fail_if_nil( msg, "rmr_call returned a nil message on call expected to succeed" );
if( msg ) {
errors += fail_not_equal( msg->state, RMR_OK, "rmr_call did not properly set state on successful return" );
- errors += fail_if( errno != 0, "rmr_call did not properly set errno on successful return" );
+ errors += fail_not_equal( errno, 0, "rmr_call did not properly set errno (a) on successful return" );
}
snprintf( wbuf, 17, "%015d", 14 ); // if we call receive we should find this in the first 15 tries
@@ -240,6 +255,11 @@
rmr_free_msg( msg2 );
+ msg2 = rmr_torcv_msg( NULL, NULL, 10 );
+ errors += fail_not_nil( msg2, "rmr_torcv_msg returned a pointer when given nil information" );
+ msg2 = rmr_torcv_msg( rmc, NULL, 10 );
+ errors += fail_if_nil( msg2, "rmr_torcv_msg did not return a message pointer when given a nil old msg" );
+
// --- test timeout receive; our dummy epoll function will return 1 ready on first call and 0 ready (timeout emulation) on second
// however we must drain the swamp (queue) first, so run until we get a timeout error, or 20 and report error if we get to 20.
msg = NULL;
diff --git a/test/rt_static_test.c b/test/rt_static_test.c
index 129eec1..710c75e 100644
--- a/test/rt_static_test.c
+++ b/test/rt_static_test.c
@@ -228,5 +228,15 @@
}
*/
+ state = uta_link2( "worm", NULL, NULL );
+ errors += fail_if_true( state, "link2 did not return false when given nil pointers" );
+
+ state = uta_epsock_rr( rt, 122, 0, NULL, NULL );
+ errors += fail_if_true( state, "uta_epsock_rr returned bad state when given nil socket pointer" );
+
+ rt = uta_rt_init( ); // get us a route table
+ state = uta_epsock_rr( rt, 0, -1, NULL, &nn_sock );
+ errors += fail_if_true( state, "uta_epsock_rr returned bad state (true) when given negative group number" );
+
return !!errors; // 1 or 0 regardless of count
}
diff --git a/test/sr_nng_static_test.c b/test/sr_nng_static_test.c
index d477391..4997706 100644
--- a/test/sr_nng_static_test.c
+++ b/test/sr_nng_static_test.c
@@ -46,14 +46,21 @@
char* rt_stuff; // strings for the route table
rt_stuff =
+ "newrt|end\n" // end of table check before start of table found
+ "# comment to drive full comment test\n"
+ "\n" // handle blank lines
+ " \n" // handle blank lines
+ "mse|4|10|localhost:4561\n" // entry before start message
+ "rte|4|localhost:4561\n" // entry before start message
"newrt|start\n" // false start to drive detection
"xxx|badentry to drive default case"
"newrt|start\n"
- "rte|0|localhost:4560,localhost:4562\n"
+ "rte|0|localhost:4560,localhost:4562\n" // these are legitimate entries for our testing
"rte|1|localhost:4562;localhost:4561,localhost:4569\n"
- "rte|2|localhost:4562\n"
- "rte|4|localhost:4561\n"
- "rte|5|localhost:4563\n"
+ "rte|2|localhost:4562| 10\n" // new subid at end
+ "mse|4|10|localhost:4561\n" // new msg/subid specifier rec
+ "mse|4|localhost:4561\n" // new mse entry with less than needed fields
+ " rte| 5 |localhost:4563 #garbage comment\n" // tests white space cleanup
"rte|6|localhost:4562\n"
"newrt|end\n";
@@ -92,6 +99,7 @@
nng_socket nn_dummy_sock; // dummy needed to drive send
int size;
int i;
+ void* p;
//ctx = rmr_init( "tcp:4360", 2048, 0 ); // do NOT call init -- that starts the rtc thread which isn't good here
ctx = (uta_ctx_t *) malloc( sizeof( uta_ctx_t ) ); // alloc the context manually
@@ -104,6 +112,10 @@
uta_lookup_rtg( ctx );
gen_rt( ctx ); // forces a static load with some known info since we don't start the rtc()
+ gen_rt( ctx ); // force a second load to test cloning
+
+ p = rt_ensure_ep( NULL, "foo" ); // drive for coverage
+ errors += fail_not_nil( p, "rt_ensure_ep did not return nil when given nil route table" );
state = rmr_ready( NULL );
errors += fail_if_true( state, "reported ready when given a nil context" );
diff --git a/test/unit_test.ksh b/test/unit_test.ksh
index 21848af..74bc048 100755
--- a/test/unit_test.ksh
+++ b/test/unit_test.ksh
@@ -267,6 +267,7 @@
show_all=1 # show all things -F sets to show failures only
strict=0 # -s (strict) will set; when off, coverage state ignored in final pass/fail
show_output=0 # show output from each test execution (-S)
+quiet=0
while [[ $1 == "-"* ]]
do
@@ -285,6 +286,7 @@
-s) strict=1;; # coverage counts toward pass/fail state
-S) show_output=1;; # test output shown even on success
-v) (( verbose++ ));;
+ -q) quiet=1;; # less chatty when spilling error log files
-h) usage; exit 0;;
--help) usage; exit 0;;
@@ -353,7 +355,12 @@
if ! ./${tfile%.c} >/tmp/PID$$.log 2>&1
then
echo "[FAIL] unit test failed for: $tfile"
- cat /tmp/PID$$.log
+ if (( quiet ))
+ then
+ grep "^<" /tmp/PID$$.log # in quiet mode just dump <...> messages which are assumed from the test programme not appl
+ else
+ cat /tmp/PID$$.log
+ fi
(( ut_errors++ )) # cause failure even if not in strict mode
continue # skip coverage tests for this
else
diff --git a/test/wormhole_static_test.c b/test/wormhole_static_test.c
index 9cb8d82..f2d4bf6 100644
--- a/test/wormhole_static_test.c
+++ b/test/wormhole_static_test.c
@@ -48,6 +48,7 @@
char wbuf[1024];
int errors = 0; // number errors found
int i;
+ void* p;
rmr_mbuf_t* mbuf; // mbuf to send to peer
int whid = -1;
@@ -100,11 +101,22 @@
whid = rmr_wh_open( ctx, "localhost:21961" );
errors += fail_not_equal( whid, 3, "attempt to fill in a hole didn't return expected" );
- rmr_wh_send_msg( NULL, 0, NULL ); // tests for coverage
- rmr_wh_send_msg( ctx, 0, NULL );
+ p = rmr_wh_send_msg( NULL, 0, NULL ); // tests for coverage
+ fail_not_nil( p, "wh_send_msg returned a pointer when given nil context and message" );
+
+ p = rmr_wh_send_msg( ctx, 0, NULL );
+ fail_not_nil( p, "wh_send_msg returned a pointer when given nil message with valid context" );
mbuf = rmr_alloc_msg( ctx, 2048 ); // get an muf to pass round
errors += fail_if_nil( mbuf, "unable to allocate mbuf for send tests (giving up on send tests)" );
+
+ mbuf->state = 0;
+ mbuf = rmr_wh_send_msg( NULL, 0, mbuf );
+ if( mbuf ) {
+ fail_if_equal( mbuf->state, 0, "wh_send_msg returned a zero state when given a nil context" );
+ }
+ fail_if_nil( mbuf, "wh_send_msg returned a nil message buffer when given a nil context" );
+
while( mbuf ) {
if( !(mbuf = rmr_wh_send_msg( ctx, 50, mbuf )) ) { // test for coverage
errors += fail_if_nil( mbuf, "send didn't return an mbuf (skip rest of send tests)" );