Merge "netconf-executor: NetconfSessionImplTest"
diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfDeviceCommunicator.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfDeviceCommunicator.kt
index 12e3b83..6ef4f41 100644
--- a/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfDeviceCommunicator.kt
+++ b/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfDeviceCommunicator.kt
@@ -30,6 +30,7 @@
 import java.io.OutputStreamWriter
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.CompletableFuture
+import java.util.concurrent.TimeUnit
 
 class NetconfDeviceCommunicator(private var inputStream: InputStream,
                                 private var out: OutputStream,
@@ -232,4 +233,21 @@
             NetconfMessageUtils.getMsgId(deviceReply),
             deviceInfo))
     }
+
+    /**
+     * Gets the value of the {@link CompletableFuture} from {@link NetconfDeviceCommunicator#sendMessage}
+     * This function is used by NetconfSessionImpl. Needed to wrap exception testing in NetconfSessionImpl.
+     * @param fut {@link CompletableFuture} object
+     * @param timeout the maximum time to wait
+     * @param timeUnit the time unit of the timeout argument
+     * @return the result value
+     * @throws CancellationException if this future was cancelled
+     * @throws ExecutionException if this future completed exceptionally
+     * @throws InterruptedException if the current thread was interrupted while waiting
+     * @throws TimeoutException if the wait timed outStream
+     */
+    internal fun getFutureFromSendMessage(
+        fut: CompletableFuture<String>, timeout: Long, timeUnit: TimeUnit): String {
+        return fut.get(timeout, timeUnit)
+    }
 }
diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImpl.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImpl.kt
index 12eb43f..7e56e3e 100644
--- a/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImpl.kt
+++ b/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImpl.kt
@@ -34,22 +34,21 @@
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcStatus
 import org.slf4j.LoggerFactory
 import java.io.IOException
-import java.util.*
+import java.util.Collections
 import java.util.concurrent.CompletableFuture
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.ExecutionException
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.TimeoutException
-import java.util.concurrent.atomic.AtomicReference
 
 class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcService: NetconfRpcService) :
     NetconfSession {
 
     private val log = LoggerFactory.getLogger(NetconfSessionImpl::class.java)
 
-    private val errorReplies: MutableList<String> = Collections.synchronizedList(listOf())
+    private val errorReplies: MutableList<String> = Collections.synchronizedList(mutableListOf())
     private val replies: MutableMap<String, CompletableFuture<String>> = ConcurrentHashMap()
-    private val deviceCapabilities = setOf<String>()
+    private val deviceCapabilities = mutableSetOf<String>()
 
     private var connectionTimeout: Long = 0
     private var replyTimeout: Int = 0
@@ -117,11 +116,8 @@
             } catch (ioe: IOException) {
                 log.warn("$deviceInfo: Error closing session($sessionId) for host($deviceInfo)", ioe)
             }
-
-//            NetconfReceivedEvent(NetconfReceivedEvent.Type.SESSION_CLOSED, "",
-//                "Closed due to unexpected error " + e.cause, "-1", deviceInfo)
-            errorReplies.clear() // move to cleanUp()?
-            replies.clear()
+            clearErrorReplies()
+            clearReplies()
 
             throw NetconfException("$deviceInfo: Closing session $sessionId for request $formattedRequest", e)
         }
@@ -144,27 +140,26 @@
         try {
             if (client.isClosed) {
                 log.info("Trying to restart the whole SSH connection with {}", deviceInfo)
-                replies.clear()
+                clearReplies()
                 startConnection()
             } else if (session.isClosed) {
                 log.info("Trying to restart the session with {}", deviceInfo)
-                replies.clear()
+                clearReplies()
                 startSession()
             } else if (channel.isClosed) {
                 log.info("Trying to reopen the channel with {}", deviceInfo)
-                replies.clear()
+                clearReplies()
                 openChannel()
             } else {
                 return
             }
         } catch (e: IOException) {
-            log.error("Can't reopen connection for device {}", e.message)
+            log.error("Can't reopen connection for device {} error: {}", deviceInfo, e.message)
             throw NetconfException(String.format("Cannot re-open the connection with device (%s)", deviceInfo), e)
         } catch (e: IllegalStateException) {
-            log.error("Can't reopen connection for device {}", e.message)
+            log.error("Can't reopen connection for device {} error: {}", deviceInfo, e.message)
             throw NetconfException(String.format("Cannot re-open the connection with device (%s)", deviceInfo), e)
         }
-
     }
 
     override fun getDeviceInfo(): DeviceInfo {
@@ -191,8 +186,13 @@
 
     }
 
-    private fun startClient() {
+    //Needed to unit test connect method interacting with client.start in startClient() below
+    private fun setupNewSSHClient() {
         client = SshClient.setUpDefaultClient()
+    }
+
+    private fun startClient() {
+        setupNewSSHClient()
 
         client.properties.putIfAbsent(FactoryManager.IDLE_TIMEOUT, TimeUnit.SECONDS.toMillis(idleTimeout.toLong()))
         client.properties.putIfAbsent(FactoryManager.NIO2_READ_TIMEOUT, TimeUnit.SECONDS.toMillis(idleTimeout + 15L))
@@ -304,4 +304,7 @@
 
     internal fun clearErrorReplies() = errorReplies.clear()
     internal fun clearReplies() = replies.clear()
+    internal fun setClient(client: SshClient) { this.client = client }
+    internal fun setSession(session: ClientSession) { this.session = session }
+    internal fun setChannel(channel: ClientChannel) { this.channel = channel }
 }
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/NetconfSessionImplTest.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/NetconfSessionImplTest.kt
deleted file mode 100644
index b462ad0..0000000
--- a/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/NetconfSessionImplTest.kt
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Copyright © 2017-2018 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor
-
-import org.apache.sshd.client.channel.ChannelSubsystem
-import org.apache.sshd.client.session.ClientSessionImpl
-import org.junit.After
-import org.junit.Assert
-import org.junit.Before
-import org.junit.Test
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core.NetconfRpcServiceImpl
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core.NetconfSessionImpl
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.mocks.NetconfDeviceSimulator
-import java.util.concurrent.atomic.AtomicReference
-import kotlin.script.experimental.api.asSuccess
-
-class NetconfSessionImplTest {
-
-    private var device: NetconfDeviceSimulator? = null
-    private var deviceInfo: DeviceInfo? = null
-
-    @Before
-    fun before() {
-        deviceInfo = DeviceInfo().apply {
-            username = "username"
-            password = "password"
-            ipAddress = "localhost"
-            port = 2224
-            connectTimeout = 10
-        }
-
-        device = NetconfDeviceSimulator(deviceInfo!!.port)
-        device!!.start()
-    }
-
-    @After
-    fun after() {
-        device!!.stop()
-    }
-
-    @Throws(Exception::class)
-    fun testNetconfSession() {
-        val netconfSession = NetconfSessionImpl(deviceInfo!!, NetconfRpcServiceImpl(DeviceInfo()))
-
-        Assert.assertNotNull(netconfSession.getSessionId())
-        Assert.assertEquals("localhost:2224", netconfSession.getDeviceInfo().toString())
-
-        netconfSession.checkAndReestablish()
-
-        Assert.assertNotNull(netconfSession.getSessionId())
-        Assert.assertEquals("localhost:2224", netconfSession.getDeviceInfo().toString())
-
-        Assert.assertTrue(!netconfSession.getDeviceCapabilitiesSet().isEmpty())
-    }
-
-    @Test
-    fun testNetconfSessionconnect() {
-        val netconfSession = NetconfSessionImpl(deviceInfo!!, NetconfRpcServiceImpl(deviceInfo!!))
-        netconfSession.connect()
-        Assert.assertTrue(netconfSession.sessionstatus("Open"))
-    }
-
-    @Test
-    fun testNetconfSessionreconnect() {
-        val netconfSession = NetconfSessionImpl(deviceInfo!!, NetconfRpcServiceImpl(deviceInfo!!))
-        netconfSession.connect()
-        netconfSession.reconnect()
-        Assert.assertTrue(netconfSession.sessionstatus("Open"))
-
-    }
-    @Test
-    fun testNetconfSessiondisconnect() {
-        val netconfSession = NetconfSessionImpl(deviceInfo!!, NetconfRpcServiceImpl(deviceInfo!!))
-        netconfSession.connect()
-        netconfSession.disconnect()
-        Assert.assertTrue(netconfSession.sessionstatus("Close"))
-
-    }
-    @Test
-    fun testNetconfSessioncheckAndReestablish() {
-        val netconfSession = NetconfSessionImpl(deviceInfo!!, NetconfRpcServiceImpl(deviceInfo!!))
-        netconfSession.connect()
-        netconfSession.checkAndReestablish()
-        Assert.assertTrue(netconfSession.sessionstatus("Open"))
-
-
-    }
-    @Test
-    fun testNetconfSessionconnecgetDeviceInfo() {
-        val netconfSession = NetconfSessionImpl(deviceInfo!!, NetconfRpcServiceImpl(deviceInfo!!))
-        netconfSession.connect()
-        Assert.assertNotNull(netconfSession.getDeviceInfo())
-        Assert.assertFalse(!netconfSession.getDeviceCapabilitiesSet().isEmpty())
-    }
-
-
-}
diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImplTest.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImplTest.kt
new file mode 100644
index 0000000..1f526f4
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImplTest.kt
@@ -0,0 +1,543 @@
+/*
+ * Copyright © 2019 Bell Canada
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core
+
+import io.mockk.CapturingSlot
+import io.mockk.Runs
+import io.mockk.every
+import io.mockk.just
+import io.mockk.mockk
+import io.mockk.spyk
+import io.mockk.verify
+import org.apache.sshd.client.SshClient
+import org.apache.sshd.client.channel.ChannelSubsystem
+import org.apache.sshd.client.channel.ClientChannel
+import org.apache.sshd.client.future.DefaultAuthFuture
+import org.apache.sshd.client.future.DefaultConnectFuture
+import org.apache.sshd.client.future.DefaultOpenFuture
+import org.apache.sshd.client.session.ClientSession
+import org.apache.sshd.common.FactoryManager
+import org.junit.Before
+import org.junit.Ignore
+import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceResponse
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfException
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfRpcService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcStatus
+import java.io.ByteArrayInputStream
+import java.io.ByteArrayOutputStream
+import java.io.IOException
+import java.io.InputStream
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.ExecutionException
+import java.util.concurrent.TimeoutException
+import kotlin.test.assertEquals
+import kotlin.test.assertFailsWith
+import kotlin.test.assertTrue
+
+class NetconfSessionImplTest {
+    companion object {
+        val SUCCESSFUL_DEVICE_RESPONSE = DeviceResponse().apply {
+            status = RpcStatus.SUCCESS
+            errorMessage = ""
+            responseMessage = ""
+            requestMessage = ""
+        }
+        val FAILED_DEVICE_RESPONSE = DeviceResponse().apply {
+            status = RpcStatus.FAILURE
+            errorMessage = ""
+            responseMessage = ""
+            requestMessage = ""
+        }
+        val deviceInfo: DeviceInfo = DeviceInfo().apply {
+            username = "username"
+            password = "password"
+            ipAddress = "localhost"
+            port = 2224
+            connectTimeout = 10
+        }
+        private const val someString = "Some string"
+    }
+
+    private lateinit var netconfSession: NetconfSessionImpl
+    private lateinit var netconfCommunicator: NetconfDeviceCommunicator
+    private lateinit var rpcService: NetconfRpcService
+    private lateinit var mockSshClient: SshClient
+    private lateinit var mockClientSession: ClientSession
+    private lateinit var mockClientChannel: ClientChannel
+    private lateinit var mockSubsystem: ChannelSubsystem
+
+    private val futureMsg = "blahblahblah"
+    private val request = "0"
+    private val sessionId = "0"
+    private val messageId = "asdfasdfadf"
+    private val deviceCapabilities = setOf("capability1", "capability2")
+    private val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)
+    private lateinit var sampleInputStream: InputStream
+    private lateinit var sampleOutputStream: ByteArrayOutputStream
+
+    @Before
+    fun setup() {
+        netconfCommunicator = mockk()
+        rpcService = mockk()
+        netconfSession = NetconfSessionImpl(deviceInfo, rpcService)
+        netconfSession.setStreamHandler(netconfCommunicator)
+        mockSshClient = mockk()
+        mockClientSession = mockk()
+        mockClientChannel = mockk()
+        mockSubsystem = mockk()
+        sampleInputStream = ByteArrayInputStream(someString.toByteArray(StandardCharsets.UTF_8))
+        sampleOutputStream = ByteArrayOutputStream()
+    }
+
+    @Test
+    fun `connect calls appropriate methods`() {
+        val session = spyk(netconfSession, recordPrivateCalls = true)
+        every { session["startClient"]() as Unit } just Runs
+        session.connect()
+        verify { session["startClient"]() }
+    }
+
+    //look for NetconfException being thrown when cannot connect
+    @Test
+    fun `connect throws NetconfException on error`() {
+        val errMsg = "$deviceInfo: Failed to establish SSH session"
+        assertFailsWith(exceptionClass = NetconfException::class, message = errMsg) {
+            val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+            every { netconfSessionSpy["startClient"]() as Unit } throws NetconfException(errMsg)
+            netconfSessionSpy.connect()
+        }
+    }
+
+    @Test
+    fun `disconnect without force option for rpcService succeeds`() {
+        //rpcService.closeSession succeeds with status not RpcStatus.FAILURE
+        every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
+        every { mockClientSession.close() } just Runs
+        every { mockSshClient.close() } just Runs
+        every { mockClientChannel.close() } just Runs
+        val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+        netconfSessionSpy.setSession(mockClientSession)
+        netconfSessionSpy.setClient(mockSshClient)
+        netconfSessionSpy.setChannel(mockClientChannel)
+        //RUN
+        netconfSessionSpy.disconnect()
+        //make sure that rpcService.close session is not called again.
+        verify(exactly = 0) { rpcService.closeSession(true) }
+        verify { mockClientSession.close() }
+        verify { mockSshClient.close() }
+        verify { mockClientChannel.close() }
+    }
+
+    @Test
+    fun `disconnect with force option for rpcService succeeds`() {
+        //rpcService.closeSession succeeds with status not RpcStatus.FAILURE
+        val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+        every { rpcService.closeSession(any()) } returns
+            FAILED_DEVICE_RESPONSE andThen SUCCESSFUL_DEVICE_RESPONSE
+        every { mockClientSession.close() } just Runs
+        every { mockSshClient.close() } just Runs
+        every { mockClientChannel.close() } just Runs
+        netconfSessionSpy.setSession(mockClientSession)
+        netconfSessionSpy.setClient(mockSshClient)
+        netconfSessionSpy.setChannel(mockClientChannel)
+        //RUN
+        netconfSessionSpy.disconnect()
+        //VERIFY
+        verify(exactly = 2) { rpcService.closeSession(any()) }
+        verify { mockClientSession.close() }
+        verify { mockSshClient.close() }
+        verify { mockClientChannel.close() }
+
+    }
+
+    @Ignore //TODO undo close method removal
+    @Test
+    fun `disconnect wraps exception from ssh closing error`() {
+        val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+        every { netconfSessionSpy["close"]() as Unit } throws IOException("Some IOException occurred!")
+        every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
+        every { netconfSessionSpy.checkAndReestablish() } just Runs
+        netconfSessionSpy.disconnect()
+        verify { netconfSessionSpy["close"]() }
+    }
+
+    @Test
+    fun `reconnect calls disconnect and connect`() {
+        val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+        every { netconfSessionSpy.disconnect() } just Runs
+        every { netconfSessionSpy.connect() } just Runs
+        netconfSessionSpy.reconnect()
+        verify { netconfSessionSpy.disconnect() }
+        verify { netconfSessionSpy.connect() }
+    }
+
+    @Test
+    fun `checkAndReestablish restarts connection and clears replies on sshClient disconnection`() {
+        val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+        every { mockSshClient.isClosed } returns true
+        netconfSessionSpy.setClient(mockSshClient)
+        every { netconfSessionSpy["startConnection"]() as Unit } just Runs
+        //Call method
+        netconfSessionSpy.checkAndReestablish()
+        //Verify
+        verify { netconfSessionSpy.clearReplies() }
+        verify { netconfSessionSpy["startConnection"]() }
+    }
+
+    @Test
+    fun `checkAndReestablish restarts session and clears replies on clientSession closing`() {
+        val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+        every { mockClientSession.isClosed } returns true
+        every { mockSshClient.isClosed } returns false
+        every { netconfSessionSpy["startSession"]() as Unit } just Runs
+        netconfSessionSpy.setClient(mockSshClient)
+        netconfSessionSpy.setSession(mockClientSession)
+        //Call method
+        netconfSessionSpy.checkAndReestablish()
+        //Verify
+        verify { netconfSessionSpy.clearReplies() }
+        verify { netconfSessionSpy["startSession"]() }
+    }
+
+    @Test
+    fun `checkAndReestablish reopens channel and clears replies on channel closing`() {
+        val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+        every { mockClientSession.isClosed } returns false
+        every { mockSshClient.isClosed } returns false
+        every { mockClientChannel.isClosed } returns true
+        every { netconfSessionSpy["openChannel"]() as Unit } just Runs
+        netconfSessionSpy.setClient(mockSshClient)
+        netconfSessionSpy.setSession(mockClientSession)
+        netconfSessionSpy.setChannel(mockClientChannel)
+        //Call method
+        netconfSessionSpy.checkAndReestablish()
+        //Verify
+        verify { netconfSessionSpy.clearReplies() }
+        verify { netconfSessionSpy["openChannel"]() }
+    }
+
+
+    @Test
+    fun `syncRpc runs normally`() {
+        val netconfSessionSpy = spyk(netconfSession)
+        val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
+
+        //test the case where SSH connection did not need to be re-established.
+        //put an existing item into the replies
+        netconfSessionSpy.getReplies()["somekey"] = CompletableFuture.completedFuture("${futureMsg}2")
+        every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
+        every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } returns futureRet.get()
+        every { netconfSessionSpy.checkAndReestablish() } just Runs
+        //call the method
+        assertEquals(futureMsg, netconfSessionSpy.syncRpc("0", "0"))
+        //make sure the replies didn't change
+        assertTrue {
+            netconfSessionSpy.getReplies().size == 1 &&
+                netconfSessionSpy.getReplies().containsKey("somekey")
+        }
+        verify(exactly = 0) { netconfSessionSpy.clearReplies() }
+    }
+
+
+    @Test
+    fun `syncRpc still succeeds and replies are cleared on client disconnect`() {
+        val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+        val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
+
+        //put an item into the replies
+        netconfSessionSpy.getReplies()["somekey"] = CompletableFuture.completedFuture("${futureMsg}2")
+
+        //tests the case where SSH session needs to be re-established.
+        every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
+        every { netconfSessionSpy["startClient"]() as Unit } just Runs
+        every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } returns futureRet.get()
+        every { mockSshClient.isClosed } returns true
+        netconfSessionSpy.setClient(mockSshClient)
+
+        //call the method
+        assertEquals(futureMsg, netconfSessionSpy.syncRpc("0", "0"))
+        //make sure the replies got cleared out
+        assertTrue { netconfSessionSpy.getReplies().isEmpty() }
+        verify(exactly = 1) { netconfSessionSpy.clearReplies() }
+    }
+
+    @Ignore //TODO
+    //Test for handling CompletableFuture.get returns InterruptedException inside NetconfDeviceCommunicator
+    @Test
+    fun `syncRpc throws NetconfException if InterruptedException is caught`() {
+        val expectedExceptionMsg = "$deviceInfo: Interrupted while waiting for reply for request: $formattedRequest"
+        assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
+            val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+            val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
+            every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
+            every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws InterruptedException("interrupted")
+            every { netconfSessionSpy.checkAndReestablish() } just Runs
+            //call the method
+            netconfSessionSpy.syncRpc("0", "0")
+        }
+    }
+
+    @Ignore //TODO revert back on getFutureFromSendMessage
+    @Test
+    fun `syncRpc throws NetconfException if TimeoutException is caught`() {
+        val expectedExceptionMsg = "$deviceInfo: Timed out while waiting for reply for request $formattedRequest after ${deviceInfo.replyTimeout} sec."
+        assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
+            val netconfSessionSpy = spyk(netconfSession)
+            val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
+            every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
+            every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws TimeoutException("timed out")
+            every { netconfSessionSpy.checkAndReestablish() } just Runs
+            //call the method
+            netconfSessionSpy.syncRpc("0", "0")
+        }
+    }
+
+    @Ignore
+    @Test
+    fun `syncRpc throws NetconfException if ExecutionException is caught`() {
+        val expectedExceptionMsg = "$deviceInfo: Closing session $sessionId for request $formattedRequest"
+        assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
+            val netconfSessionSpy = spyk(netconfSession)
+            val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
+            every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
+            every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws
+                ExecutionException("exec exception", Exception("nested exception")) //TODO revert getFutureFromSendMessage back
+            every { netconfSessionSpy.checkAndReestablish() } just Runs
+            //call the method
+            netconfSessionSpy.syncRpc("0", "0")
+        }
+    }
+
+    @Ignore //TODO revert back on getFutureFromSendMessage
+    @Test
+    fun `syncRpc throws NetconfException if caught ExecutionException and failed to close SSH session`() {
+        val expectedExceptionMsg = "$deviceInfo: Closing session $sessionId for request $formattedRequest"
+        assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
+            val netconfSessionSpy = spyk(netconfSession)
+            val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
+            every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
+            every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws
+                ExecutionException("exec exception", Exception("nested exception"))
+            every { netconfSessionSpy["close"]() as Unit } throws IOException("got an IO exception")
+            every { netconfSessionSpy.checkAndReestablish() } just Runs
+            //call the method
+            netconfSessionSpy.syncRpc("0", "0")
+            //make sure replies are cleared...
+            verify(exactly = 1) { netconfSessionSpy.clearReplies() }
+            verify(exactly = 1) { netconfSessionSpy.clearErrorReplies() }
+        }
+    }
+
+    @Test
+    fun `asyncRpc runs normally`() {
+        val netconfSessionSpy = spyk(netconfSession)
+        every { netconfSessionSpy.checkAndReestablish() } just Runs
+        val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
+        every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
+        //run the method
+        val rpcResultFuture = netconfSessionSpy.asyncRpc("0", "0")
+        every { netconfSessionSpy.checkAndReestablish() } just Runs
+        //make sure the future gets resolved
+        assertTrue { rpcResultFuture.get() == futureMsg }
+        //make sure that clearReplies wasn't called (reestablishConnection check)
+        verify(exactly = 0) { netconfSessionSpy.clearReplies() }
+    }
+
+    @Test
+    @Ignore
+    //TODO: get 't' inside asyncRpc to be a Throwable
+    fun `asyncRpc wraps exception`() {
+        assertFailsWith(exceptionClass = NetconfException::class, message = futureMsg) {
+            val netconfSessionSpy = spyk(netconfSession)
+            val futureRet: CompletableFuture<String> = CompletableFuture.supplyAsync {
+                throw Exception("blah")
+            }
+            futureRet.completeExceptionally(IOException("something is wrong"))
+            every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
+            //RUN
+            val rpcResultFuture = netconfSessionSpy.asyncRpc("0", "0")
+        }
+    }
+
+    @Test
+    fun `connect starts underlying client`() {
+        val propertiesMap = hashMapOf<String, Any>()
+        every { mockSshClient.start() } just Runs
+        every { mockSshClient.properties } returns propertiesMap
+        val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+        every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
+        every { netconfSessionSpy["startSession"]() as Unit } just Runs
+        netconfSessionSpy.setClient(mockSshClient)
+        netconfSessionSpy.connect()
+        verify { mockSshClient.start() }
+        assertTrue { propertiesMap.containsKey(FactoryManager.IDLE_TIMEOUT) }
+        assertTrue { propertiesMap.containsKey(FactoryManager.NIO2_READ_TIMEOUT) }
+    }
+
+    @Test
+    fun `startSession tries to connect to user supplied device`() {
+        every { mockSshClient.start() } just Runs
+        every { mockSshClient.properties } returns hashMapOf<String, Any>()
+        //setup slots to capture values from the invocations
+        val userSlot = CapturingSlot<String>()
+        val ipSlot = CapturingSlot<String>()
+        val portSlot = CapturingSlot<Int>()
+        //create a future that succeeded
+        val succeededFuture = DefaultConnectFuture(Any(), Any())
+        succeededFuture.value = mockClientSession
+        every { mockSshClient.connect(capture(userSlot), capture(ipSlot), capture(portSlot)) } returns succeededFuture
+        val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+        every { netconfSessionSpy["authSession"]() as Unit } just Runs
+        every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
+        netconfSessionSpy.setClient(mockSshClient)
+        //RUN
+        netconfSessionSpy.connect()
+        //Verify
+        verify { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) }
+        assertEquals(deviceInfo.username, userSlot.captured)
+        assertEquals(deviceInfo.ipAddress, ipSlot.captured)
+        assertEquals(deviceInfo.port, portSlot.captured)
+        verify { netconfSessionSpy["authSession"]() }
+    }
+
+    @Test
+    fun `authSession throws exception if ClientSession is not AUTHED`() {
+        assertFailsWith(exceptionClass = NetconfException::class) {
+            //after client session connects,
+            every { mockSshClient.start() } just Runs
+            every { mockSshClient.properties } returns hashMapOf<String, Any>()
+            val succeededAuthFuture = DefaultAuthFuture(Any(), Any())
+            succeededAuthFuture.value = true //AuthFuture's value is Boolean
+            val passSlot = CapturingSlot<String>()
+            every { mockClientSession.addPasswordIdentity(capture(passSlot)) } just Runs
+            every { mockClientSession.auth() } returns succeededAuthFuture
+            val succeededSessionFuture = DefaultConnectFuture(Any(), Any())
+            succeededSessionFuture.value = mockClientSession
+            every { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) } returns succeededSessionFuture
+            every { mockClientSession.waitFor(any(), any()) } returns
+                setOf(ClientSession.ClientSessionEvent.WAIT_AUTH, ClientSession.ClientSessionEvent.CLOSED)
+            val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+            every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
+            netconfSessionSpy.setClient(mockSshClient)
+            //RUN
+            netconfSessionSpy.connect()
+        }
+    }
+
+    //common mock initializer for more weird tests.
+    private fun setupOpenChannelMocks(): Unit {
+        every { mockSshClient.start() } just Runs
+        every { mockSshClient.properties } returns hashMapOf<String, Any>()
+        val succeededAuthFuture = DefaultAuthFuture(Any(), Any())
+        succeededAuthFuture.value = true //AuthFuture's value is Boolean
+        val passSlot = CapturingSlot<String>()
+        every { mockClientSession.addPasswordIdentity(capture(passSlot)) } just Runs
+        every { mockClientSession.auth() } returns succeededAuthFuture
+        val succeededSessionFuture = DefaultConnectFuture(Any(), Any())
+        succeededSessionFuture.value = mockClientSession
+        every { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) } returns succeededSessionFuture
+        every { mockClientSession.waitFor(any(), any()) } returns
+            setOf(ClientSession.ClientSessionEvent.WAIT_AUTH,
+                ClientSession.ClientSessionEvent.CLOSED,
+                ClientSession.ClientSessionEvent.AUTHED)
+
+        every { mockClientSession.createSubsystemChannel(any()) } returns mockSubsystem
+        every { mockClientChannel.invertedOut } returns sampleInputStream
+        every { mockClientChannel.invertedIn } returns sampleOutputStream
+    }
+
+    @Test
+    fun `authSession opensChannel if ClientSession is AUTHED and session can be opened`() {
+        //after client session connects, make sure the client receives authentication
+        setupOpenChannelMocks()
+        val channelFuture = DefaultOpenFuture(Any(), Any())
+        channelFuture.value = true
+        channelFuture.setOpened()
+        val connectFuture = DefaultConnectFuture(Any(), Any())
+        connectFuture.value = mockClientSession
+        connectFuture.session = mockClientSession
+        every { mockSubsystem.open() } returns channelFuture
+        every { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) } returns connectFuture
+
+        val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+        every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
+        every { netconfSessionSpy["setupHandler"]() as Unit } just Runs
+        netconfSessionSpy.setClient(mockSshClient)
+        //Run
+        netconfSessionSpy.connect()
+        //Verify
+        verify { mockSubsystem.open() }
+    }
+
+
+    @Test
+    fun `authSession throws NetconfException if ClientSession is AUTHED but channelFuture timed out or not open`() {
+        assertFailsWith(exceptionClass = NetconfException::class) {
+            //after client session connects, make sure the client receives authentication
+            setupOpenChannelMocks()
+            val channelFuture = DefaultOpenFuture(Any(), Any())
+            every { mockSubsystem.open() } returns channelFuture
+            val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+            every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
+            every { netconfSessionSpy["setupHandler"]() as Unit } just Runs
+            netconfSessionSpy.setClient(mockSshClient)
+            //Run
+            netconfSessionSpy.connect()
+            //Verify
+            verify { mockSubsystem.open() }
+        }
+    }
+
+
+    @Test
+    fun `disconnect closes session, channel, and client`() {
+        every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
+        every { mockClientSession.close() } just Runs
+        every { mockClientChannel.close() } just Runs
+        every { mockSshClient.close() } just Runs
+        val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+        netconfSessionSpy.setChannel(mockClientChannel)
+        netconfSessionSpy.setClient(mockSshClient)
+        netconfSessionSpy.setSession(mockClientSession)
+        //RUN
+        netconfSessionSpy.disconnect()
+        //VERIFY
+        verify { mockClientSession.close() }
+        verify { mockClientChannel.close() }
+        verify { mockSshClient.close() }
+    }
+
+    @Ignore
+    @Test
+    fun `disconnect wraps IOException if channel doesn't close`() { //this test is equivalent to others
+        every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
+        every { mockClientSession.close() } just Runs
+        every { mockClientChannel.close() } throws IOException("channel doesn't want to close!")
+        val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+        netconfSessionSpy.setChannel(mockClientChannel)
+        netconfSessionSpy.setClient(mockSshClient)
+        netconfSessionSpy.setSession(mockClientSession)
+        //RUN
+        netconfSessionSpy.disconnect()
+        //VERIFY
+        verify { mockClientSession.close() }
+    }
+}
diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/mocks/NetconfDeviceSimulator.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/mocks/NetconfDeviceSimulator.kt
deleted file mode 100644
index 2b7aa76..0000000
--- a/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/mocks/NetconfDeviceSimulator.kt
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright © 2017-2018 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.mocks
-
-
-import org.apache.sshd.common.NamedFactory
-import org.apache.sshd.server.command.Command
-import org.apache.sshd.server.SshServer
-import org.apache.sshd.server.auth.UserAuth
-import org.apache.sshd.server.auth.UserAuthNoneFactory
-import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider
-import java.util.*
-
-
-class NetconfDeviceSimulator(private val port: Int) {
-
-    private var sshd: SshServer? = null
-
-    fun start() {
-        sshd = SshServer.setUpDefaultServer()
-        sshd!!.port = port
-        sshd!!.keyPairProvider = SimpleGeneratorHostKeyProvider()
-
-        val userAuthFactories = ArrayList<NamedFactory<UserAuth>>()
-        userAuthFactories.add(UserAuthNoneFactory())
-        sshd!!.userAuthFactories = userAuthFactories
-
-        val namedFactoryList = ArrayList<NamedFactory<Command>>()
-        namedFactoryList.add(NetconfSubsystemFactory())
-        sshd!!.subsystemFactories = namedFactoryList
-
-        try {
-            sshd!!.start()
-        } catch (e: Exception) {
-            e.printStackTrace()
-        }
-
-    }
-
-    fun stop() {
-        try {
-            sshd!!.stop(true)
-        } catch (e: Exception) {
-            e.printStackTrace()
-        }
-
-    }
-}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/mocks/NetconfSubsystemFactory.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/mocks/NetconfSubsystemFactory.kt
deleted file mode 100644
index f3e5d38..0000000
--- a/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/mocks/NetconfSubsystemFactory.kt
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Copyright © 2017-2018 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.mocks
-
-
-import java.io.IOException
-import java.io.InputStream
-import java.io.OutputStream
-import org.apache.sshd.common.NamedFactory;
-import org.apache.sshd.server.command.Command;
-import org.apache.sshd.server.Environment;
-import org.apache.sshd.server.ExitCallback;
-
-
-class NetconfSubsystemFactory : NamedFactory<Command> {
-
-    private val END_CHAR_SEQUENCE = "]]>]]>"
-
-    override fun create(): Command {
-        return NetconfSubsystem()
-    }
-
-    override fun getName(): String {
-        return "netconf"
-    }
-
-    /**
-     * Simple implementation of netconf reading 1 request, sending a 'hello' response and quitting
-     */
-    inner class NetconfSubsystem : Command {
-        private var input: InputStream? = null
-        private var out: OutputStream? = null
-        private var clientThread: Thread? = null
-        private var r: Int = 0
-
-        @Throws(IOException::class)
-        override fun start(env: Environment) {
-            clientThread = Thread(object : Runnable {
-
-                override fun run() {
-                    try {
-                        val message = StringBuilder()
-                        while (true) {
-                            process(createHelloString())
-                            r = input!!.read()
-                            if (r == -1) {
-                                break
-                            } else {
-                                val c = r.toChar()
-                                message.append(c)
-                                val messageString = message.toString()
-                                if (messageString.endsWith(END_CHAR_SEQUENCE)) {
-                                    println("Detected end message:\n$messageString")
-                                    process(createHelloString())
-                                    message.setLength(0)
-                                    break
-                                }
-                            }
-                        }
-                    } catch (e: IOException) {
-                        e.printStackTrace()
-                    }
-
-                }
-
-                @Throws(IOException::class)
-                private fun process(xmlMessage: String) {
-                    println("Sending message:\n$xmlMessage")
-                    out!!.write(xmlMessage.toByteArray(charset("UTF-8")))
-                    out!!.write((END_CHAR_SEQUENCE + "\n").toByteArray(charset("UTF-8")))
-                    out!!.flush()
-                }
-
-                private fun createHelloString(): String {
-                    val sessionId = "" + (Math.random() * Integer.MAX_VALUE).toInt()
-                    return ("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n"
-                            + "<capabilities>\n<capability>urn:ietf:params:netconf:base:1.0</capability>\n"
-                            + "<capability>urn:ietf:params:netconf:base:1.1</capability>\n</capabilities>\n"
-                            + "<session-id>" + sessionId + "</session-id>\n</hello>")
-                }
-            })
-
-            clientThread!!.start()
-        }
-
-        @Throws(Exception::class)
-        override fun destroy() {
-            try {
-                clientThread!!.join(2000)
-            } catch (e: InterruptedException) {
-                // log.warn("Error joining Client thread" + e.getMessage());
-            }
-
-            clientThread!!.interrupt()
-        }
-
-        override fun setInputStream(input: InputStream) {
-            this.input = input
-        }
-
-        override fun setOutputStream(out: OutputStream) {
-            this.out = out
-        }
-
-        override fun setErrorStream(err: OutputStream) {}
-
-        override fun setExitCallback(callback: ExitCallback) {}
-
-
-
-    }
-}
\ No newline at end of file