Initial OpenECOMP policy/drools-pdp commit

Change-Id: I0072ccab6f40ed32da39667f9f8523b6d6dad2e2
Signed-off-by: Pamela Dragosh <pdragosh@research.att.com>
diff --git a/policy-endpoints/pom.xml b/policy-endpoints/pom.xml
new file mode 100644
index 0000000..e62853c
--- /dev/null
+++ b/policy-endpoints/pom.xml
@@ -0,0 +1,157 @@
+<!--
+  ============LICENSE_START=======================================================
+  ECOMP Policy Engine - Drools PDP
+  ================================================================================
+  Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+  ================================================================================
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+  
+       http://www.apache.org/licenses/LICENSE-2.0
+  
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  ============LICENSE_END=========================================================
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+         
+  <modelVersion>4.0.0</modelVersion>
+  
+  <parent>
+    <groupId>org.openecomp.policy.drools-pdp</groupId>
+    <artifactId>drools-pdp</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+  </parent>
+  
+  <artifactId>policy-endpoints</artifactId>
+  
+  <name>policy-endpoints</name>
+  <description>Policy UEB support</description>
+
+  <properties>
+          <maven.compiler.source>1.8</maven.compiler.source>
+          <maven.compiler.target>1.8</maven.compiler.target>
+          <cambria.version>0.0.1</cambria.version>
+          <dmaap.version>0.2.12</dmaap.version>
+          <jetty.version>9.3.14.v20161028</jetty.version>
+          <jersey.version>2.22.1</jersey.version>
+          <jackson.version>2.8.4</jackson.version>
+        
+  </properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>com.att.nsa</groupId>
+			<artifactId>cambriaClient</artifactId>
+			<version>${cambria.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>	
+				<exclusion>
+					<groupId>com.att.nsa</groupId>
+					<artifactId>saClientLibrary</artifactId>
+				</exclusion>						
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>com.att.nsa</groupId>
+			<artifactId>dmaapClient</artifactId>
+			<version>${dmaap.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>	
+			</exclusions>
+		</dependency>	
+		<dependency>
+			<groupId>org.eclipse.jetty</groupId>
+			<artifactId>jetty-server</artifactId>
+			<version>${jetty.version}</version>
+		</dependency>
+	
+		<dependency>
+			<groupId>org.eclipse.jetty</groupId>
+			<artifactId>jetty-servlet</artifactId>
+			<version>${jetty.version}</version>
+		</dependency>
+	
+		<dependency>
+			<groupId>org.glassfish.jersey.core</groupId>
+			<artifactId>jersey-server</artifactId>
+			<version>${jersey.version}</version>
+		</dependency>
+	
+		<dependency>
+			<groupId>org.glassfish.jersey.containers</groupId>
+			<artifactId>jersey-container-servlet-core</artifactId>
+			<version>${jersey.version}</version>
+		</dependency>
+	
+		<dependency>
+			<groupId>org.glassfish.jersey.media</groupId>
+			<artifactId>jersey-media-json-jackson</artifactId>
+			<version>${jersey.version}</version>
+		</dependency>
+	
+		<dependency>
+			<groupId>org.glassfish.jersey.containers</groupId>
+			<artifactId>jersey-container-jetty-http</artifactId>
+			<version>${jersey.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.eclipse.jetty</groupId>
+					<artifactId>jetty-util</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.glassfish.jersey.core</groupId>
+			<artifactId>jersey-client</artifactId>
+			<version>${jersey.version}</version>
+		</dependency>
+	
+		<dependency>
+			<groupId>com.fasterxml.jackson.core</groupId>
+			<artifactId>jackson-databind</artifactId>
+			<version>${jackson.version}</version>
+		</dependency>
+	
+		<dependency>
+			<groupId>com.fasterxml.jackson.datatype</groupId>
+			<artifactId>jackson-datatype-jsr310</artifactId>
+			<version>${jackson.version}</version>
+		</dependency>
+
+	    <dependency>
+			<groupId>org.apache.httpcomponents</groupId>
+			<artifactId>httpcore</artifactId>
+			<version>4.4.4</version>
+	    </dependency>	
+	    <dependency>
+			<groupId>org.apache.httpcomponents</groupId>
+			<artifactId>httpclient</artifactId>
+			<version>4.5</version>
+	    </dependency>
+	    <dependency>
+		<groupId>org.apache.commons</groupId>
+			<artifactId>commons-collections4</artifactId>
+			<version>4.1</version>
+		</dependency>
+		<dependency>
+			<groupId>org.openecomp.policy.drools-pdp</groupId>
+			<artifactId>policy-core</artifactId>
+			<version>1.0.0-SNAPSHOT</version>
+		</dependency>
+	</dependencies>
+
+</project>
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/Topic.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/Topic.java
new file mode 100644
index 0000000..d38bab5
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/Topic.java
@@ -0,0 +1,76 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm;
+
+import java.util.List;
+
+/**
+ * Essential Topic Data
+ */
+public interface Topic {
+	
+	public static final String NETWORK_LOGGER = "networkLogger";
+	
+	/**
+	 * Underlying Communication infrastructure Types
+	 */
+	public enum CommInfrastructure {
+		/**
+		 * UEB Communication Infrastructure
+		 */
+		UEB,
+		/**
+		 * DMAAP Communication Infrastructure
+		 */		
+		DMAAP,
+		/**
+		 * REST Communication Infrastructure
+		 */				
+		REST
+	}
+	
+	/**
+	 * gets the topic name
+	 * 
+	 * @return topic name
+	 */
+	public String getTopic();
+	
+	/**
+	 * gets the communication infrastructure type
+	 * @return
+	 */
+	public CommInfrastructure getTopicCommInfrastructure();
+	
+	/**
+	 * return list of servers
+	 * @return bus servers
+	 */
+	public List<String> getServers();	
+
+	/**
+	 * get the more recent events in this topic entity
+	 * 
+	 * @return list of most recent events
+	 */
+	public String[] getRecentEvents();
+
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicEndpoint.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicEndpoint.java
new file mode 100644
index 0000000..b3f236f
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicEndpoint.java
@@ -0,0 +1,692 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSink;
+import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSource;
+import org.openecomp.policy.drools.event.comm.bus.UebTopicSink;
+import org.openecomp.policy.drools.event.comm.bus.UebTopicSource;
+import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
+import org.openecomp.policy.common.logging.flexlogger.Logger;
+import org.openecomp.policy.common.logging.eelf.MessageCodes;
+import org.openecomp.policy.drools.properties.Lockable;
+import org.openecomp.policy.drools.properties.Startable;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+/**
+ * Abstraction to managed the system's Networked Topic Endpoints,
+ * sources of all events input into the System. 
+ */
+public interface TopicEndpoint extends Startable, Lockable {
+	
+	/**
+	 * Add Topic Sources to the communication infrastructure initialized per
+	 * properties
+	 * 
+	 * @param properties properties for Topic Source construction
+	 * @return a generic Topic Source
+	 * @throws IllegalArgumentException when invalid arguments are provided
+	 */
+	public List<? extends TopicSource> addTopicSources(Properties properties) 
+		   throws IllegalArgumentException;
+
+	/**
+	 * Add Topic Sinks to the communication infrastructure initialized per
+	 * properties
+	 * 
+	 * @param properties properties for Topic Sink construction
+	 * @return a generic Topic Sink
+	 * @throws IllegalArgumentException when invalid arguments are provided
+	 */
+	public List<? extends TopicSink> addTopicSinks(Properties properties) 
+			   throws IllegalArgumentException;
+	
+	/**
+	 * gets all Topic Sources
+	 * @return the Topic Source List
+	 */
+	List<? extends TopicSource> getTopicSources();
+	
+	/**
+	 * get the Topic Sources for the given topic name
+	 * 
+	 * @param topicName the topic name
+	 * 
+	 * @return the Topic Source List
+	 * @throws IllegalStateException if the entity is in an invalid state
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public List<? extends TopicSource> getTopicSources(List<String> topicNames) 
+			throws IllegalStateException, IllegalArgumentException;
+	
+	/**
+	 * gets the Topic Source for the given topic name and 
+	 * underlying communication infrastructure type
+	 * 
+	 * @param commType communication infrastructure type
+	 * @param topicName the topic name
+	 * 
+	 * @return the Topic Source
+	 * @throws IllegalStateException if the entity is in an invalid state, for
+	 * example multiple TopicReaders for a topic name and communication infrastructure
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 * @throws UnsupportedOperationException if the operation is not supported.
+	 */
+	public TopicSource getTopicSource(Topic.CommInfrastructure commType, 
+			                          String topicName) 
+			throws IllegalStateException, IllegalArgumentException, 
+			       UnsupportedOperationException;
+	
+	/**
+	 * get the UEB Topic Source for the given topic name
+	 * 
+	 * @param topicName the topic name
+	 * 
+	 * @return the UEB Topic Source
+	 * @throws IllegalStateException if the entity is in an invalid state, for
+	 * example multiple TopicReaders for a topic name and communication infrastructure
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public UebTopicSource getUebTopicSource(String topicName)
+			throws IllegalStateException, IllegalArgumentException;
+	
+	/**
+	 * get the DMAAP Topic Source for the given topic name
+	 * 
+	 * @param topicName the topic name
+	 * 
+	 * @return the DMAAP Topic Source
+	 * @throws IllegalStateException if the entity is in an invalid state, for
+	 * example multiple TopicReaders for a topic name and communication infrastructure
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public DmaapTopicSource getDmaapTopicSource(String topicName)
+			throws IllegalStateException, IllegalArgumentException;
+	
+	/**
+	 * get the Topic Sinks for the given topic name
+	 * 
+	 * @param topicNames the topic names
+	 * @return the Topic Sink List
+	 * @throws IllegalStateException
+	 * @throws IllegalArgumentException
+	 */
+	public List<? extends TopicSink> getTopicSinks(List<String> topicNames) 
+			throws IllegalStateException, IllegalArgumentException;
+	
+	/**
+	 * get the Topic Sinks for the given topic name and 
+	 * underlying communication infrastructure type
+	 * 
+	 * @param topicName the topic name
+	 * @param commType communication infrastructure type
+	 * 
+	 * @return the Topic Sink List
+	 * @throws IllegalStateException if the entity is in an invalid state, for
+	 * example multiple TopicWriters for a topic name and communication infrastructure
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public TopicSink getTopicSink(Topic.CommInfrastructure commType, 
+			                      String topicName) 
+			throws IllegalStateException, IllegalArgumentException,
+			       UnsupportedOperationException;
+	
+	/**
+	 * get the Topic Sinks for the given topic name and 
+	 * all the underlying communication infrastructure type
+	 * 
+	 * @param topicName the topic name
+	 * @param commType communication infrastructure type
+	 * 
+	 * @return the Topic Sink List
+	 * @throws IllegalStateException if the entity is in an invalid state, for
+	 * example multiple TopicWriters for a topic name and communication infrastructure
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public List<? extends TopicSink> getTopicSinks(String topicName) 
+			throws IllegalStateException, IllegalArgumentException;
+	
+	/**
+	 * get the UEB Topic Source for the given topic name
+	 * 
+	 * @param topicName the topic name
+	 * 
+	 * @return the Topic Source
+	 * @throws IllegalStateException if the entity is in an invalid state, for
+	 * example multiple TopicReaders for a topic name and communication infrastructure
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public UebTopicSink getUebTopicSink(String topicName)
+			throws IllegalStateException, IllegalArgumentException;
+	
+	/**
+	 * get the DMAAP Topic Source for the given topic name
+	 * 
+	 * @param topicName the topic name
+	 * 
+	 * @return the Topic Source
+	 * @throws IllegalStateException if the entity is in an invalid state, for
+	 * example multiple TopicReaders for a topic name and communication infrastructure
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public DmaapTopicSink getDmaapTopicSink(String topicName)
+			throws IllegalStateException, IllegalArgumentException;
+	
+	/**
+	 * gets only the UEB Topic Sources
+	 * @return the UEB Topic Source List
+	 */
+	public List<UebTopicSource> getUebTopicSources();
+	
+	/**
+	 * gets only the DMAAP Topic Sources
+	 * @return the DMAAP Topic Source List
+	 */
+	public List<DmaapTopicSource> getDmaapTopicSources();
+	
+	/**
+	 * gets all Topic Sinks
+	 * @return the Topic Sink List
+	 */
+	public List<? extends TopicSink> getTopicSinks();
+	
+	/**
+	 * gets only the UEB Topic Sinks
+	 * @return the UEB Topic Sink List
+	 */
+	public List<UebTopicSink> getUebTopicSinks();
+	
+	/**
+	 * gets only the DMAAP Topic Sinks
+	 * @return the DMAAP Topic Sink List
+	 */
+	public List<DmaapTopicSink> getDmaapTopicSinks();
+	
+	/**
+	 * singleton for global access
+	 */
+	public static final TopicEndpoint manager = new ProxyTopicEndpointManager();
+}
+
+/*
+ * ----------------- implementation -------------------
+ */
+
+/**
+ * This implementation of the Topic Endpoint Manager, proxies operations to appropriate
+ * implementations according to the communication infrastructure that are supported
+ */
+class ProxyTopicEndpointManager implements TopicEndpoint {
+	// get an instance of logger 
+	private static Logger  logger = FlexLogger.getLogger(ProxyTopicEndpointManager.class);
+	/**
+	 * Is this element locked?
+	 */
+	protected volatile boolean locked = false;
+	
+	/**
+	 * Is this element alive?
+	 */
+	protected volatile boolean alive = false;
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public List<? extends TopicSource> addTopicSources(Properties properties) throws IllegalArgumentException {
+		
+		// 1. Create UEB Sources
+		// 2. Create DMAAP Sources
+		
+		List<TopicSource> sources = new ArrayList<TopicSource>();	
+		
+		sources.addAll(UebTopicSource.factory.build(properties));
+		sources.addAll(DmaapTopicSource.factory.build(properties));
+		
+		if (this.isLocked()) {
+			for (TopicSource source : sources) {
+				source.lock();
+			}
+		}
+		
+		return sources;
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public List<? extends TopicSink> addTopicSinks(Properties properties) throws IllegalArgumentException {
+		// 1. Create UEB Sinks
+		// 2. Create DMAAP Sinks
+		
+		List<TopicSink> sinks = new ArrayList<TopicSink>();	
+		
+		sinks.addAll(UebTopicSink.factory.build(properties));
+		sinks.addAll(DmaapTopicSink.factory.build(properties));
+		
+		if (this.isLocked()) {
+			for (TopicSink sink : sinks) {
+				sink.lock();
+			}
+		}
+		
+		return sinks;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public List<? extends TopicSource> getTopicSources() {
+	
+		List<TopicSource> sources = new ArrayList<TopicSource>();
+		
+		sources.addAll(UebTopicSource.factory.inventory());
+		sources.addAll(DmaapTopicSource.factory.inventory());
+		
+		return sources;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public List<? extends TopicSink> getTopicSinks() {
+		
+		List<TopicSink> sinks = new ArrayList<TopicSink>();	
+		
+		sinks.addAll(UebTopicSink.factory.inventory());
+		sinks.addAll(DmaapTopicSink.factory.inventory());
+		
+		return sinks;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@JsonIgnore
+	@Override
+	public List<UebTopicSource> getUebTopicSources() {
+		return UebTopicSource.factory.inventory();
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@JsonIgnore
+	@Override
+	public List<DmaapTopicSource> getDmaapTopicSources() {
+		return DmaapTopicSource.factory.inventory();
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@JsonIgnore
+	@Override
+	public List<UebTopicSink> getUebTopicSinks() {
+		return UebTopicSink.factory.inventory();
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@JsonIgnore
+	@Override
+	public List<DmaapTopicSink> getDmaapTopicSinks() {
+		return DmaapTopicSink.factory.inventory();
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean start() throws IllegalStateException {
+		
+		synchronized (this) {
+			if (this.locked) {
+				throw new IllegalStateException(this + " is locked");
+			}
+			
+			if (this.alive) {
+				return true;
+			}
+			
+			this.alive = true;
+		}
+		
+		List<Startable> endpoints = getEndpoints();
+		
+		boolean success = true;
+		for (Startable endpoint: endpoints) {
+			try {
+				success = endpoint.start() && success;
+			} catch (Exception e) {
+				success = false;
+				logger.error(MessageCodes.EXCEPTION_ERROR, e, endpoint.toString(), this.toString());
+			}
+		}
+		
+		return success;
+	}
+
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean stop() throws IllegalStateException {
+		
+		/* 
+		 * stop regardless if it is locked, in other
+		 * words, stop operation has precedence over
+		 * locks.
+		 */
+		synchronized (this) {			
+			this.alive = false;
+		}
+		
+		List<Startable> endpoints = getEndpoints();
+		
+		boolean success = true;
+		for (Startable endpoint: endpoints) {
+			try {
+				success = endpoint.stop() && success;
+			} catch (Exception e) {
+				success = false;
+				logger.error(MessageCodes.EXCEPTION_ERROR, e, endpoint.toString(), this.toString());
+			}
+		}
+		
+		return success;
+	}
+	
+	/**
+	 * 
+	 * @return list of managed endpoints
+	 */
+	@JsonIgnore
+	protected List<Startable> getEndpoints() {
+		List<Startable> endpoints = new ArrayList<Startable>();
+
+		endpoints.addAll(this.getTopicSources());
+		endpoints.addAll(this.getTopicSinks());
+		
+		return endpoints;
+	}
+	
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void shutdown() throws IllegalStateException {
+		UebTopicSource.factory.destroy();
+		UebTopicSink.factory.destroy();
+		
+		DmaapTopicSource.factory.destroy();
+		DmaapTopicSink.factory.destroy();
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean isAlive() {
+		return this.alive;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean lock() {
+		
+		synchronized (this) {
+			if (locked)
+				return true;
+			
+			this.locked = true;
+		}
+		
+		for (TopicSource source: this.getTopicSources()) {
+			source.lock();
+		}
+		
+		for (TopicSink sink: this.getTopicSinks()) {
+			sink.lock();
+		}
+		
+		return true;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean unlock() {
+		synchronized (this) {
+			if (!locked)
+				return true;
+			
+			this.locked = false;
+		}
+		
+		for (TopicSource source: this.getTopicSources()) {
+			source.unlock();
+		}
+		
+		for (TopicSink sink: this.getTopicSinks()) {
+			sink.unlock();
+		}
+		
+		return true;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean isLocked() {
+		return this.locked;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public List<? extends TopicSource> getTopicSources(List<String> topicNames)
+			throws IllegalStateException, IllegalArgumentException {
+		
+		if (topicNames == null) {
+			throw new IllegalArgumentException("must provide a list of topics");
+		}
+		
+		List<TopicSource> sources = new ArrayList<TopicSource>();
+		for (String topic: topicNames) {
+			try {
+				TopicSource uebSource = this.getUebTopicSource(topic);
+				if (uebSource != null)
+					sources.add(uebSource);
+			} catch (Exception e) {
+				logger.info("No UEB source for topic: " + topic);
+			}
+			
+			try {
+				TopicSource dmaapSource = this.getDmaapTopicSource(topic);
+				if (dmaapSource != null)
+					sources.add(dmaapSource);
+			} catch (Exception e) {
+				logger.info("No DMAAP source for topic: " + topic);
+			}
+		}
+		return sources;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public List<? extends TopicSink> getTopicSinks(List<String> topicNames)
+			throws IllegalStateException, IllegalArgumentException {
+		
+		if (topicNames == null) {
+			throw new IllegalArgumentException("must provide a list of topics");
+		}
+		
+		List<TopicSink> sinks = new ArrayList<TopicSink>();
+		for (String topic: topicNames) {
+			try {
+				TopicSink uebSink = this.getUebTopicSink(topic);
+				if (uebSink != null)
+					sinks.add(uebSink);
+			} catch (Exception e) {
+				logger.info("No UEB sink for topic: " + topic);
+			}
+			
+			try {
+				TopicSink dmaapSink = this.getDmaapTopicSink(topic);
+				if (dmaapSink != null)
+					sinks.add(dmaapSink);
+			} catch (Exception e) {
+				logger.info("No DMAAP sink for topic: " + topic);
+			}
+		}
+		return sinks;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public TopicSource getTopicSource(Topic.CommInfrastructure commType, String topicName)
+			throws IllegalStateException, IllegalArgumentException, UnsupportedOperationException {
+		
+		if (commType == null) {
+			throw new IllegalArgumentException
+				("Invalid parameter: a communication infrastructure required to fetch " + topicName);
+		}
+		
+		if (topicName == null) {
+			throw new IllegalArgumentException
+				("Invalid parameter: a communication infrastructure required to fetch " + topicName);
+		}
+		
+		switch (commType) {
+		case UEB:
+			return this.getUebTopicSource(topicName);
+		case DMAAP:
+			return this.getDmaapTopicSource(topicName);
+		case REST:
+		default:
+			throw new UnsupportedOperationException("Unsupported " + commType.name());
+		}
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public TopicSink getTopicSink(Topic.CommInfrastructure commType, String topicName)
+			throws IllegalStateException, IllegalArgumentException, UnsupportedOperationException {
+		if (commType == null) {
+			throw new IllegalArgumentException
+				("Invalid parameter: a communication infrastructure required to fetch " + topicName);
+		}
+		
+		if (topicName == null) {
+			throw new IllegalArgumentException
+				("Invalid parameter: a communication infrastructure required to fetch " + topicName);
+		}
+		
+		switch (commType) {
+		case UEB:
+			return this.getUebTopicSink(topicName);
+		case DMAAP:
+			return this.getDmaapTopicSink(topicName);
+		case REST:
+		default:
+			throw new UnsupportedOperationException("Unsupported " + commType.name());
+		}
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public List<? extends TopicSink> getTopicSinks(String topicName) 
+			throws IllegalStateException, IllegalArgumentException {
+
+		if (topicName == null) {
+			throw new IllegalArgumentException
+				("Invalid parameter: a communication infrastructure required to fetch " + topicName);
+		}
+		
+		List<TopicSink> sinks = new ArrayList<TopicSink>();
+		
+		try {
+			sinks.add(this.getUebTopicSink(topicName));
+		} catch (Exception e) {
+			;
+		}
+		
+		try {
+			sinks.add(this.getDmaapTopicSink(topicName));
+		} catch (Exception e) {
+			;
+		}
+		
+		return sinks;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public UebTopicSource getUebTopicSource(String topicName) throws IllegalStateException, IllegalArgumentException {
+		return UebTopicSource.factory.get(topicName);
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public UebTopicSink getUebTopicSink(String topicName) throws IllegalStateException, IllegalArgumentException {
+		return UebTopicSink.factory.get(topicName);
+	}
+
+	@Override
+	public DmaapTopicSource getDmaapTopicSource(String topicName)
+			throws IllegalStateException, IllegalArgumentException {
+		return DmaapTopicSource.factory.get(topicName);
+	}
+
+	@Override
+	public DmaapTopicSink getDmaapTopicSink(String topicName) throws IllegalStateException, IllegalArgumentException {
+		return DmaapTopicSink.factory.get(topicName);
+	}
+	
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicListener.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicListener.java
new file mode 100644
index 0000000..7a2e971
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicListener.java
@@ -0,0 +1,42 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm;
+
+/**
+ * Listener for event messages entering the Policy Engine
+ */
+public interface TopicListener {
+	
+	/**
+	 * Notification of a new Event over a given Topic
+	 * 
+	 * @param commType communication infrastructure type
+	 * @param topic topic name
+	 * @param event event message as a string
+	 * 
+	 * @return boolean.  True if the invoking event dispatcher should continue 
+	 * dispatching the event to subsequent listeners.  False if it is requested
+	 * to the invoking event dispatcher to stop dispatching the same event to
+	 * other listeners of less priority.   This mechanism is generally not used.
+	 */
+	public boolean onTopicEvent(Topic.CommInfrastructure commType, String topic, String event);
+
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicRegisterable.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicRegisterable.java
new file mode 100644
index 0000000..2ce8e9e
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicRegisterable.java
@@ -0,0 +1,42 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm;
+
+/**
+ * Marks a Topic entity as registerable
+ */
+public interface TopicRegisterable {
+	
+	/**
+	 * Register for notification of events with this Topic Entity
+	 * 
+	 * @param topicListener the listener of events
+	 */
+	public void register(TopicListener topicListener);
+	
+	/**
+	 * Unregisters for notification of events with this Topic Entity
+	 * 
+	 * @param topicListener the listener of events
+	 */
+	public void unregister(TopicListener topicListener);
+
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicSink.java
new file mode 100644
index 0000000..2250b1e
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicSink.java
@@ -0,0 +1,42 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm;
+
+import org.openecomp.policy.drools.properties.Lockable;
+import org.openecomp.policy.drools.properties.Startable;
+
+/**
+ * Marks a given Topic Endpoint as able to send messages over a topic
+ */
+public interface TopicSink extends Topic, Startable, Lockable {
+	
+	/**
+	 * Sends a string message over this Topic Endpoint
+	 * 
+	 * @param message message to send
+	 * @return true if the send operation succeeded, false otherwise
+	 * @throws IllegalArgumentException an invalid message has been provided
+	 * @throws IllegalStateException the entity is in an state that prevents
+	 * it from sending messages, for example, locked or stopped.
+	 */
+	public boolean send(String message) throws IllegalArgumentException, IllegalStateException;
+
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicSource.java
new file mode 100644
index 0000000..0dfbe1c
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/TopicSource.java
@@ -0,0 +1,40 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm;
+
+import org.openecomp.policy.drools.properties.Lockable;
+import org.openecomp.policy.drools.properties.Startable;
+
+/**
+ * Marker for a Topic Entity, indicating that the entity is able to read
+ * over a topic
+ */
+public interface TopicSource extends TopicRegisterable, Topic, Startable, Lockable {
+	
+	/**
+	 * pushes an event into the source programatically
+	 * 
+	 * @param event the event in json format
+	 * @return true if it can be processed correctly, false otherwise
+	 */
+	public boolean offer(String event);
+
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopic.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopic.java
new file mode 100644
index 0000000..c38f627
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopic.java
@@ -0,0 +1,26 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm.bus;
+
+public interface BusTopic {
+	public String getApiKey();
+	public String getApiSecret();
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopicSink.java
new file mode 100644
index 0000000..30978c2
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopicSink.java
@@ -0,0 +1,47 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm.bus;
+
+import org.openecomp.policy.drools.event.comm.TopicSink;
+
+/**
+ * Topic Sink over Bus Infrastructure (DMAAP/UEB)
+ */
+public interface BusTopicSink extends BusTopic, TopicSink {
+	/**
+	 * Log Failures after X number of retries
+	 */
+	public static final int DEFAULT_LOG_SEND_FAILURES_AFTER = 1;
+	
+	/**
+	 * Sets the UEB partition key for published messages
+	 * 
+	 * @param partitionKey the partition key
+	 */
+	public void setPartitionKey(String partitionKey);
+	
+	/**
+	 * return the partition key in used by the system to publish messages
+	 * 
+	 * @return the partition key
+	 */
+	public String getPartitionKey();
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopicSource.java
new file mode 100644
index 0000000..e6a46d2
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/BusTopicSource.java
@@ -0,0 +1,83 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm.bus;
+
+import org.openecomp.policy.drools.event.comm.TopicSource;
+
+/**
+ * Generic Topic Source for UEB/DMAAP Communication Infrastructure
+ *
+ */
+public interface BusTopicSource extends BusTopic, TopicSource {
+	
+	/**
+	 * Default Consumer Instance Value
+	 */
+	public static String DEFAULT_CONSUMER_INSTANCE = "0";
+	
+	/**
+	 * Default Timeout fetching in milliseconds
+	 */
+	public static int DEFAULT_TIMEOUT_MS_FETCH = 15000;
+	
+	/**
+	 * Default maximum number of messages fetch at the time
+	 */
+	public static int DEFAULT_LIMIT_FETCH = 100;
+	
+	/**
+	 * Definition of No Timeout fetching
+	 */
+	public static int NO_TIMEOUT_MS_FETCH = -1;
+	
+	/**
+	 * Definition of No limit fetching
+	 */
+	public static int NO_LIMIT_FETCH = -1;
+	
+	/**
+	 * gets the consumer group
+	 * 
+	 * @return consumer group
+	 */
+	public String getConsumerGroup();
+	
+	/**
+	 * gets the consumer instance
+	 * 
+	 * @return consumer instance
+	 */
+	public String getConsumerInstance();
+	
+	/**
+	 * gets the fetch timeout
+	 * 
+	 * @return fetch timeout
+	 */
+	public int getFetchTimeout();
+	
+	/**
+	 * gets the fetch limit
+	 * 
+	 * @return fetch limit
+	 */
+	public int getFetchLimit();
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSink.java
new file mode 100644
index 0000000..3c55c9f
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSink.java
@@ -0,0 +1,30 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm.bus;
+
+public interface DmaapTopicSink extends BusTopicSink {
+
+	/**
+	 * Factory of UebTopicWriter for instantiation and management purposes
+	 */
+	
+	public static final DmaapTopicSinkFactory factory = new IndexedDmaapTopicSinkFactory();
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java
new file mode 100644
index 0000000..5b4cfd4
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSinkFactory.java
@@ -0,0 +1,308 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm.bus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+import org.openecomp.policy.drools.event.comm.bus.internal.InlineDmaapTopicSink;
+import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
+import org.openecomp.policy.common.logging.flexlogger.Logger;
+import org.openecomp.policy.drools.properties.PolicyProperties;
+
+/**
+ * DMAAP Topic Sink Factory
+ */
+public interface DmaapTopicSinkFactory {
+	
+	/**
+	 * Instantiates a new DMAAP Topic Sink
+	 * 
+	 * @param servers list of servers
+	 * @param topic topic name
+	 * @param apiKey API Key
+	 * @param apiSecret API Secret
+	 * @param userName AAF user name
+	 * @param password AAF password
+	 * @param partitionKey Consumer Group
+	 * @param managed is this sink endpoint managed?
+	 * 
+	 * @return an DMAAP Topic Sink
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public DmaapTopicSink build(List<String> servers, 
+								String topic, 
+								String apiKey, 
+								String apiSecret,
+								String userName,
+								String password,
+								String partitionKey,
+								boolean managed)
+			throws IllegalArgumentException;
+	
+	/**
+	 * Creates an DMAAP Topic Sink based on properties files
+	 * 
+	 * @param properties Properties containing initialization values
+	 * 
+	 * @return an DMAAP Topic Sink
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public List<DmaapTopicSink> build(Properties properties)
+			throws IllegalArgumentException;
+	
+	/**
+	 * Instantiates a new DMAAP Topic Sink
+	 * 
+	 * @param servers list of servers
+	 * @param topic topic name
+	 * 
+	 * @return an DMAAP Topic Sink
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public DmaapTopicSink build(List<String> servers, String topic)
+			throws IllegalArgumentException;
+	
+	/**
+	 * Destroys an DMAAP Topic Sink based on a topic
+	 * 
+	 * @param topic topic name
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public void destroy(String topic);
+
+	/**
+	 * gets an DMAAP Topic Sink based on topic name
+	 * @param topic the topic name
+	 * 
+	 * @return an DMAAP Topic Sink with topic name
+	 * @throws IllegalArgumentException if an invalid topic is provided
+	 * @throws IllegalStateException if the DMAAP Topic Reader is 
+	 * an incorrect state
+	 */
+	public DmaapTopicSink get(String topic)
+			   throws IllegalArgumentException, IllegalStateException;
+	
+	/**
+	 * Provides a snapshot of the DMAAP Topic Sinks
+	 * @return a list of the DMAAP Topic Sinks
+	 */
+	public List<DmaapTopicSink> inventory();
+
+	/**
+	 * Destroys all DMAAP Topic Sinks
+	 */
+	public void destroy();
+}
+
+/* ------------- implementation ----------------- */
+
+/**
+ * Factory of DMAAP Reader Topics indexed by topic name
+ */
+class IndexedDmaapTopicSinkFactory implements DmaapTopicSinkFactory {
+	// get an instance of logger 
+	private static Logger  logger = FlexLogger.getLogger(IndexedDmaapTopicSinkFactory.class);	
+	
+	/**
+	 * DMAAP Topic Name Index
+	 */
+	protected HashMap<String, DmaapTopicSink> dmaapTopicWriters =
+			new HashMap<String, DmaapTopicSink>();
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public DmaapTopicSink build(List<String> servers, 
+								String topic, 
+								String apiKey, 
+								String apiSecret,
+								String userName,
+								String password,
+								String partitionKey,
+								boolean managed) 
+			throws IllegalArgumentException {
+		
+		if (topic == null || topic.isEmpty()) {
+			throw new IllegalArgumentException("A topic must be provided");
+		}
+		
+		synchronized (this) {
+			if (dmaapTopicWriters.containsKey(topic)) {
+				return dmaapTopicWriters.get(topic);
+			}
+			
+			DmaapTopicSink dmaapTopicSink = 
+					new InlineDmaapTopicSink(servers, topic, 
+										     apiKey, apiSecret,
+										     userName, password,
+										     partitionKey);
+			
+			if (managed)
+				dmaapTopicWriters.put(topic, dmaapTopicSink);
+			return dmaapTopicSink;
+		}
+	}
+	
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public DmaapTopicSink build(List<String> servers, String topic) throws IllegalArgumentException {
+		return this.build(servers, topic, null, null, null, null, null, true);
+	}
+	
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public List<DmaapTopicSink> build(Properties properties) throws IllegalArgumentException {
+		
+		String writeTopics = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS);
+		if (writeTopics == null || writeTopics.isEmpty()) {
+			logger.warn("No topic for DMAAP Sink " + properties);
+			return new ArrayList<DmaapTopicSink>();
+		}
+		List<String> writeTopicList = new ArrayList<String>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
+		
+		synchronized(this) {
+			List<DmaapTopicSink> dmaapTopicWriters = new ArrayList<DmaapTopicSink>();
+			for (String topic: writeTopicList) {
+				
+				String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + "." + 
+				                                        topic + 
+				                                        PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
+				if (servers == null || servers.isEmpty()) {
+					logger.error("No DMAAP servers provided in " + properties);
+					continue;
+				}
+				
+				List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
+				
+				String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
+						                               "." + topic + 
+						                               PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);		 
+				String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
+                                                          "." + topic + 
+                                                          PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
+				
+				String aafMechId = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
+                                                          "." + topic + 
+                                                          PolicyProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
+				String aafPassword = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
+         				                                    "." + topic + 
+         				                                    PolicyProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
+				
+				String partitionKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SINK_TOPICS + 
+                                                             "." + topic + 
+                                                             PolicyProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
+				
+				String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic +
+						                                      PolicyProperties.PROPERTY_MANAGED_SUFFIX);
+				boolean managed = true;
+				if (managedString != null && !managedString.isEmpty()) {
+					managed = Boolean.parseBoolean(managedString);
+				}
+				
+				DmaapTopicSink dmaapTopicSink = this.build(serverList, topic, 
+						   						           apiKey, apiSecret, aafMechId, aafPassword,
+						   						           partitionKey, managed);
+				dmaapTopicWriters.add(dmaapTopicSink);
+			}
+			return dmaapTopicWriters;
+		}
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void destroy(String topic) 
+		   throws IllegalArgumentException {
+		
+		if (topic == null || topic.isEmpty()) {
+			throw new IllegalArgumentException("A topic must be provided");
+		}
+		
+		DmaapTopicSink dmaapTopicWriter;
+		synchronized(this) {
+			if (!dmaapTopicWriters.containsKey(topic)) {
+				return;
+			}
+			
+			dmaapTopicWriter = dmaapTopicWriters.remove(topic);
+		}
+		
+		dmaapTopicWriter.shutdown();
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void destroy() {
+		List<DmaapTopicSink> writers = this.inventory();
+		for (DmaapTopicSink writer: writers) {
+			writer.shutdown();
+		}
+		
+		synchronized(this) {
+			this.dmaapTopicWriters.clear();
+		}
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public DmaapTopicSink get(String topic) 
+			throws IllegalArgumentException, IllegalStateException {
+		
+		if (topic == null || topic.isEmpty()) {
+			throw new IllegalArgumentException("A topic must be provided");
+		}
+		
+		synchronized(this) {
+			if (dmaapTopicWriters.containsKey(topic)) {
+				return dmaapTopicWriters.get(topic);
+			} else {
+				throw new IllegalStateException("DmaapTopicSink for " + topic + " not found");
+			}
+		}
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public synchronized List<DmaapTopicSink> inventory() {
+		 List<DmaapTopicSink> writers = 
+				 new ArrayList<DmaapTopicSink>(this.dmaapTopicWriters.values());
+		 return writers;
+	}
+	
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSource.java
new file mode 100644
index 0000000..8da7906
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSource.java
@@ -0,0 +1,29 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm.bus;
+
+public interface DmaapTopicSource extends BusTopicSource {
+
+	/**
+	 * factory for managing and tracking DMAAP sources
+	 */
+	public static DmaapTopicSourceFactory factory = new IndexedDmaapTopicSourceFactory();
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java
new file mode 100644
index 0000000..f8d85eb
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/DmaapTopicSourceFactory.java
@@ -0,0 +1,380 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm.bus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+import org.openecomp.policy.drools.event.comm.bus.internal.SingleThreadedDmaapTopicSource;
+import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
+import org.openecomp.policy.common.logging.flexlogger.Logger;
+import org.openecomp.policy.drools.properties.PolicyProperties;
+/**
+ * DMAAP Topic Source Factory
+ */
+public interface DmaapTopicSourceFactory {
+	
+	/**
+	 * Creates an DMAAP Topic Source based on properties files
+	 * 
+	 * @param properties Properties containing initialization values
+	 * 
+	 * @return an DMAAP Topic Source
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public List<DmaapTopicSource> build(Properties properties)
+			throws IllegalArgumentException;
+	
+	/**
+	 * Instantiates a new DMAAP Topic Source
+	 * 
+	 * @param servers list of servers
+	 * @param topic topic name
+	 * @param apiKey API Key
+	 * @param apiSecret API Secret
+	 * @param userName user name
+	 * @param password password
+	 * @param consumerGroup Consumer Group
+	 * @param consumerInstance Consumer Instance
+	 * @param fetchTimeout Read Fetch Timeout
+	 * @param fetchLimit Fetch Limit
+	 * @param managed is this endpoind managed?
+	 * 
+	 * @return an DMAAP Topic Source
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public DmaapTopicSource build(List<String> servers, 
+								String topic, 
+								String apiKey, 
+								String apiSecret, 
+								String userName, 
+								String password,
+								String consumerGroup, 
+								String consumerInstance,
+								int fetchTimeout,
+								int fetchLimit,
+								boolean managed)
+			throws IllegalArgumentException;
+	
+	/**
+	 * Instantiates a new DMAAP Topic Source
+	 * 
+	 * @param servers list of servers
+	 * @param topic topic name
+	 * @param apiKey API Key
+	 * @param apiSecret API Secret
+	 * 
+	 * @return an DMAAP Topic Source
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public DmaapTopicSource build(List<String> servers, 
+								String topic, 
+								String apiKey, 
+								String apiSecret)
+			throws IllegalArgumentException;
+
+	/**
+	 * Instantiates a new DMAAP Topic Source
+	 * 
+	 * @param uebTopicReaderType Implementation type
+	 * @param servers list of servers
+	 * @param topic topic name
+	 * 
+	 * @return an DMAAP Topic Source
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public DmaapTopicSource build(List<String> servers, 
+								String topic)
+			throws IllegalArgumentException;	
+	
+	/**
+	 * Destroys an DMAAP Topic Source based on a topic
+	 * 
+	 * @param topic topic name
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public void destroy(String topic);
+	
+	/**
+	 * Destroys all DMAAP Topic Sources
+	 */
+	public void destroy();
+	
+	/**
+	 * gets an DMAAP Topic Source based on topic name
+	 * @param topic the topic name
+	 * @return an DMAAP Topic Source with topic name
+	 * @throws IllegalArgumentException if an invalid topic is provided
+	 * @throws IllegalStateException if the DMAAP Topic Source is 
+	 * an incorrect state
+	 */
+	public DmaapTopicSource get(String topic)
+		   throws IllegalArgumentException, IllegalStateException;
+	
+	/**
+	 * Provides a snapshot of the DMAAP Topic Sources
+	 * @return a list of the DMAAP Topic Sources
+	 */
+	public List<DmaapTopicSource> inventory();
+}
+
+
+/* ------------- implementation ----------------- */
+
+/**
+ * Factory of DMAAP Source Topics indexed by topic name
+ */
+
+class IndexedDmaapTopicSourceFactory implements DmaapTopicSourceFactory {
+	// get an instance of logger 
+	private static Logger  logger = FlexLogger.getLogger(IndexedDmaapTopicSourceFactory.class);		
+	/**
+	 * UEB Topic Name Index
+	 */
+	protected HashMap<String, DmaapTopicSource> dmaapTopicSources =
+			new HashMap<String, DmaapTopicSource>();
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public DmaapTopicSource build(List<String> servers, 
+								String topic, 
+								String apiKey, 
+								String apiSecret, 
+								String userName, 
+								String password,
+								String consumerGroup, 
+								String consumerInstance,
+								int fetchTimeout,
+								int fetchLimit,
+								boolean managed) 
+			throws IllegalArgumentException {
+		
+		if (topic == null || topic.isEmpty()) {
+			throw new IllegalArgumentException("A topic must be provided");
+		}
+		
+		synchronized(this) {
+			if (dmaapTopicSources.containsKey(topic)) {
+				return dmaapTopicSources.get(topic);
+			}
+			
+			DmaapTopicSource dmaapTopicSource = 
+					new SingleThreadedDmaapTopicSource(servers, topic, 
+													 apiKey, apiSecret, userName, password,
+													 consumerGroup, consumerInstance, 
+													 fetchTimeout, fetchLimit);
+			
+			if (managed)
+				dmaapTopicSources.put(topic, dmaapTopicSource);
+			
+			return dmaapTopicSource;
+		}
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public List<DmaapTopicSource> build(Properties properties) 
+			throws IllegalArgumentException {
+		
+		String readTopics = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS);
+		if (readTopics == null || readTopics.isEmpty()) {
+			logger.warn("No topic for UEB Source " + properties);
+			return new ArrayList<DmaapTopicSource>();
+		}
+		List<String> readTopicList = new ArrayList<String>(Arrays.asList(readTopics.split("\\s*,\\s*")));		
+		
+		List<DmaapTopicSource> dmaapTopicSource_s = new ArrayList<DmaapTopicSource>();
+		synchronized(this) {
+			for (String topic: readTopicList) {
+				
+				String servers = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + "." + 
+                                                        topic + 
+                                                        PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
+				
+				if (servers == null || servers.isEmpty()) {
+					logger.error("No UEB servers provided in " + properties);
+					continue;
+				}
+				
+				List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
+				
+				String apiKey = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
+                        							   "." + topic + 
+                                                       PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
+				
+				String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
+						                                  "." + topic + 
+                                                          PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
+				
+				String aafMechId = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
+								                          "." + topic + 
+								                          PolicyProperties.PROPERTY_TOPIC_AAF_MECHID_SUFFIX);
+
+				String aafPassword = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
+				                           				  "." + topic + 
+				                           				  PolicyProperties.PROPERTY_TOPIC_AAF_PASSWORD_SUFFIX);
+				
+				String consumerGroup = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
+						                                      "." + topic + 
+                                                              PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
+				
+				String consumerInstance = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
+						                                         "." + topic + 
+                                                                 PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
+				
+				String fetchTimeoutString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
+						                                           "." + topic + 
+                                                                   PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
+				int fetchTimeout = DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
+				if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
+					try {
+						fetchTimeout = Integer.parseInt(fetchTimeoutString);
+					} catch (NumberFormatException nfe) {
+						logger.warn("Fetch Timeout in invalid format for topic " + topic + ": " + fetchTimeoutString);
+					}
+				}
+					
+				String fetchLimitString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
+                                                                 "." + topic + 
+                                                                 PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
+				int fetchLimit = DmaapTopicSource.DEFAULT_LIMIT_FETCH;
+				if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
+					try {
+						fetchLimit = Integer.parseInt(fetchLimitString);
+					} catch (NumberFormatException nfe) {
+						logger.warn("Fetch Limit in invalid format for topic " + topic + ": " + fetchLimitString);
+					}
+				}
+				
+				String managedString = properties.getProperty(PolicyProperties.PROPERTY_DMAAP_SOURCE_TOPICS + 
+                                                               "." + topic + 
+                                                              PolicyProperties.PROPERTY_MANAGED_SUFFIX);
+				boolean managed = true;
+				if (managedString != null && !managedString.isEmpty()) {
+					managed = Boolean.parseBoolean(managedString);
+				}
+				
+				DmaapTopicSource uebTopicSource = this.build(serverList, topic, 
+						   						           apiKey, apiSecret, aafMechId, aafPassword,
+						   						           consumerGroup, consumerInstance, 
+						   						           fetchTimeout, fetchLimit, managed);
+				dmaapTopicSource_s.add(uebTopicSource);
+			}
+		}
+		return dmaapTopicSource_s;
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public DmaapTopicSource build(List<String> servers, 
+								String topic,
+								String apiKey, 
+								String apiSecret) {
+		return this.build(servers, topic, 
+				  		  apiKey, apiSecret, null, null,
+				  		  null, null,
+				  		  DmaapTopicSource.DEFAULT_TIMEOUT_MS_FETCH,
+				  		  DmaapTopicSource.DEFAULT_LIMIT_FETCH,
+				  		  true);
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public DmaapTopicSource build(List<String> servers, String topic) {
+		return this.build(servers, topic, null, null);
+	}	
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void destroy(String topic) 
+		   throws IllegalArgumentException {
+		
+		if (topic == null || topic.isEmpty()) {
+			throw new IllegalArgumentException("A topic must be provided");
+		}
+		
+		DmaapTopicSource uebTopicSource;
+		
+		synchronized(this) {
+			if (!dmaapTopicSources.containsKey(topic)) {
+				return;
+			}
+			
+			uebTopicSource = dmaapTopicSources.remove(topic);
+		}
+		
+		uebTopicSource.shutdown();
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public DmaapTopicSource get(String topic) 
+	       throws IllegalArgumentException, IllegalStateException {
+		
+		if (topic == null || topic.isEmpty()) {
+			throw new IllegalArgumentException("A topic must be provided");
+		}
+		
+		synchronized(this) {
+			if (dmaapTopicSources.containsKey(topic)) {
+				return dmaapTopicSources.get(topic);
+			} else {
+				throw new IllegalArgumentException("DmaapTopicSource for " + topic + " not found");
+			}
+		}
+	}
+
+	@Override
+	public synchronized List<DmaapTopicSource> inventory() {
+		 List<DmaapTopicSource> readers = 
+				 new ArrayList<DmaapTopicSource>(this.dmaapTopicSources.values());
+		 return readers;
+	}
+
+	@Override
+	public void destroy() {
+		List<DmaapTopicSource> readers = this.inventory();
+		for (DmaapTopicSource reader: readers) {
+			reader.shutdown();
+		}
+		
+		synchronized(this) {
+			this.dmaapTopicSources.clear();
+		}
+	}
+	
+}
+
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSink.java
new file mode 100644
index 0000000..efa4dc5
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSink.java
@@ -0,0 +1,32 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm.bus;
+
+/**
+ * Topic Writer over UEB Infrastructure
+ */
+public interface UebTopicSink extends BusTopicSink {
+	
+	/**
+	 * Factory of UebTopicWriter for instantiation and management purposes
+	 */
+	public static final UebTopicSinkFactory factory = new IndexedUebTopicSinkFactory();
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java
new file mode 100644
index 0000000..85b9883
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSinkFactory.java
@@ -0,0 +1,292 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm.bus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+import org.openecomp.policy.drools.event.comm.bus.internal.InlineUebTopicSink;
+import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
+import org.openecomp.policy.common.logging.flexlogger.Logger;
+import org.openecomp.policy.drools.properties.PolicyProperties;
+
+/**
+ * UEB Topic Sink Factory
+ */
+public interface UebTopicSinkFactory {
+	
+	/**
+	 * Instantiates a new UEB Topic Writer
+	 * 
+	 * @param servers list of servers
+	 * @param topic topic name
+	 * @param apiKey API Key
+	 * @param apiSecret API Secret
+	 * @param partitionKey Consumer Group
+	 * @param managed is this sink endpoint managed?
+	 * 
+	 * @return an UEB Topic Writer
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public UebTopicSink build(List<String> servers, 
+								String topic, 
+								String apiKey, 
+								String apiSecret,
+								String partitionKey,
+								boolean managed)
+			throws IllegalArgumentException;
+	
+	/**
+	 * Creates an UEB Topic Writer based on properties files
+	 * 
+	 * @param properties Properties containing initialization values
+	 * 
+	 * @return an UEB Topic Writer
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public List<UebTopicSink> build(Properties properties)
+			throws IllegalArgumentException;
+	
+	/**
+	 * Instantiates a new UEB Topic Writer
+	 * 
+	 * @param servers list of servers
+	 * @param topic topic name
+	 * 
+	 * @return an UEB Topic Writer
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public UebTopicSink build(List<String> servers, String topic)
+			throws IllegalArgumentException;
+	
+	/**
+	 * Destroys an UEB Topic Writer based on a topic
+	 * 
+	 * @param topic topic name
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public void destroy(String topic);
+
+	/**
+	 * gets an UEB Topic Writer based on topic name
+	 * @param topic the topic name
+	 * 
+	 * @return an UEB Topic Writer with topic name
+	 * @throws IllegalArgumentException if an invalid topic is provided
+	 * @throws IllegalStateException if the UEB Topic Reader is 
+	 * an incorrect state
+	 */
+	public UebTopicSink get(String topic)
+			   throws IllegalArgumentException, IllegalStateException;
+	
+	/**
+	 * Provides a snapshot of the UEB Topic Writers
+	 * @return a list of the UEB Topic Writers
+	 */
+	public List<UebTopicSink> inventory();
+
+	/**
+	 * Destroys all UEB Topic Writers
+	 */
+	public void destroy();
+}
+
+/* ------------- implementation ----------------- */
+
+/**
+ * Factory of UEB Reader Topics indexed by topic name
+ */
+class IndexedUebTopicSinkFactory implements UebTopicSinkFactory {
+	// get an instance of logger 
+	private static Logger  logger = FlexLogger.getLogger(IndexedUebTopicSinkFactory.class);		
+	/**
+	 * UEB Topic Name Index
+	 */
+	protected HashMap<String, UebTopicSink> uebTopicSinks =
+			new HashMap<String, UebTopicSink>();
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public UebTopicSink build(List<String> servers, 
+								String topic, 
+								String apiKey, 
+								String apiSecret,
+								String partitionKey,
+								boolean managed) 
+			throws IllegalArgumentException {
+		
+		if (topic == null || topic.isEmpty()) {
+			throw new IllegalArgumentException("A topic must be provided");
+		}
+		
+		synchronized (this) {
+			if (uebTopicSinks.containsKey(topic)) {
+				return uebTopicSinks.get(topic);
+			}
+			
+			UebTopicSink uebTopicWriter = 
+					new InlineUebTopicSink(servers, topic, 
+										   apiKey, apiSecret,partitionKey);
+			
+			if (managed)
+				uebTopicSinks.put(topic, uebTopicWriter);
+			
+			return uebTopicWriter;
+		}
+	}
+	
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public UebTopicSink build(List<String> servers, String topic) throws IllegalArgumentException {
+		return this.build(servers, topic, null, null, null, true);
+	}
+	
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public List<UebTopicSink> build(Properties properties) throws IllegalArgumentException {
+		
+		String writeTopics = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS);
+		if (writeTopics == null || writeTopics.isEmpty()) {
+			logger.warn("No topic for UEB Sink " + properties);
+			return new ArrayList<UebTopicSink>();
+		}
+		List<String> writeTopicList = new ArrayList<String>(Arrays.asList(writeTopics.split("\\s*,\\s*")));
+		
+		synchronized(this) {
+			List<UebTopicSink> uebTopicWriters = new ArrayList<UebTopicSink>();
+			for (String topic: writeTopicList) {
+				
+				String servers = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + 
+				                                        topic + 
+				                                        PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
+				if (servers == null || servers.isEmpty()) {
+					logger.error("No UEB servers provided in " + properties);
+					continue;
+				}
+				
+				List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
+				
+				String apiKey = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + 
+						                               "." + topic + 
+						                               PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);		 
+				String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + 
+                                                          "." + topic + 
+                                                          PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);	
+				String partitionKey = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + 
+                                                             "." + topic + 
+                                                             PolicyProperties.PROPERTY_TOPIC_SINK_PARTITION_KEY_SUFFIX);
+				
+				String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SINK_TOPICS + "." + topic +
+						                                      PolicyProperties.PROPERTY_MANAGED_SUFFIX);
+				boolean managed = true;
+				if (managedString != null && !managedString.isEmpty()) {
+					managed = Boolean.parseBoolean(managedString);
+				}
+				
+				UebTopicSink uebTopicWriter = this.build(serverList, topic, 
+						   						         apiKey, apiSecret, 
+						   						         partitionKey, managed);
+				uebTopicWriters.add(uebTopicWriter);
+			}
+			return uebTopicWriters;
+		}
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void destroy(String topic) 
+		   throws IllegalArgumentException {
+		
+		if (topic == null || topic.isEmpty()) {
+			throw new IllegalArgumentException("A topic must be provided");
+		}
+		
+		UebTopicSink uebTopicWriter;
+		synchronized(this) {
+			if (!uebTopicSinks.containsKey(topic)) {
+				return;
+			}
+			
+			uebTopicWriter = uebTopicSinks.remove(topic);
+		}
+		
+		uebTopicWriter.shutdown();
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void destroy() {
+		List<UebTopicSink> writers = this.inventory();
+		for (UebTopicSink writer: writers) {
+			writer.shutdown();
+		}
+		
+		synchronized(this) {
+			this.uebTopicSinks.clear();
+		}
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public UebTopicSink get(String topic) 
+			throws IllegalArgumentException, IllegalStateException {
+		
+		if (topic == null || topic.isEmpty()) {
+			throw new IllegalArgumentException("A topic must be provided");
+		}
+		
+		synchronized(this) {
+			if (uebTopicSinks.containsKey(topic)) {
+				return uebTopicSinks.get(topic);
+			} else {
+				throw new IllegalStateException("UebTopicSink for " + topic + " not found");
+			}
+		}
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public synchronized List<UebTopicSink> inventory() {
+		 List<UebTopicSink> writers = 
+				 new ArrayList<UebTopicSink>(this.uebTopicSinks.values());
+		 return writers;
+	}
+	
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSource.java
new file mode 100644
index 0000000..4da0130
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSource.java
@@ -0,0 +1,34 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm.bus;
+
+/**
+ * Topic Source for UEB Communication Infrastructure
+ *
+ */
+public interface UebTopicSource extends BusTopicSource {
+	
+	/**
+	 * factory for managing and tracking UEB readers
+	 */
+	public static UebTopicSourceFactory factory = 
+					new IndexedUebTopicSourceFactory();
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java
new file mode 100644
index 0000000..bf2a403
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/UebTopicSourceFactory.java
@@ -0,0 +1,362 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm.bus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+import org.openecomp.policy.drools.event.comm.bus.internal.SingleThreadedUebTopicSource;
+import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
+import org.openecomp.policy.common.logging.flexlogger.Logger;
+import org.openecomp.policy.drools.properties.PolicyProperties;
+
+/**
+ * UEB Topic Source Factory
+ */
+public interface UebTopicSourceFactory {
+	
+	/**
+	 * Creates an UEB Topic Source based on properties files
+	 * 
+	 * @param properties Properties containing initialization values
+	 * 
+	 * @return an UEB Topic Source
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public List<UebTopicSource> build(Properties properties)
+			throws IllegalArgumentException;
+	
+	/**
+	 * Instantiates a new UEB Topic Source
+	 * 
+	 * @param servers list of servers
+	 * @param topic topic name
+	 * @param apiKey API Key
+	 * @param apiSecret API Secret
+	 * @param consumerGroup Consumer Group
+	 * @param consumerInstance Consumer Instance
+	 * @param fetchTimeout Read Fetch Timeout
+	 * @param fetchLimit Fetch Limit
+	 * @param managed is this source endpoint managed?
+	 * 
+	 * @return an UEB Topic Source
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public UebTopicSource build(List<String> servers, 
+								String topic, 
+								String apiKey, 
+								String apiSecret, 
+								String consumerGroup, 
+								String consumerInstance,
+								int fetchTimeout,
+								int fetchLimit,
+								boolean managed)
+			throws IllegalArgumentException;
+	
+	/**
+	 * Instantiates a new UEB Topic Source
+	 * 
+	 * @param servers list of servers
+	 * @param topic topic name
+	 * @param apiKey API Key
+	 * @param apiSecret API Secret
+	 * 
+	 * @return an UEB Topic Source
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public UebTopicSource build(List<String> servers, 
+								String topic, 
+								String apiKey, 
+								String apiSecret)
+			throws IllegalArgumentException;
+
+	/**
+	 * Instantiates a new UEB Topic Source
+	 * 
+	 * @param uebTopicSourceType Implementation type
+	 * @param servers list of servers
+	 * @param topic topic name
+	 * 
+	 * @return an UEB Topic Source
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public UebTopicSource build(List<String> servers, 
+								String topic)
+			throws IllegalArgumentException;	
+	
+	/**
+	 * Destroys an UEB Topic Source based on a topic
+	 * 
+	 * @param topic topic name
+	 * @throws IllegalArgumentException if invalid parameters are present
+	 */
+	public void destroy(String topic);
+	
+	/**
+	 * Destroys all UEB Topic Sources
+	 */
+	public void destroy();
+	
+	/**
+	 * gets an UEB Topic Source based on topic name
+	 * @param topic the topic name
+	 * @return an UEB Topic Source with topic name
+	 * @throws IllegalArgumentException if an invalid topic is provided
+	 * @throws IllegalStateException if the UEB Topic Source is 
+	 * an incorrect state
+	 */
+	public UebTopicSource get(String topic)
+		   throws IllegalArgumentException, IllegalStateException;
+	
+	/**
+	 * Provides a snapshot of the UEB Topic Sources
+	 * @return a list of the UEB Topic Sources
+	 */
+	public List<UebTopicSource> inventory();
+}
+
+/* ------------- implementation ----------------- */
+
+/**
+ * Factory of UEB Source Topics indexed by topic name
+ */
+class IndexedUebTopicSourceFactory implements UebTopicSourceFactory {
+	// get an instance of logger 
+	private static Logger  logger = FlexLogger.getLogger(IndexedUebTopicSourceFactory.class);	
+	/**
+	 * UEB Topic Name Index
+	 */
+	protected HashMap<String, UebTopicSource> uebTopicSources =
+			new HashMap<String, UebTopicSource>();
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public UebTopicSource build(List<String> servers, 
+								String topic, 
+								String apiKey, 
+								String apiSecret, 
+								String consumerGroup, 
+								String consumerInstance,
+								int fetchTimeout,
+								int fetchLimit,
+								boolean managed) 
+	throws IllegalArgumentException {
+		
+		if (topic == null || topic.isEmpty()) {
+			throw new IllegalArgumentException("A topic must be provided");
+		}
+		
+		synchronized(this) {
+			if (uebTopicSources.containsKey(topic)) {
+				return uebTopicSources.get(topic);
+			}
+			
+			UebTopicSource uebTopicSource = 
+					new SingleThreadedUebTopicSource(servers, topic, 
+													 apiKey, apiSecret,
+													 consumerGroup, consumerInstance, 
+													 fetchTimeout, fetchLimit);
+			
+			if (managed)
+				uebTopicSources.put(topic, uebTopicSource);
+			
+			return uebTopicSource;
+		}
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public List<UebTopicSource> build(Properties properties) 
+			throws IllegalArgumentException {
+		
+		String readTopics = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS);
+		if (readTopics == null || readTopics.isEmpty()) {
+			logger.warn("No topic for UEB Source " + properties);
+			return new ArrayList<UebTopicSource>();
+		}
+		List<String> readTopicList = new ArrayList<String>(Arrays.asList(readTopics.split("\\s*,\\s*")));		
+		
+		List<UebTopicSource> uebTopicSources = new ArrayList<UebTopicSource>();
+		synchronized(this) {
+			for (String topic: readTopicList) {
+				
+				String servers = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." + 
+                                                        topic + 
+                                                        PolicyProperties.PROPERTY_TOPIC_SERVERS_SUFFIX);
+				
+				if (servers == null || servers.isEmpty()) {
+					logger.error("No UEB servers provided in " + properties);
+					continue;
+				}
+				
+				List<String> serverList = new ArrayList<String>(Arrays.asList(servers.split("\\s*,\\s*")));
+				
+				String apiKey = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
+                        							   "." + topic + 
+                                                       PolicyProperties.PROPERTY_TOPIC_API_KEY_SUFFIX);
+				
+				String apiSecret = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
+						                                  "." + topic + 
+                                                          PolicyProperties.PROPERTY_TOPIC_API_SECRET_SUFFIX);
+				
+				String consumerGroup = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
+						                                      "." + topic + 
+                                                              PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_GROUP_SUFFIX);
+				
+				String consumerInstance = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
+						                                         "." + topic + 
+                                                                 PolicyProperties.PROPERTY_TOPIC_SOURCE_CONSUMER_INSTANCE_SUFFIX);
+				
+				String fetchTimeoutString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
+						                                           "." + topic + 
+                                                                   PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
+				int fetchTimeout = UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH;
+				if (fetchTimeoutString != null && !fetchTimeoutString.isEmpty()) {
+					try {
+						fetchTimeout = Integer.parseInt(fetchTimeoutString);
+					} catch (NumberFormatException nfe) {
+						logger.warn("Fetch Timeout in invalid format for topic " + topic + ": " + fetchTimeoutString);
+					}
+				}
+					
+				String fetchLimitString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + 
+                                                                 "." + topic + 
+                                                                 PolicyProperties.PROPERTY_TOPIC_SOURCE_FETCH_TIMEOUT_SUFFIX);
+				int fetchLimit = UebTopicSource.DEFAULT_LIMIT_FETCH;
+				if (fetchLimitString != null && !fetchLimitString.isEmpty()) {
+					try {
+						fetchLimit = Integer.parseInt(fetchLimitString);
+					} catch (NumberFormatException nfe) {
+						logger.warn("Fetch Limit in invalid format for topic " + topic + ": " + fetchLimitString);
+					}
+				}
+				
+				String managedString = properties.getProperty(PolicyProperties.PROPERTY_UEB_SOURCE_TOPICS + "." +
+						                                      topic + PolicyProperties.PROPERTY_MANAGED_SUFFIX);
+				boolean managed = true;
+				if (managedString != null && !managedString.isEmpty()) {
+					managed = Boolean.parseBoolean(managedString);
+				}
+			
+				UebTopicSource uebTopicSource = this.build(serverList, topic, 
+						   						           apiKey, apiSecret,
+						   						           consumerGroup, consumerInstance, 
+						   						           fetchTimeout, fetchLimit, managed);
+				uebTopicSources.add(uebTopicSource);
+			}
+		}
+		return uebTopicSources;
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public UebTopicSource build(List<String> servers, 
+								String topic,
+								String apiKey, 
+								String apiSecret) {
+		return this.build(servers, topic, 
+				  		  apiKey, apiSecret,
+				  		  null, null,
+				  		  UebTopicSource.DEFAULT_TIMEOUT_MS_FETCH,
+				  		  UebTopicSource.DEFAULT_LIMIT_FETCH, true);
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public UebTopicSource build(List<String> servers, String topic) {
+		return this.build(servers, topic, null, null);
+	}	
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void destroy(String topic) 
+		   throws IllegalArgumentException {
+		
+		if (topic == null || topic.isEmpty()) {
+			throw new IllegalArgumentException("A topic must be provided");
+		}
+		
+		UebTopicSource uebTopicSource;
+		
+		synchronized(this) {
+			if (!uebTopicSources.containsKey(topic)) {
+				return;
+			}
+			
+			uebTopicSource = uebTopicSources.remove(topic);
+		}
+		
+		uebTopicSource.shutdown();
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public UebTopicSource get(String topic) 
+	       throws IllegalArgumentException, IllegalStateException {
+		
+		if (topic == null || topic.isEmpty()) {
+			throw new IllegalArgumentException("A topic must be provided");
+		}
+		
+		synchronized(this) {
+			if (uebTopicSources.containsKey(topic)) {
+				return uebTopicSources.get(topic);
+			} else {
+				throw new IllegalStateException("UebTopiceSource for " + topic + " not found");
+			}
+		}
+	}
+
+	@Override
+	public synchronized List<UebTopicSource> inventory() {
+		 List<UebTopicSource> readers = 
+				 new ArrayList<UebTopicSource>(this.uebTopicSources.values());
+		 return readers;
+	}
+
+	@Override
+	public void destroy() {
+		List<UebTopicSource> readers = this.inventory();
+		for (UebTopicSource reader: readers) {
+			reader.shutdown();
+		}
+		
+		synchronized(this) {
+			this.uebTopicSources.clear();
+		}
+	}
+	
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java
new file mode 100644
index 0000000..6fee5ce
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusConsumer.java
@@ -0,0 +1,204 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm.bus.internal;
+
+import java.net.MalformedURLException;
+import java.security.GeneralSecurityException;
+import java.util.List;
+import java.util.Properties;
+
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.cambria.client.CambriaConsumer;
+import com.att.nsa.mr.client.impl.MRConsumerImpl;
+import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
+import com.att.nsa.cambria.client.CambriaClientBuilders.ConsumerBuilder;
+
+/**
+ * Wrapper around libraries to consume from message bus
+ *
+ */
+public interface BusConsumer {
+	
+	/**
+	 * fetch messages
+	 * 
+	 * @return list of messages
+	 * @throws Exception when error encountered by underlying libraries
+	 */
+	public Iterable<String> fetch() throws Exception;
+	
+	/**
+	 * close underlying library consumer
+	 */
+	public void close();
+
+	/**
+	 * Cambria based consumer
+	 */
+	public static class CambriaConsumerWrapper implements BusConsumer {
+		/**
+		 * Cambria client
+		 */
+		protected CambriaConsumer consumer;
+		
+		/**
+		 * Cambria Consumer Wrapper
+		 * 
+		 * @param servers messaging bus hosts
+		 * @param topic topic
+		 * @param apiKey API Key
+		 * @param apiSecret API Secret
+		 * @param consumerGroup Consumer Group
+		 * @param consumerInstance Consumer Instance
+		 * @param fetchTimeout Fetch Timeout
+		 * @param fetchLimit Fetch Limit
+		 * @throws GeneralSecurityException 
+		 * @throws MalformedURLException 
+		 */
+		public CambriaConsumerWrapper(List<String> servers, String topic, 
+								  String apiKey, String apiSecret,
+								  String consumerGroup, String consumerInstance,
+								  int fetchTimeout, int fetchLimit) 
+		       throws IllegalArgumentException {
+			
+			ConsumerBuilder builder = 
+					new CambriaClientBuilders.ConsumerBuilder();
+			
+			builder.knownAs(consumerGroup, consumerInstance)
+			       .usingHosts(servers)
+			       .onTopic(topic)
+			       .waitAtServer(fetchTimeout)
+			       .receivingAtMost(fetchLimit);
+			
+			if (apiKey != null && !apiKey.isEmpty() &&
+				apiSecret != null && !apiSecret.isEmpty()) {
+				builder.authenticatedBy(apiKey, apiSecret);
+			}
+					
+			try {
+				this.consumer = builder.build();
+			} catch (MalformedURLException | GeneralSecurityException e) {
+				throw new IllegalArgumentException(e);
+			}
+		}
+		
+		/**
+		 * {@inheritDoc}
+		 */
+		public Iterable<String> fetch() throws Exception {
+			return this.consumer.fetch();
+		}
+		
+		/**
+		 * {@inheritDoc}
+		 */
+		public void close() {
+			this.consumer.close();
+		}
+		
+		@Override
+		public String toString() {
+			return "CambriaConsumerWrapper []";
+		}
+	}
+	
+	/**
+	 * MR based consumer
+	 */
+	public static class DmaapConsumerWrapper implements BusConsumer {
+		
+		/**
+		 * MR Consumer
+		 */
+		protected MRConsumerImpl consumer;
+		
+		/**
+		 * MR Consumer Wrapper
+		 * 
+		 * @param servers messaging bus hosts
+		 * @param topic topic
+		 * @param apiKey API Key
+		 * @param apiSecret API Secret
+		 * @param aafLogin AAF Login
+		 * @param aafPassword AAF Password
+		 * @param consumerGroup Consumer Group
+		 * @param consumerInstance Consumer Instance
+		 * @param fetchTimeout Fetch Timeout
+		 * @param fetchLimit Fetch Limit
+		 */
+		public DmaapConsumerWrapper(List<String> servers, String topic, 
+								String apiKey, String apiSecret,
+								String aafLogin, String aafPassword,
+								String consumerGroup, String consumerInstance,
+								int fetchTimeout, int fetchLimit) 
+		throws Exception {
+					
+			this.consumer = new MRConsumerImpl(servers, topic, 
+											   consumerGroup, consumerInstance, 
+											   fetchTimeout, fetchLimit, 
+									           null, apiKey, apiSecret);
+			
+			this.consumer.setUsername(aafLogin);
+			this.consumer.setPassword(aafPassword);
+			
+			this.consumer.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
+			
+			Properties props = new Properties();
+			props.setProperty("Protocol", "http");
+			this.consumer.setProps(props);
+			this.consumer.setHost(servers.get(0) + ":3904");;
+		}
+		
+		/**
+		 * {@inheritDoc}
+		 */
+		public Iterable<String> fetch() throws Exception {
+			return this.consumer.fetch();
+		}
+		
+		/**
+		 * {@inheritDoc}
+		 */
+		public void close() {
+			this.consumer.close();
+		}
+		
+		@Override
+		public String toString() {
+			StringBuilder builder = new StringBuilder();
+			builder.
+			append("DmaapConsumerWrapper [").
+			append("consumer.getAuthDate()=").append(consumer.getAuthDate()).
+			append(", consumer.getAuthKey()=").append(consumer.getAuthKey()).
+			append(", consumer.getHost()=").append(consumer.getHost()).
+			append(", consumer.getProtocolFlag()=").append(consumer.getProtocolFlag()).
+			append(", consumer.getUsername()=").append(consumer.getUsername()).
+			append("]");
+			return builder.toString();
+		}
+	}
+
+	
+}
+
+
+
+
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java
new file mode 100644
index 0000000..798bf98
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusPublisher.java
@@ -0,0 +1,231 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm.bus.internal;
+
+import java.net.MalformedURLException;
+import java.security.GeneralSecurityException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.openecomp.policy.common.logging.eelf.PolicyLogger;
+import com.att.nsa.cambria.client.CambriaBatchingPublisher;
+import com.att.nsa.cambria.client.CambriaClientBuilders;
+import com.att.nsa.cambria.client.CambriaClientBuilders.PublisherBuilder;
+import com.att.nsa.mr.client.impl.MRSimplerBatchPublisher;
+import com.att.nsa.mr.test.clients.ProtocolTypeConstants;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+public interface BusPublisher {
+	
+	/**
+	 * sends a message
+	 * 
+	 * @param partition id
+	 * @param message the message
+	 * @return true if success, false otherwise
+	 * @throws IllegalArgumentException if no message provided
+	 */
+	public boolean send(String partitionId, String message) throws IllegalArgumentException;
+	
+	/**
+	 * closes the publisher
+	 */
+	public void close();
+	
+	/**
+	 * Cambria based library publisher
+	 */
+	public static class CambriaPublisherWrapper implements BusPublisher {
+
+		/**
+		 * The actual Cambria publisher
+		 */
+		@JsonIgnore
+		protected volatile CambriaBatchingPublisher publisher;
+		
+		public CambriaPublisherWrapper(List<String> servers, String topic,
+						               String apiKey,
+						               String apiSecret) 
+		       throws IllegalArgumentException {
+			PublisherBuilder builder = new CambriaClientBuilders.PublisherBuilder();
+			
+			builder.usingHosts(servers)
+			       .onTopic(topic);
+			
+				   // Only supported in 0.2.4 version
+			       // .logSendFailuresAfter(DEFAULT_LOG_SEND_FAILURES_AFTER);
+			
+			if (apiKey != null && !apiKey.isEmpty() &&
+				apiSecret != null && !apiSecret.isEmpty()) {
+				builder.authenticatedBy(apiKey, apiSecret);
+			}
+			
+			try {
+				this.publisher = builder.build();
+			} catch (MalformedURLException | GeneralSecurityException e) {
+				throw new IllegalArgumentException(e);
+			}
+		}
+		
+		/**
+		 * {@inheritDoc}
+		 */
+		@Override
+		public boolean send(String partitionId, String message) 
+				throws IllegalArgumentException {
+			if (message == null)
+				throw new IllegalArgumentException("No message provided");
+			
+			try {
+				this.publisher.send(partitionId, message);
+			} catch (Exception e) {
+				PolicyLogger.warn(CambriaPublisherWrapper.class.getName(), 
+		                          "SEND of " + message + " IN " +
+		                          this + " cannot be performed because of " + 
+						          e.getMessage());
+				return false;
+			}
+			return true;			
+		}
+		
+		/**
+		 * {@inheritDoc}
+		 */
+		@Override
+		public void close() {
+			if (PolicyLogger.isInfoEnabled())
+				PolicyLogger.info(CambriaPublisherWrapper.class.getName(), 
+				                  "CREATION: " + this);
+			
+			try {
+				this.publisher.close();
+			} catch (Exception e) {
+				PolicyLogger.warn(CambriaPublisherWrapper.class.getName(), 
+				                  "CLOSE on " + this + " FAILED because of " + 
+								  e.getMessage());
+			}
+		}
+		
+		
+		@Override
+		public String toString() {
+			StringBuilder builder = new StringBuilder();
+			builder.append("CambriaPublisherWrapper [").
+			append("publisher.getPendingMessageCount()=").
+			append(publisher.getPendingMessageCount()).
+			append("]");
+			return builder.toString();
+		}
+		
+	}
+	
+	/**
+	 * DmaapClient library wrapper
+	 */
+	public static class DmaapPublisherWrapper implements BusPublisher {
+		/**
+		 * MR based Publisher
+		 */		
+		protected MRSimplerBatchPublisher publisher;
+		
+		public DmaapPublisherWrapper(List<String> servers, String topic,
+				                     String aafLogin,
+				                     String aafPassword) {
+			
+			ArrayList<String> dmaapServers = new ArrayList<String>();
+			for (String server: servers) {
+				dmaapServers.add(server + ":3904");
+			}
+					
+			this.publisher = 
+				new MRSimplerBatchPublisher.Builder().
+			                                againstUrls(dmaapServers).
+			                                onTopic(topic).
+			                                build();
+			
+			this.publisher.setProtocolFlag(ProtocolTypeConstants.AAF_AUTH.getValue());
+			
+			this.publisher.setUsername(aafLogin);
+			this.publisher.setPassword(aafPassword);  
+			
+			Properties props = new Properties();
+			props.setProperty("Protocol", "http");
+			props.setProperty("contenttype", "application/json");
+			
+			this.publisher.setProps(props);
+			
+			this.publisher.setHost(servers.get(0));
+			
+			if (PolicyLogger.isInfoEnabled())
+				PolicyLogger.info(DmaapPublisherWrapper.class.getName(), 
+						          "CREATION: " + this);
+		}
+
+		/**
+		 * {@inheritDoc}
+		 */
+		@Override
+		public void close() {
+			if (PolicyLogger.isInfoEnabled())
+				PolicyLogger.info(DmaapPublisherWrapper.class.getName(), 
+				                  "CREATION: " + this);
+			
+			try {
+				this.publisher.close(1, TimeUnit.SECONDS);
+			} catch (Exception e) {
+				PolicyLogger.warn(DmaapPublisherWrapper.class.getName(), 
+				                  "CLOSE: " + this + " because of " + 
+								  e.getMessage());
+			}
+		}
+		
+		/**
+		 * {@inheritDoc}
+		 */
+		@Override
+		public boolean send(String partitionId, String message) 
+				throws IllegalArgumentException {
+			if (message == null)
+				throw new IllegalArgumentException("No message provided");
+			
+			this.publisher.send(partitionId, message);
+			return true;
+			
+		}
+		
+		@Override
+		public String toString() {
+			StringBuilder builder = new StringBuilder();
+			builder.append("DmaapPublisherWrapper [").
+			append("publisher.getAuthDate()=").append(publisher.getAuthDate()).
+			append(", publisher.getAuthKey()=").append(publisher.getAuthKey()).
+			append(", publisher.getHost()=").append(publisher.getHost()).
+			append(", publisher.getProtocolFlag()=").append(publisher.getProtocolFlag()).
+			append(", publisher.getUsername()=").append(publisher.getUsername()).
+			append(", publisher.getPendingMessageCount()=").append(publisher.getPendingMessageCount()).
+			append("]");
+			return builder.toString();
+		}
+	}
+
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java
new file mode 100644
index 0000000..e36e3af
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/BusTopicBase.java
@@ -0,0 +1,112 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm.bus.internal;
+
+import java.util.List;
+
+import org.apache.commons.collections4.queue.CircularFifoQueue;
+
+import org.openecomp.policy.drools.event.comm.Topic;
+import org.openecomp.policy.drools.event.comm.bus.BusTopic;
+
+public abstract class BusTopicBase implements BusTopic, Topic {
+	
+	protected List<String> servers;
+
+	protected String topic;
+	
+	protected String apiKey;
+	protected String apiSecret;
+	
+	protected CircularFifoQueue<String> recentEvents = new CircularFifoQueue<String>(10);
+	
+	public BusTopicBase(List<String> servers, 
+						  String topic, 
+						  String apiKey, 
+						  String apiSecret) 
+	throws IllegalArgumentException {
+		
+		if (servers == null || servers.isEmpty()) {
+			throw new IllegalArgumentException("UEB Server(s) must be provided");
+		}
+		
+		if (topic == null || topic.isEmpty()) {
+			throw new IllegalArgumentException("An UEB Topic must be provided");
+		}
+		
+		this.servers = servers;
+		this.topic = topic;
+		
+		this.apiKey = apiKey;
+		this.apiSecret = apiSecret;
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public String getTopic() {
+		return topic;
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public List<String> getServers() {
+		return servers;
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public String getApiKey() {
+		return apiKey;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public String getApiSecret() {
+		return apiSecret;
+	}
+	
+	/**
+	 * @return the recentEvents
+	 */
+	@Override
+	public synchronized String[] getRecentEvents() {
+		String[] events = new String[recentEvents.size()];
+		return recentEvents.toArray(events);
+	}
+
+
+	@Override
+	public String toString() {
+		StringBuilder builder = new StringBuilder();
+		builder.append("UebTopicBase [servers=").append(servers).append(", topic=").append(topic).append(", apiKey=")
+				.append(apiKey).append(", apiSecret=").append(apiSecret).append("]");
+		return builder.toString();
+	}
+
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java
new file mode 100644
index 0000000..bd88818
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineBusTopicSink.java
@@ -0,0 +1,284 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm.bus.internal;
+
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.log4j.Logger;
+
+import org.openecomp.policy.drools.event.comm.bus.BusTopicSink;
+import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
+import org.openecomp.policy.common.logging.eelf.MessageCodes;
+
+/**
+ * Transport Agnostic Bus Topic Sink to carry out the core functionality
+ * to interact with a sink regardless if it is UEB or DMaaP.
+ *
+ */
+public abstract class InlineBusTopicSink extends BusTopicBase implements BusTopicSink {
+	
+	/**
+	 * logger 
+	 */
+	private static org.openecomp.policy.common.logging.flexlogger.Logger logger = 
+										FlexLogger.getLogger(InlineBusTopicSink.class);
+	
+	/**
+	 * Not to be converted to PolicyLogger.
+	 * This will contain all in/out traffic and only that in a single file in a concise format.
+	 */
+	protected static final Logger networkLogger = Logger.getLogger(NETWORK_LOGGER);
+	
+	/**
+	 * The partition key to publish to
+	 */
+	protected String partitionId;
+	
+	/**
+	 * Am I running?
+	 * reflects invocation of start()/stop() 
+	 * !locked & start() => alive
+	 * stop() => !alive
+	 */
+	protected volatile boolean alive = false;
+	
+	/**
+	 * Am I locked?
+	 * reflects invocation of lock()/unlock() operations
+	 * locked => !alive (but not in the other direction necessarily)
+	 * locked => !offer, !run, !start, !stop (but this last one is obvious
+	 *                                        since locked => !alive)
+	 */
+	protected volatile boolean locked = false;
+	
+	/**
+	 * message bus publisher
+	 */
+	protected BusPublisher publisher;
+
+	/**
+	 * constructor for abstract sink
+	 * 
+	 * @param servers servers
+	 * @param topic topic
+	 * @param apiKey api secret
+	 * @param apiSecret api secret
+	 * @param partitionId partition id
+	 * @throws IllegalArgumentException in invalid parameters are passed in
+	 */
+	public InlineBusTopicSink(List<String> servers, String topic, 
+			                  String apiKey, String apiSecret, String partitionId)
+			throws IllegalArgumentException {
+		
+		super(servers, topic, apiKey, apiSecret);		
+		
+		if (partitionId == null || partitionId.isEmpty()) {
+			this.partitionId = UUID.randomUUID ().toString();
+		}
+	}
+	
+	/**
+	 * Initialize the Bus publisher
+	 */
+	public abstract void init();
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean start() throws IllegalStateException {
+		
+		if (logger.isInfoEnabled())
+			logger.info("START: " + this);
+		
+		synchronized(this) {
+			
+			if (this.alive)
+				return true;
+			
+			if (locked)
+				throw new IllegalStateException(this + " is locked.");
+			
+			this.alive = true;
+		}
+				
+		this.init();
+		return true;
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean stop() {
+		
+		BusPublisher publisherCopy;
+		synchronized(this) {
+			this.alive = false;
+			publisherCopy = this.publisher;
+			this.publisher = null;
+		}
+		
+		if (publisherCopy != null) {
+			try {
+				publisherCopy.close();
+			} catch (Exception e) {
+				logger.warn(MessageCodes.EXCEPTION_ERROR, e, "PUBLISHER.CLOSE", this.toString());
+				e.printStackTrace();
+			}
+		} else {
+			logger.warn("No publisher to close: " + this);
+			return false;
+		}
+		
+		return true;
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean lock() {
+		
+		if (logger.isInfoEnabled())
+			logger.info("LOCK: " + this);	
+		
+		synchronized (this) {
+			if (this.locked)
+				return true;
+			
+			this.locked = true;
+		}
+		
+		return this.stop();
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean unlock() {
+		
+		if (logger.isInfoEnabled())
+			logger.info("UNLOCK: " + this);
+		
+		synchronized(this) {
+			if (!this.locked)
+				return true;
+			
+			this.locked = false;
+		}
+		
+		try {
+			return this.start();
+		} catch (Exception e) {
+			logger.warn("can't start after unlocking " + this + 
+					     " because of " + e.getMessage());
+			e.printStackTrace();
+			return false;
+		}
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean isLocked() {
+		return this.locked;
+	}	
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean isAlive() {
+		return this.alive;
+	}	
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean send(String message) throws IllegalArgumentException, IllegalStateException {
+		
+		if (message == null || message.isEmpty()) {
+			throw new IllegalArgumentException("Message to send is empty");
+		}
+
+		if (!this.alive) {
+			throw new IllegalStateException(this + " is stopped");
+		}
+		
+		try {
+			synchronized (this) {
+				this.recentEvents.add(message);
+			}
+			
+			if (networkLogger.isInfoEnabled()) {
+				networkLogger.info("[OUT|" + this.getTopicCommInfrastructure() + "|" + 
+			                       this.topic + "]:" + 
+			                       message);
+			}
+			
+			publisher.send(this.partitionId, message);
+		} catch (Exception e) {
+			logger.error("can't start after unlocking " + this + 
+				         " because of " + e.getMessage());
+			e.printStackTrace();
+			return false;
+		}
+		
+		return true;
+	}
+	
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void setPartitionKey(String partitionKey) {
+		this.partitionId = partitionKey;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public String getPartitionKey() {
+		return this.partitionId;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void shutdown() throws IllegalStateException {
+		this.stop();
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public abstract CommInfrastructure getTopicCommInfrastructure();
+
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
new file mode 100644
index 0000000..417c6d4
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineDmaapTopicSink.java
@@ -0,0 +1,84 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm.bus.internal;
+
+import java.util.List;
+
+import org.openecomp.policy.drools.event.comm.Topic;
+import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSink;
+import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
+import org.openecomp.policy.common.logging.flexlogger.Logger;
+
+/**
+ * This implementation publishes events for the associated DMAAP topic,
+ * inline with the calling thread.
+ */
+public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTopicSink {
+	
+	protected static Logger logger = 
+			FlexLogger.getLogger(InlineDmaapTopicSink.class);
+	
+	protected final String userName;
+	protected final String password;
+	
+	public InlineDmaapTopicSink(List<String> servers, String topic, 
+			                    String apiKey, String apiSecret,
+	                            String userName, String password,
+			                    String partitionKey) 
+		throws IllegalArgumentException {
+		
+		super(servers, topic, apiKey, apiSecret, partitionKey);
+		
+		this.userName = userName;
+		this.password = password;
+	}
+	
+
+	@Override
+	public void init() {
+		this.publisher = 
+				new BusPublisher.DmaapPublisherWrapper(this.servers, 
+						                               this.topic, 
+						                               this.userName, 
+						                               this.password);
+		if (logger.isInfoEnabled())
+			logger.info("DMAAP SINK TOPIC created " + this);
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public CommInfrastructure getTopicCommInfrastructure() {
+		return Topic.CommInfrastructure.DMAAP;
+	}
+
+
+	@Override
+	public String toString() {
+		StringBuilder builder = new StringBuilder();
+		builder.append("InlineDmaapTopicSink [userName=").append(userName).append(", password=").append(password)
+				.append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure()).append(", toString()=")
+				.append(super.toString()).append("]");
+		return builder.toString();
+	}
+
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java
new file mode 100644
index 0000000..2d4b155
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/InlineUebTopicSink.java
@@ -0,0 +1,91 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm.bus.internal;
+
+import java.util.List;
+
+import org.openecomp.policy.drools.event.comm.Topic;
+import org.openecomp.policy.drools.event.comm.bus.UebTopicSink;
+import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
+
+/**
+ * This implementation publishes events for the associated UEB topic,
+ * inline with the calling thread.
+ */
+public class InlineUebTopicSink extends InlineBusTopicSink implements UebTopicSink {
+	
+	/**
+	 * logger 
+	 */
+	private static org.openecomp.policy.common.logging.flexlogger.Logger logger = 
+										FlexLogger.getLogger(InlineUebTopicSink.class);
+	
+	/**
+	 * Argument-based UEB Topic Writer instantiation
+	 * 
+	 * @param servers list of UEB servers available for publishing
+	 * @param topic the topic to publish to
+	 * @param apiKey the api key (optional)
+	 * @param apiSecret the api secret (optional)
+	 * @param partitionId the partition key (optional, autogenerated if not provided)
+	 * 
+	 * @throws IllegalArgumentException if invalid arguments are detected
+	 */
+	public InlineUebTopicSink(List<String> servers, 
+								String topic, 
+            					String apiKey, 
+            					String apiSecret,
+            					String partitionId) 
+    throws IllegalArgumentException {		
+		super(servers, topic, apiKey, apiSecret, partitionId);
+	}
+	
+	/**
+	 * Instantiation of internal resources
+	 */
+	@Override
+	public void init() {	
+		
+		this.publisher = 
+				new BusPublisher.CambriaPublisherWrapper(this.servers, 
+						                                 this.topic, 
+						                                 this.apiKey, 
+						                                 this.apiSecret);
+		if (logger.isInfoEnabled())
+			logger.info("UEB SINK TOPIC created " + this);
+	}
+	
+	@Override
+	public String toString() {
+		StringBuilder builder = new StringBuilder();
+		builder.append("InlineUebTopicSink [getTopicCommInfrastructure()=").append(getTopicCommInfrastructure())
+				.append(", toString()=").append(super.toString()).append("]");
+		return builder.toString();
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public CommInfrastructure getTopicCommInfrastructure() {
+		return Topic.CommInfrastructure.UEB;
+	}
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
new file mode 100644
index 0000000..f37c349
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
@@ -0,0 +1,477 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm.bus.internal;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.log4j.Logger;
+
+import org.openecomp.policy.drools.event.comm.TopicListener;
+import org.openecomp.policy.drools.event.comm.bus.BusTopicSource;
+import org.openecomp.policy.common.logging.eelf.MessageCodes;
+import org.openecomp.policy.common.logging.eelf.PolicyLogger;
+
+/**
+ * This topic source implementation specializes in reading messages
+ * over a bus topic source and notifying its listeners
+ */
+public abstract class SingleThreadedBusTopicSource 
+       extends BusTopicBase
+       implements Runnable, BusTopicSource {
+	   
+	private String className = SingleThreadedBusTopicSource.class.getName();
+	/**
+	 * Not to be converted to PolicyLogger.
+	 * This will contain all instract /out traffic and only that in a single file in a concise format.
+	 */
+	protected static final Logger networkLogger = Logger.getLogger(NETWORK_LOGGER);
+	
+	/**
+	 * Bus consumer group
+	 */
+	protected final String consumerGroup;
+	
+	/**
+	 * Bus consumer instance
+	 */
+	protected final String consumerInstance;
+	
+	/**
+	 * Bus fetch timeout
+	 */
+	protected final int fetchTimeout;
+	
+	/**
+	 * Bus fetch limit
+	 */
+	protected final int fetchLimit;
+	
+	/**
+	 * Message Bus Consumer
+	 */
+	protected BusConsumer consumer;
+	
+	/**
+	 * Am I running?
+	 * reflects invocation of start()/stop() 
+	 * !locked & start() => alive
+	 * stop() => !alive
+	 */
+	protected volatile boolean alive = false;
+	
+	/**
+	 * Am I locked?
+	 * reflects invocation of lock()/unlock() operations
+	 * locked => !alive (but not in the other direction necessarily)
+	 * locked => !offer, !run, !start, !stop (but this last one is obvious
+	 *                                        since locked => !alive)
+	 */
+	protected volatile boolean locked = false;
+	
+	/**
+	 * Independent thread reading message over my topic
+	 */
+	protected Thread busPollerThread;
+	
+	/**
+	 * All my subscribers for new message notifications
+	 */
+	protected final ArrayList<TopicListener> topicListeners = new ArrayList<TopicListener>();
+	
+	/**
+	 * 
+	 * @param servers Bus servers
+	 * @param topic Bus Topic to be monitored
+	 * @param apiKey Bus API Key (optional)
+	 * @param apiSecret Bus API Secret (optional)
+	 * @param consumerGroup Bus Reader Consumer Group
+	 * @param consumerInstance Bus Reader Instance
+	 * @param fetchTimeout Bus fetch timeout
+	 * @param fetchLimit Bus fetch limit
+	 * @throws IllegalArgumentException An invalid parameter passed in
+	 */
+	public SingleThreadedBusTopicSource(List<String> servers, 
+										String topic, 
+			                  			String apiKey, 
+			                  			String apiSecret, 
+			                  			String consumerGroup, 
+			                  			String consumerInstance,
+			                  			int fetchTimeout,
+			                  			int fetchLimit) 
+	throws IllegalArgumentException {
+		
+		super(servers, topic, apiKey, apiSecret);
+		
+		if (consumerGroup == null || consumerGroup.isEmpty()) {
+			this.consumerGroup = UUID.randomUUID ().toString();
+		} else {
+			this.consumerGroup = consumerGroup;
+		}
+		
+		if (consumerInstance == null || consumerInstance.isEmpty()) {
+			this.consumerInstance = DEFAULT_CONSUMER_INSTANCE;
+		} else {
+			this.consumerInstance = consumerInstance;
+		}
+		
+		if (fetchTimeout <= 0) {
+			this.fetchTimeout = NO_TIMEOUT_MS_FETCH;
+		} else {
+			this.fetchTimeout = fetchTimeout;
+		}
+		
+		if (fetchLimit <= 0) {
+			this.fetchLimit = NO_LIMIT_FETCH;
+		} else {
+			this.fetchLimit = fetchLimit;
+		}
+	}
+	
+	/**
+	 * Initialize the Bus client
+	 */
+	public abstract void init() throws Exception;
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void register(TopicListener topicListener) 
+		throws IllegalArgumentException {		
+		
+		PolicyLogger.info(className,"REGISTER: " + topicListener + " INTO " + this);
+		
+		synchronized(this) {
+			if (topicListener == null)
+				throw new IllegalArgumentException("TopicListener must be provided");
+			
+			/* check that this listener is not registered already */
+			for (TopicListener listener: this.topicListeners) {
+				if (listener == topicListener) {
+					// already registered
+					return;
+				}
+			}
+			
+			this.topicListeners.add(topicListener);
+		}
+		
+		try {
+			this.start();
+		} catch (Exception e) {
+			PolicyLogger.info(className, "new registration of " + topicListener +  
+					          ",but can't start source because of " + e.getMessage());
+		}
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void unregister(TopicListener topicListener) {
+		
+		PolicyLogger.info(className, "UNREGISTER: " + topicListener + " FROM " + this);
+		
+		boolean stop = false;
+		synchronized (this) {
+			if (topicListener == null)
+				throw new IllegalArgumentException("TopicListener must be provided");
+			
+			this.topicListeners.remove(topicListener);
+			stop = (this.topicListeners.isEmpty());
+		}
+		
+		if (stop) {		
+			this.stop();
+		}
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean lock() {	
+		PolicyLogger.info(className, "LOCK: " + this);
+		
+		synchronized (this) {
+			if (this.locked)
+				return true;
+			
+			this.locked = true;
+		}
+		
+		return this.stop();
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean unlock() {
+		PolicyLogger.info(className, "UNLOCK: " + this);
+		
+		synchronized(this) {
+			if (!this.locked)
+				return true;
+			
+			this.locked = false;
+		}
+		
+		try {
+			return this.start();
+		} catch (Exception e) {
+			PolicyLogger.warn("can't start after unlocking " + this + 
+					          " because of " + e.getMessage());
+			return false;
+		}
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean start() throws IllegalStateException {
+		
+		PolicyLogger.info(className, "START: " + this);
+		
+		synchronized(this) {
+			
+			if (alive) {
+				return true;
+			}
+			
+			if (locked) {
+				throw new IllegalStateException(this + " is locked.");
+			}
+			
+			if (this.busPollerThread == null || 
+				!this.busPollerThread.isAlive() || 
+				this.consumer == null) {
+				
+				try {
+					this.init();
+					this.alive = true;
+					this.busPollerThread = new Thread(this);
+					this.busPollerThread.setName(this.getTopicCommInfrastructure() + "-source-" + this.getTopic());
+					busPollerThread.start();
+				} catch (Exception e) {
+					e.printStackTrace();
+					throw new IllegalStateException(e);
+				}
+			}
+		}
+		
+		return this.alive;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean stop() {
+		PolicyLogger.info(className, "STOP: " + this);
+		
+		synchronized(this) {
+			BusConsumer consumerCopy = this.consumer;
+			
+			this.alive = false;
+			this.consumer = null;
+			
+			if (consumerCopy != null) {
+				try {
+					consumerCopy.close();
+				} catch (Exception e) {
+					PolicyLogger.warn(MessageCodes.EXCEPTION_ERROR, e, "CONSUMER.CLOSE", this.toString());
+				}
+			}
+		}
+							
+		Thread.yield();
+				
+		return true;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean isLocked() {
+		return this.locked;
+	}
+	
+	/**
+	 * broadcast event to all listeners
+	 * 
+	 * @param message the event
+	 * @return true if all notifications are performed with no error, false otherwise
+	 */
+	protected boolean broadcast(String message) {
+		
+		/* take a snapshot of listeners */
+		List<TopicListener> snapshotListeners = this.snapshotTopicListeners();
+		
+		boolean success = true;
+		for (TopicListener topicListener: snapshotListeners) {
+			try {
+				topicListener.onTopicEvent(this.getTopicCommInfrastructure(), this.topic, message);
+			} catch (Exception e) {
+				PolicyLogger.warn(this.className, "ERROR notifying " + topicListener.toString() + 
+						          " because of " + e.getMessage() + " @ " + this.toString());
+				success = false;
+			}
+		}
+		return success;
+	}
+	
+	/**
+	 * take a snapshot of current topic listeners
+	 * 
+	 * @return the topic listeners
+	 */
+	protected synchronized List<TopicListener> snapshotTopicListeners() {
+		@SuppressWarnings("unchecked")
+		List<TopicListener> listeners = (List<TopicListener>) topicListeners.clone();
+		return listeners;
+	}
+	
+	/**
+	 * Run thread method for the Bus Reader
+	 */
+	@Override
+	public void run() {
+		while (this.alive) {
+			try {
+				for (String event: this.consumer.fetch()) {					
+					synchronized (this) {
+						this.recentEvents.add(event);
+					}
+					
+					if (networkLogger.isInfoEnabled()) {
+						networkLogger.info("IN[" + this.getTopicCommInfrastructure() + "|" + 
+						                   this.topic + "]:" + 
+						                   event);
+					}
+					
+					PolicyLogger.info(className, this.topic + " <-- " + event);
+					broadcast(event);
+					
+					if (!this.alive)
+						break;
+				}
+			} catch (Exception e) {
+				PolicyLogger.error( MessageCodes.EXCEPTION_ERROR, className, e, "CONSUMER.FETCH", this.toString());
+			}
+		}
+		
+		PolicyLogger.warn(this.className, "Exiting: " + this);
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean offer(String event) {
+		PolicyLogger.info(className, "OFFER: " + event + " TO " + this);
+		
+		if (!this.alive) {
+			throw new IllegalStateException(this + " is not alive.");
+		}
+		
+		synchronized (this) {
+			this.recentEvents.add(event);
+		}
+		
+		if (networkLogger.isInfoEnabled()) {
+			networkLogger.info("IN[" + this.getTopicCommInfrastructure() + "|" + 
+			                    this.topic + "]:" + 
+			                    event);
+		}
+		
+		
+		return broadcast(event);
+	}
+	
+
+	@Override
+	public String toString() {
+		StringBuilder builder = new StringBuilder();
+		builder.append("SingleThreadedBusTopicSource [consumerGroup=").append(consumerGroup)
+				.append(", consumerInstance=").append(consumerInstance).append(", fetchTimeout=").append(fetchTimeout)
+				.append(", fetchLimit=").append(fetchLimit)
+				.append(", consumer=").append(this.consumer).append(", alive=")
+				.append(alive).append(", locked=").append(locked).append(", uebThread=").append(busPollerThread)
+				.append(", topicListeners=").append(topicListeners.size()).append(", toString()=").append(super.toString())
+				.append("]");
+		return builder.toString();
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean isAlive() {
+		return alive;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public String getConsumerGroup() {
+		return consumerGroup;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public String getConsumerInstance() {
+		return consumerInstance;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void shutdown() throws IllegalStateException {
+		this.stop();
+		this.topicListeners.clear();
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public int getFetchTimeout() {
+		return fetchTimeout;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public int getFetchLimit() {
+		return fetchLimit;
+	}
+
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
new file mode 100644
index 0000000..e65d44a
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedDmaapTopicSource.java
@@ -0,0 +1,120 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm.bus.internal;
+
+import java.util.List;
+
+import org.openecomp.policy.drools.event.comm.Topic;
+import org.openecomp.policy.drools.event.comm.bus.DmaapTopicSource;
+import org.openecomp.policy.common.logging.eelf.PolicyLogger;
+
+/**
+ * This topic reader implementation specializes in reading messages
+ * over DMAAP topic and notifying its listeners
+ */
+public class SingleThreadedDmaapTopicSource extends SingleThreadedBusTopicSource
+                                            implements DmaapTopicSource, Runnable {
+	
+	protected final String userName;
+	protected final String password;
+	private String className = SingleThreadedDmaapTopicSource.class.getName();
+
+	/**
+	 * 
+	 * @param servers DMaaP servers
+	 * @param topic DMaaP Topic to be monitored
+	 * @param apiKey DMaaP API Key (optional)
+	 * @param apiSecret DMaaP API Secret (optional)
+	 * @param consumerGroup DMaaP Reader Consumer Group
+	 * @param consumerInstance DMaaP Reader Instance
+	 * @param fetchTimeout DMaaP fetch timeout
+	 * @param fetchLimit DMaaP fetch limit
+	 * @throws IllegalArgumentException An invalid parameter passed in
+	 */
+	public SingleThreadedDmaapTopicSource(List<String> servers, String topic, 
+			                              String apiKey, String apiSecret,
+			                              String userName, String password,
+			                              String consumerGroup, String consumerInstance, 
+			                              int fetchTimeout, int fetchLimit)
+			throws IllegalArgumentException {
+		
+		
+		super(servers, topic, apiKey, apiSecret, 
+			  consumerGroup, consumerInstance, 
+			  fetchTimeout, fetchLimit);
+		
+		this.userName = userName;
+		this.password = password;		
+		
+		try {
+			this.init();
+		} catch (Exception e) {
+			e.printStackTrace();
+			throw new IllegalArgumentException(e);
+		}
+	}
+	
+
+	/**
+	 * Initialize the Cambria or MR Client
+	 */
+	@Override
+	public void init() throws Exception {
+		
+		if (this.userName == null || this.userName.isEmpty() || 
+			this.password == null || this.password.isEmpty()) {
+			this.consumer =
+					new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, 
+							                           this.apiKey, this.apiSecret,
+							                           this.consumerGroup, this.consumerInstance,
+							                           this.fetchTimeout, this.fetchLimit);			
+		} else {
+			this.consumer =
+					new BusConsumer.DmaapConsumerWrapper(this.servers, this.topic, 
+							                            this.apiKey, this.apiSecret,
+							                            this.userName, this.password,
+							                            this.consumerGroup, this.consumerInstance,
+							                            this.fetchTimeout, this.fetchLimit);
+		}
+			
+		PolicyLogger.info(className, "CREATION: " + this);
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public CommInfrastructure getTopicCommInfrastructure() {
+		return Topic.CommInfrastructure.DMAAP;
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder builder = new StringBuilder();
+		builder.append("SingleThreadedDmaapTopicSource [userName=").append(userName).append(", password=")
+				.append((password == null || password.isEmpty()) ? "-" : password.length())
+				.append(", getTopicCommInfrastructure()=").append(getTopicCommInfrastructure())
+				.append(", toString()=").append(super.toString()).append("]");
+		return builder.toString();
+	}
+
+
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java
new file mode 100644
index 0000000..edb55c7
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/event/comm/bus/internal/SingleThreadedUebTopicSource.java
@@ -0,0 +1,89 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.event.comm.bus.internal;
+
+import java.util.List;
+
+import org.openecomp.policy.drools.event.comm.Topic;
+import org.openecomp.policy.drools.event.comm.bus.UebTopicSource;
+
+/**
+ * This topic source implementation specializes in reading messages
+ * over an UEB Bus topic source and notifying its listeners
+ */
+public class SingleThreadedUebTopicSource extends SingleThreadedBusTopicSource 
+                                          implements UebTopicSource {
+
+	/**
+	 * 
+	 * @param servers UEB servers
+	 * @param topic UEB Topic to be monitored
+	 * @param apiKey UEB API Key (optional)
+	 * @param apiSecret UEB API Secret (optional)
+	 * @param consumerGroup UEB Reader Consumer Group
+	 * @param consumerInstance UEB Reader Instance
+	 * @param fetchTimeout UEB fetch timeout
+	 * @param fetchLimit UEB fetch limit
+	 * @throws IllegalArgumentException An invalid parameter passed in
+	 */
+	public SingleThreadedUebTopicSource(List<String> servers, String topic, 
+			                            String apiKey, String apiSecret,
+			                            String consumerGroup, String consumerInstance, 
+			                            int fetchTimeout, int fetchLimit)
+			throws IllegalArgumentException {
+		
+		super(servers, topic, apiKey, apiSecret, 
+			  consumerGroup, consumerInstance, 
+			  fetchTimeout, fetchLimit);
+		
+		
+		this.init();
+	}
+	
+	/**
+	 * Initialize the Cambria client
+	 */
+	@Override
+	public void init() {
+		this.consumer =
+			new BusConsumer.CambriaConsumerWrapper(this.servers, this.topic, 
+					                           this.apiKey, this.apiSecret,
+					                           this.consumerGroup, this.consumerInstance,
+					                           this.fetchTimeout, this.fetchLimit);
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public CommInfrastructure getTopicCommInfrastructure() {
+		return Topic.CommInfrastructure.UEB;
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder builder = new StringBuilder();
+		builder.append("SingleThreadedUebTopicSource [getTopicCommInfrastructure()=")
+				.append(getTopicCommInfrastructure()).append(", toString()=").append(super.toString()).append("]");
+		return builder.toString();
+	}
+
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/client/HttpClient.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/client/HttpClient.java
new file mode 100644
index 0000000..2e81b2c
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/client/HttpClient.java
@@ -0,0 +1,48 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.policy.drools.http.client;
+
+import javax.ws.rs.core.Response;
+
+import org.openecomp.policy.drools.properties.Startable;
+
+public interface HttpClient extends Startable {
+	
+	public Response get(String path);
+	
+	public Response get();
+	
+	public static <T> T getBody(Response response, Class<T> entityType) {
+		return response.readEntity(entityType);
+	}
+
+	public String getName();
+	public boolean isHttps();
+	public boolean isSelfSignedCerts();
+	public String getHostname();
+	public int getPort();
+	public String getBasePath();
+	public String getUserName();
+	public String getPassword();
+	public String getBaseUrl();
+	
+	
+	public static final HttpClientFactory factory = new IndexedHttpClientFactory();
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/client/HttpClientFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/client/HttpClientFactory.java
new file mode 100644
index 0000000..53a8c2b
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/client/HttpClientFactory.java
@@ -0,0 +1,185 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.policy.drools.http.client;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+import org.openecomp.policy.drools.http.client.internal.JerseyClient;
+import org.openecomp.policy.drools.properties.PolicyProperties;
+
+public interface HttpClientFactory {
+	
+	public HttpClient build(String name, boolean https, 
+                            boolean selfSignedCerts,
+                            String hostname, int port, 
+                            String baseUrl, String userName,
+                            String password, boolean managed) 
+    throws Exception;
+	
+	public ArrayList<HttpClient> build(Properties properties) throws Exception;
+	
+	public HttpClient get(String name);
+	
+	public List<HttpClient> inventory();
+	
+	public void destroy(String name);
+	
+	public void destroy();
+}
+
+class IndexedHttpClientFactory implements HttpClientFactory {
+	
+	protected HashMap<String, HttpClient> clients = new HashMap<String, HttpClient>();
+
+	@Override
+	public synchronized HttpClient build(String name, boolean https, boolean selfSignedCerts, 
+			                             String hostname, int port,
+			                             String baseUrl, String userName, String password,
+			                             boolean managed) 
+	throws Exception {
+		if (clients.containsKey(name))
+			return clients.get(name);
+		
+		JerseyClient client = 
+				new JerseyClient(name, https, selfSignedCerts, hostname, port, baseUrl, userName, password);
+		
+		if (managed)
+			clients.put(name, client);
+		
+		return client;
+	}
+
+	@Override
+	public synchronized ArrayList<HttpClient> build(Properties properties) throws Exception {
+		ArrayList<HttpClient> clientList = new ArrayList<HttpClient>();
+		
+		String clientNames = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES);
+		if (clientNames == null || clientNames.isEmpty()) {
+			return clientList;
+		}
+		
+		List<String> clientNameList = 
+				new ArrayList<String>(Arrays.asList(clientNames.split("\\s*,\\s*")));
+		
+		for (String clientName : clientNameList) {
+			String httpsString = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + 
+                                                        clientName + 
+                                                        PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX);
+			boolean https = false;
+			if (httpsString != null && !httpsString.isEmpty()) {
+				https = Boolean.parseBoolean(httpsString);
+			}
+			
+			String hostName = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." +
+					                                 clientName + 
+                                                     PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX);
+					
+			String servicePortString = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + 
+		                                                      clientName + 
+		                                                      PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX);
+			int port;
+			try {
+				if (servicePortString == null || servicePortString.isEmpty()) {
+					continue;
+				}
+				port = Integer.parseInt(servicePortString);
+			} catch (NumberFormatException nfe) {
+				nfe.printStackTrace();
+				continue;
+			}
+			
+			String baseUrl = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." +
+					                                clientName + 
+                                                    PolicyProperties.PROPERTY_HTTP_URL_SUFFIX);
+			
+			String userName = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." +
+					                                 clientName + 
+                                                     PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX);
+
+			String password = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." +
+		                                             clientName + 
+                                                     PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX);
+			
+			String managedString = properties.getProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." +
+					                                      clientName + 
+					                                      PolicyProperties.PROPERTY_MANAGED_SUFFIX);
+			boolean managed = true;
+			if (managedString != null && !managedString.isEmpty()) {
+				managed = Boolean.parseBoolean(managedString);
+			}
+			
+			try {
+				HttpClient client =
+						this.build(clientName, https, https, hostName, port, baseUrl, 
+								   userName, password, managed);
+				clientList.add(client);
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+		}
+		
+		return clientList;
+	}
+
+	@Override
+	public synchronized HttpClient get(String name) {
+		if (clients.containsKey(name)) {
+			return clients.get(name);
+		} 
+		
+		throw new IllegalArgumentException("Http Client " + name + " not found");
+	}
+
+	@Override
+	public synchronized List<HttpClient> inventory() {
+		return new ArrayList<HttpClient>(this.clients.values());
+	}
+
+	@Override
+	public synchronized void destroy(String name) {
+		if (!clients.containsKey(name)) {
+			return;
+		}
+		
+		HttpClient client = clients.remove(name);
+		try {
+			client.shutdown();
+		} catch (IllegalStateException e) {
+			e.printStackTrace();
+		}
+	}
+
+	@Override
+	public void destroy() {
+		List<HttpClient> clientsInventory = this.inventory();
+		for (HttpClient client: clientsInventory) {
+			client.shutdown();
+		}
+		
+		synchronized(this) {
+			this.clients.clear();
+		}
+	}
+	
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/client/internal/JerseyClient.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/client/internal/JerseyClient.java
new file mode 100644
index 0000000..4fa59dc
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/client/internal/JerseyClient.java
@@ -0,0 +1,242 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-healthcheck
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.http.client.internal;
+
+import java.security.SecureRandom;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.core.Response;
+
+import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
+import org.openecomp.policy.drools.http.client.HttpClient;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+public class JerseyClient implements HttpClient {
+	
+	protected final String name;
+	protected final boolean https;
+	protected final boolean selfSignedCerts;
+	protected final String hostname;
+	protected final int port;
+	protected final String basePath;
+	protected final String userName;
+	protected final String password;
+	
+	protected final Client client;
+	protected final String baseUrl;
+	
+	protected boolean alive = true;
+	
+	
+	public JerseyClient(String name, boolean https, 
+			            boolean selfSignedCerts,
+			            String hostname, int port, 
+			            String basePath, String userName,
+			            String password) 
+	throws Exception {
+		
+		super();
+		
+		if (name == null || name.isEmpty())
+			throw new IllegalArgumentException("Name must be provided");
+		
+		if (hostname == null || hostname.isEmpty())
+			throw new IllegalArgumentException("Hostname must be provided");
+		
+		if (port <= 0 && port >= 65535)
+			throw new IllegalArgumentException("Invalid Port provided: " + port);
+		
+		this.name = name;
+		this.https = https;
+		this.hostname = hostname;
+		this.port = port;
+		this.basePath = basePath;
+		this.userName = userName;
+		this.password = password;
+		this.selfSignedCerts = selfSignedCerts;
+		
+		StringBuffer tmpBaseUrl = new StringBuffer();
+		if (this.https) {
+			tmpBaseUrl.append("https://");
+			ClientBuilder clientBuilder;
+			SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
+			if (this.selfSignedCerts) {
+				sslContext.init(null, new TrustManager[]{new X509TrustManager() {
+					@Override
+			        public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {}
+					@Override
+			        public void checkServerTrusted(X509Certificate[]  chain, String authType) throws CertificateException {}
+					@Override
+			        public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; }
+	
+			    }}, new SecureRandom());
+				 clientBuilder = ClientBuilder.newBuilder().sslContext(sslContext).hostnameVerifier(new HostnameVerifier() {
+					@Override
+					public boolean verify(String hostname, SSLSession session) {return true;}
+				 });
+			} else {
+				sslContext.init(null, null, null);
+				clientBuilder = ClientBuilder.newBuilder().sslContext(sslContext);
+			}
+			this.client = clientBuilder.build();
+		} else {	
+			tmpBaseUrl.append("http://");
+			this.client = ClientBuilder.newClient();
+		}
+		
+		if (this.userName != null && !this.userName.isEmpty() &&
+			this.password != null && !this.password.isEmpty()) {
+			HttpAuthenticationFeature authFeature = HttpAuthenticationFeature.basic(userName, password);
+			this.client.register(authFeature);
+		}
+		
+		this.baseUrl = tmpBaseUrl.append(this.hostname).append(":").
+				                  append(this.port).append("/").
+		                          append((this.basePath == null) ? "" : this.basePath).
+		                          toString();
+	}
+	
+	@Override
+	public Response get(String path) {		
+		if (path != null && !path.isEmpty())
+			return this.client.target(this.baseUrl).path(path).request().get();
+		else
+			return this.client.target(this.baseUrl).request().get();
+	}
+	
+	@Override
+	public Response get() {
+		return this.client.target(this.baseUrl).request().get();
+	}
+	
+
+	@Override
+	public boolean start() throws IllegalStateException {
+		return alive;
+	}
+
+	@Override
+	public boolean stop() throws IllegalStateException {
+		return !alive;
+	}
+
+	@Override
+	public void shutdown() throws IllegalStateException {
+		synchronized(this) {
+			alive = false;
+		}
+		
+		try {
+			this.client.close();
+		} catch (Exception e) {
+			e.printStackTrace();
+		}
+	}
+
+	@Override
+	public synchronized boolean isAlive() {
+		return this.alive;
+	}
+
+	@Override
+	public String getName() {
+		return name;
+	}
+
+	@Override
+	public boolean isHttps() {
+		return https;
+	}
+
+	@Override
+	public boolean isSelfSignedCerts() {
+		return selfSignedCerts;
+	}
+
+	@Override
+	public String getHostname() {
+		return hostname;
+	}
+
+	public int getPort() {
+		return port;
+	}
+
+	@Override
+	public String getBasePath() {
+		return basePath;
+	}
+
+	@Override
+	public String getUserName() {
+		return userName;
+	}
+
+	@JsonIgnore
+	@Override
+	public String getPassword() {
+		return password;
+	}
+
+	@Override
+	public String getBaseUrl() {
+		return baseUrl;
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder builder = new StringBuilder();
+		builder.append("JerseyClient [name=");
+		builder.append(name);
+		builder.append(", https=");
+		builder.append(https);
+		builder.append(", selfSignedCerts=");
+		builder.append(selfSignedCerts);
+		builder.append(", hostname=");
+		builder.append(hostname);
+		builder.append(", port=");
+		builder.append(port);
+		builder.append(", basePath=");
+		builder.append(basePath);
+		builder.append(", userName=");
+		builder.append(userName);
+		builder.append(", password=");
+		builder.append(password);
+		builder.append(", client=");
+		builder.append(client);
+		builder.append(", baseUrl=");
+		builder.append(baseUrl);
+		builder.append(", alive=");
+		builder.append(alive);
+		builder.append("]");
+		return builder.toString();
+	}
+
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/server/HttpServletServer.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/server/HttpServletServer.java
new file mode 100644
index 0000000..5f5dd78
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/server/HttpServletServer.java
@@ -0,0 +1,81 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.policy.drools.http.server;
+
+import org.openecomp.policy.drools.properties.Startable;
+
+/**
+ * A Jetty Server to server REST Requests
+ */
+public interface HttpServletServer extends Startable {
+
+	/**
+	 * 
+	 * @return port
+	 */
+	public int getPort();
+	
+	/**
+	 * enables basic authentication with user and password on the the relative path relativeUriPath
+	 * 
+	 * @param user
+	 * @param password
+	 * @param relativeUriPath
+	 */
+	public void setBasicAuthentication(String user, String password, String relativeUriPath);
+
+	/**
+	 * adds a JAX-RS servlet class to serve REST requests
+	 * 
+	 * @param servletPath
+	 * @param restClass
+	 * @throws IllegalArgumentException
+	 * @throws IllegalStateException
+	 */
+	public void addServletClass(String servletPath, String restClass) 
+			throws IllegalArgumentException, IllegalStateException;
+
+	/**
+	 * adds a package containing JAX-RS classes to serve REST requests
+	 * 
+	 * @param servletPath
+	 * @param restPackage
+	 * @throws IllegalArgumentException
+	 * @throws IllegalStateException
+	 */
+	public void addServletPackage(String servletPath, String restPackage) 
+			throws IllegalArgumentException, IllegalStateException;
+	
+	/**
+	 * blocking start of the http server
+	 * 
+	 * @param maxWaitTime max time to wait for the start to take place
+	 * @return true if start was successful
+	 * 
+	 * @throws IllegalArgumentException if arguments are invalid
+	 */
+	public boolean waitedStart(long maxWaitTime) throws IllegalArgumentException;
+	
+	
+	/**
+	 * factory for managing and tracking DMAAP sources
+	 */
+	public static HttpServletServerFactory factory = new IndexedHttpServletServerFactory();
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/server/HttpServletServerFactory.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/server/HttpServletServerFactory.java
new file mode 100644
index 0000000..bd5ae24
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/server/HttpServletServerFactory.java
@@ -0,0 +1,206 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.policy.drools.http.server;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
+import org.openecomp.policy.common.logging.flexlogger.Logger;
+import org.openecomp.policy.drools.http.server.internal.JettyJerseyServer;
+import org.openecomp.policy.drools.properties.PolicyProperties;
+
+/**
+ * Jetty Server Factory
+ */
+public interface HttpServletServerFactory {
+
+	public HttpServletServer build(String name, String host, int port, String contextPath, boolean managed)
+		throws IllegalArgumentException;
+	
+	public ArrayList<HttpServletServer> build(Properties properties) throws IllegalArgumentException;
+	
+	public HttpServletServer get(int port);
+	public List<HttpServletServer> inventory();
+	public void destroy(int port);
+	public void destroy();
+}
+
+class IndexedHttpServletServerFactory implements HttpServletServerFactory {
+	
+	protected static Logger  logger = FlexLogger.getLogger(IndexedHttpServletServerFactory.class);	
+	
+	protected HashMap<Integer, JettyJerseyServer> servers = new HashMap<Integer, JettyJerseyServer>();
+
+	@Override
+	public synchronized HttpServletServer build(String name, String host, int port, 
+			                                    String contextPath, boolean managed) 
+		throws IllegalArgumentException {	
+		
+		if (servers.containsKey(port))
+			return servers.get(port);
+		
+		JettyJerseyServer server = new JettyJerseyServer(name, host, port, contextPath);
+		if (managed)
+			servers.put(port, server);
+		
+		return server;
+	}
+	
+	@Override
+	public synchronized ArrayList<HttpServletServer> build(Properties properties) 
+		throws IllegalArgumentException {	
+		
+		ArrayList<HttpServletServer> serviceList = new ArrayList<HttpServletServer>();
+		
+		String serviceNames = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES);
+		if (serviceNames == null || serviceNames.isEmpty()) {
+			logger.warn("No topic for HTTP Service " + properties);
+			return serviceList;
+		}
+		
+		List<String> serviceNameList = 
+				new ArrayList<String>(Arrays.asList(serviceNames.split("\\s*,\\s*")));
+		
+		for (String serviceName : serviceNameList) {
+			String servicePortString = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + 
+		                                                      serviceName + 
+		                                                      PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX);
+			
+			int servicePort;
+			try {
+				if (servicePortString == null || servicePortString.isEmpty()) {
+					if (logger.isWarnEnabled())
+						logger.warn("No HTTP port for service in " + serviceName);
+					continue;
+				}
+				servicePort = Integer.parseInt(servicePortString);
+			} catch (NumberFormatException nfe) {
+				if (logger.isWarnEnabled())
+					logger.warn("No HTTP port for service in " + serviceName);
+				continue;
+			}
+			
+			String hostName = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." +
+                                                     serviceName + 
+                                                     PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX);
+			
+			String contextUriPath = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." +
+                                                        serviceName + 
+                                                        PolicyProperties.PROPERTY_HTTP_CONTEXT_URIPATH_SUFFIX);
+			
+			String userName = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." +
+					                                 serviceName + 
+                                                     PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX);
+
+			String password = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." +
+                                                     serviceName + 
+                                                     PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX);
+			
+			String authUriPath = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." +
+                                                        serviceName + 
+                                                        PolicyProperties.PROPERTY_HTTP_AUTH_URIPATH_SUFFIX);
+			
+			String restClasses = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." +
+                                                      serviceName + 
+                                                      PolicyProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX);
+			
+			String restPackages = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." +
+                                                        serviceName + 
+                                                        PolicyProperties.PROPERTY_HTTP_REST_PACKAGES_SUFFIX);
+			String restUriPath = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." +
+                                                         serviceName + 
+                                                         PolicyProperties.PROPERTY_HTTP_REST_URIPATH_SUFFIX);
+			
+			String managedString = properties.getProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." +
+                                                          serviceName + 
+                                                          PolicyProperties.PROPERTY_MANAGED_SUFFIX);		
+			boolean managed = true;
+			if (managedString != null && !managedString.isEmpty()) {
+				managed = Boolean.parseBoolean(managedString);
+			}
+			
+			HttpServletServer service = build(serviceName, hostName, servicePort, contextUriPath, managed);
+			if (userName != null && !userName.isEmpty() && password != null && !password.isEmpty()) {
+				service.setBasicAuthentication(userName, password, authUriPath);
+			}
+			
+			if (restClasses != null && !restClasses.isEmpty()) {
+				List<String> restClassesList = 
+						new ArrayList<String>(Arrays.asList(restClasses.split("\\s*,\\s*")));
+				for (String restClass : restClassesList)
+					service.addServletClass(restUriPath, restClass);
+			}
+			
+			if (restPackages != null && !restPackages.isEmpty()) {
+				List<String> restPackageList = 
+						new ArrayList<String>(Arrays.asList(restPackages.split("\\s*,\\s*")));
+				for (String restPackage : restPackageList)
+					service.addServletPackage(restUriPath, restPackage);
+			}
+			
+			serviceList.add(service);
+		}
+		
+		return serviceList;
+	}
+
+	@Override
+	public synchronized HttpServletServer get(int port) throws IllegalArgumentException {
+		
+		if (servers.containsKey(port)) {
+			return servers.get(port);
+		} 
+		
+		throw new IllegalArgumentException("Http Server for " + port + " not found");
+	}
+
+	@Override
+	public synchronized List<HttpServletServer> inventory() {
+		 return new ArrayList<HttpServletServer>(this.servers.values());
+	}
+	
+	@Override
+	public synchronized void destroy(int port) throws IllegalArgumentException, IllegalStateException {
+		
+		if (!servers.containsKey(port)) {
+			return;
+		}
+		
+		HttpServletServer server = servers.remove(port);
+		server.shutdown();
+	}
+
+	@Override
+	public synchronized void destroy() throws IllegalArgumentException, IllegalStateException {
+		List<HttpServletServer> servers = this.inventory();
+		for (HttpServletServer server: servers) {
+			server.shutdown();
+		}
+		
+		synchronized(this) {
+			this.servers.clear();
+		}
+	}
+	
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/server/internal/JettyJerseyServer.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/server/internal/JettyJerseyServer.java
new file mode 100644
index 0000000..4914a4c
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/server/internal/JettyJerseyServer.java
@@ -0,0 +1,130 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.policy.drools.http.server.internal;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.eclipse.jetty.servlet.ServletHolder;
+
+import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
+import org.openecomp.policy.common.logging.flexlogger.Logger;
+
+/**
+ * REST Jetty Server using Jersey
+ */
+public class JettyJerseyServer extends JettyServletServer {
+	
+	protected static final String JERSEY_PACKAGES_PARAM = "jersey.config.server.provider.packages";
+	protected static final String JERSEY_CLASSNAMES_PARAM = "jersey.config.server.provider.classnames";
+	
+	protected static Logger logger = FlexLogger.getLogger(JettyJerseyServer.class);
+	
+	protected ArrayList<String> packages = new ArrayList<String>();
+	protected HashMap<String, ServletHolder> servlets = 
+							new HashMap<String, ServletHolder>();
+	
+	public JettyJerseyServer(String name, String host, int port, String contextPath) 
+	       throws IllegalArgumentException {		
+		super(name, host, port, contextPath);
+	}
+	
+	protected synchronized ServletHolder getServlet(String servletPath) 
+			  throws IllegalArgumentException {
+		
+		if (servletPath == null || servletPath.isEmpty())
+			servletPath = "/*";
+		
+		ServletHolder jerseyServlet = servlets.get(servletPath);
+		if (jerseyServlet == null) {
+			jerseyServlet = context.addServlet
+	                (org.glassfish.jersey.servlet.ServletContainer.class, servletPath);  
+			jerseyServlet.setInitOrder(0);
+			String initPackages = 
+					jerseyServlet.getInitParameter(JERSEY_PACKAGES_PARAM);
+			if (initPackages == null) {
+		        jerseyServlet.setInitParameter(
+		        		JERSEY_PACKAGES_PARAM,
+		        		"com.jersey.jaxb,com.fasterxml.jackson.jaxrs.json");
+			}
+			this.servlets.put(servletPath, jerseyServlet);
+		}
+		
+		return jerseyServlet;
+	}
+	
+	@Override
+	public synchronized void addServletPackage(String servletPath, String restPackage) 
+	       throws IllegalArgumentException, IllegalStateException {
+		
+    	if (restPackage == null || restPackage.isEmpty())
+			throw new IllegalArgumentException("No discoverable REST package provided");
+		
+		ServletHolder jerseyServlet = this.getServlet(servletPath);
+		if (jerseyServlet == null)
+			throw new IllegalStateException("Unexpected, no Jersey Servlet class");
+		
+		String initPackages = 
+				jerseyServlet.getInitParameter(JERSEY_PACKAGES_PARAM);
+		if (initPackages == null)
+			throw new IllegalStateException("Unexpected, no Init Parameters loaded");
+		
+        jerseyServlet.setInitParameter(
+        		JERSEY_PACKAGES_PARAM,
+        		initPackages + "," + restPackage);
+        
+        if (logger.isDebugEnabled())
+        	logger.debug(this + "Added REST Package: " + jerseyServlet.dump());
+	}
+	
+	@Override
+	public synchronized void addServletClass(String servletPath, String restClass) 
+		       throws IllegalArgumentException, IllegalStateException {
+			
+    	if (restClass == null || restClass.isEmpty())
+			throw new IllegalArgumentException("No discoverable REST class provided");
+		
+		ServletHolder jerseyServlet = this.getServlet(servletPath);
+		if (jerseyServlet == null)
+			throw new IllegalStateException("Unexpected, no Jersey Servlet class");
+		
+		String initClasses = 
+				jerseyServlet.getInitParameter(JERSEY_CLASSNAMES_PARAM);
+		if (initClasses == null)
+			initClasses = restClass;
+		else
+			initClasses = initClasses + "," + restClass;
+		
+        jerseyServlet.setInitParameter(
+        		JERSEY_CLASSNAMES_PARAM,
+        		initClasses);
+        
+        if (logger.isDebugEnabled())
+        	logger.debug(this + "Added REST Class: " + jerseyServlet.dump());
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder builder = new StringBuilder();
+		builder.append("JerseyJettyServer [packages=").append(packages).append(", servlets=").append(servlets)
+			   .append(", toString()=").append(super.toString()).append("]");
+		return builder.toString();
+	}
+}
diff --git a/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/server/internal/JettyServletServer.java b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/server/internal/JettyServletServer.java
new file mode 100644
index 0000000..74360e8
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/openecomp/policy/drools/http/server/internal/JettyServletServer.java
@@ -0,0 +1,353 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.policy.drools.http.server.internal;
+
+import org.eclipse.jetty.security.ConstraintMapping;
+import org.eclipse.jetty.security.ConstraintSecurityHandler;
+import org.eclipse.jetty.security.HashLoginService;
+import org.eclipse.jetty.security.authentication.BasicAuthenticator;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.util.security.Constraint;
+import org.eclipse.jetty.util.security.Credential;
+
+import org.openecomp.policy.common.logging.eelf.MessageCodes;
+import org.openecomp.policy.common.logging.flexlogger.FlexLogger;
+import org.openecomp.policy.common.logging.flexlogger.Logger;
+import org.openecomp.policy.drools.http.server.HttpServletServer;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+/**
+ * Http Server implementation using Embedded Jetty
+ */
+public abstract class JettyServletServer implements HttpServletServer, Runnable {
+	
+	private static Logger logger = FlexLogger.getLogger(JettyServletServer.class);
+
+	protected final String name;
+
+	protected final String host;
+	protected final int port;
+	
+	protected String user;
+	protected String password;
+	
+	protected final String contextPath;
+	
+	protected final Server jettyServer;
+	protected final ServletContextHandler context;
+	protected final ServerConnector connector;
+	
+	protected volatile Thread jettyThread;
+	
+	protected Object startCondition = new Object();
+	
+	public JettyServletServer(String name, String host, int port, String contextPath) 
+		   throws IllegalArgumentException {
+			
+		if (name == null || name.isEmpty())
+			name = "http-" + port;
+		
+		if (port <= 0 && port >= 65535)
+			throw new IllegalArgumentException("Invalid Port provided: " + port);
+		
+		if (host == null || host.isEmpty())
+			host = "localhost";
+		
+		if (contextPath == null || contextPath.isEmpty())
+			contextPath = "/";
+		
+		this.name = name;
+		
+		this.host = host;
+		this.port = port;
+
+		this.contextPath = contextPath;
+		
+        this.context = new ServletContextHandler(ServletContextHandler.SESSIONS);
+        this.context.setContextPath(contextPath);
+        
+        this.jettyServer = new Server();
+        
+        this.connector = new ServerConnector(this.jettyServer);
+        this.connector.setName(name);
+        this.connector.setReuseAddress(true);
+        this.connector.setPort(port);
+        this.connector.setHost(host);    
+        
+        this.jettyServer.addConnector(this.connector);       
+        this.jettyServer.setHandler(context);
+	}
+	
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void setBasicAuthentication(String user, String password, String servletPath) {
+        if (user == null || user.isEmpty() || password == null || password.isEmpty()) 
+        	throw new IllegalArgumentException("Missing user and/or password");
+        
+        if (servletPath == null || servletPath.isEmpty())
+        	servletPath = "/*";
+        	     	
+    	HashLoginService hashLoginService = new HashLoginService();
+        hashLoginService.putUser(user, 
+        		                Credential.getCredential(password), 
+        		                new String[] {"user"});
+        hashLoginService.setName(this.connector.getName() + "-login-service");
+        
+        Constraint constraint = new Constraint();
+        constraint.setName(Constraint.__BASIC_AUTH);
+        constraint.setRoles(new String[]{"user"});
+        constraint.setAuthenticate(true);
+         
+        ConstraintMapping constraintMapping = new ConstraintMapping();
+        constraintMapping.setConstraint(constraint);
+        constraintMapping.setPathSpec(servletPath);
+        
+        ConstraintSecurityHandler securityHandler = new ConstraintSecurityHandler();
+        securityHandler.setAuthenticator(new BasicAuthenticator());
+        securityHandler.setRealmName(this.connector.getName() + "-realm");
+        securityHandler.addConstraintMapping(constraintMapping);
+        securityHandler.setLoginService(hashLoginService);		
+        
+        this.context.setSecurityHandler(securityHandler);
+        
+		this.user = user;
+		this.password = password;
+	}
+	
+	/**
+	 * Jetty Server Execution
+	 */
+	@Override
+	public void run() {
+        try {        	
+        	if (logger.isInfoEnabled())
+        		logger.info(this + " STARTING " + this.jettyServer.dump());
+        	
+            this.jettyServer.start();
+            
+        	synchronized(this.startCondition) {
+        		this.startCondition.notifyAll();
+        	}
+        	
+            this.jettyServer.join();
+        } catch (Exception e) {
+			logger.warn(MessageCodes.EXCEPTION_ERROR, e,  
+	                    "Error found while running management server", this.toString());
+		} 
+	}
+	
+	@Override
+	public boolean waitedStart(long maxWaitTime) throws IllegalArgumentException {
+		
+		if (maxWaitTime < 0)
+			throw new IllegalArgumentException("max-wait-time cannot be negative");
+		
+		long pendingWaitTime = maxWaitTime;
+		
+		if (!this.start())
+			return false;
+		
+		synchronized (this.startCondition) {
+			
+			while (!this.jettyServer.isRunning()) {
+				try {
+					long startTs = System.currentTimeMillis();
+					
+					this.startCondition.wait(pendingWaitTime);
+					
+					if (maxWaitTime == 0)
+						/* spurious notification */
+						continue;
+					
+					long endTs = System.currentTimeMillis();
+					pendingWaitTime = pendingWaitTime - (endTs - startTs);
+					
+					if (logger.isInfoEnabled())
+						logger.info(this + "Pending time is " + pendingWaitTime + 
+								    " ms.");
+					
+					if (pendingWaitTime <= 0)
+						return false;
+					
+				} catch (InterruptedException e) {
+					logger.warn("waited-start has been interrupted");
+					return false;			
+				}
+			}
+			
+			return (this.jettyServer.isRunning());
+		}
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean start() throws IllegalStateException {
+		if (logger.isDebugEnabled())
+			logger.debug(this + "START");
+		
+		synchronized(this) {			
+			if (jettyThread == null || 
+				!this.jettyThread.isAlive()) {
+				
+				this.jettyThread = new Thread(this);
+				this.jettyThread.setName(this.name + "-" + this.port);
+				this.jettyThread.start();
+			}
+		}
+		
+		return true;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean stop() throws IllegalStateException {
+		logger.info(this + "STOP");
+		
+		synchronized(this) {
+			if (jettyThread == null) {
+				return true;
+			} 
+			
+			if (!jettyThread.isAlive()) {
+				this.jettyThread = null;
+			} 
+			
+			try {
+				this.connector.stop();
+			} catch (Exception e) {
+				logger.error(MessageCodes.EXCEPTION_ERROR, e,  
+				           "Error while stopping management server", this.toString());
+				e.printStackTrace();
+			}
+			
+			try {
+				this.jettyServer.stop();
+			} catch (Exception e) {
+				logger.error(MessageCodes.EXCEPTION_ERROR, e,  
+						           "Error while stopping management server", this.toString());
+				return false;
+			}
+			
+			Thread.yield();
+		}
+
+		return true;
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void shutdown() throws IllegalStateException {
+		logger.info(this + "SHUTDOWN");
+		
+		this.stop();
+		
+		if (this.jettyThread == null)
+			return;
+		
+		Thread jettyThreadCopy = this.jettyThread;
+
+		if (jettyThreadCopy.isAlive()) {
+			try {
+				jettyThreadCopy.join(1000L);
+			} catch (InterruptedException e) {
+				logger.warn(MessageCodes.EXCEPTION_ERROR, e,  
+				                  "Error while shutting down management server", this.toString());
+			}
+			if (!jettyThreadCopy.isInterrupted()) {
+				try {
+					jettyThreadCopy.interrupt();
+				} catch(Exception e) {
+					// do nothing
+					logger.warn("exception while shutting down (OK)");
+				}
+			}
+		}
+		
+		this.jettyServer.destroy();
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean isAlive() {
+		if (this.jettyThread != null)
+			return this.jettyThread.isAlive();
+		
+		return false;
+	}
+
+	@Override
+	public int getPort() {
+		return this.port;
+	}
+
+	/**
+	 * @return the name
+	 */
+	public String getName() {
+		return name;
+	}
+
+	/**
+	 * @return the host
+	 */
+	public String getHost() {
+		return host;
+	}
+
+	/**
+	 * @return the user
+	 */
+	public String getUser() {
+		return user;
+	}
+
+	/**
+	 * @return the password
+	 */
+	@JsonIgnore
+	public String getPassword() {
+		return password;
+	}
+
+	@Override
+	public String toString() {
+		StringBuilder builder = new StringBuilder();
+		builder.append("JettyServer [name=").append(name).append(", host=").append(host).append(", port=").append(port)
+				.append(", user=").append(user).append(", password=").append((password != null)).append(", contextPath=")
+				.append(contextPath).append(", jettyServer=").append(jettyServer).append(", context=").append(this.context)
+				.append(", connector=").append(connector).append(", jettyThread=").append(jettyThread)
+				.append("]");
+		return builder.toString();
+	}
+
+}
diff --git a/policy-endpoints/src/main/resources/schema/pdpd-configuration.jsonschema b/policy-endpoints/src/main/resources/schema/pdpd-configuration.jsonschema
new file mode 100644
index 0000000..34ee199
--- /dev/null
+++ b/policy-endpoints/src/main/resources/schema/pdpd-configuration.jsonschema
@@ -0,0 +1,61 @@
+{
+	"title": "ENGINE-CONFIGURATION",
+	"type":"object",
+	"$schema": "http://json-schema.org/draft-03/schema",
+	"required":false,
+	"properties":{
+		"requestID": {
+			"description": "Unique Transaction ID.   This is an UUID.",
+			"type":"string",
+			"required":true
+		},
+		"entity": {
+			"description": "Set of entities on which configuration can be performed: controller",
+			"type":"string",
+			"required":true
+		},
+		"controllers": {
+			"description": "Controller Information, only applicable when the entity is set to controller",
+			"type":"array",
+			"required":false,
+			"items": {
+				"description": "Drools Related Information",
+				"type":"object",
+				"required":true,
+				"properties":{
+					"name": {
+						"type":"string",
+						"required":true
+					},
+					"operation": {
+						"description": "Set of operations that can be applied to a controller: create, lock",
+						"type":"string",
+						"required":true
+					},
+					"drools": {
+						"description": "Maven Related Information",
+						"type":"object",
+						"required":false,
+						"properties":{
+							"artifactId": {
+								"description": "Maven Artifact ID",
+								"type":"string",
+								"required":true
+							},
+							"groupId": {
+								"description": "Maven Group ID",
+								"type":"string",
+								"required":true
+							},
+							"version": {
+								"description": "Maven Version",
+								"type":"string",
+								"required":true
+							}
+						}
+					}
+				}
+			}
+		}
+	}
+}
diff --git a/policy-endpoints/src/test/java/org/openecomp/policy/drools/http/server/test/HttpClientTest.java b/policy-endpoints/src/test/java/org/openecomp/policy/drools/http/server/test/HttpClientTest.java
new file mode 100644
index 0000000..ced3dcf
--- /dev/null
+++ b/policy-endpoints/src/test/java/org/openecomp/policy/drools/http/server/test/HttpClientTest.java
@@ -0,0 +1,230 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.openecomp.policy.drools.http.server.test;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Properties;
+
+import javax.ws.rs.core.Response;
+
+import org.junit.Test;
+import org.openecomp.policy.drools.http.client.HttpClient;
+import org.openecomp.policy.drools.http.server.HttpServletServer;
+import org.openecomp.policy.drools.properties.PolicyProperties;
+
+public class HttpClientTest {
+	
+	@Test
+	public void testHttpNoAuthClient() throws Exception {		
+		System.out.println("-- testHttpNoAuthClient() --");
+
+		HttpServletServer server = HttpServletServer.factory.build("echo", "localhost", 6666, "/", true);
+		server.addServletPackage("/*", this.getClass().getPackage().getName());
+		server.waitedStart(5000);
+		
+		HttpClient client = HttpClient.factory.build("testHttpNoAuthClient", false, false, 
+				                                     "localhost", 6666, "junit/echo", 
+				                                     null, null, true);
+		Response response = client.get("hello");
+		String body = HttpClient.getBody(response, String.class);
+		
+		assertTrue(response.getStatus() == 200);
+		assertTrue(body.equals("hello"));
+		
+		HttpServletServer.factory.destroy();
+		HttpClient.factory.destroy();
+	}
+	
+	@Test
+	public void testHttpAuthClient() throws Exception {		
+		System.out.println("-- testHttpAuthClient() --");
+
+		HttpServletServer server = HttpServletServer.factory.build("echo", "localhost", 6666, "/", true);
+		server.setBasicAuthentication("x", "y", null);
+		server.addServletPackage("/*", this.getClass().getPackage().getName());
+		server.waitedStart(5000);
+		
+		HttpClient client = HttpClient.factory.build("testHttpAuthClient",false, false, 
+				                                     "localhost", 6666, "junit/echo", 
+				                                     "x", "y", true);
+		Response response = client.get("hello");
+		String body = HttpClient.getBody(response, String.class);
+		
+		assertTrue(response.getStatus() == 200);
+		assertTrue(body.equals("hello"));
+		
+		HttpServletServer.factory.destroy();
+		HttpClient.factory.destroy();
+	}
+	
+	@Test
+	public void testHttpAuthClient401() throws Exception {		
+		System.out.println("-- testHttpAuthClient401() --");
+
+		HttpServletServer server = HttpServletServer.factory.build("echo", "localhost", 6666, "/", true);
+		server.setBasicAuthentication("x", "y", null);
+		server.addServletPackage("/*", this.getClass().getPackage().getName());
+		server.waitedStart(5000);
+		
+		HttpClient client = HttpClient.factory.build("testHttpAuthClient401",false, false, 
+				                                     "localhost", 6666, "junit/echo", 
+				                                     null, null, true);
+		Response response = client.get("hello");
+		assertTrue(response.getStatus() == 401);
+		
+		HttpServletServer.factory.destroy();
+		HttpClient.factory.destroy();
+	}
+	
+  //@Test 
+   public void testHttpAuthClientHttps() throws Exception {                             
+		System.out.println("-- testHttpAuthClientHttps() --");
+
+		HttpClient client = HttpClient.factory.build("testHttpAuthClientHttps", true, true, "somehost.somewhere.com",
+				9091, "pap/test", "testpap", "alpha123", true);
+		Response response = client.get();
+		assertTrue(response.getStatus() == 200);
+
+		HttpClient client2 = HttpClient.factory.build("testHttpAuthClientHttps2", true, true, "somehost.somewhere.com",
+				8081, "pdp", "testpdp", "alpha123", true);
+		Response response2 = client2.get("test");
+		assertTrue(response2.getStatus() == 500);
+
+		HttpServletServer.factory.destroy();
+		HttpClient.factory.destroy();
+    }
+    
+    @Test
+    public void testHttpAuthClientProps() throws Exception {
+		System.out.println("-- testHttpAuthClientProps() --");
+		
+		Properties httpProperties = new Properties();
+		
+		httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES, "PAP,PDP");
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, 
+			 "localhost");
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, 
+			 "9091");
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, 
+			 "testpap");
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, 
+			 "alpha123");
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX, 
+			 "org.openecomp.policy.drools.http.server.test.RestMockHealthCheck");
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_MANAGED_SUFFIX, 
+			 "true");
+		
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, 
+			 "localhost");
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, 
+			 "8081");
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, 
+			 "testpdp");
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, 
+			 "alpha123");
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_SERVER_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_REST_CLASSES_SUFFIX, 
+			 "org.openecomp.policy.drools.http.server.test.RestMockHealthCheck");
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_MANAGED_SUFFIX, 
+			 "true");
+		
+		httpProperties.setProperty(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES, "PAP,PDP");
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, 
+			 "localhost");
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, 
+			 "9091");
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_URL_SUFFIX, 
+			 "pap/test");
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX, 
+			 "false");
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, 
+			 "testpap");
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, 
+			 "alpha123");
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PAP" + PolicyProperties.PROPERTY_MANAGED_SUFFIX, 
+			 "true");
+		
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_HOST_SUFFIX, 
+			 "localhost");
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_PORT_SUFFIX, 
+			 "8081");
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_URL_SUFFIX, 
+			 "pdp");
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_HTTPS_SUFFIX, 
+			 "false");
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_AUTH_USERNAME_SUFFIX, 
+			 "testpdp");
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_HTTP_AUTH_PASSWORD_SUFFIX, 
+			 "alpha123");
+		httpProperties.setProperty
+			(PolicyProperties.PROPERTY_HTTP_CLIENT_SERVICES + "." + "PDP" + PolicyProperties.PROPERTY_MANAGED_SUFFIX, 
+			 "true");
+		
+		ArrayList<HttpServletServer> servers = HttpServletServer.factory.build(httpProperties);
+		assertTrue(servers.size() == 2);
+		
+		ArrayList<HttpClient> clients = HttpClient.factory.build(httpProperties);
+		assertTrue(clients.size() == 2);
+		
+		for (HttpServletServer server: servers) {
+			server.waitedStart(5000);
+		}
+		
+		HttpClient clientPAP = HttpClient.factory.get("PAP");
+		Response response = clientPAP.get();
+		assertTrue(response.getStatus() == 200);
+
+		HttpClient clientPDP = HttpClient.factory.get("PDP");
+		Response response2 = clientPDP.get("test");
+		assertTrue(response2.getStatus() == 500);
+
+		HttpServletServer.factory.destroy();
+		HttpClient.factory.destroy();    	
+    }
+
+
+}
diff --git a/policy-endpoints/src/test/java/org/openecomp/policy/drools/http/server/test/HttpServerTest.java b/policy-endpoints/src/test/java/org/openecomp/policy/drools/http/server/test/HttpServerTest.java
new file mode 100644
index 0000000..94f2980
--- /dev/null
+++ b/policy-endpoints/src/test/java/org/openecomp/policy/drools/http/server/test/HttpServerTest.java
@@ -0,0 +1,181 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * policy-endpoints
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.policy.drools.http.server.test;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.util.UUID;
+
+import org.junit.Test;
+import org.openecomp.policy.drools.http.server.HttpServletServer;
+
+/**
+ *
+ */
+public class HttpServerTest {
+
+	@Test
+	public void testSingleServer() throws Exception {
+		System.out.println("-- testSingleServer() --");
+		
+		HttpServletServer server = HttpServletServer.factory.build("echo", "localhost", 5678, "/", true);
+		server.addServletPackage("/*", this.getClass().getPackage().getName());
+		server.waitedStart(5000);
+		
+		assertTrue(HttpServletServer.factory.get(5678).isAlive());
+		
+		String echo = "hello";
+		URL url = new URL("http://localhost:5678/junit/echo/" + echo);
+		String response = response(url);
+		System.out.println("Received .. " + response);
+		assertTrue(response.equals(echo));
+		
+		HttpServletServer.factory.destroy();
+		assertTrue(HttpServletServer.factory.inventory().size() == 0);
+	}
+	
+	@Test
+	public void testMultipleServers() throws Exception {
+		System.out.println("-- testMultipleServers() --");
+		
+		HttpServletServer server1 = HttpServletServer.factory.build("echo-1", "localhost", 5678, "/", true);
+		server1.addServletPackage("/*", this.getClass().getPackage().getName());
+		server1.waitedStart(5000);
+		
+		HttpServletServer server2 = HttpServletServer.factory.build("echo-2", "localhost", 5679, "/", true);
+		server2.addServletPackage("/*", this.getClass().getPackage().getName());
+		server2.waitedStart(5000);
+		
+		assertTrue(HttpServletServer.factory.get(5678).isAlive());
+		assertTrue(HttpServletServer.factory.get(5679).isAlive());
+		
+		String echo = "hello";
+		
+		URL url1 = new URL("http://localhost:5678/junit/echo/" + echo);
+		String response1 = response(url1);
+		System.out.println("Received .. " + response1);
+		assertTrue(response1.equals(echo));
+		
+		URL url2 = new URL("http://localhost:5679/junit/echo/" + echo);
+		String response2 = response(url2);
+		System.out.println("Received .. " + response2);
+		assertTrue(response2.equals(echo));
+		
+		HttpServletServer.factory.destroy();		
+		assertTrue(HttpServletServer.factory.inventory().size() == 0);
+	}
+	
+	@Test
+	public void testMultiServicePackage() throws Exception {
+		System.out.println("-- testMultiServicePackage() --");
+		
+		String randomName = UUID.randomUUID().toString();
+		
+		HttpServletServer server = HttpServletServer.factory.build(randomName, "localhost", 5678, "/", true);
+		server.addServletPackage("/*", this.getClass().getPackage().getName());
+		server.waitedStart(5000);
+		
+		assertTrue(HttpServletServer.factory.get(5678).isAlive());
+		
+		String echo = "hello";
+		URL urlService1 = new URL("http://localhost:5678/junit/echo/" + echo);
+		String responseService1 = response(urlService1);
+		System.out.println("Received .. " + responseService1);
+		assertTrue(responseService1.equals(echo));
+		
+		URL urlService2 = new URL("http://localhost:5678/junit/endpoints/http/servers");
+		String responseService2 = response(urlService2);
+		System.out.println("Received .. " + responseService2);
+		assertTrue(responseService2.contains(randomName));
+		
+		HttpServletServer.factory.destroy();		
+		assertTrue(HttpServletServer.factory.inventory().size() == 0);
+	}
+	
+	@Test
+	public void testServiceClass() throws Exception {
+		System.out.println("-- testServiceClass() --");
+		String randomName = UUID.randomUUID().toString();
+		
+		HttpServletServer server = HttpServletServer.factory.build(randomName, "localhost", 5678, "/", true);
+		server.addServletClass("/*", RestEchoService.class.getCanonicalName());
+		server.waitedStart(5000);
+		
+		assertTrue(HttpServletServer.factory.get(5678).isAlive());
+		
+		String echo = "hello";
+		URL urlService1 = new URL("http://localhost:5678/junit/echo/" + echo);
+		String responseService1 = response(urlService1);
+		System.out.println("Received .. " + responseService1);
+		assertTrue(responseService1.equals(echo));
+		
+		HttpServletServer.factory.destroy();		
+		assertTrue(HttpServletServer.factory.inventory().size() == 0);
+	}
+	
+	@Test
+	public void testMultiServiceClass() throws Exception {
+		System.out.println("-- testMultiServiceClass() --");
+		
+		String randomName = UUID.randomUUID().toString();
+		
+		HttpServletServer server = HttpServletServer.factory.build(randomName, "localhost", 5678, "/", true);
+		server.addServletClass("/*", RestEchoService.class.getCanonicalName());
+		server.addServletClass("/*", RestEndpoints.class.getCanonicalName());
+		server.waitedStart(5000);
+		
+		assertTrue(HttpServletServer.factory.get(5678).isAlive());
+		
+		String echo = "hello";
+		URL urlService1 = new URL("http://localhost:5678/junit/echo/" + echo);
+		String responseService1 = response(urlService1);
+		System.out.println("Received .. " + responseService1);
+		assertTrue(responseService1.equals(echo));
+		
+		URL urlService2 = new URL("http://localhost:5678/junit/endpoints/http/servers");
+		String responseService2 = response(urlService2);
+		System.out.println("Received .. " + responseService2);
+		assertTrue(responseService2.contains(randomName));
+		
+		HttpServletServer.factory.destroy();		
+		assertTrue(HttpServletServer.factory.inventory().size() == 0);
+	}
+
+	/**
+	 * @param url
+	 * @throws IOException
+	 */
+	protected String response(URL url) throws IOException {
+		BufferedReader ioReader = new BufferedReader(new InputStreamReader(url.openStream()));
+		String response = "";
+		String line;
+		while ((line = ioReader.readLine()) != null) {
+			response += line; 
+		}
+		return response;
+	}
+	
+	
+
+}
diff --git a/policy-endpoints/src/test/java/org/openecomp/policy/drools/http/server/test/RestEchoService.java b/policy-endpoints/src/test/java/org/openecomp/policy/drools/http/server/test/RestEchoService.java
new file mode 100644
index 0000000..a0320a0
--- /dev/null
+++ b/policy-endpoints/src/test/java/org/openecomp/policy/drools/http/server/test/RestEchoService.java
@@ -0,0 +1,19 @@
+package org.openecomp.policy.drools.http.server.test;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+@Path("/junit/echo")
+public class RestEchoService {
+	
+    @GET
+    @Path("{word}")
+    @Produces(MediaType.TEXT_PLAIN)
+    public String echo(@PathParam("word") String word) {   
+    	return word;
+    }
+
+}
diff --git a/policy-endpoints/src/test/java/org/openecomp/policy/drools/http/server/test/RestEndpoints.java b/policy-endpoints/src/test/java/org/openecomp/policy/drools/http/server/test/RestEndpoints.java
new file mode 100644
index 0000000..a00f2ff
--- /dev/null
+++ b/policy-endpoints/src/test/java/org/openecomp/policy/drools/http/server/test/RestEndpoints.java
@@ -0,0 +1,25 @@
+package org.openecomp.policy.drools.http.server.test;
+
+import java.util.List;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+import org.openecomp.policy.drools.http.server.HttpServletServer;
+
+@Path("/junit/endpoints")
+public class RestEndpoints {
+
+    @GET
+    @Path("http/servers")
+    @Produces(MediaType.TEXT_PLAIN)
+    public String httpServers() {   
+    	List<HttpServletServer> servers = 
+    			HttpServletServer.factory.inventory();
+    	return servers.toString();
+    }
+    
+    
+}
diff --git a/policy-endpoints/src/test/java/org/openecomp/policy/drools/http/server/test/RestMockHealthCheck.java b/policy-endpoints/src/test/java/org/openecomp/policy/drools/http/server/test/RestMockHealthCheck.java
new file mode 100644
index 0000000..ff1b498
--- /dev/null
+++ b/policy-endpoints/src/test/java/org/openecomp/policy/drools/http/server/test/RestMockHealthCheck.java
@@ -0,0 +1,28 @@
+package org.openecomp.policy.drools.http.server.test;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
+@Path("/")
+public class RestMockHealthCheck {
+	
+    @GET
+    @Path("pap/test")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response papHealthCheck() {   
+		return Response.status(Status.OK).entity("All Alive").build();
+    }
+    
+    @GET
+    @Path("pdp/test")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response pdpHealthCheck() {   
+		return Response.status(Status.INTERNAL_SERVER_ERROR).entity("At least some Dead").build();
+    }
+
+
+}