Implementation of a RESTCONF client that can establish SSE connections
to a RESTCONF server. This is for receiving notification events to
support closed-loop for IBNs.

Issue-ID: CCSDK-3595
Signed-off-by: Hesam Rahimi <>
Change-Id: I2a3be01b03d889b41d4608011436d8b587a621e5
diff --git a/plugins/restconf-client/provider/pom.xml b/plugins/restconf-client/provider/pom.xml
index ca1b284..e82690a 100755
--- a/plugins/restconf-client/provider/pom.xml
+++ b/plugins/restconf-client/provider/pom.xml
@@ -140,7 +140,17 @@
-            <scope>test</scope>
+            <version>2.27</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <version>2.6</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/
new file mode 100644
index 0000000..e0854f4
--- /dev/null
+++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/
@@ -0,0 +1,175 @@
+package org.onap.ccsdk.sli.plugins.restconfdiscovery;
+import org.apache.commons.lang3.StringUtils;
+import java.util.Objects;
+import java.util.Optional;
+public class DefaultRestSBDevice implements RestSBDevice{
+    private static final String REST = "rest";
+    private static final String COLON = ":";
+    private final String ip;
+    private final int port;
+    private final String username;
+    private final String password;
+    private boolean isActive;
+    private String protocol;
+    private String url;
+    private boolean isProxy;
+    private final Optional<String> testUrl;
+    private final Optional<String> manufacturer;
+    private final Optional<String> hwVersion;
+    private final Optional<String> swVersion;
+    public DefaultRestSBDevice(String ip, int port, String name, String password,
+                               String protocol, String url, boolean isActive) {
+        this(ip, port, name, password, protocol, url, isActive, "", "", "", "");
+    }
+    public DefaultRestSBDevice(String ip, int port, String name, String password,
+                               String protocol, String url, boolean isActive, String testUrl, String manufacturer,
+                               String hwVersion,
+                               String swVersion) {
+        Preconditions.checkNotNull(ip, "IP address cannot be null");
+        Preconditions.checkArgument(port > 0, "Port address cannot be negative");
+        Preconditions.checkNotNull(protocol, "protocol address cannot be null");
+        this.ip = ip;
+        this.port = port;
+        this.username = name;
+        this.password = StringUtils.isEmpty(password) ? null : password;
+        this.isActive = isActive;
+        this.protocol = protocol;
+        this.url = StringUtils.isEmpty(url) ? null : url;
+        this.manufacturer = StringUtils.isEmpty(manufacturer) ?
+                Optional.empty() : Optional.ofNullable(manufacturer);
+        this.hwVersion = StringUtils.isEmpty(hwVersion) ?
+                Optional.empty() : Optional.ofNullable(hwVersion);
+        this.swVersion = StringUtils.isEmpty(swVersion) ?
+                Optional.empty() : Optional.ofNullable(swVersion);
+        this.testUrl = StringUtils.isEmpty(testUrl) ?
+                Optional.empty() : Optional.ofNullable(testUrl);
+        if (this.manufacturer.isPresent()
+                && this.hwVersion.isPresent()
+                && this.swVersion.isPresent()) {
+            this.isProxy = true;
+        } else {
+            this.isProxy = false;
+        }
+    }
+    @Override
+    public String ip() {
+        return ip;
+    }
+    @Override
+    public int port() {
+        return port;
+    }
+    @Override
+    public String username() {
+        return username;
+    }
+    @Override
+    public String password() {
+        return password;
+    }
+    @Override
+    public DeviceId deviceId() {
+        try {
+            return DeviceId.deviceId(new URI(REST, ip + COLON + port, null));
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Cannot create deviceID " +
+                    REST + COLON + ip +
+                    COLON + port, e);
+        }
+    }
+    @Override
+    public void setActive(boolean active) {
+        isActive = active;
+    }
+    @Override
+    public boolean isActive() {
+        return isActive;
+    }
+    @Override
+    public String protocol() {
+        return protocol;
+    }
+    @Override
+    public String url() {
+        return url;
+    }
+    @Override
+    public boolean isProxy() {
+        return isProxy;
+    }
+    @Override
+    public Optional<String> testUrl() {
+        return testUrl;
+    }
+    @Override
+    public Optional<String> manufacturer() {
+        return manufacturer;
+    }
+    @Override
+    public Optional<String> hwVersion() {
+        return hwVersion;
+    }
+    @Override
+    public Optional<String> swVersion() {
+        return swVersion;
+    }
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                .omitNullValues()
+                .add("url", url)
+                .add("testUrl", testUrl)
+                .add("protocol", protocol)
+                .add("username", username)
+                .add("port", port)
+                .add("ip", ip)
+                .add("manufacturer", manufacturer.orElse(null))
+                .add("hwVersion", hwVersion.orElse(null))
+                .add("swVersion", swVersion.orElse(null))
+                .toString();
+    }
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (!(obj instanceof RestSBDevice)) {
+            return false;
+        }
+        RestSBDevice device = (RestSBDevice) obj;
+        return this.username.equals(device.username()) && this.ip.equals(device.ip()) &&
+                this.port == device.port();
+    }
+    @Override
+    public int hashCode() {
+        return Objects.hash(ip, port);
+    }
diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/
new file mode 100644
index 0000000..f0df024
--- /dev/null
+++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/
@@ -0,0 +1,86 @@
+package org.onap.ccsdk.sli.plugins.restconfdiscovery;
+import java.util.Objects;
+import static;
+public class DeviceId {
+    /**
+     * Represents either no device, or an unspecified device.
+     */
+    public static final DeviceId NONE = deviceId("none:none");
+    private static final int DEVICE_ID_MAX_LENGTH = 1024;
+    private final URI uri;
+    private final String str;
+    // Public construction is prohibited
+    private DeviceId(URI uri) {
+        this.uri = uri;
+        this.str = uri.toString().toLowerCase();
+    }
+    // Default constructor for serialization
+    protected DeviceId() {
+        this.uri = null;
+        this.str = null;
+    }
+    /**
+     * Creates a device id using the supplied URI.
+     *
+     * @param uri device URI
+     * @return DeviceId
+     */
+    public static DeviceId deviceId(URI uri) {
+        return new DeviceId(uri);
+    }
+    /**
+     * Creates a device id using the supplied URI string.
+     *
+     * @param string device URI string
+     * @return DeviceId
+     */
+    public static DeviceId deviceId(String string) {
+        checkArgument(string.length() <= DEVICE_ID_MAX_LENGTH,
+                "deviceId exceeds maximum length " + DEVICE_ID_MAX_LENGTH);
+        return deviceId(URI.create(string));
+    }
+    /**
+     * Returns the backing URI.
+     *
+     * @return backing URI
+     */
+    public URI uri() {
+        return uri;
+    }
+    @Override
+    public int hashCode() {
+        return str.hashCode();
+    }
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj instanceof DeviceId) {
+            final DeviceId that = (DeviceId) obj;
+            return this.getClass() == that.getClass() &&
+                    Objects.equals(this.str, that.str);
+        }
+        return false;
+    }
+    @Override
+    public String toString() {
+        return str;
+    }
diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/
new file mode 100644
index 0000000..aa36a39
--- /dev/null
+++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/
@@ -0,0 +1,93 @@
+package org.onap.ccsdk.sli.plugins.restconfdiscovery;
+import java.util.Map;
+ * Abstraction of a RESTCONF controller. Serves as a one stop shop for obtaining
+ * RESTCONF southbound devices and (un)register listeners.
+ */
+public interface RestConfSBController {
+    /**
+     * Returns all the devices known to this controller.
+     *
+     * @return map of devices
+     */
+    Map<DeviceId, RestSBDevice> getDevices();
+    /**
+     * Returns a device by node identifier.
+     *
+     * @param deviceInfo node identifier
+     * @return RestSBDevice rest device
+     */
+    RestSBDevice getDevice(DeviceId deviceInfo);
+    /**
+     * Returns a device by Ip and Port.
+     *
+     * @param ip device ip
+     * @param port device port
+     * @return RestSBDevice rest device
+     */
+    RestSBDevice getDevice(String ip, int port);
+    /**
+     * Adds a device to the device map.
+     *
+     * @param device to be added
+     */
+    void addDevice(RestSBDevice device);
+    /**
+     * Removes the device from the devices map.
+     *
+     * @param deviceId to be removed
+     */
+    void removeDevice(DeviceId deviceId);
+    /**
+     * This method is to be called by whoever is interested to receive
+     * Notifications from a specific device. It does a REST GET request
+     * with specified parameters to the device, and calls the provided
+     * callBackListener upon receiving notifications to notify the requester
+     * about notifications.
+     *
+     * @param device           device to make the request to
+     * @param request          url of the request
+     * @param mediaType        format to retrieve the content in
+     * @param callBackListener method to call when notifications arrives
+     */
+    void enableNotifications(DeviceId device, String request, String mediaType,
+                             RestconfNotificationEventListener callBackListener);
+    /**
+     * Registers a listener for notification events that occur to restconf
+     * devices.
+     *
+     * @param deviceId identifier of the device to which the listener is attached
+     * @param listener the listener to notify
+     */
+    void addNotificationListener(DeviceId deviceId,
+                                 RestconfNotificationEventListener listener);
+    /**
+     * Unregisters the listener for the device.
+     *
+     * @param deviceId identifier of the device for which the listener
+     *                 is to be removed
+     * @param listener listener to be removed
+     */
+    void removeNotificationListener(DeviceId deviceId,
+                                    RestconfNotificationEventListener listener);
+    /**
+     * Returns true if a listener has been installed to listen to RESTCONF
+     * notifications sent from a particular device.
+     *
+     * @param deviceId identifier of the device from which the notifications
+     *                 are generated
+     * @return true if listener is installed; false otherwise
+     */
+    boolean isNotificationEnabled(DeviceId deviceId);
diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/
new file mode 100644
index 0000000..206b02e
--- /dev/null
+++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/
@@ -0,0 +1,104 @@
+package org.onap.ccsdk.sli.plugins.restconfdiscovery;
+import java.util.Optional;
+public interface RestSBDevice {
+    /**
+     * Returns the ip of this device.
+     *
+     * @return ip
+     */
+    String ip();
+    /**
+     * Returns the password of this device.
+     *
+     * @return port
+     */
+    int port();
+    /**
+     * Returns the username of this device.
+     *
+     * @return username
+     */
+    String username();
+    /**
+     * Returns the password of this device.
+     *
+     * @return password
+     */
+    String password();
+    /**
+     * Returns the ONOS deviceID for this device.
+     *
+     * @return DeviceId
+     */
+    DeviceId deviceId();
+    /**
+     * Sets or unsets the state of the device.
+     *
+     * @param active boolean
+     */
+    void setActive(boolean active);
+    /**
+     * Returns the state of this device.
+     *
+     * @return state
+     */
+    boolean isActive();
+    /**
+     * Returns the protocol for the REST request, usually HTTP o HTTPS.
+     *
+     * @return protocol
+     */
+    String protocol();
+    /**
+     * Returns the url for the REST requests, to be used instead of IP and PORT.
+     *
+     * @return url
+     */
+    String url();
+    /**
+     * Returns the proxy state of this device
+     * (if true, the device is proxying multiple ONOS devices).
+     * @return proxy state
+     */
+    boolean isProxy();
+    /**
+     * Returns the url for the REST TEST requests.
+     *
+     * @return testUrl
+     */
+    Optional<String> testUrl();
+    /**
+     * The manufacturer of the rest device.
+     *
+     * @return the name of the manufacturer
+     */
+    Optional<String> manufacturer();
+    /**
+     * The hardware version of the rest device.
+     *
+     * @return the hardware version
+     */
+    Optional<String> hwVersion();
+    /**
+     * The software version of rest device.
+     *
+     * @return the software version.
+     */
+    Optional<String> swVersion();
diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/
index d6b93f7..bda6854 100644
--- a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/
+++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/
@@ -20,7 +20,12 @@
 package org.onap.ccsdk.sli.plugins.restconfdiscovery;
 import org.onap.ccsdk.sli.core.sli.SvcLogicContext;
 import org.onap.ccsdk.sli.core.sli.SvcLogicException;
@@ -29,6 +34,7 @@
 import org.onap.ccsdk.sli.plugins.restapicall.RestapiCallNode;
 import org.onap.ccsdk.sli.plugins.restconfapicall.RestconfApiCallNode;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@@ -36,40 +42,63 @@
+import java.util.HashSet;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.*;
+import static org.onap.ccsdk.sli.plugins.restapicall.JsonParser.convertToProperties;
 import static org.slf4j.LoggerFactory.getLogger;
  * Representation of a plugin to subscribe for notification and then
  * to handle the received notifications.
-public class RestconfDiscoveryNode implements SvcLogicDiscoveryPlugin {
+public class RestconfDiscoveryNode implements RestConfSBController, SvcLogicDiscoveryPlugin {
     private static final Logger log = getLogger(RestconfDiscoveryNode.class);
-    private ExecutorService executor = Executors.newCachedThreadPool();
-    private Map<String, PersistentConnection> runnableInfo = new ConcurrentHashMap<>();
-    private RestconfApiCallNode restconfApiCallNode;
-    private volatile Map<String, SubscriptionInfo> subscriptionInfoMap = new ConcurrentHashMap<>();
-    private volatile LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
+    private static final String ROOT_RESOURCE = "/restconf";
     private static final String SUBSCRIBER_ID = "subscriberId";
     private static final String RESPONSE_CODE = "response-code";
     private static final String RESPONSE_PREFIX = "responsePrefix";
     private static final String OUTPUT_IDENTIFIER = "ietf-subscribed-notif" +
+    private static final String OUTPUT_IDENTIFIER_NO_PREFIX = "output.identifier";
     private static final String RESPONSE_CODE_200 = "200";
     private static final String SSE_URL = "sseConnectURL";
+    private static final String REST_API_URL = "restapiUrl";
+    private static final String RESOURCE_PATH_PREFIX = "/data/";
+    private static final String NOTIFICATION_PATH_PREFIX = "/streams/";
+    private static final String DEVICE_IP = "deviceIp";
+    private static final String DEVICE_PORT = "devicePort";
+    private static final String DOUBLESLASH = "//";
+    private static final String COLON = ":";
+    private RestconfApiCallNode restconfApiCallNode;
+    private RestapiCallNode restapiCallNode = new RestapiCallNode();
+    private volatile Map<String, SubscriptionInfo> subscriptionInfoMap = new ConcurrentHashMap<>();
+    private volatile LinkedBlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
+    private Map<DeviceId, Set<RestconfNotificationEventListener>>
+            restconfNotificationListenerMap = new ConcurrentHashMap<>();
+    private Map<DeviceId, GetChunksRunnable>
+            runnableTable = new ConcurrentHashMap<>();
+    private Map<DeviceId, String> subscribedDevicesTable = new ConcurrentHashMap<>();
+    private Map<DeviceId, BlockingQueue<String>> eventQMap = new ConcurrentHashMap<>();
+    private Map<DeviceId, InternalRestconfEventProcessorRunnable>
+            processorRunnableTable = new ConcurrentHashMap<>();
+    private Map<String, PersistentConnection> runnableInfo = new ConcurrentHashMap<>();
+    private final Map<DeviceId, RestSBDevice> deviceMap = new ConcurrentHashMap<>();
+    private final Map<DeviceId, Client> clientMap = new ConcurrentHashMap<>();
+    private ExecutorService executor = Executors.newCachedThreadPool();
      * Creates an instance of RestconfDiscoveryNode and starts processing of
@@ -78,12 +107,89 @@
      * @param r restconf api call node
     public RestconfDiscoveryNode(RestconfApiCallNode r) {
+"inside RestconfDiscoveryNode Constructor");
         this.restconfApiCallNode = r;
-        ExecutorService e = Executors.newFixedThreadPool(20);
-        EventProcessor p = new EventProcessor(this);
-        for (int i = 0; i < 20; ++i) {
-            e.execute(p);
+        this.activate();
+//        ExecutorService e = Executors.newFixedThreadPool(20);
+//        EventProcessor p = new EventProcessor(this);
+//        for (int i = 0; i < 20; ++i) {
+//            e.execute(p);
+//        }
+    }
+    public void activate() {
+"RESTCONF SBI Started");
+    }
+    public void deactivate() {
+"RESTCONF SBI Stopped");
+        executor.shutdown();
+        this.getClientMap().clear();
+        this.getDeviceMap().clear();
+    }
+    public Map<DeviceId, RestSBDevice> getDeviceMap() {
+        return deviceMap;
+    }
+    public Map<DeviceId, Client> getClientMap() {
+        return clientMap;
+    }
+    @Override
+    public Map<DeviceId, RestSBDevice> getDevices() {
+        log.trace("RESTCONF SBI::getDevices");
+        return ImmutableMap.copyOf(deviceMap);
+    }
+    @Override
+    public RestSBDevice getDevice(DeviceId deviceInfo) {
+        log.trace("RESTCONF SBI::getDevice with deviceId");
+        return deviceMap.get(deviceInfo);
+    }
+    @Override
+    public RestSBDevice getDevice(String ip, int port) {
+        log.trace("RESTCONF SBI::getDevice with ip and port");
+        try {
+            if (!deviceMap.isEmpty()) {
+                return deviceMap.values().stream().filter(v -> v.ip().equals(ip) && v.port() == port).findFirst().get();
+            }
+        } catch (NoSuchElementException noSuchElementException) {
+            log.error("getDevice::device {}:{} does not exist in deviceMap", ip, port);
+        return null;
+    }
+    @Override
+    public void addDevice(RestSBDevice device) {
+        log.trace("RESTCONF SBI::addDevice");
+        if (!deviceMap.containsKey(device.deviceId())) {
+            if (device.username() != null) {
+                String username = device.username();
+                String password = device.password() == null ? "" : device.password();
+    //                authenticate(client, username, password);
+            }
+            BlockingQueue<String> newBlockingQueue = new LinkedBlockingQueue<>();
+            eventQMap.put(device.deviceId(), newBlockingQueue);
+            InternalRestconfEventProcessorRunnable eventProcessorRunnable =
+                    new InternalRestconfEventProcessorRunnable(device.deviceId());
+            processorRunnableTable.put(device.deviceId(), eventProcessorRunnable);
+            log.trace("addDevice::restconf event processor runnable is created and is going for execute");
+            executor.execute(eventProcessorRunnable);
+            log.trace("addDevice::restconf event processor runnable was sent for execute");
+            deviceMap.put(device.deviceId(), device);
+        } else {
+            log.warn("addDevice::Trying to add a device which already exists {}", device.deviceId());
+        }
+    }
+    @Override
+    public void removeDevice(DeviceId deviceId) {
+        log.trace("RESTCONF SBI::removeDevice");
+        eventQMap.remove(deviceId);
+        clientMap.remove(deviceId);
+        deviceMap.remove(deviceId);
@@ -107,6 +213,62 @@
+    public void establishSubscriptionOnly(Map<String, String> paramMap, SvcLogicContext ctx)
+            throws SvcLogicException {
+        String subscriberId = paramMap.get(SUBSCRIBER_ID);
+        if (subscriberId == null) {
+            throw new SvcLogicException("Subscriber Id is null");
+        }
+        String subscribeUrlString = paramMap.get(REST_API_URL);
+        URL subscribeUrl = null;
+        RestSBDevice dev = null;
+        try {
+            subscribeUrl = new URL(subscribeUrlString);
+            dev = getDevice(subscribeUrl.getHost(), subscribeUrl.getPort());
+        } catch (MalformedURLException e) {
+            log.error("establishSubscriptionOnly::MalformedURLException happened. e: {}", e);
+            return;
+        }
+        if (dev == null) {
+            log.warn("establishSubscriptionOnly::device does not exist in the map. Trying to create one now.");
+            //FIXME: TODO: create a new RestSBDevice and add it to the map, as well as a client and clientMap
+            dev = new DefaultRestSBDevice(subscribeUrl.getHost(),
+                    subscribeUrl.getPort(), "onos", "rocks", "http",
+                    subscribeUrl.getHost() + ":" + subscribeUrl.getPort(), true);
+            this.addDevice(dev);
+        }
+        if (!subscribedDevicesTable.containsKey(dev.deviceId())) {
+  "establishSubscriptionOnly::The device {} has not been subscribed yet. " +
+                    "Trying to subscribe it now...");
+            restapiCallNode.sendRequest(paramMap, ctx);
+            if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
+                // TODO: save subscription id and subscriber in MYSQL
+                String id = getOutputIdentifierNoPrefix(paramMap.get(RESPONSE_PREFIX), ctx);
+      "establishSubscriptionOnly::Subscription is done successfully and " +
+                        "the output.identifier is: {}", id);
+      "establishSubscriptionOnly::The subscriptionID returned by the server " +
+                        "does not exist in the map. Adding it now...");
+                subscribedDevicesTable.put(dev.deviceId(), id);
+                SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"),
+                        paramMap.get("rpc"),
+                        paramMap.get("version"),
+                        paramMap.get("mode"));
+                SubscriptionInfo info = new SubscriptionInfo();
+                info.callBackDG(callbackDG);
+                info.subscriptionId(id);
+                info.subscriberId(subscriberId);
+                subscriptionInfoMap.put(id, info);
+            }
+        }
+    }
+    @Override
     public void modifySubscription(Map<String, String> paramMap, SvcLogicContext ctx) {
         // TODO: to be implemented
@@ -252,6 +414,10 @@
         return ctx.getAttribute(getPrefix(prefix) + RESPONSE_CODE);
+    String getOutputIdentifierNoPrefix(String prefix, SvcLogicContext ctx) {
+        return ctx.getAttribute(getPrefix(prefix) + OUTPUT_IDENTIFIER_NO_PREFIX);
+    }
      * Returns subscription id from event.
@@ -278,6 +444,34 @@
         return null;
+    private String getUrlString(DeviceId deviceId, String request) {
+        RestSBDevice restSBDevice = deviceMap.get(deviceId);
+        if (restSBDevice == null) {
+            log.warn("getUrlString::restSbDevice cannot be NULL!");
+            return "";
+        }
+        if (restSBDevice.url() != null) {
+            return restSBDevice.protocol() + COLON + DOUBLESLASH + restSBDevice.url() + request;
+        } else {
+            return restSBDevice.protocol() + COLON + DOUBLESLASH + restSBDevice.ip().toString()
+                    + COLON + restSBDevice.port() + request;
+        }
+    }
+    private String getSubscriptionIdFromDeviceId(DeviceId deviceId) {
+        if (subscribedDevicesTable.containsKey(deviceId)) {
+            return subscribedDevicesTable.get(deviceId);
+        }
+        return null;
+    }
+    private BlockingQueue<String> getEventQ(DeviceId deviceId) {
+        if (eventQMap.containsKey(deviceId)) {
+            return eventQMap.get(deviceId);
+        }
+        return null;
+    }
      * Returns restconfApiCallNode.
@@ -311,4 +505,316 @@
     void eventQueue(LinkedBlockingQueue<String> eventQueue) {
         this.eventQueue = eventQueue;
+    /**
+     * Establishes a persistent SSE connection between the client and the server.
+     *
+     * @param paramMap input paramter map
+     * @param ctx service logic context
+     */
+    @Override
+    public void establishPersistentSseConnection(Map<String, String> paramMap, SvcLogicContext ctx) throws SvcLogicException {
+        //TODO: FIXME: remove the instantiation of info; not useful
+        String subscriberId = paramMap.get(SUBSCRIBER_ID);
+        SvcLogicGraphInfo callbackDG = new SvcLogicGraphInfo(paramMap.get("module"),
+                paramMap.get("rpc"),
+                paramMap.get("version"),
+                paramMap.get("mode"));
+        SubscriptionInfo info = new SubscriptionInfo();
+        info.callBackDG(callbackDG);
+        info.subscriberId(subscriberId);
+        String sseUrlString = paramMap.get(SSE_URL);
+        URL sseUrl = null;
+        RestSBDevice dev = null;
+        try {
+            sseUrl = new URL(sseUrlString);
+            dev = getDevice(sseUrl.getHost(), sseUrl.getPort());
+        } catch (MalformedURLException e) {
+            log.error("establishPersistentSseConnection::MalformedURLException happened. e: {}", e);
+            return;
+        }
+        if (dev == null) {
+            log.warn("establishPersistentSseConnection::device does not exist in the map. Trying to add one now.");
+            dev = new DefaultRestSBDevice(sseUrl.getHost(),
+                    sseUrl.getPort(), "onos", "rocks", "http",
+                    sseUrl.getHost() + ":" + sseUrl.getPort(), true);
+            this.addDevice(dev);
+        }
+        if (isNotificationEnabled(dev.deviceId())) {
+            log.warn("establishPersistentSseConnection::notifications already enabled on device: {}",
+                    dev.deviceId());
+            return;
+        }
+        if (getSubscriptionIdFromDeviceId(dev.deviceId()) == null) {
+            log.warn("This device {} has not yet been subscribed to receive notifications.",
+                    dev.deviceId());
+            return;
+        }
+        RestconfNotificationEventListenerImpl myListener =
+                new RestconfNotificationEventListenerImpl(info);
+        enableNotifications(dev.deviceId(), "yang-push-json", "json", myListener);
+    }
+    @Override
+    public void enableNotifications(DeviceId device, String request,
+                                    String mediaType,
+                                    RestconfNotificationEventListener listener) {
+        if (isNotificationEnabled(device)) {
+            log.warn("enableNotifications::already enabled on device: {}", device);
+            return;
+        }
+        request = discoverRootResource(device) + NOTIFICATION_PATH_PREFIX
+                + request;
+        addNotificationListener(device, listener);
+        GetChunksRunnable runnable = new GetChunksRunnable(request, mediaType,
+                device);
+        runnableTable.put(device, runnable);
+        executor.execute(runnable);
+    }
+    public void stopNotifications(DeviceId device) {
+        try {
+            runnableTable.get(device).terminate();
+            processorRunnableTable.get(device).terminate();
+        } catch (Exception ex) {
+            log.error("stopNotifications::Exception happened when terminating, ex: {}", ex);
+        }
+"stopNotifications::Runnable is now terminated");
+        runnableTable.remove(device);
+        processorRunnableTable.remove(device);
+        restconfNotificationListenerMap.remove(device);
+        log.debug("stopNotifications::Stop sending notifications for device URI: " + device.uri().toString());
+    }
+    @Override
+    public void deleteSubscriptionAndSseConnection(Map<String, String> paramMap, SvcLogicContext ctx) {
+        String deleteSubscribeUrlString = paramMap.get(REST_API_URL);
+        URL deleteSubscribeUrl = null;
+        RestSBDevice dev = null;
+        try {
+            deleteSubscribeUrl = new URL(deleteSubscribeUrlString);
+            dev = getDevice(deleteSubscribeUrl.getHost(), deleteSubscribeUrl.getPort());
+        } catch (MalformedURLException e) {
+            log.error("establishSubscriptionOnly::MalformedURLException happened. e: {}", e);
+            return;
+        }
+        String deviceIp = deleteSubscribeUrl.getHost();
+        String devicePort = String.valueOf(deleteSubscribeUrl.getPort());
+"deleteSubscriptionAndSseConnection::Trying to unsubscribe device {}:{}",
+                deviceIp, devicePort);
+        if (dev == null) {
+            log.error("deleteSubscriptionAndSseConnection::device does not exist in the map");
+            return;
+        }
+        String subscriptionId = getSubscriptionIdFromDeviceId(dev.deviceId());
+        if (subscriptionId != null) {
+  "deleteSubscriptionAndSseConnection::SubscriptionID is found {}", subscriptionId);
+  "deleteSubscriptionAndSseConnection::About to send unsubscribe request");
+            try {
+                restapiCallNode.sendRequest(paramMap, ctx);
+                if (getResponseCode(paramMap.get(RESPONSE_PREFIX), ctx).equals(RESPONSE_CODE_200)) {
+          "deleteSubscriptionAndSseConnection::Successfully unsubscribed");
+                    stopNotifications(dev.deviceId());
+                    subscribedDevicesTable.remove(dev.deviceId());
+                    String id = getSubscriptionId(paramMap.get(SUBSCRIBER_ID));
+                    if (id != null) {
+                        subscriptionInfoMap.remove(id);
+                    }
+                } else {
+          "deleteSubscriptionAndSseConnection::Unsubscription was NOT successfull");
+                }
+            } catch (SvcLogicException e) {
+                log.error("deleteSubscriptionAndSseConnection::Exception happened ex: {}", e);
+            }
+        } else {
+            log.warn("deleteSubscriptionAndSseConnection::This device has already been unsubscribed");
+        }
+    }
+    /**
+     * Notifies providers about incoming RESTCONF notification events.
+     */public class GetChunksRunnable implements Runnable {
+        private String request;
+        private String mediaType;
+        private DeviceId deviceId;
+        private volatile boolean running = true;
+        public void terminate() {
+  "GetChunksRunnable.terminate()::threadID: {}",
+                    Thread.currentThread().getId());
+            running = false;
+        }
+        /**
+         * @param request   request
+         * @param mediaType media type
+         * @param deviceId    device identifier
+         */
+        public GetChunksRunnable(String request, String mediaType,
+                                 DeviceId deviceId) {
+            this.request = request;
+            this.mediaType = mediaType;
+            this.deviceId = deviceId;
+        }
+        @Override
+        public void run() {
+            log.trace(" is: {} ...., running is: {}",
+                    Thread.currentThread().getId(), running);
+            try {
+                Client client = ClientBuilder.newBuilder()
+                        .register(SseFeature.class).build();
+                WebTarget target =, request));
+                log.trace(" URI is {}", target.getUri().toString());
+                Response response = target.request().get();
+                EventInput eventInput = response.readEntity(EventInput.class);
+                log.trace(" eventInput");
+                String rcvdData = "";
+                while (!eventInput.isClosed() && running) {
+                    log.trace(" while ...");
+                    final InboundEvent inboundEvent =;
+                    log.trace(" ...");
+                    if (inboundEvent == null) {
+                        // connection has been closed
+              " has been closed ...");
+                        break;
+                    }
+                    if (running) {
+                        rcvdData = inboundEvent.readData(String.class);
+                        BlockingQueue<String> eventQ = getEventQ(deviceId);
+                        if (eventQ != null) {
+                            eventQ.add(rcvdData);
+                            eventQMap.put(deviceId, eventQ);
+                            log.trace(" got filled.");
+                        } else {
+                            log.error(" has not been initialized for this device {}",
+                                    deviceId);
+                        }
+                    } else {
+              " has changed to false while " +
+                                "was blocked to receive new notifications");
+              " client is no longer interested to " +
+                                "receive notifications.");
+                        break;
+                    }
+                }
+                if (!running) {
+                    log.trace(" is false! " +
+                                    "closing eventInput, threadID: {}", Thread.currentThread().getId());
+                    eventInput.close();
+                    response.close();
+                    client.close();
+          " is closed in run()");
+                }
+            } catch (Exception ex) {
+      " got some exception: {}, threadID: {} ", ex,
+                        Thread.currentThread().getId());
+            }
+            log.trace(" Runnable Try Catch. threadID: {} ",
+                    Thread.currentThread().getId());
+        }
+    }
+    public class InternalRestconfEventProcessorRunnable implements Runnable {
+        private volatile boolean running = true;
+        private DeviceId deviceId;
+        public InternalRestconfEventProcessorRunnable(DeviceId deviceId) {
+            this.deviceId = deviceId;
+        }
+        public void terminate() {
+  "InternalRestconfEventProcessorRunnable.terminate()::threadID: {}",
+                    Thread.currentThread().getId());
+            running = false;
+        }
+        @Override
+        public void run() {
+            log.trace("InternalRestconfEventProcessorRunnable::restconf event processor runnable inside run()");
+            while (running) {
+                try {
+                    if (eventQMap != null && !eventQMap.isEmpty() && eventQMap.get(deviceId) != null) {
+                        log.trace("InternalRestconfEventProcessorRunnable::waiting for take()");
+                        if (running) {
+                            String eventJsonString = eventQMap.get(deviceId).take();
+                            log.trace("InternalRestconfEventProcessorRunnable::after take()");
+                  "InternalRestconfEventProcessorRunnable::eventJsonString is {}", eventJsonString);
+                            Map<String, String> param = convertToProperties(eventJsonString);
+                            String idString = param.get("push-change-update.subscription-id");
+                            SubscriptionInfo info = subscriptionInfoMap().get(idString);
+                            if (info != null) {
+                                SvcLogicContext ctx = setContext(param);
+                                SvcLogicGraphInfo callbackDG = info.callBackDG();
+                                callbackDG.executeGraph(ctx);
+                            }
+                        } else {
+                  " has changed to false " +
+                                    "while eventQ was blocked to process new notifications");
+                  "" +
+                                    "the client is no longer interested to receive notifications.");
+                            break;
+                        }
+                    }
+                } catch (InterruptedException | SvcLogicException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        private SvcLogicContext setContext(Map<String, String> param) {
+            SvcLogicContext ctx = new SvcLogicContext();
+            for (Map.Entry<String, String> entry : param.entrySet()) {
+                ctx.setAttribute(entry.getKey(), entry.getValue());
+            }
+            return ctx;
+        }
+    }
+    public String discoverRootResource(DeviceId device) {
+        return ROOT_RESOURCE;
+    }
+    @Override
+    public void addNotificationListener(DeviceId deviceId,
+                                        RestconfNotificationEventListener listener) {
+        Set<RestconfNotificationEventListener> listeners =
+                restconfNotificationListenerMap.get(deviceId);
+        if (listeners == null) {
+            listeners = new HashSet<>();
+        }
+        listeners.add(listener);
+        this.restconfNotificationListenerMap.put(deviceId, listeners);
+    }
+    @Override
+    public void removeNotificationListener(DeviceId deviceId,
+                                           RestconfNotificationEventListener listener) {
+        Set<RestconfNotificationEventListener> listeners =
+                restconfNotificationListenerMap.get(deviceId);
+        if (listeners != null) {
+            listeners.remove(listener);
+        }
+    }
+    public boolean isNotificationEnabled(DeviceId deviceId) {
+        return runnableTable.containsKey(deviceId);
+    }
diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/
new file mode 100644
index 0000000..292e17e
--- /dev/null
+++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/
@@ -0,0 +1,15 @@
+package org.onap.ccsdk.sli.plugins.restconfdiscovery;
+ * Notifies providers about incoming RESTCONF notification events.
+ */
+public interface RestconfNotificationEventListener<T> {
+    /**
+     * Handles the notification event.
+     *
+     * @param deviceId restconf device identifier
+     * @param event    event payload
+     */
+    void handleNotificationEvent(DeviceId deviceId, T event);
diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/
new file mode 100644
index 0000000..fa2fa02
--- /dev/null
+++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/
@@ -0,0 +1,22 @@
+package org.onap.ccsdk.sli.plugins.restconfdiscovery;
+import org.slf4j.Logger;
+import static org.slf4j.LoggerFactory.getLogger;
+public class RestconfNotificationEventListenerImpl implements
+        RestconfNotificationEventListener<String> {
+    private final Logger log = getLogger(getClass());
+    SubscriptionInfo info;
+    public RestconfNotificationEventListenerImpl(SubscriptionInfo info) {
+ = info;
+    }
+    @Override
+    public void handleNotificationEvent(DeviceId deviceId, String eventJsonString) {
+"New notification: {} for device: {}",
+                eventJsonString, deviceId.toString());
+    }
diff --git a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/
index dfe8cd5..972fb2b 100644
--- a/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/
+++ b/plugins/restconf-client/provider/src/main/java/org/onap/ccsdk/sli/plugins/restconfdiscovery/
@@ -62,6 +62,68 @@
     void establishSubscription(Map<String, String> paramMap, SvcLogicContext ctx) throws SvcLogicException;
+     * Allows directed graphs to subscribe to a restconf server to receive notifications from that server.
+     * @param paramMap HashMap<String,String> of parameters passed by the DG to this function
+     * <table border="1">
+     *  <thead><th>parameter</th><th>Mandatory/Optional</th><th>description</th><th>example values</th></thead>
+     *  <tbody>
+     *      <tr><td>templateDirName</td><td>Optional</td><td>full path to YANG directory that can be used to build a request</td><td>/sdncopt/bvc/resconfapi/test</td></tr>
+     *      <tr><td>establishSubscriptionURL</td><td>Mandatory</td><td>url to establish connection with server</td><td></td></tr>
+     *      <tr><td>sseConnectURL</td><td>Mandatory</td><td>url to setup SSE connection with server</td><td></td></tr>
+     *      <tr><td>callbackDG</td><td>Mandatory</td><td>callback DG to process the received notification</td><td>Resource-Discovery:handleSOTNTopology</td></tr>
+     *      <tr><td>filterURL</td><td>Optional</td><td>url which needs to be subscribed, if null subscribe to all</td><td></td></tr>
+     *      <tr><td>subscriptionType</td><td>Optional</td><td>type of subscription, periodic or onDataChange</td><td>onDataChange</td></tr>
+     *      <tr><td>updateFrequency</td><td>Optional</td><td>update frequency in milli seconds when subscription type is periodic</td><td>1000</td></tr>
+     *      <tr><td>restapiUser</td><td>Optional</td><td>user name to use for http basic authentication</td><td>sdnc_ws</td></tr>
+     *      <tr><td>restapiPassword</td><td>Optional</td><td>unencrypted password to use for http basic authentication</td><td>plain_password</td></tr>
+     *      <tr><td>contentType</td><td>Optional</td><td>http content type to set in the http header</td><td>usually application/json or application/xml</td></tr>
+     *      <tr><td>format</td><td>Optional</td><td>should match request body format</td><td>json or xml</td></tr>
+     *      <tr><td>responsePrefix</td><td>Optional</td><td>location the notification response will be written to in context memory</td><td>tmp.restconfdiscovery.result</td></tr>
+     *      <tr><td>skipSending</td><td>Optional</td><td></td><td>true or false</td></tr>
+     *      <tr><td>convertResponse </td><td>Optional</td><td>whether the response should be converted</td><td>true or false</td></tr>
+     *      <tr><td>customHttpHeaders</td><td>Optional</td><td>a list additional http headers to be passed in, follow the format in the example</td><td>X-CSI-MessageId=messageId,headerFieldName=headerFieldValue</td></tr>
+     *      <tr><td>dumpHeaders</td><td>Optional</td><td>when true writes http header content to context memory</td><td>true or false</td></tr>
+     *  </tbody>
+     * </table>
+     * @param ctx Reference to context memory
+     * @throws SvcLogicException
+     * @since 11.0.2
+     * @see String#split(String, int)
+     */
+    void establishSubscriptionOnly(Map<String, String> paramMap, SvcLogicContext ctx) throws SvcLogicException;
+    /**
+     * Allows directed graphs to establish a discovery subscription for a given subscriber.
+     * @param paramMap HashMap<String,String> of parameters passed by the DG to this function
+     * <table border="1">
+     *  <thead><th>parameter</th><th>Mandatory/Optional</th><th>description</th><th>example values</th></thead>
+     *  <tbody>
+     *      <tr><td>templateDirName</td><td>Optional</td><td>full path to YANG directory that can be used to build a request</td><td>/sdncopt/bvc/resconfapi/test</td></tr>
+     *      <tr><td>establishSubscriptionURL</td><td>Mandatory</td><td>url to establish connection with server</td><td></td></tr>
+     *      <tr><td>sseConnectURL</td><td>Mandatory</td><td>url to setup SSE connection with server</td><td></td></tr>
+     *      <tr><td>callbackDG</td><td>Mandatory</td><td>callback DG to process the received notification</td><td>Resource-Discovery:handleSOTNTopology</td></tr>
+     *      <tr><td>filterURL</td><td>Optional</td><td>url which needs to be subscribed, if null subscribe to all</td><td></td></tr>
+     *      <tr><td>subscriptionType</td><td>Optional</td><td>type of subscription, periodic or onDataChange</td><td>onDataChange</td></tr>
+     *      <tr><td>updateFrequency</td><td>Optional</td><td>update frequency in milli seconds when subscription type is periodic</td><td>1000</td></tr>
+     *      <tr><td>restapiUser</td><td>Optional</td><td>user name to use for http basic authentication</td><td>sdnc_ws</td></tr>
+     *      <tr><td>restapiPassword</td><td>Optional</td><td>unencrypted password to use for http basic authentication</td><td>plain_password</td></tr>
+     *      <tr><td>contentType</td><td>Optional</td><td>http content type to set in the http header</td><td>usually application/json or application/xml</td></tr>
+     *      <tr><td>format</td><td>Optional</td><td>should match request body format</td><td>json or xml</td></tr>
+     *      <tr><td>responsePrefix</td><td>Optional</td><td>location the notification response will be written to in context memory</td><td>tmp.restconfdiscovery.result</td></tr>
+     *      <tr><td>skipSending</td><td>Optional</td><td></td><td>true or false</td></tr>
+     *      <tr><td>convertResponse </td><td>Optional</td><td>whether the response should be converted</td><td>true or false</td></tr>
+     *      <tr><td>customHttpHeaders</td><td>Optional</td><td>a list additional http headers to be passed in, follow the format in the example</td><td>X-CSI-MessageId=messageId,headerFieldName=headerFieldValue</td></tr>
+     *      <tr><td>dumpHeaders</td><td>Optional</td><td>when true writes http header content to context memory</td><td>true or false</td></tr>
+     *  </tbody>
+     * </table>
+     * @param ctx Reference to context memory
+     * @throws SvcLogicException
+     * @since 11.0.2
+     * @see String#split(String, int)
+     */
+    void establishPersistentSseConnection(Map<String, String> paramMap, SvcLogicContext ctx) throws SvcLogicException;
+    /**
      * Allows directed graphs to modify a discovery subscription for a given subscriber.
      * @param paramMap HashMap<String,String> of parameters passed by the DG to this function
      * <table border="1">
@@ -107,4 +169,18 @@
     void deleteSubscription(Map<String, String> paramMap, SvcLogicContext ctx);
+    /**
+     * Allows directed graphs to unsubscribe from a restconf server and to remove the persistent sse connection.
+     * @param paramMap HashMap<String,String> of parameters passed by the DG to this function
+     * <table border="1">
+     *  <thead><th>parameter</th><th>Mandatory/Optional</th><th>description</th><th>example values</th></thead>
+     *  <tbody>
+     *      <tr><td>subscriberId</td><td>Mandatory</td><td>subscription subscriber's identifier</td><td>topologyId/1111</td></tr>
+     *  </tbody>
+     * </table>
+     * @param ctx Reference to context memory
+     * @throws SvcLogicException
+     */
+    void deleteSubscriptionAndSseConnection(Map<String, String> paramMap, SvcLogicContext ctx);