[DMAAP-48] Initial code import

Change-Id: I3e65371093487d7de167ec6c29f327f366f1e299
Signed-off-by: sg481n <sg481n@att.com>
diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/Delivery.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/Delivery.java
new file mode 100644
index 0000000..d0e88ec
--- /dev/null
+++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/Delivery.java
@@ -0,0 +1,253 @@
+/*******************************************************************************

+ * ============LICENSE_START==================================================

+ * * org.onap.dmaap

+ * * ===========================================================================

+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+ * * ===========================================================================

+ * * Licensed under the Apache License, Version 2.0 (the "License");

+ * * you may not use this file except in compliance with the License.

+ * * You may obtain a copy of the License at

+ * * 

+ *  *      http://www.apache.org/licenses/LICENSE-2.0

+ * * 

+ *  * Unless required by applicable law or agreed to in writing, software

+ * * distributed under the License is distributed on an "AS IS" BASIS,

+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * * See the License for the specific language governing permissions and

+ * * limitations under the License.

+ * * ============LICENSE_END====================================================

+ * *

+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+ * *

+ ******************************************************************************/

+

+package com.att.research.datarouter.node;

+

+import java.util.*;

+import java.io.*;

+import org.apache.log4j.Logger;

+

+/**

+ *	Main control point for delivering files to destinations.

+ *	<p>

+ *	The Delivery class manages assignment of delivery threads to delivery

+ *	queues and creation and destruction of delivery queues as

+ *	configuration changes.  DeliveryQueues are assigned threads based on a

+ *	modified round-robin approach giving priority to queues with more work

+ *	as measured by both bytes to deliver and files to deliver and lower

+ *	priority to queues that already have delivery threads working.

+ *	A delivery thread continues to work for a delivery queue as long as

+ *	that queue has more files to deliver.

+ */

+public class Delivery {

+	private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.Delivery");

+	private static class DelItem implements Comparable<DelItem>	{	// discard candidate: a publish ID and the spool directory holding it

+		private String pubid;

+		private String spool;

+		public DelItem(String pubid, String spool) {

+			this.pubid = pubid;

+			this.spool = spool;

+		}

+		public String getPublishId() {

+			return(pubid);

+		}

+		public String getSpool() {

+			return(spool);

+		}

+		public int compareTo(DelItem x) {	// order by publish ID first, spool directory second

+			int byid = pubid.compareTo(x.pubid);

+			if (byid != 0) {

+				return(byid);

+			}

+			return(spool.compareTo(x.spool));

+		}

+	}

+	private double	fdstart;

+	private double	fdstop;

+	private int	threads;

+	private int	curthreads;

+	private NodeConfigManager	config;

+	private Hashtable<String, DeliveryQueue>	dqs = new Hashtable<String, DeliveryQueue>();

+	private DeliveryQueue[]	queues = new DeliveryQueue[0];

+	private int	qpos = 0;

+	private long	nextcheck;

+	private Runnable	cmon = new Runnable() {

+		public void run() {

+			checkconfig();

+		}

+	};

+	/**

+	 *	Constructs a new Delivery system using the specified configuration manager.

+	 *	@param config	The configuration manager for this delivery system.

+	 */

+	public Delivery(NodeConfigManager config) {

+		this.config = config;

+		config.registerConfigTask(cmon);

+		checkconfig();

+	}

+	private void cleardir(String dir) {	// remove spool directory dir and the plain files in it, unless a delivery queue still uses it

+		File fdir = new File(dir);

+		File[] junkfiles = (dqs.get(dir) != null) ? null : fdir.listFiles();	// listFiles() itself returns null if dir is not a readable directory

+		if (junkfiles != null) {	// null means: owned by an active queue, or nothing that can be listed

+			for (File junk: junkfiles) {

+				if (junk.isFile()) {

+					junk.delete();

+				}

+			}

+			fdir.delete();	// has no effect while subdirectories remain

+		}

+	}

+	private void freeDiskCheck() {	// if usable spool space is below the fdstart fraction, discard spooled files (in publish ID order) until the fdstop fraction is reached

+		File spoolfile = new File(config.getSpoolBase());

+		long tspace = spoolfile.getTotalSpace();

+		long start = (long)(tspace * fdstart);	// "red" threshold in bytes: discarding starts below this

+		long stop = (long)(tspace * fdstop);	// "yellow" threshold in bytes: discarding stops at or above this

+		long cur = spoolfile.getUsableSpace();

+		if (cur >= start) {

+			return;

+		}

+		Vector<DelItem> cv = new Vector<DelItem>();

+		for (String sdir: dqs.keySet()) {

+			String[] metas = (new File(sdir)).list();	// null if the spool directory is missing or unreadable

+			for (String meta: (metas == null) ? new String[0] : metas) {

+				if (meta.matches("[0-9]{1,18}\\..*\\.M")) {	// <digits>.<rest>.M only, so the DeliveryTask constructor can always parse the publish ID

+					cv.add(new DelItem(meta.substring(0, meta.length() - 2), sdir));

+				}

+			}

+		}

+		DelItem[] items = cv.toArray(new DelItem[cv.size()]);

+		Arrays.sort(items);

+		logger.info("NODE0501 Free disk space below red threshold.  current=" + cur + " red=" + start + " total=" + tspace);

+		for (DelItem item: items) {

+			logger.info("NODE0502 Attempting to discard " + item.getSpool() + "/" + item.getPublishId() + " to free up disk");	// logged before the attempt it announces

+			long amount = dqs.get(item.getSpool()).cancelTask(item.getPublishId());	// 0 when the file could not be discarded

+			if (amount > 0) {

+				cur += amount;

+				if (cur >= stop) {

+					cur = spoolfile.getUsableSpace();	// confirm the running estimate against the file system

+				}

+				if (cur >= stop) {

+					logger.info("NODE0503 Free disk space at or above yellow threshold.  current=" + cur + " yellow=" + stop + " total=" + tspace);

+					return;

+				}

+			}

+		}

+		cur = spoolfile.getUsableSpace();

+		if (cur >= stop) {

+			logger.info("NODE0503 Free disk space at or above yellow threshold.  current=" + cur + " yellow=" + stop + " total=" + tspace);

+			return;

+		}

+		logger.warn("NODE0504 Unable to recover sufficient disk space to reach green status.  current=" + cur + " yellow=" + stop + " total=" + tspace);

+	}

+	private void cleardirs() {	// sweep <spoolbase>/n/* and <spoolbase>/s/*/* and hand every non-hidden entry to cleardir()

+		String basedir = config.getSpoolBase();

+		String nbase = basedir + "/n";

+		String[] nodedirs = (new File(nbase)).list();	// list() returns null if the directory is missing or unreadable

+		for (String nodedir: (nodedirs == null) ? new String[0] : nodedirs) {

+			if (!nodedir.startsWith(".")) {

+				cleardir(nbase + "/" + nodedir);

+			}

+		}

+		String sxbase = basedir + "/s";

+		String[] sxdirs = (new File(sxbase)).list();

+		for (String sxdir: (sxdirs == null) ? new String[0] : sxdirs) {

+			File sxf = new File(sxbase + "/" + sxdir);

+			String[] sdirs = sxdir.startsWith(".") ? null : sxf.list();	// null: hidden entry, or not a listable directory

+			for (String sdir: (sdirs == null) ? new String[0] : sdirs) {

+				if (!sdir.startsWith(".")) {

+					cleardir(sxbase + "/" + sxdir + "/" + sdir);

+				}

+			}

+			if (sdirs != null) { sxf.delete(); }	// won't if anything still in it

+		}

+	}

+	private synchronized void checkconfig() {	// (re)load settings, rebuild the queue table, clean unused spool directories and start missing delivery threads

+		if (!config.isConfigured()) {

+			return;

+		}

+		fdstart = config.getFreeDiskStart();

+		fdstop = config.getFreeDiskStop();

+		threads = config.getDeliveryThreads();

+		if (threads < 1) {	// always keep at least one delivery thread

+			threads = 1;

+		}

+		DestInfo[] alldis = config.getAllDests();

+		DeliveryQueue[] nqs = new DeliveryQueue[alldis.length];

+		qpos = 0;	// doubles as the fill index for nqs below

+		Hashtable<String, DeliveryQueue> ndqs = new Hashtable<String, DeliveryQueue>();

+		for (DestInfo di: alldis) {

+			String spl = di.getSpool();

+			DeliveryQueue dq = dqs.get(spl);	// keep the existing queue for a spool directory that is still configured

+			if (dq == null) {

+				dq = new DeliveryQueue(config, di);

+			} else {

+				dq.config(di);

+			}

+			ndqs.put(spl, dq);

+			nqs[qpos++] = dq;

+		}

+		queues = nqs;	// qpos now equals queues.length, so getNextQueue() hands out nothing until it resets qpos

+		dqs = ndqs;

+		cleardirs();	// cleardir() consults the new dqs, so only directories without a queue are removed

+		while (curthreads < threads) {	// surplus threads are not stopped here; they exit by themselves in getNextQueue()

+			curthreads++;

+			(new Thread() {

+				{

+					setName("Delivery Thread");

+				}

+				public void run() {

+					dodelivery();

+				}

+			}).start();

+		}

+		nextcheck = 0;	// makes the next pass through getNextQueue() start a fresh scan immediately

+		notify();	// wake one thread waiting in getNextQueue()

+	}

+	private void dodelivery() {

+		// Body of a delivery thread: work on queues until getNextQueue() hands back null

+		for (DeliveryQueue dq = getNextQueue(); dq != null; dq = getNextQueue()) {

+			dq.run();

+		}

+	}

+	private synchronized DeliveryQueue getNextQueue() {	// block until a queue has work and return it; return null to tell a surplus thread to exit

+		while (true) {

+			if (curthreads > threads) {	// more threads running than configured: retire this one

+				curthreads--;

+				return(null);

+			}

+			if (qpos < queues.length) {

+				DeliveryQueue dq = queues[qpos++];

+				if (dq.isSkipSet()) {	// nothing deliverable in this queue right now

+					continue;

+				}

+				nextcheck = 0;	// work was found, so allow an immediate rescan once this round is used up

+				notify();	// wake one waiting thread so it can look for work as well

+				return(dq);

+			}

+			long now = System.currentTimeMillis();

+			if (now < nextcheck) {

+				try {

+					wait(nextcheck + 500 - now);	// releases the monitor; ends on notify() or after the timeout

+				} catch (Exception e) {	// ignored (this includes InterruptedException); the loop simply re-evaluates

+				}

+				now = System.currentTimeMillis();

+			}

+			if (now >= nextcheck) {	// start a new round over all queues, at most every 5 seconds unless nextcheck is reset

+				nextcheck = now + 5000;

+				qpos = 0;

+				freeDiskCheck();

+			}

+		}

+	}

+	/**

+	 *	Reset the retry timer for a delivery queue

+	 */

+	public synchronized void resetQueue(String spool) {

+		if (spool != null) {

+			DeliveryQueue dq = dqs.get(spool);

+			if (dq != null) {

+				dq.resetQueue();

+			}

+		}

+	}

+}

diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueue.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueue.java
new file mode 100644
index 0000000..71c7797
--- /dev/null
+++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueue.java
@@ -0,0 +1,348 @@
+/*******************************************************************************

+ * ============LICENSE_START==================================================

+ * * org.onap.dmaap

+ * * ===========================================================================

+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+ * * ===========================================================================

+ * * Licensed under the Apache License, Version 2.0 (the "License");

+ * * you may not use this file except in compliance with the License.

+ * * You may obtain a copy of the License at

+ * * 

+ *  *      http://www.apache.org/licenses/LICENSE-2.0

+ * * 

+ *  * Unless required by applicable law or agreed to in writing, software

+ * * distributed under the License is distributed on an "AS IS" BASIS,

+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * * See the License for the specific language governing permissions and

+ * * limitations under the License.

+ * * ============LICENSE_END====================================================

+ * *

+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+ * *

+ ******************************************************************************/

+

+

+package com.att.research.datarouter.node;

+

+import java.io.*;

+import java.util.*;

+

+/**

+ *	Mechanism for monitoring and controlling delivery of files to a destination.

+ *	<p>

+ *	The DeliveryQueue class maintains lists of DeliveryTasks for a single

+ *	destination (a subscription or another data router node) and assigns

+ *	delivery threads to try to deliver them.  It also maintains a delivery

+ *	status that causes it to back off on delivery attempts after a failure.

+ *	<p>

+ *	If the most recent delivery result was a failure, then no more attempts

+ *	will be made for a period of time.  Initially, and on the first failure

+ *	following a success, this delay will be DeliveryQueueHelper.getInitFailureTimer() (milliseconds).

+ *	If, after this delay, additional failures occur, each failure will

+ *	multiply the delay by DeliveryQueueHelper.getFailureBackoff() up to a

+ *	maximum delay specified by DeliveryQueueHelper.getMaxFailureTimer().

+ *	Note that this behavior applies to the delivery queue as a whole and not

+ *	to individual files in the queue.  If multiple files are being

+ *	delivered and one fails, the delay will be started.  If a second

+ *	delivery fails while the delay was active, it will not change the delay

+ *	or change the duration of any subsequent delay.

+ *	If, however, it succeeds, it will cancel the delay.

+ *	<p>

+ *	The queue maintains 3 collections of files to deliver: A todo list of

+ *	files that will be attempted, a working set of files that are being

+ *	attempted, and a retry set of files that were attempted and failed.

+ *	Whenever the todo list is empty and needs to be refilled, a scan of the

+ *	spool directory is made and the file names sorted.  Any files in the working set are ignored.

+ *	If a DeliveryTask for the file is in the retry set, then that delivery

+ *	task is placed on the todo list.  Otherwise, a new DeliveryTask for the

+ *	file is created and placed on the todo list.

+ *	If, when a DeliveryTask is about to be removed from the todo list, its

+ *	age exceeds DeliveryQueueHelper.getExpirationTimer(), then it is instead

+ *	marked as expired.

+ *	<p>

+ *	A delivery queue also maintains a skip flag.  This flag is true if the

+ *	failure timer is active or if no files are found in a directory scan.

+ */

+public class DeliveryQueue implements Runnable, DeliveryTaskHelper	{

+	private DeliveryQueueHelper	dqh;

+	private DestInfo	di;

+	private Hashtable<String, DeliveryTask>	working = new Hashtable<String, DeliveryTask>();

+	private Hashtable<String, DeliveryTask> retry = new Hashtable<String, DeliveryTask>();

+	private int	todoindex;

+	private boolean	failed;

+	private long	failduration;

+	private long	resumetime;

+	File	dir;

+	private Vector<DeliveryTask> todo = new Vector<DeliveryTask>();

+	/**

+	 *	Try to cancel the delivery task with publish ID pubid: unless it is currently being worked on, it is logged as expired ("diskFull") and its files are removed.

+	 *	@return	The length of the task in bytes or 0 if the task cannot be cancelled.

+	 */

+	public synchronized long cancelTask(String pubid) {

+		if (working.get(pubid) != null) {	// a delivery attempt is in progress: leave it alone

+			return(0);

+		}

+		DeliveryTask dt = retry.get(pubid);

+		if (dt == null) {

+			for (int i = todoindex; i < todo.size(); i++) {	// search only the part of the todo list not yet handed out

+				DeliveryTask xdt = todo.get(i);

+				if (xdt.getPublishId().equals(pubid)) {

+					dt = xdt;

+					break;

+				}

+			}

+		}

+		if (dt == null) {	// not known to this queue yet: build a task from the spool files

+			dt = new DeliveryTask(this, pubid);

+			if (dt.getFileId() == null) {	// file ID stays null when the meta file could not be read

+				return(0);

+			}

+		}

+		if (dt.isCleaned()) {	// files were already removed

+			return(0);

+		}

+		StatusLog.logExp(dt.getPublishId(), dt.getFeedId(), dt.getSubId(), dt.getURL(), dt.getMethod(), dt.getCType(), dt.getLength(), "diskFull", dt.getAttempts());

+		dt.clean();

+		return(dt.getLength());

+	}

+	/**

+	 *	Mark that a delivery task has succeeded.

+	 */

+	public synchronized void markSuccess(DeliveryTask task) {

+		working.remove(task.getPublishId());

+		task.clean();

+		failed = false;

+		failduration = 0;

+	}

+	/**

+	 *	Mark that a delivery task has expired.

+	 */

+	public synchronized void markExpired(DeliveryTask task) {

+		task.clean();

+	}

+	/**

+	 *	Mark that a delivery task has failed permanently.

+	 */

+	public synchronized void markFailNoRetry(DeliveryTask task) {

+		working.remove(task.getPublishId());

+		task.clean();

+		failed = false;

+		failduration = 0;

+	}

+	private void fdupdate() {	// start the failure timer unless one is already running

+		if (failed) {

+			return;

+		}

+		failed = true;

+		if (failduration == 0) {

+			failduration = dqh.getInitFailureTimer();

+		}

+		resumetime = System.currentTimeMillis() + failduration;

+		// Pre-compute the delay for the next failure: current delay times the back-off factor, capped at the maximum

+		long ceiling = dqh.getMaxFailureTimer();

+		long grown = (long)(failduration * dqh.getFailureBackoff());

+		failduration = (grown > ceiling) ? ceiling : grown;

+	}

+	/**

+	 *	Mark that a delivery task has been redirected.

+	 */

+	public synchronized void markRedirect(DeliveryTask task) {

+		working.remove(task.getPublishId());

+		retry.put(task.getPublishId(), task);

+	}

+	/**

+	 *	Mark that a delivery task has temporarily failed.

+	 */

+	public synchronized void markFailWithRetry(DeliveryTask task) {

+		working.remove(task.getPublishId());

+		retry.put(task.getPublishId(), task);

+		fdupdate();

+	}

+	/**

+	 *	Get the next task.

+	 */

+	public synchronized DeliveryTask getNext() {

+		DeliveryTask ret = peekNext();

+		if (ret != null) {

+			todoindex++;

+			working.put(ret.getPublishId(), ret);

+		}

+		return(ret);

+	}

+	/**

+	 *	Peek at the next task.

+	 */

+	public synchronized DeliveryTask peekNext() {

+		long now = System.currentTimeMillis();

+		long mindate = now - dqh.getExpirationTimer();

+		if (failed) {

+			if (now > resumetime) {

+				failed = false;

+			} else {

+				return(null);

+			}

+		}

+		while (true) {

+			if (todoindex >= todo.size()) {

+				todoindex = 0;

+				todo = new Vector<DeliveryTask>();

+				String[] files = dir.list();

+				Arrays.sort(files);

+				for (String fname: files) {

+					if (!fname.endsWith(".M")) {

+						continue;

+					}

+					String fname2 = fname.substring(0, fname.length() - 2);

+					long pidtime = 0;

+					int dot = fname2.indexOf('.');

+					if (dot < 1) {

+						continue;

+					}

+					try {

+						pidtime = Long.parseLong(fname2.substring(0, dot));

+					} catch (Exception e) {

+					}

+					if (pidtime < 1000000000000L) {

+						continue;

+					}

+					if (working.get(fname2) != null) {

+						continue;

+					}

+					DeliveryTask dt = retry.get(fname2);

+					if (dt == null) {

+						dt = new DeliveryTask(this, fname2);

+					}

+					todo.add(dt);

+				}

+				retry = new Hashtable<String, DeliveryTask>();

+			}

+			if (todoindex < todo.size()) {

+				DeliveryTask dt = todo.get(todoindex);

+				if (dt.isCleaned()) {

+					todoindex++;

+					continue;

+				}

+				if (dt.getDate() >= mindate) {

+					return(dt);

+				}

+				todoindex++;

+				reportExpiry(dt);

+				continue;

+			}

+			return(null);

+		}

+	}

+	/**

+	 *	Create a delivery queue for a given destination info

+	 */

+	public DeliveryQueue(DeliveryQueueHelper dqh, DestInfo di) {

+		this.dqh = dqh;

+		this.di = di;

+		dir = new File(di.getSpool());

+		dir.mkdirs();

+	}

+	/**

+	 *	Update the destination info for this delivery queue

+	 */

+	public void config(DestInfo di) {

+		this.di = di;

+	}

+	/**

+	 *	Get the dest info

+	 */

+	public DestInfo getDestInfo() {

+		return(di);

+	}

+	/**

+	 *	Get the config manager

+	 */

+	public DeliveryQueueHelper getConfig() {

+		return(dqh);

+	}

+	/**

+	 *	Exceptional condition occurred during delivery

+	 */

+	public void reportDeliveryExtra(DeliveryTask task, long sent) {

+		StatusLog.logDelExtra(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getLength(), sent);

+	}

+	/**

+	 *	Message too old to deliver

+	 */

+	public void reportExpiry(DeliveryTask task) {

+		StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "retriesExhausted", task.getAttempts());

+		markExpired(task);

+	}

+	/**

+	 *	Completed a delivery attempt

+	 */

+	public void reportStatus(DeliveryTask task, int status, String xpubid, String location) {

+		if (status < 300) {

+			StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, xpubid);

+			markSuccess(task);

+		} else if (status < 400 && dqh.isFollowRedirects()) {

+			StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location);

+			if (dqh.handleRedirection(di, location, task.getFileId())) {

+				markRedirect(task);

+			} else {

+				StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());

+				markFailNoRetry(task);

+			}

+		} else if (status < 500) {

+			StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location);

+			StatusLog.logExp(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), "notRetryable", task.getAttempts());

+			markFailNoRetry(task);

+		} else {

+			StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), status, location);

+			markFailWithRetry(task);

+		}

+	}

+	/**

+	 *	Delivery failed by reason of an exception

+	 */

+	public void reportException(DeliveryTask task, Exception exception) {

+		StatusLog.logDel(task.getPublishId(), task.getFeedId(), task.getSubId(), task.getURL(), task.getMethod(), task.getCType(), task.getLength(), di.getAuthUser(), -1, exception.toString());

+		dqh.handleUnreachable(di);

+		markFailWithRetry(task);

+	}

+	/**

+	 *	Get the feed ID for a subscription

+	 *	@param subid	The subscription ID

+	 *	@return	The feed ID

+	 */

+	public String getFeedId(String subid) {

+		return(dqh.getFeedId(subid));

+	}

+	/**

+	 *	Get the URL to deliver a message to given the file ID

+	 */

+	public String getDestURL(String fileid) {

+		return(dqh.getDestURL(di, fileid));

+	}

+	/**

+	 *	Deliver files until getNext() has nothing more to offer or the fair-share

+	 *	file count or time limit obtained from the helper has been used up

+	 */

+	public void run() {

+		long deadline = System.currentTimeMillis() + dqh.getFairTimeLimit();

+		int remaining = dqh.getFairFileLimit();

+		// getNext() is only called again when neither limit has been hit

+		for (DeliveryTask task = getNext(); task != null; task = getNext()) {

+			task.run();

+			remaining--;

+			if (remaining <= 0 || System.currentTimeMillis() > deadline) {

+				break;

+			}

+		}

+	}

+	/**

+	 *	Is there no work to do for this queue right now?

+	 */

+	public synchronized boolean isSkipSet() {

+		return(peekNext() == null);

+	}

+	/**

+	 *	Reset the retry timer so that a running failure delay ends now; synchronized because resumetime is otherwise only read and written under the queue lock

+	 */

+	public synchronized void resetQueue() {

+		resumetime = System.currentTimeMillis();

+	}

+}

diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueueHelper.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueueHelper.java
new file mode 100644
index 0000000..770db1d
--- /dev/null
+++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryQueueHelper.java
@@ -0,0 +1,89 @@
+/*******************************************************************************

+ * ============LICENSE_START==================================================

+ * * org.onap.dmaap

+ * * ===========================================================================

+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+ * * ===========================================================================

+ * * Licensed under the Apache License, Version 2.0 (the "License");

+ * * you may not use this file except in compliance with the License.

+ * * You may obtain a copy of the License at

+ * * 

+ *  *      http://www.apache.org/licenses/LICENSE-2.0

+ * * 

+ *  * Unless required by applicable law or agreed to in writing, software

+ * * distributed under the License is distributed on an "AS IS" BASIS,

+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * * See the License for the specific language governing permissions and

+ * * limitations under the License.

+ * * ============LICENSE_END====================================================

+ * *

+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+ * *

+ ******************************************************************************/

+

+

+package com.att.research.datarouter.node;

+

+/**

+ *	Interface to allow independent testing of the DeliveryQueue code

+ *	<p>

+ *	This interface represents all of the configuration information and

+ *	feedback mechanisms that a delivery queue needs.

+ */

+public interface	DeliveryQueueHelper	{

+	/**

+	 *	Get the timeout (milliseconds) before retrying after an initial delivery failure

+	 */

+	public long getInitFailureTimer();

+	/**

+	 *	Get the ratio between timeouts on consecutive delivery attempts

+	 */

+	public double	getFailureBackoff();

+	/**

+	 *	Get the maximum timeout (milliseconds) between delivery attempts

+	 */

+	public long	getMaxFailureTimer();

+	/**

+	 *	Get the expiration timer (milliseconds) for deliveries

+	 */

+	public long	getExpirationTimer();

+	/**

+	 *	Get the maximum number of file delivery attempts before checking

+	 *	if another queue has work to be performed.

+	 */

+	public int getFairFileLimit();

+	/**

+	 *	Get the maximum amount of time spent delivering files before checking if another queue has work to be performed.

+	 */

+	public long getFairTimeLimit();

+	/**

+	 *	Get the URL for delivering a file

+	 *	@param dest	The destination information for the file to be delivered.

+	 *	@param fileid	The file id for the file to be delivered.

+	 *	@return	The URL for delivering the file (typically, dest.getURL() + "/" + fileid).

+	 */

+	public String	getDestURL(DestInfo dest, String fileid);

+	/**

+	 *	Forget redirections associated with a subscriber

+	 *	@param	dest	Destination information to forget

+	 */

+	public void	handleUnreachable(DestInfo dest);

+	/**

+	 *	Post redirection for a subscriber

+	 *	@param	dest	Destination information to update

+	 *	@param	location	Location given by subscriber

+	 *	@param	fileid	File ID of request

+	 *	@return	true if this 3xx response is retryable, otherwise, false.

+	 */

+	public boolean	handleRedirection(DestInfo dest, String location, String fileid);

+	/**

+	 *	Should I handle 3xx responses differently than 4xx responses?

+	 */

+	public boolean	isFollowRedirects();

+	/**

+	 *	Get the feed ID for a subscription

+	 *	@param subid	The subscription ID

+	 *	@return	The feed ID

+	 */

+	public String getFeedId(String subid);

+}

diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTask.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTask.java
new file mode 100644
index 0000000..3d72a41
--- /dev/null
+++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTask.java
@@ -0,0 +1,308 @@
+/*******************************************************************************

+ * ============LICENSE_START==================================================

+ * * org.onap.dmaap

+ * * ===========================================================================

+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+ * * ===========================================================================

+ * * Licensed under the Apache License, Version 2.0 (the "License");

+ * * you may not use this file except in compliance with the License.

+ * * You may obtain a copy of the License at

+ * * 

+ *  *      http://www.apache.org/licenses/LICENSE-2.0

+ * * 

+ *  * Unless required by applicable law or agreed to in writing, software

+ * * distributed under the License is distributed on an "AS IS" BASIS,

+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * * See the License for the specific language governing permissions and

+ * * limitations under the License.

+ * * ============LICENSE_END====================================================

+ * *

+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+ * *

+ ******************************************************************************/

+

+

+package com.att.research.datarouter.node;

+

+import java.io.*;

+import java.net.*;

+import java.util.*;

+import org.apache.log4j.Logger;

+

+/**

+ *	A file to be delivered to a destination.

+ *	<p>

+ *	A Delivery task represents a work item for the data router - a file that

+ *	needs to be delivered and provides mechanisms to get information about

+ *	the file and its delivery data as well as to attempt delivery.

+ */

+public class DeliveryTask implements Runnable, Comparable<DeliveryTask>	{

+	private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.DeliveryTask");

+	private DeliveryTaskHelper	dth;

+	private String	pubid;

+	private	DestInfo	di;

+	private String	spool;

+	private File	datafile;

+	private File	metafile;

+	private long	length;

+	private long	date;

+	private String	method;

+	private String	fileid;

+	private String	ctype;

+	private String	url;

+	private String	feedid;

+	private String	subid;

+	private int	attempts;

+	private String[][]	hdrs;

+	/**

+	 *	Is the object a DeliveryTask with the same publication ID?

+	 */

+	public boolean equals(Object o) {

+		if (!(o instanceof DeliveryTask)) {

+			return(false);

+		}

+		return(pubid.equals(((DeliveryTask)o).pubid));

+	}

+	/**

+	 *	Compare the publication IDs.

+	 */

+	public int compareTo(DeliveryTask o) {

+		return(pubid.compareTo(o.pubid));

+	}

+	/**

+	 *	Get the hash code of the publication ID.

+	 */

+	public int hashCode() {

+		return(pubid.hashCode());

+	}

+	/**

+	 *	Return the publication ID.

+	 */

+	public String toString() {

+		return(pubid);

+	}

+	/**

+	 *	Create a delivery task for a given delivery queue and pub ID.

+	 *	The meta file spool/pubid.M is read: its first line is method TAB file ID, each further line is header TAB value.

+	 *	@param	dth	The delivery task helper for the queue this task is in.

+	 *	@param	pubid	The publish ID for this file.  This is used as

+	 *	the base for the file name in the spool directory and is of

+	 *	the form &lt;milliseconds since 1970&gt;.&lt;fqdn of initial data router node&gt;

+	 */

+	public DeliveryTask(DeliveryTaskHelper dth, String pubid) {

+		this.dth = dth;

+		this.pubid = pubid;

+		di = dth.getDestInfo();

+		subid = di.getSubId();

+		feedid = di.getLogData();

+		spool = di.getSpool();

+		String dfn = spool + "/" + pubid;

+		String mfn = dfn + ".M";

+		datafile = new File(dfn);

+		metafile = new File(mfn);

+		boolean monly = di.isMetaDataOnly();

+		date = Long.parseLong(pubid.substring(0, pubid.indexOf('.')));

+		Vector<String[]> hdrv = new Vector<String[]>();

+		try (BufferedReader br = new BufferedReader(new FileReader(metafile))) {	// closed even when parsing fails part way

+			String s = br.readLine();

+			int i = s.indexOf('\t');

+			method = s.substring(0, i);

+			if (!"DELETE".equals(method) && !monly) {

+				length = datafile.length();

+			}

+			fileid = s.substring(i + 1);

+			while ((s = br.readLine()) != null) {

+				i = s.indexOf('\t');

+				String h = s.substring(0, i);

+				String v = s.substring(i + 1);

+				if ("x-att-dr-routing".equalsIgnoreCase(h)) {

+					subid = v.replaceAll("[^ ]*/", "");

+					feedid = dth.getFeedId(subid.replaceAll(" .*", ""));

+				}

+				if (length == 0 && h.toLowerCase().startsWith("content-")) {	// no body will be sent, so drop Content-* headers

+					continue;

+				}

+				if (h.equalsIgnoreCase("content-type")) {

+					ctype = v;

+				}

+				hdrv.add(new String[] {h, v});

+			}

+		} catch (Exception e) {

+			// Best effort: a missing or malformed meta file leaves the fields read so far; callers detect it via getFileId() == null

+		}

+		hdrs = hdrv.toArray(new String[hdrv.size()][]);

+		url = dth.getDestURL(fileid);

+	}

+	/**

+	 *	Get the publish ID

+	 */

+	public String getPublishId() {

+		return(pubid);

+	}

+	/**

+	 *	Attempt delivery: send the request (and the data file, if there is a body) to the destination URL,

+	 *	then report the outcome through reportStatus() or, on any exception, through reportException().

+	 */

+	public void run() {

+		attempts++;

+		try {

+			di = dth.getDestInfo();

+			boolean expect100 = di.isUsing100();

+			boolean monly = di.isMetaDataOnly();

+			length = 0;

+			if (!"DELETE".equals(method) && !monly) {

+				length = datafile.length();

+			}

+			url = dth.getDestURL(fileid);

+			URL u = new URL(url);

+			HttpURLConnection uc = (HttpURLConnection)u.openConnection();

+			uc.setConnectTimeout(60000);

+			uc.setReadTimeout(60000);

+			uc.setInstanceFollowRedirects(false);

+			uc.setRequestMethod(method);

+			uc.setRequestProperty("Content-Length", Long.toString(length));

+			uc.setRequestProperty("Authorization", di.getAuth());

+			uc.setRequestProperty("X-ATT-DR-PUBLISH-ID", pubid);

+			for (String[] nv: hdrs) {

+				uc.addRequestProperty(nv[0], nv[1]);

+			}

+			if (length > 0) {

+				if (expect100) {

+					uc.setRequestProperty("Expect", "100-continue");

+				}

+				uc.setFixedLengthStreamingMode(length);

+				uc.setDoOutput(true);

+				OutputStream os = null;

+				try {

+					os = uc.getOutputStream();

+				} catch (ProtocolException pe) {

+					dth.reportDeliveryExtra(this, -1L);

+					// Rcvd error instead of 100-continue

+				}

+				if (os != null) {

+					long sofar = 0;

+					// try-with-resources closes the data file even when the upload fails part way

+					try (InputStream is = new FileInputStream(datafile)) {

+						byte[] buf = new byte[1024 * 1024];

+						while (sofar < length) {

+							int i = buf.length;

+							if (sofar + i > length) {

+								i = (int)(length - sofar);

+							}

+							i = is.read(buf, 0, i);

+							if (i <= 0) {

+								throw new IOException("Unexpected problem reading data file " + datafile);

+							}

+							sofar += i;

+							os.write(buf, 0, i);

+						}

+						os.close();

+					} catch (IOException ioe) {

+						dth.reportDeliveryExtra(this, sofar);

+						throw ioe;

+					}

+				}

+			}

+			int rc = uc.getResponseCode();

+			String rmsg = uc.getResponseMessage();

+			if (rmsg == null) {	// fall back to the text after the status code in the status line

+				String h0 = uc.getHeaderField(0);

+				if (h0 != null) {

+					int i = h0.indexOf(' ');

+					int j = h0.indexOf(' ', i + 1);

+					if (i != -1 && j != -1) {

+						rmsg = h0.substring(j + 1);

+					}

+				}

+			}

+			String xpubid = null;

+			InputStream is;

+			if (rc >= 200 && rc <= 299) {

+				is = uc.getInputStream();

+				xpubid = uc.getHeaderField("X-ATT-DR-PUBLISH-ID");

+			} else {

+				if (rc >= 300 && rc <= 399) {	// for redirects the Location header is reported instead of the message

+					rmsg = uc.getHeaderField("Location");

+				}

+				is = uc.getErrorStream();

+			}

+			byte[] buf = new byte[4096];

+			if (is != null) {

+				while (is.read(buf) > 0) {	// read and discard the response body

+				}

+				is.close();

+			}

+			dth.reportStatus(this, rc, xpubid, rmsg);

+		} catch (Exception e) {

+			dth.reportException(this, e);

+		}

+	}

+	/**

+	 *	Remove meta and data files

+	 */

+	public void clean() {

+		datafile.delete();

+		metafile.delete();

+		hdrs = null;

+	}

+	/**

+	 *	Has this delivery task been cleaned?

+	 */

+	public boolean isCleaned() {

+		return(hdrs == null);

+	}

+	/**

+	 *	Get length of body

+	 */

+	public long	getLength() {

+		return(length);

+	}

+	/**

+	 *	Get creation date as encoded in the publish ID.

+	 */

+	public long	getDate() {

+		return(date);

+	}

+	/**

+	 *	Get the most recent delivery attempt URL

+	 */

+	public String getURL() {

+		return(url);

+	}

+	/**

+	 *	Get the content type

+	 */

+	public String	getCType() {

+		return(ctype);

+	}

+	/**

+	 *	Get the method

+	 */

+	public String	getMethod() {

+		return(method);

+	}

+	/**

+	 *	Get the file ID

+	 */

+	public String	getFileId() {

+		return(fileid);

+	}

+	/**

+	 *	Get the number of delivery attempts

+	 */

+	public int	getAttempts() {

+		return(attempts);

+	}

+	/**

+	 *	Get the (space delimited list of) subscription ID for this delivery task

+	 */

+	public String	getSubId() {

+		return(subid);

+	}

+	/**

+	 *	Get the feed ID for this delivery task

+	 */

+	public String	getFeedId() {

+		return(feedid);

+	}

+}

diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTaskHelper.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTaskHelper.java
new file mode 100644
index 0000000..702bb29
--- /dev/null
+++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/DeliveryTaskHelper.java
@@ -0,0 +1,72 @@
+/*******************************************************************************

+ * ============LICENSE_START==================================================

+ * * org.onap.dmaap

+ * * ===========================================================================

+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+ * * ===========================================================================

+ * * Licensed under the Apache License, Version 2.0 (the "License");

+ * * you may not use this file except in compliance with the License.

+ * * You may obtain a copy of the License at

+ * * 

+ *  *      http://www.apache.org/licenses/LICENSE-2.0

+ * * 

+ *  * Unless required by applicable law or agreed to in writing, software

+ * * distributed under the License is distributed on an "AS IS" BASIS,

+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * * See the License for the specific language governing permissions and

+ * * limitations under the License.

+ * * ============LICENSE_END====================================================

+ * *

+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+ * *

+ ******************************************************************************/

+

+

+package com.att.research.datarouter.node;

+

+/**

+ *	Interface to allow independent testing of the DeliveryTask code.

+ *	<p>

+ *	This interface represents all the configuration information and

+ *	feedback mechanisms that a delivery task needs.

+ */

+

+public interface DeliveryTaskHelper {

+	/**

+	 *	Called when a delivery attempt could not be completed because an

+	 *	exception was raised (for example, the remote host could not be reached).

+	 *	@param task	The task whose attempt failed

+	 *	@param exception	The exception that was raised

+	 */

+	void reportException(DeliveryTask task, Exception exception);

+	/**

+	 *	Called when a delivery attempt ran to completion and an HTTP status

+	 *	was obtained, whether or not that status indicates success.

+	 *	@param task	The task whose attempt completed

+	 *	@param status	The HTTP status

+	 *	@param xpubid	The publish ID reported by the far end, if there was one

+	 *	@param location	The redirection target when the response was a 3XX

+	 */

+	void reportStatus(DeliveryTask task, int status, String xpubid, String location);

+	/**

+	 *	Called when a delivery attempt failed part way through sending the data,

+	 *	or when an error came back in place of a 100 Continue.

+	 *	@param task	The task whose attempt failed

+	 *	@param sent	How many bytes were sent, or -1 when an error was returned instead of 100 Continue.

+	 */

+	void reportDeliveryExtra(DeliveryTask task, long sent);

+	/**

+	 *	Look up the destination information for the delivery queue.

+	 *	@return	The destination information

+	 */

+	DestInfo getDestInfo();

+	/**

+	 *	Map a file ID to the URL it should be delivered to.

+	 *	@param fileid	The file ID

+	 *	@return	The URL to deliver to

+	 */

+	String getDestURL(String fileid);

+	/**

+	 *	Look up the feed ID that a subscription belongs to.

+	 *	@param subid	The subscription ID

+	 *	@return	The feed ID

+	 */

+	String getFeedId(String subid);

+}

diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/DestInfo.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/DestInfo.java
new file mode 100644
index 0000000..e57fef8
--- /dev/null
+++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/DestInfo.java
@@ -0,0 +1,132 @@
+/*******************************************************************************

+ * ============LICENSE_START==================================================

+ * * org.onap.dmaap

+ * * ===========================================================================

+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+ * * ===========================================================================

+ * * Licensed under the Apache License, Version 2.0 (the "License");

+ * * you may not use this file except in compliance with the License.

+ * * You may obtain a copy of the License at

+ * * 

+ *  *      http://www.apache.org/licenses/LICENSE-2.0

+ * * 

+ *  * Unless required by applicable law or agreed to in writing, software

+ * * distributed under the License is distributed on an "AS IS" BASIS,

+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * * See the License for the specific language governing permissions and

+ * * limitations under the License.

+ * * ============LICENSE_END====================================================

+ * *

+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+ * *

+ ******************************************************************************/

+

+

+package com.att.research.datarouter.node;

+

+/**

+ *	Information for a delivery destination that doesn't change from message to message

+ */

+public class DestInfo	{

+	private final String	name;

+	private final String	spool;

+	private final String	subid;

+	private final String	logdata;

+	private final String	url;

+	private final String	authuser;

+	private final String	authentication;

+	private final boolean	metaonly;

+	private final boolean	use100;

+	/**

+	 *	Create a destination information object.  The object is immutable.

+	 *	@param	name	n:fqdn or s:subid

+	 *	@param	spool	The directory where files are spooled.  This is the only field used by {@link #equals} and {@link #hashCode}.

+	 *	@param	subid	The subscription ID (if applicable).

+	 *	@param	logdata	Text to be included in log messages

+	 *	@param	url	The URL to deliver to.

+	 *	@param	authuser	The auth user for logging.

+	 *	@param	authentication	The credentials.

+	 *	@param	metaonly	Is this a metadata only delivery?

+	 *	@param	use100	Should I use expect 100-continue?

+	 */

+	public DestInfo(String name, String spool, String subid, String logdata, String url, String authuser, String authentication, boolean metaonly, boolean use100) {

+		this.name = name;

+		this.spool = spool;

+		this.subid = subid;

+		this.logdata = logdata;

+		this.url = url;

+		this.authuser = authuser;

+		this.authentication = authentication;

+		this.metaonly = metaonly;

+		this.use100 = use100;

+	}

+	/**

+	 *	Two destinations are equal when they use the same spool directory.

+	 *	A null spool directory only equals another null spool directory.

+	 *	@param o	The object to compare with

+	 *	@return	True if o is a DestInfo with the same spool directory

+	 */

+	@Override

+	public boolean equals(Object o) {

+		if (!(o instanceof DestInfo)) {

+			return(false);

+		}

+		// Null-safe comparison: a missing spool directory must not throw.

+		return(java.util.Objects.equals(spool, ((DestInfo)o).spool));

+	}

+	/**

+	 *	Hash code derived from the spool directory only, consistent with {@link #equals}.

+	 *	@return	The hash code of the spool directory, or 0 if it is null

+	 */

+	@Override

+	public int hashCode() {

+		return(java.util.Objects.hashCode(spool));

+	}

+	/**

+	 *	Get the name of this destination

+	 *	@return	The name given at construction (n:fqdn or s:subid)

+	 */

+	public String getName() {

+		return(name);

+	}

+	/**

+	 *	Get the spool directory for this destination.

+	 *	@return	The spool directory

+	 */

+	public String getSpool() {

+		return(spool);

+	}

+	/**

+	 *	Get the subscription ID.

+	 *	@return	Subscription ID or null if this is a node to node delivery.

+	 */

+	public String getSubId() {

+		return(subid);

+	}

+	/**

+	 *	Get the log data.

+	 *	@return	Text to be included in a log message about delivery attempts.

+	 */

+	public String getLogData() {

+		return(logdata);

+	}

+	/**

+	 *	Get the delivery URL.

+	 *	@return	The URL to deliver to (the primary URL).

+	 */

+	public String getURL() {

+		return(url);

+	}

+	/**

+	 *	Get the user for authentication

+	 *	@return	The name of the user for logging

+	 */

+	public String	getAuthUser() {

+		return(authuser);

+	}

+	/**

+	 *	Get the authentication header

+	 *	@return	The string to use to authenticate to the recipient.

+	 */

+	public String getAuth() {

+		return(authentication);

+	}

+	/**

+	 *	Is this a metadata only delivery?

+	 *	@return	True if this is a metadata only delivery

+	 */

+	public boolean	isMetaDataOnly() {

+		return(metaonly);

+	}

+	/**

+	 *	Should I send expect 100-continue header?

+	 *	@return	True if I should.

+	 */

+	public boolean isUsing100() {

+		return(use100);

+	}

+}

diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/IsFrom.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/IsFrom.java
new file mode 100644
index 0000000..bb3e413
--- /dev/null
+++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/IsFrom.java
@@ -0,0 +1,82 @@
+/*******************************************************************************

+ * ============LICENSE_START==================================================

+ * * org.onap.dmaap

+ * * ===========================================================================

+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+ * * ===========================================================================

+ * * Licensed under the Apache License, Version 2.0 (the "License");

+ * * you may not use this file except in compliance with the License.

+ * * You may obtain a copy of the License at

+ * * 

+ *  *      http://www.apache.org/licenses/LICENSE-2.0

+ * * 

+ *  * Unless required by applicable law or agreed to in writing, software

+ * * distributed under the License is distributed on an "AS IS" BASIS,

+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * * See the License for the specific language governing permissions and

+ * * limitations under the License.

+ * * ============LICENSE_END====================================================

+ * *

+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+ * *

+ ******************************************************************************/

+

+

+package com.att.research.datarouter.node;

+

+import java.util.*;

+import java.net.*;

+

+/**

+ *	Determine if an IP address is from a machine

+ */

+public class IsFrom {

+	private long nextcheck;

+	private String[] ips;

+	private String fqdn;

+	/**

+	 *	Set the JVM security property "networkaddress.cache.ttl" to 10 (seconds).

+	 *	Call this very early in startup, otherwise it has no effect.

+	 */

+	public static void setDNSCache() {

+		java.security.Security.setProperty("networkaddress.cache.ttl", "10");

+	}

+	/**

+	 *	Create an IsFrom for the given fully qualified domain name.

+	 *	No lookup is done until the first call to isFrom.

+	 */

+	public IsFrom(String fqdn) {

+		this.fqdn = fqdn;

+	}

+	/**

+	 *	Decide whether an IP address belongs to this FQDN.  The address

+	 *	list of the FQDN is looked up again whenever more than 10 seconds

+	 *	have passed since the previous lookup.

+	 */

+	public synchronized boolean isFrom(String ip) {

+		long now = System.currentTimeMillis();

+		if (now > nextcheck) {

+			// Arm the next refresh before resolving, then replace the cached list.

+			nextcheck = now + 10000;

+			ips = lookup();

+		}

+		return Arrays.asList(ips).contains(ip);

+	}

+	/**

+	 *	Resolve the FQDN to the textual form of each of its addresses.

+	 *	A failed lookup yields whatever was collected so far (normally nothing).

+	 */

+	private String[] lookup() {

+		List<String> found = new ArrayList<String>();

+		try {

+			for (InetAddress addr : InetAddress.getAllByName(fqdn)) {

+				found.add(addr.getHostAddress());

+			}

+		} catch (Exception ignored) {

+			// Best effort: on failure no address matches until the next refresh.

+		}

+		return found.toArray(new String[found.size()]);

+	}

+	/**

+	 *	Return the fully qualified domain name

+	 */

+	public String toString() {

+		return fqdn;

+	}

+}

diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/LogManager.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/LogManager.java
new file mode 100644
index 0000000..078deaa
--- /dev/null
+++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/LogManager.java
@@ -0,0 +1,159 @@
+/*******************************************************************************

+ * ============LICENSE_START==================================================

+ * * org.onap.dmaap

+ * * ===========================================================================

+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+ * * ===========================================================================

+ * * Licensed under the Apache License, Version 2.0 (the "License");

+ * * you may not use this file except in compliance with the License.

+ * * You may obtain a copy of the License at

+ * * 

+ *  *      http://www.apache.org/licenses/LICENSE-2.0

+ * * 

+ *  * Unless required by applicable law or agreed to in writing, software

+ * * distributed under the License is distributed on an "AS IS" BASIS,

+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * * See the License for the specific language governing permissions and

+ * * limitations under the License.

+ * * ============LICENSE_END====================================================

+ * *

+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+ * *

+ ******************************************************************************/

+package com.att.research.datarouter.node;

+

+import java.util.*;

+import java.util.regex.*;

+import java.io.*;

+import java.nio.file.*;

+import java.text.*;

+

+/**

+ *	Cleanup of old log files.

+ *	<p>

+ *	Periodically scan the log directory for log files that are older than

+ *	the log file retention interval, and delete them.  In a future release,

+ *	this class will also be responsible for uploading event logs to the

+ *	log server to support the log query APIs.

+ */

+

+public class LogManager	extends TimerTask	{

+	private NodeConfigManager	config;

+	private Matcher	isnodelog;

+	private Matcher	iseventlog;

+	private Uploader	worker;

+	private String	uploaddir;

+	private String	logdir;

+	private class Uploader extends Thread implements DeliveryQueueHelper {

+		public long getInitFailureTimer() { return(10000L); }

+		public double getFailureBackoff() { return(2.0); }

+		public long getMaxFailureTimer() { return(150000L); }

+		public long getExpirationTimer() { return(604800000L); }

+		public int getFairFileLimit() { return(10000); }

+		public long getFairTimeLimit() { return(86400000); }

+		public String getDestURL(DestInfo dest, String fileid) {

+			return(config.getEventLogUrl());

+		}

+		public void handleUnreachable(DestInfo dest) {}

+		public boolean handleRedirection(DestInfo dest, String location, String fileid) { return(false); }

+		public boolean isFollowRedirects() { return(false); }

+		public String getFeedId(String subid) { return(null); }

+		private DeliveryQueue dq;

+		public Uploader() {

+			dq = new DeliveryQueue(this, new DestInfo("LogUpload", uploaddir, null, null, null, config.getMyName(), config.getMyAuth(), false, false));

+			setDaemon(true);

+			setName("Log Uploader");

+			start();

+		}

+		private synchronized void snooze() {

+			try {

+				wait(10000);

+			} catch (Exception e) {

+			}

+		}

+		private synchronized void poke() {

+			notify();

+		}

+		public void run() {

+			while (true) {

+				scan();

+				dq.run();

+				snooze();

+			}

+		}

+		private void scan() {

+			long threshold = System.currentTimeMillis() - config.getLogRetention();

+			File dir = new File(logdir);

+			String[] fns = dir.list();

+			Arrays.sort(fns);

+			String lastqueued = "events-000000000000.log";

+			String curlog = StatusLog.getCurLogFile();

+			curlog = curlog.substring(curlog.lastIndexOf('/') + 1);

+			try {

+				Writer w = new FileWriter(uploaddir + "/.meta");

+				w.write("POST\tlogdata\nContent-Type\ttext/plain\n");

+				w.close();

+				BufferedReader br = new BufferedReader(new FileReader(uploaddir + "/.lastqueued"));

+				lastqueued = br.readLine();

+				br.close();

+			} catch (Exception e) {

+			}

+			for (String fn: fns) {

+				if (!isnodelog.reset(fn).matches()) {

+					if (!iseventlog.reset(fn).matches()) {

+						continue;

+					}

+					if (lastqueued.compareTo(fn) < 0 && curlog.compareTo(fn) > 0) {

+						lastqueued = fn;

+						try {

+							String pid = config.getPublishId();

+							Files.createLink(Paths.get(uploaddir + "/" + pid), Paths.get(logdir + "/" + fn));

+							Files.createLink(Paths.get(uploaddir + "/" + pid + ".M"), Paths.get(uploaddir + "/.meta"));

+						} catch (Exception e) {

+						}

+					}

+				}

+				File f = new File(dir, fn);

+				if (f.lastModified() < threshold) {

+					f.delete();

+				}

+			}

+			try {

+				(new File(uploaddir + "/.meta")).delete();

+				Writer w = new FileWriter(uploaddir + "/.lastqueued");

+				w.write(lastqueued + "\n");

+				w.close();

+			} catch (Exception e) {

+			}

+		}

+	}

+	/**

+	 *	Construct a log manager

+	 *	<p>

+	 *	The log manager will check for expired log files every 5 minutes

+	 *	at 20 seconds after the 5 minute boundary.  (Actually, the

+	 *	interval is the event log rollover interval, which

+	 *	defaults to 5 minutes).

+	 */

+	public LogManager(NodeConfigManager config) {

+		this.config = config;

+		try {

+			isnodelog = Pattern.compile("node\\.log\\.\\d{8}").matcher("");

+			iseventlog = Pattern.compile("events-\\d{12}\\.log").matcher("");

+		} catch (Exception e) {}

+		logdir = config.getLogDir();

+		uploaddir = logdir + "/.spool";

+		(new File(uploaddir)).mkdirs();

+		long now = System.currentTimeMillis();

+		long intvl = StatusLog.parseInterval(config.getEventLogInterval(), 300000);

+		long when = now - now % intvl + intvl + 20000L;

+		config.getTimer().scheduleAtFixedRate(this, when - now, intvl);

+		worker = new Uploader();

+	}

+	/**

+	 *	Trigger check for expired log files and log files to upload

+	 */

+	public void run() {

+		worker.poke();

+	}

+}

diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfig.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfig.java
new file mode 100644
index 0000000..689f765
--- /dev/null
+++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfig.java
@@ -0,0 +1,722 @@
+/*******************************************************************************

+ * ============LICENSE_START==================================================

+ * * org.onap.dmaap

+ * * ===========================================================================

+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+ * * ===========================================================================

+ * * Licensed under the Apache License, Version 2.0 (the "License");

+ * * you may not use this file except in compliance with the License.

+ * * You may obtain a copy of the License at

+ * * 

+ *  *      http://www.apache.org/licenses/LICENSE-2.0

+ * * 

+ *  * Unless required by applicable law or agreed to in writing, software

+ * * distributed under the License is distributed on an "AS IS" BASIS,

+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * * See the License for the specific language governing permissions and

+ * * limitations under the License.

+ * * ============LICENSE_END====================================================

+ * *

+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+ * *

+ ******************************************************************************/

+

+

+package com.att.research.datarouter.node;

+

+import java.util.*;

+import java.io.*;

+

+/**

+ *	Processed configuration for this node.

+ *	<p>

+ *	The NodeConfig represents a processed configuration from the Data Router

+ *	provisioning server.  Each time configuration data is received from the

+ *	provisioning server, a new NodeConfig is created and the previous one

+ *	discarded.

+ */

+public class NodeConfig	{

+	/**

+	 *	Raw configuration entry for a data router node

+	 */

+	public static class ProvNode {

+		private final String cname;

+		/**

+		 *	Build a node entry.

+		 *	@param cname	The cname of the node.

+		 */

+		public ProvNode(String cname) {

+			this.cname = cname;

+		}

+		/** Return the cname this entry was built with. */

+		public String getCName() {

+			return cname;

+		}

+	}

+	/**

+	 *	Raw configuration entry for a provisioning parameter

+	 */

+	public static class ProvParam {

+		private final String name;

+		private final String value;

+		/**

+		 *	Build a provisioning parameter entry.

+		 *	@param	name The name of the parameter.

+		 *	@param	value The value of the parameter.

+		 */

+		public ProvParam(String name, String value) {

+			this.name = name;

+			this.value = value;

+		}

+		/** Return the parameter name. */

+		public String getName() {

+			return name;

+		}

+		/** Return the parameter value. */

+		public String getValue() {

+			return value;

+		}

+	}

+	/**

+	 *	Raw configuration entry for a data feed.

+	 */

+	public static class ProvFeed {

+		private final String id;

+		private final String logdata;

+		private final String status;

+		/**

+		 *	Build a feed entry.

+		 *	@param id	The feed ID of the entry.

+		 *	@param logdata	String for log entries about the entry.

+		 *	@param status	Why the feed cannot be used (Feed has been deleted, Feed has been suspended), or null when it is valid.

+		 */

+		public ProvFeed(String id, String logdata, String status) {

+			this.id = id;

+			this.logdata = logdata;

+			this.status = status;

+		}

+		/** Return the feed ID. */

+		public String getId() {

+			return id;

+		}

+		/** Return the log data for the feed. */

+		public String getLogData() {

+			return logdata;

+		}

+		/** Return the status of the feed (null when the feed is valid). */

+		public String getStatus() {

+			return status;

+		}

+	}

+	/**

+	 *	Raw configuration entry for a feed user.

+	 */

+	public static class ProvFeedUser {

+		private final String feedid;

+		private final String user;

+		private final String credentials;

+		/**

+		 *	Build a feed user entry.

+		 *	@param feedid	The feed id.

+		 *	@param user	The user that will publish to the feed.

+		 *	@param credentials	The Authorization header the user will use to publish.

+		 */

+		public ProvFeedUser(String feedid, String user, String credentials) {

+			this.feedid = feedid;

+			this.user = user;

+			this.credentials = credentials;

+		}

+		/** Return the feed ID this user publishes to. */

+		public String getFeedId() {

+			return feedid;

+		}

+		/** Return the user name. */

+		public String getUser() {

+			return user;

+		}

+		/** Return the user's credentials. */

+		public String getCredentials() {

+			return credentials;

+		}

+	}

+	/**

+	 *	Raw configuration entry for a feed subnet

+	 */

+	public static class ProvFeedSubnet {

+		private final String feedid;

+		private final String cidr;

+		/**

+		 *	Build a feed subnet entry.

+		 *	@param feedid	The feed ID

+		 *	@param cidr	The CIDR allowed to publish to the feed.

+		 */

+		public ProvFeedSubnet(String feedid, String cidr) {

+			this.feedid = feedid;

+			this.cidr = cidr;

+		}

+		/** Return the feed ID this subnet applies to. */

+		public String getFeedId() {

+			return feedid;

+		}

+		/** Return the CIDR of the subnet. */

+		public String getCidr() {

+			return cidr;

+		}

+	}

+	/**

+	 *	Raw configuration entry for a subscription

+	 */

+	public static class ProvSubscription {

+		private final String subid;

+		private final String feedid;

+		private final String url;

+		private final String authuser;

+		private final String credentials;

+		private final boolean metaonly;

+		private final boolean use100;

+		/**

+		 *	Build a subscription entry.

+		 *	@param subid	The subscription ID

+		 *	@param feedid	The feed ID

+		 *	@param url	The base delivery URL (not including the fileid)

+		 *	@param authuser	The user in the credentials used to deliver

+		 *	@param credentials	The credentials used to authenticate to the delivery URL exactly as they go in the Authorization header.

+		 *	@param metaonly	Is this a meta data only subscription?

+		 *	@param use100	Should we send Expect: 100-continue?

+		 */

+		public ProvSubscription(String subid, String feedid, String url, String authuser, String credentials, boolean metaonly, boolean use100) {

+			this.subid = subid;

+			this.feedid = feedid;

+			this.url = url;

+			this.authuser = authuser;

+			this.credentials = credentials;

+			this.metaonly = metaonly;

+			this.use100 = use100;

+		}

+		/** Return the subscription ID. */

+		public String getSubId() {

+			return subid;

+		}

+		/** Return the feed ID. */

+		public String getFeedId() {

+			return feedid;

+		}

+		/** Return the delivery URL. */

+		public String getURL() {

+			return url;

+		}

+		/** Return the user named in the delivery credentials. */

+		public String getAuthUser() {

+			return authuser;

+		}

+		/** Return the delivery credentials. */

+		public String getCredentials() {

+			return credentials;

+		}

+		/** Return true if this is a meta data only subscription. */

+		public boolean isMetaDataOnly() {

+			return metaonly;

+		}

+		/** Return true if Expect: 100-continue should be sent. */

+		public boolean isUsing100() {

+			return use100;

+		}

+	}

+	/**

+	 *	Raw configuration entry for controlled ingress to the data router node

+	 */

+	public static class ProvForceIngress {

+		private final String feedid;

+		private final String subnet;

+		private final String user;

+		private final String[] nodes;

+		/**

+		 *	Build a forced ingress entry.

+		 *	@param feedid	The feed ID that this entry applies to

+		 *	@param subnet	The CIDR for which publisher IP addresses this entry applies to or "" if it applies to all publisher IP addresses

+		 *	@param user	The publishing user this entry applies to or "" if it applies to all publishing users.

+		 *	@param nodes	The array of FQDNs of the data router nodes to redirect publication attempts to.  The array is stored as given, not copied.

+		 */

+		public ProvForceIngress(String feedid, String subnet, String user, String[] nodes) {

+			this.feedid = feedid;

+			this.subnet = subnet;

+			this.user = user;

+			this.nodes = nodes;

+		}

+		/** Return the feed ID. */

+		public String getFeedId() {

+			return feedid;

+		}

+		/** Return the subnet. */

+		public String getSubnet() {

+			return subnet;

+		}

+		/** Return the user. */

+		public String getUser() {

+			return user;

+		}

+		/** Return the node array that was passed to the constructor. */

+		public String[] getNodes() {

+			return nodes;

+		}

+	}

+	/**

+	 *	Raw configuration entry for controlled egress from the data router

+	 */

+	public static class ProvForceEgress {

+		private final String subid;

+		private final String node;

+		/**

+		 *	Build a forced egress entry.

+		 *	@param subid	The subscription ID of the subscription with forced egress

+		 *	@param node	The node handling deliveries for this subscription

+		 */

+		public ProvForceEgress(String subid, String node) {

+			this.subid = subid;

+			this.node = node;

+		}

+		/** Return the subscription ID. */

+		public String getSubId() {

+			return subid;

+		}

+		/** Return the node. */

+		public String getNode() {

+			return node;

+		}

+	}

+	/**

+	 *	Raw configuration entry for routing within the data router network

+	 */

+	public static class ProvHop	{

+		private String	from;

+		private String	to;

+		private String	via;

+		/**

+		 *	A human readable description of this entry

+		 */

+		public String toString() {

+			return("Hop " + from + "->" + to + " via " + via);

+		}

+		/**

+		 *	Construct a hop entry

+		 *	@param from	The FQDN of the node with the data to be delivered

+		 *	@param to	The FQDN of the node that will deliver to the subscriber

+		 *	@param via	The FQDN of the node where the from node should send the data

+		 */

+		public ProvHop(String from, String to, String via) {

+			this.from = from;

+			this.to = to;

+			this.via = via;

+		}

+		/**

+		 *	Get the from node

+		 */

+		public String getFrom() {

+			return(from);

+		}

+		/**

+		 *	Get the to node

+		 */

+		public String getTo() {

+			return(to);

+		}

+		/**

+		 *	Get the next intermediate node

+		 */

+		public String getVia() {

+			return(via);

+		}

+	}

+	private static class Redirection	{	// Processed form of one ProvForceIngress entry, built by the NodeConfig constructor

+		public SubnetMatcher snm;	// matcher for the entry's subnet; left null when the entry's subnet is null

+		public String user;	// publishing user copied from the entry

+		public String[] nodes;	// node array copied (by reference) from the entry

+	}

+	private static class Feed	{	// Processed per-feed configuration, filled in by the NodeConfig constructor

+		public String	loginfo;	// log data taken from the ProvFeed entry

+		public String	status;	// status taken from the ProvFeed entry

+		public SubnetMatcher[] subnets;	// matchers for the feed's ProvFeedSubnet entries; empty array when there are none

+		public Hashtable<String, String> authusers = new Hashtable<String, String>();	// credentials -> user, from the feed's ProvFeedUser entries

+		public Redirection[]	redirections;	// forced ingress rules for the feed; empty array when there are none

+		public Target[]	targets;	// result of parseRouting on the feed's subscription list; empty array when there are no subscriptions

+	}

+	private Hashtable<String, String> params = new Hashtable<String, String>();	// provisioning parameter name -> value

+	private Hashtable<String, Feed>	feeds = new Hashtable<String, Feed>();	// feed ID -> processed feed configuration

+	private Hashtable<String, DestInfo> nodeinfo = new Hashtable<String, DestInfo>();	// node cname -> node-to-node destination

+	private Hashtable<String, DestInfo> subinfo = new Hashtable<String, DestInfo>();	// subscription ID -> subscription destination

+	private Hashtable<String, IsFrom> nodes = new Hashtable<String, IsFrom>();	// node auth header -> IsFrom for that node's cname

+	private String	myname;	// this node's name as passed to the constructor

+	private String	myauth;	// NodeUtils.getNodeAuthHdr(myname, nodeauthkey)

+	private DestInfo[]	alldests;	// every node and subscription destination, in creation order

+	private int	rrcntr;	// NOTE(review): not used in the visible code; presumably a round-robin counter - confirm

+	/**

+	 *	Process the raw provisioning data to configure this node

+	 *	@param pd	The parsed provisioning data

+	 *	@param myname	My name as seen by external systems

+	 *	@param spooldir	The directory where temporary files live

+	 *	@param port	The port number for URLs

+	 *	@param nodeauthkey	The keying string used to generate node authentication credentials

+	 */

+	public NodeConfig(ProvData pd, String myname, String spooldir, int port, String nodeauthkey) {

+		this.myname = myname;

+		for (ProvParam p: pd.getParams()) {

+			params.put(p.getName(), p.getValue());

+		}

+		Vector<DestInfo>	div = new Vector<DestInfo>();

+		myauth = NodeUtils.getNodeAuthHdr(myname, nodeauthkey);

+		for (ProvNode pn: pd.getNodes()) {

+			String cn = pn.getCName();

+			if (nodeinfo.get(cn) != null) {

+				continue;

+			}

+			String auth = NodeUtils.getNodeAuthHdr(cn, nodeauthkey);

+			DestInfo di = new DestInfo("n:" + cn, spooldir + "/n/" + cn, null, "n2n-" + cn, "https://" + cn + ":" + port + "/internal/publish", cn, myauth, false, true);

+			(new File(di.getSpool())).mkdirs();

+			div.add(di);

+			nodeinfo.put(cn, di);

+			nodes.put(auth, new IsFrom(cn));

+		}

+		PathFinder pf = new PathFinder(myname, nodeinfo.keySet().toArray(new String[nodeinfo.size()]), pd.getHops());

+		Hashtable<String, Vector<Redirection>> rdtab = new Hashtable<String, Vector<Redirection>>();

+		for (ProvForceIngress pfi: pd.getForceIngress()) {

+			Vector<Redirection> v = rdtab.get(pfi.getFeedId());

+			if (v == null) {

+				v = new Vector<Redirection>();

+				rdtab.put(pfi.getFeedId(), v);

+			}

+			Redirection r = new Redirection();

+			if (pfi.getSubnet() != null) {

+				r.snm = new SubnetMatcher(pfi.getSubnet());

+			}

+			r.user = pfi.getUser();

+			r.nodes = pfi.getNodes();

+			v.add(r);

+		}

+		Hashtable<String, Hashtable<String, String>> pfutab = new Hashtable<String, Hashtable<String, String>>();

+		for (ProvFeedUser pfu: pd.getFeedUsers()) {

+			Hashtable<String, String> t = pfutab.get(pfu.getFeedId());

+			if (t == null) {

+				t = new Hashtable<String, String>();

+				pfutab.put(pfu.getFeedId(), t);

+			}

+			t.put(pfu.getCredentials(), pfu.getUser());

+		}

+		Hashtable<String, String> egrtab = new Hashtable<String, String>();

+		for (ProvForceEgress pfe: pd.getForceEgress()) {

+			if (pfe.getNode().equals(myname) || nodeinfo.get(pfe.getNode()) == null) {

+				continue;

+			}

+			egrtab.put(pfe.getSubId(), pfe.getNode());

+		}

+		Hashtable<String, Vector<SubnetMatcher>> pfstab = new Hashtable<String, Vector<SubnetMatcher>>();

+		for (ProvFeedSubnet pfs: pd.getFeedSubnets()) {

+			Vector<SubnetMatcher> v = pfstab.get(pfs.getFeedId());

+			if (v == null) {

+				v = new Vector<SubnetMatcher>();

+				pfstab.put(pfs.getFeedId(), v);

+			}

+			v.add(new SubnetMatcher(pfs.getCidr()));

+		}

+		Hashtable<String, StringBuffer> ttab = new Hashtable<String, StringBuffer>();

+		HashSet<String> allfeeds = new HashSet<String>();

+		for (ProvFeed pfx: pd.getFeeds()) {

+			if (pfx.getStatus() == null) {

+				allfeeds.add(pfx.getId());

+			}

+		}

+		for (ProvSubscription ps: pd.getSubscriptions()) {

+			String sid = ps.getSubId();

+			String fid = ps.getFeedId();

+			if (!allfeeds.contains(fid)) {

+				continue;

+			}

+			if (subinfo.get(sid) != null) {

+				continue;

+			}

+			int sididx = 999;

+			try {

+				sididx = Integer.parseInt(sid);

+				sididx -= sididx % 100;

+			} catch (Exception e) {

+			}

+			String siddir = sididx + "/" + sid;

+			DestInfo di = new DestInfo("s:" + sid, spooldir + "/s/" + siddir, sid, fid, ps.getURL(), ps.getAuthUser(), ps.getCredentials(), ps.isMetaDataOnly(), ps.isUsing100());

+			(new File(di.getSpool())).mkdirs();

+			div.add(di);

+			subinfo.put(sid, di);

+			String egr = egrtab.get(sid);

+			if (egr != null) {

+				sid = pf.getPath(egr) + sid;

+			}

+			StringBuffer sb = ttab.get(fid);

+			if (sb == null) {

+				sb = new StringBuffer();

+				ttab.put(fid, sb);

+			}

+			sb.append(' ').append(sid);

+		}

+		alldests = div.toArray(new DestInfo[div.size()]);

+		for (ProvFeed pfx: pd.getFeeds()) {

+			String fid = pfx.getId();

+			Feed f = feeds.get(fid);

+			if (f != null) {

+				continue;

+			}

+			f = new Feed();

+			feeds.put(fid, f);

+			f.loginfo = pfx.getLogData();

+			f.status = pfx.getStatus();

+			Vector<SubnetMatcher> v1 = pfstab.get(fid);

+			if (v1 == null) {

+				f.subnets = new SubnetMatcher[0];

+			} else {

+				f.subnets = v1.toArray(new SubnetMatcher[v1.size()]);

+			}

+			Hashtable<String, String> h1 = pfutab.get(fid);

+			if (h1 == null) {

+				h1 = new Hashtable<String, String>();

+			}

+			f.authusers = h1;

+			Vector<Redirection> v2 = rdtab.get(fid);

+			if (v2 == null) {

+				f.redirections = new Redirection[0];

+			} else {

+				f.redirections = v2.toArray(new Redirection[v2.size()]);

+			}

+			StringBuffer sb = ttab.get(fid);

+			if (sb == null) {

+				f.targets = new Target[0];

+			} else {

+				f.targets = parseRouting(sb.toString());

+			}

+		}

+	}

+	/**
+	 *	Parse a target string into an array of targets.
+	 *	<p>
+	 *	The string is a whitespace separated list of entries.  An entry
+	 *	without a '/' is looked up as a subscription ID; an entry of the
+	 *	form <i>node</i>/<i>rest</i> is looked up as a node name, with
+	 *	<i>rest</i> being the routing to hand to that node.  Entries that
+	 *	cannot be resolved produce a target with no destination whose
+	 *	routing is the unresolved entry itself.
+	 *	@param routing Target string (null is treated like an empty string)
+	 *	@return	Array of targets, never null.  A known subscription appears
+	 *	at most once, and all entries for the same known node are merged
+	 *	into a single target.
+	 */
+	public Target[] parseRouting(String routing) {
+		if (routing == null) {
+			return(new Target[0]);
+		}
+		routing = routing.trim();
+		if ("".equals(routing)) {
+			return(new Target[0]);
+		}
+		// one Target per known node, so later entries for that node can be merged into it
+		Hashtable<String, Target> tmap = new Hashtable<String, Target>();
+		// known subscription IDs already emitted
+		HashSet<String> subset = new HashSet<String>();
+		Vector<Target> tv = new Vector<Target>();
+		for (String t: routing.split("\\s+")) {
+			int j = t.indexOf('/');
+			if (j == -1) {
+				// plain subscription ID
+				DestInfo di = subinfo.get(t);
+				if (di == null) {
+					tv.add(new Target(null, t));
+				} else if (subset.add(t)) {
+					// add() is true only the first time this ID is seen
+					tv.add(new Target(di, null));
+				}
+			} else {
+				// node/routing pair
+				String node = t.substring(0, j);
+				String rtg = t.substring(j + 1);
+				DestInfo di = nodeinfo.get(node);
+				if (di == null) {
+					tv.add(new Target(null, t));
+				} else {
+					Target tt = tmap.get(node);
+					if (tt == null) {
+						tt = new Target(di, rtg);
+						tmap.put(node, tt);
+						tv.add(tt);
+					} else {
+						tt.addRouting(rtg);
+					}
+				}
+			}
+		}
+		return(tv.toArray(new Target[tv.size()]));
+	}

+	/**
+	 *	Decide whether a request is a valid node-to-node transfer.
+	 *	@param credentials	Credentials offered by the supposed node
+	 *	@param ip	IP address the request came from
+	 *	@return	true only when the credentials have an entry in the node table and that entry accepts the IP address
+	 */
+	public boolean isAnotherNode(String credentials, String ip) {
+		IsFrom peer = nodes.get(credentials);
+		if (peer == null) {
+			return false;
+		}
+		return peer.isFrom(ip);
+	}

+	/**
+	 *	Decide whether a publish request is allowed.
+	 *	@param feedid	The ID of the feed being requested.
+	 *	@param credentials	The offered credentials
+	 *	@param ip	The requesting IP address
+	 *	@return	null when publication is permitted, otherwise the reason
+	 *	for refusal: a fixed message for an unknown feed, the feed's own
+	 *	status text when it has one, or a "not permitted" message when the
+	 *	credentials or the address are not accepted.
+	 */
+	public String isPublishPermitted(String feedid, String credentials, String ip) {
+		final String denied = "Publisher not permitted for this feed";
+		Feed feed = feeds.get(feedid);
+		if (feed == null) {
+			return "Feed does not exist";
+		}
+		if (feed.status != null) {
+			return feed.status;
+		}
+		if (feed.authusers.get(credentials) == null) {
+			return denied;
+		}
+		// a feed without subnet restrictions accepts any source address
+		if (feed.subnets.length == 0) {
+			return null;
+		}
+		byte[] addr = NodeUtils.getInetAddress(ip);
+		for (int i = 0; i < feed.subnets.length; i++) {
+			if (feed.subnets[i].matches(addr)) {
+				return null;
+			}
+		}
+		return denied;
+	}

+	/**
+	 *	Get authenticated user
+	 *	@param feedid	The ID of the feed
+	 *	@param credentials	The offered credentials
+	 *	@return	The user registered for these credentials on the feed, or
+	 *	null if the feed is unknown, the credentials are null, or the
+	 *	credentials are not registered for the feed.
+	 */
+	public String getAuthUser(String feedid, String credentials) {
+		Feed f = feeds.get(feedid);
+		// authusers is a Hashtable, which rejects null keys, so screen null credentials here
+		if (f == null || credentials == null) {
+			return(null);
+		}
+		return(f.authusers.get(credentials));
+	}

+	/**
+	 *	Check if the request should be redirected to a different ingress node
+	 *	<p>
+	 *	The first redirection of the feed whose user (if set) equals the
+	 *	publishing user and whose subnet (if set) matches the address
+	 *	decides the outcome.  If this node is one of its nodes, or it has
+	 *	no nodes, the request is accepted here; otherwise its nodes are
+	 *	handed out in rotation.
+	 *	@param feedid	The ID of the feed
+	 *	@param user	The publishing user
+	 *	@param ip	The IP address of the publisher
+	 *	@return	null if the request should be handled by this node (this
+	 *	includes an unknown feed), otherwise the name of the node to
+	 *	redirect to.
+	 */
+	public String getIngressNode(String feedid, String user, String ip) {
+		Feed f = feeds.get(feedid);
+		if (f == null || f.redirections.length == 0) {
+			return(null);
+		}
+		byte[] addr = NodeUtils.getInetAddress(ip);
+		for (Redirection r: f.redirections) {
+			// compare from the non-null side so a null user cannot throw
+			if (r.user != null && !r.user.equals(user)) {
+				continue;
+			}
+			if (r.snm != null && !r.snm.matches(addr)) {
+				continue;
+			}
+			for (String n: r.nodes) {
+				if (myname.equals(n)) {
+					return(null);
+				}
+			}
+			if (r.nodes.length == 0) {
+				return(null);
+			}
+			// mask off the sign bit so the index stays valid after the counter wraps around
+			return(r.nodes[(rrcntr++ & Integer.MAX_VALUE) % r.nodes.length]);
+		}
+		return(null);
+	}

+	/**
+	 *	Look up a provisioned configuration parameter.
+	 *	@param name	The name of the parameter
+	 *	@return	Whatever the parameter table holds for that name
+	 */
+	public String getProvParam(String name) {
+		String value = params.get(name);
+		return value;
+	}

+	/**
+	 *	Get every DestInfo known to this configuration.
+	 *	@return	The internal array of destinations (not a copy)
+	 */
+	public DestInfo[] getAllDests() {
+		return alldests;
+	}

+	/**
+	 *	Look up the delivery targets of a feed.
+	 *	@param feedid	The feed ID
+	 *	@return	The targets this feed should be delivered to; an empty
+	 *	array when the feed ID is null or not known.
+	 */
+	public Target[] getTargets(String feedid) {
+		// the null test must come first: the feed table is only consulted for a non-null ID
+		Feed feed = (feedid == null) ? null : feeds.get(feedid);
+		return (feed == null) ? new Target[0] : feed.targets;
+	}

+	/**
+	 *	Find the feed a subscription belongs to.
+	 *	@param subid	The subscription ID
+	 *	@return	The log data of the subscription's destination, which is
+	 *	where the feed ID is kept, or null for an unknown subscription.
+	 */
+	public String getFeedId(String subid) {
+		DestInfo dest = subinfo.get(subid);
+		return (dest != null) ? dest.getLogData() : null;
+	}

+	/**
+	 *	Find the spool directory of a subscription.
+	 *	@param subid	The subscription ID
+	 *	@return The spool directory, or null for an unknown subscription
+	 */
+	public String getSpoolDir(String subid) {
+		DestInfo dest = subinfo.get(subid);
+		return (dest != null) ? dest.getSpool() : null;
+	}

+	/**
+	 *	Report the Authorization value this node presents.
+	 *	@return The Authorization header value for this node
+	 */
+	public String getMyAuth() {
+		return myauth;
+	}

+

+}

diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfigManager.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfigManager.java
new file mode 100644
index 0000000..01ca442
--- /dev/null
+++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeConfigManager.java
@@ -0,0 +1,599 @@
+/*******************************************************************************

+ * ============LICENSE_START==================================================

+ * * org.onap.dmaap

+ * * ===========================================================================

+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+ * * ===========================================================================

+ * * Licensed under the Apache License, Version 2.0 (the "License");

+ * * you may not use this file except in compliance with the License.

+ * * You may obtain a copy of the License at

+ * * 

+ *  *      http://www.apache.org/licenses/LICENSE-2.0

+ * * 

+ *  * Unless required by applicable law or agreed to in writing, software

+ * * distributed under the License is distributed on an "AS IS" BASIS,

+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * * See the License for the specific language governing permissions and

+ * * limitations under the License.

+ * * ============LICENSE_END====================================================

+ * *

+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+ * *

+ ******************************************************************************/

+

+

+package com.att.research.datarouter.node;

+

+import java.net.*;

+import java.util.*;

+import java.io.*;

+import org.apache.log4j.Logger;

+

+import com.att.eelf.configuration.EELFLogger;

+import com.att.eelf.configuration.EELFManager;

+import com.att.research.datarouter.node.eelf.EelfMsgs;

+

+

+/**
+ *	Maintain the configuration of a Data Router node
+ *	<p>
+ *	The NodeConfigManager is the single point of contact for servlet, delivery, event logging, and log retention subsystems to access configuration information.  (Log4J has its own configuration mechanism).
+ *	<p>
+ *	There are two basic sets of configuration data.  The
+ *	static local configuration data, stored in a local configuration file (created
+ *	as part of installation by SWM), and the dynamic global
+ *	configuration data fetched from the data router provisioning server.
+ *	<p>
+ *	The provisioned configuration is null until the first successful
+ *	fetch.  Methods that delegate to it throw a NullPointerException if
+ *	they are called before {@link #isConfigured()} returns true.
+ */
+public class NodeConfigManager implements DeliveryQueueHelper	{
+	private static EELFLogger eelflogger = EELFManager.getInstance().getLogger("com.att.research.datarouter.node.NodeConfigManager");
+	private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.NodeConfigManager");
+	private static NodeConfigManager	base = new NodeConfigManager();
+
+	private Timer timer = new Timer("Node Configuration Timer", true);
+	private long	maxfailuretimer;
+	private long	initfailuretimer;
+	private long	expirationtimer;
+	private double	failurebackoff;
+	private long	fairtimelimit;
+	private int	fairfilelimit;
+	private double	fdpstart;
+	private double	fdpstop;
+	private int	deliverythreads;
+	private String	provurl;
+	private String	provhost;
+	private IsFrom	provcheck;
+	private int	gfport;
+	private int	svcport;
+	private int	port;
+	private String	spooldir;
+	private String	logdir;
+	private long	logretention;
+	private String	redirfile;
+	private String	kstype;
+	private String	ksfile;
+	private String	kspass;
+	private String	kpass;
+	private String	tstype;
+	private String	tsfile;
+	private String	tspass;
+	private String	myname;
+	private RedirManager	rdmgr;
+	private RateLimitedOperation	pfetcher;
+	private NodeConfig	config;
+	private File	quiesce;
+	private PublishId	pid;
+	private String	nak;
+	private TaskList	configtasks = new TaskList();
+	private String	eventlogurl;
+	private String	eventlogprefix;
+	private String	eventlogsuffix;
+	private String	eventloginterval;
+	private boolean	followredirects;
+
+	/**
+	 *	Get the default node configuration manager
+	 *	@return	The single shared instance
+	 */
+	public static NodeConfigManager getInstance() {
+		return(base);
+	}
+	/**
+	 *	Initialize the configuration of a Data Router node
+	 *	<p>
+	 *	Loads the local properties file, derives the static settings from
+	 *	it, and requests the first fetch of the provisioned configuration.
+	 *	A bad provisioning URL or an unreadable keystore name terminates
+	 *	the JVM.
+	 */
+	private NodeConfigManager() {
+		Properties p = new Properties();
+		String cfgfile = System.getProperty("com.att.research.datarouter.node.ConfigFile", "/opt/app/datartr/etc/node.properties");
+		// close the stream once loaded; a load failure is logged and the defaults below are used
+		try (InputStream in = new FileInputStream(cfgfile)) {
+			p.load(in);
+		} catch (Exception e) {
+			NodeUtils.setIpAndFqdnForEelf("NodeConfigManager");
+			eelflogger.error(EelfMsgs.MESSAGE_PROPERTIES_LOAD_ERROR);
+			logger.error("NODE0301 Unable to load local configuration file " + cfgfile, e);
+		}
+		provurl = p.getProperty("ProvisioningURL", "https://feeds-drtr.web.att.com/internal/prov");
+		try {
+			provhost = (new URL(provurl)).getHost();
+		} catch (Exception e) {
+			NodeUtils.setIpAndFqdnForEelf("NodeConfigManager");
+			eelflogger.error(EelfMsgs.MESSAGE_BAD_PROV_URL, provurl);
+			logger.error("NODE0302 Bad provisioning server URL " + provurl);
+			System.exit(1);
+		}
+		logger.info("NODE0303 Provisioning server is " + provhost);
+		eventlogurl = p.getProperty("LogUploadURL", "https://feeds-drtr.web.att.com/internal/logs");
+		provcheck = new IsFrom(provhost);
+		gfport = Integer.parseInt(p.getProperty("IntHttpPort", "8080"));
+		svcport = Integer.parseInt(p.getProperty("IntHttpsPort", "8443"));
+		port = Integer.parseInt(p.getProperty("ExtHttpsPort", "443"));
+		long minpfinterval = Long.parseLong(p.getProperty("MinProvFetchInterval", "10000"));
+		long minrsinterval = Long.parseLong(p.getProperty("MinRedirSaveInterval", "10000"));
+		spooldir = p.getProperty("SpoolDir", "spool");
+		// remove plain files left behind in the temporary spool directory
+		File fdir = new File(spooldir + "/f");
+		fdir.mkdirs();
+		File[] leftovers = fdir.listFiles();
+		// listFiles() returns null if the directory could not be created or read
+		if (leftovers != null) {
+			for (File junk: leftovers) {
+				if (junk.isFile()) {
+					junk.delete();
+				}
+			}
+		}
+		logdir = p.getProperty("LogDir", "logs");
+		(new File(logdir)).mkdirs();
+		// LogRetention is given in days
+		logretention = Long.parseLong(p.getProperty("LogRetention", "30")) * 86400000L;
+		eventlogprefix = logdir + "/events";
+		eventlogsuffix = ".log";
+		// assign the field; a local of the same name used to leave the field unset
+		redirfile = p.getProperty("RedirectionFile", "etc/redirections.dat");
+		kstype = p.getProperty("KeyStoreType", "jks");
+		ksfile = p.getProperty("KeyStoreFile", "etc/keystore");
+		// NOTE(review): the password and auth key defaults below are hard coded; deployments should override them
+		kspass = p.getProperty("KeyStorePassword", "changeme");
+		kpass = p.getProperty("KeyPassword", "changeme");
+		tstype = p.getProperty("TrustStoreType", "jks");
+		tsfile = p.getProperty("TrustStoreFile");
+		tspass = p.getProperty("TrustStorePassword", "changeme");
+		if (tsfile != null && tsfile.length() > 0) {
+			System.setProperty("javax.net.ssl.trustStoreType", tstype);
+			System.setProperty("javax.net.ssl.trustStore", tsfile);
+			System.setProperty("javax.net.ssl.trustStorePassword", tspass);
+		}
+		nak = p.getProperty("NodeAuthKey", "Node123!");
+		quiesce = new File(p.getProperty("QuiesceFile", "etc/SHUTDOWN"));
+		myname = NodeUtils.getCanonicalName(kstype, ksfile, kspass);
+		if (myname == null) {
+			NodeUtils.setIpAndFqdnForEelf("NodeConfigManager");
+			eelflogger.error(EelfMsgs.MESSAGE_KEYSTORE_FETCH_ERROR, ksfile);
+			logger.error("NODE0309 Unable to fetch canonical name from keystore file " + ksfile);
+			System.exit(1);
+		}
+		logger.info("NODE0304 My certificate says my name is " + myname);
+		pid = new PublishId(myname);
+		rdmgr = new RedirManager(redirfile, minrsinterval, timer);
+		pfetcher = new RateLimitedOperation(minpfinterval, timer) {
+			public void run() {
+				fetchconfig();
+			}
+		};
+		logger.info("NODE0305 Attempting to fetch configuration at " + provurl);
+		pfetcher.request();
+	}
+	/**
+	 *	Recompute the settings derived from provisioned parameters.
+	 *	Each setting falls back to its built in default when the parameter
+	 *	is missing or cannot be parsed.
+	 */
+	private void localconfig() {
+		followredirects = Boolean.parseBoolean(getProvParam("FOLLOW_REDIRECTS", "false"));
+		eventloginterval = getProvParam("LOGROLL_INTERVAL", "5m");
+		initfailuretimer = 10000;
+		maxfailuretimer = 3600000;
+		expirationtimer = 86400000;
+		failurebackoff = 2.0;
+		deliverythreads = 40;
+		fairfilelimit = 100;
+		fairtimelimit = 60000;
+		fdpstart = 0.05;
+		fdpstop = 0.2;
+		// The empty catches are deliberate: a missing (null) or malformed value
+		// makes the parse throw, and the default assigned above is then kept.
+		try { initfailuretimer = (long)(Double.parseDouble(getProvParam("DELIVERY_INIT_RETRY_INTERVAL")) * 1000); } catch (Exception e) {}
+		try { maxfailuretimer = (long)(Double.parseDouble(getProvParam("DELIVERY_MAX_RETRY_INTERVAL")) * 1000); } catch (Exception e) {}
+		try { expirationtimer = (long)(Double.parseDouble(getProvParam("DELIVERY_MAX_AGE")) * 1000); } catch (Exception e) {}
+		try { failurebackoff = Double.parseDouble(getProvParam("DELIVERY_RETRY_RATIO")); } catch (Exception e) {}
+		try { deliverythreads = Integer.parseInt(getProvParam("DELIVERY_THREADS")); } catch (Exception e) {}
+		try { fairfilelimit = Integer.parseInt(getProvParam("FAIR_FILE_LIMIT")); } catch (Exception e) {}
+		try { fairtimelimit = (long)(Double.parseDouble(getProvParam("FAIR_TIME_LIMIT")) * 1000); } catch (Exception e) {}
+		try { fdpstart = Double.parseDouble(getProvParam("FREE_DISK_RED_PERCENT")) / 100.0; } catch (Exception e) {}
+		try { fdpstop = Double.parseDouble(getProvParam("FREE_DISK_YELLOW_PERCENT")) / 100.0; } catch (Exception e) {}
+		// clamp to 0.01 <= fdpstart <= fdpstop <= 0.5
+		if (fdpstart < 0.01) {
+			fdpstart = 0.01;
+		}
+		if (fdpstart > 0.5) {
+			fdpstart = 0.5;
+		}
+		if (fdpstop < fdpstart) {
+			fdpstop = fdpstart;
+		}
+		if (fdpstop > 0.5) {
+			fdpstop = 0.5;
+		}
+	}
+	/**
+	 *	Fetch the provisioning data, rebuild the node configuration from
+	 *	it, and run the registered configuration tasks.  On failure the
+	 *	error is logged and another fetch is requested.
+	 */
+	private void fetchconfig() {
+		try {
+			logger.debug("provurl:: " + provurl);
+			Reader r = new InputStreamReader((new URL(provurl)).openStream());
+			try {
+				config = new NodeConfig(new ProvData(r), myname, spooldir, port, nak);
+			} finally {
+				try {
+					r.close();
+				} catch (IOException ignored) {
+					// the data has been consumed (or the failure is already propagating); nothing to do
+				}
+			}
+			localconfig();
+			configtasks.startRun();
+			Runnable rr;
+			while ((rr = configtasks.next()) != null) {
+				try {
+					rr.run();
+				} catch (Exception e) {
+					// one failing task must not stop the remaining tasks
+					logger.warn("Configuration task " + rr + " failed", e);
+				}
+			}
+		} catch (Exception e) {
+			NodeUtils.setIpAndFqdnForEelf("fetchconfigs");
+			eelflogger.error(EelfMsgs.MESSAGE_CONF_FAILED, e.toString());
+			logger.error("NODE0306 Configuration failed " + e.toString() + " - try again later", e);
+			pfetcher.request();
+		}
+	}
+	/**
+	 *	Process a gofetch request from a particular IP address.  If the
+	 *	IP address is not an IP address we would go to to fetch the
+	 *	provisioning data, ignore the request.  If the data has been
+	 *	fetched very recently (default 10 seconds), wait a while before fetching again.
+	 *	@param remoteaddr	The IP address the request came from
+	 */
+	public synchronized void gofetch(String remoteaddr) {
+		if (provcheck.isFrom(remoteaddr)) {
+			logger.info("NODE0307 Received configuration fetch request from provisioning server " + remoteaddr);
+			pfetcher.request();
+		} else {
+			logger.info("NODE0308 Received configuration fetch request from unexpected server " + remoteaddr);
+		}
+	}
+	/**
+	 *	Am I configured?
+	 *	@return	True once provisioning data has been fetched successfully
+	 */
+	public boolean isConfigured() {
+		return(config != null);
+	}
+	/**
+	 *	Am I shut down?
+	 *	@return	True if the quiesce file exists
+	 */
+	public boolean isShutdown() {
+		return(quiesce.exists());
+	}
+	/**
+	 *	Given a routing string, get the targets.
+	 *	@param routing	Target string
+	 *	@return	array of targets
+	 */
+	public Target[] parseRouting(String routing) {
+		return(config.parseRouting(routing));
+	}
+	/**
+	 *	Given a set of credentials and an IP address, is this request from another node?
+	 *	@param credentials	Credentials offered by the supposed node
+	 *	@param ip	IP address the request came from
+	 *	@return	If the credentials and IP address are recognized, true, otherwise false.
+	 */
+	public boolean isAnotherNode(String credentials, String ip) {
+		return(config.isAnotherNode(credentials, ip));
+	}
+	/**
+	 *	Check whether publication is allowed.
+	 *	@param feedid	The ID of the feed being requested
+	 *	@param credentials	The offered credentials
+	 *	@param ip	The requesting IP address
+	 *	@return	Null if the IP and credentials are valid for the specified feed, otherwise a message giving the reason the request is refused.
+	 */
+	public String isPublishPermitted(String feedid, String credentials, String ip) {
+		return(config.isPublishPermitted(feedid, credentials, ip));
+	}
+	/**
+	 *	Check who the user is given the feed ID and the offered credentials.
+	 *	@param feedid	The ID of the feed specified
+	 *	@param credentials	The offered credentials
+	 *	@return	Null if the credentials are invalid or the user if they are valid.
+	 */
+	public String getAuthUser(String feedid, String credentials) {
+		return(config.getAuthUser(feedid, credentials));
+	}
+	/**
+	 *	Check if the publish request should be sent to another node based on the feedid, user, and source IP address.
+	 *	@param feedid	The ID of the feed specified
+	 *	@param user	The publishing user
+	 *	@param ip	The IP address of the publish endpoint
+	 *	@return	Null if the request should be accepted or the correct hostname if it should be sent to another node.
+	 */
+	public String getIngressNode(String feedid, String user, String ip) {
+		return(config.getIngressNode(feedid, user, ip));
+	}
+	/**
+	 *	Get a provisioned configuration parameter (from the provisioning server configuration)
+	 *	@param name	The name of the parameter
+	 *	@return	The value of the parameter or null if it is not defined.
+	 */
+	public String getProvParam(String name) {
+		return(config.getProvParam(name));
+	}
+	/**
+	 *	Get a provisioned configuration parameter (from the provisioning server configuration)
+	 *	@param name	The name of the parameter
+	 *	@param deflt	The value to use if the parameter is not defined
+	 *	@return	The value of the parameter or deflt if it is not defined.
+	 */
+	public String getProvParam(String name, String deflt) {
+		String value = config.getProvParam(name);
+		if (value == null) {
+			value = deflt;
+		}
+		return(value);
+	}
+	/**
+	 *	Generate a publish ID
+	 *	@return	The next publish ID
+	 */
+	public String getPublishId() {
+		return(pid.next());
+	}
+	/**
+	 *	Get all the outbound spooling destinations.
+	 *	This will include both subscriptions and nodes.
+	 *	@return	The destinations from the provisioned configuration
+	 */
+	public DestInfo[] getAllDests() {
+		return(config.getAllDests());
+	}
+	/**
+	 *	Register a task to run whenever the configuration changes
+	 *	@param task	The task to add
+	 */
+	public void registerConfigTask(Runnable task) {
+		configtasks.addTask(task);
+	}
+	/**
+	 *	Deregister a task to run whenever the configuration changes
+	 *	@param task	The task to remove
+	 */
+	public void deregisterConfigTask(Runnable task) {
+		configtasks.removeTask(task);
+	}
+	/**
+	 *	Get the URL to deliver a message to.
+	 *	When redirects are being followed and the destination is a
+	 *	subscription, the redirection manager may substitute the base URL.
+	 *	@param destinfo	The destination information
+	 *	@param fileid	The file ID
+	 *	@return	The URL to deliver to
+	 */
+	public String getDestURL(DestInfo destinfo, String fileid) {
+		String subid = destinfo.getSubId();
+		String purl = destinfo.getURL();
+		if (followredirects && subid != null) {
+			purl = rdmgr.lookup(subid, purl);
+		}
+		return(purl + "/" + fileid);
+	}
+	/**
+	 *	Is a destination redirected?
+	 *	@param destinfo	The destination information
+	 *	@return	True if redirects are being followed and the redirection manager reports one for this destination's subscription
+	 */
+	public boolean isDestRedirected(DestInfo destinfo) {
+		return(followredirects && rdmgr.isRedirected(destinfo.getSubId()));
+	}
+	/**
+	 *	Set up redirection on receipt of a 3XX from a target URL
+	 *	@param destinfo	The destination information
+	 *	@param redirto	The URL the target redirected to
+	 *	@param fileid	The file ID
+	 *	@return	True if a redirection was recorded.  That requires that
+	 *	redirects are being followed, the destination is a subscription,
+	 *	the new URL ends in "/" plus the file ID, and the new base URL
+	 *	differs from the provisioned one.
+	 */
+	public boolean handleRedirection(DestInfo destinfo, String redirto, String fileid) {
+		fileid = "/" + fileid;
+		String subid = destinfo.getSubId();
+		String purl = destinfo.getURL();
+		if (followredirects && subid != null && redirto.endsWith(fileid)) {
+			// strip the trailing "/fileid" to get the redirected base URL
+			redirto = redirto.substring(0, redirto.length() - fileid.length());
+			if (!redirto.equals(purl)) {
+				rdmgr.redirect(subid, purl, redirto);
+				return(true);
+			}
+		}
+		return(false);
+	}
+	/**
+	 *	Handle unreachable target URL
+	 *	@param destinfo	The destination information
+	 */
+	public void handleUnreachable(DestInfo destinfo) {
+		String subid = destinfo.getSubId();
+		if (followredirects && subid != null) {
+			rdmgr.forget(subid);
+		}
+	}
+	/**
+	 *	Get the timeout before retrying after an initial delivery failure
+	 */
+	public long getInitFailureTimer() {
+		return(initfailuretimer);
+	}
+	/**
+	 *	Get the maximum timeout between delivery attempts
+	 */
+	public long getMaxFailureTimer() {
+		return(maxfailuretimer);
+	}
+	/**
+	 *	Get the ratio between consecutive delivery attempts
+	 */
+	public double getFailureBackoff() {
+		return(failurebackoff);
+	}
+	/**
+	 *	Get the expiration timer for deliveries
+	 */
+	public long getExpirationTimer() {
+		return(expirationtimer);
+	}
+	/**
+	 *	Get the maximum number of file delivery attempts before checking
+	 *	if another queue has work to be performed.
+	 */
+	public int getFairFileLimit() {
+		return(fairfilelimit);
+	}
+	/**
+	 *	Get the maximum amount of time spent delivering files before
+	 *	checking if another queue has work to be performed.
+	 */
+	public long getFairTimeLimit() {
+		return(fairtimelimit);
+	}
+	/**
+	 *	Get the targets for a feed
+	 *	@param feedid	The feed ID
+	 *	@return	The targets this feed should be delivered to
+	 */
+	public Target[] getTargets(String feedid) {
+		return(config.getTargets(feedid));
+	}
+	/**
+	 *	Get the spool directory for temporary files
+	 */
+	public String getSpoolDir() {
+		return(spooldir + "/f");
+	}
+	/**
+	 *	Get the base directory for spool directories
+	 */
+	public String getSpoolBase() {
+		return(spooldir);
+	}
+	/**
+	 *	Get the key store type
+	 */
+	public String getKSType() {
+		return(kstype);
+	}
+	/**
+	 *	Get the key store file
+	 */
+	public String getKSFile() {
+		return(ksfile);
+	}
+	/**
+	 *	Get the key store password
+	 */
+	public String getKSPass() {
+		return(kspass);
+	}
+	/**
+	 *	Get the key password
+	 */
+	public String getKPass() {
+		return(kpass);
+	}
+	/**
+	 *	Get the http port
+	 */
+	public int getHttpPort() {
+		return(gfport);
+	}
+	/**
+	 *	Get the https port
+	 */
+	public int getHttpsPort() {
+		return(svcport);
+	}
+	/**
+	 *	Get the externally visible https port
+	 */
+	public int getExtHttpsPort() {
+		return(port);
+	}
+	/**
+	 *	Get the external name of this machine
+	 */
+	public String getMyName() {
+		return(myname);
+	}
+	/**
+	 *	Get the number of threads to use for delivery
+	 */
+	public int	getDeliveryThreads() {
+		return(deliverythreads);
+	}
+	/**
+	 *	Get the URL for uploading the event log data
+	 */
+	public String	getEventLogUrl() {
+		return(eventlogurl);
+	}
+	/**
+	 *	Get the prefix for the names of event log files
+	 */
+	public String	getEventLogPrefix() {
+		return(eventlogprefix);
+	}
+	/**
+	 *	Get the suffix for the names of the event log files
+	 */
+	public String	getEventLogSuffix() {
+		return(eventlogsuffix);
+	}
+	/**
+	 *	Get the interval between event log file rollovers
+	 */
+	public String getEventLogInterval() {
+		return(eventloginterval);
+	}
+	/**
+	 *	Should I follow redirects from subscribers?
+	 */
+	public boolean isFollowRedirects() {
+		return(followredirects);
+	}
+	/**
+	 *	Get the directory where the event and node log files live
+	 */
+	public String getLogDir() {
+		return(logdir);
+	}
+	/**
+	 *	How long do I keep log files (in milliseconds)
+	 */
+	public long getLogRetention() {
+		return(logretention);
+	}
+	/**
+	 *	Get the timer
+	 */
+	public Timer getTimer() {
+		return(timer);
+	}
+	/**
+	 *	Get the feed ID for a subscription
+	 *	@param subid	The subscription ID
+	 *	@return	The feed ID
+	 */
+	public String getFeedId(String subid) {
+		return(config.getFeedId(subid));
+	}
+	/**
+	 *	Get the authorization string this node uses
+	 *	@return The Authorization string for this node
+	 */
+	public String getMyAuth() {
+		return(config.getMyAuth());
+	}
+	/**
+	 *	Get the fraction of free spool disk space where we start throwing away undelivered files.  This is FREE_DISK_RED_PERCENT / 100.0.  Default is 0.05.  Limited by 0.01 <= FreeDiskStart <= 0.5.
+	 */
+	public double getFreeDiskStart() {
+		return(fdpstart);
+	}
+	/**
+	 *	Get the fraction of free spool disk space where we stop throwing away undelivered files.  This is FREE_DISK_YELLOW_PERCENT / 100.0.  Default is 0.2.  Limited by FreeDiskStart <= FreeDiskStop <= 0.5.
+	 */
+	public double getFreeDiskStop() {
+		return(fdpstop);
+	}
+	/**
+	 *	Get the spool directory for a subscription, on behalf of a
+	 *	subscription reset request.  The request is honored only when it
+	 *	comes from the provisioning server; every request is logged.
+	 *	@param subid	The subscription ID
+	 *	@param remoteaddr	The IP address the request came from
+	 *	@return	The spool directory, or null if the subscription is unknown or the request did not come from the provisioning server
+	 */
+	public String getSpoolDir(String subid, String remoteaddr) {
+		if (provcheck.isFrom(remoteaddr)) {
+			String sdir = config.getSpoolDir(subid);
+			if (sdir != null) {
+				logger.info("NODE0310 Received subscription reset request for subscription " + subid + " from provisioning server " + remoteaddr);
+			} else {
+				logger.info("NODE0311 Received subscription reset request for unknown subscription " + subid + " from provisioning server " + remoteaddr);
+			}
+			return(sdir);
+		} else {
+			logger.info("NODE0312 Received subscription reset request from unexpected server " + remoteaddr);
+			return(null);
+		}
+	}
+}

diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeMain.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeMain.java
new file mode 100644
index 0000000..c939041
--- /dev/null
+++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeMain.java
@@ -0,0 +1,113 @@
+/*******************************************************************************

+ * ============LICENSE_START==================================================

+ * * org.onap.dmaap

+ * * ===========================================================================

+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+ * * ===========================================================================

+ * * Licensed under the Apache License, Version 2.0 (the "License");

+ * * you may not use this file except in compliance with the License.

+ * * You may obtain a copy of the License at

+ * * 

+ *  *      http://www.apache.org/licenses/LICENSE-2.0

+ * * 

+ *  * Unless required by applicable law or agreed to in writing, software

+ * * distributed under the License is distributed on an "AS IS" BASIS,

+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * * See the License for the specific language governing permissions and

+ * * limitations under the License.

+ * * ============LICENSE_END====================================================

+ * *

+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+ * *

+ ******************************************************************************/

+

+

+package com.att.research.datarouter.node;

+

+import org.eclipse.jetty.servlet.*;

+import org.eclipse.jetty.util.ssl.*;

+import org.eclipse.jetty.server.*;

+import org.eclipse.jetty.server.nio.*;

+import org.eclipse.jetty.server.ssl.*;

+import org.apache.log4j.Logger;

+

+/**
+ *	Entry point of the Data Router node process
+ */
+public class NodeMain	{
+	private static Logger	logger = Logger.getLogger("com.att.research.datarouter.node.NodeMain");
+	private static Delivery d;
+	private static NodeConfigManager ncm;
+
+	private NodeMain() {}
+
+	/**
+	 *	Helper that blocks the caller until the configuration manager
+	 *	reports that it is configured.  It registers itself as a
+	 *	configuration task so that each configuration run wakes the waiter.
+	 */
+	private static class wfconfig implements Runnable	{
+		private NodeConfigManager ncm;
+		public wfconfig(NodeConfigManager ncm) {
+			this.ncm = ncm;
+		}
+		/**
+		 *	Invoked as a configuration task: wake up the waiting thread.
+		 */
+		public synchronized void run() {
+			notify();
+		}
+		/**
+		 *	Block until the node configuration is available.
+		 */
+		public synchronized void waitforconfig() {
+			ncm.registerConfigTask(this);
+			while (!ncm.isConfigured()) {
+				logger.info("NODE0003 Waiting for Node Configuration");
+				try {
+					wait();
+				} catch (Exception e) {
+					// ignored on purpose: the loop condition is simply checked again
+				}
+			}
+			ncm.deregisterConfigTask(this);
+			logger.info("NODE0004 Node Configuration Data Received");
+		}
+	}
+	/**
+	 *	Reset the retry timer for a subscription
+	 *	@param subid	The subscription ID
+	 *	@param ip	The IP address the reset request came from
+	 */
+	public static void resetQueue(String subid, String ip) {
+		String spool = ncm.getSpoolDir(subid, ip);
+		d.resetQueue(spool);
+	}
+	/**
+	 *	Start the data router.
+	 *	<p>
+	 *	The location of the node configuration file can be set using the
+	 *	com.att.research.datarouter.node.ConfigFile system property.  When
+	 *	the property is not set, NodeConfigManager falls back to
+	 *	"/opt/app/datartr/etc/node.properties".
+	 *	<p>
+	 *	Waits for the node configuration, starts delivery and log
+	 *	management, then serves the node servlet over one http and one
+	 *	https connector until the server stops.
+	 */
+	public static void main(String[] args) throws Exception {
+		logger.info("NODE0001 Data Router Node Starting");
+		IsFrom.setDNSCache();
+		ncm = NodeConfigManager.getInstance();
+		logger.info("NODE0002 I am " + ncm.getMyName());
+		wfconfig waiter = new wfconfig(ncm);
+		waiter.waitforconfig();
+		d = new Delivery(ncm);
+		// constructed only for its side effects; the reference is not needed afterwards
+		new LogManager(ncm);
+		Server server = new Server();
+
+		// plain http connector
+		SelectChannelConnector plain = new SelectChannelConnector();
+		plain.setPort(ncm.getHttpPort());
+		plain.setMaxIdleTime(2000);
+		plain.setRequestHeaderSize(2048);
+
+		// https connector
+		SslSelectChannelConnector secure = new SslSelectChannelConnector();
+		secure.setPort(ncm.getHttpsPort());
+		secure.setMaxIdleTime(30000);
+		secure.setRequestHeaderSize(8192);
+		SslContextFactory ssl = secure.getSslContextFactory();
+
+		// SSLv3 is excluded from the protocols offered
+		ssl.addExcludeProtocols("SSLv3");
+		logger.info("Excluded protocols node-"+ssl.getExcludeProtocols());
+
+		ssl.setKeyStoreType(ncm.getKSType());
+		ssl.setKeyStorePath(ncm.getKSFile());
+		ssl.setKeyStorePassword(ncm.getKSPass());
+		ssl.setKeyManagerPassword(ncm.getKPass());
+		server.setConnectors(new Connector[] { plain, secure });
+
+		// a single servlet handles every path
+		ServletContextHandler context = new ServletContextHandler(0);
+		context.setContextPath("/");
+		server.setHandler(context);
+		context.addServlet(new ServletHolder(new NodeServlet()), "/*");
+		logger.info("NODE0005 Data Router Node Activating Service");
+		server.start();
+		server.join();
+	}
+}

diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeServlet.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeServlet.java
new file mode 100644
index 0000000..e0ec1f5
--- /dev/null
+++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeServlet.java
@@ -0,0 +1,380 @@
+/*******************************************************************************

+ * ============LICENSE_START==================================================

+ * * org.onap.dmaap

+ * * ===========================================================================

+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+ * * ===========================================================================

+ * * Licensed under the Apache License, Version 2.0 (the "License");

+ * * you may not use this file except in compliance with the License.

+ * * You may obtain a copy of the License at

+ * * 

+ *  *      http://www.apache.org/licenses/LICENSE-2.0

+ * * 

+ *  * Unless required by applicable law or agreed to in writing, software

+ * * distributed under the License is distributed on an "AS IS" BASIS,

+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * * See the License for the specific language governing permissions and

+ * * limitations under the License.

+ * * ============LICENSE_END====================================================

+ * *

+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+ * *

+ ******************************************************************************/

+

+

+package com.att.research.datarouter.node;

+

+import javax.servlet.*;

+import javax.servlet.http.*;

+import java.util.*;

+import java.util.regex.*;

+import java.io.*;

+import java.nio.file.*;

+import org.apache.log4j.Logger;

+

+import com.att.eelf.configuration.EELFLogger;

+import com.att.eelf.configuration.EELFManager;

+import com.att.research.datarouter.node.eelf.EelfMsgs;

+

+import java.net.*;

+

+/**

+ *	Servlet for handling all http and https requests to the data router node

+ *	<p>

+ *	Handled requests are:

+ *	<br>

+ *	GET http://<i>node</i>/internal/fetchProv - fetch the provisioning data

+ *	<br>

+ *	PUT/DELETE https://<i>node</i>/internal/publish/<i>fileid</i> - n2n transfer

+ *	<br>

+ *	PUT/DELETE https://<i>node</i>/publish/<i>feedid</i>/<i>fileid</i> - publish request

+ */

+public class NodeServlet extends HttpServlet	{

+	private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.NodeServlet");

+	private static NodeConfigManager	config; // assigned in init()

+	private static Pattern	MetaDataPattern; // compiled once in the static initializer below; validates X-ATT-DR-META header values in common()

+	private static SubnetMatcher internalsubnet = new SubnetMatcher("135.207.136.128/25"); // hard-coded subnet; doGet() only serves /internal/logs/ and /internal/rtt/ to callers inside it

+	//Adding EELF Logger Rally:US664892  

+    private static EELFLogger eelflogger = EELFManager.getInstance().getLogger("com.att.research.datarouter.node.NodeServlet");

+

+	static { // builds the metadata regex from smaller pieces: a flat JSON-style object of "name": value pairs

+		try {

+			String ws = "\\s*"; // optional whitespace

+			// assume that \\ and \" have been replaced by X

+			String string = "\"[^\"]*\""; // a double-quoted string with no embedded quote (callers replace backslash escapes first)

+			//String string = "\"(?:[^\"\\\\]|\\\\.)*\"";

+			String number = "[+-]?(?:\\.\\d+|(?:0|[1-9]\\d*)(?:\\.\\d*)?)(?:[eE][+-]?\\d+)?"; // optional sign, digits with optional fraction, optional exponent

+			String value = "(?:" + string + "|" + number + "|null|true|false)"; // scalar values only: nested objects and arrays do not match

+			String item = string + ws + ":" + ws + value + ws;

+			String object = ws + "\\{" + ws + "(?:" + item + "(?:" + "," + ws + item + ")*)?\\}" + ws; // zero or more comma-separated items inside braces

+			MetaDataPattern = Pattern.compile(object, Pattern.DOTALL);

+		} catch (Exception e) { // any failure is swallowed here, which would leave MetaDataPattern null

+		}

+	}

+	/**

+	 *	Servlet lifecycle hook: cache the shared NodeConfigManager instance

+	 *	used by the request handlers and log that the servlet is ready.

+	 */

+	@Override

+	public void init() {

+		config = NodeConfigManager.getInstance();

+		logger.info("NODE0101 Node Servlet Configured");

+	}

+	/**

+	 *	Refuse the request with 503 (Service Unavailable) when the node is

+	 *	shutting down or has not been configured yet.

+	 *	@param resp	The response used to send the error

+	 *	@return	true if the request was rejected and the caller must stop, false if processing may continue

+	 *	@throws IOException	if the error response cannot be sent

+	 */

+	private boolean down(HttpServletResponse resp) throws IOException {

+		boolean accepting = !config.isShutdown() && config.isConfigured();

+		if (accepting) {

+			return false;

+		}

+		resp.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE);

+		logger.info("NODE0102 Rejecting request: Service is being quiesced");

+		return true;

+	}

+	/**

+	 *	Handle all GET requests.

+	 *	<p>

+	 *	Recognized paths are:

+	 *	<br>

+	 *	/internal/fetchProv - ask the configuration manager to fetch the provisioning data

+	 *	<br>

+	 *	/internal/resetSubscription/<i>subid</i> - reset the queue of one subscription

+	 *	<br>

+	 *	/internal/logs/<i>file</i> - return a file from the log directory (internal subnet only)

+	 *	<br>

+	 *	/internal/rtt/<i>host</i> - time a TCP connection to port 443 of a host (internal subnet only)

+	 *	<br>

+	 *	Anything else, including a request with no path at all, is rejected with 404.

+	 */

+	@Override

+	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {

+		NodeUtils.setIpAndFqdnForEelf("doGet");

+		eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-ATT-DR-ON-BEHALF-OF"),getIdFromPath(req)+"");

+		if (down(resp)) {

+			return;

+		}

+		String path = req.getPathInfo();

+		String qs = req.getQueryString();

+		String ip = req.getRemoteAddr();

+		if (path == null) {

+			// getPathInfo() is null when there is no extra path; reject cleanly instead of failing with a NullPointerException below

+			logger.info("NODE0103 Rejecting invalid GET of " + path + " from " + ip);

+			resp.sendError(HttpServletResponse.SC_NOT_FOUND);

+			return;

+		}

+		if (qs != null) {

+			path = path + "?" + qs;

+		}

+		if ("/internal/fetchProv".equals(path)) {

+			config.gofetch(ip);

+			resp.setStatus(HttpServletResponse.SC_NO_CONTENT);

+			return;

+		} else if (path.startsWith("/internal/resetSubscription/")) {

+			String subid = path.substring(28);

+			if (subid.length() != 0 && subid.indexOf('/') == -1) {

+				NodeMain.resetQueue(subid, ip);

+				resp.setStatus(HttpServletResponse.SC_NO_CONTENT);

+				return;

+			}

+		}

+		if (internalsubnet.matches(NodeUtils.getInetAddress(ip))) {

+			if (path.startsWith("/internal/logs/")) {

+				String f = path.substring(15);

+				File fn = new File(config.getLogDir() + "/" + f);

+				// only plain files directly inside the log directory may be fetched

+				if (f.indexOf('/') != -1 || !fn.isFile()) {

+					logger.info("NODE0103 Rejecting invalid GET of " + path + " from " + ip);

+					resp.sendError(HttpServletResponse.SC_NOT_FOUND);

+					return;

+				}

+				byte[] buf = new byte[65536];

+				resp.setContentType("text/plain");

+				long len = fn.length();

+				if (len <= Integer.MAX_VALUE) {

+					// setContentLength takes an int; a larger file would wrap to a negative length, so omit the header instead

+					resp.setContentLength((int)len);

+				}

+				resp.setStatus(200);

+				// try-with-resources closes the file even when the copy to the client fails

+				try (InputStream is = new FileInputStream(fn)) {

+					OutputStream os = resp.getOutputStream();

+					int i;

+					while ((i = is.read(buf)) > 0) {

+						os.write(buf, 0, i);

+					}

+				}

+				return;

+			}

+			if (path.startsWith("/internal/rtt/")) {

+				String xip = path.substring(14);

+				long st = System.currentTimeMillis();

+				String status = " unknown";

+				try {

+					Socket s = new Socket(xip, 443);

+					s.close();

+					status = " connected";

+				} catch (Exception e) {

+					status = " error " + e.toString();

+				}

+				// the reply is the elapsed time in milliseconds followed by the outcome

+				long dur = System.currentTimeMillis() - st;

+				resp.setContentType("text/plain");

+				resp.setStatus(200);

+				byte[] buf = (dur + status + "\n").getBytes();

+				resp.setContentLength(buf.length);

+				resp.getOutputStream().write(buf);

+				return;

+			}

+		}

+		logger.info("NODE0103 Rejecting invalid GET of " + path + " from " + ip);

+		resp.sendError(HttpServletResponse.SC_NOT_FOUND);

+		return;

+	}

+	/**

+	 *	Handle all PUT requests by recording the EELF context and delegating

+	 *	to the shared publish handler with isput set to true.

+	 */

+	@Override

+	protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {

+		NodeUtils.setIpAndFqdnForEelf("doPut");

+		eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-ATT-DR-ON-BEHALF-OF"),getIdFromPath(req)+"");

+		common(req, resp, true);

+	}

+	/**

+	 *	Handle all DELETE requests by recording the EELF context and delegating

+	 *	to the shared publish handler with isput set to false.

+	 */

+	@Override

+	protected void doDelete(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {

+		NodeUtils.setIpAndFqdnForEelf("doDelete");

+		eelflogger.info(EelfMsgs.MESSAGE_WITH_BEHALF_AND_FEEDID, req.getHeader("X-ATT-DR-ON-BEHALF-OF"),getIdFromPath(req)+"");

+		common(req, resp, false);

+	}

+	private void common(HttpServletRequest req, HttpServletResponse resp, boolean isput) throws ServletException, IOException { // Shared PUT/DELETE handler: validate the request, spool body and metadata, link them into each target's spool

+		if (down(resp)) {

+			return;

+		}

+		if (!req.isSecure()) {

+			logger.info("NODE0104 Rejecting insecure PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());

+			resp.sendError(HttpServletResponse.SC_FORBIDDEN, "https required on publish requests");

+			return;

+		}

+		String fileid = req.getPathInfo(); // starts as the whole path; narrowed to the bare file id below

+		if (fileid == null) {

+			logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());

+			resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI.  Expecting <feed-publishing-url>/<fileid>.");

+			return;

+		}

+		String feedid = null; // stays null for node-to-node (/internal/publish/) transfers

+		String user = null;

+		String credentials = req.getHeader("Authorization");

+		if (credentials == null) {

+			logger.info("NODE0106 Rejecting unauthenticated PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());

+			resp.sendError(HttpServletResponse.SC_FORBIDDEN, "Authorization header required");

+			return;

+		}

+		String ip = req.getRemoteAddr();

+		String lip = req.getLocalAddr();

+		String pubid = null;

+		String xpubid = null; // assigned below but never read in this method

+		String rcvd = NodeUtils.logts(System.currentTimeMillis()) + ";from=" + ip + ";by=" + lip; // value of the X-ATT-DR-RECEIVED line appended to the metadata

+		Target[]	targets = null;

+		if (fileid.startsWith("/publish/")) { // publish request: /publish/<feedid>/<fileid>

+			fileid = fileid.substring(9);

+			int i = fileid.indexOf('/');

+			if (i == -1 || i == fileid.length() - 1) {

+				logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());

+				resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI.  Expecting <feed-publishing-url>/<fileid>.  Possible missing fileid.");

+				return;

+			}

+			feedid = fileid.substring(0, i);

+			fileid = fileid.substring(i + 1);

+			pubid = config.getPublishId(); // publish id comes from the configuration manager for direct publishes

+			xpubid = req.getHeader("X-ATT-DR-PUBLISH-ID");

+			targets = config.getTargets(feedid);

+		} else if (fileid.startsWith("/internal/publish/")) { // node-to-node transfer: /internal/publish/<fileid>

+			if (!config.isAnotherNode(credentials, ip)) {

+				logger.info("NODE0107 Rejecting unauthorized node-to-node transfer attempt from " + ip);

+				resp.sendError(HttpServletResponse.SC_FORBIDDEN);

+				return;

+			}

+			fileid = fileid.substring(18);

+			pubid = req.getHeader("X-ATT-DR-PUBLISH-ID"); // NOTE(review): null when the header is absent, giving a spool name ending in "/null" - confirm peers always send it

+			targets = config.parseRouting(req.getHeader("X-ATT-DR-ROUTING"));

+		} else {

+			logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());

+			resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI.  Expecting <feed-publishing-url>/<fileid>.");

+			return;

+		}

+		if (fileid.indexOf('/') != -1) { // the file id itself may not contain a slash

+			logger.info("NODE0105 Rejecting bad URI for PUT or DELETE of " + req.getPathInfo() + " from " + req.getRemoteAddr());

+			resp.sendError(HttpServletResponse.SC_NOT_FOUND, "Invalid request URI.  Expecting <feed-publishing-url>/<fileid>.");

+			return;

+		}

+		String qs = req.getQueryString();

+		if (qs != null) {

+			fileid = fileid + "?" + qs;

+		}

+		String hp = config.getMyName();

+		int xp = config.getExtHttpsPort();

+		if (xp != 443) { // the port is only spelled out when it is not the https default

+			hp = hp + ":" + xp;

+		}

+		String logurl = "https://" + hp + "/internal/publish/" + fileid; // URL reported to StatusLog; replaced below for direct publishes

+		if (feedid != null) {

+			logurl = "https://" + hp + "/publish/" + feedid + "/" + fileid;

+			String reason = config.isPublishPermitted(feedid, credentials, ip); // a non-null result is the rejection reason

+			if (reason != null) {

+				logger.info("NODE0111 Rejecting unauthorized publish attempt to feed " + feedid + " fileid " + fileid + " from " + ip + " reason " + reason);

+				resp.sendError(HttpServletResponse.SC_FORBIDDEN,reason);

+				return;

+			}

+			user = config.getAuthUser(feedid, credentials);

+			String newnode = config.getIngressNode(feedid, user, ip); // a non-null result names another node the publisher is redirected to

+			if (newnode != null) {

+				String port = "";

+				int iport = config.getExtHttpsPort();

+				if (iport != 443) {

+					port = ":" + iport;

+				}

+				String redirto = "https://" + newnode + port + "/publish/" + feedid + "/" + fileid;

+				logger.info("NODE0108 Redirecting publish attempt for feed " + feedid + " user " + user + " ip " + ip + " to " + redirto);

+				resp.sendRedirect(redirto);

+				return;

+			}

+			resp.setHeader("X-ATT-DR-PUBLISH-ID", pubid);

+		}

+		String fbase = config.getSpoolDir() + "/" + pubid;

+		File data = new File(fbase); // temporary copy of the request body

+		File meta = new File(fbase + ".M"); // temporary metadata file, renamed into each target's spool

+		OutputStream dos = null;

+		Writer mw = null;

+		InputStream is = null;

+		try {

+			StringBuffer mx = new StringBuffer(); // metadata text: method and file id first, then one tab-separated line per kept header

+			mx.append(req.getMethod()).append('\t').append(fileid).append('\n');

+			Enumeration hnames = req.getHeaderNames();

+			String ctype = null;

+			while (hnames.hasMoreElements()) {

+				String hn = (String)hnames.nextElement();

+				String hnlc = hn.toLowerCase();

+				if ((isput && ("content-type".equals(hnlc) || // content headers are kept for PUT only

+				    "content-language".equals(hnlc) ||

+				    "content-md5".equals(hnlc) ||

+				    "content-range".equals(hnlc))) ||

+				    "x-att-dr-meta".equals(hnlc) ||

+				    (feedid == null && "x-att-dr-received".equals(hnlc)) || // earlier received stamps are kept only on node-to-node transfers

+				    (hnlc.startsWith("x-") && !hnlc.startsWith("x-att-dr-"))) { // other x- headers pass through; remaining x-att-dr-* headers are dropped

+					Enumeration hvals = req.getHeaders(hn);

+					while (hvals.hasMoreElements()) {

+						String hv = (String)hvals.nextElement();

+						if ("content-type".equals(hnlc)) {

+							ctype = hv;

+						}

+						if ("x-att-dr-meta".equals(hnlc)) {

+							if (hv.length() > 4096) {

+								logger.info("NODE0109 Rejecting publish attempt with metadata too long for feed " + feedid + " user " + user + " ip " + ip);

+								resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Metadata too long");

+								return;

+							}

+							if (!MetaDataPattern.matcher(hv.replaceAll("\\\\.", "X")).matches()) { // backslash escapes are replaced by X before matching, as the pattern expects

+								logger.info("NODE0109 Rejecting publish attempt with malformed metadata for feed " + feedid + " user " + user + " ip " + ip);

+								resp.sendError(HttpServletResponse.SC_BAD_REQUEST, "Malformed metadata");

+								return;

+							}

+						}

+						mx.append(hn).append('\t').append(hv).append('\n');

+					}

+				}

+			}

+			mx.append("X-ATT-DR-RECEIVED\t").append(rcvd).append('\n');

+			String metadata = mx.toString();

+			byte[] buf = new byte[1024 * 1024];

+			int i;

+			try { // copy the request body into the spool file

+				is = req.getInputStream();

+				dos = new FileOutputStream(data);

+				while ((i = is.read(buf)) > 0) {

+					dos.write(buf, 0, i);

+				}

+				is.close();

+				is = null; // nulled so the finally block does not close it a second time

+				dos.close();

+				dos = null;

+			} catch (IOException ioe) {

+				long exlen = -1; // expected length, left at -1 when Content-Length is missing or not a number

+				try {

+					exlen = Long.parseLong(req.getHeader("Content-Length"));

+				} catch (Exception e) {

+				}

+				StatusLog.logPubFail(pubid, feedid, logurl, req.getMethod(), ctype, exlen, data.length(), ip, user, ioe.getMessage());

+				throw ioe;

+			}

+			Path dpath = Paths.get(fbase);

+			for (Target t: targets) { // NOTE(review): throws NullPointerException if getTargets/parseRouting can return null - confirm they never do

+				DestInfo di = t.getDestInfo();

+				if (di == null) {

+					// TODO: unknown destination

+					continue;

+				}

+				String dbase = di.getSpool() + "/" + pubid;

+				Files.createLink(Paths.get(dbase), dpath); // hard link to the data file inside the destination's spool

+				mw = new FileWriter(meta);

+				mw.write(metadata);

+				if (di.getSubId() == null) { // routing line is written only when the destination has no subscription id

+					mw.write("X-ATT-DR-ROUTING\t" + t.getRouting() + "\n");

+				}

+				mw.close();

+				meta.renameTo(new File(dbase + ".M")); // the boolean result of renameTo is not checked

+			}

+			resp.setStatus(HttpServletResponse.SC_NO_CONTENT);

+			resp.getOutputStream().close();

+			StatusLog.logPub(pubid, feedid, logurl, req.getMethod(), ctype, data.length(), ip, user, HttpServletResponse.SC_NO_CONTENT);

+		} catch (IOException ioe) {

+			logger.info("NODE0110 IO Exception receiving publish attempt for feed " + feedid + " user " + user + " ip " + ip + " " + ioe.toString(), ioe);

+			throw ioe;

+		} finally { // runs on success, on the early returns above and on failure

+			if (is != null) { try { is.close(); } catch (Exception e) {}}

+			if (dos != null) { try { dos.close(); } catch (Exception e) {}}

+			if (mw != null) { try { mw.close(); } catch (Exception e) {}}

+			try { data.delete(); } catch (Exception e) {} // removes the temporary name; links created in the target spools are separate directory entries

+			try { meta.delete(); } catch (Exception e) {}

+		}

+	}

+	

+	/**

+	 *	Interpret the request path info as a slash followed by an integer and return that integer.

+	 *	@param req	The request whose path info is examined

+	 *	@return	The parsed integer, or -1 if the path is missing, shorter than two characters, or not an integer

+	 */

+	private int getIdFromPath(HttpServletRequest req) {

+		String pathInfo = req.getPathInfo();

+		int id = -1;

+		if (pathInfo != null && pathInfo.length() >= 2) {

+			try {

+				id = Integer.parseInt(pathInfo.substring(1));

+			} catch (NumberFormatException nfe) {

+				id = -1;

+			}

+		}

+		return id;

+	}

+}

diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeUtils.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeUtils.java
new file mode 100644
index 0000000..5471c0d
--- /dev/null
+++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/NodeUtils.java
@@ -0,0 +1,226 @@
+/*******************************************************************************

+ * ============LICENSE_START==================================================

+ * * org.onap.dmaap

+ * * ===========================================================================

+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+ * * ===========================================================================

+ * * Licensed under the Apache License, Version 2.0 (the "License");

+ * * you may not use this file except in compliance with the License.

+ * * You may obtain a copy of the License at

+ * * 

+ *  *      http://www.apache.org/licenses/LICENSE-2.0

+ * * 

+ *  * Unless required by applicable law or agreed to in writing, software

+ * * distributed under the License is distributed on an "AS IS" BASIS,

+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * * See the License for the specific language governing permissions and

+ * * limitations under the License.

+ * * ============LICENSE_END====================================================

+ * *

+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+ * *

+ ******************************************************************************/

+

+

+package com.att.research.datarouter.node;

+

+import static com.att.eelf.configuration.Configuration.MDC_SERVER_FQDN;

+import static com.att.eelf.configuration.Configuration.MDC_SERVER_IP_ADDRESS;

+import static com.att.eelf.configuration.Configuration.MDC_SERVICE_NAME;

+

+import java.security.*;

+import java.io.*;

+import java.util.*;

+import java.security.cert.*;

+import java.net.*;

+import java.text.*;

+import org.apache.commons.codec.binary.Base64;

+import org.apache.log4j.Logger;

+import org.slf4j.MDC;

+

+import com.att.eelf.configuration.EELFLogger;

+import com.att.eelf.configuration.EELFManager;

+import com.att.research.datarouter.node.eelf.EelfMsgs;

+

+/**

+ *	Utility functions for the data router node

+ */

+public class NodeUtils	{

+    private static EELFLogger eelflogger = EELFManager.getInstance().getLogger("com.att.research.datarouter.node.NodeUtils");

+	private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.NodeUtils");

+	private static SimpleDateFormat	logdate; // shared formatter; its only use in this class, logts(Date), is synchronized

+	static {

+		logdate = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); // the trailing Z is a literal, so the zone must be fixed below

+		logdate.setTimeZone(TimeZone.getTimeZone("GMT"));

+	}

+	private NodeUtils() {} // static utility class: no instances


+	/**

+	 *	Base64 encode a byte array

+	 *	@param raw	The bytes to be encoded

+	 *	@return	The encoded string

+	 */

+	public static String base64Encode(byte[] raw) {

+		return(Base64.encodeBase64String(raw));

+	}

+	/**

+	 *	Given a user and password, generate the credentials

+	 *	@param user	User name

+	 *	@param password	User password

+	 *	@return	Authorization header value

+	 */

+	public static String getAuthHdr(String user, String password) {

+		if (user == null || password == null) {

+			return(null);

+		}

+		return("Basic " + base64Encode((user + ":" + password).getBytes()));

+	}

+	/**

+	 *	Given a node name, generate the credentials

+	 *	@param node	Node name

+	 */

+	public static String	getNodeAuthHdr(String node, String key) {

+		try {

+			MessageDigest md = MessageDigest.getInstance("SHA");

+			md.update(key.getBytes());

+			md.update(node.getBytes());

+			md.update(key.getBytes());

+			return(getAuthHdr(node, base64Encode(md.digest())));

+		} catch (Exception e) {

+			return(null);

+		}

+	}

+	/**

+	 *	Given a keystore file and its password, return the value of the CN of the first private key entry with a certificate.

+	 *	<p>

+	 *	The keystore file is closed again once it has been loaded, whether or not loading succeeds.

+	 *	@param kstype	The type of keystore

+	 *	@param ksfile	The file name of the keystore

+	 *	@param kspass	The password of the keystore

+	 *	@return	CN of the certificate subject or null (also null, after logging, if the keystore cannot be loaded)

+	 */

+	public static String getCanonicalName(String kstype, String ksfile, String kspass) {

+		try {

+			KeyStore ks = KeyStore.getInstance(kstype);

+			// try-with-resources: the original never closed this stream

+			try (InputStream in = new FileInputStream(ksfile)) {

+				ks.load(in, kspass.toCharArray());

+			}

+			return(getCanonicalName(ks));

+		} catch (Exception e) {

+			setIpAndFqdnForEelf("getCanonicalName");

+			eelflogger.error(EelfMsgs.MESSAGE_KEYSTORE_LOAD_ERROR, ksfile, e.toString());

+			logger.error("NODE0401 Error loading my keystore file + " + ksfile + " " + e.toString(), e);

+			return(null);

+		}

+	}

+	/**

+	 *	Given a keystore, return the value of the CN of the first private key entry with a certificate.

+	 *	@param ks	The KeyStore

+	 *	@return	CN of the certificate subject or null

+	 */

+	public static String getCanonicalName(KeyStore ks) {

+		try {

+			Enumeration<String> aliases = ks.aliases();

+			while (aliases.hasMoreElements()) {

+				String s = aliases.nextElement();

+				if (ks.entryInstanceOf(s, KeyStore.PrivateKeyEntry.class)) {

+					X509Certificate c = (X509Certificate)ks.getCertificate(s);

+					if (c != null) {

+						String subject = c.getSubjectX500Principal().getName();

+						String[] parts = subject.split(",");

+						if (parts.length < 1) {

+							return(null);

+						}

+						subject = parts[0].trim();

+						if (!subject.startsWith("CN=")) {

+							return(null);

+

+						}

+						return(subject.substring(3));

+					}

+				}

+			}

+		} catch (Exception e) {

+			logger.error("NODE0402 Error extracting my name from my keystore file " + e.toString(), e);

+		}

+		return(null);

+	}

+	/**

+	 *	Given a string representation of an IP address, get the corresponding byte array

+	 *	@param ip	The IP address as a string

+	 *	@return	The IP address as a byte array or null if the address is invalid

+	 */

+	public static byte[] getInetAddress(String ip) {

+		try {

+			return(InetAddress.getByName(ip).getAddress());

+		} catch (Exception e) {

+		}

+		return(null);

+	}

+	/**

+	 *	Given a uri with parameters, split out the feed ID and file ID.

+	 *	<p>

+	 *	The feed ID is the path segment before the last slash that precedes any '?' or '#'.

+	 *	The file ID is everything after that slash, including any query string or fragment.

+	 *	@param uriandparams	The uri, optionally followed by a query string and/or fragment

+	 *	@return	A two element array holding the feed ID and the file ID, or null if the uri does not contain two usable slashes

+	 */

+	public static String[] getFeedAndFileID(String uriandparams) {

+		int end = uriandparams.length();

+		int i = uriandparams.indexOf('#');

+		if (i != -1 && i < end) {

+			end = i;

+		}

+		i = uriandparams.indexOf('?');

+		if (i != -1 && i < end) {

+			end = i;

+		}

+		// end now marks where the path stops; move it back to the slash in front of the file ID

+		end = uriandparams.lastIndexOf('/', end);

+		if (end < 2) {

+			return(null);

+		}

+		i = uriandparams.lastIndexOf('/', end - 1);

+		if (i == -1) {

+			return(null);

+		}

+		// substring's end index is exclusive, so the feed ID runs up to (not including) the slash at end;

+		// the previous end - 1 dropped the last character of the feed ID

+		return(new String[] { uriandparams.substring(i + 1, end), uriandparams.substring(end + 1) });

+	}

+	/**

+	 *	Escape fields that might contain vertical bar, backslash, or newline by replacing them with backslash p, backslash e and backslash n.

+	 */

+	public static String loge(String s) {

+		if (s == null) {

+			return(s);

+		}

+		return(s.replaceAll("\\\\", "\\\\e").replaceAll("\\|", "\\\\p").replaceAll("\n", "\\\\n"));

+	}

+	/**

+	 *	Undo what loge does.

+	 */

+	public static String unloge(String s) {

+		if (s == null) {

+			return(s);

+		}

+		return(s.replaceAll("\\\\p", "\\|").replaceAll("\\\\n", "\n").replaceAll("\\\\e", "\\\\"));

+	}

+	/**

+	 *	Format a logging timestamp as yyyy-mm-ddThh:mm:ss.mmmZ

+	 */

+	public static String logts(long when) {

+		return(logts(new Date(when)));

+	}

+	/**

+	 *	Format a logging timestamp as yyyy-mm-ddThh:mm:ss.mmmZ

+	 */

+	public static synchronized String logts(Date when) {

+		return(logdate.format(when));

+	}

+	

+	/* Method prints method name, server FQDN and IP Address of the machine in EELF logs

+	 * @Method - setIpAndFqdnForEelf - Rally:US664892  

+	 * @Params - method, prints method name in EELF log.

+	 */	

+	public static void setIpAndFqdnForEelf(String method) {

+	 	MDC.clear();

+        MDC.put(MDC_SERVICE_NAME, method);

+        try {

+            MDC.put(MDC_SERVER_FQDN, InetAddress.getLocalHost().getHostName());

+            MDC.put(MDC_SERVER_IP_ADDRESS, InetAddress.getLocalHost().getHostAddress());

+        } catch (Exception e) {

+            e.printStackTrace();

+        }

+

+	}

+	

+

+}

diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/PathFinder.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/PathFinder.java
new file mode 100644
index 0000000..7ff9183
--- /dev/null
+++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/PathFinder.java
@@ -0,0 +1,132 @@
+/*******************************************************************************

+ * ============LICENSE_START==================================================

+ * * org.onap.dmaap

+ * * ===========================================================================

+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+ * * ===========================================================================

+ * * Licensed under the Apache License, Version 2.0 (the "License");

+ * * you may not use this file except in compliance with the License.

+ * * You may obtain a copy of the License at

+ * * 

+ *  *      http://www.apache.org/licenses/LICENSE-2.0

+ * * 

+ *  * Unless required by applicable law or agreed to in writing, software

+ * * distributed under the License is distributed on an "AS IS" BASIS,

+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * * See the License for the specific language governing permissions and

+ * * limitations under the License.

+ * * ============LICENSE_END====================================================

+ * *

+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+ * *

+ ******************************************************************************/

+

+

+package com.att.research.datarouter.node;

+

+import java.util.*;

+

+/**

+ *	Given a set of node names and next hops, identify and ignore any cycles and figure out the sequence of next hops to get from this node to any other node

+ */

+

+public class PathFinder	{

+	private static class Hop	{ // working record for one provisioned next hop

+		public boolean	mark; // true while plot() is recursing through this hop; seeing it set again means a cycle

+		public boolean	bad; // hop must not be used (duplicate, invalid via, or part of a cycle)

+		public NodeConfig.ProvHop	basis; // the provisioned hop this record was built from

+	}

+	private Vector<String> errors = new Vector<String>(); // problems found while building the routes, returned by getErrors()

+	private Hashtable<String, String> routes = new Hashtable<String, String>(); // destination node name -> route string, filled in by the constructor

+	/**

+	 *	Get list of errors encountered while finding paths

+	 *	@return array of error descriptions

+	 */

+	public String[] getErrors() {

+		return(errors.toArray(new String[errors.size()]));

+	}

+	/**

+	 *	Look up the route from this node to the specified node

+	 *	@param destination	the node to reach

+	 *	@return	list of node names separated by and ending with "/", or the empty string if no route is recorded for the node

+	 */

+	public String getPath(String destination) {

+		String route = routes.get(destination);

+		return route != null ? route : "";

+	}

+	private String plot(String from, String to, Hashtable<String, Hop> info) { // Follow the via chain from 'from' towards 'to' using the hops in 'info'; returns the node names joined by "/" and ending with 'to'

+		Hop nh = info.get(from);

+		if (nh == null || nh.bad) { // no usable hop from this node: go straight to the destination

+			return(to);

+		}

+		if (nh.mark) { // this hop is already on the current recursion path

+			// loop detected;

+			while (!nh.bad) { // walk the via chain, flagging each hop, until one already flagged is reached

+				nh.bad = true;

+				errors.add(nh.basis + " is part of a cycle");

+				nh = info.get(nh.basis.getVia()); // NOTE(review): assumes every via on the cycle has an entry in info, otherwise nh is null here - confirm

+			}

+			return(to);

+		}

+		nh.mark = true; // marked only for the duration of the recursive call

+		String x = plot(nh.basis.getVia(), to, info);

+		nh.mark = false;

+		if (nh.bad) { // the recursion found this hop to be part of a cycle

+			return(to);

+		}

+		return(nh.basis.getVia() + "/" + x);

+	}

+	/**

+	 *	Find routes from a specified origin to all of the nodes given a set of specified next hops.

+	 *	<p>

+	 *	Hops that name an unknown from or destination node are reported and ignored.

+	 *	A second hop for the same from and destination, an unknown via node, or a via

+	 *	equal to the destination is reported and the affected hop is marked bad.

+	 *	Reported problems are available from getErrors().

+	 *	@param origin	where we start

+	 *	@param nodes	where we can go

+	 *	@param hops	detours along the way

+	 */

+	public PathFinder(String origin, String[] nodes, NodeConfig.ProvHop[] hops) {

+		HashSet<String> known = new HashSet<String>();

+		Hashtable<String, Hashtable<String, Hop>> ht = new Hashtable<String, Hashtable<String, Hop>>(); // destination -> (from -> hop)

+		for (String n: nodes) {

+			known.add(n);

+			ht.put(n, new Hashtable<String, Hop>());

+		}

+		for (NodeConfig.ProvHop ph: hops) {

+			if (!known.contains(ph.getFrom())) {

+				errors.add(ph + " references unknown from node");

+				continue;

+			}

+			if (!known.contains(ph.getTo())) {

+				errors.add(ph + " references unknown destination node");

+				continue;

+			}

+			Hashtable<String, Hop> ht2 = ht.get(ph.getTo());

+			Hop h = ht2.get(ph.getFrom());

+			if (h != null) { // a hop for this from/destination pair already exists: the earlier one is marked bad and this one is dropped

+				h.bad = true;

+				errors.add(ph + " gives duplicate next hop - previous via was " + h.basis.getVia());

+				continue;

+			}

+			h = new Hop();

+			h.basis = ph;

+			ht2.put(ph.getFrom(), h); // stored before the via checks, so an invalid hop stays in the table marked bad

+			if (!known.contains(ph.getVia())) {

+				errors.add(ph + " references unknown via node");

+				h.bad = true;

+				continue;

+			}

+			if (ph.getVia().equals(ph.getTo())) {

+				errors.add(ph + " gives destination as via");

+				h.bad = true;

+				continue;

+			}

+		}

+		for (String n: known) {

+			if (n.equals(origin)) {

+				routes.put(n, ""); // NOTE(review): overwritten by the unconditional put below, so the origin never keeps "" - confirm whether a continue was intended

+			}

+			routes.put(n, plot(origin, n, ht.get(n)) + "/");

+		}

+	}

+}

diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/ProvData.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/ProvData.java
new file mode 100644
index 0000000..19cb899
--- /dev/null
+++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/ProvData.java
@@ -0,0 +1,302 @@
+/*******************************************************************************

+ * ============LICENSE_START==================================================

+ * * org.onap.dmaap

+ * * ===========================================================================

+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+ * * ===========================================================================

+ * * Licensed under the Apache License, Version 2.0 (the "License");

+ * * you may not use this file except in compliance with the License.

+ * * You may obtain a copy of the License at

+ * * 

+ *  *      http://www.apache.org/licenses/LICENSE-2.0

+ * * 

+ *  * Unless required by applicable law or agreed to in writing, software

+ * * distributed under the License is distributed on an "AS IS" BASIS,

+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * * See the License for the specific language governing permissions and

+ * * limitations under the License.

+ * * ============LICENSE_END====================================================

+ * *

+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+ * *

+ ******************************************************************************/

+

+

+package com.att.research.datarouter.node;

+

+import java.io.*;

+import java.util.*;

+import org.json.*;

+import org.apache.log4j.Logger;

+

+import com.att.eelf.configuration.EELFLogger;

+import com.att.eelf.configuration.EELFManager;

+import com.att.research.datarouter.node.eelf.EelfMsgs;

+

+/**

+ *	Parser for provisioning data from the provisioning server.

+ *	<p>

+ *	The ProvData class uses a Reader for the text (JSON) configuration from the

+ *	provisioning server to construct arrays of raw configuration entries.

+ */

+public class ProvData	{

+	private static EELFLogger eelflogger = EELFManager.getInstance().getLogger("com.att.research.datarouter.node.ProvData");

+	private static Logger logger = Logger.getLogger("com.att.research.datarouter.node.ProvData");

+	private NodeConfig.ProvNode[]	pn;

+	private NodeConfig.ProvParam[]	pp;

+	private NodeConfig.ProvFeed[]	pf;

+	private NodeConfig.ProvFeedUser[]	pfu;

+	private NodeConfig.ProvFeedSubnet[]	pfsn;

+	private NodeConfig.ProvSubscription[]	ps;

+	private NodeConfig.ProvForceIngress[]	pfi;

+	private NodeConfig.ProvForceEgress[]	pfe;

+	private NodeConfig.ProvHop[]	ph;

+	/**

+	 *	Get an array element as an array of strings

+	 *	@param a	The JSON array

+	 *	@param index	The position of the element within the array

+	 *	@return	See {@link #gvasa(Object)}

+	 */

+	private static String[] gvasa(JSONArray a, int index) {

+		return(gvasa(a.get(index)));

+	}

+	/**

+	 *	Get an optional member of an object as an array of strings

+	 *	@param o	The JSON object

+	 *	@param key	The name of the member

+	 *	@return	See {@link #gvasa(Object)}

+	 */

+	private static String[] gvasa(JSONObject o, String key) {

+		return(gvasa(o.opt(key)));

+	}

+	/**

+	 *	Get a value as an array of strings

+	 *	@param o	The value: a JSON array, a scalar, or anything else (including null)

+	 *	@return	For a JSON array, the string form of each scalar element (other elements are dropped).

+	 *	For a single scalar, a one element array.  Otherwise, an empty array.

+	 */

+	private static String[] gvasa(Object o) {

+		if (o instanceof JSONArray) {

+			JSONArray a = (JSONArray)o;

+			List<String> v = new ArrayList<String>();

+			for (int i = 0; i < a.length(); i++) {

+				String s = gvas(a, i);

+				if (s != null) {

+					v.add(s);

+				}

+			}

+			return(v.toArray(new String[v.size()]));

+		}

+		String s = gvas(o);

+		if (s == null) {

+			return(new String[0]);

+		}

+		return(new String[] { s });

+	}

+	/**

+	 *	Get an array element as a string

+	 *	@param a	The JSON array

+	 *	@param index	The position of the element within the array

+	 *	@return	See {@link #gvas(Object)}

+	 */

+	private static String gvas(JSONArray a, int index) {

+		return(gvas(a.get(index)));

+	}

+	/**

+	 *	Get an optional member of an object as a string

+	 *	@param o	The JSON object

+	 *	@param key	The name of the member

+	 *	@return	See {@link #gvas(Object)}

+	 */

+	private static String gvas(JSONObject o, String key) {

+		return(gvas(o.opt(key)));

+	}

+	/**

+	 *	Get a value as a string

+	 *	@param o	The value

+	 *	@return	The string form of a Boolean, Number, or String.  Null for anything else.

+	 */

+	private static String gvas(Object o) {

+		if (o instanceof Boolean || o instanceof Number || o instanceof String) {

+			return(o.toString());

+		}

+		return(null);

+	}

+	/**

+	 *	Construct raw provisioning data entries from the text (JSON)

+	 *	provisioning document received from the provisioning server.

+	 *	The reader is always closed, whether or not the document is accepted.

+	 *	@param r	The reader for the JSON text.

+	 *	@throws IOException	If the document cannot be parsed (the JSONException is logged and wrapped) or the reader cannot be closed.

+	 */

+	public ProvData(Reader r) throws IOException {

+		List<NodeConfig.ProvNode> pnv = new ArrayList<NodeConfig.ProvNode>();

+		List<NodeConfig.ProvParam> ppv = new ArrayList<NodeConfig.ProvParam>();

+		List<NodeConfig.ProvFeed> pfv = new ArrayList<NodeConfig.ProvFeed>();

+		List<NodeConfig.ProvFeedUser> pfuv = new ArrayList<NodeConfig.ProvFeedUser>();

+		List<NodeConfig.ProvFeedSubnet> pfsnv = new ArrayList<NodeConfig.ProvFeedSubnet>();

+		List<NodeConfig.ProvSubscription> psv = new ArrayList<NodeConfig.ProvSubscription>();

+		List<NodeConfig.ProvForceIngress> pfiv = new ArrayList<NodeConfig.ProvForceIngress>();

+		List<NodeConfig.ProvForceEgress> pfev = new ArrayList<NodeConfig.ProvForceEgress>();

+		List<NodeConfig.ProvHop> phv = new ArrayList<NodeConfig.ProvHop>();

+		// try-with-resources: the reader used to be closed only after a successful

+		// parse, so it leaked whenever the document was rejected

+		try (Reader in = r) {

+			JSONTokener jtx = new JSONTokener(in);

+			JSONObject jcfg = new JSONObject(jtx);

+			// nothing but white space may follow the top level object

+			char c = jtx.nextClean();

+			if (c != '\0') {

+				throw new JSONException("Spurious characters following configuration");

+			}

+			// feeds, with the publisher credentials and subnets of each

+			JSONArray jfeeds = jcfg.optJSONArray("feeds");

+			if (jfeeds != null) {

+				for (int fx = 0; fx < jfeeds.length(); fx++) {

+					JSONObject jfeed = jfeeds.getJSONObject(fx);

+					// when both flags are set, "deleted" wins

+					String stat = null;

+					if (jfeed.optBoolean("suspend", false)) {

+						stat = "Feed is suspended";

+					}

+					if (jfeed.optBoolean("deleted", false)) {

+						stat = "Feed is deleted";

+					}

+					String fid = gvas(jfeed, "feedid");

+					String fname = gvas(jfeed, "name");

+					String fver = gvas(jfeed, "version");

+					pfv.add(new NodeConfig.ProvFeed(fid, fname + "//" + fver, stat));

+					JSONObject jauth = jfeed.optJSONObject("authorization");

+					if (jauth == null) {

+						continue;

+					}

+					JSONArray jeids = jauth.optJSONArray("endpoint_ids");

+					if (jeids != null) {

+						for (int ux = 0; ux < jeids.length(); ux++) {

+							JSONObject ju = jeids.getJSONObject(ux);

+							String login = gvas(ju, "id");

+							String password = gvas(ju, "password");

+							pfuv.add(new NodeConfig.ProvFeedUser(fid, login, NodeUtils.getAuthHdr(login, password)));

+						}

+					}

+					JSONArray jeips = jauth.optJSONArray("endpoint_addrs");

+					if (jeips != null) {

+						for (int ix = 0; ix < jeips.length(); ix++) {

+							String sn = gvas(jeips, ix);

+							pfsnv.add(new NodeConfig.ProvFeedSubnet(fid, sn));

+						}

+					}

+				}

+			}

+			// subscriptions - suspended ones are left out entirely

+			JSONArray jsubs = jcfg.optJSONArray("subscriptions");

+			if (jsubs != null) {

+				for (int sx = 0; sx < jsubs.length(); sx++) {

+					JSONObject jsub = jsubs.getJSONObject(sx);

+					if (jsub.optBoolean("suspend", false)) {

+						continue;

+					}

+					String sid = gvas(jsub, "subid");

+					String fid = gvas(jsub, "feedid");

+					JSONObject jdel = jsub.getJSONObject("delivery");

+					String delurl = gvas(jdel, "url");

+					String id = gvas(jdel, "user");

+					String password = gvas(jdel, "password");

+					boolean monly = jsub.getBoolean("metadataOnly");

+					boolean use100 = jdel.getBoolean("use100");

+					psv.add(new NodeConfig.ProvSubscription(sid, fid, delurl, id, NodeUtils.getAuthHdr(id, password), monly, use100));

+				}

+			}

+			// scalar parameters, plus the list of nodes

+			JSONObject jparams = jcfg.optJSONObject("parameters");

+			if (jparams != null) {

+				// getNames() returns null rather than an empty array for an empty object

+				String[] pnames = JSONObject.getNames(jparams);

+				if (pnames != null) {

+					for (String pname: pnames) {

+						String pvalue = gvas(jparams, pname);

+						if (pvalue != null) {

+							ppv.add(new NodeConfig.ProvParam(pname, pvalue));

+						}

+					}

+				}

+				String sfx = gvas(jparams, "PROV_DOMAIN");

+				JSONArray jnodes = jparams.optJSONArray("NODES");

+				if (jnodes != null) {

+					for (int nx = 0; nx < jnodes.length(); nx++) {

+						String nn = gvas(jnodes, nx);

+						if (nn == null) {

+							// not a scalar, so it cannot name a node

+							continue;

+						}

+						// unqualified names get the provisioning domain appended

+						if (nn.indexOf('.') == -1) {

+							nn = nn + "." + sfx;

+						}

+						pnv.add(new NodeConfig.ProvNode(nn));

+					}

+				}

+			}

+			// forced ingress - empty subnet and user are treated as absent

+			JSONArray jingresses = jcfg.optJSONArray("ingress");

+			if (jingresses != null) {

+				for (int fx = 0; fx < jingresses.length(); fx++) {

+					JSONObject jingress = jingresses.getJSONObject(fx);

+					String fid = gvas(jingress, "feedid");

+					String subnet = gvas(jingress, "subnet");

+					String user = gvas(jingress, "user");

+					String[] nodes = gvasa(jingress, "node");

+					if (fid == null || "".equals(fid)) {

+						continue;

+					}

+					if ("".equals(subnet)) {

+						subnet = null;

+					}

+					if ("".equals(user)) {

+						user = null;

+					}

+					pfiv.add(new NodeConfig.ProvForceIngress(fid, subnet, user, nodes));

+				}

+			}

+			// forced egress - an object mapping subscription ID to node

+			JSONObject jegresses = jcfg.optJSONObject("egress");

+			String[] esids = (jegresses == null) ? null : JSONObject.getNames(jegresses);

+			if (esids != null) {

+				for (String esid: esids) {

+					String enode = gvas(jegresses, esid);

+					if (enode != null && !"".equals(esid) && !"".equals(enode)) {

+						pfev.add(new NodeConfig.ProvForceEgress(esid, enode));

+					}

+				}

+			}

+			// next hops - entries lacking any of from, to, or via are ignored

+			JSONArray jhops = jcfg.optJSONArray("routing");

+			if (jhops != null) {

+				for (int fx = 0; fx < jhops.length(); fx++) {

+					JSONObject jhop = jhops.getJSONObject(fx);

+					String from = gvas(jhop, "from");

+					String to = gvas(jhop, "to");

+					String via = gvas(jhop, "via");

+					if (from == null || to == null || via == null || "".equals(from) || "".equals(to) || "".equals(via)) {

+						continue;

+					}

+					phv.add(new NodeConfig.ProvHop(from, to, via));

+				}

+			}

+		} catch (JSONException jse) {

+			NodeUtils.setIpAndFqdnForEelf("ProvData");

+			eelflogger.error(EelfMsgs.MESSAGE_PARSING_ERROR, jse.toString());

+			logger.error("NODE0201 Error parsing configuration data from provisioning server " + jse.toString(), jse);

+			throw new IOException(jse.toString(), jse);

+		}

+		pn = pnv.toArray(new NodeConfig.ProvNode[pnv.size()]);

+		pp = ppv.toArray(new NodeConfig.ProvParam[ppv.size()]);

+		pf = pfv.toArray(new NodeConfig.ProvFeed[pfv.size()]);

+		pfu = pfuv.toArray(new NodeConfig.ProvFeedUser[pfuv.size()]);

+		pfsn = pfsnv.toArray(new NodeConfig.ProvFeedSubnet[pfsnv.size()]);

+		ps = psv.toArray(new NodeConfig.ProvSubscription[psv.size()]);

+		pfi = pfiv.toArray(new NodeConfig.ProvForceIngress[pfiv.size()]);

+		pfe = pfev.toArray(new NodeConfig.ProvForceEgress[pfev.size()]);

+		ph = phv.toArray(new NodeConfig.ProvHop[phv.size()]);

+	}

+	/**

+	 *	Get the raw node configuration entries

+	 */

+	public NodeConfig.ProvNode[] getNodes() {

+		return(pn);

+	}

+	/**

+	 *	Get the raw parameter configuration entries

+	 */

+	public NodeConfig.ProvParam[] getParams() {

+		return(pp);

+	}

+	/**

+	 *	Get the raw feed configuration entries

+	 */

+	public NodeConfig.ProvFeed[] getFeeds() {

+		return(pf);

+	}

+	/**

+	 *	Get the raw feed user configuration entries

+	 */

+	public NodeConfig.ProvFeedUser[] getFeedUsers() {

+		return(pfu);

+	}

+	/**

+	 *	Get the raw feed subnet configuration entries

+	 */

+	public NodeConfig.ProvFeedSubnet[] getFeedSubnets() {

+		return(pfsn);

+	}

+	/**

+	 *	Get the raw subscription entries

+	 */

+	public NodeConfig.ProvSubscription[] getSubscriptions() {

+		return(ps);

+	}

+	/**

+	 *	Get the raw forced ingress entries

+	 */

+	public NodeConfig.ProvForceIngress[] getForceIngress() {

+		return(pfi);

+	}

+	/**

+	 *	Get the raw forced egress entries

+	 */

+	public NodeConfig.ProvForceEgress[] getForceEgress() {

+		return(pfe);

+	}

+	/**

+	 *	Get the raw next hop entries

+	 */

+	public NodeConfig.ProvHop[] getHops() {

+		return(ph);

+	}

+}

diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/PublishId.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/PublishId.java
new file mode 100644
index 0000000..436adba
--- /dev/null
+++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/PublishId.java
@@ -0,0 +1,52 @@
+/*******************************************************************************

+ * ============LICENSE_START==================================================

+ * * org.onap.dmaap

+ * * ===========================================================================

+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+ * * ===========================================================================

+ * * Licensed under the Apache License, Version 2.0 (the "License");

+ * * you may not use this file except in compliance with the License.

+ * * You may obtain a copy of the License at

+ * * 

+ *  *      http://www.apache.org/licenses/LICENSE-2.0

+ * * 

+ *  * Unless required by applicable law or agreed to in writing, software

+ * * distributed under the License is distributed on an "AS IS" BASIS,

+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * * See the License for the specific language governing permissions and

+ * * limitations under the License.

+ * * ============LICENSE_END====================================================

+ * *

+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+ * *

+ ******************************************************************************/

+

+

+package com.att.research.datarouter.node;

+

+/**

+ *	Generator of publish IDs of the form "number.name"

+ */

+public class PublishId	{

+	private long	nextuid;

+	private String	myname;

+

+	/**

+	 *	Create a generator whose IDs all end in the specified name

+	 *	@param myname	Unique identifier for this publish ID generator (usually fqdn of server)

+	 */

+	public PublishId(String myname) {

+		this.myname = myname;

+	}

+	/**

+	 *	Generate a Data Router Publish ID that uniquely identifies the particular invocation of the Publish API for log correlation purposes.

+	 *	@return	The current time in milliseconds, bumped if necessary so that it exceeds the number in the previous ID, followed by "." and the name

+	 */

+	public synchronized String next() {

+		// never go backwards, and never reuse the number handed out last time

+		long uid = Math.max(System.currentTimeMillis(), nextuid);

+		nextuid = uid + 1;

+		return(uid + "." + myname);

+	}

+}

diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/RateLimitedOperation.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/RateLimitedOperation.java
new file mode 100644
index 0000000..5bcbed8
--- /dev/null
+++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/RateLimitedOperation.java
@@ -0,0 +1,102 @@
+/*******************************************************************************

+ * ============LICENSE_START==================================================

+ * * org.onap.dmaap

+ * * ===========================================================================

+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+ * * ===========================================================================

+ * * Licensed under the Apache License, Version 2.0 (the "License");

+ * * you may not use this file except in compliance with the License.

+ * * You may obtain a copy of the License at

+ * * 

+ *  *      http://www.apache.org/licenses/LICENSE-2.0

+ * * 

+ *  * Unless required by applicable law or agreed to in writing, software

+ * * distributed under the License is distributed on an "AS IS" BASIS,

+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * * See the License for the specific language governing permissions and

+ * * limitations under the License.

+ * * ============LICENSE_END====================================================

+ * *

+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+ * *

+ ******************************************************************************/

+

+

+package com.att.research.datarouter.node;

+

+import java.util.*;

+

+/**

+ *	Execute an operation no more frequently than a specified interval

+ *	<p>

+ *	Subclasses supply the operation as run().  Callers ask for it with request(),

+ *	which either runs it on the calling thread, leaves it to a timer task, or

+ *	notes that it must be repeated once the execution in progress has finished.

+ */

+

+public abstract class RateLimitedOperation implements Runnable	{

+	private boolean	marked;	// a timer task exists

+	private boolean	executing;	// the operation is currently in progress

+	private boolean remark;	// a request was made while the operation was in progress

+	private Timer	timer;

+	private long	last;	// when the last operation started

+	private long	mininterval;

+	/**

+	 *	Create a rate limited operation

+	 *	@param mininterval	The minimum number of milliseconds after the last execution starts before a new execution can begin

+	 *	@param timer	The timer used to perform deferred executions

+	 */

+	public RateLimitedOperation(long mininterval, Timer timer) {

+		this.timer = timer;

+		this.mininterval = mininterval;

+	}

+	/**

+	 *	Timer task that re-issues the request once the minimum interval has passed

+	 */

+	private class deferred extends TimerTask	{

+		public void run() {

+			execute();

+		}

+	}

+	/**

+	 *	Record that the timer task is no longer pending

+	 */

+	private synchronized void unmark() {

+		marked = false;

+	}

+	/**

+	 *	Body of the timer task: clear the pending flag and then go through the normal request path

+	 */

+	private void execute() {

+		unmark();

+		request();

+	}

+	/**

+	 *	Request that the operation be performed by this thread or at a later time by the timer

+	 */

+	public void request() {

+		if (premark()) {

+			return;

+		}

+		// run() is called without holding the lock.

+		// NOTE(review): there is no try/finally here, so if run() throws, demark()

+		// is skipped and executing stays true - every later request would then

+		// only set remark and return

+		do {

+			run();

+		} while (demark());

+	}

+	/**

+	 *	Decide what to do with a request.

+	 *	@return	False if the caller must run the operation now (executing has been set), true if there is nothing for the caller to do

+	 */

+	private synchronized boolean premark() {

+		if (executing) {

+			// currently executing - wait until it finishes

+			remark = true;

+			return(true);

+		}

+		if (marked) {

+			// timer currently running - will run when it expires

+			return(true);

+		}

+		long now = System.currentTimeMillis();

+		if (last + mininterval > now) {

+			// too soon - schedule a timer

+			marked = true;

+			timer.schedule(new deferred(), last + mininterval - now);

+			return(true);

+		}

+		last = now;

+		executing = true;

+		// start execution

+		return(false);

+	}

+	/**

+	 *	Note that an execution has finished.

+	 *	@return	True if a request arrived during the execution and the operation must be run again right away by the caller

+	 */

+	private synchronized boolean demark() {

+		executing = false;

+		if (remark) {

+			remark = false;

+			// premark() either claims the next execution for this thread or

+			// hands it to the timer because the minimum interval has not passed

+			return(!premark());

+		}

+		return(false);

+	}

+}

diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/RedirManager.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/RedirManager.java
new file mode 100644
index 0000000..09473c1
--- /dev/null
+++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/RedirManager.java
@@ -0,0 +1,118 @@
+/*******************************************************************************

+ * ============LICENSE_START==================================================

+ * * org.onap.dmaap

+ * * ===========================================================================

+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+ * * ===========================================================================

+ * * Licensed under the Apache License, Version 2.0 (the "License");

+ * * you may not use this file except in compliance with the License.

+ * * You may obtain a copy of the License at

+ * * 

+ *  *      http://www.apache.org/licenses/LICENSE-2.0

+ * * 

+ *  * Unless required by applicable law or agreed to in writing, software

+ * * distributed under the License is distributed on an "AS IS" BASIS,

+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * * See the License for the specific language governing permissions and

+ * * limitations under the License.

+ * * ============LICENSE_END====================================================

+ * *

+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+ * *

+ ******************************************************************************/

+

+

+package com.att.research.datarouter.node;

+

+import java.util.*;

+import java.io.*;

+

+/**

+ *	Track redirections of subscriptions

+ *	<p>

+ *	The redirection table is kept in memory and saved to a file of

+ *	"sid primary secondary" lines, written no more often than the configured

+ *	minimum interval.

+ */

+public class RedirManager	{

+	private Hashtable<String, String> sid2primary = new Hashtable<String, String>();

+	private Hashtable<String, String> sid2secondary = new Hashtable<String, String>();

+	private String	redirfile;

+	RateLimitedOperation	op;

+	/**

+	 *	Create a mechanism for maintaining subscription redirections.

+	 *	Any redirections already saved in the file are loaded.

+	 *	@param redirfile	The file to store the redirection information.

+	 *	@param mininterval	The minimum number of milliseconds between writes to the redirection information file.

+	 *	@param timer	The timer thread used to run delayed file writes.

+	 */

+	public RedirManager(String redirfile, long mininterval, Timer timer) {

+		this.redirfile = redirfile;

+		op = new RateLimitedOperation(mininterval, timer) {

+			public void run() {

+				try {

+					StringBuffer sb = new StringBuffer();

+					// A deferred write runs on the timer thread, which does not hold

+					// the RedirManager lock.  Take it while reading the tables so a

+					// concurrent redirect() or forget() cannot break the iteration.

+					// The lock is released before any file I/O is done.

+					synchronized (RedirManager.this) {

+						for (String s: sid2primary.keySet()) {

+							sb.append(s).append(' ').append(sid2primary.get(s)).append(' ').append(sid2secondary.get(s)).append('\n');

+						}

+					}

+					// try-with-resources so the file is closed even if the write fails

+					try (OutputStream os = new FileOutputStream(RedirManager.this.redirfile)) {

+						os.write(sb.toString().getBytes());

+					}

+				} catch (Exception e) {

+					// saving is best effort - the in-memory tables remain authoritative,

+					// and nothing may be thrown from here back into the rate limiter

+				}

+			}

+		};

+		// try-with-resources so the file is closed even if reading fails part way

+		try (BufferedReader br = new BufferedReader(new FileReader(redirfile))) {

+			String s;

+			while ((s = br.readLine()) != null) {

+				s = s.trim();

+				String[] sx = s.split(" ");

+				// skip comments and anything that is not exactly three fields

+				if (s.startsWith("#") || sx.length != 3) {

+					continue;

+				}

+				sid2primary.put(sx[0], sx[1]);

+				sid2secondary.put(sx[0], sx[2]);

+			}

+		} catch (Exception e) {

+			// missing file is normal

+		}

+	}

+	/**

+	 *	Set up redirection.  If a request is to be sent to subscription ID sid, and that is configured to go to URL primary, instead, go to secondary.

+	 *	@param sid	The subscription ID to be redirected

+	 *	@param primary	The URL associated with that subscription ID

+	 *	@param secondary	The replacement URL to use instead

+	 */

+	public synchronized void redirect(String sid, String primary, String secondary) {

+		sid2primary.put(sid, primary);

+		sid2secondary.put(sid, secondary);

+		op.request();

+	}

+	/**

+	 *	Cancel redirection.  If a request is to be sent to subscription ID sid, send it to its primary URL.

+	 *	@param	sid	The subscription ID to remove from the table.

+	 */

+	public synchronized void forget(String sid) {

+		sid2primary.remove(sid);

+		sid2secondary.remove(sid);

+		op.request();

+	}

+	/**

+	 *	Look up where to send a subscription.  If the primary has changed or there is no redirection, use the primary.  Otherwise, redirect to the secondary URL.

+	 *	A redirection recorded against a different primary is forgotten.

+	 *	@param	sid	The subscription ID to look up.

+	 *	@param	primary	The configured primary URL.

+	 *	@return	The destination URL to really use.

+	 */

+	public synchronized String lookup(String sid, String primary) {

+		String oprim = sid2primary.get(sid);

+		if (primary.equals(oprim)) {

+			return(sid2secondary.get(sid));

+		} else if (oprim != null) {

+			// the configured primary has changed, so the old redirection is stale

+			forget(sid);

+		}

+		return(primary);

+	}

+	/**

+	 *	Is a subscription redirected?

+	 *	@param	sid	The subscription ID to check (may be null).

+	 *	@return	True if a secondary URL is recorded for the subscription ID.

+	 */

+	public synchronized boolean isRedirected(String sid) {

+		return(sid != null && sid2secondary.get(sid) != null);

+	}

+}

diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/StatusLog.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/StatusLog.java
new file mode 100644
index 0000000..66aa4ad
--- /dev/null
+++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/StatusLog.java
@@ -0,0 +1,229 @@
+/*******************************************************************************

+ * ============LICENSE_START==================================================

+ * * org.onap.dmaap

+ * * ===========================================================================

+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+ * * ===========================================================================

+ * * Licensed under the Apache License, Version 2.0 (the "License");

+ * * you may not use this file except in compliance with the License.

+ * * You may obtain a copy of the License at

+ * * 

+ *  *      http://www.apache.org/licenses/LICENSE-2.0

+ * * 

+ *  * Unless required by applicable law or agreed to in writing, software

+ * * distributed under the License is distributed on an "AS IS" BASIS,

+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * * See the License for the specific language governing permissions and

+ * * limitations under the License.

+ * * ============LICENSE_END====================================================

+ * *

+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+ * *

+ ******************************************************************************/

+

+package com.att.research.datarouter.node;

+

+import java.util.regex.*;

+import java.util.*;

+import java.io.*;

+import java.nio.file.*;

+import java.text.*;

+

+/**

+ *	Logging for data router delivery events (PUB/DEL/EXP)

+ *	<p>

+ *	Records are appended to a time stamped file that is rolled at a configured

+ *	interval.  A hard link with the plain (undated) name is pointed at the

+ *	current file.

+ */

+public class StatusLog	{

+	// compiled once - declared ahead of instance so that it is ready before anything else in the class runs

+	private static final Pattern INTERVAL_PATTERN = Pattern.compile("(?:(\\d+)[Hh])?(?:(\\d+)[Mm])?(?:(\\d+)[Ss]?)?");

+	private static final int SECONDS_PER_DAY = 86400;

+	private static StatusLog instance = new StatusLog();

+	private HashSet<String> toship = new HashSet<String>();

+	private SimpleDateFormat	filedate = new SimpleDateFormat("-yyyyMMddHHmm");

+	private String	prefix = "logs/events";

+	private	String	suffix = ".log";

+	private String	plainfile;

+	private String	curfile;

+	private long	nexttime;

+	private OutputStream	os;

+	private long	intvl;

+	private NodeConfigManager	config = NodeConfigManager.getInstance();

+	/**

+	 *	Parse an interval of the form xxhyymzzs and round it to the nearest whole fraction of 24 hours.  If no units are specified, assume seconds.

+	 *	<p>

+	 *	Durations under 60 seconds (including an empty string) are treated as 60 seconds and durations over 24 hours as 24 hours.

+	 *	@param interval	The interval text

+	 *	@param def	The value to return if the text is null or cannot be parsed

+	 *	@return	The rounded interval in milliseconds, or def

+	 */

+	public static long parseInterval(String interval, int def) {

+		try {

+			Matcher m = INTERVAL_PATTERN.matcher(interval);

+			if (m.matches()) {

+				// accumulate in a long: a large hour or minute count would overflow

+				// an int and end up being treated as the 60 second minimum

+				long secs = 0;

+				String x = m.group(1);

+				if (x != null) {

+					secs += 3600L * Integer.parseInt(x);

+				}

+				x = m.group(2);

+				if (x != null) {

+					secs += 60L * Integer.parseInt(x);

+				}

+				x = m.group(3);

+				if (x != null) {

+					secs += Integer.parseInt(x);

+				}

+				// anything beyond a day rounds to a day anyway, so clamping is safe

+				int dur = (int)Math.max(60L, Math.min(secs, (long)SECONDS_PER_DAY));

+				int best = SECONDS_PER_DAY;

+				int dist = Math.abs(best - dur);

+				// 86400 = 2^7 * 3^3 * 5^2, so these loops visit every divisor of a day

+				int base = 1;

+				for (int i = 0; i < 8; i++) {

+					int base2 = base;

+					base *= 2;

+					for (int j = 0; j < 4; j++) {

+						int base3 = base2;

+						base2 *= 3;

+						for (int k = 0; k < 3; k++) {

+							int cur = base3;

+							base3 *= 5;

+							int ndist = Math.abs(cur - dur);

+							if (ndist < dist) {

+								best = cur;

+								dist = ndist;

+							}

+						}

+					}

+				}

+				def = best * 1000;

+			}

+		} catch (Exception e) {

+			// null or unparseable (e.g. a number too big for an int) - use the default

+		}

+		return(def);

+	}

+	/**

+	 *	If the current interval has ended, close the current file and work out the names and end time for the next one.

+	 *	The new file is not opened here.

+	 *	@param now	The current time in milliseconds

+	 *	@throws IOException	If the previous file cannot be closed

+	 */

+	private synchronized void checkRoll(long now) throws IOException {

+		if (now >= nexttime) {

+			if (os != null) {

+				try {

+					os.close();

+				} finally {

+					// drop the stream even if close fails, so the roll is not retried on a dead stream forever

+					os = null;

+				}

+			}

+			intvl = parseInterval(config.getEventLogInterval(), 300000);

+			prefix = config.getEventLogPrefix();

+			suffix = config.getEventLogSuffix();

+			nexttime = now - now % intvl + intvl;

+			curfile = prefix + filedate.format(new Date(nexttime - intvl)) + suffix;

+			plainfile = prefix + suffix;

+			notify();

+		}

+	}

+	/**

+	 *	Get the name of the current log file

+	 *	@return	The full path name of the current event log file

+	 */

+	public static synchronized String getCurLogFile() {

+		try {

+			instance.checkRoll(System.currentTimeMillis());

+		} catch (Exception e) {

+			// a failed roll still leaves a usable file name to report

+		}

+		return(instance.curfile);

+	}

+	/**

+	 *	Append a time stamped record to the current log file, opening the file first if necessary.

+	 *	Failures are ignored: event logging must not disturb the caller.

+	 *	@param s	The record, without time stamp or line terminator

+	 */

+	private synchronized void log(String s) {

+		try {

+			long now = System.currentTimeMillis();

+			checkRoll(now);

+			if (os == null) {

+				os = new FileOutputStream(curfile, true);

+				// The plain-named link is a convenience only.  Failing to create it

+				// must not cost the record being logged (it used to be dropped).

+				try {

+					(new File(plainfile)).delete();

+					Files.createLink(Paths.get(plainfile), Paths.get(curfile));

+				} catch (IOException | UnsupportedOperationException linkFailure) {

+					// carry on without the link

+				}

+			}

+			os.write((NodeUtils.logts(new Date(now)) + '|' + s + '\n').getBytes());

+			os.flush();

+		} catch (IOException ioe) {

+			// best effort - there is nowhere else to report a logging failure

+		}

+	}

+	/**

+	 *	Log a received publication attempt.

+	 *	@param pubid	The publish ID assigned by the node

+	 *	@param feedid	The feed id given by the publisher

+	 *	@param requrl	The URL of the received request

+	 *	@param method	The method (DELETE or PUT) in the received request

+	 *	@param ctype	The content type (if method is PUT and clen > 0)

+	 *	@param clen	The content length (if method is PUT)

+	 *	@param srcip	The IP address of the publisher

+	 *	@param user	The identity of the publisher

+	 *	@param status	The status returned to the publisher

+	 */

+	public static void logPub(String pubid, String feedid, String requrl, String method, String ctype, long clen, String srcip, String user, int status) {

+		instance.log("PUB|" + pubid + "|" + feedid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + srcip + "|" + user + "|" + status);

+	}

+	/**

+	 *	Log a data transfer error receiving a publication attempt

+	 *	@param pubid	The publish ID assigned by the node

+	 *	@param feedid	The feed id given by the publisher

+	 *	@param requrl	The URL of the received request

+	 *	@param method	The method (DELETE or PUT) in the received request

+	 *	@param ctype	The content type (if method is PUT and clen > 0)

+	 *	@param clen	The expected content length (if method is PUT)

+	 *	@param rcvd	The content length received

+	 *	@param srcip	The IP address of the publisher

+	 *	@param user	The identity of the publisher

+	 *	@param error	The error message from the IO exception

+	 */

+	public static void logPubFail(String pubid, String feedid, String requrl, String method, String ctype, long clen, long rcvd, String srcip, String user, String error) {

+		instance.log("PBF|" + pubid + "|" + feedid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + rcvd + "|" + srcip + "|" + user + "|" + error);

+	}

+	/**

+	 *	Log a delivery attempt.  Nothing is logged if the feed ID is null.

+	 *	@param pubid	The publish ID assigned by the node

+	 *	@param feedid	The feed ID

+	 *	@param subid	The (space delimited list of) subscription ID

+	 *	@param requrl	The URL used in the attempt

+	 *	@param method	The method (DELETE or PUT) in the attempt

+	 *	@param ctype	The content type (if method is PUT, not metaonly, and clen > 0)

+	 *	@param clen	The content length (if PUT and not metaonly)

+	 *	@param user	The identity given to the subscriber

+	 *	@param status	The status returned by the subscriber or -1 if an exception occurred trying to connect

+	 *	@param xpubid	The publish ID returned by the subscriber

+	 */

+	public static void logDel(String pubid, String feedid, String subid, String requrl, String method, String ctype, long clen, String user, int status, String xpubid) {

+		if (feedid == null) {

+			return;

+		}

+		instance.log("DEL|" + pubid + "|" + feedid + "|" + subid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + user + "|" + status + "|" + xpubid);

+	}

+	/**

+	 *	Log delivery attempts expired.  Nothing is logged if the feed ID is null.

+	 *	@param pubid	The publish ID assigned by the node

+	 *	@param feedid	The feed ID

+	 *	@param subid	The (space delimited list of) subscription ID

+	 *	@param requrl	The URL that would be delivered to

+	 *	@param method	The method (DELETE or PUT) in the request

+	 *	@param ctype	The content type (if method is PUT, not metaonly, and clen > 0)

+	 *	@param clen	The content length (if PUT and not metaonly)

+	 *	@param reason	The reason the attempts were discontinued

+	 *	@param attempts	The number of attempts made

+	 */

+	public static void logExp(String pubid, String feedid, String subid, String requrl, String method, String ctype, long clen, String reason, int attempts) {

+		if (feedid == null) {

+			return;

+		}

+		instance.log("EXP|" + pubid + "|" + feedid + "|" + subid + "|" + requrl + "|" + method + "|" + ctype + "|" + clen + "|" + reason + "|" + attempts);

+	}

+	/**

+	 *	Log extra statistics about unsuccessful delivery attempts.  Nothing is logged if the feed ID is null.

+	 *	@param pubid	The publish ID assigned by the node

+	 *	@param feedid	The feed ID

+	 *	@param subid	The (space delimited list of) subscription ID

+	 *	@param clen	The content length

+	 *	@param sent	The # of bytes sent or -1 if subscriber returned an error instead of 100 Continue, otherwise, the number of bytes sent before an error occurred.

+	 */

+	public static void logDelExtra(String pubid, String feedid, String subid, long clen, long sent) {

+		if (feedid == null) {

+			return;

+		}

+		instance.log("DLX|" + pubid + "|" + feedid + "|" + subid + "|" + clen + "|" + sent);

+	}

+	/**

+	 *	Not instantiable from outside - all logging goes through the static methods

+	 */

+	private StatusLog() {

+	}

+}

diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/SubnetMatcher.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/SubnetMatcher.java
new file mode 100644
index 0000000..c1cfeaa
--- /dev/null
+++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/SubnetMatcher.java
@@ -0,0 +1,71 @@
+/*******************************************************************************

+ * ============LICENSE_START==================================================

+ * * org.onap.dmaap

+ * * ===========================================================================

+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+ * * ===========================================================================

+ * * Licensed under the Apache License, Version 2.0 (the "License");

+ * * you may not use this file except in compliance with the License.

+ * * You may obtain a copy of the License at

+ * * 

+ *  *      http://www.apache.org/licenses/LICENSE-2.0

+ * * 

+ *  * Unless required by applicable law or agreed to in writing, software

+ * * distributed under the License is distributed on an "AS IS" BASIS,

+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * * See the License for the specific language governing permissions and

+ * * limitations under the License.

+ * * ============LICENSE_END====================================================

+ * *

+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+ * *

+ ******************************************************************************/

+

+

+package com.att.research.datarouter.node;

+

+import java.net.*;

+

+/**

+ *	Matches IP addresses, given as byte arrays, against a subnet written as a CIDR

+ *	(or against a single address when no prefix length is given).

+ */

+public class SubnetMatcher	{

+	/** Address bytes of the subnet, as returned by NodeUtils.getInetAddress */

+	private byte[]	sn;

+	/** Number of leading bytes that must be equal */

+	private int	len;

+	/** Bits of the byte after the leading bytes that must be equal, or 0 if there is no partial byte */

+	private int	mask;

+	/**

+	 *	Construct a subnet matcher given a CIDR.

+	 *	Without a '/' the whole address must match.  The prefix length is

+	 *	parsed with Integer.parseInt and is not range-checked here.

+	 *	@param subnet	The CIDR to match

+	 */

+	public SubnetMatcher(String subnet) {

+		int slash = subnet.lastIndexOf('/');

+		if (slash != -1) {

+			int bits = Integer.parseInt(subnet.substring(slash + 1));

+			sn = NodeUtils.getInetAddress(subnet.substring(0, slash));

+			// Keep the top (bits % 8) bits of the byte that follows the whole bytes.

+			mask = (0xff00 >> (bits % 8)) & 0xff;

+			len = bits / 8;

+			return;

+		}

+		sn = NodeUtils.getInetAddress(subnet);

+		len = sn.length;

+	}

+	/**

+	 *	Is the IP address in the CIDR?

+	 *	@param addr the IP address as bytes in network byte order

+	 *	@return true if the address has the same length as the subnet address and agrees with it on every prefix bit.

+	 */

+	public boolean matches(byte[] addr) {

+		if (addr.length != sn.length) {

+			return false;

+		}

+		int idx = 0;

+		while (idx < len) {

+			if (addr[idx] != sn[idx]) {

+				return false;

+			}

+			idx++;

+		}

+		// The partial byte is only inspected when the prefix does not end on a byte boundary.

+		return mask == 0 || ((addr[len] ^ sn[len]) & mask) == 0;

+	}

+}

diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/Target.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/Target.java
new file mode 100644
index 0000000..fe595d5
--- /dev/null
+++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/Target.java
@@ -0,0 +1,60 @@
+/*******************************************************************************

+ * ============LICENSE_START==================================================

+ * * org.onap.dmaap

+ * * ===========================================================================

+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+ * * ===========================================================================

+ * * Licensed under the Apache License, Version 2.0 (the "License");

+ * * you may not use this file except in compliance with the License.

+ * * You may obtain a copy of the License at

+ * * 

+ *  *      http://www.apache.org/licenses/LICENSE-2.0

+ * * 

+ *  * Unless required by applicable law or agreed to in writing, software

+ * * distributed under the License is distributed on an "AS IS" BASIS,

+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * * See the License for the specific language governing permissions and

+ * * limitations under the License.

+ * * ============LICENSE_END====================================================

+ * *

+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+ * *

+ ******************************************************************************/

+

+

+package com.att.research.datarouter.node;

+

+/**

+ *	A destination to deliver a message: destination information plus routing text.

+ */

+public class Target	{

+	private DestInfo	destinfo;

+	private String	routing;

+	/**

+	 *	Create a target.

+	 *	@param destinfo	Either info for a subscription ID or info for a node-to-node transfer

+	 *	@param routing	For a node-to-node transfer, what to do when it gets there.

+	 */

+	public Target(DestInfo destinfo, String routing) {

+		this.routing = routing;

+		this.destinfo = destinfo;

+	}

+	/**

+	 *	Append additional routing to the stored routing, separated by one space.

+	 *	@param routing	The routing text to append

+	 */

+	public void addRouting(String routing) {

+		this.routing += " " + routing;

+	}

+	/**

+	 *	@return the destination information given to the constructor

+	 */

+	public DestInfo getDestInfo() {

+		return destinfo;

+	}

+	/**

+	 *	@return the routing (next hop information) accumulated so far

+	 */

+	public String getRouting() {

+		return routing;

+	}

+}

diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/TaskList.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/TaskList.java
new file mode 100644
index 0000000..401c72a
--- /dev/null
+++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/TaskList.java
@@ -0,0 +1,113 @@
+/*******************************************************************************

+ * ============LICENSE_START==================================================

+ * * org.onap.dmaap

+ * * ===========================================================================

+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+ * * ===========================================================================

+ * * Licensed under the Apache License, Version 2.0 (the "License");

+ * * you may not use this file except in compliance with the License.

+ * * You may obtain a copy of the License at

+ * * 

+ *  *      http://www.apache.org/licenses/LICENSE-2.0

+ * * 

+ *  * Unless required by applicable law or agreed to in writing, software

+ * * distributed under the License is distributed on an "AS IS" BASIS,

+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * * See the License for the specific language governing permissions and

+ * * limitations under the License.

+ * * ============LICENSE_END====================================================

+ * *

+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+ * *

+ ******************************************************************************/

+

+

+package com.att.research.datarouter.node;

+

+import java.util.*;

+

+/**

+ *	Manage a list of tasks to be executed when an event occurs.

+ *	All public methods are synchronized on the list.

+ *	This makes the following guarantees:

+ *	<ul>

+ *	<li>Tasks can be safely added and removed in the middle of a run.</li>

+ *	<li>No task will be returned more than once during a run.</li>

+ *	<li>No task will be returned when it is not, at that moment, in the list of tasks.</li>

+ *	<li>At the moment when next() returns null, all tasks on the list have been returned during the run.</li>

+ *	<li>Initially and once next() returns null during a run, next() will continue to return null until startRun() is called.</li>

+ *	</ul>

+ */

+public class TaskList	{

+	/** Iterator over togo; null whenever no run is in progress */

+	private Iterator<Runnable>	runlist;

+	/** Every task currently registered */

+	private HashSet<Runnable>	tasks = new HashSet<Runnable>();

+	/** The batch of tasks currently being iterated */

+	private HashSet<Runnable>	togo;

+	/** Tasks already returned during this run */

+	private HashSet<Runnable>	sofar;

+	/** Tasks added since the current batch was started */

+	private HashSet<Runnable>	added;

+	/** Tasks removed since the current batch was started */

+	private HashSet<Runnable>	removed;

+	/**

+	 *	Construct a new, empty TaskList with no run in progress.

+	 */

+	public TaskList() {

+	}

+	/**

+	 *	Begin a run over a snapshot of the tasks registered right now.

+	 */

+	public synchronized void	startRun() {

+		togo = new HashSet<Runnable>(tasks);

+		runlist = togo.iterator();

+		removed = new HashSet<Runnable>();

+		added = new HashSet<Runnable>();

+		sofar = new HashSet<Runnable>();

+	}

+	/**

+	 *	Get the next task to execute.

+	 *	@return a task not yet returned in this run, or null when the run is over or none was started

+	 */

+	public synchronized Runnable	next() {

+		while (runlist != null) {

+			while (runlist.hasNext()) {

+				Runnable candidate = runlist.next();

+				// Skip tasks removed meanwhile; add() is false for tasks already handed out.

+				if (!removed.contains(candidate) && sofar.add(candidate)) {

+					return candidate;

+				}

+			}

+			if (added.isEmpty()) {

+				// Nothing left and nothing new: the run is over.

+				runlist = null;

+				sofar = null;

+				removed = null;

+				added = null;

+				togo = null;

+			} else {

+				// Tasks added during the run become the next batch.

+				togo = added;

+				added = new HashSet<Runnable>();

+				removed.clear();

+				runlist = togo.iterator();

+			}

+		}

+		return null;

+	}

+	/**

+	 *	Add a task to the list of tasks to run whenever the event occurs.

+	 *	During a run the task is also queued for the current run.

+	 */

+	public synchronized void addTask(Runnable task) {

+		tasks.add(task);

+		if (runlist == null) {

+			return;

+		}

+		removed.remove(task);

+		added.add(task);

+	}

+	/**

+	 *	Remove a task from the list of tasks to run whenever the event occurs.

+	 *	During a run the task is also withdrawn from the current run.

+	 */

+	public synchronized void removeTask(Runnable task) {

+		tasks.remove(task);

+		if (runlist == null) {

+			return;

+		}

+		added.remove(task);

+		removed.add(task);

+	}

+}

diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/eelf/EELFFilter.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/eelf/EELFFilter.java
new file mode 100644
index 0000000..9b00658
--- /dev/null
+++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/eelf/EELFFilter.java
@@ -0,0 +1,43 @@
+/*******************************************************************************

+ * ============LICENSE_START==================================================

+ * * org.onap.dmaap

+ * * ===========================================================================

+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+ * * ===========================================================================

+ * * Licensed under the Apache License, Version 2.0 (the "License");

+ * * you may not use this file except in compliance with the License.

+ * * You may obtain a copy of the License at

+ * * 

+ *  *      http://www.apache.org/licenses/LICENSE-2.0

+ * * 

+ *  * Unless required by applicable law or agreed to in writing, software

+ * * distributed under the License is distributed on an "AS IS" BASIS,

+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * * See the License for the specific language governing permissions and

+ * * limitations under the License.

+ * * ============LICENSE_END====================================================

+ * *

+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+ * *

+ ******************************************************************************/

+package com.att.research.datarouter.node.eelf;

+

+import ch.qos.logback.classic.spi.ILoggingEvent;

+import ch.qos.logback.core.filter.Filter;

+import ch.qos.logback.core.spi.FilterReply;

+

+/*

+ * When EELF functionality added it default started logging Jetty logs as well which in turn stopped existing functionality of logging jetty statements in node.log

+ * added code in logback.xml to add jetty statements in node.log.

+ * This class removes extran EELF statements from node.log since they are being logged in apicalls.log 

+ */

+public class EELFFilter extends Filter<ILoggingEvent>{

+	  @Override

+	  public FilterReply decide(ILoggingEvent event) {    

+	    if (event.getMessage().contains("EELF")) {

+	      return FilterReply.DENY;

+	    } else {

+	      return FilterReply.ACCEPT;

+	    }

+	  }

+}

diff --git a/datarouter-node/src/main/java/com/att/research/datarouter/node/eelf/EelfMsgs.java b/datarouter-node/src/main/java/com/att/research/datarouter/node/eelf/EelfMsgs.java
new file mode 100644
index 0000000..9963f41
--- /dev/null
+++ b/datarouter-node/src/main/java/com/att/research/datarouter/node/eelf/EelfMsgs.java
@@ -0,0 +1,96 @@
+/*******************************************************************************

+ * ============LICENSE_START==================================================

+ * * org.onap.dmaap

+ * * ===========================================================================

+ * * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+ * * ===========================================================================

+ * * Licensed under the Apache License, Version 2.0 (the "License");

+ * * you may not use this file except in compliance with the License.

+ * * You may obtain a copy of the License at

+ * * 

+ *  *      http://www.apache.org/licenses/LICENSE-2.0

+ * * 

+ *  * Unless required by applicable law or agreed to in writing, software

+ * * distributed under the License is distributed on an "AS IS" BASIS,

+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * * See the License for the specific language governing permissions and

+ * * limitations under the License.

+ * * ============LICENSE_END====================================================

+ * *

+ * * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+ * *

+ ******************************************************************************/

+package com.att.research.datarouter.node.eelf;

+

+import com.att.eelf.i18n.EELFResolvableErrorEnum;

+import com.att.eelf.i18n.EELFResourceManager;

+

+public enum EelfMsgs implements EELFResolvableErrorEnum {

+	

+	/**

+     * Application message that prints the user (accepts one argument)

+     */

+	MESSAGE_WITH_BEHALF,

+

+	/**

+     * Application message that prints the user and the FeedID (accepts two arguments)

+     */

+

+	MESSAGE_WITH_BEHALF_AND_FEEDID,

+	

+	/**

+     * Application message that prints a keystore file load error in the EELF errors log

+     */

+

+	MESSAGE_KEYSTORE_LOAD_ERROR,

+	

+	/**

+     * Application message that prints "Error extracting my name from my keystore file"

+     */

+

+	MESSAGE_KEYSORE_NAME_ERROR,	

+	

+	/**

+     * Application message that prints "Error parsing configuration data from provisioning server."

+     */

+

+

+	MESSAGE_PARSING_ERROR,		

+	

+	/**

+     * Application message that prints "Configuration failed"

+     */

+

+

+	MESSAGE_CONF_FAILED,		

+	

+	/**

+     * Application message that prints "Bad provisioning server URL"

+     */

+

+

+	MESSAGE_BAD_PROV_URL,		

+	

+	/**

+     * Application message that prints "Unable to fetch canonical name from keystore file"

+     */

+

+

+	MESSAGE_KEYSTORE_FETCH_ERROR,

+	

+	/**

+     * Application message that prints "Unable to load local configuration file."

+     */

+

+

+	MESSAGE_PROPERTIES_LOAD_ERROR;

+

+    

+    /**

+     * Static initializer to ensure the resource bundle for this class is loaded.

+     * The messages are loaded from the single bundle named "EelfMessages".

+     */

+    static {

+        EELFResourceManager.loadMessageBundle("EelfMessages");

+    }

+}

diff --git a/datarouter-node/src/main/resources/EelfMessages.properties b/datarouter-node/src/main/resources/EelfMessages.properties
new file mode 100644
index 0000000..8c17417
--- /dev/null
+++ b/datarouter-node/src/main/resources/EelfMessages.properties
@@ -0,0 +1,70 @@
+#-------------------------------------------------------------------------------

+# ============LICENSE_START==================================================

+# * org.onap.dmaap

+# * ===========================================================================

+# * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+# * ===========================================================================

+# * Licensed under the Apache License, Version 2.0 (the "License");

+# * you may not use this file except in compliance with the License.

+# * You may obtain a copy of the License at

+# * 

+#  *      http://www.apache.org/licenses/LICENSE-2.0

+# * 

+#  * Unless required by applicable law or agreed to in writing, software

+# * distributed under the License is distributed on an "AS IS" BASIS,

+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+# * See the License for the specific language governing permissions and

+# * limitations under the License.

+# * ============LICENSE_END====================================================

+# *

+# * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+# *

+#-------------------------------------------------------------------------------

+########################################################################

+#Resource key=Error Code|Message text|Resolution text |Description text

+#######

+#Newlines can be used to add clarity, provided each continuation line

+#has at least one leading space

+#ResourceKey=\

+#             ERR0000E\

+#             Sample error msg txt\

+#             Sample resolution msg\

+#             Sample description txt

+#

+######

+#Error code classification category

+#100	Permission errors

+#200	Availability errors/Timeouts

+#300	Data errors

+#400	Schema Interface type/validation errors

+#500	Business process errors

+#900	Unknown errors

+#

+########################################################################

+

+# Messages for Data Router EELF framework

+

+#Prints FeedID in the EELF apicalls log

+MESSAGE_WITH__FEEDID=EELF0001I| FeedID  = {0}

+

+#Prints User in the EELF apicalls log

+MESSAGE_WITH_BEHALF=EELF0002I| User = {0}

+

+#Prints User and FeedID in the EELF apicalls log

+MESSAGE_WITH_BEHALF_AND_FEEDID=EELF0003I| User = {0} FeedID  = {1}

+

+#Prints keystore file error in EELF errors log

+MESSAGE_KEYSTORE_LOAD_ERROR=EELF0001E| Error loading my keystore file {0} {1}

+

+MESSAGE_KEYSORE_NAME_ERROR=EELF0002E| Error extracting my name from my keystore file. {0}

+

+MESSAGE_PARSING_ERROR=EELF0003E| Error parsing configuration data from provisioning server. {0}

+

+MESSAGE_CONF_FAILED=EELF0004E| Configuration failed. {0} - try again later.

+

+MESSAGE_BAD_PROV_URL=EELF0005E| Bad provisioning server URL {0}

+

+MESSAGE_KEYSTORE_FETCH_ERROR=EELF0006E| Unable to fetch canonical name from keystore file {0}

+

+MESSAGE_PROPERTIES_LOAD_ERROR=EELF0007E| Unable to load local configuration file - etc/node.properties

+

diff --git a/datarouter-node/src/main/resources/docker/Dockerfile b/datarouter-node/src/main/resources/docker/Dockerfile
new file mode 100644
index 0000000..fbf5456
--- /dev/null
+++ b/datarouter-node/src/main/resources/docker/Dockerfile
@@ -0,0 +1,7 @@
+# Image for the Data Router node: Java 8 base plus the application tree and its launcher script.

+FROM java:8

+# Copy the build context's opt tree to /opt/ and the launcher to the filesystem root.

+ADD opt /opt/

+ADD startup.sh /startup.sh

+# Launcher is readable, writable and executable by its owner only.

+RUN chmod 700 /startup.sh

+# Declare ports 8443 and 8080 as exposed.

+EXPOSE 8443 8080

+# Shell-form entrypoint: runs the launcher with the argument "start".

+ENTRYPOINT ./startup.sh start
\ No newline at end of file
diff --git a/datarouter-node/src/main/resources/docker/startup.sh b/datarouter-node/src/main/resources/docker/startup.sh
new file mode 100644
index 0000000..8cb71dd
--- /dev/null
+++ b/datarouter-node/src/main/resources/docker/startup.sh
@@ -0,0 +1,18 @@
+#!/bin/sh
+# Launcher for the Data Router node: prepares directories, builds a classpath and starts the JVM.
+LIB=/opt/app/datartr/lib
+ETC=/opt/app/datartr/etc
+echo "this is LIB" $LIB
+echo "this is ETC" $ETC
+# Make sure the log and spool directories exist before the node starts.
+mkdir -p /opt/app/datartr/logs
+mkdir -p /opt/app/datartr/spool
+mkdir -p /opt/app/datartr/spool/f
+mkdir -p /opt/app/datartr/spool/n
+mkdir -p /opt/app/datartr/spool/s
+# Classpath is the etc directory followed by every jar found under lib.
+CLASSPATH=$ETC
+# The pattern is quoted so that find receives it unexpanded, whatever the current directory contains.
+for FILE in `find $LIB -name '*.jar'`; do
+  CLASSPATH=$CLASSPATH:$FILE
+done
+java -classpath $CLASSPATH  com.att.research.datarouter.node.NodeMain
+
+# NOTE(review): the java command above runs in the foreground, so the launch below only starts after it exits - confirm that both launches are intended.
+runner_file="$LIB/datarouter-node-jar-with-dependencies.jar"
+echo "Starting using" $runner_file
+# System properties use a single '=' (-Dname=value); a doubled '=' would make each value start with a literal '='.
+java -Dcom.att.eelf.logging.file=/opt/app/datartr/etc/logback.xml -Dcom.att.eelf.logging.path=/ -Dcom.att.research.datarouter.node.ConfigFile=/opt/app/datartr/etc/node.properties -jar $runner_file
\ No newline at end of file
diff --git a/datarouter-node/src/main/resources/log4j.properties b/datarouter-node/src/main/resources/log4j.properties
new file mode 100644
index 0000000..5b2f019
--- /dev/null
+++ b/datarouter-node/src/main/resources/log4j.properties
@@ -0,0 +1,32 @@
+#-------------------------------------------------------------------------------

+# ============LICENSE_START==================================================

+# * org.onap.dmaap

+# * ===========================================================================

+# * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+# * ===========================================================================

+# * Licensed under the Apache License, Version 2.0 (the "License");

+# * you may not use this file except in compliance with the License.

+# * You may obtain a copy of the License at

+# * 

+#  *      http://www.apache.org/licenses/LICENSE-2.0

+# * 

+#  * Unless required by applicable law or agreed to in writing, software

+# * distributed under the License is distributed on an "AS IS" BASIS,

+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+# * See the License for the specific language governing permissions and

+# * limitations under the License.

+# * ============LICENSE_END====================================================

+# *

+# * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+# *

+#-------------------------------------------------------------------------------

+# log4j configuration: the root logger logs at INFO level to the appender named "Root".

+log4j.debug=FALSE

+log4j.rootLogger=INFO,Root

+

+# "Root" is a DailyRollingFileAppender that appends to /root/node.log,

+# using the date pattern '.'yyyyMMdd and the layout "%d %p %m%n".

+log4j.appender.Root=org.apache.log4j.DailyRollingFileAppender

+log4j.appender.Root.file=/root/node.log

+log4j.appender.Root.datePattern='.'yyyyMMdd

+log4j.appender.Root.append=true

+log4j.appender.Root.layout=org.apache.log4j.PatternLayout

+log4j.appender.Root.layout.ConversionPattern=%d %p %m%n

+!

diff --git a/datarouter-node/src/main/resources/log4j.properties.tmpl b/datarouter-node/src/main/resources/log4j.properties.tmpl
new file mode 100644
index 0000000..299edbf
--- /dev/null
+++ b/datarouter-node/src/main/resources/log4j.properties.tmpl
@@ -0,0 +1,11 @@
+cat <<!EOF
+log4j.debug=FALSE
+log4j.rootLogger=INFO,Root
+
+log4j.appender.Root=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.Root.file=C:/Users/sg481n/node.log
+log4j.appender.Root.datePattern='.'yyyyMMdd
+log4j.appender.Root.append=true
+log4j.appender.Root.layout=org.apache.log4j.PatternLayout
+log4j.appender.Root.layout.ConversionPattern=%d %p %m%n
+!EOF
diff --git a/datarouter-node/src/main/resources/logback.xml b/datarouter-node/src/main/resources/logback.xml
new file mode 100644
index 0000000..a47486d
--- /dev/null
+++ b/datarouter-node/src/main/resources/logback.xml
@@ -0,0 +1,405 @@
+<!--

+  ============LICENSE_START==================================================

+  * org.onap.dmaap

+  * ===========================================================================

+  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+  * ===========================================================================

+  * Licensed under the Apache License, Version 2.0 (the "License");

+  * you may not use this file except in compliance with the License.

+  * You may obtain a copy of the License at

+  * 

+   *      http://www.apache.org/licenses/LICENSE-2.0

+  * 

+   * Unless required by applicable law or agreed to in writing, software

+  * distributed under the License is distributed on an "AS IS" BASIS,

+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+  * See the License for the specific language governing permissions and

+  * limitations under the License.

+  * ============LICENSE_END====================================================

+  *

+  * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+  *

+-->

+<configuration scan="true" scanPeriod="3 seconds" debug="true">

+  <!--<jmxConfigurator /> -->

+  <!-- directory path for all other type logs -->

+  <!-- property name="logDir" value="/home/eby/dr2/logs" / -->

+  <property name="logDir" value="/opt/app/datartr/logs" />

+ 

+  <!-- directory path for debugging type logs -->

+  <!-- property name="debugDir" value="/home/eby/dr2/debug-logs" /-->

+  

+  <!--  specify the component name 

+    <ECOMP-component-name>::= "MSO" | "DCAE" | "ASDC " | "AAI" |"Policy" | "SDNC" | "AC"  -->

+  <!-- This creates the MSO directory in the LogDir, which is not needed; mentioned last directory of the path -->

+  <!-- property name="componentName" value="logs"></property -->

+  

+  <!--  log file names -->

+  <property name="generalLogName" value="apicalls" />

+  <!-- name="securityLogName" value="security" -->

+  <!-- name="performanceLogName" value="performance" -->

+  <!-- name="serverLogName" value="server" -->

+  <!-- name="policyLogName" value="policy"-->

+  <property name="errorLogName" value="errors" />

+  <!-- name="metricsLogName" value="metrics" -->

+  <!-- name="auditLogName" value="audit" -->

+  <!-- name="debugLogName" value="debug" -->

+  <property name="jettyAndNodeLogName" value="node"></property> 

+  <property name="defaultPattern"    value="%d{MM/dd-HH:mm:ss.SSS}|%logger|%X{RequestId}|%X{ServiceInstanceId}|%thread|%X{ServiceName}|%X{InstanceUUID}|%.-5level|%X{AlertSeverity}|%X{ServerIPAddress}|%X{ServerFQDN}|%X{RemoteHost}|%X{Timer}|%msg%n" />

+  <property name="jettyAndNodeLoggerPattern" value="%d{MM/dd-HH:mm:ss.SSS}|%logger|%thread|%.-5level|%msg%n" />

+  

+  <property name="debugLoggerPattern" value="%d{MM/dd-HH:mm:ss.SSS}|%X{RequestId}|%X{ServiceInstanceId}|%thread|%X{ServiceName}|%X{InstanceUUID}|%.-5level|%X{AlertSeverity}|%X{ServerIPAddress}|%X{ServerFQDN}|%X{RemoteHost}|%X{Timer}|[%caller{3}]|%msg%n" />

+     

+  <property name="logDirectory" value="${logDir}" />

+  <!-- property name="debugLogDirectory" value="${debugDir}/${componentName}" /-->

+  

+  

+  <!-- Example evaluator filter applied against console appender -->

+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">

+    <encoder>

+      <pattern>${defaultPattern}</pattern>

+    </encoder>

+  </appender>

+

+  <!-- ============================================================================ -->

+  <!-- EELF Appenders -->

+  <!-- ============================================================================ -->

+

+  <!-- The EELFAppender is used to record events to the general application 

+    log -->

+    

+    

+  <appender name="EELF"

+    class="ch.qos.logback.core.rolling.RollingFileAppender">

+    <file>${logDirectory}/${generalLogName}.log</file>

+     <filter class="ch.qos.logback.classic.filter.LevelFilter">

+		<level>INFO</level>

+		<onMatch>ACCEPT</onMatch>

+		<onMismatch>DENY</onMismatch>

+	</filter>

+    <rollingPolicy

+      class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">

+      <fileNamePattern>${logDirectory}/${generalLogName}.%i.log.zip

+      </fileNamePattern>

+      <minIndex>1</minIndex>

+      <maxIndex>9</maxIndex>

+    </rollingPolicy>

+    <triggeringPolicy

+      class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">

+      <maxFileSize>5MB</maxFileSize>

+    </triggeringPolicy>

+    <encoder>

+      <pattern>${defaultPattern}</pattern>

+    </encoder>

+  </appender>

+  

+  <appender name="asyncEELF" class="ch.qos.logback.classic.AsyncAppender">

+    <queueSize>256</queueSize>

+    <appender-ref ref="EELF" />

+  </appender>

+

+  <!-- EELF Security Appender. This appender is used to record security events 

+    to the security log file. Security events are separate from other loggers 

+    in EELF so that security log records can be captured and managed in a secure 

+    way separate from the other logs. This appender is set to never discard any 

+    events. -->

+  <!--appender name="EELFSecurity"

+    class="ch.qos.logback.core.rolling.RollingFileAppender">

+    <file>${logDirectory}/${securityLogName}.log</file>

+    <rollingPolicy

+      class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">

+      <fileNamePattern>${logDirectory}/${securityLogName}.%i.log.zip

+      </fileNamePattern>

+      <minIndex>1</minIndex>

+      <maxIndex>9</maxIndex>

+    </rollingPolicy>

+    <triggeringPolicy

+      class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">

+      <maxFileSize>5MB</maxFileSize>

+    </triggeringPolicy>

+    <encoder>

+      <pattern>${defaultPattern}</pattern>

+    </encoder>

+  </appender>

+  

+  <appender name="asyncEELFSecurity" class="ch.qos.logback.classic.AsyncAppender">

+    <queueSize>256</queueSize>

+    <discardingThreshold>0</discardingThreshold>

+    <appender-ref ref="EELFSecurity" />

+  </appender-->

+

+  <!-- EELF Performance Appender. This appender is used to record performance 

+    records. -->

+  <!--appender name="EELFPerformance"

+    class="ch.qos.logback.core.rolling.RollingFileAppender">

+    <file>${logDirectory}/${performanceLogName}.log</file>

+    <rollingPolicy

+      class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">

+      <fileNamePattern>${logDirectory}/${performanceLogName}.%i.log.zip

+      </fileNamePattern>

+      <minIndex>1</minIndex>

+      <maxIndex>9</maxIndex>

+    </rollingPolicy>

+    <triggeringPolicy

+      class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">

+      <maxFileSize>5MB</maxFileSize>

+    </triggeringPolicy>

+    <encoder>

+      <outputPatternAsHeader>true</outputPatternAsHeader>

+      <pattern>${defaultPattern}</pattern>

+    </encoder>

+  </appender>

+  <appender name="asyncEELFPerformance" class="ch.qos.logback.classic.AsyncAppender">

+    <queueSize>256</queueSize>

+    <appender-ref ref="EELFPerformance" />

+  </appender-->

+

+  <!-- EELF Server Appender. This appender is used to record Server related 

+    logging events. The Server logger and appender are specializations of the 

+    EELF application root logger and appender. This can be used to segregate Server 

+    events from other components, or it can be eliminated to record these events 

+    as part of the application root log. -->

+  <!--appender name="EELFServer"

+    class="ch.qos.logback.core.rolling.RollingFileAppender">

+    <file>${logDirectory}/${serverLogName}.log</file>

+    <rollingPolicy

+      class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">

+      <fileNamePattern>${logDirectory}/${serverLogName}.%i.log.zip

+      </fileNamePattern>

+      <minIndex>1</minIndex>

+      <maxIndex>9</maxIndex>

+    </rollingPolicy>

+    <triggeringPolicy

+      class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">

+      <maxFileSize>5MB</maxFileSize>

+    </triggeringPolicy>

+    <encoder>

+        <pattern>${defaultPattern}</pattern>

+    </encoder>

+  </appender>

+  <appender name="asyncEELFServer" class="ch.qos.logback.classic.AsyncAppender">

+    <queueSize>256</queueSize>

+    <appender-ref ref="EELFServer" />

+  </appender-->

+

+  

+  <!-- EELF Policy Appender. This appender is used to record Policy engine 

+    related logging events. The Policy logger and appender are specializations 

+    of the EELF application root logger and appender. This can be used to segregate 

+    Policy engine events from other components, or it can be eliminated to record 

+    these events as part of the application root log. -->

+  <!--appender name="EELFPolicy"

+    class="ch.qos.logback.core.rolling.RollingFileAppender">

+    <file>${logDirectory}/${policyLogName}.log</file>

+    <rollingPolicy

+      class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">

+      <fileNamePattern>${logDirectory}/${policyLogName}.%i.log.zip

+      </fileNamePattern>

+      <minIndex>1</minIndex>

+      <maxIndex>9</maxIndex>

+    </rollingPolicy>

+    <triggeringPolicy

+      class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">

+      <maxFileSize>5MB</maxFileSize>

+    </triggeringPolicy>

+    <encoder>

+        <pattern>${defaultPattern}</pattern>

+    </encoder>

+  </appender>

+  <appender name="asyncEELFPolicy" class="ch.qos.logback.classic.AsyncAppender">

+    <queueSize>256</queueSize>

+    <appender-ref ref="EELFPolicy" >

+  </appender-->

+  

+  

+  <!-- EELF Audit Appender. This appender is used to record audit engine 

+    related logging events. The audit logger and appender are specializations 

+    of the EELF application root logger and appender. This can be used to segregate 

+    audit events from other components, or it can be eliminated to record 

+    these events as part of the application root log. -->

+    

+  <!--appender name="EELFAudit"

+    class="ch.qos.logback.core.rolling.RollingFileAppender">

+    <file>${logDirectory}/${auditLogName}.log</file>

+    <rollingPolicy

+      class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">

+      <fileNamePattern>${logDirectory}/${auditLogName}.%i.log.zip

+      </fileNamePattern>

+      <minIndex>1</minIndex>

+      <maxIndex>9</maxIndex>

+    </rollingPolicy>

+    <triggeringPolicy

+      class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">

+      <maxFileSize>5MB</maxFileSize>

+    </triggeringPolicy>

+    <encoder>

+         <pattern>${defaultPattern}</pattern>

+    </encoder>

+  </appender>

+  <appender name="asyncEELFAudit" class="ch.qos.logback.classic.AsyncAppender">

+    <queueSize>256</queueSize>

+    <appender-ref ref="EELFAudit" />

+  </appender-->

+

+<!--appender name="EELFMetrics"

+    class="ch.qos.logback.core.rolling.RollingFileAppender">

+    <file>${logDirectory}/${metricsLogName}.log</file>

+    <rollingPolicy

+      class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">

+      <fileNamePattern>${logDirectory}/${metricsLogName}.%i.log.zip

+      </fileNamePattern>

+      <minIndex>1</minIndex>

+      <maxIndex>9</maxIndex>

+    </rollingPolicy>

+    <triggeringPolicy

+      class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">

+      <maxFileSize>5MB</maxFileSize>

+    </triggeringPolicy>

+    <encoder-->

+      <!-- <pattern>"%d{HH:mm:ss.SSS} [%thread] %-5level %logger{1024} - 

+        %msg%n"</pattern> -->

+      <!--pattern>${defaultPattern}</pattern>

+    </encoder>

+  </appender>

+  

+  

+  <appender name="asyncEELFMetrics" class="ch.qos.logback.classic.AsyncAppender">

+    <queueSize>256</queueSize>

+    <appender-ref ref="EELFMetrics"/>

+  </appender-->

+   

+  <appender name="EELFError"

+    class="ch.qos.logback.core.rolling.RollingFileAppender">

+    <file>${logDirectory}/${errorLogName}.log</file>

+    <filter class="ch.qos.logback.classic.filter.LevelFilter">

+		<level>ERROR</level>

+		<onMatch>ACCEPT</onMatch>

+		<onMismatch>DENY</onMismatch>

+	</filter>

+    <rollingPolicy

+      class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">

+      <fileNamePattern>${logDirectory}/${errorLogName}.%i.log.zip

+      </fileNamePattern>

+      <minIndex>1</minIndex>

+      <maxIndex>9</maxIndex>

+    </rollingPolicy>

+    <triggeringPolicy

+      class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">

+      <maxFileSize>5MB</maxFileSize>

+    </triggeringPolicy>

+    <encoder>

+      <pattern>${defaultPattern}</pattern>

+    </encoder>

+  </appender>

+  

+  <appender name="asyncEELFError" class="ch.qos.logback.classic.AsyncAppender">

+    <queueSize>256</queueSize>

+    <appender-ref ref="EELFError"/>

+  </appender>

+  

+  <!-- ============================================================================ -->

+   <appender name="jettyAndNodelog"

+    class="ch.qos.logback.core.rolling.RollingFileAppender">

+    <file>${logDirectory}/${jettyAndNodeLogName}.log</file>

+     <filter class="com.att.research.datarouter.node.eelf.EELFFilter" />

+    <rollingPolicy

+      class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">

+      <fileNamePattern>${logDirectory}/${jettyAndNodeLogName}.%i.log.zip

+      </fileNamePattern>

+      <minIndex>1</minIndex>

+      <maxIndex>9</maxIndex>

+    </rollingPolicy>

+    <triggeringPolicy

+      class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">

+      <maxFileSize>5MB</maxFileSize>

+    </triggeringPolicy>

+    <encoder>

+      <pattern>${jettyAndNodeLoggerPattern}</pattern>

+    </encoder>

+  </appender>

+  

+  <appender name="asyncEELFjettyAndNodelog" class="ch.qos.logback.classic.AsyncAppender">

+    <queueSize>256</queueSize>

+    <appender-ref ref="jettyAndNodelog" />

+    <includeCallerData>true</includeCallerData>

+  </appender>

+  

+   <!-- ============================================================================ -->

+

+

+   <!--appender name="EELFDebug"

+    class="ch.qos.logback.core.rolling.RollingFileAppender">

+    <file>${debugLogDirectory}/${debugLogName}.log</file>

+    <rollingPolicy

+      class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">

+      <fileNamePattern>${debugLogDirectory}/${debugLogName}.%i.log.zip

+      </fileNamePattern>

+      <minIndex>1</minIndex>

+      <maxIndex>9</maxIndex>

+    </rollingPolicy>

+    <triggeringPolicy

+      class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">

+      <maxFileSize>5MB</maxFileSize>

+    </triggeringPolicy>

+    <encoder>

+      <pattern>${debugLoggerPattern}</pattern>

+    </encoder>

+  </appender>

+  

+  <appender name="asyncEELFDebug" class="ch.qos.logback.classic.AsyncAppender">

+    <queueSize>256</queueSize>

+    <appender-ref ref="EELFDebug" />

+    <includeCallerData>true</includeCallerData>

+  </appender-->

+ 

+  

+  <!-- ============================================================================ -->

+  <!--  EELF loggers -->

+  <!-- ============================================================================ -->

+  <logger name="com.att.eelf" level="info" additivity="false">

+    <appender-ref ref="asyncEELF" />

+  </logger>

+  

+     <logger name="com.att.eelf.error" level="error" additivity="false">

+ 		 <appender-ref ref="asyncEELFError" />

+ 	 </logger>

+  

+     <logger name="log4j.logger.org.eclipse.jetty" additivity="false" level="info">

+		<appender-ref ref="asyncEELFjettyAndNodelog"/>

+	</logger> 

+	

+  <!-- logger name="com.att.eelf.security" level="info" additivity="false">

+    <appender-ref ref="asyncEELFSecurity" /> 

+  </logger>

+  <logger name="com.att.eelf.perf" level="info" additivity="false">

+    <appender-ref ref="asyncEELFPerformance" />

+  </logger>

+  <logger name="com.att.eelf.server" level="info" additivity="false">

+    <appender-ref ref="asyncEELFServer" />

+  </logger>

+  <logger name="com.att.eelf.policy" level="info" additivity="false">

+    <appender-ref ref="asyncEELFPolicy" />

+  </logger>

+

+  <logger name="com.att.eelf.audit" level="info" additivity="false">

+    <appender-ref ref="asyncEELFAudit" />

+  </logger>

+  

+  <logger name="com.att.eelf.metrics" level="info" additivity="false">

+        <appender-ref ref="asyncEELFMetrics" />

+  </logger>

+   

+   <logger name="com.att.eelf.debug" level="debug" additivity="false">

+        <appender-ref ref="asyncEELFDebug" />

+  </logger-->

+

+  

+

+  

+  <root level="INFO">

+    <appender-ref ref="asyncEELF" />

+    <appender-ref ref="asyncEELFError" />

+     <appender-ref ref="asyncEELFjettyAndNodelog" />

+  </root>

+

+</configuration>

diff --git a/datarouter-node/src/main/resources/misc/descriptor.xml b/datarouter-node/src/main/resources/misc/descriptor.xml
new file mode 100644
index 0000000..88fccc1
--- /dev/null
+++ b/datarouter-node/src/main/resources/misc/descriptor.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="utf-8"?>

+<!--

+  ============LICENSE_START==================================================

+  * org.onap.dmaap

+  * ===========================================================================

+  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+  * ===========================================================================

+  * Licensed under the Apache License, Version 2.0 (the "License");

+  * you may not use this file except in compliance with the License.

+  * You may obtain a copy of the License at

+  * 

+   *      http://www.apache.org/licenses/LICENSE-2.0

+  * 

+   * Unless required by applicable law or agreed to in writing, software

+  * distributed under the License is distributed on an "AS IS" BASIS,

+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+  * See the License for the specific language governing permissions and

+  * limitations under the License.

+  * ============LICENSE_END====================================================

+  *

+  * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+  *

+-->

+<descriptor version="1" xmlns="http://aft.att.com/swm/descriptor">

+	<platforms>

+		<platform os="Linux" osVersions="*" architecture="*"/>

+	</platforms>

+	<paths>

+		<path name="/opt/app/datartr" user="datartr" group="datartr" permissions="755,644" recursive="true"/>

+		<path name="/opt/app/platform/init.d/drtrnode" user="datartr" group="datartr" permissions="755"/>

+	</paths>

+	<actions>

+		<action type="INIT">

+			<proc stage="POST" user="datartr" group="datartr"/>

+		</action>

+		<action type="FALL">

+			<proc stage="PRE" user="datartr" group="datartr"/>

+			<proc stage="POST" user="datartr" group="datartr"/>

+		</action>

+		<action type="INST">

+			<proc stage="PRE" user="datartr" group="datartr"/>

+			<proc stage="POST" user="datartr" group="datartr"/>

+		</action>

+		<action type="DINST">

+			<proc stage="PRE" user="datartr" group="datartr"/>

+		</action>

+	</actions>

+	<dependencies>

+		<dependencyFilter componentName="com.att.java:jdk8lin" versions="[1.8.0.77-02]" sequence="1"/>

+		<dependencyFilter componentName="com.att.platform:initd" versions="[1.0.15,)" sequence="2"/>

+		<dependencyFilter componentName="com.att.dmaap.datarouter:util" versions="[1.0.7,)" sequence="3"/>

+	</dependencies>

+</descriptor>

diff --git a/datarouter-node/src/main/resources/misc/doaction b/datarouter-node/src/main/resources/misc/doaction
new file mode 100644
index 0000000..617b01d
--- /dev/null
+++ b/datarouter-node/src/main/resources/misc/doaction
@@ -0,0 +1,42 @@
+#!/bin/bash
+
+cd /opt/app/datartr/etc
+for action in "$@"
+do
+case "$action" in
+'backup')
+	cp log4j.properties log4j.properties.save 2>/dev/null
+	cp node.properties node.properties.save 2>/dev/null
+	cp havecert havecert.save 2>/dev/null
+	;;
+'stop')
+	/opt/app/platform/init.d/drtrnode stop
+	;;
+'start')
+	/opt/app/platform/init.d/drtrnode start || exit 1
+	;;
+'config')
+	/bin/bash log4j.properties.tmpl >log4j.properties
+	/bin/bash node.properties.tmpl >node.properties
+	/bin/bash havecert.tmpl >havecert
+	echo "$AFTSWM_ACTION_NEW_VERSION" >VERSION.node
+	chmod +x havecert
+	rm -f /opt/app/platform/rc.d/K90drtrnode /opt/app/platform/rc.d/S10drtrnode
+	ln -s ../init.d/drtrnode /opt/app/platform/rc.d/K90drtrnode
+	ln -s ../init.d/drtrnode /opt/app/platform/rc.d/S10drtrnode
+	;;
+'restore')
+	cp log4j.properties.save log4j.properties 2>/dev/null
+	cp node.properties.save node.properties 2>/dev/null
+	cp havecert.save havecert 2>/dev/null
+	;;
+'clean')
+	rm -f log4j.properties node.properties havecert log4j.properties.save node.properties.save havecert.save SHUTDOWN redirections.dat VERSION.node
+	rm -f /opt/app/platform/rc.d/K90drtrnode /opt/app/platform/rc.d/S10drtrnode
+	;;
+*)
+	exit 1
+	;;
+esac
+done
+exit 0
diff --git a/datarouter-node/src/main/resources/misc/drtrnode b/datarouter-node/src/main/resources/misc/drtrnode
new file mode 100644
index 0000000..ba784f3
--- /dev/null
+++ b/datarouter-node/src/main/resources/misc/drtrnode
@@ -0,0 +1,114 @@
+#!/bin/bash
+
+umask 0022
+TZ=GMT0
+export TZ
+PATH=/usr/local/bin:/bin:/usr/bin:/usr/local/sbin:/usr/sbin:/sbin:/opt/java/jdk/jdk180/bin
+export PATH
+CLASSPATH=`echo /opt/app/datartr/etc /opt/app/datartr/lib/*.jar | tr ' ' ':'` 
+export CLASSPATH
+
+pids() {
+	ps -ef | grep java | grep node.NodeMain | sed -e 's/[^ ]* *//' -e 's/ .*//'
+}
+
+# Start the node.  Checks the invoking user and group, runs etc/havecert,
+# refuses to start a second instance, then launches NodeMain in the background
+# and reports whether a NodeMain process exists 5 seconds later.
+start() {
+	ID=`id -n -u`
+	GRP=`id -n -g`
+	# The node runs as datartr: this matches the message below and the
+	# equivalent check in stop().
+	if [ "$ID" != "datartr" ]
+	then
+		echo drtrnode must be started as user datartr not $ID
+		exit 1
+	fi
+	if [ "$GRP" != "datartr" ]
+	then
+		echo drtrnode must be started as group datartr not $GRP
+		exit 1
+	fi
+	cd /opt/app/datartr
+	# A failing etc/havecert means there is nothing to start with; the exit
+	# status stays 0 in that case, as before.
+	if ! etc/havecert
+	then
+		echo No certificate file available.  Cannot start
+		exit 0
+	fi
+	PIDS=`pids`
+	if [ "$PIDS" != "" ]
+	then
+		echo drtrnode already running
+		exit 0
+	fi
+
+	mkdir -p /opt/app/datartr/spool/s
+	chmod 755 /opt/app/datartr/spool/s
+
+	# Clear the shutdown marker that stop() creates before launching.
+	rm -f /opt/app/datartr/etc/SHUTDOWN
+	nohup java com.att.research.datarouter.node.NodeMain </dev/null >/dev/null 2>&1 &
+	# Give the JVM a moment, then look for the process to report the outcome.
+	sleep 5
+	PIDS=`pids`
+	if [ "$PIDS" = "" ]
+	then
+		echo drtrnode startup failed
+	else
+		echo drtrnode started
+	fi
+}
+
+# Stop the node.  Only user datartr in group datartr may do this.  The
+# SHUTDOWN marker file is always created; if a NodeMain process is found, the
+# script waits 5 seconds and then sends it SIGKILL.
+stop() {
+	ID=$(id -n -u)
+	GRP=$(id -n -g)
+	[ "$ID" = "datartr" ] || {
+		echo drtrnode must be stopped as user datartr not $ID
+		exit 1
+	}
+	[ "$GRP" = "datartr" ] || {
+		echo drtrnode must be stopped as group datartr not $GRP
+		exit 1
+	}
+	touch /opt/app/datartr/etc/SHUTDOWN
+	PIDS=$(pids)
+	if [ -z "$PIDS" ]
+	then
+		echo drtrnode not running
+		return
+	fi
+	# Wait after the marker file is written, then kill the PIDs found above.
+	sleep 5
+	kill -9 $PIDS
+	sleep 5
+	echo drtrnode stopped
+}
+
+status() {
+	PIDS=`pids`
+	if [ "$PIDS" != "" ]
+	then
+		echo drtrnode running
+	else
+		echo drtrnode not running
+	fi
+}
+
+# Dispatch on the first command-line argument; anything unrecognised prints
+# the usage line and exits with status 1.
+case "$1" in
+'start')
+	start
+	;;
+'stop')
+	stop
+	;;
+'restart')
+	stop
+	sleep 20
+	start
+	;;
+'status')
+	status
+	;;
+*)
+	echo "Usage: $0 { start | stop | restart | status }"
+	exit 1
+	;;
+esac
+exit 0
diff --git a/datarouter-node/src/main/resources/misc/havecert.tmpl b/datarouter-node/src/main/resources/misc/havecert.tmpl
new file mode 100644
index 0000000..2e813ba
--- /dev/null
+++ b/datarouter-node/src/main/resources/misc/havecert.tmpl
@@ -0,0 +1,11 @@
+#!/bin/bash
+cat <<!EOF
+TZ=GMT0
+cd /opt/app/datartr;
+if [ -f ${DRTR_NODE_KSTOREFILE:-etc/keystore} ]
+then
+	exit 0
+fi
+echo `date '+%F %T,000'` WARN Certificate file "${DRTR_NODE_KSTOREFILE:-etc/keystore}" is missing >>${DRTR_NODE_LOGS:-logs}/node.log
+exit 1
+!EOF
diff --git a/datarouter-node/src/main/resources/misc/log4j.properties.tmpl b/datarouter-node/src/main/resources/misc/log4j.properties.tmpl
new file mode 100644
index 0000000..24bd3df
--- /dev/null
+++ b/datarouter-node/src/main/resources/misc/log4j.properties.tmpl
@@ -0,0 +1,11 @@
+cat <<!EOF
+log4j.debug=FALSE
+log4j.rootLogger=INFO,Root
+
+log4j.appender.Root=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.Root.file=${DRTR_NODE_LOGS:-logs}/node.log
+log4j.appender.Root.datePattern='.'yyyyMMdd
+log4j.appender.Root.append=true
+log4j.appender.Root.layout=org.apache.log4j.PatternLayout
+log4j.appender.Root.layout.ConversionPattern=%d %p %m%n
+!EOF
diff --git a/datarouter-node/src/main/resources/misc/node.properties b/datarouter-node/src/main/resources/misc/node.properties
new file mode 100644
index 0000000..fb97702
--- /dev/null
+++ b/datarouter-node/src/main/resources/misc/node.properties
@@ -0,0 +1,112 @@
+#-------------------------------------------------------------------------------

+# ============LICENSE_START==================================================

+# * org.onap.dmaap

+# * ===========================================================================

+# * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+# * ===========================================================================

+# * Licensed under the Apache License, Version 2.0 (the "License");

+# * you may not use this file except in compliance with the License.

+# * You may obtain a copy of the License at

+# * 

+#  *      http://www.apache.org/licenses/LICENSE-2.0

+# * 

+#  * Unless required by applicable law or agreed to in writing, software

+# * distributed under the License is distributed on an "AS IS" BASIS,

+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+# * See the License for the specific language governing permissions and

+# * limitations under the License.

+# * ============LICENSE_END====================================================

+# *

+# * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+# *

+#-------------------------------------------------------------------------------

+#

+#	Configuration parameters fixed at startup for the DataRouter node

+#

+#	URL to retrieve dynamic configuration

+#

+#ProvisioningURL:	${DRTR_PROV_INTURL}

+ProvisioningURL=https://prov.datarouternew.com:8443/internal/prov

+

+#

+#	URL to upload PUB/DEL/EXP logs

+#

+#LogUploadURL:	${DRTR_LOG_URL}

+LogUploadURL=https://prov.datarouternew.com:8443/internal/logs

+

+#

+#	The port number for http as seen within the server

+#

+#IntHttpPort:	${DRTR_NODE_INTHTTPPORT:-8080}

+IntHttpPort=8080

+#

+#	The port number for https as seen within the server

+#

+IntHttpsPort=8443

+#

+#	The external port number for https taking port mapping into account

+#

+ExtHttpsPort=443

+#

+#	The minimum interval between fetches of the dynamic configuration

+#	from the provisioning server

+#

+MinProvFetchInterval=10000

+#

+#	The minimum interval between saves of the redirection data file

+#

+MinRedirSaveInterval=10000

+#

+#	The path to the directory where log files are stored

+#

+LogDir=/opt/app/datartr/logs

+#

+#	The retention interval (in days) for log files

+#

+LogRetention=30

+#

+#	The path to the directories where data and meta data files are stored

+#

+SpoolDir=/opt/app/datartr/spool

+#

+#	The path to the redirection data file

+#

+#RedirectionFile:	etc/redirections.dat

+#

+#	The type of keystore for https

+#

+KeyStoreType:	jks

+#

+#	The path to the keystore for https

+#

+KeyStoreFile:/opt/app/datartr/self_signed/keystore.jks

+#

+#	The password for the https keystore

+#

+KeyStorePassword=changeit

+#

+#	The password for the private key in the https keystore

+#

+KeyPassword=changeit

+#

+#	The type of truststore for https

+#

+TrustStoreType=jks

+#

+#	The path to the truststore for https

+#

+#TrustStoreFile=/usr/lib/jvm/java-8-oracle/jre/lib/security/cacerts

+TrustStoreFile=/opt/app/datartr/self_signed/cacerts.jks

+#

+#	The password for the https truststore

+#

+TrustStorePassword=changeit

+#

+#	The path to the file used to trigger an orderly shutdown

+#

+QuiesceFile=etc/SHUTDOWN

+#

+#	The key used to generate passwords for node to node transfers

+#

+NodeAuthKey=Node123!

+

diff --git a/datarouter-node/src/main/resources/misc/notes b/datarouter-node/src/main/resources/misc/notes
new file mode 100644
index 0000000..f37a8ea
--- /dev/null
+++ b/datarouter-node/src/main/resources/misc/notes
@@ -0,0 +1,54 @@
+package notes for com.att.dmaap.datarouter:node
+
+This component is for the Data Router Node software.
+
+The following pre-requisite components should already be present:
+	com.att.aft.swm:swm-cli
+	com.att.aft.swm:swm-node
+	- SWM Variables: AFTSWM_AUTOLINK_PARENTS=/opt/app:/opt/app/workload,/opt/app/aft:/opt/app/workload/aft
+	com.att.platform:uam-auto
+	com.att.java:jdk8lin
+	com.att.platform:initd
+	com.att.platform:port-fwd
+	- SWM Variables: PLATFORM_PORT_FWD=80,8080|443,8443
+	com.att.dmaap.datarouter:util
+
+In a non-production environment, the URL for fetching provisioning data from
+the provisioning server must be overridden.  This can be done by setting a SWM
+variable prior to installing this component.  The production (default) value for
+this variable is:
+	DRTR_PROV_INTURL=https://feeds-drtr.web.att.com/internal/prov
+
+Similarly, the URL for uploading event logs to the log server must be overridden.  This can also be done by setting a SWM variable.  The production (default) value is:
+	DRTR_LOG_URL=https://feeds-drtr.web.att.com/internal/logs
+
+Other SWM variables that can be set are:
+
+DRTR_NODE_INTHTTPPORT (default 8080)
+	The TCP/IP port number the component should listen on for "go fetch"
+	requests from the provisioning server
+DRTR_NODE_INTHTTPSPORT (default 8443)
+	The TCP/IP port number the component should listen on for publish
+	requests from feed publishers and other nodes
+DRTR_NODE_EXTHTTPSPORT (default 443)
+	The TCP/IP port number the component should use for node-to-node
+	transfers and for sending redirect requests back to publishers
+DRTR_NODE_SPOOL (default /opt/app/datartr/spool)
+	The directory where data files should be saved while in transit
+DRTR_NODE_LOGS (default /opt/app/datartr/logs)
+	The directory where log files should be kept
+DRTR_NODE_LOG_RETENTION (default 30)
+	How long (in days) a log file is kept before being deleted
+DRTR_NODE_KSTOREFILE (default /opt/app/datartr/etc/keystore)
+	The java keystore file containing the server certificate and private key
+	for this server
+DRTR_NODE_KSTOREPASS (default changeit)
+	The password for the keystore file
+DRTR_NODE_PVTKEYPASS (default changeit)
+	The password for the private key in the keystore file
+DRTR_NODE_TSTOREFILE (by default, use the truststore from the Java JDK)
+	The java keystore file containing the trusted certificate authority
+	certificates
+DRTR_NODE_TSTOREPASS (default changeit)
+	The password for the trust store file.  Only applies if a trust store
+	file is specified.
diff --git a/datarouter-node/src/main/resources/node.properties b/datarouter-node/src/main/resources/node.properties
new file mode 100644
index 0000000..fb97702
--- /dev/null
+++ b/datarouter-node/src/main/resources/node.properties
@@ -0,0 +1,112 @@
+#-------------------------------------------------------------------------------

+# ============LICENSE_START==================================================

+# * org.onap.dmaap

+# * ===========================================================================

+# * Copyright © 2017 AT&T Intellectual Property. All rights reserved.

+# * ===========================================================================

+# * Licensed under the Apache License, Version 2.0 (the "License");

+# * you may not use this file except in compliance with the License.

+# * You may obtain a copy of the License at

+# * 

+#  *      http://www.apache.org/licenses/LICENSE-2.0

+# * 

+#  * Unless required by applicable law or agreed to in writing, software

+# * distributed under the License is distributed on an "AS IS" BASIS,

+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+# * See the License for the specific language governing permissions and

+# * limitations under the License.

+# * ============LICENSE_END====================================================

+# *

+# * ECOMP is a trademark and service mark of AT&T Intellectual Property.

+# *

+#-------------------------------------------------------------------------------

+#

+#	Configuration parameters fixed at startup for the DataRouter node

+#

+#	URL to retrieve dynamic configuration

+#

+#ProvisioningURL:	${DRTR_PROV_INTURL}

+ProvisioningURL=https://prov.datarouternew.com:8443/internal/prov

+

+#

+#	URL to upload PUB/DEL/EXP logs

+#

+#LogUploadURL:	${DRTR_LOG_URL}

+LogUploadURL=https://prov.datarouternew.com:8443/internal/logs

+

+#

+#	The port number for http as seen within the server

+#

+#IntHttpPort:	${DRTR_NODE_INTHTTPPORT:-8080}

+IntHttpPort=8080

+#

+#	The port number for https as seen within the server

+#

+IntHttpsPort=8443

+#

+#	The external port number for https taking port mapping into account

+#

+ExtHttpsPort=443

+#

+#	The minimum interval between fetches of the dynamic configuration

+#	from the provisioning server

+#

+MinProvFetchInterval=10000

+#

+#	The minimum interval between saves of the redirection data file

+#

+MinRedirSaveInterval=10000

+#

+#	The path to the directory where log files are stored

+#

+LogDir=/opt/app/datartr/logs

+#

+#	The retention interval (in days) for log files

+#

+LogRetention=30

+#

+#	The path to the directories where data and meta data files are stored

+#

+SpoolDir=/opt/app/datartr/spool

+#

+#	The path to the redirection data file

+#

+#RedirectionFile:	etc/redirections.dat

+#

+#	The type of keystore for https

+#

+KeyStoreType:	jks

+#

+#	The path to the keystore for https

+#

+KeyStoreFile:/opt/app/datartr/self_signed/keystore.jks

+#

+#	The password for the https keystore

+#

+KeyStorePassword=changeit

+#

+#	The password for the private key in the https keystore

+#

+KeyPassword=changeit

+#

+#	The type of truststore for https

+#

+TrustStoreType=jks

+#

+#	The path to the truststore for https

+#

+#TrustStoreFile=/usr/lib/jvm/java-8-oracle/jre/lib/security/cacerts

+TrustStoreFile=/opt/app/datartr/self_signed/cacerts.jks

+#

+#	The password for the https truststore

+#

+TrustStorePassword=changeit

+#

+#	The path to the file used to trigger an orderly shutdown

+#

+QuiesceFile=etc/SHUTDOWN

+#

+#	The key used to generate passwords for node to node transfers

+#

+NodeAuthKey=Node123!

+