blob: c72e0723a0b82862034992859d7cde4f9879000c [file] [log] [blame]
E. Scott Daniels412d53d2019-05-20 20:00:52 +00001// :vim ts=4 sw=4 noet:
2/*
3==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
10
11 http://www.apache.org/licenses/LICENSE-2.0
12
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18==================================================================================
19*/
20
21/*
22 Mnemonic: caller.c
23 Abstract: This is a simple sender which will send a series of messages using
24 rmr_call(). N threads are started each sending the desired number
25 of messages and expecting an 'ack' for each. Each ack is examined
26 to verify that the thread id placed into the message matches (meaning
27 that the ack was delivered by RMr to the correct thread's chute).
28
29 In addition, the main thread listens for messages in order to verify
30 that a main or receiving thread can receive messages concurrently
31 while call acks are pending and being processed.
32
33 Message format is:
34 ck1 ck2|<msg-txt> @ tid<nil>
35
36 Ck1 is the simple check sum of the msg-text (NOT including <nil>)
37 Ck2 is the simple check sum of the trace data which is a nil terminated
38 series of bytes.
39 tid is the thread id assigned by the main thread.
40
41 Parms: argv[1] == number of msgs to send (10)
42 argv[2] == delay (mu-seconds, 100000 default)
43 argv[3] == number of threads (3)
44 argv[4] == listen port
45
46 Sender will send for at most 20 seconds, so if nmsgs and delay extend
47 beyond that period the total number of messages sent will be less
48 than n.
49
50 Date: 18 April 2019
51 Author: E. Scott Daniels
52*/
53
54#include <unistd.h>
55#include <errno.h>
56#include <string.h>
57#include <stdio.h>
58#include <stdlib.h>
59#include <sys/epoll.h>
60#include <time.h>
61#include <pthread.h>
62
63
64#include <rmr/rmr.h>
65
#define TRACE_SIZE 40		// bytes in header to provide for trace junk
#define WBUF_SIZE 1024		// bytes allocated for each thread's message text work buffer
E. Scott Daniels412d53d2019-05-20 20:00:52 +000068
/*
	Thread data. One control block per calling thread; main fills it in before
	the thread is started, and the thread reports its result through state.
*/
typedef struct tdata {
	int		id;			// the id we'll pass to RMr mt-call function NOT the thread id
	int		n2send;		// number of messages to send
	int		delay;		// delay between messages in microseconds (value is handed to usleep())
	void*	mrc;		// RMr context
	int		state;		// 1 == thread still running; 0 == finished, failed; -1 == finished, passed
} tdata_t;
79
80
81
82// --------------------------------------------------------------------------------
83
84
/*
	Compute the simple checksum placed at the head of each message: every byte
	of the nil terminated string is added together with its zero-based position
	in the string, and the total is reduced modulo 255.

	A nil pointer is treated like the empty string and yields 0.
*/
static int sum( char* str ) {
	int total = 0;
	int i;

	if( str == NULL ) {
		return 0;
	}

	for( i = 0; str[i] != 0; i++ ) {
		total += str[i] + i;			// byte value plus its offset
	}

	return total % 255;
}
95
96
97
98/*
99 Executed as a thread, this puppy will generate calls to ensure that we get the
100 response back to the right thread, that we can handle threads, etc.
101*/
102static void* mk_calls( void* data ) {
103 tdata_t* control;
104 rmr_mbuf_t* sbuf; // send buffer
105 int count = 0;
106 int rt_count = 0; // number of messages requiring a spin retry
107 int ok_msg = 0; // received messages that were sent by us
108 int bad_msg = 0; // received messages that were sent by a different thread
109 int drops = 0;
110 int fail_count = 0; // # of failure sends after first successful send
111 int successful = 0; // set to true after we have a successful send
E. Scott Danielsec88d3c2019-11-13 09:40:22 -0500112 char* wbuf = NULL;
E. Scott Daniels412d53d2019-05-20 20:00:52 +0000113 char xbuf[1024]; // build transaction string here
114 char trace[1024];
115 int xaction_id = 1;
116 char* tok;
117 int state = 0;
118
E. Scott Danielsec88d3c2019-11-13 09:40:22 -0500119 wbuf = (char *) malloc( sizeof( char ) * WBUF_SIZE );
120
E. Scott Daniels412d53d2019-05-20 20:00:52 +0000121 if( (control = (tdata_t *) data) == NULL ) {
122 fprintf( stderr, "thread data was nil; bailing out\n" );
123 }
124 //fprintf( stderr, "<THRD> thread started with parallel call id=%d sending=%d delay=%d\n", control->id, control->n2send, control->delay );
125
126 sbuf = rmr_alloc_msg( control->mrc, 512 ); // alloc first send buffer; subsequent buffers allcoated on send
127
128 memset( trace, 0, sizeof( trace ) );
129 while( count < control->n2send ) { // we send n messages after the first message is successful
130 snprintf( trace, 100, "%lld", (long long) time( NULL ) );
131 rmr_set_trace( sbuf, trace, TRACE_SIZE ); // fully populate so we dont cause a buffer realloc
132
133 snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer! @ %d", count, trace, rand(), control->id );
134 snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf );
135 snprintf( xbuf, 200, "%31d", xaction_id );
136 rmr_bytes2xact( sbuf, xbuf, 32 );
137
138 sbuf->mtype = 5; // mtype is always 5 as the test receiver acks just mtype 5 messages
139 sbuf->len = strlen( sbuf->payload ) + 1; // our receiver likely wants a nice acsii-z string
140 sbuf->state = 0;
141 sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 1000 ); // send it (send returns an empty payload on success, or the original payload on fail/retry)
142
143 if( sbuf && sbuf->state == RMR_ERR_RETRY ) { // number of times we had to spin to send
144 rt_count++;
145 }
146 while( sbuf != NULL && sbuf->state == RMR_ERR_RETRY ) { // send blocked; keep trying
147 sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 100 ); // call and wait up to 100ms for a response
148 }
149
150 if( sbuf != NULL ) {
151 switch( sbuf->state ) {
152 case RMR_OK: // we should have a buffer back from the sender here
153 successful = 1;
154 if( (tok = strchr( sbuf->payload, '@' )) != NULL ) {
155 if( atoi( tok+1 ) == control->id ) {
156 //fprintf( stderr, "<THRD> tid=%-2d ok ack\n", control->id );
157 ok_msg++;
158 } else {
159 bad_msg++;
160 //fprintf( stderr, "<THRD> tid=%-2d bad ack %s\n", control->id, sbuf->payload );
161 }
162 }
163 //fprintf( stderr, "<THRD> tid=%-2d call returned valid msg: %s\n", control->id, sbuf->payload );
164 // future -- verify that we see our ID at the end of the message
165 count++;
166 break;
167
168 default:
169 fprintf( stderr, "unexpected error: tid=%d rmr-state=%d ernro=%d\n", control->id, sbuf->state, errno );
170 sbuf = rmr_alloc_msg( control->mrc, 512 ); // allocate a sendable buffer
171 if( successful ) {
172 fail_count++; // count failures after first successful message
173 } else {
174 // some error (not connected likely), don't count this
175 sleep( 1 );
176 }
177 break;
178 }
179 } else {
180 //fprintf( stderr, "<THRD> tid=%-2d call finished, no sbuf\n", control->id );
181 sbuf = rmr_alloc_msg( control->mrc, 512 ); // loop expects an subf
182 drops++;
183 count++;
184 }
185
186 if( control->delay > 0 ) {
187 usleep( control->delay );
188 }
189 }
190
191 state = 1;
192 if( ok_msg < (control->n2send-1) || bad_msg > 0 ) { // allow one drop to pass
193 state = 0;
194 }
195 if( count < control->n2send ) {
196 state = 0;
197 }
198
199 control->state = -state; // signal inactive to main thread; -1 == pass, 0 == fail
200 fprintf( stderr, "<THRD> [%s] tid=%-2d sent=%d ok-acks=%d bad-acks=%d drops=%d failures=%d retries=%d\n",
201 state ? "PASS" : "FAIL", control->id, count, ok_msg, bad_msg, drops, fail_count, rt_count );
202
203
204 return NULL;
205}
206
207int main( int argc, char** argv ) {
208 void* mrc; // msg router context
209 rmr_mbuf_t* rbuf = NULL; // received on 'normal' flow
210 struct epoll_event events[1]; // list of events to give to epoll
211 struct epoll_event epe; // event definition for event to listen to
212 int ep_fd = -1; // epoll's file des (given to epoll_wait)
213 int rcv_fd; // file des that NNG tickles -- give this to epoll to listen on
214 int nready; // number of events ready for receive
215 char* listen_port = "43086";
216 long timeout = 0;
217 int delay = 100000; // usec between send attempts
218 int nmsgs = 10; // number of messages to send
219 int nthreads = 3;
220 tdata_t* cvs; // vector of control blocks
221 int i;
222 pthread_t* pt_info; // thread stuff
223 int failures = 0;
224 int pings = 0; // number of messages received on normal channel
225
226 if( argc > 1 ) {
227 nmsgs = atoi( argv[1] );
228 }
229 if( argc > 2 ) {
230 delay = atoi( argv[2] );
231 }
232 if( argc > 3 ) {
233 nthreads = atoi( argv[3] );
234 }
235 if( argc > 4 ) {
236 listen_port = argv[4];
237 }
238
239 fprintf( stderr, "<CALL> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
240
241 if( (mrc = rmr_init( listen_port, 1400, RMRFL_MTCALL )) == NULL ) { // initialise with multi-threaded call enabled
242 fprintf( stderr, "<CALL> unable to initialise RMr\n" );
243 exit( 1 );
244 }
245
246 rmr_init_trace( mrc, TRACE_SIZE );
247
248 if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) { // epoll only available from NNG -- skip receive later if not NNG
249 if( rcv_fd < 0 ) {
250 fprintf( stderr, "<CALL> unable to set up polling fd\n" );
251 exit( 1 );
252 }
253 if( (ep_fd = epoll_create1( 0 )) < 0 ) {
254 fprintf( stderr, "<CALL> [FAIL] unable to create epoll fd: %d\n", errno );
255 exit( 1 );
256 }
257 epe.events = EPOLLIN;
258 epe.data.fd = rcv_fd;
259
260 if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) {
261 fprintf( stderr, "<CALL> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
262 exit( 1 );
263 }
264 } else {
265 rmr_set_rtimeout( mrc, 0 ); // for nano we must set the receive timeout to 0; non-blocking receive
266 }
267
268
269 cvs = malloc( sizeof( tdata_t ) * nthreads );
270 pt_info = malloc( sizeof( pthread_t ) * nthreads );
271 if( cvs == NULL ) {
272 fprintf( stderr, "<CALL> unable to allocate control vector\n" );
273 exit( 1 );
274 }
275
276
277 timeout = time( NULL ) + 20; // give rmr 20s to find the route table (shouldn't need that much)
278 while( ! rmr_ready( mrc ) ) { // must have a route table before we can send; wait til RMr says it has one
279 fprintf( stderr, "<CALL> waiting for rmr to show ready\n" );
280 sleep( 1 );
281
282 if( time( NULL ) > timeout ) {
283 fprintf( stderr, "<CALL> giving up\n" );
284 exit( 1 );
285 }
286 }
287 fprintf( stderr, "<CALL> rmr is ready; starting threads\n" );
288
289 for( i = 0; i < nthreads; i++ ) {
290 cvs[i].mrc = mrc;
291 cvs[i].id = i + 2; // we pass this as the call-id to rmr, so must be >1
292 cvs[i].delay = delay;
293 cvs[i].n2send = nmsgs;
294 cvs[i].state = 1;
295
296 pthread_create( &pt_info[i], NULL, mk_calls, &cvs[i] ); // kick a thread
297 }
298
299 timeout = time( NULL ) + 20;
300 i = 0;
301 while( nthreads > 0 ) {
302 if( cvs[i].state < 1 ) { // states 0 or below indicate done. 0 == failure, -n == success
303 nthreads--;
304 if( cvs[i].state == 0 ) {
305 failures++;
306 }
307 i++;
308 } else {
309 // sleep( 1 );
310 rbuf = rmr_torcv_msg( mrc, rbuf, 1000 );
311 if( rbuf != NULL && rbuf->state != RMR_ERR_RETRY ) {
312 pings++;
313 rmr_free_msg( rbuf );
314 rbuf = NULL;
315 }
316 }
317 if( time( NULL ) > timeout ) {
318 failures += nthreads;
319 fprintf( stderr, "<CALL> timeout waiting for threads to finish; %d were not finished\n", nthreads );
320 break;
321 }
322 }
323
324 fprintf( stderr, "<CALL> [%s] failing threads=%d pings reeived=%d\n", failures == 0 ? "PASS" : "FAIL", failures, pings );
325 rmr_close( mrc );
326
327 return failures > 0;
328}
329