Add partition offset metric to each topic partition
Before this commit offset consumer was able to fetch offset just from one partition.
This commit solve this.
Change-Id: I2c2c300219e43ab422b237094ad775ca8795169e
Issue-ID: DCAEGEN2-1783
Signed-off-by: kjaniak <kornel.janiak@nokia.com>
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt
index 6dddd0f..acbcbdd 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt
@@ -29,4 +29,4 @@
private const val defaultUpdateInterval = 500L
private val defaultPollTimeoutMs: Duration = Duration.ofMillis(10L)
}
-}
\ No newline at end of file
+}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
index 18de6fc..52bcf1e 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
@@ -37,21 +37,23 @@
private val topics: Set<String>,
private val metrics: Metrics,
private val dispatcher: CoroutineDispatcher = Dispatchers.IO)
- : MetricsKafkaConsumer{
+ : MetricsKafkaConsumer {
- override suspend fun start(updateInterval: Long, pollTimeout: Duration):Job =
+ override suspend fun start(updateInterval: Long, pollTimeout: Duration): Job =
GlobalScope.launch(dispatcher) {
- kafkaConsumer.assign(topics.map { TopicPartition(it, 0) })
- while (isActive) {
- val topicPartitions = kafkaConsumer.assignment()
+ val topicPartitions = topics.flatMap {
+ listOf(TopicPartition(it, 0), TopicPartition(it, 1), TopicPartition(it, 2))
+ }
+ kafkaConsumer.assign(topicPartitions)
- kafkaConsumer.endOffsets(topicPartitions)
- .forEach { (topicPartition, offset) ->
- update(topicPartition, offset)
- }
- kafkaConsumer.commitSync()
- delay(updateInterval)
- }
+ while (isActive) {
+ kafkaConsumer.endOffsets(kafkaConsumer.assignment())
+ .forEach { (topicPartition, offset) ->
+ update(topicPartition, offset)
+ }
+ kafkaConsumer.commitSync()
+ delay(updateInterval)
+ }
}
private fun update(topicPartition: TopicPartition, offset: Long) {
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
index 0af2cb2..da6a467 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetrics.kt
@@ -35,8 +35,10 @@
private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
) : Metrics {
- private val currentOffsetByTopicPartition = { topic: String ->
- registry.gauge(name(CONSUMER, OFFSET, TOPIC), listOf(Tag.of(TOPIC, topic)), AtomicLong(0))
+ private val currentOffsetByTopicPartition = { topicPartition: String ->
+ registry.gauge(name(OFFSET, PARTITION, topicPartition.toLowerCase()),
+ listOf(Tag.of(PARTITION, topicPartition)),
+ AtomicLong(0))
}.memoize<String, AtomicLong>()
private val travelTime = Timer.builder(name(TRAVEL,TIME))
@@ -58,9 +60,8 @@
companion object {
val INSTANCE by lazy { MicrometerMetrics() }
- private const val CONSUMER = "consumer"
private const val OFFSET = "offset"
- private const val TOPIC = "topic"
+ private const val PARTITION = "partition"
private const val TRAVEL = "travel"
private const val TIME = "time"
private const val PREFIX = "hv-kafka-consumer"
diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt
index 3bb9eba..26616f1 100644
--- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt
+++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumerTest.kt
@@ -72,7 +72,7 @@
}
}
- given("two topics with partition") {
+ given("two topics with one partition each") {
val topics = setOf(topicName1, topicName2)
val kafkaSource = OffsetKafkaConsumer(mockedKafkaConsumer, topics, mockedMetrics, testDispatcher)
@@ -147,7 +147,7 @@
val offsetKafkaConsumer = OffsetKafkaConsumer(mockedKafkaConsumer, emptyTopics, mockedMetrics, testDispatcher)
on("call of function start") {
- val emptyTopicPartitions = setOf(null)
+ val emptyTopicPartitions = emptySet<TopicPartition>()
whenever(mockedKafkaConsumer.assignment()).thenReturn(emptyTopicPartitions)
whenever(mockedKafkaConsumer.endOffsets(emptyTopicPartitions))
.thenReturn(emptyMap())
diff --git a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt
index 93a39ae..cfe67df 100644
--- a/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt
+++ b/sources/hv-collector-kafka-consumer/src/test/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/metrics/MicrometerMetricsTest.kt
@@ -69,17 +69,34 @@
}
describe("Gauges") {
- val gaugeName = "$PREFIX.consumer.offset.topic"
+ val gaugeName1 = "$PREFIX.offset.partition.sample_topic-0"
+ val gaugeName2 = "$PREFIX.offset.partition.sample_topic-1"
+ val offset1 = 966L
+ val offset2 = 967L
+ val topicPartition1 = TopicPartition("sample_topic", 0)
+ val topicPartition2 = TopicPartition("sample_topic", 1)
on("notifyOffsetChanged") {
- val offset = 966L
- val topicPartition = TopicPartition("sample_topic", 1)
+ it("should update $gaugeName1") {
+ cut.notifyOffsetChanged(offset1, topicPartition1)
- it("should update $gaugeName") {
- cut.notifyOffsetChanged(offset, topicPartition)
+ registry.verifyGauge(gaugeName1) {
+ assertThat(it.value()).isCloseTo(offset1.toDouble(), doublePrecision)
+ }
+ }
+ }
- registry.verifyGauge(gaugeName) {
- assertThat(it.value()).isCloseTo(offset.toDouble(), doublePrecision)
+ on("two partition update") {
+ it("should update $gaugeName1") {
+ cut.notifyOffsetChanged(offset1, topicPartition1)
+ cut.notifyOffsetChanged(offset2, topicPartition2)
+
+ registry.verifyGauge(gaugeName1) {
+ assertThat(it.value()).isCloseTo(offset1.toDouble(), doublePrecision)
+ }
+
+ registry.verifyGauge(gaugeName2) {
+ assertThat(it.value()).isCloseTo(offset2.toDouble(), doublePrecision)
}
}
}
diff --git a/tools/performance/cloud/cloud-based-performance-test.sh b/tools/performance/cloud/cloud-based-performance-test.sh
index cc08f4b..300bf20 100755
--- a/tools/performance/cloud/cloud-based-performance-test.sh
+++ b/tools/performance/cloud/cloud-based-performance-test.sh
@@ -77,7 +77,7 @@
echo "./cloud-based-performance-test.sh setup"
echo "./cloud-based-performance-test.sh start"
echo "./cloud-based-performance-test.sh start --containers 10"
- echo "./cloud-based-performance-test.sh start --containers 10"
+ echo "./cloud-based-performance-test.sh start --properties-file ~/other_test.properties"
echo "./cloud-based-performance-test.sh clean"
exit 1
}
diff --git a/tools/performance/cloud/test.properties b/tools/performance/cloud/test.properties
index bd746a1..04169e3 100644
--- a/tools/performance/cloud/test.properties
+++ b/tools/performance/cloud/test.properties
@@ -14,6 +14,6 @@
# CONSUMER CONFIGURATION
# Addresses of Kafka services to consume from
-consumer.kafka.bootstrapServers=message-router-kafka-0:9093,message-router-kafka-1:9093,message-router-kafka-2:9093
+consumer.kafka.bootstrapServers=message-router-kafka:9092
# Kafka topics to subscribe to
consumer.kafka.topics=HV_VES_PERF3GPP
diff --git a/tools/performance/local/grafana/dashboards/performance_tests.json b/tools/performance/local/grafana/dashboards/performance_tests.json
index 3784a96..654fdc3 100644
--- a/tools/performance/local/grafana/dashboards/performance_tests.json
+++ b/tools/performance/local/grafana/dashboards/performance_tests.json
@@ -191,7 +191,7 @@
"tableColumn": "",
"targets": [
{
- "expr": "hv_kafka_consumer_consumer_offset_topic",
+ "expr": "hv_kafka_consumer_offset_partition_hv_ves_perf3gpp_0",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
@@ -200,7 +200,7 @@
"thresholds": "",
"timeFrom": null,
"timeShift": null,
- "title": "Current offset",
+ "title": "Current offset on partition 0",
"type": "singlestat",
"valueFontSize": "80%",
"valueMaps": [