Use consumers in main
It includes --disable-processing flag.
Also fixed some issues with script for local performance test.
Also added KafkaConsumer::poll in OffsetKafka Consumer - without it KafkaConsumer::assignment returns empty set
Signed-off-by: Izabela Zawadzka <izabela.zawadzka@nokia.com>
Issue-ID: DCAEGEN2-1657
Change-Id: I95fadb45f321398346094dfa0c4a6e9da954c186
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt
new file mode 100644
index 0000000..6dddd0f
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt
@@ -0,0 +1,32 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.api
+
+import kotlinx.coroutines.Job
+import java.time.Duration
+
+interface MetricsKafkaConsumer {
+ suspend fun start(updateInterval: Long = defaultUpdateInterval, pollTimeout: Duration = defaultPollTimeoutMs): Job
+
+ companion object{
+ private const val defaultUpdateInterval = 500L
+ private val defaultPollTimeoutMs: Duration = Duration.ofMillis(10L)
+ }
+}
\ No newline at end of file
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
index d105c4a..18de6fc 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
@@ -28,19 +28,23 @@
import kotlinx.coroutines.launch
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
+import org.onap.dcae.collectors.veshv.kafkaconsumer.api.MetricsKafkaConsumer
import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import java.time.Duration
internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>,
private val topics: Set<String>,
private val metrics: Metrics,
- private val dispatcher: CoroutineDispatcher = Dispatchers.IO) {
+ private val dispatcher: CoroutineDispatcher = Dispatchers.IO)
+ : MetricsKafkaConsumer{
- suspend fun start(updateInterval: Long = 500L): Job =
+ override suspend fun start(updateInterval: Long, pollTimeout: Duration):Job =
GlobalScope.launch(dispatcher) {
- kafkaConsumer.subscribe(topics)
- val topicPartitions = kafkaConsumer.assignment()
+ kafkaConsumer.assign(topics.map { TopicPartition(it, 0) })
while (isActive) {
+ val topicPartitions = kafkaConsumer.assignment()
+
kafkaConsumer.endOffsets(topicPartitions)
.forEach { (topicPartition, offset) ->
update(topicPartition, offset)
@@ -58,6 +62,6 @@
}
companion object {
- val logger = Logger(OffsetKafkaConsumer::class)
+ private val logger = Logger(OffsetKafkaConsumer::class)
}
}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt
index f47a66d..7574d61 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt
@@ -29,21 +29,22 @@
import kotlinx.coroutines.launch
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.onap.dcae.collectors.veshv.kafkaconsumer.api.MetricsKafkaConsumer
import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventOuterClass
import java.time.Duration
internal class ProcessingKafkaConsumer (private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>,
private val topics: Set<String>,
private val metrics: Metrics,
- private val dispatcher: CoroutineDispatcher = Dispatchers.IO){
+ private val dispatcher: CoroutineDispatcher = Dispatchers.IO)
+ : MetricsKafkaConsumer{
- suspend fun start(updateInterval: Long = 500L, timeout: Duration): Job =
+ override suspend fun start(updateInterval: Long, pollTimeout: Duration):Job =
GlobalScope.launch(dispatcher){
kafkaConsumer.subscribe(topics)
while (isActive){
- kafkaConsumer.poll(timeout).forEach(::update)
+ kafkaConsumer.poll(pollTimeout).forEach(::update)
kafkaConsumer.commitSync()
delay(updateInterval)
}
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt
index 7e77bae..9bf4310 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt
@@ -19,9 +19,14 @@
*/
package org.onap.dcae.collectors.veshv.kafkaconsumer
+import kotlinx.coroutines.runBlocking
+import org.apache.kafka.clients.consumer.KafkaConsumer
import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried
+import org.onap.dcae.collectors.veshv.kafka.api.KafkaPropertiesFactory
import org.onap.dcae.collectors.veshv.kafkaconsumer.config.ArgKafkaConsumerConfiguration
import org.onap.dcae.collectors.veshv.kafkaconsumer.config.KafkaConsumerConfiguration
+import org.onap.dcae.collectors.veshv.kafkaconsumer.impl.OffsetKafkaConsumer
+import org.onap.dcae.collectors.veshv.kafkaconsumer.impl.ProcessingKafkaConsumer
import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.http.PrometheusApiServer
import org.onap.dcae.collectors.veshv.utils.process.ExitCode
@@ -37,6 +42,20 @@
private fun startApp(config: KafkaConsumerConfiguration): ExitSuccess {
+ val kafkaConsumer = KafkaConsumer<ByteArray, ByteArray>(KafkaPropertiesFactory.create(
+ config.kafkaBootstrapServers)
+ )
+
+ runBlocking {
+ if (config.disableProcessing) {
+ OffsetKafkaConsumer(kafkaConsumer, config.kafkaTopics, MicrometerMetrics.INSTANCE)
+ .start()
+ } else {
+ ProcessingKafkaConsumer(kafkaConsumer, config.kafkaTopics, MicrometerMetrics.INSTANCE)
+ .start()
+ }
+ }
+
PrometheusApiServer(config.apiAddress, MicrometerMetrics.INSTANCE)
.start().block()!!.await().block() // TODO refactor netty server logic
diff --git a/tools/performance/local/docker-compose.yml b/tools/performance/local/docker-compose.yml
index c0dfc47..73d02fd 100644
--- a/tools/performance/local/docker-compose.yml
+++ b/tools/performance/local/docker-compose.yml
@@ -104,7 +104,7 @@
image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-kafka-consumer
ports:
- "6064:6064/tcp"
- command: ["--listen-port", "6062"]
+ command: ["--listen-port", "6064", "--kafka-topics", "HV_VES_PERF3GPP", "--kafka-bootstrap-servers", "message-router-kafka-0:9093"]
depends_on:
- message-router-kafka-0
diff --git a/tools/performance/local/local-performance-test.sh b/tools/performance/local/local-performance-test.sh
index 3c885a6..6d08b8e 100755
--- a/tools/performance/local/local-performance-test.sh
+++ b/tools/performance/local/local-performance-test.sh
@@ -1,10 +1,10 @@
#!/usr/bin/env bash
-SCRIPT_DIRECTORY="$(dirname "$0")"
+SCRIPT_DIRECTORY="$(pwd "$0")"
CERT_FILE=${CERT_FILE:-/ssl/client.p12}
CERT_PASS_FILE=${CERT_PASS_FILE:-/ssl/client.pass}
-HV_VES_NETWORK=${HV_VES_NETWORK:-performance_default}
-VOLUME_MAPPING=${VOLUME_MAPPING:-$PWD/../ssl/:/ssl}
+HV_VES_NETWORK=${HV_VES_NETWORK:-local_default}
+VOLUME_MAPPING=${VOLUME_MAPPING:-$SCRIPT_DIRECTORY/../../ssl/:/ssl}
PRODUCER_IMAGE_NAME=${PRODUCER_IMAGE_NAME:-the-a-team-registry-local.esisoj70.emea.nsn-net.net/onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-rust-client:latest}
PRODUCER_APP_NAME=hv-ves-producer
@@ -115,6 +115,7 @@
echo "Clearing generated certs"
cd ../../ssl
./gen-certs.sh clean
+
cd "$SCRIPT_DIRECTORY"
echo "Removing HV-VES components"