Use consumers in main

It includes --disable-processing flag.
Also fixed some issues with script for local performance test.
Also added KafkaConsumer::poll in OffsetKafka Consumer - without it KafkaConsumer::assignment returns empty set

Signed-off-by: Izabela Zawadzka <izabela.zawadzka@nokia.com>
Issue-ID: DCAEGEN2-1657
Change-Id: I95fadb45f321398346094dfa0c4a6e9da954c186
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt
new file mode 100644
index 0000000..6dddd0f
--- /dev/null
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/api/MetricsKafkaConsumer.kt
@@ -0,0 +1,32 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.kafkaconsumer.api
+
+import kotlinx.coroutines.Job
+import java.time.Duration
+
+interface MetricsKafkaConsumer {
+    suspend fun start(updateInterval: Long = defaultUpdateInterval, pollTimeout: Duration = defaultPollTimeoutMs): Job
+
+    companion object{
+        private const val defaultUpdateInterval = 500L
+        private val defaultPollTimeoutMs: Duration = Duration.ofMillis(10L)
+    }
+}
\ No newline at end of file
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
index d105c4a..18de6fc 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/OffsetKafkaConsumer.kt
@@ -28,19 +28,23 @@
 import kotlinx.coroutines.launch
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.common.TopicPartition
+import org.onap.dcae.collectors.veshv.kafkaconsumer.api.MetricsKafkaConsumer
 import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import java.time.Duration
 
 internal class OffsetKafkaConsumer(private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>,
                                    private val topics: Set<String>,
                                    private val metrics: Metrics,
-                                   private val dispatcher: CoroutineDispatcher = Dispatchers.IO) {
+                                   private val dispatcher: CoroutineDispatcher = Dispatchers.IO)
+    : MetricsKafkaConsumer{
 
-    suspend fun start(updateInterval: Long = 500L): Job =
+    override suspend fun start(updateInterval: Long, pollTimeout: Duration):Job =
             GlobalScope.launch(dispatcher) {
-                kafkaConsumer.subscribe(topics)
-                    val topicPartitions = kafkaConsumer.assignment()
+                kafkaConsumer.assign(topics.map { TopicPartition(it, 0) })
                     while (isActive) {
+                        val topicPartitions = kafkaConsumer.assignment()
+
                         kafkaConsumer.endOffsets(topicPartitions)
                                 .forEach { (topicPartition, offset) ->
                                     update(topicPartition, offset)
@@ -58,6 +62,6 @@
     }
 
     companion object {
-        val logger = Logger(OffsetKafkaConsumer::class)
+        private val logger = Logger(OffsetKafkaConsumer::class)
     }
 }
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt
index f47a66d..7574d61 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/impl/ProcessingKafkaConsumer.kt
@@ -29,21 +29,22 @@
 import kotlinx.coroutines.launch
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.onap.dcae.collectors.veshv.kafkaconsumer.api.MetricsKafkaConsumer
 import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.Metrics
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.ves.VesEventOuterClass
 import java.time.Duration
 
 internal class ProcessingKafkaConsumer (private val kafkaConsumer: KafkaConsumer<ByteArray, ByteArray>,
                                         private val topics: Set<String>,
                                         private val metrics: Metrics,
-                                        private val dispatcher: CoroutineDispatcher = Dispatchers.IO){
+                                        private val dispatcher: CoroutineDispatcher = Dispatchers.IO)
+    : MetricsKafkaConsumer{
 
-    suspend fun start(updateInterval: Long = 500L, timeout: Duration): Job =
+    override suspend fun start(updateInterval: Long, pollTimeout: Duration):Job =
             GlobalScope.launch(dispatcher){
                 kafkaConsumer.subscribe(topics)
                 while (isActive){
-                    kafkaConsumer.poll(timeout).forEach(::update)
+                    kafkaConsumer.poll(pollTimeout).forEach(::update)
                     kafkaConsumer.commitSync()
                     delay(updateInterval)
                 }
diff --git a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt
index 7e77bae..9bf4310 100644
--- a/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt
+++ b/sources/hv-collector-kafka-consumer/src/main/kotlin/org/onap/dcae/collectors/veshv/kafkaconsumer/main.kt
@@ -19,9 +19,14 @@
  */
 package org.onap.dcae.collectors.veshv.kafkaconsumer
 
+import kotlinx.coroutines.runBlocking
+import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried
+import org.onap.dcae.collectors.veshv.kafka.api.KafkaPropertiesFactory
 import org.onap.dcae.collectors.veshv.kafkaconsumer.config.ArgKafkaConsumerConfiguration
 import org.onap.dcae.collectors.veshv.kafkaconsumer.config.KafkaConsumerConfiguration
+import org.onap.dcae.collectors.veshv.kafkaconsumer.impl.OffsetKafkaConsumer
+import org.onap.dcae.collectors.veshv.kafkaconsumer.impl.ProcessingKafkaConsumer
 import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.MicrometerMetrics
 import org.onap.dcae.collectors.veshv.kafkaconsumer.metrics.http.PrometheusApiServer
 import org.onap.dcae.collectors.veshv.utils.process.ExitCode
@@ -37,6 +42,20 @@
 
 
 private fun startApp(config: KafkaConsumerConfiguration): ExitSuccess {
+    val kafkaConsumer = KafkaConsumer<ByteArray, ByteArray>(KafkaPropertiesFactory.create(
+            config.kafkaBootstrapServers)
+    )
+
+        runBlocking {
+            if (config.disableProcessing) {
+                OffsetKafkaConsumer(kafkaConsumer, config.kafkaTopics, MicrometerMetrics.INSTANCE)
+                        .start()
+            } else {
+                ProcessingKafkaConsumer(kafkaConsumer, config.kafkaTopics, MicrometerMetrics.INSTANCE)
+                        .start()
+            }
+        }
+
     PrometheusApiServer(config.apiAddress, MicrometerMetrics.INSTANCE)
             .start().block()!!.await().block() // TODO refactor netty server logic
 
diff --git a/tools/performance/local/docker-compose.yml b/tools/performance/local/docker-compose.yml
index c0dfc47..73d02fd 100644
--- a/tools/performance/local/docker-compose.yml
+++ b/tools/performance/local/docker-compose.yml
@@ -104,7 +104,7 @@
     image: onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-kafka-consumer
     ports:
       - "6064:6064/tcp"
-    command: ["--listen-port", "6062"]
+    command: ["--listen-port", "6064", "--kafka-topics", "HV_VES_PERF3GPP", "--kafka-bootstrap-servers",  "message-router-kafka-0:9093"]
     depends_on:
       - message-router-kafka-0
 
diff --git a/tools/performance/local/local-performance-test.sh b/tools/performance/local/local-performance-test.sh
index 3c885a6..6d08b8e 100755
--- a/tools/performance/local/local-performance-test.sh
+++ b/tools/performance/local/local-performance-test.sh
@@ -1,10 +1,10 @@
 #!/usr/bin/env bash
 
-SCRIPT_DIRECTORY="$(dirname "$0")"
+SCRIPT_DIRECTORY="$(pwd "$0")"
 CERT_FILE=${CERT_FILE:-/ssl/client.p12}
 CERT_PASS_FILE=${CERT_PASS_FILE:-/ssl/client.pass}
-HV_VES_NETWORK=${HV_VES_NETWORK:-performance_default}
-VOLUME_MAPPING=${VOLUME_MAPPING:-$PWD/../ssl/:/ssl}
+HV_VES_NETWORK=${HV_VES_NETWORK:-local_default}
+VOLUME_MAPPING=${VOLUME_MAPPING:-$SCRIPT_DIRECTORY/../../ssl/:/ssl}
 PRODUCER_IMAGE_NAME=${PRODUCER_IMAGE_NAME:-the-a-team-registry-local.esisoj70.emea.nsn-net.net/onap/org.onap.dcaegen2.collectors.hv-ves.hv-collector-rust-client:latest}
 
 PRODUCER_APP_NAME=hv-ves-producer
@@ -115,6 +115,7 @@
     echo "Clearing generated certs"
     cd ../../ssl
     ./gen-certs.sh clean
+
     cd "$SCRIPT_DIRECTORY"
 
     echo "Removing HV-VES components"