blob: 7aff421b7779f334165657a1b5229193a96c64c2 [file] [log] [blame]
/*******************************************************************************
* ============LICENSE_START=======================================================
* org.onap.dmaap
* ================================================================================
* Copyright © 2017 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
*
* ECOMP is a trademark and service mark of AT&T Intellectual Property.
*
*******************************************************************************/
/*
* This is a client library for the AT&T Cambria Event Routing Service.
*/
#include "cambria.h"
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netdb.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <string.h>
#include <string>
#include <list>
#include <sstream>
#include <iomanip>
#include <algorithm>
// field used in JSON encoding to signal stream name
const char* kPartition = "cambria.partition";
// map from opaque handle to object pointer
#define toOpaque(x) ((CAMBRIA_CLIENT)x)
#define fromOpaque(x) ((cambriaClient*)x)
// trace support
extern void traceOutput ( const char* format, ... );
#ifdef CAMBRIA_TRACING
#define TRACE traceOutput
#else
#define TRACE 1 ? (void) 0 : traceOutput
#endif
/*
* internal cambria client class
*/
class cambriaClient
{
public:
cambriaClient ( const std::string& host, int port, const std::string& topic, const std::string& format );
~cambriaClient ();
cambriaSendResponse* send ( const char* streamName, const char* message );
cambriaSendResponse* send ( const char* streamName, const char** message, unsigned int count );
cambriaGetResponse* get ( int timeoutMs, int limit );
private:
std::string fHost;
int fPort;
std::string fTopic;
std::string fFormat;
bool buildMessageBody ( const char* streamName, const char** msgs, unsigned int count, std::string& buffer );
void write ( int socket, const char* line );
};
cambriaClient::cambriaClient ( const std::string& host, int port, const std::string& topic, const std::string& format ) :
fHost ( host ),
fPort ( port ),
fTopic ( topic ),
fFormat ( format )
{
}
cambriaClient::~cambriaClient ()
{
}
/*
* This isn't quite right -- if the message already has cambria.partition,
* it'll wind up with two entries. Also, message MUST start with '{' and
* have at least one field.
*/
static char* makeJsonMessage ( const char* streamName, const char* message )
{
int len = ::strlen ( message );
if ( streamName )
{
len += ::strlen ( kPartition );
len += ::strlen ( streamName );
len += 6; // quote each and a colon and comma
}
char* msg = new char [ len + 1 ];
::strcpy ( msg, "{" );
if ( streamName )
{
::strcat ( msg, "\"" );
::strcat ( msg, kPartition );
::strcat ( msg, "\":\"" );
::strcat ( msg, streamName );
::strcat ( msg, "\"," );
}
::strcat ( msg, message + 1 );
return msg;
}
cambriaSendResponse* cambriaClient::send ( const char* streamName, const char* message )
{
return send ( streamName, &message, 1 );
}
static bool replace ( std::string& str, const std::string& from, const std::string& to )
{
size_t start_pos = str.find ( from );
if(start_pos == std::string::npos)
return false;
str.replace(start_pos, from.length(), to);
return true;
}
static void readResponse ( int s, std::string& response )
{
char buffer [ 4096 ];
ssize_t n = 0;
while ( ( n = ::read ( s, buffer, 4095 ) ) > 0 )
{
buffer[n] = '\0';
response += buffer;
}
}
static int openSocket ( std::string& host, int port, int& ss )
{
TRACE( "connecting to %s\n", host.c_str() );
struct hostent *he = ::gethostbyname ( host.c_str() );
if ( !he )
{
TRACE("no host entry\n");
return CAMBRIA_NO_HOST;
}
if ( he->h_addrtype != AF_INET )
{
TRACE("not AF_INET\n");
return CAMBRIA_NO_HOST;
}
int s = ::socket ( AF_INET, SOCK_STREAM, 0 );
if ( s == -1 )
{
TRACE("no socket available\n");
return CAMBRIA_CANT_CONNECT;
}
struct sockaddr_in servaddr;
::memset ( &servaddr, 0, sizeof(servaddr) );
::memcpy ( &servaddr.sin_addr, he->h_addr_list[0], he->h_length );
servaddr.sin_family = AF_INET;
servaddr.sin_port = ::htons ( port );
if ( ::connect ( s, (struct sockaddr *)&servaddr, sizeof(servaddr) ) )
{
TRACE("couldn't connect\n");
return CAMBRIA_CANT_CONNECT;
}
ss = s;
return 0;
}
cambriaSendResponse* cambriaClient::send ( const char* streamName, const char** msgs, unsigned int count )
{
TRACE ( "Sending %d messages.", count );
cambriaSendResponse* result = new cambriaSendResponse ();
result->statusCode = 0;
result->statusMessage = NULL;
result->responseBody = NULL;
TRACE( "building message body\n" );
std::string body;
if ( !buildMessageBody ( streamName, msgs, count, body ) )
{
result->statusCode = CAMBRIA_UNRECOGNIZED_FORMAT;
return result;
}
int s = -1;
int err = ::openSocket ( fHost, fPort, s );
if ( err > 0 )
{
result->statusCode = err;
return result;
}
// construct path
std::string path = "/cambriaApiServer/v1/event/";
path += fTopic;
// send post prefix
char line[4096];
::sprintf ( line,
"POST %s HTTP/1.0\r\n"
"Host: %s\r\n"
"Content-Type: %s\r\n"
"Content-Length: %d\r\n"
"\r\n",
path.c_str(), fHost.c_str(), fFormat.c_str(), body.length() );
write ( s, line );
// send the body
write ( s, body.c_str() );
TRACE ( "\n" );
TRACE ( "send complete, reading reply\n" );
// receive the response
std::string response;
readResponse ( s, response );
::close ( s );
// parse the header and body: split header and body on first occurrence of \r\n\r\n
result->statusCode = CAMBRIA_BAD_RESPONSE;
size_t headerBreak = response.find ( "\r\n\r\n" );
if ( headerBreak != std::string::npos )
{
std::string responseBody = response.substr ( headerBreak + 4 );
result->responseBody = new char [ responseBody.length() + 1 ];
::strcpy ( result->responseBody, responseBody.c_str() );
// all we need from the header for now is the status line
std::string headerPart = response.substr ( 0, headerBreak + 2 );
size_t newline = headerPart.find ( '\r' );
if ( newline != std::string::npos )
{
std::string statusLine = headerPart.substr ( 0, newline );
size_t firstSpace = statusLine.find ( ' ' );
if ( firstSpace != std::string::npos )
{
size_t secondSpace = statusLine.find ( ' ', firstSpace + 1 );
if ( secondSpace != std::string::npos )
{
result->statusCode = ::atoi ( statusLine.substr ( firstSpace + 1, secondSpace - firstSpace + 1 ).c_str() );
std::string statusMessage = statusLine.substr ( secondSpace + 1 );
result->statusMessage = new char [ statusMessage.length() + 1 ];
::strcpy ( result->statusMessage, statusMessage.c_str() );
}
}
}
}
return result;
}
void cambriaClient::write ( int socket, const char* str )
{
int len = str ? ::strlen ( str ) : 0;
::write ( socket, str, len );
// elaborate tracing nonsense...
std::string trace ( "> " );
trace += str;
while ( replace ( trace, "\r\n", "\\r\\n\n> " ) );
TRACE ( "%s", trace.c_str() );
}
bool cambriaClient::buildMessageBody ( const char* streamName, const char** msgs, unsigned int count, std::string& buffer )
{
if ( fFormat == CAMBRIA_NATIVE_FORMAT )
{
int snLen = ::strlen ( streamName );
for ( unsigned int i=0; i<count; i++ )
{
const char* msg = msgs[i];
std::ostringstream s;
s << snLen << '.' << ::strlen(msg) << '.' << streamName << msg;
buffer.append ( s.str() );
}
}
else if ( fFormat == CAMBRIA_JSON_FORMAT )
{
buffer.append ( "[" );
for ( unsigned int i=0; i<count; i++ )
{
if ( i>0 )
{
buffer.append ( "," );
}
const char* msg = msgs[i];
char* jsonMsg = ::makeJsonMessage ( streamName, msg );
buffer.append ( jsonMsg );
delete jsonMsg; // FIXME: allocating memory here just to delete it
}
buffer.append ( "]" );
}
else
{
return false;
}
return true;
}
// read the next string into value, and return the end pos, or 0 on error
static int readNextJsonString ( const std::string& body, int startPos, std::string& value )
{
value = "";
if ( startPos >= body.length () )
{
return 0;
}
// skip a comma
int current = startPos;
if ( body[current] == ',' ) current++;
if ( current >= body.length() || body[current] != '"' )
{
return 0;
}
current++;
// walk the string for the closing quote (FIXME: unicode support)
bool esc = false;
int hex = 0;
while ( ( body[current] != '"' || esc ) && current < body.length() )
{
if ( hex > 0 )
{
hex--;
if ( hex == 0 )
{
// presumably read a unicode escape. this code isn't
// equipped for multibyte or unicode, so just skip it
value += '?';
}
}
else if ( esc )
{
esc = false;
switch ( body[current] )
{
case '"':
case '\\':
case '/':
value += body[current];
break;
case 'b': value += '\b'; break;
case 'f': value += '\f'; break;
case 'n': value += '\n'; break;
case 'r': value += '\r'; break;
case 't': value += '\t'; break;
case 'u': hex=4; break;
}
}
else
{
esc = body[current] == '\\';
if ( !esc ) value += body[current];
}
current++;
}
return current + 1;
}
static void readGetBody ( std::string& body, cambriaGetResponse& response )
{
TRACE("response %s\n", body.c_str() );
if ( body.length() < 2 || body[0] != '[' || body[body.length()-1] != ']' )
{
response.statusCode = CAMBRIA_BAD_RESPONSE;
}
std::list<char*> msgs;
std::string val;
int current = 1;
while ( ( current = readNextJsonString ( body, current, val ) ) > 0 )
{
char* msg = new char [ val.length() + 1 ];
::strcpy ( msg, val.c_str() );
msgs.push_back ( msg );
}
// now build a response
response.messageCount = msgs.size();
response.messageSet = new char* [ msgs.size() ];
int index = 0;
for ( std::list<char*>::iterator it = msgs.begin(); it != msgs.end(); it++ )
{
response.messageSet [ index++ ] = *it;
}
}
cambriaGetResponse* cambriaClient::get ( int timeoutMs, int limit )
{
cambriaGetResponse* result = new cambriaGetResponse ();
result->statusCode = 0;
result->statusMessage = NULL;
result->messageCount = 0;
result->messageSet = new char* [ 1 ];
int s = -1;
int err = ::openSocket ( fHost, fPort, s );
if ( err > 0 )
{
result->statusCode = err;
return result;
}
// construct path
std::string path = "/cambriaApiServer/v1/event/";
path += fTopic;
bool haveAdds = false;
std::ostringstream adds;
if ( timeoutMs > -1 )
{
adds << "timeout=" << timeoutMs;
haveAdds = true;
}
if ( limit > -1 )
{
if ( haveAdds )
{
adds << "&";
}
adds << "limit=" << limit;
haveAdds = true;
}
if ( haveAdds )
{
path += "?";
path += adds.str();
}
// send post prefix
char line[4096];
::sprintf ( line,
"GET %s HTTP/1.0\r\n"
"Host: %s\r\n"
"\r\n",
path.c_str(), fHost.c_str() );
write ( s, line );
TRACE ( "\n" );
TRACE ( "request sent; reading reply\n" );
// receive the response (FIXME: would be nice to stream rather than load it all)
std::string response;
readResponse ( s, response );
::close ( s );
// parse the header and body: split header and body on first occurrence of \r\n\r\n
result->statusCode = CAMBRIA_BAD_RESPONSE;
size_t headerBreak = response.find ( "\r\n\r\n" );
if ( headerBreak != std::string::npos )
{
// get the header line
std::string headerPart = response.substr ( 0, headerBreak + 2 );
size_t newline = headerPart.find ( '\r' );
if ( newline != std::string::npos )
{
std::string statusLine = headerPart.substr ( 0, newline );
size_t firstSpace = statusLine.find ( ' ' );
if ( firstSpace != std::string::npos )
{
size_t secondSpace = statusLine.find ( ' ', firstSpace + 1 );
if ( secondSpace != std::string::npos )
{
result->statusCode = ::atoi ( statusLine.substr ( firstSpace + 1, secondSpace - firstSpace + 1 ).c_str() );
std::string statusMessage = statusLine.substr ( secondSpace + 1 );
result->statusMessage = new char [ statusMessage.length() + 1 ];
::strcpy ( result->statusMessage, statusMessage.c_str() );
}
}
}
if ( result->statusCode < 300 )
{
std::string responseBody = response.substr ( headerBreak + 4 );
readGetBody ( responseBody, *result );
}
}
return result;
}
///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
CAMBRIA_CLIENT cambriaCreateClient ( const char* host, int port, const char* topic, const char* format )
{
cambriaClient* cc = new cambriaClient ( host, port, topic, format );
return toOpaque(cc);
}
void cambriaDestroyClient ( CAMBRIA_CLIENT client )
{
delete fromOpaque ( client );
}
cambriaSendResponse* cambriaSendMessage ( CAMBRIA_CLIENT client, const char* streamName, const char* message )
{
cambriaClient* c = fromOpaque ( client );
return c->send ( streamName, message );
}
cambriaSendResponse* cambriaSendMessages ( CAMBRIA_CLIENT client, const char* streamName, const char** messages, unsigned int count )
{
cambriaClient* c = fromOpaque ( client );
return c->send ( streamName, messages, count );
}
cambriaGetResponse* cambriaGetMessages ( CAMBRIA_CLIENT client, unsigned long timeoutMs, unsigned int limit )
{
cambriaClient* c = fromOpaque ( client );
return c->get ( timeoutMs, limit );
}
void cambriaDestroySendResponse ( CAMBRIA_CLIENT client, const cambriaSendResponse* response )
{
if ( response )
{
delete response->statusMessage;
delete response->responseBody;
delete response;
}
}
void cambriaDestroyGetResponse ( CAMBRIA_CLIENT client, const cambriaGetResponse* response )
{
if ( response )
{
delete response->statusMessage;
for ( int i=0; i<response->messageCount; i++ )
{
delete response->messageSet[i];
}
delete response;
}
}
int cambriaSimpleSend ( const char* host, int port, const char* topic, const char* streamName, const char* msg )
{
return cambriaSimpleSendMultiple ( host, port, topic, streamName, &msg, 1 );
}
int cambriaSimpleSendMultiple ( const char* host, int port, const char* topic, const char* streamName, const char** messages, unsigned int msgCount )
{
int count = 0;
const CAMBRIA_CLIENT cc = ::cambriaCreateClient ( host, port, topic, CAMBRIA_NATIVE_FORMAT );
if ( cc )
{
const cambriaSendResponse* response = ::cambriaSendMessages ( cc, streamName, messages, msgCount );
if ( response && response->statusCode < 300 )
{
count = msgCount;
}
::cambriaDestroySendResponse ( cc, response );
::cambriaDestroyClient ( cc );
}
return count;
}
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
const unsigned int kMaxTraceBuffer = 2048;
static void writeTraceString ( const char* msg )
{
::fprintf ( stdout, "%s", msg );
::fflush ( stdout ); // because we want output before core dumping :-)
}
void traceOutput ( const char* format, ... )
{
char buffer [ kMaxTraceBuffer ];
::memset ( buffer, '\0', kMaxTraceBuffer * sizeof ( char ) );
va_list list;
va_start ( list, format );
::vsprintf ( buffer, format, list );
writeTraceString ( buffer );
va_end ( list );
}