Add metrics for active connections count
* Fix and refactor gauges tests in MicrometerMetricsTests
as they were not executing
* Fix client disconnection handler in NettyTcpServer
* Add metrics gauge and counters required to measure active connections
Change-Id: I5620d398525c6859679cd5a49dc55a9fefd8b592
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Issue-ID: DCAEGEN2-1041
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index 61d28c2..ac55e55 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -36,6 +36,8 @@
fun notifyMessageReceived(msg: WireFrameMessage)
fun notifyMessageSent(msg: RoutedMessage)
fun notifyMessageDropped(cause: MessageDropCause)
+ fun notifyClientDisconnected()
+ fun notifyClientConnected()
fun notifyClientRejected(cause: ClientRejectionCause)
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
index dce933a..2e6bb4d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
@@ -20,6 +20,7 @@
package org.onap.dcae.collectors.veshv.factory
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.impl.socket.NettyTcpServer
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
@@ -30,6 +31,8 @@
* @since May 2018
*/
object ServerFactory {
- fun createNettyTcpServer(serverConfiguration: ServerConfiguration, collectorProvider: CollectorProvider): Server =
- NettyTcpServer(serverConfiguration, ServerSslContextFactory(), collectorProvider)
+ fun createNettyTcpServer(serverConfiguration: ServerConfiguration,
+ collectorProvider: CollectorProvider,
+ metrics: Metrics): Server =
+ NettyTcpServer(serverConfiguration, ServerSslContextFactory(), collectorProvider, metrics)
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index d8d786b..725622f 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -22,31 +22,30 @@
import arrow.core.None
import arrow.core.Option
import arrow.core.getOrElse
-import arrow.core.toOption
import arrow.effects.IO
import arrow.syntax.collections.firstOption
import io.netty.handler.ssl.SslHandler
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Server
-import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn
-import org.onap.dcae.collectors.veshv.utils.logging.Marker
+import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.Marker
import reactor.core.publisher.Mono
import reactor.netty.ByteBufFlux
import reactor.netty.Connection
import reactor.netty.NettyInbound
import reactor.netty.NettyOutbound
import reactor.netty.tcp.TcpServer
-import java.time.Duration
-import java.lang.Exception
import java.security.cert.X509Certificate
+import java.time.Duration
import javax.net.ssl.SSLSession
@@ -56,15 +55,15 @@
*/
internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
private val sslContextFactory: ServerSslContextFactory,
- private val collectorProvider: CollectorProvider) : Server {
+ private val collectorProvider: CollectorProvider,
+ private val metrics: Metrics) : Server {
override fun start(): IO<ServerHandle> = IO {
- val tcpServer = TcpServer.create()
+ TcpServer.create()
.addressSupplier { serverConfig.serverListenAddress }
.configureSsl()
.handle(this::handleConnection)
-
- NettyServerHandle(tcpServer.bindNow())
+ .let { NettyServerHandle(it.bindNow()) }
}
private fun TcpServer.configureSsl() =
@@ -79,13 +78,13 @@
}
private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
+ metrics.notifyClientConnected()
val clientContext = ClientContext(nettyOutbound.alloc())
nettyInbound.withConnection {
populateClientContext(clientContext, it)
it.channel().pipeline().get(SslHandler::class.java)?.engine()?.session?.let { sslSession ->
sslSession.peerCertificates.firstOption().map { it as X509Certificate }.map { it.subjectDN.name }
}
-
}
logger.debug(clientContext::fullMdc, Marker.Entry) { "Client connection request received" }
@@ -97,7 +96,8 @@
{
logger.info(clientContext::fullMdc) { "Handling new connection" }
nettyInbound.withConnection { conn ->
- conn.configureIdleTimeout(clientContext, serverConfig.idleTimeout)
+ conn
+ .configureIdleTimeout(clientContext, serverConfig.idleTimeout)
.logConnectionClosed(clientContext)
}
it.handleConnection(createDataStream(nettyInbound))
@@ -132,15 +132,14 @@
.receive()
.retain()
- private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection {
- onReadIdle(timeout.toMillis()) {
- logger.info(ctx) {
- "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
+ private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection =
+ onReadIdle(timeout.toMillis()) {
+ logger.info(ctx) {
+ "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
+ }
+ disconnectClient(ctx)
}
- disconnectClient(ctx)
- }
- return this
- }
+
private fun Connection.disconnectClient(ctx: ClientContext) {
channel().close().addListener {
@@ -152,13 +151,11 @@
}
}
- private fun Connection.logConnectionClosed(ctx: ClientContext): Connection {
- onTerminate().subscribe {
- // TODO: this code is never executed (at least with ssl-enabled, did not checked with ssl-disabled)
- logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" }
- }
- return this
- }
+ private fun Connection.logConnectionClosed(ctx: ClientContext): Connection =
+ onDispose {
+ metrics.notifyClientDisconnected()
+ logger.info(ctx::fullMdc, Marker.Exit) { "Connection has been closed" }
+ }
companion object {
private val logger = Logger(NettyTcpServer::class)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
index 572cc79..f457aea 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
@@ -175,7 +175,6 @@
}
describe("clients rejected metrics") {
-
given("rejection causes") {
mapOf(
ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE to
@@ -192,7 +191,7 @@
assertThat(metrics.clientRejectionCause.size)
.describedAs("metrics were notified with only one rejection cause")
.isOne()
- assertThat(metrics.clientRejectionCause.get(cause))
+ assertThat(metrics.clientRejectionCause[cause])
.describedAs("metrics were notified only once with correct client rejection cause")
.isOne()
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
index a27d167..8573d86 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
@@ -21,12 +21,11 @@
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import java.time.Duration
import java.time.Instant
-import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
-import java.util.concurrent.ConcurrentHashMap
import kotlin.test.fail
/**
@@ -35,14 +34,15 @@
*/
class FakeMetrics : Metrics {
- var bytesReceived: Int = 0 ; private set
- var messageBytesReceived: Int = 0 ; private set
- var messagesDroppedCount: Int = 0 ; private set
- var lastProcessingTimeMicros: Double = -1.0 ; private set
- private val messagesDroppedCause: MutableMap<MessageDropCause, Int> = ConcurrentHashMap()
- var messagesSentCount: Int = 0 ; private set
- val messagesSentToTopic: MutableMap<String, Int> = ConcurrentHashMap()
- var clientRejectionCause = mutableMapOf<ClientRejectionCause, Int>() ; private set
+ var bytesReceived: Int = 0; private set
+ var messageBytesReceived: Int = 0; private set
+ var messagesDroppedCount: Int = 0; private set
+ var lastProcessingTimeMicros: Double = -1.0; private set
+ var messagesSentCount: Int = 0; private set
+ var clientRejectionCause = mutableMapOf<ClientRejectionCause, Int>(); private set
+
+ private val messagesSentToTopic = mutableMapOf<String, Int>()
+ private val messagesDroppedCause = mutableMapOf<MessageDropCause, Int>()
override fun notifyBytesReceived(size: Int) {
bytesReceived += size
@@ -69,6 +69,12 @@
clientRejectionCause.compute(cause) { k, _ -> clientRejectionCause[k]?.inc() ?: 1 }
}
+ override fun notifyClientDisconnected() {
+ }
+
+ override fun notifyClientConnected() {
+ }
+
fun messagesOnTopic(topic: String) =
messagesSentToTopic[topic] ?: fail("No messages were sent to topic $topic")
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
index d35e17d..288145a 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
@@ -50,10 +50,13 @@
private val receivedMsgCount = registry.counter(name(MESSAGES, RECEIVED, COUNT))
private val receivedMsgBytes = registry.counter(name(MESSAGES, RECEIVED, BYTES))
+ private val connectionsTotalCount = registry.counter(name(CONNECTIONS, TOTAL, COUNT))
+ private val disconnectionsCount = registry.counter(name(DISCONNECTIONS, COUNT))
+
private val processingTime = registry.timer(name(MESSAGES, PROCESSING, TIME))
private val totalLatency = registry.timer(name(MESSAGES, LATENCY, TIME))
- private val sentCountTotal = registry.counter(name(MESSAGES, SENT, COUNT))
+ private val sentCount = registry.counter(name(MESSAGES, SENT, COUNT))
private val sentToTopicCount = { topic: String ->
registry.counter(name(MESSAGES, SENT, TOPIC, COUNT), TOPIC, topic)
}.memoize<String, Counter>()
@@ -70,8 +73,13 @@
init {
registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) {
- (receivedMsgCount.count() - sentCountTotal.count()).coerceAtLeast(0.0)
+ (receivedMsgCount.count() - sentCount.count()).coerceAtLeast(0.0)
}
+
+ registry.gauge(name(CONNECTIONS, ACTIVE, COUNT), this) {
+ (connectionsTotalCount.count() - disconnectionsCount.count()).coerceAtLeast(0.0)
+ }
+
ClassLoaderMetrics().bindTo(registry)
JvmMemoryMetrics().bindTo(registry)
JvmGcMetrics().bindTo(registry)
@@ -79,7 +87,6 @@
JvmThreadMetrics().bindTo(registry)
}
-
val metricsProvider = MicrometerPrometheusMetricsProvider(registry)
override fun notifyBytesReceived(size: Int) {
@@ -93,7 +100,7 @@
override fun notifyMessageSent(msg: RoutedMessage) {
val now = Instant.now()
- sentCountTotal.increment()
+ sentCount.increment()
sentToTopicCount(msg.topic).increment()
processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, now))
@@ -110,11 +117,22 @@
clientsRejectedCauseCount(cause.tag).increment()
}
+ override fun notifyClientConnected() {
+ connectionsTotalCount.increment()
+ }
+
+ override fun notifyClientDisconnected() {
+ disconnectionsCount.increment()
+ }
+
companion object {
val INSTANCE = MicrometerMetrics()
internal const val PREFIX = "hvves"
internal const val MESSAGES = "messages"
internal const val RECEIVED = "received"
+ internal const val DISCONNECTIONS = "disconnections"
+ internal const val CONNECTIONS = "connections"
+ internal const val ACTIVE = "active"
internal const val BYTES = "bytes"
internal const val COUNT = "count"
internal const val DATA = "data"
@@ -125,6 +143,7 @@
internal const val REJECTED = "rejected"
internal const val TOPIC = "topic"
internal const val DROPPED = "dropped"
+ internal const val TOTAL = "total"
internal const val TIME = "time"
internal const val LATENCY = "latency"
internal fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index b35dc53..f9be546 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -44,7 +44,7 @@
config.maximumPayloadSizeBytes
).createVesHvCollectorProvider()
- return ServerFactory.createNettyTcpServer(config, collectorProvider)
+ return ServerFactory.createNettyTcpServer(config, collectorProvider, MicrometerMetrics.INSTANCE)
}
override fun serverStartedMessage(handle: ServerHandle) =
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
index 71fc8f7..24355d5 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
@@ -22,7 +22,6 @@
import arrow.core.Try
import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.Gauge
-import io.micrometer.core.instrument.Meter
import io.micrometer.core.instrument.Timer
import io.micrometer.core.instrument.search.RequiredSearch
import io.micrometer.prometheus.PrometheusConfig
@@ -35,10 +34,10 @@
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics.Companion.PREFIX
-import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
-import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.INVALID_WIRE_FRAME_MARKER
+import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
@@ -46,6 +45,8 @@
import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrameWithPayloadSize
import java.time.Instant
+import io.micrometer.core.instrument.Meter
+
import java.time.temporal.Temporal
import java.util.concurrent.TimeUnit
import kotlin.reflect.KClass
@@ -56,7 +57,6 @@
*/
object MicrometerMetricsTest : Spek({
val doublePrecision = Percentage.withPercentage(0.5)
- val alwaysChangedMeters = setOf("$PREFIX.messages.processing.time", "$PREFIX.messages.latency.time")
lateinit var registry: PrometheusMeterRegistry
lateinit var cut: MicrometerMetrics
@@ -87,6 +87,7 @@
fun <T> verifyCounter(name: String, verifier: (Counter) -> T) =
verifyCounter(registrySearch(name), verifier)
+
fun verifyCountersAndTimersAreUnchangedBut(vararg changedMeters: String) {
fun <T : Meter> verifyAllMetersAreUnchangedBut(
clazz: KClass<T>,
@@ -98,7 +99,9 @@
.map { it as T }
.filterNot { it.id.name in changedCounters }
.forEach {
- assertThat(valueOf(it)).describedAs(it.id.toString()).isCloseTo(0.0, doublePrecision)
+ assertThat(valueOf(it))
+ .describedAs(it.id.toString())
+ .isCloseTo(0.0, doublePrecision)
}
}
@@ -108,8 +111,8 @@
}
}
- describe("notifyBytesReceived") {
+ describe("notifyBytesReceived") {
on("$PREFIX.data.received.bytes counter") {
val counterName = "$PREFIX.data.received.bytes"
@@ -187,6 +190,7 @@
on("$PREFIX.messages.sent.topic.count counter") {
val counterName = "$PREFIX.messages.sent.topic.count"
+
it("should handle counters for different topics") {
cut.notifyMessageSent(routedMessage(topicName1))
cut.notifyMessageSent(routedMessage(topicName2))
@@ -242,13 +246,12 @@
"$PREFIX.messages.processing.time")
}
}
-
}
describe("notifyMessageDropped") {
-
on("$PREFIX.messages.dropped.count counter") {
val counterName = "$PREFIX.messages.dropped.count"
+
it("should increment counter") {
cut.notifyMessageDropped(ROUTE_NOT_FOUND)
cut.notifyMessageDropped(INVALID_MESSAGE)
@@ -262,6 +265,7 @@
on("$PREFIX.messages.dropped.cause.count counter") {
val counterName = "$PREFIX.messages.dropped.cause.count"
+
it("should handle counters for different drop reasons") {
cut.notifyMessageDropped(ROUTE_NOT_FOUND)
cut.notifyMessageDropped(INVALID_MESSAGE)
@@ -278,36 +282,38 @@
}
}
- describe("processing gauge") {
- it("should show difference between sent and received messages") {
+ describe("notifyClientConnected") {
+ on("$PREFIX.connections.total.count counter") {
+ val counterName = "$PREFIX.connections.total.count"
- on("positive difference") {
- cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
- cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
- cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
- cut.notifyMessageSent(routedMessage("perf3gpp"))
- verifyGauge("messages.processing.count") {
- assertThat(it.value()).isCloseTo(2.0, doublePrecision)
- }
- }
+ it("should increment counter") {
+ cut.notifyClientConnected()
+ cut.notifyClientConnected()
- on("zero difference") {
- cut.notifyMessageReceived(emptyWireProtocolFrame())
- cut.notifyMessageSent(routedMessage("perf3gpp"))
- verifyGauge("messages.processing.count") {
- assertThat(it.value()).isCloseTo(0.0, doublePrecision)
+ verifyCounter(counterName) {
+ assertThat(it.count()).isCloseTo(2.0, doublePrecision)
}
- }
-
- on("negative difference") {
- cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
- cut.notifyMessageSent(routedMessage("fault"))
- cut.notifyMessageSent(routedMessage("perf3gpp"))
- verifyGauge("messages.processing.count") {
- assertThat(it.value()).isCloseTo(0.0, doublePrecision)
- }
+ verifyCountersAndTimersAreUnchangedBut(counterName)
}
}
+
+ }
+
+ describe("notifyClientDisconnected") {
+ on("$PREFIX.disconnections.count counter") {
+ val counterName = "$PREFIX.disconnections.count"
+
+ it("should increment counter") {
+ cut.notifyClientDisconnected()
+ cut.notifyClientDisconnected()
+
+ verifyCounter(counterName) {
+ assertThat(it.count()).isCloseTo(2.0, doublePrecision)
+ }
+ verifyCountersAndTimersAreUnchangedBut(counterName)
+ }
+ }
+
}
describe("notifyClientRejected") {
@@ -342,6 +348,74 @@
}
}
}
+
+ describe("$PREFIX.messages.processing.count gauge") {
+ val gaugeName = "$PREFIX.messages.processing.count"
+
+ on("message traffic") {
+ it("should calculate positive difference between sent and received messages") {
+ cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(128))
+ cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
+ cut.notifyMessageReceived(wireProtocolFrameWithPayloadSize(256))
+ cut.notifyMessageSent(routedMessage("perf3gpp"))
+
+ verifyGauge(gaugeName) {
+ assertThat(it.value()).isCloseTo(2.0, doublePrecision)
+ }
+ }
+
+ it("should calculate no difference between sent and received messages") {
+ cut.notifyMessageSent(routedMessage("perf3gpp"))
+ cut.notifyMessageSent(routedMessage("fault"))
+
+ verifyGauge(gaugeName) {
+ assertThat(it.value()).isCloseTo(0.0, doublePrecision)
+ }
+ }
+
+ it("should calculate negative difference between sent and received messages") {
+ cut.notifyMessageSent(routedMessage("fault"))
+
+ verifyGauge(gaugeName) {
+ assertThat(it.value()).isCloseTo(0.0, doublePrecision)
+ }
+ }
+ }
+ }
+
+ describe("$PREFIX.connections.active.count gauge") {
+ val gaugeName = "$PREFIX.connections.active.count"
+
+ on("connection traffic") {
+ it("should calculate positive difference between connected and disconnected clients") {
+ cut.notifyClientConnected()
+ cut.notifyClientConnected()
+ cut.notifyClientConnected()
+ cut.notifyClientDisconnected()
+
+ verifyGauge(gaugeName) {
+ assertThat(it.value()).isCloseTo(2.0, doublePrecision)
+ }
+ }
+
+ it("should calculate no difference between connected and disconnected clients") {
+ cut.notifyClientDisconnected()
+ cut.notifyClientDisconnected()
+
+ verifyGauge(gaugeName) {
+ assertThat(it.value()).isCloseTo(0.0, doublePrecision)
+ }
+ }
+
+ it("should calculate negative difference between connected and disconnected clients") {
+ cut.notifyClientDisconnected()
+
+ verifyGauge(gaugeName) {
+ assertThat(it.value()).isCloseTo(0.0, doublePrecision)
+ }
+ }
+ }
+ }
})
fun routedMessage(topic: String, partition: Int = 0) =
@@ -364,4 +438,4 @@
}.let { evt ->
RoutedMessage(topic, partition,
VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
- }
+ }
\ No newline at end of file