Merge "netconf-executor: NetconfSessionImplTest"
diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfDeviceCommunicator.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfDeviceCommunicator.kt
index 12e3b83..6ef4f41 100644
--- a/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfDeviceCommunicator.kt
+++ b/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfDeviceCommunicator.kt
@@ -30,6 +30,7 @@
import java.io.OutputStreamWriter
import java.nio.charset.StandardCharsets
import java.util.concurrent.CompletableFuture
+import java.util.concurrent.TimeUnit
class NetconfDeviceCommunicator(private var inputStream: InputStream,
private var out: OutputStream,
@@ -232,4 +233,21 @@
NetconfMessageUtils.getMsgId(deviceReply),
deviceInfo))
}
+
+ /**
+ * Gets the value of the {@link CompletableFuture} from {@link NetconfDeviceCommunicator#sendMessage}
+ * This function is used by NetconfSessionImpl. Needed to wrap exception testing in NetconfSessionImpl.
+ * @param fut {@link CompletableFuture} object
+ * @param timeout the maximum time to wait
+ * @param timeUnit the time unit of the timeout argument
+ * @return the result value
+ * @throws CancellationException if this future was cancelled
+ * @throws ExecutionException if this future completed exceptionally
+ * @throws InterruptedException if the current thread was interrupted while waiting
+ * @throws TimeoutException if the wait timed outStream
+ */
+ internal fun getFutureFromSendMessage(
+ fut: CompletableFuture<String>, timeout: Long, timeUnit: TimeUnit): String {
+ return fut.get(timeout, timeUnit)
+ }
}
diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImpl.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImpl.kt
index 12eb43f..7e56e3e 100644
--- a/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImpl.kt
+++ b/ms/blueprintsprocessor/functions/netconf-executor/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImpl.kt
@@ -34,22 +34,21 @@
import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcStatus
import org.slf4j.LoggerFactory
import java.io.IOException
-import java.util.*
+import java.util.Collections
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
-import java.util.concurrent.atomic.AtomicReference
class NetconfSessionImpl(private val deviceInfo: DeviceInfo, private val rpcService: NetconfRpcService) :
NetconfSession {
private val log = LoggerFactory.getLogger(NetconfSessionImpl::class.java)
- private val errorReplies: MutableList<String> = Collections.synchronizedList(listOf())
+ private val errorReplies: MutableList<String> = Collections.synchronizedList(mutableListOf())
private val replies: MutableMap<String, CompletableFuture<String>> = ConcurrentHashMap()
- private val deviceCapabilities = setOf<String>()
+ private val deviceCapabilities = mutableSetOf<String>()
private var connectionTimeout: Long = 0
private var replyTimeout: Int = 0
@@ -117,11 +116,8 @@
} catch (ioe: IOException) {
log.warn("$deviceInfo: Error closing session($sessionId) for host($deviceInfo)", ioe)
}
-
-// NetconfReceivedEvent(NetconfReceivedEvent.Type.SESSION_CLOSED, "",
-// "Closed due to unexpected error " + e.cause, "-1", deviceInfo)
- errorReplies.clear() // move to cleanUp()?
- replies.clear()
+ clearErrorReplies()
+ clearReplies()
throw NetconfException("$deviceInfo: Closing session $sessionId for request $formattedRequest", e)
}
@@ -144,27 +140,26 @@
try {
if (client.isClosed) {
log.info("Trying to restart the whole SSH connection with {}", deviceInfo)
- replies.clear()
+ clearReplies()
startConnection()
} else if (session.isClosed) {
log.info("Trying to restart the session with {}", deviceInfo)
- replies.clear()
+ clearReplies()
startSession()
} else if (channel.isClosed) {
log.info("Trying to reopen the channel with {}", deviceInfo)
- replies.clear()
+ clearReplies()
openChannel()
} else {
return
}
} catch (e: IOException) {
- log.error("Can't reopen connection for device {}", e.message)
+ log.error("Can't reopen connection for device {} error: {}", deviceInfo, e.message)
throw NetconfException(String.format("Cannot re-open the connection with device (%s)", deviceInfo), e)
} catch (e: IllegalStateException) {
- log.error("Can't reopen connection for device {}", e.message)
+ log.error("Can't reopen connection for device {} error: {}", deviceInfo, e.message)
throw NetconfException(String.format("Cannot re-open the connection with device (%s)", deviceInfo), e)
}
-
}
override fun getDeviceInfo(): DeviceInfo {
@@ -191,8 +186,13 @@
}
- private fun startClient() {
+ //Needed to unit test connect method interacting with client.start in startClient() below
+ private fun setupNewSSHClient() {
client = SshClient.setUpDefaultClient()
+ }
+
+ private fun startClient() {
+ setupNewSSHClient()
client.properties.putIfAbsent(FactoryManager.IDLE_TIMEOUT, TimeUnit.SECONDS.toMillis(idleTimeout.toLong()))
client.properties.putIfAbsent(FactoryManager.NIO2_READ_TIMEOUT, TimeUnit.SECONDS.toMillis(idleTimeout + 15L))
@@ -304,4 +304,7 @@
internal fun clearErrorReplies() = errorReplies.clear()
internal fun clearReplies() = replies.clear()
+ internal fun setClient(client: SshClient) { this.client = client }
+ internal fun setSession(session: ClientSession) { this.session = session }
+ internal fun setChannel(channel: ClientChannel) { this.channel = channel }
}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/NetconfSessionImplTest.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/NetconfSessionImplTest.kt
deleted file mode 100644
index b462ad0..0000000
--- a/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/NetconfSessionImplTest.kt
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Copyright © 2017-2018 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor
-
-import org.apache.sshd.client.channel.ChannelSubsystem
-import org.apache.sshd.client.session.ClientSessionImpl
-import org.junit.After
-import org.junit.Assert
-import org.junit.Before
-import org.junit.Test
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core.NetconfRpcServiceImpl
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core.NetconfSessionImpl
-import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.mocks.NetconfDeviceSimulator
-import java.util.concurrent.atomic.AtomicReference
-import kotlin.script.experimental.api.asSuccess
-
-class NetconfSessionImplTest {
-
- private var device: NetconfDeviceSimulator? = null
- private var deviceInfo: DeviceInfo? = null
-
- @Before
- fun before() {
- deviceInfo = DeviceInfo().apply {
- username = "username"
- password = "password"
- ipAddress = "localhost"
- port = 2224
- connectTimeout = 10
- }
-
- device = NetconfDeviceSimulator(deviceInfo!!.port)
- device!!.start()
- }
-
- @After
- fun after() {
- device!!.stop()
- }
-
- @Throws(Exception::class)
- fun testNetconfSession() {
- val netconfSession = NetconfSessionImpl(deviceInfo!!, NetconfRpcServiceImpl(DeviceInfo()))
-
- Assert.assertNotNull(netconfSession.getSessionId())
- Assert.assertEquals("localhost:2224", netconfSession.getDeviceInfo().toString())
-
- netconfSession.checkAndReestablish()
-
- Assert.assertNotNull(netconfSession.getSessionId())
- Assert.assertEquals("localhost:2224", netconfSession.getDeviceInfo().toString())
-
- Assert.assertTrue(!netconfSession.getDeviceCapabilitiesSet().isEmpty())
- }
-
- @Test
- fun testNetconfSessionconnect() {
- val netconfSession = NetconfSessionImpl(deviceInfo!!, NetconfRpcServiceImpl(deviceInfo!!))
- netconfSession.connect()
- Assert.assertTrue(netconfSession.sessionstatus("Open"))
- }
-
- @Test
- fun testNetconfSessionreconnect() {
- val netconfSession = NetconfSessionImpl(deviceInfo!!, NetconfRpcServiceImpl(deviceInfo!!))
- netconfSession.connect()
- netconfSession.reconnect()
- Assert.assertTrue(netconfSession.sessionstatus("Open"))
-
- }
- @Test
- fun testNetconfSessiondisconnect() {
- val netconfSession = NetconfSessionImpl(deviceInfo!!, NetconfRpcServiceImpl(deviceInfo!!))
- netconfSession.connect()
- netconfSession.disconnect()
- Assert.assertTrue(netconfSession.sessionstatus("Close"))
-
- }
- @Test
- fun testNetconfSessioncheckAndReestablish() {
- val netconfSession = NetconfSessionImpl(deviceInfo!!, NetconfRpcServiceImpl(deviceInfo!!))
- netconfSession.connect()
- netconfSession.checkAndReestablish()
- Assert.assertTrue(netconfSession.sessionstatus("Open"))
-
-
- }
- @Test
- fun testNetconfSessionconnecgetDeviceInfo() {
- val netconfSession = NetconfSessionImpl(deviceInfo!!, NetconfRpcServiceImpl(deviceInfo!!))
- netconfSession.connect()
- Assert.assertNotNull(netconfSession.getDeviceInfo())
- Assert.assertFalse(!netconfSession.getDeviceCapabilitiesSet().isEmpty())
- }
-
-
-}
diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImplTest.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImplTest.kt
new file mode 100644
index 0000000..1f526f4
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/core/NetconfSessionImplTest.kt
@@ -0,0 +1,543 @@
+/*
+ * Copyright © 2019 Bell Canada
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.core
+
+import io.mockk.CapturingSlot
+import io.mockk.Runs
+import io.mockk.every
+import io.mockk.just
+import io.mockk.mockk
+import io.mockk.spyk
+import io.mockk.verify
+import org.apache.sshd.client.SshClient
+import org.apache.sshd.client.channel.ChannelSubsystem
+import org.apache.sshd.client.channel.ClientChannel
+import org.apache.sshd.client.future.DefaultAuthFuture
+import org.apache.sshd.client.future.DefaultConnectFuture
+import org.apache.sshd.client.future.DefaultOpenFuture
+import org.apache.sshd.client.session.ClientSession
+import org.apache.sshd.common.FactoryManager
+import org.junit.Before
+import org.junit.Ignore
+import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceInfo
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.DeviceResponse
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfException
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.api.NetconfRpcService
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.NetconfMessageUtils
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.utils.RpcStatus
+import java.io.ByteArrayInputStream
+import java.io.ByteArrayOutputStream
+import java.io.IOException
+import java.io.InputStream
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.CompletableFuture
+import java.util.concurrent.ExecutionException
+import java.util.concurrent.TimeoutException
+import kotlin.test.assertEquals
+import kotlin.test.assertFailsWith
+import kotlin.test.assertTrue
+
+class NetconfSessionImplTest {
+ companion object {
+ val SUCCESSFUL_DEVICE_RESPONSE = DeviceResponse().apply {
+ status = RpcStatus.SUCCESS
+ errorMessage = ""
+ responseMessage = ""
+ requestMessage = ""
+ }
+ val FAILED_DEVICE_RESPONSE = DeviceResponse().apply {
+ status = RpcStatus.FAILURE
+ errorMessage = ""
+ responseMessage = ""
+ requestMessage = ""
+ }
+ val deviceInfo: DeviceInfo = DeviceInfo().apply {
+ username = "username"
+ password = "password"
+ ipAddress = "localhost"
+ port = 2224
+ connectTimeout = 10
+ }
+ private const val someString = "Some string"
+ }
+
+ private lateinit var netconfSession: NetconfSessionImpl
+ private lateinit var netconfCommunicator: NetconfDeviceCommunicator
+ private lateinit var rpcService: NetconfRpcService
+ private lateinit var mockSshClient: SshClient
+ private lateinit var mockClientSession: ClientSession
+ private lateinit var mockClientChannel: ClientChannel
+ private lateinit var mockSubsystem: ChannelSubsystem
+
+ private val futureMsg = "blahblahblah"
+ private val request = "0"
+ private val sessionId = "0"
+ private val messageId = "asdfasdfadf"
+ private val deviceCapabilities = setOf("capability1", "capability2")
+ private val formattedRequest = NetconfMessageUtils.formatRPCRequest(request, messageId, deviceCapabilities)
+ private lateinit var sampleInputStream: InputStream
+ private lateinit var sampleOutputStream: ByteArrayOutputStream
+
+ @Before
+ fun setup() {
+ netconfCommunicator = mockk()
+ rpcService = mockk()
+ netconfSession = NetconfSessionImpl(deviceInfo, rpcService)
+ netconfSession.setStreamHandler(netconfCommunicator)
+ mockSshClient = mockk()
+ mockClientSession = mockk()
+ mockClientChannel = mockk()
+ mockSubsystem = mockk()
+ sampleInputStream = ByteArrayInputStream(someString.toByteArray(StandardCharsets.UTF_8))
+ sampleOutputStream = ByteArrayOutputStream()
+ }
+
+ @Test
+ fun `connect calls appropriate methods`() {
+ val session = spyk(netconfSession, recordPrivateCalls = true)
+ every { session["startClient"]() as Unit } just Runs
+ session.connect()
+ verify { session["startClient"]() }
+ }
+
+ //look for NetconfException being thrown when cannot connect
+ @Test
+ fun `connect throws NetconfException on error`() {
+ val errMsg = "$deviceInfo: Failed to establish SSH session"
+ assertFailsWith(exceptionClass = NetconfException::class, message = errMsg) {
+ val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+ every { netconfSessionSpy["startClient"]() as Unit } throws NetconfException(errMsg)
+ netconfSessionSpy.connect()
+ }
+ }
+
+ @Test
+ fun `disconnect without force option for rpcService succeeds`() {
+ //rpcService.closeSession succeeds with status not RpcStatus.FAILURE
+ every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
+ every { mockClientSession.close() } just Runs
+ every { mockSshClient.close() } just Runs
+ every { mockClientChannel.close() } just Runs
+ val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+ netconfSessionSpy.setSession(mockClientSession)
+ netconfSessionSpy.setClient(mockSshClient)
+ netconfSessionSpy.setChannel(mockClientChannel)
+ //RUN
+ netconfSessionSpy.disconnect()
+ //make sure that rpcService.close session is not called again.
+ verify(exactly = 0) { rpcService.closeSession(true) }
+ verify { mockClientSession.close() }
+ verify { mockSshClient.close() }
+ verify { mockClientChannel.close() }
+ }
+
+ @Test
+ fun `disconnect with force option for rpcService succeeds`() {
+ //rpcService.closeSession succeeds with status not RpcStatus.FAILURE
+ val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+ every { rpcService.closeSession(any()) } returns
+ FAILED_DEVICE_RESPONSE andThen SUCCESSFUL_DEVICE_RESPONSE
+ every { mockClientSession.close() } just Runs
+ every { mockSshClient.close() } just Runs
+ every { mockClientChannel.close() } just Runs
+ netconfSessionSpy.setSession(mockClientSession)
+ netconfSessionSpy.setClient(mockSshClient)
+ netconfSessionSpy.setChannel(mockClientChannel)
+ //RUN
+ netconfSessionSpy.disconnect()
+ //VERIFY
+ verify(exactly = 2) { rpcService.closeSession(any()) }
+ verify { mockClientSession.close() }
+ verify { mockSshClient.close() }
+ verify { mockClientChannel.close() }
+
+ }
+
+ @Ignore //TODO undo close method removal
+ @Test
+ fun `disconnect wraps exception from ssh closing error`() {
+ val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+ every { netconfSessionSpy["close"]() as Unit } throws IOException("Some IOException occurred!")
+ every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
+ every { netconfSessionSpy.checkAndReestablish() } just Runs
+ netconfSessionSpy.disconnect()
+ verify { netconfSessionSpy["close"]() }
+ }
+
+ @Test
+ fun `reconnect calls disconnect and connect`() {
+ val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+ every { netconfSessionSpy.disconnect() } just Runs
+ every { netconfSessionSpy.connect() } just Runs
+ netconfSessionSpy.reconnect()
+ verify { netconfSessionSpy.disconnect() }
+ verify { netconfSessionSpy.connect() }
+ }
+
+ @Test
+ fun `checkAndReestablish restarts connection and clears replies on sshClient disconnection`() {
+ val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+ every { mockSshClient.isClosed } returns true
+ netconfSessionSpy.setClient(mockSshClient)
+ every { netconfSessionSpy["startConnection"]() as Unit } just Runs
+ //Call method
+ netconfSessionSpy.checkAndReestablish()
+ //Verify
+ verify { netconfSessionSpy.clearReplies() }
+ verify { netconfSessionSpy["startConnection"]() }
+ }
+
+ @Test
+ fun `checkAndReestablish restarts session and clears replies on clientSession closing`() {
+ val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+ every { mockClientSession.isClosed } returns true
+ every { mockSshClient.isClosed } returns false
+ every { netconfSessionSpy["startSession"]() as Unit } just Runs
+ netconfSessionSpy.setClient(mockSshClient)
+ netconfSessionSpy.setSession(mockClientSession)
+ //Call method
+ netconfSessionSpy.checkAndReestablish()
+ //Verify
+ verify { netconfSessionSpy.clearReplies() }
+ verify { netconfSessionSpy["startSession"]() }
+ }
+
+ @Test
+ fun `checkAndReestablish reopens channel and clears replies on channel closing`() {
+ val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+ every { mockClientSession.isClosed } returns false
+ every { mockSshClient.isClosed } returns false
+ every { mockClientChannel.isClosed } returns true
+ every { netconfSessionSpy["openChannel"]() as Unit } just Runs
+ netconfSessionSpy.setClient(mockSshClient)
+ netconfSessionSpy.setSession(mockClientSession)
+ netconfSessionSpy.setChannel(mockClientChannel)
+ //Call method
+ netconfSessionSpy.checkAndReestablish()
+ //Verify
+ verify { netconfSessionSpy.clearReplies() }
+ verify { netconfSessionSpy["openChannel"]() }
+ }
+
+
+ @Test
+ fun `syncRpc runs normally`() {
+ val netconfSessionSpy = spyk(netconfSession)
+ val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
+
+ //test the case where SSH connection did not need to be re-established.
+ //put an existing item into the replies
+ netconfSessionSpy.getReplies()["somekey"] = CompletableFuture.completedFuture("${futureMsg}2")
+ every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
+ every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } returns futureRet.get()
+ every { netconfSessionSpy.checkAndReestablish() } just Runs
+ //call the method
+ assertEquals(futureMsg, netconfSessionSpy.syncRpc("0", "0"))
+ //make sure the replies didn't change
+ assertTrue {
+ netconfSessionSpy.getReplies().size == 1 &&
+ netconfSessionSpy.getReplies().containsKey("somekey")
+ }
+ verify(exactly = 0) { netconfSessionSpy.clearReplies() }
+ }
+
+
+ @Test
+ fun `syncRpc still succeeds and replies are cleared on client disconnect`() {
+ val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+ val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
+
+ //put an item into the replies
+ netconfSessionSpy.getReplies()["somekey"] = CompletableFuture.completedFuture("${futureMsg}2")
+
+ //tests the case where SSH session needs to be re-established.
+ every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
+ every { netconfSessionSpy["startClient"]() as Unit } just Runs
+ every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } returns futureRet.get()
+ every { mockSshClient.isClosed } returns true
+ netconfSessionSpy.setClient(mockSshClient)
+
+ //call the method
+ assertEquals(futureMsg, netconfSessionSpy.syncRpc("0", "0"))
+ //make sure the replies got cleared out
+ assertTrue { netconfSessionSpy.getReplies().isEmpty() }
+ verify(exactly = 1) { netconfSessionSpy.clearReplies() }
+ }
+
+ @Ignore //TODO
+ //Test for handling CompletableFuture.get returns InterruptedException inside NetconfDeviceCommunicator
+ @Test
+ fun `syncRpc throws NetconfException if InterruptedException is caught`() {
+ val expectedExceptionMsg = "$deviceInfo: Interrupted while waiting for reply for request: $formattedRequest"
+ assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
+ val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+ val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
+ every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
+ every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws InterruptedException("interrupted")
+ every { netconfSessionSpy.checkAndReestablish() } just Runs
+ //call the method
+ netconfSessionSpy.syncRpc("0", "0")
+ }
+ }
+
+ @Ignore //TODO revert back on getFutureFromSendMessage
+ @Test
+ fun `syncRpc throws NetconfException if TimeoutException is caught`() {
+ val expectedExceptionMsg = "$deviceInfo: Timed out while waiting for reply for request $formattedRequest after ${deviceInfo.replyTimeout} sec."
+ assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
+ val netconfSessionSpy = spyk(netconfSession)
+ val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
+ every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
+ every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws TimeoutException("timed out")
+ every { netconfSessionSpy.checkAndReestablish() } just Runs
+ //call the method
+ netconfSessionSpy.syncRpc("0", "0")
+ }
+ }
+
+ @Ignore
+ @Test
+ fun `syncRpc throws NetconfException if ExecutionException is caught`() {
+ val expectedExceptionMsg = "$deviceInfo: Closing session $sessionId for request $formattedRequest"
+ assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
+ val netconfSessionSpy = spyk(netconfSession)
+ val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
+ every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
+ every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws
+ ExecutionException("exec exception", Exception("nested exception")) //TODO revert getFutureFromSendMessage back
+ every { netconfSessionSpy.checkAndReestablish() } just Runs
+ //call the method
+ netconfSessionSpy.syncRpc("0", "0")
+ }
+ }
+
+ @Ignore //TODO revert back on getFutureFromSendMessage
+ @Test
+ fun `syncRpc throws NetconfException if caught ExecutionException and failed to close SSH session`() {
+ val expectedExceptionMsg = "$deviceInfo: Closing session $sessionId for request $formattedRequest"
+ assertFailsWith(exceptionClass = NetconfException::class, message = expectedExceptionMsg) {
+ val netconfSessionSpy = spyk(netconfSession)
+ val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
+ every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
+ every { netconfCommunicator.getFutureFromSendMessage(any(), any(), any()) } throws
+ ExecutionException("exec exception", Exception("nested exception"))
+ every { netconfSessionSpy["close"]() as Unit } throws IOException("got an IO exception")
+ every { netconfSessionSpy.checkAndReestablish() } just Runs
+ //call the method
+ netconfSessionSpy.syncRpc("0", "0")
+ //make sure replies are cleared...
+ verify(exactly = 1) { netconfSessionSpy.clearReplies() }
+ verify(exactly = 1) { netconfSessionSpy.clearErrorReplies() }
+ }
+ }
+
+ @Test
+ fun `asyncRpc runs normally`() {
+ val netconfSessionSpy = spyk(netconfSession)
+ every { netconfSessionSpy.checkAndReestablish() } just Runs
+ val futureRet: CompletableFuture<String> = CompletableFuture.completedFuture(futureMsg)
+ every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
+ //run the method
+ val rpcResultFuture = netconfSessionSpy.asyncRpc("0", "0")
+ every { netconfSessionSpy.checkAndReestablish() } just Runs
+ //make sure the future gets resolved
+ assertTrue { rpcResultFuture.get() == futureMsg }
+ //make sure that clearReplies wasn't called (reestablishConnection check)
+ verify(exactly = 0) { netconfSessionSpy.clearReplies() }
+ }
+
+ @Test
+ @Ignore
+ //TODO: get 't' inside asyncRpc to be a Throwable
+ fun `asyncRpc wraps exception`() {
+ assertFailsWith(exceptionClass = NetconfException::class, message = futureMsg) {
+ val netconfSessionSpy = spyk(netconfSession)
+ val futureRet: CompletableFuture<String> = CompletableFuture.supplyAsync {
+ throw Exception("blah")
+ }
+ futureRet.completeExceptionally(IOException("something is wrong"))
+ every { netconfCommunicator.sendMessage(any(), any()) } returns futureRet
+ //RUN
+ val rpcResultFuture = netconfSessionSpy.asyncRpc("0", "0")
+ }
+ }
+
+ @Test
+ fun `connect starts underlying client`() {
+ val propertiesMap = hashMapOf<String, Any>()
+ every { mockSshClient.start() } just Runs
+ every { mockSshClient.properties } returns propertiesMap
+ val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+ every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
+ every { netconfSessionSpy["startSession"]() as Unit } just Runs
+ netconfSessionSpy.setClient(mockSshClient)
+ netconfSessionSpy.connect()
+ verify { mockSshClient.start() }
+ assertTrue { propertiesMap.containsKey(FactoryManager.IDLE_TIMEOUT) }
+ assertTrue { propertiesMap.containsKey(FactoryManager.NIO2_READ_TIMEOUT) }
+ }
+
+ @Test
+ fun `startSession tries to connect to user supplied device`() {
+ every { mockSshClient.start() } just Runs
+ every { mockSshClient.properties } returns hashMapOf<String, Any>()
+ //setup slots to capture values from the invocations
+ val userSlot = CapturingSlot<String>()
+ val ipSlot = CapturingSlot<String>()
+ val portSlot = CapturingSlot<Int>()
+ //create a future that succeeded
+ val succeededFuture = DefaultConnectFuture(Any(), Any())
+ succeededFuture.value = mockClientSession
+ every { mockSshClient.connect(capture(userSlot), capture(ipSlot), capture(portSlot)) } returns succeededFuture
+ val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+ every { netconfSessionSpy["authSession"]() as Unit } just Runs
+ every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
+ netconfSessionSpy.setClient(mockSshClient)
+ //RUN
+ netconfSessionSpy.connect()
+ //Verify
+ verify { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) }
+ assertEquals(deviceInfo.username, userSlot.captured)
+ assertEquals(deviceInfo.ipAddress, ipSlot.captured)
+ assertEquals(deviceInfo.port, portSlot.captured)
+ verify { netconfSessionSpy["authSession"]() }
+ }
+
+ @Test
+ fun `authSession throws exception if ClientSession is not AUTHED`() {
+ assertFailsWith(exceptionClass = NetconfException::class) {
+ //after client session connects,
+ every { mockSshClient.start() } just Runs
+ every { mockSshClient.properties } returns hashMapOf<String, Any>()
+ val succeededAuthFuture = DefaultAuthFuture(Any(), Any())
+ succeededAuthFuture.value = true //AuthFuture's value is Boolean
+ val passSlot = CapturingSlot<String>()
+ every { mockClientSession.addPasswordIdentity(capture(passSlot)) } just Runs
+ every { mockClientSession.auth() } returns succeededAuthFuture
+ val succeededSessionFuture = DefaultConnectFuture(Any(), Any())
+ succeededSessionFuture.value = mockClientSession
+ every { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) } returns succeededSessionFuture
+ every { mockClientSession.waitFor(any(), any()) } returns
+ setOf(ClientSession.ClientSessionEvent.WAIT_AUTH, ClientSession.ClientSessionEvent.CLOSED)
+ val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+ every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
+ netconfSessionSpy.setClient(mockSshClient)
+ //RUN
+ netconfSessionSpy.connect()
+ }
+ }
+
+ //common mock initializer for more weird tests.
+ private fun setupOpenChannelMocks(): Unit {
+ every { mockSshClient.start() } just Runs
+ every { mockSshClient.properties } returns hashMapOf<String, Any>()
+ val succeededAuthFuture = DefaultAuthFuture(Any(), Any())
+ succeededAuthFuture.value = true //AuthFuture's value is Boolean
+ val passSlot = CapturingSlot<String>()
+ every { mockClientSession.addPasswordIdentity(capture(passSlot)) } just Runs
+ every { mockClientSession.auth() } returns succeededAuthFuture
+ val succeededSessionFuture = DefaultConnectFuture(Any(), Any())
+ succeededSessionFuture.value = mockClientSession
+ every { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) } returns succeededSessionFuture
+ every { mockClientSession.waitFor(any(), any()) } returns
+ setOf(ClientSession.ClientSessionEvent.WAIT_AUTH,
+ ClientSession.ClientSessionEvent.CLOSED,
+ ClientSession.ClientSessionEvent.AUTHED)
+
+ every { mockClientSession.createSubsystemChannel(any()) } returns mockSubsystem
+ every { mockClientChannel.invertedOut } returns sampleInputStream
+ every { mockClientChannel.invertedIn } returns sampleOutputStream
+ }
+
+ @Test
+ fun `authSession opensChannel if ClientSession is AUTHED and session can be opened`() {
+ //after client session connects, make sure the client receives authentication
+ setupOpenChannelMocks()
+ val channelFuture = DefaultOpenFuture(Any(), Any())
+ channelFuture.value = true
+ channelFuture.setOpened()
+ val connectFuture = DefaultConnectFuture(Any(), Any())
+ connectFuture.value = mockClientSession
+ connectFuture.session = mockClientSession
+ every { mockSubsystem.open() } returns channelFuture
+ every { mockSshClient.connect(deviceInfo.username, deviceInfo.ipAddress, deviceInfo.port) } returns connectFuture
+
+ val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+ every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
+ every { netconfSessionSpy["setupHandler"]() as Unit } just Runs
+ netconfSessionSpy.setClient(mockSshClient)
+ //Run
+ netconfSessionSpy.connect()
+ //Verify
+ verify { mockSubsystem.open() }
+ }
+
+
+ @Test
+ fun `authSession throws NetconfException if ClientSession is AUTHED but channelFuture timed out or not open`() {
+ assertFailsWith(exceptionClass = NetconfException::class) {
+ //after client session connects, make sure the client receives authentication
+ setupOpenChannelMocks()
+ val channelFuture = DefaultOpenFuture(Any(), Any())
+ every { mockSubsystem.open() } returns channelFuture
+ val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+ every { netconfSessionSpy["setupNewSSHClient"]() as Unit } just Runs
+ every { netconfSessionSpy["setupHandler"]() as Unit } just Runs
+ netconfSessionSpy.setClient(mockSshClient)
+ //Run
+ netconfSessionSpy.connect()
+ //Verify
+ verify { mockSubsystem.open() }
+ }
+ }
+
+
+ @Test
+ fun `disconnect closes session, channel, and client`() {
+ every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
+ every { mockClientSession.close() } just Runs
+ every { mockClientChannel.close() } just Runs
+ every { mockSshClient.close() } just Runs
+ val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+ netconfSessionSpy.setChannel(mockClientChannel)
+ netconfSessionSpy.setClient(mockSshClient)
+ netconfSessionSpy.setSession(mockClientSession)
+ //RUN
+ netconfSessionSpy.disconnect()
+ //VERIFY
+ verify { mockClientSession.close() }
+ verify { mockClientChannel.close() }
+ verify { mockSshClient.close() }
+ }
+
+ @Ignore
+ @Test
+ fun `disconnect wraps IOException if channel doesn't close`() { //this test is equivalent to others
+ every { rpcService.closeSession(false) } returns SUCCESSFUL_DEVICE_RESPONSE
+ every { mockClientSession.close() } just Runs
+ every { mockClientChannel.close() } throws IOException("channel doesn't want to close!")
+ val netconfSessionSpy = spyk(netconfSession, recordPrivateCalls = true)
+ netconfSessionSpy.setChannel(mockClientChannel)
+ netconfSessionSpy.setClient(mockSshClient)
+ netconfSessionSpy.setSession(mockClientSession)
+ //RUN
+ netconfSessionSpy.disconnect()
+ //VERIFY
+ verify { mockClientSession.close() }
+ }
+}
diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/mocks/NetconfDeviceSimulator.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/mocks/NetconfDeviceSimulator.kt
deleted file mode 100644
index 2b7aa76..0000000
--- a/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/mocks/NetconfDeviceSimulator.kt
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright © 2017-2018 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.mocks
-
-
-import org.apache.sshd.common.NamedFactory
-import org.apache.sshd.server.command.Command
-import org.apache.sshd.server.SshServer
-import org.apache.sshd.server.auth.UserAuth
-import org.apache.sshd.server.auth.UserAuthNoneFactory
-import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider
-import java.util.*
-
-
-class NetconfDeviceSimulator(private val port: Int) {
-
- private var sshd: SshServer? = null
-
- fun start() {
- sshd = SshServer.setUpDefaultServer()
- sshd!!.port = port
- sshd!!.keyPairProvider = SimpleGeneratorHostKeyProvider()
-
- val userAuthFactories = ArrayList<NamedFactory<UserAuth>>()
- userAuthFactories.add(UserAuthNoneFactory())
- sshd!!.userAuthFactories = userAuthFactories
-
- val namedFactoryList = ArrayList<NamedFactory<Command>>()
- namedFactoryList.add(NetconfSubsystemFactory())
- sshd!!.subsystemFactories = namedFactoryList
-
- try {
- sshd!!.start()
- } catch (e: Exception) {
- e.printStackTrace()
- }
-
- }
-
- fun stop() {
- try {
- sshd!!.stop(true)
- } catch (e: Exception) {
- e.printStackTrace()
- }
-
- }
-}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/mocks/NetconfSubsystemFactory.kt b/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/mocks/NetconfSubsystemFactory.kt
deleted file mode 100644
index f3e5d38..0000000
--- a/ms/blueprintsprocessor/functions/netconf-executor/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/netconf/executor/mocks/NetconfSubsystemFactory.kt
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Copyright © 2017-2018 AT&T Intellectual Property.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onap.ccsdk.cds.blueprintsprocessor.functions.netconf.executor.mocks
-
-
-import java.io.IOException
-import java.io.InputStream
-import java.io.OutputStream
-import org.apache.sshd.common.NamedFactory;
-import org.apache.sshd.server.command.Command;
-import org.apache.sshd.server.Environment;
-import org.apache.sshd.server.ExitCallback;
-
-
-class NetconfSubsystemFactory : NamedFactory<Command> {
-
- private val END_CHAR_SEQUENCE = "]]>]]>"
-
- override fun create(): Command {
- return NetconfSubsystem()
- }
-
- override fun getName(): String {
- return "netconf"
- }
-
- /**
- * Simple implementation of netconf reading 1 request, sending a 'hello' response and quitting
- */
- inner class NetconfSubsystem : Command {
- private var input: InputStream? = null
- private var out: OutputStream? = null
- private var clientThread: Thread? = null
- private var r: Int = 0
-
- @Throws(IOException::class)
- override fun start(env: Environment) {
- clientThread = Thread(object : Runnable {
-
- override fun run() {
- try {
- val message = StringBuilder()
- while (true) {
- process(createHelloString())
- r = input!!.read()
- if (r == -1) {
- break
- } else {
- val c = r.toChar()
- message.append(c)
- val messageString = message.toString()
- if (messageString.endsWith(END_CHAR_SEQUENCE)) {
- println("Detected end message:\n$messageString")
- process(createHelloString())
- message.setLength(0)
- break
- }
- }
- }
- } catch (e: IOException) {
- e.printStackTrace()
- }
-
- }
-
- @Throws(IOException::class)
- private fun process(xmlMessage: String) {
- println("Sending message:\n$xmlMessage")
- out!!.write(xmlMessage.toByteArray(charset("UTF-8")))
- out!!.write((END_CHAR_SEQUENCE + "\n").toByteArray(charset("UTF-8")))
- out!!.flush()
- }
-
- private fun createHelloString(): String {
- val sessionId = "" + (Math.random() * Integer.MAX_VALUE).toInt()
- return ("<hello xmlns=\"urn:ietf:params:xml:ns:netconf:base:1.0\">\n"
- + "<capabilities>\n<capability>urn:ietf:params:netconf:base:1.0</capability>\n"
- + "<capability>urn:ietf:params:netconf:base:1.1</capability>\n</capabilities>\n"
- + "<session-id>" + sessionId + "</session-id>\n</hello>")
- }
- })
-
- clientThread!!.start()
- }
-
- @Throws(Exception::class)
- override fun destroy() {
- try {
- clientThread!!.join(2000)
- } catch (e: InterruptedException) {
- // log.warn("Error joining Client thread" + e.getMessage());
- }
-
- clientThread!!.interrupt()
- }
-
- override fun setInputStream(input: InputStream) {
- this.input = input
- }
-
- override fun setOutputStream(out: OutputStream) {
- this.out = out
- }
-
- override fun setErrorStream(err: OutputStream) {}
-
- override fun setExitCallback(callback: ExitCallback) {}
-
-
-
- }
-}
\ No newline at end of file