Add Initial Code Import

Initial code import for dmaapClient

Issue-id: DMAAP-82
Change-Id: Ib627672d37e233b796619f93dd91f5caaf1592e4
Signed-off-by: Varun Gudisena <vg411h@att.com>
diff --git a/src/main/cpp/cambria.cpp b/src/main/cpp/cambria.cpp
new file mode 100644
index 0000000..7aff421
--- /dev/null
+++ b/src/main/cpp/cambria.cpp
@@ -0,0 +1,624 @@
+/*******************************************************************************
+ *  ============LICENSE_START=======================================================
+ *  org.onap.dmaap
+ *  ================================================================================
+ *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  ============LICENSE_END=========================================================
+ *
+ *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *  
+ *******************************************************************************/
+
+/*
+ * This is a client library for the AT&T Cambria Event Routing Service.
+ */
+
+#include "cambria.h"
+
+#include <arpa/inet.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdarg.h>
+#include <string.h>
+
+#include <string>
+#include <list>
+#include <sstream>
+#include <iomanip>
+#include <algorithm>
+
+// name of the JSON field that makeJsonMessage() inserts to carry the stream name
+const char* kPartition = "cambria.partition";
+
+// map between the opaque handle and the object pointer (named casts; argument parenthesized)
+#define toOpaque(x) (static_cast<CAMBRIA_CLIENT>(x))
+#define fromOpaque(x) (static_cast<cambriaClient*>(x))
+
+// trace support: TRACE(...) takes printf-style arguments and forwards them to traceOutput()
+extern void traceOutput ( const char* format, ... );
+#ifdef CAMBRIA_TRACING
+	#define TRACE traceOutput
+#else
+	#define TRACE 1 ? (void) 0 : traceOutput	// the call is still compiled, but never evaluated
+#endif
+
+/*
+ *	internal cambria client: holds the connection settings; each send()/get() opens its own socket
+ */
+class cambriaClient
+{
+public:
+	cambriaClient ( const std::string& host, int port, const std::string& topic, const std::string& format );
+	~cambriaClient ();
+	// post one message, or a batch of `count` messages, under an optional stream name
+	cambriaSendResponse* send ( const char* streamName, const char* message );
+	cambriaSendResponse* send ( const char* streamName, const char** message, unsigned int count );
+	// fetch messages; a negative argument leaves the matching query parameter out of the request
+	cambriaGetResponse* get ( int timeoutMs, int limit );
+
+private:
+
+	std::string fHost;	// broker host name; also sent as the HTTP Host header
+	int fPort;		// broker TCP port
+	std::string fTopic;	// appended to the /cambriaApiServer/v1/event/ path
+	std::string fFormat;	// sent as Content-Type; also selects the body encoding
+	// encodes msgs into buffer according to fFormat; false when the format is not recognized
+	bool buildMessageBody ( const char* streamName, const char** msgs, unsigned int count, std::string& buffer );
+	// writes a NUL-terminated string to the socket and echoes it to the trace output
+	void write ( int socket, const char* line );
+};
+
+cambriaClient::cambriaClient ( const std::string& host, int port, const std::string& topic, const std::string& format )
+	// only records the settings; no socket is opened until send() or get() is called
+	: fHost ( host )
+	, fPort ( port )
+	, fTopic ( topic )
+	, fFormat ( format )
+{}
+
+cambriaClient::~cambriaClient ()
+{	// nothing to release explicitly: the members are an int and self-cleaning std::strings
+}
+
+/*
+ *	Returns a new[]-allocated copy of `message` with "cambria.partition":"<streamName>",
+ *	inserted right after the opening brace (nothing is inserted when streamName is null).
+ *	This isn't quite right -- if the message already has cambria.partition, it'll wind up
+ *	with two entries. Also, message MUST start with '{' and have at least one field.
+ */
+static char* makeJsonMessage ( const char* streamName, const char* message )
+{
+	// everything after the opening brace; a null or empty message has nothing to skip past
+	const char* rest = ( message && *message ) ? message + 1 : "";
+	size_t len = 1 + ::strlen ( rest );	// the '{' plus the remainder
+	if ( streamName )
+	{
+		len += ::strlen ( kPartition ) + ::strlen ( streamName ) + 6;	// 4 quotes, colon, comma
+	}
+
+	char* msg = new char [ len + 1 ];
+	::strcpy ( msg, "{" );
+	if ( streamName )
+	{
+		::strcat ( msg, "\"" );
+		::strcat ( msg, kPartition );
+		::strcat ( msg, "\":\"" );
+		::strcat ( msg, streamName );
+		::strcat ( msg, "\"," );
+	}
+	::strcat ( msg, rest );
+	return msg;
+}
+
+cambriaSendResponse* cambriaClient::send ( const char* streamName, const char* message )
+{
+	return send ( streamName, &message, 1 );	// single-message overload: forward as a batch of one
+}
+
+// Replaces the first occurrence of `from` in `str` with `to`; returns whether one was found.
+static bool replace ( std::string& str, const std::string& from, const std::string& to )
+{
+	const size_t hit = str.find ( from );
+	const bool found = ( hit != std::string::npos );
+	if ( found ) str.replace ( hit, from.length(), to );
+	return found;
+}
+
+// Drains descriptor `s` until EOF or a read error, appending every byte read to `response`.
+static void readResponse ( int s, std::string& response )
+{
+	char buffer [ 4096 ];
+	ssize_t n = 0;
+	while ( ( n = ::read ( s, buffer, sizeof ( buffer ) ) ) > 0 )
+	{
+		// append by length so that bytes following an embedded NUL are not dropped
+		response.append ( buffer, static_cast<size_t> ( n ) );
+	}
+}
+
+// Resolves `host` (IPv4 only) and opens a TCP connection to it on `port`. On success stores
+// the connected descriptor in `ss` and returns 0; otherwise returns a CAMBRIA_* status code.
+static int openSocket ( std::string& host, int port, int& ss )
+{
+	TRACE( "connecting to %s\n", host.c_str() );
+	struct hostent *he = ::gethostbyname ( host.c_str() );
+	if ( !he )
+	{
+		TRACE("no host entry\n");
+		return CAMBRIA_NO_HOST;
+	}
+	if ( he->h_addrtype != AF_INET )
+	{
+		TRACE("not AF_INET\n");
+		return CAMBRIA_NO_HOST;
+	}
+
+	int s = ::socket ( AF_INET, SOCK_STREAM, 0 );
+	if ( s == -1 )
+	{
+		TRACE("no socket available\n");
+		return CAMBRIA_CANT_CONNECT;
+	}
+
+	struct sockaddr_in servaddr;
+	::memset ( &servaddr, 0, sizeof(servaddr) );
+	::memcpy ( &servaddr.sin_addr, he->h_addr_list[0], he->h_length );
+	servaddr.sin_family = AF_INET;
+	servaddr.sin_port = ::htons ( port );
+
+	if ( ::connect ( s, (struct sockaddr *)&servaddr, sizeof(servaddr) ) )
+	{
+		TRACE("couldn't connect\n");
+		::close ( s );	// don't leak the descriptor when the connection fails
+		return CAMBRIA_CANT_CONNECT;
+	}
+
+	ss = s;
+	return 0;
+}
+
+// Posts `count` messages to the topic in a single HTTP/1.0 request. Always returns a new
+// cambriaSendResponse (release it with cambriaDestroySendResponse): statusCode holds the
+// HTTP status, or a CAMBRIA_* client-side code when the request could not be built or sent
+// or when the reply had no usable status line.
+cambriaSendResponse* cambriaClient::send ( const char* streamName, const char** msgs, unsigned int count )
+{
+	TRACE ( "Sending %u messages.", count );
+
+	cambriaSendResponse* result = new cambriaSendResponse ();
+	result->statusCode = 0;
+	result->statusMessage = NULL;
+	result->responseBody = NULL;
+
+	TRACE( "building message body\n" );
+
+	std::string body;
+	if ( !buildMessageBody ( streamName, msgs, count, body ) )
+	{
+		result->statusCode = CAMBRIA_UNRECOGNIZED_FORMAT;
+		return result;
+	}
+
+	int s = -1;
+	int err = ::openSocket ( fHost, fPort, s );
+	if ( err > 0 )
+	{
+		result->statusCode = err;
+		return result;
+	}
+
+	// Build the request head with a stream rather than sprintf into a fixed buffer: a long
+	// topic or host can't overflow it, and the size_t body length needs no printf specifier.
+	std::ostringstream head;
+	head << "POST /cambriaApiServer/v1/event/" << fTopic << " HTTP/1.0\r\n"
+		<< "Host: " << fHost << "\r\n"
+		<< "Content-Type: " << fFormat << "\r\n"
+		<< "Content-Length: " << body.length() << "\r\n"
+		<< "\r\n";
+	write ( s, head.str().c_str() );
+
+	// send the body
+	write ( s, body.c_str() );
+
+	TRACE ( "\n" );
+	TRACE ( "send complete, reading reply\n" );
+
+	// receive the response: read until the peer closes the connection
+	std::string response;
+	readResponse ( s, response );
+	::close ( s );
+
+	// until a status line has been parsed, the reply counts as malformed
+	result->statusCode = CAMBRIA_BAD_RESPONSE;
+
+	// split header and body on the first occurrence of \r\n\r\n
+	const size_t headerBreak = response.find ( "\r\n\r\n" );
+	if ( headerBreak == std::string::npos )
+	{
+		return result;
+	}
+
+	const std::string responseBody = response.substr ( headerBreak + 4 );
+	result->responseBody = new char [ responseBody.length() + 1 ];
+	::strcpy ( result->responseBody, responseBody.c_str() );
+
+	// all we need from the header for now is the status line: "<version> <code> <reason>"
+	const std::string statusLine = response.substr ( 0, response.find ( '\r' ) );
+	const size_t firstSpace = statusLine.find ( ' ' );
+	if ( firstSpace != std::string::npos )
+	{
+		const size_t secondSpace = statusLine.find ( ' ', firstSpace + 1 );
+		if ( secondSpace != std::string::npos )
+		{
+			// the code is exactly the text between the two spaces (the old length,
+			// secondSpace - firstSpace + 1, ran two characters into the reason text)
+			const std::string code = statusLine.substr ( firstSpace + 1, secondSpace - firstSpace - 1 );
+			result->statusCode = ::atoi ( code.c_str() );
+
+			const std::string statusMessage = statusLine.substr ( secondSpace + 1 );
+			result->statusMessage = new char [ statusMessage.length() + 1 ];
+			::strcpy ( result->statusMessage, statusMessage.c_str() );
+		}
+	}
+
+	return result;
+}
+
+// Sends all of the NUL-terminated `str` over `socket` (best effort), then echoes it to the trace.
+void cambriaClient::write ( int socket, const char* str )
+{
+	if ( !str ) return;	// nothing to send, and std::string must never be handed a null pointer
+	// ::write may accept only part of the buffer: keep going until it's all out or it fails
+	const char* p = str;
+	for ( ssize_t n; *p && ( n = ::write ( socket, p, ::strlen ( p ) ) ) > 0; p += n ) {}
+
+	std::string trace = std::string ( "> " ) + str;
+	while ( replace ( trace, "\r\n", "\\r\\n\n> " ) );
+	TRACE ( "%s", trace.c_str() );
+}
+
+// Encodes `count` messages into `buffer` per fFormat; returns false (buffer untouched) for an unknown format.
+bool cambriaClient::buildMessageBody ( const char* streamName, const char** msgs, unsigned int count, std::string& buffer )
+{
+	if ( fFormat == CAMBRIA_NATIVE_FORMAT )
+	{
+		// framing per message: <nameLen>.<msgLen>.<name><msg>; a null stream name is sent as empty
+		const char* name = streamName ? streamName : "";
+		const size_t snLen = ::strlen ( name );
+		for ( unsigned int i=0; i<count; i++ )
+		{
+			std::ostringstream s;
+			s << snLen << '.' << ::strlen ( msgs[i] ) << '.' << name << msgs[i];
+			buffer.append ( s.str() );
+		}
+	}
+	else if ( fFormat == CAMBRIA_JSON_FORMAT )
+	{
+		buffer.append ( "[" );
+		for ( unsigned int i=0; i<count; i++ )
+		{
+			if ( i>0 )
+			{
+				buffer.append ( "," );
+			}
+			char* jsonMsg = ::makeJsonMessage ( streamName, msgs[i] );
+			buffer.append ( jsonMsg );
+			delete [] jsonMsg;	// allocated with new[], so it must be released with delete[]
+		}
+		buffer.append ( "]" );
+	}
+	else
+	{
+		return false;
+	}
+	return true;
+}
+
+// Reads the next JSON string literal in `body` at or after `startPos` into `value`, first
+// skipping an optional comma and any whitespace around it. Returns the index just past the
+// closing quote, or 0 (with `value` empty) when no complete string literal starts there.
+static int readNextJsonString ( const std::string& body, int startPos, std::string& value )
+{
+	static const char* const kBlank = " \t\r\n";
+	const size_t npos = std::string::npos;
+
+	value = "";
+	if ( startPos < 0 ) return 0;
+
+	// skip the separator; find_first_not_of yields npos once we run off the end of the body
+	size_t current = body.find_first_not_of ( kBlank, startPos );
+	if ( current != npos && body[current] == ',' )
+	{
+		current = body.find_first_not_of ( kBlank, current + 1 );
+	}
+	if ( current == npos || body[current] != '"' )
+	{
+		return 0;
+	}
+	current++;
+
+	// walk the string for the closing quote (FIXME: unicode support)
+	bool esc = false;	// the previous character was a backslash that is still pending
+	int hex = 0;		// hex digits of a \uXXXX escape that are still to be skipped
+	for ( ; current < body.length(); current++ )
+	{
+		const char c = body[current];
+		if ( c == '"' && !esc )
+		{
+			return static_cast<int> ( current + 1 );
+		}
+		if ( hex > 0 )
+		{
+			// this code isn't equipped for multibyte or unicode, so the whole
+			// escape collapses into a single '?' once its last digit is consumed
+			if ( --hex == 0 ) value += '?';
+		}
+		else if ( esc )
+		{
+			esc = false;
+			switch ( c )
+			{
+				case '"': case '\\': case '/': value += c; break;
+				case 'b': value += '\b'; break;
+				case 'f': value += '\f'; break;
+				case 'n': value += '\n'; break;
+				case 'r': value += '\r'; break;
+				case 't': value += '\t'; break;
+				case 'u': hex = 4; break;
+				default: break;	// any other escaped character is dropped
+			}
+		}
+		else
+		{
+			esc = ( c == '\\' );
+			if ( !esc ) value += c;
+		}
+	}
+
+	// ran out of input before the closing quote: report an error, not a truncated string
+	value = "";
+	return 0;
+}
+
+// Parses `body`, a JSON array of strings, into response.messageSet / messageCount; each
+// message is a new[]-allocated C string. A body not wrapped in [ ] is flagged and skipped.
+static void readGetBody ( std::string& body, cambriaGetResponse& response )
+{
+	TRACE("response %s\n", body.c_str() );
+
+	if ( body.length() < 2 || body[0] != '[' || body[body.length()-1] != ']' )
+	{
+		response.statusCode = CAMBRIA_BAD_RESPONSE;
+		return;	// don't try to extract messages from a malformed body
+	}
+
+	std::list<char*> msgs;
+	std::string val;
+	int current = 1;
+	while ( ( current = readNextJsonString ( body, current, val ) ) > 0 )
+	{
+		char* msg = new char [ val.length() + 1 ];
+		::strcpy ( msg, val.c_str() );
+		msgs.push_back ( msg );
+	}
+
+	// swap in the real array, releasing the placeholder that get() allocated
+	delete [] response.messageSet;
+	response.messageCount = msgs.size();
+	response.messageSet = new char* [ msgs.size() ];
+	std::copy ( msgs.begin(), msgs.end(), response.messageSet );
+}
+
+// Fetches messages from the topic with a single HTTP/1.0 GET. A non-negative `timeoutMs` or
+// `limit` is sent as the "timeout" / "limit" query parameter. Always returns a new response
+// (release it with cambriaDestroyGetResponse); statusCode is the HTTP status or a CAMBRIA_* code.
+cambriaGetResponse* cambriaClient::get ( int timeoutMs, int limit )
+{
+	cambriaGetResponse* result = new cambriaGetResponse ();
+	result->statusCode = 0;
+	result->statusMessage = NULL;
+	result->messageCount = 0;
+	result->messageSet = new char* [ 1 ];
+
+	int s = -1;
+	int err = ::openSocket ( fHost, fPort, s );
+	if ( err > 0 )
+	{
+		result->statusCode = err;
+		return result;
+	}
+
+	// construct path, then append the optional query parameters
+	std::string path = "/cambriaApiServer/v1/event/";
+	path += fTopic;
+
+	bool haveAdds = false;
+	std::ostringstream adds;
+	if ( timeoutMs > -1 )
+	{
+		adds << "timeout=" << timeoutMs;
+		haveAdds = true;
+	}
+	if ( limit > -1 )
+	{
+		if ( haveAdds )
+		{
+			adds << "&";
+		}
+		adds << "limit=" << limit;
+		haveAdds = true;
+	}
+	if ( haveAdds )
+	{
+		path += "?";
+		path += adds.str();
+	}
+
+	// Send the request head. It is assembled in a std::string instead of sprintf'd into a
+	// fixed 4 KB buffer, so a long topic or host name can no longer overflow the stack.
+	std::string request = "GET " + path + " HTTP/1.0\r\n";
+	request += "Host: " + fHost + "\r\n";
+	request += "\r\n";
+	write ( s, request.c_str() );
+
+	TRACE ( "\n" );
+	TRACE ( "request sent; reading reply\n" );
+
+	// receive the response (FIXME: would be nice to stream rather than load it all)
+	std::string response;
+	readResponse ( s, response );
+	::close ( s );
+
+	// until a status line has been parsed, the reply counts as malformed
+	result->statusCode = CAMBRIA_BAD_RESPONSE;
+
+	// split header and body on the first occurrence of \r\n\r\n
+	const size_t headerBreak = response.find ( "\r\n\r\n" );
+	if ( headerBreak == std::string::npos )
+	{
+		return result;
+	}
+
+	// the status line is everything up to the first \r: "<version> <code> <reason>"
+	const std::string statusLine = response.substr ( 0, response.find ( '\r' ) );
+	const size_t firstSpace = statusLine.find ( ' ' );
+	if ( firstSpace != std::string::npos )
+	{
+		const size_t secondSpace = statusLine.find ( ' ', firstSpace + 1 );
+		if ( secondSpace != std::string::npos )
+		{
+			// the code is exactly the text between the two spaces
+			const std::string code = statusLine.substr ( firstSpace + 1, secondSpace - firstSpace - 1 );
+			result->statusCode = ::atoi ( code.c_str() );
+			const std::string statusMessage = statusLine.substr ( secondSpace + 1 );
+			result->statusMessage = new char [ statusMessage.length() + 1 ];
+			::strcpy ( result->statusMessage, statusMessage.c_str() );
+		}
+	}
+
+	// the message array is only parsed for a reply whose status is below 300
+	if ( result->statusCode < 300 )
+	{
+		std::string responseBody = response.substr ( headerBreak + 4 );
+		readGetBody ( responseBody, *result );
+	}
+	return result;
+}
+
+
+///////////////////////////////////////////////////////////////////////////////
+///////////////////////////////////////////////////////////////////////////////
+///////////////////////////////////////////////////////////////////////////////
+
+CAMBRIA_CLIENT cambriaCreateClient ( const char* host, int port, const char* topic, const char* format )
+{
+	if ( !host || !topic || !format ) return NULL;	// std::string must not be built from a null pointer
+	return toOpaque ( new cambriaClient ( host, port, topic, format ) );
+}
+
+void cambriaDestroyClient ( CAMBRIA_CLIENT client )
+{
+	delete fromOpaque ( client );	// a null handle is harmless: deleting a null pointer does nothing
+}
+
+cambriaSendResponse* cambriaSendMessage ( CAMBRIA_CLIENT client, const char* streamName, const char* message )
+{
+	cambriaClient* c = fromOpaque ( client );
+	return ( c && message ) ? c->send ( streamName, message ) : NULL;	// null handle/message: no response, no crash
+}
+
+cambriaSendResponse* cambriaSendMessages ( CAMBRIA_CLIENT client, const char* streamName, const char** messages, unsigned int count )
+{
+	cambriaClient* c = fromOpaque ( client );
+	return ( c && ( messages || count == 0 ) ) ? c->send ( streamName, messages, count ) : NULL;	// bad arguments: no response
+}
+
+cambriaGetResponse* cambriaGetMessages ( CAMBRIA_CLIENT client, unsigned long timeoutMs, unsigned int limit )
+{
+	cambriaClient* c = fromOpaque ( client );
+	return c ? c->get ( timeoutMs, limit ) : NULL;	// a null handle yields no response instead of a crash
+}
+
+void cambriaDestroySendResponse ( CAMBRIA_CLIENT client, const cambriaSendResponse* response )
+{
+	if ( response )
+	{
+		delete [] response->statusMessage;	// both strings come from new char[], hence delete[]
+		delete [] response->responseBody;
+		delete response;
+	}
+}
+
+void cambriaDestroyGetResponse ( CAMBRIA_CLIENT client, const cambriaGetResponse* response )
+{
+	if ( response )
+	{
+		// every buffer in the response came from new[], so each is released with delete[]
+		delete [] response->statusMessage;
+		for ( int i=0; i<response->messageCount; i++ )
+			delete [] response->messageSet[i];
+		delete [] response->messageSet;	// the pointer array itself used to be leaked
+		delete response;
+	}
+}
+
+int cambriaSimpleSend ( const char* host, int port, const char* topic, const char* streamName, const char* msg )
+{
+	return cambriaSimpleSendMultiple ( host, port, topic, streamName, &msg, 1 );	// a batch of one, so the result is 1 or 0
+}
+
+// One-shot send: creates a native-format client, posts all `msgCount` messages in one
+// request and tears everything down again. Returns msgCount when a response came back with
+// a status code below 300, otherwise 0.
+int cambriaSimpleSendMultiple ( const char* host, int port, const char* topic, const char* streamName, const char** messages, unsigned int msgCount )
+{
+	const CAMBRIA_CLIENT client = ::cambriaCreateClient ( host, port, topic, CAMBRIA_NATIVE_FORMAT );
+	if ( !client )
+	{
+		return 0;
+	}
+
+	const cambriaSendResponse* reply = ::cambriaSendMessages ( client, streamName, messages, msgCount );
+	const bool accepted = reply && reply->statusCode < 300;
+	::cambriaDestroySendResponse ( client, reply );
+	::cambriaDestroyClient ( client );
+
+	return accepted ? msgCount : 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////////////////
+
+const unsigned int kMaxTraceBuffer = 2048;	// size of the buffer traceOutput() formats into
+
+static void writeTraceString ( const char* msg )
+{
+	::fprintf ( stdout, "%s", msg );	// trace output goes to stdout
+	::fflush ( stdout );	// flush immediately, because we want the output before any core dump
+}
+
+// printf-style trace sink: formats into a fixed buffer (overlong output is truncated), then prints it.
+void traceOutput ( const char* format, ... )
+{
+	char buffer [ kMaxTraceBuffer ];
+	va_list list;
+	va_start ( list, format );
+	// bounded and always NUL-terminated, so a long message can no longer overrun the buffer
+	::vsnprintf ( buffer, sizeof ( buffer ), format, list );
+	va_end ( list );
+	writeTraceString ( buffer );
+}
diff --git a/src/main/cpp/cambria.h b/src/main/cpp/cambria.h
new file mode 100644
index 0000000..68e9ca9
--- /dev/null
+++ b/src/main/cpp/cambria.h
@@ -0,0 +1,114 @@
+/*******************************************************************************
+ *  ============LICENSE_START=======================================================
+ *  org.onap.dmaap
+ *  ================================================================================
+ *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  ============LICENSE_END=========================================================
+ *
+ *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *  
+ *******************************************************************************/
+
+#ifndef CAMBRIA_H_
+#define CAMBRIA_H_
+
+/*
+ * This is a client library for the AT&T Cambria Event Routing Service.
+ * 
+ * Cambria clients post string messages to the broker on a topic, optionally
+ * with a partition name.
+ */
+
+/* An opaque handle for a client instance: obtain it from cambriaCreateClient(), release it with cambriaDestroyClient(). */
+typedef void* CAMBRIA_CLIENT;
+
+/* Cambria has two formats, passed as the `format` argument and sent as the Content-Type. CAMBRIA_NATIVE_FORMAT is preferred. */
+#define CAMBRIA_NATIVE_FORMAT "application/cambria"
+#define CAMBRIA_JSON_FORMAT "application/json"
+
+/* pseudo-HTTP client-side status codes, reported through the statusCode field of a response */
+#define CAMBRIA_NO_HOST 470	/* host lookup failed, or the host has no IPv4 address */
+#define CAMBRIA_CANT_CONNECT 471	/* no socket was available, or connect() failed */
+#define CAMBRIA_UNRECOGNIZED_FORMAT 472	/* the client's format is neither of the two above */
+#define CAMBRIA_BAD_RESPONSE 570	/* the reply had no parsable header / status line or body */
+
+/*
+ * Send response structure. Be sure to call cambriaDestroySendResponse() after receiving this.
+ */
+struct cambriaSendResponse
+{
+	int statusCode;	/* HTTP status from the broker, or one of the CAMBRIA_* client-side codes */
+	char* statusMessage;	/* reason text of the HTTP status line; NULL when none was parsed */
+	char* responseBody;	/* body of the reply; NULL when no reply header was found */
+};
+
+/*
+ * Get response structure. Be sure to call cambriaDestroyGetResponse() after receiving this.
+ */
+struct cambriaGetResponse
+{
+	int statusCode;	/* HTTP status from the broker, or one of the CAMBRIA_* client-side codes */
+	char* statusMessage;	/* reason text of the HTTP status line; NULL when none was parsed */
+
+	int messageCount;	/* number of valid entries in messageSet */
+	char** messageSet;	/* the received messages, as NUL-terminated strings */
+};
+
+/*
+ *	Send a message in a single call (CAMBRIA_NATIVE_FORMAT). Returns the number sent (1 or 0).
+ */ 
+extern "C" int cambriaSimpleSend ( const char* host, int port, const char* topic, const char* streamName, const char* msg );
+
+/*
+ *	Send multiple messages in a single call. Returns the number sent (msgCount or 0).
+ */ 
+extern "C" int cambriaSimpleSendMultiple ( const char* host, int port, const char* topic, const char* streamName, const char** messages, unsigned int msgCount );
+
+/*
+ *	Create a client instance to post messages to the given host:port, topic, and
+ *	either the CAMBRIA_NATIVE_FORMAT or CAMBRIA_JSON_FORMAT.
+ */
+extern "C" CAMBRIA_CLIENT cambriaCreateClient ( const char* host, int port, const char* topic, const char* format );
+
+/*
+ *	Clean up a client instance created by cambriaCreateClient().
+ */ 
+extern "C" void cambriaDestroyClient ( CAMBRIA_CLIENT client );
+
+/*
+ *	Send a single message to the broker using the stream name provided. (If null, no stream name is used.)
+ */
+extern "C" cambriaSendResponse* cambriaSendMessage ( CAMBRIA_CLIENT client, const char* streamName, const char* message );
+
+/*
+ *	Send a batch of messages to the broker using the stream name provided. (If null, no stream name is used.)
+ */
+extern "C" cambriaSendResponse* cambriaSendMessages ( CAMBRIA_CLIENT client, const char* streamName, const char** messages, unsigned int count );
+
+/*
+ *	Retrieve messages from the broker. If a timeout value is 0 (or lower), the broker returns a response
+ * 	immediately. Otherwise, the server holds the connection open up to the given timeout. Likewise, if limit
+ * 	is 0 (or lower), the server sends as many messages as it cares to. Otherwise, at most 'limit' messages are
+ * 	returned.
+ */
+extern "C" cambriaGetResponse* cambriaGetMessages ( CAMBRIA_CLIENT client, unsigned long timeoutMs, unsigned int limit );
+
+/*
+ *	After processing a send response, pass it back to the library for cleanup (a null response is ignored).
+ */
+extern "C" void cambriaDestroySendResponse ( CAMBRIA_CLIENT client, const cambriaSendResponse* response );
+/*	Likewise for a response returned by cambriaGetMessages(). */
+extern "C" void cambriaDestroyGetResponse ( CAMBRIA_CLIENT client, const cambriaGetResponse* response );
+
+#endif
diff --git a/src/main/cpp/loopingPostClient.cpp b/src/main/cpp/loopingPostClient.cpp
new file mode 100644
index 0000000..1396eea
--- /dev/null
+++ b/src/main/cpp/loopingPostClient.cpp
@@ -0,0 +1,92 @@
+/*******************************************************************************
+ *  ============LICENSE_START=======================================================
+ *  org.onap.dmaap
+ *  ================================================================================
+ *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  ============LICENSE_END=========================================================
+ *
+ *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *  
+ *******************************************************************************/
+
+#include <stdio.h>
+#include <ctime>
+#include <string.h>
+#include "cambria.h"
+
+const char* kAlarm =	// sample XML alarm event; main() posts copies of it as the message payload
+	"<EVENT>"
+        "<AGENT_ADDR>12.123.70.213</AGENT_ADDR>"
+        "<AGENT_RESOLVED>ptdor306me1.els-an.att.net</AGENT_RESOLVED>"
+        "<TIME_RECEIVED>1364716208</TIME_RECEIVED>"
+        "  <PROTOCOL_VERSION>V1</PROTOCOL_VERSION>"
+        "  <ENTERPRISE_LEN>9</ENTERPRISE_LEN>"
+        "  <ENTERPRISE>.1.3.6.1.4.1.9.9.187</ENTERPRISE>"
+        "  <GENERIC>6</GENERIC>"
+        "  <SPECIFIC>2</SPECIFIC>"
+        "  <COMMAND>167</COMMAND>"
+        "  <REQUEST_ID>0</REQUEST_ID>"
+        "  <ERROR_STATUS>0</ERROR_STATUS>"
+        "  <ERROR_INDEX>0</ERROR_INDEX>"
+        "  <AGENT_TIME_UP>1554393204</AGENT_TIME_UP>"
+        "  <COMMUNITY_LEN>10</COMMUNITY_LEN>"
+        "  <COMMUNITY>nidVeskaf0</COMMUNITY>"
+        "    <VARBIND>"
+        "      <VARBIND_OID>.1.3.6.1.2.1.15.3.1.14.32.4.52.58</VARBIND_OID>"
+        "      <VARBIND_TYPE>OCTET_STRING_HEX</VARBIND_TYPE>"
+        "      <VARBIND_VALUE>02 02 </VARBIND_VALUE>"
+        "    </VARBIND>"
+        "    <VARBIND>"
+        "      <VARBIND_OID>.1.3.6.1.2.1.15.3.1.2.32.4.52.58</VARBIND_OID>"
+        "      <VARBIND_TYPE>INTEGER</VARBIND_TYPE>"
+        "      <VARBIND_VALUE>1</VARBIND_VALUE>"
+        "    </VARBIND>"
+        "    <VARBIND>"
+        "      <VARBIND_OID>.1.3.6.1.4.1.9.9.187.1.2.1.1.7.32.4.52.58</VARBIND_OID>"
+        "      <VARBIND_TYPE>OCTET_STRING_ASCII</VARBIND_TYPE>"
+        "      <VARBIND_VALUE>peer in wrong AS</VARBIND_VALUE>"
+        "    </VARBIND>"
+        "    <VARBIND>"
+        "      <VARBIND_OID>.1.3.6.1.4.1.9.9.187.1.2.1.1.8.32.4.52.58</VARBIND_OID>"
+        "      <VARBIND_TYPE>INTEGER</VARBIND_TYPE>"
+        "      <VARBIND_VALUE>4</VARBIND_VALUE>"
+        "    </VARBIND>"
+      "</EVENT>";
+
+// Throughput smoke test: posts batches of 100 copies of kAlarm to localhost:8080, 5000 times over.
+int main ( int argc, const char* argv[] )
+{
+	const unsigned int kBatch = 100, kRounds = 5000;
+	char** msgs = new char* [ kBatch ];
+	for ( unsigned int i=0; i<kBatch; i++ )
+	{
+		msgs[i] = new char [ ::strlen ( kAlarm ) + 1 ];	// +1 for the terminator (it used to sit inside strlen's argument)
+		::strcpy ( msgs[i], kAlarm );
+	}
+
+	const std::time_t start = std::time ( NULL );
+	for ( unsigned int i=0; i<kRounds; i++ )
+	{
+		::cambriaSimpleSendMultiple ( "localhost", 8080, "topic", "streamName", (const char**)msgs, kBatch );
+		if ( i % 50 == 0 )
+		{
+			::printf ( "%.f seconds for %u posts.\n", difftime ( std::time ( NULL ), start ), i*kBatch );
+		}
+	}
+	::printf ( "%.f seconds for %u posts.\n", difftime ( std::time ( NULL ), start ), kBatch*kRounds );
+
+	for ( unsigned int i=0; i<kBatch; i++ ) delete [] msgs[i];
+	delete [] msgs;
+	return 0;
+}
diff --git a/src/main/cpp/make.sh b/src/main/cpp/make.sh
new file mode 100644
index 0000000..a9b2726
--- /dev/null
+++ b/src/main/cpp/make.sh
@@ -0,0 +1,34 @@
+#*******************************************************************************
+#  ============LICENSE_START=======================================================
+#  org.onap.dmaap
+#  ================================================================================
+#  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+#  ================================================================================
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#        http://www.apache.org/licenses/LICENSE-2.0
+#  
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#  ============LICENSE_END=========================================================
+#
+#  ECOMP is a trademark and service mark of AT&T Intellectual Property.
+#  
+#*******************************************************************************
+
+rm -rf *.o
+rm -rf cambriaSamplePost
+rm -rf cambriaSampleFetch
+rm -rf loopPost
+
+# add -DCAMBRIA_TRACING to the g++ commands below to compile in trace output
+
+g++ cambria.cpp samplePostClient.cpp -o cambriaSamplePost
+g++ cambria.cpp sampleGetClient.cpp -o cambriaSampleFetch 
+
+g++ cambria.cpp loopingPostClient.cpp -o loopPost
+
diff --git a/src/main/cpp/sampleGetClient.cpp b/src/main/cpp/sampleGetClient.cpp
new file mode 100644
index 0000000..610988c
--- /dev/null
+++ b/src/main/cpp/sampleGetClient.cpp
@@ -0,0 +1,61 @@
+/*******************************************************************************
+ *  ============LICENSE_START=======================================================
+ *  org.onap.dmaap
+ *  ================================================================================
+ *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  ============LICENSE_END=========================================================
+ *
+ *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *  
+ *******************************************************************************/
+
+#include <stdio.h>
+#include "cambria.h"
+
+// Sample consumer: polls "topic" on localhost:8080 forever and prints each message received.
+int main ( int argc, const char* argv[] )
+{
+	const CAMBRIA_CLIENT cc = ::cambriaCreateClient ( "localhost", 8080, "topic", CAMBRIA_NATIVE_FORMAT );
+	if ( !cc )
+	{
+		::printf ( "Couldn't create client.\n" );
+		return 1;
+	}
+
+	int count = 0;
+	while ( 1 )
+	{
+		cambriaGetResponse* response = ::cambriaGetMessages ( cc, 5000, 1024*1024 );
+		if ( !response )
+		{
+			::fprintf ( stderr, "No response object.\n" );
+			continue;
+		}
+		if ( response->statusCode < 300 )
+		{
+			for ( int i=0; i<response->messageCount; i++ )
+			{
+				::printf ( "%d: %s\n", count++, response->messageSet [ i ] );
+			}
+		}
+		else
+		{
+			// statusMessage is null for client-side failures; never pass a null pointer to %s
+			::fprintf ( stderr, "%d %s\n", response->statusCode, response->statusMessage ? response->statusMessage : "" );
+		}
+		::cambriaDestroyGetResponse ( cc, response );	// release every response, not just the good ones
+	}
+	::cambriaDestroyClient ( cc );	// not reached: the loop above never exits
+	return 0;
+}
diff --git a/src/main/cpp/samplePostClient.cpp b/src/main/cpp/samplePostClient.cpp
new file mode 100644
index 0000000..a4b2207
--- /dev/null
+++ b/src/main/cpp/samplePostClient.cpp
@@ -0,0 +1,112 @@
+/*******************************************************************************
+ *  ============LICENSE_START=======================================================
+ *  org.onap.dmaap
+ *  ================================================================================
+ *  Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ *  ================================================================================
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *  ============LICENSE_END=========================================================
+ *
+ *  ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *  
+ *******************************************************************************/
+
+#include <stdio.h>
+#include "cambria.h"
+
+// Prints a send response's status line and body, then releases it; a null response goes to stderr.
+void handleResponse ( const CAMBRIA_CLIENT cc, const cambriaSendResponse* response )
+{
+	if ( !response )
+	{
+		::fprintf ( stderr, "No response object.\n" );
+		return;
+	}
+
+	const char* status = response->statusMessage ? response->statusMessage : "";
+	const char* body = response->responseBody ? response->responseBody : "";
+	::printf ( "\t%d %s\n", response->statusCode, status );
+	::printf ( "\t%s\n", body );
+	::cambriaDestroySendResponse ( cc, response );	// destroy the response (or it'll leak)
+}
+
+int main ( int argc, const char* argv[] )
+{
+	////////////////////////////////////////////////////////////////////////////
+	// Part 1: the one-shot "simple" calls -- no client object to manage
+	////////////////////////////////////////////////////////////////////////////
+	
+	// you can send a single message in one call...
+	::printf ( "Sending single message...\n" );
+	int sent = ::cambriaSimpleSend ( "localhost", 8080, "topic", "streamName",
+		"{ \"field\":\"this is a JSON formatted alarm\" }" );
+	::printf ( "\t%d sent\n\n", sent );
+
+	// you can also send multiple messages in one call with cambriaSimpleSendMultiple.
+	// the message argument becomes an array of strings, and you pass an array
+	// count too.
+	const char* msgs[] =
+	{
+		"{\"format\":\"json\"}",
+		"<format>xml</format>",
+		"or whatever. they're just strings."
+	};
+	sent = ::cambriaSimpleSendMultiple ( "localhost", 8080, "topic", "streamName", msgs, 3 );
+	::printf ( "\t%d sent\n\n", sent );
+
+	////////////////////////////////////////////////////////////////////////////
+	// Part 2: a client instance that is reused across several requests
+	////////////////////////////////////////////////////////////////////////////
+
+	// you can also create a client instance to keep around and make multiple
+	// send requests to. Chunked sending isn't supported right now, so each
+	// call to cambriaSendMessage results in a full socket open / post / close
+	// cycle, but hopefully we can improve this with chunking so that subsequent
+	// sends just push the message into the socket.
+
+	// create a client
+	const CAMBRIA_CLIENT cc = ::cambriaCreateClient ( "localhost", 8080, "topic", CAMBRIA_NATIVE_FORMAT );
+	if ( !cc )
+	{
+		::printf ( "Couldn't create client.\n" );
+		return 1;
+	}
+
+	////////////////////////////////////////////////////////////////////////////
+	// send a single message
+	::printf ( "Sending single message...\n" );
+	const cambriaSendResponse* response = ::cambriaSendMessage ( cc, "streamName", "{\"foo\":\"bar\"}" );
+	handleResponse ( cc, response );
+
+	////////////////////////////////////////////////////////////////////////////
+	// send a few messages at once
+	const char* msgs2[] =
+	{
+		"{\"foo\":\"bar\"}",
+		"{\"bar\":\"baz\"}",
+		"{\"zoo\":\"zee\"}",
+		"{\"foo\":\"bar\"}",
+		"{\"foo\":\"bar\"}",
+		"{\"foo\":\"bar\"}",
+	};
+	unsigned int count = sizeof(msgs2)/sizeof(const char*);
+
+	::printf ( "Sending %u messages...\n", count );	// %u, since count is unsigned
+	response = ::cambriaSendMessages ( cc, "streamName", msgs2, count );
+	handleResponse ( cc, response );
+
+	////////////////////////////////////////////////////////////////////////////
+	// destroy the client (or it'll leak)
+	::cambriaDestroyClient ( cc );
+
+	return 0;
+}