BidirectionalTopic should use plain TopicSink

BidirectionalTopicClient use plain TopicSink instead of
TopicSinkClient, because the latter encodes its message, while
BidirectionalTopicClient should not, because encoding should
be left up to the user of the class.

Issue-ID: POLICY-1625
Signed-off-by: Jim Hahn <jrh3@att.com>
Change-Id: I6c67e1ee0c56e96a0efcc90eaf1c0a940902e8b3
diff --git a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
index a1e0315..57a331c 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClient.java
@@ -38,7 +38,7 @@
 public class BidirectionalTopicClient {
     private final String sinkTopic;
     private final String sourceTopic;
-    private final TopicSinkClient sinkClient;
+    private final TopicSink sink;
     private final TopicSource source;
     private final CommInfrastructure sinkTopicCommInfrastructure;
     private final CommInfrastructure sourceTopicCommInfrastructure;
@@ -55,18 +55,15 @@
         this.sourceTopic = sourceTopic;
 
         // init sinkClient
-        try {
-            // if the manager is overridden here, then override it in the sink client, too
-            this.sinkClient = new TopicSinkClient(sinkTopic) {
-                @Override
-                protected List<TopicSink> getTopicSinks(String topic) {
-                    return BidirectionalTopicClient.this.getTopicEndpointManager().getTopicSinks(topic);
-                }
-            };
-        } catch (TopicSinkClientException e) {
-            throw new BidirectionalTopicClientException(e);
+        List<TopicSink> sinks = getTopicEndpointManager().getTopicSinks(sinkTopic);
+        if (sinks.isEmpty()) {
+            throw new BidirectionalTopicClientException("no sinks for topic: " + sinkTopic);
+        } else if (sinks.size() > 1) {
+            throw new BidirectionalTopicClientException("too many sinks for topic: " + sinkTopic);
         }
 
+        this.sink = sinks.get(0);
+
         // init source
         List<TopicSource> sources = getTopicEndpointManager().getTopicSources(Arrays.asList(sourceTopic));
         if (sources.isEmpty()) {
@@ -77,16 +74,12 @@
 
         this.source = sources.get(0);
 
-        this.sinkTopicCommInfrastructure = sinkClient.getSink().getTopicCommInfrastructure();
+        this.sinkTopicCommInfrastructure = sink.getTopicCommInfrastructure();
         this.sourceTopicCommInfrastructure = source.getTopicCommInfrastructure();
     }
 
-    public TopicSink getSink() {
-        return sinkClient.getSink();
-    }
-
-    public boolean send(Object message) {
-        return sinkClient.send(message);
+    public boolean send(String message) {
+        return sink.send(message);
     }
 
     public void register(TopicListener topicListener) {
diff --git a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java
index 9b1018d..3f0a002 100644
--- a/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java
+++ b/policy-endpoints/src/test/java/org/onap/policy/common/endpoints/event/comm/client/BidirectionalTopicClientTest.java
@@ -33,7 +33,6 @@
 import static org.mockito.Mockito.when;
 
 import java.util.Arrays;
-import java.util.Map;
 import java.util.Properties;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -111,7 +110,6 @@
 
     @Test
     public void testBidirectionalTopicClient_testGetters() {
-        assertNotNull(client.getSinkClient());
         assertSame(sink, client.getSink());
         assertSame(source, client.getSource());
         assertEquals(SINK_TOPIC, client.getSinkTopic());
@@ -127,7 +125,7 @@
     public void testBidirectionalTopicClientExceptions() {
         assertThatThrownBy(() -> new BidirectionalTopicClient2("unknown-sink", SOURCE_TOPIC))
                         .isInstanceOf(BidirectionalTopicClientException.class)
-                        .hasCauseInstanceOf(TopicSinkClientException.class);
+                        .hasMessage("no sinks for topic: unknown-sink");
 
         assertThatThrownBy(() -> new BidirectionalTopicClient2(SINK_TOPIC, "unknown-source"))
                         .isInstanceOf(BidirectionalTopicClientException.class)
@@ -146,8 +144,8 @@
      */
     @Test
     public void testDelegates() {
-        assertTrue(client.send(Map.of("outgoing", "outgoing-text")));
-        verify(sink).send("{\"outgoing\":\"outgoing-text\"}");
+        assertTrue(client.send("hello"));
+        verify(sink).send("hello");
 
         assertTrue(client.offer("incoming"));
         verify(source).offer("incoming");