blob: 6f739a568270acf318001056232c7e1aee69ce8b [file] [log] [blame]
E. Scott Danielse8a5b2c2019-04-22 17:04:10 +00001// :vim ts=4 sw=4 noet:
2/*
3==================================================================================
E. Scott Daniels8790bf02019-04-23 12:59:28 +00004 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
E. Scott Danielse8a5b2c2019-04-22 17:04:10 +00006
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
10
E. Scott Daniels8790bf02019-04-23 12:59:28 +000011 http://www.apache.org/licenses/LICENSE-2.0
E. Scott Danielse8a5b2c2019-04-22 17:04:10 +000012
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18==================================================================================
19*/
20
21/*
22 Mnemonic: rmr_rcvr.c
23 Abstract: This is a very simple receiver that does nothing but listen
24 for messages and write stats every so often to the tty.
25
26 The receiver expects messages which have some trace information
27 and a message format of:
28 ck1 ck2|<msg text><nil>
29
30 ck1 is a simple checksum of the message text (NOT including the
E. Scott Daniels8790bf02019-04-23 12:59:28 +000031 nil at the end of the string.
E. Scott Danielse8a5b2c2019-04-22 17:04:10 +000032
33 ck2 is a simple checksum of the trace data which for the purposes
34 of testing is assumed to have a terminating nil to keep this simple.
35
36 Good messages are messages where both computed checksums match
E. Scott Daniels8790bf02019-04-23 12:59:28 +000037 the ck1 and ck2 values.
E. Scott Danielse8a5b2c2019-04-22 17:04:10 +000038
39 The receiver will send an 'ack' message back to the sender for
40 all type 5 messages received.
41
42 The sender and receiver can be run on the same host/container
E. Scott Daniels8790bf02019-04-23 12:59:28 +000043 or on different hosts. The route table is the key to setting
E. Scott Danielse8a5b2c2019-04-22 17:04:10 +000044 things up properly. See the sender code for rt information.
45
46 Define these environment variables to have some control:
47 RMR_SEED_RT -- path to the static routing table
48 RMR_RTG_SVC -- port to listen for RTG connections
49
E. Scott Daniels412d53d2019-05-20 20:00:52 +000050 Compile time options
51 if -DMTC is defined on the compile command, then RMr is initialised
52 with the multi-threaded receive thread rather than using the same
53 process receive function. All other functions in the receiver are
54 the same.
55
E. Scott Danielse8a5b2c2019-04-22 17:04:10 +000056 Date: 18 April 2019
57 Author: E. Scott Daniels
58*/
59
60#include <unistd.h>
61#include <errno.h>
62#include <stdio.h>
63#include <stdlib.h>
64#include <time.h>
65#include <string.h>
66
67#include <rmr/rmr.h>
68
69static int sum( char* str ) {
70 int sum = 0;
71 int i = 0;
72
73 while( *str ) {
74 sum += *(str++) + i++;
75 }
76
77 return sum % 255;
78}
79
80/*
81 Split the message at the first sep and return a pointer to the first
E. Scott Daniels8790bf02019-04-23 12:59:28 +000082 character after.
E. Scott Danielse8a5b2c2019-04-22 17:04:10 +000083*/
84static char* split( char* str, char sep ) {
85 char* s;
86
87 s = strchr( str, sep );
88
89 if( s ) {
90 return s+1;
91 }
92
93 fprintf( stderr, "<RCVR> no pipe in message: (%s)\n", str );
94 return NULL;
95}
96
97int main( int argc, char** argv ) {
E. Scott Daniels8790bf02019-04-23 12:59:28 +000098 void* mrc; // msg router context
99 rmr_mbuf_t* msg = NULL; // message received
E. Scott Danielse8a5b2c2019-04-22 17:04:10 +0000100 int i;
101 int state;
102 int errors = 0;
103 char* listen_port = "4560";
104 long count = 0; // total received
105 long good = 0; // good palyload buffers
106 long bad = 0; // payload buffers which were not correct
107 long bad_tr = 0; // trace buffers that were not correct
E. Scott Danielsa41c6f52019-04-23 18:24:25 +0000108 long bad_sid = 0; // bad subscription ids
E. Scott Danielse8a5b2c2019-04-22 17:04:10 +0000109 long timeout = 0;
110 char* data;
E. Scott Danielse8a5b2c2019-04-22 17:04:10 +0000111 int nmsgs = 10; // number of messages to stop after (argv[1] overrides)
112 int rt_count = 0; // retry count
113 long ack_count = 0; // number of acks sent
E. Scott Danielsa41c6f52019-04-23 18:24:25 +0000114 int count_bins[11]; // histogram bins based on msg type (0-10)
115 char wbuf[1024]; // we'll pull trace data into here, and use as general working buffer
116 char sbuf[128]; // short buffer
E. Scott Danielse8a5b2c2019-04-22 17:04:10 +0000117
118 data = getenv( "RMR_RTG_SVC" );
119 if( data == NULL ) {
120 setenv( "RMR_RTG_SVC", "19289", 1 ); // set one that won't collide with the sender if on same host
121 }
122
123 if( argc > 1 ) {
124 nmsgs = atoi( argv[1] );
125 }
126 if( argc > 2 ) {
127 listen_port = argv[2];
128 }
E. Scott Daniels8790bf02019-04-23 12:59:28 +0000129
E. Scott Danielsa41c6f52019-04-23 18:24:25 +0000130 memset( count_bins, 0, sizeof( count_bins ) );
131
E. Scott Danielse8a5b2c2019-04-22 17:04:10 +0000132 fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
133
E. Scott Daniels412d53d2019-05-20 20:00:52 +0000134#ifdef MTC
E. Scott Daniels2596b232019-08-30 12:58:54 -0400135 fprintf( stderr, "<RCVR> starting in multi-threaded mode\n" );
E. Scott Daniels412d53d2019-05-20 20:00:52 +0000136 mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL ); // start RMr in mt-receive mode
E. Scott Daniels412d53d2019-05-20 20:00:52 +0000137#else
E. Scott Daniels2596b232019-08-30 12:58:54 -0400138 fprintf( stderr, "<RCVR> starting in direct receive mode\n" );
E. Scott Daniels8790bf02019-04-23 12:59:28 +0000139 mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_NONE ); // start your engines!
E. Scott Daniels412d53d2019-05-20 20:00:52 +0000140#endif
E. Scott Danielse8a5b2c2019-04-22 17:04:10 +0000141 if( mrc == NULL ) {
142 fprintf( stderr, "<RCVR> ABORT: unable to initialise RMr\n" );
143 exit( 1 );
144 }
145
146 timeout = time( NULL ) + 20;
147 while( ! rmr_ready( mrc ) ) { // wait for RMr to load a route table
148 fprintf( stderr, "<RCVR> waiting for RMr to show ready\n" );
149 sleep( 1 );
150
151 if( time( NULL ) > timeout ) {
152 fprintf( stderr, "<RCVR> giving up\n" );
153 exit( 1 );
154 }
155 }
156 fprintf( stderr, "<RCVR> rmr now shows ready, listening begins\n" );
157
158 timeout = time( NULL ) + 20;
E. Scott Daniels8790bf02019-04-23 12:59:28 +0000159 while( count < nmsgs ) {
E. Scott Danielse8a5b2c2019-04-22 17:04:10 +0000160 msg = rmr_torcv_msg( mrc, msg, 1000 ); // wait for about 1s so that if sender never starts we eventually escape
E. Scott Daniels8790bf02019-04-23 12:59:28 +0000161
E. Scott Danielse8a5b2c2019-04-22 17:04:10 +0000162 if( msg ) {
163 if( msg->state == RMR_OK ) {
164 if( (data = split( msg->payload, '|' )) != NULL ) {
165 if( sum( data ) == atoi( (char *) msg->payload ) ) {
166 good++;
167 } else {
168 fprintf( stderr, "<RCVR> chk sum bad: computed=%d expected;%d (%s)\n", sum( data ), atoi( msg->payload ), data );
169 bad++;
170 }
171 }
172
173 if( (data = split( msg->payload, ' ' )) != NULL ) { // data will point to the chksum for the trace data
174 state = rmr_get_trace( msg, wbuf, 1024 ); // should only copy upto the trace size; we'll check that
175 if( state > 128 || state < 1 ) {
176 fprintf( stderr, "trace data size listed unexpectedly long: %d\n", state );
177 }
178 if( sum( wbuf ) != atoi( data ) ) {
179 fprintf( stderr, "<RCVR> trace chk sum bad: computed=%d expected;%d len=%d (%s)\n", sum( wbuf ), atoi( data ), state, wbuf );
180 bad_tr++;
181 }
182 }
183 count++; // messages received for stats output
184
E. Scott Danielsa41c6f52019-04-23 18:24:25 +0000185 if( msg->mtype < 3 ) { // count number of properly set subscription id
186 if( msg->sub_id != msg->mtype * 10 ) {
187 bad_sid++;
188 }
189 }
190
191 if( msg->mtype >= 0 && msg->mtype <= 10 ) {
192 count_bins[msg->mtype]++;
193 }
194
195 if( msg->mtype == 5 ) { // send an ack; sender will count but not process, so data in message is moot
196 msg = rmr_rts_msg( mrc, msg );
E. Scott Danielse8a5b2c2019-04-22 17:04:10 +0000197 rt_count = 1000;
198 while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) { // to work right in nano we need this :(
E. Scott Danielsa41c6f52019-04-23 18:24:25 +0000199 if( ack_count < 1 ) { // 1st ack, so we need to connect, and we'll wait for that
E. Scott Danielse8a5b2c2019-04-22 17:04:10 +0000200 sleep( 1 );
E. Scott Danielsde63b292019-11-14 16:03:51 -0500201 if( rt_count > 5 ) {
202 rt_count = 5; // but only for 5sec; not 1000sec!
203 }
E. Scott Danielse8a5b2c2019-04-22 17:04:10 +0000204 }
205 rt_count--;
206 msg = rmr_rts_msg( mrc, msg ); // we don't try to resend if this returns retry
207 }
208 if( msg && msg->state == RMR_OK ) { // if it eventually worked
209 ack_count++;
210 }
211 }
E. Scott Danielsde63b292019-11-14 16:03:51 -0500212
213 timeout = time( NULL ) + 10; // extend timeout to 10s past last received message
E. Scott Danielse8a5b2c2019-04-22 17:04:10 +0000214 }
215 }
216
217 if( time( NULL ) > timeout ) {
218 fprintf( stderr, "receiver timed out\n" );
219 errors++;
220 break;
221 }
E. Scott Daniels8790bf02019-04-23 12:59:28 +0000222 }
E. Scott Danielse8a5b2c2019-04-22 17:04:10 +0000223
E. Scott Danielsa41c6f52019-04-23 18:24:25 +0000224 wbuf[0] = 0;
225 for( i = 0; i < 11; i++ ) {
226 snprintf( sbuf, sizeof( sbuf ), "%6d ", count_bins[i] );
227 strcat( wbuf, sbuf );
228 }
229
230 fprintf( stderr, "<RCVR> mtype histogram: %s\n", wbuf );
231 fprintf( stderr, "<RCVR> [%s] %ld messages; good=%ld acked=%ld bad=%ld bad-trace=%ld bad-sub_id=%ld\n",
232 !!(errors + bad + bad_tr) ? "FAIL" : "PASS", count, good, ack_count, bad, bad_tr, bad_sid );
233
E. Scott Danielse8a5b2c2019-04-22 17:04:10 +0000234 sleep( 2 ); // let any outbound acks flow before closing
235
236 rmr_close( mrc );
237 return !!(errors + bad + bad_tr); // bad rc if any are !0
238}
239