feat(API): Add subscription id and source retrieval
Subscription id is needed for applications which wish to route
messages based on subscription rather than message type. This
field is now available to user applications by the sub_id
field in the message. Subscription based applications may also
need to have access to the message source, so a get source
function has been added.
Change-Id: I796392a6e3899e8f01a3292796e54e7abd56c32f
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 6de24c5..2782147 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -24,7 +24,7 @@
set( major_version "1" ) # should be automatically populated from git tag later, but until CI process sets a tag we use this
set( minor_version "0" )
-set( patch_level "17" )
+set( patch_level "18" )
set( install_root "${CMAKE_INSTALL_PREFIX}" )
set( install_lib "lib" )
diff --git a/doc/CMakeLists.txt b/doc/CMakeLists.txt
index 2dbca2b..ddd737f 100644
--- a/doc/CMakeLists.txt
+++ b/doc/CMakeLists.txt
@@ -78,6 +78,7 @@
rmr_set_trace.3
rmr_tralloc_msg.3
rmr_get_trlen.3
+ rmr_get_src.3
)
# empty list of roff/troff input files we generated
diff --git a/doc/src/man/rmr.7.xfm b/doc/src/man/rmr.7.xfm
index eab9b9a..2ff5e6c 100644
--- a/doc/src/man/rmr.7.xfm
+++ b/doc/src/man/rmr.7.xfm
@@ -1,6 +1,6 @@
.if false
==================================================================================
- Copyright (c) 2019 Nokia
+ Copyright (c) 2019 Nokia
Copyright (c) 2018-2019 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
@@ -25,7 +25,7 @@
.** if formatting with tfm, the roff.im will cause roff output to be generated
.** and rst.im will cause rst to be generated depending on OUTPUT_TYPE env
-.** var.
+.** var.
.** if formatting with pfm, then pretty postscript will be generated
.gv e LIB lib
@@ -33,7 +33,7 @@
.im &{lib}/generic_ps.im
.ei
.gv e OUTPUT_RST use_rst
- .if .ev &use_rst 1 =
+ .if .ev &use_rst 1 =
.im &{lib}/rst.im
.ei
.im &{lib}/roff.im
@@ -47,13 +47,13 @@
RMr -- Ric Message Router Library
&h2(DESCRIPTION)
-RMr is a library which provides a user application with the ability
+RMr is a library which provides a user application with the ability
to send and receive messages to/from other RMr based applications
without having to understand the underlying messaging transport environment (e.g. Nanomsg)
and without needing to know which other endpoint applications are currently
available and accepting messages.
To do this, RMr depends on a routing table generated by an external source.
-This table is used to determine the destination endpoint of each message sent by mapping the
+This table is used to determine the destination endpoint of each message sent by mapping the
message type T (supplied by the user application) to an endpoint entry.
Once determined, the message is sent directly to the endpoint.
The user application is unaware of which endpoint actually receives the
@@ -62,28 +62,28 @@
&space
RMr functions do provide for the ability to respond to the specific source
-instance of a message allowing for either a request response, or call
-response relationship when needed.
+instance of a message allowing for either a request response, or call
+response relationship when needed.
&h3(The Route Table)
-The library is supplied with a route table which maps message numbers to
+The library is supplied with a route table which maps message numbers to
endpoint groups such that each time a message of type T is sent, the message
-is delivered to one member of each group associated with T.
+is delivered to one member of each group associated with T.
For example, message type 2 might route to two different groups where
group A consists of worker1 and worker2, while group B consists only of
logger1.
&space
It is the responsibility of the route table generator to know which endpoints
-belong to which groups, and which groups accept which message types.
+belong to which groups, and which groups accept which message types.
Once understood, the route table generator publishes a table that is ingested
by RMr and used for mapping messages to end points.
&h3(Environment)
-To enable configuration of the library behaviour outside of direct user application
-control, RMr supports a number of environment variables which provide information
-to the library.
+To enable configuration of the library behaviour outside of direct user application
+control, RMr supports a number of environment variables which provide information
+to the library.
The following is a list of the various environment variables, what they control
and the defaults which RMr uses if undefined.
@@ -93,18 +93,18 @@
This should be the IP address assigned to the interface that RMr should listen
on, and if not defined RMr will listen on all interfaces.
-&di(RMR_RTG_SVC) RMr opens a TCP listen socket using the port defined by this
+&di(RMR_RTG_SVC) RMr opens a TCP listen socket using the port defined by this
environment variable and expects that the route table generator process
- will connect to this port.
+ will connect to this port.
If not supplied the port 4561 is used.
&di(RMR_RTG_ISRAW) Is set to 1 if the route table generator is sending "plain" messages
(not using RMr to send messages, 0 if the rtg is using RMr to send. The default
- is 1 as we don't expect the rtg to use RMr.
+ is 1 as we don't expect the rtg to use RMr.
-&di(RMR_SEED_RT) This is used to supply a static route table which can be used for
+&di(RMR_SEED_RT) This is used to supply a static route table which can be used for
debugging, testing, or if no route table generator process is being used to
- supply the route table.
+ supply the route table.
If not defined, no static table is used and RMr will not report &ital(ready)
until a table is received.
&end_dlist
@@ -118,6 +118,7 @@
rmr_free_msg(3),
rmr_init(3),
rmr_init_trace(3),
+rmr_get_src(3),
rmr_get_trace(3),
rmr_get_trlen(3),
rmr_payload_size(3),
diff --git a/doc/src/man/rmr_get_src.3.xfm b/doc/src/man/rmr_get_src.3.xfm
new file mode 100644
index 0000000..5ea92f3
--- /dev/null
+++ b/doc/src/man/rmr_get_src.3.xfm
@@ -0,0 +1,112 @@
+.if false
+==================================================================================
+ Copyright (c) 2019 Nokia
+ Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+.fi
+
+.if false
+ Mnemonic rmr_get_src.xfm
+ Abstract The manual page for the rmr_get_src function.
+ Author E. Scott Daniels
+ Date 8 March 2019
+.fi
+
+.** if formatting with tfm, the roff.im will cause roff output to be generated
+.** if formatting with pfm, then pretty postscript will be generated
+.gv e LIB lib
+.if pfm
+ .im &{lib}/generic_ps.im
+.ei
+ .gv e OUTPUT_RST use_rst
+ .if .ev &use_rst 1 =
+ .im &{lib}/rst.im
+ .ei
+ .im &{lib}/roff.im
+ .fi
+.fi
+
+&line_len(6i)
+
+&h1(RMR Library Functions)
+&h2(NAME)
+ rmr_get_src
+
+&h2(SYNOPSIS)
+&indent
+&ex_start
+#include <rmr/rmr.h>
+
+unsigned char* rmr_get_src( rmr_mbuf_t* mbuf, unsigned char* dest )
+&ex_end
+
+&uindent
+
+&h2(DESCRIPTION)
+The &cw(rmr_get_src) function will copy the &ital(source) information from the message to a buffer
+(dest) supplied by the user.
+In an RMr message, the source is the sender's information that is used for return to sender
+function calls, and is generally the hostname and port in the form &ital(name:port).
+The source might be an IP address port combination; the data is populated by the sending process
+and the only requirement is that it be capable of being used to start a TCP session with the
+sender.
+.sp
+
+The maximum size allowed by RMr is 64 bytes (including the nil string terminator), so the user
+must ensure that the destination buffer given is at least 64 bytes.
+
+&h2(RETURN VALUE)
+On success, a pointer to the destination buffer is given as a convenience to the user programme.
+On failure, a nil pointer is returned and the value of errno is set.
+
+&h2(ERRORS)
+If an error occurs, the value of the global variable &cw( errno ) will be set to one of
+the following with the indicated meaning.
+
+&beg_dlist(.75i : ^&bold_font )
+&half_space
+&di(EINVAL) The message, or an internal portion of the message, was corrupted or the pointer was invalid.
+&end_dilist
+
+
+
+&h2(SEE ALSO )
+.ju off
+rmr_alloc_msg(3),
+rmr_bytes2xact(3),
+rmr_bytes2meid(3),
+rmr_call(3),
+rmr_free_msg(3),
+rmr_get_rcvfd(3),
+rmr_payload_size(3),
+rmr_send_msg(3),
+rmr_rcv_msg(3),
+rmr_rcv_specific(3),
+rmr_rts_msg(3),
+rmr_ready(3),
+rmr_fib(3),
+rmr_has_str(3),
+rmr_tokenise(3),
+rmr_mk_ring(3),
+rmr_ring_free(3),
+rmr_str2meid(3),
+rmr_str2xact(3),
+rmr_wh_open(3),
+rmr_wh_send_msg(3)
+.ju on
+
+
+.qu
+
diff --git a/src/common/include/rmr.h b/src/common/include/rmr.h
index 15c8400..4e662ef 100644
--- a/src/common/include/rmr.h
+++ b/src/common/include/rmr.h
@@ -74,11 +74,12 @@
into or out of their environment they dup it all, not just what we choose to expose.)
*/
typedef struct {
- int state; // state of processing
+ int state; // state of processing
int mtype; // message type
int len; // length of data in the payload (send or received)
unsigned char* payload; // transported data
unsigned char* xaction; // pointer to fixed length transaction id bytes
+ int sub_id; // subscription id
// these things are off limits to the user application
void* tp_buf; // underlying transport allocated pointer (e.g. nng message)
@@ -121,6 +122,7 @@
extern int rmr_bytes2xact( rmr_mbuf_t* mbuf, unsigned char const* src, int len );
extern void rmr_free_msg( rmr_mbuf_t* mbuf );
extern unsigned char* rmr_get_meid( rmr_mbuf_t* mbuf, unsigned char* dest );
+extern unsigned char* rmr_get_src( rmr_mbuf_t* mbuf, unsigned char* dest );
extern rmr_mbuf_t* rmr_realloc_msg( rmr_mbuf_t* mbuf, int new_tr_size );
extern int rmr_str2meid( rmr_mbuf_t* mbuf, unsigned char const* str );
extern void rmr_str2payload( rmr_mbuf_t* mbuf, unsigned char const* str );
diff --git a/src/common/include/rmr_agnostic.h b/src/common/include/rmr_agnostic.h
index c28ccb9..c25a535 100644
--- a/src/common/include/rmr_agnostic.h
+++ b/src/common/include/rmr_agnostic.h
@@ -72,7 +72,10 @@
//#define DEF_RTG_MSGID "" // default to pick up all messages from rtg
#define DEF_RTG_PORT "tcp:4561" // default port that we accept rtg connections on
#define DEF_COMM_PORT "tcp:4560" // default port we use for normal communications
-#define DEF_TR_LEN -1 // use default trace data len from context
+#define DEF_TR_LEN (-1) // use default trace data len from context
+
+#define UNSET_SUBID (-1) // initial value on msg allocation indicating not set
+#define UNSET_MSGTYPE (-1)
// -- header length/offset macros must ensure network conversion ----
#define RMR_HDR_LEN(h) (ntohl(((uta_mhdr_t *)h)->len0)+htonl(((uta_mhdr_t *)h)->len1)+htonl(((uta_mhdr_t *)h)->len2)+htonl(((uta_mhdr_t *)h)->len3)) // ALL things, not just formal struct
@@ -134,7 +137,7 @@
int32_t len1; // length of the tracing data
int32_t len2; // length of data 1 (d1)
int32_t len3; // length of data 2 (d2)
-
+ int32_t sub_id; // subscription id (-1 invalid)
} uta_mhdr_t;
diff --git a/src/common/src/mbuf_api.c b/src/common/src/mbuf_api.c
index ef0c28f..4b6d218 100644
--- a/src/common/src/mbuf_api.c
+++ b/src/common/src/mbuf_api.c
@@ -316,3 +316,28 @@
return RMR_TR_LEN( hdr );
}
+
+/*
+ Returns the string in the source portion of the header. This is assumed to be
+ something that can be used for direct sends (hostname:port). Regardless, it
+ will be a nil terminated, ascii string with max of 64 characters including
+ the final nil. So, the user must ensure that dest is at least 64 bytes.
+
+ As a convenience, the pointer to dest is returned on success; nil on failure
+ with errno set.
+*/
+extern unsigned char* rmr_get_src( rmr_mbuf_t* msg, unsigned char* dest ) {
+ uta_mhdr_t* hdr = NULL;
+
+ if( msg == NULL ) {
+ errno = EINVAL;
+ return NULL;
+ }
+
+ if( dest != NULL ) {
+ hdr = msg->header;
+ strcpy( dest, hdr->src );
+ }
+
+ return dest;
+}
diff --git a/src/nanomsg/src/sr_static.c b/src/nanomsg/src/sr_static.c
index 0f59da2..1c2bec8 100644
--- a/src/nanomsg/src/sr_static.c
+++ b/src/nanomsg/src/sr_static.c
@@ -100,6 +100,7 @@
hdr = (uta_mhdr_t *) msg->header;
hdr->rmr_ver = htonl( RMR_MSG_VER ); // current version
+ hdr->sub_id = htonl( UNSET_SUBID );
SET_HDR_LEN( hdr );
SET_HDR_TR_LEN( hdr, tr_len ); // set the actual length used
//SET_HDR_D1_LEN( hdr, ctx->d1_len ); // moot until we actually need these data areas
@@ -145,6 +146,7 @@
memcpy( nm->header, old_msg->header, RMR_HDR_LEN( old_msg->header ) ); // copy complete header, trace and other data
nm->mtype = old_msg->mtype;
+ nm->sub_id = old_msg->sub_id;
nm->len = old_msg->len; // length of data in the payload
nm->alloc_len = mlen; // length of allocated payload
nm->payload = PAYLOAD_ADDR( nm->header ); // reference the payload
@@ -208,6 +210,7 @@
// --- these are all version agnostic -----------------------------------
nm->mtype = old_msg->mtype;
+ nm->sub_id = old_msg->sub_id;
nm->len = old_msg->len; // length of data in the payload
nm->alloc_len = mlen; // length of allocated payload
@@ -249,6 +252,7 @@
msg->len = msg->state - RMR_HDR_LEN( hdr );
}
msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
+ msg->sub_id = ntohl( hdr->sub_id ); // capture and convert from network order to local order
msg->state = RMR_OK;
msg->flags |= MFL_ADDSRC; // turn on so if user app tries to send this buffer we reset src
msg->payload = PAYLOAD_ADDR( msg->header );
@@ -284,7 +288,8 @@
msg->state = nn_recv( ctx->nn_sock, msg->header, msg->alloc_len, NO_FLAGS ); // read and state will be length
if( msg->state >= 0 ) {
msg->xaction = NULL;
- msg->mtype = -1;
+ msg->mtype = UNSET_MSGTYPE;
+ msg->sub_id = UNSET_SUBID;
msg->len = msg->state; // no header; len is the entire thing received
msg->state = RMR_OK;
msg->flags = MFL_RAW; // prevent any sending of this headerless buffer
@@ -295,7 +300,8 @@
msg->state = RMR_ERR_EMPTY;
msg->payload = NULL;
msg->xaction = NULL;
- msg->mtype = -1;
+ msg->mtype = UNSET_MSGTYPE;
+ msg->sub_id = UNSET_SUBID;
}
return msg;
@@ -318,7 +324,8 @@
// future: ensure that application did not overrun the XID buffer; last byte must be 0
hdr = (uta_mhdr_t *) msg->header;
- hdr->mtype = htonl( msg->mtype ); // stash type/len in network byte order for transport
+ hdr->mtype = htonl( msg->mtype ); // stash type/len/sub-id in network byte order for transport
+ hdr->sub_id = htonl( msg->sub_id );
hdr->plen = htonl( msg->len );
if( msg->flags & MFL_ADDSRC ) { // buffer was allocated as a receive buffer; must add our source
diff --git a/src/nng/src/sr_nng_static.c b/src/nng/src/sr_nng_static.c
index 3592a7b..c7bd049 100644
--- a/src/nng/src/sr_nng_static.c
+++ b/src/nng/src/sr_nng_static.c
@@ -139,6 +139,7 @@
memset( msg->header, 0, sizeof( uta_mhdr_t ) ); // ensure no junk in the header area
if( (hdr = (uta_mhdr_t *) msg->header) != NULL ) {
hdr->rmr_ver = htonl( RMR_MSG_VER ); // set current version
+ hdr->sub_id = htonl( UNSET_SUBID );
SET_HDR_LEN( hdr ); // ensure these are converted to net byte order
SET_HDR_TR_LEN( hdr, ctx->trace_data_len );
//SET_HDR_D1_LEN( hdr, ctx->d1_len ); // no need until we start using them
@@ -225,6 +226,7 @@
msg->xaction = &v1hdr->xid[0]; // point at transaction id in header area
msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
msg->mtype = ntohl( v1hdr->mtype ); // capture and convert from network order to local order
+ msg->sub_id = UNSET_SUBID; // type 1 messages didn't have this
msg->state = RMR_OK;
hlen = sizeof( uta_v1mhdr_t );
break;
@@ -238,6 +240,7 @@
msg->xaction = &hdr->xid[0]; // point at transaction id in header area
msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
msg->mtype = ntohl( hdr->mtype ); // capture and convert from network order to local order
+ msg->sub_id = ntohl( hdr->sub_id );
hlen = RMR_HDR_LEN( hdr ); // len to use for truncated check later
break;
}
@@ -290,6 +293,7 @@
// --- these are all version agnostic -----------------------------------
nm->mtype = old_msg->mtype;
+ nm->sub_id = old_msg->sub_id;
nm->len = old_msg->len; // length of data in the payload
nm->alloc_len = mlen; // length of allocated payload
@@ -362,6 +366,7 @@
// --- these are all version agnostic -----------------------------------
nm->mtype = old_msg->mtype;
+ nm->sub_id = old_msg->sub_id;
nm->len = old_msg->len; // length of data in the payload
nm->alloc_len = mlen; // length of allocated payload
@@ -444,7 +449,8 @@
msg->payload = NULL;
msg->xaction = NULL;
msg->flags |= MFL_ZEROCOPY; // this is a zerocopy sendable message
- msg->mtype = -1;
+ msg->mtype = UNSET_MSGTYPE;
+ msg->sub_id = UNSET_SUBID;
}
return msg;
@@ -480,7 +486,8 @@
msg->header = nng_msg_body( msg->tp_buf );
msg->len = rsize; // len is the number of bytes received
msg->alloc_len = rsize;
- msg->mtype = -1; // raw message has no type
+ msg->mtype = UNSET_MSGTYPE; // raw message has no type
+ msg->sub_id = UNSET_SUBID; // nor a subscription id
msg->state = RMR_OK;
msg->flags = MFL_RAW;
msg->payload = msg->header; // payload is the whole thing; no header
@@ -511,7 +518,8 @@
// future: ensure that application did not overrun the XID buffer; last byte must be 0
hdr = (uta_mhdr_t *) msg->header;
- hdr->mtype = htonl( msg->mtype ); // stash type/len in network byte order for transport
+ hdr->mtype = htonl( msg->mtype ); // stash type/len/sub_id in network byte order for transport
+ hdr->sub_id = htonl( msg->sub_id );
hdr->plen = htonl( msg->len );
tr_len = RMR_TR_LEN( hdr ); // snarf trace len before sending as hdr is invalid after send
diff --git a/test/mbuf_api_static_test.c b/test/mbuf_api_static_test.c
index 8598136..f232b3e 100644
--- a/test/mbuf_api_static_test.c
+++ b/test/mbuf_api_static_test.c
@@ -1,7 +1,7 @@
// : vi ts=4 sw=4 noet :
/*
==================================================================================
- Copyright (c) 2019 Nokia
+ Copyright (c) 2019 Nokia
Copyright (c) 2018-2019 AT&T Intellectual Property.
Licensed under the Apache License, Version 2.0 (the "License");
@@ -21,7 +21,7 @@
/*
Mmemonic: mbuf_api_static_test.c
Abstract: Test the message buffer funcitons. These are meant to be included at compile
- time by the test driver.
+ time by the test driver.
Author: E. Scott Daniels
Date: 3 April 2019
@@ -44,6 +44,7 @@
int i;
int state;
int errors = 0;
+ char* buf;
rmr_mbuf_t* mbuf;
unsigned char src_buf[256];
@@ -131,7 +132,7 @@
errno = 0;
c = rmr_get_meid( NULL, NULL );
errors += fail_if( c != NULL, "get meid with nil message buffer" );
- errors += fail_if( errno == 0, "(errno bad) get meid with nil msg buffer" );
+ errors += fail_if( errno == 0, "(errno bad) get meid with nil msg buffer" );
c = rmr_get_meid( mbuf, NULL ); // should allocate and return c
errors += fail_if( c == NULL, "get meid with nil dest pointer (did not allocate a buffer)" );
@@ -248,6 +249,20 @@
errors+= fail_not_equal( state, 0, "compare of pulled trace info did not match" );
i = rmr_get_trlen( mbuf );
+
+
+ // ------------- source field tests ------------------------------------------------------------
+ // we cannot force anything into the message source field, so no way to test the content, but we
+ // can test pointers and expected nils
+
+ buf = rmr_get_src( NULL, src_buf ); // coverage test for nil msg check
+ errors += fail_not_nil( buf, "rmr_get_src returned a pointer when given a nil message" );
+
+ buf = rmr_get_src( mbuf, NULL ); // coverage test for nil dst check
+ errors += fail_not_nil( buf, "rmr_get_src returned a pointer when given a nil dest buffer" );
+
+ buf = rmr_get_src( mbuf, src_buf );
+ errors += fail_not_equal( buf, src_buf, "rmr_get_src didn't return expexted buffer pointer" );
return errors > 0; // overall exit code bad if errors