Develop a VES Provider

A common VES provider will be used by the devicemanager bundles and other bundles for sending VES messages.

Issue-ID: SDNC-1188
Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com>
Change-Id: Ied23b82a528aac23d7bebab272a2f414e67d0866
Signed-off-by: Ravi Pendurty <ravi.pendurty@highstreet-technologies.com>
diff --git a/sdnr/wt/mountpoint-state-provider/provider/pom.xml b/sdnr/wt/mountpoint-state-provider/provider/pom.xml
index 294a33a..f1d221e 100644
--- a/sdnr/wt/mountpoint-state-provider/provider/pom.xml
+++ b/sdnr/wt/mountpoint-state-provider/provider/pom.xml
@@ -69,6 +69,12 @@
         </dependency>
         <dependency>
             <groupId>org.onap.ccsdk.features.sdnr.wt</groupId>
+            <artifactId>sdnr-wt-devicemanager-model</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.onap.ccsdk.features.sdnr.wt</groupId>
             <artifactId>sdnr-wt-netconfnode-state-service-model</artifactId>
             <version>${project.version}</version>
         </dependency>
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java
deleted file mode 100644
index 21ca9da..0000000
--- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/GeneralConfig.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * ============LICENSE_START========================================================================
- * ONAP : ccsdk feature sdnr wt
- * =================================================================================================
- * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
- * =================================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END==========================================================================
- */
-package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl;
-
-import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
-import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
-
-/**
- * Configuration of mountpoint-state-provider, general section<br>
- * - dmaapEnabled : Boolean disable/enable service depending on whether DMaaP is running or not Generates default
- * Configuration properties if none exist or exist partially Generates Publisher properties only for
- * TransportType=HTTPNOAUTH. Other TransportTypes like HTTP, AUTH_KEY and DME2 have additional properties and are not
- * generated by default. For a list of applicable properties for the different TranportType values, please see -
- * https://wiki.onap.org/display/DW/Feature+configuration+requirements
- */
-public class GeneralConfig implements Configuration {
-
-    private static final String SECTION_MARKER = "general";
-
-    private static final String PROPERTY_KEY_ENABLED = "dmaapEnabled"; //"disabled";
-
-    public static final String PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE = "TransportType";
-    private static final String DEFAULT_VALUE_PUBLISHER_TRANSPORTTYPE = "HTTPNOAUTH";
-
-    public static final String PROPERTY_KEY_PUBLISHER_HOST_PORT = "host";
-    private static final String DEFAULT_VALUE_PUBLISHER_HOST_PORT = "onap-dmaap:3904";
-
-    public static final String PROPERTY_KEY_PUBLISHER_TOPIC = "topic";
-    private static final String DEFAULT_VALUE_PUBLISHER_TOPIC = "unauthenticated.SDNR_MOUNTPOINT_STATE_INFO";
-
-    public static final String PROPERTY_KEY_PUBLISHER_CONTENTTYPE = "contenttype";
-    private static final String DEFAULT_VALUE_PUBLISHER_CONTENTTYPE = "application/json";
-
-    public static final String PROPERTY_KEY_PUBLISHER_TIMEOUT = "timeout";
-    private static final String DEFAULT_VALUE_PUBLISHER_TIMEOUT = "20000";
-
-    public static final String PROPERTY_KEY_PUBLISHER_LIMIT = "limit";
-    private static final String DEFAULT_VALUE_PUBLISHER_LIMIT = "10000";
-
-    public static final String PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE = "maxBatchSize";
-    public static final String DEFAULT_VALUE_PUBLISHER_MAXBATCHSIZE = "100";
-
-    public static final String PROPERTY_KEY_PUBLISHER_MAXAGEMS = "maxAgeMs";
-    public static final String DEFAULT_VALUE_PUBLISHER_MAXAGEMS = "250";
-
-    public static final String PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE = "MessageSentThreadOccurance";
-    public static final String DEFAULT_VALUE_PUBLISHER_MESSAGESENTTHREADOCCURANCE = "50";
-
-    private final ConfigurationFileRepresentation configuration;
-
-    public GeneralConfig(ConfigurationFileRepresentation configuration) {
-        this.configuration = configuration;
-        this.configuration.addSection(SECTION_MARKER);
-        defaults();
-    }
-
-    public Boolean getEnabled() {
-        Boolean enabled = configuration.getPropertyBoolean(SECTION_MARKER, PROPERTY_KEY_ENABLED);
-        return enabled;
-    }
-
-    public String getHostPort() {
-        return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_HOST_PORT);
-    }
-
-    public String getTransportType() {
-        return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE);
-    }
-
-    public String getTopic() {
-        return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TOPIC);
-    }
-
-    public String getTimeout() {
-        return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TIMEOUT);
-    }
-
-    public String getLimit() {
-        return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_LIMIT);
-    }
-
-    public String getContenttype() {
-        return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_CONTENTTYPE);
-    }
-
-    public String getMaxBatchSize() {
-        return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE);
-    }
-
-    public String getMaxAgeMs() {
-        return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXAGEMS);
-    }
-
-    public String getMessageSentThreadOccurrence() {
-        return configuration.getProperty(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE);
-    }
-
-    @Override
-    public String getSectionName() {
-        return SECTION_MARKER;
-    }
-
-    @Override
-    public void defaults() {
-        // The default value should be "false" given that SDNR can be run in environments where DMaaP is not used
-        configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_ENABLED, Boolean.FALSE);
-        configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE,
-                DEFAULT_VALUE_PUBLISHER_TRANSPORTTYPE);
-        configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_HOST_PORT,
-                DEFAULT_VALUE_PUBLISHER_HOST_PORT);
-        configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TOPIC,
-                DEFAULT_VALUE_PUBLISHER_TOPIC);
-        configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_CONTENTTYPE,
-                DEFAULT_VALUE_PUBLISHER_CONTENTTYPE);
-        configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_TIMEOUT,
-                DEFAULT_VALUE_PUBLISHER_TIMEOUT);
-        configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_LIMIT,
-                DEFAULT_VALUE_PUBLISHER_LIMIT);
-        configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE,
-                DEFAULT_VALUE_PUBLISHER_MAXBATCHSIZE);
-        configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MAXAGEMS,
-                DEFAULT_VALUE_PUBLISHER_MAXAGEMS);
-        configuration.setPropertyIfNotAvailable(SECTION_MARKER, PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE,
-                DEFAULT_VALUE_PUBLISHER_MESSAGESENTTHREADOCCURANCE);
-    }
-
-}
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java
index f9b7b1e..5cdf5ab 100644
--- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeConnectListenerImpl.java
@@ -34,14 +34,14 @@
 public class MountpointNodeConnectListenerImpl implements NetconfNodeConnectListener, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(MountpointNodeConnectListenerImpl.class);
     private NetconfNodeStateService netconfNodeStateService;
-    private MountpointStatePublisherMain mountpointStatePublisher;
+    private MountpointStatePublisher mountpointStatePublisher;
     private ListenerRegistration<MountpointNodeConnectListenerImpl> registeredNodeConnectListener;
 
     public MountpointNodeConnectListenerImpl(NetconfNodeStateService netconfNodeStateService) {
         this.netconfNodeStateService = netconfNodeStateService;
     }
 
-    public void start(MountpointStatePublisherMain mountpointStatePublisher) {
+    public void start(MountpointStatePublisher mountpointStatePublisher) {
         this.mountpointStatePublisher = mountpointStatePublisher;
         registeredNodeConnectListener = netconfNodeStateService.registerNetconfNodeConnectListener(this);
     }
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java
index bbfd879..d8b5a85 100644
--- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointNodeStateListenerImpl.java
@@ -31,14 +31,14 @@
 public class MountpointNodeStateListenerImpl implements NetconfNodeStateListener, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(MountpointNodeStateListenerImpl.class);
     private NetconfNodeStateService netconfNodeStateService;
-    private MountpointStatePublisherMain mountpointStatePublisher;
+    private MountpointStatePublisher mountpointStatePublisher;
     private ListenerRegistration<MountpointNodeStateListenerImpl> registeredNodeStateListener;
 
     public MountpointNodeStateListenerImpl(NetconfNodeStateService netconfNodeStateService) {
         this.netconfNodeStateService = netconfNodeStateService;
     }
 
-    public void start(MountpointStatePublisherMain mountpointStatePublisher) {
+    public void start(MountpointStatePublisher mountpointStatePublisher) {
         this.mountpointStatePublisher = mountpointStatePublisher;
         registeredNodeStateListener = netconfNodeStateService.registerNetconfNodeStateListener(this);
     }
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java
index e310323..6838bc3 100644
--- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateProviderImpl.java
@@ -24,27 +24,22 @@
 
 package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl;
 
-import java.io.IOException;
-
-import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
-import org.onap.ccsdk.features.sdnr.wt.common.configuration.filechange.IConfigChangedListener;
+import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.NetconfNetworkElementService;
 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class MountpointStateProviderImpl implements AutoCloseable, IConfigChangedListener {
+public class MountpointStateProviderImpl implements AutoCloseable {
 
     private static final Logger LOG = LoggerFactory.getLogger(MountpointStateProviderImpl.class);
     private static final String APPLICATION_NAME = "mountpoint-state-provider";
-    private static final String CONFIGURATIONFILE = "etc/mountpoint-state-provider.properties";
 
     private NetconfNodeStateService netconfNodeStateService;
-    private GeneralConfig generalConfig;
-    private boolean dmaapEnabled = false;
+    private NetconfNetworkElementService netconfNetworkElementService;
 
     private MountpointNodeConnectListenerImpl nodeConnectListener;
     private MountpointNodeStateListenerImpl nodeStateListener;
-    private MountpointStatePublisherMain mountpointStatePublisher;
+    private MountpointStatePublisher mountpointStatePublisher;
 
     public MountpointStateProviderImpl() {
         LOG.info("Creating provider class for {}", APPLICATION_NAME);
@@ -56,73 +51,41 @@
         this.netconfNodeStateService = netconfNodeStateService;
     }
 
+    public void setNetconfNetworkElementService(NetconfNetworkElementService netconfNetworkElementService) {
+        this.netconfNetworkElementService = netconfNetworkElementService;
+    }
+
     public void init() {
         LOG.info("Init call for {}", APPLICATION_NAME);
-        ConfigurationFileRepresentation configFileRepresentation =
-                new ConfigurationFileRepresentation(CONFIGURATIONFILE);
-        configFileRepresentation.registerConfigChangedListener(this);
 
         nodeConnectListener = new MountpointNodeConnectListenerImpl(netconfNodeStateService);
         nodeStateListener = new MountpointNodeStateListenerImpl(netconfNodeStateService);
 
-        generalConfig = new GeneralConfig(configFileRepresentation);
-        if (generalConfig.getEnabled()) { //dmaapEnabled
-            startPublishing();
-        }
+        startPublishing();
     }
 
     /**
      * Reflect status for Unit Tests
-     * 
+     *
      * @return Text with status
      */
     public String isInitializationOk() {
         return "No implemented";
     }
 
-    @Override
-    public void onConfigChanged() {
-        LOG.info("Service configuration state changed. Enabled: {}", generalConfig.getEnabled());
-        boolean dmaapEnabledNewVal = generalConfig.getEnabled();
-
-        // DMaap disabled earlier (or during bundle startup) but enabled later, start publisher(s)
-        if (!dmaapEnabled && dmaapEnabledNewVal) {
-            LOG.info("DMaaP is enabled, starting Publisher");
-            startPublishing();
-        } else if (dmaapEnabled && !dmaapEnabledNewVal) {
-            // DMaap enabled earlier (or during bundle startup) but disabled later, stop publisher(s)
-            LOG.info("DMaaP is disabled, stop publisher");
-            stopPublishing();
-        }
-        dmaapEnabled = dmaapEnabledNewVal;
-    }
-
     public void startPublishing() {
-        mountpointStatePublisher = new MountpointStatePublisherMain(generalConfig);
-        mountpointStatePublisher.start();
+        mountpointStatePublisher = new MountpointStatePublisher(netconfNetworkElementService.getServiceProvider().getVESCollectorService());
+        Thread t = new Thread(mountpointStatePublisher);
+        t.start();
 
         nodeConnectListener.start(mountpointStatePublisher);
         nodeStateListener.start(mountpointStatePublisher);
     }
 
-    public void stopPublishing() {
-        try {
-            nodeConnectListener.stop();
-            nodeStateListener.stop();
-            mountpointStatePublisher.stop();
-        } catch (Exception e) {
-            LOG.error("Exception while stopping publisher ", e);
-        }
-    }
-
     @Override
     public void close() throws Exception {
         LOG.info("{} closing ...", this.getClass().getName());
-        try {
-            mountpointStatePublisher.stop();
-        } catch (IOException | InterruptedException e) {
-            LOG.error("Exception while stopping publisher ", e);
-        }
+        mountpointStatePublisher.stop();
         close(nodeConnectListener, nodeStateListener);
         LOG.info("{} closing done", APPLICATION_NAME);
     }
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java
new file mode 100644
index 0000000..9df37e3
--- /dev/null
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisher.java
@@ -0,0 +1,153 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property.
+ * All rights reserved.
+ * ================================================================================
+ * Update Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=======================================================
+ *
+ */
+package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl;
+
+import java.util.LinkedList;
+import java.util.List;
+import org.eclipse.jdt.annotation.NonNull;
+import org.json.JSONObject;
+import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.VESCollectorCfgService;
+import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.VESCollectorService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Queues mountpoint state objects and publishes them as VES messages from a worker thread.
+ *
+ * <p>State objects are handed over with {@link #addToPublish(JSONObject)}; {@link #run()} takes them
+ * out one at a time, formats each as a VES message and passes it to the {@link VESCollectorService}.
+ * Both methods guard the queue with the queue's own monitor, so they may run on different threads.
+ */
+public class MountpointStatePublisher implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MountpointStatePublisher.class);
+    private final List<JSONObject> stateObjects = new LinkedList<JSONObject>();
+    // volatile: stop() writes this flag on one thread while the run() loop reads it on another
+    private volatile boolean publish = true;
+    private int publishPause = 5000; // Default pause between fetch - 5 seconds
+    private final VESCollectorService vesCollectorService;
+
+    /**
+     * Creates a publisher that sends its messages through the given collector service.
+     *
+     * @param vesCollectorService service providing the VES configuration and the publish call
+     */
+    public MountpointStatePublisher(@NonNull VESCollectorService vesCollectorService) {
+        this.vesCollectorService = vesCollectorService;
+    }
+
+    /**
+     * Appends a state object to the queue of objects waiting to be published.
+     *
+     * @param publishObj state object to publish
+     */
+    public void addToPublish(JSONObject publishObj) {
+        synchronized (stateObjects) {
+            stateObjects.add(publishObj);
+        }
+    }
+
+    /**
+     * Returns the live queue of pending state objects.
+     *
+     * <p>The returned list is the internal queue, not a copy. Callers that touch it while the
+     * publisher thread is running must synchronize on the returned list.
+     *
+     * @return the queue of state objects not yet published
+     */
+    public synchronized List<JSONObject> getStateObjects() {
+        return stateObjects;
+    }
+
+    /**
+     * Asks the publishing loop to terminate. The loop checks the flag once per iteration, so
+     * {@link #run()} may keep running for up to one pause interval when the queue is empty.
+     */
+    public void stop() {
+        publish = false;
+    }
+
+    /**
+     * Sleeps for the configured pause when the queue is empty.
+     *
+     * @throws InterruptedException if the thread is interrupted while sleeping
+     */
+    private void pauseThread() throws InterruptedException {
+        if (publishPause > 0) {
+            LOG.debug("No data yet to publish.  Pausing {} ms before retry ", publishPause);
+            Thread.sleep(publishPause);
+        } else {
+            LOG.debug("No data yet to publish. No publish pause specified - retrying immediately");
+        }
+    }
+
+    /**
+     * Removes and returns the oldest queued state object.
+     *
+     * @return the next state object, or {@code null} if the queue is empty
+     */
+    private JSONObject pollNext() {
+        synchronized (stateObjects) {
+            return stateObjects.isEmpty() ? null : stateObjects.remove(0);
+        }
+    }
+
+    /**
+     * Formats a state object as a VES message using a {@link MountpointStateVESMessageFormatter}.
+     *
+     * @param msg state object to format
+     * @param vesCfg VES configuration handed to the formatter
+     * @return the formatted VES message
+     */
+    public String createVESMessage(JSONObject msg, VESCollectorCfgService vesCfg) {
+        MountpointStateVESMessageFormatter vesFormatter = new MountpointStateVESMessageFormatter(vesCfg);
+        return vesFormatter.createVESMessage(msg);
+    }
+
+    /**
+     * Publishes queued state objects until {@link #stop()} is called or the thread is interrupted.
+     */
+    @Override
+    public void run() {
+        while (publish) {
+            try {
+                JSONObject obj = pollNext();
+                if (obj != null) {
+                    String vesMsg = createVESMessage(obj, vesCollectorService.getConfig());
+                    this.vesCollectorService.publishVESMessage(vesMsg);
+                } else {
+                    pauseThread();
+                }
+            } catch (InterruptedException ex) {
+                // An interrupt is a request to shut down: restore the flag for the thread's owner
+                // and leave the loop instead of logging and spinning on further interrupts.
+                Thread.currentThread().interrupt();
+                LOG.info("Publisher thread interrupted, stopping");
+                return;
+            } catch (Exception ex) {
+                LOG.error("Exception while publishing message, ignoring and continuing ... ", ex);
+            }
+        }
+    }
+}
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisherMain.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisherMain.java
deleted file mode 100644
index 8d6e2d4..0000000
--- a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStatePublisherMain.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * ONAP : ccsdk features
- * ================================================================================
- * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property.
- * All rights reserved.
- * ================================================================================
- * Update Copyright (C) 2020 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=======================================================
- *
- */
-package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import org.json.JSONObject;
-import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
-import org.onap.dmaap.mr.client.MRBatchingPublisher;
-import org.onap.dmaap.mr.client.MRClientFactory;
-import org.onap.dmaap.mr.client.response.MRPublisherResponse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MountpointStatePublisherMain {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MountpointStatePublisherMain.class);
-    private Thread thread = null;
-    private MRBatchingPublisher pub = null;
-    private List<JSONObject> stateObjects = new LinkedList<JSONObject>();
-    private Properties publisherProperties = new Properties();
-    private boolean closePublisher = false;
-    private int publishPause = 5000; // Default pause between fetch - 5 seconds
-
-    public MountpointStatePublisherMain(Configuration config) {
-        initialize(config);
-    }
-
-    public void initialize(Configuration config) {
-        LOG.info("In initializePublisher method of MountpointStatePublisher");
-        GeneralConfig generalCfg = (GeneralConfig) config;
-
-        publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_TRANSPORTTYPE, generalCfg.getTransportType());
-        publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_HOST_PORT, generalCfg.getHostPort());
-        publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_CONTENTTYPE, generalCfg.getContenttype());
-        publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_TOPIC, generalCfg.getTopic());
-        publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MAXBATCHSIZE, generalCfg.getMaxBatchSize());
-        publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MAXAGEMS, generalCfg.getMaxAgeMs());
-        publisherProperties.put(GeneralConfig.PROPERTY_KEY_PUBLISHER_MESSAGESENTTHREADOCCURANCE,
-                generalCfg.getMessageSentThreadOccurrence());
-
-        createPublisher(publisherProperties);
-    }
-
-    public MRBatchingPublisher createPublisher(Properties publisherProperties) {
-
-        try {
-            pub = MRClientFactory.createBatchingPublisher(publisherProperties, false);
-            return pub;
-        } catch (IOException e) {
-            LOG.info("Exception while creating a publisher", e);
-        }
-        return null;
-    }
-
-    public void publishMessage(MRBatchingPublisher pub, String msg) {
-        LOG.info("Publishing message {} - ", msg);
-        try {
-            pub.send(msg);
-        } catch (IOException e) {
-            LOG.info("Exception while publishing a mesage ", e);
-        }
-    }
-
-    public MRBatchingPublisher getPublisher() {
-        return pub;
-    }
-
-    public void start() {
-        thread = new Thread(new MountpointStatePublisher());
-        thread.start();
-    }
-
-    public void stop() throws IOException, InterruptedException {
-        closePublisher = true;
-        getPublisher().close(100, TimeUnit.MILLISECONDS); // Send any remaining messages and close)
-    }
-
-    private void pauseThread() throws InterruptedException {
-        if (publishPause > 0) {
-            LOG.debug("No data yet to publish.  Pausing {} ms before retry ", publishPause);
-            Thread.sleep(publishPause);
-        } else {
-            LOG.debug("No data yet to publish. No fetch pause specified - retrying immediately");
-        }
-    }
-
-    public void addToPublish(JSONObject publishObj) {
-        getStateObjects().add(publishObj);
-    }
-
-    public List<JSONObject> getStateObjects() {
-        return stateObjects;
-    }
-
-    public class MountpointStatePublisher implements Runnable {
-
-        @Override
-        public void run() {
-            while (!closePublisher) {
-                try {
-                    if (getStateObjects().size() > 0) {
-                        JSONObject obj = ((LinkedList<JSONObject>) getStateObjects()).removeFirst();
-                        publishMessage(getPublisher(), obj.toString());
-                    } else {
-                        pauseThread();
-                    }
-                } catch (Exception ex) {
-                    LOG.error("Exception while publishing message, ignoring and continuing ... ", ex);
-                }
-
-                MRPublisherResponse res = pub.sendBatchWithResponse(); // As per dmaap-client code understanding, this need not be called but for some reason the messages are not pushed unless this is called
-                LOG.debug("Response message = {} ", res.toString());
-            }
-
-        }
-
-    }
-
-}
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateVESMessageFormatter.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateVESMessageFormatter.java
new file mode 100644
index 0000000..918438e
--- /dev/null
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/MountpointStateVESMessageFormatter.java
@@ -0,0 +1,124 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP : ccsdk features
+ * ================================================================================
+ * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property.
+ * All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ */
+package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.time.Instant;
+import org.json.JSONObject;
+import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.VESCollectorCfgService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Converts mountpoint state JSON objects into VES notification messages.
+ *
+ * <p>The input object is read with the keys {@code NodeId}, {@code NetConfNodeState} and
+ * {@code TimeStamp}; the latter must hold an {@link Instant}.
+ */
+public class MountpointStateVESMessageFormatter {
+    private static final Logger LOG = LoggerFactory.getLogger(MountpointStateVESMessageFormatter.class);
+    private static final String VES_DOMAIN = "notification";
+    private static final String VES_PRIORITY = "Normal";
+    private static final String VES_CHANGETYPE = "ConnectionState";
+    private static final long MICROS_PER_MILLI = 1000L;
+    // ObjectMapper is thread-safe once configured, so one shared instance is reused
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private VESCollectorCfgService vesCfg;
+    // Shared by all formatter instances; incremented once per createVESMessage call
+    static long sequenceNo = 0;
+
+    /**
+     * @param vesCfg VES configuration supplying the reporting entity name
+     */
+    public MountpointStateVESMessageFormatter(VESCollectorCfgService vesCfg) {
+        this.vesCfg = vesCfg;
+    }
+
+    /**
+     * Builds a VES event from the given state object and serializes it to JSON.
+     *
+     * @param obj state object with the keys {@code NodeId}, {@code NetConfNodeState} and {@code TimeStamp}
+     * @return the serialized VES event, or {@code "{}"} if serialization fails
+     */
+    public String createVESMessage(JSONObject obj) {
+        LOG.debug("JSON Object to format to VES is - {}", obj.toString());
+        String vesMsg = "{}";
+        sequenceNo++;
+
+        VESCommonEventHeaderPOJO vesCommonEventHeader = createVESCommonEventHeader(obj);
+        VESNotificationFieldsPOJO vesNotificationFields = createVESNotificationFields(obj);
+
+        VESEvent vesEvent = new VESEvent();
+        vesEvent.addEventObjects(vesCommonEventHeader);
+        vesEvent.addEventObjects(vesNotificationFields);
+
+        try {
+            vesMsg = OBJECT_MAPPER.writeValueAsString(vesEvent);
+            LOG.debug("VES message to be published - {}", vesMsg);
+        } catch (JsonProcessingException e) {
+            // Report through the logger rather than stderr and fall back to the empty JSON object
+            LOG.error("Failed to serialize VES event", e);
+        }
+
+        return vesMsg;
+    }
+
+    /**
+     * Fills the notification fields from the node id and connection state of the state object.
+     */
+    private VESNotificationFieldsPOJO createVESNotificationFields(JSONObject obj) {
+        VESNotificationFieldsPOJO vesNotificationFields = new VESNotificationFieldsPOJO();
+
+        vesNotificationFields.setChangeIdentifier(obj.getString("NodeId"));
+        vesNotificationFields.setChangeType(VES_CHANGETYPE);
+        vesNotificationFields.setNewState(obj.getString("NetConfNodeState"));
+
+        return vesNotificationFields;
+    }
+
+    /**
+     * Fills the common event header; event id and event name are both
+     * {@code <NodeId>_<NetConfNodeState>_<sequenceNo>}.
+     */
+    private VESCommonEventHeaderPOJO createVESCommonEventHeader(JSONObject obj) {
+        VESCommonEventHeaderPOJO vesCommonEventHeader = new VESCommonEventHeaderPOJO();
+        String nodeId = obj.getString("NodeId");
+        String eventId = nodeId + "_" + obj.getString("NetConfNodeState") + "_" + sequenceNo;
+
+        vesCommonEventHeader.setDomain(VES_DOMAIN);
+        vesCommonEventHeader.setEventId(eventId);
+        vesCommonEventHeader.setEventName(eventId);
+        vesCommonEventHeader.setSourceName(nodeId);
+        vesCommonEventHeader.setPriority(VES_PRIORITY);
+        vesCommonEventHeader.setReportingEntityName(this.vesCfg.getReportingEntityName());
+        vesCommonEventHeader.setSequence(sequenceNo);
+
+        // The header fields are in microseconds: epoch milliseconds times 1000 (not 100)
+        long epochMicros = ((Instant) obj.get("TimeStamp")).toEpochMilli() * MICROS_PER_MILLI;
+        vesCommonEventHeader.setLastEpochMicrosec(epochMicros);
+        vesCommonEventHeader.setStartEpochMicrosec(epochMicros);
+
+        return vesCommonEventHeader;
+    }
+}
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESCommonEventHeaderPOJO.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESCommonEventHeaderPOJO.java
new file mode 100644
index 0000000..7b15ae1
--- /dev/null
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESCommonEventHeaderPOJO.java
@@ -0,0 +1,190 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt
+ * =================================================================================================
+ * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+/** Bean for the "commonEventHeader" part of a VES event; serialized with Jackson in the property order given below. */
+@JsonPropertyOrder({"domain", "eventId", "eventName", "eventType", "lastEpochMicrosec", "nfcNamingCode", "nfNamingCode", "nfVendorName", "priority", "reportingEntityId",
+    "reportingEntityName", "sequence", "sourceId", "sourceName", "startEpochMicrosec", "timeZoneOffset", "version", "vesEventListenerVersion"})
+public class VESCommonEventHeaderPOJO {
+    // Defaults: "" and 0 for most fields; timeZoneOffset, version and vesEventListenerVersion are preset; reportingEntityId is @JsonIgnore.
+    private String domain = "";
+    private String eventId = "";
+    private String eventName = "";
+    private String eventType = "";
+    private long sequence = 0L;
+    private String priority = "";
+    @JsonIgnore
+    private String reportingEntityId = "";
+    private String reportingEntityName = "";
+    private String sourceId = "";
+    private String sourceName = "";
+    private long startEpochMicrosec = 0L;
+    private long lastEpochMicrosec = 0L;
+    private String nfcNamingCode = "";
+    private String nfNamingCode = "";
+    private String nfVendorName = "";
+    private String timeZoneOffset = "+00:00";
+    private String version = "4.1";
+    private String vesEventListenerVersion = "7.1.1";
+
+    public String getDomain() {
+        return domain;
+    }
+
+    public void setDomain(String domain) {
+        this.domain = domain;
+    }
+
+    public String getEventId() {
+        return eventId;
+    }
+
+    public void setEventId(String eventId) {
+        this.eventId = eventId;
+    }
+
+    public String getEventName() {
+        return eventName;
+    }
+
+    public void setEventName(String eventName) {
+        this.eventName = eventName;
+    }
+
+    public String getEventType() {
+        return eventType;
+    }
+
+    public void setEventType(String eventType) {
+        this.eventType = eventType;
+    }
+    // NOTE(review): returns boxed Long although the field and setSequence(long) use the primitive type.
+    public Long getSequence() {
+        return sequence;
+    }
+
+    public void setSequence(long sequenceNo) {
+        this.sequence = sequenceNo;
+    }
+
+    public String getPriority() {
+        return priority;
+    }
+
+    public void setPriority(String priority) {
+        this.priority = priority;
+    }
+
+    public String getReportingEntityId() {
+        return reportingEntityId;
+    }
+
+    public void setReportingEntityId(String reportingEntityId) {
+        this.reportingEntityId = reportingEntityId;
+    }
+
+    public String getReportingEntityName() {
+        return reportingEntityName;
+    }
+
+    public void setReportingEntityName(String reportingEntityName) {
+        this.reportingEntityName = reportingEntityName;
+    }
+
+    public String getSourceId() {
+        return sourceId;
+    }
+
+    public void setSourceId(String sourceId) {
+        this.sourceId = sourceId;
+    }
+
+    public String getSourceName() {
+        return sourceName;
+    }
+
+    public void setSourceName(String sourceName) {
+        this.sourceName = sourceName;
+    }
+
+    public long getStartEpochMicrosec() {
+        return startEpochMicrosec;
+    }
+
+    public void setStartEpochMicrosec(long startEpochMicrosec) {
+        this.startEpochMicrosec = startEpochMicrosec;
+    }
+
+    public long getLastEpochMicrosec() {
+        return lastEpochMicrosec;
+    }
+
+    public void setLastEpochMicrosec(long lastEpochMicrosec) {
+        this.lastEpochMicrosec = lastEpochMicrosec;
+    }
+
+    public String getNfcNamingCode() {
+        return nfcNamingCode;
+    }
+
+    public void setNfcNamingCode(String nfcNamingCode) {
+        this.nfcNamingCode = nfcNamingCode;
+    }
+
+    public String getNfNamingCode() {
+        return nfNamingCode;
+    }
+
+    public void setNfNamingCode(String nfNamingCode) {
+        this.nfNamingCode = nfNamingCode;
+    }
+
+    public String getNfVendorName() {
+        return nfVendorName;
+    }
+
+    public void setNfVendorName(String nfVendorName) {
+        this.nfVendorName = nfVendorName;
+    }
+
+    public String getTimeZoneOffset() {
+        return timeZoneOffset;
+    }
+
+    public void setTimeZoneOffset(String timeZoneOffset) {
+        this.timeZoneOffset = timeZoneOffset;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+
+    public String getVesEventListenerVersion() {
+        return vesEventListenerVersion;
+    }
+
+    public void setVesEventListenerVersion(String vesEventListenerVersion) {
+        this.vesEventListenerVersion = vesEventListenerVersion;
+    }
+}
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESEvent.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESEvent.java
new file mode 100644
index 0000000..13017d4
--- /dev/null
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESEvent.java
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt
+ * =================================================================================================
+ * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+/** Container for the objects of one VES event, keyed by the name of the section they belong to. */
+public class VESEvent {
+    public Map<String, Object> event = new HashMap<>();
+    /** Stores the object under the key matching its runtime type; an object of any other type is not stored. */
+    public void addEventObjects(Object eventObject) {
+        if (eventObject instanceof VESCommonEventHeaderPOJO) {
+            event.put("commonEventHeader", eventObject);
+        } else if (eventObject instanceof VESNotificationFieldsPOJO) {
+            event.put("notificationFields", eventObject);
+        }
+    }
+    /** Returns the backing map itself, not a copy. */
+    public Map<String, Object> getEvent() {
+        return event;
+    }
+}
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESNotificationFieldsPOJO.java b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESNotificationFieldsPOJO.java
new file mode 100644
index 0000000..1963cd6
--- /dev/null
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/impl/VESNotificationFieldsPOJO.java
@@ -0,0 +1,108 @@
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : ccsdk feature sdnr wt
+ * =================================================================================================
+ * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property. All rights reserved.
+ * =================================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
+ * in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ * ============LICENSE_END==========================================================================
+ */
+package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import java.util.ArrayList;
+import java.util.HashMap;
+/** Bean for the "notificationFields" part of a VES event; serialized with Jackson in the property order given below. */
+@JsonPropertyOrder({"arrayOfNamedHashMap", "changeContact", "changeIdentifier", "changeType", "newState", "oldState", "notificationFieldsVersion"})
+public class VESNotificationFieldsPOJO {
+    // Fields marked @JsonIgnore are flagged for exclusion from JSON; namedHashMap and hashMap have no accessors in this class.
+    private ArrayList<HashMap<String, Object>> arrayOfNamedHashMap = new ArrayList<HashMap<String, Object>>();
+    @JsonIgnore
+    private HashMap<String, Object> namedHashMap = new HashMap<String, Object>();
+    @JsonIgnore
+    private HashMap<String, String> hashMap = new HashMap<String, String>();
+    @JsonIgnore
+    private String changeContact = "";
+    private String changeIdentifier = "";
+    private String changeType = "";
+    // @JsonIgnore is commented out here, so newState is not excluded from serialization.
+    private String newState = "";
+    @JsonIgnore
+    private String oldState = "";
+    @JsonIgnore
+    private String stateInterface = "";
+    private String notificationFieldsVersion = "2.0";
+
+    public ArrayList<HashMap<String, Object>> getArrayOfNamedHashMap() {
+        return arrayOfNamedHashMap;
+    }
+
+    public void setArrayOfNamedHashMap(ArrayList<HashMap<String, Object>> arrayOfNamedHashMap) {
+        this.arrayOfNamedHashMap = arrayOfNamedHashMap;
+    }
+
+    public String getChangeContact() {
+        return changeContact;
+    }
+
+    public void setChangeContact(String changeContact) {
+        this.changeContact = changeContact;
+    }
+
+    public String getChangeIdentifier() {
+        return changeIdentifier;
+    }
+
+    public void setChangeIdentifier(String changeIdentifier) {
+        this.changeIdentifier = changeIdentifier;
+    }
+
+    public String getChangeType() {
+        return changeType;
+    }
+
+    public void setChangeType(String changeType) {
+        this.changeType = changeType;
+    }
+
+    public String getNewState() {
+        return newState;
+    }
+
+    public void setNewState(String newState) {
+        this.newState = newState;
+    }
+
+    public String getOldState() {
+        return oldState;
+    }
+
+    public void setOldState(String oldState) {
+        this.oldState = oldState;
+    }
+
+    public String getStateInterface() {
+        return stateInterface;
+    }
+
+    public void setStateInterface(String stateInterface) {
+        this.stateInterface = stateInterface;
+    }
+
+    public String getNotificationFieldsVersion() {
+        return notificationFieldsVersion;
+    }
+
+    public void setNotificationFieldsVersion(String notificationFieldsVersion) {
+        this.notificationFieldsVersion = notificationFieldsVersion;
+    }
+}
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/main/resources/org/opendaylight/blueprint/impl-blueprint.xml b/sdnr/wt/mountpoint-state-provider/provider/src/main/resources/org/opendaylight/blueprint/impl-blueprint.xml
index 3ec9efb..5512359 100644
--- a/sdnr/wt/mountpoint-state-provider/provider/src/main/resources/org/opendaylight/blueprint/impl-blueprint.xml
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/main/resources/org/opendaylight/blueprint/impl-blueprint.xml
@@ -29,9 +29,14 @@
     <reference id="netconfNodeStateService"
                availability="mandatory" activation="eager"
                interface="org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateService"/>
+               
+    <reference id="netconfNetworkElementService"
+               availability="mandatory" activation="eager"
+               interface="org.onap.ccsdk.features.sdnr.wt.devicemanager.service.NetconfNetworkElementService"/>
 
     <bean id="provider" class="org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.MountpointStateProviderImpl" init-method="init" destroy-method="close">
         <property name="netconfNodeStateService" ref="netconfNodeStateService"/>
+        <property name="netconfNetworkElementService" ref="netconfNetworkElementService"/>
     </bean>
 
 </blueprint>
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/GeneralConfigTest.java b/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/GeneralConfigTest.java
deleted file mode 100644
index c921e7b..0000000
--- a/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/GeneralConfigTest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * ============LICENSE_START========================================================================
- * ONAP : ccsdk feature sdnr wt
- * =================================================================================================
- * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
- * =================================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
- * in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- * ============LICENSE_END==========================================================================
- */
-
-package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.test;
-
-import static org.junit.Assert.assertEquals;
-import com.google.common.io.Files;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import org.junit.After;
-import org.junit.Test;
-import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
-import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.GeneralConfig;
-
-public class GeneralConfigTest {
-
-    // @formatter:off
-    private static final String TESTCONFIG_CONTENT =
-            "[general]\n"
-            + "dmaapEnabled=false\n"
-            + "TransportType=HTTPNOAUTH\n"
-            + "host=onap-dmap:3904\n"
-            + "topic=unauthenticated.SDNR_MOUNTPOINT_STATE_INFO\n"
-            + "contenttype=application/json\n"
-            + "timeout=20000\n"
-            + "limit=10000\n"
-            + "maxBatchSize=100\n"
-            + "maxAgeMs=250\n"
-            + "MessageSentThreadOccurance=50\n";
-    // @formatter:on
-    private final String fileName = "test.properties";
-    private ConfigurationFileRepresentation globalCfg;
-
-    @Test
-    public void test() throws IOException {
-
-            Files.asCharSink(new File(fileName), StandardCharsets.UTF_8).write(TESTCONFIG_CONTENT);
-            globalCfg = new ConfigurationFileRepresentation(fileName);
-            GeneralConfig cfg = new GeneralConfig(globalCfg);
-            assertEquals("onap-dmap:3904", cfg.getHostPort());
-            assertEquals(false, cfg.getEnabled());
-            assertEquals("unauthenticated.SDNR_MOUNTPOINT_STATE_INFO", cfg.getTopic());
-            assertEquals("application/json", cfg.getContenttype());
-            assertEquals("20000", cfg.getTimeout());
-            assertEquals("10000", cfg.getLimit());
-            assertEquals("100", cfg.getMaxBatchSize());
-            assertEquals("250", cfg.getMaxAgeMs());
-            assertEquals("50", cfg.getMessageSentThreadOccurrence());
-            assertEquals("HTTPNOAUTH", cfg.getTransportType());
-            assertEquals("general", cfg.getSectionName());
-
-
-    }
-
-    @After
-    public void cleanUp() {
-        File file = new File(fileName);
-        if (file.exists()) {
-            System.out.println("File exists, Deleting it");
-            file.delete();
-        }
-
-    }
-}
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointNodeConnectListenerImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointNodeConnectListenerImpl.java
index 2466683..237e949 100644
--- a/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointNodeConnectListenerImpl.java
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointNodeConnectListenerImpl.java
@@ -18,19 +18,19 @@
 
 package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.test;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
-import com.google.common.io.Files;
-import java.io.File;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.util.Optional;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
-import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.GeneralConfig;
+import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.DeviceManagerServiceProvider;
+import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.NetconfNetworkElementService;
+import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.VESCollectorService;
 import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.MountpointNodeConnectListenerImpl;
-import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.MountpointStatePublisherMain;
+import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.MountpointStatePublisher;
 import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.test.mock.NetconfAccessorMock;
 import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.test.mock.NetconfNodeMock;
 import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.test.mock.NetconfNodeStateServiceMock;
@@ -40,39 +40,32 @@
 
 public class TestMountpointNodeConnectListenerImpl {
 
-    // @formatter:off
-    private static final String TESTCONFIG_CONTENT =
-            "[general]\n"
-            + "dmaapEnabled=false\n"
-            + "TransportType=HTTPNOAUTH\n"
-            + "host=onap-dmap:3904\n"
-            + "topic=unauthenticated.SDNR_MOUNTPOINT_STATE_INFO\n"
-            + "contenttype=application/json\n"
-            + "timeout=20000\n"
-            + "limit=10000\n"
-            + "maxBatchSize=100\n"
-            + "maxAgeMs=250\n"
-            + "MessageSentThreadOccurance=50\n";
-    // @formatter:on
-    private final String fileName = "test1.properties";
-    private ConfigurationFileRepresentation globalCfg;
-
-
-    NetconfNodeStateServiceMock netconfNodeStateServiceMock = new NetconfNodeStateServiceMock();
-    MountpointNodeConnectListenerImpl nodeConnectListener =
-            new MountpointNodeConnectListenerImpl(netconfNodeStateServiceMock);
-    MountpointStatePublisherMain mountpointStatePublisher;
-    NetconfNodeMock netconfNodeMock = new NetconfNodeMock();
-    NetconfNode netconfNode = netconfNodeMock.getNetconfNode();
-    NodeId nNodeId = new NodeId("nSky");
-    NetconfAccessor accessor = new NetconfAccessorMock(nNodeId, netconfNode);
+    DeviceManagerServiceProvider serviceProvider;
+    MountpointStatePublisher mountpointStatePublisher;
+    NetconfNodeStateServiceMock netconfNodeStateServiceMock;
+    MountpointNodeConnectListenerImpl nodeConnectListener;
+    NetconfNodeMock netconfNodeMock;
+    NetconfNode netconfNode;
+    NodeId nNodeId;
+    NetconfAccessor accessor;
+    VESCollectorService vesCollectorService;
 
     @Before
     public void initialize() throws IOException {
-        Files.asCharSink(new File(fileName), StandardCharsets.UTF_8).write(TESTCONFIG_CONTENT);
-        globalCfg = new ConfigurationFileRepresentation(fileName);
-        GeneralConfig cfg = new GeneralConfig(globalCfg);
-        mountpointStatePublisher = new MountpointStatePublisherMain(cfg);
+        serviceProvider = mock(DeviceManagerServiceProvider.class);
+        netconfNodeStateServiceMock = new NetconfNodeStateServiceMock();
+        netconfNodeMock = new NetconfNodeMock();
+        netconfNode = netconfNodeMock.getNetconfNode();
+        vesCollectorService = mock(VESCollectorService.class);
+        NetconfNetworkElementService netconfNetworkElementService = mock(NetconfNetworkElementService.class);
+        nNodeId = new NodeId("nSky");
+        accessor = new NetconfAccessorMock(nNodeId, netconfNode);
+
+        mountpointStatePublisher = new MountpointStatePublisher(vesCollectorService);
+        when(netconfNetworkElementService.getServiceProvider()).thenReturn(serviceProvider);
+        when(serviceProvider.getVESCollectorService()).thenReturn(vesCollectorService);
+
+        nodeConnectListener = new MountpointNodeConnectListenerImpl(netconfNodeStateServiceMock);
         nodeConnectListener.start(mountpointStatePublisher);
     }
 
@@ -90,18 +83,8 @@
 
     @Test
     public void testClose() throws Exception {
-        //assertEquals(MountpointStatePublisher.stateObjects.size(), 0);
+        assertEquals(0, mountpointStatePublisher.getStateObjects().size()); // expected value first, then actual
         nodeConnectListener.close();
     }
 
-    @After
-    public void after() {
-        File file = new File(fileName);
-        if (file.exists()) {
-            System.out.println("File exists, Deleting it");
-            file.delete();
-        }
-
-    }
-
 }
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointNodeStateListenerImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointNodeStateListenerImpl.java
index 8d95a4c..c6a9d11 100644
--- a/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointNodeStateListenerImpl.java
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointNodeStateListenerImpl.java
@@ -21,17 +21,15 @@
 
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
-import com.google.common.io.Files;
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import org.junit.After;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import org.junit.Before;
 import org.junit.Test;
-import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
-import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.GeneralConfig;
+import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.DeviceManagerServiceProvider;
+import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.NetconfNetworkElementService;
+import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.VESCollectorService;
 import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.MountpointNodeStateListenerImpl;
-import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.MountpointStatePublisherMain;
+import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.MountpointStatePublisher;
 import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.test.mock.NetconfNodeMock;
 import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.test.mock.NetconfNodeStateServiceMock;
 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev150114.NetconfNode;
@@ -39,37 +37,23 @@
 
 public class TestMountpointNodeStateListenerImpl {
 
-    // @formatter:off
-    private static final String TESTCONFIG_CONTENT =
-            "[general]\n"
-            + "dmaapEnabled=false\n"
-            + "TransportType=HTTPNOAUTH\n"
-            + "host=onap-dmap:3904\n"
-            + "topic=unauthenticated.SDNR_MOUNTPOINT_STATE_INFO\n"
-            + "contenttype=application/json\n"
-            + "timeout=20000\n"
-            + "limit=10000\n"
-            + "maxBatchSize=100\n"
-            + "maxAgeMs=250\n"
-            + "MessageSentThreadOccurance=50\n";
-    // @formatter:on
-    private final String fileName = "test2.properties";
-    private ConfigurationFileRepresentation globalCfg;
-
     NetconfNodeStateServiceMock netconfNodeStateServiceMock = new NetconfNodeStateServiceMock();
     MountpointNodeStateListenerImpl nodeStateListener =
             new MountpointNodeStateListenerImpl(netconfNodeStateServiceMock);
-    MountpointStatePublisherMain mountpointStatePublisher;
+    MountpointStatePublisher mountpointStatePublisher;
     NetconfNodeMock netconfNodeMock = new NetconfNodeMock();
     NetconfNode netconfNode = netconfNodeMock.getNetconfNode();
     NodeId nNodeId = new NodeId("nSky");
+    VESCollectorService vesCollectorService;
 
     @Before
-    public void initialize() throws IOException {
-        Files.asCharSink(new File(fileName), StandardCharsets.UTF_8).write(TESTCONFIG_CONTENT);
-        globalCfg = new ConfigurationFileRepresentation(fileName);
-        GeneralConfig cfg = new GeneralConfig(globalCfg);
-        mountpointStatePublisher = new MountpointStatePublisherMain(cfg);
+    public void initialize() {
+        DeviceManagerServiceProvider serviceProvider = mock(DeviceManagerServiceProvider.class);
+        vesCollectorService = mock(VESCollectorService.class);
+        NetconfNetworkElementService netconfNetworkElementService = mock(NetconfNetworkElementService.class);
+        when(netconfNetworkElementService.getServiceProvider()).thenReturn(serviceProvider);
+        when(serviceProvider.getVESCollectorService()).thenReturn(vesCollectorService);
+        mountpointStatePublisher = new MountpointStatePublisher(vesCollectorService);
         nodeStateListener.start(mountpointStatePublisher);
     }
 
@@ -92,14 +76,4 @@
         nodeStateListener.onRemoved(nNodeId);
     }
 
-    @After
-    public void after() {
-        File file = new File(fileName);
-        if (file.exists()) {
-            System.out.println("File exists, Deleting it");
-            file.delete();
-        }
-
-    }
-
 }
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointStateProviderImpl.java b/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointStateProviderImpl.java
index 19d9308..1cebeb6 100644
--- a/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointStateProviderImpl.java
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointStateProviderImpl.java
@@ -24,108 +24,46 @@
 
 package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.test;
 
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.FileNotFoundException;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
 import org.junit.Test;
+import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.DeviceManagerServiceProvider;
+import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.NetconfNetworkElementService;
+import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.VESCollectorService;
 import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.MountpointStateProviderImpl;
+import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TestMountpointStateProviderImpl {
-
-    private static Path KARAF_ETC = Paths.get("etc");
-    private static MountpointStateProviderImpl mountpointStateProvider;
-
     private static final Logger LOG = LoggerFactory.getLogger(TestMountpointStateProviderImpl.class);
+    private MountpointStateProviderImpl mountpointStateProvider;
 
 
+    @Test
+    public void before() throws InterruptedException, IOException {
+        NetconfNodeStateService netconfNodeStateService = mock(NetconfNodeStateService.class);
+        DeviceManagerServiceProvider serviceProvider = mock(DeviceManagerServiceProvider.class);
+        VESCollectorService vesCollectorService = mock(VESCollectorService.class);
+        NetconfNetworkElementService netconfNetworkElementService = mock(NetconfNetworkElementService.class);
 
-    @BeforeClass
-    public static void before() throws InterruptedException, IOException {
+        when(netconfNetworkElementService.getServiceProvider()).thenReturn(serviceProvider);
+        when(serviceProvider.getVESCollectorService()).thenReturn(vesCollectorService);
 
-        System.out.println("Logger: " + LOG.getClass().getName() + " " + LOG.getName());
-        // Call System property to get the classpath value
-        Path etc = KARAF_ETC;
-        delete(etc);
-
-        System.out.println("Create empty:" + etc.toString());
-        Files.createDirectories(etc);
-
-        // Create mocks
-
-        // start using blueprint interface
-        try {
-            mountpointStateProvider = new MountpointStateProviderImpl();
-
-            //mountpointStateProvider.init(); // Can't be tested as this invokes a thread. Mockito doesn't help either
-        } catch (Exception e) {
-            StringWriter sw = new StringWriter();
-            e.printStackTrace(new PrintWriter(sw));
-            fail("Not initialized" + sw.toString());
-        }
-        System.out.println("Initialization status: " + mountpointStateProvider.isInitializationOk());
-        System.out.println("Initialization done");
+        mountpointStateProvider = new MountpointStateProviderImpl();
+        mountpointStateProvider.setNetconfNetworkElementService(netconfNetworkElementService);
+        mountpointStateProvider.setNetconfNodeStateService(netconfNodeStateService);
+        mountpointStateProvider.init();
     }
 
-    @AfterClass
-    public static void after() throws InterruptedException, IOException {
+    /*   @After
+    public void after() throws InterruptedException, IOException {
 
-        System.out.println("Start shutdown");
-        // close using blueprint interface
         try {
-            mountpointStateProvider.close();
+             mountpointStateProvider.close();
         } catch (Exception e) {
             System.out.println(e);
         }
-        delete(KARAF_ETC);
-
-    }
-
-    @Test
-    public void test1() {
-        System.out.println("Test1: slave mountpoint");
-        System.out.println("Initialization status: " + mountpointStateProvider.isInitializationOk());
-        System.out.println("Test2: Done");
-    }
-
-    // ********************* Private
-
-    private static void delete(Path etc) throws IOException {
-        if (Files.exists(etc)) {
-            System.out.println("Found and remove:" + etc.toString());
-            delete(etc.toFile());
-        }
-    }
-
-    private static void delete(File f) throws IOException {
-        if (f.isDirectory()) {
-            for (File c : f.listFiles()) {
-                delete(c);
-            }
-        }
-        if (!f.delete()) {
-            throw new FileNotFoundException("Failed to delete file: " + f);
-        }
-    }
-    /*	@Test
-    	public void testInit() {
-    
-    	}
-    
-    	@Test
-    	public void testOnConfigChanged() {
-    		//fail("Not yet implemented");
-    	}*/
-
+    }*/
 }
diff --git a/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointStatePublisher.java b/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointStatePublisher.java
index 5fc1c5e..468e0c1 100644
--- a/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointStatePublisher.java
+++ b/sdnr/wt/mountpoint-state-provider/provider/src/test/java/org/onap/ccsdk/features/sdnr/wt/mountpointstateprovider/test/TestMountpointStatePublisher.java
@@ -24,136 +24,58 @@
 package org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.test;
 
 
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import org.json.JSONObject;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.onap.ccsdk.features.sdnr.wt.common.configuration.Configuration;
-import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
-import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.GeneralConfig;
-import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.MountpointStatePublisherMain;
-import org.onap.dmaap.mr.client.MRBatchingPublisher;
-import org.onap.dmaap.mr.client.response.MRPublisherResponse;
-import org.slf4j.Logger;
+import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.DeviceManagerServiceProvider;
+import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.NetconfNetworkElementService;
+import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.VESCollectorCfgService;
+import org.onap.ccsdk.features.sdnr.wt.devicemanager.service.VESCollectorService;
+import org.onap.ccsdk.features.sdnr.wt.mountpointstateprovider.impl.MountpointStatePublisher;
 
 public class TestMountpointStatePublisher {
 
-    private static final String CONFIGURATIONTESTFILE = "test3.properties";
-    public Thread publisher;
-    MountpointStatePublisherMain mountpointStatePublisher;
-    ConfigurationFileRepresentation configFileRepresentation;
-    GeneralConfig cfg;
+    MountpointStatePublisher mountpointStatePublisher; // publisher under test, created in the @Before method
+    VESCollectorService vesCollectorService; // Mockito mock passed to the publisher's constructor
+    VESCollectorCfgService vesCfg; // Mockito mock returned by vesCollectorService.getConfig()
+    String vesMsg = "{}"; // argument used when stubbing publishVESMessage
+    JSONObject testJsonData; // sample state object handed to addToPublish
 
     @Before
     public void testMountpointStatePublisherData() {
-        String testJsonData =
-                "{\"NodeId\":\"69322972e178_50001\",\"NetConfNodeState\":\"Connecting\",\"TimeStamp\":\"2019-11-12T12:45:08.604Z\"}";
-        configFileRepresentation =
-                new ConfigurationFileRepresentation(CONFIGURATIONTESTFILE);
-        cfg = new GeneralConfig(configFileRepresentation);
-        JSONObject jsonObj = new JSONObject(testJsonData);
-        mountpointStatePublisher = new MountpointStatePublisherMain(cfg);
-        mountpointStatePublisher.getStateObjects().add(jsonObj);
+        testJsonData = new JSONObject();
+        testJsonData.put("NodeId", "69322972e178_50001");
+        testJsonData.put("NetConfNodeState", "Connecting");
+        testJsonData.put("TimeStamp", java.time.Clock.systemUTC().instant()); // value is an Instant, not a String
+        // Mock the collaborators. NOTE(review): serviceProvider and netconfNetworkElementService
+        // are stubbed below but never passed to the publisher, which only gets vesCollectorService.
+        DeviceManagerServiceProvider serviceProvider = mock(DeviceManagerServiceProvider.class);
+        vesCollectorService = mock(VESCollectorService.class);
+        vesCfg = mock(VESCollectorCfgService.class);
+        NetconfNetworkElementService netconfNetworkElementService = mock(NetconfNetworkElementService.class);
+        when(netconfNetworkElementService.getServiceProvider()).thenReturn(serviceProvider);
+        when(serviceProvider.getVESCollectorService()).thenReturn(vesCollectorService);
+        when(vesCollectorService.getConfig()).thenReturn(vesCfg);
+        when(vesCfg.getReportingEntityName()).thenReturn("ONAP SDN-R");
+        when(vesCollectorService.publishVESMessage(vesMsg)).thenReturn(true); // stub applies only when the argument equals vesMsg
+
+        mountpointStatePublisher = new MountpointStatePublisher(vesCollectorService);
+        mountpointStatePublisher.addToPublish(testJsonData);
+        // Queued here, before the test method starts the publisher thread.
     }
 
     @Test
     public void testMountpointStatePublisherConfiguration() throws InterruptedException {
-        ConfigurationFileRepresentation configFileRepresentation =
-                new ConfigurationFileRepresentation(CONFIGURATIONTESTFILE);
-        GeneralConfig cfg = new GeneralConfig(configFileRepresentation);
-
-        MountpointStatePublisherMain pub = new MountpointStatePublisherMock(cfg);
-        pub.createPublisher(null);
-        pub.publishMessage(pub.createPublisher(null), "Test DMaaP Message");
-
-    }
-
-    public class MountpointStatePublisherMock extends MountpointStatePublisherMain {
-
-        public MountpointStatePublisherMock(Configuration config) {
-            super(config);
-        }
-
-        @Override
-        public MRBatchingPublisher createPublisher(Properties publisherProperties) {
-
-            return new MRBatchingPublisher() {
-
-                @Override
-                public int send(String msg) throws IOException {
-                    System.out.println("Message to send - " + msg);
-                    return 0;
-                }
-
-                @Override
-                public int send(String partition, String msg) throws IOException {
-                    return 0;
-                }
-
-                @Override
-                public int send(message msg) throws IOException {
-                    return 0;
-                }
-
-                @Override
-                public int send(Collection<message> msgs) throws IOException {
-                    return 0;
-                }
-
-                @Override
-                public void close() {
-
-                }
-
-                @Override
-                public void logTo(Logger log) {
-
-                }
-
-                @Override
-                public void setApiCredentials(String apiKey, String apiSecret) {
-
-                }
-
-                @Override
-                public void clearApiCredentials() {
-
-                }
-
-                @Override
-                public int getPendingMessageCount() {
-                    return 0;
-                }
-
-                @Override
-                public List<message> close(long timeout, TimeUnit timeoutUnits)
-                        throws IOException, InterruptedException {
-                    return null;
-                }
-
-                @Override
-                public MRPublisherResponse sendBatchWithResponse() {
-                    return null;
-                }
-
-            };
-        }
+        Thread t = new Thread(mountpointStatePublisher); // run the publisher on its own thread
+        t.start();
+        Thread.sleep(7000); // NOTE(review): fixed 7 s wait; nothing is asserted or verified afterwards
     }
 
     @After
-    public void after() {
-        File file = new File(CONFIGURATIONTESTFILE);
-        if (file.exists()) {
-            System.out.println("File exists, Deleting it");
-            file.delete();
-        }
-
+    public void close() { // JUnit runs this after each test method
+        mountpointStatePublisher.stop(); // presumably ends the publisher's run loop; confirm in MountpointStatePublisher
     }
-
 }