blob: 2801e42bad9c187b1ca98284c106267d335dba5a [file] [log] [blame]
E. Scott Daniels39257742019-06-06 16:44:33 +00001// :vim ts=4 sw=4 noet:
2/*
3==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
10
11 http://www.apache.org/licenses/LICENSE-2.0
12
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18==================================================================================
19*/
20
/*
	Mnemonic:	rmr_rcvr.c
	Abstract:	This is a very simple receiver that listens for messages and
				returns each to the sender after adding a timestamp to the
				payload. The payload is expected to be an lc_msg_t (defined
				below; see also lcaller.c) and the receiver updates the 'turn'
				timestamp on receipt. The receiver stops once roughly two
				seconds pass without a message; the message count given on
				the command line is only reported, not enforced.

				Define these environment variables to have some control:
					RMR_SEED_RT -- path to the static routing table
					RMR_RTG_SVC -- port to listen for RTG connections

	Date:		18 April 2019
	Author:		E. Scott Daniels
*/
35
36#include <unistd.h>
37#include <errno.h>
38#include <stdio.h>
39#include <stdlib.h>
40#include <time.h>
41#include <string.h>
42
43#include <rmr/rmr.h>
44
/*
	Latency message carried in the RMR payload. The caller fills in the
	outbound fields, this receiver stamps the turn-around time, and the
	caller stamps the inbound time when the message comes back.
*/
struct lc_msg {
	struct timespec out_ts;			// taken by the caller just before the call is executed
	struct timespec turn_ts;		// taken at the receiver when the message arrives
	struct timespec in_ts;			// taken by the caller when the message is received back
	int out_retries;				// retries required to send the message
	int turn_retries;				// retries the receiver needed to return the message
};

typedef struct lc_msg lc_msg_t;
55
56// ----------------------------------------------------------------------------------
57
/*
	Compute a small position-weighted checksum over a nul terminated string:
	each character value is added together with its zero based offset and
	the total is reduced modulo 255.

	Characters are added as plain char, so whether bytes above 0x7f count as
	negative depends on the platform's char signedness.
*/
static int sum( char* str ) {
	int total = 0;
	int pos;

	for( pos = 0; str[pos] != 0; pos++ ) {
		total += str[pos] + pos;
	}

	return total % 255;
}
68
/*
	Find the first occurrence of sep in str and return a pointer to the
	character immediately after it. The string itself is left untouched
	(the separator is not replaced with a terminator).

	When sep is not present a diagnostic is written to stderr and NULL is
	returned.
*/
static char* split( char* str, char sep ) {
	char* hit = strchr( str, sep );

	if( hit == NULL ) {
		fprintf( stderr, "<RCVR> no pipe in message: (%s)\n", str );
		return NULL;
	}

	return hit + 1;
}
85
86int main( int argc, char** argv ) {
87 void* mrc; // msg router context
88 lc_msg_t* lmc; // latency message type from caller
89 rmr_mbuf_t* msg = NULL; // message received
90 int i;
91 int errors = 0;
92 char* listen_port = "4560";
93 long count = 0; // total received
94 long timeout = 0;
95 char* data;
96 int nmsgs = 10; // number of messages to stop after (argv[1] overrides)
97 int rt_count = 0; // retry count
98 time_t now;
99 int active;
100
101 data = getenv( "RMR_RTG_SVC" );
102 if( data == NULL ) {
103 setenv( "RMR_RTG_SVC", "19289", 1 ); // set one that won't collide with the sender if on same host
104 }
105
106 if( argc > 1 ) {
107 nmsgs = atoi( argv[1] );
108 }
109 if( argc > 2 ) {
110 listen_port = argv[2];
111 }
112
113
114 fprintf( stderr, "<RCVR> listening on port: %s for a max of %d messages\n", listen_port, nmsgs );
115
116 mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, RMRFL_MTCALL ); // start your engines!
117 //mrc = rmr_init( listen_port, RMR_MAX_RCV_BYTES, 0 ); // start your engines!
118 if( mrc == NULL ) {
119 fprintf( stderr, "<RCVR> ABORT: unable to initialise RMr\n" );
120 exit( 1 );
121 }
122
123 timeout = time( NULL ) + 20;
124 while( ! rmr_ready( mrc ) ) { // wait for RMr to load a route table
125 fprintf( stderr, "<RCVR> waiting for RMr to show ready\n" );
126 sleep( 1 );
127
128 if( time( NULL ) > timeout ) {
129 fprintf( stderr, "<RCVR> giving up\n" );
130 exit( 1 );
131 }
132 }
133 fprintf( stderr, "<RCVR> rmr now shows ready, listening begins\n" );
134
135 timeout = time( NULL ) + 2; // once we start, we assume if we go 2s w/o a message that we're done
136 //while( count < nmsgs ) {
137 while( 1 ) {
138 active = 0;
139 msg = rmr_torcv_msg( mrc, msg, 1000 ); // pop every second or so to timeout if needed
140
141 if( msg ) {
E. Scott Daniels39257742019-06-06 16:44:33 +0000142 if( msg->state == RMR_OK ) {
E. Scott Daniels02e8d492020-01-21 12:23:28 -0500143 active = 1;
E. Scott Daniels39257742019-06-06 16:44:33 +0000144 lmc = (lc_msg_t *) msg->payload;
145 clock_gettime( CLOCK_REALTIME, &lmc->turn_ts ); // mark time that we received it.
146 count++;
147
148 msg = rmr_rts_msg( mrc, msg );
149 rt_count = 1000;
150 while( rt_count > 0 && msg != NULL && msg->state == RMR_ERR_RETRY ) { // to work right in nano we need this :(
151 lmc->turn_retries++;
152 if( count < 1 ) { // 1st msg, so we need to connect, and we'll wait for that
153 sleep( 1 );
154 }
155 rt_count--;
156 msg = rmr_rts_msg( mrc, msg ); // we don't try to resend if this returns retry
157 }
158 }
159 }
160
161 now = time( NULL );
162 if( now > timeout ) {
163 break;
164 }
165
166 if( active ) {
167 timeout = now + 2;
168 }
169 }
170
171 fprintf( stderr, "<RCVR> %ld is finished got %ld messages\n", (long) getpid(), count );
172
173
174 sleep( 3 );
175 rmr_close( mrc );
176 return 0;
177}
178