E. Scott Daniels | 412d53d | 2019-05-20 20:00:52 +0000 | [diff] [blame] | 1 | // :vim ts=4 sw=4 noet: |
| 2 | /* |
| 3 | ================================================================================== |
| 4 | Copyright (c) 2019 Nokia |
| 5 | Copyright (c) 2018-2019 AT&T Intellectual Property. |
| 6 | |
| 7 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 8 | you may not use this file except in compliance with the License. |
| 9 | You may obtain a copy of the License at |
| 10 | |
| 11 | http://www.apache.org/licenses/LICENSE-2.0 |
| 12 | |
| 13 | Unless required by applicable law or agreed to in writing, software |
| 14 | distributed under the License is distributed on an "AS IS" BASIS, |
| 15 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 16 | See the License for the specific language governing permissions and |
| 17 | limitations under the License. |
| 18 | ================================================================================== |
| 19 | */ |
| 20 | |
| 21 | /* |
| 22 | Mnemonic: caller.c |
| 23 | Abstract: This is a simple sender which will send a series of messages using |
| 24 | rmr_call(). N threads are started each sending the desired number |
| 25 | of messages and expecting an 'ack' for each. Each ack is examined |
| 26 | to verify that the thread id placed into the message matches (meaning |
| 27 | that the ack was delivered by RMr to the correct thread's chute). |
| 28 | |
| 29 | In addition, the main thread listens for messages in order to verify |
| 30 | that a main or receiving thread can receive messages concurrently |
| 31 | while call acks are pending and being processed. |
| 32 | |
| 33 | Message format is: |
| 34 | ck1 ck2|<msg-txt> @ tid<nil> |
| 35 | |
| 36 | Ck1 is the simple check sum of the msg-text (NOT including <nil>) |
| 37 | Ck2 is the simple check sum of the trace data which is a nil terminated |
| 38 | series of bytes. |
| 39 | tid is the thread id assigned by the main thread. |
| 40 | |
| 41 | Parms: argv[1] == number of msgs to send (10) |
| 42 | argv[2] == delay (mu-seconds, 100000 default) |
| 43 | argv[3] == number of threads (3) |
| 44 | argv[4] == listen port |
| 45 | |
| 46 | Sender will send for at most 20 seconds, so if nmsgs and delay extend |
| 47 | beyond that period the total number of messages sent will be less |
| 48 | than n. |
| 49 | |
| 50 | Date: 18 April 2019 |
| 51 | Author: E. Scott Daniels |
| 52 | */ |
| 53 | |
| 54 | #include <unistd.h> |
| 55 | #include <errno.h> |
| 56 | #include <string.h> |
| 57 | #include <stdio.h> |
| 58 | #include <stdlib.h> |
| 59 | #include <sys/epoll.h> |
| 60 | #include <time.h> |
| 61 | #include <pthread.h> |
| 62 | |
| 63 | |
| 64 | #include <rmr/rmr.h> |
| 65 | |
| 66 | #define TRACE_SIZE 40 // bytes in header to provide for trace junk |
| 67 | |
/*
	Thread data: one control block is handed to each calling thread. The main
	thread fills it in before the thread is started and then watches the state
	field to learn when the thread has finished.
*/
typedef struct tdata {
	int	id;					// the id we'll pass to RMr mt-call function NOT the thread id
	int n2send;				// number of messages to send
	int delay;				// delay between messages in microseconds (value is handed to usleep())
	void* mrc;				// RMr context
	int state;				// set to 1 by main before the thread starts; thread sets 0 (fail) or -1 (pass) when done
} tdata_t;
| 78 | |
| 79 | |
| 80 | |
| 81 | // -------------------------------------------------------------------------------- |
| 82 | |
| 83 | |
/*
	Compute the simple checksum used in the message format: each byte of the
	nil terminated string is added together with its zero based position, and
	the total is reduced modulo 255. The terminating nil is not included.
*/
static int sum( char* str ) {
	int total = 0;
	int pos;

	for( pos = 0; str[pos] != 0; pos++ ) {
		total += str[pos] + pos;			// byte value weighted by where it sits in the string
	}

	return total % 255;
}
| 94 | |
| 95 | |
| 96 | |
| 97 | /* |
| 98 | Executed as a thread, this puppy will generate calls to ensure that we get the |
| 99 | response back to the right thread, that we can handle threads, etc. |
| 100 | */ |
| 101 | static void* mk_calls( void* data ) { |
| 102 | tdata_t* control; |
| 103 | rmr_mbuf_t* sbuf; // send buffer |
| 104 | int count = 0; |
| 105 | int rt_count = 0; // number of messages requiring a spin retry |
| 106 | int ok_msg = 0; // received messages that were sent by us |
| 107 | int bad_msg = 0; // received messages that were sent by a different thread |
| 108 | int drops = 0; |
| 109 | int fail_count = 0; // # of failure sends after first successful send |
| 110 | int successful = 0; // set to true after we have a successful send |
| 111 | char wbuf[1024]; |
| 112 | char xbuf[1024]; // build transaction string here |
| 113 | char trace[1024]; |
| 114 | int xaction_id = 1; |
| 115 | char* tok; |
| 116 | int state = 0; |
| 117 | |
| 118 | if( (control = (tdata_t *) data) == NULL ) { |
| 119 | fprintf( stderr, "thread data was nil; bailing out\n" ); |
| 120 | } |
| 121 | //fprintf( stderr, "<THRD> thread started with parallel call id=%d sending=%d delay=%d\n", control->id, control->n2send, control->delay ); |
| 122 | |
| 123 | sbuf = rmr_alloc_msg( control->mrc, 512 ); // alloc first send buffer; subsequent buffers allcoated on send |
| 124 | |
| 125 | memset( trace, 0, sizeof( trace ) ); |
| 126 | while( count < control->n2send ) { // we send n messages after the first message is successful |
| 127 | snprintf( trace, 100, "%lld", (long long) time( NULL ) ); |
| 128 | rmr_set_trace( sbuf, trace, TRACE_SIZE ); // fully populate so we dont cause a buffer realloc |
| 129 | |
| 130 | snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer! @ %d", count, trace, rand(), control->id ); |
| 131 | snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf ); |
| 132 | snprintf( xbuf, 200, "%31d", xaction_id ); |
| 133 | rmr_bytes2xact( sbuf, xbuf, 32 ); |
| 134 | |
| 135 | sbuf->mtype = 5; // mtype is always 5 as the test receiver acks just mtype 5 messages |
| 136 | sbuf->len = strlen( sbuf->payload ) + 1; // our receiver likely wants a nice acsii-z string |
| 137 | sbuf->state = 0; |
| 138 | sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 1000 ); // send it (send returns an empty payload on success, or the original payload on fail/retry) |
| 139 | |
| 140 | if( sbuf && sbuf->state == RMR_ERR_RETRY ) { // number of times we had to spin to send |
| 141 | rt_count++; |
| 142 | } |
| 143 | while( sbuf != NULL && sbuf->state == RMR_ERR_RETRY ) { // send blocked; keep trying |
| 144 | sbuf = rmr_mt_call( control->mrc, sbuf, control->id, 100 ); // call and wait up to 100ms for a response |
| 145 | } |
| 146 | |
| 147 | if( sbuf != NULL ) { |
| 148 | switch( sbuf->state ) { |
| 149 | case RMR_OK: // we should have a buffer back from the sender here |
| 150 | successful = 1; |
| 151 | if( (tok = strchr( sbuf->payload, '@' )) != NULL ) { |
| 152 | if( atoi( tok+1 ) == control->id ) { |
| 153 | //fprintf( stderr, "<THRD> tid=%-2d ok ack\n", control->id ); |
| 154 | ok_msg++; |
| 155 | } else { |
| 156 | bad_msg++; |
| 157 | //fprintf( stderr, "<THRD> tid=%-2d bad ack %s\n", control->id, sbuf->payload ); |
| 158 | } |
| 159 | } |
| 160 | //fprintf( stderr, "<THRD> tid=%-2d call returned valid msg: %s\n", control->id, sbuf->payload ); |
| 161 | // future -- verify that we see our ID at the end of the message |
| 162 | count++; |
| 163 | break; |
| 164 | |
| 165 | default: |
| 166 | fprintf( stderr, "unexpected error: tid=%d rmr-state=%d ernro=%d\n", control->id, sbuf->state, errno ); |
| 167 | sbuf = rmr_alloc_msg( control->mrc, 512 ); // allocate a sendable buffer |
| 168 | if( successful ) { |
| 169 | fail_count++; // count failures after first successful message |
| 170 | } else { |
| 171 | // some error (not connected likely), don't count this |
| 172 | sleep( 1 ); |
| 173 | } |
| 174 | break; |
| 175 | } |
| 176 | } else { |
| 177 | //fprintf( stderr, "<THRD> tid=%-2d call finished, no sbuf\n", control->id ); |
| 178 | sbuf = rmr_alloc_msg( control->mrc, 512 ); // loop expects an subf |
| 179 | drops++; |
| 180 | count++; |
| 181 | } |
| 182 | |
| 183 | if( control->delay > 0 ) { |
| 184 | usleep( control->delay ); |
| 185 | } |
| 186 | } |
| 187 | |
| 188 | state = 1; |
| 189 | if( ok_msg < (control->n2send-1) || bad_msg > 0 ) { // allow one drop to pass |
| 190 | state = 0; |
| 191 | } |
| 192 | if( count < control->n2send ) { |
| 193 | state = 0; |
| 194 | } |
| 195 | |
| 196 | control->state = -state; // signal inactive to main thread; -1 == pass, 0 == fail |
| 197 | fprintf( stderr, "<THRD> [%s] tid=%-2d sent=%d ok-acks=%d bad-acks=%d drops=%d failures=%d retries=%d\n", |
| 198 | state ? "PASS" : "FAIL", control->id, count, ok_msg, bad_msg, drops, fail_count, rt_count ); |
| 199 | |
| 200 | |
| 201 | return NULL; |
| 202 | } |
| 203 | |
| 204 | int main( int argc, char** argv ) { |
| 205 | void* mrc; // msg router context |
| 206 | rmr_mbuf_t* rbuf = NULL; // received on 'normal' flow |
| 207 | struct epoll_event events[1]; // list of events to give to epoll |
| 208 | struct epoll_event epe; // event definition for event to listen to |
| 209 | int ep_fd = -1; // epoll's file des (given to epoll_wait) |
| 210 | int rcv_fd; // file des that NNG tickles -- give this to epoll to listen on |
| 211 | int nready; // number of events ready for receive |
| 212 | char* listen_port = "43086"; |
| 213 | long timeout = 0; |
| 214 | int delay = 100000; // usec between send attempts |
| 215 | int nmsgs = 10; // number of messages to send |
| 216 | int nthreads = 3; |
| 217 | tdata_t* cvs; // vector of control blocks |
| 218 | int i; |
| 219 | pthread_t* pt_info; // thread stuff |
| 220 | int failures = 0; |
| 221 | int pings = 0; // number of messages received on normal channel |
| 222 | |
| 223 | if( argc > 1 ) { |
| 224 | nmsgs = atoi( argv[1] ); |
| 225 | } |
| 226 | if( argc > 2 ) { |
| 227 | delay = atoi( argv[2] ); |
| 228 | } |
| 229 | if( argc > 3 ) { |
| 230 | nthreads = atoi( argv[3] ); |
| 231 | } |
| 232 | if( argc > 4 ) { |
| 233 | listen_port = argv[4]; |
| 234 | } |
| 235 | |
| 236 | fprintf( stderr, "<CALL> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay ); |
| 237 | |
| 238 | if( (mrc = rmr_init( listen_port, 1400, RMRFL_MTCALL )) == NULL ) { // initialise with multi-threaded call enabled |
| 239 | fprintf( stderr, "<CALL> unable to initialise RMr\n" ); |
| 240 | exit( 1 ); |
| 241 | } |
| 242 | |
| 243 | rmr_init_trace( mrc, TRACE_SIZE ); |
| 244 | |
| 245 | if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) { // epoll only available from NNG -- skip receive later if not NNG |
| 246 | if( rcv_fd < 0 ) { |
| 247 | fprintf( stderr, "<CALL> unable to set up polling fd\n" ); |
| 248 | exit( 1 ); |
| 249 | } |
| 250 | if( (ep_fd = epoll_create1( 0 )) < 0 ) { |
| 251 | fprintf( stderr, "<CALL> [FAIL] unable to create epoll fd: %d\n", errno ); |
| 252 | exit( 1 ); |
| 253 | } |
| 254 | epe.events = EPOLLIN; |
| 255 | epe.data.fd = rcv_fd; |
| 256 | |
| 257 | if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) { |
| 258 | fprintf( stderr, "<CALL> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) ); |
| 259 | exit( 1 ); |
| 260 | } |
| 261 | } else { |
| 262 | rmr_set_rtimeout( mrc, 0 ); // for nano we must set the receive timeout to 0; non-blocking receive |
| 263 | } |
| 264 | |
| 265 | |
| 266 | cvs = malloc( sizeof( tdata_t ) * nthreads ); |
| 267 | pt_info = malloc( sizeof( pthread_t ) * nthreads ); |
| 268 | if( cvs == NULL ) { |
| 269 | fprintf( stderr, "<CALL> unable to allocate control vector\n" ); |
| 270 | exit( 1 ); |
| 271 | } |
| 272 | |
| 273 | |
| 274 | timeout = time( NULL ) + 20; // give rmr 20s to find the route table (shouldn't need that much) |
| 275 | while( ! rmr_ready( mrc ) ) { // must have a route table before we can send; wait til RMr says it has one |
| 276 | fprintf( stderr, "<CALL> waiting for rmr to show ready\n" ); |
| 277 | sleep( 1 ); |
| 278 | |
| 279 | if( time( NULL ) > timeout ) { |
| 280 | fprintf( stderr, "<CALL> giving up\n" ); |
| 281 | exit( 1 ); |
| 282 | } |
| 283 | } |
| 284 | fprintf( stderr, "<CALL> rmr is ready; starting threads\n" ); |
| 285 | |
| 286 | for( i = 0; i < nthreads; i++ ) { |
| 287 | cvs[i].mrc = mrc; |
| 288 | cvs[i].id = i + 2; // we pass this as the call-id to rmr, so must be >1 |
| 289 | cvs[i].delay = delay; |
| 290 | cvs[i].n2send = nmsgs; |
| 291 | cvs[i].state = 1; |
| 292 | |
| 293 | pthread_create( &pt_info[i], NULL, mk_calls, &cvs[i] ); // kick a thread |
| 294 | } |
| 295 | |
| 296 | timeout = time( NULL ) + 20; |
| 297 | i = 0; |
| 298 | while( nthreads > 0 ) { |
| 299 | if( cvs[i].state < 1 ) { // states 0 or below indicate done. 0 == failure, -n == success |
| 300 | nthreads--; |
| 301 | if( cvs[i].state == 0 ) { |
| 302 | failures++; |
| 303 | } |
| 304 | i++; |
| 305 | } else { |
| 306 | // sleep( 1 ); |
| 307 | rbuf = rmr_torcv_msg( mrc, rbuf, 1000 ); |
| 308 | if( rbuf != NULL && rbuf->state != RMR_ERR_RETRY ) { |
| 309 | pings++; |
| 310 | rmr_free_msg( rbuf ); |
| 311 | rbuf = NULL; |
| 312 | } |
| 313 | } |
| 314 | if( time( NULL ) > timeout ) { |
| 315 | failures += nthreads; |
| 316 | fprintf( stderr, "<CALL> timeout waiting for threads to finish; %d were not finished\n", nthreads ); |
| 317 | break; |
| 318 | } |
| 319 | } |
| 320 | |
| 321 | fprintf( stderr, "<CALL> [%s] failing threads=%d pings reeived=%d\n", failures == 0 ? "PASS" : "FAIL", failures, pings ); |
| 322 | rmr_close( mrc ); |
| 323 | |
| 324 | return failures > 0; |
| 325 | } |
| 326 | |