Changes to the DMaap Client

Added new API to the DMaapClient

Issue-ID: DMAAP-214
Change-Id: I4de2da7ca42ad1b5925a2df9d26672875dd15b10
Signed-off-by: sunil.unnava <su622b@att.com>
diff --git a/pom.xml b/pom.xml
index d893c94..c8327cc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,7 +25,7 @@
 	<groupId>org.onap.dmaap.messagerouter.dmaapclient</groupId>
 	<artifactId>dmaapClient</artifactId>
 	<packaging>jar</packaging>
-	<version>1.1.0-SNAPSHOT</version>
+	<version>1.1.1-SNAPSHOT</version>
 	<name>dmaap-messagerouter-dmaapclient</name>
 	<description>Client library for MR event routing API</description>
 	<url>https://github.com/att/dmaap-framework</url>
diff --git a/src/main/java/com/att/nsa/mr/client/MRClientFactory.java b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java
index 59e472c..b654282 100644
--- a/src/main/java/com/att/nsa/mr/client/MRClientFactory.java
+++ b/src/main/java/com/att/nsa/mr/client/MRClientFactory.java
@@ -43,158 +43,205 @@
 /**
  * A factory for MR clients.<br/>
  * <br/>
- * Use caution selecting a consumer creator factory. If the call doesn't accept a consumer group name, then it creates
- * a consumer that is not restartable. That is, if you stop your process and start it again, your client will NOT receive
- * any missed messages on the topic. If you need to ensure receipt of missed messages, then you must use a consumer that's
- * created with a group name and ID. (If you create multiple consumer processes using the same group, load is split across
- * them. Be sure to use a different ID for each instance.)<br/>
+ * Use caution selecting a consumer creator factory. If the call doesn't accept
+ * a consumer group name, then it creates a consumer that is not restartable.
+ * That is, if you stop your process and start it again, your client will NOT
+ * receive any missed messages on the topic. If you need to ensure receipt of
+ * missed messages, then you must use a consumer that's created with a group
+ * name and ID. (If you create multiple consumer processes using the same group,
+ * load is split across them. Be sure to use a different ID for each
+ * instance.)<br/>
  * <br/>
- * Publishers  
+ * Publishers
  * 
  * @author author
  */
-public class MRClientFactory
-{
+public class MRClientFactory {
 	public static MultivaluedMap<String, Object> HTTPHeadersMap;
 	public static Map<String, String> DME2HeadersMap;
 	public static String routeFilePath;
-	
+
 	public static FileReader routeReader;
-	
-	public static FileWriter routeWriter= null;
-	public static Properties prop=null;
-	//routeReader= new FileReader(new File (routeFilePath));
-	//props= new Properties();
+
+	public static FileWriter routeWriter = null;
+	public static Properties prop = null;
+
+	// routeReader= new FileReader(new File (routeFilePath));
+	// props= new Properties();
 	/**
-	 * Create a consumer instance with the default timeout and no limit
-	 * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable
+	 * Create a consumer instance with the default timeout and no limit on
+	 * messages returned. This consumer operates as an independent consumer
+	 * (i.e., not in a group) and is NOT re-startable across sessions.
+	 * 
+	 * @param hostList
+	 *            A comma separated list of hosts to use to connect to MR. You
+	 *            can include port numbers (3904 is the default). For example,
+	 *            "hostname:8080,"
+	 * 
+	 * @param topic
+	 *            The topic to consume
+	 * 
+	 * @return a consumer
+	 */
+	public static MRConsumer createConsumer(String hostList, String topic) {
+		return createConsumer(MRConsumerImpl.stringToList(hostList), topic);
+	}
+
+	/**
+	 * Create a consumer instance with the default timeout and no limit on
+	 * messages returned. This consumer operates as an independent consumer
+	 * (i.e., not in a group) and is NOT re-startable across sessions.
+	 * 
+	 * @param hostSet
+	 *            The host used in the URL to MR. Entries can be "host:port".
+	 * @param topic
+	 *            The topic to consume
+	 * 
+	 * @return a consumer
+	 */
+	public static MRConsumer createConsumer(Collection<String> hostSet, String topic) {
+		return createConsumer(hostSet, topic, null);
+	}
+
+	/**
+	 * Create a consumer instance with server-side filtering, the default
+	 * timeout, and no limit on messages returned. This consumer operates as an
+	 * independent consumer (i.e., not in a group) and is NOT re-startable
 	 * across sessions.
 	 * 
-	 * @param hostList A comma separated list of hosts to use to connect to MR.
-	 * You can include port numbers (3904 is the default). For example, "hostname:8080,"
-	 * 
-	 * @param topic The topic to consume
+	 * @param hostSet
+	 *            The host used in the URL to MR. Entries can be "host:port".
+	 * @param topic
+	 *            The topic to consume
+	 * @param filter
+	 *            a filter to use on the server side
 	 * 
 	 * @return a consumer
 	 */
-	public static MRConsumer createConsumer ( String hostList, String topic )
-	{
-		return createConsumer ( MRConsumerImpl.stringToList(hostList), topic );
+	public static MRConsumer createConsumer(Collection<String> hostSet, String topic, String filter) {
+		return createConsumer(hostSet, topic, UUID.randomUUID().toString(), "0", -1, -1, filter, null, null);
 	}
 
 	/**
-	 * Create a consumer instance with the default timeout and no limit
-	 * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable
-	 * across sessions.
+	 * Create a consumer instance with the default timeout, and no limit on
+	 * messages returned. This consumer can operate in a logical group and is
+	 * re-startable across sessions when you use the same group and ID on
+	 * restart.
 	 * 
-	 * @param hostSet The host used in the URL to MR. Entries can be "host:port".
-	 * @param topic The topic to consume
+	 * @param hostSet
+	 *            The host used in the URL to MR. Entries can be "host:port".
+	 * @param topic
+	 *            The topic to consume
+	 * @param consumerGroup
+	 *            The name of the consumer group this consumer is part of
+	 * @param consumerId
+	 *            The unique id of this consume in its group
 	 * 
 	 * @return a consumer
 	 */
-	public static MRConsumer createConsumer ( Collection<String> hostSet, String topic )
-	{
-		return createConsumer ( hostSet, topic, null );
+	public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
+			final String consumerId) {
+		return createConsumer(hostSet, topic, consumerGroup, consumerId, -1, -1);
 	}
 
 	/**
-	 * Create a consumer instance with server-side filtering, the default timeout, and no limit
-	 * on messages returned. This consumer operates as an independent consumer (i.e., not in a group) and is NOT re-startable
-	 * across sessions.
+	 * Create a consumer instance with the default timeout, and no limit on
+	 * messages returned. This consumer can operate in a logical group and is
+	 * re-startable across sessions when you use the same group and ID on
+	 * restart.
 	 * 
-	 * @param hostSet The host used in the URL to MR. Entries can be "host:port".
-	 * @param topic The topic to consume
-	 * @param filter a filter to use on the server side
+	 * @param hostSet
+	 *            The host used in the URL to MR. Entries can be "host:port".
+	 * @param topic
+	 *            The topic to consume
+	 * @param consumerGroup
+	 *            The name of the consumer group this consumer is part of
+	 * @param consumerId
+	 *            The unique id of this consume in its group
+	 * @param timeoutMs
+	 *            The amount of time in milliseconds that the server should keep
+	 *            the connection open while waiting for message traffic. Use -1
+	 *            for default timeout.
+	 * @param limit
+	 *            A limit on the number of messages returned in a single call.
+	 *            Use -1 for no limit.
 	 * 
 	 * @return a consumer
 	 */
-	public static MRConsumer createConsumer ( Collection<String> hostSet, String topic, String filter )
-	{
-		return createConsumer ( hostSet, topic, UUID.randomUUID ().toString (), "0", -1, -1, filter, null, null );
+	public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
+			final String consumerId, int timeoutMs, int limit) {
+		return createConsumer(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null);
 	}
 
 	/**
-	 * Create a consumer instance with the default timeout, and no limit
-	 * on messages returned. This consumer can operate in a logical group and is re-startable
-	 * across sessions when you use the same group and ID on restart.
+	 * Create a consumer instance with the default timeout, and no limit on
+	 * messages returned. This consumer can operate in a logical group and is
+	 * re-startable across sessions when you use the same group and ID on
+	 * restart. This consumer also uses server-side filtering.
 	 * 
-	 * @param hostSet The host used in the URL to MR. Entries can be "host:port".
-	 * @param topic The topic to consume
-	 * @param consumerGroup The name of the consumer group this consumer is part of
-	 * @param consumerId The unique id of this consume in its group
+	 * @param hostList
+	 *            A comma separated list of hosts to use to connect to MR. You
+	 *            can include port numbers (3904 is the default). For example,
+	 *            "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
+	 * @param topic
+	 *            The topic to consume
+	 * @param consumerGroup
+	 *            The name of the consumer group this consumer is part of
+	 * @param consumerId
+	 *            The unique id of this consume in its group
+	 * @param timeoutMs
+	 *            The amount of time in milliseconds that the server should keep
+	 *            the connection open while waiting for message traffic. Use -1
+	 *            for default timeout.
+	 * @param limit
+	 *            A limit on the number of messages returned in a single call.
+	 *            Use -1 for no limit.
+	 * @param filter
+	 *            A Highland Park filter expression using only built-in filter
+	 *            components. Use null for "no filter".
 	 * 
 	 * @return a consumer
 	 */
-	public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup, final String consumerId )
-	{
-		return createConsumer ( hostSet, topic, consumerGroup, consumerId, -1, -1 );
+	public static MRConsumer createConsumer(String hostList, final String topic, final String consumerGroup,
+			final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
+		return createConsumer(MRConsumerImpl.stringToList(hostList), topic, consumerGroup, consumerId, timeoutMs, limit,
+				filter, apiKey, apiSecret);
 	}
 
 	/**
-	 * Create a consumer instance with the default timeout, and no limit
-	 * on messages returned. This consumer can operate in a logical group and is re-startable
-	 * across sessions when you use the same group and ID on restart.
+	 * Create a consumer instance with the default timeout, and no limit on
+	 * messages returned. This consumer can operate in a logical group and is
+	 * re-startable across sessions when you use the same group and ID on
+	 * restart. This consumer also uses server-side filtering.
 	 * 
-	 * @param hostSet The host used in the URL to MR. Entries can be "host:port".
-	 * @param topic The topic to consume
-	 * @param consumerGroup The name of the consumer group this consumer is part of
-	 * @param consumerId The unique id of this consume in its group
-	 * @param timeoutMs	The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout.
-	 * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
+	 * @param hostSet
+	 *            The host used in the URL to MR. Entries can be "host:port".
+	 * @param topic
+	 *            The topic to consume
+	 * @param consumerGroup
+	 *            The name of the consumer group this consumer is part of
+	 * @param consumerId
+	 *            The unique id of this consume in its group
+	 * @param timeoutMs
+	 *            The amount of time in milliseconds that the server should keep
+	 *            the connection open while waiting for message traffic. Use -1
+	 *            for default timeout.
+	 * @param limit
+	 *            A limit on the number of messages returned in a single call.
+	 *            Use -1 for no limit.
+	 * @param filter
+	 *            A Highland Park filter expression using only built-in filter
+	 *            components. Use null for "no filter".
 	 * 
 	 * @return a consumer
 	 */
-	public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup, final String consumerId, int timeoutMs, int limit)
-	{
-		return createConsumer ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, null, null, null );
-	}
-
-	/**
-	 * Create a consumer instance with the default timeout, and no limit
-	 * on messages returned. This consumer can operate in a logical group and is re-startable
-	 * across sessions when you use the same group and ID on restart. This consumer also uses
-	 * server-side filtering.
-	 * 
-	 * @param hostList A comma separated list of hosts to use to connect to MR.
-	 * You can include port numbers (3904 is the default). For example, "ueb01hydc.it.att.com:8080,ueb02hydc.it.att.com"
-	 * @param topic The topic to consume
-	 * @param consumerGroup The name of the consumer group this consumer is part of
-	 * @param consumerId The unique id of this consume in its group
-	 * @param timeoutMs	The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout.
-	 * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
-	 * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter".
-	 * 
-	 * @return a consumer
-	 */
-	public static MRConsumer createConsumer ( String hostList, final String topic, final String consumerGroup,
-		final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret )
-	{
-		return createConsumer ( MRConsumerImpl.stringToList(hostList), topic, consumerGroup,
-			consumerId, timeoutMs, limit, filter, apiKey, apiSecret );
-	}
-
-	/**
-	 * Create a consumer instance with the default timeout, and no limit
-	 * on messages returned. This consumer can operate in a logical group and is re-startable
-	 * across sessions when you use the same group and ID on restart. This consumer also uses
-	 * server-side filtering.
-	 * 
-	 * @param hostSet The host used in the URL to MR. Entries can be "host:port".
-	 * @param topic The topic to consume
-	 * @param consumerGroup The name of the consumer group this consumer is part of
-	 * @param consumerId The unique id of this consume in its group
-	 * @param timeoutMs	The amount of time in milliseconds that the server should keep the connection open while waiting for message traffic. Use -1 for default timeout.
-	 * @param limit A limit on the number of messages returned in a single call. Use -1 for no limit.
-	 * @param filter A Highland Park filter expression using only built-in filter components. Use null for "no filter".
-	 * 
-	 * @return a consumer
-	 */
-	public static MRConsumer createConsumer ( Collection<String> hostSet, final String topic, final String consumerGroup,
-		final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret )
-	{
-		if ( MRClientBuilders.sfConsumerMock != null ) return MRClientBuilders.sfConsumerMock;
+	public static MRConsumer createConsumer(Collection<String> hostSet, final String topic, final String consumerGroup,
+			final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret) {
+		if (MRClientBuilders.sfConsumerMock != null)
+			return MRClientBuilders.sfConsumerMock;
 		try {
-			return new MRConsumerImpl ( hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey, apiSecret );
+			return new MRConsumerImpl(hostSet, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey,
+					apiSecret);
 		} catch (MalformedURLException e) {
 			throw new RuntimeException(e);
 		}
@@ -203,282 +250,339 @@
 	/*************************************************************************/
 	/*************************************************************************/
 	/*************************************************************************/
-	
+
 	/**
-	 * Create a publisher that sends each message (or group of messages) immediately. Most
-	 * applications should favor higher latency for much higher message throughput and the
-	 * "simple publisher" is not a good choice. 
-	 *  
-	 * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
-	 * @param topic The topic on which to publish messages.
+	 * Create a publisher that sends each message (or group of messages)
+	 * immediately. Most applications should favor higher latency for much
+	 * higher message throughput and the "simple publisher" is not a good
+	 * choice.
+	 * 
+	 * @param hostlist
+	 *            The host used in the URL to MR. Can be "host:port", can be
+	 *            multiple comma-separated entries.
+	 * @param topic
+	 *            The topic on which to publish messages.
 	 * @return a publisher
 	 */
-	public static MRBatchingPublisher createSimplePublisher ( String hostlist, String topic )
-	{
-		return createBatchingPublisher ( hostlist, topic, 1, 1 );
+	public static MRBatchingPublisher createSimplePublisher(String hostlist, String topic) {
+		return createBatchingPublisher(hostlist, topic, 1, 1);
 	}
 
 	/**
-	 * Create a publisher that batches messages. Be sure to close the publisher to
-	 * send the last batch and ensure a clean shutdown. Message payloads are not compressed.
+	 * Create a publisher that batches messages. Be sure to close the publisher
+	 * to send the last batch and ensure a clean shutdown. Message payloads are
+	 * not compressed.
 	 * 
-	 * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
-	 * @param topic The topic on which to publish messages.
-	 * @param maxBatchSize The largest set of messages to batch
-	 * @param maxAgeMs The maximum age of a message waiting in a batch
+	 * @param hostlist
+	 *            The host used in the URL to MR. Can be "host:port", can be
+	 *            multiple comma-separated entries.
+	 * @param topic
+	 *            The topic on which to publish messages.
+	 * @param maxBatchSize
+	 *            The largest set of messages to batch
+	 * @param maxAgeMs
+	 *            The maximum age of a message waiting in a batch
 	 * 
 	 * @return a publisher
 	 */
-	public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs )
-	{
-		return createBatchingPublisher ( hostlist, topic, maxBatchSize, maxAgeMs, false );
+	public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
+			long maxAgeMs) {
+		return createBatchingPublisher(hostlist, topic, maxBatchSize, maxAgeMs, false);
 	}
 
 	/**
-	 * Create a publisher that batches messages. Be sure to close the publisher to
-	 * send the last batch and ensure a clean shutdown. 
+	 * Create a publisher that batches messages. Be sure to close the publisher
+	 * to send the last batch and ensure a clean shutdown.
 	 * 
-	 * @param hostlist The host used in the URL to MR. Can be "host:port", can be multiple comma-separated entries.
-	 * @param topic The topic on which to publish messages.
-	 * @param maxBatchSize The largest set of messages to batch
-	 * @param maxAgeMs The maximum age of a message waiting in a batch
-	 * @param compress use gzip compression
+	 * @param hostlist
+	 *            The host used in the URL to MR. Can be "host:port", can be
+	 *            multiple comma-separated entries.
+	 * @param topic
+	 *            The topic on which to publish messages.
+	 * @param maxBatchSize
+	 *            The largest set of messages to batch
+	 * @param maxAgeMs
+	 *            The maximum age of a message waiting in a batch
+	 * @param compress
+	 *            use gzip compression
 	 * 
 	 * @return a publisher
 	 */
-	public static MRBatchingPublisher createBatchingPublisher ( String hostlist, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
-	{
-		return createBatchingPublisher ( MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress );
+	public static MRBatchingPublisher createBatchingPublisher(String hostlist, String topic, int maxBatchSize,
+			long maxAgeMs, boolean compress) {
+		return createBatchingPublisher(MRConsumerImpl.stringToList(hostlist), topic, maxBatchSize, maxAgeMs, compress);
 	}
 
 	/**
-	 * Create a publisher that batches messages. Be sure to close the publisher to
-	 * send the last batch and ensure a clean shutdown. 
+	 * Create a publisher that batches messages. Be sure to close the publisher
+	 * to send the last batch and ensure a clean shutdown.
 	 * 
-	 * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
-	 * @param topic The topic on which to publish messages.
-	 * @param maxBatchSize The largest set of messages to batch
-	 * @param maxAgeMs The maximum age of a message waiting in a batch
-	 * @param compress use gzip compression
+	 * @param hostSet
+	 *            A set of hosts to be used in the URL to MR. Can be
+	 *            "host:port". Use multiple entries to enable failover.
+	 * @param topic
+	 *            The topic on which to publish messages.
+	 * @param maxBatchSize
+	 *            The largest set of messages to batch
+	 * @param maxAgeMs
+	 *            The maximum age of a message waiting in a batch
+	 * @param compress
+	 *            use gzip compression
 	 * 
 	 * @return a publisher
 	 */
-	public static MRBatchingPublisher createBatchingPublisher ( String[] hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
-	{
-		final TreeSet<String> hosts = new TreeSet<String> ();
-		for ( String hp : hostSet )
-		{
-			hosts.add ( hp );
+	public static MRBatchingPublisher createBatchingPublisher(String[] hostSet, String topic, int maxBatchSize,
+			long maxAgeMs, boolean compress) {
+		final TreeSet<String> hosts = new TreeSet<String>();
+		for (String hp : hostSet) {
+			hosts.add(hp);
 		}
-		return createBatchingPublisher ( hosts, topic, maxBatchSize, maxAgeMs, compress );
+		return createBatchingPublisher(hosts, topic, maxBatchSize, maxAgeMs, compress);
 	}
 
 	/**
-	 * Create a publisher that batches messages. Be sure to close the publisher to
-	 * send the last batch and ensure a clean shutdown. 
+	 * Create a publisher that batches messages. Be sure to close the publisher
+	 * to send the last batch and ensure a clean shutdown.
 	 * 
-	 * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
-	 * @param topic The topic on which to publish messages.
-	 * @param maxBatchSize The largest set of messages to batch
-	 * @param maxAgeMs The maximum age of a message waiting in a batch
-	 * @param compress use gzip compression
+	 * @param hostSet
+	 *            A set of hosts to be used in the URL to MR. Can be
+	 *            "host:port". Use multiple entries to enable failover.
+	 * @param topic
+	 *            The topic on which to publish messages.
+	 * @param maxBatchSize
+	 *            The largest set of messages to batch
+	 * @param maxAgeMs
+	 *            The maximum age of a message waiting in a batch
+	 * @param compress
+	 *            use gzip compression
 	 * 
 	 * @return a publisher
 	 */
-	public static MRBatchingPublisher createBatchingPublisher ( Collection<String> hostSet, String topic, int maxBatchSize, long maxAgeMs, boolean compress )
-	{
-		return new MRSimplerBatchPublisher.Builder ().
-			againstUrls ( hostSet ).
-			onTopic ( topic ).
-			batchTo ( maxBatchSize, maxAgeMs ).
-			compress ( compress ).
-			build ();
+	public static MRBatchingPublisher createBatchingPublisher(Collection<String> hostSet, String topic,
+			int maxBatchSize, long maxAgeMs, boolean compress) {
+		return new MRSimplerBatchPublisher.Builder().againstUrls(hostSet).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
+				.compress(compress).build();
 	}
-	
+
 	/**
-	 * Create a publisher that batches messages. Be sure to close the publisher to
-	 * send the last batch and ensure a clean shutdown. 
-	 * @param host A host to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
-	 * @param topic The topic on which to publish messages.
-	 * @param username username 
-	 * @param password password
-	 * @param maxBatchSize The largest set of messages to batch
-	 * @param maxAgeMs The maximum age of a message waiting in a batch
-	 * @param compress use gzip compression
-	 * @param protocolFlag  http auth or ueb auth or dme2 method
-	 * @param producerFilePath all properties for publisher
+	 * Create a publisher that batches messages. Be sure to close the publisher
+	 * to send the last batch and ensure a clean shutdown.
+	 * 
+	 * @param host
+	 *            A host to be used in the URL to MR. Can be "host:port". Use
+	 *            multiple entries to enable failover.
+	 * @param topic
+	 *            The topic on which to publish messages.
+	 * @param username
+	 *            username
+	 * @param password
+	 *            password
+	 * @param maxBatchSize
+	 *            The largest set of messages to batch
+	 * @param maxAgeMs
+	 *            The maximum age of a message waiting in a batch
+	 * @param compress
+	 *            use gzip compression
+	 * @param protocolFlag
+	 *            http auth or ueb auth or dme2 method
+	 * @param producerFilePath
+	 *            all properties for publisher
 	 * @return MRBatchingPublisher obj
 	 */
-	public static MRBatchingPublisher createBatchingPublisher ( String host, String topic, final String username, final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag, String producerFilePath )
-	{
-		MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
-			againstUrls(MRConsumerImpl.stringToList(host)).
-			onTopic ( topic ).
-			batchTo ( maxBatchSize, maxAgeMs ).
-			compress ( compress ).
-			build ();
-		
+	public static MRBatchingPublisher createBatchingPublisher(String host, String topic, final String username,
+			final String password, int maxBatchSize, long maxAgeMs, boolean compress, String protocolFlag,
+			String producerFilePath) {
+		MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder()
+				.againstUrls(MRConsumerImpl.stringToList(host)).onTopic(topic).batchTo(maxBatchSize, maxAgeMs)
+				.compress(compress).build();
+
 		pub.setHost(host);
 		pub.setUsername(username);
 		pub.setPassword(password);
 		pub.setProtocolFlag(protocolFlag);
-		pub.setProducerFilePath(producerFilePath);
 		return pub;
 	}
-	
-	
+
 	/**
-	 * Create a publisher that batches messages. Be sure to close the publisher to
-	 * send the last batch and ensure a clean shutdown
-	 * @param producerFilePath set all properties for publishing message
+	 * Create a publisher that batches messages. Be sure to close the publisher
+	 * to send the last batch and ensure a clean shutdown
+	 * 
+	 * @param Properties
+	 *            props set all properties for publishing message
 	 * @return MRBatchingPublisher obj
-	 * @throws FileNotFoundException exc
-	 * @throws IOException ioex
+	 * @throws FileNotFoundException
+	 *             exc
+	 * @throws IOException
+	 *             ioex
 	 */
-	public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath ) throws FileNotFoundException,IOException	{
-		FileReader reader = new FileReader(new File (producerFilePath));
-		Properties props = new Properties();		
+	public static MRBatchingPublisher createBatchingPublisher(Properties props, boolean withResponse)
+			throws FileNotFoundException, IOException {
+		return createInternalBatchingPublisher(props, withResponse);
+	}
+
+	/**
+	 * Create a publisher that batches messages. Be sure to close the publisher
+	 * to send the last batch and ensure a clean shutdown
+	 * 
+	 * @param Properties
+	 *            props set all properties for publishing message
+	 * @return MRBatchingPublisher obj
+	 * @throws FileNotFoundException
+	 *             exc
+	 * @throws IOException
+	 *             ioex
+	 */
+	public static MRBatchingPublisher createBatchingPublisher(Properties props)
+			throws FileNotFoundException, IOException {
+		return createInternalBatchingPublisher(props, false);
+	}
+
+	/**
+	 * Create a publisher that batches messages. Be sure to close the publisher
+	 * to send the last batch and ensure a clean shutdown
+	 * 
+	 * @param producerFilePath
+	 *            set all properties for publishing message
+	 * @return MRBatchingPublisher obj
+	 * @throws FileNotFoundException
+	 *             exc
+	 * @throws IOException
+	 *             ioex
+	 */
+	public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath)
+			throws FileNotFoundException, IOException {
+		FileReader reader = new FileReader(new File(producerFilePath));
+		Properties props = new Properties();
 		props.load(reader);
-		MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
-			againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))).
-			onTopic ( props.getProperty("topic") ).
-			batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())).
-			compress (Boolean.parseBoolean(props.getProperty("compress"))).
-			httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).
-			build ();		
+		return createBatchingPublisher(props);
+	}
+
+	/**
+	 * Create a publisher that will contain send methods that return response
+	 * object to user.
+	 * 
+	 * @param producerFilePath
+	 *            set all properties for publishing message
+	 * @return MRBatchingPublisher obj
+	 * @throws FileNotFoundException
+	 *             exc
+	 * @throws IOException
+	 *             ioex
+	 */
+	public static MRBatchingPublisher createBatchingPublisher(final String producerFilePath, boolean withResponse)
+			throws FileNotFoundException, IOException {
+		FileReader reader = new FileReader(new File(producerFilePath));
+		Properties props = new Properties();
+		props.load(reader);
+		return createBatchingPublisher(props, withResponse);
+	}
+
+	protected static MRBatchingPublisher createInternalBatchingPublisher(Properties props, boolean withResponse)
+			throws FileNotFoundException, IOException {
+		assert props != null;
+		MRSimplerBatchPublisher pub;
+		if (withResponse) {
+			pub = new MRSimplerBatchPublisher.Builder()
+					.againstUrls(MRConsumerImpl.stringToList(props.getProperty("host")))
+					.onTopic(props.getProperty("topic"))
+					.batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
+							Integer.parseInt(props.getProperty("maxAgeMs").toString()))
+					.compress(Boolean.parseBoolean(props.getProperty("compress")))
+					.httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance")))
+					.withResponse(withResponse).build();
+		} else {
+			pub = new MRSimplerBatchPublisher.Builder()
+					.againstUrls(MRConsumerImpl.stringToList(props.getProperty("host")))
+					.onTopic(props.getProperty("topic"))
+					.batchTo(Integer.parseInt(props.getProperty("maxBatchSize")),
+							Integer.parseInt(props.getProperty("maxAgeMs").toString()))
+					.compress(Boolean.parseBoolean(props.getProperty("compress")))
+					.httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).build();
+		}
 		pub.setHost(props.getProperty("host"));
-		if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){
-				
+		if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
+
 			pub.setAuthKey(props.getProperty("authKey"));
 			pub.setAuthDate(props.getProperty("authDate"));
 			pub.setUsername(props.getProperty("username"));
 			pub.setPassword(props.getProperty("password"));
-		}else{
+		} else {
 			pub.setUsername(props.getProperty("username"));
 			pub.setPassword(props.getProperty("password"));
 		}
-		pub.setProducerFilePath(producerFilePath);
 		pub.setProtocolFlag(props.getProperty("TransportType"));
 		pub.setProps(props);
-		routeFilePath=props.getProperty("DME2preferredRouterFilePath");
-		routeReader= new FileReader(new File (routeFilePath));
-		prop= new Properties();
-		File fo= new File(routeFilePath);
-		if(!fo.exists()){
-			routeWriter=new FileWriter(new File(routeFilePath));
+		routeFilePath = props.getProperty("DME2preferredRouterFilePath");
+		routeReader = new FileReader(new File(routeFilePath));
+		prop = new Properties();
+		File fo = new File(routeFilePath);
+		if (!fo.exists()) {
+			routeWriter = new FileWriter(new File(routeFilePath));
 		}
-		//pub.setContentType(contentType);
 		return pub;
 	}
-	
-	/**
-	 * Create a publisher that will contain send methods that return 
-	 * response object to user. 
-	 * @param producerFilePath set all properties for publishing message
-	 * @return MRBatchingPublisher obj
-	 * @throws FileNotFoundException exc
-	 * @throws IOException ioex
-	 */
-	public static MRBatchingPublisher createBatchingPublisher ( final String producerFilePath, boolean withResponse ) throws FileNotFoundException,IOException	{
-		FileReader reader = new FileReader(new File (producerFilePath));
-		Properties props = new Properties();		
-		props.load(reader);
-		MRSimplerBatchPublisher pub = new MRSimplerBatchPublisher.Builder ().
-			againstUrls(MRConsumerImpl.stringToList(props.getProperty("host"))).
-			onTopic ( props.getProperty("topic") ).
-			batchTo (Integer.parseInt(props.getProperty("maxBatchSize")), Integer.parseInt(props.getProperty("maxAgeMs").toString())).
-			compress (Boolean.parseBoolean(props.getProperty("compress"))).
-			httpThreadTime(Integer.parseInt(props.getProperty("MessageSentThreadOccurance"))).
-			withResponse(withResponse).
-			build ();		
-		pub.setHost(props.getProperty("host"));
-		if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){
-				
-			pub.setAuthKey(props.getProperty("authKey"));
-			pub.setAuthDate(props.getProperty("authDate"));
-			pub.setUsername(props.getProperty("username"));
-			pub.setPassword(props.getProperty("password"));
-		}else{
-			pub.setUsername(props.getProperty("username"));
-			pub.setPassword(props.getProperty("password"));
-		}
-		pub.setProducerFilePath(producerFilePath);
-		pub.setProtocolFlag(props.getProperty("TransportType"));
-		pub.setProps(props);
-		routeFilePath=props.getProperty("DME2preferredRouterFilePath");
-		routeReader= new FileReader(new File (routeFilePath));
-		prop= new Properties();
-		File fo= new File(routeFilePath);
-		if(!fo.exists()){
-			routeWriter=new FileWriter(new File(routeFilePath));
-		}
-		//pub.setContentType(contentType);
-		return pub;
-	}
-	
-	
-	
 
-	
-	
-	
-	
-	
-
-	
 	/**
 	 * Create an identity manager client to work with API keys.
-	 * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
-	 * @param apiKey Your API key
-	 * @param apiSecret Your API secret
+	 * 
+	 * @param hostSet
+	 *            A set of hosts to be used in the URL to MR. Can be
+	 *            "host:port". Use multiple entries to enable failover.
+	 * @param apiKey
+	 *            Your API key
+	 * @param apiSecret
+	 *            Your API secret
 	 * @return an identity manager
 	 */
-	public static MRIdentityManager createIdentityManager ( Collection<String> hostSet, String apiKey, String apiSecret )
-	{
+	public static MRIdentityManager createIdentityManager(Collection<String> hostSet, String apiKey, String apiSecret) {
 		MRIdentityManager cim;
 		try {
-			cim = new MRMetaClient ( hostSet );
+			cim = new MRMetaClient(hostSet);
 		} catch (MalformedURLException e) {
 			throw new RuntimeException(e);
 		}
-		cim.setApiCredentials ( apiKey, apiSecret );
+		cim.setApiCredentials(apiKey, apiSecret);
 		return cim;
 	}
 
 	/**
 	 * Create a topic manager for working with topics.
-	 * @param hostSet A set of hosts to be used in the URL to MR. Can be "host:port". Use multiple entries to enable failover.
-	 * @param apiKey Your API key
-	 * @param apiSecret Your API secret
+	 * 
+	 * @param hostSet
+	 *            A set of hosts to be used in the URL to MR. Can be
+	 *            "host:port". Use multiple entries to enable failover.
+	 * @param apiKey
+	 *            Your API key
+	 * @param apiSecret
+	 *            Your API secret
 	 * @return a topic manager
 	 */
-	public static MRTopicManager createTopicManager ( Collection<String> hostSet, String apiKey, String apiSecret )
-	{
+	public static MRTopicManager createTopicManager(Collection<String> hostSet, String apiKey, String apiSecret) {
 		MRMetaClient tmi;
 		try {
-			tmi = new MRMetaClient ( hostSet );
+			tmi = new MRMetaClient(hostSet);
 		} catch (MalformedURLException e) {
 			throw new RuntimeException(e);
 		}
-		tmi.setApiCredentials ( apiKey, apiSecret );
+		tmi.setApiCredentials(apiKey, apiSecret);
 		return tmi;
 	}
 
 	/**
 	 * Inject a consumer. Used to support unit tests.
+	 * 
 	 * @param cc
 	 */
-	public static void $testInject ( MRConsumer cc )
-	{
+	public static void $testInject(MRConsumer cc) {
 		MRClientBuilders.sfConsumerMock = cc;
 	}
 
-	public static MRConsumer createConsumer(String host, String topic, String username,
-			String password, String group, String id, int i, int j,String protocalFlag,String consumerFilePath) {
+	public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
+			String id, int i, int j, String protocalFlag, String consumerFilePath) {
 
 		MRConsumerImpl sub;
 		try {
-			sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null );
+			sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
 		} catch (MalformedURLException e) {
 			throw new RuntimeException(e);
 		}
@@ -488,15 +592,15 @@
 		sub.setProtocolFlag(protocalFlag);
 		sub.setConsumerFilePath(consumerFilePath);
 		return sub;
-	
+
 	}
-	
-	public static MRConsumer createConsumer(String host, String topic, String username,
-			String password, String group, String id,String protocalFlag,String consumerFilePath, int i, int j) {
+
+	public static MRConsumer createConsumer(String host, String topic, String username, String password, String group,
+			String id, String protocalFlag, String consumerFilePath, int i, int j) {
 
 		MRConsumerImpl sub;
 		try {
-			sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null );
+			sub = new MRConsumerImpl(MRConsumerImpl.stringToList(host), topic, group, id, i, j, null, null, null);
 		} catch (MalformedURLException e) {
 			throw new RuntimeException(e);
 		}
@@ -506,52 +610,61 @@
 		sub.setProtocolFlag(protocalFlag);
 		sub.setConsumerFilePath(consumerFilePath);
 		return sub;
-	
+
 	}
 
-	public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException,IOException {
-		FileReader reader = new FileReader(new File (consumerFilePath));
-		Properties props = new Properties();		
+	public static MRConsumer createConsumer(String consumerFilePath) throws FileNotFoundException, IOException {
+		FileReader reader = new FileReader(new File(consumerFilePath));
+		Properties props = new Properties();
 		props.load(reader);
+
+		return createConsumer(props);
+	}
+
+	public static MRConsumer createConsumer(Properties props) throws FileNotFoundException, IOException {
 		int timeout;
-		if(props.getProperty("timeout")!=null)
-			timeout=Integer.parseInt(props.getProperty("timeout"));
+		if (props.getProperty("timeout") != null)
+			timeout = Integer.parseInt(props.getProperty("timeout"));
 		else
-			timeout=-1;
+			timeout = -1;
 		int limit;
-		if(props.getProperty("limit")!=null)
-			limit=Integer.parseInt(props.getProperty("limit"));
+		if (props.getProperty("limit") != null)
+			limit = Integer.parseInt(props.getProperty("limit"));
 		else
-			limit=-1;
+			limit = -1;
 		String group;
-		if(props.getProperty("group")==null)
-		group=UUID.randomUUID ().toString();
+		if (props.getProperty("group") == null)
+			group = UUID.randomUUID().toString();
 		else
-			group=props.getProperty("group");
-		MRConsumerImpl sub=null;
-		if(props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())){
-			sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"),timeout,  limit, props.getProperty("filter"),props.getProperty("authKey"), props.getProperty("authDate")  );
+			group = props.getProperty("group");
+		MRConsumerImpl sub = null;
+		if (props.getProperty("TransportType").equalsIgnoreCase(ProtocolTypeConstants.AUTH_KEY.getValue())) {
+			sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"),
+					group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
+					props.getProperty("authKey"), props.getProperty("authDate"));
 			sub.setAuthKey(props.getProperty("authKey"));
 			sub.setAuthDate(props.getProperty("authDate"));
 			sub.setUsername(props.getProperty("username"));
 			sub.setPassword(props.getProperty("password"));
-		}else{
-			sub = new MRConsumerImpl ( MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"), group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),props.getProperty("username"), props.getProperty("password")  );
+		} else {
+			sub = new MRConsumerImpl(MRConsumerImpl.stringToList(props.getProperty("host")), props.getProperty("topic"),
+					group, props.getProperty("id"), timeout, limit, props.getProperty("filter"),
+					props.getProperty("username"), props.getProperty("password"));
 			sub.setUsername(props.getProperty("username"));
 			sub.setPassword(props.getProperty("password"));
 		}
 		sub.setRouterFilePath(props.getProperty("DME2preferredRouterFilePath"));
-	    sub.setProps(props);
+		sub.setProps(props);
 		sub.setHost(props.getProperty("host"));
 		sub.setProtocolFlag(props.getProperty("TransportType"));
-		//sub.setConsumerFilePath(consumerFilePath);
+		// sub.setConsumerFilePath(consumerFilePath);
 		sub.setfFilter(props.getProperty("filter"));
-		routeFilePath=props.getProperty("DME2preferredRouterFilePath");
-		routeReader= new FileReader(new File (routeFilePath));
-		prop= new Properties();
-		File fo= new File(routeFilePath);
-		if(!fo.exists()){
-				routeWriter=new FileWriter(new File(routeFilePath));
+		routeFilePath = props.getProperty("DME2preferredRouterFilePath");
+		routeReader = new FileReader(new File(routeFilePath));
+		prop = new Properties();
+		File fo = new File(routeFilePath);
+		if (!fo.exists()) {
+			routeWriter = new FileWriter(new File(routeFilePath));
 		}
 		return sub;
 	}
diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java b/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java
index 012e95e..999d7ef 100644
--- a/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java
+++ b/src/main/java/com/att/nsa/mr/client/impl/MRBaseClient.java
@@ -50,286 +50,318 @@
 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
 //import com.fasterxml.jackson.core.JsonProcessingException;
 
-public class MRBaseClient extends HttpClient implements MRClient 
-{
-	
+public class MRBaseClient extends HttpClient implements MRClient {
+
 	private static final String MR_AUTH_CONSTANT = "X-CambriaAuth";
 	private static final String MR_DATE_CONSTANT = "X-CambriaDate";
-	
-	protected MRBaseClient ( Collection<String> hosts ) throws MalformedURLException
-	{
-		super ( ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort );
-		
-		fLog = LoggerFactory.getLogger ( this.getClass().getName () );
+
+	protected MRBaseClient(Collection<String> hosts) throws MalformedURLException {
+		super(ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort);
+
+		fLog = LoggerFactory.getLogger(this.getClass().getName());
 	}
 
-	protected MRBaseClient ( Collection<String> hosts, int stdSvcPort ) throws MalformedURLException {
-		super ( ConnectionType.HTTP,hosts, stdSvcPort);
+	protected MRBaseClient(Collection<String> hosts, int stdSvcPort) throws MalformedURLException {
+		super(ConnectionType.HTTP, hosts, stdSvcPort);
 
-		fLog = LoggerFactory.getLogger ( this.getClass().getName () );
+		fLog = LoggerFactory.getLogger(this.getClass().getName());
 	}
 
-	protected MRBaseClient ( Collection<String> hosts, String clientSignature ) throws MalformedURLException
-	{
-		super(ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort, clientSignature, CacheUse.NONE, 1, 1L, TimeUnit.MILLISECONDS, 32, 32, 600000);
+	protected MRBaseClient(Collection<String> hosts, String clientSignature) throws MalformedURLException {
+		super(ConnectionType.HTTP, hosts, MRConstants.kStdMRServicePort, clientSignature, CacheUse.NONE, 1, 1L,
+				TimeUnit.MILLISECONDS, 32, 32, 600000);
 
-		fLog = LoggerFactory.getLogger ( this.getClass().getName () );
+		fLog = LoggerFactory.getLogger(this.getClass().getName());
 	}
 
-
 	@Override
-	public void close ()
-	{
+	public void close() {
 	}
 
-	protected Set<String> jsonArrayToSet ( JSONArray a )
-	{
-		if ( a == null ) return null;
+	protected Set<String> jsonArrayToSet(JSONArray a) {
+		if (a == null)
+			return null;
 
-		final TreeSet<String> set = new TreeSet<String> ();
-		for ( int i=0; i<a.length (); i++ )
-		{
-			set.add ( a.getString ( i ));
+		final TreeSet<String> set = new TreeSet<String>();
+		for (int i = 0; i < a.length(); i++) {
+			set.add(a.getString(i));
 		}
 		return set;
 	}
 
-	public void logTo ( Logger log )
-	{
+	public void logTo(Logger log) {
 		fLog = log;
-		replaceLogger ( log );
+		replaceLogger(log);
 	}
 
-	protected Logger getLog ()
-	{
+	protected Logger getLog() {
 		return fLog;
 	}
 
 	private Logger fLog;
-	
-	public JSONObject post(final String path, final byte[] data, final String contentType, final String username, final String password, final String protocolFlag) throws HttpException, JSONException{
+
+	public JSONObject post(final String path, final byte[] data, final String contentType, final String username,
+			final String password, final String protocolFlag) throws HttpException, JSONException {
 		if ((null != username && null != password)) {
 			WebTarget target = null;
 
 			Response response = null;
-			
+
 			target = getTarget(path, username, password);
-			String encoding = Base64.encodeAsString(username+":"+password);
-			
-			
-			response = target.request().header("Authorization", "Basic " + encoding).post(Entity.entity(data, contentType));
-			
+			String encoding = Base64.encodeAsString(username + ":" + password);
+
+			response = target.request().header("Authorization", "Basic " + encoding)
+					.post(Entity.entity(data, contentType));
+
 			return getResponseDataInJson(response);
 		} else {
-			throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
+			throw new HttpException(
+					"Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
 		}
 	}
-	public String postWithResponse(final String path, final byte[] data, final String contentType, final String username, final String password, final String protocolFlag) throws HttpException, JSONException{
+
+	public String postWithResponse(final String path, final byte[] data, final String contentType,
+			final String username, final String password, final String protocolFlag)
+			throws HttpException, JSONException {
 		String responseData = null;
 		if ((null != username && null != password)) {
 			WebTarget target = null;
 
 			Response response = null;
-			
+
 			target = getTarget(path, username, password);
-			String encoding = Base64.encodeAsString(username+":"+password);
-			
-			
-			response = target.request().header("Authorization", "Basic " + encoding).post(Entity.entity(data, contentType));
-			
+			String encoding = Base64.encodeAsString(username + ":" + password);
+
+			response = target.request().header("Authorization", "Basic " + encoding)
+					.post(Entity.entity(data, contentType));
+
 			responseData = response.readEntity(String.class);
 			return responseData;
 		} else {
-			throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
+			throw new HttpException(
+					"Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
 		}
 	}
-	public JSONObject postAuth(final String path, final byte[] data, final String contentType, final String authKey,final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException{
+
+	public JSONObject postAuth(final String path, final byte[] data, final String contentType, final String authKey,
+			final String authDate, final String username, final String password, final String protocolFlag)
+			throws HttpException, JSONException {
 		if ((null != username && null != password)) {
 			WebTarget target = null;
 
 			Response response = null;
-				target= getTarget(path,username, password);
-				response = target.request()
-						.header(MR_AUTH_CONSTANT, authKey)
-						.header(MR_DATE_CONSTANT, authDate)
-						.post(Entity.entity(data, contentType));
-				
+			target = getTarget(path, username, password);
+			response = target.request().header(MR_AUTH_CONSTANT, authKey).header(MR_DATE_CONSTANT, authDate)
+					.post(Entity.entity(data, contentType));
+
 			return getResponseDataInJson(response);
 		} else {
-			throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
+			throw new HttpException(
+					"Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
 		}
 	}
-	public String postAuthwithResponse(final String path, final byte[] data, final String contentType, final String authKey,final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException{
+
+	public String postAuthwithResponse(final String path, final byte[] data, final String contentType,
+			final String authKey, final String authDate, final String username, final String password,
+			final String protocolFlag) throws HttpException, JSONException {
 		String responseData = null;
 		if ((null != username && null != password)) {
 			WebTarget target = null;
 
 			Response response = null;
-				target= getTarget(path,username, password);
-				response = target.request()
-						.header(MR_AUTH_CONSTANT, authKey)
-						.header(MR_DATE_CONSTANT, authDate)
-						.post(Entity.entity(data, contentType));
-				responseData = response.readEntity(String.class);
-				return responseData;
-			
+			target = getTarget(path, username, password);
+			response = target.request().header(MR_AUTH_CONSTANT, authKey).header(MR_DATE_CONSTANT, authDate)
+					.post(Entity.entity(data, contentType));
+			responseData = response.readEntity(String.class);
+			return responseData;
+
 		} else {
-			throw new HttpException("Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
+			throw new HttpException(
+					"Authentication Failed: Username/password/AuthKey/AuthDate parameter(s) cannot be null or empty.");
 		}
 	}
 
-
-	public JSONObject get(final String path, final String username, final String password, final String protocolFlag) throws HttpException, JSONException {
+	public JSONObject get(final String path, final String username, final String password, final String protocolFlag)
+			throws HttpException, JSONException {
 		if (null != username && null != password) {
-			
+
 			WebTarget target = null;
 
 			Response response = null;
 			if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
-				target=getTarget(path);
-				response = target.request()
-						.header(MR_AUTH_CONSTANT, username)
-						.header(MR_DATE_CONSTANT, password)
-						.get();
+				target = getTarget(path);
+				response = target.request().header(MR_AUTH_CONSTANT, username).header(MR_DATE_CONSTANT, password).get();
 			} else {
 				target = getTarget(path, username, password);
-				String encoding = Base64.encodeAsString(username+":"+password);
-				
-				response = target.request().header("Authorization", "Basic " + encoding).get();	
-						
+				String encoding = Base64.encodeAsString(username + ":" + password);
+
+				response = target.request().header("Authorization", "Basic " + encoding).get();
+
 			}
 			return getResponseDataInJson(response);
 		} else {
-			throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+			throw new HttpException(
+					"Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
 		}
 	}
-	
-	
-	public String getResponse(final String path, final String username, final String password, final String protocolFlag) throws HttpException, JSONException {
+
+	public String getResponse(final String path, final String username, final String password,
+			final String protocolFlag) throws HttpException, JSONException {
 		String responseData = null;
 		if (null != username && null != password) {
-			
+
 			WebTarget target = null;
 
 			Response response = null;
 			if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
-				target=getTarget(path);
-				response = target.request()
-						.header(MR_AUTH_CONSTANT, username)
-						.header(MR_DATE_CONSTANT, password)
-						.get();
+				target = getTarget(path);
+				response = target.request().header(MR_AUTH_CONSTANT, username).header(MR_DATE_CONSTANT, password).get();
 			} else {
 				target = getTarget(path, username, password);
-				String encoding = Base64.encodeAsString(username+":"+password);				
-				response = target.request().header("Authorization", "Basic " + encoding).get();							
+				String encoding = Base64.encodeAsString(username + ":" + password);
+				response = target.request().header("Authorization", "Basic " + encoding).get();
 			}
-			MRClientFactory.HTTPHeadersMap=response.getHeaders();
-		
-			String transactionid=response.getHeaderString("transactionid");
-				if (transactionid!=null && !transactionid.equalsIgnoreCase("")) {
-					fLog.info("TransactionId : " + transactionid);
+			MRClientFactory.HTTPHeadersMap = response.getHeaders();
+
+			String transactionid = response.getHeaderString("transactionid");
+			if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
+				fLog.info("TransactionId : " + transactionid);
 			}
-			
+
 			responseData = response.readEntity(String.class);
 			return responseData;
 		} else {
-			throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+			throw new HttpException(
+					"Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
 		}
 	}
-	
-	public JSONObject getAuth(final String path, final String authKey, final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException {
+
+	public JSONObject getAuth(final String path, final String authKey, final String authDate, final String username,
+			final String password, final String protocolFlag) throws HttpException, JSONException {
 		if (null != username && null != password) {
-			
+
 			WebTarget target = null;
 
 			Response response = null;
-				target=getTarget(path, username, password);
-				response = target.request()
-						.header(MR_AUTH_CONSTANT, authKey)
-						.header(MR_DATE_CONSTANT, authDate)
-						.get();
-						
+			target = getTarget(path, username, password);
+			response = target.request().header(MR_AUTH_CONSTANT, authKey).header(MR_DATE_CONSTANT, authDate).get();
+
 			return getResponseDataInJson(response);
 		} else {
-			throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+			throw new HttpException(
+					"Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
 		}
 	}
-	
-	public String getAuthResponse(final String path, final String authKey, final String authDate,final String username, final String password, final String protocolFlag) throws HttpException, JSONException {
-		String responseData = null;
+
+	public JSONObject getNoAuth(final String path, final String username, final String password,
+			final String protocolFlag) throws HttpException, JSONException {
 		if (null != username && null != password) {
-			
+
 			WebTarget target = null;
 
 			Response response = null;
-				target=getTarget(path, username, password);
-				response = target.request()
-						.header(MR_AUTH_CONSTANT, authKey)
-						.header(MR_DATE_CONSTANT, authDate)
-						.get();
-				
-				MRClientFactory.HTTPHeadersMap=response.getHeaders();
-				
-				String transactionid=response.getHeaderString("transactionid");
-					if (transactionid!=null && !transactionid.equalsIgnoreCase("")) {
-						fLog.info("TransactionId : " + transactionid);
-				}
-						
-				responseData = response.readEntity(String.class);
-				return responseData;
+			target = getTarget(path, username, password);
+			response = target.request().get();
+
+			return getResponseDataInJson(response);
 		} else {
-			throw new HttpException("Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+			throw new HttpException(
+					"Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
 		}
 	}
 
+	public String getAuthResponse(final String path, final String authKey, final String authDate, final String username,
+			final String password, final String protocolFlag) throws HttpException, JSONException {
+		String responseData = null;
+		if (null != username && null != password) {
+
+			WebTarget target = null;
+
+			Response response = null;
+			target = getTarget(path, username, password);
+			response = target.request().header(MR_AUTH_CONSTANT, authKey).header(MR_DATE_CONSTANT, authDate).get();
+
+			MRClientFactory.HTTPHeadersMap = response.getHeaders();
+
+			String transactionid = response.getHeaderString("transactionid");
+			if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
+				fLog.info("TransactionId : " + transactionid);
+			}
+
+			responseData = response.readEntity(String.class);
+			return responseData;
+		} else {
+			throw new HttpException(
+					"Authentication Failed: Username/password/AuthKey/Authdate parameter(s) cannot be null or empty.");
+		}
+	}
+
+	public String getNoAuthResponse(String path, final String username, final String password,
+			final String protocolFlag) throws HttpException, JSONException {
+		String responseData = null;
+
+		WebTarget target = null;
+
+		Response response = null;
+		target = getTarget(path, username, password);
+		response = target.request().get();
+
+		MRClientFactory.HTTPHeadersMap = response.getHeaders();
+
+		String transactionid = response.getHeaderString("transactionid");
+		if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
+			fLog.info("TransactionId : " + transactionid);
+		}
+
+		responseData = response.readEntity(String.class);
+		return responseData;
+
+	}
+
 	private WebTarget getTarget(final String path, final String username, final String password) {
 
 		Client client = ClientBuilder.newClient();
 
-		
-			// Using UNIVERSAL as it supports both BASIC and DIGEST authentication types.
-			HttpAuthenticationFeature feature = HttpAuthenticationFeature.universal(username, password);
-			client.register(feature);
-		
+		// Using UNIVERSAL as it supports both BASIC and DIGEST authentication
+		// types.
+		HttpAuthenticationFeature feature = HttpAuthenticationFeature.universal(username, password);
+		client.register(feature);
+
 		return client.target(path);
 	}
 
-
 	private WebTarget getTarget(final String path) {
 
 		Client client = ClientBuilder.newClient();
 		return client.target(path);
 	}
+
 	private JSONObject getResponseDataInJson(Response response) throws JSONException {
 		try {
-			MRClientFactory.HTTPHeadersMap=response.getHeaders();
-		//	fLog.info("DMAAP response status: " + response.getStatus());
-			
-						
-			//MultivaluedMap<String, Object> headersMap = response.getHeaders();
-			//for(String key : headersMap.keySet()) {
-			String transactionid=response.getHeaderString("transactionid");
-			if (transactionid!=null && !transactionid.equalsIgnoreCase("")) {
+			MRClientFactory.HTTPHeadersMap = response.getHeaders();
+			// fLog.info("DMAAP response status: " + response.getStatus());
+
+			// MultivaluedMap<String, Object> headersMap =
+			// response.getHeaders();
+			// for(String key : headersMap.keySet()) {
+			String transactionid = response.getHeaderString("transactionid");
+			if (transactionid != null && !transactionid.equalsIgnoreCase("")) {
 				fLog.info("TransactionId : " + transactionid);
 			}
 
-			/*final String responseData = response.readEntity(String.class);
-			JSONTokener jsonTokener = new JSONTokener(responseData);
-			JSONObject jsonObject = null;
-			final char firstChar = jsonTokener.next();
-			jsonTokener.back();
-			if ('[' == firstChar) {
-				JSONArray jsonArray = new JSONArray(jsonTokener);
-				jsonObject = new JSONObject();
-				jsonObject.put("result", jsonArray);
-			} else {
-				jsonObject = new JSONObject(jsonTokener);
-			}
+			/*
+			 * final String responseData = response.readEntity(String.class);
+			 * JSONTokener jsonTokener = new JSONTokener(responseData);
+			 * JSONObject jsonObject = null; final char firstChar =
+			 * jsonTokener.next(); jsonTokener.back(); if ('[' == firstChar) {
+			 * JSONArray jsonArray = new JSONArray(jsonTokener); jsonObject =
+			 * new JSONObject(); jsonObject.put("result", jsonArray); } else {
+			 * jsonObject = new JSONObject(jsonTokener); }
+			 * 
+			 * return jsonObject;
+			 */
 
-			return jsonObject;*/
-			
-
-			if(response.getStatus()==403) {
+			if (response.getStatus() == 403) {
 				JSONObject jsonObject = null;
 				jsonObject = new JSONObject();
 				JSONArray jsonArray = new JSONArray();
@@ -339,11 +371,11 @@
 				return jsonObject;
 			}
 			String responseData = response.readEntity(String.class);
-				
+
 			JSONTokener jsonTokener = new JSONTokener(responseData);
 			JSONObject jsonObject = null;
 			final char firstChar = jsonTokener.next();
-	    	jsonTokener.back();
+			jsonTokener.back();
 			if ('[' == firstChar) {
 				JSONArray jsonArray = new JSONArray(jsonTokener);
 				jsonObject = new JSONObject();
@@ -361,35 +393,35 @@
 		}
 
 	}
-	
-	public String getHTTPErrorResponseMessage(String responseString){
-		
+
+	public String getHTTPErrorResponseMessage(String responseString) {
+
 		String response = null;
 		int beginIndex = 0;
 		int endIndex = 0;
-		if(responseString.contains("<body>")){
-			
-			beginIndex = responseString.indexOf("body>")+5;
+		if (responseString.contains("<body>")) {
+
+			beginIndex = responseString.indexOf("body>") + 5;
 			endIndex = responseString.indexOf("</body");
-			response = responseString.substring(beginIndex,endIndex);
+			response = responseString.substring(beginIndex, endIndex);
 		}
-		
+
 		return response;
-		
+
 	}
-	
-	public String getHTTPErrorResponseCode(String responseString){
-		
+
+	public String getHTTPErrorResponseCode(String responseString) {
+
 		String response = null;
 		int beginIndex = 0;
 		int endIndex = 0;
-		if(responseString.contains("<title>")){
-			beginIndex = responseString.indexOf("title>")+6;
+		if (responseString.contains("<title>")) {
+			beginIndex = responseString.indexOf("title>") + 6;
 			endIndex = responseString.indexOf("</title");
-			response = responseString.substring(beginIndex,endIndex);
+			response = responseString.substring(beginIndex, endIndex);
 		}
-				
-		return response;		
+
+		return response;
 	}
-	
+
 }
diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java b/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java
index eb7fd91..78f37fc 100644
--- a/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java
+++ b/src/main/java/com/att/nsa/mr/client/impl/MRConsumerImpl.java
@@ -46,46 +46,43 @@
 
 import com.att.aft.dme2.api.DME2Client;
 import com.att.aft.dme2.api.DME2Exception;
+import com.att.nsa.mr.client.HostSelector;
 import com.att.nsa.mr.client.MRClientFactory;
 import com.att.nsa.mr.client.MRConsumer;
 import com.att.nsa.mr.client.response.MRConsumerResponse;
 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
 
-public class MRConsumerImpl extends MRBaseClient implements MRConsumer
-{
-	
+public class MRConsumerImpl extends MRBaseClient implements MRConsumer {
+
 	private static final String SUCCESS_MESSAGE = "Success";
-	
-	
-	private Logger log = LoggerFactory.getLogger ( this.getClass().getName () );
-	public static List<String> stringToList ( String str )
-	{
-		final LinkedList<String> set = new LinkedList<String> ();
-		if ( str != null )
-		{
-			final String[] parts = str.trim ().split ( "," );
-			for ( String part : parts )
-			{
+
+	private Logger log = LoggerFactory.getLogger(this.getClass().getName());
+
+	public static List<String> stringToList(String str) {
+		final LinkedList<String> set = new LinkedList<String>();
+		if (str != null) {
+			final String[] parts = str.trim().split(",");
+			for (String part : parts) {
 				final String trimmed = part.trim();
-				if ( trimmed.length () > 0 )
-				{
-					set.add ( trimmed );
+				if (trimmed.length() > 0) {
+					set.add(trimmed);
 				}
 			}
 		}
 		return set;
 	}
-	
-	public MRConsumerImpl ( Collection<String> hostPart, final String topic, final String consumerGroup,
-			final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username, String apiSecret_password ) throws MalformedURLException
-		{
-			this( hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password, false );
-		}
-	
-	public MRConsumerImpl ( Collection<String> hostPart, final String topic, final String consumerGroup,
-		final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret, boolean allowSelfSignedCerts ) throws MalformedURLException
-	{
-		super ( hostPart, topic + "::" + consumerGroup + "::" + consumerId );
+
+	public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
+			final String consumerId, int timeoutMs, int limit, String filter, String apiKey_username,
+			String apiSecret_password) throws MalformedURLException {
+		this(hostPart, topic, consumerGroup, consumerId, timeoutMs, limit, filter, apiKey_username, apiSecret_password,
+				false);
+	}
+
+	public MRConsumerImpl(Collection<String> hostPart, final String topic, final String consumerGroup,
+			final String consumerId, int timeoutMs, int limit, String filter, String apiKey, String apiSecret,
+			boolean allowSelfSignedCerts) throws MalformedURLException {
+		super(hostPart, topic + "::" + consumerGroup + "::" + consumerId);
 
 		fTopic = topic;
 		fGroup = consumerGroup;
@@ -94,233 +91,243 @@
 		fLimit = limit;
 		fFilter = filter;
 
-		//setApiCredentials ( apiKey, apiSecret );
+		fHostSelector = new HostSelector(hostPart);
 	}
 
 	@Override
-	public Iterable<String> fetch () throws IOException,Exception
-	{
+	public Iterable<String> fetch() throws IOException, Exception {
 		// fetch with the timeout and limit set in constructor
-		return fetch ( fTimeoutMs, fLimit );
+		return fetch(fTimeoutMs, fLimit);
 	}
 
 	@Override
-	public Iterable<String> fetch ( int timeoutMs, int limit ) throws IOException,Exception
-	{
-		final LinkedList<String> msgs = new LinkedList<String> ();
+	public Iterable<String> fetch(int timeoutMs, int limit) throws IOException, Exception {
+		final LinkedList<String> msgs = new LinkedList<String>();
 
-// FIXME: the timeout on the socket needs to be at least as long as the long poll
-//		// sanity check for long poll timeout vs. socket read timeout
-//		final int maxReasonableTimeoutMs = CambriaSingletonHttpClient.sfSoTimeoutMs * 9/10;
-//		if ( timeoutMs > maxReasonableTimeoutMs )
-//		{
-//			log.warn ( "Long poll time (" + timeoutMs + ") is too high w.r.t. socket read timeout (" +
-//				CambriaSingletonHttpClient.sfSoTimeoutMs + "). Reducing long poll timeout to " + maxReasonableTimeoutMs + "." );
-//			timeoutMs = maxReasonableTimeoutMs;
-//		}
+		// FIXME: the timeout on the socket needs to be at least as long as the
+		// long poll
+		// // sanity check for long poll timeout vs. socket read timeout
+		// final int maxReasonableTimeoutMs =
+		// CambriaSingletonHttpClient.sfSoTimeoutMs * 9/10;
+		// if ( timeoutMs > maxReasonableTimeoutMs )
+		// {
+		// log.warn ( "Long poll time (" + timeoutMs + ") is too high w.r.t.
+		// socket read timeout (" +
+		// CambriaSingletonHttpClient.sfSoTimeoutMs + "). Reducing long poll
+		// timeout to " + maxReasonableTimeoutMs + "." );
+		// timeoutMs = maxReasonableTimeoutMs;
+		// }
 
-	//	final String urlPath = createUrlPath ( timeoutMs, limit );
+		// final String urlPath = createUrlPath ( timeoutMs, limit );
 
-		//getLog().info ( "UEB GET " + urlPath );
-		try
-		{
+		// getLog().info ( "UEB GET " + urlPath );
+		try {
 			if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
 				DMEConfigure(timeoutMs, limit);
-			try 
-			{
-				//getLog().info ( "Receiving msgs from: " + url+subContextPath );
-				String reply = sender.sendAndWait(timeoutMs+10000L);				
-			//	System.out.println("Message received = "+reply);
-				final JSONObject o =getResponseDataInJson(reply);
-				//msgs.add(reply);
-				if ( o != null )
-				{
-					final JSONArray a = o.getJSONArray ( "result" );
-				//	final int b = o.getInt("status" );
-					//if ( a != null && a.length()>0 )
-					if ( a != null)
-					{
-						for ( int i=0; i<a.length (); i++ )
-						{
-							//msgs.add("DMAAP response status: "+Integer.toString(b));
-							if (a.get(i) instanceof String)
-								msgs.add ( a.getString(i) );
-							else
-						    	msgs.add ( a.getJSONObject(i).toString() );
-							
-							
-						}
-					}
-//					else if(a != null && a.length()<1){
-//						msgs.add ("[]");		
-//						}
-				}
-			}	
-			catch ( JSONException e )
-				{
-					// unexpected response
-					reportProblemWithResponse ();
-                                    log.error("exception: ", e);
-				}
-				catch ( HttpException e )
-				{
-					throw new IOException ( e );
-				}	
-			}
-			
-			if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
-				final String urlPath = createUrlPath (MRConstants.makeConsumerUrl ( host, fTopic, fGroup, fId,props.getProperty("Protocol")), timeoutMs, limit );
-			
-				
-				try
-				{
-					final JSONObject o = get ( urlPath, username, password, protocolFlag );
-
-					if ( o != null )
-					{
-						final JSONArray a = o.getJSONArray ( "result" );
-						final int b = o.getInt("status" );
-						//if ( a != null && a.length()>0 )
-						if ( a != null)
-						{
-							for ( int i=0; i<a.length (); i++ )
-							{
-								msgs.add("DMAAP response status: "+Integer.toString(b));
+				try {
+					// getLog().info ( "Receiving msgs from: " +
+					// url+subContextPath );
+					String reply = sender.sendAndWait(timeoutMs + 10000L);
+					final JSONObject o = getResponseDataInJson(reply);
+					// msgs.add(reply);
+					if (o != null) {
+						final JSONArray a = o.getJSONArray("result");
+						// final int b = o.getInt("status" );
+						// if ( a != null && a.length()>0 )
+						if (a != null) {
+							for (int i = 0; i < a.length(); i++) {
+								// msgs.add("DMAAP response status:
+								// "+Integer.toString(b));
 								if (a.get(i) instanceof String)
-									msgs.add ( a.getString(i) );
+									msgs.add(a.getString(i));
 								else
-									msgs.add ( a.getJSONObject(i).toString() );
-								
+									msgs.add(a.getJSONObject(i).toString());
+
 							}
 						}
-//						else if(a != null && a.length()<1)
-//							{
-//								msgs.add ("[]");		
-//							}
+						// else if(a != null && a.length()<1){
+						// msgs.add ("[]");
+						// }
 					}
-				}
-				catch ( JSONException e )
-				{
+				} catch (JSONException e) {
 					// unexpected response
-					reportProblemWithResponse ();
-                                    log.error("exception: ", e);
+					reportProblemWithResponse();
+					log.error("exception: ", e);
+				} catch (HttpException e) {
+					throw new IOException(e);
 				}
-				catch ( HttpException e )
-				{
-					throw new IOException ( e );
-				}
-			} 
-			
-			if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
-				final String urlPath = createUrlPath (MRConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ,props.getProperty("Protocol")), timeoutMs, limit );
-				
+			}
 
-			try 
-			{
-				final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag );
-				if ( o != null )
-				{
-					final JSONArray a = o.getJSONArray ( "result" );
-					final int b = o.getInt("status" );
-					//if ( a != null && a.length()>0)
-					if ( a != null)
-					{
-						for ( int i=0; i<a.length (); i++ )
-						{
-							msgs.add("DMAAP response status: "+Integer.toString(b));
-							if (a.get(i) instanceof String)
-								msgs.add ( a.getString(i) );
-							else
-						    	msgs.add ( a.getJSONObject(i).toString() );
-							
+			if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+				// final String urlPath = createUrlPath
+				// (MRConstants.makeConsumerUrl ( host, fTopic, fGroup,
+				// fId,props.getProperty("Protocol")), timeoutMs, limit );
+				final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+						fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+
+				try {
+					final JSONObject o = get(urlPath, username, password, protocolFlag);
+
+					if (o != null) {
+						final JSONArray a = o.getJSONArray("result");
+						final int b = o.getInt("status");
+						// if ( a != null && a.length()>0 )
+						if (a != null) {
+							for (int i = 0; i < a.length(); i++) {
+								// msgs.add("DMAAP response status:
+								// "+Integer.toString(b));
+								if (a.get(i) instanceof String)
+									msgs.add(a.getString(i));
+								else
+									msgs.add(a.getJSONObject(i).toString());
+
+							}
 						}
+						// else if(a != null && a.length()<1)
+						// {
+						// msgs.add ("[]");
+						// }
 					}
-//					else if(a != null && a.length()<1){
-//						msgs.add ("[]");		
-//						}
+				} catch (JSONException e) {
+					// unexpected response
+					reportProblemWithResponse();
+					log.error("exception: ", e);
+				} catch (HttpException e) {
+					throw new IOException(e);
 				}
 			}
-			catch ( JSONException e )
-			{
-				// unexpected response
-				reportProblemWithResponse ();
-                            log.error("exception: ", e);
+
+			if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+				final String urlPath = createUrlPath(
+						MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
+						timeoutMs, limit);
+
+				try {
+					final JSONObject o = getAuth(urlPath, authKey, authDate, username, password, protocolFlag);
+					if (o != null) {
+						final JSONArray a = o.getJSONArray("result");
+						final int b = o.getInt("status");
+						// if ( a != null && a.length()>0)
+						if (a != null) {
+							for (int i = 0; i < a.length(); i++) {
+								// msgs.add("DMAAP response status:
+								// "+Integer.toString(b));
+								if (a.get(i) instanceof String)
+									msgs.add(a.getString(i));
+								else
+									msgs.add(a.getJSONObject(i).toString());
+
+							}
+						}
+						// else if(a != null && a.length()<1){
+						// msgs.add ("[]");
+						// }
+					}
+				} catch (JSONException e) {
+					// unexpected response
+					reportProblemWithResponse();
+					log.error("exception: ", e);
+				} catch (HttpException e) {
+					throw new IOException(e);
+				}
+
 			}
-			catch ( HttpException e )
-			{
-				throw new IOException ( e );
+			if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+				final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+						fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+
+				try {
+					final JSONObject o = getNoAuth(urlPath, username, password, protocolFlag);
+					if (o != null) {
+						final JSONArray a = o.getJSONArray("result");
+						final int b = o.getInt("status");
+						// if ( a != null && a.length()>0)
+						if (a != null) {
+							for (int i = 0; i < a.length(); i++) {
+								// msgs.add("DMAAP response status:
+								// "+Integer.toString(b));
+								if (a.get(i) instanceof String)
+									msgs.add(a.getString(i));
+								else
+									msgs.add(a.getJSONObject(i).toString());
+
+							}
+						}
+
+					}
+				} catch (JSONException e) {
+					// unexpected response
+					reportProblemWithResponse();
+				} catch (HttpException e) {
+					throw new IOException(e);
+				}
+
 			}
-				
-			}
-			
-		} catch ( JSONException e ) {
+
+		} catch (JSONException e) {
 			// unexpected response
-			reportProblemWithResponse ();
-                    log.error("exception: ", e);
+			reportProblemWithResponse();
+			log.error("exception: ", e);
 		} catch (HttpException e) {
 			throw new IOException(e);
-		} catch (Exception e ) {
+		} catch (Exception e) {
 			throw e;
 		}
 
-
 		return msgs;
 	}
 
 	private JSONObject getResponseDataInJson(String response) {
-	try {
-		
-		
-		//log.info("DMAAP response status: " + response.getStatus());
+		try {
 
-		//	final String responseData = response.readEntity(String.class);
+			// log.info("DMAAP response status: " + response.getStatus());
+
+			// final String responseData = response.readEntity(String.class);
+			JSONTokener jsonTokener = new JSONTokener(response);
+			JSONObject jsonObject = null;
+			final char firstChar = jsonTokener.next();
+			jsonTokener.back();
+			if ('[' == firstChar) {
+				JSONArray jsonArray = new JSONArray(jsonTokener);
+				jsonObject = new JSONObject();
+				jsonObject.put("result", jsonArray);
+			} else {
+				jsonObject = new JSONObject(jsonTokener);
+			}
+
+			return jsonObject;
+		} catch (JSONException excp) {
+			// log.error("DMAAP - Error reading response data.", excp);
+			return null;
+		}
+
+	}
+
+	private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
 		JSONTokener jsonTokener = new JSONTokener(response);
 		JSONObject jsonObject = null;
 		final char firstChar = jsonTokener.next();
-    	jsonTokener.back();
+		jsonTokener.back();
+		if (null != response && response.length() == 0) {
+			return null;
+		}
+
 		if ('[' == firstChar) {
 			JSONArray jsonArray = new JSONArray(jsonTokener);
 			jsonObject = new JSONObject();
 			jsonObject.put("result", jsonArray);
+		} else if ('{' == firstChar) {
+			return null;
+		} else if ('<' == firstChar) {
+			return null;
 		} else {
 			jsonObject = new JSONObject(jsonTokener);
 		}
 
 		return jsonObject;
-	} catch (JSONException excp) {
-	//	log.error("DMAAP - Error reading response data.", excp);
-		return null;
-	}
-	
-	
-	
-}
-	
-	private JSONObject getResponseDataInJsonWithResponseReturned(String response) {
-			JSONTokener jsonTokener = new JSONTokener(response);
-			JSONObject jsonObject = null;
-			final char firstChar = jsonTokener.next();
-	    	jsonTokener.back();
-	    	if(null != response && response.length()==0){
-	    		return null;
-	    	}
-	    	
-			if ('[' == firstChar) {
-				JSONArray jsonArray = new JSONArray(jsonTokener);
-				jsonObject = new JSONObject();
-				jsonObject.put("result", jsonArray);
-			} else if('{' == firstChar){
-				return null;
-			} else if('<' == firstChar){
-				return null;
-			}else{
-				jsonObject = new JSONObject(jsonTokener);
-			}
 
-			return jsonObject;
-		
 	}
-	
+
 	private final String fTopic;
 	private final String fGroup;
 	private final String fId;
@@ -330,187 +337,184 @@
 	private String username;
 	private String password;
 	private String host;
-	 private  String latitude;
-		private  String longitude;
-		private  String version;
-		private  String serviceName;
-		private  String env;
-		private  String partner;
-		private String routeOffer;
-		private  String subContextPath;
-		private  String protocol;
-		private  String methodType;
-		private  String url;
-		private  String dmeuser;
-		private  String dmepassword;
-		private  String contenttype;
-	    private DME2Client sender;
-		public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
-		public String consumerFilePath;
-		private String authKey;
-		private String authDate;
-        private Properties props;
-    	private HashMap<String, String> DMETimeOuts;
-    	private String handlers;
-    	public static final String routerFilePath = null;
-    	public static String getRouterFilePath() {
-    		return routerFilePath;
-    	}
+	HostSelector fHostSelector = null;
+	private String latitude;
+	private String longitude;
+	private String version;
+	private String serviceName;
+	private String env;
+	private String partner;
+	private String routeOffer;
+	private String subContextPath;
+	private String protocol;
+	private String methodType;
+	private String url;
+	private String dmeuser;
+	private String dmepassword;
+	private String contenttype;
+	private DME2Client sender;
+	public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
+	public String consumerFilePath;
+	private String authKey;
+	private String authDate;
+	private Properties props;
+	private HashMap<String, String> DMETimeOuts;
+	private String handlers;
+	public static final String routerFilePath = null;
 
-    	public static void setRouterFilePath(String routerFilePath) {
-    		MRSimplerBatchPublisher.routerFilePath = routerFilePath;
-    	}
-		public String getConsumerFilePath() {
-			return consumerFilePath;
+	public static String getRouterFilePath() {
+		return routerFilePath;
+	}
+
+	public static void setRouterFilePath(String routerFilePath) {
+		MRSimplerBatchPublisher.routerFilePath = routerFilePath;
+	}
+
+	public String getConsumerFilePath() {
+		return consumerFilePath;
+	}
+
+	public void setConsumerFilePath(String consumerFilePath) {
+		this.consumerFilePath = consumerFilePath;
+	}
+
+	public String getProtocolFlag() {
+		return protocolFlag;
+	}
+
+	public void setProtocolFlag(String protocolFlag) {
+		this.protocolFlag = protocolFlag;
+	}
+
+	private void DMEConfigure(int timeoutMs, int limit) throws IOException, DME2Exception, URISyntaxException {
+		latitude = props.getProperty("Latitude");
+		longitude = props.getProperty("Longitude");
+		version = props.getProperty("Version");
+		serviceName = props.getProperty("ServiceName");
+		env = props.getProperty("Environment");
+		partner = props.getProperty("Partner");
+		routeOffer = props.getProperty("routeOffer");
+
+		subContextPath = props.getProperty("SubContextPath") + fTopic + "/" + fGroup + "/" + fId;
+		// subContextPath=createUrlPath (subContextPath, timeoutMs, limit);
+		// if (timeoutMs != -1) subContextPath=createUrlPath (subContextPath,
+		// timeoutMs);
+
+		protocol = props.getProperty("Protocol");
+		methodType = props.getProperty("MethodType");
+		dmeuser = props.getProperty("username");
+		dmepassword = props.getProperty("password");
+		contenttype = props.getProperty("contenttype");
+		handlers = props.getProperty("sessionstickinessrequired");
+		// url =protocol+"://DME2SEARCH/"+
+		// "service="+serviceName+"/"+"version="+version+"/"+"envContext="+env+"/"+"partner="+partner;
+		// url = protocol +
+		// "://"+serviceName+"?version="+version+"&envContext="+env+"&routeOffer="+partner;
+
+		/**
+		 * Changes to DME2Client url to use Partner for auto failover between
+		 * data centers When Partner value is not provided use the routeOffer
+		 * value for auto failover within a cluster
+		 */
+
+		String preferredRouteKey = readRoute("preferredRouteKey");
+
+		if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) {
+			url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner
+					+ "&routeoffer=" + preferredRouteKey;
+		} else if (partner != null && !partner.isEmpty()) {
+			url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner=" + partner;
+		} else if (routeOffer != null && !routeOffer.isEmpty()) {
+			url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
+					+ routeOffer;
 		}
 
-		public void setConsumerFilePath(String consumerFilePath) {
-			this.consumerFilePath = consumerFilePath;
-		}
+		// log.info("url :"+url);
 
-		public String getProtocolFlag() {
-			return protocolFlag;
-		}
+		if (timeoutMs != -1)
+			url = url + "&timeout=" + timeoutMs;
+		if (limit != -1)
+			url = url + "&limit=" + limit;
 
-		public void setProtocolFlag(String protocolFlag) {
-			this.protocolFlag = protocolFlag;
-		}
-		
-		private void DMEConfigure(int timeoutMs, int limit)throws IOException,DME2Exception, URISyntaxException{ 
-			latitude = props.getProperty("Latitude");
-			longitude = props.getProperty("Longitude");
-			version = props.getProperty("Version");
-			serviceName = props.getProperty("ServiceName");
-			env = props.getProperty("Environment");
-			partner = props.getProperty("Partner");
-			routeOffer = props.getProperty("routeOffer");
-			
-			subContextPath=props.getProperty("SubContextPath")+fTopic+"/"+fGroup+"/"+fId;
-		//	subContextPath=createUrlPath (subContextPath, timeoutMs, limit);
-			//if (timeoutMs != -1) subContextPath=createUrlPath (subContextPath, timeoutMs);
-			
-			protocol = props.getProperty("Protocol"); 
-			methodType = props.getProperty("MethodType");
-			dmeuser = props.getProperty("username");
-			dmepassword = props.getProperty("password");
-			contenttype = props.getProperty("contenttype");
-			handlers = props.getProperty("sessionstickinessrequired");
-			//url =protocol+"://DME2SEARCH/"+ "service="+serviceName+"/"+"version="+version+"/"+"envContext="+env+"/"+"partner="+partner;
-		//	url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeOffer="+partner;
-		
-			/**
-			 * Changes to DME2Client url to use Partner for auto failover between data centers
-			 * When Partner value is not provided use the routeOffer value for auto failover within a cluster 
-			 */
-			
-			String preferredRouteKey = readRoute("preferredRouteKey");
-						
-			if (partner != null && !partner.isEmpty() && preferredRouteKey != null && !preferredRouteKey.isEmpty()) 
-			{ 
-				url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner+"&routeoffer="+preferredRouteKey; 
-			}else  if (partner != null && !partner.isEmpty()) 
-			{ 
-				url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner; 
-			}
-			else if (routeOffer!=null && !routeOffer.isEmpty()) 
-			{ 
-				url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeoffer="+routeOffer;
-			}
-			
-			//log.info("url :"+url);
-						
-			if(timeoutMs != -1 )url=url+"&timeout="+timeoutMs;
-			if(limit != -1 )url=url+"&limit="+limit;
+		// Add filter to DME2 Url
+		if (fFilter != null && fFilter.length() > 0)
+			url = url + "&filter=" + URLEncoder.encode(fFilter, "UTF-8");
 
-			DMETimeOuts = new HashMap<String, String>();
-			DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
-			DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
-			DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
-			DMETimeOuts.put("Content-Type", contenttype);
-			System.setProperty("AFT_LATITUDE", latitude);
-			System.setProperty("AFT_LONGITUDE", longitude);
-			System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT"));
-		//	System.setProperty("DME2.DEBUG", "true");
-			
-			//SSL changes
-			System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
-					"SSLv3,TLSv1,TLSv1.1");
-			System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
-			System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
-			//SSL changes
-            
-			sender = new DME2Client(new URI(url), timeoutMs+10000L);
-			sender.setAllowAllHttpReturnCodes(true);
-			sender.setMethod(methodType);
-			sender.setSubContext(subContextPath);	
-			if(dmeuser != null && dmepassword != null){
+		DMETimeOuts = new HashMap<String, String>();
+		DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
+		DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
+		DMETimeOuts.put("AFT_DME2_EP_CONN_TIMEOUT", props.getProperty("AFT_DME2_EP_CONN_TIMEOUT"));
+		DMETimeOuts.put("Content-Type", contenttype);
+		System.setProperty("AFT_LATITUDE", latitude);
+		System.setProperty("AFT_LONGITUDE", longitude);
+		System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
+		// System.setProperty("DME2.DEBUG", "true");
+
+		// SSL changes
+		// System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
+		// "SSLv3,TLSv1,TLSv1.1");
+		System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
+		System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
+		System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
+		// SSL changes
+
+		sender = new DME2Client(new URI(url), timeoutMs + 10000L);
+		sender.setAllowAllHttpReturnCodes(true);
+		sender.setMethod(methodType);
+		sender.setSubContext(subContextPath);
+		if (dmeuser != null && dmepassword != null) {
 			sender.setCredentials(dmeuser, dmepassword);
-			//System.out.println(dmepassword);
-			}
-			sender.setHeaders(DMETimeOuts);
-			sender.setPayload("");               
-			
-			if(handlers.equalsIgnoreCase("yes")){
-				sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
-				sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
-				sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
-				}else{
-					sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
-				}
-		/*	HeaderReplyHandler headerhandler= new HeaderReplyHandler(); 
-			sender.setReplyHandler(headerhandler);*/
-//			} catch (DME2Exception x) {
-//				getLog().warn(x.getMessage(), x);
-//				System.out.println("XXXXXXXXXXXX"+x);
-//			} catch (URISyntaxException x) {
-//				System.out.println(x);
-//				getLog().warn(x.getMessage(), x);
-//			} catch (Exception x) {
-//				System.out.println("XXXXXXXXXXXX"+x);
-//				getLog().warn(x.getMessage(), x);
-//			}
 		}
+		sender.setHeaders(DMETimeOuts);
+		sender.setPayload("");
+
+		if (handlers.equalsIgnoreCase("yes")) {
+			sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
+					props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
+			sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
+			sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
+		} else {
+			sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
+		}
+	}
+
 	public Properties getProps() {
-			return props;
-		}
+		return props;
+	}
 
-		public void setProps(Properties props) {
-			this.props = props;
-		}
+	public void setProps(Properties props) {
+		this.props = props;
+	}
 
-	protected String createUrlPath (String url, int timeoutMs , int limit ) throws IOException
-	{
-		final StringBuffer contexturl= new StringBuffer(url);
-	//	final StringBuffer url = new StringBuffer ( CambriaConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ) );
-		final StringBuffer adds = new StringBuffer ();
-		if ( timeoutMs > -1 ) adds.append ( "timeout=" ).append ( timeoutMs ); 
-		if ( limit > -1 )
-		{
-			if ( adds.length () > 0 )
-			{
-				adds.append ( "&" );
+	protected String createUrlPath(String url, int timeoutMs, int limit) throws IOException {
+		final StringBuffer contexturl = new StringBuffer(url);
+		// final StringBuffer url = new StringBuffer (
+		// CambriaConstants.makeConsumerUrl ( host, fTopic, fGroup, fId ) );
+		final StringBuffer adds = new StringBuffer();
+		if (timeoutMs > -1)
+			adds.append("timeout=").append(timeoutMs);
+		if (limit > -1) {
+			if (adds.length() > 0) {
+				adds.append("&");
 			}
-			adds.append ( "limit=" ).append ( limit );
+			adds.append("limit=").append(limit);
 		}
-		if ( fFilter != null && fFilter.length () > 0 )
-		{
+		if (fFilter != null && fFilter.length() > 0) {
 			try {
-				if ( adds.length () > 0 )
-				{
-					adds.append ( "&" );
+				if (adds.length() > 0) {
+					adds.append("&");
 				}
 				adds.append("filter=").append(URLEncoder.encode(fFilter, "UTF-8"));
 			} catch (UnsupportedEncodingException e) {
 				throw new RuntimeException(e.getMessage() + "....say whaaaat?!");
 			}
 		}
-		if ( adds.length () > 0 )
-		{
-			contexturl.append ( "?" ).append ( adds.toString () );
+		if (adds.length() > 0) {
+			contexturl.append("?").append(adds.toString());
 		}
-		
-		//sender.setSubContext(url.toString());
-		return contexturl.toString ();
+
+		// sender.setSubContext(url.toString());
+		return contexturl.toString();
 	}
 
 	public String getUsername() {
@@ -560,20 +564,20 @@
 	public void setfFilter(String fFilter) {
 		this.fFilter = fFilter;
 	}
-	
+
 	private String readRoute(String routeKey) {
 
 		try {
-			
-			MRClientFactory.prop.load(new FileReader(new File (MRClientFactory.routeFilePath)));
+
+			MRClientFactory.prop.load(new FileReader(new File(MRClientFactory.routeFilePath)));
 
 		} catch (Exception ex) {
-			log.error("Reply Router Error " + ex.toString() );
+			log.error("Reply Router Error " + ex.toString());
 		}
-		String routeOffer = MRClientFactory.prop.getProperty(routeKey);		
+		String routeOffer = MRClientFactory.prop.getProperty(routeKey);
 		return routeOffer;
 	}
-	
+
 	@Override
 	public MRConsumerResponse fetchWithReturnConsumerResponse() {
 
@@ -582,13 +586,11 @@
 	}
 
 	@Override
-	public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs,
-			int limit) {
+	public MRConsumerResponse fetchWithReturnConsumerResponse(int timeoutMs, int limit) {
 		final LinkedList<String> msgs = new LinkedList<String>();
 		MRConsumerResponse mrConsumerResponse = new MRConsumerResponse();
 		try {
-			if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(
-					protocolFlag)) {
+			if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
 				DMEConfigure(timeoutMs, limit);
 
 				String reply = sender.sendAndWait(timeoutMs + 10000L);
@@ -599,7 +601,7 @@
 					final JSONArray a = o.getJSONArray("result");
 
 					if (a != null) {
-						for (int i = 0; i < a.length(); i++) {							
+						for (int i = 0; i < a.length(); i++) {
 							if (a.get(i) instanceof String)
 								msgs.add(a.getString(i));
 							else
@@ -612,15 +614,16 @@
 				createMRConsumerResponse(reply, mrConsumerResponse);
 			}
 
-			if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(
-					protocolFlag)) {
-				final String urlPath = createUrlPath(
-						MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
-								props.getProperty("Protocol")), timeoutMs,
-						limit);
+			if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+				/*
+				 * final String urlPath = createUrlPath(
+				 * MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
+				 * props.getProperty("Protocol")), timeoutMs, limit);
+				 */
 
-				String response = getResponse(urlPath, username, password,
-						protocolFlag);
+				final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+						fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+				String response = getResponse(urlPath, username, password, protocolFlag);
 
 				final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
 
@@ -628,33 +631,6 @@
 					final JSONArray a = o.getJSONArray("result");
 
 					if (a != null) {
-						for (int i = 0; i < a.length(); i++) {							
-							if (a.get(i) instanceof String)
-								msgs.add(a.getString(i));
-							else
-								msgs.add(a.getJSONObject(i).toString());
-
-						}
-					}
-
-				}
-				createMRConsumerResponse(response, mrConsumerResponse);
-			}
-
-			if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(
-					protocolFlag)) {
-				final String urlPath = createUrlPath(
-						MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
-								props.getProperty("Protocol")), timeoutMs,
-						limit);
-
-				String response  = getAuthResponse(urlPath, authKey, authDate,
-						username, password, protocolFlag);
-				final JSONObject o = getResponseDataInJsonWithResponseReturned(response);			
-				if (o != null) {
-					final JSONArray a = o.getJSONArray("result");
-
-					if (a != null) {
 						for (int i = 0; i < a.length(); i++) {
 							if (a.get(i) instanceof String)
 								msgs.add(a.getString(i));
@@ -667,51 +643,98 @@
 				}
 				createMRConsumerResponse(response, mrConsumerResponse);
 			}
-			
-			
-			
-		} catch (JSONException e) {	
+
+			if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+				final String urlPath = createUrlPath(
+						MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId, props.getProperty("Protocol")),
+						timeoutMs, limit);
+
+				String response = getAuthResponse(urlPath, authKey, authDate, username, password, protocolFlag);
+				final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
+				if (o != null) {
+					final JSONArray a = o.getJSONArray("result");
+
+					if (a != null) {
+						for (int i = 0; i < a.length(); i++) {
+							if (a.get(i) instanceof String)
+								msgs.add(a.getString(i));
+							else
+								msgs.add(a.getJSONObject(i).toString());
+
+						}
+					}
+
+				}
+				createMRConsumerResponse(response, mrConsumerResponse);
+			}
+			if (ProtocolTypeConstants.HTTPNOAUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+				// final String urlPath = createUrlPath(
+				// MRConstants.makeConsumerUrl(host, fTopic, fGroup, fId,
+				// props.getProperty("Protocol")), timeoutMs,
+				// limit);
+				final String urlPath = createUrlPath(MRConstants.makeConsumerUrl(fHostSelector.selectBaseHost(), fTopic,
+						fGroup, fId, props.getProperty("Protocol")), timeoutMs, limit);
+
+				String response = getNoAuthResponse(urlPath, username, password, protocolFlag);
+				final JSONObject o = getResponseDataInJsonWithResponseReturned(response);
+				if (o != null) {
+					final JSONArray a = o.getJSONArray("result");
+
+					if (a != null) {
+						for (int i = 0; i < a.length(); i++) {
+							if (a.get(i) instanceof String)
+								msgs.add(a.getString(i));
+							else
+								msgs.add(a.getJSONObject(i).toString());
+
+						}
+					}
+
+				}
+				createMRConsumerResponse(response, mrConsumerResponse);
+			}
+
+		} catch (JSONException e) {
 			mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
 			mrConsumerResponse.setResponseMessage(e.getMessage());
-                        log.error("json exception: ", e);
-		} catch (HttpException e) {			
+			log.error("json exception: ", e);
+		} catch (HttpException e) {
 			mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
 			mrConsumerResponse.setResponseMessage(e.getMessage());
-                        log.error("http exception: ", e);
-		}catch(DME2Exception e){			
+			log.error("http exception: ", e);
+		} catch (DME2Exception e) {
 			mrConsumerResponse.setResponseCode(e.getErrorCode());
 			mrConsumerResponse.setResponseMessage(e.getErrorMessage());
-                        log.error("DME2 exception: ", e);
-		}catch (Exception e) {			
+			log.error("DME2 exception: ", e);
+		} catch (Exception e) {
 			mrConsumerResponse.setResponseMessage(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
 			mrConsumerResponse.setResponseMessage(e.getMessage());
-                        log.error("exception: ", e);
+			log.error("exception: ", e);
 		}
 		mrConsumerResponse.setActualMessages(msgs);
 		return mrConsumerResponse;
 	}
 
 	private void createMRConsumerResponse(String reply, MRConsumerResponse mrConsumerResponse) {
-		
-		if(reply.startsWith("{")){
+
+		if (reply.startsWith("{")) {
 			JSONObject jObject = new JSONObject(reply);
 			String message = jObject.getString("message");
 			int status = jObject.getInt("status");
-		
+
 			mrConsumerResponse.setResponseCode(Integer.toString(status));
-			
-			if(null != message){
-				mrConsumerResponse.setResponseMessage(message);	
-			}	
-		}else if (reply.startsWith("<")){
+
+			if (null != message) {
+				mrConsumerResponse.setResponseMessage(message);
+			}
+		} else if (reply.startsWith("<")) {
 			mrConsumerResponse.setResponseCode(getHTTPErrorResponseCode(reply));
-			mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));			
-		}else{
+			mrConsumerResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
+		} else {
 			mrConsumerResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
-			mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE);	
+			mrConsumerResponse.setResponseMessage(SUCCESS_MESSAGE);
 		}
-		
+
 	}
 
- 
 }
diff --git a/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java b/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java
index 398558d..db982ec 100644
--- a/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java
+++ b/src/main/java/com/att/nsa/mr/client/impl/MRSimplerBatchPublisher.java
@@ -47,6 +47,7 @@
 import org.apache.http.HttpStatus;
 import org.json.JSONArray;
 import org.json.JSONObject;
+import org.json.JSONTokener;
 
 import com.att.aft.dme2.api.DME2Client;
 import com.att.aft.dme2.api.DME2Exception;
@@ -55,76 +56,66 @@
 import com.att.nsa.mr.client.response.MRPublisherResponse;
 import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
 
-public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher
-{
+public class MRSimplerBatchPublisher extends MRBaseClient implements MRBatchingPublisher {
 	private static final Logger logger = LoggerFactory.getLogger(MRSimplerBatchPublisher.class);
 
-	public static class Builder 
-	{
-		public Builder ()
-		{
+	public static class Builder {
+		public Builder() {
 		}
 
-		public Builder againstUrls ( Collection<String> baseUrls )
-		{
+		public Builder againstUrls(Collection<String> baseUrls) {
 			fUrls = baseUrls;
 			return this;
 		}
 
-		public Builder onTopic ( String topic )
-		{
+		public Builder onTopic(String topic) {
 			fTopic = topic;
 			return this;
 		}
 
-		public Builder batchTo ( int maxBatchSize, long maxBatchAgeMs )
-		{
+		public Builder batchTo(int maxBatchSize, long maxBatchAgeMs) {
 			fMaxBatchSize = maxBatchSize;
 			fMaxBatchAgeMs = maxBatchAgeMs;
 			return this;
 		}
 
-		public Builder compress ( boolean compress )
-		{
+		public Builder compress(boolean compress) {
 			fCompress = compress;
 			return this;
 		}
-		
-		public Builder httpThreadTime ( int threadOccuranceTime )
-		{
+
+		public Builder httpThreadTime(int threadOccuranceTime) {
 			this.threadOccuranceTime = threadOccuranceTime;
 			return this;
 		}
-		
-		public Builder allowSelfSignedCertificates( boolean allowSelfSignedCerts )
-		{
+
+		public Builder allowSelfSignedCertificates(boolean allowSelfSignedCerts) {
 			fAllowSelfSignedCerts = allowSelfSignedCerts;
 			return this;
 		}
-		
-		public Builder withResponse ( boolean withResponse)
-		{
+
+		public Builder withResponse(boolean withResponse) {
 			fWithResponse = withResponse;
 			return this;
 		}
-		public MRSimplerBatchPublisher build ()
-		{
-			if(!fWithResponse) 
-			{
+
+		public MRSimplerBatchPublisher build() {
+			if (!fWithResponse) {
 				try {
-					return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts,threadOccuranceTime);
+					return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
+							fAllowSelfSignedCerts, threadOccuranceTime);
 				} catch (MalformedURLException e) {
 					throw new RuntimeException(e);
 				}
-			} else 
-			{
+			} else {
 				try {
-					return new MRSimplerBatchPublisher ( fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress, fAllowSelfSignedCerts, fMaxBatchSize);
+					return new MRSimplerBatchPublisher(fUrls, fTopic, fMaxBatchSize, fMaxBatchAgeMs, fCompress,
+							fAllowSelfSignedCerts, fMaxBatchSize);
 				} catch (MalformedURLException e) {
 					throw new RuntimeException(e);
 				}
 			}
-				
+
 		}
 
 		private Collection<String> fUrls;
@@ -135,262 +126,232 @@
 		private int threadOccuranceTime = 50;
 		private boolean fAllowSelfSignedCerts = false;
 		private boolean fWithResponse = false;
-		
+
 	};
 
 	@Override
-	public int send ( String partition, String msg )
-	{
-		return send ( new message ( partition, msg ) );
-	}
-	@Override
-	public int send ( String msg )
-	{
-		return send ( new message ( null, msg ) );
+	public int send(String partition, String msg) {
+		return send(new message(partition, msg));
 	}
 
-
 	@Override
-	public int send ( message msg )
-	{
-		final LinkedList<message> list = new LinkedList<message> ();
-		list.add ( msg );
-		return send ( list );
+	public int send(String msg) {
+		return send(new message(null, msg));
 	}
-	
-	
 
 	@Override
-	public synchronized int send ( Collection<message> msgs )
-	{
-		if ( fClosed )
-		{
-			throw new IllegalStateException ( "The publisher was closed." );
+	public int send(message msg) {
+		final LinkedList<message> list = new LinkedList<message>();
+		list.add(msg);
+		return send(list);
+	}
+
+	@Override
+	public synchronized int send(Collection<message> msgs) {
+		if (fClosed) {
+			throw new IllegalStateException("The publisher was closed.");
 		}
-		
-		for ( message userMsg : msgs )
-		{
-			fPending.add ( new TimestampedMessage ( userMsg ) );
+
+		for (message userMsg : msgs) {
+			fPending.add(new TimestampedMessage(userMsg));
 		}
-		return getPendingMessageCount ();
+		return getPendingMessageCount();
 	}
 
 	@Override
-	public synchronized int getPendingMessageCount ()
-	{
-		return fPending.size ();
+	public synchronized int getPendingMessageCount() {
+		return fPending.size();
 	}
 
 	@Override
-	public void close ()
-	{
-		try
-		{
-			final List<message> remains = close ( Long.MAX_VALUE, TimeUnit.MILLISECONDS );
-			if ( remains.size() > 0 )
-			{
-				getLog().warn ( "Closing publisher with " + remains.size() + " messages unsent. "
-					+ "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close." );
+	public void close() {
+		try {
+			final List<message> remains = close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+			if (remains.size() > 0) {
+				getLog().warn("Closing publisher with " + remains.size() + " messages unsent. "
+						+ "Consider using MRBatchingPublisher.close( long timeout, TimeUnit timeoutUnits ) to recapture unsent messages on close.");
 			}
-		}
-		catch ( InterruptedException e )
-		{
-			getLog().warn ( "Possible message loss. " + e.getMessage(), e );
-		}
-		catch ( IOException e )
-		{
-			getLog().warn ( "Possible message loss. " + e.getMessage(), e );
+		} catch (InterruptedException e) {
+			getLog().warn("Possible message loss. " + e.getMessage(), e);
+		} catch (IOException e) {
+			getLog().warn("Possible message loss. " + e.getMessage(), e);
 		}
 	}
 
 	@Override
-	public List<message> close ( long time, TimeUnit unit ) throws IOException, InterruptedException
-	{
-		synchronized ( this )
-		{
+	public List<message> close(long time, TimeUnit unit) throws IOException, InterruptedException {
+		synchronized (this) {
 			fClosed = true;
 
 			// stop the background sender
-			fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy ( false );
-			fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy ( false );
-			fExec.shutdown ();
+			fExec.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+			fExec.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+			fExec.shutdown();
 		}
 
-		final long now = Clock.now ();
-		final long waitInMs = TimeUnit.MILLISECONDS.convert ( time, unit );
+		final long now = Clock.now();
+		final long waitInMs = TimeUnit.MILLISECONDS.convert(time, unit);
 		final long timeoutAtMs = now + waitInMs;
 
-		while ( Clock.now() < timeoutAtMs && getPendingMessageCount() > 0 )
-		{
-			send ( true );
-			Thread.sleep ( 250 );
+		while (Clock.now() < timeoutAtMs && getPendingMessageCount() > 0) {
+			send(true);
+			Thread.sleep(250);
 		}
 
-		synchronized ( this )
-		{
-			final LinkedList<message> result = new LinkedList<message> ();
-			fPending.drainTo ( result );
+		synchronized (this) {
+			final LinkedList<message> result = new LinkedList<message>();
+			fPending.drainTo(result);
 			return result;
 		}
 	}
 
 	/**
-	 * Possibly send a batch to the MR server. This is called by the background thread
-	 * and the close() method
+	 * Possibly send a batch to the MR server. This is called by the background
+	 * thread and the close() method
 	 * 
 	 * @param force
 	 */
-	private synchronized void send ( boolean force )
-	{
-		if ( force || shouldSendNow () )
-		{
-			if ( !sendBatch () )
-			{
-				getLog().warn ( "Send failed, " + fPending.size() + " message to send." );
+	private synchronized void send(boolean force) {
+		if (force || shouldSendNow()) {
+			if (!sendBatch()) {
+				getLog().warn("Send failed, " + fPending.size() + " message to send.");
 
 				// note the time for back-off
-				fDontSendUntilMs = sfWaitAfterError + Clock.now ();
+				fDontSendUntilMs = sfWaitAfterError + Clock.now();
 			}
 		}
 	}
 
-	private synchronized boolean shouldSendNow ()
-	{
+	private synchronized boolean shouldSendNow() {
 		boolean shouldSend = false;
-		if ( fPending.size () > 0 )
-		{
-			final long nowMs = Clock.now ();
+		if (fPending.size() > 0) {
+			final long nowMs = Clock.now();
 
-			shouldSend = ( fPending.size() >= fMaxBatchSize );
-			if ( !shouldSend )
-			{
+			shouldSend = (fPending.size() >= fMaxBatchSize);
+			if (!shouldSend) {
 				final long sendAtMs = fPending.peek().timestamp + fMaxBatchAgeMs;
 				shouldSend = sendAtMs <= nowMs;
 			}
 
 			// however, wait after an error
-			shouldSend = shouldSend && nowMs >= fDontSendUntilMs; 
+			shouldSend = shouldSend && nowMs >= fDontSendUntilMs;
 		}
 		return shouldSend;
 	}
 
-	private synchronized boolean sendBatch ()
-	{
-		// it's possible for this call to be made with an empty list. in this case, just return.
-		if ( fPending.size() < 1 )
-		{
+	/**
+	 * Method to parse published JSON Objects and Arrays
+	 * 
+	 * @return JSONArray
+	 */
+	private JSONArray parseJSON() {
+		JSONArray jsonArray = new JSONArray();
+		for (TimestampedMessage m : fPending) {
+			JSONTokener jsonTokener = new JSONTokener(m.fMsg);
+			JSONObject jsonObject = null;
+			JSONArray tempjsonArray = null;
+			final char firstChar = jsonTokener.next();
+			jsonTokener.back();
+			if ('[' == firstChar) {
+				tempjsonArray = new JSONArray(jsonTokener);
+				if (null != tempjsonArray) {
+					for (int i = 0; i < tempjsonArray.length(); i++) {
+						jsonArray.put(tempjsonArray.getJSONObject(i));
+					}
+				}
+			} else {
+				jsonObject = new JSONObject(jsonTokener);
+				jsonArray.put(jsonObject);
+			}
+
+		}
+		return jsonArray;
+	}
+
+	private synchronized boolean sendBatch() {
+		// it's possible for this call to be made with an empty list. in this
+		// case, just return.
+		if (fPending.size() < 1) {
 			return true;
 		}
 
-		final long nowMs = Clock.now ();
-		
-		host = this.fHostSelector.selectBaseHost();
-		
-		final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") );
-		
+		final long nowMs = Clock.now();
 
-		try
-		{
-			/*final String contentType =
-				fCompress ?
-					MRFormat.CAMBRIA_ZIP.toString () :
-					MRFormat.CAMBRIA.toString () 
-			;*/
-            
-			final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
+		host = this.fHostSelector.selectBaseHost();
+
+		final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
+				props.getProperty("partition"));
+
+		try {
+			/*
+			 * final String contentType = fCompress ?
+			 * MRFormat.CAMBRIA_ZIP.toString () : MRFormat.CAMBRIA.toString () ;
+			 */
+
+			final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
 			OutputStream os = baseStream;
 			final String contentType = props.getProperty("contenttype");
-			if(contentType.equalsIgnoreCase("application/json")){
-				JSONArray jsonArray = new JSONArray();
-				for ( TimestampedMessage m : fPending )
-				{
-					JSONObject jsonObject = new JSONObject(m.fMsg);
-								
-						jsonArray.put(jsonObject);
-				
-				}
-				os.write (jsonArray.toString().getBytes() );	
+			if (contentType.equalsIgnoreCase("application/json")) {
+				JSONArray jsonArray = parseJSON();
+				os.write(jsonArray.toString().getBytes());
 				os.close();
 
-				}else if (contentType.equalsIgnoreCase("text/plain")){
-					for ( TimestampedMessage m : fPending )
-					{										
-						os.write ( m.fMsg.getBytes() );
-						os.write ( '\n' );
-					}
-					os.close ();
-				} else if (contentType.equalsIgnoreCase("application/cambria") ||  (contentType.equalsIgnoreCase("application/cambria-zip"))){
-					if ( contentType.equalsIgnoreCase("application/cambria-zip") )
-					{
-						os = new GZIPOutputStream ( baseStream );
-					}
-					for ( TimestampedMessage m : fPending )
-					{
-						
-						os.write ( ( "" + m.fPartition.length () ).getBytes() );
-						os.write ( '.' );
-						os.write ( ( "" + m.fMsg.length () ).getBytes() );
-						os.write ( '.' );
-						os.write ( m.fPartition.getBytes() );
-						os.write ( m.fMsg.getBytes() );
-						os.write ( '\n' );
-					}
-					os.close ();
-				}else{
-					for ( TimestampedMessage m : fPending )
-					{										
-						os.write ( m.fMsg.getBytes() );
-					
-					}
-					os.close ();
+			} else if (contentType.equalsIgnoreCase("text/plain")) {
+				for (TimestampedMessage m : fPending) {
+					os.write(m.fMsg.getBytes());
+					os.write('\n');
 				}
-             
-		
+				os.close();
+			} else if (contentType.equalsIgnoreCase("application/cambria")
+					|| (contentType.equalsIgnoreCase("application/cambria-zip"))) {
+				if (contentType.equalsIgnoreCase("application/cambria-zip")) {
+					os = new GZIPOutputStream(baseStream);
+				}
+				for (TimestampedMessage m : fPending) {
 
-			final long startMs = Clock.now ();
-			if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
-				
-			
-				DME2Configue();
-				
-				Thread.sleep(5);
-				getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
-				sender.setPayload(os.toString());               
-				String dmeResponse = sender.sendAndWait(5000L);
-				
-				final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):"
-						+ dmeResponse.toString();
-				getLog().info(logLine);
-				fPending.clear();
-				return true;
-			} 
-			
-			if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
-				getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
-				final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag);
-				//System.out.println(result.getInt("status"));
-				//Here we are checking for error response. If HTTP status
-				//code is not within the http success response code
-				//then we consider this as error and return false
-				if(result.getInt("status") < 200 || result.getInt("status") > 299) {
-					return false;
+					os.write(("" + m.fPartition.length()).getBytes());
+					os.write('.');
+					os.write(("" + m.fMsg.length()).getBytes());
+					os.write('.');
+					os.write(m.fPartition.getBytes());
+					os.write(m.fMsg.getBytes());
+					os.write('\n');
 				}
-				final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
+				os.close();
+			} else {
+				for (TimestampedMessage m : fPending) {
+					os.write(m.fMsg.getBytes());
+
+				}
+				os.close();
+			}
+
+			final long startMs = Clock.now();
+			if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
+
+				DME2Configue();
+
+				Thread.sleep(5);
+				getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
+						+ (nowMs - fPending.peek().timestamp) + " ms");
+				sender.setPayload(os.toString());
+				String dmeResponse = sender.sendAndWait(5000L);
+
+				final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + dmeResponse.toString();
 				getLog().info(logLine);
 				fPending.clear();
 				return true;
-			} 
-			
-			if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
-				getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
-				final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag);
-				
-				
-				//System.out.println(result.getInt("status"));
-				//Here we are checking for error response. If HTTP status
-				//code is not within the http success response code
-				//then we consider this as error and return false
-				if(result.getInt("status") < 200 || result.getInt("status") > 299) {
+			}
+
+			if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
+				getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+						+ (nowMs - fPending.peek().timestamp) + " ms");
+				final JSONObject result = postAuth(httpurl, baseStream.toByteArray(), contentType, authKey, authDate,
+						username, password, protocolFlag);
+				// Here we are checking for error response. If HTTP status
+				// code is not within the http success response code
+				// then we consider this as error and return false
+				if (result.getInt("status") < 200 || result.getInt("status") > 299) {
 					return false;
 				}
 				final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
@@ -398,118 +359,118 @@
 				fPending.clear();
 				return true;
 			}
-		}
-		catch ( IllegalArgumentException x ) {
-			getLog().warn ( x.getMessage(), x );
-		} catch ( IOException x ) {
-			getLog().warn ( x.getMessage(), x );
+
+			if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
+				getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+						+ (nowMs - fPending.peek().timestamp) + " ms");
+				final JSONObject result = post(httpurl, baseStream.toByteArray(), contentType, username, password,
+						protocolFlag);
+
+				// Here we are checking for error response. If HTTP status
+				// code is not within the http success response code
+				// then we consider this as error and return false
+				if (result.getInt("status") < 200 || result.getInt("status") > 299) {
+					return false;
+				}
+				final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
+				getLog().info(logLine);
+				fPending.clear();
+				return true;
+			}
+		} catch (IllegalArgumentException x) {
+			getLog().warn(x.getMessage(), x);
+		} catch (IOException x) {
+			getLog().warn(x.getMessage(), x);
 		} catch (HttpException x) {
-			getLog().warn ( x.getMessage(), x );
+			getLog().warn(x.getMessage(), x);
 		} catch (Exception x) {
 			getLog().warn(x.getMessage(), x);
 		}
 		return false;
 	}
 
-	public synchronized MRPublisherResponse sendBatchWithResponse () 
-	{
-		// it's possible for this call to be made with an empty list. in this case, just return.
-		if ( fPending.size() < 1 )
-		{
+	public synchronized MRPublisherResponse sendBatchWithResponse() {
+		// it's possible for this call to be made with an empty list. in this
+		// case, just return.
+		if (fPending.size() < 1) {
 			pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
 			pubResponse.setResponseMessage("No Messages to send");
 			return pubResponse;
 		}
 
-		final long nowMs = Clock.now ();
-		
-		host = this.fHostSelector.selectBaseHost();
-		
-		final String httpurl = MRConstants.makeUrl ( host, fTopic,props.getProperty("Protocol"),props.getProperty("partition") );
-		OutputStream os=null;
-		try
-		{
-			
-			final ByteArrayOutputStream baseStream = new ByteArrayOutputStream ();
-			 os = baseStream;
-			final String contentType = props.getProperty("contenttype");
-			if(contentType.equalsIgnoreCase("application/json")){
-				JSONArray jsonArray = new JSONArray();
-				for ( TimestampedMessage m : fPending )
-				{
-					JSONObject jsonObject = new JSONObject(m.fMsg);
-								
-						jsonArray.put(jsonObject);
-				
-				}
-				os.write (jsonArray.toString().getBytes() );	
-				}else if (contentType.equalsIgnoreCase("text/plain")){
-					for ( TimestampedMessage m : fPending )
-					{										
-						os.write ( m.fMsg.getBytes() );
-						os.write ( '\n' );
-					}
-				} else if (contentType.equalsIgnoreCase("application/cambria") ||  (contentType.equalsIgnoreCase("application/cambria-zip"))){
-					if ( contentType.equalsIgnoreCase("application/cambria-zip") )
-					{
-						os = new GZIPOutputStream ( baseStream );
-					}
-					for ( TimestampedMessage m : fPending )
-					{
-						
-						os.write ( ( "" + m.fPartition.length () ).getBytes() );
-						os.write ( '.' );
-						os.write ( ( "" + m.fMsg.length () ).getBytes() );
-						os.write ( '.' );
-						os.write ( m.fPartition.getBytes() );
-						os.write ( m.fMsg.getBytes() );
-						os.write ( '\n' );
-					}
-					os.close ();
-				}else{
-					for ( TimestampedMessage m : fPending )
-					{										
-						os.write ( m.fMsg.getBytes() );
-					
-					}
-				}
-             
-		
+		final long nowMs = Clock.now();
 
-			final long startMs = Clock.now ();
+		host = this.fHostSelector.selectBaseHost();
+
+		final String httpurl = MRConstants.makeUrl(host, fTopic, props.getProperty("Protocol"),
+				props.getProperty("partition"));
+		OutputStream os = null;
+		try {
+
+			final ByteArrayOutputStream baseStream = new ByteArrayOutputStream();
+			os = baseStream;
+			final String contentType = props.getProperty("contenttype");
+			if (contentType.equalsIgnoreCase("application/json")) {
+				JSONArray jsonArray = parseJSON();
+				os.write(jsonArray.toString().getBytes());
+			} else if (contentType.equalsIgnoreCase("text/plain")) {
+				for (TimestampedMessage m : fPending) {
+					os.write(m.fMsg.getBytes());
+					os.write('\n');
+				}
+			} else if (contentType.equalsIgnoreCase("application/cambria")
+					|| (contentType.equalsIgnoreCase("application/cambria-zip"))) {
+				if (contentType.equalsIgnoreCase("application/cambria-zip")) {
+					os = new GZIPOutputStream(baseStream);
+				}
+				for (TimestampedMessage m : fPending) {
+
+					os.write(("" + m.fPartition.length()).getBytes());
+					os.write('.');
+					os.write(("" + m.fMsg.length()).getBytes());
+					os.write('.');
+					os.write(m.fPartition.getBytes());
+					os.write(m.fMsg.getBytes());
+					os.write('\n');
+				}
+				os.close();
+			} else {
+				for (TimestampedMessage m : fPending) {
+					os.write(m.fMsg.getBytes());
+
+				}
+			}
+
+			final long startMs = Clock.now();
 			if (ProtocolTypeConstants.DME2.getValue().equalsIgnoreCase(protocolFlag)) {
-				
-			
+
 				try {
-				DME2Configue();
-				
-				Thread.sleep(5);
-				getLog().info ( "sending " + fPending.size() + " msgs to " + url+subContextPath + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
-				sender.setPayload(os.toString());               
-						
-				
-				String dmeResponse = sender.sendAndWait(5000L);
-				System.out.println("dmeres->"+dmeResponse);		
-				
-				
-				pubResponse = createMRPublisherResponse(dmeResponse,pubResponse);
-				
-				if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
-					
-					return pubResponse;
-				}
-				final String logLine = String.valueOf((Clock.now() - startMs))
-						+ dmeResponse.toString();
-				getLog().info(logLine);
-				fPending.clear();
-				
-				}
-				catch (DME2Exception x) {
+					DME2Configue();
+
+					Thread.sleep(5);
+					getLog().info("sending " + fPending.size() + " msgs to " + url + subContextPath + ". Oldest: "
+							+ (nowMs - fPending.peek().timestamp) + " ms");
+					sender.setPayload(os.toString());
+
+					String dmeResponse = sender.sendAndWait(5000L);
+
+					pubResponse = createMRPublisherResponse(dmeResponse, pubResponse);
+
+					if (Integer.valueOf(pubResponse.getResponseCode()) < 200
+							|| Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
+						return pubResponse;
+					}
+					final String logLine = String.valueOf((Clock.now() - startMs)) + dmeResponse.toString();
+					getLog().info(logLine);
+					fPending.clear();
+
+				} catch (DME2Exception x) {
 					getLog().warn(x.getMessage(), x);
 					pubResponse.setResponseCode(x.getErrorCode());
 					pubResponse.setResponseMessage(x.getErrorMessage());
 				} catch (URISyntaxException x) {
-					
+
 					getLog().warn(x.getMessage(), x);
 					pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
 					pubResponse.setResponseMessage(x.getMessage());
@@ -517,135 +478,127 @@
 
 					pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
 					pubResponse.setResponseMessage(x.getMessage());
-                                        logger.error("exception: ", x);
-					
+					logger.error("exception: ", x);
+
 				}
-				
+
 				return pubResponse;
-			} 
-			
+			}
+
 			if (ProtocolTypeConstants.AUTH_KEY.getValue().equalsIgnoreCase(protocolFlag)) {
-				getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
-				final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey, authDate, username, password,protocolFlag);
-				//System.out.println(result.getInt("status"));
-				//Here we are checking for error response. If HTTP status
-				//code is not within the http success response code
-				//then we consider this as error and return false
-				
-				
-				pubResponse = createMRPublisherResponse(result,pubResponse);
-				
-				if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
-					
+				getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+						+ (nowMs - fPending.peek().timestamp) + " ms");
+				final String result = postAuthwithResponse(httpurl, baseStream.toByteArray(), contentType, authKey,
+						authDate, username, password, protocolFlag);
+				// Here we are checking for error response. If HTTP status
+				// code is not within the http success response code
+				// then we consider this as error and return false
+
+				pubResponse = createMRPublisherResponse(result, pubResponse);
+
+				if (Integer.valueOf(pubResponse.getResponseCode()) < 200
+						|| Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
 					return pubResponse;
 				}
-				
+
 				final String logLine = "MR reply ok (" + (Clock.now() - startMs) + " ms):" + result.toString();
 				getLog().info(logLine);
 				fPending.clear();
 				return pubResponse;
-			} 
-			
+			}
+
 			if (ProtocolTypeConstants.AAF_AUTH.getValue().equalsIgnoreCase(protocolFlag)) {
-				getLog().info ( "sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: " + ( nowMs - fPending.peek().timestamp ) + " ms"  );
-				final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username, password, protocolFlag);
-				
-				//System.out.println(result.getInt("status"));
-				//Here we are checking for error response. If HTTP status
-				//code is not within the http success response code
-				//then we consider this as error and return false
-				pubResponse = createMRPublisherResponse(result,pubResponse);
-				
-				if(Integer.valueOf(pubResponse.getResponseCode()) < 200 || Integer.valueOf(pubResponse.getResponseCode()) > 299) {
-					
+				getLog().info("sending " + fPending.size() + " msgs to " + httpurl + ". Oldest: "
+						+ (nowMs - fPending.peek().timestamp) + " ms");
+				final String result = postWithResponse(httpurl, baseStream.toByteArray(), contentType, username,
+						password, protocolFlag);
+
+				// Here we are checking for error response. If HTTP status
+				// code is not within the http success response code
+				// then we consider this as error and return false
+				pubResponse = createMRPublisherResponse(result, pubResponse);
+
+				if (Integer.valueOf(pubResponse.getResponseCode()) < 200
+						|| Integer.valueOf(pubResponse.getResponseCode()) > 299) {
+
 					return pubResponse;
 				}
-				
+
 				final String logLine = String.valueOf((Clock.now() - startMs));
 				getLog().info(logLine);
 				fPending.clear();
 				return pubResponse;
 			}
-		}
-		catch ( IllegalArgumentException x ) {
-			getLog().warn ( x.getMessage(), x );
+		} catch (IllegalArgumentException x) {
+			getLog().warn(x.getMessage(), x);
 			pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
 			pubResponse.setResponseMessage(x.getMessage());
-			
-		} catch ( IOException x ) {
-			getLog().warn ( x.getMessage(), x );
+
+		} catch (IOException x) {
+			getLog().warn(x.getMessage(), x);
 			pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
 			pubResponse.setResponseMessage(x.getMessage());
-			
+
 		} catch (HttpException x) {
-			getLog().warn ( x.getMessage(), x );
+			getLog().warn(x.getMessage(), x);
 			pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
 			pubResponse.setResponseMessage(x.getMessage());
-			
+
 		} catch (Exception x) {
 			getLog().warn(x.getMessage(), x);
-			
+
 			pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
 			pubResponse.setResponseMessage(x.getMessage());
-			
+
 		}
-		
+
 		finally {
-			if (fPending.size()>0) {
-				getLog().warn ( "Send failed, " + fPending.size() + " message to send." );
+			if (fPending.size() > 0) {
+				getLog().warn("Send failed, " + fPending.size() + " message to send.");
 				pubResponse.setPendingMsgs(fPending.size());
 			}
 			if (os != null) {
 				try {
-				os.close();
+					os.close();
 				} catch (Exception x) {
 					getLog().warn(x.getMessage(), x);
 					pubResponse.setResponseCode(String.valueOf(HttpStatus.SC_INTERNAL_SERVER_ERROR));
 					pubResponse.setResponseMessage("Error in closing Output Stream");
 				}
-				}
+			}
 		}
-		
+
 		return pubResponse;
 	}
-	
-private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
-		
-	 if (reply.isEmpty()) 
-	 {
-		 
-		 mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
-		 mrPubResponse.setResponseMessage("Please verify the Producer properties");
-	 }
-	 else if(reply.startsWith("{"))
-	 {
+
+	private MRPublisherResponse createMRPublisherResponse(String reply, MRPublisherResponse mrPubResponse) {
+
+		if (reply.isEmpty()) {
+
+			mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_BAD_REQUEST));
+			mrPubResponse.setResponseMessage("Please verify the Producer properties");
+		} else if (reply.startsWith("{")) {
 			JSONObject jObject = new JSONObject(reply);
-			if(jObject.has("message") && jObject.has("status"))
-			{
+			if (jObject.has("message") && jObject.has("status")) {
 				String message = jObject.getString("message");
-				if(null != message)
-				{
-					mrPubResponse.setResponseMessage(message);	
+				if (null != message) {
+					mrPubResponse.setResponseMessage(message);
 				}
 				mrPubResponse.setResponseCode(Integer.toString(jObject.getInt("status")));
+			} else {
+				mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
+				mrPubResponse.setResponseMessage(reply);
 			}
-			else
-			 {
-					mrPubResponse.setResponseCode(String.valueOf(HttpStatus.SC_OK));
-					mrPubResponse.setResponseMessage(reply);	
-			 }
-     }
-	 else if (reply.startsWith("<"))
-	 {
-		 String responseCode = getHTTPErrorResponseCode(reply);
-		 if( responseCode.contains("403"))
-			{
-			 responseCode = "403";
-			}	
-		 mrPubResponse.setResponseCode(responseCode);
-			mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));	
-	 }
-	 
+		} else if (reply.startsWith("<")) {
+			String responseCode = getHTTPErrorResponseCode(reply);
+			if (responseCode.contains("403")) {
+				responseCode = "403";
+			}
+			mrPubResponse.setResponseCode(responseCode);
+			mrPubResponse.setResponseMessage(getHTTPErrorResponseMessage(reply));
+		}
+
 		return mrPubResponse;
 	}
 
@@ -658,10 +611,10 @@
 	private String username;
 	private String password;
 	private String host;
-	
-	//host selector
+
+	// host selector
 	private HostSelector fHostSelector = null;
-	
+
 	private final LinkedBlockingQueue<TimestampedMessage> fPending;
 	private long fDontSendUntilMs;
 	private final ScheduledThreadPoolExecutor fExec;
@@ -684,25 +637,24 @@
 	private HashMap<String, String> DMETimeOuts;
 	private DME2Client sender;
 	public String protocolFlag = ProtocolTypeConstants.DME2.getValue();
-	public String producerFilePath;
 	private String authKey;
 	private String authDate;
 	private String handlers;
 	private Properties props;
 	public static String routerFilePath;
-	protected static final Map<String, String> headers=new HashMap<String, String>();
+	protected static final Map<String, String> headers = new HashMap<String, String>();
 	public static MultivaluedMap<String, Object> headersMap;
-	
-	
+
 	private MRPublisherResponse pubResponse;
-	
+
 	public MRPublisherResponse getPubResponse() {
 		return pubResponse;
 	}
+
 	public void setPubResponse(MRPublisherResponse pubResponse) {
 		this.pubResponse = pubResponse;
 	}
-	
+
 	public static String getRouterFilePath() {
 		return routerFilePath;
 	}
@@ -719,14 +671,6 @@
 		this.props = props;
 	}
 
-	public String getProducerFilePath() {
-		return producerFilePath;
-	}
-
-	public void setProducerFilePath(String producerFilePath) {
-		this.producerFilePath = producerFilePath;
-	}
-
 	public String getProtocolFlag() {
 		return protocolFlag;
 	}
@@ -734,14 +678,14 @@
 	public void setProtocolFlag(String protocolFlag) {
 		this.protocolFlag = protocolFlag;
 	}
-	
-	
+
 	private void DME2Configue() throws Exception {
 		try {
-			
-		/*	FileReader reader = new FileReader(new File (producerFilePath));
-			Properties props = new Properties();		
-			props.load(reader);*/
+
+			/*
+			 * FileReader reader = new FileReader(new File (producerFilePath));
+			 * Properties props = new Properties(); props.load(reader);
+			 */
 			latitude = props.getProperty("Latitude");
 			longitude = props.getProperty("Longitude");
 			version = props.getProperty("Version");
@@ -749,41 +693,43 @@
 			env = props.getProperty("Environment");
 			partner = props.getProperty("Partner");
 			routeOffer = props.getProperty("routeOffer");
-			subContextPath = props.getProperty("SubContextPath")+fTopic;
-			/*if(props.getProperty("partition")!=null && !props.getProperty("partition").equalsIgnoreCase("")){
-				subContextPath=subContextPath+"?partitionKey="+props.getProperty("partition");
-			}*/			
+			subContextPath = props.getProperty("SubContextPath") + fTopic;
+			/*
+			 * if(props.getProperty("partition")!=null &&
+			 * !props.getProperty("partition").equalsIgnoreCase("")){
+			 * subContextPath=subContextPath+"?partitionKey="+props.getProperty(
+			 * "partition"); }
+			 */
 			protocol = props.getProperty("Protocol");
 			methodType = props.getProperty("MethodType");
 			dmeuser = props.getProperty("username");
 			dmepassword = props.getProperty("password");
 			contentType = props.getProperty("contenttype");
 			handlers = props.getProperty("sessionstickinessrequired");
-			routerFilePath= props.getProperty("DME2preferredRouterFilePath");
-			
+			routerFilePath = props.getProperty("DME2preferredRouterFilePath");
+
 			/**
-			 * Changes to DME2Client url to use Partner for auto failover between data centers
-			 * When Partner value is not provided use the routeOffer value for auto failover within a cluster 
+			 * Changes to DME2Client url to use Partner for auto failover
+			 * between data centers When Partner value is not provided use the
+			 * routeOffer value for auto failover within a cluster
 			 */
-			
 
 			String partitionKey = props.getProperty("partition");
-			
-			if (partner != null && !partner.isEmpty() ) 
-			{ 
-				url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&partner="+partner; 
-                if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){
-                    url = url + "&partitionKey=" + partitionKey;
-                }
+
+			if (partner != null && !partner.isEmpty()) {
+				url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&partner="
+						+ partner;
+				if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
+					url = url + "&partitionKey=" + partitionKey;
+				}
+			} else if (routeOffer != null && !routeOffer.isEmpty()) {
+				url = protocol + "://" + serviceName + "?version=" + version + "&envContext=" + env + "&routeoffer="
+						+ routeOffer;
+				if (partitionKey != null && !partitionKey.equalsIgnoreCase("")) {
+					url = url + "&partitionKey=" + partitionKey;
+				}
 			}
-			else if (routeOffer!=null && !routeOffer.isEmpty()) 
-			{ 
-				url = protocol + "://"+serviceName+"?version="+version+"&envContext="+env+"&routeoffer="+routeOffer;
-                if(partitionKey!=null && !partitionKey.equalsIgnoreCase("")){
-                    url = url + "&partitionKey=" + partitionKey;
-                }
-			}
-			 
+
 			DMETimeOuts = new HashMap<String, String>();
 			DMETimeOuts.put("AFT_DME2_EP_READ_TIMEOUT_MS", props.getProperty("AFT_DME2_EP_READ_TIMEOUT_MS"));
 			DMETimeOuts.put("AFT_DME2_ROUNDTRIP_TIMEOUT_MS", props.getProperty("AFT_DME2_ROUNDTRIP_TIMEOUT_MS"));
@@ -791,56 +737,56 @@
 			DMETimeOuts.put("Content-Type", contentType);
 			System.setProperty("AFT_LATITUDE", latitude);
 			System.setProperty("AFT_LONGITUDE", longitude);
-			System.setProperty("AFT_ENVIRONMENT",props.getProperty("AFT_ENVIRONMENT"));
-			//System.setProperty("DME2.DEBUG", "true");
-		//	System.setProperty("AFT_DME2_HTTP_EXCHANGE_TRACE_ON", "true");
-			//System.out.println("XXXXXX"+url);
-			
-			//SSL changes
-			System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
-					"SSLv3,TLSv1,TLSv1.1");
+			System.setProperty("AFT_ENVIRONMENT", props.getProperty("AFT_ENVIRONMENT"));
+			// System.setProperty("DME2.DEBUG", "true");
+
+			// SSL changes
+			// System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS",
+			// "SSLv3,TLSv1,TLSv1.1");
+			System.setProperty("AFT_DME2_CLIENT_SSL_INCLUDE_PROTOCOLS", "TLSv1.1,TLSv1.2");
 			System.setProperty("AFT_DME2_CLIENT_IGNORE_SSL_CONFIG", "false");
 			System.setProperty("AFT_DME2_CLIENT_KEYSTORE_PASSWORD", "changeit");
-			
-			//SSL changes
-			
+
+			// SSL changes
+
 			sender = new DME2Client(new URI(url), 5000L);
-				
+
 			sender.setAllowAllHttpReturnCodes(true);
 			sender.setMethod(methodType);
-			sender.setSubContext(subContextPath);	
+			sender.setSubContext(subContextPath);
 			sender.setCredentials(dmeuser, dmepassword);
 			sender.setHeaders(DMETimeOuts);
-			if(handlers.equalsIgnoreCase("yes")){
-				sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
-				sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
+			if (handlers.equalsIgnoreCase("yes")) {
+				sender.addHeader("AFT_DME2_EXCHANGE_REQUEST_HANDLERS",
+						props.getProperty("AFT_DME2_EXCHANGE_REQUEST_HANDLERS"));
+				sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS",
+						props.getProperty("AFT_DME2_EXCHANGE_REPLY_HANDLERS"));
 				sender.addHeader("AFT_DME2_REQ_TRACE_ON", props.getProperty("AFT_DME2_REQ_TRACE_ON"));
-				}else{
-					sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
-				}
+			} else {
+				sender.addHeader("AFT_DME2_EXCHANGE_REPLY_HANDLERS", "com.att.nsa.mr.dme.client.HeaderReplyHandler");
+			}
 		} catch (DME2Exception x) {
 			getLog().warn(x.getMessage(), x);
-			throw new DME2Exception(x.getErrorCode(),x.getErrorMessage());
+			throw new DME2Exception(x.getErrorCode(), x.getErrorMessage());
 		} catch (URISyntaxException x) {
-			
+
 			getLog().warn(x.getMessage(), x);
-			throw new URISyntaxException(url,x.getMessage());
+			throw new URISyntaxException(url, x.getMessage());
 		} catch (Exception x) {
 
 			getLog().warn(x.getMessage(), x);
 			throw new Exception(x.getMessage());
 		}
 	}
-	
-	private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress) throws MalformedURLException
-	{
-		super ( hosts );
 
-		if ( topic == null || topic.length() < 1 )
-		{
-			throw new IllegalArgumentException ( "A topic must be provided." );
+	private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
+			boolean compress) throws MalformedURLException {
+		super(hosts);
+
+		if (topic == null || topic.length() < 1) {
+			throw new IllegalArgumentException("A topic must be provided.");
 		}
-		
+
 		fHostSelector = new HostSelector(hosts, null);
 		fClosed = false;
 		fTopic = topic;
@@ -848,49 +794,45 @@
 		fMaxBatchAgeMs = maxBatchAgeMs;
 		fCompress = compress;
 
-		fPending = new LinkedBlockingQueue<TimestampedMessage> ();
+		fPending = new LinkedBlockingQueue<TimestampedMessage>();
 		fDontSendUntilMs = 0;
-		fExec = new ScheduledThreadPoolExecutor ( 1 );
+		fExec = new ScheduledThreadPoolExecutor(1);
 		pubResponse = new MRPublisherResponse();
-		
-	}
-	
-	private MRSimplerBatchPublisher ( Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs, boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace ) throws MalformedURLException
-	{
-		super ( hosts );
 
-		if ( topic == null || topic.length() < 1 )
-		{
-			throw new IllegalArgumentException ( "A topic must be provided." );
+	}
+
+	private MRSimplerBatchPublisher(Collection<String> hosts, String topic, int maxBatchSize, long maxBatchAgeMs,
+			boolean compress, boolean allowSelfSignedCerts, int httpThreadOccurnace) throws MalformedURLException {
+		super(hosts);
+
+		if (topic == null || topic.length() < 1) {
+			throw new IllegalArgumentException("A topic must be provided.");
 		}
-		
+
 		fHostSelector = new HostSelector(hosts, null);
 		fClosed = false;
 		fTopic = topic;
 		fMaxBatchSize = maxBatchSize;
 		fMaxBatchAgeMs = maxBatchAgeMs;
 		fCompress = compress;
-		threadOccuranceTime=httpThreadOccurnace;
-		fPending = new LinkedBlockingQueue<TimestampedMessage> ();
+		threadOccuranceTime = httpThreadOccurnace;
+		fPending = new LinkedBlockingQueue<TimestampedMessage>();
 		fDontSendUntilMs = 0;
-		fExec = new ScheduledThreadPoolExecutor ( 1 );
-		fExec.scheduleAtFixedRate ( new Runnable()
-		{
+		fExec = new ScheduledThreadPoolExecutor(1);
+		fExec.scheduleAtFixedRate(new Runnable() {
 			@Override
-			public void run ()
-			{
-				send ( false );
+			public void run() {
+				send(false);
 			}
-		}, 100, threadOccuranceTime, TimeUnit.MILLISECONDS );
+		}, 100, threadOccuranceTime, TimeUnit.MILLISECONDS);
 	}
 
-	private static class TimestampedMessage extends message
-	{
-		public TimestampedMessage ( message m )
-		{
-			super ( m );
+	private static class TimestampedMessage extends message {
+		public TimestampedMessage(message m) {
+			super(m);
 			timestamp = Clock.now();
 		}
+
 		public final long timestamp;
 	}
 
@@ -941,5 +883,5 @@
 	public void setAuthDate(String authDate) {
 		this.authDate = authDate;
 	}
-	
+
 }
diff --git a/src/main/java/com/att/nsa/mr/dme/client/SimpleExampleConsumer.java b/src/main/java/com/att/nsa/mr/dme/client/SimpleExampleConsumer.java
index bdd15d4..2886db5 100644
--- a/src/main/java/com/att/nsa/mr/dme/client/SimpleExampleConsumer.java
+++ b/src/main/java/com/att/nsa/mr/dme/client/SimpleExampleConsumer.java
@@ -38,14 +38,14 @@
 
 	private static final Logger logger = LoggerFactory.getLogger(SimpleExampleConsumer.class);
 
-    private SimpleExampleConsumer() {
-    }
+	private SimpleExampleConsumer() {
+	}
 
 	public static void main(String[] args) {
 
 		long count = 0;
 		long nextReport = 5000;
-                String key;
+		String key;
 
 		final long startMs = System.currentTimeMillis();
 
@@ -54,24 +54,24 @@
 			final MRConsumer cc = MRClientFactory.createConsumer("D:\\SG\\consumer.properties");
 			while (true) {
 				for (String msg : cc.fetch()) {
-                                        logger.debug("Message Received: " + msg);
+					logger.debug("Message Received: " + msg);
 				}
 				// Header for DME2 Call.
 				MultivaluedMap<String, Object> headersMap = MRClientFactory.HTTPHeadersMap;
-				for (MultivaluedMap.Entry<String,List<Object>> entry: headersMap.entrySet()) {
-                                    key = entry.getKey();
-                                    logger.debug("Header Key " + key);
-                                    logger.debug("Header Value " + headersMap.get(key));
+				for (MultivaluedMap.Entry<String, List<Object>> entry : headersMap.entrySet()) {
+					key = entry.getKey();
+					logger.debug("Header Key " + key);
+					logger.debug("Header Value " + headersMap.get(key));
 				}
 				// Header for HTTP Call.
-				
-				 Map<String, String> dme2headersMap=MRClientFactory.DME2HeadersMap;
-                                 for(Map.Entry<String,String> entry: dme2headersMap.entrySet()) {
-                                     key = entry.getKey();
-                                     logger.debug("Header Key " + key);
-                                     logger.debug("Header Value " + dme2headersMap.get(key));
-                                 }
-				 
+
+				Map<String, String> dme2headersMap = MRClientFactory.DME2HeadersMap;
+				for (Map.Entry<String, String> entry : dme2headersMap.entrySet()) {
+					key = entry.getKey();
+					logger.debug("Header Key " + key);
+					logger.debug("Header Value " + dme2headersMap.get(key));
+				}
+
 				if (count > nextReport) {
 					nextReport += 5000;
 
@@ -79,11 +79,10 @@
 					final long elapsedMs = endMs - startMs;
 					final double elapsedSec = elapsedMs / 1000.0;
 					final double eps = count / elapsedSec;
-					logger.error("Consumed " + count + " in " + elapsedSec + "; " + eps + " eps");
 				}
 			}
 		} catch (Exception x) {
-                    logger.error(x.getClass().getName() + ": " + x.getMessage());
+			logger.error(x.getClass().getName() + ": " + x.getMessage());
 		}
 	}
 }
diff --git a/src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java b/src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java
index 6e86d47..a4a176e 100644
--- a/src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java
+++ b/src/main/java/com/att/nsa/mr/test/clients/ProtocolTypeConstants.java
@@ -29,11 +29,9 @@
  *
  */
 public enum ProtocolTypeConstants {
-	
-	DME2("DME2"),
-	AAF_AUTH("HTTPAAF"),
-	AUTH_KEY("HTTPAUTH");
-	
+
+	DME2("DME2"), AAF_AUTH("HTTPAAF"), AUTH_KEY("HTTPAUTH"), HTTPNOAUTH("HTTPNOAUTH");
+
 	private String value;
 
 	private ProtocolTypeConstants(String value) {
diff --git a/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java b/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java
index 5ae36d2..0e3ee5a 100644
--- a/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java
+++ b/src/main/java/com/att/nsa/mr/test/clients/SimpleExampleConsumer.java
@@ -33,57 +33,52 @@
 import com.att.nsa.mr.client.MRClientFactory;
 import com.att.nsa.mr.client.MRConsumer;
 
-public class SimpleExampleConsumer
-{
+public class SimpleExampleConsumer {
 
-	static FileWriter routeWriter= null;
-	static Properties props=null;	
-	static FileReader routeReader=null;
-	public static void main ( String[] args )
-	{
+	static FileWriter routeWriter = null;
+	static Properties props = null;
+	static FileReader routeReader = null;
+
+	public static void main(String[] args) {
 		final Logger LOG = LoggerFactory.getLogger(SimpleExampleConsumer.class);
-	
+
 		long count = 0;
 		long nextReport = 5000;
 
-		final long startMs = System.currentTimeMillis ();
-				
-		try
-		{
-			String routeFilePath="/src/main/resources/dme2/preferredRoute.txt";
-						        
-			
-			File fo= new File(routeFilePath);
-			if(!fo.exists()){
-					routeWriter=new FileWriter(new File (routeFilePath));
-			}	
-			routeReader= new FileReader(new File (routeFilePath));
-			props= new Properties();
-			final MRConsumer cc = MRClientFactory.createConsumer ( "/src/main/resources/dme2/consumer.properties" );
-			while ( true )
-			{
-				for ( String msg : cc.fetch () )
-				{
-					//System.out.println ( "" + (++count) + ": " + msg );
+		final long startMs = System.currentTimeMillis();
+
+		try {
+			String routeFilePath = "/src/main/resources/dme2/preferredRoute.txt";
+
+			File fo = new File(routeFilePath);
+			if (!fo.exists()) {
+				routeWriter = new FileWriter(new File(routeFilePath));
+			}
+			routeReader = new FileReader(new File(routeFilePath));
+			props = new Properties();
+			final MRConsumer cc = MRClientFactory.createConsumer("/src/main/resources/dme2/consumer.properties");
+			int i = 0;
+			while (i < 10) {
+				Thread.sleep(2);
+				i++;
+				for (String msg : cc.fetch()) {
+					// System.out.println ( "" + (++count) + ": " + msg );
 					System.out.println(msg);
 				}
-	
-				if ( count > nextReport )
-				{
+
+				if (count > nextReport) {
 					nextReport += 5000;
-	
-					final long endMs = System.currentTimeMillis ();
+
+					final long endMs = System.currentTimeMillis();
 					final long elapsedMs = endMs - startMs;
 					final double elapsedSec = elapsedMs / 1000.0;
 					final double eps = count / elapsedSec;
-					System.out.println ( "Consumed " + count + " in " + elapsedSec + "; " + eps + " eps" );
+					System.out.println("Consumed " + count + " in " + elapsedSec + "; " + eps + " eps");
 				}
 			}
-		}
-		catch ( Exception x )
-		{
-			System.err.println ( x.getClass().getName () + ": " + x.getMessage () );
-                    LOG.error("exception: ", x);
+		} catch (Exception x) {
+			System.err.println(x.getClass().getName() + ": " + x.getMessage());
+			LOG.error("exception: ", x);
 		}
 	}
 }
diff --git a/version.properties b/version.properties
index d6e413c..e1118ab 100644
--- a/version.properties
+++ b/version.properties
@@ -27,7 +27,7 @@
 
 major=1
 minor=1
-patch=0
+patch=1
 
 base_version=${major}.${minor}.${patch}