Support server-side filtering for UEB

Cambria client supports server-side filtering.  Enhanced the TopicSource
classes that use cambria client to support server-side filtering.
Updated license date in one of the files.

Change-Id: Ia56d73c7a5f3ab960418709c1ea7f1e73aa4ba87
Issue-ID: POLICY-577
Signed-off-by: Jim Hahn <jrh3@att.com>
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/FilterableTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/FilterableTopicSource.java
new file mode 100644
index 0000000..b1e0e1c
--- /dev/null
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/FilterableTopicSource.java
@@ -0,0 +1,39 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.drools.event.comm;
+
+/**
+ * TopicSource that supports server-side filtering.
+ */
+public interface FilterableTopicSource extends TopicSource {
+
+    /**
+     * Sets the server-side filter.
+     * 
+     * @param filter new filter value, or {@code null}
+     * @throws UnsupportedOperationException if the consumer does not support
+     *         server-side filtering
+     * @throws IllegalArgumentException if the consumer cannot be built with the
+     *         new filter
+     */
+    public void setFilter(String filter);
+
+}
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
index 984baa7..db240b3 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/BusConsumer.java
@@ -61,19 +61,39 @@
   public void close();
 
   /**
+   * BusConsumer that supports server-side filtering.
+   */
+  public interface FilterableBusConsumer extends BusConsumer {
+
+        /**
+         * Sets the server-side filter.
+         * 
+         * @param filter new filter value, or {@code null}
+         * @throws IllegalArgumentException if the consumer cannot be built with
+         *         the new filter
+         */
+        public void setFilter(String filter);
+  }
+
+  /**
    * Cambria based consumer
    */
-  public static class CambriaConsumerWrapper implements BusConsumer {
+  public static class CambriaConsumerWrapper implements FilterableBusConsumer {
 
     /**
      * logger
      */
     private static Logger logger = LoggerFactory.getLogger(CambriaConsumerWrapper.class);
+    
+    /**
+     * Used to build the consumer.
+     */
+    private final ConsumerBuilder builder;
 
     /**
      * Cambria client
      */
-    protected CambriaConsumer consumer;
+    protected volatile CambriaConsumer consumer;
 
     /**
      * fetch timeout
@@ -105,7 +125,7 @@
 
       this.fetchTimeout = fetchTimeout;
 
-      final ConsumerBuilder builder = new CambriaClientBuilders.ConsumerBuilder();
+      this.builder = new CambriaClientBuilders.ConsumerBuilder();
 
       if (useHttps) {
 
@@ -158,6 +178,19 @@
     }
 
     @Override
+    public void setFilter(String filter) {
+        logger.info("{}: setting DMAAP server-side filter: {}", this, filter);
+        builder.withServerSideFilter(filter);
+
+        try {
+            consumer = builder.build();
+            
+        } catch (MalformedURLException | GeneralSecurityException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    @Override
     public String toString() {
       final StringBuilder builder = new StringBuilder();
       builder.append("CambriaConsumerWrapper [fetchTimeout=").append(fetchTimeout).append("]");
diff --git a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
index b7df8ca..9b2be6a 100644
--- a/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
+++ b/policy-endpoints/src/main/java/org/onap/policy/drools/event/comm/bus/internal/SingleThreadedBusTopicSource.java
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * policy-endpoints
  * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -27,9 +27,10 @@
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import org.onap.policy.drools.event.comm.FilterableTopicSource;
 import org.onap.policy.drools.event.comm.TopicListener;
 import org.onap.policy.drools.event.comm.bus.BusTopicSource;
+import org.onap.policy.drools.event.comm.bus.internal.BusConsumer.FilterableBusConsumer;
 
 /**
  * This topic source implementation specializes in reading messages
@@ -37,7 +38,7 @@
  */
 public abstract class SingleThreadedBusTopicSource 
        extends BusTopicBase
-       implements Runnable, BusTopicSource {
+       implements Runnable, BusTopicSource, FilterableTopicSource {
 	   
 	/**
 	 * Not to be converted to PolicyLogger.
@@ -286,6 +287,16 @@
 	
 
 	@Override
+    public void setFilter(String filter) {
+	    if(consumer instanceof FilterableBusConsumer) {
+	        ((FilterableBusConsumer) consumer).setFilter(filter);
+	        
+	    } else {
+	        throw new UnsupportedOperationException("no server-side filtering for topic " + topic);
+	    }
+    }
+
+    @Override
 	public String toString() {
 		StringBuilder builder = new StringBuilder();
 		builder.append("SingleThreadedBusTopicSource [consumerGroup=").append(consumerGroup)