There should be one KafkaSender per configuration

We should keep only one instance of KafkaSender per instance. However,
   as the configuration might be changed (Consul update) it cannot be a
   strict singleton. Hence there should be 1to1 relationship beetween
   ConsulConfiguration and KafkaSender.

Change-Id: Ie168028c4427741254b8c2fe316b82cca72d7668
Issue-ID: DCAEGEN2-1047
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
diff --git a/development/bin/consul.sh b/development/bin/consul.sh
index c229f83..39f0bde 100755
--- a/development/bin/consul.sh
+++ b/development/bin/consul.sh
@@ -61,7 +61,6 @@
 
 CONFIGURATION="
 {
-    \"dmaap.kafkaBootstrapServers\": \"message-router-kafka:9092\",
     \"collector.routing\":
         [{
             \"fromDomain\": \"${DOMAIN}\",
diff --git a/development/docker-compose.yml b/development/docker-compose.yml
index a64c62d..adf8947 100644
--- a/development/docker-compose.yml
+++ b/development/docker-compose.yml
@@ -44,7 +44,6 @@
     - consul-server
     restart: on-failure
     command: ["kv", "put", "-http-addr=http://consul-server:8500", "veshv-config", '{
-                                              "dmaap.kafkaBootstrapServers": "message-router-kafka:9092",
                                               "collector.routing": [
                                                 {
                                                   "fromDomain": "perf3gpp",
@@ -63,13 +62,14 @@
     ports:
     - "6060:6060"
     - "6061:6061/tcp"
-    entrypoint: ["java", "-Dio.netty.leakDetection.level=paranoid",
-                 "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"]
     command: ["--listen-port", "6061",
               "--health-check-api-port", "6060",
               "--config-url", "http://consul-server:8500/v1/kv/veshv-config?raw=true",
+              "--kafka-bootstrap-servers", "message-router-kafka:9092",
               "--key-store-password", "onaponap",
               "--trust-store-password", "onaponap"]
+    environment:
+      JAVA_OPTS:  "-Dio.netty.leakDetection.level=paranoid"
     healthcheck:
       test: curl -f http://localhost:6060/health/ready || exit 1
       interval: 10s
diff --git a/development/start-simulation.sh b/development/start-simulation.sh
index 70e4aae..6f38ea7 100755
--- a/development/start-simulation.sh
+++ b/development/start-simulation.sh
@@ -25,7 +25,7 @@
                  "vesEventListenerVersion": "7.2"
                },
                "messageType": "VALID",
-               "messagesAmount": 1
+               "messagesAmount": 1000000
              }
            ]' \
     http://localhost:6062/simulator/async
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index ac55e55..e4a7394 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -23,12 +23,13 @@
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
 import org.onap.dcae.collectors.veshv.model.MessageDropCause
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import reactor.core.publisher.Flux
 
 interface Sink {
-    fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage>
+    fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage>
 }
 
 interface Metrics {
@@ -41,14 +42,14 @@
     fun notifyClientRejected(cause: ClientRejectionCause)
 }
 
-@FunctionalInterface
 interface SinkProvider {
-    operator fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink
+    operator fun invoke(ctx: ClientContext): Sink
 
     companion object {
         fun just(sink: Sink): SinkProvider =
                 object : SinkProvider {
-                    override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink = sink
+                    override fun invoke(
+                            ctx: ClientContext): Sink = sink
                 }
     }
 }
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 2008fc3..fe2b89d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -59,18 +59,20 @@
                     healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
                 }
                 .subscribe(config::set)
+
         return { ctx: ClientContext ->
-            config.getOption().map { config -> createVesHvCollector(config, ctx) }
+            config.getOption().map { createVesHvCollector(it, ctx) }
         }
     }
 
-    private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector = VesHvCollector(
-            clientContext = ctx,
-            wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx),
-            protobufDecoder = VesDecoder(),
-            router = Router(config.routing, ctx),
-            sink = sinkProvider(config, ctx),
-            metrics = metrics)
+    private fun createVesHvCollector(config: CollectorConfiguration, ctx: ClientContext): Collector =
+            VesHvCollector(
+                    clientContext = ctx,
+                    wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), ctx),
+                    protobufDecoder = VesDecoder(),
+                    router = Router(config.routing, ctx),
+                    sink = sinkProvider(ctx),
+                    metrics = metrics)
 
     companion object {
         private val logger = Logger(CollectorFactory::class)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 5c3f339..fd01c9d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -28,9 +28,11 @@
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
+import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
-import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
 import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure
@@ -96,10 +98,10 @@
                         .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) }
             }
 
-    private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
+    private fun routeMessage(flux: Flux<VesMessage>): Flux<ConsumedMessage> = flux
             .flatMap(this::findRoute)
             .compose(sink::send)
-            .doOnNext(metrics::notifyMessageSent)
+            .doOnNext(this::updateSinkMetrics)
 
     private fun findRoute(msg: VesMessage) = router
             .findDestination(msg)
@@ -108,6 +110,15 @@
                     { "Found route for message: ${it.topic}, partition: ${it.partition}" },
                     { "Could not find route for message" })
 
+    private fun updateSinkMetrics(consumedMessage: ConsumedMessage) {
+        when (consumedMessage) {
+            is SuccessfullyConsumedMessage ->
+                metrics.notifyMessageSent(consumedMessage.message)
+            is FailedToConsumeMessage ->
+                metrics.notifyMessageDropped(consumedMessage.cause)
+        }
+    }
+
     private fun releaseBuffersMemory() = wireChunkDecoder.release()
             .also { logger.debug { "Released buffer memory after handling message stream" } }
 
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 8c16736..75b6f0a 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -23,6 +23,7 @@
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
 import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
 import reactor.netty.http.client.HttpClient
 
 /**
@@ -30,8 +31,12 @@
  * @since May 2018
  */
 object AdapterFactory {
-    fun kafkaSink(): SinkProvider = KafkaSinkProvider()
-    fun loggingSink(): SinkProvider = LoggingSinkProvider()
+    fun sinkCreatorFactory(dummyMode: Boolean,
+                           kafkaConfig: KafkaConfiguration): SinkProvider =
+            if (dummyMode)
+                LoggingSinkProvider()
+            else
+                KafkaSinkProvider(kafkaConfig)
 
     fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
             ConsulConfigurationProvider(httpAdapter(), configurationProviderParams)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index e4453c9..717da09 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -25,6 +25,7 @@
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
 import org.onap.dcae.collectors.veshv.model.ServiceContext
+import org.onap.dcae.collectors.veshv.model.routing
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.utils.logging.Marker
 import reactor.core.publisher.Flux
@@ -107,12 +108,11 @@
             Json.createReader(StringReader(responseString)).readObject()
 
     private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
-        val routing = configuration.getJsonArray("collector.routing")
+        val routingArray = configuration.getJsonArray("collector.routing")
 
         return CollectorConfiguration(
-                kafkaBootstrapServers = configuration.getString("dmaap.kafkaBootstrapServers"),
-                routing = org.onap.dcae.collectors.veshv.model.routing {
-                    for (route in routing) {
+                routing {
+                    for (route in routingArray) {
                         val routeObj = route.asJsonObject()
                         defineRoute {
                             fromDomain(routeObj.getString("fromDomain"))
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
index 14966d9..7535fbe 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
@@ -21,11 +21,12 @@
 
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Flux
 import java.util.concurrent.atomic.AtomicLong
@@ -36,14 +37,13 @@
  */
 internal class LoggingSinkProvider : SinkProvider {
 
-    override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink {
+    override fun invoke(ctx: ClientContext): Sink {
         return object : Sink {
             private val totalMessages = AtomicLong()
             private val totalBytes = AtomicLong()
 
-            override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> =
-                    messages
-                            .doOnNext(this::logMessage)
+            override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
+                    messages.doOnNext(this::logMessage).map(::SuccessfullyConsumedMessage)
 
             private fun logMessage(msg: RoutedMessage) {
                 val msgs = totalMessages.addAndGet(1)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
index b4f9a90..73c852d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -20,19 +20,20 @@
 package org.onap.dcae.collectors.veshv.impl.adapters.kafka
 
 import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug
 import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withWarn
-import org.onap.dcae.collectors.veshv.utils.logging.Marker
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
+import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
+import org.onap.dcae.collectors.veshv.model.MessageDropCause
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.Marker
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
 import reactor.kafka.sender.KafkaSender
 import reactor.kafka.sender.SenderRecord
-import reactor.kafka.sender.SenderResult
-import java.util.concurrent.atomic.AtomicLong
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -40,44 +41,39 @@
  */
 internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>,
                          private val ctx: ClientContext) : Sink {
-    private val sentMessages = AtomicLong(0)
 
-    override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
-        val records = messages.map(this::vesToKafkaRecord)
-        val result = sender.send(records)
-                .doOnNext {
-                    if (it.isSuccessful()) {
-                        Mono.just(it)
+    override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
+            messages.map(::vesToKafkaRecord).let { records ->
+                sender.send(records).map {
+                    val msg = it.correlationMetadata()
+                    if (it.exception() == null) {
+                        logger.trace(ctx::fullMdc, Marker.Invoke()) {
+                            "Message sent to Kafka with metadata: ${it.recordMetadata()}"
+                        }
+                        SuccessfullyConsumedMessage(msg)
                     } else {
-                        logger.withWarn(ctx) { log("Failed to send message to Kafka", it.exception()) }
-                        Mono.empty<SenderResult<RoutedMessage>>()
+                        logger.warn(ctx::fullMdc, Marker.Invoke()) {
+                            "Failed to send message to Kafka. Reason: ${it.exception().message}"
+                        }
+                        logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) }
+                        FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE)
                     }
                 }
-                .map { it.correlationMetadata() }
+            }
 
-        return result.doOnNext(::logSentMessage)
-    }
+    private fun vesToKafkaRecord(routed: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> =
+            SenderRecord.create(
+                    routed.topic,
+                    routed.partition,
+                    FILL_TIMESTAMP_LATER,
+                    routed.message.header,
+                    routed.message,
+                    routed)
 
-    private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> {
-        return SenderRecord.create(
-                msg.topic,
-                msg.partition,
-                System.currentTimeMillis(),
-                msg.message.header,
-                msg.message,
-                msg)
-    }
-
-    private fun logSentMessage(sentMsg: RoutedMessage) {
-        logger.trace(ctx::fullMdc, Marker.Invoke()) {
-            val msgNum = sentMessages.incrementAndGet()
-            "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
-        }
-    }
-
-    private fun SenderResult<out Any>.isSuccessful() = exception() == null
+    internal fun usesSameSenderAs(other: KafkaSink) = sender === other.sender
 
     companion object {
-        val logger = Logger(KafkaSink::class)
+        private val FILL_TIMESTAMP_LATER: Long? = null
+        private val logger = Logger(KafkaSink::class)
     }
 }
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
index b4f470d..2fa4f54 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -19,11 +19,16 @@
  */
 package org.onap.dcae.collectors.veshv.impl.adapters.kafka
 
-import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
+import org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 import reactor.kafka.sender.KafkaSender
@@ -33,14 +38,25 @@
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since June 2018
  */
-internal class KafkaSinkProvider : SinkProvider {
-    override fun invoke(config: CollectorConfiguration, ctx: ClientContext): Sink {
-        return KafkaSink(KafkaSender.create(constructSenderOptions(config)), ctx)
-    }
+internal class KafkaSinkProvider internal constructor(
+        private val kafkaSender: KafkaSender<CommonEventHeader, VesMessage>) : SinkProvider {
 
-    private fun constructSenderOptions(config: CollectorConfiguration) =
-            SenderOptions.create<CommonEventHeader, VesMessage>()
-                    .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers)
-                    .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
-                    .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
+    constructor(config: KafkaConfiguration) : this(constructKafkaSender(config))
+
+    override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx)
+
+    companion object {
+        private fun constructKafkaSender(config: KafkaConfiguration) =
+                KafkaSender.create(constructSenderOptions(config))
+
+        private fun constructSenderOptions(config: KafkaConfiguration) =
+                SenderOptions.create<CommonEventHeader, VesMessage>()
+                        .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServers)
+                        .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
+                        .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
+                        .producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
+                        .producerProperty(RETRIES_CONFIG, 1)
+                        .producerProperty(ACKS_CONFIG, "1")
+                        .stopOnError(false)
+    }
 }
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt
index ec546c7..f65b97c 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt
@@ -23,4 +23,4 @@
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-data class CollectorConfiguration(val kafkaBootstrapServers: String, val routing: Routing)
+data class CollectorConfiguration(val routing: Routing)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt
new file mode 100644
index 0000000..f65e157
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/KafkaConfiguration.kt
@@ -0,0 +1,26 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+data class KafkaConfiguration(val bootstrapServers: String)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
index 8511768..7e5044f 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
@@ -29,6 +29,7 @@
  */
 data class ServerConfiguration(
         val serverListenAddress: InetSocketAddress,
+        val kafkaConfiguration: KafkaConfiguration,
         val configurationProviderParams: ConfigurationProviderParams,
         val securityConfiguration: SecurityConfiguration,
         val idleTimeout: Duration,
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt
new file mode 100644
index 0000000..29c418a
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/SuccessfullyConsumedMessage.kt
@@ -0,0 +1,33 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import java.lang.Exception
+
+sealed class ConsumedMessage {
+    abstract val message: RoutedMessage
+}
+
+data class SuccessfullyConsumedMessage(override val message: RoutedMessage) : ConsumedMessage()
+
+data class FailedToConsumeMessage(
+        override val message: RoutedMessage,
+        val exception: Exception?,
+        val cause: MessageDropCause) : ConsumedMessage()
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt
index 836eab5..ab7b196 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/stream_interruption_cause.kt
@@ -29,7 +29,8 @@
  */
 enum class MessageDropCause(val tag: String) {
     ROUTE_NOT_FOUND("routing"),
-    INVALID_MESSAGE("invalid")
+    INVALID_MESSAGE("invalid"),
+    KAFKA_FAILURE("kafka")
 }
 
 enum class ClientRejectionCause(val tag: String) {
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
index 9ce0c3d..a92d376 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -66,8 +66,6 @@
                     StepVerifier.create(consulConfigProvider().take(1))
                             .consumeNextWith {
 
-                                assertEquals("$kafkaAddress:9093", it.kafkaBootstrapServers)
-
                                 val route1 = it.routing.routes[0]
                                 assertThat(FAULT.domainName)
                                         .describedAs("routed domain 1")
@@ -139,12 +137,9 @@
     )
 }
 
-
-const val kafkaAddress = "message-router-kafka"
-
 fun constructConsulResponse(): String =
     """{
-    "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093",
+    "whatever": "garbage",
     "collector.routing": [
             {
                 "fromDomain": "fault",
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
new file mode 100644
index 0000000..3a924e4
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
@@ -0,0 +1,64 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+
+import arrow.syntax.collections.tail
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.model.ClientContext
+import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since December 2018
+ */
+internal object KafkaSinkProviderTest : Spek({
+    describe("non functional requirements") {
+        given("sample configuration") {
+            val config = KafkaConfiguration("localhost:9090")
+            val cut = KafkaSinkProvider(config)
+
+            on("sample clients") {
+                val clients = listOf(
+                        ClientContext(),
+                        ClientContext(),
+                        ClientContext(),
+                        ClientContext())
+
+                it("should create only one instance of KafkaSender") {
+                    val sinks = clients.map(cut::invoke)
+                    val firstSink = sinks[0]
+                    val restOfSinks = sinks.tail()
+
+                    assertThat(restOfSinks).isNotEmpty
+                    assertThat(restOfSinks).allSatisfy { sink ->
+                        assertThat(firstSink.usesSameSenderAs(sink))
+                                .describedAs("$sink.kafkaSender should be same as $firstSink.kafkaSender")
+                                .isTrue()
+                    }
+                }
+            }
+        }
+    }
+})
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
index f457aea..aaa3ee3 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
@@ -31,6 +31,7 @@
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
 import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
+import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
 import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
@@ -50,7 +51,7 @@
 
     describe("Bytes received metrics") {
         it("should sum up all bytes received") {
-            val sut = vesHvWithNoOpSink()
+            val sut = vesHvWithAlwaysSuccessfulSink()
             val vesWireFrameMessage = vesWireFrameMessage()
             val invalidWireFrame = messageWithInvalidWireFrameHeader()
 
@@ -70,7 +71,7 @@
 
     describe("Messages received metrics") {
         it("should sum up all received messages bytes") {
-            val sut = vesHvWithNoOpSink()
+            val sut = vesHvWithAlwaysSuccessfulSink()
             val firstVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(10)))
             val secondVesEvent = vesEvent(eventFields = ByteString.copyFrom(ByteArray(40)))
             val firstVesMessage = vesWireFrameMessage(firstVesEvent)
@@ -91,7 +92,7 @@
 
     describe("Messages sent metrics") {
         it("should gather info for each topic separately") {
-            val sut = vesHvWithNoOpSink(twoDomainsToOneTopicConfiguration)
+            val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicConfiguration)
 
             sut.handleConnection(
                     vesWireFrameMessage(PERF3GPP),
@@ -129,7 +130,7 @@
 
     describe("Messages dropped metrics") {
         it("should gather metrics for invalid messages") {
-            val sut = vesHvWithNoOpSink(basicConfiguration)
+            val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration)
 
             sut.handleConnection(
                     messageWithInvalidWireFrameHeader(),
@@ -145,7 +146,7 @@
         }
 
         it("should gather metrics for route not found") {
-            val sut = vesHvWithNoOpSink(basicConfiguration)
+            val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration)
 
             sut.handleConnection(
                     vesWireFrameMessage(domain = PERF3GPP),
@@ -158,8 +159,19 @@
                     .isEqualTo(1)
         }
 
+        it("should gather metrics for sing errors") {
+            val sut = vesHvWithAlwaysFailingSink(basicConfiguration)
+
+            sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP))
+
+            val metrics = sut.metrics
+            assertThat(metrics.messagesDropped(KAFKA_FAILURE))
+                    .describedAs("messagesDroppedCause $KAFKA_FAILURE metric")
+                    .isEqualTo(1)
+        }
+
         it("should gather summed metrics for dropped messages") {
-            val sut = vesHvWithNoOpSink(basicConfiguration)
+            val sut = vesHvWithAlwaysSuccessfulSink(basicConfiguration)
 
             sut.handleConnection(
                     vesWireFrameMessage(domain = PERF3GPP),
@@ -183,7 +195,7 @@
             ).forEach { cause, vesMessage ->
                 on("cause $cause") {
                     it("should notify correct metrics") {
-                        val sut = vesHvWithNoOpSink()
+                        val sut = vesHvWithAlwaysSuccessfulSink()
 
                         sut.handleConnection(vesMessage)
 
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 7ebbfba..c3e4a58 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -73,12 +73,17 @@
     collector.handleConnection(Flux.fromArray(packets)).block(timeout)
 }
 
-fun vesHvWithNoOpSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
-        Sut(NoOpSink()).apply {
+fun vesHvWithAlwaysSuccessfulSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
+        Sut(AlwaysSuccessfulSink()).apply {
+            configurationProvider.updateConfiguration(collectorConfiguration)
+        }
+
+fun vesHvWithAlwaysFailingSink(collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
+        Sut(AlwaysFailingSink()).apply {
             configurationProvider.updateConfiguration(collectorConfiguration)
         }
 
 fun vesHvWithDelayingSink(delay: Duration, collectorConfiguration: CollectorConfiguration = basicConfiguration): Sut =
-        Sut(ProcessingSink { it.delayElements(delay) }).apply {
+        Sut(DelayingSink(delay)).apply {
             configurationProvider.updateConfiguration(collectorConfiguration)
         }
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
index 3770913..db56e88 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -36,7 +36,6 @@
 const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
 
 val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
-        kafkaBootstrapServers = "localhost:9969",
         routing = routing {
             defineRoute {
                 fromDomain(PERF3GPP.domainName)
@@ -47,7 +46,6 @@
 )
 
 val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfiguration(
-        kafkaBootstrapServers = "localhost:9969",
         routing = routing {
             defineRoute {
                 fromDomain(PERF3GPP.domainName)
@@ -69,7 +67,6 @@
 
 
 val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfiguration(
-        kafkaBootstrapServers = "localhost:9969",
         routing = routing {
             defineRoute {
                 fromDomain(PERF3GPP.domainName)
@@ -81,7 +78,6 @@
 
 
 val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration(
-        kafkaBootstrapServers = "localhost:9969",
         routing = routing {
         }.build()
 )
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
index 2f731f5..b4ce649 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
@@ -21,13 +21,17 @@
 
 import arrow.core.identity
 import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
+import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
+import org.onap.dcae.collectors.veshv.model.MessageDropCause
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
 import org.reactivestreams.Publisher
 import reactor.core.publisher.Flux
+import java.time.Duration
 import java.util.*
 import java.util.concurrent.ConcurrentLinkedDeque
 import java.util.concurrent.atomic.AtomicLong
-import java.util.function.Function
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -39,8 +43,8 @@
     val sentMessages: List<RoutedMessage>
         get() = sent.toList()
 
-    override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
-        return messages.doOnNext(sent::addLast)
+    override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> {
+        return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage)
     }
 }
 
@@ -54,16 +58,23 @@
     val count: Long
         get() = atomicCount.get()
 
-    override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
+    override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> {
         return messages.doOnNext {
             atomicCount.incrementAndGet()
-        }
+        }.map(::SuccessfullyConsumedMessage)
     }
 }
 
 
-open class ProcessingSink(val transformer: (Flux<RoutedMessage>) -> Publisher<RoutedMessage>) : Sink {
-    override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> = messages.transform(transformer)
+open class ProcessingSink(private val transformer: (Flux<RoutedMessage>) -> Publisher<ConsumedMessage>) : Sink {
+    override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
+            messages.transform(transformer)
 }
 
-class NoOpSink : ProcessingSink(::identity)
+class AlwaysSuccessfulSink : ProcessingSink({ it.map(::SuccessfullyConsumedMessage) })
+
+class AlwaysFailingSink : ProcessingSink({ stream ->
+    stream.map { FailedToConsumeMessage(it, null, MessageDropCause.KAFKA_FAILURE) }
+})
+
+class DelayingSink(delay: Duration) : ProcessingSink({ it.delayElements(delay).map(::SuccessfullyConsumedMessage) })
diff --git a/sources/hv-collector-main/Dockerfile b/sources/hv-collector-main/Dockerfile
index ad7a03d..3322059 100644
--- a/sources/hv-collector-main/Dockerfile
+++ b/sources/hv-collector-main/Dockerfile
@@ -11,7 +11,7 @@
 
 WORKDIR /opt/ves-hv-collector
 
-ENTRYPOINT ["entry.sh"]
+ENTRYPOINT ["./entry.sh"]
 
 COPY target/libs/external/* ./
 COPY target/libs/internal/* ./
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
index 9b985f6..ae87f1c 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
@@ -27,10 +27,12 @@
 import org.apache.commons.cli.DefaultParser
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
 import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
 import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_FIRST_REQUEST_DELAY
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_REQUEST_INTERVAL
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.DUMMY_MODE
@@ -52,6 +54,7 @@
 
 internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) {
     override val cmdLineOptionsList = listOf(
+            KAFKA_SERVERS,
             HEALTH_CHECK_API_PORT,
             LISTEN_PORT,
             CONSUL_CONFIG_URL,
@@ -73,6 +76,7 @@
                         HEALTH_CHECK_API_PORT,
                         DefaultValues.HEALTH_CHECK_API_PORT
                 )
+                val kafkaServers = cmdLine.stringValue(KAFKA_SERVERS).bind()
                 val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
                 val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
                 val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES,
@@ -82,6 +86,7 @@
                 val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind()
                 ServerConfiguration(
                         serverListenAddress = InetSocketAddress(listenPort),
+                        kafkaConfiguration = KafkaConfiguration(kafkaServers),
                         healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort),
                         configurationProviderParams = configurationProviderParams,
                         securityConfiguration = security,
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
index 288145a..f3bcf38 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
@@ -73,7 +73,7 @@
 
     init {
         registry.gauge(name(MESSAGES, PROCESSING, COUNT), this) {
-            (receivedMsgCount.count() - sentCount.count()).coerceAtLeast(0.0)
+            (receivedMsgCount.count() - sentCount.count() - droppedCount.count()).coerceAtLeast(0.0)
         }
 
         registry.gauge(name(CONNECTIONS, ACTIVE, COUNT), this) {
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index f9be546..4e2e6d8 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -36,10 +36,9 @@
     override fun startServer(config: ServerConfiguration): IO<ServerHandle> = createVesServer(config).start()
 
     private fun createVesServer(config: ServerConfiguration): Server {
-        val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink()
         val collectorProvider = CollectorFactory(
                 AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
-                sink,
+                AdapterFactory.sinkCreatorFactory(config.dummyMode, config.kafkaConfiguration),
                 MicrometerMetrics.INSTANCE,
                 config.maximumPayloadSizeBytes
         ).createVesHvCollectorProvider()
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
index 1aac6a0..9dddeca 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
@@ -39,6 +39,7 @@
  */
 object ArgVesHvConfigurationTest : Spek({
     lateinit var cut: ArgVesHvConfiguration
+    val kafkaBootstrapServers = "dmaap-mr-wro:6666,dmaap-mr-gda:6666"
     val healthCheckApiPort = "6070"
     val configurationUrl = "http://test-address/test"
     val firstRequestDelay = "10"
@@ -57,6 +58,7 @@
 
             beforeEachTest {
                 result = cut.parseExpectingSuccess(
+                        "--kafka-bootstrap-servers", kafkaBootstrapServers,
                         "--health-check-api-port", healthCheckApiPort,
                         "--listen-port", listenPort,
                         "--config-url", configurationUrl,
@@ -69,6 +71,10 @@
                 )
             }
 
+            it("should set proper kafka bootstrap servers") {
+                assertThat(result.kafkaConfiguration.bootstrapServers).isEqualTo(kafkaBootstrapServers)
+            }
+
             it("should set proper listen port") {
                 assertThat(result.serverListenAddress.port).isEqualTo(listenPort.toInt())
             }