| /******************************************************************************* |
| * ============LICENSE_START======================================================= |
| * org.onap.dmaap |
| * ================================================================================ |
| * Copyright © 2017 AT&T Intellectual Property. All rights reserved. |
| * ================================================================================ |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * ============LICENSE_END========================================================= |
| * |
| * ECOMP is a trademark and service mark of AT&T Intellectual Property. |
| * |
| *******************************************************************************/ |
| |
| /* |
| * This is a client library for the AT&T Cambria Event Routing Service. |
| */ |
| |
| #include "cambria.h" |
| |
| #include <arpa/inet.h> |
| #include <sys/socket.h> |
| #include <netdb.h> |
| #include <unistd.h> |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <stdarg.h> |
| #include <string.h> |
| |
| #include <string> |
| #include <list> |
| #include <sstream> |
| #include <iomanip> |
| #include <algorithm> |
| |
| // field used in JSON encoding to signal stream name |
| const char* kPartition = "cambria.partition"; |
| |
| // map from opaque handle to object pointer |
| #define toOpaque(x) ((CAMBRIA_CLIENT)x) |
| #define fromOpaque(x) ((cambriaClient*)x) |
| |
| // trace support |
| extern void traceOutput ( const char* format, ... ); |
| #ifdef CAMBRIA_TRACING |
| #define TRACE traceOutput |
| #else |
| #define TRACE 1 ? (void) 0 : traceOutput |
| #endif |
| |
| /* |
| * internal cambria client class |
| */ |
| class cambriaClient |
| { |
| public: |
| cambriaClient ( const std::string& host, int port, const std::string& topic, const std::string& format ); |
| ~cambriaClient (); |
| |
| cambriaSendResponse* send ( const char* streamName, const char* message ); |
| cambriaSendResponse* send ( const char* streamName, const char** message, unsigned int count ); |
| |
| cambriaGetResponse* get ( int timeoutMs, int limit ); |
| |
| private: |
| |
| std::string fHost; |
| int fPort; |
| std::string fTopic; |
| std::string fFormat; |
| |
| bool buildMessageBody ( const char* streamName, const char** msgs, unsigned int count, std::string& buffer ); |
| |
| void write ( int socket, const char* line ); |
| }; |
| |
| cambriaClient::cambriaClient ( const std::string& host, int port, const std::string& topic, const std::string& format ) : |
| fHost ( host ), |
| fPort ( port ), |
| fTopic ( topic ), |
| fFormat ( format ) |
| { |
| } |
| |
| cambriaClient::~cambriaClient () |
| { |
| } |
| |
| /* |
| * This isn't quite right -- if the message already has cambria.partition, |
| * it'll wind up with two entries. Also, message MUST start with '{' and |
| * have at least one field. |
| */ |
| static char* makeJsonMessage ( const char* streamName, const char* message ) |
| { |
| int len = ::strlen ( message ); |
| if ( streamName ) |
| { |
| len += ::strlen ( kPartition ); |
| len += ::strlen ( streamName ); |
| len += 6; // quote each and a colon and comma |
| } |
| |
| char* msg = new char [ len + 1 ]; |
| ::strcpy ( msg, "{" ); |
| if ( streamName ) |
| { |
| ::strcat ( msg, "\"" ); |
| ::strcat ( msg, kPartition ); |
| ::strcat ( msg, "\":\"" ); |
| ::strcat ( msg, streamName ); |
| ::strcat ( msg, "\"," ); |
| } |
| ::strcat ( msg, message + 1 ); |
| return msg; |
| } |
| |
| cambriaSendResponse* cambriaClient::send ( const char* streamName, const char* message ) |
| { |
| return send ( streamName, &message, 1 ); |
| } |
| |
| static bool replace ( std::string& str, const std::string& from, const std::string& to ) |
| { |
| size_t start_pos = str.find ( from ); |
| if(start_pos == std::string::npos) |
| return false; |
| str.replace(start_pos, from.length(), to); |
| return true; |
| } |
| |
| static void readResponse ( int s, std::string& response ) |
| { |
| char buffer [ 4096 ]; |
| |
| ssize_t n = 0; |
| while ( ( n = ::read ( s, buffer, 4095 ) ) > 0 ) |
| { |
| buffer[n] = '\0'; |
| response += buffer; |
| } |
| } |
| |
| static int openSocket ( std::string& host, int port, int& ss ) |
| { |
| TRACE( "connecting to %s\n", host.c_str() ); |
| |
| struct hostent *he = ::gethostbyname ( host.c_str() ); |
| if ( !he ) |
| { |
| TRACE("no host entry\n"); |
| return CAMBRIA_NO_HOST; |
| } |
| |
| if ( he->h_addrtype != AF_INET ) |
| { |
| TRACE("not AF_INET\n"); |
| return CAMBRIA_NO_HOST; |
| } |
| |
| int s = ::socket ( AF_INET, SOCK_STREAM, 0 ); |
| if ( s == -1 ) |
| { |
| TRACE("no socket available\n"); |
| return CAMBRIA_CANT_CONNECT; |
| } |
| |
| struct sockaddr_in servaddr; |
| ::memset ( &servaddr, 0, sizeof(servaddr) ); |
| |
| ::memcpy ( &servaddr.sin_addr, he->h_addr_list[0], he->h_length ); |
| servaddr.sin_family = AF_INET; |
| servaddr.sin_port = ::htons ( port ); |
| |
| if ( ::connect ( s, (struct sockaddr *)&servaddr, sizeof(servaddr) ) ) |
| { |
| TRACE("couldn't connect\n"); |
| return CAMBRIA_CANT_CONNECT; |
| } |
| |
| ss = s; |
| return 0; |
| } |
| |
| cambriaSendResponse* cambriaClient::send ( const char* streamName, const char** msgs, unsigned int count ) |
| { |
| TRACE ( "Sending %d messages.", count ); |
| |
| cambriaSendResponse* result = new cambriaSendResponse (); |
| result->statusCode = 0; |
| result->statusMessage = NULL; |
| result->responseBody = NULL; |
| |
| TRACE( "building message body\n" ); |
| |
| std::string body; |
| if ( !buildMessageBody ( streamName, msgs, count, body ) ) |
| { |
| result->statusCode = CAMBRIA_UNRECOGNIZED_FORMAT; |
| return result; |
| } |
| |
| int s = -1; |
| int err = ::openSocket ( fHost, fPort, s ); |
| if ( err > 0 ) |
| { |
| result->statusCode = err; |
| return result; |
| } |
| |
| // construct path |
| std::string path = "/cambriaApiServer/v1/event/"; |
| path += fTopic; |
| |
| // send post prefix |
| char line[4096]; |
| ::sprintf ( line, |
| "POST %s HTTP/1.0\r\n" |
| "Host: %s\r\n" |
| "Content-Type: %s\r\n" |
| "Content-Length: %d\r\n" |
| "\r\n", |
| path.c_str(), fHost.c_str(), fFormat.c_str(), body.length() ); |
| write ( s, line ); |
| |
| // send the body |
| write ( s, body.c_str() ); |
| |
| TRACE ( "\n" ); |
| TRACE ( "send complete, reading reply\n" ); |
| |
| // receive the response |
| std::string response; |
| readResponse ( s, response ); |
| ::close ( s ); |
| |
| // parse the header and body: split header and body on first occurrence of \r\n\r\n |
| result->statusCode = CAMBRIA_BAD_RESPONSE; |
| |
| size_t headerBreak = response.find ( "\r\n\r\n" ); |
| if ( headerBreak != std::string::npos ) |
| { |
| std::string responseBody = response.substr ( headerBreak + 4 ); |
| result->responseBody = new char [ responseBody.length() + 1 ]; |
| ::strcpy ( result->responseBody, responseBody.c_str() ); |
| |
| // all we need from the header for now is the status line |
| std::string headerPart = response.substr ( 0, headerBreak + 2 ); |
| |
| size_t newline = headerPart.find ( '\r' ); |
| if ( newline != std::string::npos ) |
| { |
| std::string statusLine = headerPart.substr ( 0, newline ); |
| |
| size_t firstSpace = statusLine.find ( ' ' ); |
| if ( firstSpace != std::string::npos ) |
| { |
| size_t secondSpace = statusLine.find ( ' ', firstSpace + 1 ); |
| if ( secondSpace != std::string::npos ) |
| { |
| result->statusCode = ::atoi ( statusLine.substr ( firstSpace + 1, secondSpace - firstSpace + 1 ).c_str() ); |
| std::string statusMessage = statusLine.substr ( secondSpace + 1 ); |
| result->statusMessage = new char [ statusMessage.length() + 1 ]; |
| ::strcpy ( result->statusMessage, statusMessage.c_str() ); |
| } |
| } |
| } |
| } |
| return result; |
| } |
| |
| void cambriaClient::write ( int socket, const char* str ) |
| { |
| int len = str ? ::strlen ( str ) : 0; |
| ::write ( socket, str, len ); |
| |
| // elaborate tracing nonsense... |
| std::string trace ( "> " ); |
| trace += str; |
| while ( replace ( trace, "\r\n", "\\r\\n\n> " ) ); |
| |
| TRACE ( "%s", trace.c_str() ); |
| } |
| |
| bool cambriaClient::buildMessageBody ( const char* streamName, const char** msgs, unsigned int count, std::string& buffer ) |
| { |
| if ( fFormat == CAMBRIA_NATIVE_FORMAT ) |
| { |
| int snLen = ::strlen ( streamName ); |
| for ( unsigned int i=0; i<count; i++ ) |
| { |
| const char* msg = msgs[i]; |
| |
| std::ostringstream s; |
| s << snLen << '.' << ::strlen(msg) << '.' << streamName << msg; |
| buffer.append ( s.str() ); |
| } |
| } |
| else if ( fFormat == CAMBRIA_JSON_FORMAT ) |
| { |
| buffer.append ( "[" ); |
| for ( unsigned int i=0; i<count; i++ ) |
| { |
| if ( i>0 ) |
| { |
| buffer.append ( "," ); |
| } |
| const char* msg = msgs[i]; |
| char* jsonMsg = ::makeJsonMessage ( streamName, msg ); |
| buffer.append ( jsonMsg ); |
| delete jsonMsg; // FIXME: allocating memory here just to delete it |
| } |
| buffer.append ( "]" ); |
| } |
| else |
| { |
| return false; |
| } |
| return true; |
| } |
| |
| // read the next string into value, and return the end pos, or 0 on error |
| static int readNextJsonString ( const std::string& body, int startPos, std::string& value ) |
| { |
| value = ""; |
| |
| if ( startPos >= body.length () ) |
| { |
| return 0; |
| } |
| |
| // skip a comma |
| int current = startPos; |
| if ( body[current] == ',' ) current++; |
| |
| if ( current >= body.length() || body[current] != '"' ) |
| { |
| return 0; |
| } |
| current++; |
| |
| // walk the string for the closing quote (FIXME: unicode support) |
| bool esc = false; |
| int hex = 0; |
| while ( ( body[current] != '"' || esc ) && current < body.length() ) |
| { |
| if ( hex > 0 ) |
| { |
| hex--; |
| if ( hex == 0 ) |
| { |
| // presumably read a unicode escape. this code isn't |
| // equipped for multibyte or unicode, so just skip it |
| value += '?'; |
| } |
| } |
| else if ( esc ) |
| { |
| esc = false; |
| switch ( body[current] ) |
| { |
| case '"': |
| case '\\': |
| case '/': |
| value += body[current]; |
| break; |
| |
| case 'b': value += '\b'; break; |
| case 'f': value += '\f'; break; |
| case 'n': value += '\n'; break; |
| case 'r': value += '\r'; break; |
| case 't': value += '\t'; break; |
| |
| case 'u': hex=4; break; |
| } |
| } |
| else |
| { |
| esc = body[current] == '\\'; |
| if ( !esc ) value += body[current]; |
| } |
| current++; |
| } |
| |
| return current + 1; |
| } |
| |
| static void readGetBody ( std::string& body, cambriaGetResponse& response ) |
| { |
| TRACE("response %s\n", body.c_str() ); |
| |
| if ( body.length() < 2 || body[0] != '[' || body[body.length()-1] != ']' ) |
| { |
| response.statusCode = CAMBRIA_BAD_RESPONSE; |
| } |
| |
| std::list<char*> msgs; |
| std::string val; |
| int current = 1; |
| while ( ( current = readNextJsonString ( body, current, val ) ) > 0 ) |
| { |
| char* msg = new char [ val.length() + 1 ]; |
| ::strcpy ( msg, val.c_str() ); |
| msgs.push_back ( msg ); |
| } |
| |
| // now build a response |
| response.messageCount = msgs.size(); |
| response.messageSet = new char* [ msgs.size() ]; |
| int index = 0; |
| for ( std::list<char*>::iterator it = msgs.begin(); it != msgs.end(); it++ ) |
| { |
| response.messageSet [ index++ ] = *it; |
| } |
| } |
| |
| cambriaGetResponse* cambriaClient::get ( int timeoutMs, int limit ) |
| { |
| cambriaGetResponse* result = new cambriaGetResponse (); |
| result->statusCode = 0; |
| result->statusMessage = NULL; |
| result->messageCount = 0; |
| result->messageSet = new char* [ 1 ]; |
| |
| int s = -1; |
| int err = ::openSocket ( fHost, fPort, s ); |
| if ( err > 0 ) |
| { |
| result->statusCode = err; |
| return result; |
| } |
| |
| // construct path |
| std::string path = "/cambriaApiServer/v1/event/"; |
| path += fTopic; |
| |
| bool haveAdds = false; |
| std::ostringstream adds; |
| if ( timeoutMs > -1 ) |
| { |
| adds << "timeout=" << timeoutMs; |
| haveAdds = true; |
| } |
| if ( limit > -1 ) |
| { |
| if ( haveAdds ) |
| { |
| adds << "&"; |
| } |
| adds << "limit=" << limit; |
| haveAdds = true; |
| } |
| if ( haveAdds ) |
| { |
| path += "?"; |
| path += adds.str(); |
| } |
| |
| // send post prefix |
| char line[4096]; |
| ::sprintf ( line, |
| "GET %s HTTP/1.0\r\n" |
| "Host: %s\r\n" |
| "\r\n", |
| path.c_str(), fHost.c_str() ); |
| write ( s, line ); |
| |
| TRACE ( "\n" ); |
| TRACE ( "request sent; reading reply\n" ); |
| |
| // receive the response (FIXME: would be nice to stream rather than load it all) |
| std::string response; |
| readResponse ( s, response ); |
| ::close ( s ); |
| |
| // parse the header and body: split header and body on first occurrence of \r\n\r\n |
| result->statusCode = CAMBRIA_BAD_RESPONSE; |
| |
| size_t headerBreak = response.find ( "\r\n\r\n" ); |
| if ( headerBreak != std::string::npos ) |
| { |
| // get the header line |
| std::string headerPart = response.substr ( 0, headerBreak + 2 ); |
| |
| size_t newline = headerPart.find ( '\r' ); |
| if ( newline != std::string::npos ) |
| { |
| std::string statusLine = headerPart.substr ( 0, newline ); |
| |
| size_t firstSpace = statusLine.find ( ' ' ); |
| if ( firstSpace != std::string::npos ) |
| { |
| size_t secondSpace = statusLine.find ( ' ', firstSpace + 1 ); |
| if ( secondSpace != std::string::npos ) |
| { |
| result->statusCode = ::atoi ( statusLine.substr ( firstSpace + 1, secondSpace - firstSpace + 1 ).c_str() ); |
| std::string statusMessage = statusLine.substr ( secondSpace + 1 ); |
| result->statusMessage = new char [ statusMessage.length() + 1 ]; |
| ::strcpy ( result->statusMessage, statusMessage.c_str() ); |
| } |
| } |
| } |
| |
| if ( result->statusCode < 300 ) |
| { |
| std::string responseBody = response.substr ( headerBreak + 4 ); |
| readGetBody ( responseBody, *result ); |
| } |
| } |
| return result; |
| } |
| |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| /////////////////////////////////////////////////////////////////////////////// |
| /////////////////////////////////////////////////////////////////////////////// |
| |
| CAMBRIA_CLIENT cambriaCreateClient ( const char* host, int port, const char* topic, const char* format ) |
| { |
| cambriaClient* cc = new cambriaClient ( host, port, topic, format ); |
| return toOpaque(cc); |
| } |
| |
| void cambriaDestroyClient ( CAMBRIA_CLIENT client ) |
| { |
| delete fromOpaque ( client ); |
| } |
| |
| cambriaSendResponse* cambriaSendMessage ( CAMBRIA_CLIENT client, const char* streamName, const char* message ) |
| { |
| cambriaClient* c = fromOpaque ( client ); |
| return c->send ( streamName, message ); |
| } |
| |
| cambriaSendResponse* cambriaSendMessages ( CAMBRIA_CLIENT client, const char* streamName, const char** messages, unsigned int count ) |
| { |
| cambriaClient* c = fromOpaque ( client ); |
| return c->send ( streamName, messages, count ); |
| } |
| |
| cambriaGetResponse* cambriaGetMessages ( CAMBRIA_CLIENT client, unsigned long timeoutMs, unsigned int limit ) |
| { |
| cambriaClient* c = fromOpaque ( client ); |
| return c->get ( timeoutMs, limit ); |
| } |
| |
| void cambriaDestroySendResponse ( CAMBRIA_CLIENT client, const cambriaSendResponse* response ) |
| { |
| if ( response ) |
| { |
| delete response->statusMessage; |
| delete response->responseBody; |
| delete response; |
| } |
| } |
| |
| void cambriaDestroyGetResponse ( CAMBRIA_CLIENT client, const cambriaGetResponse* response ) |
| { |
| if ( response ) |
| { |
| delete response->statusMessage; |
| for ( int i=0; i<response->messageCount; i++ ) |
| { |
| delete response->messageSet[i]; |
| } |
| delete response; |
| } |
| } |
| |
| int cambriaSimpleSend ( const char* host, int port, const char* topic, const char* streamName, const char* msg ) |
| { |
| return cambriaSimpleSendMultiple ( host, port, topic, streamName, &msg, 1 ); |
| } |
| |
| int cambriaSimpleSendMultiple ( const char* host, int port, const char* topic, const char* streamName, const char** messages, unsigned int msgCount ) |
| { |
| int count = 0; |
| |
| const CAMBRIA_CLIENT cc = ::cambriaCreateClient ( host, port, topic, CAMBRIA_NATIVE_FORMAT ); |
| if ( cc ) |
| { |
| const cambriaSendResponse* response = ::cambriaSendMessages ( cc, streamName, messages, msgCount ); |
| if ( response && response->statusCode < 300 ) |
| { |
| count = msgCount; |
| } |
| ::cambriaDestroySendResponse ( cc, response ); |
| ::cambriaDestroyClient ( cc ); |
| } |
| |
| return count; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////////// |
| //////////////////////////////////////////////////////////////////////////////// |
| //////////////////////////////////////////////////////////////////////////////// |
| |
| const unsigned int kMaxTraceBuffer = 2048; |
| |
| static void writeTraceString ( const char* msg ) |
| { |
| ::fprintf ( stdout, "%s", msg ); |
| ::fflush ( stdout ); // because we want output before core dumping :-) |
| } |
| |
| void traceOutput ( const char* format, ... ) |
| { |
| char buffer [ kMaxTraceBuffer ]; |
| ::memset ( buffer, '\0', kMaxTraceBuffer * sizeof ( char ) ); |
| |
| va_list list; |
| va_start ( list, format ); |
| ::vsprintf ( buffer, format, list ); |
| writeTraceString ( buffer ); |
| va_end ( list ); |
| } |