blob: 89a0afc6beac75bc5ceea80616f2985a722e2edd [file] [log] [blame]
E. Scott Danielse8a5b2c2019-04-22 17:04:10 +00001// :vim ts=4 sw=4 noet:
2/*
3==================================================================================
4 Copyright (c) 2019 Nokia
5 Copyright (c) 2018-2019 AT&T Intellectual Property.
6
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
10
11 http://www.apache.org/licenses/LICENSE-2.0
12
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18==================================================================================
19*/
20
21/*
22 Mnemonic: sender.c
23 Abstract: This is a simple sender which will send a series of messages.
24 It is expected that the first attempt(s) will fail if the receiver
25 is not up and this does not start decrementing the number to
26 send until it has a good send.
27
28 The process will check the receive queue and list received messages
29 but pass/fail is not dependent on what comes back.
30
31 If the receiver(s) do not become connectable in 20 sec this process
32 will give up and fail.
33
34
35 Message types will vary between 1 and 10, so the route table must
36 be set up to support those message types.
37
38 Message format is:
39 ck1 ck2|<msg-txt><nil>
40
41 Ck1 is the simple check sum of the msg-text (NOT includeing <nil>)
42 Ck2 is the simple check sum of the trace data which is a nil terminated
43 series of bytes.
44
45 Parms: argv[1] == nmsgs; argv[2] == delay; argv[3] == listen port
46
47 Sender will send for at most 20 seconds, so if nmsgs and delay extend
48 beyond that period the total number of messages sent will be less
49 than n.
50
51 Date: 18 April 2019
52 Author: E. Scott Daniels
53*/
54
55#include <unistd.h>
56#include <errno.h>
57#include <string.h>
58#include <stdio.h>
59#include <stdlib.h>
60#include <sys/epoll.h>
61#include <time.h>
62
63#include <rmr/rmr.h>
64
65static int sum( char* str ) {
66 int sum = 0;
67 int i = 0;
68
69 while( *str ) {
70 sum += *(str++) + i++;
71 }
72
73 return sum % 255;
74}
75
76int main( int argc, char** argv ) {
77 void* mrc; // msg router context
78 struct epoll_event events[1]; // list of events to give to epoll
79 struct epoll_event epe; // event definition for event to listen to
80 int ep_fd = -1; // epoll's file des (given to epoll_wait)
81 int rcv_fd; // file des that NNG tickles -- give this to epoll to listen on
82 int nready; // number of events ready for receive
83 rmr_mbuf_t* sbuf; // send buffer
84 rmr_mbuf_t* rbuf; // received buffer
85 int count = 0;
86 int rt_count = 0; // number of messages requiring a spin retry
87 int rcvd_count = 0;
88 char* listen_port = "43086";
89 int mtype = 0;
90 int stats_freq = 100;
91 int successful = 0; // set to true after we have a successful send
92 char wbuf[1024];
93 char trace[1024];
94 long timeout = 0;
95 int delay = 100000; // usec between send attempts
96 int nmsgs = 10; // number of messages to send
97
98 if( argc > 1 ) {
99 nmsgs = atoi( argv[1] );
100 }
101 if( argc > 2 ) {
102 delay = atoi( argv[2] );
103 }
104 if( argc > 3 ) {
105 listen_port = argv[3];
106 }
107
108 fprintf( stderr, "<SNDR> listen port: %s; sending %d messages; delay=%d\n", listen_port, nmsgs, delay );
109
110 if( (mrc = rmr_init( listen_port, 1400, RMRFL_NONE )) == NULL ) {
111 fprintf( stderr, "<SNDR> unable to initialise RMr\n" );
112 exit( 1 );
113 }
114
115 if( (rcv_fd = rmr_get_rcvfd( mrc )) >= 0 ) { // epoll only available from NNG -- skip receive later if not NNG
116 if( rcv_fd < 0 ) {
117 fprintf( stderr, "<SNDR> unable to set up polling fd\n" );
118 exit( 1 );
119 }
120 if( (ep_fd = epoll_create1( 0 )) < 0 ) {
121 fprintf( stderr, "<SNDR> [FAIL] unable to create epoll fd: %d\n", errno );
122 exit( 1 );
123 }
124 epe.events = EPOLLIN;
125 epe.data.fd = rcv_fd;
126
127 if( epoll_ctl( ep_fd, EPOLL_CTL_ADD, rcv_fd, &epe ) != 0 ) {
128 fprintf( stderr, "<SNDR> [FAIL] epoll_ctl status not 0 : %s\n", strerror( errno ) );
129 exit( 1 );
130 }
131 } else {
132 rmr_set_rtimeout( mrc, 0 ); // for nano we must set the receive timeout to 0; non-blocking receive
133 }
134
135 sbuf = rmr_alloc_msg( mrc, 512 ); // alloc first send buffer; subsequent buffers allcoated on send
136 //sbuf = rmr_tralloc_msg( mrc, 512, 11, "xxxxxxxxxx" ); // alloc first send buffer; subsequent buffers allcoated on send
137 rbuf = NULL; // don't need to alloc receive buffer
138
139 timeout = time( NULL ) + 20; // give rmr 20s to find the route table (shouldn't need that much)
140 while( ! rmr_ready( mrc ) ) { // must have a route table before we can send; wait til RMr says it has one
141 fprintf( stderr, "<SNDR> waiting for rmr to show ready\n" );
142 sleep( 1 );
143
144 if( time( NULL ) > timeout ) {
145 fprintf( stderr, "<SNDR> giving up\n" );
146 exit( 1 );
147 }
148 }
149 fprintf( stderr, "<SNDR> rmr is ready; starting to send\n" );
150
151 timeout = time( NULL ) + 20;
152
153 while( count < nmsgs ) { // we send 10 messages after the first message is successful
154 snprintf( trace, 100, "%lld", (long long) time( NULL ) );
155 rmr_set_trace( sbuf, trace, strlen( trace ) + 1 );
156 snprintf( wbuf, 200, "count=%d tr=%s %d stand up and cheer!", count, trace, rand() );
157 snprintf( sbuf->payload, 300, "%d %d|%s", sum( wbuf ), sum( trace ), wbuf );
158
159 sbuf->mtype = mtype; // fill in the message bits
160 sbuf->len = strlen( sbuf->payload ) + 1; // our receiver likely wants a nice acsii-z string
161 sbuf->state = 0;
162 sbuf = rmr_send_msg( mrc, sbuf ); // send it (send returns an empty payload on success, or the original payload on fail/retry)
163
164 switch( sbuf->state ) {
165 case RMR_ERR_RETRY:
166 rt_count++;
167 while( sbuf->state == RMR_ERR_RETRY ) { // soft failure (device busy?) retry
168 sbuf = rmr_send_msg( mrc, sbuf ); // retry send until it's good (simple test; real programmes should do better)
169 }
170 successful = 1;
171 break;
172
173 case RMR_OK:
174 successful = 1;
175 break;
176
177 default:
178 // some error (not connected likely), don't count this
179 break;
180 }
181
182 if( successful ) { // once we have a message that was sent, start to increase things
183 count++;
184 mtype++;
185 if( mtype > 10 ) { // if large number of sends don't require infinite rt entries :)
186 mtype = 1;
187 }
188 }
189
190 if( rcv_fd >= 0 ) {
191 while( (nready = epoll_wait( ep_fd, events, 1, 0 )) > 0 ) { // if something ready to receive (non-blocking check)
192 if( events[0].data.fd == rcv_fd ) { // we only are waiting on 1 thing, so [0] is ok
193 errno = 0;
194 rbuf = rmr_rcv_msg( mrc, rbuf );
195 if( rbuf ) {
196 rcvd_count++;
197 }
198 }
199 }
200 } else { // nano, we will only pick up one at a time.
201 if( (rbuf = rmr_rcv_msg( mrc, rbuf ) ) != NULL ) {
202 if( rbuf->state == RMR_OK ) {
203 rcvd_count++;
204 }
205 }
206 }
207
208 if( time( NULL ) > timeout ) { // should only happen if we never connect or nmsg > what we can send in 20sec
209 fprintf( stderr, "sender timeout\n" );
210 break;
211 }
212
213 if( delay > 0 ) {
214 usleep( delay );
215 }
216 }
217
218
219 timeout = time( NULL ) + 2; // allow 2 seconds for the pipe to drain from the receiver
220 while( time( NULL ) < timeout );
221 if( rcv_fd >= 0 ) {
222 while( (nready = epoll_wait( ep_fd, events, 1, 100 )) > 0 ) { // if something ready to receive (non-blocking check)
223 if( events[0].data.fd == rcv_fd ) { // we only are waiting on 1 thing, so [0] is ok
224 errno = 0;
225 rbuf = rmr_rcv_msg( mrc, rbuf );
226 if( rbuf ) {
227 rcvd_count++;
228 timeout = time( NULL ) + 2;
229 }
230 }
231 }
232 } else { // nano, we will only pick up one at a time.
233 if( (rbuf = rmr_torcv_msg( mrc, rbuf, 100 ) ) != NULL ) {
234 if( rbuf->state == RMR_OK ) {
235 rcvd_count++;
236 }
237 }
238 }
239
240 fprintf( stderr, "<SNDR> [%s] sent %d messages received %d acks retries=%d\n", count == nmsgs ? "PASS" : "FAIL", count, rcvd_count, rt_count );
241 rmr_close( mrc );
242
243 return !( count == nmsgs );
244}
245