Merge "Allow storing REST servers password and userName in environment variables"
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
new file mode 100644
index 0000000..a1e0315
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
@@ -0,0 +1,109 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.client;
+
+import java.util.Arrays;
+import java.util.List;
+import lombok.Getter;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+
+/**
+ * A "bidirectional" topic, which is a pair of topics, one of which is used to publish
+ * requests and the other to receive responses.
+ */
+@Getter
+public class BidirectionalTopicClient {
+    private final String sinkTopic;
+    private final String sourceTopic;
+    private final TopicSinkClient sinkClient;
+    private final TopicSource source;
+    private final CommInfrastructure sinkTopicCommInfrastructure;
+    private final CommInfrastructure sourceTopicCommInfrastructure;
+
+    /**
+     * Constructs the object.
+     *
+     * @param sinkTopic sink topic name
+     * @param sourceTopic source topic name
+     * @throws BidirectionalTopicClientException if either topic does not exist
+     */
+    public BidirectionalTopicClient(String sinkTopic, String sourceTopic) throws BidirectionalTopicClientException {
+        this.sinkTopic = sinkTopic;
+        this.sourceTopic = sourceTopic;
+
+        // init sinkClient
+        try {
+            // if the manager is overridden here, then override it in the sink client, too
+            this.sinkClient = new TopicSinkClient(sinkTopic) {
+                @Override
+                protected List<TopicSink> getTopicSinks(String topic) {
+                    return BidirectionalTopicClient.this.getTopicEndpointManager().getTopicSinks(topic);
+                }
+            };
+        } catch (TopicSinkClientException e) {
+            throw new BidirectionalTopicClientException(e);
+        }
+
+        // init source
+        List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Arrays.asList(sourceTopic));
+        if (sources.isEmpty()) {
+            throw new BidirectionalTopicClientException("no sources for topic: " + sourceTopic);
+        } else if (sources.size() > 1) {
+            throw new BidirectionalTopicClientException("too many sources for topic: " + sourceTopic);
+        }
+
+        this.source = sources.get(0);
+
+        this.sinkTopicCommInfrastructure = sinkClient.getSink().getTopicCommInfrastructure();
+        this.sourceTopicCommInfrastructure = source.getTopicCommInfrastructure();
+    }
+
+    public TopicSink getSink() {
+        return sinkClient.getSink();
+    }
+
+    public boolean send(Object message) {
+        return sinkClient.send(message);
+    }
+
+    public void register(TopicListener topicListener) {
+        source.register(topicListener);
+    }
+
+    public boolean offer(String event) {
+        return source.offer(event);
+    }
+
+    public void unregister(TopicListener topicListener) {
+        source.unregister(topicListener);
+    }
+
+    // these may be overridden by junit tests
+
+    protected TopicEndpoint getTopicEndpointManager() {
+        return TopicEndpointManager.getManager();
+    }
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientException.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientException.java
new file mode 100644
index 0000000..3a5d727
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientException.java
@@ -0,0 +1,49 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.client;
+
+/**
+ * Exception thrown by BidirectionalTopicClient class.
+ */
+public class BidirectionalTopicClientException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public BidirectionalTopicClientException() {
+        super();
+    }
+
+    public BidirectionalTopicClientException(String message) {
+        super(message);
+    }
+
+    public BidirectionalTopicClientException(Throwable cause) {
+        super(cause);
+    }
+
+    public BidirectionalTopicClientException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public BidirectionalTopicClientException(String message, Throwable cause, boolean enableSuppression,
+                    boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java
new file mode 100644
index 0000000..9b1018d
--- /dev/null
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java
@@ -0,0 +1,191 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.endpoints.event.comm.client;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpoint;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicListener;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+
+public class BidirectionalTopicClientTest {
+    private static final String SINK_TOPIC = "my-sink-topic";
+    private static final String SOURCE_TOPIC = "my-source-topic";
+
+    private static final CommInfrastructure SINK_INFRA = CommInfrastructure.UEB;
+    private static final CommInfrastructure SOURCE_INFRA = CommInfrastructure.NOOP;
+
+    @Mock
+    private TopicSink sink;
+    @Mock
+    private TopicSource source;
+    @Mock
+    private TopicEndpoint endpoint;
+    @Mock
+    private TopicListener listener;
+
+    private BidirectionalTopicClient client;
+
+    /**
+     * Configures the endpoints.
+     */
+    @BeforeClass
+    public static void setUpBeforeClass() {
+        Properties props = new Properties();
+        props.setProperty("noop.sink.topics", SINK_TOPIC);
+        props.setProperty("noop.source.topics", SOURCE_TOPIC);
+
+        // clear all topics and then configure one sink and one source
+        TopicEndpointManager.getManager().shutdown();
+        TopicEndpointManager.getManager().addTopicSinks(props);
+        TopicEndpointManager.getManager().addTopicSources(props);
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() {
+        // clear all topics after the tests
+        TopicEndpointManager.getManager().shutdown();
+    }
+
+    /**
+     * Creates mocks and an initial client object.
+     */
+    @Before
+    public void setUp() throws Exception {
+        MockitoAnnotations.initMocks(this);
+
+        when(sink.send(anyString())).thenReturn(true);
+        when(sink.getTopicCommInfrastructure()).thenReturn(SINK_INFRA);
+
+        when(source.offer(anyString())).thenReturn(true);
+        when(source.getTopicCommInfrastructure()).thenReturn(SOURCE_INFRA);
+
+        when(endpoint.getTopicSinks(anyString())).thenReturn(Arrays.asList());
+        when(endpoint.getTopicSinks(SINK_TOPIC)).thenReturn(Arrays.asList(sink));
+
+        when(endpoint.getTopicSources(any())).thenReturn(Arrays.asList());
+        when(endpoint.getTopicSources(eq(Arrays.asList(SOURCE_TOPIC)))).thenReturn(Arrays.asList(source));
+
+        client = new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC);
+    }
+
+    @Test
+    public void testBidirectionalTopicClient_testGetters() {
+        assertNotNull(client.getSinkClient());
+        assertSame(sink, client.getSink());
+        assertSame(source, client.getSource());
+        assertEquals(SINK_TOPIC, client.getSinkTopic());
+        assertEquals(SOURCE_TOPIC, client.getSourceTopic());
+        assertEquals(SINK_INFRA, client.getSinkTopicCommInfrastructure());
+        assertEquals(SOURCE_INFRA, client.getSourceTopicCommInfrastructure());
+    }
+
+    /**
+     * Tests the constructor when the sink or source cannot be found.
+     */
+    @Test
+    public void testBidirectionalTopicClientExceptions() {
+        assertThatThrownBy(() -> new BidirectionalTopicClient2("unknown-sink", SOURCE_TOPIC))
+                        .isInstanceOf(BidirectionalTopicClientException.class)
+                        .hasCauseInstanceOf(TopicSinkClientException.class);
+
+        assertThatThrownBy(() -> new BidirectionalTopicClient2(SINK_TOPIC, "unknown-source"))
+                        .isInstanceOf(BidirectionalTopicClientException.class)
+                        .hasMessage("no sources for topic: unknown-source");
+
+        // too many sources
+        when(endpoint.getTopicSources(eq(Arrays.asList(SOURCE_TOPIC)))).thenReturn(Arrays.asList(source, source));
+
+        assertThatThrownBy(() -> new BidirectionalTopicClient2(SINK_TOPIC, SOURCE_TOPIC))
+                        .isInstanceOf(BidirectionalTopicClientException.class)
+                        .hasMessage("too many sources for topic: my-source-topic");
+    }
+
+    /**
+     * Tests the "delegate" methods.
+     */
+    @Test
+    public void testDelegates() {
+        assertTrue(client.send(Map.of("outgoing", "outgoing-text")));
+        verify(sink).send("{\"outgoing\":\"outgoing-text\"}");
+
+        assertTrue(client.offer("incoming"));
+        verify(source).offer("incoming");
+
+        client.register(listener);
+        verify(source).register(listener);
+
+        client.unregister(listener);
+        verify(source).unregister(listener);
+    }
+
+    @Test
+    public void testGetTopicEndpointManager() throws BidirectionalTopicClientException {
+        // use a real manager
+        client = new BidirectionalTopicClient(SINK_TOPIC, SOURCE_TOPIC);
+        assertNotNull(client.getTopicEndpointManager());
+
+        assertNotNull(client.getSink());
+        assertNotNull(client.getSource());
+
+        assertNotSame(sink, client.getSink());
+        assertNotSame(source, client.getSource());
+    }
+
+
+    /**
+     * BidirectionalTopicClient with some overrides.
+     */
+    private class BidirectionalTopicClient2 extends BidirectionalTopicClient {
+
+        public BidirectionalTopicClient2(String sinkTopic, String sourceTopic)
+                        throws BidirectionalTopicClientException {
+            super(sinkTopic, sourceTopic);
+        }
+
+        @Override
+        protected TopicEndpoint getTopicEndpointManager() {
+            return endpoint;
+        }
+    }
+}
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientExceptionTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicClientExceptionTest.java
similarity index 85%
rename from policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientExceptionTest.java
rename to policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicClientExceptionTest.java
index c081470..7b64a20 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicSinkClientExceptionTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/TopicClientExceptionTest.java
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * ONAP PAP
  * ================================================================================
- * Copyright (C) 2019 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2019-2020 AT&T Intellectual Property. All rights reserved.
  * Modifications Copyright (C) 2019 Nordix Foundation.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -26,10 +26,11 @@
 import org.junit.Test;
 import org.onap.policy.common.utils.test.ExceptionsTester;
 
-public class TopicSinkClientExceptionTest {
+public class TopicClientExceptionTest {
 
     @Test
     public void test() {
         assertEquals(5, new ExceptionsTester().test(TopicSinkClientException.class));
+        assertEquals(5, new ExceptionsTester().test(BidirectionalTopicClientException.class));
     }
 }
diff --git a/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoExecutor.java b/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoExecutor.java
new file mode 100644
index 0000000..d8b792a
--- /dev/null
+++ b/utils-test/src/main/java/org/onap/policy/common/utils/time/PseudoExecutor.java
@@ -0,0 +1,70 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.Executor;
+import lombok.Getter;
+
+/**
+ * Executor that will run tasks until the queue is empty or a maximum number of tasks have
+ * been executed. Doesn't actually run anything until {@link #runAll()} is invoked.
+ */
+public class PseudoExecutor implements Executor {
+
+    /**
+     * Tasks to be run.
+     */
+    @Getter
+    private final Queue<Runnable> tasks = new LinkedList<>();
+
+
+    /**
+     * Gets the queue length.
+     *
+     * @return the queue length
+     */
+    public int getQueueLength() {
+        return tasks.size();
+    }
+
+    @Override
+    public void execute(Runnable command) {
+        tasks.add(command);
+    }
+
+    /**
+     * Runs all tasks until the queue is empty or the maximum number of tasks have been
+     * reached.
+     *
+     * @param maxTasks maximum number of tasks to run
+     * @return {@code true} if the queue is empty, {@code false} if the maximum number of
+     *         tasks have been reached before the queue was emptied
+     */
+    public boolean runAll(int maxTasks) {
+        for (int count = 0; count < maxTasks && !tasks.isEmpty(); ++count) {
+            tasks.remove().run();
+        }
+
+        return tasks.isEmpty();
+    }
+}
diff --git a/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoExecutorTest.java b/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoExecutorTest.java
new file mode 100644
index 0000000..0046f79
--- /dev/null
+++ b/utils-test/src/test/java/org/onap/policy/common/utils/time/PseudoExecutorTest.java
@@ -0,0 +1,60 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.common.utils.time;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class PseudoExecutorTest {
+    private int invoked;
+    private PseudoExecutor executor;
+
+    @Before
+    public void setUp() {
+        invoked = 0;
+        executor = new PseudoExecutor();
+    }
+
+    @Test
+    public void test() {
+        assertEquals(0, executor.getQueueLength());
+        assertEquals(0, executor.getTasks().size());
+        assertTrue(executor.runAll(0));
+
+        executor.execute(() -> invoked++);
+        executor.execute(() -> invoked++);
+        executor.execute(() -> invoked++);
+        assertEquals(3, executor.getTasks().size());
+        assertEquals(3, executor.getQueueLength());
+
+        assertFalse(executor.runAll(2));
+        assertEquals(2, invoked);
+        assertEquals(1, executor.getQueueLength());
+
+        assertTrue(executor.runAll(2));
+        assertEquals(3, invoked);
+        assertEquals(0, executor.getQueueLength());
+    }
+}