blob: 7aff421b7779f334165657a1b5229193a96c64c2 [file] [log] [blame]
Varun Gudisenacc9de9b2017-08-30 20:49:32 -05001/*******************************************************************************
2 * ============LICENSE_START=======================================================
3 * org.onap.dmaap
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
18 *
19 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
20 *
21 *******************************************************************************/
22
23/*
24 * This is a client library for the AT&T Cambria Event Routing Service.
25 */
26
27#include "cambria.h"
28
29#include <arpa/inet.h>
30#include <sys/socket.h>
31#include <netdb.h>
32#include <unistd.h>
33#include <stdio.h>
34#include <stdlib.h>
35#include <stdarg.h>
36#include <string.h>
37
38#include <string>
39#include <list>
40#include <sstream>
41#include <iomanip>
42#include <algorithm>
43
44// field used in JSON encoding to signal stream name
45const char* kPartition = "cambria.partition";
46
47// map from opaque handle to object pointer
48#define toOpaque(x) ((CAMBRIA_CLIENT)x)
49#define fromOpaque(x) ((cambriaClient*)x)
50
51// trace support
52extern void traceOutput ( const char* format, ... );
53#ifdef CAMBRIA_TRACING
54 #define TRACE traceOutput
55#else
56 #define TRACE 1 ? (void) 0 : traceOutput
57#endif
58
59/*
60 * internal cambria client class
61 */
62class cambriaClient
63{
64public:
65 cambriaClient ( const std::string& host, int port, const std::string& topic, const std::string& format );
66 ~cambriaClient ();
67
68 cambriaSendResponse* send ( const char* streamName, const char* message );
69 cambriaSendResponse* send ( const char* streamName, const char** message, unsigned int count );
70
71 cambriaGetResponse* get ( int timeoutMs, int limit );
72
73private:
74
75 std::string fHost;
76 int fPort;
77 std::string fTopic;
78 std::string fFormat;
79
80 bool buildMessageBody ( const char* streamName, const char** msgs, unsigned int count, std::string& buffer );
81
82 void write ( int socket, const char* line );
83};
84
85cambriaClient::cambriaClient ( const std::string& host, int port, const std::string& topic, const std::string& format ) :
86 fHost ( host ),
87 fPort ( port ),
88 fTopic ( topic ),
89 fFormat ( format )
90{
91}
92
93cambriaClient::~cambriaClient ()
94{
95}
96
97/*
98 * This isn't quite right -- if the message already has cambria.partition,
99 * it'll wind up with two entries. Also, message MUST start with '{' and
100 * have at least one field.
101 */
102static char* makeJsonMessage ( const char* streamName, const char* message )
103{
104 int len = ::strlen ( message );
105 if ( streamName )
106 {
107 len += ::strlen ( kPartition );
108 len += ::strlen ( streamName );
109 len += 6; // quote each and a colon and comma
110 }
111
112 char* msg = new char [ len + 1 ];
113 ::strcpy ( msg, "{" );
114 if ( streamName )
115 {
116 ::strcat ( msg, "\"" );
117 ::strcat ( msg, kPartition );
118 ::strcat ( msg, "\":\"" );
119 ::strcat ( msg, streamName );
120 ::strcat ( msg, "\"," );
121 }
122 ::strcat ( msg, message + 1 );
123 return msg;
124}
125
126cambriaSendResponse* cambriaClient::send ( const char* streamName, const char* message )
127{
128 return send ( streamName, &message, 1 );
129}
130
131static bool replace ( std::string& str, const std::string& from, const std::string& to )
132{
133 size_t start_pos = str.find ( from );
134 if(start_pos == std::string::npos)
135 return false;
136 str.replace(start_pos, from.length(), to);
137 return true;
138}
139
140static void readResponse ( int s, std::string& response )
141{
142 char buffer [ 4096 ];
143
144 ssize_t n = 0;
145 while ( ( n = ::read ( s, buffer, 4095 ) ) > 0 )
146 {
147 buffer[n] = '\0';
148 response += buffer;
149 }
150}
151
152static int openSocket ( std::string& host, int port, int& ss )
153{
154 TRACE( "connecting to %s\n", host.c_str() );
155
156 struct hostent *he = ::gethostbyname ( host.c_str() );
157 if ( !he )
158 {
159 TRACE("no host entry\n");
160 return CAMBRIA_NO_HOST;
161 }
162
163 if ( he->h_addrtype != AF_INET )
164 {
165 TRACE("not AF_INET\n");
166 return CAMBRIA_NO_HOST;
167 }
168
169 int s = ::socket ( AF_INET, SOCK_STREAM, 0 );
170 if ( s == -1 )
171 {
172 TRACE("no socket available\n");
173 return CAMBRIA_CANT_CONNECT;
174 }
175
176 struct sockaddr_in servaddr;
177 ::memset ( &servaddr, 0, sizeof(servaddr) );
178
179 ::memcpy ( &servaddr.sin_addr, he->h_addr_list[0], he->h_length );
180 servaddr.sin_family = AF_INET;
181 servaddr.sin_port = ::htons ( port );
182
183 if ( ::connect ( s, (struct sockaddr *)&servaddr, sizeof(servaddr) ) )
184 {
185 TRACE("couldn't connect\n");
186 return CAMBRIA_CANT_CONNECT;
187 }
188
189 ss = s;
190 return 0;
191}
192
193cambriaSendResponse* cambriaClient::send ( const char* streamName, const char** msgs, unsigned int count )
194{
195 TRACE ( "Sending %d messages.", count );
196
197 cambriaSendResponse* result = new cambriaSendResponse ();
198 result->statusCode = 0;
199 result->statusMessage = NULL;
200 result->responseBody = NULL;
201
202 TRACE( "building message body\n" );
203
204 std::string body;
205 if ( !buildMessageBody ( streamName, msgs, count, body ) )
206 {
207 result->statusCode = CAMBRIA_UNRECOGNIZED_FORMAT;
208 return result;
209 }
210
211 int s = -1;
212 int err = ::openSocket ( fHost, fPort, s );
213 if ( err > 0 )
214 {
215 result->statusCode = err;
216 return result;
217 }
218
219 // construct path
220 std::string path = "/cambriaApiServer/v1/event/";
221 path += fTopic;
222
223 // send post prefix
224 char line[4096];
225 ::sprintf ( line,
226 "POST %s HTTP/1.0\r\n"
227 "Host: %s\r\n"
228 "Content-Type: %s\r\n"
229 "Content-Length: %d\r\n"
230 "\r\n",
231 path.c_str(), fHost.c_str(), fFormat.c_str(), body.length() );
232 write ( s, line );
233
234 // send the body
235 write ( s, body.c_str() );
236
237 TRACE ( "\n" );
238 TRACE ( "send complete, reading reply\n" );
239
240 // receive the response
241 std::string response;
242 readResponse ( s, response );
243 ::close ( s );
244
245 // parse the header and body: split header and body on first occurrence of \r\n\r\n
246 result->statusCode = CAMBRIA_BAD_RESPONSE;
247
248 size_t headerBreak = response.find ( "\r\n\r\n" );
249 if ( headerBreak != std::string::npos )
250 {
251 std::string responseBody = response.substr ( headerBreak + 4 );
252 result->responseBody = new char [ responseBody.length() + 1 ];
253 ::strcpy ( result->responseBody, responseBody.c_str() );
254
255 // all we need from the header for now is the status line
256 std::string headerPart = response.substr ( 0, headerBreak + 2 );
257
258 size_t newline = headerPart.find ( '\r' );
259 if ( newline != std::string::npos )
260 {
261 std::string statusLine = headerPart.substr ( 0, newline );
262
263 size_t firstSpace = statusLine.find ( ' ' );
264 if ( firstSpace != std::string::npos )
265 {
266 size_t secondSpace = statusLine.find ( ' ', firstSpace + 1 );
267 if ( secondSpace != std::string::npos )
268 {
269 result->statusCode = ::atoi ( statusLine.substr ( firstSpace + 1, secondSpace - firstSpace + 1 ).c_str() );
270 std::string statusMessage = statusLine.substr ( secondSpace + 1 );
271 result->statusMessage = new char [ statusMessage.length() + 1 ];
272 ::strcpy ( result->statusMessage, statusMessage.c_str() );
273 }
274 }
275 }
276 }
277 return result;
278}
279
280void cambriaClient::write ( int socket, const char* str )
281{
282 int len = str ? ::strlen ( str ) : 0;
283 ::write ( socket, str, len );
284
285 // elaborate tracing nonsense...
286 std::string trace ( "> " );
287 trace += str;
288 while ( replace ( trace, "\r\n", "\\r\\n\n> " ) );
289
290 TRACE ( "%s", trace.c_str() );
291}
292
293bool cambriaClient::buildMessageBody ( const char* streamName, const char** msgs, unsigned int count, std::string& buffer )
294{
295 if ( fFormat == CAMBRIA_NATIVE_FORMAT )
296 {
297 int snLen = ::strlen ( streamName );
298 for ( unsigned int i=0; i<count; i++ )
299 {
300 const char* msg = msgs[i];
301
302 std::ostringstream s;
303 s << snLen << '.' << ::strlen(msg) << '.' << streamName << msg;
304 buffer.append ( s.str() );
305 }
306 }
307 else if ( fFormat == CAMBRIA_JSON_FORMAT )
308 {
309 buffer.append ( "[" );
310 for ( unsigned int i=0; i<count; i++ )
311 {
312 if ( i>0 )
313 {
314 buffer.append ( "," );
315 }
316 const char* msg = msgs[i];
317 char* jsonMsg = ::makeJsonMessage ( streamName, msg );
318 buffer.append ( jsonMsg );
319 delete jsonMsg; // FIXME: allocating memory here just to delete it
320 }
321 buffer.append ( "]" );
322 }
323 else
324 {
325 return false;
326 }
327 return true;
328}
329
330// read the next string into value, and return the end pos, or 0 on error
331static int readNextJsonString ( const std::string& body, int startPos, std::string& value )
332{
333 value = "";
334
335 if ( startPos >= body.length () )
336 {
337 return 0;
338 }
339
340 // skip a comma
341 int current = startPos;
342 if ( body[current] == ',' ) current++;
343
344 if ( current >= body.length() || body[current] != '"' )
345 {
346 return 0;
347 }
348 current++;
349
350 // walk the string for the closing quote (FIXME: unicode support)
351 bool esc = false;
352 int hex = 0;
353 while ( ( body[current] != '"' || esc ) && current < body.length() )
354 {
355 if ( hex > 0 )
356 {
357 hex--;
358 if ( hex == 0 )
359 {
360 // presumably read a unicode escape. this code isn't
361 // equipped for multibyte or unicode, so just skip it
362 value += '?';
363 }
364 }
365 else if ( esc )
366 {
367 esc = false;
368 switch ( body[current] )
369 {
370 case '"':
371 case '\\':
372 case '/':
373 value += body[current];
374 break;
375
376 case 'b': value += '\b'; break;
377 case 'f': value += '\f'; break;
378 case 'n': value += '\n'; break;
379 case 'r': value += '\r'; break;
380 case 't': value += '\t'; break;
381
382 case 'u': hex=4; break;
383 }
384 }
385 else
386 {
387 esc = body[current] == '\\';
388 if ( !esc ) value += body[current];
389 }
390 current++;
391 }
392
393 return current + 1;
394}
395
396static void readGetBody ( std::string& body, cambriaGetResponse& response )
397{
398 TRACE("response %s\n", body.c_str() );
399
400 if ( body.length() < 2 || body[0] != '[' || body[body.length()-1] != ']' )
401 {
402 response.statusCode = CAMBRIA_BAD_RESPONSE;
403 }
404
405 std::list<char*> msgs;
406 std::string val;
407 int current = 1;
408 while ( ( current = readNextJsonString ( body, current, val ) ) > 0 )
409 {
410 char* msg = new char [ val.length() + 1 ];
411 ::strcpy ( msg, val.c_str() );
412 msgs.push_back ( msg );
413 }
414
415 // now build a response
416 response.messageCount = msgs.size();
417 response.messageSet = new char* [ msgs.size() ];
418 int index = 0;
419 for ( std::list<char*>::iterator it = msgs.begin(); it != msgs.end(); it++ )
420 {
421 response.messageSet [ index++ ] = *it;
422 }
423}
424
425cambriaGetResponse* cambriaClient::get ( int timeoutMs, int limit )
426{
427 cambriaGetResponse* result = new cambriaGetResponse ();
428 result->statusCode = 0;
429 result->statusMessage = NULL;
430 result->messageCount = 0;
431 result->messageSet = new char* [ 1 ];
432
433 int s = -1;
434 int err = ::openSocket ( fHost, fPort, s );
435 if ( err > 0 )
436 {
437 result->statusCode = err;
438 return result;
439 }
440
441 // construct path
442 std::string path = "/cambriaApiServer/v1/event/";
443 path += fTopic;
444
445 bool haveAdds = false;
446 std::ostringstream adds;
447 if ( timeoutMs > -1 )
448 {
449 adds << "timeout=" << timeoutMs;
450 haveAdds = true;
451 }
452 if ( limit > -1 )
453 {
454 if ( haveAdds )
455 {
456 adds << "&";
457 }
458 adds << "limit=" << limit;
459 haveAdds = true;
460 }
461 if ( haveAdds )
462 {
463 path += "?";
464 path += adds.str();
465 }
466
467 // send post prefix
468 char line[4096];
469 ::sprintf ( line,
470 "GET %s HTTP/1.0\r\n"
471 "Host: %s\r\n"
472 "\r\n",
473 path.c_str(), fHost.c_str() );
474 write ( s, line );
475
476 TRACE ( "\n" );
477 TRACE ( "request sent; reading reply\n" );
478
479 // receive the response (FIXME: would be nice to stream rather than load it all)
480 std::string response;
481 readResponse ( s, response );
482 ::close ( s );
483
484 // parse the header and body: split header and body on first occurrence of \r\n\r\n
485 result->statusCode = CAMBRIA_BAD_RESPONSE;
486
487 size_t headerBreak = response.find ( "\r\n\r\n" );
488 if ( headerBreak != std::string::npos )
489 {
490 // get the header line
491 std::string headerPart = response.substr ( 0, headerBreak + 2 );
492
493 size_t newline = headerPart.find ( '\r' );
494 if ( newline != std::string::npos )
495 {
496 std::string statusLine = headerPart.substr ( 0, newline );
497
498 size_t firstSpace = statusLine.find ( ' ' );
499 if ( firstSpace != std::string::npos )
500 {
501 size_t secondSpace = statusLine.find ( ' ', firstSpace + 1 );
502 if ( secondSpace != std::string::npos )
503 {
504 result->statusCode = ::atoi ( statusLine.substr ( firstSpace + 1, secondSpace - firstSpace + 1 ).c_str() );
505 std::string statusMessage = statusLine.substr ( secondSpace + 1 );
506 result->statusMessage = new char [ statusMessage.length() + 1 ];
507 ::strcpy ( result->statusMessage, statusMessage.c_str() );
508 }
509 }
510 }
511
512 if ( result->statusCode < 300 )
513 {
514 std::string responseBody = response.substr ( headerBreak + 4 );
515 readGetBody ( responseBody, *result );
516 }
517 }
518 return result;
519}
520
521
522///////////////////////////////////////////////////////////////////////////////
523///////////////////////////////////////////////////////////////////////////////
524///////////////////////////////////////////////////////////////////////////////
525
526CAMBRIA_CLIENT cambriaCreateClient ( const char* host, int port, const char* topic, const char* format )
527{
528 cambriaClient* cc = new cambriaClient ( host, port, topic, format );
529 return toOpaque(cc);
530}
531
532void cambriaDestroyClient ( CAMBRIA_CLIENT client )
533{
534 delete fromOpaque ( client );
535}
536
537cambriaSendResponse* cambriaSendMessage ( CAMBRIA_CLIENT client, const char* streamName, const char* message )
538{
539 cambriaClient* c = fromOpaque ( client );
540 return c->send ( streamName, message );
541}
542
543cambriaSendResponse* cambriaSendMessages ( CAMBRIA_CLIENT client, const char* streamName, const char** messages, unsigned int count )
544{
545 cambriaClient* c = fromOpaque ( client );
546 return c->send ( streamName, messages, count );
547}
548
549cambriaGetResponse* cambriaGetMessages ( CAMBRIA_CLIENT client, unsigned long timeoutMs, unsigned int limit )
550{
551 cambriaClient* c = fromOpaque ( client );
552 return c->get ( timeoutMs, limit );
553}
554
555void cambriaDestroySendResponse ( CAMBRIA_CLIENT client, const cambriaSendResponse* response )
556{
557 if ( response )
558 {
559 delete response->statusMessage;
560 delete response->responseBody;
561 delete response;
562 }
563}
564
565void cambriaDestroyGetResponse ( CAMBRIA_CLIENT client, const cambriaGetResponse* response )
566{
567 if ( response )
568 {
569 delete response->statusMessage;
570 for ( int i=0; i<response->messageCount; i++ )
571 {
572 delete response->messageSet[i];
573 }
574 delete response;
575 }
576}
577
578int cambriaSimpleSend ( const char* host, int port, const char* topic, const char* streamName, const char* msg )
579{
580 return cambriaSimpleSendMultiple ( host, port, topic, streamName, &msg, 1 );
581}
582
583int cambriaSimpleSendMultiple ( const char* host, int port, const char* topic, const char* streamName, const char** messages, unsigned int msgCount )
584{
585 int count = 0;
586
587 const CAMBRIA_CLIENT cc = ::cambriaCreateClient ( host, port, topic, CAMBRIA_NATIVE_FORMAT );
588 if ( cc )
589 {
590 const cambriaSendResponse* response = ::cambriaSendMessages ( cc, streamName, messages, msgCount );
591 if ( response && response->statusCode < 300 )
592 {
593 count = msgCount;
594 }
595 ::cambriaDestroySendResponse ( cc, response );
596 ::cambriaDestroyClient ( cc );
597 }
598
599 return count;
600}
601
602////////////////////////////////////////////////////////////////////////////////
603////////////////////////////////////////////////////////////////////////////////
604////////////////////////////////////////////////////////////////////////////////
605
606const unsigned int kMaxTraceBuffer = 2048;
607
608static void writeTraceString ( const char* msg )
609{
610 ::fprintf ( stdout, "%s", msg );
611 ::fflush ( stdout ); // because we want output before core dumping :-)
612}
613
614void traceOutput ( const char* format, ... )
615{
616 char buffer [ kMaxTraceBuffer ];
617 ::memset ( buffer, '\0', kMaxTraceBuffer * sizeof ( char ) );
618
619 va_list list;
620 va_start ( list, format );
621 ::vsprintf ( buffer, format, list );
622 writeTraceString ( buffer );
623 va_end ( list );
624}