blob: 7b7336a879ee2ed2d1c43196e6773636ba51d608 [file] [log] [blame]
E. Scott Daniels412d53d2019-05-20 20:00:52 +00001// :vim ts=4 sw=4 noet:
2/*
3==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
10
11 http://www.apache.org/licenses/LICENSE-2.0
12
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18==================================================================================
19*/
20
21/*
22 Mnemonic: caller.c
23 Abstract: This is a simple sender which will send a series of messages using
24 rmr_call(). N threads are started each sending the desired number
25 of messages and expecting an 'ack' for each. Each ack is examined
26 to verify that the thread id placed into the message matches (meaning
27 that the ack was delivered by RMr to the correct thread's chute.
28
29 In addition, the main thread listens for messages in order to verify
30 that a main or receiving thread can receive messages concurrently
31 while call acks are pending and being processed.
32
33 Message format is:
34 ck1 ck2|<msg-txt> @ tid<nil>
35
36 Ck1 is the simple check sum of the msg-text (NOT includeing <nil>)
37 Ck2 is the simple check sum of the trace data which is a nil terminated
38 series of bytes.
39 tid is the thread id assigned by the main thread.
40
41 Parms: argv[1] == number of msgs to send (10)
42 argv[2] == delay (mu-seconds, 1000000 default)
43 argv[3] == number of threads (3)
44 argv[4] == listen port
45
46 Sender will send for at most 20 seconds, so if nmsgs and delay extend
47 beyond that period the total number of messages sent will be less
48 than n.
49
50 Date: 18 April 2019
51 Author: E. Scott Daniels
52*/
53
54#include <unistd.h>
55#include <errno.h>
56#include <string.h>
57#include <stdio.h>
58#include <stdlib.h>
59#include <sys/epoll.h>
60#include <time.h>
61#include <pthread.h>
62
63
64#include <rmr/rmr.h>
65
66#define TRACE_SIZE 40 // bytes in header to provide for trace junk
67
68/*
69 Thread data
70*/
71typedef struct tdata {
72 int id; // the id we'll pass to RMr mt-call function NOT the thread id
73 int n2send; // number of messages to send
74 int delay; // ms delay between messages
75 void* mrc; // RMr context
76 int state;
77} tdata_t;
78
79
80
81// --------------------------------------------------------------------------------
82
83
84static int sum( char* str ) {
85 int sum = 0;
86 int i = 0;
87
88 while( *str ) {
89 sum += *(str++) + i++;
90 }
91
92 return sum % 255;
93}
94
95
96
97/*
98 Executed as a thread, this puppy will generate calls to ensure that we get the
99 response back to the right thread, that we can handle threads, etc.
100*/
101static void* mk_calls( void* data ) {
102 tdata_t* control;
103 rmr_mbuf_t* sbuf; // send buffer
104 int count = 0;
105 int rt_count = 0; // number of messages requiring a spin retry
106 int ok_msg = 0; // received messages that were sent by us
107 int bad_msg = 0; // received messages that were sent by a different thread
108 int drops = 0;
109 int fail_count = 0; // # of failure sends after first successful send
110 int successful = 0; // set to true after we have a successful send
111 char wbuf[1024];
112 char xbuf[1024]; // build transaction string here
113 char trace[1024];
114 int xaction_id = 1;
115 char* tok;
116 int state = 0;
117
118 if( (control = (tdata_t *) data) == NULL ) {
119 fprintf( stderr, "thread data was nil; bailing out\n" );
120 }
121 //fprintf( stderr, "<THRD> thread started with parallel call id=%d sending=%d delay=%d\n", control->id, control->n2send, control->delay );
122
123 sbuf = rmr_alloc_msg( control->mrc, 512 ); // alloc first send buffer; subsequent buffers allcoated on send
124
125 memset( trace, 0, sizeof( trace ) );
126 while( count < control->n2send ) { // we send n messages after the first message is successful
127 snprintf( trace, 100, "%lld", (long long) time( NULL ) );
128 rmr_set_trace( sbuf, trace, TRACE_SIZE ); // fully populate so we dont cause a buffer realloc
129
130 snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer! @ %d", count, trace, rand(), control->id );
131 snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf );
132 snprintf( xbuf, 200, "%31d", xaction_id );
133 rmr_bytes2xact( sbuf, xbuf, 32 );
134
135 sbuf->mtype = 5; // mtype is always 5 as the test receiver acks just mtype 5 messages
136 sbuf->len = strlen( sbuf->payload ) + 1; // our receiver likely wants a nice acsii-z string
137 sbuf->state = 0;
138 sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 1000 ); // send it (send returns an empty payload on success, or the original payload on fail/retry)
139
140 if( sbuf && sbuf->state == RMR_ERR_RETRY ) { // number of times we had to spin to send
141 rt_count++;
142 }
143 while( sbuf != NULL && sbuf->state == RMR_ERR_RETRY ) { // send blocked; keep trying
144 sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 100 ); // call and wait up to 100ms for a response
145 }
146
147 if( sbuf != NULL ) {
148 switch( sbuf->state ) {
149 case RMR_OK: // we should have a buffer back from the sender here
150 successful = 1;
151 if( (tok = strchr( sbuf->payload, '@' )) != NULL ) {
152 if( atoi( tok+1 ) == control->id ) {
153 //fprintf( stderr, "<THRD> tid=%-2d ok ack\n", control->id );
154 ok_msg++;
155 } else {
156 bad_msg++;
157 //fprintf( stderr, "<THRD> tid=%-2d bad ack %s\n", control->id, sbuf->payload );
158 }
159 }
160 //fprintf( stderr, "<THRD> tid=%-2d call returned valid msg: %s\n", control->id, sbuf->payload );
161 // future -- verify that we see our ID at the end of the message
162 count++;
163 break;
164
165 default:
166 fprintf( stderr, "unexpected error: tid=%d rmr-state=%d ernro=%d\n", control->id, sbuf->state, errno );
167 sbuf = rmr_alloc_msg( control->mrc, 512 ); // allocate a sendable buffer
168 if( successful ) {
169 fail_count++; // count failures after first successful message
170 } else {
171 // some error (not connected likely), don't count this
172 sleep( 1 );
173 }
174 break;
175 }
176 } else {
177 //fprintf( stderr, "<THRD> tid=%-2d call finished, no sbuf\n", control->id );
178 sbuf = rmr_alloc_msg( control->mrc, 512 ); // loop expects an subf
179 drops++;
180 count++;
181 }
182
183 if( control->delay > 0 ) {
184 usleep( control->delay );
185 }
186 }
187
188 state = 1;
189 if( ok_msg < (control->n2send-1) || bad_msg > 0 ) { // allow one drop to pass
190 state = 0;
191 }
192 if( count < control->n2send ) {
193 state = 0;
194 }
195
196 control->state = -state; // signal inactive to main thread; -1 == pass, 0 == fail
197 fprintf( stderr, "<THRD> [%s] tid=%-2d sent=%d ok-acks=%d bad-acks=%d drops=%d failures=%d retries=%d\n",
198 state ? "PASS" : "FAIL", control->id, count, ok_msg, bad_msg, drops, fail_count, rt_count );
199
200
201 return NULL;
202}
203
204int main( int argc, char** argv ) {
205 void* mrc; // msg router context
206 rmr_mbuf_t* rbuf = NULL; // received on 'normal' flow
207 struct epoll_event events[1]; // list of events to give to epoll
208 struct epoll_event epe; // event definition for event to listen to
209 int ep_fd = -1; // epoll's file des (given to epoll_wait)
210 int rcv_fd; // file des that NNG tickles -- give this to epoll to listen on
211 int nready; // number of events ready for receive
212 char* listen_port = "43086";
213 long timeout = 0;
214 int delay = 100000; // usec between send attempts
215 int nmsgs = 10; // number of messages to send
216 int nthreads = 3;
217 tdata_t* cvs; // vector of control blocks
218 int i;
219 pthread_t* pt_info; // thread stuff
220 int failures = 0;
221 int pings = 0; // number of messages received on normal channel
222
223 if( argc > 1 ) {
224 nmsgs = atoi( argv[1] );
225 }
226 if( argc > 2 ) {
227 delay = atoi( argv[2] );
228 }
229 if( argc > 3 ) {
230 nthreads = atoi( argv[3] );
231 }
232 if( argc > 4 ) {
233 listen_port = argv[4];
234 }
235
236 fprintf( stderr, "<CALL> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
237
238 if( (mrc = rmr_init( listen_port, 1400, RMRFL_MTCALL )) == NULL ) { // initialise with multi-threaded call enabled
239 fprintf( stderr, "<CALL> unable to initialise RMr\n" );
240 exit( 1 );
241 }
242
243 rmr_init_trace( mrc, TRACE_SIZE );
244
245 if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) { // epoll only available from NNG -- skip receive later if not NNG
246 if( rcv_fd < 0 ) {
247 fprintf( stderr, "<CALL> unable to set up polling fd\n" );
248 exit( 1 );
249 }
250 if( (ep_fd = epoll_create1( 0 )) < 0 ) {
251 fprintf( stderr, "<CALL> [FAIL] unable to create epoll fd: %d\n", errno );
252 exit( 1 );
253 }
254 epe.events = EPOLLIN;
255 epe.data.fd = rcv_fd;
256
257 if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) {
258 fprintf( stderr, "<CALL> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
259 exit( 1 );
260 }
261 } else {
262 rmr_set_rtimeout( mrc, 0 ); // for nano we must set the receive timeout to 0; non-blocking receive
263 }
264
265
266 cvs = malloc( sizeof( tdata_t ) * nthreads );
267 pt_info = malloc( sizeof( pthread_t ) * nthreads );
268 if( cvs == NULL ) {
269 fprintf( stderr, "<CALL> unable to allocate control vector\n" );
270 exit( 1 );
271 }
272
273
274 timeout = time( NULL ) + 20; // give rmr 20s to find the route table (shouldn't need that much)
275 while( ! rmr_ready( mrc ) ) { // must have a route table before we can send; wait til RMr says it has one
276 fprintf( stderr, "<CALL> waiting for rmr to show ready\n" );
277 sleep( 1 );
278
279 if( time( NULL ) > timeout ) {
280 fprintf( stderr, "<CALL> giving up\n" );
281 exit( 1 );
282 }
283 }
284 fprintf( stderr, "<CALL> rmr is ready; starting threads\n" );
285
286 for( i = 0; i < nthreads; i++ ) {
287 cvs[i].mrc = mrc;
288 cvs[i].id = i + 2; // we pass this as the call-id to rmr, so must be >1
289 cvs[i].delay = delay;
290 cvs[i].n2send = nmsgs;
291 cvs[i].state = 1;
292
293 pthread_create( &pt_info[i], NULL, mk_calls, &cvs[i] ); // kick a thread
294 }
295
296 timeout = time( NULL ) + 20;
297 i = 0;
298 while( nthreads > 0 ) {
299 if( cvs[i].state < 1 ) { // states 0 or below indicate done. 0 == failure, -n == success
300 nthreads--;
301 if( cvs[i].state == 0 ) {
302 failures++;
303 }
304 i++;
305 } else {
306 // sleep( 1 );
307 rbuf = rmr_torcv_msg( mrc, rbuf, 1000 );
308 if( rbuf != NULL && rbuf->state != RMR_ERR_RETRY ) {
309 pings++;
310 rmr_free_msg( rbuf );
311 rbuf = NULL;
312 }
313 }
314 if( time( NULL ) > timeout ) {
315 failures += nthreads;
316 fprintf( stderr, "<CALL> timeout waiting for threads to finish; %d were not finished\n", nthreads );
317 break;
318 }
319 }
320
321 fprintf( stderr, "<CALL> [%s] failing threads=%d pings reeived=%d\n", failures == 0 ? "PASS" : "FAIL", failures, pings );
322 rmr_close( mrc );
323
324 return failures > 0;
325}
326