Add metrics for active connections count

* Fix and refactor gauges tests in MicrometerMetricsTests
  as they were not executing
* Fix client disconnection handler in NettyTcpServer
* Add metrics gauge and counters required to measure active connections

Change-Id: I5620d398525c6859679cd5a49dc55a9fefd8b592
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Issue-ID: DCAEGEN2-1041
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index 61d28c2..ac55e55 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -36,6 +36,8 @@
     fun notifyMessageReceived(msg: WireFrameMessage)
     fun notifyMessageSent(msg: RoutedMessage)
     fun notifyMessageDropped(cause: MessageDropCause)
+    fun notifyClientDisconnected()
+    fun notifyClientConnected()
     fun notifyClientRejected(cause: ClientRejectionCause)
 }
 
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
index dce933a..2e6bb4d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
@@ -20,6 +20,7 @@
 package org.onap.dcae.collectors.veshv.factory
 
 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.Server
 import org.onap.dcae.collectors.veshv.impl.socket.NettyTcpServer
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
@@ -30,6 +31,8 @@
  * @since May 2018
  */
 object ServerFactory {
-    fun createNettyTcpServer(serverConfiguration: ServerConfiguration, collectorProvider: CollectorProvider): Server =
-            NettyTcpServer(serverConfiguration, ServerSslContextFactory(), collectorProvider)
+    fun createNettyTcpServer(serverConfiguration: ServerConfiguration,
+                             collectorProvider: CollectorProvider,
+                             metrics: Metrics): Server =
+            NettyTcpServer(serverConfiguration, ServerSslContextFactory(), collectorProvider, metrics)
 }
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index d8d786b..725622f 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -22,31 +22,30 @@
 import arrow.core.None
 import arrow.core.Option
 import arrow.core.getOrElse
-import arrow.core.toOption
 import arrow.effects.IO
 import arrow.syntax.collections.firstOption
 import io.netty.handler.ssl.SslHandler
 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.Server
-import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn
-import org.onap.dcae.collectors.veshv.utils.logging.Marker
+import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
 import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
 import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
 import org.onap.dcae.collectors.veshv.utils.ServerHandle
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.Marker
 import reactor.core.publisher.Mono
 import reactor.netty.ByteBufFlux
 import reactor.netty.Connection
 import reactor.netty.NettyInbound
 import reactor.netty.NettyOutbound
 import reactor.netty.tcp.TcpServer
-import java.time.Duration
-import java.lang.Exception
 import java.security.cert.X509Certificate
+import java.time.Duration
 import javax.net.ssl.SSLSession
 
 
@@ -56,15 +55,15 @@
  */
 internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
                               private val sslContextFactory: ServerSslContextFactory,
-                              private val collectorProvider: CollectorProvider) : Server {
+                              private val collectorProvider: CollectorProvider,
+                              private val metrics: Metrics) : Server {
 
     override fun start(): IO<ServerHandle> = IO {
-        val tcpServer = TcpServer.create()
+        TcpServer.create()
                 .addressSupplier { serverConfig.serverListenAddress }
                 .configureSsl()
                 .handle(this::handleConnection)
-
-        NettyServerHandle(tcpServer.bindNow())
+                .let { NettyServerHandle(it.bindNow()) }
     }
 
     private fun TcpServer.configureSsl() =
@@ -79,13 +78,13 @@
                     }
 
     private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
+        metrics.notifyClientConnected()
         val clientContext = ClientContext(nettyOutbound.alloc())
         nettyInbound.withConnection {
             populateClientContext(clientContext, it)
             it.channel().pipeline().get(SslHandler::class.java)?.engine()?.session?.let { sslSession ->
                 sslSession.peerCertificates.firstOption().map { it as X509Certificate }.map { it.subjectDN.name }
             }
-
         }
 
         logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
@@ -97,7 +96,8 @@
                 {
                     logger.info(clientContext::fullMdc) { "Handling new connection" }
                     nettyInbound.withConnection { conn ->
-                        conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout)
+                        conn
+                                .configureIdleTimeout(clientContext, serverConfig.idleTimeout)
                                 .logConnectionClosed(clientContext)
                     }
                     it.handleConnection(createDataStream(nettyInbound))
@@ -132,15 +132,14 @@
             .receive()
             .retain()
 
-    private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection {
-        onReadIdle(timeout.toMillis()) {
-            logger.info(ctx) {
-                "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
+    private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection =
+            onReadIdle(timeout.toMillis()) {
+                logger.info(ctx) {
+                    "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
+                }
+                disconnectClient(ctx)
             }
-            disconnectClient(ctx)
-        }
-        return this
-    }
+
 
     private fun Connection.disconnectClient(ctx: ClientContext) {
         channel().close().addListener {
@@ -152,13 +151,11 @@
         }
     }
 
-    private fun Connection.logConnectionClosed(ctx: ClientContext): Connection {
-        onTerminate().subscribe {
-            // TODO: this code is never executed (at least with ssl-enabled, did not checked with ssl-disabled)
-            logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" }
-        }
-        return this
-    }
+    private fun Connection.logConnectionClosed(ctx: ClientContext): Connection =
+            onDispose {
+                metrics.notifyClientDisconnected()
+                logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" }
+            }
 
     companion object {
         private val logger = Logger(NettyTcpServer::class)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
index 572cc79..f457aea 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
@@ -175,7 +175,6 @@
     }
 
     describe("clients rejected metrics") {
-
         given("rejection causes") {
             mapOf(
                     ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE to
@@ -192,7 +191,7 @@
                         assertThat(metrics.clientRejectionCause.size)
                                 .describedAs("metrics were notified with only one rejection cause")
                                 .isOne()
-                        assertThat(metrics.clientRejectionCause.get(cause))
+                        assertThat(metrics.clientRejectionCause[cause])
                                 .describedAs("metrics were notified only once with correct client rejection cause")
                                 .isOne()
                     }
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
index a27d167..8573d86 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
@@ -21,12 +21,11 @@
 
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
 import org.onap.dcae.collectors.veshv.model.MessageDropCause
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import java.time.Duration
 import java.time.Instant
-import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
-import java.util.concurrent.ConcurrentHashMap
 import kotlin.test.fail
 
 /**
@@ -35,14 +34,15 @@
  */
 class FakeMetrics : Metrics {
 
-    var bytesReceived: Int = 0 ; private set
-    var messageBytesReceived: Int = 0 ; private set
-    var messagesDroppedCount: Int = 0 ; private set
-    var lastProcessingTimeMicros: Double = -1.0 ; private set
-    private val messagesDroppedCause: MutableMap<MessageDropCause, Int> = ConcurrentHashMap()
-    var messagesSentCount: Int = 0 ; private set
-    val messagesSentToTopic: MutableMap<String, Int> = ConcurrentHashMap()
-    var clientRejectionCause = mutableMapOf<ClientRejectionCause, Int>() ; private set
+    var bytesReceived: Int = 0; private set
+    var messageBytesReceived: Int = 0; private set
+    var messagesDroppedCount: Int = 0; private set
+    var lastProcessingTimeMicros: Double = -1.0; private set
+    var messagesSentCount: Int = 0; private set
+    var clientRejectionCause = mutableMapOf<ClientRejectionCause, Int>(); private set
+
+    private val messagesSentToTopic = mutableMapOf<String, Int>()
+    private val messagesDroppedCause = mutableMapOf<MessageDropCause, Int>()
 
     override fun notifyBytesReceived(size: Int) {
         bytesReceived += size
@@ -69,6 +69,12 @@
         clientRejectionCause.compute(cause) { k, _ -> clientRejectionCause[k]?.inc() ?: 1 }
     }
 
+    override fun notifyClientDisconnected() {
+    }
+
+    override fun notifyClientConnected() {
+    }
+
     fun messagesOnTopic(topic: String) =
             messagesSentToTopic[topic] ?: fail("No messages were sent to topic $topic")
 
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
index d35e17d..288145a 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
@@ -50,10 +50,13 @@
     private val receivedMsgCount = registry.counter(name(MESSAGES, RECEIVED, COUNT))
     private val receivedMsgBytes = registry.counter(name(MESSAGES, RECEIVED, BYTES))
 
+    private val connectionsTotalCount = registry.counter(name(CONNECTIONS, TOTAL, COUNT))
+    private val disconnectionsCount = registry.counter(name(DISCONNECTIONS, COUNT))
+
     private val processingTime = registry.timer(name(MESSAGES, PROCESSING, TIME))
     private val totalLatency = registry.timer(name(MESSAGES, LATENCY, TIME))
 
-    private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT))
+    private val sentCount = registry.counter(name(MESSAGES, SENT, COUNT))
     private val sentToTopicCount = { topic: String ->
         registry.counter(name(MESSAGES, SENT, TOPIC, COUNT), TOPIC, topic)
     }.memoize<String, Counter>()
@@ -70,8 +73,13 @@
 
     init {
         registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) {
-            (receivedMsgCount.count() - sentCountTotal.count()).coerceAtLeast(0.0)
+            (receivedMsgCount.count() - sentCount.count()).coerceAtLeast(0.0)
         }
+
+        registry.gauge(name(CONNECTIONS, ACTIVE, COUNT), this) {
+            (connectionsTotalCount.count() - disconnectionsCount.count()).coerceAtLeast(0.0)
+        }
+
         ClassLoaderMetrics().bindTo(registry)
         JvmMemoryMetrics().bindTo(registry)
         JvmGcMetrics().bindTo(registry)
@@ -79,7 +87,6 @@
         JvmThreadMetrics().bindTo(registry)
     }
 
-
     val metricsProvider = MicrometerPrometheusMetricsProvider(registry)
 
     override fun notifyBytesReceived(size: Int) {
@@ -93,7 +100,7 @@
 
     override fun notifyMessageSent(msg: RoutedMessage) {
         val now = Instant.now()
-        sentCountTotal.increment()
+        sentCount.increment()
         sentToTopicCount(msg.topic).increment()
 
         processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, now))
@@ -110,11 +117,22 @@
         clientsRejectedCauseCount(cause.tag).increment()
     }
 
+    override fun notifyClientConnected() {
+        connectionsTotalCount.increment()
+    }
+
+    override fun notifyClientDisconnected() {
+        disconnectionsCount.increment()
+    }
+
     companion object {
         val INSTANCE = MicrometerMetrics()
         internal const val PREFIX = "hvves"
         internal const val MESSAGES = "messages"
         internal const val RECEIVED = "received"
+        internal const val DISCONNECTIONS = "disconnections"
+        internal const val CONNECTIONS = "connections"
+        internal const val ACTIVE = "active"
         internal const val BYTES = "bytes"
         internal const val COUNT = "count"
         internal const val DATA = "data"
@@ -125,6 +143,7 @@
         internal const val REJECTED = "rejected"
         internal const val TOPIC = "topic"
         internal const val DROPPED = "dropped"
+        internal const val TOTAL = "total"
         internal const val TIME = "time"
         internal const val LATENCY = "latency"
         internal fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index b35dc53..f9be546 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -44,7 +44,7 @@
                 config.maximumPayloadSizeBytes
         ).createVesHvCollectorProvider()
 
-        return ServerFactory.createNettyTcpServer(config, collectorProvider)
+        return ServerFactory.createNettyTcpServer(config, collectorProvider, MicrometerMetrics.INSTANCE)
     }
 
     override fun serverStartedMessage(handle: ServerHandle) =
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
index 71fc8f7..24355d5 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
@@ -22,7 +22,6 @@
 import arrow.core.Try
 import io.micrometer.core.instrument.Counter
 import io.micrometer.core.instrument.Gauge
-import io.micrometer.core.instrument.Meter
 import io.micrometer.core.instrument.Timer
 import io.micrometer.core.instrument.search.RequiredSearch
 import io.micrometer.prometheus.PrometheusConfig
@@ -35,10 +34,10 @@
 import org.jetbrains.spek.api.dsl.on
 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
-import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
-import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
@@ -46,6 +45,8 @@
 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrameWithPayloadSize
 import java.time.Instant
+import io.micrometer.core.instrument.Meter
+
 import java.time.temporal.Temporal
 import java.util.concurrent.TimeUnit
 import kotlin.reflect.KClass
@@ -56,7 +57,6 @@
  */
 object MicrometerMetricsTest : Spek({
     val doublePrecision = Percentage.withPercentage(0.5)
-    val alwaysChangedMeters = setOf("$PREFIX.messages.processing.time", "$PREFIX.messages.latency.time")
     lateinit var registry: PrometheusMeterRegistry
     lateinit var cut: MicrometerMetrics
 
@@ -87,6 +87,7 @@
     fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
             verifyCounter(registrySearch(name), verifier)
 
+
     fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) {
         fun <T : Meter> verifyAllMetersAreUnchangedBut(
                 clazz: KClass<T>,
@@ -98,7 +99,9 @@
                     .map { it as T }
                     .filterNot { it.id.name in changedCounters }
                     .forEach {
-                        assertThat(valueOf(it)).describedAs(it.id.toString()).isCloseTo(0.0, doublePrecision)
+                        assertThat(valueOf(it))
+                                .describedAs(it.id.toString())
+                                .isCloseTo(0.0, doublePrecision)
                     }
         }
 
@@ -108,8 +111,8 @@
         }
     }
 
-    describe("notifyBytesReceived") {
 
+    describe("notifyBytesReceived") {
         on("$PREFIX.data.received.bytes counter") {
             val counterName = "$PREFIX.data.received.bytes"
 
@@ -187,6 +190,7 @@
 
         on("$PREFIX.messages.sent.topic.count counter") {
             val counterName = "$PREFIX.messages.sent.topic.count"
+
             it("should handle counters for different topics") {
                 cut.notifyMessageSent(routedMessage(topicName1))
                 cut.notifyMessageSent(routedMessage(topicName2))
@@ -242,13 +246,12 @@
                         "$PREFIX.messages.processing.time")
             }
         }
-
     }
 
     describe("notifyMessageDropped") {
-
         on("$PREFIX.messages.dropped.count counter") {
             val counterName = "$PREFIX.messages.dropped.count"
+
             it("should increment counter") {
                 cut.notifyMessageDropped(ROUTE_NOT_FOUND)
                 cut.notifyMessageDropped(INVALID_MESSAGE)
@@ -262,6 +265,7 @@
 
         on("$PREFIX.messages.dropped.cause.count counter") {
             val counterName = "$PREFIX.messages.dropped.cause.count"
+
             it("should handle counters for different drop reasons") {
                 cut.notifyMessageDropped(ROUTE_NOT_FOUND)
                 cut.notifyMessageDropped(INVALID_MESSAGE)
@@ -278,36 +282,38 @@
         }
     }
 
-    describe("processing gauge") {
-        it("should show difference between sent and received messages") {
+    describe("notifyClientConnected") {
+        on("$PREFIX.connections.total.count counter") {
+            val counterName = "$PREFIX.connections.total.count"
 
-            on("positive difference") {
-                cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
-                cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
-                cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
-                cut.notifyMessageSent(routedMessage("perf3gpp"))
-                verifyGauge("messages.processing.count") {
-                    assertThat(it.value()).isCloseTo(2.0, doublePrecision)
-                }
-            }
+            it("should increment counter") {
+                cut.notifyClientConnected()
+                cut.notifyClientConnected()
 
-            on("zero difference") {
-                cut.notifyMessageReceived(emptyWireProtocolFrame())
-                cut.notifyMessageSent(routedMessage("perf3gpp"))
-                verifyGauge("messages.processing.count") {
-                    assertThat(it.value()).isCloseTo(0.0, doublePrecision)
+                verifyCounter(counterName) {
+                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
                 }
-            }
-
-            on("negative difference") {
-                cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
-                cut.notifyMessageSent(routedMessage("fault"))
-                cut.notifyMessageSent(routedMessage("perf3gpp"))
-                verifyGauge("messages.processing.count") {
-                    assertThat(it.value()).isCloseTo(0.0, doublePrecision)
-                }
+                verifyCountersAndTimersAreUnchangedBut(counterName)
             }
         }
+
+    }
+
+    describe("notifyClientDisconnected") {
+        on("$PREFIX.disconnections.count counter") {
+            val counterName = "$PREFIX.disconnections.count"
+
+            it("should increment counter") {
+                cut.notifyClientDisconnected()
+                cut.notifyClientDisconnected()
+
+                verifyCounter(counterName) {
+                    assertThat(it.count()).isCloseTo(2.0, doublePrecision)
+                }
+                verifyCountersAndTimersAreUnchangedBut(counterName)
+            }
+        }
+
     }
 
     describe("notifyClientRejected") {
@@ -342,6 +348,74 @@
             }
         }
     }
+
+    describe("$PREFIX.messages.processing.count gauge") {
+        val gaugeName = "$PREFIX.messages.processing.count"
+
+        on("message traffic") {
+            it("should calculate positive difference between sent and received messages") {
+                cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
+                cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
+                cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
+                cut.notifyMessageSent(routedMessage("perf3gpp"))
+
+                verifyGauge(gaugeName) {
+                    assertThat(it.value()).isCloseTo(2.0, doublePrecision)
+                }
+            }
+
+            it("should calculate no difference between sent and received messages") {
+                cut.notifyMessageSent(routedMessage("perf3gpp"))
+                cut.notifyMessageSent(routedMessage("fault"))
+
+                verifyGauge(gaugeName) {
+                    assertThat(it.value()).isCloseTo(0.0, doublePrecision)
+                }
+            }
+
+            it("should calculate negative difference between sent and received messages") {
+                cut.notifyMessageSent(routedMessage("fault"))
+
+                verifyGauge(gaugeName) {
+                    assertThat(it.value()).isCloseTo(0.0, doublePrecision)
+                }
+            }
+        }
+    }
+
+    describe("$PREFIX.connections.active.count gauge") {
+        val gaugeName = "$PREFIX.connections.active.count"
+
+        on("connection traffic") {
+            it("should calculate positive difference between connected and disconnected clients") {
+                cut.notifyClientConnected()
+                cut.notifyClientConnected()
+                cut.notifyClientConnected()
+                cut.notifyClientDisconnected()
+
+                verifyGauge(gaugeName) {
+                    assertThat(it.value()).isCloseTo(2.0, doublePrecision)
+                }
+            }
+
+            it("should calculate no difference between connected and disconnected clients") {
+                cut.notifyClientDisconnected()
+                cut.notifyClientDisconnected()
+
+                verifyGauge(gaugeName) {
+                    assertThat(it.value()).isCloseTo(0.0, doublePrecision)
+                }
+            }
+
+            it("should calculate negative difference between connected and disconnected clients") {
+                cut.notifyClientDisconnected()
+
+                verifyGauge(gaugeName) {
+                    assertThat(it.value()).isCloseTo(0.0, doublePrecision)
+                }
+            }
+        }
+    }
 })
 
 fun routedMessage(topic: String, partition: Int = 0) =
@@ -364,4 +438,4 @@
         }.let { evt ->
             RoutedMessage(topic, partition,
                     VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
-        }
+        }
\ No newline at end of file