Revert RTS to use unidirectional connection
Because of potential performance impacts it was decided to
revert return to sender messages to NOT use the same
connection that the message was received on. This applies
only to the SI95; RTS has always been on uni-directional
connections with NNG.
This change also enables the MEID routing in the SI95 code.
Issue-ID: RIC-153
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I6415effb3283f5961aa0098f1ff0d1380d7aa197
diff --git a/CHANGES b/CHANGES
index 2854cd5..f77ae13 100644
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,9 @@
API and build change and fix summaries. Doc correctsions
and/or changes are not mentioned here; see the commit messages.
+2020 February 21; version 3.2.3
+ Add meid routing support to the SI95 interface.
+
2020 February 20; version 3.2.2
Fix receive thread related core dump (ring early unlock).
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 41eee9f..678a408 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -38,7 +38,7 @@
set( major_version "3" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this
set( minor_version "2" )
-set( patch_level "2" )
+set( patch_level "3" )
set( install_root "${CMAKE_INSTALL_PREFIX}" )
set( install_inc "include/rmr" )
diff --git a/src/rmr/common/include/rmr_agnostic.h b/src/rmr/common/include/rmr_agnostic.h
index 6b1537b..c2139c3 100644
--- a/src/rmr/common/include/rmr_agnostic.h
+++ b/src/rmr/common/include/rmr_agnostic.h
@@ -301,6 +301,7 @@
static inline uint64_t build_rt_key( int32_t sub_id, int32_t mtype );
static void collect_things( void* st, void* entry, char const* name, void* thing, void* vthing_list );
static void del_rte( void* st, void* entry, char const* name, void* thing, void* data );
+static endpoint_t* get_meid_owner( route_table_t *rt, char* meid );
static char* uta_fib( char* fname );
static route_table_t* uta_rt_init( );
static route_table_t* uta_rt_clone( route_table_t* srt );
diff --git a/src/rmr/common/src/rt_generic_static.c b/src/rmr/common/src/rt_generic_static.c
index f098f53..16fddd1 100644
--- a/src/rmr/common/src/rt_generic_static.c
+++ b/src/rmr/common/src/rt_generic_static.c
@@ -1235,4 +1235,18 @@
return key;
}
+/*
+ Given a route table and meid string, find the owner (if known). Returns a pointer to
+ the endpoint struct or nil.
+*/
+static inline endpoint_t* get_meid_owner( route_table_t *rt, char* meid ) {
+ endpoint_t* ep; // the ep we found in the hash
+
+ if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
+ return NULL;
+ }
+
+ return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
+}
+
#endif
diff --git a/src/rmr/nng/src/rtable_nng_static.c b/src/rmr/nng/src/rtable_nng_static.c
index 6adf230..2cec490 100644
--- a/src/rmr/nng/src/rtable_nng_static.c
+++ b/src/rmr/nng/src/rtable_nng_static.c
@@ -391,20 +391,6 @@
}
/*
- Given a route table and meid string, find the owner (if known). Returns a pointer to
- the endpoint struct or nil.
-*/
-static inline endpoint_t* get_meid_owner( route_table_t *rt, char* meid ) {
- endpoint_t* ep; // the ep we found in the hash
-
- if( rt == NULL || rt->hash == NULL || meid == NULL || *meid == 0 ) {
- return NULL;
- }
-
- return (endpoint_t *) rmr_sym_get( rt->hash, meid, RT_ME_SPACE );
-}
-
-/*
Return a string of count information. E.g.:
<ep-name>:<port> <good> <hard-fail> <soft-fail>
diff --git a/src/rmr/si/src/rmr_si.c b/src/rmr/si/src/rmr_si.c
index 0a4f806..9c67dc3 100644
--- a/src/rmr/si/src/rmr_si.c
+++ b/src/rmr/si/src/rmr_si.c
@@ -102,8 +102,8 @@
The allocated len stored in the msg is:
transport header length +
- message header +
- user requested payload
+ message header +
+ user requested payload
The msg header is a combination of the fixed RMR header and the variable
trace data and d2 fields which may vary for each message.
@@ -205,7 +205,7 @@
d1 = DATA1_ADDR( msg->header );
d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
- }
+ }
return mtosend_msg( vctx, msg, max_to );
}
@@ -223,7 +223,7 @@
d1 = DATA1_ADDR( msg->header );
d1[D1_CALLID_IDX] = NO_CALL_ID; // must blot out so it doesn't queue on a chute at the other end
- }
+ }
return rmr_mtosend_msg( vctx, msg, -1 ); // retries < 0 uses default from ctx
}
@@ -231,16 +231,20 @@
/*
Return to sender allows a message to be sent back to the endpoint where it originated.
- In the SI world the file descriptor that was the source of the message is captured in
- the mbuffer and thus can be used to quickly find the target for an RTS call.
+ With SI95 it was thought that the return to sender would be along the same open conneciton
+ and thus no table lookup would be needed to open a 'reverse direction' path. However, for
+ applications sending at high message rates, returning responses on the same connection
+ causes major strife. Thus the decision was made to use the same method as NNG and just
+ open a second connection for reverse path.
- The source information in the message is used to select the socket on which to write
- the message rather than using the message type and round-robin selection. This
- should return a message buffer with the state of the send operation set. On success
- (state is RMR_OK, the caller may use the buffer for another receive operation), and on
- error it can be passed back to this function to retry the send if desired. On error,
- errno will liklely have the failure reason set by the nng send processing.
- The following are possible values for the state in the message buffer:
+ We will attempt to use the name in the received message to look up the endpoint. If
+ that failes, then we will write on the connection that the message arrived on as a
+ falback.
+
+ On success (state is RMR_OK, the caller may use the buffer for another receive operation),
+ and on error it can be passed back to this function to retry the send if desired. On error,
+ errno will liklely have the failure reason set by the nng send processing. The following
+ are possible values for the state in the message buffer:
Message states returned:
RMR_ERR_BADARG - argument (context or msg) was nil or invalid
@@ -253,7 +257,7 @@
failure. The value of errno might give a clue as to what is wrong.
CAUTION:
- Like send_msg(), this is non-blocking and will return the msg if there is an errror.
+ Like send_msg(), this is non-blocking and will return the msg if there is an error.
The caller must check for this and handle it properly.
*/
extern rmr_mbuf_t* rmr_rts_msg( void* vctx, rmr_mbuf_t* msg ) {
@@ -284,21 +288,20 @@
((uta_mhdr_t *) msg->header)->flags &= ~HFL_CALL_MSG; // must ensure call flag is off
-/*
- sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep, ctx->si_ctx ); // src is always used first for rts
+ sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->src, &nn_sock, &ep ); // always try src first
if( ! sock_ok ) {
-*/
- if( (nn_sock = msg->rts_fd) < 0 ) {
- if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known
- //sock_ok = uta_epsock_byname( ctx->rtable, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep, ctx->si_ctx );
- sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep );
- }
- if( ! sock_ok ) {
- msg->state = RMR_ERR_NOENDPT;
- return msg; // preallocated msg can be reused since not given back to nn
+ if( (nn_sock = msg->rts_fd) < 0 ) {
+ if( HDR_VERSION( msg->header ) > 2 ) { // with ver2 the ip is there, try if src name not known
+ sock_ok = uta_epsock_byname( ctx, (char *) ((uta_mhdr_t *)msg->header)->srcip, &nn_sock, &ep );
+ }
+ if( ! sock_ok ) {
+ msg->state = RMR_ERR_NOENDPT;
+ return msg;
+ }
}
}
+
msg->state = RMR_OK; // ensure it is clear before send
hold_src = strdup( (char *) ((uta_mhdr_t *)msg->header)->src ); // the dest where we're returning the message to
hold_ip = strdup( (char *) ((uta_mhdr_t *)msg->header)->srcip ); // both the src host and src ip
@@ -310,7 +313,7 @@
case RMR_OK:
ep->scounts[EPSC_GOOD]++;
break;
-
+
case RMR_ERR_RETRY:
ep->scounts[EPSC_TRANS]++;
break;
@@ -335,7 +338,7 @@
If multi-threading call is turned on, this invokes that mechanism with the special call
id of 1 and a max wait of 1 second. If multi threaded call is not on, then the original
behavour (described below) is carried out. This is safe to use when mt is enabled, but
- the user app is invoking rmr_call() from only one thread, and the caller doesn't need
+ the user app is invoking rmr_call() from only one thread, and the caller doesn't need
a flexible timeout.
On timeout this function will return a nil pointer. If the original message could not
@@ -686,7 +689,7 @@
if( (interface = getenv( ENV_BIND_IF )) == NULL ) {
interface = "0.0.0.0";
}
-
+
snprintf( bind_info, sizeof( bind_info ), "%s:%s", interface, port ); // FIXME -- si only supports 0.0.0.0 by default
if( (state = SIlistener( ctx->si_ctx, TCP_DEVICE, bind_info )) < 0 ) {
rmr_vlog( RMR_VL_CRIT, "rmr_init: unable to start si listener for %s: %s\n", bind_info, strerror( errno ) );
@@ -843,7 +846,7 @@
long nano_sec; // max wait xlated to nano seconds
int state;
rmr_mbuf_t* ombuf; // mbuf user passed; if we timeout we return state here
-
+
if( (ctx = (uta_ctx_t *) vctx) == NULL ) {
errno = EINVAL;
if( mbuf ) {
@@ -861,7 +864,7 @@
if( (mbuf = (rmr_mbuf_t *) uta_ring_extract( ctx->mring )) != NULL ) { // pop if queued
if( ombuf ) {
rmr_free_msg( ombuf ); // can't reuse, caller's must be trashed now
- }
+ }
} else {
mbuf = ombuf; // return original if it was given with timeout status
if( ombuf != NULL ) {
@@ -957,7 +960,7 @@
long seconds = 0; // max wait seconds
long nano_sec; // max wait xlated to nano seconds
int state;
-
+
errno = EINVAL;
if( (ctx = (uta_ctx_t *) vctx) == NULL || mbuf == NULL ) {
if( mbuf ) {
@@ -986,7 +989,7 @@
rmr_free_msg( chute->mbuf );
chute->mbuf = NULL;
}
-
+
hdr = (uta_mhdr_t *) mbuf->header;
hdr->flags |= HFL_CALL_MSG; // must signal this sent with a call
memcpy( chute->expect, mbuf->xaction, RMR_MAX_XID ); // xaction that we will wait for
@@ -995,7 +998,7 @@
mbuf->flags |= MFL_NOALLOC; // send message without allocating a new one (expect nil from mtosend
if( max_wait >= 0 ) {
- clock_gettime( CLOCK_REALTIME, &ts );
+ clock_gettime( CLOCK_REALTIME, &ts );
if( max_wait > 999 ) {
seconds = max_wait / 1000;
@@ -1059,20 +1062,20 @@
Given an existing message buffer, reallocate the payload portion to
be at least new_len bytes. The message header will remain such that
the caller may use the rmr_rts_msg() function to return a payload
- to the sender.
+ to the sender.
The mbuf passed in may or may not be reallocated and the caller must
- use the returned pointer and should NOT assume that it can use the
+ use the returned pointer and should NOT assume that it can use the
pointer passed in with the exceptions based on the clone flag.
If the clone flag is set, then a duplicated message, with larger payload
size, is allocated and returned. The old_msg pointer in this situation is
- still valid and must be explicitly freed by the application. If the clone
+ still valid and must be explicitly freed by the application. If the clone
message is not set (0), then any memory management of the old message is
handled by the function.
- If the copy flag is set, the contents of the old message's payload is
- copied to the reallocated payload. If the flag is not set, then the
+ If the copy flag is set, the contents of the old message's payload is
+ copied to the reallocated payload. If the flag is not set, then the
contents of the payload is undetermined.
*/
extern rmr_mbuf_t* rmr_realloc_payload( rmr_mbuf_t* old_msg, int new_len, int copy, int clone ) {
diff --git a/src/rmr/si/src/rtable_si_static.c b/src/rmr/si/src/rtable_si_static.c
index 659f857..6215176 100644
--- a/src/rmr/si/src/rtable_si_static.c
+++ b/src/rmr/si/src/rtable_si_static.c
@@ -387,6 +387,66 @@
}
/*
+ Given a message, use the meid field to find the owner endpoint for the meid.
+ The owner ep is then used to extract the socket through which the message
+ is sent. This returns TRUE if we found a socket and it was written to the
+ nn_sock pointer; false if we didn't.
+
+ We've been told that the meid is a string, thus we count on it being a nil
+ terminated set of bytes.
+*/
+static int epsock_meid( uta_ctx_t* ctx, route_table_t *rtable, rmr_mbuf_t* msg, int* nn_sock, endpoint_t** uepp ) {
+ endpoint_t* ep; // seected end point
+ int state = FALSE; // processing state
+ char* meid;
+ si_ctx_t* si_ctx;
+
+ if( PARINOID_CHECKS ) {
+ if( ctx == NULL || (si_ctx = ctx->si_ctx) == NULL ) {
+ return FALSE;
+ }
+ } else {
+ si_ctx = ctx->si_ctx;
+ }
+
+ errno = 0;
+ if( ! nn_sock || msg == NULL || rtable == NULL ) { // missing stuff; bail fast
+ errno = EINVAL;
+ return FALSE;
+ }
+
+ meid = ((uta_mhdr_t *) msg->header)->meid;
+
+ if( (ep = get_meid_owner( rtable, meid )) == NULL ) {
+ if( uepp != NULL ) { // caller needs refernce to endpoint too
+ *uepp = NULL;
+ }
+
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: no ep in hash for (%s)\n", meid );
+ return FALSE;
+ }
+
+ state = TRUE;
+ if( ! ep->open ) { // not connected
+ if( ep->addr == NULL ) { // name didn't resolve before, try again
+ ep->addr = strdup( ep->name ); // use the name directly; if not IP then transport will do dns lookup
+ }
+
+ if( uta_link2( si_ctx, ep ) ) { // find entry in table and create link
+ ep->open = TRUE;
+ *nn_sock = ep->nn_sock; // pass socket back to caller
+ } else {
+ state = FALSE;
+ }
+ if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "epsock_meid: connection attempted with %s: %s\n", ep->name, state ? "[OK]" : "[FAIL]" );
+ } else {
+ *nn_sock = ep->nn_sock;
+ }
+
+ return state;
+}
+
+/*
Finds the rtable entry which matches the key. Returns a nil pointer if
no entry is found. If try_alternate is set, then we will attempt
to find the entry with a key based only on the message type.
diff --git a/src/rmr/si/src/sr_si_static.c b/src/rmr/si/src/sr_si_static.c
index 0b726dc..485611e 100644
--- a/src/rmr/si/src/sr_si_static.c
+++ b/src/rmr/si/src/sr_si_static.c
@@ -713,7 +713,12 @@
send_again = 1; // force loop entry
group = 0; // always start with group 0
while( send_again ) {
- sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep ); // select endpt from rr group and set again if more groups
+ if( rte->nrrgroups > 0 ) { // this is a round robin entry if groups are listed
+ sock_ok = uta_epsock_rr( ctx, rte, group, &send_again, &nn_sock, &ep ); // select endpt from rr group and set again if more groups
+ } else {
+ sock_ok = epsock_meid( ctx, ctx->rtable, msg, &nn_sock, &ep );
+ send_again = 0;
+ }
if( DEBUG ) rmr_vlog( RMR_VL_DEBUG, "mtosend_msg: flgs=0x%04x type=%d again=%d group=%d len=%d sock_ok=%d\n",
msg->flags, msg->mtype, send_again, group, msg->len, sock_ok );