Add Initial Code Import
Initial Code import for dmaapClient
Issue-id: DMAAP-82
Change-Id: Ib627672d37e233b796619f93dd91f5caaf1592e4
Signed-off-by: Varun Gudisena <vg411h@att.com>
diff --git a/src/main/bash/cambriaDel.sh b/src/main/bash/cambriaDel.sh
new file mode 100644
index 0000000..b5e71b4
--- /dev/null
+++ b/src/main/bash/cambriaDel.sh
@@ -0,0 +1,71 @@
+#!/bin/bash
+#*******************************************************************************
+# ============LICENSE_START=======================================================
+# org.onap.dmaap
+# ================================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+#
+#*******************************************************************************
+
+# format:
+# cambriaDel.sh <apiPath>
+
+# Argument check: one or two arguments are accepted, but only the
+# first (the API path) is used; a second argument is ignored.
+# A usage error is reported on stderr and exits with status 1 so
+# that callers can tell it apart from a completed request.
+if [ $# -lt 1 ] || [ $# -gt 2 ]; then
+  echo "usage: cambriaDel.sh <apiPath>" >&2
+  exit 1
+fi
+
+API=$1
+
+# the date needs to be in one of the formats cambria accepts
+case "$(uname -s)" in
+
+  CYGWIN*|MINGW32*|MSYS*)
+    # these environments send an RFC 2822 style timestamp
+    DATE=$(date --rfc-2822)
+    ;;
+
+  Darwin|Linux)
+    # plain date(1) output,
+    # "EEE MMM dd HH:mm:ss z yyyy"
+    DATE=$(date)
+    ;;
+
+  *)
+    # unrecognized platform:
+    # use plain date(1) output as well
+    DATE=$(date)
+    ;;
+
+esac
+
+
+URI="http://$CAMBRIA_SERVER/$API"
+# No API key: plain request. $AUTHPART is not set in this script and is left unquoted so it can carry extra curl options.
+if [ -z "$CAMBRIA_APIKEY" ]; then
+  echo "no auth"
+  curl -i -X DELETE $AUTHPART "$URI"
+else
+  echo "auth in use"  # signature = base64(HMAC-SHA1(date string, CAMBRIA_APISECRET))
+  SIGNATURE=$(printf '%s' "$DATE" | openssl sha1 -hmac "$CAMBRIA_APISECRET" -binary | openssl base64)
+  curl -i -X DELETE -H "X-CambriaAuth: $CAMBRIA_APIKEY:$SIGNATURE" -H "X-CambriaDate: $DATE" "$URI"
+fi
+
diff --git a/src/main/bash/cambriaGet.sh b/src/main/bash/cambriaGet.sh
new file mode 100644
index 0000000..2e9c789
--- /dev/null
+++ b/src/main/bash/cambriaGet.sh
@@ -0,0 +1,71 @@
+#!/bin/bash
+#*******************************************************************************
+# ============LICENSE_START=======================================================
+# org.onap.dmaap
+# ================================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+#
+#*******************************************************************************
+
+# format:
+# cambriaGet.sh <apiPath>
+
+# Argument check: one or two arguments are accepted, but only the
+# first (the API path) is used; a second argument is ignored.
+# A usage error is reported on stderr and exits with status 1 so
+# that callers can tell it apart from a completed request.
+if [ $# -lt 1 ] || [ $# -gt 2 ]; then
+  echo "usage: cambriaGet.sh <apiPath>" >&2
+  exit 1
+fi
+
+API=$1
+
+# the date needs to be in one of the formats cambria accepts
+case "$(uname -s)" in
+
+  CYGWIN*|MINGW32*|MSYS*)
+    # these environments send an RFC 2822 style timestamp
+    DATE=$(date --rfc-2822)
+    ;;
+
+  Darwin|Linux)
+    # plain date(1) output,
+    # "EEE MMM dd HH:mm:ss z yyyy"
+    DATE=$(date)
+    ;;
+
+  *)
+    # unrecognized platform:
+    # use plain date(1) output as well
+    DATE=$(date)
+    ;;
+
+esac
+
+
+URI="http://$CAMBRIA_SERVER/$API"
+# No API key: plain request. $AUTHPART is not set in this script and is left unquoted so it can carry extra curl options.
+if [ -z "$CAMBRIA_APIKEY" ]; then
+  echo "no auth"
+  curl -i -X GET $AUTHPART "$URI"
+else
+  echo "auth in use"  # signature = base64(HMAC-SHA1(date string, CAMBRIA_APISECRET))
+  SIGNATURE=$(printf '%s' "$DATE" | openssl sha1 -hmac "$CAMBRIA_APISECRET" -binary | openssl base64)
+  curl -i -X GET -H "X-CambriaAuth: $CAMBRIA_APIKEY:$SIGNATURE" -H "X-CambriaDate: $DATE" "$URI"
+fi
+
diff --git a/src/main/bash/cambriaPost.sh b/src/main/bash/cambriaPost.sh
new file mode 100644
index 0000000..7bd2911
--- /dev/null
+++ b/src/main/bash/cambriaPost.sh
@@ -0,0 +1,75 @@
+#!/bin/bash
+#*******************************************************************************
+# ============LICENSE_START=======================================================
+# org.onap.dmaap
+# ================================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+#
+#*******************************************************************************
+
+# format:
+# cambriaPost.sh <apiPath> [<file>]
+
+# Argument check: the API path is required and a file whose
+# contents become the request body is optional. A usage error
+# is reported on stderr and exits with status 1 so that callers
+# can tell it apart from a completed request.
+if [ $# -lt 1 ] || [ $# -gt 2 ]; then
+  echo "usage: cambriaPost.sh <apiPath> [<file>]" >&2
+  exit 1
+fi
+
+API=$1
+# Optional request body, held in an array so that "-d" and "@file" stay
+# two intact words even when the file name contains whitespace.
+FILE=()
+if [ $# -gt 1 ]; then
+  FILE=(-d "@$2")
+fi
+
+# the date needs to be in one of the formats cambria accepts
+case "$(uname -s)" in
+
+  CYGWIN*|MINGW32*|MSYS*)
+    DATE=$(date --rfc-2822)
+    ;;
+
+  Darwin|Linux)
+    # "EEE MMM dd HH:mm:ss z yyyy"
+    DATE=$(date)
+    ;;
+
+  *)
+    DATE=$(date)
+    ;;
+
+esac
+
+URI="http://$CAMBRIA_SERVER/$API"
+
+# No API key: plain request. $AUTHPART is not set in this script and is
+# left unquoted so it can carry extra curl options from the environment.
+if [ -z "$CAMBRIA_APIKEY" ]; then
+  echo "no auth"
+  curl -i -X POST "${FILE[@]}" -H "Content-Type: application/json" $AUTHPART "$URI"
+else
+  echo "auth in use"
+  # signature = base64(HMAC-SHA1(date string, CAMBRIA_APISECRET))
+  SIGNATURE=$(printf '%s' "$DATE" | openssl sha1 -hmac "$CAMBRIA_APISECRET" -binary | openssl base64)
+  curl -i -X POST "${FILE[@]}" -H "Content-Type: application/json" -H "X-CambriaAuth: $CAMBRIA_APIKEY:$SIGNATURE" -H "X-CambriaDate: $DATE" "$URI"
+fi
+
diff --git a/src/main/bash/cambriaPut.sh b/src/main/bash/cambriaPut.sh
new file mode 100644
index 0000000..36ee63f
--- /dev/null
+++ b/src/main/bash/cambriaPut.sh
@@ -0,0 +1,75 @@
+#!/bin/bash
+#*******************************************************************************
+# ============LICENSE_START=======================================================
+# org.onap.dmaap
+# ================================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+#
+#*******************************************************************************
+
+# format:
+# cambriaPut.sh <apiPath> [<file>]
+
+# Argument check: the API path is required and a file whose
+# contents become the request body is optional. A usage error
+# is reported on stderr and exits with status 1 so that callers
+# can tell it apart from a completed request.
+if [ $# -lt 1 ] || [ $# -gt 2 ]; then
+  echo "usage: cambriaPut.sh <apiPath> [<file>]" >&2
+  exit 1
+fi
+
+API=$1
+# Optional request body, held in an array so that "-d" and "@file" stay
+# two intact words even when the file name contains whitespace.
+FILE=()
+if [ $# -gt 1 ]; then
+  FILE=(-d "@$2")
+fi
+
+# the date needs to be in one of the formats cambria accepts
+case "$(uname -s)" in
+
+  CYGWIN*|MINGW32*|MSYS*)
+    DATE=$(date --rfc-2822)
+    ;;
+
+  Darwin|Linux)
+    # "EEE MMM dd HH:mm:ss z yyyy"
+    DATE=$(date)
+    ;;
+
+  *)
+    DATE=$(date)
+    ;;
+
+esac
+
+URI="http://$CAMBRIA_SERVER/$API"
+
+# No API key: plain request. $AUTHPART is not set in this script and is
+# left unquoted so it can carry extra curl options from the environment.
+if [ -z "$CAMBRIA_APIKEY" ]; then
+  echo "no auth"
+  curl -i -X PUT "${FILE[@]}" -H "Content-Type: application/json" $AUTHPART "$URI"
+else
+  echo "auth in use"
+  # signature = base64(HMAC-SHA1(date string, CAMBRIA_APISECRET))
+  SIGNATURE=$(printf '%s' "$DATE" | openssl sha1 -hmac "$CAMBRIA_APISECRET" -binary | openssl base64)
+  curl -i -X PUT "${FILE[@]}" -H "Content-Type: application/json" -H "X-CambriaAuth: $CAMBRIA_APIKEY:$SIGNATURE" -H "X-CambriaDate: $DATE" "$URI"
+fi
+
diff --git a/src/main/cpp/cambria.cpp b/src/main/cpp/cambria.cpp
new file mode 100644
index 0000000..7aff421
--- /dev/null
+++ b/src/main/cpp/cambria.cpp
@@ -0,0 +1,624 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+
+/*
+ * This is a client library for the AT&T Cambria Event Routing Service.
+ */
+
+#include "cambria.h"
+
+#include <arpa/inet.h>
+#include <sys/socket.h>
+#include <netdb.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdarg.h>
+#include <string.h>
+
+#include <string>
+#include <list>
+#include <sstream>
+#include <iomanip>
+#include <algorithm>
+
+// field used in JSON encoding to signal stream name
+const char* kPartition = "cambria.partition";
+
+// map from opaque handle to object pointer
+#define toOpaque(x) ((CAMBRIA_CLIENT)x)
+#define fromOpaque(x) ((cambriaClient*)x)
+
+// trace support
+extern void traceOutput ( const char* format, ... );
+#ifdef CAMBRIA_TRACING
+ #define TRACE traceOutput
+#else
+ #define TRACE 1 ? (void) 0 : traceOutput
+#endif
+
+/*
+ * internal cambria client class
+ */
+class cambriaClient
+{
+public:
+ cambriaClient ( const std::string& host, int port, const std::string& topic, const std::string& format );
+ ~cambriaClient ();
+ // post one message, or a batch of 'count' messages, to the topic
+ cambriaSendResponse* send ( const char* streamName, const char* message );
+ cambriaSendResponse* send ( const char* streamName, const char** message, unsigned int count );
+ // fetch messages from the topic; a negative timeoutMs or limit leaves that query parameter off
+ cambriaGetResponse* get ( int timeoutMs, int limit );
+
+private:
+ // connection target, topic and Content-Type, all fixed at construction
+ std::string fHost;
+ int fPort;
+ std::string fTopic;
+ std::string fFormat;
+ // encodes msgs into buffer per fFormat; returns false if fFormat is not a known format
+ bool buildMessageBody ( const char* streamName, const char** msgs, unsigned int count, std::string& buffer );
+ // writes a NUL-terminated string to the socket and traces it
+ void write ( int socket, const char* line );
+};
+
+cambriaClient::cambriaClient ( const std::string& host, int port, const std::string& topic, const std::string& format ) :
+ fHost ( host ),
+ fPort ( port ),
+ fTopic ( topic ),
+ fFormat ( format ) // compared against the CAMBRIA_*_FORMAT strings and sent as Content-Type
+{
+}
+
+cambriaClient::~cambriaClient ()
+{ // nothing to release by hand: the members are std::string and int only
+}
+
+/*
+ * Returns a new[] copy of message (free with delete[]) with, when streamName is
+ * non-null, "cambria.partition":"<streamName>", inserted after the opening '{'.
+ * This isn't quite right -- a message that already has cambria.partition gets
+ * two entries, streamName is not escaped, and message should start with '{'
+ * and have at least one field. A null or empty message contributes nothing.
+ */
+static char* makeJsonMessage ( const char* streamName, const char* message )
+{
+    // text after the opening brace; never step past the end of an empty string
+    const char* rest = ( message && *message ) ? message + 1 : "";
+
+    size_t len = 1 + ::strlen ( rest ); // our own '{' plus the remainder
+    if ( streamName ) len += ::strlen ( kPartition ) + ::strlen ( streamName ) + 6; // 4 quotes, ':' and ','
+
+    char* msg = new char [ len + 1 ];
+    ::strcpy ( msg, "{" );
+    if ( streamName )
+    {
+        ::strcat ( msg, "\"" );
+        ::strcat ( msg, kPartition );
+        ::strcat ( msg, "\":\"" );
+        ::strcat ( msg, streamName );
+        ::strcat ( msg, "\"," );
+    }
+    ::strcat ( msg, rest );
+    return msg;
+}
+
+cambriaSendResponse* cambriaClient::send ( const char* streamName, const char* message )
+{
+ return send ( streamName, &message, 1 ); // a single message is sent as a batch of one
+}
+
+static bool replace ( std::string& str, const std::string& from, const std::string& to )
+{
+    // substitute only the first occurrence of 'from'; report whether one was found
+    const size_t hit = str.find ( from );
+    const bool found = ( hit != std::string::npos );
+    if ( found ) str.replace ( hit, from.length(), to );
+    return found;
+}
+
+// Reads from s until end-of-stream or a read error, appending every byte received to response.
+static void readResponse ( int s, std::string& response )
+{
+    char buffer [ 4096 ];
+
+    ssize_t n = 0;
+    while ( ( n = ::read ( s, buffer, sizeof ( buffer ) ) ) > 0 )
+    {
+        response.append ( buffer, n ); // length-based, so embedded NUL bytes are kept
+    }
+}
+
+// Resolves host (IPv4 only) and connects to host:port; returns 0 and sets ss, else a CAMBRIA_* code with ss untouched.
+static int openSocket ( std::string& host, int port, int& ss )
+{
+    TRACE( "connecting to %s\n", host.c_str() );
+    struct hostent *he = ::gethostbyname ( host.c_str() );
+    if ( !he )
+    {
+        TRACE("no host entry\n");
+        return CAMBRIA_NO_HOST;
+    }
+
+    if ( he->h_addrtype != AF_INET )
+    {
+        TRACE("not AF_INET\n");
+        return CAMBRIA_NO_HOST;
+    }
+
+    int s = ::socket ( AF_INET, SOCK_STREAM, 0 );
+    if ( s == -1 )
+    {
+        TRACE("no socket available\n");
+        return CAMBRIA_CANT_CONNECT;
+    }
+
+    struct sockaddr_in servaddr;
+    ::memset ( &servaddr, 0, sizeof(servaddr) );
+    ::memcpy ( &servaddr.sin_addr, he->h_addr_list[0], he->h_length );
+    servaddr.sin_family = AF_INET;
+    servaddr.sin_port = ::htons ( port );
+
+    if ( ::connect ( s, (struct sockaddr *)&servaddr, sizeof(servaddr) ) )
+    {
+        TRACE("couldn't connect\n");
+        ::close ( s ); // don't leak the descriptor when the connection attempt fails
+        return CAMBRIA_CANT_CONNECT;
+    }
+
+    ss = s;
+    return 0;
+}
+
+// Posts 'count' messages to the topic in one HTTP/1.0 request. Always returns a response
+// (release it with cambriaDestroySendResponse); statusCode is the HTTP status, or one of
+// the CAMBRIA_* pseudo-status codes when the request could not be built, sent or parsed.
+cambriaSendResponse* cambriaClient::send ( const char* streamName, const char** msgs, unsigned int count )
+{
+    TRACE ( "Sending %u messages.", count );
+    cambriaSendResponse* result = new cambriaSendResponse ();
+    result->statusCode = 0;
+    result->statusMessage = NULL;
+    result->responseBody = NULL;
+
+    TRACE( "building message body\n" );
+    std::string body;
+    if ( !buildMessageBody ( streamName, msgs, count, body ) )
+    {
+        result->statusCode = CAMBRIA_UNRECOGNIZED_FORMAT;
+        return result;
+    }
+
+    int s = -1;
+    int err = ::openSocket ( fHost, fPort, s );
+    if ( err > 0 )
+    {
+        result->statusCode = err;
+        return result;
+    }
+
+    // construct path
+    std::string path = "/cambriaApiServer/v1/event/";
+    path += fTopic;
+
+    // send post prefix; assembled in a stream so long host/topic names cannot
+    // overflow a fixed buffer and the body length is formatted with its real type
+    std::ostringstream header;
+    header
+        << "POST " << path << " HTTP/1.0\r\n"
+        << "Host: " << fHost << "\r\n"
+        << "Content-Type: " << fFormat << "\r\n"
+        << "Content-Length: " << body.length() << "\r\n"
+        << "\r\n";
+    write ( s, header.str().c_str() );
+
+    // send the body
+    write ( s, body.c_str() );
+
+    TRACE ( "\n" );
+    TRACE ( "send complete, reading reply\n" );
+
+    // receive the response
+    std::string response;
+    readResponse ( s, response );
+    ::close ( s );
+
+    // parse the header and body: split header and body on first occurrence of \r\n\r\n
+    result->statusCode = CAMBRIA_BAD_RESPONSE;
+
+    size_t headerBreak = response.find ( "\r\n\r\n" );
+    if ( headerBreak != std::string::npos )
+    {
+        std::string responseBody = response.substr ( headerBreak + 4 );
+        result->responseBody = new char [ responseBody.length() + 1 ];
+        ::strcpy ( result->responseBody, responseBody.c_str() );
+
+        // all we need from the header for now is the status line
+        std::string headerPart = response.substr ( 0, headerBreak + 2 );
+        size_t newline = headerPart.find ( '\r' );
+        if ( newline != std::string::npos )
+        {
+            std::string statusLine = headerPart.substr ( 0, newline );
+            size_t firstSpace = statusLine.find ( ' ' );
+            if ( firstSpace != std::string::npos )
+            {
+                size_t secondSpace = statusLine.find ( ' ', firstSpace + 1 );
+                if ( secondSpace != std::string::npos )
+                {
+                    // the status code is the text strictly between the two spaces
+                    result->statusCode = ::atoi ( statusLine.substr ( firstSpace + 1, secondSpace - firstSpace - 1 ).c_str() );
+                    std::string statusMessage = statusLine.substr ( secondSpace + 1 );
+                    result->statusMessage = new char [ statusMessage.length() + 1 ];
+                    ::strcpy ( result->statusMessage, statusMessage.c_str() );
+                }
+            }
+        }
+    }
+    return result;
+}
+
+void cambriaClient::write ( int socket, const char* str )
+{
+    // a null string is sent, and traced, as an empty one
+    if ( !str ) str = "";
+    ::write ( socket, str, ::strlen ( str ) );
+
+    // echo what was sent: each CRLF is shown as a literal \r\n and starts a new "> " line
+    std::string trace ( "> " );
+    trace += str;
+    while ( replace ( trace, "\r\n", "\\r\\n\n> " ) );
+    TRACE ( "%s", trace.c_str() );
+}
+
+// Encodes msgs[0..count) into buffer according to fFormat and returns true, or returns
+// false (buffer untouched) when fFormat is neither of the two known formats. A null
+// streamName is written as an empty name in the native format (assumed acceptable to the broker -- TODO confirm).
+bool cambriaClient::buildMessageBody ( const char* streamName, const char** msgs, unsigned int count, std::string& buffer )
+{
+    if ( fFormat == CAMBRIA_NATIVE_FORMAT )
+    {
+        // each record is <streamNameLength>.<messageLength>.<streamName><message>
+        const char* stream = streamName ? streamName : "";
+        for ( unsigned int i=0; i<count; i++ )
+        {
+            const char* msg = msgs[i];
+            std::ostringstream s;
+            s << ::strlen ( stream ) << '.' << ::strlen(msg) << '.' << stream << msg;
+            buffer.append ( s.str() );
+        }
+    }
+    else if ( fFormat == CAMBRIA_JSON_FORMAT )
+    {
+        buffer.append ( "[" );
+        for ( unsigned int i=0; i<count; i++ )
+        {
+            if ( i>0 ) buffer.append ( "," );
+            const char* msg = msgs[i];
+            char* jsonMsg = ::makeJsonMessage ( streamName, msg );
+            buffer.append ( jsonMsg );
+            delete[] jsonMsg; // allocated with new[]; FIXME: allocating memory here just to delete it
+        }
+        buffer.append ( "]" );
+    }
+    else
+    {
+        return false;
+    }
+    return true;
+}
+
+// Reads one JSON string at startPos (after an optional leading comma) into value; returns the index just past its closing quote, or 0 if no string starts there.
+static int readNextJsonString ( const std::string& body, int startPos, std::string& value )
+{
+ value = "";
+ // nothing left to read
+ if ( startPos >= body.length () )
+ {
+ return 0;
+ }
+
+ // skip a comma (the separator from the previous array element, if any)
+ int current = startPos;
+ if ( body[current] == ',' ) current++;
+ // the string itself must open with a double quote
+ if ( current >= body.length() || body[current] != '"' )
+ {
+ return 0;
+ }
+ current++;
+ // esc: the previous character was a backslash; hex: characters of a \u escape still to be skipped
+ // walk the string for the closing quote (FIXME: unicode support)
+ bool esc = false;
+ int hex = 0;
+ while ( ( body[current] != '"' || esc ) && current < body.length() )
+ {
+ if ( hex > 0 )
+ {
+ hex--;
+ if ( hex == 0 )
+ {
+ // presumably read a unicode escape. this code isn't
+ // equipped for multibyte or unicode, so just skip it
+ value += '?';
+ }
+ }
+ else if ( esc )
+ {
+ esc = false;
+ switch ( body[current] )
+ {
+ case '"':
+ case '\\':
+ case '/':
+ value += body[current];
+ break;
+ // single-character escapes become the control character they name
+ case 'b': value += '\b'; break;
+ case 'f': value += '\f'; break;
+ case 'n': value += '\n'; break;
+ case 'r': value += '\r'; break;
+ case 't': value += '\t'; break;
+ // \u: the next four characters are consumed and replaced by one '?'; any other escaped character is dropped
+ case 'u': hex=4; break;
+ }
+ }
+ else
+ {
+ esc = body[current] == '\\';
+ if ( !esc ) value += body[current];
+ }
+ current++;
+ }
+ // NOTE(review): an unterminated string also ends up here, returning an index past the end of body
+ return current + 1;
+}
+
+// Parses body, expected to be a JSON array of strings, into response.messageSet / messageCount.
+// A body not bracketed by [ and ] sets CAMBRIA_BAD_RESPONSE and leaves the message fields alone.
+static void readGetBody ( std::string& body, cambriaGetResponse& response )
+{
+    TRACE("response %s\n", body.c_str() );
+    if ( body.length() < 2 || body[0] != '[' || body[body.length()-1] != ']' )
+    {
+        response.statusCode = CAMBRIA_BAD_RESPONSE;
+        return; // don't scan something that is not an array
+    }
+
+    std::list<char*> msgs;
+    std::string val;
+    int current = 1;
+    while ( ( current = readNextJsonString ( body, current, val ) ) > 0 )
+    {
+        char* msg = new char [ val.length() + 1 ];
+        ::strcpy ( msg, val.c_str() );
+        msgs.push_back ( msg );
+    }
+
+    // swap in the new table; assumes the caller initialised messageSet with new[] (get() does)
+    delete[] response.messageSet;
+    response.messageCount = msgs.size();
+    response.messageSet = new char* [ msgs.size() ];
+    int index = 0;
+    for ( std::list<char*>::iterator it = msgs.begin(); it != msgs.end(); it++ ) response.messageSet [ index++ ] = *it;
+}
+
+// Fetches messages from the topic with one HTTP/1.0 GET; timeoutMs and limit are sent as query
+// parameters unless negative. Always returns a response (release it with cambriaDestroyGetResponse);
+// statusCode is the HTTP status, or a CAMBRIA_* pseudo-status when connecting or parsing failed.
+cambriaGetResponse* cambriaClient::get ( int timeoutMs, int limit )
+{
+    cambriaGetResponse* result = new cambriaGetResponse ();
+    result->statusCode = 0;
+    result->statusMessage = NULL;
+    result->messageCount = 0;
+    result->messageSet = new char* [ 1 ];
+    int s = -1;
+    int err = ::openSocket ( fHost, fPort, s );
+    if ( err > 0 )
+    {
+        result->statusCode = err;
+        return result;
+    }
+
+    // construct path
+    std::string path = "/cambriaApiServer/v1/event/";
+    path += fTopic;
+
+    bool haveAdds = false;
+    std::ostringstream adds;
+    if ( timeoutMs > -1 )
+    {
+        adds << "timeout=" << timeoutMs;
+        haveAdds = true;
+    }
+    if ( limit > -1 )
+    {
+        if ( haveAdds )
+        {
+            adds << "&";
+        }
+        adds << "limit=" << limit;
+        haveAdds = true;
+    }
+    if ( haveAdds )
+    {
+        path += "?";
+        path += adds.str();
+    }
+
+    // send the request line and headers; built in a stream so a long host or
+    // topic cannot overflow a fixed-size buffer
+    std::ostringstream request;
+    request
+        << "GET " << path << " HTTP/1.0\r\n"
+        << "Host: " << fHost << "\r\n"
+        << "\r\n";
+    write ( s, request.str().c_str() );
+
+    TRACE ( "\n" );
+    TRACE ( "request sent; reading reply\n" );
+
+    // receive the response (FIXME: would be nice to stream rather than load it all)
+    std::string response;
+    readResponse ( s, response );
+    ::close ( s );
+
+    // parse the header and body: split header and body on first occurrence of \r\n\r\n
+    result->statusCode = CAMBRIA_BAD_RESPONSE;
+
+    size_t headerBreak = response.find ( "\r\n\r\n" );
+    if ( headerBreak != std::string::npos )
+    {
+        // get the header line
+        std::string headerPart = response.substr ( 0, headerBreak + 2 );
+        size_t newline = headerPart.find ( '\r' );
+        if ( newline != std::string::npos )
+        {
+            std::string statusLine = headerPart.substr ( 0, newline );
+            size_t firstSpace = statusLine.find ( ' ' );
+            if ( firstSpace != std::string::npos )
+            {
+                size_t secondSpace = statusLine.find ( ' ', firstSpace + 1 );
+                if ( secondSpace != std::string::npos )
+                {
+                    // the status code is the text strictly between the two spaces
+                    result->statusCode = ::atoi ( statusLine.substr ( firstSpace + 1, secondSpace - firstSpace - 1 ).c_str() );
+                    std::string statusMessage = statusLine.substr ( secondSpace + 1 );
+                    result->statusMessage = new char [ statusMessage.length() + 1 ];
+                    ::strcpy ( result->statusMessage, statusMessage.c_str() );
+                }
+            }
+        }
+        if ( result->statusCode < 300 )
+        {
+            std::string responseBody = response.substr ( headerBreak + 4 );
+            readGetBody ( responseBody, *result );
+        }
+    }
+    return result;
+}
+
+
+///////////////////////////////////////////////////////////////////////////////
+///////////////////////////////////////////////////////////////////////////////
+///////////////////////////////////////////////////////////////////////////////
+
+CAMBRIA_CLIENT cambriaCreateClient ( const char* host, int port, const char* topic, const char* format )
+{
+    if ( !host || !topic || !format ) return NULL; // std::string cannot be built from a null pointer
+    return toOpaque ( new cambriaClient ( host, port, topic, format ) );
+}
+
+void cambriaDestroyClient ( CAMBRIA_CLIENT client )
+{
+ delete fromOpaque ( client ); // deleting a null handle is a no-op
+}
+
+cambriaSendResponse* cambriaSendMessage ( CAMBRIA_CLIENT client, const char* streamName, const char* message )
+{
+    // a null handle yields a null response rather than a crash
+    return client ? fromOpaque ( client )->send ( streamName, message ) : NULL;
+}
+
+cambriaSendResponse* cambriaSendMessages ( CAMBRIA_CLIENT client, const char* streamName, const char** messages, unsigned int count )
+{
+    // a null handle yields a null response rather than a crash
+    return client ? fromOpaque ( client )->send ( streamName, messages, count ) : NULL;
+}
+
+cambriaGetResponse* cambriaGetMessages ( CAMBRIA_CLIENT client, unsigned long timeoutMs, unsigned int limit )
+{
+    // a null handle yields a null response rather than a crash
+    return client ? fromOpaque ( client )->get ( timeoutMs, limit ) : NULL;
+}
+
+void cambriaDestroySendResponse ( CAMBRIA_CLIENT client, const cambriaSendResponse* response )
+{
+    // 'client' is not used here; a null response is ignored
+    if ( !response ) return;
+    // both strings come from new char[] (or are null), so they need delete[]
+    delete[] response->statusMessage;
+    delete[] response->responseBody;
+    delete response;
+}
+
+void cambriaDestroyGetResponse ( CAMBRIA_CLIENT client, const cambriaGetResponse* response )
+{
+    if ( !response ) return;
+    // every buffer below was allocated with new[], so release it with delete[]
+    delete[] response->statusMessage;
+    for ( int i=0; i<response->messageCount; i++ )
+    {
+        delete[] response->messageSet[i];
+    }
+    delete[] response->messageSet; // the pointer table itself
+    delete response;
+}
+
+int cambriaSimpleSend ( const char* host, int port, const char* topic, const char* streamName, const char* msg )
+{
+ return cambriaSimpleSendMultiple ( host, port, topic, streamName, &msg, 1 ); // a batch of one message
+}
+
+int cambriaSimpleSendMultiple ( const char* host, int port, const char* topic, const char* streamName, const char** messages, unsigned int msgCount )
+{
+    // one-shot helper: create a native-format client, post the batch, tear everything down
+    const CAMBRIA_CLIENT handle = ::cambriaCreateClient ( host, port, topic, CAMBRIA_NATIVE_FORMAT );
+    if ( !handle )
+    {
+        return 0;
+    }
+
+    const cambriaSendResponse* reply = ::cambriaSendMessages ( handle, streamName, messages, msgCount );
+    // all-or-nothing: any status below 300 counts every message as sent
+    const bool accepted = reply && reply->statusCode < 300;
+    const int sent = accepted ? msgCount : 0;
+
+    ::cambriaDestroySendResponse ( handle, reply );
+    ::cambriaDestroyClient ( handle );
+    return sent;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////////////////
+////////////////////////////////////////////////////////////////////////////////
+
+const unsigned int kMaxTraceBuffer = 2048;
+
+static void writeTraceString ( const char* msg )
+{
+ ::fprintf ( stdout, "%s", msg );
+ ::fflush ( stdout ); // flush at once so the trace is already out if the process dumps core next
+}
+
+// printf-style trace sink; output longer than kMaxTraceBuffer - 1 characters is truncated
+void traceOutput ( const char* format, ... )
+{
+    char buffer [ kMaxTraceBuffer ] = { '\0' };
+
+    va_list list;
+    va_start ( list, format );
+    ::vsnprintf ( buffer, sizeof ( buffer ), format, list ); // bounded, unlike vsprintf
+    va_end ( list );
+    writeTraceString ( buffer );
+}
diff --git a/src/main/cpp/cambria.h b/src/main/cpp/cambria.h
new file mode 100644
index 0000000..68e9ca9
--- /dev/null
+++ b/src/main/cpp/cambria.h
@@ -0,0 +1,114 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+
+#ifndef CAMBRIA_H_ /* include guard; no leading underscore, which is reserved for the implementation */
+#define CAMBRIA_H_
+
+/*
+ * This is a client library for the AT&T Cambria Event Routing Service.
+ *
+ * Cambria clients post string messages to the broker on a topic, optionally
+ * with a partition name.
+ */
+
+/* An opaque type for the client instance. */
+typedef void* CAMBRIA_CLIENT;
+
+/* Cambria has two formats. CAMBRIA_NATIVE_FORMAT is preferred. */
+#define CAMBRIA_NATIVE_FORMAT "application/cambria"
+#define CAMBRIA_JSON_FORMAT "application/json"
+
+/* pseudo-HTTP client-side status codes */
+#define CAMBRIA_NO_HOST 470
+#define CAMBRIA_CANT_CONNECT 471
+#define CAMBRIA_UNRECOGNIZED_FORMAT 472
+#define CAMBRIA_BAD_RESPONSE 570
+
+/*
+ * Send response structure. Be sure to call cambriaDestroySendResponse() after receiving this.
+ */
+struct cambriaSendResponse
+{
+ int statusCode;
+ char* statusMessage;
+ char* responseBody;
+};
+
+/*
+ * Get response structure. Be sure to call cambriaDestroyGetResponse() after receiving this.
+ */
+struct cambriaGetResponse
+{
+ int statusCode;
+ char* statusMessage;
+
+ int messageCount;
+ char** messageSet;
+};
+
+/*
+ * Send a message in a single call. Returns the number sent (1 or 0).
+ */
+extern "C" int cambriaSimpleSend ( const char* host, int port, const char* topic, const char* streamName, const char* msg );
+
+/*
+ * Send multiple messages in a single call. Returns the number sent.
+ */
+extern "C" int cambriaSimpleSendMultiple ( const char* host, int port, const char* topic, const char* streamName, const char** messages, unsigned int msgCount );
+
+/*
+ * Create a client instance to post messages to the given host:port, topic, and
+ * either the CAMBRIA_NATIVE_FORMAT or CAMBRIA_JSON_FORMAT.
+ */
+extern "C" CAMBRIA_CLIENT cambriaCreateClient ( const char* host, int port, const char* topic, const char* format );
+
+/*
+ * Cleanup a client instance.
+ */
+extern "C" void cambriaDestroyClient ( CAMBRIA_CLIENT client );
+
+/*
+ * Send a single message to the broker using the stream name provided. (If null, no stream name is used.)
+ */
+extern "C" cambriaSendResponse* cambriaSendMessage ( CAMBRIA_CLIENT client, const char* streamName, const char* message );
+
+/*
+ * Send a batch of messages to the broker using the stream name provided. (If null, no stream name is used.)
+ */
+extern "C" cambriaSendResponse* cambriaSendMessages ( CAMBRIA_CLIENT client, const char* streamName, const char** messages, unsigned int count );
+
+/*
+ * Retrieve messages from the broker. If a timeout value is 0 (or lower), the broker returns a response
+ * immediately. Otherwise, the server holds the connection open up to the given timeout. Likewise, if limit
+ * is 0 (or lower), the server sends as many messages as it cares to. Otherwise, at most 'limit' messages are
+ * returned.
+ */
+extern "C" cambriaGetResponse* cambriaGetMessages ( CAMBRIA_CLIENT client, unsigned long timeoutMs, unsigned int limit );
+
+/*
+ * After processing a response, pass it back to the library for cleanup.
+ */
+extern "C" void cambriaDestroySendResponse ( CAMBRIA_CLIENT client, const cambriaSendResponse* response );
+
+extern "C" void cambriaDestroyGetResponse ( CAMBRIA_CLIENT client, const cambriaGetResponse* response );
+
+#endif
diff --git a/src/main/cpp/loopingPostClient.cpp b/src/main/cpp/loopingPostClient.cpp
new file mode 100644
index 0000000..1396eea
--- /dev/null
+++ b/src/main/cpp/loopingPostClient.cpp
@@ -0,0 +1,92 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+
+#include <stdio.h>
+#include <ctime>
+#include <string.h>
+#include "cambria.h"
+
+const char* kAlarm = // sample XML event; main() copies it into every message it posts
+ "<EVENT>"
+ "<AGENT_ADDR>12.123.70.213</AGENT_ADDR>"
+ "<AGENT_RESOLVED>ptdor306me1.els-an.att.net</AGENT_RESOLVED>"
+ "<TIME_RECEIVED>1364716208</TIME_RECEIVED>"
+ " <PROTOCOL_VERSION>V1</PROTOCOL_VERSION>"
+ " <ENTERPRISE_LEN>9</ENTERPRISE_LEN>"
+ " <ENTERPRISE>.1.3.6.1.4.1.9.9.187</ENTERPRISE>"
+ " <GENERIC>6</GENERIC>"
+ " <SPECIFIC>2</SPECIFIC>"
+ " <COMMAND>167</COMMAND>"
+ " <REQUEST_ID>0</REQUEST_ID>"
+ " <ERROR_STATUS>0</ERROR_STATUS>"
+ " <ERROR_INDEX>0</ERROR_INDEX>"
+ " <AGENT_TIME_UP>1554393204</AGENT_TIME_UP>"
+ " <COMMUNITY_LEN>10</COMMUNITY_LEN>"
+ " <COMMUNITY>nidVeskaf0</COMMUNITY>"
+ " <VARBIND>"
+ " <VARBIND_OID>.1.3.6.1.2.1.15.3.1.14.32.4.52.58</VARBIND_OID>"
+ " <VARBIND_TYPE>OCTET_STRING_HEX</VARBIND_TYPE>"
+ " <VARBIND_VALUE>02 02 </VARBIND_VALUE>"
+ " </VARBIND>"
+ " <VARBIND>"
+ " <VARBIND_OID>.1.3.6.1.2.1.15.3.1.2.32.4.52.58</VARBIND_OID>"
+ " <VARBIND_TYPE>INTEGER</VARBIND_TYPE>"
+ " <VARBIND_VALUE>1</VARBIND_VALUE>"
+ " </VARBIND>"
+ " <VARBIND>"
+ " <VARBIND_OID>.1.3.6.1.4.1.9.9.187.1.2.1.1.7.32.4.52.58</VARBIND_OID>"
+ " <VARBIND_TYPE>OCTET_STRING_ASCII</VARBIND_TYPE>"
+ " <VARBIND_VALUE>peer in wrong AS</VARBIND_VALUE>"
+ " </VARBIND>"
+ " <VARBIND>"
+ " <VARBIND_OID>.1.3.6.1.4.1.9.9.187.1.2.1.1.8.32.4.52.58</VARBIND_OID>"
+ " <VARBIND_TYPE>INTEGER</VARBIND_TYPE>"
+ " <VARBIND_VALUE>4</VARBIND_VALUE>"
+ " </VARBIND>"
+ "</EVENT>";
+
+int main ( int argc, const char* argv[] )
+{
+    // Build one batch of identical messages; each buffer is strlen+1 bytes so strcpy's NUL terminator fits.
+    const int kBatchSize = 100;
+    const int kBatchCount = 5000;
+    char** msgs = new char* [ kBatchSize ];
+    for ( int i=0; i<kBatchSize; i++ ) {
+        msgs[i] = new char [ ::strlen ( kAlarm ) + 1 ];
+        ::strcpy ( msgs[i], kAlarm );
+    }
+
+    // Post the same batch repeatedly, reporting the elapsed time every 50 batches.
+    std::time_t start = std::time ( NULL );
+    for ( int i=0; i<kBatchCount; i++ ) {
+        ::cambriaSimpleSendMultiple ( "localhost", 8080, "topic", "streamName", (const char**)msgs, kBatchSize );
+        if ( i % 50 == 0 ) {
+            ::printf ( "%.f seconds for %d posts.\n", difftime ( std::time ( NULL ), start ), i*kBatchSize );
+        }
+    }
+    ::printf ( "%.f seconds for %d posts.\n", difftime ( std::time ( NULL ), start ), kBatchCount*kBatchSize );
+
+    // Release the message buffers.
+    for ( int i=0; i<kBatchSize; i++ ) delete [] msgs[i];
+    delete [] msgs;
+    return 0;
+}
diff --git a/src/main/cpp/make.sh b/src/main/cpp/make.sh
new file mode 100644
index 0000000..a9b2726
--- /dev/null
+++ b/src/main/cpp/make.sh
@@ -0,0 +1,34 @@
+#*******************************************************************************
+# ============LICENSE_START=======================================================
+# org.onap.dmaap
+# ================================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+#
+#*******************************************************************************
+
+rm -rf *.o
+rm -rf cambriaSamplePost
+rm -rf cambriaSampleFetch
+rm -rf loopPost
+
+#-DCAMBRIA_TRACING
+
+g++ cambria.cpp samplePostClient.cpp -o cambriaSamplePost
+g++ cambria.cpp sampleGetClient.cpp -o cambriaSampleFetch
+
+g++ cambria.cpp loopingPostClient.cpp -o loopPost
+
diff --git a/src/main/cpp/sampleGetClient.cpp b/src/main/cpp/sampleGetClient.cpp
new file mode 100644
index 0000000..610988c
--- /dev/null
+++ b/src/main/cpp/sampleGetClient.cpp
@@ -0,0 +1,61 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+
+#include <stdio.h>
+#include "cambria.h"
+
+int main ( int argc, const char* argv[] )
+{
+    // Sample consumer: polls "topic" on localhost:8080 in an endless loop and prints each message.
+    const CAMBRIA_CLIENT cc = ::cambriaCreateClient ( "localhost", 8080, "topic", CAMBRIA_NATIVE_FORMAT );
+    if ( !cc )
+    {
+        ::printf ( "Couldn't create client.\n" );
+        return 1;
+    }
+
+    int count = 0;
+    while ( 1 )
+    {
+        // timeoutMs = 5000, limit = 1024*1024 messages per call
+        cambriaGetResponse* response = ::cambriaGetMessages ( cc, 5000, 1024*1024 );
+        if ( !response )
+        {
+            ::fprintf ( stderr, "No response object.\n" );
+            continue;
+        }
+        if ( response->statusCode < 300 )
+        {
+            for ( int i=0; i<response->messageCount; i++ )
+                ::printf ( "%d: %s\n", count++, response->messageSet [ i ] );
+        }
+        else
+        {
+            ::fprintf ( stderr, "%d %s\n", response->statusCode, ( response->statusMessage ? response->statusMessage : "" ) );
+        }
+        // every non-null response, success or failure, goes back to the library (or it'll leak)
+        ::cambriaDestroyGetResponse ( cc, response );
+    }
+
+    ::cambriaDestroyClient ( cc );  // not reached: the loop above never exits
+    return 0;
+}
diff --git a/src/main/cpp/samplePostClient.cpp b/src/main/cpp/samplePostClient.cpp
new file mode 100644
index 0000000..a4b2207
--- /dev/null
+++ b/src/main/cpp/samplePostClient.cpp
@@ -0,0 +1,112 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+
+#include <stdio.h>
+#include "cambria.h"
+
+void handleResponse ( const CAMBRIA_CLIENT cc, const cambriaSendResponse* response )
+{
+    // Print a send response's status line and body, then release it.
+    if ( !response )
+    {
+        ::fprintf ( stderr, "No response object.\n" );
+        return;
+    }
+
+    const char* statusText = response->statusMessage ? response->statusMessage : "";
+    const char* bodyText = response->responseBody ? response->responseBody : "";
+    ::printf ( "\t%d %s\n", response->statusCode, statusText );
+    ::printf ( "\t%s\n", bodyText );
+    ::cambriaDestroySendResponse ( cc, response );  // hand it back to the library (or it'll leak)
+}
+
+int main ( int argc, const char* argv[] )
+{
+    ////////////////////////////////////////////////////////////////////////////
+    // Sample publisher: shows the one-shot send helpers, then a reusable client.
+    ////////////////////////////////////////////////////////////////////////////
+
+    // you can send single message in one call...
+    ::printf ( "Sending single message...\n" );
+    int sent = ::cambriaSimpleSend ( "localhost", 8080, "topic", "streamName",
+        "{ \"field\":\"this is a JSON formatted alarm\" }" );
+    ::printf ( "\t%d sent\n\n", sent );
+
+    // you can also send multiple messages in one call with cambriaSimpleSendMultiple.
+    // the message argument becomes an array of strings, and you pass an array
+    // count too (computed from the array here so the two cannot drift apart).
+    const char* msgs[] =
+    {
+        "{\"format\":\"json\"}",
+        "<format>xml</format>",
+        "or whatever. they're just strings."
+    };
+    sent = ::cambriaSimpleSendMultiple ( "localhost", 8080, "topic", "streamName", msgs, sizeof(msgs)/sizeof(msgs[0]) );
+    ::printf ( "\t%d sent\n\n", sent );
+
+    ////////////////////////////////////////////////////////////////////////////
+    ////////////////////////////////////////////////////////////////////////////
+    ////////////////////////////////////////////////////////////////////////////
+
+    // you can also create a client instance to keep around and make multiple
+    // send requests to. Chunked sending isn't supported right now, so each
+    // call to cambriaSendMessage results in a full socket open / post / close
+    // cycle, but hopefully we can improve this with chunking so that subsequent
+    // sends just push the message into the socket.
+
+    // create a client
+    const CAMBRIA_CLIENT cc = ::cambriaCreateClient ( "localhost", 8080, "topic", CAMBRIA_NATIVE_FORMAT );
+    if ( !cc )
+    {
+        ::printf ( "Couldn't create client.\n" );
+        return 1;
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    // send a single message
+    ::printf ( "Sending single message...\n" );
+    const cambriaSendResponse* response = ::cambriaSendMessage ( cc, "streamName", "{\"foo\":\"bar\"}" );
+    handleResponse ( cc, response );
+
+    ////////////////////////////////////////////////////////////////////////////
+    // send a few messages at once
+    const char* msgs2[] =
+    {
+        "{\"foo\":\"bar\"}",
+        "{\"bar\":\"baz\"}",
+        "{\"zoo\":\"zee\"}",
+        "{\"foo\":\"bar\"}",
+        "{\"foo\":\"bar\"}",
+        "{\"foo\":\"bar\"}",
+    };
+    unsigned int count = sizeof(msgs2)/sizeof(const char*);
+
+    ::printf ( "Sending %u messages...\n", count );  // %u: count is unsigned
+    response = ::cambriaSendMessages ( cc, "streamName", msgs2, count );
+    handleResponse ( cc, response );
+
+    ////////////////////////////////////////////////////////////////////////////
+    // destroy the client (or it'll leak)
+    ::cambriaDestroyClient ( cc );
+
+    return 0;
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/HostSelector.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/HostSelector.java
new file mode 100644
index 0000000..b30c2f1
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/HostSelector.java
@@ -0,0 +1,191 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.Vector;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class HostSelector
+{
+ private final TreeSet<String> fBaseHosts;
+ private final DelayQueue<BlacklistEntry> fBlacklist;
+ private String fIdealHost;
+ private String fCurrentHost;
+ private static final Logger log = LoggerFactory.getLogger(HostSelector.class);
+
+ public HostSelector(String hostPart)
+ {
+ this(makeSet(hostPart), null);
+ }
+
+ public HostSelector(Collection<String> baseHosts)
+ {
+ this(baseHosts, null);
+ }
+
+ public HostSelector(Collection<String> baseHosts, String signature)
+ {
+ if (baseHosts.size() < 1)
+ {
+ throw new IllegalArgumentException("At least one host must be provided.");
+ }
+
+ this.fBaseHosts = new TreeSet(baseHosts);
+ this.fBlacklist = new DelayQueue();
+ this.fIdealHost = null;
+
+ if (signature == null) {
+ return;
+ }
+ int index = Math.abs(signature.hashCode()) % baseHosts.size();
+
+ Iterator it = this.fBaseHosts.iterator();
+ while (index-- > 0)
+ {
+ it.next();
+ }
+ this.fIdealHost = ((String)it.next());
+ }
+
+ public String selectBaseHost()
+ {
+ if (this.fCurrentHost == null)
+ {
+ makeSelection();
+ }
+ return this.fCurrentHost;
+ }
+
+ public void reportReachabilityProblem(long blacklistUnit, TimeUnit blacklistTimeUnit)
+ {
+ if (this.fCurrentHost == null)
+ {
+ log.warn("Reporting reachability problem, but no host is currently selected.");
+ }
+
+ if (blacklistUnit > 0L)
+ {
+ for (BlacklistEntry be : this.fBlacklist)
+ {
+ if (be.getHost().equals(this.fCurrentHost))
+ {
+ be.expireNow();
+ }
+ }
+
+ LinkedList devNull = new LinkedList();
+ this.fBlacklist.drainTo(devNull);
+
+ if (this.fCurrentHost != null)
+ {
+ this.fBlacklist.add(new BlacklistEntry(this.fCurrentHost, TimeUnit.MILLISECONDS.convert(blacklistUnit, blacklistTimeUnit)));
+ }
+ }
+ this.fCurrentHost = null;
+ }
+
+ private String makeSelection()
+ {
+ TreeSet workingSet = new TreeSet(this.fBaseHosts);
+
+ LinkedList devNull = new LinkedList();
+ this.fBlacklist.drainTo(devNull);
+ for (BlacklistEntry be : this.fBlacklist)
+ {
+ workingSet.remove(be.getHost());
+ }
+
+ if (workingSet.size() == 0)
+ {
+ log.warn("All hosts were blacklisted; reverting to full set of hosts.");
+ workingSet.addAll(this.fBaseHosts);
+ this.fCurrentHost = null;
+ }
+
+ String selection = null;
+ if ((this.fCurrentHost != null) && (workingSet.contains(this.fCurrentHost)))
+ {
+ selection = this.fCurrentHost;
+ }
+ else if ((this.fIdealHost != null) && (workingSet.contains(this.fIdealHost)))
+ {
+ selection = this.fIdealHost;
+ }
+ else
+ {
+ Vector v = new Vector(workingSet);
+ int index = Math.abs(new Random().nextInt()) % workingSet.size();
+ selection = (String)v.elementAt(index);
+ }
+
+ this.fCurrentHost = selection;
+ return this.fCurrentHost;
+ }
+
+ private static Set<String> makeSet(String s)
+ {
+ TreeSet set = new TreeSet();
+ set.add(s);
+ return set; }
+
+ private static class BlacklistEntry implements Delayed {
+ private final String fHost;
+ private long fExpireAtMs;
+
+ public BlacklistEntry(String host, long delayMs) {
+ this.fHost = host;
+ this.fExpireAtMs = (System.currentTimeMillis() + delayMs);
+ }
+
+ public void expireNow()
+ {
+ this.fExpireAtMs = 0L;
+ }
+
+ public String getHost()
+ {
+ return this.fHost;
+ }
+
+ public int compareTo(Delayed o)
+ {
+ Long thisDelay = Long.valueOf(getDelay(TimeUnit.MILLISECONDS));
+ return thisDelay.compareTo(Long.valueOf(o.getDelay(TimeUnit.MILLISECONDS)));
+ }
+
+ public long getDelay(TimeUnit unit)
+ {
+ long remainingMs = this.fExpireAtMs - System.currentTimeMillis();
+ return unit.convert(remainingMs, TimeUnit.MILLISECONDS);
+ }
+ }
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRBatchingPublisher.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRBatchingPublisher.java
new file mode 100644
index 0000000..4f09fce
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRBatchingPublisher.java
@@ -0,0 +1,56 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.response.MRPublisherResponse;
+
+/**
+ * A MR batching publisher is a publisher with additional functionality
+ * for managing delayed sends.
+ *
+ * @author author
+ *
+ */
+public interface MRBatchingPublisher extends MRPublisher
+{
+ /**
+ * Get the number of messages that have not yet been sent.
+ * @return the number of pending messages
+ */
+ int getPendingMessageCount ();
+
+ /**
+ * Close this publisher, sending any remaining messages.
+ * @param timeout an amount of time to wait for unsent messages to be sent
+ * @param timeoutUnits the time unit for the timeout arg
+ * @return a list of any unsent messages after the timeout
+ * @throws IOException declared by the signature; NOTE(review): the exact conditions are set by the implementation
+ * @throws InterruptedException declared by the signature; presumably raised if the wait is interrupted -- confirm
+ */
+ List<message> close ( long timeout, TimeUnit timeoutUnits ) throws IOException, InterruptedException;
+ /** NOTE(review): undocumented in the original; presumably sends the pending batch and returns the server's response -- confirm. */
+ MRPublisherResponse sendBatchWithResponse ();
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRClient.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRClient.java
new file mode 100644
index 0000000..fea00a1
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRClient.java
@@ -0,0 +1,66 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client;
+
+import org.slf4j.Logger;
+
+
+public interface MRClient
+{
+ /**
+ * An exception at the MR layer. This is used when the HTTP transport
+ * layer returns a success code but the transaction is not completed as expected.
+ */
+ public class MRApiException extends Exception
+ {
+ public MRApiException ( String msg ) { super ( msg ); }
+ public MRApiException ( String msg, Throwable t ) { super ( msg, t ); }
+ private static final long serialVersionUID = 1L;
+ }
+
+ /**
+ * Optionally set the Logger to use
+ * @param log the logger to use
+ */
+ void logTo ( Logger log );
+
+ /**
+ * Set the API credentials for this client connection.
+ * Subsequent calls will include authentication headers
+ * (clearApiCredentials() removes them again).
+ *
+ * @param apiKey the API key
+ * @param apiSecret the API secret
+ */
+ void setApiCredentials ( String apiKey, String apiSecret );
+
+ /**
+ * Remove API credentials, if any, on this connection. Subsequent calls will not include
+ * authentication headers.
+ */
+ void clearApiCredentials ();
+
+ /**
+ * Close this connection. Some client interfaces have additional close capability.
+ */
+ void close ();
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRClientBuilders.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRClientBuilders.java
new file mode 100644
index 0000000..572f6d4
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRClientBuilders.java
@@ -0,0 +1,382 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client;
+
+import java.net.MalformedURLException;
+import java.util.Collection;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl.MRConsumerImpl;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl.MRMetaClient;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl.MRSimplerBatchPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A collection of builders for various types of MR API clients
+ *
+ * @author author
+ */
+public class MRClientBuilders
+{
+ /**
+  * Fluent builder that collects the settings for a topic consumer.
+  * @author author
+  */
+ public static class ConsumerBuilder
+ {
+     private Collection<String> fHosts = null;
+     private String fTopic = null;
+     private String fGroup = null;
+     private String fId = null;
+     private String fApiKey = null;
+     private String fApiSecret = null;
+     private int fTimeoutMs = -1;
+     private int fLimit = -1;
+     private String fFilter = null;
+
+     /**
+      * Create a builder with every setting at its default.
+      */
+     public ConsumerBuilder () {}
+
+     /**
+      * Choose the MR hosts from a single string.
+      * @param hostList a comma-separated list of hosts to use to connect to MR
+      * @return this builder
+      */
+     public ConsumerBuilder usingHosts ( String hostList ) { return usingHosts ( MRConsumerImpl.stringToList(hostList) ); }
+
+     /**
+      * Choose the MR hosts from a collection.
+      * @param hostSet a set of hosts to use to connect to MR
+      * @return this builder
+      */
+     public ConsumerBuilder usingHosts ( Collection<String> hostSet ) { this.fHosts = hostSet; return this; }
+
+     /**
+      * Choose the topic to read from.
+      * @param topic the name of the topic to consume
+      * @return this builder
+      */
+     public ConsumerBuilder onTopic ( String topic ) { this.fTopic = topic; return this; }
+
+     /**
+      * Identify this consumer by group and ID.
+      * @param consumerGroup The name of the consumer group this consumer is part of
+      * @param consumerId The unique id of this consumer in its group
+      * @return this builder
+      */
+     public ConsumerBuilder knownAs ( String consumerGroup, String consumerId ) { this.fGroup = consumerGroup; this.fId = consumerId; return this; }
+
+     /**
+      * Supply the API key and secret for this client.
+      * @param apiKey the API key
+      * @param apiSecret the API secret
+      * @return this builder
+      */
+     public ConsumerBuilder authenticatedBy ( String apiKey, String apiSecret ) { this.fApiKey = apiKey; this.fApiSecret = apiSecret; return this; }
+
+     /**
+      * Choose the server side timeout.
+      * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic.
+      * @return this builder
+      */
+     public ConsumerBuilder waitAtServer ( int timeoutMs ) { this.fTimeoutMs = timeoutMs; return this; }
+
+     /**
+      * Cap the number of messages received per transaction.
+      * @param limit The maximum number of messages to receive from the server in one transaction.
+      * @return this builder
+      */
+     public ConsumerBuilder receivingAtMost ( int limit ) { this.fLimit = limit; return this; }
+
+     /**
+      * Apply a filter on the server.
+      * @param filter a Highland Park standard library filter encoded in JSON
+      * @return this builder
+      */
+     public ConsumerBuilder withServerSideFilter ( String filter ) { this.fFilter = filter; return this; }
+
+     /**
+      * Build the consumer from the collected settings.
+      * @return a consumer
+      */
+     public MRConsumer build ()
+     {
+         final boolean hostsMissing = ( fHosts == null ) || fHosts.isEmpty ();
+         if ( hostsMissing || fTopic == null ) {
+             throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
+         }
+
+         // no group supplied: generate a random group and use ID "0"
+         if ( fGroup == null ) {
+             fGroup = UUID.randomUUID ().toString ();
+             fId = "0";
+             log.info ( "Creating non-restartable client with group " + fGroup + " and ID " + fId + "." );
+         }
+
+         if ( sfConsumerMock != null ) return sfConsumerMock;
+         try {
+             return new MRConsumerImpl ( fHosts, fTopic, fGroup, fId, fTimeoutMs, fLimit, fFilter, fApiKey, fApiSecret );
+         } catch ( MalformedURLException badUrl ) {
+             throw new RuntimeException ( badUrl );
+         }
+     }
+ }
+
+ /*************************************************************************/
+ /*************************************************************************/
+ /*************************************************************************/
+
+ /**
+  * Fluent builder that collects the settings for a batching publisher.
+  * @author author
+  */
+ public static class PublisherBuilder
+ {
+     private Collection<String> fHosts = null;
+     private String fTopic = null;
+     private int fMaxBatchSize = 1;
+     private int fMaxBatchAgeMs = 1;
+     private boolean fCompress = false;
+     private String fApiKey = null;
+     private String fApiSecret = null;
+
+     public PublisherBuilder () {}
+
+     /**
+      * Choose the MR/UEB host(s) from a single string.
+      * @param hostlist The host(s) used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
+      * @return this builder
+      */
+     public PublisherBuilder usingHosts ( String hostlist ) { return usingHosts ( MRConsumerImpl.stringToList(hostlist) ); }
+
+     /**
+      * Choose the MR/UEB host(s) from an array.
+      * @param hostSet The host(s) used in the URL to MR. Can be "host:port"
+      * @return this builder
+      */
+     public PublisherBuilder usingHosts ( String[] hostSet )
+     {
+         // copy the array into a sorted set, which also drops duplicates
+         final TreeSet<String> sorted = new TreeSet<String> ();
+         for ( int i = 0; i < hostSet.length; i++ ) {
+             sorted.add ( hostSet[i] );
+         }
+         return usingHosts ( sorted );
+     }
+
+     /**
+      * Choose the MR/UEB host(s) from a collection.
+      * @param hostlist The host(s) used in the URL to MR. Can be "host:port".
+      * @return this builder
+      */
+     public PublisherBuilder usingHosts ( Collection<String> hostlist ) { this.fHosts = hostlist; return this; }
+
+     /**
+      * Choose the topic to publish on.
+      * @param topic The topic on which to publish messages.
+      * @return this builder
+      */
+     public PublisherBuilder onTopic ( String topic ) { this.fTopic = topic; return this; }
+
+     /**
+      * Batch message sends with the given limits.
+      * @param messageCount The largest set of messages to batch.
+      * @param ageInMs The maximum age of a message waiting in a batch.
+      * @return this builder
+      */
+     public PublisherBuilder limitBatch ( int messageCount, int ageInMs ) { this.fMaxBatchSize = messageCount; this.fMaxBatchAgeMs = ageInMs; return this; }
+
+     /**
+      * Turn compression of transactions on.
+      * @return this builder
+      */
+     public PublisherBuilder withCompresion () { return enableCompresion ( true ); }
+
+     /**
+      * Turn compression of transactions off.
+      * @return this builder
+      */
+     public PublisherBuilder withoutCompresion () { return enableCompresion ( false ); }
+
+     /**
+      * Choose whether transactions are compressed.
+      * @param compress true to gzip compress transactions
+      * @return this builder
+      */
+     public PublisherBuilder enableCompresion ( boolean compress ) { this.fCompress = compress; return this; }
+
+     /**
+      * Supply the API key and secret for this client.
+      * @param apiKey the API key
+      * @param apiSecret the API secret
+      * @return this builder
+      */
+     public PublisherBuilder authenticatedBy ( String apiKey, String apiSecret ) { this.fApiKey = apiKey; this.fApiSecret = apiSecret; return this; }
+
+     /**
+      * Build the publisher from the collected settings.
+      * @return a batching publisher
+      */
+     public MRBatchingPublisher build ()
+     {
+         if ( fHosts == null || fHosts.isEmpty () || fTopic == null ) {
+             throw new IllegalArgumentException ( "You must provide at least one host and a topic name." );
+         }
+
+         if ( sfPublisherMock != null ) return sfPublisherMock;
+
+         final MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ()
+             .againstUrls ( fHosts )
+             .onTopic ( fTopic )
+             .batchTo ( fMaxBatchSize, fMaxBatchAgeMs )
+             .compress ( fCompress )
+             .build ();
+
+         // credentials are only applied when an API key was supplied
+         if ( fApiKey != null ) {
+             pub.setApiCredentials ( fApiKey, fApiSecret );
+         }
+         return pub;
+     }
+ }
+
+ /**
+ * A builder for an identity manager
+ * @author author
+ */
+ public static class IdentityManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRIdentityManager>
+ {
+ /**
+ * Construct an identity manager builder.
+ */
+ public IdentityManagerBuilder () {}
+
+ @Override
+ protected MRIdentityManager constructClient ( Collection<String> hosts ) { try {
+ return new MRMetaClient ( hosts );
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ } }
+ }
+
+ /**
+ * A builder for a topic manager
+ * @author author
+ */
+ public static class TopicManagerBuilder extends AbstractAuthenticatedManagerBuilder<MRTopicManager>
+ {
+ /**
+ * Construct an topic manager builder.
+ */
+ public TopicManagerBuilder () {}
+
+ @Override
+ protected MRTopicManager constructClient ( Collection<String> hosts ) { try {
+ return new MRMetaClient ( hosts );
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ } }
+ }
+
+ /**
+ * Inject a consumer. Used to support unit tests.
+ * @param cc stored in sfConsumerMock; when non-null, ConsumerBuilder.build() returns it instead of creating a consumer
+ */
+ public static void $testInject ( MRConsumer cc )
+ {
+ sfConsumerMock = cc;
+ }
+
+ /**
+ * Inject a publisher. Used to support unit tests.
+ * @param pub stored in sfPublisherMock; when non-null, PublisherBuilder.build() returns it instead of creating a publisher
+ */
+ public static void $testInject ( MRBatchingPublisher pub )
+ {
+ sfPublisherMock = pub;
+ }
+
+ static MRConsumer sfConsumerMock = null;
+ static MRBatchingPublisher sfPublisherMock = null;
+
+ /**
+  * Base class for builders of clients that need a host list and API credentials.
+  * @param <T> the type of client produced by build()
+  * @author author
+  */
+ public static abstract class AbstractAuthenticatedManagerBuilder<T extends MRClient>
+ {
+     /** Construct a builder with no hosts or credentials set. */
+     public AbstractAuthenticatedManagerBuilder () {}
+
+     /**
+      * Set the host list
+      * @param hostList a comma-separated list of hosts to use to connect to MR
+      * @return this builder
+      */
+     public AbstractAuthenticatedManagerBuilder<T> usingHosts ( String hostList ) { return usingHosts ( MRConsumerImpl.stringToList(hostList) ); }
+
+     /**
+      * Set the host list
+      * @param hostSet a set of hosts to use to connect to MR
+      * @return this builder
+      */
+     public AbstractAuthenticatedManagerBuilder<T> usingHosts ( Collection<String> hostSet ) { fHosts = hostSet; return this; }
+
+     /**
+      * Set the API key and secret for this client.
+      * @param apiKey the API key
+      * @param apiSecret the API secret
+      * @return this builder
+      */
+     public AbstractAuthenticatedManagerBuilder<T> authenticatedBy ( String apiKey, String apiSecret ) { fApiKey = apiKey; fApiSecret = apiSecret; return this; }
+
+     /**
+      * Build the client: construct it for the configured hosts, then apply the credentials.
+      * @return the client created by constructClient
+      * @throws IllegalArgumentException if no hosts were provided
+      */
+     public T build ()
+     {
+         if ( fHosts == null || fHosts.size() == 0 ) {
+             throw new IllegalArgumentException ( "You must provide at least one host." );
+         }
+
+         final T mgr = constructClient ( fHosts );
+         mgr.setApiCredentials ( fApiKey, fApiSecret );
+         return mgr;
+     }
+
+     /** Create the concrete client for the given hosts; implemented by each subclass. */
+     protected abstract T constructClient ( Collection<String> hosts );
+
+     private Collection<String> fHosts = null;
+     private String fApiKey = null;
+     private String fApiSecret = null;
+ }
+
+ private static final Logger log = LoggerFactory.getLogger ( MRClientBuilders.class );
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRClientFactory.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRClientFactory.java
new file mode 100644
index 0000000..93d50be
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRClientFactory.java
@@ -0,0 +1,558 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeSet;
+import java.util.UUID;
+
+import javax.ws.rs.core.MultivaluedMap;
+
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl.MRConsumerImpl;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl.MRMetaClient;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl.MRSimplerBatchPublisher;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients.ProtocolTypeConstants;
+
+/**
+ * A factory for MR clients.<br/>
+ * <br/>
+ * Use caution selecting a consumer creator factory. If the call doesn't accept a consumer group name, then it creates
+ * a consumer that is not restartable. That is, if you stop your process and start it again, your client will NOT receive
+ * any missed messages on the topic. If you need to ensure receipt of missed messages, then you must use a consumer that's
+ * created with a group name and ID. (If you create multiple consumer processes using the same group, load is split across
+ * them. Be sure to use a different ID for each instance.)<br/>
+ * <br/>
+ * Publishers are created with createSimplePublisher and the createBatchingPublisher overloads.
+ *
+ * @author author
+ */
+public class MRClientFactory
+{
+ public static MultivaluedMap<String, Object> HTTPHeadersMap;
+ public static Map<String, String> DME2HeadersMap;
+ public static String routeFilePath;
+ // routeFilePath, routeReader and prop are overwritten on every call to a properties-file based factory method.
+ public static FileReader routeReader;
+ // routeWriter is assigned only when the route file is found not to exist.
+ public static FileWriter routeWriter= null;
+ public static Properties prop=null;
+ // NOTE(review): HTTPHeadersMap and DME2HeadersMap are not assigned anywhere in this class; presumably set by other code -- confirm.
+ // NOTE(review): all of these are public mutable statics, so they are shared by every user of this factory.
+ /**
+  * Creates a consumer that uses the default timeout and places no limit on the number of messages
+  * returned. It operates as an independent consumer (i.e., not in a group) and is NOT re-startable across sessions.
+  *
+  * @param hostList A comma separated list of hosts to use to connect to MR.
+  * You can include port numbers (3904 is the default). For example, "host1:8080,host2"
+  * @param topic The topic to consume
+  *
+  * @return a consumer
+  */
+ public static MRConsumer createConsumer ( String hostList, String topic )
+ {
+     // split the comma separated string, then defer to the collection-based overload
+     final Collection<String> hosts = MRConsumerImpl.stringToList ( hostList );
+     return createConsumer ( hosts, topic );
+ }
+
+ /**
+  * Creates an unfiltered consumer that uses the default timeout and places no limit on the number of
+  * messages returned. It operates as an independent consumer and is NOT re-startable across sessions.
+  *
+  * @param hostSet The host used in the URL to MR. Entries can be "host:port".
+  * @param topic The topic to consume
+  * @return a consumer
+  */
+ public static MRConsumer createConsumer ( Collection<String> hostSet, String topic )
+ {
+     // same as the filtering overload, just without a server-side filter
+     final String noFilter = null;
+     return createConsumer ( hostSet, topic, noFilter );
+ }
+
+ /**
+  * Creates a consumer with server-side filtering, the default timeout and no limit on the number of
+  * messages returned. It operates as an independent consumer and is NOT re-startable across sessions.
+  *
+  * @param hostSet The host used in the URL to MR. Entries can be "host:port".
+  * @param topic The topic to consume
+  * @param filter a filter to use on the server side
+  * @return a consumer
+  */
+ public static MRConsumer createConsumer ( Collection<String> hostSet, String topic, String filter )
+ {
+     // the consumer group is a freshly generated UUID and the consumer id is fixed to "0"
+     final String randomGroup = UUID.randomUUID ().toString ();
+     return createConsumer ( hostSet, topic, randomGroup, "0", -1, -1, filter, null, null );
+ }
+
+ /**
+  * Creates a consumer with the default timeout and no limit on messages returned. This consumer can operate
+  * in a logical group and is re-startable across sessions when you use the same group and ID on restart.
+  *
+  * @param hostSet The host used in the URL to MR. Entries can be "host:port".
+  * @param topic The topic to consume
+  * @param consumerGroup The name of the consumer group this consumer is part of
+  * @param consumerId The unique id of this consumer in its group
+  * @return a consumer
+  */
+ public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup, final String consumerId )
+ {
+     final int defaultTimeoutMs = -1;
+     final int noLimit = -1;
+     return createConsumer ( hostSet, topic, consumerGroup, consumerId, defaultTimeoutMs, noLimit );
+ }
+
+ /**
+  * Creates a consumer with a caller-supplied timeout and message limit, no server-side filter and no API
+  * credentials. This consumer can operate in a logical group and is re-startable across sessions when you
+  * use the same group and ID on restart.
+  *
+  * @param hostSet The host used in the URL to MR. Entries can be "host:port".
+  * @param topic The topic to consume
+  * @param consumerGroup The name of the consumer group this consumer is part of
+  * @param consumerId The unique id of this consumer in its group
+  * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout.
+  * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
+  * @return a consumer
+  */
+ public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup, final String consumerId, int timeoutMs, int limit)
+ {
+     final String none = null; // stands in for the absent filter, API key and API secret
+     return createConsumer ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, none, none, none );
+ }
+
+ /**
+  * Creates a consumer from a comma separated host string, with a caller-supplied timeout, message limit,
+  * server-side filter and API credentials. This consumer can operate in a logical group and is
+  * re-startable across sessions when you use the same group and ID on restart.
+  *
+  * @param hostList A comma separated list of hosts to use to connect to MR.
+  * You can include port numbers (3904 is the default). For example, "host1:8080,host2"
+  * @param topic The topic to consume
+  * @param consumerGroup The name of the consumer group this consumer is part of
+  * @param consumerId The unique id of this consumer in its group
+  * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout.
+  * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
+  * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter".
+  * @param apiKey The API key passed on to the collection-based overload
+  * @param apiSecret The API secret passed on to the collection-based overload
+  * @return a consumer
+  */
+ public static MRConsumer createConsumer ( String hostList, final String topic, final String consumerGroup,
+     final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret )
+ {
+     final Collection<String> hosts = MRConsumerImpl.stringToList ( hostList );
+     return createConsumer ( hosts, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey, apiSecret );
+ }
+
+ /**
+  * Creates a consumer with server-side filtering; a consumer injected via $testInject is returned instead
+  * when one is set. A MalformedURLException from MRConsumerImpl is rethrown wrapped in a RuntimeException.
+  * @param hostSet The host used in the URL to MR. Entries can be "host:port".
+  * @param topic The topic to consume
+  * @param consumerGroup The name of the consumer group this consumer is part of
+  * @param consumerId The unique id of this consumer in its group
+  * @param timeoutMs The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout.
+  * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
+  * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter".
+  * @param apiKey The API key handed to the MRConsumerImpl constructor
+  * @param apiSecret The API secret handed to the MRConsumerImpl constructor
+  * @return a consumer
+  */
+ public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup,
+     final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret )
+ {
+     // an injected test consumer takes precedence over building a real one
+     final MRConsumer injected = MRClientBuilders.sfConsumerMock;
+     if ( injected != null ) return injected;
+     try {
+         return new MRConsumerImpl ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey, apiSecret );
+     } catch ( MalformedURLException badHostUrl ) {
+         throw new RuntimeException ( badHostUrl );
+     }
+ }
+
+ /*************************************************************************/
+ /*************************************************************************/
+ /*************************************************************************/
+
+ /**
+  * Create a publisher with a batch size of 1 and a maximum message age of 1 ms. Most applications should
+  * favor higher latency for much higher message throughput, so the "simple publisher" is rarely a good choice.
+  * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
+  * @param topic The topic on which to publish messages.
+  * @return a publisher
+  */
+ public static MRBatchingPublisher createSimplePublisher ( String hostlist, String topic )
+ {
+     final int oneMessagePerBatch = 1;
+     final long maxAgeMs = 1;
+     return createBatchingPublisher ( hostlist, topic, oneMessagePerBatch, maxAgeMs );
+ }
+
+ /**
+  * Create a publisher that batches messages without compressing the payloads. Be sure to close the
+  * publisher to send the last batch and ensure a clean shutdown.
+  *
+  * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
+  * @param topic The topic on which to publish messages.
+  * @param maxBatchSize The largest set of messages to batch
+  * @param maxAgeMs The maximum age of a message waiting in a batch
+  * @return a publisher
+  */
+ public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs )
+ {
+     final boolean compress = false;
+     return createBatchingPublisher ( hostlist, topic, maxBatchSize, maxAgeMs, compress );
+ }
+
+ /**
+  * Create a publisher that batches messages, taking the hosts as one comma separated string. Be sure to
+  * close the publisher to send the last batch and ensure a clean shutdown.
+  *
+  * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
+  * @param topic The topic on which to publish messages.
+  * @param maxBatchSize The largest set of messages to batch
+  * @param maxAgeMs The maximum age of a message waiting in a batch
+  * @param compress use gzip compression
+  * @return a publisher
+  */
+ public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
+ {
+     final Collection<String> hosts = MRConsumerImpl.stringToList ( hostlist );
+     return createBatchingPublisher ( hosts, topic, maxBatchSize, maxAgeMs, compress );
+ }
+
+ /**
+  * Create a publisher that batches messages. Be sure to close the publisher to send the last batch and
+  * ensure a clean shutdown. The host array is copied into a TreeSet first, so duplicate entries collapse
+  * and the hosts are handed on in sorted order.
+  *
+  * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
+  * @param topic The topic on which to publish messages.
+  * @param maxBatchSize The largest set of messages to batch
+  * @param maxAgeMs The maximum age of a message waiting in a batch
+  * @param compress use gzip compression
+  * @return a publisher
+  */
+ public static MRBatchingPublisher createBatchingPublisher ( String[] hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
+ {
+     final TreeSet<String> sortedHosts = new TreeSet<String> ();
+     for ( int idx = 0; idx < hostSet.length; idx++ )
+     {
+         sortedHosts.add ( hostSet[idx] );
+     }
+     return createBatchingPublisher ( sortedHosts, topic, maxBatchSize, maxAgeMs, compress );
+ }
+
+ /**
+  * Create a publisher that batches messages, built through MRSimplerBatchPublisher.Builder. Be sure to
+  * close the publisher to send the last batch and ensure a clean shutdown.
+  *
+  * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
+  * @param topic The topic on which to publish messages.
+  * @param maxBatchSize The largest set of messages to batch
+  * @param maxAgeMs The maximum age of a message waiting in a batch
+  * @param compress use gzip compression
+  * @return a publisher
+  */
+ public static MRBatchingPublisher createBatchingPublisher ( Collection<String> hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
+ {
+     final MRSimplerBatchPublisher publisher = new MRSimplerBatchPublisher.Builder ()
+         .againstUrls ( hostSet )
+         .onTopic ( topic )
+         .batchTo ( maxBatchSize, maxAgeMs )
+         .compress ( compress )
+         .build ();
+     return publisher;
+ }
+
+ /**
+  * Create a publisher that batches messages and carries username/password credentials. Be sure to close
+  * the publisher to send the last batch and ensure a clean shutdown.
+  * @param host A host to be used in the URL to MR. Can be "host:port". Use multiple comma-separated entries to enable failover.
+  * @param topic The topic on which to publish messages.
+  * @param username username
+  * @param password password
+  * @param maxBatchSize The largest set of messages to batch
+  * @param maxAgeMs The maximum age of a message waiting in a batch
+  * @param compress use gzip compression
+  * @param protocolFlag http auth or ueb auth or dme2 method
+  * @param producerFilePath all properties for publisher
+  * @return MRBatchingPublisher obj
+  */
+ public static MRBatchingPublisher createBatchingPublisher ( String host, String topic, final String username, final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag, String producerFilePath )
+ {
+     final MRSimplerBatchPublisher publisher = new MRSimplerBatchPublisher.Builder ()
+         .againstUrls ( MRConsumerImpl.stringToList ( host ) )
+         .onTopic ( topic )
+         .batchTo ( maxBatchSize, maxAgeMs )
+         .compress ( compress )
+         .build ();
+     // the remaining settings are not part of the builder and are applied to the built publisher
+     publisher.setHost ( host );
+     publisher.setUsername ( username );
+     publisher.setPassword ( password );
+     publisher.setProtocolFlag ( protocolFlag );
+     publisher.setProducerFilePath ( producerFilePath );
+     return publisher;
+ }
+
+
+ /**
+  * Create a batching publisher configured from a properties file. Be sure to close the publisher to
+  * send the last batch and ensure a clean shutdown. As a side effect the static routeFilePath, routeReader
+  * and prop fields are replaced, and the DME2 route file is created (through routeWriter) if it is missing.
+  * @param producerFilePath path of the properties file holding all publisher settings
+  * @return MRBatchingPublisher obj
+  * @throws FileNotFoundException if the properties file or the route file cannot be opened
+  * @throws IOException if the properties file cannot be read or closed
+  */
+ public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath ) throws FileNotFoundException,IOException {
+     final FileReader reader = new FileReader(new File (producerFilePath));
+     final Properties props = new Properties();
+     try {
+         props.load(reader);
+     } finally {
+         reader.close(); // only needed while loading; closing avoids leaking the file handle
+     }
+     final MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
+         againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))).
+         onTopic ( props.getProperty("topic") ).
+         batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs"))).
+         compress (Boolean.parseBoolean(props.getProperty("compress"))).
+         httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).
+         build ();
+     pub.setHost(props.getProperty("host"));
+     if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){
+         pub.setAuthKey(props.getProperty("authKey"));
+         pub.setAuthDate(props.getProperty("authDate"));
+     }
+     pub.setUsername(props.getProperty("username"));
+     pub.setPassword(props.getProperty("password"));
+     pub.setProducerFilePath(producerFilePath);
+     pub.setProtocolFlag(props.getProperty("TransportType"));
+     pub.setProps(props);
+     routeFilePath=props.getProperty("DME2preferredRouterFilePath");
+     final File routeFile = new File(routeFilePath);
+     if(!routeFile.exists()){
+         routeWriter=new FileWriter(routeFile); // create it before the reader is opened, which fails on a missing file
+     }
+     routeReader= new FileReader(routeFile);
+     prop= new Properties();
+     return pub;
+ }
+
+ /**
+  * Create a publisher, configured from a properties file, whose builder is told whether send methods return a response
+  * object. Replaces the static routeFilePath, routeReader and prop fields and creates the route file if it is missing.
+  * @param producerFilePath path of the properties file holding all publisher settings
+  * @param withResponse value handed to the builder's withResponse setting
+  * @return MRBatchingPublisher obj
+  * @throws FileNotFoundException if the properties file or the route file cannot be opened
+  * @throws IOException if the properties file cannot be read or closed
+  */
+ public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath, boolean withResponse ) throws FileNotFoundException,IOException {
+     final FileReader reader = new FileReader(new File (producerFilePath));
+     final Properties props = new Properties();
+     try {
+         props.load(reader);
+     } finally {
+         reader.close(); // only needed while loading; closing avoids leaking the file handle
+     }
+     final MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
+         againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))).
+         onTopic ( props.getProperty("topic") ).
+         batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs"))).
+         compress (Boolean.parseBoolean(props.getProperty("compress"))).
+         httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).
+         withResponse(withResponse).
+         build ();
+     pub.setHost(props.getProperty("host"));
+     if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){
+         pub.setAuthKey(props.getProperty("authKey"));
+         pub.setAuthDate(props.getProperty("authDate"));
+     }
+     pub.setUsername(props.getProperty("username"));
+     pub.setPassword(props.getProperty("password"));
+     pub.setProducerFilePath(producerFilePath);
+     pub.setProtocolFlag(props.getProperty("TransportType"));
+     pub.setProps(props);
+     routeFilePath=props.getProperty("DME2preferredRouterFilePath");
+     final File routeFile = new File(routeFilePath);
+     if(!routeFile.exists()){
+         routeWriter=new FileWriter(routeFile); // create it before the reader is opened, which fails on a missing file
+     }
+     routeReader= new FileReader(routeFile);
+     prop= new Properties();
+     return pub;
+ }
+
+
+
+
+
+
+
+
+
+
+
+ /**
+  * Create an identity manager client to work with API keys. The manager is an MRMetaClient that has been
+  * given the supplied API credentials.
+  * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
+  * @param apiKey Your API key
+  * @param apiSecret Your API secret
+  * @return an identity manager
+  * @throws RuntimeException wrapping the MalformedURLException if MRMetaClient rejects the host set
+  */
+ public static MRIdentityManager createIdentityManager ( Collection<String> hostSet, String apiKey, String apiSecret )
+ {
+     try {
+         final MRIdentityManager identityManager = new MRMetaClient ( hostSet );
+         identityManager.setApiCredentials ( apiKey, apiSecret );
+         return identityManager;
+     } catch ( MalformedURLException badHostUrl ) {
+         throw new RuntimeException ( badHostUrl );
+     }
+ }
+
+ /**
+  * Create a topic manager for working with topics. The manager is an MRMetaClient that has been given
+  * the supplied API credentials.
+  * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
+  * @param apiKey Your API key
+  * @param apiSecret Your API secret
+  * @return a topic manager
+  * @throws RuntimeException wrapping the MalformedURLException if MRMetaClient rejects the host set
+  */
+ public static MRTopicManager createTopicManager ( Collection<String> hostSet, String apiKey, String apiSecret )
+ {
+     try {
+         final MRMetaClient topicManager = new MRMetaClient ( hostSet );
+         topicManager.setApiCredentials ( apiKey, apiSecret );
+         return topicManager;
+     } catch ( MalformedURLException badHostUrl ) {
+         throw new RuntimeException ( badHostUrl );
+     }
+ }
+
+ /**
+ * Inject a consumer. Used to support unit tests: while a non-null consumer is injected, the nine-argument createConsumer(Collection, ...) overload returns it instead of building a new one.
+ * @param cc the consumer to inject; null removes a previously injected consumer
+ */
+ public static void $testInject ( MRConsumer cc )
+ {
+ MRClientBuilders.sfConsumerMock = cc;
+ }
+
+ public static MRConsumer createConsumer(String host, String topic, String username,
+ String password, String group, String id, int i, int j,String protocalFlag,String consumerFilePath) {
+
+ MRConsumerImpl sub;
+ try {
+ sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null );
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+ sub.setUsername(username);
+ sub.setPassword(password);
+ sub.setHost(host);
+ sub.setProtocolFlag(protocalFlag);
+ sub.setConsumerFilePath(consumerFilePath);
+ return sub;
+
+ }
+
+ public static MRConsumer createConsumer(String host, String topic, String username,
+ String password, String group, String id,String protocalFlag,String consumerFilePath, int i, int j) {
+
+ MRConsumerImpl sub;
+ try {
+ sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null );
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+ sub.setUsername(username);
+ sub.setPassword(password);
+ sub.setHost(host);
+ sub.setProtocolFlag(protocalFlag);
+ sub.setConsumerFilePath(consumerFilePath);
+ return sub;
+
+ }
+
+ public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException,IOException {
+ FileReader reader = new FileReader(new File (consumerFilePath));
+ Properties props = new Properties();
+ props.load(reader);
+ int timeout;
+ if(props.getProperty("timeout")!=null)
+ timeout=Integer.parseInt(props.getProperty("timeout"));
+ else
+ timeout=-1;
+ int limit;
+ if(props.getProperty("limit")!=null)
+ limit=Integer.parseInt(props.getProperty("limit"));
+ else
+ limit=-1;
+ String group;
+ if(props.getProperty("group")==null)
+ group=UUID.randomUUID ().toString();
+ else
+ group=props.getProperty("group");
+ MRConsumerImpl sub=null;
+ if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){
+ sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"),timeout, limit, props.getProperty("filter"),props.getProperty("authKey"), props.getProperty("authDate") );
+ sub.setAuthKey(props.getProperty("authKey"));
+ sub.setAuthDate(props.getProperty("authDate"));
+ sub.setUsername(props.getProperty("username"));
+ sub.setPassword(props.getProperty("password"));
+ }else{
+ sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),props.getProperty("username"), props.getProperty("password") );
+ sub.setUsername(props.getProperty("username"));
+ sub.setPassword(props.getProperty("password"));
+ }
+ sub.setRouterFilePath(props.getProperty("DME2preferredRouterFilePath"));
+ sub.setProps(props);
+ sub.setHost(props.getProperty("host"));
+ sub.setProtocolFlag(props.getProperty("TransportType"));
+ //sub.setConsumerFilePath(consumerFilePath);
+ sub.setfFilter(props.getProperty("filter"));
+ routeFilePath=props.getProperty("DME2preferredRouterFilePath");
+ routeReader= new FileReader(new File (routeFilePath));
+ prop= new Properties();
+ File fo= new File(routeFilePath);
+ if(!fo.exists()){
+ routeWriter=new FileWriter(new File(routeFilePath));
+ }
+ return sub;
+ }
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRConsumer.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRConsumer.java
new file mode 100644
index 0000000..214552b
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRConsumer.java
@@ -0,0 +1,54 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client;
+
+import java.io.IOException;
+
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.response.MRConsumerResponse;
+
+ public interface MRConsumer extends MRClient
+ {
+ /**
+ * Fetch a set of messages. The consumer's timeout and message limit are used if set in the constructor call.
+ *
+ * @return a set of messages
+ * @throws IOException presumably if there's a problem connecting to the server, as documented on fetch(int, int) -- confirm
+ */
+ Iterable<String> fetch () throws IOException, Exception;
+
+ /**
+ * Fetch a set of messages with an explicit timeout and limit for this call. These values
+ * override any set in the constructor call.
+ *
+ * @param timeoutMs The amount of time in milliseconds that the server should keep the connection
+ * open while waiting for message traffic. Use -1 for default timeout (controlled on the server-side).
+ * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
+ * @return a set of messages
+ * @throws IOException if there's a problem connecting to the server
+ */
+ Iterable<String> fetch ( int timeoutMs, int limit ) throws IOException, Exception;
+ // The two variants below return an MRConsumerResponse and declare no checked exceptions.
+ MRConsumerResponse fetchWithReturnConsumerResponse ();
+ // NOTE(review): presumably timeoutMs and limit mean the same as in fetch(int, int) -- confirm against the implementation.
+ // NOTE(review): how failures are reported through MRConsumerResponse is not visible here -- confirm.
+ MRConsumerResponse fetchWithReturnConsumerResponse ( int timeoutMs, int limit );
+ }
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRIdentityManager.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRIdentityManager.java
new file mode 100644
index 0000000..34826c9
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRIdentityManager.java
@@ -0,0 +1,100 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client;
+
+import java.io.IOException;
+
+import com.att.nsa.apiClient.credentials.ApiCredential;
+import com.att.nsa.apiClient.http.HttpException;
+import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
+
+/**
+ * A client for manipulating API keys.
+ * @author author
+ *
+ */
+public interface MRIdentityManager extends MRClient
+{
+ /**
+ * An API Key record
+ */
+ public interface ApiKey
+ {
+ /**
+ * Get the email address associated with the API key
+ * @return the email address on the API key or null
+ */
+ String getEmail ();
+
+ /**
+ * Get the description associated with the API key
+ * @return the description on the API key or null
+ */
+ String getDescription ();
+ }
+
+ /**
+ * Create a new API key on the UEB cluster. The returned credential instance
+ * contains the new API key and API secret. This is the only time the secret
+ * is available to the client -- there's no API for retrieving it later -- so
+ * your application must store it securely.
+ *
+ * @param email
+ * @param description
+ * @return a new credential
+ * @throws HttpException
+ * @throws MRApiException
+ * @throws IOException
+ */
+ ApiCredential createApiKey ( String email, String description ) throws HttpException, MRApiException, IOException;
+
+ /**
+ * Get basic info about a known API key
+ * @param apiKey
+ * @return the API key's info or null if it doesn't exist
+ * @throws HttpObjectNotFoundException, HttpException, MRApiException
+ * @throws IOException
+ */
+ ApiKey getApiKey ( String apiKey ) throws HttpObjectNotFoundException, HttpException, MRApiException, IOException;
+
+ /**
+ * Update the record for the API key used to authenticate this request. The UEB
+ * API requires that you authenticate with the same key you're updating, so the
+ * API key being changed is the one used for setApiCredentials.
+ *
+ * @param email use null to keep the current value
+ * @param description use null to keep the current value
+ * @throws IOException
+ * @throws HttpException
+ * @throws HttpObjectNotFoundException
+ */
+ void updateCurrentApiKey ( String email, String description ) throws HttpObjectNotFoundException, HttpException, IOException;
+
+ /**
+ * Delete the *current* API key. After this call returns, the API key
+ * used to authenticate will no longer be valid.
+ *
+ * @throws IOException
+ * @throws HttpException
+ */
+ void deleteCurrentApiKey () throws HttpException, IOException;
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRPublisher.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRPublisher.java
new file mode 100644
index 0000000..165bf0f
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRPublisher.java
@@ -0,0 +1,93 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/** Contract for clients that publish messages to MR. */
+public interface MRPublisher extends MRClient
+{
+    /** Immutable holder for a message text and its partition; a null partition is stored as "". */
+    public static class message
+    {
+        public final String fPartition;
+        public final String fMsg;
+
+        /**
+         * @param partition the partition; null is replaced by the empty string
+         * @param msg the message text
+         * @throws IllegalArgumentException if msg is null
+         */
+        public message ( String partition, String msg )
+        {
+            if ( msg == null )
+            {
+                throw new IllegalArgumentException ( "Can't send a null message." );
+            }
+            fPartition = ( partition != null ) ? partition : "";
+            fMsg = msg;
+        }
+
+        /** Copy constructor; goes through the two-argument form, so the same checks apply. */
+        public message ( message msg )
+        {
+            this ( msg.fPartition, msg.fMsg );
+        }
+    }
+
+    /**
+     * Send the given message without a partition; the partition is placed at the HTTP request level.
+     * @param msg the message to send
+     * @return the number of pending messages
+     * @throws IOException on an I/O failure
+     */
+    int send ( String msg ) throws IOException;
+
+    /**
+     * Send the given message using the given partition.
+     * @param partition the partition to use
+     * @param msg the message to send
+     * @return the number of pending messages
+     * @throws IOException on an I/O failure
+     */
+    int send ( String partition, String msg ) throws IOException;
+
+    /**
+     * Send the given message using its partition.
+     * @param msg the message, which carries its own partition
+     * @return the number of pending messages
+     * @throws IOException on an I/O failure
+     */
+    int send ( message msg ) throws IOException;
+
+    /**
+     * Send the given messages using their partitions.
+     * @param msgs the messages to send
+     * @return the number of pending messages
+     * @throws IOException on an I/O failure
+     */
+    int send ( Collection<message> msgs ) throws IOException;
+
+    /** Close this publisher. It's an error to call send() after close(). */
+    void close ();
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRTopicManager.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRTopicManager.java
new file mode 100644
index 0000000..3703385
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/MRTopicManager.java
@@ -0,0 +1,183 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client;
+
+import java.io.IOException;
+import java.util.Set;
+
+import com.att.nsa.apiClient.http.HttpException;
+import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
+
+
+/**
+ * A client for working with topic metadata: listing, describing and creating topics and managing their producer/consumer ACLs.
+ * @author author
+ */
+public interface MRTopicManager extends MRClient
+{
+ /**
+ * Get the topics available in the cluster.
+ * @return a set of topic names
+ * @throws IOException
+ */
+ Set<String> getTopics () throws IOException;
+
+ /**
+ * Information about a topic: its owner, its description and its producer/consumer ACLs.
+ */
+ public interface TopicInfo
+ {
+ /**
+ * Get the owner of the topic.
+ * @return the owner, or null if no entry
+ */
+ String getOwner ();
+
+ /**
+ * Get the description for this topic.
+ * @return the description, or null if no entry
+ */
+ String getDescription ();
+
+ /**
+ * Get the set of allowed producers (as API keys) on this topic.
+ * @return the set of allowed producers, or null if no ACL exists or is enabled
+ */
+ Set<String> getAllowedProducers ();
+
+ /**
+ * Get the set of allowed consumers (as API keys) on this topic.
+ * @return the set of allowed consumers, or null if no ACL exists or is enabled
+ */
+ Set<String> getAllowedConsumers ();
+ }
+
+ /**
+ * Get information about a topic.
+ * @param topic the topic name
+ * @return topic information
+ * @throws IOException
+ * @throws HttpObjectNotFoundException
+ */
+ TopicInfo getTopicMetadata ( String topic ) throws HttpObjectNotFoundException, IOException;
+
+ /**
+ * Create a new topic.
+ * @param topicName the name of the new topic
+ * @param topicDescription the description of the new topic
+ * @param partitionCount the partition count for the new topic
+ * @param replicationCount the replication count for the new topic
+ * @throws HttpException
+ * @throws IOException
+ */
+ void createTopic ( String topicName, String topicDescription, int partitionCount, int replicationCount ) throws HttpException, IOException;
+
+ /**
+ * Delete the topic. This call must be authenticated and the API key listed as owner on the topic.
+ * NOTE: The MR (UEB) API server does not support topic deletion at this time (mid 2015)
+ * @param topic the topic name
+ * @throws HttpException
+ * @throws IOException
+ * @deprecated If/when the Kafka system supports topic delete, or the implementation changes, this will be restored.
+ */
+ @Deprecated
+ void deleteTopic ( String topic ) throws HttpException, IOException;
+
+ /**
+ * Can any client produce events into this topic without authentication?
+ * @param topic the topic name
+ * @return true if the topic is open for producing
+ * @throws IOException
+ * @throws HttpObjectNotFoundException
+ */
+ boolean isOpenForProducing ( String topic ) throws HttpObjectNotFoundException, IOException;
+
+ /**
+ * Get the set of allowed producers. If the topic is open, the result is null.
+ * @param topic the topic name
+ * @return a set of allowed producers or null
+ * @throws IOException
+ * @throws HttpObjectNotFoundException
+ */
+ Set<String> getAllowedProducers ( String topic ) throws HttpObjectNotFoundException, IOException;
+
+ /**
+ * Allow the given API key to produce messages on the given topic. The caller must
+ * own this topic.
+ * @param topic the topic name
+ * @param apiKey the API key to authorize as a producer
+ * @throws HttpException
+ * @throws HttpObjectNotFoundException
+ * @throws IOException
+ */
+ void allowProducer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException;
+
+ /**
+ * Revoke the given API key's authorization to produce messages on the given topic.
+ * The caller must own this topic.
+ * @param topic the topic name
+ * @param apiKey the API key whose producer authorization is revoked
+ * @throws HttpException
+ * @throws IOException
+ */
+ void revokeProducer ( String topic, String apiKey ) throws HttpException, IOException;
+
+ /**
+ * Can any client consume events from this topic without authentication?
+ * @param topic the topic name
+ * @return true if the topic is open for consuming
+ * @throws IOException
+ * @throws HttpObjectNotFoundException
+ */
+ boolean isOpenForConsuming ( String topic ) throws HttpObjectNotFoundException, IOException;
+
+ /**
+ * Get the set of allowed consumers. If the topic is open, the result is null.
+ * @param topic the topic name
+ * @return a set of allowed consumers or null
+ * @throws IOException
+ * @throws HttpObjectNotFoundException
+ */
+ Set<String> getAllowedConsumers ( String topic ) throws HttpObjectNotFoundException, IOException;
+
+ /**
+ * Allow the given API key to consume messages on the given topic. The caller must
+ * own this topic.
+ * @param topic the topic name
+ * @param apiKey the API key to authorize as a consumer
+ * @throws HttpException
+ * @throws HttpObjectNotFoundException
+ * @throws IOException
+ */
+ void allowConsumer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException;
+
+ /**
+ * Revoke the given API key's authorization to consume messages on the given topic.
+ * The caller must own this topic.
+ * @param topic the topic name
+ * @param apiKey the API key whose consumer authorization is revoked
+ * @throws HttpException
+ * @throws IOException
+ */
+ void revokeConsumer ( String topic, String apiKey ) throws HttpException, IOException;
+}
+
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/Clock.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/Clock.java
new file mode 100644
index 0000000..e1ea06e
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/Clock.java
@@ -0,0 +1,63 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl;
+
+/** Millisecond time source with a shared instance that can be replaced through register(). */
+public class Clock
+{
+    private static Clock sfClock = null;
+
+    /** Not public: only this package and subclasses can construct a clock directly. */
+    protected Clock ()
+    {
+    }
+
+    /**
+     * Get the shared clock, creating a plain Clock first if none is currently set.
+     * @return the shared instance
+     */
+    public synchronized static Clock getIt ()
+    {
+        final Clock current = sfClock;
+        return ( current != null ) ? current : ( sfClock = new Clock () );
+    }
+
+    /** @return the current time in milliseconds, as reported by the shared clock */
+    public static long now ()
+    {
+        return getIt ().nowImpl ();
+    }
+
+    /**
+     * @return current time in ms; this implementation reads System.currentTimeMillis()
+     */
+    protected long nowImpl ()
+    {
+        return System.currentTimeMillis ();
+    }
+
+    /** Replace the shared clock; after register(null) the next getIt() creates a plain Clock again. */
+    protected synchronized static void register ( Clock testClock )
+    {
+        sfClock = testClock;
+    }
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRBaseClient.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRBaseClient.java
new file mode 100644
index 0000000..405ed90
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRBaseClient.java
@@ -0,0 +1,392 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl;
+
+import java.net.MalformedURLException;
+import java.util.Collection;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.Response;
+
+import org.apache.http.HttpException;
+import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
+import org.glassfish.jersey.internal.util.Base64;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClient;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients.ProtocolTypeConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.att.nsa.apiClient.http.CacheUse;
+import com.att.nsa.apiClient.http.HttpClient;
+
+public class MRBaseClient extends HttpClient implements MRClient
+{
+
+ private static final String MR_AUTH_CONSTANT = "X-CambriaAuth";
+ private static final String MR_DATE_CONSTANT = "X-CambriaDate";
+
+ /** Create a client for the given hosts on the standard MR service port. */
+ protected MRBaseClient ( Collection<String> hosts ) throws MalformedURLException {
+     super ( ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort );
+     fLog = LoggerFactory.getLogger ( this.getClass().getName () ); // every constructor must set this: the response helpers log through fLog
+ }
+
+ /** Create a client for the given hosts on the given service port. */
+ protected MRBaseClient ( Collection<String> hosts, int stdSvcPort ) throws MalformedURLException {
+     super ( ConnectionType.HTTP,hosts, stdSvcPort);
+     fLog = LoggerFactory.getLogger ( this.getClass().getName () );
+ }
+
+ /** Create a client on the standard MR service port with the given client signature; the other super() arguments are fixed values. */
+ protected MRBaseClient ( Collection<String> hosts, String clientSignature ) throws MalformedURLException {
+     super(ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort, clientSignature, CacheUse.NONE, 1, 1L, TimeUnit.MILLISECONDS, 32, 32, 600000);
+     fLog = LoggerFactory.getLogger ( this.getClass().getName () );
+ }
+
+
+ /** Does nothing in this base class. */
+ @Override
+ public void close () {
+ }
+
+ /** Copy the strings of a JSON array into a sorted set; returns null for a null array. */
+ protected Set<String> jsonArrayToSet ( JSONArray a )
+ {
+     if ( null == a ) return null;
+     final Set<String> names = new TreeSet<String> ();
+     for ( int idx = 0, count = a.length (); idx < count; idx++ )
+     {
+         names.add ( a.getString ( idx ) );
+     }
+     return names;
+ }
+
+ /** Use the given logger for this client and pass it on to replaceLogger() as well. */
+ public void logTo ( Logger log ) {
+     this.fLog = log;
+     replaceLogger ( log );
+ }
+
+ /** @return the logger currently used by this client */
+ protected Logger getLog () {
+     return this.fLog;
+ }
+
+ private Logger fLog;
+
+ /**
+  * POST data to path with an HTTP Basic Authorization header and return the reply as JSON.
+  * protocolFlag is accepted but not read by this method.
+  * @return whatever getResponseDataInJson produces for the reply
+  * @throws HttpException if username or password is null
+  */
+ public JSONObject post(final String path, final byte[] data, final String contentType, final String username, final String password, final String protocolFlag) throws HttpException, JSONException{
+     if (null == username || null == password) {
+         throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
+     }
+     final WebTarget endpoint = getTarget(path, username, password);
+     final String basicToken = Base64.encodeAsString(username + ":" + password);
+     final Response reply = endpoint.request()
+             .header("Authorization", "Basic " + basicToken)
+             .post(Entity.entity(data, contentType));
+     return getResponseDataInJson(reply);
+ }
+ /**
+  * POST data to path with an HTTP Basic Authorization header and return the reply body as text.
+  * protocolFlag is accepted but not read by this method.
+  * @return the reply entity read as a String
+  * @throws HttpException if username or password is null
+  */
+ public String postWithResponse(final String path, final byte[] data, final String contentType, final String username, final String password, final String protocolFlag) throws HttpException, JSONException{
+     if (null == username || null == password) {
+         throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
+     }
+
+     final WebTarget endpoint = getTarget(path, username, password);
+     final String basicToken = Base64.encodeAsString(username + ":" + password);
+     final Response reply = endpoint.request()
+             .header("Authorization", "Basic " + basicToken)
+             .post(Entity.entity(data, contentType));
+
+     return reply.readEntity(String.class);
+ }
+ /**
+  * POST data to path, sending authKey and authDate in the X-CambriaAuth and X-CambriaDate headers, and return the reply as JSON.
+  * username and password feed the target's authentication feature; protocolFlag is not read.
+  * @return whatever getResponseDataInJson produces for the reply
+  * @throws HttpException if username or password is null
+  */
+ public JSONObject postAuth(final String path, final byte[] data, final String contentType, final String authKey,final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException{
+     if (null == username || null == password) {
+         throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
+     }
+     final Response reply = getTarget(path, username, password).request()
+             .header(MR_AUTH_CONSTANT, authKey)
+             .header(MR_DATE_CONSTANT, authDate)
+             .post(Entity.entity(data, contentType));
+     return getResponseDataInJson(reply);
+ }
+ /**
+  * POST data to path, sending authKey and authDate in the X-CambriaAuth and X-CambriaDate headers, and return the reply body as text.
+  * username and password feed the target's authentication feature; protocolFlag is not read.
+  * @return the reply entity read as a String
+  * @throws HttpException if username or password is null
+  */
+ public String postAuthwithResponse(final String path, final byte[] data, final String contentType, final String authKey,final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException{
+     if (null == username || null == password) {
+         throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
+     }
+
+     final Response reply = getTarget(path, username, password).request()
+             .header(MR_AUTH_CONSTANT, authKey)
+             .header(MR_DATE_CONSTANT, authDate)
+             .post(Entity.entity(data, contentType));
+
+     return reply.readEntity(String.class);
+ }
+
+
+ /**
+  * GET path and return the reply as JSON, authenticating according to protocolFlag.
+  * When protocolFlag equals (ignoring case) the value of ProtocolTypeConstants.AUTH_KEY, username and password are
+  * sent as the X-CambriaAuth and X-CambriaDate headers; otherwise they are sent as HTTP Basic credentials.
+  * @return whatever getResponseDataInJson produces for the reply
+  * @throws HttpException if username or password is null
+  */
+ public JSONObject get(final String path, final String username, final String password, final String protocolFlag) throws HttpException, JSONException {
+     if (null == username || null == password) {
+         throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+     }
+     final Response reply;
+     if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+         reply = getTarget(path).request()
+                 .header(MR_AUTH_CONSTANT, username)
+                 .header(MR_DATE_CONSTANT, password)
+                 .get();
+     } else {
+         final WebTarget endpoint = getTarget(path, username, password);
+         final String basicToken = Base64.encodeAsString(username + ":" + password);
+         reply = endpoint.request().header("Authorization", "Basic " + basicToken).get();
+     }
+     return getResponseDataInJson(reply);
+ }
+
+
+ /**
+  * GET path and return the reply body as text; username and password go out as the X-CambriaAuth and X-CambriaDate headers
+  * when protocolFlag equals (ignoring case) the value of ProtocolTypeConstants.AUTH_KEY, and as HTTP Basic credentials otherwise.
+  * Stores the reply headers in MRClientFactory.HTTPHeadersMap and logs a non-empty "transactionid" header.
+  * @throws HttpException if username or password is null
+  */
+ public String getResponse(final String path, final String username, final String password, final String protocolFlag) throws HttpException, JSONException {
+     if (null == username || null == password) {
+         throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+     }
+
+     final Response reply;
+     if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+         reply = getTarget(path).request()
+                 .header(MR_AUTH_CONSTANT, username)
+                 .header(MR_DATE_CONSTANT, password)
+                 .get();
+     } else {
+         final WebTarget endpoint = getTarget(path, username, password);
+         final String basicToken = Base64.encodeAsString(username + ":" + password);
+         reply = endpoint.request().header("Authorization", "Basic " + basicToken).get();
+     }
+     MRClientFactory.HTTPHeadersMap = reply.getHeaders();
+
+     final String transactionid = reply.getHeaderString("transactionid");
+     if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
+         fLog.info("TransactionId : " + transactionid);
+     }
+
+     return reply.readEntity(String.class);
+ }
+
+ /**
+  * GET path, sending authKey and authDate in the X-CambriaAuth and X-CambriaDate headers, and return the reply as JSON.
+  * username and password feed the target's authentication feature; protocolFlag is not read.
+  * @return whatever getResponseDataInJson produces for the reply
+  * @throws HttpException if username or password is null
+  */
+ public JSONObject getAuth(final String path, final String authKey, final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException {
+     if (null == username || null == password) {
+         throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+     }
+
+     final Response reply = getTarget(path, username, password).request()
+             .header(MR_AUTH_CONSTANT, authKey)
+             .header(MR_DATE_CONSTANT, authDate)
+             .get();
+     return getResponseDataInJson(reply);
+ }
+
+ /**
+  * GET path, sending authKey and authDate in the X-CambriaAuth and X-CambriaDate headers, and return the reply body as text.
+  * Stores the reply headers in MRClientFactory.HTTPHeadersMap and logs a non-empty "transactionid" header.
+  * username and password feed the target's authentication feature; protocolFlag is not read.
+  * @return the reply entity read as a String
+  * @throws HttpException if username or password is null
+  */
+ public String getAuthResponse(final String path, final String authKey, final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException {
+     if (null == username || null == password) {
+         throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+     }
+
+     final Response reply = getTarget(path, username, password).request()
+             .header(MR_AUTH_CONSTANT, authKey)
+             .header(MR_DATE_CONSTANT, authDate)
+             .get();
+
+     MRClientFactory.HTTPHeadersMap = reply.getHeaders();
+
+     final String transactionid = reply.getHeaderString("transactionid");
+     if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
+         fLog.info("TransactionId : " + transactionid);
+     }
+
+     return reply.readEntity(String.class);
+ }
+
+ /**
+  * Create a new client, register the universal authentication feature for the given credentials on it, and target path.
+  * @param path the URI the target points at
+  * @return a target bound to the newly created client
+  */
+ private WebTarget getTarget(final String path, final String username, final String password) {
+     // Using UNIVERSAL as it supports both BASIC and DIGEST authentication types.
+     return ClientBuilder.newClient()
+             .register(HttpAuthenticationFeature.universal(username, password))
+             .target(path);
+ }
+
+
+ /** Create a new client with no authentication feature registered and return its target for path. */
+ private WebTarget getTarget(final String path) {
+     // NOTE(review): a new Client is created on every call and is not closed in this method.
+     return ClientBuilder.newClient().target(path);
+ }
+ /**
+  * Turn an HTTP reply into a JSON object.
+  *
+  * Side effects: stores the reply headers in MRClientFactory.HTTPHeadersMap and logs the
+  * "transactionid" header when it is present and non-empty.
+  *
+  * Shape of the result:
+  * <ul>
+  * <li>status 403: {"result": [body text], "status": 403}; the body is not parsed as JSON</li>
+  * <li>body starting with '[': {"result": parsed array, "status": code}</li>
+  * <li>otherwise: the body parsed as a JSON object, with a "status" entry added</li>
+  * </ul>
+  *
+  * @param response the reply to convert
+  * @return the JSON object described above, or null when the body cannot be parsed
+  * @throws JSONException declared for callers; parse failures inside this method are logged and reported as null
+  */
+ private JSONObject getResponseDataInJson(Response response) throws JSONException {
+     try {
+         MRClientFactory.HTTPHeadersMap = response.getHeaders();
+
+         final String transactionid = response.getHeaderString("transactionid");
+         if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
+             fLog.info("TransactionId : " + transactionid);
+         }
+
+         final int status = response.getStatus();
+         if (status == 403) {
+             // Read the body as text. getEntity() on a received response hands back the
+             // unread input stream, so the stream object itself used to end up in "result".
+             final JSONArray forbiddenBody = new JSONArray();
+             forbiddenBody.put(response.readEntity(String.class));
+             final JSONObject forbidden = new JSONObject();
+             forbidden.put("result", forbiddenBody);
+             forbidden.put("status", status);
+             return forbidden;
+         }
+
+         final String responseData = response.readEntity(String.class);
+         final JSONTokener jsonTokener = new JSONTokener(responseData);
+
+         // Peek at the first character to tell an array body from an object body.
+         final char firstChar = jsonTokener.next();
+         jsonTokener.back();
+
+         final JSONObject jsonObject;
+         if ('[' == firstChar) {
+             // A bare array cannot carry the status, so wrap it in an object.
+             jsonObject = new JSONObject();
+             jsonObject.put("result", new JSONArray(jsonTokener));
+         } else {
+             jsonObject = new JSONObject(jsonTokener);
+         }
+         jsonObject.put("status", status);
+         return jsonObject;
+     } catch (JSONException excp) {
+         // Best effort: an unparseable body is logged and reported to the caller as null.
+         fLog.error("DMAAP - Error reading response data.", excp);
+         return null;
+     }
+ }
+
+ /**
+  * Extract the text between the first {@code <body>} tag and the next {@code </body} that follows it.
+  * @param responseString an HTML error page; may be null
+  * @return the enclosed text, or null when the input is null or either tag is missing
+  */
+ public String getHTTPErrorResponseMessage(String responseString){
+     final String openTag = "<body>";
+     final int openAt = (responseString == null) ? -1 : responseString.indexOf(openTag);
+     if (openAt < 0) return null;
+
+     final int beginIndex = openAt + openTag.length();
+     // Search for the closing tag only after the opening one, so substring() never gets a negative or reversed range.
+     final int endIndex = responseString.indexOf("</body", beginIndex);
+     return (endIndex < 0) ? null : responseString.substring(beginIndex, endIndex);
+ }
+
+ /**
+  * Extract the text between the first {@code <title>} tag and the next {@code </title} that follows it.
+  * @param responseString an HTML error page; may be null
+  * @return the enclosed text, or null when the input is null or either tag is missing
+  */
+ public String getHTTPErrorResponseCode(String responseString){
+     final String openTag = "<title>";
+     final int openAt = (responseString == null) ? -1 : responseString.indexOf(openTag);
+     if (openAt < 0) return null;
+     final int beginIndex = openAt + openTag.length();
+     final int endIndex = responseString.indexOf("</title", beginIndex); // only look after the opening tag
+     return (endIndex < 0) ? null : responseString.substring(beginIndex, endIndex);
+ }
+
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRBatchPublisher.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRBatchPublisher.java
new file mode 100644
index 0000000..28affcc
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRBatchPublisher.java
@@ -0,0 +1,485 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.zip.GZIPOutputStream;
+
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRBatchingPublisher;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.response.MRPublisherResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.att.nsa.apiClient.http.HttpClient;
+import com.att.nsa.apiClient.http.HttpException;
+
+/**
+ * This is a batching publisher class that allows the client to publish messages
+ * in batches that are limited in terms of size and/or hold time.
+ *
+ * @author author
+ * @deprecated This class's tricky locking doesn't quite work
+ *
+ */
+@Deprecated
+public class MRBatchPublisher implements MRBatchingPublisher
+{
+ public static final long kMinMaxAgeMs = 1;
+
+ /**
+ * Create a batch publisher and schedule the background sender (first run after 100 ms, then every 50 ms).
+ * @param baseUrls the base URLs, like "localhost:8080". This class adds the correct application path.
+ * @param topic the topic to publish to
+ * @param maxBatchSize the number of pending messages that triggers a send
+ * @param maxAgeMs the maximum age of a batch in ms; values below kMinMaxAgeMs are raised to kMinMaxAgeMs
+ * @param compress if true, the batch payload is gzip-compressed before posting
+ */
+ public MRBatchPublisher ( Collection<String> baseUrls, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
+ {
+ if ( maxAgeMs < kMinMaxAgeMs )
+ {
+ fLog.warn ( "Max age in ms is less than the minimum. Overriding to " + kMinMaxAgeMs );
+ maxAgeMs = kMinMaxAgeMs;
+ }
+
+ try {
+ fSender = new Sender ( baseUrls, topic, maxBatchSize, maxAgeMs, compress );
+ } catch (MalformedURLException e) {
+ throw new RuntimeException(e);
+ }
+
+ // FIXME: this strategy needs an overhaul -- why not just run a thread that knows how to wait for
+ // the oldest msg to hit max age? (locking is complicated, but should be do-able)
+ fExec = new ScheduledThreadPoolExecutor ( 1 );
+ fExec.scheduleAtFixedRate ( fSender, 100, 50, TimeUnit.MILLISECONDS );
+ }
+
+ @Override
+ public void setApiCredentials ( String apiKey, String apiSecret )
+ {
+ fSender.setApiCredentials ( apiKey, apiSecret );
+ }
+
+ @Override
+ public void clearApiCredentials ()
+ {
+ fSender.clearApiCredentials ();
+ }
+
+ /**
+ * Queue one message with the given partition; returns the pending message count.
+ * @param partition the partition string attached to the message
+ * @param msg the message payload
+ * @throws IOException declared by the underlying queue/send path
+ */
+ @Override
+ public int send ( String partition, String msg ) throws IOException
+ {
+ return send ( new message ( partition, msg ) );
+ }
+ @Override
+ public int send ( String msg ) throws IOException
+ {
+ return send ( new message ( "",msg ) );
+ }
+ /**
+ * Send the given message
+ * @param userMsg a message
+ * @throws IOException
+ */
+ @Override
+ public int send ( message userMsg ) throws IOException
+ {
+ final LinkedList<message> list = new LinkedList<message> ();
+ list.add ( userMsg );
+ return send ( list );
+ }
+
+ /**
+ * Send the given set of messages
+ * @param msgs the set of messages, sent in order of iteration
+ * @return the number of messages in the pending queue (this could actually be less than the size of the given collection, depending on thread timing)
+ * @throws IOException
+ */
+ @Override
+ public int send ( Collection<message> msgs ) throws IOException
+ {
+ if ( msgs.size() > 0 )
+ {
+ fSender.queue ( msgs );
+ }
+ return fSender.size ();
+ }
+
+ @Override
+ public int getPendingMessageCount ()
+ {
+ return fSender.size ();
+ }
+
+ /**
+ * Send any pending messages and close this publisher, via close(long, TimeUnit) with a
+ * Long.MAX_VALUE ms limit. Nothing is thrown: IOException and InterruptedException are caught
+ * and logged as possible message loss, and a warning reports how many messages stayed unsent.
+ */
+ @Override
+ public void close ()
+ {
+ try
+ {
+ final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS );
+ if ( remains.size() > 0 )
+ {
+ fLog.warn ( "Closing publisher with " + remains.size() + " messages unsent. "
+ + "(Consider using the alternate close method to capture unsent messages in this case.)" );
+ }
+ }
+ catch ( InterruptedException e )
+ {
+ fLog.warn ( "Possible message loss. " + e.getMessage(), e );
+ }
+ catch ( IOException e )
+ {
+ fLog.warn ( "Possible message loss. " + e.getMessage(), e );
+ }
+ }
+
+ public List<message> close ( long time, TimeUnit unit ) throws InterruptedException, IOException
+ {
+     // Stops the background task, force-sends until the queue drains or the limit passes, returns the unsent rest.
+     fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false );
+     fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false );
+     fExec.shutdown ();
+     // Saturate the deadline: close() passes Long.MAX_VALUE, and "now + wait" used to overflow negative,
+     // which skipped the flush loop entirely and left every pending message unsent.
+     final long nowMs = System.currentTimeMillis (), waitInMs = unit.toMillis ( time );
+     final long timeoutAtMs = ( waitInMs > Long.MAX_VALUE - nowMs ) ? Long.MAX_VALUE : nowMs + waitInMs;
+     while ( System.currentTimeMillis () < timeoutAtMs && getPendingMessageCount() > 0 ) {
+         fSender.checkSend ( true );
+         Thread.sleep ( 250 );
+     }
+     final LinkedList<message> result = new LinkedList<message> ();
+     fSender.drainTo ( result );
+     return result;
+ }
+
+ private final ScheduledThreadPoolExecutor fExec;
+ private final Sender fSender;
+
+ private static class TimestampedMessage extends message
+ {
+ public TimestampedMessage ( message m )
+ {
+ super ( m );
+ timestamp = System.currentTimeMillis ();
+ }
+ public final long timestamp;
+ }
+
+ private Logger fLog = LoggerFactory.getLogger ( MRBatchPublisher.class );
+
+ private class Sender extends MRBaseClient implements Runnable
+ {
+ public Sender ( Collection<String> baseUrls, String topic, int maxBatch, long maxAgeMs, boolean compress ) throws MalformedURLException
+ {
+ super ( baseUrls );
+
+ fNextBatch = new LinkedList<TimestampedMessage> ();
+ fSendingBatch = null;
+ fTopic = topic;
+ fMaxBatchSize = maxBatch;
+ fMaxAgeMs = maxAgeMs;
+ fCompress = compress;
+ fLock = new ReentrantReadWriteLock ();
+ fWriteLock = fLock.writeLock ();
+ fReadLock = fLock.readLock ();
+ fDontSendUntilMs = 0;
+ }
+
+ public void drainTo ( LinkedList<message> list ) // moves every unsent message (in-flight batch first, then pending) into list
+ {
+ fWriteLock.lock ();
+ try
+ {
+ if ( fSendingBatch != null )
+ {
+ list.addAll ( fSendingBatch );
+ }
+ list.addAll ( fNextBatch );
+ // both queues are emptied while the write lock is still held
+ fSendingBatch = null;
+ fNextBatch.clear ();
+ }
+ finally
+ {
+ fWriteLock.unlock ();
+ }
+ }
+
+ /**
+ * Called periodically by the background executor.
+ */
+ @Override
+ public void run ()
+ {
+ try
+ {
+ checkSend ( false );
+ }
+ catch ( IOException e )
+ {
+ fLog.warn ( "MR background send: " + e.getMessage () );
+ }
+ }
+
+ public int size ()
+ {
+ fReadLock.lock ();
+ try
+ {
+ return fNextBatch.size () + ( fSendingBatch == null ? 0 : fSendingBatch.size () );
+ }
+ finally
+ {
+ fReadLock.unlock ();
+ }
+ }
+
+ /**
+ * Append the given messages to the pending batch under the write lock, then run checkSend(false).
+ * @param msgs messages to queue, timestamped on arrival; null entries are logged and skipped
+ * @throws IOException declared by checkSend
+ */
+ public void queue ( Collection<message> msgs ) throws IOException
+ {
+ fWriteLock.lock ();
+ try
+ {
+ for ( message userMsg : msgs )
+ {
+ if ( userMsg != null )
+ {
+ fNextBatch.add ( new TimestampedMessage ( userMsg ) );
+ }
+ else
+ {
+ fLog.warn ( "MRBatchPublisher::Sender::queue received a null message." );
+ }
+ }
+ }
+ finally
+ {
+ fWriteLock.unlock();
+ }
+ checkSend ( false );
+ }
+
+ /**
+ * Send a batch if the queue is long enough, or the first pending message is old enough.
+ * @param force if true, send whenever anything is pending, ignoring size, age and error back-off
+ * @throws IOException declared only; doSend reports I/O failures through its boolean result
+ */
+ public void checkSend ( boolean force ) throws IOException
+ {
+ // hold a read lock just long enough to evaluate whether a batch
+ // should be sent
+ boolean shouldSend = false;
+ fReadLock.lock ();
+ try
+ {
+ if ( fNextBatch.size () > 0 )
+ {
+ final long nowMs = System.currentTimeMillis ();
+ shouldSend = ( force || fNextBatch.size() >= fMaxBatchSize );
+ if ( !shouldSend )
+ {
+ final long sendAtMs = fNextBatch.getFirst ().timestamp + fMaxAgeMs;
+ shouldSend = sendAtMs <= nowMs;
+ }
+
+ // however, unless forced, wait after an error
+ shouldSend = force || ( shouldSend && nowMs >= fDontSendUntilMs );
+ }
+ // else: even in 'force', there's nothing to send, so shouldSend=false is fine
+ }
+ finally
+ {
+ fReadLock.unlock ();
+ }
+
+ // if a send is required, acquire a write lock, swap out the next batch,
+ // swap in a fresh batch, and release the lock for the caller to start
+ // filling a batch again. After releasing the lock, send the current
+ // batch. (There could be more messages added between read unlock and
+ // write lock, but that's fine.)
+ if ( shouldSend )
+ {
+ fSendingBatch = null; // NOTE(review): written without the write lock; may clobber a batch another thread is still sending — confirm
+
+ fWriteLock.lock ();
+ try
+ {
+ fSendingBatch = fNextBatch;
+ fNextBatch = new LinkedList<TimestampedMessage> ();
+ }
+ finally
+ {
+ fWriteLock.unlock ();
+ }
+
+ if ( !doSend ( fSendingBatch, this, fTopic, fCompress, fLog ) ) // NOTE(review): fSendingBatch is read here outside any lock
+ {
+ fLog.warn ( "Send failed, rebuilding send queue." );
+
+ // note the time for back-off
+ fDontSendUntilMs = sfWaitAfterError + System.currentTimeMillis ();
+
+ // the send failed. reconstruct the pending queue
+ fWriteLock.lock ();
+ try
+ {
+ final LinkedList<TimestampedMessage> nextGroup = fNextBatch;
+ fNextBatch = fSendingBatch;
+ fNextBatch.addAll ( nextGroup );
+ fSendingBatch = null;
+ fLog.info ( "Send queue rebuilt; " + fNextBatch.size () + " messages to send." );
+ }
+ finally
+ {
+ fWriteLock.unlock ();
+ }
+ }
+ else
+ {
+ fWriteLock.lock ();
+ try
+ {
+ fSendingBatch = null;
+ }
+ finally
+ {
+ fWriteLock.unlock ();
+ }
+ }
+ }
+ }
+
+ private LinkedList<TimestampedMessage> fNextBatch;
+ private LinkedList<TimestampedMessage> fSendingBatch;
+ private final String fTopic;
+ private final int fMaxBatchSize;
+ private final long fMaxAgeMs;
+ private final boolean fCompress;
+ private final ReentrantReadWriteLock fLock;
+ private final WriteLock fWriteLock;
+ private final ReadLock fReadLock;
+ private long fDontSendUntilMs;
+ private static final long sfWaitAfterError = 1000;
+ }
+
+ // Static so that it's clearly not using any mutable member data outside of a lock. Returns true when the batch was posted (or was empty).
+ private static boolean doSend ( LinkedList<TimestampedMessage> toSend, HttpClient client, String topic, boolean compress, Logger log )
+ {
+     // it's possible for this call to be made with an empty list. in this case, just return.
+     if ( toSend.size() < 1 )
+     {
+         return true;
+     }
+
+     final long nowMs = System.currentTimeMillis ();
+     final String url = MRConstants.makeUrl ( topic );
+
+     log.info ( "sending " + toSend.size() + " msgs to " + url + ". Oldest: " + ( nowMs - toSend.getFirst().timestamp ) + " ms" );
+     // Encode every message as "<partition length>.<message length>.<partition><message>\n", gzipped on request.
+     final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
+     try
+     {
+         OutputStream os = baseStream;
+         if ( compress )
+         {
+             os = new GZIPOutputStream ( baseStream );
+         }
+         for ( TimestampedMessage m : toSend )
+         {
+             os.write ( ( "" + m.fPartition.length () ).getBytes() ); // NOTE(review): lengths count chars, getBytes() uses the platform charset — confirm for non-ASCII
+             os.write ( '.' );
+             os.write ( ( "" + m.fMsg.length () ).getBytes() );
+             os.write ( '.' );
+             os.write ( m.fPartition.getBytes() );
+             os.write ( m.fMsg.getBytes() );
+             os.write ( '\n' );
+         }
+         os.close ();
+     }
+     catch ( IOException e )
+     {
+         log.warn ( "Problem writing stream to post: " + e.getMessage () );
+         return false;
+     }
+     // Post the payload; failures are logged and reported through the return value, never thrown.
+     boolean result = false;
+     final long startMs = System.currentTimeMillis ();
+     try
+     {
+         client.post ( url, compress ?
+             MRFormat.CAMBRIA_ZIP.toString () :
+             MRFormat.CAMBRIA.toString (),
+             baseStream.toByteArray(), false );
+         result = true;
+     }
+     catch ( HttpException e )
+     {
+         log.warn ( "Problem posting to MR: " + e.getMessage() );
+     }
+     catch ( IOException e )
+     {
+         log.warn ( "Problem posting to MR: " + e.getMessage() );
+     }
+     // Report the real outcome: this line used to print "OK" even when the post had failed.
+     log.info ( "MR response (" + (System.currentTimeMillis ()-startMs) + " ms): " + ( result ? "OK" : "FAILED" ) );
+     return result;
+ }
+
+ @Override
+ public void logTo ( Logger log )
+ {
+ fLog = log;
+ }
+
+ @Override
+ public MRPublisherResponse sendBatchWithResponse() {
+ // Not implemented in this class: the method body only returns null.
+ return null;
+ }
+
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRClientVersionInfo.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRClientVersionInfo.java
new file mode 100644
index 0000000..3b68363
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRClientVersionInfo.java
@@ -0,0 +1,54 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+ public class MRClientVersionInfo
+ {
+     /** @return the MRClientVersion value from /MRClientVersion.properties, or null if it could not be read */
+     public static String getVersion ()
+     {
+         return version;
+     }
+     private static final Properties props = new Properties();
+     private static final String version;
+     static
+     {
+         String use = null;
+         final InputStream is = MRClientVersionInfo.class.getResourceAsStream ( "/MRClientVersion.properties" );
+         if ( is != null )
+         {
+             try {
+                 props.load ( is );
+                 use = props.getProperty ( "MRClientVersion", null );
+             } catch ( IOException e ) {
+                 // best effort: an unreadable resource leaves the version null
+             } finally {
+                 // the stream was previously never closed
+                 try { is.close (); } catch ( IOException ignored ) { /* nothing useful to do on close failure */ }
+             }
+         }
+         version = use;
+     }
+ }
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRConstants.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRConstants.java
new file mode 100644
index 0000000..4039a0b
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRConstants.java
@@ -0,0 +1,180 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.http.HttpHost;
+
+class MRConstants
+{
+ private static final String PROTOCOL = "http";
+ public static final String context = "/";
+ public static final String kBasePath = "events/";
+ //public static final int kStdMRServicePort = 3904;
+ public static final int kStdMRServicePort = 8080;
+
+ public static String escape ( String s )
+ {
+ try
+ {
+ return URLEncoder.encode ( s, "UTF-8");
+ }
+ catch ( UnsupportedEncodingException e )
+ {
+ throw new RuntimeException ( e );
+ }
+ }
+
+ public static String makeUrl ( String rawTopic )
+ {
+     // Relative publish path for a topic: context + base path + URL-encoded topic name.
+     final String encodedTopic = escape ( rawTopic );
+     final StringBuilder path = new StringBuilder ();
+     path.append ( MRConstants.context );
+     path.append ( MRConstants.kBasePath );
+     path.append ( encodedTopic );
+     return path.toString ();
+ }
+
+ public static String makeUrl ( final String host, final String rawTopic )
+ {
+     // Builds "<scheme>://<host>/events/<escaped topic>", adding the default scheme only when
+     // host does not already carry one.
+     final String cleanTopic = escape ( rawTopic );
+     final StringBuilder url = new StringBuilder ();
+     // The original tested "!startsWith(http) || !startsWith(https)", which is true for every
+     // "http://..." host and produced "http://http://...". Test for the full scheme prefixes instead.
+     final boolean hasScheme = host.startsWith ( "http://" ) || host.startsWith ( "https://" );
+     if ( !hasScheme ) {
+         url.append ( PROTOCOL ).append ( "://" );
+     }
+     url.append ( host ).append ( MRConstants.context ).append ( MRConstants.kBasePath ).append ( cleanTopic );
+     return url.toString ();
+ }
+
+ public static String makeUrl ( final String host, final String rawTopic, final String transferprotocol,final String parttion )
+ {
+     // Builds "<protocol>://<host>/events/<escaped topic>[?partitionKey=<escaped partition>]".
+     // A null or empty transferprotocol falls back to the default protocol.
+     final String cleanTopic = escape ( rawTopic );
+     final boolean hasProtocol = transferprotocol != null && !transferprotocol.equals ( "" );
+     final StringBuilder url = new StringBuilder ();
+     url.append ( hasProtocol ? transferprotocol : PROTOCOL ).append ( "://" );
+     url.append ( host );
+     url.append ( MRConstants.context );
+     url.append ( MRConstants.kBasePath ).append ( cleanTopic );
+     if ( parttion != null && !parttion.equalsIgnoreCase ( "" ) )
+     {
+         // The partition is a query-parameter value, so it is URL-encoded like the topic;
+         // appended raw, characters such as '&', '#' or spaces would corrupt the query string.
+         url.append ( "?partitionKey=" ).append ( escape ( parttion ) );
+     }
+     return url.toString ();
+ }
+ public static String makeConsumerUrl ( String topic, String rawConsumerGroup, String rawConsumerId ) // relative consumer path: context + base path + topic/group/id
+ {
+ final String cleanConsumerGroup = escape ( rawConsumerGroup );
+ final String cleanConsumerId = escape ( rawConsumerId );
+ return MRConstants.context + MRConstants.kBasePath + topic + "/" + cleanConsumerGroup + "/" + cleanConsumerId; // NOTE(review): topic is appended unescaped, unlike makeUrl — confirm callers pass a URL-safe topic
+ }
+
+ /**
+ * Create a list of HttpHosts from an input list of strings. Input strings have
+ * host[:port] as format. If the port section is not provided, the default port is used.
+ *
+ * @param hosts host[:port] strings; zero-length entries are skipped
+ * @return the converted hosts, in the iteration order of the input
+ */
+ public static List<HttpHost> createHostsList(Collection<String> hosts)
+ {
+ final ArrayList<HttpHost> convertedHosts = new ArrayList<HttpHost> ();
+ for ( String host : hosts )
+ {
+ if ( host.length () == 0 ) continue;
+ convertedHosts.add ( hostForString ( host ) );
+ }
+ return convertedHosts;
+ }
+
+ /**
+ * Return an HttpHost from an input string. Input string has
+ * host[:port] as format. If the port section is not provided, the default port is used.
+ *
+ * @param host a non-empty host[:port] string; "host:" with nothing after the colon uses the default port
+ * @return the parsed host; throws IllegalArgumentException for an empty entry, a leading ':' or a non-numeric port
+ */
+ public static HttpHost hostForString ( String host )
+ {
+ if ( host.length() < 1 ) throw new IllegalArgumentException ( "An empty host entry is invalid." );
+
+ String hostPart = host;
+ int port = kStdMRServicePort;
+
+ final int colon = host.indexOf ( ':' );
+ if ( colon == 0 ) throw new IllegalArgumentException ( "Host entry '" + host + "' is invalid." );
+ if ( colon > 0 )
+ {
+ hostPart = host.substring ( 0, colon ).trim();
+
+ final String portPart = host.substring ( colon + 1 ).trim();
+ if ( portPart.length () > 0 )
+ {
+ try
+ {
+ port = Integer.parseInt ( portPart );
+ }
+ catch ( NumberFormatException x )
+ {
+ throw new IllegalArgumentException ( "Host entry '" + host + "' is invalid.", x );
+ }
+ }
+ // else: use default port on "foo:"
+ }
+
+ return new HttpHost ( hostPart, port );
+ }
+
+ public static String makeConsumerUrl(String host, String fTopic, String fGroup, String fId,final String transferprotocol) {
+     // Absolute consumer URL: "<protocol>://<host>" + context + base path + "<topic>/<group>/<id>".
+     // Only the group and the id are URL-encoded; the topic and host are appended as given.
+     final String groupPart = escape ( fGroup );
+     final String idPart = escape ( fId );
+
+     // a null or empty transferprotocol selects the default protocol
+     final boolean useDefault = transferprotocol == null || transferprotocol.equals("");
+     final String scheme = useDefault ? PROTOCOL : transferprotocol;
+
+     final StringBuilder out = new StringBuilder();
+     out.append(scheme).append("://");
+     out.append(host);
+     out.append(context);
+     out.append(kBasePath);
+     out.append(fTopic).append("/").append(groupPart).append("/").append(idPart);
+
+     return out.toString();
+ }
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRConsumerImpl.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRConsumerImpl.java
new file mode 100644
index 0000000..8f4d777
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRConsumerImpl.java
@@ -0,0 +1,709 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.http.HttpException;
+import org.apache.http.HttpStatus;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRConsumer;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.response.MRConsumerResponse;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients.ProtocolTypeConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.att.aft.dme2.api.DME2Client;
+import com.att.aft.dme2.api.DME2Exception;
+
+public class MRConsumerImpl extends MRBaseClient implements MRConsumer
+{
+
+ private static final String SUCCESS_MESSAGE = "Success";
+
+
+ private Logger fLog = LoggerFactory.getLogger ( this.getClass().getName () );
+ public static List<String> stringToList ( String str )
+ {
+     // Splits a comma-separated string into its trimmed, non-empty entries (empty list for null).
+     final LinkedList<String> entries = new LinkedList<String> ();
+     if ( str == null ) {
+         return entries;
+     }
+     for ( String piece : str.trim ().split ( "," ) )
+     {
+         final String candidate = piece.trim ();
+         if ( candidate.length () > 0 )
+         {
+             entries.add ( candidate );
+         }
+     }
+     return entries;
+ }
+
+ public MRConsumerImpl ( Collection<String> hostPart, final String topic, final String consumerGroup,
+ final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username, String apiSecret_password ) throws MalformedURLException
+ {
+ this( hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password, false );
+ }
+
+ public MRConsumerImpl ( Collection<String> hostPart, final String topic, final String consumerGroup,
+ final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret, boolean allowSelfSignedCerts ) throws MalformedURLException
+ {
+ super ( hostPart, topic + "::" + consumerGroup + "::" + consumerId );
+ // store the subscription settings; timeout and limit are the defaults used by fetch()
+ fTopic = topic;
+ fGroup = consumerGroup;
+ fId = consumerId;
+ fTimeoutMs = timeoutMs;
+ fLimit = limit;
+ fFilter = filter;
+ // NOTE(review): apiKey, apiSecret and allowSelfSignedCerts are not used by this constructor
+ //setApiCredentials ( apiKey, apiSecret );
+ }
+
+ @Override
+ public Iterable<String> fetch () throws IOException,Exception
+ {
+ // fetch with the timeout and limit set in constructor
+ return fetch ( fTimeoutMs, fLimit );
+ }
+
+ @Override
+ public Iterable<String> fetch ( int timeoutMs, int limit ) throws IOException,Exception
+ {
+ final LinkedList<String> msgs = new LinkedList<String> ();
+ // Fills msgs from the branch matching protocolFlag (DME2, AAF_AUTH or AUTH_KEY); if none matches, msgs stays empty.
+// FIXME: the timeout on the socket needs to be at least as long as the long poll
+// // sanity check for long poll timeout vs. socket read timeout
+// final int maxReasonableTimeoutMs = CambriaSingletonHttpClient.sfSoTimeoutMs * 9/10;
+// if ( timeoutMs > maxReasonableTimeoutMs )
+// {
+// log.warn ( "Long poll time (" + timeoutMs + ") is too high w.r.t. socket read timeout (" +
+// CambriaSingletonHttpClient.sfSoTimeoutMs + "). Reducing long poll timeout to " + maxReasonableTimeoutMs + "." );
+// timeoutMs = maxReasonableTimeoutMs;
+// }
+
+ // final String urlPath = createUrlPath ( timeoutMs, limit );
+
+ //getLog().info ( "UEB GET " + urlPath );
+ try
+ {
+ if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
+ DMEConfigure(timeoutMs, limit);
+ try
+ {
+ //getLog().info ( "Receiving msgs from: " + url+subContextPath );
+ String reply = sender.sendAndWait(timeoutMs+10000L);
+ // System.out.println("Message received = "+reply);
+ final JSONObject o =getResponseDataInJson(reply);
+ //msgs.add(reply);
+ if ( o != null )
+ {
+ final JSONArray a = o.getJSONArray ( "result" );
+ // final int b = o.getInt("status" );
+ //if ( a != null && a.length()>0 )
+ if ( a != null)
+ {
+ for ( int i=0; i<a.length (); i++ )
+ {
+ //msgs.add("DMAAP response status: "+Integer.toString(b));
+ if (a.get(i) instanceof String)
+ msgs.add ( a.getString(i) );
+ else
+ msgs.add ( a.getJSONObject(i).toString() );
+
+
+ }
+ }
+// else if(a != null && a.length()<1){
+// msgs.add ("[]");
+// }
+ }
+ }
+ catch ( JSONException e )
+ {
+ // unexpected response
+ reportProblemWithResponse ();
+ }
+ catch ( HttpException e )
+ {
+ throw new IOException ( e );
+ }
+ }
+
+ if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ final String urlPath = createUrlPath (MRConstants.makeConsumerUrl ( host, fTopic, fGroup, fId,props.getProperty("Protocol")), timeoutMs, limit );
+
+
+ try
+ {
+ final JSONObject o = get ( urlPath, username, password, protocolFlag );
+
+ if ( o != null )
+ {
+ final JSONArray a = o.getJSONArray ( "result" );
+ final int b = o.getInt("status" );
+ //if ( a != null && a.length()>0 )
+ if ( a != null)
+ {
+ for ( int i=0; i<a.length (); i++ )
+ {
+ msgs.add("DMAAP response status: "+Integer.toString(b)); // NOTE(review): status line is added once per message, interleaved with payloads — confirm this is intended
+ if (a.get(i) instanceof String)
+ msgs.add ( a.getString(i) );
+ else
+ msgs.add ( a.getJSONObject(i).toString() );
+
+ }
+ }
+// else if(a != null && a.length()<1)
+// {
+// msgs.add ("[]");
+// }
+ }
+ }
+ catch ( JSONException e )
+ {
+ // unexpected response
+ reportProblemWithResponse ();
+ }
+ catch ( HttpException e )
+ {
+ throw new IOException ( e );
+ }
+ }
+
+ if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+ final String urlPath = createUrlPath (MRConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ,props.getProperty("Protocol")), timeoutMs, limit );
+
+
+ try
+ {
+ final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag );
+ if ( o != null )
+ {
+ final JSONArray a = o.getJSONArray ( "result" );
+ final int b = o.getInt("status" );
+ //if ( a != null && a.length()>0)
+ if ( a != null)
+ {
+ for ( int i=0; i<a.length (); i++ )
+ {
+ msgs.add("DMAAP response status: "+Integer.toString(b)); // NOTE(review): status line is added once per message, interleaved with payloads — confirm this is intended
+ if (a.get(i) instanceof String)
+ msgs.add ( a.getString(i) );
+ else
+ msgs.add ( a.getJSONObject(i).toString() );
+
+ }
+ }
+// else if(a != null && a.length()<1){
+// msgs.add ("[]");
+// }
+ }
+ }
+ catch ( JSONException e )
+ {
+ // unexpected response
+ reportProblemWithResponse ();
+ }
+ catch ( HttpException e )
+ {
+ throw new IOException ( e );
+ }
+
+ }
+
+ } catch ( JSONException e ) {
+ // unexpected response
+ reportProblemWithResponse ();
+ } catch (HttpException e) {
+ throw new IOException(e);
+ } catch (Exception e ) {
+ throw e; // every other exception is rethrown unchanged
+ }
+
+
+ return msgs;
+ }
+
+ private JSONObject getResponseDataInJson(String response) {
+ try {
+ // Parses response as JSON: a top-level array is wrapped as {"result": [...]}, anything else is parsed as an object.
+ // Returns null when a JSONException is raised; the exception is swallowed (the error log below is commented out).
+ //fLog.info("DMAAP response status: " + response.getStatus());
+
+ // final String responseData = response.readEntity(String.class);
+ JSONTokener jsonTokener = new JSONTokener(response);
+ JSONObject jsonObject = null;
+ final char firstChar = jsonTokener.next();
+ jsonTokener.back();
+ if ('[' == firstChar) {
+ JSONArray jsonArray = new JSONArray(jsonTokener);
+ jsonObject = new JSONObject();
+ jsonObject.put("result", jsonArray);
+ } else {
+ jsonObject = new JSONObject(jsonTokener);
+ }
+
+ return jsonObject;
+ } catch (JSONException excp) {
+ // fLog.error("DMAAP - Error reading response data.", excp);
+ return null;
+ }
+
+
+
+}
+
+ private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
+     // Wraps a top-level JSON array as {"result": [...]}. Returns null for null/empty input and for
+     // text starting with '{' or '<'; any other text is handed to the JSONObject parser.
+     // The guard now runs first: it used to follow the tokenizer setup, so null input threw a
+     // NullPointerException before the null test was ever reached.
+     if (response == null || response.length() == 0) {
+         return null;
+     }
+
+     final JSONTokener jsonTokener = new JSONTokener(response);
+     final char firstChar = jsonTokener.next();
+     jsonTokener.back();
+
+     if ('{' == firstChar || '<' == firstChar) {
+         return null;
+     }
+
+     if ('[' == firstChar) {
+         final JSONArray jsonArray = new JSONArray(jsonTokener);
+         final JSONObject wrapped = new JSONObject();
+         wrapped.put("result", jsonArray);
+         return wrapped;
+     }
+     return new JSONObject(jsonTokener);
+ }
+
+ private final String fTopic;
+ private final String fGroup;
+ private final String fId;
+ private final int fTimeoutMs;
+ private final int fLimit;
+ private String fFilter;
+ private String username;
+ private String password;
+ private String host;
+ private String latitude;
+ private String longitude;
+ private String version;
+ private String serviceName;
+ private String env;
+ private String partner;
+ private String routeOffer;
+ private String subContextPath;
+ private String protocol;
+ private String methodType;
+ private String url;
+ private String dmeuser;
+ private String dmepassword;
+ private String contenttype;
+ private DME2Client sender;
+ public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
+ public String consumerFilePath;
+ private String authKey;
+ private String authDate;
+ private Properties props;
+ private HashMap<String, String> DMETimeOuts;
+ private String handlers;
+ public static String routerFilePath;
+ public static String getRouterFilePath() { // returns the static routerFilePath field declared in this class
+ return routerFilePath;
+ }
+
+ public static void setRouterFilePath(String routerFilePath) {
+ MRSimplerBatchPublisher.routerFilePath = routerFilePath; // NOTE(review): writes MRSimplerBatchPublisher's field, not the routerFilePath that getRouterFilePath() returns - confirm intended
+ }
+ public String getConsumerFilePath() { // returns the public consumerFilePath field
+ return consumerFilePath;
+ }
+
+ public void setConsumerFilePath(String consumerFilePath) { // stores the path in the consumerFilePath field
+ this.consumerFilePath = consumerFilePath;
+ }
+
+ public String getProtocolFlag() { // protocolFlag selects the transport in fetchWithReturnConsumerResponse; it is initialised to the DME2 value
+ return protocolFlag;
+ }
+
+ public void setProtocolFlag(String protocolFlag) { // the value is compared, ignoring case, with ProtocolTypeConstants values when fetching
+ this.protocolFlag = protocolFlag;
+ }
+
+ private void DMEConfigure(int timeoutMs, int limit)throws IOException,DME2Exception, URISyntaxException{
+ latitude = props.getProperty("Latitude");
+ longitude = props.getProperty("Longitude");
+ version = props.getProperty("Version");
+ serviceName = props.getProperty("ServiceName");
+ env = props.getProperty("Environment");
+ partner = props.getProperty("Partner");
+ routeOffer = props.getProperty("routeOffer");
+
+ subContextPath=props.getProperty("SubContextPath")+fTopic+"/"+fGroup+"/"+fId;
+ // subContextPath=createUrlPath (subContextPath, timeoutMs, limit);
+ //if (timeoutMs != -1) subContextPath=createUrlPath (subContextPath, timeoutMs);
+
+ protocol = props.getProperty("Protocol");
+ methodType = props.getProperty("MethodType");
+ dmeuser = props.getProperty("username");
+ dmepassword = props.getProperty("password");
+ contenttype = props.getProperty("contenttype");
+ handlers = props.getProperty("sessionstickinessrequired");
+ //url =protocol+"://DME2SEARCH/"+ "service="+serviceName+"/"+"version="+version+"/"+"envContext="+env+"/"+"partner="+partner;
+ // url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeOffer="+partner;
+
+ /**
+ * Changes to DME2Client url to use Partner for auto failover between data centers
+ * When Partner value is not provided use the routeOffer value for auto failover within a cluster
+ */
+
+ String preferredRouteKey = readRoute("preferredRouteKey");
+
+ if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty())
+ {
+ url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner+"&routeoffer="+preferredRouteKey;
+ }else if (partner != null && !partner.isEmpty())
+ {
+ url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner;
+ }
+ else if (routeOffer!=null && !routeOffer.isEmpty())
+ {
+ url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeoffer="+routeOffer;
+ }
+
+ //fLog.info("url :"+url);
+
+ if(timeoutMs != -1 )url=url+"&timeout="+timeoutMs;
+ if(limit != -1 )url=url+"&limit="+limit;
+
+ DMETimeOuts = new HashMap<String, String>();
+ DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
+ DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
+ DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
+ DMETimeOuts.put("Content-Type", contenttype);
+ System.setProperty("AFT_LATITUDE", latitude);
+ System.setProperty("AFT_LONGITUDE", longitude);
+ System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT"));
+ // System.setProperty("DME2.DEBUG", "true");
+
+ //SSL changes
+ System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
+ "SSLv3,TLSv1,TLSv1.1");
+ System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
+ System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
+ //SSL changes
+
+ sender = new DME2Client(new URI(url), timeoutMs+10000L);
+ sender.setAllowAllHttpReturnCodes(true);
+ sender.setMethod(methodType);
+ sender.setSubContext(subContextPath);
+ if(dmeuser != null && dmepassword != null){
+ sender.setCredentials(dmeuser, dmepassword);
+ //System.out.println(dmepassword);
+ }
+ sender.setHeaders(DMETimeOuts);
+ sender.setPayload("");
+
+ if(handlers.equalsIgnoreCase("yes")){
+ sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
+ sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
+ sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
+ }else{
+ sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.dme.client.HeaderReplyHandler");
+ }
+ /* HeaderReplyHandler headerhandler= new HeaderReplyHandler();
+ sender.setReplyHandler(headerhandler);*/
+// } catch (DME2Exception x) {
+// getLog().warn(x.getMessage(), x);
+// System.out.println("XXXXXXXXXXXX"+x);
+// } catch (URISyntaxException x) {
+// System.out.println(x);
+// getLog().warn(x.getMessage(), x);
+// } catch (Exception x) {
+// System.out.println("XXXXXXXXXXXX"+x);
+// getLog().warn(x.getMessage(), x);
+// }
+ }
+ public Properties getProps() { // returns the Properties object that DMEConfigure reads its settings from
+ return props;
+ }
+
+ public void setProps(Properties props) { // props supplies the DME2 settings and the "Protocol" entry used to build consumer URLs
+ this.props = props;
+ }
+
+ /**
+  * Appends the consumer query string to the given URL.
+  * Adds "timeout" and "limit" when they are greater than -1 and "filter"
+  * (URL-encoded as UTF-8) when fFilter is non-empty. Parameters are joined
+  * with '&' and attached after a '?'; with none, the URL is returned as is.
+  *
+  * @param url base URL or context path
+  * @param timeoutMs timeout value, skipped unless greater than -1
+  * @param limit limit value, skipped unless greater than -1
+  * @return the URL followed by the query string, if any
+  */
+ protected String createUrlPath (String url, int timeoutMs , int limit ) throws IOException
+ {
+     final StringBuilder contexturl = new StringBuilder ( url );
+     final StringBuilder adds = new StringBuilder ();
+     if ( timeoutMs > -1 ) adds.append ( "timeout=" ).append ( timeoutMs );
+     if ( limit > -1 )
+     {
+         if ( adds.length () > 0 ) adds.append ( "&" );
+         adds.append ( "limit=" ).append ( limit );
+     }
+     if ( fFilter != null && fFilter.length () > 0 )
+     {
+         if ( adds.length () > 0 ) adds.append ( "&" );
+         try {
+             adds.append ( "filter=" ).append ( URLEncoder.encode ( fFilter, "UTF-8" ) );
+         } catch (UnsupportedEncodingException e) {
+             // Keep the original exception as the cause so its stack trace is not lost.
+             throw new RuntimeException(e.getMessage() + "....say whaaaat?!", e);
+         }
+     }
+     if ( adds.length () > 0 ) contexturl.append ( "?" ).append ( adds );
+     return contexturl.toString ();
+ }
+
+ public String getUsername() { // user name handed to getResponse/getAuthResponse when fetching
+ return username;
+ }
+
+ public void setUsername(String username) { // sets the user name handed to getResponse/getAuthResponse
+ this.username = username;
+ }
+
+ public String getPassword() { // password handed to getResponse/getAuthResponse when fetching
+ return password;
+ }
+
+ public void setPassword(String password) { // sets the password handed to getResponse/getAuthResponse
+ this.password = password;
+ }
+
+ public String getHost() { // host passed to MRConstants.makeConsumerUrl when building the consumer URL
+ return host;
+ }
+
+ public void setHost(String host) { // sets the host passed to MRConstants.makeConsumerUrl
+ this.host = host;
+ }
+
+ public String getAuthKey() { // key passed to getAuthResponse in the AUTH_KEY branch of the fetch
+ return authKey;
+ }
+
+ public void setAuthKey(String authKey) { // sets the key passed to getAuthResponse
+ this.authKey = authKey;
+ }
+
+ public String getAuthDate() { // date string passed to getAuthResponse in the AUTH_KEY branch of the fetch
+ return authDate;
+ }
+
+ public void setAuthDate(String authDate) { // sets the date string passed to getAuthResponse
+ this.authDate = authDate;
+ }
+
+ public String getfFilter() { // filter text that createUrlPath URL-encodes into the "filter" query parameter
+ return fFilter;
+ }
+
+ public void setfFilter(String fFilter) { // null or empty means createUrlPath adds no "filter" parameter
+ this.fFilter = fFilter;
+ }
+
+ private String readRoute(String routeKey) { // reloads MRClientFactory.prop from the route file, then looks up routeKey
+     FileReader reader = null;
+     try {
+         reader = new FileReader(new File (MRClientFactory.routeFilePath));
+         MRClientFactory.prop.load(reader);
+     } catch (Exception ex) {
+         fLog.error("Reply Router Error " + ex.toString() ); // a failed load is only logged
+     } finally { // Properties.load leaves the reader open; close it so the file handle is not leaked
+         if (reader != null) { try { reader.close(); } catch (IOException ignored) { /* best effort */ } }
+     }
+     return MRClientFactory.prop.getProperty(routeKey);
+ }
+
+ @Override
+ public MRConsumerResponse fetchWithReturnConsumerResponse() {
+ // Convenience overload of fetchWithReturnConsumerResponse(int, int):
+ // fetch with the timeout and limit held in the final fTimeoutMs and fLimit fields
+ return fetchWithReturnConsumerResponse(fTimeoutMs, fLimit);
+ }
+
+ /**
+  * Fetches messages and reports the outcome in an MRConsumerResponse.
+  * <p>
+  * The transport is chosen by comparing protocolFlag, ignoring case, with
+  * the DME2, AAF_AUTH and AUTH_KEY values of ProtocolTypeConstants. For a
+  * matching transport the reply body is parsed for messages and then
+  * passed to createMRConsumerResponse, which sets the response code and
+  * message. If protocolFlag matches none of them, no request is made.
+  * <p>
+  * Exceptions are not propagated; they are converted into a response code
+  * and message on the returned object.
+  *
+  * @param timeoutMs timeout for the request; the DME2 wait is timeoutMs + 10000
+  * @param limit limit passed on with the request
+  * @return the response; its actual messages are always set and may be empty
+  */
+ @Override
+ public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs,
+         int limit) {
+     final LinkedList<String> msgs = new LinkedList<String>();
+     final MRConsumerResponse mrConsumerResponse = new MRConsumerResponse();
+     try {
+         // Each transport block runs only when protocolFlag matches it.
+         if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(
+                 protocolFlag)) {
+             DMEConfigure(timeoutMs, limit);
+
+             final String reply = sender.sendAndWait(timeoutMs + 10000L);
+
+             addResultMessages(reply, msgs);
+             createMRConsumerResponse(reply, mrConsumerResponse);
+         }
+
+         if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(
+                 protocolFlag)) {
+             final String urlPath = createUrlPath(
+                     MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
+                             props.getProperty("Protocol")), timeoutMs,
+                     limit);
+
+             final String response = getResponse(urlPath, username, password,
+                     protocolFlag);
+
+             addResultMessages(response, msgs);
+             createMRConsumerResponse(response, mrConsumerResponse);
+         }
+
+         if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(
+                 protocolFlag)) {
+             final String urlPath = createUrlPath(
+                     MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
+                             props.getProperty("Protocol")), timeoutMs,
+                     limit);
+
+             final String response = getAuthResponse(urlPath, authKey, authDate,
+                     username, password, protocolFlag);
+
+             addResultMessages(response, msgs);
+             createMRConsumerResponse(response, mrConsumerResponse);
+         }
+     } catch (JSONException e) {
+         // Report the status through the response code so callers can read it
+         // there; the response message carries the exception text.
+         mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+         mrConsumerResponse.setResponseMessage(e.getMessage());
+     } catch (HttpException e) {
+         mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+         mrConsumerResponse.setResponseMessage(e.getMessage());
+     } catch (DME2Exception e) {
+         mrConsumerResponse.setResponseCode(e.getErrorCode());
+         mrConsumerResponse.setResponseMessage(e.getErrorMessage());
+     } catch (Exception e) {
+         mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+         mrConsumerResponse.setResponseMessage(e.getMessage());
+     }
+     mrConsumerResponse.setActualMessages(msgs);
+     return mrConsumerResponse;
+ }
+
+ /**
+  * Appends to msgs every entry of the "result" array parsed from body.
+  * String entries are added unchanged; any other entry is read as a JSON
+  * object and added in its string form. Nothing is added when the body
+  * yields no parsed object.
+  *
+  * @param body raw reply text
+  * @param msgs list that receives the messages
+  */
+ private void addResultMessages(String body, LinkedList<String> msgs) {
+     final JSONObject parsed = getResponseDataInJsonWithResponseReturned(body);
+     if (parsed == null) {
+         return;
+     }
+     final JSONArray results = parsed.getJSONArray("result");
+     if (results == null) {
+         return;
+     }
+     for (int i = 0; i < results.length(); i++) {
+         if (results.get(i) instanceof String) {
+             msgs.add(results.getString(i));
+         } else {
+             msgs.add(results.getJSONObject(i).toString());
+         }
+     }
+ }
+
+ private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) {
+
+ if(reply.startsWith("{")){
+ JSONObject jObject = new JSONObject(reply);
+ String message = jObject.getString("message");
+ int status = jObject.getInt("status");
+
+ mrConsumerResponse.setResponseCode(Integer.toString(status));
+
+ if(null != message){
+ mrConsumerResponse.setResponseMessage(message);
+ }
+ }else if (reply.startsWith("<")){
+ mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply));
+ mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
+ }else{
+ mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
+ mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE);
+ }
+
+ }
+
+
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRFormat.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRFormat.java
new file mode 100644
index 0000000..07128ed
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRFormat.java
@@ -0,0 +1,49 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl;
+
+enum MRFormat
+{
+ /**
+ * Messages are sent using MR's message format.
+ */
+ CAMBRIA
+ {
+ public String toString() { return "application/cambria"; }
+ },
+
+ /**
+ * Messages are sent using MR's message format with compression.
+ */
+ CAMBRIA_ZIP
+ {
+ public String toString() { return "application/cambria-zip"; }
+ },
+
+ /**
+ * messages are sent as simple JSON objects.
+ */
+ JSON
+ {
+ public String toString() { return "application/json"; }
+ }
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRMetaClient.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRMetaClient.java
new file mode 100644
index 0000000..90b50f6
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRMetaClient.java
@@ -0,0 +1,260 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.Collection;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRIdentityManager;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRTopicManager;
+
+import com.att.nsa.apiClient.credentials.ApiCredential;
+import com.att.nsa.apiClient.http.HttpException;
+import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
+
+public class MRMetaClient extends MRBaseClient implements MRTopicManager, MRIdentityManager // works on the "/topics" and "/apiKeys" paths through the inherited get/post/put/patch/delete helpers
+{
+ public MRMetaClient ( Collection<String> baseUrls ) throws MalformedURLException
+ {
+ super ( baseUrls ); // the base URLs are handed to MRBaseClient unchanged
+ }
+ /** Returns the names in the "topics" array of get("/topics") as a TreeSet; a missing endpoint or bad reply is only logged and the set built so far is returned. */
+ @Override
+ public Set<String> getTopics () throws IOException
+ {
+ final TreeSet<String> set = new TreeSet<String> ();
+ try
+ {
+ final JSONObject topicSet = get ( "/topics" );
+ final JSONArray a = topicSet.getJSONArray ( "topics" );
+ for ( int i=0; i<a.length (); i++ )
+ {
+ set.add ( a.getString ( i ) );
+ }
+ }
+ catch ( HttpObjectNotFoundException e )
+ {
+ getLog().warn ( "No /topics endpoint on service." );
+ }
+ catch ( JSONException e )
+ {
+ getLog().warn ( "Bad /topics result from service." );
+ }
+ catch ( HttpException e )
+ {
+ throw new IOException ( e );
+ }
+ return set;
+ }
+ /** Wraps the JSON returned by get("/topics/" + escaped topic) in a TopicInfo; JSONException and HttpException are rethrown as IOException. */
+ @Override
+ public TopicInfo getTopicMetadata ( String topic ) throws HttpObjectNotFoundException, IOException
+ {
+ try
+ {
+ final JSONObject topicData = get ( "/topics/" + MRConstants.escape ( topic ) );
+ return new TopicInfo ()
+ {
+ @Override
+ public String getOwner ()
+ {
+ return topicData.optString ( "owner", null ); // null when the reply has no "owner"
+ }
+
+ @Override
+ public String getDescription ()
+ {
+ return topicData.optString ( "description", null ); // null when the reply has no "description"
+ }
+
+ @Override
+ public Set<String> getAllowedProducers ()
+ {
+ final JSONObject acl = topicData.optJSONObject ( "writerAcl" );
+ if ( acl != null && acl.optBoolean ( "enabled", true ) ) // an ACL without an "enabled" entry counts as enabled
+ {
+ return jsonArrayToSet ( acl.optJSONArray ( "users" ) );
+ }
+ return null; // no ACL, or ACL disabled
+ }
+
+ @Override
+ public Set<String> getAllowedConsumers ()
+ {
+ final JSONObject acl = topicData.optJSONObject ( "readerAcl" );
+ if ( acl != null && acl.optBoolean ( "enabled", true ) ) // an ACL without an "enabled" entry counts as enabled
+ {
+ return jsonArrayToSet ( acl.optJSONArray ( "users" ) );
+ }
+ return null; // no ACL, or ACL disabled
+ }
+ };
+ }
+ catch ( JSONException e )
+ {
+ throw new IOException ( e );
+ }
+ catch ( HttpException e )
+ {
+ throw new IOException ( e );
+ }
+ }
+ /** Posts topicName, topicDescription, partitionCount and replicationCount as one JSON object to "/topics/create". */
+ @Override
+ public void createTopic ( String topicName, String topicDescription, int partitionCount, int replicationCount ) throws HttpException, IOException
+ {
+ final JSONObject o = new JSONObject ();
+ o.put ( "topicName", topicName );
+ o.put ( "topicDescription", topicDescription );
+ o.put ( "partitionCount", partitionCount );
+ o.put ( "replicationCount", replicationCount );
+ post ( "/topics/create", o, false );
+ }
+ /** Calls delete on "/topics/" followed by the escaped topic name. */
+ @Override
+ public void deleteTopic ( String topic ) throws HttpException, IOException
+ {
+ delete ( "/topics/" + MRConstants.escape ( topic ) );
+ }
+ /** True when getAllowedProducers(topic) returns null. */
+ @Override
+ public boolean isOpenForProducing ( String topic ) throws HttpObjectNotFoundException, IOException
+ {
+ return null == getAllowedProducers ( topic );
+ }
+ /** Returns getAllowedProducers() of the topic's metadata: the "users" of an enabled "writerAcl", otherwise null. */
+ @Override
+ public Set<String> getAllowedProducers ( String topic ) throws HttpObjectNotFoundException, IOException
+ {
+ return getTopicMetadata ( topic ).getAllowedProducers ();
+ }
+ /** Calls put with an empty JSON object on "/topics/<topic>/producers/<apiKey>", both parts escaped. */
+ @Override
+ public void allowProducer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException
+ {
+ put ( "/topics/" + MRConstants.escape ( topic ) + "/producers/" + MRConstants.escape ( apiKey ), new JSONObject() );
+ }
+ /** Calls delete on "/topics/<topic>/producers/<apiKey>", both parts escaped. */
+ @Override
+ public void revokeProducer ( String topic, String apiKey ) throws HttpException, IOException
+ {
+ delete ( "/topics/" + MRConstants.escape ( topic ) + "/producers/" + MRConstants.escape ( apiKey ) );
+ }
+ /** True when getAllowedConsumers(topic) returns null. */
+ @Override
+ public boolean isOpenForConsuming ( String topic ) throws HttpObjectNotFoundException, IOException
+ {
+ return null == getAllowedConsumers ( topic );
+ }
+ /** Returns getAllowedConsumers() of the topic's metadata: the "users" of an enabled "readerAcl", otherwise null. */
+ @Override
+ public Set<String> getAllowedConsumers ( String topic ) throws HttpObjectNotFoundException, IOException
+ {
+ return getTopicMetadata ( topic ).getAllowedConsumers ();
+ }
+ /** Calls put with an empty JSON object on "/topics/<topic>/consumers/<apiKey>", both parts escaped. */
+ @Override
+ public void allowConsumer ( String topic, String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException
+ {
+ put ( "/topics/" + MRConstants.escape ( topic ) + "/consumers/" + MRConstants.escape ( apiKey ), new JSONObject() );
+ }
+ /** Calls delete on "/topics/<topic>/consumers/<apiKey>", both parts escaped. */
+ @Override
+ public void revokeConsumer ( String topic, String apiKey ) throws HttpException, IOException
+ {
+ delete ( "/topics/" + MRConstants.escape ( topic ) + "/consumers/" + MRConstants.escape ( apiKey ) );
+ }
+ /** Posts email and description to "/apiKeys/create" and builds an ApiCredential from the reply's "key" and "secret"; a JSONException becomes an MRApiException. */
+ @Override
+ public ApiCredential createApiKey ( String email, String description ) throws HttpException, MRApiException, IOException
+ {
+ try
+ {
+ final JSONObject o = new JSONObject ();
+ o.put ( "email", email );
+ o.put ( "description", description );
+ final JSONObject reply = post ( "/apiKeys/create", o, true );
+ return new ApiCredential ( reply.getString ( "key" ), reply.getString ( "secret" ) );
+ }
+ catch ( JSONException e )
+ {
+ // the response doesn't meet our expectation
+ throw new MRApiException ( "The API key response is incomplete.", e );
+ }
+ }
+ /** Returns an ApiKey view over get("/apiKeys/" + escaped apiKey), or null when that call returns null. */
+ @Override
+ public ApiKey getApiKey ( String apiKey ) throws HttpObjectNotFoundException, HttpException, IOException
+ {
+ final JSONObject keyEntry = get ( "/apiKeys/" + MRConstants.escape ( apiKey ) );
+ if ( keyEntry == null )
+ {
+ return null;
+ }
+
+ return new ApiKey ()
+ {
+ @Override
+ public String getEmail ()
+ {
+ final JSONObject aux = keyEntry.optJSONObject ( "aux" );
+ if ( aux != null )
+ {
+ return aux.optString ( "email" );
+ }
+ return null; // the entry has no "aux" object
+ }
+
+ @Override
+ public String getDescription ()
+ {
+ final JSONObject aux = keyEntry.optJSONObject ( "aux" );
+ if ( aux != null )
+ {
+ return aux.optString ( "description" );
+ }
+ return null; // the entry has no "aux" object
+ }
+ };
+ }
+ /** Calls patch on "/apiKeys/" + the escaped current API key with a JSON object holding whichever of email and description is non-null. */
+ @Override
+ public void updateCurrentApiKey ( String email, String description ) throws HttpObjectNotFoundException, HttpException, IOException
+ {
+ final JSONObject o = new JSONObject ();
+ if ( email != null ) o.put ( "email", email );
+ if ( description != null ) o.put ( "description", description );
+ patch ( "/apiKeys/" + MRConstants.escape ( getCurrentApiKey() ), o );
+ }
+ /** Calls delete on "/apiKeys/" + the escaped current API key. */
+ @Override
+ public void deleteCurrentApiKey () throws HttpException, IOException
+ {
+ delete ( "/apiKeys/" + MRConstants.escape ( getCurrentApiKey() ) );
+ }
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRSimplerBatchPublisher.java
new file mode 100644
index 0000000..10eef5e
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/impl/MRSimplerBatchPublisher.java
@@ -0,0 +1,939 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.GZIPOutputStream;
+
+import javax.ws.rs.core.MultivaluedMap;
+
+import org.apache.http.HttpException;
+import org.apache.http.HttpStatus;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.HostSelector;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRBatchingPublisher;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.response.MRPublisherResponse;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients.ProtocolTypeConstants;
+
+import com.att.aft.dme2.api.DME2Client;
+import com.att.aft.dme2.api.DME2Exception;
+
+public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher
+{
+ /** Fluent builder for MRSimplerBatchPublisher: every setter returns this builder and build() creates the publisher. */
+ public static class Builder
+ {
+     private Collection<String> fUrls;
+     private String fTopic;
+     private int fMaxBatchSize = 100;
+     private long fMaxBatchAgeMs = 1000;
+     private boolean fCompress = false;
+     private int threadOccuranceTime = 50;
+     private boolean fAllowSelfSignedCerts = false;
+     private boolean fWithResponse = false;
+
+     public Builder () { }
+
+     /** Sets the base URLs handed to the publisher. */
+     public Builder againstUrls ( Collection<String> baseUrls )
+     {
+         this.fUrls = baseUrls;
+         return this;
+     }
+
+     /** Sets the topic handed to the publisher. */
+     public Builder onTopic ( String topic )
+     {
+         this.fTopic = topic;
+         return this;
+     }
+
+     /** Sets the maximum batch size and maximum batch age (defaults 100 and 1000). */
+     public Builder batchTo ( int maxBatchSize, long maxBatchAgeMs )
+     {
+         this.fMaxBatchSize = maxBatchSize;
+         this.fMaxBatchAgeMs = maxBatchAgeMs;
+         return this;
+     }
+
+     /** Sets the compress flag (default false). */
+     public Builder compress ( boolean compress )
+     {
+         this.fCompress = compress;
+         return this;
+     }
+
+     /** Sets the value build() passes as the last constructor argument when withResponse is false (default 50). */
+     public Builder httpThreadTime ( int threadOccuranceTime )
+     {
+         this.threadOccuranceTime = threadOccuranceTime;
+         return this;
+     }
+
+     /** Sets the allow-self-signed-certificates flag (default false). */
+     public Builder allowSelfSignedCertificates( boolean allowSelfSignedCerts )
+     {
+         this.fAllowSelfSignedCerts = allowSelfSignedCerts;
+         return this;
+     }
+
+     /** Selects which value build() passes last to the constructor (default false). */
+     public Builder withResponse ( boolean withResponse)
+     {
+         this.fWithResponse = withResponse;
+         return this;
+     }
+
+     /**
+      * Creates the publisher, wrapping a MalformedURLException in a RuntimeException.
+      * NOTE(review): with withResponse set, fMaxBatchSize is passed in the position that otherwise carries threadOccuranceTime - confirm intended.
+      */
+     public MRSimplerBatchPublisher build ()
+     {
+         final int lastArg = fWithResponse ? fMaxBatchSize : threadOccuranceTime;
+         try {
+             return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts, lastArg );
+         } catch (MalformedURLException e) {
+             throw new RuntimeException(e);
+         }
+     }
+ };
+
+ @Override
+ public int send ( String partition, String msg ) { // wraps the pair in a message and passes it to send(message)
+     final message wrapped = new message ( partition, msg );
+     return send ( wrapped );
+ }
+ @Override
+ public int send ( String msg ) { // same as send(partition, msg) with a null partition
+     final message wrapped = new message ( null, msg );
+     return send ( wrapped );
+ }
+
+
+ /** Queues one message by passing a single-element collection to send(Collection). */
+ @Override
+ public int send ( message msg ) {
+     final Collection<message> single = new LinkedList<message> ();
+     single.add ( msg );
+     return send ( single );
+ }
+
+
+
+ /**
+  * Wraps each message in a TimestampedMessage and adds it to fPending.
+  * @return the pending message count after the additions
+  * @throws IllegalStateException if the publisher was closed
+  */
+ @Override
+ public synchronized int send ( Collection<message> msgs ) {
+     if ( fClosed ) throw new IllegalStateException ( "The publisher was closed." );
+     for ( message each : msgs ) {
+         final TimestampedMessage stamped = new TimestampedMessage ( each );
+         fPending.add ( stamped );
+     }
+     return getPendingMessageCount ();
+ }
+
+ @Override
+ public synchronized int getPendingMessageCount () // number of messages currently held in fPending
+ {
+ return fPending.size ();
+ }
+
+ @Override
+ public void close () // delegates to close(Long.MAX_VALUE, MILLISECONDS); failures are logged, not thrown
+ {
+ try
+ {
+ final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS );
+ if ( remains.size() > 0 ) // messages returned here were not sent
+ {
+ getLog().warn ( "Closing publisher with " + remains.size() + " messages unsent. "
+ + "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close." );
+ }
+ }
+ catch ( InterruptedException e ) // NOTE(review): the thread's interrupt status is not restored here
+ {
+ getLog().warn ( "Possible message loss. " + e.getMessage(), e );
+ }
+ catch ( IOException e )
+ {
+ getLog().warn ( "Possible message loss. " + e.getMessage(), e );
+ }
+ }
+
+ /** Stops the background sender, then keeps sending for up to the given time; returns whatever is still pending. */
+ @Override
+ public List<message> close ( long time, TimeUnit unit ) throws IOException, InterruptedException
+ {
+     synchronized ( this )
+     {
+         fClosed = true;
+         // stop the background sender
+         fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false );
+         fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false );
+         fExec.shutdown ();
+     }
+     final long now = Clock.now ();
+     final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit );
+     // Saturate rather than overflow: close() passes Long.MAX_VALUE, and a deadline
+     // wrapped to a negative value would skip the send loop below entirely.
+     final long timeoutAtMs = ( now > 0 && waitInMs > Long.MAX_VALUE - now ) ? Long.MAX_VALUE : now + waitInMs;
+     while ( Clock.now() < timeoutAtMs && getPendingMessageCount() > 0 )
+     {
+         send ( true );
+         Thread.sleep ( 250 );
+     }
+
+     synchronized ( this )
+     {
+         final LinkedList<message> result = new LinkedList<message> ();
+         fPending.drainTo ( result );
+         return result;
+     }
+ }
+
+ /**
+ * Possibly send a batch to the MR server. This is called by the background thread
+ * and the close() method. A failed sendBatch() is logged and moves fDontSendUntilMs
+ * to sfWaitAfterError past the current Clock.now().
+ * @param force when true, sendBatch() is attempted without consulting shouldSendNow()
+ */
+ private synchronized void send ( boolean force )
+ {
+ if ( force || shouldSendNow () )
+ {
+ if ( !sendBatch () )
+ {
+ getLog().warn ( "Send failed, " + fPending.size() + " message to send." );
+
+ // note the time for back-off
+ fDontSendUntilMs = sfWaitAfterError + Clock.now ();
+ }
+ }
+ }
+
+ private synchronized boolean shouldSendNow ()
+ {
+ boolean shouldSend = false;
+ if ( fPending.size () > 0 )
+ {
+ final long nowMs = Clock.now ();
+
+ shouldSend = ( fPending.size() >= fMaxBatchSize );
+ if ( !shouldSend )
+ {
+ final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
+ shouldSend = sendAtMs <= nowMs;
+ }
+
+ // however, wait after an error
+ shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
+ }
+ return shouldSend;
+ }
+
+ private synchronized boolean sendBatch ()
+ {
+ // it's possible for this call to be made with an empty list. in this case, just return.
+ if ( fPending.size() < 1 )
+ {
+ return true;
+ }
+
+ final long nowMs = Clock.now ();
+
+ host = this.fHostSelector.selectBaseHost();
+
+ final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") );
+
+
+ try
+ {
+ /*final String contentType =
+ fCompress ?
+ MRFormat.CAMBRIA_ZIP.toString () :
+ MRFormat.CAMBRIA.toString ()
+ ;*/
+
+ final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
+ OutputStream os = baseStream;
+ final String contentType = props.getProperty("contenttype");
+ if(contentType.equalsIgnoreCase("application/json")){
+ JSONArray jsonArray = new JSONArray();
+ for ( TimestampedMessage m : fPending )
+ {
+ JSONObject jsonObject = new JSONObject(m.fMsg);
+
+ jsonArray.put(jsonObject);
+
+ }
+ os.write (jsonArray.toString().getBytes() );
+ os.close();
+
+ }else if (contentType.equalsIgnoreCase("text/plain")){
+ for ( TimestampedMessage m : fPending )
+ {
+ os.write ( m.fMsg.getBytes() );
+ os.write ( '\n' );
+ }
+ os.close ();
+ } else if (contentType.equalsIgnoreCase("application/cambria") || (contentType.equalsIgnoreCase("application/cambria-zip"))){
+ if ( contentType.equalsIgnoreCase("application/cambria-zip") )
+ {
+ os = new GZIPOutputStream ( baseStream );
+ }
+ for ( TimestampedMessage m : fPending )
+ {
+
+ os.write ( ( "" + m.fPartition.length () ).getBytes() );
+ os.write ( '.' );
+ os.write ( ( "" + m.fMsg.length () ).getBytes() );
+ os.write ( '.' );
+ os.write ( m.fPartition.getBytes() );
+ os.write ( m.fMsg.getBytes() );
+ os.write ( '\n' );
+ }
+ os.close ();
+ }else{
+ for ( TimestampedMessage m : fPending )
+ {
+ os.write ( m.fMsg.getBytes() );
+
+ }
+ os.close ();
+ }
+
+
+
+ final long startMs = Clock.now ();
+ if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
+
+
+ DME2Configue();
+
+ Thread.sleep(5);
+ getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
+ sender.setPayload(os.toString());
+ String dmeResponse = sender.sendAndWait(5000L);
+
+ final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):"
+ + dmeResponse.toString();
+ getLog().info(logLine);
+ fPending.clear();
+ return true;
+ }
+
+ if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+ getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
+ final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag);
+ //System.out.println(result.getInt("status"));
+ //Here we are checking for error response. If HTTP status
+ //code is not within the http success response code
+ //then we consider this as error and return false
+ if(result.getInt("status") < 200 || result.getInt("status") > 299) {
+ return false;
+ }
+ final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
+ getLog().info(logLine);
+ fPending.clear();
+ return true;
+ }
+
+ if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+ getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
+ final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag);
+
+
+ //System.out.println(result.getInt("status"));
+ //Here we are checking for error response. If HTTP status
+ //code is not within the http success response code
+ //then we consider this as error and return false
+ if(result.getInt("status") < 200 || result.getInt("status") > 299) {
+ return false;
+ }
+ final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
+ getLog().info(logLine);
+ fPending.clear();
+ return true;
+ }
+ }
+ catch ( IllegalArgumentException x ) {
+ getLog().warn ( x.getMessage(), x );
+ } catch ( IOException x ) {
+ getLog().warn ( x.getMessage(), x );
+ } catch (HttpException x) {
+ getLog().warn ( x.getMessage(), x );
+ } catch (Exception x) {
+ getLog().warn(x.getMessage(), x);
+ }
+ return false;
+ }
+
+	/**
+	 * Sends all pending messages as one batch and reports the outcome in pubResponse.
+	 * <p>
+	 * Payload layout, selected by the "contenttype" property:
+	 * <ul>
+	 * <li>application/json: a JSON array holding each message parsed as a JSON object</li>
+	 * <li>text/plain: each message followed by a newline</li>
+	 * <li>application/cambria, application/cambria-zip: length-prefixed records, gzipped for -zip</li>
+	 * <li>anything else: the raw messages concatenated</li>
+	 * </ul>
+	 * The transport is selected by protocolFlag (DME2, AUTH_KEY or AAF_AUTH). The pending queue is
+	 * cleared only after a reply whose status is in the 200-299 range; otherwise the messages stay queued.
+	 *
+	 * @return the pubResponse field, updated with response code, message and pending count
+	 */
+	public synchronized MRPublisherResponse sendBatchWithResponse ()
+	{
+		// it's possible for this call to be made with an empty list. in this case, just return.
+		if ( fPending.size() < 1 )
+		{
+			pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
+			pubResponse.setResponseMessage("No Messages to send");
+			pubResponse.setPendingMsgs(0);
+			return pubResponse;
+		}
+
+		final long nowMs = Clock.now ();
+
+		host = this.fHostSelector.selectBaseHost();
+
+		final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") );
+		OutputStream os=null;
+		try
+		{
+			// All writes go through os; the bytes always accumulate in baseStream.
+			final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
+			os = baseStream;
+			final String contentType = props.getProperty("contenttype");
+			if(contentType.equalsIgnoreCase("application/json")){
+				JSONArray jsonArray = new JSONArray();
+				for ( TimestampedMessage m : fPending )
+				{
+					JSONObject jsonObject = new JSONObject(m.fMsg);
+					jsonArray.put(jsonObject);
+				}
+				os.write (jsonArray.toString().getBytes() );
+			}else if (contentType.equalsIgnoreCase("text/plain")){
+				for ( TimestampedMessage m : fPending )
+				{
+					os.write ( m.fMsg.getBytes() );
+					os.write ( '\n' );
+				}
+			} else if (contentType.equalsIgnoreCase("application/cambria") || (contentType.equalsIgnoreCase("application/cambria-zip"))){
+				if ( contentType.equalsIgnoreCase("application/cambria-zip") )
+				{
+					os = new GZIPOutputStream ( baseStream );
+				}
+				for ( TimestampedMessage m : fPending )
+				{
+					// record layout: <partition length>.<message length>.<partition><message>\n
+					os.write ( ( "" + m.fPartition.length () ).getBytes() );
+					os.write ( '.' );
+					os.write ( ( "" + m.fMsg.length () ).getBytes() );
+					os.write ( '.' );
+					os.write ( m.fPartition.getBytes() );
+					os.write ( m.fMsg.getBytes() );
+					os.write ( '\n' );
+				}
+				// closing here finishes any gzip stream before baseStream is read below
+				os.close ();
+			}else{
+				for ( TimestampedMessage m : fPending )
+				{
+					os.write ( m.fMsg.getBytes() );
+				}
+			}
+
+			final long startMs = Clock.now ();
+			if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
+				try {
+					DME2Configue();
+
+					Thread.sleep(5);
+					getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
+
+					// os may be the GZIPOutputStream wrapper, whose toString() is not the payload,
+					// so the payload is read from the underlying buffer.
+					sender.setPayload(baseStream.toString());
+
+					String dmeResponse = sender.sendAndWait(5000L);
+					getLog().info("dmeres->"+dmeResponse);
+
+					pubResponse = createMRPublisherResponse(dmeResponse,pubResponse);
+
+					if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+						return pubResponse;
+					}
+					final String logLine = String.valueOf((Clock.now() - startMs))
+							+ dmeResponse.toString();
+					getLog().info(logLine);
+					fPending.clear();
+				}
+				catch (DME2Exception x) {
+					getLog().warn(x.getMessage(), x);
+					pubResponse.setResponseCode(x.getErrorCode());
+					pubResponse.setResponseMessage(x.getErrorMessage());
+				} catch (URISyntaxException x) {
+					getLog().warn(x.getMessage(), x);
+					pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
+					pubResponse.setResponseMessage(x.getMessage());
+				} catch (Exception x) {
+					// log the failure like the other handlers instead of dropping it silently
+					getLog().warn(x.getMessage(), x);
+					pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+					pubResponse.setResponseMessage(x.getMessage());
+				}
+
+				return pubResponse;
+			}
+
+			if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+				getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
+				final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag);
+
+				// A status outside 200-299 is an error: report it and keep the messages queued.
+				pubResponse = createMRPublisherResponse(result,pubResponse);
+
+				if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+					return pubResponse;
+				}
+
+				final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
+				getLog().info(logLine);
+				fPending.clear();
+				return pubResponse;
+			}
+
+			if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+				getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms" );
+				final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag);
+
+				// A status outside 200-299 is an error: report it and keep the messages queued.
+				pubResponse = createMRPublisherResponse(result,pubResponse);
+
+				if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+					return pubResponse;
+				}
+
+				final String logLine = String.valueOf((Clock.now() - startMs));
+				getLog().info(logLine);
+				fPending.clear();
+				return pubResponse;
+			}
+		}
+		catch ( IllegalArgumentException x ) {
+			getLog().warn ( x.getMessage(), x );
+			pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
+			pubResponse.setResponseMessage(x.getMessage());
+
+		} catch ( IOException x ) {
+			getLog().warn ( x.getMessage(), x );
+			pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+			pubResponse.setResponseMessage(x.getMessage());
+
+		} catch (HttpException x) {
+			getLog().warn ( x.getMessage(), x );
+			pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
+			pubResponse.setResponseMessage(x.getMessage());
+
+		} catch (Exception x) {
+			getLog().warn(x.getMessage(), x);
+			pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+			pubResponse.setResponseMessage(x.getMessage());
+
+		}
+
+		finally {
+			// Always publish the current queue size so that a successful send resets an
+			// earlier non-zero count (it used to be updated only while messages remained).
+			pubResponse.setPendingMsgs(fPending.size());
+			if (fPending.size()>0) {
+				getLog().warn ( "Send failed, " + fPending.size() + " message to send." );
+			}
+			if (os != null) {
+				try {
+					os.close();
+				} catch (Exception x) {
+					getLog().warn(x.getMessage(), x);
+					pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
+					pubResponse.setResponseMessage("Error in closing Output Stream");
+				}
+			}
+		}
+
+		return pubResponse;
+	}
+
+/**
+ * Translates a raw server reply into mrPubResponse and returns that same object.
+ * A null or empty reply yields 400. A JSON object that has both "message" and "status" supplies
+ * those values; any other JSON object yields 200 with the whole reply as the message. A reply
+ * starting with "<" is decoded by getHTTPErrorResponseCode / getHTTPErrorResponseMessage.
+ * NOTE(review): a reply in any other format leaves mrPubResponse unchanged; confirm that is intended.
+ *
+ * @return the mrPubResponse argument
+ */
+private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
+
+	// A null reply used to fail with a NullPointerException; it is now handled like an empty one.
+	if (reply == null || reply.isEmpty()) {
+		mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
+		mrPubResponse.setResponseMessage("Please verify the Producer properties");
+	} else if (reply.startsWith("{")) {
+		JSONObject jObject = new JSONObject(reply);
+		if (jObject.has("message") && jObject.has("status")) {
+			String message = jObject.getString("message");
+			if (null != message) {
+				mrPubResponse.setResponseMessage(message);
+			}
+			mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
+		} else {
+			mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
+			mrPubResponse.setResponseMessage(reply);
+		}
+	} else if (reply.startsWith("<")) {
+		String responseCode = getHTTPErrorResponseCode(reply);
+		// any code text that mentions 403 is normalised to exactly "403"
+		if (responseCode != null && responseCode.contains("403")) {
+			responseCode = "403";
+		}
+		mrPubResponse.setResponseCode(responseCode);
+		mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
+	}
+
+	return mrPubResponse;
+}
+
+	private final String fTopic;
+	private final int fMaxBatchSize;
+	private final long fMaxBatchAgeMs;
+	private final boolean fCompress;
+	private int threadOccuranceTime;
+	private boolean fClosed;
+	private String username;
+	private String password;
+	private String host;
+	
+	//host selector
+	private HostSelector fHostSelector = null;
+	// messages waiting to be sent; sendBatchWithResponse() clears the queue only after a 2xx reply
+	private final LinkedBlockingQueue<TimestampedMessage> fPending;
+	private long fDontSendUntilMs;
+	private final ScheduledThreadPoolExecutor fExec;
+	// DME2 settings: DME2Configue() reloads latitude .. contentType from props on every call
+	private String latitude;
+	private String longitude;
+	private String version;
+	private String serviceName;
+	private String env;
+	private String partner;
+	private String routeOffer;
+	private String subContextPath;
+	private String protocol;
+	private String methodType;
+	private String url;
+	private String dmeuser;
+	private String dmepassword;
+	private String contentType;
+	private static final long sfWaitAfterError = 10000;
+	private HashMap<String, String> DMETimeOuts;
+	private DME2Client sender;
+	public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
+	public String producerFilePath;
+	private String authKey;
+	private String authDate;
+	private String handlers;
+	private Properties props;
+	public static String routerFilePath;
+	public static Map<String, String> headers=new HashMap<String, String>();
+	public static MultivaluedMap<String, Object> headersMap;
+
+	// response object that sendBatchWithResponse() updates and returns on every call
+	private MRPublisherResponse pubResponse;
+
+	public MRPublisherResponse getPubResponse() {
+		return pubResponse;
+	}
+	public void setPubResponse(MRPublisherResponse pubResponse) {
+		this.pubResponse = pubResponse;
+	}
+	/** Returns the static router file path, which is shared by all publisher instances. */
+	public static String getRouterFilePath() {
+		return routerFilePath;
+	}
+	/** Sets the static router file path; the value is shared by all publisher instances. */
+	public static void setRouterFilePath(String routerFilePath) {
+		MRSimplerBatchPublisher.routerFilePath = routerFilePath;
+	}
+	/** Returns the Properties object that DME2Configue() and sendBatchWithResponse() read their settings from. */
+	public Properties getProps() {
+		return props;
+	}
+	/** Replaces the Properties object used for configuration lookups. */
+	public void setProps(Properties props) {
+		this.props = props;
+	}
+	/** Returns the producer file path stored on this instance. */
+	public String getProducerFilePath() {
+		return producerFilePath;
+	}
+	/** Stores the producer file path on this instance. */
+	public void setProducerFilePath(String producerFilePath) {
+		this.producerFilePath = producerFilePath;
+	}
+	/** Returns the protocol flag; the field is initialised to ProtocolTypeConstants.DME2.getValue(). */
+	public String getProtocolFlag() {
+		return protocolFlag;
+	}
+	/** Sets the protocol flag that sendBatchWithResponse() compares against the ProtocolTypeConstants values. */
+	public void setProtocolFlag(String protocolFlag) {
+		this.protocolFlag = protocolFlag;
+	}
+
+
+	/**
+	 * Loads the DME2 settings from props, builds the DME2 service URL and creates a new DME2Client in sender.
+	 * Side effect: sets JVM-wide AFT_* system properties, so the values apply to the whole process.
+	 * NOTE(review): routerFilePath is a static field, so the value stored here is shared by all instances.
+	 *
+	 * @throws DME2Exception if a DME2 call fails with that exception (rethrown unchanged)
+	 * @throws URISyntaxException if the assembled URL is not a valid URI (rethrown unchanged)
+	 * @throws Exception for any other failure, e.g. a missing property; the original exception is the cause
+	 */
+	private void DME2Configue() throws Exception {
+		try {
+			latitude = props.getProperty("Latitude");
+			longitude = props.getProperty("Longitude");
+			version = props.getProperty("Version");
+			serviceName = props.getProperty("ServiceName");
+			env = props.getProperty("Environment");
+			partner = props.getProperty("Partner");
+			routeOffer = props.getProperty("routeOffer");
+			subContextPath = props.getProperty("SubContextPath")+fTopic;
+			protocol = props.getProperty("Protocol");
+			methodType = props.getProperty("MethodType");
+			dmeuser = props.getProperty("username");
+			dmepassword = props.getProperty("password");
+			contentType = props.getProperty("contenttype");
+			handlers = props.getProperty("sessionstickinessrequired");
+			routerFilePath= props.getProperty("DME2preferredRouterFilePath");
+
+			/**
+			 * Changes to DME2Client url to use Partner for auto failover between data centers
+			 * When Partner value is not provided use the routeOffer value for auto failover within a cluster
+			 */
+			final String routing;
+			if (partner != null && !partner.isEmpty() ) {
+				routing = "&partner="+partner;
+			} else if (routeOffer!=null && !routeOffer.isEmpty()) {
+				routing = "&routeoffer="+routeOffer;
+			} else {
+				// url used to stay null (or stale) here, failing later with a bare NullPointerException
+				throw new IllegalArgumentException("Either Partner or routeOffer must be set to build the DME2 url");
+			}
+			url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+routing;
+
+			String partitionKey = props.getProperty("partition");
+			if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){
+				url = url + "&partitionKey=" + partitionKey;
+			}
+
+			DMETimeOuts = new HashMap<String, String>();
+			DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
+			DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
+			DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
+			DMETimeOuts.put("Content-Type", contentType);
+
+			// System.setProperty rejects null values, so a missing Latitude, Longitude or
+			// AFT_ENVIRONMENT property surfaces as an exception from this method.
+			System.setProperty("AFT_LATITUDE", latitude);
+			System.setProperty("AFT_LONGITUDE", longitude);
+			System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT"));
+
+			//SSL changes
+			// NOTE(review): SSLv3 is enabled and the keystore password is hard-coded below; confirm both are still wanted.
+			System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
+					"SSLv3,TLSv1,TLSv1.1");
+			System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
+			System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
+			//SSL changes
+
+			sender = new DME2Client(new URI(url), 5000L);
+
+			sender.setAllowAllHttpReturnCodes(true);
+			sender.setMethod(methodType);
+			sender.setSubContext(subContextPath);
+			sender.setCredentials(dmeuser, dmepassword);
+			sender.setHeaders(DMETimeOuts);
+
+			// constant-first comparison: a missing "sessionstickinessrequired" property now counts as "no"
+			if("yes".equalsIgnoreCase(handlers)){
+				sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
+				sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
+				sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
+			}else{
+				sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.dme.client.HeaderReplyHandler");
+			}
+		} catch (DME2Exception x) {
+			getLog().warn(x.getMessage(), x);
+			// rethrow the caught exception itself so its stack trace is preserved
+			throw x;
+		} catch (URISyntaxException x) {
+			getLog().warn(x.getMessage(), x);
+			throw x;
+		} catch (Exception x) {
+			getLog().warn(x.getMessage(), x);
+			throw new Exception(x.getMessage(), x);
+		}
+	}
+
+	/**
+	 * Creates a publisher for the given topic; no periodic send task is scheduled on the executor.
+	 *
+	 * @throws IllegalArgumentException if topic is null or empty
+	 */
+	private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress) throws MalformedURLException
+	{
+		super ( hosts );
+		if ( topic == null || topic.isEmpty() ) {
+			throw new IllegalArgumentException ( "A topic must be provided." );
+		}
+		this.fHostSelector = new HostSelector(hosts, null);
+		this.fTopic = topic;
+		this.fCompress = compress;
+		this.fMaxBatchAgeMs = maxBatchAgeMs;
+		this.fMaxBatchSize = maxBatchSize;
+		this.fClosed = false;
+		this.fDontSendUntilMs = 0;
+		this.fPending = new LinkedBlockingQueue<TimestampedMessage> ();
+		this.fExec = new ScheduledThreadPoolExecutor ( 1 );
+		this.pubResponse = new MRPublisherResponse();
+	}
+
+	/**
+	 * Creates a publisher that additionally runs send(false) every httpThreadOccurnace ms, first after 100 ms.
+	 * NOTE(review): allowSelfSignedCerts is accepted but never read in this constructor.
+	 */
+	private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace ) throws MalformedURLException
+	{
+		super ( hosts );
+		if ( topic == null || topic.length() < 1 )
+		{
+			throw new IllegalArgumentException ( "A topic must be provided." );
+		}
+		fHostSelector = new HostSelector(hosts, null);
+		fClosed = false;
+		fTopic = topic;
+		fMaxBatchSize = maxBatchSize;
+		fMaxBatchAgeMs = maxBatchAgeMs;
+		fCompress = compress;
+		threadOccuranceTime=httpThreadOccurnace;
+		fPending = new LinkedBlockingQueue<TimestampedMessage> ();
+		fDontSendUntilMs = 0;
+		pubResponse = new MRPublisherResponse(); // was left null here, unlike in the 5-argument constructor
+		fExec = new ScheduledThreadPoolExecutor ( 1 );
+		fExec.scheduleAtFixedRate ( new Runnable()
+		{
+			@Override
+			public void run () { send ( false ); }
+		}, 100, threadOccuranceTime, TimeUnit.MILLISECONDS );
+	}
+
+	/** A message that records Clock.now() at the moment it is wrapped. */
+	private static class TimestampedMessage extends message
+	{
+		public final long timestamp;
+		public TimestampedMessage ( message m ) {
+			super ( m );
+			this.timestamp = Clock.now();
+		}
+	}
+
+	public String getUsername() {
+		return username;
+	}
+	/** Sets the user name that sendBatchWithResponse() hands to postAuthwithResponse() / postWithResponse(). */
+	public void setUsername(String username) {
+		this.username = username;
+	}
+	/** Returns the password stored on this instance. */
+	public String getPassword() {
+		return password;
+	}
+	/** Sets the password that sendBatchWithResponse() hands to postAuthwithResponse() / postWithResponse(). */
+	public void setPassword(String password) {
+		this.password = password;
+	}
+	/** Returns the host field; sendBatchWithResponse() overwrites it with fHostSelector.selectBaseHost(). */
+	public String getHost() {
+		return host;
+	}
+	/** Sets the host field; note that sendBatchWithResponse() replaces the value before sending. */
+	public void setHost(String host) {
+		this.host = host;
+	}
+	/** Returns the contentType field; DME2Configue() overwrites it from the "contenttype" property. */
+	public String getContentType() {
+		return contentType;
+	}
+	/** Sets the contentType field; sendBatchWithResponse() reads the "contenttype" property instead of this field. */
+	public void setContentType(String contentType) {
+		this.contentType = contentType;
+	}
+	/** Returns the auth key stored on this instance. */
+	public String getAuthKey() {
+		return authKey;
+	}
+	/** Sets the auth key that sendBatchWithResponse() passes to postAuthwithResponse(). */
+	public void setAuthKey(String authKey) {
+		this.authKey = authKey;
+	}
+	/** Returns the auth date stored on this instance. */
+	public String getAuthDate() {
+		return authDate;
+	}
+	/** Sets the auth date that sendBatchWithResponse() passes to postAuthwithResponse(). */
+	public void setAuthDate(String authDate) {
+		this.authDate = authDate;
+	}
+
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/response/MRConsumerResponse.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/response/MRConsumerResponse.java
new file mode 100644
index 0000000..0006852
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/response/MRConsumerResponse.java
@@ -0,0 +1,60 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.response;
+
+/** Plain mutable bean: a response code, a response message and an iterable of message strings. */
+public class MRConsumerResponse {
+
+	private String responseCode;
+	private String responseMessage;
+	private Iterable<String> actualMessages;
+
+	/** @return the response code, or null if none has been set */
+	public String getResponseCode() {
+		return this.responseCode;
+	}
+
+	/** @param responseCode the response code to store */
+	public void setResponseCode(String responseCode) {
+		this.responseCode = responseCode;
+	}
+
+	/** @return the response message, or null if none has been set */
+	public String getResponseMessage() {
+		return this.responseMessage;
+	}
+
+	/** @param responseMessage the response message to store */
+	public void setResponseMessage(String responseMessage) {
+		this.responseMessage = responseMessage;
+	}
+
+	/** @return the stored messages, or null if none have been set */
+	public Iterable<String> getActualMessages() {
+		return this.actualMessages;
+	}
+
+	/** @param actualMessages the messages to store */
+	public void setActualMessages(Iterable<String> actualMessages) {
+		this.actualMessages = actualMessages;
+	}
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/response/MRPublisherResponse.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/response/MRPublisherResponse.java
new file mode 100644
index 0000000..fd53de5
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/client/response/MRPublisherResponse.java
@@ -0,0 +1,66 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.response;
+
+/**
+ * Response for Publisher
+ * @author author
+ *
+ */
+public class MRPublisherResponse {
+	/** Status code as text: an HTTP status or a DME2 error code set by the publisher. */
+	private String responseCode;
+	/** Human-readable detail accompanying the status code. */
+	private String responseMessage;
+	/** Count of messages still pending, as reported by the publisher. */
+	private int pendingMsgs;
+
+	public String getResponseCode() {
+		return responseCode;
+	}
+
+	public void setResponseCode(String responseCode) {
+		this.responseCode = responseCode;
+	}
+
+	public String getResponseMessage() {
+		return responseMessage;
+	}
+
+	public void setResponseMessage(String responseMessage) {
+		this.responseMessage = responseMessage;
+	}
+
+	public int getPendingMsgs() {
+		return pendingMsgs;
+	}
+
+	public void setPendingMsgs(int pendingMsgs) {
+		this.pendingMsgs = pendingMsgs;
+	}
+
+	@Override
+	public String toString() {
+		return "Response Code:" + this.responseCode + "," + "Response Message:" + this.responseMessage
+				+ "," + "Pending Messages Count:" + this.pendingMsgs;
+	}
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/DefaultLoggingFailoverFaultHandler.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/DefaultLoggingFailoverFaultHandler.java
new file mode 100644
index 0000000..a97793b
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/DefaultLoggingFailoverFaultHandler.java
@@ -0,0 +1,50 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.dme.client;
+
+//package com.att.aft.dme2.api;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+//import com.att.aft.dme2.api.DME2FailoverFaultHandler;
+//import com.att.aft.dme2.api.util.DME2Constants;
+//import com.att.aft.dme2.api.util.DME2ExchangeFaultContext;
+//import com.att.aft.dme2.api.util.LogMessage;
+//import com.att.aft.dme2.api.util.LogUtil;
+public class DefaultLoggingFailoverFaultHandler /*implements DME2FailoverFaultHandler*/ {
+    /** The logger. */
+    //private static Logger logger = DME2Constants.getLogger(DefaultLoggingFailoverFaultHandler.class.getName());
+    // NOTE: every member of this class is commented out, so it currently declares no fields or methods.
+//    @Override
+//    public void handleEndpointFailover(/*DME2ExchangeFaultContext context*/) {
+//      // LogUtil.INSTANCE.report(logger, Level.WARNING, LogMessage.SEP_FAILOVER, context.getService(),context.getRequestURL(),context.getRouteOffer(),context.getResponseCode(),context.getException());
+//    }
+//    @Override
+//    /**
+//     * The DME2Exchange already has a log message when the route offer is failed over.  We don't need to log it again here.
+//     */
+//    public void handleRouteOfferFailover(DME2ExchangeFaultContext context) {
+//        //noop
+//        
+//    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/HeaderReplyHandler.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/HeaderReplyHandler.java
new file mode 100644
index 0000000..f5c133a
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/HeaderReplyHandler.java
@@ -0,0 +1,63 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.dme.client;
+
+
+import java.util.Map;
+
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl.MRSimplerBatchPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.att.aft.dme2.api.util.DME2ExchangeFaultContext;
+import com.att.aft.dme2.api.util.DME2ExchangeReplyHandler;
+import com.att.aft.dme2.api.util.DME2ExchangeResponseContext;
+
+
+
+//public class HeaderReplyHandler implements DME2ReplyHandler {
+
+	/** DME2 reply handler: publishes reply headers to MRClientFactory and logs the transaction id. */
+	public class HeaderReplyHandler implements DME2ExchangeReplyHandler {
+		private final Logger fLog = LoggerFactory.getLogger ( this.getClass().getName () );
+
+		/** No-op: this handler does not react to faults. */
+		@Override public void handleFault(DME2ExchangeFaultContext responseData) {
+		}
+
+		/** No-op: this handler does not react to endpoint faults. */
+		@Override public void handleEndpointFault(DME2ExchangeFaultContext responseData) {
+		}
+
+		/** Stores the reply headers in MRClientFactory.DME2HeadersMap and logs "transactionId" if present. */
+		@Override public void handleReply(DME2ExchangeResponseContext responseData) {
+			if (responseData == null) {
+				return;
+			}
+			MRClientFactory.DME2HeadersMap = responseData.getResponseHeaders();
+			// A reply without headers used to throw a NullPointerException on this lookup.
+			if (responseData.getResponseHeaders() != null && responseData.getResponseHeaders().get("transactionId") != null) {
+				fLog.info("Transaction Id : " + responseData.getResponseHeaders().get("transactionId"));
+			}
+		}
+	}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/PreferredRouteReplyHandler.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/PreferredRouteReplyHandler.java
new file mode 100644
index 0000000..bf57836
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/PreferredRouteReplyHandler.java
@@ -0,0 +1,74 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.dme.client;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.InputStream;
+
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl.MRSimplerBatchPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.att.aft.dme2.api.util.DME2ExchangeFaultContext;
+import com.att.aft.dme2.api.util.DME2ExchangeReplyHandler;
+import com.att.aft.dme2.api.util.DME2ExchangeResponseContext;
+
+public class PreferredRouteReplyHandler implements DME2ExchangeReplyHandler {
+	private final Logger fLog = LoggerFactory.getLogger ( this.getClass().getName () );
+
+	/** Publishes the reply headers, logs "transactionId" if present and persists a non-null route offer. */
+	@Override public void handleReply(DME2ExchangeResponseContext responseData) {
+		if (responseData == null) {
+			return;
+		}
+		MRClientFactory.DME2HeadersMap = responseData.getResponseHeaders();
+		// A reply without headers used to throw a NullPointerException on this lookup.
+		if (responseData.getResponseHeaders() != null && responseData.getResponseHeaders().get("transactionId") != null) {
+			fLog.info("Transaction_Id : " + responseData.getResponseHeaders().get("transactionId"));
+		}
+		if (responseData.getRouteOffer() != null) {
+			routeWriter("preferredRouteKey", responseData.getRouteOffer());
+		}
+	}
+
+	@Override public void handleFault(DME2ExchangeFaultContext responseData) { /* no-op */ }
+	@Override public void handleEndpointFault(DME2ExchangeFaultContext responseData) { /* no-op */ }
+
+	/** Overwrites the file at MRSimplerBatchPublisher.routerFilePath with "routeKey=routeValue"; errors are only logged. */
+	public void routeWriter(String routeKey, String routeValue){
+		FileWriter writer = null;
+		try {
+			writer = new FileWriter(new File (MRSimplerBatchPublisher.routerFilePath));
+			writer.write(routeKey+"="+routeValue);
+		} catch (Exception ex) {
+			fLog.error("Reply Router Error " + ex.toString(), ex);
+		} finally {
+			// Close on every path so that a failed write no longer leaks the file handle.
+			try {
+				if (writer != null) { writer.close(); }
+			} catch (Exception ex) {
+				fLog.error("Reply Router Error " + ex.toString(), ex);
+			}
+		}
+	}
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/PreferredRouteRequestHandler.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/PreferredRouteRequestHandler.java
new file mode 100644
index 0000000..14ab313
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/PreferredRouteRequestHandler.java
@@ -0,0 +1,54 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.dme.client;
+
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.att.aft.dme2.api.util.DME2ExchangeRequestContext;
+import com.att.aft.dme2.api.util.DME2ExchangeRequestHandler;
+
+public class PreferredRouteRequestHandler implements DME2ExchangeRequestHandler {
+ private Logger fLog = LoggerFactory.getLogger(this.getClass().getName());
+
+ @Override
+ public void handleRequest(DME2ExchangeRequestContext requestData) {
+
+ if (requestData != null) {
+
+ requestData.setPreferredRouteOffer(readRoute("preferredRouteKey"));
+ }
+ }
+
+ public String readRoute(String routeKey) {
+
+ try {
+
+ MRClientFactory.prop.load(MRClientFactory.routeReader);
+
+ } catch (Exception ex) {
+ fLog.error("Request Router Error " + ex.toString());
+ }
+ return MRClientFactory.prop.getProperty(routeKey);
+ }
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/SimpleExampleConsumer.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/SimpleExampleConsumer.java
new file mode 100644
index 0000000..fa1075c
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/SimpleExampleConsumer.java
@@ -0,0 +1,77 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.dme.client;
+
+import java.util.Map;
+
+import javax.ws.rs.core.MultivaluedMap;
+
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRConsumer;
+
+/**
+ * Example consumer: creates an MRConsumer from a properties file and then
+ * polls forever, printing each received message followed by the header maps
+ * exposed by MRClientFactory.
+ */
+public class SimpleExampleConsumer {
+
+    /**
+     * Runs the polling loop until an exception is thrown; the exception's class
+     * name and message are then printed to stderr.
+     *
+     * @param args optional; args[0] is the path of the consumer properties
+     *             file. When it is missing, "D:\SG\consumer.properties" is used.
+     */
+    public static void main(String[] args) {
+
+        long count = 0;
+        long nextReport = 5000;
+
+        final long startMs = System.currentTimeMillis();
+
+        // Let the caller supply the properties file instead of depending on a
+        // path that exists on a single machine only.
+        final String consumerFilePath = (args.length > 0 ? args[0] : "D:\\SG\\consumer.properties");
+
+        try {
+
+            final MRConsumer cc = MRClientFactory.createConsumer(consumerFilePath);
+            while (true) {
+                for (String msg : cc.fetch()) {
+                    // Count every message; without this the throughput report
+                    // below could never be reached.
+                    count++;
+                    System.out.println("Message Received: " + msg);
+                }
+
+                // Headers kept in MRClientFactory.HTTPHeadersMap.
+                // NOTE(review): presumably only the map matching the transport
+                // in use is populated, hence the null checks - confirm.
+                MultivaluedMap<String, Object> headersMap = MRClientFactory.HTTPHeadersMap;
+                if (headersMap != null) {
+                    for (String key : headersMap.keySet()) {
+                        System.out.println("Header Key " + key);
+                        System.out.println("Header Value " + headersMap.get(key));
+                    }
+                }
+
+                // Headers kept in MRClientFactory.DME2HeadersMap.
+                Map<String, String> dme2headersMap = MRClientFactory.DME2HeadersMap;
+                if (dme2headersMap != null) {
+                    for (String key : dme2headersMap.keySet()) {
+                        System.out.println("Header Key " + key);
+                        System.out.println("Header Value " + dme2headersMap.get(key));
+                    }
+                }
+
+                if (count > nextReport) {
+                    nextReport += 5000;
+
+                    final long endMs = System.currentTimeMillis();
+                    final long elapsedMs = endMs - startMs;
+                    final double elapsedSec = elapsedMs / 1000.0;
+                    final double eps = count / elapsedSec;
+                    System.out.println("Consumed " + count + " in " + elapsedSec + "; " + eps + " eps");
+                }
+            }
+        } catch (Exception x) {
+            System.err.println(x.getClass().getName() + ": " + x.getMessage());
+        }
+    }
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/SimpleExamplePublisher.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/SimpleExamplePublisher.java
new file mode 100644
index 0000000..c4b006f
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/dme/client/SimpleExamplePublisher.java
@@ -0,0 +1,134 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.dme.client;
+
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.ws.rs.core.MultivaluedMap;
+
+import org.json.JSONObject;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRBatchingPublisher;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRPublisher.message;
+
+/**
+ * An example of how to use the Java publisher.
+ *
+ * The static fields below are filled from the command line by
+ * {@link #main(String[])} and read by {@link #publishMessage(String)}.
+ *
+ * @author author
+ */
+public class SimpleExamplePublisher {
+    static String content = null;
+    static String messageSize = null;
+    static String transport = null;
+    static String messageCount = null;
+
+    /**
+     * Creates a batching publisher from the given properties file and sends
+     * {@code messageCount} messages whose shape depends on {@code content}:
+     * "text/plain" sends {@code messageSize} 'T' characters, "application/cambria"
+     * sends {@code messageSize} 'C' characters with the partition "Key", and
+     * "application/json" sends a fixed JSON object. Any other content type sends
+     * nothing. Afterwards the header map matching {@code transport} is printed.
+     *
+     * @param producerFilePath path of the producer properties file
+     * @throws IOException          declared by the original signature; presumably raised by send - confirm
+     * @throws InterruptedException declared by the original signature
+     * @throws Exception            declared by the original signature; includes
+     *                              NumberFormatException for non-numeric count/size
+     */
+    public void publishMessage(String producerFilePath) throws IOException, InterruptedException, Exception {
+
+        // create our publisher
+        final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher(producerFilePath);
+
+        // publish some messages
+        if (content.equalsIgnoreCase("text/plain")) {
+            final int total = Integer.parseInt(messageCount);
+            // Build the payload once: every message has exactly messageSize
+            // characters instead of growing with each iteration.
+            final String payload = repeat('T', Integer.parseInt(messageSize));
+            for (int i = 0; i < total; i++) {
+                pub.send(payload);
+            }
+        } else if (content.equalsIgnoreCase("application/cambria")) {
+            final int total = Integer.parseInt(messageCount);
+            final String payload = repeat('C', Integer.parseInt(messageSize));
+            for (int i = 0; i < total; i++) {
+                pub.send("Key", payload);
+            }
+        } else if (content.equalsIgnoreCase("application/json")) {
+            final int total = Integer.parseInt(messageCount);
+            final JSONObject msg12 = new JSONObject();
+            msg12.put("Name", "DMaaP Reference Client to Test jason Message");
+            final String payload = msg12.toString();
+            for (int i = 0; i < total; i++) {
+                pub.send(payload);
+            }
+        }
+
+        // ...
+
+        // close the publisher to make sure everything's sent before exiting.
+        // The batching
+        // publisher interface allows the app to get the set of unsent messages.
+        // It could
+        // write them to disk, for example, to try to send them later.
+        /* final List<message> stuck = pub.close(20, TimeUnit.SECONDS);
+        if (stuck.size() > 0) {
+            System.err.println(stuck.size() + " messages unsent");
+        } else {
+            System.out.println("Clean exit; all messages sent.");
+        }*/
+
+        // NOTE(review): the header maps are static fields of MRClientFactory and
+        // may presumably still be null when nothing has been sent - hence the
+        // null checks.
+        if (transport.equalsIgnoreCase("HTTP")) {
+            MultivaluedMap<String, Object> headersMap = MRClientFactory.HTTPHeadersMap;
+            if (headersMap != null) {
+                for (String key : headersMap.keySet()) {
+                    System.out.println("Header Key " + key);
+                    System.out.println("Header Value " + headersMap.get(key));
+                }
+            }
+        } else {
+            Map<String, String> dme2headersMap = MRClientFactory.DME2HeadersMap;
+            if (dme2headersMap != null) {
+                for (String key : dme2headersMap.keySet()) {
+                    System.out.println("Header Key " + key);
+                    System.out.println("Header Value " + dme2headersMap.get(key));
+                }
+            }
+        }
+
+    }
+
+    /** Returns a string made of {@code length} copies of {@code fill}; empty when length is not positive. */
+    private static String repeat(char fill, int length) {
+        final StringBuilder sb = new StringBuilder(Math.max(0, length));
+        for (int j = 0; j < length; j++) {
+            sb.append(fill);
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Command-line entry point.
+     *
+     * @param args args[0] producer properties file, args[1] content type,
+     *             args[2] message size, args[3] transport, args[4] message count
+     * @throws IllegalArgumentException when fewer than five arguments are given
+     * @throws InterruptedException     propagated from publishMessage
+     * @throws Exception                propagated from publishMessage
+     */
+    public static void main(String[] args) throws InterruptedException, Exception {
+
+        if (args.length < 5) {
+            throw new IllegalArgumentException(
+                    "usage: SimpleExamplePublisher <producerFilePath> <content> <messageSize> <transport> <messageCount>");
+        }
+
+        String producerFilePath = args[0];
+        content = args[1];
+        messageSize = args[2];
+        transport = args[3];
+        messageCount = args[4];
+        /*String producerFilePath = null;
+        content = null;
+        messageSize =null;
+        transport =null;
+        messageCount = null;*/
+        SimpleExamplePublisher publisher = new SimpleExamplePublisher();
+
+        // Use the path given on the command line; it was previously read into
+        // producerFilePath and then ignored in favour of a fixed local path.
+        publisher.publishMessage(producerFilePath);
+    }
+
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/logging/MRAppender.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/logging/MRAppender.java
new file mode 100644
index 0000000..96389f5
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/logging/MRAppender.java
@@ -0,0 +1,159 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+/**
+ *
+ */
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.logging;
+
+import java.io.IOException;
+
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.helpers.LogLog;
+import org.apache.log4j.spi.LoggingEvent;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRPublisher;
+
+/**
+ * Log4j appender that sends each logging event as a message through an
+ * {@link MRPublisher}. The publisher is created in {@link #activateOptions()}
+ * from the hosts, topic and partition properties.
+ *
+ * @author author
+ */
+public class MRAppender extends AppenderSkeleton {
+
+    // Created by activateOptions(); stays null when the configuration is incomplete.
+    private MRPublisher fPublisher;
+
+    //Provided through log4j configuration
+    private String topic;
+    private String partition;
+    private String hosts;
+    private int maxBatchSize = 1;
+    private int maxAgeMs = 1000;
+    private boolean compress = false;
+
+    /**
+     * Creates an appender with the AppenderSkeleton defaults.
+     */
+    public MRAppender() {
+        super();
+    }
+
+    /**
+     * @param isActive passed through to the AppenderSkeleton constructor
+     */
+    public MRAppender(boolean isActive) {
+        super(isActive);
+    }
+
+    /**
+     * Marks the appender closed and closes the publisher if one was created.
+     * Calling it again is a no-op.
+     *
+     * @see org.apache.log4j.Appender#close()
+     */
+    @Override
+    public void close() {
+        if (this.closed) {
+            return;
+        }
+        this.closed = true;
+        // No publisher exists when activateOptions() never ran or rejected the
+        // configuration; closing must not fail in that case.
+        if (fPublisher != null) {
+            fPublisher.close();
+        }
+    }
+
+    /**
+     * @return false; when no layout is set the rendered message is sent as is
+     * @see org.apache.log4j.Appender#requiresLayout()
+     */
+    @Override
+    public boolean requiresLayout() {
+        return false;
+    }
+
+    /**
+     * Formats the event (through the layout when one is configured, otherwise
+     * using the rendered message) and sends it with the configured partition.
+     * Problems are reported through log4j's own channels instead of being
+     * thrown into the logging caller.
+     *
+     * @see org.apache.log4j.AppenderSkeleton#append(org.apache.log4j.spi.LoggingEvent)
+     */
+    @Override
+    protected void append(LoggingEvent event) {
+        if (fPublisher == null) {
+            // Configuration was incomplete; report it rather than throwing a
+            // NullPointerException from inside a logging call.
+            errorHandler.error("MR Log4J Appender has no publisher; the Hosts, Topic, and Partition parameter are required");
+            return;
+        }
+
+        final String message;
+        if (this.layout == null) {
+            message = event.getRenderedMessage();
+        } else {
+            message = this.layout.format(event);
+        }
+
+        try {
+            fPublisher.send(partition, message);
+        } catch (IOException e) {
+            // LogLog is log4j's internal logger, already used by activateOptions().
+            LogLog.error("MR Log4J Appender failed to send a log message", e);
+        }
+    }
+
+    /**
+     * Creates the batching publisher once hosts, topic and partition are all
+     * set; otherwise logs an error through LogLog and leaves the appender
+     * without a publisher. The hosts property is split on commas.
+     */
+    @Override
+    public void activateOptions() {
+        if (hosts != null && topic != null && partition != null) {
+            fPublisher = MRClientFactory.createBatchingPublisher(hosts.split(","), topic, maxBatchSize, maxAgeMs, compress);
+        } else {
+            LogLog.error("The Hosts, Topic, and Partition parameter are required to create a MR Log4J Appender");
+        }
+    }
+
+    /** @return the topic passed to the publisher factory */
+    public String getTopic() {
+        return topic;
+    }
+
+    /** @param topic the topic passed to the publisher factory */
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    /** @return the partition passed with every sent message */
+    public String getPartition() {
+        return partition;
+    }
+
+    /** @param partition the partition passed with every sent message */
+    public void setPartition(String partition) {
+        this.partition = partition;
+    }
+
+    /** @return the comma-separated host list */
+    public String getHosts() {
+        return hosts;
+    }
+
+    /** @param hosts comma-separated host list */
+    public void setHosts(String hosts) {
+        this.hosts = hosts;
+    }
+
+    /** @return the batch size passed to the publisher factory (default 1) */
+    public int getMaxBatchSize() {
+        return maxBatchSize;
+    }
+
+    /** @param maxBatchSize the batch size passed to the publisher factory */
+    public void setMaxBatchSize(int maxBatchSize) {
+        this.maxBatchSize = maxBatchSize;
+    }
+
+    /** @return the max age in milliseconds passed to the publisher factory (default 1000) */
+    public int getMaxAgeMs() {
+        return maxAgeMs;
+    }
+
+    /** @param maxAgeMs the max age in milliseconds passed to the publisher factory */
+    public void setMaxAgeMs(int maxAgeMs) {
+        this.maxAgeMs = maxAgeMs;
+    }
+
+    /** @return the compression flag passed to the publisher factory (default false) */
+    public boolean isCompress() {
+        return compress;
+    }
+
+    /** @param compress the compression flag passed to the publisher factory */
+    public void setCompress(boolean compress) {
+        this.compress = compress;
+    }
+
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/ConsolePublisher.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/ConsolePublisher.java
new file mode 100644
index 0000000..20eacd8
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/ConsolePublisher.java
@@ -0,0 +1,87 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRBatchingPublisher;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRPublisher.message;
+
+/**
+ * A simple publisher that reads from std in, sending each line as a message.
+ * @author author
+ */
+public class ConsolePublisher
+{
+    /**
+     * Reads stdin line by line and sends each line with the chosen partition.
+     * When stdin ends (or reading/sending fails) the publisher is closed with a
+     * 10 second limit and every message it reports back is printed to stderr as
+     * unsent.
+     *
+     * @param args optional: args[0] host list string, args[1] topic name,
+     *             args[2] partition (a random UUID when omitted)
+     * @throws IOException declared by the original signature; presumably from reading stdin or sending - confirm
+     */
+    public static void main ( String[] args ) throws IOException
+    {
+        // read the hosts(s) from the command line
+        final String hosts = ( args.length > 0 ? args[0] : "aaa.it.att.com,bbb.it.att.com,ccc.it.att.com" );
+
+        // read the topic name from the command line
+        final String topic = ( args.length > 1 ? args[1] : "TEST-TOPIC" );
+
+        // read the partition from the command line, falling back to a random one
+        final String partition = ( args.length > 2 ? args[2] : UUID.randomUUID ().toString () );
+
+        // set up some batch limits and the compression flag
+        final int maxBatchSize = 100;
+        final long maxAgeMs = 250;
+        final boolean withGzip = false;
+
+        // create our publisher
+        final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher ( hosts, topic, maxBatchSize, maxAgeMs, withGzip );
+
+        final BufferedReader cin = new BufferedReader ( new InputStreamReader ( System.in ) );
+        try
+        {
+            String line = null;
+            while ( ( line = cin.readLine () ) != null )
+            {
+                pub.send ( partition, line );
+            }
+        }
+        finally
+        {
+            List<message> leftovers = null;
+            try
+            {
+                leftovers = pub.close ( 10, TimeUnit.SECONDS );
+            }
+            catch ( InterruptedException e )
+            {
+                System.err.println ( "Send on close interrupted." );
+                // keep the interrupt visible to whoever runs this thread
+                Thread.currentThread ().interrupt ();
+            }
+
+            // leftovers is still null when close() was interrupted; iterating it
+            // unguarded would replace the real outcome with a NullPointerException
+            if ( leftovers != null )
+            {
+                for ( message m : leftovers )
+                {
+                    System.err.println ( "Unsent message: " + m.fMsg );
+                }
+            }
+        }
+    }
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/ProtocolTypeConstants.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/ProtocolTypeConstants.java
new file mode 100644
index 0000000..c60657b
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/ProtocolTypeConstants.java
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+/**
+ *
+ */
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients;
+
+/**
+ * Protocol type constants, each paired with the string token returned by
+ * {@link #getValue()}.
+ *
+ * @author author
+ */
+public enum ProtocolTypeConstants {
+
+    DME2("DME2"),
+    AAF_AUTH("HTTPAAF"),
+    AUTH_KEY("HTTPAUTH");
+
+    /** String token associated with this constant. */
+    private String token;
+
+    /**
+     * @param token string token to associate with the constant
+     */
+    ProtocolTypeConstants(String token) {
+        this.token = token;
+    }
+
+    /**
+     * @return the string token given to this constant's constructor
+     */
+    public String getValue() {
+        return this.token;
+    }
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SampleConsumer.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SampleConsumer.java
new file mode 100644
index 0000000..79bf2ed
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SampleConsumer.java
@@ -0,0 +1,86 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients;
+
+import java.io.IOException;
+import java.util.LinkedList;
+
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sample consumer that polls a fixed topic forever and logs every message
+ * together with a running counter and periodic throughput figures.
+ */
+public class SampleConsumer {
+
+    /** Returns args[index] when that position exists, otherwise the fallback. */
+    private static String argOrDefault ( String[] args, int index, String fallback )
+    {
+        if ( args.length > index )
+        {
+            return args[index];
+        }
+        return fallback;
+    }
+
+    /**
+     * Runs the polling loop until an exception is thrown; the exception's class
+     * name and message are then printed to stderr.
+     *
+     * @param args optional: args[1] comma-separated host list (default
+     *             "localhost:8181"), args[2] consumer group (default "grp"),
+     *             args[3] consumer id (default "1"); args[0] is not read
+     */
+    public static void main ( String[] args )
+    {
+        final Logger LOG = LoggerFactory.getLogger(SampleConsumer.class);
+        LOG.info("Sample Consumer Class executing");
+
+        final String topic = "org.onap.dmaap.messagerouter.dmaapclient.app.dmaap.mr.testingTopic";
+        final String url = argOrDefault ( args, 1, "localhost:8181" );
+        final String group = argOrDefault ( args, 2, "grp" );
+        final String id = argOrDefault ( args, 3, "1" );
+
+        long count = 0;
+        long nextReport = 5000;
+
+        final long startMs = System.currentTimeMillis ();
+
+        // one list entry per comma-separated host
+        final LinkedList<String> urlList = new LinkedList<String> ();
+        final String[] hostParts = url.split ( "," );
+        for ( int i = 0; i < hostParts.length; i++ )
+        {
+            urlList.add ( hostParts[i] );
+        }
+
+        // NOTE(review): the API key and secret are hard-coded literals here.
+        final MRConsumer cc = MRClientFactory.createConsumer ( urlList, topic, group, id, 10000, 1000, null, "CG0TXc2Aa3v8LfBk", "pj2rhxJWKP23pgy8ahMnjH88" );
+        try
+        {
+            for ( ;; )
+            {
+                for ( String msg : cc.fetch () )
+                {
+                    LOG.info ( (++count) + ": " + msg );
+                }
+
+                if ( count > nextReport )
+                {
+                    nextReport += 5000;
+
+                    final long elapsedMs = System.currentTimeMillis () - startMs;
+                    final double elapsedSec = elapsedMs / 1000.0;
+                    final double eps = count / elapsedSec;
+                    LOG.info ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" );
+                }
+
+                // the counter also advances once per fetch cycle, independent of
+                // how many messages that cycle returned
+                LOG.info ( (++count) + ": consumed message" );
+            }
+        }
+        catch ( Exception x )
+        {
+            System.err.println ( x.getClass().getName () + ": " + x.getMessage () );
+        }
+    }
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SamplePublisher.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SamplePublisher.java
new file mode 100644
index 0000000..5ed3a2b
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SamplePublisher.java
@@ -0,0 +1,85 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.json.JSONObject;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRBatchingPublisher;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientBuilders.PublisherBuilder;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRPublisher.message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sample publisher that builds a batching publisher with PublisherBuilder,
+ * sends two JSON messages and then closes the publisher, logging whether any
+ * message was left unsent.
+ */
+public class SamplePublisher {
+
+    /**
+     * @param args optional: args[0] host list string (default "localhost:8181"),
+     *             args[1] topic name (default is the testing topic below)
+     * @throws IOException          declared by the original signature; presumably from send/close - confirm
+     * @throws InterruptedException declared by the original signature; presumably from close - confirm
+     */
+    public static void main ( String[] args ) throws IOException, InterruptedException
+    {
+        // Log under this class's own name (it was created for SampleConsumer).
+        final Logger LOG = LoggerFactory.getLogger(SamplePublisher.class);
+
+        // read the hosts(s) from the command line
+        final String hosts = ( args.length > 0 ? args[0] : "localhost:8181" );
+
+        // read the topic name from the command line
+        //final String topic = ( args.length > 1 ? args[1] : "MY-EXAMPLE-TOPIC" );
+        final String topic = ( args.length > 1 ? args[1] : "org.onap.dmaap.messagerouter.dmaapclient.app.dmaap.mr.testingTopic" );
+
+        // set up the batch limits handed to limitBatch below
+        final int maxBatchSize = 100;
+        final int maxAgeMs = 250;
+
+        // create our publisher
+        // NOTE(review): the API key and secret are hard-coded literals; consider
+        // reading them from configuration instead.
+        final MRBatchingPublisher pub = new PublisherBuilder ().
+            usingHosts ( hosts ).
+            onTopic ( topic ).limitBatch(maxBatchSize, maxAgeMs).
+            authenticatedBy ( "CG0TXc2Aa3v8LfBk", "pj2rhxJWKP23pgy8ahMnjH88" ).
+            build ()
+        ;
+
+        // publish some messages
+        final JSONObject msg1 = new JSONObject ();
+        msg1.put ( "name", "tttttttttttttttt" );
+        msg1.put ( "greeting", "ooooooooooooooooo" );
+        pub.send ( "MyPartitionKey", msg1.toString () );
+
+        final JSONObject msg2 = new JSONObject ();
+        msg2.put ( "now", System.currentTimeMillis () );
+        pub.send ( "MyOtherPartitionKey", msg2.toString () );
+
+        // ...
+
+        // close the publisher to make sure everything's sent before exiting. The batching
+        // publisher interface allows the app to get the set of unsent messages. It could
+        // write them to disk, for example, to try to send them later.
+        final List<message> stuck = pub.close ( 20, TimeUnit.SECONDS );
+        if ( stuck.size () > 0 )
+        {
+            LOG.warn ( stuck.size() + " messages unsent" );
+        }
+        else
+        {
+            LOG.info ( "Clean exit; all messages sent." );
+        }
+    }
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SimpleExampleConsumer.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SimpleExampleConsumer.java
new file mode 100644
index 0000000..fe43802
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SimpleExampleConsumer.java
@@ -0,0 +1,86 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients;
+
+
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.util.Properties;
+
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRConsumer;
+
+/**
+ * Example consumer that makes sure the preferred-route file exists, creates an
+ * MRConsumer from a properties file and then polls forever, printing every
+ * received message and a throughput line after each further 5000 messages.
+ */
+public class SimpleExampleConsumer
+{
+
+    // Set by main(); nothing else in this class reads them.
+    // NOTE(review): neither stream is closed here - confirm whether other code
+    // relies on these fields before changing that.
+    static FileWriter routeWriter= null;
+    static Properties props=null;
+    static FileReader routeReader=null;
+
+    /**
+     * Runs the polling loop until an exception is thrown; the exception's class
+     * name and message are then printed to stderr.
+     *
+     * @param args not used
+     */
+    public static void main ( String[] args )
+    {
+
+        long count = 0;
+        long nextReport = 5000;
+
+        final long startMs = System.currentTimeMillis ();
+
+        try
+        {
+            String routeFilePath="/src/main/resources/dme2/preferredRoute.txt";
+
+            // opening a FileWriter creates the route file when it does not exist yet
+            File fo= new File(routeFilePath);
+            if(!fo.exists()){
+                routeWriter=new FileWriter(new File (routeFilePath));
+            }
+            routeReader= new FileReader(new File (routeFilePath));
+            props= new Properties();
+            final MRConsumer cc = MRClientFactory.createConsumer ( "/src/main/resources/dme2/consumer.properties" );
+            while ( true )
+            {
+                for ( String msg : cc.fetch () )
+                {
+                    // count every message; without this the throughput report
+                    // below could never be reached
+                    ++count;
+                    System.out.println(msg);
+                }
+
+                if ( count > nextReport )
+                {
+                    nextReport += 5000;
+
+                    final long endMs = System.currentTimeMillis ();
+                    final long elapsedMs = endMs - startMs;
+                    final double elapsedSec = elapsedMs / 1000.0;
+                    final double eps = count / elapsedSec;
+                    System.out.println ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" );
+                }
+            }
+        }
+        catch ( Exception x )
+        {
+            System.err.println ( x.getClass().getName () + ": " + x.getMessage () );
+        }
+    }
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java
new file mode 100644
index 0000000..836fb90
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SimpleExampleConsumerWithReturnResponse.java
@@ -0,0 +1,90 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.util.Properties;
+
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRConsumer;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.response.MRConsumerResponse;
+
+/**
+ * Example consumer that polls with fetchWithReturnConsumerResponse() and prints the
+ * response code, response message and actual messages of every response to stdout.
+ */
+public class SimpleExampleConsumerWithReturnResponse {
+
+    static FileWriter routeWriter = null;
+    static Properties props = null;
+    static FileReader routeReader = null;
+
+    /**
+     * Polls in an endless loop; any exception ends the loop and is reported on stderr
+     * as "<exception class>: <message>".
+     *
+     * @param args ignored
+     */
+    public static void main(String[] args) {
+        long consumed = 0;
+        long reportThreshold = 5000;
+        final long startedAtMs = System.currentTimeMillis();
+
+        try {
+            final String routeFilePath = "src/main/resources/dme2/preferredRoute.txt";
+            final File routeFile = new File(routeFilePath);
+            if (!routeFile.exists()) {
+                // constructing the writer creates the file when it is missing
+                routeWriter = new FileWriter(new File(routeFilePath));
+            }
+            routeReader = new FileReader(new File(routeFilePath));
+            props = new Properties();
+
+            final MRConsumer consumer =
+                    MRClientFactory.createConsumer("src/main/resources/dme2/consumer.properties");
+            for (;;) {
+                final MRConsumerResponse response = consumer.fetchWithReturnConsumerResponse();
+                System.out.println("mrConsumerResponse code :" + response.getResponseCode());
+
+                System.out.println("mrConsumerResponse Message :" + response.getResponseMessage());
+
+                System.out.println("mrConsumerResponse ActualMessage :" + response.getActualMessages());
+
+                // NOTE(review): nothing in this loop increments the counter, so the
+                // report below never fires as written -- confirm whether that is intended.
+                if (consumed <= reportThreshold) {
+                    continue;
+                }
+                reportThreshold += 5000;
+
+                final long elapsedMs = System.currentTimeMillis() - startedAtMs;
+                final double elapsedSec = elapsedMs / 1000.0;
+                final double eps = consumed / elapsedSec;
+                System.out.println("Consumed " + consumed + " in " + elapsedSec + "; " + eps + " eps");
+            }
+        } catch (Exception x) {
+            System.err.println(x.getClass().getName() + ": " + x.getMessage());
+        }
+    }
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SimpleExamplePublisher.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SimpleExamplePublisher.java
new file mode 100644
index 0000000..8579e68
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SimpleExamplePublisher.java
@@ -0,0 +1,97 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.json.JSONObject;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRBatchingPublisher;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRPublisher.message;
+
+/**
+ * Example of publishing through the batching publisher.
+ * @author author
+ */
+public class SimpleExamplePublisher {
+    static FileWriter routeWriter = null;
+    static Properties props = null;
+    static FileReader routeReader = null;
+
+    /**
+     * Creates a batching publisher from the given properties file, sends one plain
+     * message and one JSON message keyed by "MyPartitionKey", then closes the publisher
+     * and reports on stdout/stderr whether anything was left unsent.
+     *
+     * @param producerFilePath path handed to MRClientFactory.createBatchingPublisher
+     * @throws IOException propagated from the client factory or publisher calls
+     * @throws InterruptedException propagated from the client factory or publisher calls
+     * @throws Exception propagated from the client factory or publisher calls
+     */
+    public void publishMessage(String producerFilePath) throws IOException, InterruptedException, Exception {
+        final MRBatchingPublisher publisher = MRClientFactory.createBatchingPublisher(producerFilePath);
+
+        final JSONObject namedPayload = new JSONObject();
+        namedPayload.put("Name", "Sprint");
+        publisher.send("First cambria messge");
+        publisher.send("MyPartitionKey", namedPayload.toString());
+
+        // created but never sent
+        final JSONObject unusedPayload = new JSONObject();
+
+        // Close with a 20 second timeout so everything is sent before exiting. The returned
+        // list holds the unsent messages; an application could store them and retry later.
+        final List<message> unsent = publisher.close(20, TimeUnit.SECONDS);
+        if (unsent.isEmpty()) {
+            System.out.println("Clean exit; all messages sent.");
+            return;
+        }
+        System.err.println(unsent.size() + " messages unsent");
+    }
+
+    /**
+     * Creates the preferred-route file if it is missing, opens it for reading, and
+     * publishes using the producer properties file.
+     *
+     * @param args ignored
+     */
+    public static void main(String[] args) throws InterruptedException, Exception {
+        final String routeFilePath = "/src/main/resources/dme2/preferredRoute.txt";
+        final SimpleExamplePublisher publisher = new SimpleExamplePublisher();
+
+        if (!new File(routeFilePath).exists()) {
+            routeWriter = new FileWriter(new File(routeFilePath));
+        }
+        routeReader = new FileReader(new File(routeFilePath));
+        props = new Properties();
+        publisher.publishMessage("/src/main/resources/dme2/producer.properties");
+    }
+}
+
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SimpleExamplePublisherWithResponse.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SimpleExamplePublisherWithResponse.java
new file mode 100644
index 0000000..74b1462
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/clients/SimpleExamplePublisherWithResponse.java
@@ -0,0 +1,84 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.clients;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import org.json.JSONObject;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRBatchingPublisher;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.response.MRPublisherResponse;
+ /**
+  * Example of publishing a batch and printing the publisher response.
+  * @author author
+  */
+ public class SimpleExamplePublisherWithResponse {
+     static FileWriter routeWriter = null;
+     static Properties props = null;
+     static FileReader routeReader = null;
+
+     /**
+      * Creates the preferred-route file if missing, then calls publishMessage msgCount times.
+      * @param args args[0] is the number of publish rounds, as a decimal integer
+      * @throws IllegalArgumentException if no message count is supplied
+      * @throws NumberFormatException if args[0] is not an integer
+      */
+     public static void main(String[] args) throws InterruptedException, Exception {
+         if (args == null || args.length < 1) {
+             // fail with a usable message rather than an ArrayIndexOutOfBoundsException
+             throw new IllegalArgumentException("usage: SimpleExamplePublisherWithResponse <msgCount>");
+         }
+         final int msgCount = Integer.parseInt(args[0]); // parsed once, not on every iteration
+         String routeFilePath = "src/main/resources/dme2/preferredRoute.txt";
+         SimpleExamplePublisherWithResponse publisher = new SimpleExamplePublisherWithResponse();
+         if (!new File(routeFilePath).exists()) {
+             routeWriter = new FileWriter(new File(routeFilePath));
+         }
+         routeReader = new FileReader(new File(routeFilePath));
+         props = new Properties();
+         for (int i = 0; i < msgCount; i++) {
+             publisher.publishMessage("src/main/resources/dme2/producer.properties", msgCount);
+         }
+     }
+
+     /**
+      * Creates a batching publisher (second factory argument true), queues the same JSON
+      * message twice on partition "1", sends the batch and prints the response to stdout.
+      * @param producerFilePath path handed to MRClientFactory.createBatchingPublisher
+      * @param count value embedded in the message as "Message:" + count
+      */
+     public void publishMessage(String producerFilePath, int count) throws IOException, InterruptedException, Exception {
+         final MRBatchingPublisher pub = MRClientFactory.createBatchingPublisher(producerFilePath, true);
+         final JSONObject msg1 = new JSONObject();
+         msg1.put("Partition:1", "Message:" + count);
+         msg1.put("greeting", "Hello ..");
+         pub.send("1", msg1.toString());
+         pub.send("1", msg1.toString());
+         final MRPublisherResponse res = pub.sendBatchWithResponse();
+         // concatenation prints "null" when no response comes back, instead of throwing
+         System.out.println("Pub response->" + res);
+     }
+ }
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/support/MRBatchingPublisherMock.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/support/MRBatchingPublisherMock.java
new file mode 100644
index 0000000..eca5874
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/support/MRBatchingPublisherMock.java
@@ -0,0 +1,183 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.support;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRBatchingPublisher;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.response.MRPublisherResponse;
+import org.slf4j.Logger;
+
+/**
+ * A helper for unit testing systems that use a MRPublisher. When setting
+ * up your test, inject an instance into MRClientFactory to have it return
+ * the mock client.
+ * <p>
+ * Nothing is transmitted: messages passed to the send methods are recorded as
+ * {@link Entry} objects and handed to the registered listeners.
+ * @author author
+ */
+public class MRBatchingPublisherMock implements MRBatchingPublisher {
+    /** One captured message together with the partition it was sent to. */
+    public class Entry {
+        public Entry(String partition, String msg) {
+            fPartition = partition;
+            fMessage = msg;
+        }
+
+        /** @return the message text only; the partition is not included */
+        @Override
+        public String toString() {
+            return fMessage;
+        }
+
+        public final String fPartition;
+        public final String fMessage;
+    }
+
+    /** Callback notified of every message this mock captures. */
+    public interface Listener {
+        void onMessage(Entry e);
+    }
+
+    /** Predicate over message text, used to select captures. */
+    public interface MessageFilter {
+        boolean match(String msg);
+    }
+
+    /** Creates a mock with no captures and no listeners. */
+    public MRBatchingPublisherMock() {
+        fCaptures = new LinkedList<Entry>();
+    }
+
+    /** Registers a listener; it is called for every message captured afterwards. */
+    public void addListener(Listener listener) {
+        fListeners.add(listener);
+    }
+
+    /** @return a new list holding every capture, in send order */
+    public List<Entry> getCaptures() {
+        return getCaptures(new MessageFilter() { @Override public boolean match(String msg) { return true; } });
+    }
+
+    /**
+     * @param filter decides, from the message text alone, which captures to keep
+     * @return a new list of the captures whose message the filter matches, in send order
+     */
+    public List<Entry> getCaptures(MessageFilter filter) {
+        final LinkedList<Entry> result = new LinkedList<Entry>();
+        for (Entry capture : fCaptures) {
+            if (filter.match(capture.fMessage)) {
+                result.add(capture);
+            }
+        }
+        return result;
+    }
+
+    /** @return the number of messages currently captured */
+    public int received() {
+        return fCaptures.size();
+    }
+
+    /** Discards all captures; registered listeners are kept. */
+    public void reset() {
+        fCaptures.clear();
+    }
+
+    /** Records the message, then notifies every listener; always returns 1. */
+    @Override
+    public int send(String partition, String msg) {
+        final Entry e = new Entry(partition, msg);
+        fCaptures.add(e);
+        for (Listener l : fListeners) {
+            l.onMessage(e);
+        }
+        return 1;
+    }
+
+    /** Records the message under the partition it carries. */
+    @Override
+    public int send(message msg) {
+        return send(msg.fPartition, msg.fMsg);
+    }
+
+    /**
+     * Captures the message with a null partition, so partition-less sends are visible
+     * to received(), getCaptures() and listeners like any other message (they were
+     * previously dropped without being recorded).
+     * @return always 1
+     */
+    @Override
+    public int send(String msg) {
+        return send(null, msg);
+    }
+
+    /** @return the sum of the individual send results */
+    @Override
+    public int send(Collection<message> msgs) {
+        int sum = 0;
+        for (message m : msgs) {
+            sum += send(m);
+        }
+        return sum;
+    }
+
+    /** @return always 0 */
+    @Override
+    public int getPendingMessageCount() {
+        return 0;
+    }
+
+    /** @return always a new, empty list */
+    @Override
+    public List<message> close(long timeout, TimeUnit timeoutUnits) {
+        return new LinkedList<message>();
+    }
+
+    /** No-op. */
+    @Override
+    public void close() { }
+
+    /** No-op; the credentials are ignored. */
+    @Override
+    public void setApiCredentials(String apiKey, String apiSecret) { }
+
+    /** No-op. */
+    @Override
+    public void clearApiCredentials() { }
+
+    /** No-op; the mock does not log. */
+    @Override
+    public void logTo(Logger log) { }
+
+    /** @return always null; batch responses are not simulated */
+    @Override
+    public MRPublisherResponse sendBatchWithResponse() {
+        return null;
+    }
+
+    private final LinkedList<Entry> fCaptures;
+    private final LinkedList<Listener> fListeners = new LinkedList<Listener>();
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/support/MRConsumerMock.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/support/MRConsumerMock.java
new file mode 100644
index 0000000..9728053
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/test/support/MRConsumerMock.java
@@ -0,0 +1,167 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.test.support;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRConsumer;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.response.MRConsumerResponse;
+import org.slf4j.Logger;
+
+/**
+ * A helper for unit testing systems that use a MRConsumer. When setting
+ * up your test, inject an instance into MRClientFactory to have it return
+ * the mock client.
+ * Replies are queued with the add* methods; each fetch call consumes the oldest one.
+ * @author author
+ */
+public class MRConsumerMock implements MRConsumer {
+    /** One scripted reply: a delay followed by either a message list or an error. */
+    public class Entry {
+        /** Reply that, after waitMs ms, yields a copy of msgs when statusCode is 200-299. */
+        public Entry(long waitMs, int statusCode, List<String> msgs) {
+            fWaitMs = waitMs;
+            fStatusCode = statusCode;
+            fStatusMsg = null;
+            fMsgs = new LinkedList<String>(msgs);
+        }
+        /** Reply without messages; statusMsg becomes part of the IOException text on failure. */
+        public Entry(long waitMs, int statusCode, String statusMsg) {
+            fWaitMs = waitMs;
+            fStatusCode = statusCode;
+            fStatusMsg = statusMsg;
+            fMsgs = null;
+        }
+
+        /**
+         * Sleeps for the delay, then returns the messages for a 200-299 status (possibly null).
+         * @throws IOException for any other status, or wrapping an InterruptedException
+         */
+        public LinkedList<String> run() throws IOException {
+            try {
+                Thread.sleep(fWaitMs);
+            } catch (InterruptedException e) {
+                // restore the interrupt flag so code further up the stack can still observe it
+                Thread.currentThread().interrupt();
+                throw new IOException(e);
+            }
+            if (fStatusCode >= 200 && fStatusCode <= 299) {
+                return fMsgs;
+            }
+            throw new IOException("" + fStatusCode + " " + fStatusMsg);
+        }
+
+        private final long fWaitMs;
+        private final int fStatusCode;
+        private final String fStatusMsg;
+        private final LinkedList<String> fMsgs;
+    }
+
+    /** Creates a mock with an empty reply queue. */
+    public MRConsumerMock() {
+        fReplies = new LinkedList<Entry>();
+    }
+
+    /** No-op. */
+    @Override
+    public void close() { }
+
+    /** No-op; the credentials are ignored. */
+    @Override
+    public void setApiCredentials(String apiKey, String apiSecret) { }
+
+    /** No-op. */
+    @Override
+    public void clearApiCredentials() { }
+
+    /** Appends a scripted reply to the end of the queue. */
+    public synchronized void add(Entry e) {
+        fReplies.add(e);
+    }
+
+    /** Queues a status-200 reply carrying one message, with no delay. */
+    public void addImmediateMsg(String msg) {
+        addDelayedMsg(0, msg);
+    }
+
+    /** Queues a status-200 reply carrying one message, delivered after delay ms. */
+    public void addDelayedMsg(long delay, String msg) {
+        final LinkedList<String> list = new LinkedList<String>();
+        list.add(msg);
+        add(new Entry(delay, 200, list));
+    }
+
+    /** Queues a status-200 reply carrying all of msgs, with no delay. */
+    public void addImmediateMsgGroup(List<String> msgs) {
+        addDelayedMsgGroup(0, msgs);
+    }
+
+    /** Queues a status-200 reply carrying all of msgs, delivered after delay ms. */
+    public void addDelayedMsgGroup(long delay, List<String> msgs) {
+        final LinkedList<String> list = new LinkedList<String>(msgs);
+        add(new Entry(delay, 200, list));
+    }
+
+    /** Queues an undelayed reply with the given status code and status text. */
+    public void addImmediateError(int statusCode, String statusText) {
+        add(new Entry(0, statusCode, statusText));
+    }
+
+    /** Same as fetch(-1, -1). */
+    @Override
+    public Iterable<String> fetch() throws IOException {
+        return fetch(-1, -1);
+    }
+
+    /**
+     * Consumes the oldest queued reply (arguments ignored); empty list when none is queued.
+     */
+    @Override
+    public Iterable<String> fetch(int timeoutMs, int limit) throws IOException {
+        final Entry next;
+        // dequeue under the lock add() takes; run() stays outside so its sleep blocks nobody
+        synchronized (this) {
+            next = fReplies.isEmpty() ? null : fReplies.removeFirst();
+        }
+        return next != null ? next.run() : new LinkedList<String>();
+    }
+
+    /** No-op; the mock does not log. */
+    @Override
+    public void logTo(Logger log) { }
+
+    /** @return always null; consumer responses are not simulated */
+    @Override
+    public MRConsumerResponse fetchWithReturnConsumerResponse() {
+        return null;
+    }
+
+    /** @return always null; consumer responses are not simulated */
+    @Override
+    public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, int limit) {
+        return null;
+    }
+
+    private final LinkedList<Entry> fReplies;
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/ApiKeyCommand.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/ApiKeyCommand.java
new file mode 100644
index 0000000..f15c3ae
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/ApiKeyCommand.java
@@ -0,0 +1,135 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.tools;
+
+import java.io.IOException;
+import java.io.PrintStream;
+
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRIdentityManager;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClient.MRApiException;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRIdentityManager.ApiKey;
+
+import com.att.nsa.apiClient.credentials.ApiCredential;
+import com.att.nsa.apiClient.http.HttpException;
+import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
+import com.att.nsa.cmdtool.Command;
+import com.att.nsa.cmdtool.CommandNotReadyException;
+
+/**
+ * Command-line "key" command: creates, updates, lists or revokes API keys through an
+ * MRIdentityManager built from the current command context.
+ */
+public class ApiKeyCommand implements Command<MRCommandContext> {
+
+    /** @return the regular expressions for the command lines this command accepts */
+    @Override
+    public String[] getMatches() {
+        final String[] patterns = {
+            "key (create|update) (\\S*) (\\S*)",
+            "key (list) (\\S*)",
+            "key (revoke)",
+        };
+        return patterns;
+    }
+
+    /**
+     * Requires a cluster to have been selected.
+     *
+     * @throws CommandNotReadyException if context.checkClusterReady() is false
+     */
+    @Override
+    public void checkReady(MRCommandContext context) throws CommandNotReadyException {
+        if (context.checkClusterReady()) {
+            return;
+        }
+        throw new CommandNotReadyException("Use 'cluster' to specify a cluster to use.");
+    }
+
+    /**
+     * Runs the sub-command named by parts[0] ("list", "create", "update" or "revoke");
+     * any other value does nothing. Failures are reported on {@code out}, not thrown,
+     * and the identity manager is always closed.
+     *
+     * @param parts parts[0] is the sub-command; parts[1] and parts[2] are its arguments
+     * @param context supplies the cluster, API key and API password
+     * @param out receives results and error messages
+     */
+    @Override
+    public void execute(String[] parts, MRCommandContext context, PrintStream out) throws CommandNotReadyException {
+        final MRIdentityManager tm = MRClientFactory.createIdentityManager(context.getCluster(), context.getApiKey(), context.getApiPwd());
+        context.applyTracer(tm);
+
+        try {
+            final String action = parts[0];
+            if (action.equals("list")) {
+                // show the e-mail and description stored for the key named by parts[1]
+                final ApiKey key = tm.getApiKey(parts[1]);
+                if (key == null) {
+                    out.println("No key returned");
+                } else {
+                    out.println("email: " + key.getEmail());
+                    out.println("description: " + key.getDescription());
+                }
+            } else if (action.equals("create")) {
+                // create a key from parts[1] and parts[2] and show its key/secret pair
+                final ApiCredential ac = tm.createApiKey(parts[1], parts[2]);
+                if (ac == null) {
+                    out.println("No credential returned?");
+                } else {
+                    out.println(" key: " + ac.getApiKey());
+                    out.println("secret: " + ac.getApiSecret());
+                }
+            } else if (action.equals("update")) {
+                // update the current key with parts[1] and parts[2]
+                tm.updateCurrentApiKey(parts[1], parts[2]);
+                out.println("Updated");
+            } else if (action.equals("revoke")) {
+                // delete the current key
+                tm.deleteCurrentApiKey();
+                out.println("Updated");
+            }
+        } catch (HttpObjectNotFoundException e) {
+            out.println("Object not found: " + e.getMessage());
+        } catch (HttpException e) {
+            out.println("HTTP exception: " + e.getMessage());
+        } catch (MRApiException e) {
+            out.println("API exception: " + e.getMessage());
+        } catch (IOException e) {
+            out.println("IO exception: " + e.getMessage());
+        } finally {
+            tm.close();
+        }
+    }
+
+    /** Prints one usage line per sub-command. */
+    @Override
+    public void displayHelp(PrintStream out) {
+        for (String usage : new String[] {
+                "key create <email> <description>",
+                "key update <email> <description>",
+                "key list <key>",
+                "key revoke" }) {
+            out.println(usage);
+        }
+    }
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/AuthCommand.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/AuthCommand.java
new file mode 100644
index 0000000..0845432
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/AuthCommand.java
@@ -0,0 +1,69 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.tools;
+
+import java.io.PrintStream;
+
+import com.att.nsa.cmdtool.Command;
+import com.att.nsa.cmdtool.CommandNotReadyException;
+
+/**
+ * Command-line "auth" / "noauth" command: sets or clears the API credentials kept in
+ * the command context.
+ */
+public class AuthCommand implements Command<MRCommandContext> {
+    /** Always ready; there is nothing to check. */
+    @Override
+    public void checkReady(MRCommandContext context) throws CommandNotReadyException {
+    }
+
+    /**
+     * With no parts, clears the credentials; otherwise stores parts[0] as the API key
+     * and parts[1] as the API secret. Either way a confirmation is printed to out.
+     */
+    @Override
+    public void execute(String[] parts, MRCommandContext context, PrintStream out) throws CommandNotReadyException {
+        if (parts.length <= 0) {
+            context.clearAuth();
+            out.println("No longer authenticating.");
+            return;
+        }
+        final String apiKey = parts[0];
+        context.setAuth(apiKey, parts[1]);
+        out.println("Now authenticating with " + apiKey);
+    }
+
+    /** Prints usage for both accepted forms. */
+    @Override
+    public void displayHelp(PrintStream out) {
+        out.println("auth <apiKey> <apiSecret>");
+        out.println("\tuse these credentials on subsequent transactions");
+        out.println("noauth");
+        out.println("\tdo not use credentials on subsequent transactions");
+    }
+
+    /** @return the regular expressions for the two accepted command lines */
+    @Override
+    public String[] getMatches() {
+        return new String[] { "auth (\\S*) (\\S*)", "noauth" };
+    }
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/ClusterCommand.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/ClusterCommand.java
new file mode 100644
index 0000000..69198d4
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/ClusterCommand.java
@@ -0,0 +1,81 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.tools;
+
+import java.io.PrintStream;
+
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl.MRConsumerImpl;
+
+import com.att.nsa.cmdtool.Command;
+import com.att.nsa.cmdtool.CommandNotReadyException;
+
+/**
+ * Command-line "cluster" command: with no argument prints the hosts of the current
+ * cluster; with arguments replaces the cluster with the hosts given.
+ */
+public class ClusterCommand implements Command<MRCommandContext> {
+
+    /** @return the regular expressions for the accepted command lines */
+    @Override
+    public String[] getMatches() {
+        return new String[] { "cluster", "cluster (\\S*)?", };
+    }
+
+    /** Always ready; there is nothing to check. */
+    @Override
+    public void checkReady(MRCommandContext context) throws CommandNotReadyException {
+    }
+
+    /**
+     * Lists the cluster when parts is empty. Otherwise clears it and adds every host
+     * obtained by splitting each part on whitespace and passing each piece through
+     * MRConsumerImpl.stringToList.
+     *
+     * @param parts captured arguments; empty to list the current hosts
+     * @param context holds the cluster being listed or rebuilt
+     * @param out receives one host per line when listing
+     */
+    @Override
+    public void execute(String[] parts, MRCommandContext context, PrintStream out) throws CommandNotReadyException {
+        if (parts.length == 0) {
+            for (String host : context.getCluster()) {
+                out.println(host);
+            }
+            return;
+        }
+
+        context.clearCluster();
+        for (String part : parts) {
+            for (String token : part.trim().split("\\s+")) {
+                for (String splitHost : MRConsumerImpl.stringToList(token)) {
+                    context.addClusterHost(splitHost);
+                }
+            }
+        }
+    }
+
+    /** Prints the usage line. */
+    @Override
+    public void displayHelp(PrintStream out) {
+        out.println("cluster host1 host2 ...");
+    }
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/MRCommandContext.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/MRCommandContext.java
new file mode 100644
index 0000000..7b11573
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/MRCommandContext.java
@@ -0,0 +1,101 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.tools;
+
+import java.util.Collection;
+import java.util.LinkedList;
+
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClient;
+
+import com.att.nsa.apiClient.http.HttpClient;
+import com.att.nsa.apiClient.http.HttpTracer;
+import com.att.nsa.cmdtool.CommandContext;
+
+public class MRCommandContext implements CommandContext
+{
+	// State shared by the MR tool commands: the shutdown flag, the API credentials,
+	// the cluster host list and an optional HTTP tracer.
+	private boolean fShutdown;
+	private String fApiKey;
+	private String fApiPwd;
+	private final LinkedList<String> fCluster;
+	private HttpTracer fTracer = null;
+
+	/** Starts with no credentials and a cluster holding only "localhost". */
+	public MRCommandContext ()
+	{
+		fApiKey = null;
+		fApiPwd = null;
+		fCluster = new LinkedList<String> ();
+		fCluster.add ( "localhost" );
+	}
+
+	@Override
+	public void requestShutdown () { fShutdown = true; }
+
+	@Override
+	public boolean shouldContinue () { return !fShutdown; }
+
+	/** Stores the API key and password handed back by getApiKey and getApiPwd. */
+	public void setAuth ( String key, String pwd )
+	{
+		fApiKey = key;
+		fApiPwd = pwd;
+	}
+
+	/** Resets both credentials to null. */
+	public void clearAuth ()
+	{
+		setAuth ( null, null );
+	}
+
+	public String getApiKey () { return fApiKey; }
+	public String getApiPwd () { return fApiPwd; }
+
+	/** @return true when the cluster list holds at least one host */
+	public boolean checkClusterReady ()
+	{
+		return !fCluster.isEmpty ();
+	}
+
+	/** @return a new list copied from the cluster hosts, so callers cannot modify the internal one */
+	public Collection<String> getCluster ()
+	{
+		final LinkedList<String> snapshot = new LinkedList<String> ( fCluster );
+		return snapshot;
+	}
+
+	public void clearCluster () { fCluster.clear (); }
+
+	public void addClusterHost ( String host ) { fCluster.add ( host ); }
+
+	public void useTracer ( HttpTracer t ) { fTracer = t; }
+
+	public void noTracer () { fTracer = null; }
+
+	/** Installs the current tracer on cc; does nothing when no tracer is set or cc is not an HttpClient. */
+	public void applyTracer ( MRClient cc )
+	{
+		if ( fTracer == null || !( cc instanceof HttpClient ) ) return;
+		((HttpClient) cc).installTracer ( fTracer );
+	}
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/MRTool.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/MRTool.java
new file mode 100644
index 0000000..563315e
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/MRTool.java
@@ -0,0 +1,50 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.tools;
+
+import java.io.IOException;
+
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.impl.MRClientVersionInfo;
+
+import com.att.nsa.cmdtool.CommandLineTool;
+
+public class MRTool extends CommandLineTool<MRCommandContext>
+{
+	/** Titles the tool with the client version, sets the "MR> " prompt and registers every command. */
+	protected MRTool ()
+	{
+		super ( "MR Tool (" + MRClientVersionInfo.getVersion () + ")", "MR> " );
+		registerCommand ( new ApiKeyCommand () );
+		registerCommand ( new AuthCommand () );
+		registerCommand ( new ClusterCommand () );
+		registerCommand ( new MessageCommand () );
+		registerCommand ( new TopicCommand () );
+		registerCommand ( new TraceCommand () );
+	}
+
+	/** Entry point: builds the tool, then runs it with the given arguments and a fresh command context. */
+	public static void main ( String[] args ) throws IOException
+	{
+		final MRTool tool = new MRTool ();
+		tool.runFromMain ( args, new MRCommandContext () );
+	}
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/MessageCommand.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/MessageCommand.java
new file mode 100644
index 0000000..afee95f
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/MessageCommand.java
@@ -0,0 +1,129 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.tools;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRBatchingPublisher;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRConsumer;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientBuilders.PublisherBuilder;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRPublisher.message;
+
+import com.att.nsa.cmdtool.Command;
+import com.att.nsa.cmdtool.CommandNotReadyException;
+
+public class MessageCommand implements Command<MRCommandContext>
+{
+	// MR tool command: "post" publishes one message to a topic, "read" fetches messages from one.
+	@Override
+	public String[] getMatches ()
+	{
+		return new String[]{
+			"(post) (\\S*) (\\S*) (.*)",
+			"(read) (\\S*) (\\S*) (\\S*)",
+		};
+	}
+
+	@Override
+	public void checkReady ( MRCommandContext context ) throws CommandNotReadyException
+	{
+		if ( !context.checkClusterReady () ) // both sub-commands need at least one cluster host
+		{
+			throw new CommandNotReadyException ( "Use 'cluster' to specify a cluster to use." );
+		}
+	}
+
+	// parts[0] is the verb ("read", anything else is handled as post); parts[1..3] follow the order shown by displayHelp.
+	@Override
+	public void execute ( String[] parts, MRCommandContext context, PrintStream out ) throws CommandNotReadyException
+	{
+		if ( parts[0].equalsIgnoreCase ( "read" ))
+		{
+			final MRConsumer cc = MRClientFactory.createConsumer ( context.getCluster (), parts[1], parts[2], parts[3],
+				-1, -1, null, context.getApiKey(), context.getApiPwd() );
+			context.applyTracer ( cc );
+			try
+			{
+				for ( String msg : cc.fetch () )
+				{
+					out.println ( msg );
+				}
+			}
+			catch ( Exception e )
+			{
+				out.println ( "Problem fetching messages: " + e.getMessage() );
+			}
+			finally
+			{
+				cc.close ();
+			}
+		}
+		else
+		{
+			final MRBatchingPublisher pub = new PublisherBuilder ().
+				usingHosts ( context.getCluster () ).
+				onTopic ( parts[1] ).
+				authenticatedBy ( context.getApiKey(), context.getApiPwd() ).
+				build (); // NOTE(review): unlike the read path, no tracer is applied to the publisher - confirm this is intended
+			try
+			{
+				pub.send ( parts[2], parts[3] );
+			}
+			catch ( IOException e )
+			{
+				out.println ( "Problem sending message: " + e.getMessage() );
+			}
+			finally
+			{
+				List<message> left = null; // stays null when close() itself fails
+				try
+				{
+					left = pub.close ( 500, TimeUnit.MILLISECONDS );
+				}
+				catch ( IOException e )
+				{
+					out.println ( "Problem sending message: " + e.getMessage() );
+				}
+				catch ( InterruptedException e )
+				{
+					Thread.currentThread ().interrupt (); // re-assert the interrupt so code further up the stack can still observe it
+					out.println ( "Problem sending message: " + e.getMessage() );
+				}
+				if ( left != null && left.size () > 0 )
+				{
+					out.println ( left.size() + " messages not sent." );
+				}
+			}
+		}
+	}
+
+	@Override
+	public void displayHelp ( PrintStream out )
+	{
+		out.println ( "post <topicName> <partition> <message>" );
+		out.println ( "read <topicName> <consumerGroup> <consumerId>" );
+	}
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/TopicCommand.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/TopicCommand.java
new file mode 100644
index 0000000..6bc6c14
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/TopicCommand.java
@@ -0,0 +1,210 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.tools;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Set;
+
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRClientFactory;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRTopicManager;
+import org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.client.MRTopicManager.TopicInfo;
+
+import com.att.nsa.apiClient.http.HttpException;
+import com.att.nsa.apiClient.http.HttpObjectNotFoundException;
+import com.att.nsa.cmdtool.Command;
+import com.att.nsa.cmdtool.CommandNotReadyException;
+
+public class TopicCommand implements Command<MRCommandContext>
+{
+ // MR tool command for topic administration: list topics, show one topic, create a topic, grant or revoke ACL entries.
+ @Override
+ public String[] getMatches ()
+ {
+ return new String[]{
+ "topic (list)",
+ "topic (list) (\\S*)",
+ "topic (create) (\\S*) (\\S*) (\\S*)",
+ "topic (grant|revoke) (read|write) (\\S*) (\\S*)",
+ };
+ }
+ // Refuses to run while the context has no cluster hosts.
+ @Override
+ public void checkReady ( MRCommandContext context ) throws CommandNotReadyException
+ {
+ if ( !context.checkClusterReady () )
+ {
+ throw new CommandNotReadyException ( "Use 'cluster' to specify a cluster to use." );
+ }
+ }
+ // parts[0] selects the sub-command; the remaining entries follow the argument order printed by displayHelp.
+ @Override
+ public void execute ( String[] parts, MRCommandContext context, PrintStream out ) throws CommandNotReadyException
+ {
+ final MRTopicManager tm = MRClientFactory.createTopicManager ( context.getCluster(), context.getApiKey(), context.getApiPwd() ); // one manager per invocation, closed in the finally block below
+ context.applyTracer ( tm );
+
+ try
+ {
+ if ( parts[0].equals ( "list" ) )
+ {
+ try
+ {
+ if ( parts.length == 1 ) // bare "topic list": print every topic name
+ {
+ for ( String topic : tm.getTopics () )
+ {
+ out.println ( topic );
+ }
+ }
+ else
+ {
+ final TopicInfo ti = tm.getTopicMetadata ( parts[1] ); // "topic list <topicName>": print that topic's metadata
+
+ final String owner = ti.getOwner ();
+ out.println ( " owner: " + ( owner == null ? "<none>" : owner ) );
+
+ final String desc = ti.getDescription ();
+ out.println ( "description: " + ( desc == null ? "<none>" : desc ) );
+
+ final Set<String> prods = ti.getAllowedProducers ();
+ if ( prods != null ) // a null producer set is printed as "<not active>"
+ {
+ out.println ( " write ACL: " );
+ for ( String key : prods )
+ {
+ out.println ( "\t" + key );
+ }
+ }
+ else
+ {
+ out.println ( " write ACL: <not active>" );
+ }
+
+ final Set<String> cons = ti.getAllowedConsumers ();
+ if ( cons != null ) // a null consumer set is printed as "<not active>"
+ {
+ out.println ( " read ACL: " );
+ for ( String key : cons )
+ {
+ out.println ( "\t" + key );
+ }
+ }
+ else
+ {
+ out.println ( " read ACL: <not active>" );
+ }
+ }
+ }
+ catch ( IOException x ) // failures are reported on out rather than propagated
+ {
+ out.println ( "Problem with request: " + x.getMessage () );
+ }
+ catch ( HttpObjectNotFoundException e )
+ {
+ out.println ( "Not found: " + e.getMessage () );
+ }
+ }
+ else if ( parts[0].equals ( "create" ) )
+ {
+ try
+ {
+ final int partitions = Integer.parseInt ( parts[2] );
+ final int replicas = Integer.parseInt ( parts[3] );
+
+ tm.createTopic ( parts[1], "", partitions, replicas ); // NOTE(review): the empty second argument is presumably the topic description - confirm against MRTopicManager
+ }
+ catch ( HttpException e )
+ {
+ out.println ( "Problem with request: " + e.getMessage () );
+ }
+ catch ( IOException e )
+ {
+ out.println ( "Problem with request: " + e.getMessage () );
+ }
+ catch ( NumberFormatException e ) // partitions or replicas was not an integer
+ {
+ out.println ( "Problem with request: " + e.getMessage () );
+ }
+ }
+ else if ( parts[0].equals ( "grant" ) )
+ {
+ try
+ {
+ if ( parts[1].equals ( "write" ) ) // parts[2] is the topic name, parts[3] the API key (see displayHelp)
+ {
+ tm.allowProducer ( parts[2], parts[3] );
+ }
+ else if ( parts[1].equals ( "read" ) )
+ {
+ tm.allowConsumer ( parts[2], parts[3] );
+ }
+ }
+ catch ( HttpException e )
+ {
+ out.println ( "Problem with request: " + e.getMessage () );
+ }
+ catch ( IOException e )
+ {
+ out.println ( "Problem with request: " + e.getMessage () );
+ }
+ }
+ else if ( parts[0].equals ( "revoke" ) )
+ {
+ try
+ {
+ if ( parts[1].equals ( "write" ) ) // same argument layout as grant
+ {
+ tm.revokeProducer ( parts[2], parts[3] );
+ }
+ else if ( parts[1].equals ( "read" ) )
+ {
+ tm.revokeConsumer ( parts[2], parts[3] );
+ }
+ }
+ catch ( HttpException e )
+ {
+ out.println ( "Problem with request: " + e.getMessage () );
+ }
+ catch ( IOException e )
+ {
+ out.println ( "Problem with request: " + e.getMessage () );
+ }
+ }
+ }
+ finally
+ {
+ tm.close (); // runs whichever sub-command was executed, including after a reported failure
+ }
+ }
+ // Prints one usage line per supported form of the command.
+ @Override
+ public void displayHelp ( PrintStream out )
+ {
+ out.println ( "topic list" );
+ out.println ( "topic list <topicName>" );
+ out.println ( "topic create <topicName> <partitions> <replicas>" );
+ out.println ( "topic grant write|read <topicName> <apiKey>" );
+ out.println ( "topic revoke write|read <topicName> <apiKey>" );
+ }
+
+}
diff --git a/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/TraceCommand.java b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/TraceCommand.java
new file mode 100644
index 0000000..280c0ad
--- /dev/null
+++ b/src/main/java/org/onap/dmaap/messagerouter/dmaapclient/nsa/mr/tools/TraceCommand.java
@@ -0,0 +1,118 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * org.onap.dmaap
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ *
+ *******************************************************************************/
+package org.onap.dmaap.messagerouter.dmaapclient.nsa.mr.tools;
+
+import java.io.PrintStream;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+
+import com.att.nsa.apiClient.http.HttpTracer;
+import com.att.nsa.cmdtool.Command;
+import com.att.nsa.cmdtool.CommandNotReadyException;
+
+public class TraceCommand implements Command<MRCommandContext>
+{
+	private static final String kLineBreak = "======================================================================";
+
+	// Empty body: toggling the tracer has no readiness precondition.
+	@Override
+	public void checkReady ( MRCommandContext context ) throws CommandNotReadyException
+	{
+	}
+
+	/**
+	 * parts[0] equal to "on" (ignoring case) installs a tracer on the context that prints every
+	 * HTTP request and response to out; any other value removes the context's tracer.
+	 */
+	@Override
+	public void execute ( String[] parts, MRCommandContext context, final PrintStream out ) throws CommandNotReadyException
+	{
+		if ( !parts[0].equalsIgnoreCase ( "on" ) )
+		{
+			context.noTracer ();
+			return;
+		}
+
+		context.useTracer ( new HttpTracer ()
+		{
+			@Override
+			public void outbound ( URI uri, Map<String, List<String>> headers, String method, byte[] entity )
+			{
+				printExchange ( out, ">>> ", method + " " + uri.toString(), headers, entity );
+			}
+
+			@Override
+			public void inbound ( Map<String, List<String>> headers, int statusCode, String responseLine, byte[] entity )
+			{
+				printExchange ( out, "<<< ", responseLine, headers, entity );
+			}
+		} );
+	}
+
+	/**
+	 * Prints one HTTP message framed by separator lines: the start line, one line per header
+	 * (values joined with ", "), then a blank line and the body when entity is not null.
+	 * @param out       stream to print to
+	 * @param marker    prefix placed before the start line and before every header line
+	 * @param startLine text of the request or response line
+	 * @param headers   header names mapped to their list of values
+	 * @param entity    raw message body, or null for none
+	 */
+	private static void printExchange ( PrintStream out, String marker, String startLine, Map<String, List<String>> headers, byte[] entity )
+	{
+		out.println ( kLineBreak );
+		out.println ( marker + startLine );
+		for ( Map.Entry<String, List<String>> header : headers.entrySet () )
+		{
+			final StringBuilder joined = new StringBuilder ();
+			String separator = ""; // tracked explicitly so an empty first value is still followed by ", "
+			for ( String value : header.getValue () )
+			{
+				joined.append ( separator ).append ( value );
+				separator = ", ";
+			}
+			out.println ( marker + header.getKey () + ": " + joined );
+		}
+		if ( entity != null )
+		{
+			out.println ();
+			out.println ( new String ( entity ) ); // NOTE(review): decoded with the platform default charset, as before - confirm
+		}
+		out.println ( kLineBreak );
+	}
+
+	@Override
+	public void displayHelp ( PrintStream out )
+	{
+		out.println ( "trace on|off" );
+		out.println ( "\tWhen trace is on, HTTP interaction is printed to the console." );
+	}
+
+	@Override
+	public String[] getMatches ()
+	{
+		return new String[] {
+			"trace (on)", "trace (off)"
+		};
+	}
+}
diff --git a/src/main/resources/MRClientVersion.properties b/src/main/resources/MRClientVersion.properties
new file mode 100644
index 0000000..c088e10
--- /dev/null
+++ b/src/main/resources/MRClientVersion.properties
@@ -0,0 +1,23 @@
+###############################################################################
+# ============LICENSE_START=======================================================
+# org.onap.dmaap
+# ================================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+#
+###############################################################################
+
+MRClientVersion=${project.version}
diff --git a/src/main/resources/dme2/consumer.properties b/src/main/resources/dme2/consumer.properties
new file mode 100644
index 0000000..22dfc58
--- /dev/null
+++ b/src/main/resources/dme2/consumer.properties
@@ -0,0 +1,56 @@
+###############################################################################
+# ============LICENSE_START=======================================================
+# org.onap.dmaap
+# ================================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+#
+###############################################################################
+TransportType=DME2
+Latitude =47.778998
+Longitude =-122.182883
+Version =1.0
+ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.att.com/events
+Environment =TEST
+Partner=BOT_R
+routeOffer=MR1
+SubContextPath =/
+Protocol =http
+MethodType =GET
+username =<att uid>
+password =<password>
+contenttype =application/json
+authKey=<auth key>
+authDate=2016-02-18T13:57:37-0800
+#host=uebsb91bodc.it.att.com:3904
+host=<host>:<port>
+topic=org.onap.dmaap.messagerouter.dmaapclient.ecomp_test.crm.preDemo1
+group=con
+id=5
+timeout=15000
+limit=1000
+filter=
+AFT_DME2_EXCHANGE_REQUEST_HANDLERS=org.onap.dmaap.messagerouter.dmaapclient.nsa.test.PreferredRouteRequestHandler
+AFT_DME2_EXCHANGE_REPLY_HANDLERS=org.onap.dmaap.messagerouter.dmaapclient.nsa.test.PreferredRouteReplyHandler
+AFT_DME2_REQ_TRACE_ON=true
+AFT_ENVIRONMENT=AFTUAT
+AFT_DME2_EP_CONN_TIMEOUT=15000
+AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000
+AFT_DME2_EP_READ_TIMEOUT_MS=50000
+sessionstickinessrequired=NO
+DME2preferredRouterFilePath=/src/main/resources/dme2/preferredRoute.txt
+
+
diff --git a/src/main/resources/dme2/message.txt b/src/main/resources/dme2/message.txt
new file mode 100644
index 0000000..99e97ec
--- /dev/null
+++ b/src/main/resources/dme2/message.txt
@@ -0,0 +1,2 @@
+this is a test file for producer
+this is a sample file
\ No newline at end of file
diff --git a/src/main/resources/dme2/preferredRoute.txt b/src/main/resources/dme2/preferredRoute.txt
new file mode 100644
index 0000000..662b0aa
--- /dev/null
+++ b/src/main/resources/dme2/preferredRoute.txt
@@ -0,0 +1 @@
+preferredRouteKey=MR1
\ No newline at end of file
diff --git a/src/main/resources/dme2/producer.properties b/src/main/resources/dme2/producer.properties
new file mode 100644
index 0000000..49d3117
--- /dev/null
+++ b/src/main/resources/dme2/producer.properties
@@ -0,0 +1,54 @@
+###############################################################################
+# ============LICENSE_START=======================================================
+# org.onap.dmaap
+# ================================================================================
+# Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+#
+# ECOMP is a trademark and service mark of AT&T Intellectual Property.
+#
+###############################################################################
+TransportType=DME2
+Latitude =47.778998
+Longitude =-122.182883
+Version =1.0
+ServiceName =dmaap-v1.dev.dmaap.dt.saat.acsi.att.com/events
+#com.att.acsi.saat.dt.dmaap.dev.mrclientnew1
+Environment =TEST
+Partner=BOT_R
+routeOffer=MR1
+SubContextPath =/
+Protocol =http
+MethodType =POST
+username =<att uid>
+password =<global logon password>
+contenttype = application/json
+authKey=<auth key>
+authDate=2016-07-20T11:30:56-0700
+host=<host>:<port>
+topic=org.onap.dmaap.messagerouter.dmaapclient.ecomp_test.crm.preDemo1
+#host=uebsb91bodc.it.att.com:3904
+partition=1
+maxBatchSize=100
+maxAgeMs=250
+AFT_DME2_EXCHANGE_REQUEST_HANDLERS=org.onap.dmaap.messagerouter.dmaapclient.nsa.test.PreferredRouteRequestHandler
+AFT_DME2_EXCHANGE_REPLY_HANDLERS=org.onap.dmaap.messagerouter.dmaapclient.nsa.test.PreferredRouteReplyHandler
+AFT_DME2_REQ_TRACE_ON=true
+AFT_ENVIRONMENT=AFTUAT
+AFT_DME2_EP_CONN_TIMEOUT=15000
+AFT_DME2_ROUNDTRIP_TIMEOUT_MS=240000
+AFT_DME2_EP_READ_TIMEOUT_MS=50000
+sessionstickinessrequired=NO
+DME2preferredRouterFilePath=/src/main/resources/dme2/preferredRoute.txt
+MessageSentThreadOccurance=50
diff --git a/src/main/resources/log4j.xml b/src/main/resources/log4j.xml
new file mode 100644
index 0000000..7817a4a
--- /dev/null
+++ b/src/main/resources/log4j.xml
@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ============LICENSE_START=======================================================
+ org.onap.dmaap
+ ================================================================================
+ Copyright © 2017 AT&T Intellectual Property. All rights reserved.
+ ================================================================================
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ ============LICENSE_END=========================================================
+
+ ECOMP is a trademark and service mark of AT&T Intellectual Property.
+
+ -->
+
+<!DOCTYPE log4j:configuration PUBLIC
+ "-//APACHE//DTD LOG4J 1.2//EN" "http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"
+ debug="false">
+
+ <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+ <param name="threshold" value="INFO" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern"
+ value="[DEV: %d{yyyy-MMM-dd HH:mm:ss,SSS}][%-5p][%-10t]%m%n" />
+ </layout>
+ </appender>
+
+<!-- <appender name="FILE" class="org.apache.log4j.RollingFileAppender">
+ <param name="threshold" value="INFO" />
+ <param name="File" value="C:\\Users\\author\\Documents\\DMaaP_ATT_Docs\\Reference Client\\MRReferenceClient.log" />
+ <param name="MaxFileSize" value="128MB" />
+ <param name="MaxBackupIndex" value="10" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern"
+ value="[%d{yyyy-MMM-dd HH:mm:ss,SSS}][%-5p][%-10t][%-5c][%4L]%m%n" />
+ </layout>
+ </appender>-->
+
+ <logger name="com.att" additivity="false">
+ <level value="INFO" />
+ <appender-ref ref="CONSOLE" />
+ </logger>
+ <logger name="org.onap.dmaap.messagerouter.dmaapclient" additivity="false">
+ <level value="INFO" />
+ <appender-ref ref="CONSOLE" />
+ </logger>
+
+ <!-- ############################ -->
+ <!-- ROOT -->
+ <!-- ############################ -->
+ <root>
+ <level value="INFO" />
+ <appender-ref ref="CONSOLE" />
+
+ </root>
+
+</log4j:configuration>
+<!-- Log4J Configuration Quick Reference: ====================================
+ Priority order is DEBUG < INFO < WARN < ERROR < FATAL PatternLayout conversion
+ characters: %c Category of the logging event %C Fully qualified class name
+ of the caller %d Date of the logging event (example: %d{HH:mm:ss,SSS} ) %F
+ File name where the logging request was issued (caution: extremely slow)
+ %l Location information of the caller (caution: extremely slow) %L Line number
+ from where the logging request was issued (caution: extremely slow) %m Application-supplied
+ message %M Method name from where the logging request was issued (caution:
+ extremely slow) %n Line separator %p Priority of the logging event %r Number
+ of milliseconds since the start of the application %t Name of the thread
+ that generated the logging event %x Nested diagnostic context associated with
+ the thread %% A single percent sign Format modifiers examples: %20c Left
+ pad with spaces if category is less than 20 characters long %-20c Right pad
+ with spaces if category is less than 20 characters long %.30c Truncate from
+ the beginning if category is more than 30 chars long %20.30c Left pad 20
+ chars + truncate from beginning if more than 30 chars %-20.30c Right pad
+ 20 chars + truncate from beginning if more than 30 chars Examples: "%r [%t]
+ %-5p %c %x - %m\n" "%-6r [%15.15t] %-5p %30.30c %x - %m\n" -->