enhance(test): Add route generation simulator
The simulator provids the means to generate one or more
route tables which are delivered through the RMr route table
collector thread.
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
Change-Id: I95d73ad5a0dc28c380bed2b9cb314a8b646429d8
Signed-off-by: E. Scott Daniels <daniels@research.att.com>
diff --git a/test/rtg_sim/BUILD b/test/rtg_sim/BUILD
new file mode 100644
index 0000000..0f2feee
--- /dev/null
+++ b/test/rtg_sim/BUILD
@@ -0,0 +1,38 @@
+#
+#==================================================================================
+# Copyright (c) 2019 Nokia
+# Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#==================================================================================
+#
+
+To build the rtm_sim application, the expected CMake "process" is followed
+and described below:
+
+ mkdir .build
+ cd .build
+ cmake ..
+ make package
+
+
+This will generate at least a .deb, and if the underlying system can support
+it an RPM package as well.
+
+
+Once installed, the rtm_sim application can be started from the command line:
+ rtm_sim config-file-name
+
+The config file defines the table(s) which are to be constructed and distributed
+to applications.
+
diff --git a/test/rtg_sim/CMakeLists.txt b/test/rtg_sim/CMakeLists.txt
new file mode 100644
index 0000000..6753dd3
--- /dev/null
+++ b/test/rtg_sim/CMakeLists.txt
@@ -0,0 +1,77 @@
+
+#
+#==================================================================================
+# Copyright (c) 2019 Nokia
+# Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#==================================================================================
+#
+
+# simulate the route table manager
+project( rtm_sim LANGUAGES C )
+cmake_minimum_required( VERSION 3.5 )
+
+set( major_version "1" )
+set( minor_version "0" )
+set( patch_level "1" )
+
+set( CMAKE_POSITION_INDEPENDENT_CODE ON )
+set( CMAKE_CXX_FLAGS "-g -Wall -I /usr/local/include" )
+
+find_library( nanomsg libnanomsg.a )
+
+find_program( rpm NAMES rpmbuild ) # rpm package gen requires this to be installed
+
+if( "${rpm}" MATCHES "rpm-NOTFOUND" ) # cannot build rpm
+ set( pkg_list "DEB" )
+ message( "+++ `make package` will generate only deb package; cannot find support to generate rpm packages" )
+else()
+ set( pkg_list "DEB;RPM" )
+ message( "+++ `make package` will generate both deb and rpm packages" )
+endif()
+
+add_executable( rtm_sim rtm_sim.c )
+target_link_libraries( rtm_sim nanomsg )
+
+install(
+ TARGETS rtm_sim
+ DESTINATION /usr/local/bin
+)
+
+IF( EXISTS "${CMAKE_ROOT}/Modules/CPack.cmake" )
+ include( InstallRequiredSystemLibraries )
+
+ set( CPACK_set_DESTDIR "on" )
+ set( CPACK_PACKAGING_INSTALL_PREFIX "${install_root}" )
+ set( CPACK_GENERATOR ${pkg_list} )
+
+ set( CPACK_PACKAGE_DESCRIPTION "Simplistic route table manager simulator." )
+ set( CPACK_PACKAGE_DESCRIPTION_SUMMARY "RT manager simulation" )
+ set( CPACK_PACKAGE_VENDOR "None" )
+ set( CPACK_PACKAGE_CONTACT "None" )
+ set( CPACK_PACKAGE_VERSION_MAJOR "${major_version}" )
+ set( CPACK_PACKAGE_VERSION_MINOR "${minor_version}" )
+ set( CPACK_PACKAGE_VERSION_PATCH "${patch_level}" )
+ set( CPACK_PACKAGE_FILE_NAME "rtm_sim-${major_version}.${minor_version}.${patch_level}-${CMAKE_SYSTEM_PROCESSOR}" )
+
+ # we build and ship the libraries, so there is NO dependency
+ set( CPACK_DEBIAN_PACKAGE_DEPENDS "libnanomsg0 (>=0.4)" )
+
+ set( CPACK_DEBIAN_PACKAGE_PRIORITY "optional" )
+ set( CPACK_DEBIAN_PACKAGE_SECTION "test" )
+ set( CPACK_DEBIAN_ARCHITECTURE ${CMAKE_SYSTEM_PROCESSOR} )
+ set( CPACK_RPM_ARCHITECTURE ${CMAKE_SYSTEM_PROCESSOR} )
+
+ INCLUDE( CPack )
+ENDIF()
diff --git a/test/rtg_sim/README b/test/rtg_sim/README
new file mode 100644
index 0000000..2ab2b7c
--- /dev/null
+++ b/test/rtg_sim/README
@@ -0,0 +1,36 @@
+#
+#==================================================================================
+# Copyright (c) 2019 Nokia
+# Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#==================================================================================
+#
+
+The rtm_sim is an extremely simple application which reads a configuration
+file, builds one or more route tables, and sends the table(s) to the
+indicated applications. The applications are assumed to be RMr based
+applications which are listening either on a well known port (4561) for
+table updates, or on a port which is indicated in the config file.
+
+The sim is _not_ intended to be a complete replacement for any route
+table mangager, but simply allows the ability to:
+
+ 1) ensure that the RMr based application can receive real-time
+ table updates (thus is configured correctly with the needed
+ port properly exposed via the container environment if
+ applicable).
+
+ 2) allow the generation of one or more route tables from a central
+ point during testing without having to run a more complicated
+ route manager for basic testing.
diff --git a/test/rtg_sim/req_resp.c b/test/rtg_sim/req_resp.c
new file mode 100644
index 0000000..9d58472
--- /dev/null
+++ b/test/rtg_sim/req_resp.c
@@ -0,0 +1,352 @@
+// :vi ts=4 sw=4 noet:
+/*
+==================================================================================
+ Copyright (c) 2019 Nokia
+ Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+
+/*
+ Mnemonic: req_rep.c
+ Abstract: A "library" module which allows a programme to easily be a requestor
+ or replier. Some functions are compatable with publishing (mbuf
+ allocation and management). Underlying we use the NN_PAIR and NOT
+ the req/rep model as that model is an inflexible, lock step, exchange
+ which does not lend well for a request that results in more than one
+ response messages, or no response.
+
+ The user must be aware that once a session is established on the
+ host:port listener, another session will not be accepted until the
+ first is terminated; nano makes no provision for multiple concurrent
+ sesssions with either the PAIR or REQ/RESP models.
+
+ We also support starting the publisher socket as the buffer and
+ send functions can be used for the publisher too.
+
+ CAUTION: this is based on nanomsg, not NNG. The underlying protocols
+ are compatable, and because NNG has an emulation mode it is possible
+ to link successsfully with the nng library, BUT that will not
+ work here. Link only with nanomsg.
+
+ Date: 18 January 2018
+ Author: E. Scott Daniels
+
+*/
+
+#include <ctype.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+#include <errno.h>
+#include <stdint.h>
+
+#include <nanomsg/nn.h>
+#include <nanomsg/pair.h>
+#include <nanomsg/pipeline.h>
+#include <nanomsg/pubsub.h>
+
+#include "req_resp.h"
+
+#define NULL_SOCKET 0 // fluff that is treated like a nil pointer check by coverage checker
+
+
+/*
+ Connect to the host as a requestor. returns context if
+ successful.
+*/
+extern void* rr_connect( char* host, char* port ) {
+ rr_ctx_t* ctx = NULL;
+ char wbuf[1024];
+ int state;
+
+ if( host == NULL || port == NULL ) {
+ errno = EINVAL;
+ return NULL;
+ }
+
+ ctx = (rr_ctx_t *) malloc( sizeof *ctx );
+ if( ctx == NULL ) {
+ errno = ENOMEM;
+ return NULL;
+ }
+
+ //ctx->nn_sock = nn_socket( AF_SP, NN_PAIR );
+ ctx->nn_sock = nn_socket( AF_SP, NN_PUSH );
+ if( ctx->nn_sock < NULL_SOCKET ) {
+ free( ctx );
+ return NULL;
+ }
+ snprintf( wbuf, sizeof( wbuf ), "tcp://%s:%s", host, port );
+ state = nn_connect( ctx->nn_sock, wbuf );
+ if( state < 0 ) {
+ fprintf( stderr, "rr_conn: connect failed: %s: %d %s\n", wbuf, errno, strerror( errno ) );
+ nn_close( ctx->nn_sock );
+ free( ctx );
+ return NULL;
+ }
+
+ //fprintf( stderr, "rr_conn: connect successful: %s\n", wbuf );
+ return (void *) ctx;
+}
+
+
+/*
+ Set up as a listener on any interface with the given port.
+*/
+extern void* rr_start_listening( char* port ) {
+ rr_ctx_t* ctx;
+ char wbuf[1024];
+ int state;
+
+ if( port == NULL ) {
+ errno = EINVAL;
+ return NULL;
+ }
+
+ ctx = (rr_ctx_t *) malloc( sizeof *ctx );
+ if( ctx == NULL ) {
+ errno = EINVAL;
+ return NULL;
+ }
+
+ //ctx->nn_sock = nn_socket( AF_SP, NN_PAIR );
+ ctx->nn_sock = nn_socket( AF_SP, NN_PULL );
+ if( ctx->nn_sock < NULL_SOCKET ) {
+ free( ctx );
+ return NULL;
+ }
+
+ snprintf( wbuf, sizeof( wbuf ), "tcp://0.0.0.0:%s", port );
+ state = nn_bind( ctx->nn_sock, wbuf );
+ if( state < 0 ) {
+ nn_close( ctx->nn_sock );
+ free( ctx );
+ return NULL;
+ }
+
+ return (void *) ctx;
+}
+
+/*
+ Configure and bind the publisher. Port is a string as it's probably read from
+ the command line, so no need to atoi() it for us. We can use the rr_* functions
+ for message buffers and sending, so we reuse their context rather than define our
+ own.
+
+*/
+extern void* open_publisher( char* port ) {
+ rr_ctx_t* pctx;
+ char conn_info[1024];
+
+ if( (pctx = (rr_ctx_t *) malloc( sizeof( *pctx )) ) == NULL ) {
+ return NULL;
+ }
+
+ pctx->nn_sock = nn_socket( AF_SP, NN_PUB ); // publishing socket
+ if( pctx->nn_sock < 0 ) {
+ fprintf( stderr, "[CRI] unable to open publish socket: %s\n", strerror( errno ) );
+ free( pctx );
+ return NULL;
+ }
+
+ snprintf( conn_info, sizeof( conn_info ), "tcp://0.0.0.0:%s", port ); // listen on any interface
+ if( nn_bind( pctx->nn_sock, conn_info ) < 0) { // bind and automatically accept client sessions
+ fprintf (stderr, "[CRI] unable to bind publising port: %s: %s\n", port, strerror( errno ) );
+ nn_close ( pctx->nn_sock );
+ free( pctx );
+ return NULL;
+ }
+
+ return (void *) pctx;
+}
+
+extern rr_mbuf_t* rr_new_buffer( rr_mbuf_t* mb, int len ) {
+
+ if( ! mb ) {
+ mb = (rr_mbuf_t *) malloc( sizeof( *mb ) );
+ mb->size = len;
+ mb->payload = NULL;
+ } else {
+ if( mb->size < len ) { // if requested len is larger than current payload
+ nn_freemsg( mb->payload );
+ mb->payload = NULL;
+ } else {
+ len = mb->size;
+ }
+ }
+ mb->used = 0;
+
+ if( len > 0 && !mb->payload ) { // allow a payloadless buffer to be allocated
+ mb->payload = nn_allocmsg( len, 0 );
+ }
+
+ return mb;
+}
+
+/*
+ Closes the currently open session.
+*/
+extern void rr_close( void* vctx ) {
+ rr_ctx_t* ctx;
+
+ if( (ctx = (rr_ctx_t *) vctx) == NULL ) {
+ return;
+ }
+
+ if( ctx->nn_sock < NULL_SOCKET ) {
+ return;
+ }
+
+ nn_close( ctx->nn_sock );
+ ctx->nn_sock = -1;
+}
+
+extern void rr_free( void* vctx ) {
+ rr_ctx_t* ctx;
+
+ if( (ctx = (rr_ctx_t *) vctx) == NULL ) {
+ return;
+ }
+
+ rr_close( ctx );
+ nn_term();
+ free( ctx );
+}
+
+extern void rr_free_mbuf( rr_mbuf_t* mbuf ) {
+ if( mbuf->payload ) {
+ nn_freemsg( mbuf->payload );
+ mbuf->payload = NULL;
+ mbuf->used = -2; // just in case they held a pointer and try to use it
+ }
+
+ free( mbuf );
+}
+
+extern rr_mbuf_t* rr_receive( void* vctx, rr_mbuf_t* mbuf, int len ) {
+ rr_ctx_t* ctx;
+
+ if( (ctx = (rr_ctx_t *) vctx) == NULL ) {
+ errno = EINVAL;
+ return NULL;
+ }
+ if( ctx->nn_sock < 0 ) {
+ errno = ESTALE; // stale/bad socket fd
+ return NULL;
+ }
+
+ mbuf = rr_new_buffer( mbuf, len );
+ if( mbuf == NULL ) {
+ return NULL;
+ }
+
+ *mbuf->payload = 0;
+ if( (mbuf->used = nn_recv( ctx->nn_sock, mbuf->payload, mbuf->size, 0 )) > 0 ) {
+ errno = 0; // nano doesn't seem to clear errno here
+ }
+ return mbuf;
+}
+
+extern rr_mbuf_t* rr_send( void* vctx, rr_mbuf_t* mbuf, int alloc_buf ) {
+ rr_ctx_t* ctx;
+ int len;
+ int state;
+
+ if( (ctx = (rr_ctx_t *) vctx) == NULL ) {
+ errno = EINVAL;
+ return NULL;
+ }
+
+ if( ctx->nn_sock < 0 ) {
+ errno = ESTALE; // stale/bad socket fd
+ return NULL;
+ }
+
+ if( ! mbuf ) {
+ errno = ENOBUFS; // not quite right, but close enough
+ return NULL;
+ }
+
+ if( ! mbuf->payload ) { // no payload????
+ errno = EFAULT; // nil is a bad address after all :)
+ return mbuf;
+ }
+
+ errno = 0;
+ //fprintf( stderr, "rrsend is sending %d bytes....\n", mbuf->used );
+ if( (state = nn_send( ctx->nn_sock, &mbuf->payload, NN_MSG, 0 )) > 0 ) {
+ //fprintf( stderr, "send ok to %d: %d %s\n", ctx->nn_sock, state, strerror( errno ) );
+ mbuf->used = 0;
+ if( alloc_buf ) {
+ mbuf->payload = nn_allocmsg( mbuf->size, 0 ); // allocate the next send buffer
+ } else {
+ mbuf->payload = NULL;
+ mbuf->used = -1;
+ }
+
+ errno = 0;
+ } else {
+ fprintf( stderr, "send failed %d %s\n", state, strerror( errno ) );
+ }
+
+ return mbuf;
+}
+
+/*
+ Set the receive timeout to time. If time >100 we assume the time is milliseconds,
+ else we assume seconds. Setting -1 is always block.
+ Returns the nn value (0 on success <0 on error).
+*/
+extern int rr_rcv_to( void* vctx, int time ) {
+ rr_ctx_t* ctx;
+
+ if( (ctx = (rr_ctx_t *) vctx) == NULL ) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if( time > 0 ) {
+ if( time < 100 ) {
+ time = time * 1000; // assume seconds, nn wants ms
+ }
+ }
+
+ return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_RCVTIMEO, &time, sizeof( time ) );
+}
+
+/*
+ Set the send timeout to time. If time >100 we assume the time is milliseconds,
+ else we assume seconds. Setting -1 is always block.
+ Returns the nn value (0 on success <0 on error).
+*/
+extern int rr_send_to( void* vctx, int time ) {
+ rr_ctx_t* ctx;
+
+ if( (ctx = (rr_ctx_t *) vctx) == NULL ) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if( time > 0 ) {
+ if( time < 100 ) {
+ time = time * 1000; // assume seconds, nn wants ms
+ }
+ }
+
+ return nn_setsockopt( ctx->nn_sock, NN_SOL_SOCKET, NN_SNDTIMEO, &time, sizeof( time ) );
+}
+
diff --git a/test/rtg_sim/req_resp.h b/test/rtg_sim/req_resp.h
new file mode 100644
index 0000000..0473c8b
--- /dev/null
+++ b/test/rtg_sim/req_resp.h
@@ -0,0 +1,60 @@
+// :vi ts=4 sw=4 noet:
+/*
+==================================================================================
+ Copyright (c) 2019 Nokia
+ Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+
+#ifndef _req_resp_h
+#define _req_resp_h
+
+/*
+ A message buffer that can be used for either req/resp context setup
+ or for publishing in a pub/sub setup.
+*/
+typedef struct rr_mbuf {
+ int used; // bytes actually used
+ int size; // allocated size of payload
+ char* payload;
+} rr_mbuf_t;
+
+
+/*
+ A 'context' to interface with nano; so far very simple, but could
+ expand, so we use the struct.
+*/
+typedef struct rr_ctx {
+ int nn_sock;
+} rr_ctx_t;
+
+
+// ---- prototypes for the rr library ----------------------
+// vctx is the pointer returned by the connect or start listening functions
+// and is passed to nearly all other functions.
+
+extern void* rr_connect( char* host, char* port );
+extern void* rr_start_listening( char* port );
+extern rr_mbuf_t* rr_new_buffer( rr_mbuf_t* mb, int len );
+extern void rr_close( void* vctx );
+extern void rr_free( void* vctx );
+extern void rr_free_mbuf( rr_mbuf_t* mbuf );
+extern void* open_publisher( char* port );
+extern int rr_rcv_to( void* vctx, int time );
+extern int rr_send_to( void* vctx, int time );
+extern rr_mbuf_t* rr_receive( void* vctx, rr_mbuf_t* mbuf, int len );
+extern rr_mbuf_t* rr_send( void* vctx, rr_mbuf_t* mbuf, int alloc_buf );
+
+#endif
diff --git a/test/rtg_sim/rtm_sim.c b/test/rtg_sim/rtm_sim.c
new file mode 100644
index 0000000..c52bb2d
--- /dev/null
+++ b/test/rtg_sim/rtm_sim.c
@@ -0,0 +1,803 @@
+// :vim ts=4 sw=4 noet:
+/*
+==================================================================================
+ Copyright (c) 2019 Nokia
+ Copyright (c) 2018-2019 AT&T Intellectual Property.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+
+/*
+ Mnemonic: rtm_sim.c
+ Abstract: This is a simple route manager simulation which provides the ability
+ to push a route table into one or more xAPPs. Designed just to
+ drive the internal RMr route table collector outside of the static
+ file allowing for testing of port definition in some containerised
+ environments.
+
+ This application does not persist; it generates a set of tables based
+ on the config file, connects to all applications listed, distributes
+ the table, and exits. If periodic delivery of one or more different
+ configurations needs to be executed, use a shell script to wrap this
+ application in a loop.
+
+ Date: 14 June 2019
+ Author: E. Scott Daniels
+*/
+
+/*
+config file format:
+ # comment and blank lines allowed
+ # trailing comments allowed
+
+ # port is used for any app listed in send2 which does not have a trailing :port
+ # it may be supplied as a different value before each table, and if not
+ # redefined applies to all subsequent tables.
+ #
+
+ # A table consists of a send2 list (app[:port]) which are the applications that will
+ # receive the table. Each table may contain one or more entries. Entries define
+ # the message type and subscription ID, along with one or more round robin groups.
+ # A rrgroup is one or more app:port "endpoints" which RMr will use when sending
+ # messages of the indicated type/subid. Port on a rrgroup is rquired and is the
+ # port that the application uses for app to app communications.
+ #
+
+ port: xapp-rtg-listen-port # 4561 default
+ table:
+ send2: app1:port app2:port ... appn:port
+ entry:
+ mtype: n
+ subid: n
+ rrgroup: app:port... app:port
+
+ entry:
+ mtype: n
+ subid: n
+ rrgroup: app:port ... app:port
+*/
+
+
+#include <ctype.h>
+#include <unistd.h>
+#include <errno.h>
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/epoll.h>
+#include <time.h>
+#include <stdio.h>
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+
+#include "req_resp.c" // simple nano interface for request/response connections
+
+
+#define CONNECTED 1 // we've established a shunt connection to the app
+
+#define TRUE 1
+#define FALSE 0
+
+#define ALLOC_NEW 1 // rrsend should allocate a new buffer
+
+#define MAX_TABLES 16 // total tables we support
+#define MAX_SEND2 64 // max number of apps that a table can be sent to
+#define MAX_APPS 1024 // max total apps defined (tables * send2)
+#define MAX_GROUPS 64 // max num of round robin groups per entry
+#define MAX_RRG_SIZE 64 // max number of apps in a group
+#define MAX_ENTRIES 256 // max entries in a table
+#define MAX_TOKENS 512 // max tokens we'll break a buffer into
+
+
+// ---------------------------------------------------------------------------------------
+
+/*
+ Things we need to track for an application.
+*/
+typedef struct app {
+ void* shunt; // the rr context to shunt directly to an app
+ int state; // connected or not
+ char* name; // IP address or DNS name and port for connecting
+ char* port; // rr wants two strings as it builds it's own NN string
+} app_t;
+
+
+
+// ----- table stuff (very staticly sized, but this isn't for prod) ----------------------
+/*
+ A round robin group in a table entry
+*/
+typedef struct rrgroup {
+ int napps; // number of apps
+ char* apps[MAX_RRG_SIZE];
+} rrgroup_t;
+
+/*
+ A single table entry.
+*/
+typedef struct entry {
+ int mtype; // entry message type
+ int subid; // entry sub id
+ int ngroups;
+ rrgroup_t groups[MAX_GROUPS]; // the entry's groups
+} entry_t;
+
+/*
+ Defines a table which we will distribute.
+*/
+typedef struct table {
+ int napps; // number of apps this table is sent to
+ int first_app; // first app in minfo that we send to
+ int nentries; // number of entries
+ entry_t entries[MAX_ENTRIES];
+} table_t;
+
+/*
+ Master set of contextual information.
+*/
+typedef struct master {
+ int napps; // number in use (next insert point)
+ int ntables;
+ int port; // the port applications open by default for our connections
+ app_t apps[MAX_APPS];
+ table_t tables[MAX_TABLES];
+} master_t;
+
+/*
+ Record buffer; file in memory which can be iterated over a record at a time.
+*/
+typedef struct rbuffer {
+ char* buffer; // stuff read from file
+ char* rec; // next record
+ int at_end; // true if end was reached
+} rbuffer_t;
+
+/*
+ Set of tokens.
+*/
+typedef struct tokens {
+ char* buffer; // buffer that tokens points into
+ int ntoks; // number of tokens in tokens
+ char* tokens[MAX_TOKENS]; // pointers into buffer at the start of each token 0..ntokens-1
+} tokens_t;
+
+
+// ----- token utilities ------------------------------------------------------------
+
+/*
+ Frees a token manager.
+*/
+static void free_tokens( tokens_t* t ) {
+ if( t == NULL ) {
+ return;
+ }
+
+ free( t );
+}
+
+/*
+ Simple tokeniser. If sep is whitespace, then leading whitespace (all, not just
+ sep) is ignored; if sep is not whitespace, then leadign whitespace is included
+ in the first token. If sep is whitespace, consecutive instances of whitespace
+ are treated as a single seperator:
+ if sep given as space, then
+ "bug boo" and "bug boo" both generate two tokens: (bug) (boo)
+
+ if sep given as pipe (|), then
+ "bug||boo" generates three tokens: (bug), (), (boo)
+
+ Each token is a zero terminated string.
+
+*/
+static tokens_t* tokenise( char* buf, char sep ) {
+ tokens_t* t;
+ int i;
+ char end_sep; // if quoted endsep will be the quote mark
+
+ if( !buf || !(*buf) ) {
+ return NULL;
+ }
+
+ t = (tokens_t *) malloc( sizeof( *t ) );
+ memset( t, 0, sizeof( *t ) );
+
+ t->buffer = strdup( buf );
+ buf = t->buffer; // convenience
+
+ if( isspace( sep ) ) { // if sep is in whitespace class
+ while( buf != NULL && *buf && isspace( *buf ) ) { // pass over any leading whitespace
+ buf++;
+ }
+
+ for( i = 0; i < strlen( buf ); i++ ) {
+ if( buf[i] == '\t' ) {
+ buf[i] = ' ';
+ }
+ }
+ }
+
+ while( buf != NULL && t->ntoks < MAX_TOKENS && *buf ) {
+ if( *buf == '"' ) {
+ end_sep = '"';
+ buf++;
+
+ } else {
+ end_sep = sep;
+ }
+
+ t->tokens[t->ntoks++] = buf; // capture token start
+
+ if( (buf = strchr( buf, end_sep )) != NULL ) { // find token end
+ *(buf++) = 0;
+
+ if( end_sep != sep ) {
+ buf++;
+ }
+
+ if( isspace( sep ) ) { // treat consec seperators as one if sep is whitespace
+ while( *buf == sep ) {
+ buf++;
+ }
+ }
+ }
+ }
+
+ return t;
+}
+
+// ----- file/record management utilities ------------------------------------------------------------
+
+/*
+ Read an entire file into a single buffer.
+*/
+static char* f2b( char* fname ) {
+ struct stat stats;
+ off_t fsize = 8192; // size of the file
+ off_t nread; // number of bytes read
+ int fd;
+ char* buf; // input buffer
+
+ if( (fd = open( fname, O_RDONLY )) >= 0 ) {
+ if( fstat( fd, &stats ) >= 0 ) {
+ if( stats.st_size <= 0 ) { // empty file
+ close( fd );
+ fd = -1;
+ } else {
+ fsize = stats.st_size; // stat ok, save the file size
+ }
+ } else {
+ fsize = 8192; // stat failed, we'll leave the file open and try to read a default max of 8k
+ }
+ }
+
+ if( fd < 0 ) { // didn't open or empty
+ if( (buf = (char *) malloc( sizeof( char ) * 1 )) == NULL ) {
+ return NULL;
+ }
+
+ *buf = 0;
+ return buf;
+ }
+
+ if( (buf = (char *) malloc( sizeof( char ) * fsize + 2 )) == NULL ) { // enough to add nil char to make string
+ close( fd );
+ errno = ENOMEM;
+ return NULL;
+ }
+
+ nread = read( fd, buf, fsize );
+ if( nread < 0 || nread > fsize ) { // failure of some kind
+ free( buf );
+ errno = EFBIG; // likely too much to handle
+ close( fd );
+ return NULL;
+ }
+
+ buf[nread] = 0;
+
+ close( fd );
+ return buf;
+}
+
+/*
+ Read a file into a buffer, and set a record buffer to manage it.
+*/
+static rbuffer_t* f2r( char* fname ) {
+ char* raw; // raw buffer
+ rbuffer_t* r;
+
+ if( (raw = f2b( fname )) == NULL ) {
+ return NULL;
+ }
+
+ r = (rbuffer_t *) malloc( sizeof( *r ) );
+ memset( r, 0, sizeof( *r ) );
+ r->buffer = raw;
+ r->rec = raw; // point at first (only) record
+}
+
+/*
+ Return a pointer to the next record in the buffer, or nil if at
+ end of buffer.
+*/
+static char* next_rec( rbuffer_t* r ) {
+ char* rec;
+
+ if( !r || r->at_end ) {
+ return NULL;
+ }
+
+ rec = r->rec;
+ r->rec = strchr( r->rec, '\n' );
+ if( r->rec ) {
+ *r->rec = 0;
+ r->rec++;
+ if( *r->rec == 0 ) {
+ r->at_end = TRUE;
+ }
+ } else {
+ r->at_end = TRUE; // mark for next call
+ }
+
+ return rec;
+}
+
+static void free_rbuf( rbuffer_t* r ) {
+ if( r != NULL ) {
+ if( r->buffer != NULL ) {
+ free( r->buffer );
+ }
+
+ free( r );
+ }
+}
+
+
+// ----- table management ------------------------------------------------------------
+
+/*
+ Run the app list and attempt to open a shunt to any unconnected application.
+ Returns the number of applications that could not be connected to.
+*/
+static int connect2all( master_t* mi ) {
+ int errors = 0;
+ int i;
+
+
+ if( mi == NULL ) {
+ return 1;
+ }
+
+ for( i = 0; i < mi->napps; i++ ) {
+ if( mi->apps[i].state != CONNECTED ) {
+ fprintf( stderr, "[INF] opening shunt to: %s:%s\n", mi->apps[i].name, mi->apps[i].port );
+ mi->apps[i].shunt = rr_connect( mi->apps[i].name, mi->apps[i].port );
+ if( mi->apps[i].shunt == NULL) {
+ errors++;
+ } else {
+ fprintf( stderr, "[INFO] shunt created to: %s:%s\n", mi->apps[i].name, mi->apps[i].port );
+ mi->apps[i].state = CONNECTED;
+ }
+ }
+ }
+
+ return errors;
+}
+
+/*
+ Add an application to the current table.
+*/
+static void add_app( master_t* mi, char* app_name ) {
+ char wbuf[256];
+ char* app_port;
+ char* ch;
+
+ if( mi == NULL || app_name == NULL ) {
+ return;
+ }
+
+ if( mi->napps > MAX_APPS ) {
+ fprintf( stderr, "[WARN] too many applications, ignoring: %s\n", app_name );
+ }
+
+ if( (ch = strchr( app_name, ':' )) == NULL ) { // assume we are using the default rm listen port
+ snprintf( wbuf, sizeof( wbuf ), "%d", mi->port );
+ app_port = wbuf;
+ } else {
+ *(ch++) = 0; // name port given, split and point at port
+ app_port = ch;
+ }
+
+ mi->apps[mi->napps].name = strdup( app_name );
+ mi->apps[mi->napps].port = strdup( app_port );
+ mi->apps[mi->napps].state = !CONNECTED;
+ mi->napps++;
+}
+
+
+/*
+ Initialise things; returns a master info context.
+*/
+static master_t* init( char* cfname ) {
+ master_t* mi;
+ char wbuf[128];
+ rbuffer_t* rb; // record manager for reading config file
+ char* rec;
+ tokens_t* tokens;
+ int i;
+ int errors;
+ char* tok;
+ table_t* table = NULL;
+ rrgroup_t* rrg = NULL;
+ entry_t* entry = NULL;
+ int rec_num = 0;
+
+ mi = (master_t *) malloc( sizeof( *mi ) );
+ if( mi == NULL ) {
+ return NULL;
+ }
+ memset( mi, 0, sizeof( *mi ) );
+ mi->port = 4561;
+
+ rb = f2r( cfname ); // get a record buffer to parse the config file
+ if( rb == NULL ) {
+ fprintf( stderr, "[FAIL] unable to open config file: %s: %s\n", cfname, strerror( errno ) );
+ free( mi );
+ return NULL;
+ }
+
+ while( (rec = next_rec( rb )) != NULL ) {
+ if( *rec ) {
+ rec_num++;
+ tokens = tokenise( rec, ' ' );
+
+ fprintf( stderr, "parsing %d: %s\n", rec_num, rec );
+
+ for( i = 0; i < tokens->ntoks && *tokens->tokens[i] != '#'; i++ ); // simple comment strip
+ tokens->ntoks = i;
+
+ if( tokens->ntoks > 0 ) {
+ tok = tokens->tokens[0];
+ switch( *tok ) { // faster jump table based on 1st ch; strcmp only if needed later
+ case '#':
+ break;
+
+ case 'e':
+ if( table != NULL && table->nentries < MAX_ENTRIES ) {
+ entry = &table->entries[table->nentries++];
+ entry->subid = -1; // no subscription id if user omits
+ } else {
+ fprintf( stderr, "[ERR] @%d no table started, or table full\n", rec_num );
+ }
+ break;
+
+ case 'm':
+ if( entry != NULL ) {
+ entry->mtype = atoi( tokens->tokens[1] );
+ } else {
+ fprintf( stderr, "[ERR] @%d no entry started\n", rec_num );
+ }
+ break;
+
+ case 'p':
+ if( tokens->ntoks > 1 ) {
+ mi->port = atoi( tokens->tokens[1] );
+ if( mi->port < 1000 ) {
+ fprintf( stderr, "[WRN] @%d assigned default xAPP port smells fishy: %s\n", rec_num, tokens->tokens[1] );
+ }
+ }
+ break;
+
+ case 'r': // round robin group
+ if( entry != NULL && entry->ngroups < MAX_GROUPS ) {
+ if( tokens->ntoks < MAX_RRG_SIZE ) {
+ rrg = &entry->groups[entry->ngroups++];
+
+ for( i = 1; i < tokens->ntoks; i++ ) {
+ rrg->apps[rrg->napps++] = strdup( tokens->tokens[i] );
+ }
+ } else {
+ fprintf( stderr, "[ERR] @%d round robin group too big.\n", rec_num );
+ }
+ } else {
+ fprintf( stderr, "[ERR] @%d no previous entry, or entry is full\n", rec_num );
+ }
+ break;
+
+ case 's':
+ if( *(tok+1) == 'e' ) { // send2
+ if( table != NULL && tokens->ntoks < MAX_SEND2 ) {
+ table->first_app = mi->napps;
+
+ for( i = 1; i < tokens->ntoks; i++ ) {
+ add_app( mi, tokens->tokens[i] );
+ }
+
+ table->napps = tokens->ntoks - 1;
+ }
+ } else { // subid
+ if( entry != NULL ) {
+ entry->subid = atoi( tokens->tokens[1] );
+ } else {
+ fprintf( stderr, "[ERR] @%d no entry started\n", rec_num );
+ }
+ }
+ break;
+
+ case 't':
+ entry = NULL;
+ if( mi->ntables < MAX_TABLES ) {
+ table = &mi->tables[mi->ntables++];
+ } else {
+ fprintf( stderr, "[ERR] @%d too many tables defined\n", rec_num );
+ table = NULL;
+ }
+ break;
+
+ default:
+ fprintf( stderr, "record from config was ignored: %s\n", rec );
+ break;
+ }
+ }
+ }
+ }
+
+ free_rbuf( rb );
+ return mi;
+}
+
+
+/*
+ Build a buffer with the entry n from table t. Both entry and table
+ numbers are 0 based.
+ Caller must free returned buffer.
+*/
+static char* mk_entry( master_t* mi, int table, int entry ) {
+ char wbuf[4096];
+ char sbuf[256];
+ table_t* tab;
+ entry_t* ent;
+ int i;
+ int j;
+ int len;
+ int alen;
+
+ if( !mi || mi->ntables <= table ) {
+ return NULL;
+ }
+
+ tab = &mi->tables[table];
+ if( tab->nentries <= entry ) {
+ return NULL;
+ }
+
+ ent = &tab->entries[entry];
+
+ snprintf( wbuf, sizeof( wbuf ), "mse | %d | %d | ", ent->mtype, ent->subid ); // we only generate mse records
+
+ len = strlen( wbuf );
+ for( i = 0; i < ent->ngroups; i++ ) {
+ if( i ) {
+ strcat( wbuf, "; " );
+ }
+
+ for( j = 0; j < ent->groups[i].napps; j++ ) {
+ alen = strlen( ent->groups[i].apps[j] ) + 3; // not percise, but close enough for testing
+ if( alen + len > sizeof( wbuf ) ) {
+ fprintf( stderr, "[ERR] entry %d for table %d is too large to format\n", entry, table );
+ return NULL;
+ }
+
+ if( j ) {
+ strcat( wbuf, "," );
+ }
+
+ strcat( wbuf, ent->groups[i].apps[j] );
+ }
+ }
+
+ strcat( wbuf, "\n" );
+ return strdup( wbuf );
+}
+
+/*
+ Sends a buffer to all apps in the range.
+*/
+void send2range( master_t* mi, char* buf, int first, int n2send ) {
+ int a; // application offset in master array
+ int last; // stopping point (index)
+ rr_mbuf_t* mbuf = NULL;
+
+ if( !mi ) { // safe to dance
+ return;
+ }
+
+ a = first;
+ last = first + n2send;
+
+ mbuf = rr_new_buffer( mbuf, strlen( buf ) + 5 ); // ensure buffer is large enough
+ while( a < last ) {
+ fprintf( stderr, "%s ", mi->apps[a].name );
+
+ memcpy( mbuf->payload, buf, strlen( buf ) );
+ mbuf->used = strlen( buf );
+ mbuf = rr_send( mi->apps[a].shunt, mbuf, ALLOC_NEW );
+
+ a++;
+
+ fprintf( stderr, "\n" );
+ }
+
+ rr_free_mbuf( mbuf );
+}
+
+/*
+ Formats the entries for the table and sends to all applications that the
+ table should be sent to per the send2 directive in the config.
+*/
+static void send_table( master_t* mi, int table ) {
+ table_t* tab;
+ char* ent; // entry string to send
+ int e = 0; // entry number
+ int a;
+ int last;
+ rr_mbuf_t* mbuf = NULL;
+
+ if( !mi || table > mi->ntables ) {
+ return;
+ }
+
+ tab = &mi->tables[table];
+ fprintf( stderr, "[INF] send table start message: " );
+ send2range( mi, "newrt | start\n", tab->first_app, tab->napps ); // send table start req to all
+
+ while( (ent = mk_entry( mi, table, e++ )) != NULL ) { // build each entry once, send to all
+ fprintf( stderr, "[INF] sending table %d entry %d: ", table, e );
+ send2range( mi, ent, tab->first_app, tab->napps );
+ }
+
+ fprintf( stderr, "[INF] send table end message: " );
+ send2range( mi, "newrt | end\n", tab->first_app, tab->napps ); // send table end notice to all
+}
+
+/*
+ Run the list of tables and send them all out.
+*/
+static void send_all_tables( master_t* mi ) {
+ int i;
+
+ if( ! mi ) {
+ return;
+ }
+
+ for( i = 0; i < mi->ntables; i++ ) {
+ send_table( mi, i );
+ }
+}
+
+
+// ----------------- testing ----------------------------------------------------------------------
+
+/*
+ Dump table entries in the form we'll send to apps to stderr.
+*/
+static void print_tables( master_t* mi ) {
+ char* ent;
+ int t;
+ int e;
+
+ if( ! mi ) {
+ return;
+ }
+
+ for( t = 0; t < mi->ntables; t++ ) {
+ fprintf( stderr, "=== table %d ===\n", t );
+ e = 0;
+ while( (ent = mk_entry( mi, t, e++ )) != NULL ) {
+ fprintf( stderr, "%s", ent );
+ free( ent );
+ }
+ }
+}
+
+static void print_tokens( tokens_t* tokens ) {
+ int i;
+
+ fprintf( stderr, "there are %d tokens\n", tokens->ntoks );
+ for( i = 0; i < tokens->ntoks; i++ ) {
+ fprintf( stderr, "[%02d] (%s)\n", i, tokens->tokens[i] );
+ }
+}
+
+static void self_test( char* fname ) {
+ char* s0 = " Now is the time for all to stand up and cheer!";
+ char* s1 = " Now is \"the time for all\" to stand up and cheer!";
+ char* s2 = " field1 | field2||field4|field5";
+ tokens_t* tokens;
+ rbuffer_t *rb;
+ int i;
+
+ tokens = tokenise( s0, ' ' );
+ if( tokens->ntoks != 11 ) {
+ fprintf( stderr, "didn't parse into 11 tokens (got %d): %s\n", tokens->ntoks, s1 );
+ }
+ print_tokens( tokens );
+
+ tokens = tokenise( s1, ' ' );
+ if( tokens->ntoks != 8 ) {
+ fprintf( stderr, "didn't parse into 11 tokens (got %d): %s\n", tokens->ntoks, s1 );
+ }
+ print_tokens( tokens );
+
+ tokens = tokenise( s2, '|' );
+ if( tokens->ntoks != 5 ) {
+ fprintf( stderr, "didn't parse into 5 tokens (got %d): %s\n", tokens->ntoks, s2 );
+ }
+ print_tokens( tokens );
+
+ free_tokens( tokens );
+
+ if( fname == NULL ) {
+ return;
+ }
+
+ rb = f2r( fname );
+ if( rb == NULL ) {
+ fprintf( stderr, "[FAIL] couldn't read file into rbuffer: %s\n", strerror( errno ) );
+ } else {
+ while( (s2 = next_rec( rb )) != NULL ) {
+ fprintf( stderr, "record: (%s)\n", s2 );
+ }
+
+ free_rbuf( rb );
+ }
+
+ return;
+}
+
+
+// ----------------------------------------------------------------------------------------------
+
+int main( int argc, char** argv ) {
+ void* mi;
+ int not_ready;
+
+ if( argc > 1 ) {
+ if( strcmp( argv[1], "selftest" ) == 0 ) {
+ self_test( argc > 1 ? argv[2] : NULL );
+ exit( 0 );
+ }
+
+ mi = init( argv[1] ); // parse the config and generate table structs
+ if( ! mi ) {
+ fprintf( stderr, "[CRI] initialisation failed\n" );
+ exit( 1 );
+ }
+
+ print_tables( mi );
+
+ while( (not_ready = connect2all( mi ) ) > 0 ) {
+ fprintf( stderr, "[INF] still waiting to connect to %d applications\n", not_ready );
+ sleep( 2 );
+ }
+
+ fprintf( stderr, "[INF] connected to all applications, sending tables\n" );
+
+ send_all_tables( mi );
+ } else {
+ fprintf( stderr, "[INFO] usage: %s file-name\n", argv[0] );
+ }
+}
+