Add a DMaaP simulator

Added a DMaaP simulator for testing applications.  One current
limitation is that the simulator does not support multiple "subscribers"
on the same topic; if someone gets a message, that message cannot be
subsequently retrieved by anyone else.  The simulator has also not been
tested for concurrent getting and posting.

Also added a way to set the response code DMaaP would return for a get
to test that policy can gracefully handle errors.

It may need some work to become truely its own "feature"

Issue-ID: POLICY-489
Change-Id: I524981bdf5e4e825f13e6197dda11d9498e4f4bf
Signed-off-by: Charles Cole <>
diff --git a/feature-simulators/pom.xml b/feature-simulators/pom.xml
new file mode 100644
index 0000000..767739b
--- /dev/null
+++ b/feature-simulators/pom.xml
@@ -0,0 +1,53 @@
+  ============LICENSE_START=======================================================
+  ONAP Policy Engine - Drools PDP
+  ================================================================================
+  Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+  ================================================================================
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  ============LICENSE_END=========================================================
+  -->
+<project xmlns="" xmlns:xsi="" 
+         xsi:schemaLocation="">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.onap.policy.drools-pdp</groupId>
+		<artifactId>drools-pdp</artifactId>
+		<version>1.2.0-SNAPSHOT</version>
+	</parent>
+	<artifactId>feature-simulators</artifactId>
+  <name>feature-simulators</name>
+  <properties>
+    <maven.compiler.source>1.8</maven.compiler.source>
+    <>1.8</>
+  </properties>
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.onap.policy.drools-pdp</groupId>
+      <artifactId>policy-endpoints</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
diff --git a/feature-simulators/src/main/java/org/onap/policy/drools/simulators/ b/feature-simulators/src/main/java/org/onap/policy/drools/simulators/
new file mode 100644
index 0000000..bdabc6e
--- /dev/null
+++ b/feature-simulators/src/main/java/org/onap/policy/drools/simulators/
@@ -0,0 +1,123 @@
+ * ============LICENSE_START=======================================================
+ * feature-simulators
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.policy.drools.simulators;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.BlockingQueue;
+import javax.servlet.http.HttpServletResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+public class DMaaPSimulatorJaxRs {
+	private static final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>();
+	private static final Logger logger = LoggerFactory.getLogger(DMaaPSimulatorJaxRs.class);
+	private static int responseCode = 200;
+	@GET
+	@Path("/{topicName}/{consumeGroup}/{consumerId}")
+	public String subscribe(@DefaultValue("0") @QueryParam("timeout") int timeout, @PathParam("topicName") String topicName,
+	        @Context final HttpServletResponse httpResponse) {
+	    int currentRespCode = responseCode;
+        httpResponse.setStatus(currentRespCode);
+        try {
+            httpResponse.flushBuffer();
+        } catch (IOException e) {
+            final Logger logger = LoggerFactory.getLogger(DMaaPSimulatorJaxRs.class);
+            logger.error("flushBuffer threw: ", e);
+            return "Got an error";
+        }
+	    if (currentRespCode < 200 || currentRespCode >= 300)
+	    {
+	        return "You got response code: " + currentRespCode;
+	    }
+		if (queues.containsKey(topicName)) {
+			BlockingQueue<String> queue = queues.get(topicName);
+			String response = "No Data";
+			try {
+				response = queue.poll(timeout, TimeUnit.MILLISECONDS);
+			} catch (InterruptedException e) {
+				logger.debug("error in DMaaP simulator", e);
+			}
+			if (response == null) {
+			    response = "No Data";
+			}
+			return response;
+		}
+		else if (timeout > 0) {
+			try {
+				Thread.sleep(timeout);
+				if (queues.containsKey(topicName)) {
+					BlockingQueue<String> queue = queues.get(topicName);
+					String response = queue.poll();
+					if (response == null) {
+					    response = "No Data";
+					}
+					return response;
+				}
+			} catch (InterruptedException e) {
+				logger.debug("error in DMaaP simulator", e);
+			}
+		}
+		return "No topic";
+	}
+	@Path("/{topicName}")
+	@Consumes(MediaType.TEXT_PLAIN)
+	public String publish(@PathParam("topicName") String topicName, String body) { 
+		if (queues.containsKey(topicName)) {
+			BlockingQueue<String> queue = queues.get(topicName);
+			queue.offer(body);
+		}
+		else {
+			BlockingQueue<String> queue = new LinkedBlockingQueue<>();
+			queue.offer(body);
+			queues.put(topicName, queue);
+		}
+		return "";
+	}
+	@Path("/setStatus")
+	public String setStatus(@QueryParam("statusCode") int statusCode) {
+	    responseCode = statusCode;
+	    return "Status code set";
+	}
diff --git a/feature-simulators/src/test/java/org/onap/policy/drools/simulators/ b/feature-simulators/src/test/java/org/onap/policy/drools/simulators/
new file mode 100644
index 0000000..415c520
--- /dev/null
+++ b/feature-simulators/src/test/java/org/onap/policy/drools/simulators/
@@ -0,0 +1,364 @@
+ * ============LICENSE_START=======================================================
+ * feature-simulators
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.policy.drools.simulators;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static;
+import java.nio.charset.StandardCharsets;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.onap.policy.drools.http.server.HttpServletServer;
+import org.onap.policy.drools.utils.LoggerUtil;
+import org.onap.policy.drools.utils.NetworkUtil;
+public class DMaaPSimulatorTest {
+	private static final int DMAAPSIM_SERVER_PORT = 6670;
+    @BeforeClass
+    public static void setUpSimulator() {
+        LoggerUtil.setLevel("ROOT", "INFO");
+        LoggerUtil.setLevel("org.eclipse.jetty", "WARN");
+        try {
+        	final HttpServletServer testServer ="dmaapSim",
+        			"localhost", DMAAPSIM_SERVER_PORT, "/", false, true);
+        	testServer.addServletClass("/*", DMaaPSimulatorJaxRs.class.getName());
+        	testServer.waitedStart(5000);
+        	if (!NetworkUtil.isTcpPortOpen("localhost", testServer.getPort(), 5, 10000L))
+        		throw new IllegalStateException("cannot connect to port " + testServer.getPort());
+        } catch (final Exception e) {
+        	fail(e.getMessage());
+        }
+    }
+    @AfterClass
+    public static void tearDownSimulator() {
+        HttpServletServer.factory.destroy();
+    }
+    @Test
+    public void testGetNoData() {
+        int timeout = 1000;
+        Pair <Integer, String> response = dmaapGet("myTopicNoData", timeout);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertEquals("No topic", response.b);
+    }
+    @Test
+    public void testSinglePost() {
+        String myTopic = "myTopicSinglePost";
+        String testData = "This is some test data";
+        Pair<Integer, String> response = dmaapPost(myTopic, testData);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertNotNull(response.b);
+        response = dmaapGet(myTopic, 1000);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertEquals(testData, response.b);
+    }
+    @Test
+    public void testOneTopicMultiPost() {
+        String[] data = {"data point 1", "data point 2", "something random"};
+        String myTopic = "myTopicMultiPost";
+        Pair<Integer, String> response = dmaapPost(myTopic, data[0]);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertNotNull(response.b);
+        response = dmaapPost(myTopic, data[1]);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertNotNull(response.b);
+        response = dmaapPost(myTopic, data[2]);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertNotNull(response.b);
+        response = dmaapGet(myTopic, 1000);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertEquals(data[0], response.b);
+        response = dmaapGet(myTopic, 1000);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertEquals(data[1], response.b);
+        response = dmaapGet(myTopic, 1000);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertEquals(data[2], response.b);
+    }
+    @Test
+    public void testMultiTopic() {
+        String[][] data = {{"Topic one message one", "Topic one message two"}, {"Topic two message one", "Topic two message two"}};
+        String[] topics = {"topic1", "topic2"};
+        Pair<Integer, String> response = dmaapPost(topics[0], data[0][0]);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertNotNull(response.b);
+        response = dmaapGet(topics[0], 1000);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertEquals(data[0][0], response.b);
+        response = dmaapGet(topics[1], 1000);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertEquals("No topic", response.b);
+        response = dmaapPost(topics[1], data[1][0]);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertNotNull(response.b);
+        response = dmaapPost(topics[1], data[1][1]);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertNotNull(response.b);
+        response = dmaapPost(topics[0], data[0][1]);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertNotNull(response.b);
+        response = dmaapGet(topics[1], 1000);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertEquals(data[1][0], response.b);
+        response = dmaapGet(topics[0], 1000);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertEquals(data[0][1], response.b);
+        response = dmaapGet(topics[1], 1000);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertEquals(data[1][1], response.b);
+        response = dmaapGet(topics[0], 1000);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertEquals("No Data", response.b);
+    }
+    @Test
+    public void testResponseCode() {
+        Pair<Integer, String> response = dmaapPost("myTopic", "myTopicData");
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertNotNull(response.b);
+        response = setStatus(503);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertNotNull(response.b);
+        response = dmaapGet("myTopic", 500);
+        assertNotNull(response);
+        assertEquals(503, response.a.intValue());
+        assertEquals("You got response code: 503", response.b);
+        response = setStatus(202);
+        assertNotNull(response);
+        assertNotNull(response.a);
+        assertNotNull(response.b);
+        response = dmaapGet("myTopic", 500);
+        assertNotNull(response);
+        assertEquals(202, response.a.intValue());
+        assertEquals("myTopicData", response.b);
+    }
+    private static Pair<Integer, String> dmaapGet (String topic, int timeout) {
+        return dmaapGet(topic, "1", "1", timeout);
+    }
+    private static Pair<Integer, String> dmaapGet (String topic, String consumerGroup, String consumerId, int timeout) {
+        String url = "http://localhost:" + DMAAPSIM_SERVER_PORT + "/events/" + topic + "/" + consumerGroup + "/" + consumerId + "?timeout=" + timeout;
+        try {
+        	URLConnection conn = new URL(url).openConnection();
+            HttpURLConnection httpConn = null;
+            if (conn instanceof HttpURLConnection) {
+            	httpConn = (HttpURLConnection) conn;
+            }
+            else {
+            	fail("connection not set up right");
+            }
+            httpConn.setRequestMethod("GET");
+            httpConn.connect();
+            String response = "";
+            try (BufferedReader connReader = new BufferedReader(new InputStreamReader(httpConn.getInputStream()))) {
+            	String line;
+            	while((line = connReader.readLine()) != null) {
+            		response += line;
+            	}
+            	httpConn.disconnect();
+            	return new Pair<Integer, String>(httpConn.getResponseCode(), response);
+            }
+            catch (IOException e) {
+            	if (e.getMessage().startsWith("Server returned HTTP response code")) {
+            		System.out.println("hi");
+            		BufferedReader connReader = new BufferedReader(new InputStreamReader(httpConn.getErrorStream()));
+            		String line;
+            		while((line = connReader.readLine()) != null) {
+                		response += line;
+                	}
+                	httpConn.disconnect();
+                	return new Pair<Integer, String>(httpConn.getResponseCode(), response);
+            	}
+            	else {
+            		fail("we got an exception: " + e);
+            	}
+            }
+        }
+        catch (Exception e) {
+        	fail("we got an exception" + e);
+        }
+        return null;
+    }
+    private static Pair<Integer, String> dmaapPost (String topic, String data) {
+        String url = "http://localhost:" + DMAAPSIM_SERVER_PORT + "/events/" + topic;
+        byte[] postData = data.getBytes(StandardCharsets.UTF_8);
+        try {
+        	URLConnection conn = new URL(url).openConnection();
+        	HttpURLConnection httpConn = null;
+            if (conn instanceof HttpURLConnection) {
+            	httpConn = (HttpURLConnection) conn;
+            }
+            else {
+            	fail("connection not set up right");
+            }
+            httpConn.setRequestMethod("POST");
+            httpConn.setDoOutput(true);
+            httpConn.setRequestProperty( "Content-Type", "text/plain");
+            httpConn.setRequestProperty("Content-Length", ""+postData.length);
+            httpConn.connect();
+            String response = "";
+            try (DataOutputStream connWriter = new DataOutputStream(httpConn.getOutputStream())) {
+            	connWriter.write(postData);
+            	connWriter.flush();
+            }
+            try (BufferedReader connReader = new BufferedReader(new InputStreamReader(httpConn.getInputStream()))) {
+            	String line;
+            	while((line = connReader.readLine()) != null) {
+            		response += line;
+            	}
+            	httpConn.disconnect();
+            	return new Pair<Integer, String>(httpConn.getResponseCode(), response);
+            }
+            catch (IOException e) {
+            	if (e.getMessage().startsWith("Server returned HTTP response code")) {
+            		System.out.println("hi");
+            		BufferedReader connReader = new BufferedReader(new InputStreamReader(httpConn.getErrorStream()));
+            		String line;
+            		while((line = connReader.readLine()) != null) {
+                		response += line;
+                	}
+                	httpConn.disconnect();
+                	return new Pair<Integer, String>(httpConn.getResponseCode(), response);
+            	}
+            	else {
+            		fail("we got an exception: " + e);
+            	}
+            }
+        }
+        catch (Exception e) {
+        	fail("we got an exception: " + e);
+        }
+        return null;
+    }
+    private static Pair<Integer, String> setStatus (int status) {
+        String url = "http://localhost:" + DMAAPSIM_SERVER_PORT + "/events/setStatus?statusCode=" + status;
+        try {
+        	URLConnection conn = new URL(url).openConnection();
+            HttpURLConnection httpConn = null;
+            if (conn instanceof HttpURLConnection) {
+            	httpConn = (HttpURLConnection) conn;
+            }
+            else {
+            	fail("connection not set up right");
+            }
+            httpConn.setRequestMethod("POST");
+            httpConn.connect();
+            String response = "";
+            try (BufferedReader connReader = new BufferedReader(new InputStreamReader(httpConn.getInputStream()))) {
+            	String line;
+            	while((line = connReader.readLine()) != null) {
+            		response += line;
+            	}
+            	httpConn.disconnect();
+            	return new Pair<Integer, String>(httpConn.getResponseCode(), response);
+            }
+            catch (IOException e) {
+            	if (e.getMessage().startsWith("Server returned HTTP response code")) {
+            		System.out.println("hi");
+            		BufferedReader connReader = new BufferedReader(new InputStreamReader(httpConn.getErrorStream()));
+            		String line;
+            		while((line = connReader.readLine()) != null) {
+                		response += line;
+                	}
+                	httpConn.disconnect();
+                	return new Pair<Integer, String>(httpConn.getResponseCode(), response);
+            	}
+            	else {
+            		fail("we got an exception: " + e);
+            	}
+            }
+        }
+        catch (Exception e) {
+        	fail("we got an exception" + e);
+        }
+        return null;
+    }
+    private static class Pair<A, B> {
+		public final A a;
+		public final B b;
+		public Pair(A a, B b) {
+			this.a = a;
+			this.b = b;
+		}
+	}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 8e25b61..44ae9b8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,6 +77,7 @@
+		<module>feature-simulators</module>