Merge "Remove IO monad usage from simulators"
diff --git a/development/configuration/base.json b/development/configuration/base.json
index 5233f02..c89a828 100644
--- a/development/configuration/base.json
+++ b/development/configuration/base.json
@@ -16,18 +16,5 @@
       "trustStoreFile": "/etc/ves-hv/ssl/trust.p12",
       "trustStorePassword": "onaponap"
     }
-  },
-  "collector": {
-    "dummyMode": false,
-    "maxRequestSizeBytes": 1048576,
-    "kafkaServers": [
-      "message-router-kafka:9092"
-    ],
-    "routing": [
-      {
-        "fromDomain": "perf3gpp",
-        "toTopic": "HV_VES_PERF3GPP"
-      }
-    ]
   }
 }
\ No newline at end of file
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
index 9f8c552..efe0aa8 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
@@ -34,10 +34,14 @@
     private val configReader = FileConfigurationReader()
     private val configValidator = ConfigurationValidator()
 
+    private lateinit var initialConfig: HvVesConfiguration
+
     fun hvVesConfigurationUpdates(args: Array<String>): Flux<HvVesConfiguration> =
             Flux.just(cmd.parse(args))
                     .throwOnLeft { MissingArgumentException(it.message, it.cause) }
                     .map { it.reader().use(configReader::loadConfig) }
                     .map { configValidator.validate(it) }
                     .throwOnLeft { ValidationException(it.message) }
+                    .doOnNext { initialConfig = it }
+
 }
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt
index 566f2c0..3375821 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Configuration.kt
@@ -49,6 +49,5 @@
 data class CollectorConfiguration(
         val maxRequestSizeBytes: Int,
         val kafkaServers: String,
-        val routing: Routing,
-        val dummyMode: Boolean = false
+        val routing: Routing
 )
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt
index aab8eca..e5a83ac 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/Routing.kt
@@ -19,59 +19,8 @@
  */
 package org.onap.dcae.collectors.veshv.config.api.model
 
-import arrow.core.Option
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
-import org.onap.dcae.collectors.veshv.domain.VesMessage
-import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
 
-data class Routing(val routes: List<Route>) {
+data class Route(val domain: String, val sink: KafkaSink)
 
-    fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
-            Option.fromNullable(routes.find { it.applies(commonHeader) })
-}
-
-data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int = { 0 }) {
-
-    fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain
-
-    operator fun invoke(message: VesMessage): RoutedMessage =
-            RoutedMessage(targetTopic, partitioning(message.header), message)
-}
-
-
-/*
-HvVesConfiguration DSL
-*/
-
-fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder = RoutingBuilder().apply(init)
-
-class RoutingBuilder {
-    private val routes: MutableList<RouteBuilder> = mutableListOf()
-
-    fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder = RouteBuilder()
-            .apply(init)
-            .also { routes.add(it) }
-
-    fun build() = Routing(routes.map { it.build() }.toList())
-}
-
-class RouteBuilder {
-
-    private lateinit var domain: String
-    private lateinit var targetTopic: String
-    private lateinit var partitioning: (CommonEventHeader) -> Int
-
-    fun fromDomain(domain: String): RouteBuilder = apply {
-        this.domain = domain
-    }
-
-    fun toTopic(targetTopic: String): RouteBuilder = apply {
-        this.targetTopic = targetTopic
-    }
-
-    fun withFixedPartitioning(num: Int = 0): RouteBuilder = apply {
-        partitioning = { num }
-    }
-
-    fun build() = Route(domain, targetTopic, partitioning)
-}
+typealias Routing = List<Route>
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
new file mode 100644
index 0000000..f044492
--- /dev/null
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
@@ -0,0 +1,97 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+
+import arrow.core.None
+import arrow.core.Option
+import arrow.core.Some
+import arrow.core.getOrElse
+import arrow.core.toOption
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+
+/**
+ * Combines two [PartialConfiguration]s section by section; a value given in the update replaces the base one.
+ *
+ * @author Pawel Biniek <pawel.biniek@nokia.com>
+ * @since March 2019
+ */
+internal class ConfigurationMerger {
+    /** Overlays [update] on [base]: settings missing from [update] keep the value they have in [base]. */
+    fun merge(base: PartialConfiguration, update: PartialConfiguration): PartialConfiguration =
+            PartialConfiguration(
+                    mergeServerConfig(base.server, update.server),
+                    mergeCbsConfig(base.cbs, update.cbs),
+                    mergeSecurityConfig(base.security, update.security),
+                    mergeCollectorConfig(base.collector, update.collector),
+                    mergeLogLevel(base.logLevel, update.logLevel)
+            )
+
+    private fun mergeServerConfig(baseSection: Option<PartialServerConfig>,
+                                  updateSection: Option<PartialServerConfig>) =
+            applyUpdate(baseSection, updateSection) { old, new ->
+                PartialServerConfig(
+                        listenPort = old.listenPort.updateToGivenOrNone(new.listenPort),
+                        idleTimeoutSec = old.idleTimeoutSec.updateToGivenOrNone(new.idleTimeoutSec),
+                        maxPayloadSizeBytes = old.maxPayloadSizeBytes.updateToGivenOrNone(new.maxPayloadSizeBytes)
+                )
+            }
+
+    private fun mergeCbsConfig(baseSection: Option<PartialCbsConfig>,
+                               updateSection: Option<PartialCbsConfig>) =
+            applyUpdate(baseSection, updateSection) { old, new ->
+                PartialCbsConfig(
+                        firstRequestDelaySec = old.firstRequestDelaySec.updateToGivenOrNone(new.firstRequestDelaySec),
+                        requestIntervalSec = old.requestIntervalSec.updateToGivenOrNone(new.requestIntervalSec)
+                )
+            }
+
+    private fun mergeSecurityConfig(baseSection: Option<PartialSecurityConfig>,
+                                    updateSection: Option<PartialSecurityConfig>) =
+            applyUpdate(baseSection, updateSection) { old, new ->
+                PartialSecurityConfig(
+                        keys = old.keys.updateToGivenOrNone(new.keys)
+                )
+            }
+
+    private fun mergeCollectorConfig(baseSection: Option<PartialCollectorConfig>,
+                                     updateSection: Option<PartialCollectorConfig>) =
+            applyUpdate(baseSection, updateSection) { old, new ->
+                PartialCollectorConfig(
+                        maxRequestSizeBytes = old.maxRequestSizeBytes.updateToGivenOrNone(new.maxRequestSizeBytes),
+                        kafkaServers = old.kafkaServers.updateToGivenOrNone(new.kafkaServers),
+                        routing = old.routing.updateToGivenOrNone(new.routing)
+                )
+            }
+
+    private fun mergeLogLevel(baseLevel: Option<LogLevel>, updateLevel: Option<LogLevel>) =
+            baseLevel.updateToGivenOrNone(updateLevel)
+}
+
+/** Merges two optional sections: when both exist [overrider] combines them, otherwise the present one wins. */
+private fun <T> applyUpdate(base: Option<T>, update: Option<T>, overrider: (base: T, update: T) -> T) =
+        when {
+            base is Some -> if (update is Some) overrider(base.t, update.t).toOption() else base
+            update is Some -> update
+            else -> None
+        }
+
+/** Prefers the value held by [update]; falls back to the receiver's value, or None when neither holds one. */
+private fun <T> Option<T>.updateToGivenOrNone(update: Option<T>) = update.getOrElse { orNull() }.toOption()
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
index 90be3db..04bba7e 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
@@ -22,7 +22,6 @@
 import arrow.core.Either
 import arrow.core.None
 import arrow.core.Option
-import arrow.core.Some
 import arrow.core.getOrElse
 import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
 import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
@@ -54,14 +53,20 @@
 
         val securityConfiguration = SecurityConfiguration(partialConfig.security.bind().keys)
 
-        val collectorConfiguration = partialConfig.collector.bind()
-                .let { createCollectorConfig(it).bind() }
+// TODO: restore when ConfigurationMerger is implemented
+//        val collectorConfiguration = partialConfig.collector.bind()
+//                .let { createCollectorConfig(it).bind() }
 
         HvVesConfiguration(
                 serverConfiguration,
                 cbsConfiguration,
                 securityConfiguration,
-                collectorConfiguration,
+// TODO: swap when ConfigurationMerger is implemented
+//                    collectorConfiguration
+                CollectorConfiguration(-1,
+                        "I do not exist. I'm not even a URL :o",
+                        emptyList()),
+// end TODO
                 logLevel
         )
     }.toEither { ValidationError("Some required configuration options are missing") }
@@ -79,7 +84,7 @@
             partial.mapBinding {
                 ServerConfiguration(
                         it.listenPort.bind(),
-                        Duration.ofSeconds(it.idleTimeoutSec.bind().toLong()),
+                        it.idleTimeoutSec.bind(),
                         it.maxPayloadSizeBytes.bind()
                 )
             }
@@ -87,20 +92,20 @@
     private fun createCbsConfiguration(partial: PartialCbsConfig) =
             partial.mapBinding {
                 CbsConfiguration(
-                        Duration.ofSeconds(it.firstRequestDelaySec.bind().toLong()),
-                        Duration.ofSeconds(it.requestIntervalSec.bind().toLong())
+                        it.firstRequestDelaySec.bind(),
+                        it.requestIntervalSec.bind()
                 )
             }
 
-    private fun createCollectorConfig(partial: PartialCollectorConfig) =
-            partial.mapBinding {
-                CollectorConfiguration(
-                        it.maxRequestSizeBytes.bind(),
-                        toKafkaServersString(it.kafkaServers.bind()),
-                        it.routing.bind(),
-                        it.dummyMode.bind()
-                )
-            }
+// TODO: restore when ConfigurationMerger is implemented
+//    private fun createCollectorConfig(partial: PartialCollectorConfig) =
+//            partial.mapBinding {
+//                CollectorConfiguration(
+//                        it.maxRequestSizeBytes.bind(),
+//                        toKafkaServersString(it.kafkaServers.bind()),
+//                        it.routing.bind()
+//                )
+//            }
 
     private fun toKafkaServersString(kafkaServers: List<InetSocketAddress>): String =
             kafkaServers.joinToString(",") { "${it.hostName}:${it.port}" }
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt
index 1e77dde..9513107 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt
@@ -21,16 +21,12 @@
 
 import arrow.core.Option
 import com.google.gson.GsonBuilder
-import org.onap.dcae.collectors.veshv.config.api.model.Route
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.AddressAdapter
+import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.DurationOfSecondsAdapter
 import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.OptionAdapter
-import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.RouteAdapter
-import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.RoutingAdapter
 import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.SecurityAdapter
-import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys
+
 import java.io.Reader
-import java.net.InetSocketAddress
+import java.time.Duration
 
 /**
  * @author Pawel Biniek <pawel.biniek@nokia.com>
@@ -38,11 +34,9 @@
  */
 internal class FileConfigurationReader {
     private val gson = GsonBuilder()
-            .registerTypeAdapter(InetSocketAddress::class.java, AddressAdapter())
-            .registerTypeAdapter(Route::class.java, RouteAdapter())
-            .registerTypeAdapter(Routing::class.java, RoutingAdapter())
             .registerTypeAdapter(Option::class.java, OptionAdapter())
             .registerTypeAdapter(PartialSecurityConfig::class.java, SecurityAdapter())
+            .registerTypeAdapter(Duration::class.java, DurationOfSecondsAdapter())
             .create()
 
     fun loadConfig(input: Reader): PartialConfiguration =
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt
index 3e6df3e..a27998e 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt
@@ -25,6 +25,7 @@
 import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
 import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys
 import java.net.InetSocketAddress
+import java.time.Duration
 
 /**
  * @author Pawel Biniek <pawel.biniek@nokia.com>
@@ -40,19 +41,18 @@
 
 internal data class PartialServerConfig(
         val listenPort: Option<Int> = None,
-        val idleTimeoutSec: Option<Int> = None,
+        val idleTimeoutSec: Option<Duration> = None,
         val maxPayloadSizeBytes: Option<Int> = None
 )
 
 internal data class PartialCbsConfig(
-        val firstRequestDelaySec: Option<Int> = None,
-        val requestIntervalSec: Option<Int> = None
+        val firstRequestDelaySec: Option<Duration> = None,
+        val requestIntervalSec: Option<Duration> = None
 )
 
 internal data class PartialSecurityConfig(val keys: Option<SecurityKeys> = None)
 
 internal data class PartialCollectorConfig(
-        val dummyMode: Option<Boolean> = None,
         val maxRequestSizeBytes: Option<Int> = None,
         val kafkaServers: Option<List<InetSocketAddress>> = None,
         val routing: Option<Routing> = None
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/AddressAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/AddressAdapter.kt
deleted file mode 100644
index 255be03..0000000
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/AddressAdapter.kt
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.config.impl.gsonadapters
-
-import com.google.gson.JsonDeserializationContext
-import com.google.gson.JsonDeserializer
-import com.google.gson.JsonElement
-import com.google.gson.JsonParseException
-import java.lang.reflect.Type
-import java.net.InetSocketAddress
-
-/**
- * @author Pawel Biniek <pawel.biniek@nokia.com>
- * @since February 2019
- */
-internal class AddressAdapter : JsonDeserializer<InetSocketAddress> {
-    override fun deserialize(
-            json: JsonElement,
-            typeOfT: Type,
-            context: JsonDeserializationContext): InetSocketAddress {
-        val portStart = json.asString.lastIndexOf(":")
-        if (portStart > 0) {
-            val address = json.asString.substring(0, portStart)
-            val port = json.asString.substring(portStart + 1)
-            return InetSocketAddress(address, port.toInt())
-        } else throw InvalidAddressException("Cannot parse '" + json.asString + "' to address")
-    }
-
-    class InvalidAddressException(reason: String) : RuntimeException(reason)
-}
-
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/RoutingAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt
similarity index 74%
rename from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/RoutingAdapter.kt
rename to sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt
index 4b29909..99da110 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/RoutingAdapter.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/DurationOfSecondsAdapter.kt
@@ -22,19 +22,15 @@
 import com.google.gson.JsonDeserializationContext
 import com.google.gson.JsonDeserializer
 import com.google.gson.JsonElement
-import com.google.gson.reflect.TypeToken
-import org.onap.dcae.collectors.veshv.config.api.model.Route
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
 import java.lang.reflect.Type
+import java.time.Duration
 
 /**
  * @author Pawel Biniek <pawel.biniek@nokia.com>
  * @since March 2019
  */
-internal class RoutingAdapter : JsonDeserializer<Routing> {
-    override fun deserialize(json: JsonElement, typeOfT: Type, context: JsonDeserializationContext): Routing {
-        val parametrizedType = TypeToken.getParameterized(List::class.java, Route::class.java).type
-        return Routing(context.deserialize<List<Route>>(json, parametrizedType))
-    }
+class DurationOfSecondsAdapter : JsonDeserializer<Duration> {
+    override fun deserialize(json: JsonElement, typeOfT: Type, context: JsonDeserializationContext): Duration =
+        Duration.ofSeconds(json.asLong)
 
 }
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/RouteAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/RouteAdapter.kt
deleted file mode 100644
index 25cb886..0000000
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/RouteAdapter.kt
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.config.impl.gsonadapters
-
-import com.google.gson.JsonDeserializationContext
-import com.google.gson.JsonDeserializer
-import com.google.gson.JsonElement
-import org.onap.dcae.collectors.veshv.config.api.model.Route
-import org.onap.dcae.collectors.veshv.config.api.model.RouteBuilder
-import java.lang.reflect.Type
-
-/**
- * @author Pawel Biniek <pawel.biniek@nokia.com>
- * @since March 2019
- */
-internal class RouteAdapter : JsonDeserializer<Route> {
-    override fun deserialize(json: JsonElement, typeOfT: Type, context: JsonDeserializationContext?): Route {
-        val jobj = json.asJsonObject
-        return RouteBuilder()
-                .fromDomain(jobj["fromDomain"].asString)
-                .toTopic(jobj["toTopic"].asString)
-                .withFixedPartitioning()
-                .build()
-    }
-
-}
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt
new file mode 100644
index 0000000..d5b18e6
--- /dev/null
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt
@@ -0,0 +1,102 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import arrow.core.Some
+import org.jetbrains.spek.api.Spek
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import java.io.InputStreamReader
+import java.io.Reader
+import java.time.Duration
+
+/**
+ * @author Pawel Biniek <pawel.biniek@nokia.com>
+ * @since February 2019
+ */
+internal object ConfigurationMergerTest : Spek({
+    describe("Merges partial configurations into one") {
+        // Reads the shared sample file into a PartialConfiguration; `use` closes the reader afterwards.
+        fun loadSampleConfig(): PartialConfiguration =
+                InputStreamReader(FileConfigurationReaderTest.javaClass.getResourceAsStream("/sampleConfig.json"))
+                        .use { input: Reader -> FileConfigurationReader().loadConfig(input) }
+
+        it("merges single parameter into empty config") {
+            val actual = PartialConfiguration()
+            val diff = PartialConfiguration(logLevel = Some(LogLevel.INFO))
+
+            val result = ConfigurationMerger().merge(actual, diff)
+
+            assertThat(result.logLevel).isEqualTo(Some(LogLevel.INFO))
+        }
+
+        it("merges single embedded parameter into empty config") {
+            val actual = PartialConfiguration()
+            val serverConfig = PartialServerConfig(listenPort = Some(45))
+            val diff = PartialConfiguration(server = Some(serverConfig))
+
+            val result = ConfigurationMerger().merge(actual, diff)
+
+            assertThat(result.server).isEqualTo(Some(serverConfig))
+        }
+
+        it("merges single parameter into full config") {
+            val actual = loadSampleConfig()
+            val diff = PartialConfiguration(logLevel = Some(LogLevel.INFO))
+
+            val result = ConfigurationMerger().merge(actual, diff)
+
+            assertThat(result.logLevel).isEqualTo(Some(LogLevel.INFO))
+        }
+
+        it("merges single embedded parameter into full config") {
+            val actual = loadSampleConfig()
+            val serverConfig = PartialServerConfig(listenPort = Some(45))
+            val diff = PartialConfiguration(server = Some(serverConfig))
+
+            val result = ConfigurationMerger().merge(actual, diff)
+
+            // listenPort comes from the update; the other server settings must survive from the loaded base.
+            assertThat(result.server.orNull()?.listenPort).isEqualTo(serverConfig.listenPort)
+            assertThat(result.server.orNull()?.idleTimeoutSec?.isEmpty()).isFalse()
+            assertThat(result.server.orNull()?.idleTimeoutSec).isEqualTo(Some(Duration.ofSeconds(1200)))
+            assertThat(result.server.orNull()?.maxPayloadSizeBytes?.isEmpty()).isFalse()
+            assertThat(result.server.orNull()?.maxPayloadSizeBytes).isEqualTo(Some(512000))
+        }
+
+        it("merges full config into single parameter") {
+            val actual = PartialConfiguration(logLevel = Some(LogLevel.INFO))
+            val diff = loadSampleConfig()
+
+            val result = ConfigurationMerger().merge(actual, diff)
+
+            assertThat(result.logLevel).isEqualTo(Some(LogLevel.ERROR))
+            assertThat(result.server.isEmpty()).isFalse()
+            assertThat(result.server.orNull()?.maxPayloadSizeBytes).isEqualTo(Some(512000))
+            assertThat(result.server.orNull()?.idleTimeoutSec).isEqualTo(Some(Duration.ofSeconds(1200)))
+
+            assertThat(result.security.isEmpty()).isFalse()
+            assertThat(result.cbs.isEmpty()).isFalse()
+        }
+    }
+})
+
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
index beb5df6..4b89488 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
@@ -28,7 +28,7 @@
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.config.api.model.routing
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
 import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator.Companion.DEFAULT_LOG_LEVEL
 import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
 import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys
@@ -53,21 +53,20 @@
             val config = PartialConfiguration(
                     Some(PartialServerConfig(
                             Some(1),
-                            Some(2),
+                            Some(Duration.ofSeconds(2)),
                             Some(3)
                     )),
                     Some(PartialCbsConfig(
-                            Some(5),
-                            Some(3)
+                            Some(Duration.ofSeconds(5)),
+                            Some(Duration.ofSeconds(3))
                     )),
                     Some(PartialSecurityConfig(
                             Some(mock())
                     )),
                     Some(PartialCollectorConfig(
-                            Some(true),
                             Some(4),
                             Some(emptyList()),
-                            Some(routing { }.build())
+                            someFromEmptyRouting
                     )),
                     None
             )
@@ -86,10 +85,9 @@
         }
 
         describe("validating complete configuration") {
-            val idleTimeoutSec = 10
-            val firstReqDelaySec = 10
+            val idleTimeoutSec = Duration.ofSeconds(10L)
+            val firstReqDelaySec = Duration.ofSeconds(10L)
             val securityKeys = Some(mock<SecurityKeys>())
-            val routing = routing { }.build()
 
             val config = PartialConfiguration(
                     Some(PartialServerConfig(
@@ -99,16 +97,15 @@
                     )),
                     Some(PartialCbsConfig(
                             Some(firstReqDelaySec),
-                            Some(3)
+                            Some(Duration.ofSeconds(3))
                     )),
                     Some(PartialSecurityConfig(
                             securityKeys
                     )),
                     Some(PartialCollectorConfig(
-                            Some(true),
                             Some(4),
                             Some(emptyList()),
-                            Some(routing)
+                            someFromEmptyRouting
                     )),
                     Some(LogLevel.INFO)
             )
@@ -121,26 +118,25 @@
                         },
                         {
                             assertThat(it.server.idleTimeout)
-                                    .isEqualTo(Duration.ofSeconds(idleTimeoutSec.toLong()))
+                                    .isEqualTo(idleTimeoutSec)
 
                             assertThat(it.security.keys)
                                     .isEqualTo(securityKeys)
 
                             assertThat(it.cbs.firstRequestDelay)
-                                    .isEqualTo(Duration.ofSeconds(firstReqDelaySec.toLong()))
+                                    .isEqualTo(firstReqDelaySec)
 
                             assertThat(it.collector.routing)
-                                    .isEqualTo(routing)
+                                    .isEqualTo(emptyRouting)
                         }
                 )
             }
         }
 
         describe("validating configuration with security disabled") {
-            val idleTimeoutSec = 10
-            val firstReqDelaySec = 10
+            val idleTimeoutSec = Duration.ofSeconds(10)
+            val firstReqDelaySec = Duration.ofSeconds(10)
             val securityKeys: Option<SecurityKeys> = None
-            val routing = routing { }.build()
 
             val config = PartialConfiguration(
                     Some(PartialServerConfig(
@@ -150,16 +146,15 @@
                     )),
                     Some(PartialCbsConfig(
                             Some(firstReqDelaySec),
-                            Some(3)
+                            Some(Duration.ofSeconds(3))
                     )),
                     Some(PartialSecurityConfig(
                             securityKeys
                     )),
                     Some(PartialCollectorConfig(
-                            Some(true),
                             Some(4),
                             Some(emptyList()),
-                            Some(routing)
+                            someFromEmptyRouting
                     )),
                     Some(LogLevel.INFO)
             )
@@ -172,16 +167,16 @@
                         },
                         {
                             assertThat(it.server.idleTimeout)
-                                    .isEqualTo(Duration.ofSeconds(idleTimeoutSec.toLong()))
+                                    .isEqualTo(idleTimeoutSec)
 
                             assertThat(it.security.keys)
                                     .isEqualTo(securityKeys)
 
                             assertThat(it.cbs.firstRequestDelay)
-                                    .isEqualTo(Duration.ofSeconds(firstReqDelaySec.toLong()))
+                                    .isEqualTo(firstReqDelaySec)
 
                             assertThat(it.collector.routing)
-                                    .isEqualTo(routing)
+                                    .isEqualTo(emptyRouting)
                         }
                 )
             }
@@ -189,3 +184,6 @@
 
     }
 })
+
+val emptyRouting: Routing = emptyList()
+val someFromEmptyRouting = Some(emptyRouting)
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt
index 8267e30..4e35bfb 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReaderTest.kt
@@ -24,11 +24,11 @@
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
 import org.onap.dcae.collectors.veshv.tests.utils.resourceAsStream
 import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
 import java.io.StringReader
 import java.net.InetSocketAddress
+import java.time.Duration
 
 /**
  * @author Pawel Biniek <pawel.biniek@nokia.com>
@@ -59,50 +59,6 @@
                 assertThat(config.server.orNull()?.listenPort).isEqualTo(Some(12003))
             }
 
-            it("parses ip address") {
-                val input = """{  "collector" : {
-                    "kafkaServers": [
-                      "192.168.255.1:5005",
-                      "192.168.255.26:5006"
-                    ]
-                  }
-                }"""
-
-                val config = cut.loadConfig(StringReader(input))
-                assertThat(config.collector.nonEmpty()).isTrue()
-                val collector = config.collector.orNull() as PartialCollectorConfig
-                assertThat(collector.kafkaServers.nonEmpty()).isTrue()
-                val addresses = collector.kafkaServers.orNull() as List<InetSocketAddress>
-                assertThat(addresses)
-                        .isEqualTo(listOf(
-                                InetSocketAddress("192.168.255.1", 5005),
-                                InetSocketAddress("192.168.255.26", 5006)
-                        ))
-            }
-
-            it("parses routing array with RoutingAdapter") {
-                val input = """{
-                    "collector" : {
-                        "routing" : [
-                            {
-                              "fromDomain": "perf3gpp",
-                              "toTopic": "HV_VES_PERF3GPP"
-                            }
-                        ]
-                    }
-                }""".trimIndent()
-                val config = cut.loadConfig(StringReader(input))
-                assertThat(config.collector.nonEmpty()).isTrue()
-                val collector = config.collector.orNull() as PartialCollectorConfig
-                assertThat(collector.routing.nonEmpty()).isTrue()
-                val routing = collector.routing.orNull() as Routing
-                routing.run {
-                    assertThat(routes.size).isEqualTo(1)
-                    assertThat(routes[0].domain).isEqualTo("perf3gpp")
-                    assertThat(routes[0].targetTopic).isEqualTo("HV_VES_PERF3GPP")
-                }
-            }
-
             it("parses disabled security configuration") {
                 val input = """{
                     "security": {
@@ -139,22 +95,13 @@
 
                 assertThat(config.cbs.nonEmpty()).isTrue()
                 val cbs = config.cbs.orNull() as PartialCbsConfig
-                assertThat(cbs.firstRequestDelaySec).isEqualTo(Some(7))
-                assertThat(cbs.requestIntervalSec).isEqualTo(Some(900))
-
-                assertThat(config.collector.nonEmpty()).isTrue()
-                val collector = config.collector.orNull() as PartialCollectorConfig
-                collector.run {
-                    assertThat(dummyMode).isEqualTo(Some(false))
-                    assertThat(maxRequestSizeBytes).isEqualTo(Some(512000))
-                    assertThat(kafkaServers.nonEmpty()).isTrue()
-                    assertThat(routing.nonEmpty()).isTrue()
-                }
+                assertThat(cbs.firstRequestDelaySec).isEqualTo(Some(Duration.ofSeconds(7)))
+                assertThat(cbs.requestIntervalSec).isEqualTo(Some(Duration.ofSeconds(900)))
 
                 assertThat(config.server.nonEmpty()).isTrue()
                 val server = config.server.orNull() as PartialServerConfig
                 server.run {
-                    assertThat(idleTimeoutSec).isEqualTo(Some(1200))
+                    assertThat(idleTimeoutSec).isEqualTo(Some(Duration.ofSeconds(1200)))
                     assertThat(listenPort).isEqualTo(Some(6000))
                     assertThat(maxPayloadSizeBytes).isEqualTo(Some(512000))
                 }
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/AddressAdapterTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/AddressAdapterTest.kt
deleted file mode 100644
index f70c433..0000000
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/gsonadapters/AddressAdapterTest.kt
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.config.impl.gsonadapters
-
-import com.google.gson.Gson
-import com.google.gson.JsonDeserializationContext
-import com.google.gson.JsonParseException
-import com.google.gson.reflect.TypeToken
-import com.nhaarman.mockitokotlin2.mock
-import org.assertj.core.api.Assertions.assertThat
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.AddressAdapter.InvalidAddressException
-import java.lang.NumberFormatException
-import kotlin.test.assertFailsWith
-
-
-internal object AddressAdapterTest : Spek({
-
-    describe("deserialization") {
-        val gson = Gson()
-        val context = mock<JsonDeserializationContext>()
-        val addressAdapterType = TypeToken.get(AddressAdapter::class.java).type
-
-        val cut = AddressAdapter()
-
-        given("valid string") {
-            val address = "hostname:9000"
-            val json = gson.toJsonTree(address)
-
-            it("should return address") {
-                val deserialized = cut.deserialize(json, addressAdapterType, context)
-
-                assertThat(deserialized.hostName).isEqualTo("hostname")
-                assertThat(deserialized.port).isEqualTo(9000)
-            }
-        }
-
-        val invalidAddresses = mapOf(
-                Pair("missingPort", InvalidAddressException::class),
-                Pair("NaNPort:Hey", NumberFormatException::class),
-                Pair(":6036", InvalidAddressException::class))
-
-        invalidAddresses.forEach { address, exception ->
-            given("invalid address string: $address") {
-
-                val json = gson.toJsonTree(address)
-                it("should throw exception") {
-                    assertFailsWith(exception) {
-                        cut.deserialize(json, addressAdapterType, context)
-                    }
-                }
-            }
-        }
-    }
-})
-
-
diff --git a/sources/hv-collector-configuration/src/test/resources/sampleConfig.json b/sources/hv-collector-configuration/src/test/resources/sampleConfig.json
index b49085e..07f0702 100644
--- a/sources/hv-collector-configuration/src/test/resources/sampleConfig.json
+++ b/sources/hv-collector-configuration/src/test/resources/sampleConfig.json
@@ -18,19 +18,5 @@
       "trustStoreFile": "trust.ks.pkcs12",
       "trustStorePassword": "changeMeToo"
     }
-  },
-  "collector": {
-    "dummyMode": false,
-    "maxRequestSizeBytes": 512000,
-    "kafkaServers": [
-      "192.168.255.1:5005",
-      "192.168.255.1:5006"
-    ],
-    "routing": [
-      {
-        "fromDomain": "perf3gpp",
-        "toTopic": "HV_VES_PERF3GPP"
-      }
-    ]
   }
-}
\ No newline at end of file
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index 0a1e2d4..1b92d90 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.boundary
 
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -26,13 +27,21 @@
 import org.onap.dcae.collectors.veshv.model.ConsumedMessage
 import org.onap.dcae.collectors.veshv.model.MessageDropCause
 import org.onap.dcae.collectors.veshv.utils.Closeable
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
 import reactor.core.publisher.Flux
 
-interface Sink {
+interface Sink : Closeable {
+    fun send(message: RoutedMessage): Flux<ConsumedMessage> = send(Flux.just(message))
+
     fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage>
 }
 
+interface SinkProvider : Closeable {
+    operator fun invoke(stream: SinkStream, ctx: ClientContext): Lazy<Sink>
+}
+
+typealias ConfigurationProvider = () -> Flux<Routing>
+
 interface Metrics {
     fun notifyBytesReceived(size: Int)
     fun notifyMessageReceived(msg: WireFrameMessage)
@@ -42,11 +51,3 @@
     fun notifyClientConnected()
     fun notifyClientRejected(cause: ClientRejectionCause)
 }
-
-interface SinkProvider : Closeable {
-    operator fun invoke(ctx: ClientContext): Sink
-}
-
-interface ConfigurationProvider {
-    operator fun invoke(): Flux<Sequence<KafkaSink>>
-}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index fa4f967..2b29acd 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -25,6 +25,7 @@
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
 import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
@@ -36,7 +37,6 @@
 import org.onap.dcae.collectors.veshv.model.ServiceContext
 import org.onap.dcae.collectors.veshv.utils.arrow.getOption
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
 import java.util.concurrent.atomic.AtomicReference
 
 /**
@@ -50,7 +50,7 @@
                        private val healthState: HealthState = HealthState.INSTANCE) {
 
     fun createVesHvCollectorProvider(): CollectorProvider {
-        val config = AtomicReference<Sequence<KafkaSink>>()
+        val config = AtomicReference<Routing>()
         configuration()
                 .doOnNext {
                     logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
@@ -71,17 +71,15 @@
         }
     }
 
-    private fun createVesHvCollector(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext): Collector =
+    private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector =
             VesHvCollector(
                     clientContext = ctx,
                     wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
                     protobufDecoder = VesDecoder(),
-                    router = Router(kafkaSinks, ctx),
-                    sink = sinkProvider(ctx),
+                    router = Router(routing, sinkProvider, ctx, metrics),
                     metrics = metrics)
 
     companion object {
         private val logger = Logger(CollectorFactory::class)
     }
 }
-
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
index d2c35cb..6e2e20f 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,39 +19,75 @@
  */
 package org.onap.dcae.collectors.veshv.impl
 
-import arrow.core.Option
+import arrow.core.None
+import arrow.core.toOption
+import org.onap.dcae.collectors.veshv.boundary.Metrics
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.config.api.model.Route
 import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import org.onap.dcae.collectors.veshv.config.api.model.routing
 import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
 import org.onap.dcae.collectors.veshv.domain.RoutedMessage
 import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.model.ConsumedMessage
+import org.onap.dcae.collectors.veshv.model.MessageDropCause
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import reactor.core.publisher.Flux
 
-class Router(private val routing: Routing, private val ctx: ClientContext) {
+class Router internal constructor(private val routing: Routing,
+                                  private val messageSinks: Map<String, Lazy<Sink>>,
+                                  private val ctx: ClientContext,
+                                  private val metrics: Metrics) {
+    constructor(routing: Routing,
+                sinkProvider: SinkProvider,
+                ctx: ClientContext,
+                metrics: Metrics) :
+            this(routing,
+                    constructMessageSinks(routing, sinkProvider, ctx),
+                    ctx,
+                    metrics) {
+        logger.debug(ctx::mdc) { "Routing for client: $routing" }
+        logger.trace(ctx::mdc) { "Message sinks configured for client: $messageSinks" }
+    }
 
-    constructor(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext) : this(
-            routing {
-                kafkaSinks.forEach {
-                    defineRoute {
-                        fromDomain(it.name())
-                        toTopic(it.topicName())
-                        withFixedPartitioning()
+    fun route(message: VesMessage): Flux<ConsumedMessage> =
+            routeFor(message.header)
+                    .fold({
+                        metrics.notifyMessageDropped(MessageDropCause.ROUTE_NOT_FOUND)
+                        logger.warn(ctx::fullMdc) { "Could not find route for message ${message.header}" }
+                        logger.trace(ctx::fullMdc) { "Routing available for client: ${routing}" }
+                        Flux.empty<Route>()
+                    }, {
+                        logger.trace(ctx::fullMdc) { "Found route for message: $it. Assigned partition: $NONE_PARTITION" }
+                        Flux.just(it)
+                    })
+                    .flatMap {
+                        val sinkTopic = it.sink.topicName()
+                        messageSinkFor(sinkTopic).send(RoutedMessage(message, sinkTopic, NONE_PARTITION))
                     }
-                }
-            }.build(),
-            ctx
-    )
 
-    fun findDestination(message: VesMessage): Option<RoutedMessage> =
-            routing.routeFor(message.header).map { it(message) }.also {
-                if (it.isEmpty()) {
-                    logger.debug(ctx) { "No route is defined for domain: ${message.header.domain}" }
-                }
+    private fun routeFor(header: CommonEventHeader) =
+            routing.find { it.domain == header.domain }.toOption()
+
+    private fun messageSinkFor(sinkTopic: String) = messageSinks
+            .getOrElse(sinkTopic) {
+                throw MissingMessageSinkException("No message sink configured for sink with topic $sinkTopic")
             }
 
     companion object {
-        private val logger = Logger(Routing::class)
+        private val logger = Logger(Router::class)
+        private val NONE_PARTITION = None
+
+        internal fun constructMessageSinks(routing: Routing,
+                                           sinkProvider: SinkProvider,
+                                           ctx: ClientContext) = // builds the topic-name -> sink map that messageSinkFor looks up
+                routing.map(Route::sink)
+                        .distinctBy { it.topicName() } // keep a single sink stream per topic name
+                        .associateBy({ it.topicName() }, { sinkProvider(it, ctx) }) // key: topic name, value: result of sinkProvider for that stream and ctx
     }
+
+    private fun Lazy<Sink>.send(message: RoutedMessage) = value.send(message)
 }
+
+internal class MissingMessageSinkException(msg: String) : RuntimeException(msg)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 6a2792c..433e4d5 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -22,7 +22,6 @@
 import io.netty.buffer.ByteBuf
 import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.Metrics
-import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
@@ -31,15 +30,12 @@
 import org.onap.dcae.collectors.veshv.model.ConsumedMessage
 import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
-import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
 import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
 import org.onap.dcae.collectors.veshv.domain.VesMessage
-import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
 import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure
 import org.onap.dcae.collectors.veshv.utils.arrow.doOnLeft
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.utils.logging.MessageEither
-import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog
 import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
@@ -53,7 +49,6 @@
         private val wireChunkDecoder: WireChunkDecoder,
         private val protobufDecoder: VesDecoder,
         private val router: Router,
-        private val sink: Sink,
         private val metrics: Metrics) : Collector {
 
     override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> =
@@ -62,10 +57,10 @@
                     .transform(::filterInvalidWireFrame)
                     .transform(::decodeProtobufPayload)
                     .transform(::filterInvalidProtobufMessages)
-                    .transform(::routeMessage)
-                    .onErrorResume {
-                        metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it))
-                        logger.handleReactiveStreamError(clientContext, it) }
+                    // TODO: try to remove new flux creation in Sink interface to avoid two calls to handleErrors here
+                    .handleErrors()
+                    .transform(::route)
+                    .handleErrors()
                     .doFinally { releaseBuffersMemory() }
                     .then()
 
@@ -98,18 +93,10 @@
                         .doOnLeft { metrics.notifyMessageDropped(INVALID_MESSAGE) }
             }
 
-    private fun routeMessage(flux: Flux<VesMessage>): Flux<ConsumedMessage> = flux
-            .flatMap(this::findRoute)
-            .compose(sink::send)
+    private fun route(flux: Flux<VesMessage>) = flux
+            .flatMap(router::route)
             .doOnNext(this::updateSinkMetrics)
 
-    private fun findRoute(msg: VesMessage) = router
-            .findDestination(msg)
-            .doOnEmpty { metrics.notifyMessageDropped(ROUTE_NOT_FOUND) }
-            .filterEmptyWithLog(logger, clientContext::fullMdc,
-                    { "Found route for message: ${it.topic}, partition: ${it.partition}" },
-                    { "Could not find route for message" })
-
     private fun updateSinkMetrics(consumedMessage: ConsumedMessage) {
         when (consumedMessage) {
             is SuccessfullyConsumedMessage ->
@@ -119,6 +106,11 @@
         }
     }
 
+    private fun <T> Flux<T>.handleErrors(): Flux<T> = onErrorResume {
+        metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it))
+        logger.handleReactiveStreamError(clientContext, it)
+    }
+
     private fun releaseBuffersMemory() = wireChunkDecoder.release()
             .also { logger.debug { "Released buffer memory after handling message stream" } }
 
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index c362020..20b1175 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -22,7 +22,6 @@
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
@@ -32,11 +31,7 @@
  * @since May 2018
  */
 object AdapterFactory {
-    fun sinkCreatorFactory(config: CollectorConfiguration): SinkProvider =
-            if (config.dummyMode)
-                LoggingSinkProvider()
-            else
-                KafkaSinkProvider(config)
+    fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider()
 
     fun configurationProvider(config: CbsConfiguration): ConfigurationProvider =
             ConfigurationProviderImpl(
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
index f9fd698..185693c 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
@@ -22,18 +22,20 @@
 import com.google.gson.JsonObject
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Route
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.model.ServiceContext
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
 import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType
 import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
 import reactor.core.publisher.Flux
@@ -73,7 +75,7 @@
         healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
     }
 
-    override fun invoke(): Flux<Sequence<KafkaSink>> =
+    override fun invoke(): Flux<Routing> =
             cbsClientMono
                     .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } }
                     .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" }
@@ -81,25 +83,25 @@
                     .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } }
                     .flatMapMany(::handleUpdates)
 
-    private fun handleUpdates(cbsClient: CbsClient): Flux<Sequence<KafkaSink>> = cbsClient
+    private fun handleUpdates(cbsClient: CbsClient) = cbsClient
             .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()),
                     firstRequestDelay,
                     requestInterval)
             .doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } }
-            .map(::createCollectorConfiguration)
+            .map(::createRoutingDescription)
             .onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" }
             .retryWhen(retry)
 
-
-    private fun createCollectorConfiguration(configuration: JsonObject): Sequence<KafkaSink> =
-            try {
-                DataStreams.namedSinks(configuration)
-                        .filter(streamOfType(KAFKA))
-                        .map(streamParser::unsafeParse)
-                        .asSequence()
-            } catch (e: NullPointerException) {
-                throw ParsingException("Failed to parse configuration", e)
-            }
+    private fun createRoutingDescription(configuration: JsonObject): Routing = try {
+        // One Route per Kafka-typed named sink, labelled with the sink's stream name; other sink types are dropped.
+        DataStreams.namedSinks(configuration)
+                .filter(streamOfType(KAFKA))
+                .map { rawStream -> streamParser.unsafeParse(rawStream) }
+                .map { kafkaSink -> Route(kafkaSink.name(), kafkaSink) }
+                .asIterable().toList()
+    } catch (e: NullPointerException) {
+        throw ParsingException("Failed to parse configuration", e)
+    }
 
     companion object {
         private const val MAX_RETRIES = 5L
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
deleted file mode 100644
index 3a9467f..0000000
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters
-
-import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info
-import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace
-import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.ConsumedMessage
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
-import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.core.publisher.Flux
-import java.util.concurrent.atomic.AtomicLong
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since June 2018
- */
-internal class LoggingSinkProvider : SinkProvider {
-
-    override fun invoke(ctx: ClientContext): Sink {
-        return object : Sink {
-            private val totalMessages = AtomicLong()
-            private val totalBytes = AtomicLong()
-
-            override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
-                    messages.doOnNext(this::logMessage).map(::SuccessfullyConsumedMessage)
-
-            private fun logMessage(msg: RoutedMessage) {
-                val msgs = totalMessages.addAndGet(1)
-                val bytes = totalBytes.addAndGet(msg.message.wtpFrame.payloadSize.toLong())
-                val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" }
-                if (msgs % INFO_LOGGING_FREQ == 0L)
-                    logger.info(ctx, logMessageSupplier)
-                else
-                    logger.trace(ctx, logMessageSupplier)
-            }
-
-        }
-    }
-
-    companion object {
-        const val INFO_LOGGING_FREQ = 100_000
-        private val logger = Logger(LoggingSinkProvider::class)
-    }
-}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
similarity index 65%
rename from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
rename to sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
index 5052cc5..2ce0f42 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.impl.adapters.kafka
 
+import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug
 import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -39,41 +40,39 @@
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>,
-                         private val ctx: ClientContext) : Sink {
+internal class KafkaPublisher(private val sender: KafkaSender<CommonEventHeader, VesMessage>,
+                              private val ctx: ClientContext) : Sink {
 
     override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> =
-            messages.map(::vesToKafkaRecord).let { records ->
-                sender.send(records).map {
-                    val msg = it.correlationMetadata()
-                    if (it.exception() == null) {
-                        logger.trace(ctx::fullMdc, Marker.Invoke()) {
-                            "Message sent to Kafka with metadata: ${it.recordMetadata()}"
+            messages.map(::vesToKafkaRecord)
+                    .compose { sender.send(it) }
+                    .map {
+                        val msg = it.correlationMetadata()
+                        if (it.exception() == null) {
+                            logger.trace(ctx::fullMdc, Marker.Invoke()) {
+                                "Message sent to Kafka with metadata: ${it.recordMetadata()}"
+                            }
+                            SuccessfullyConsumedMessage(msg)
+                        } else {
+                            logger.warn(ctx::fullMdc, Marker.Invoke()) {
+                                "Failed to send message to Kafka. Reason: ${it.exception().message}"
+                            }
+                            logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) }
+                            FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE)
                         }
-                        SuccessfullyConsumedMessage(msg)
-                    } else {
-                        logger.warn(ctx::fullMdc, Marker.Invoke()) {
-                            "Failed to send message to Kafka. Reason: ${it.exception().message}"
-                        }
-                        logger.withDebug(ctx) { log("Kafka send failure details", it.exception()) }
-                        FailedToConsumeMessage(msg, it.exception(), MessageDropCause.KAFKA_FAILURE)
                     }
-                }
-            }
 
     private fun vesToKafkaRecord(routed: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> =
             SenderRecord.create(
-                    routed.topic,
-                    routed.partition,
+                    routed.targetTopic,
+                    routed.partition.orNull(),
                     FILL_TIMESTAMP_LATER,
                     routed.message.header,
                     routed.message,
                     routed)
 
-    internal fun usesSameSenderAs(other: KafkaSink) = sender === other.sender
-
     companion object {
         private val FILL_TIMESTAMP_LATER: Long? = null
-        private val logger = Logger(KafkaSink::class)
+        private val logger = Logger(KafkaPublisher::class)
     }
 }
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
index 96e45a0..7a49865 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,66 +20,38 @@
 package org.onap.dcae.collectors.veshv.impl.adapters.kafka
 
 import arrow.effects.IO
-import org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
-import org.apache.kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.impl.createKafkaSender
 import org.onap.dcae.collectors.veshv.model.ClientContext
 import org.onap.dcae.collectors.veshv.model.ServiceContext
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 import reactor.kafka.sender.KafkaSender
-import reactor.kafka.sender.SenderOptions
-import java.lang.Integer.max
+import java.util.Collections.synchronizedMap
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since June 2018
  */
-internal class KafkaSinkProvider internal constructor(
-        private val kafkaSender: KafkaSender<CommonEventHeader, VesMessage>) : SinkProvider {
+internal class KafkaSinkProvider : SinkProvider {
+    private val messageSinks = synchronizedMap(
+            mutableMapOf<SinkStream, KafkaSender<CommonEventHeader, VesMessage>>()
+    )
 
-    constructor(config: CollectorConfiguration) : this(constructKafkaSender(config))
-
-    override fun invoke(ctx: ClientContext) = KafkaSink(kafkaSender, ctx)
+    override fun invoke(stream: SinkStream, ctx: ClientContext) = lazy {
+        messageSinks.computeIfAbsent(stream, ::createKafkaSender).let {
+            KafkaPublisher(it, ctx)
+        }
+    }
 
     override fun close() = IO {
-        kafkaSender.close()
-        logger.info(ServiceContext::mdc) { "KafkaSender flushed and closed" }
+        synchronized(messageSinks) { messageSinks.values.forEach { it.close() } } // value view needs the map's lock
+        logger.info(ServiceContext::mdc) { "Message sinks flushed and closed" }
     }
 
     companion object {
         private val logger = Logger(KafkaSinkProvider::class)
-        private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f
-        private const val BUFFER_MEMORY_MULTIPLIER = 32
-        private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024
-
-        private fun constructKafkaSender(config: CollectorConfiguration) =
-                KafkaSender.create(constructSenderOptions(config))
-
-        private fun constructSenderOptions(config: CollectorConfiguration) =
-                SenderOptions.create<CommonEventHeader, VesMessage>()
-                        .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaServers)
-                        .producerProperty(MAX_REQUEST_SIZE_CONFIG, maxRequestSize(config.maxRequestSizeBytes))
-                        .producerProperty(BUFFER_MEMORY_CONFIG, bufferMemory(config.maxRequestSizeBytes))
-                        .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
-                        .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
-                        .producerProperty(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
-                        .producerProperty(RETRIES_CONFIG, 1)
-                        .producerProperty(ACKS_CONFIG, "1")
-                        .stopOnError(false)
-
-        private fun maxRequestSize(maxRequestSizeBytes: Int) =
-                (MAXIMUM_REQUEST_SIZE_MULTIPLIER * maxRequestSizeBytes).toInt()
-
-        private fun bufferMemory(maxRequestSizeBytes: Int) =
-                max(MINIMUM_BUFFER_MEMORY, BUFFER_MEMORY_MULTIPLIER * maxRequestSizeBytes)
     }
 }
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt
new file mode 100644
index 0000000..40de8c5
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt
@@ -0,0 +1,56 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.impl.adapters.kafka.ProtobufSerializer
+import org.onap.dcae.collectors.veshv.impl.adapters.kafka.VesMessageSerializer
+import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import reactor.kafka.sender.KafkaSender
+import reactor.kafka.sender.SenderOptions
+
+
+private const val MAXIMUM_REQUEST_SIZE_MULTIPLIER = 1.2f
+private const val BUFFER_MEMORY_MULTIPLIER = 32
+private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024
+
+// Creates a reactive Kafka sender for the given stream, which must be a KafkaSink (the cast throws otherwise).
+internal fun createKafkaSender(sinkStream: SinkStream) = with(sinkStream as KafkaSink) {
+    KafkaSender.create(SenderOptions.create<CommonEventHeader, VesMessage>()
+            .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
+            .producerProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize(this))
+            .producerProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory(this))
+            .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
+            .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
+            .producerProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
+            .producerProperty(ProducerConfig.RETRIES_CONFIG, 1)
+            .producerProperty(ProducerConfig.ACKS_CONFIG, "1")
+            .stopOnError(false)
+    )
+}
+
+private fun maxRequestSize(kafkaSink: KafkaSink) = // sink payload limit scaled by the multiplier, truncated to Int
+        kafkaSink.maxPayloadSizeBytes().let { (MAXIMUM_REQUEST_SIZE_MULTIPLIER * it).toInt() }
+
+private fun bufferMemory(kafkaSink: KafkaSink): Long = // Long math: the Int product wraps for limits of 64 MiB and up
+        maxOf(MINIMUM_BUFFER_MEMORY.toLong(), BUFFER_MEMORY_MULTIPLIER.toLong() * kafkaSink.maxPayloadSizeBytes())
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
index 9629816..6b9c680 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,21 +20,30 @@
 package org.onap.dcae.collectors.veshv.impl
 
 import arrow.core.None
-import arrow.core.Some
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
+import com.nhaarman.mockitokotlin2.whenever
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.config.api.model.routing
+import org.onap.dcae.collectors.veshv.config.api.model.Route
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
 import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.domain.RoutedMessage
 import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import reactor.core.publisher.Flux
+import reactor.test.StepVerifier
 
 
 /**
@@ -42,72 +51,85 @@
  * @since May 2018
  */
 object RouterTest : Spek({
-    given("sample configuration") {
-        val config = routing {
 
-            defineRoute {
-                fromDomain(PERF3GPP.domainName)
-                toTopic("ves_rtpm")
-                withFixedPartitioning(2)
+    describe("Router") {
+
+        whenever(perf3gppSinkMock.topicName()).thenReturn(perf3gppTopic)
+        whenever(syslogSinkMock.topicName()).thenReturn(syslogTopic)
+
+        val messageSinkMap = mapOf(
+                Pair(perf3gppTopic, lazyOf(messageSinkMock)),
+                Pair(syslogTopic, lazyOf(messageSinkMock))
+        )
+
+        given("sample routing specification") {
+            val cut = router(defaultRouting, messageSinkMap)
+
+            on("message with existing route (rtpm)") {
+                whenever(messageSinkMock.send(routedPerf3GppMessage))
+                        .thenReturn(Flux.just(successfullyConsumedPerf3gppMessage))
+
+                it("should be properly routed") {
+                    val result = cut.route(perf3gppMessage)
+
+                    assertThat(result).isNotNull()
+                    StepVerifier.create(result)
+                            .expectNext(successfullyConsumedPerf3gppMessage)
+                            .verifyComplete()
+
+                    verify(perf3gppSinkMock).topicName()
+                    verify(messageSinkMock).send(routedPerf3GppMessage)
+                }
             }
 
-            defineRoute {
-                fromDomain(SYSLOG.domainName)
-                toTopic("ves_trace")
-                withFixedPartitioning()
-            }
-        }.build()
-        val cut = Router(config, ClientContext())
+            on("message with existing route (syslog)") {
+                whenever(messageSinkMock.send(routedSyslogMessage))
+                        .thenReturn(Flux.just(successfullyConsumedSyslogMessage))
+                val result = cut.route(syslogMessage)
 
-        on("message with existing route (rtpm)") {
-            val message = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame())
-            val result = cut.findDestination(message)
+                it("should be properly routed") {
+                    StepVerifier.create(result)
+                            .expectNext(successfullyConsumedSyslogMessage)
+                            .verifyComplete()
 
-            it("should have route available") {
-                assertThat(result).isNotNull()
+                    verify(syslogSinkMock).topicName()
+                    verify(messageSinkMock).send(routedSyslogMessage)
+                }
             }
 
-            it("should be routed to proper partition") {
-                assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(2))
-            }
+            on("message with unknown route") {
+                val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame())
+                val result = cut.route(message)
 
-            it("should be routed to proper topic") {
-                assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_rtpm"))
-            }
-
-            it("should be routed with a given message") {
-                assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message))
-            }
-        }
-
-        on("message with existing route (trace)") {
-            val message = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame())
-            val result = cut.findDestination(message)
-
-            it("should have route available") {
-                assertThat(result).isNotNull()
-            }
-
-            it("should be routed to proper partition") {
-                assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(0))
-            }
-
-            it("should be routed to proper topic") {
-                assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_trace"))
-            }
-
-            it("should be routed with a given message") {
-                assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message))
-            }
-        }
-
-        on("message with unknown route") {
-            val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame())
-            val result = cut.findDestination(message)
-
-            it("should not have route available") {
-                assertThat(result).isEqualTo(None)
+                it("should not have route available") {
+                    StepVerifier.create(result).verifyComplete()
+                }
             }
         }
     }
-})
\ No newline at end of file
+
+})
+
+private fun router(routing: Routing, kafkaPublisherMap: Map<String, Lazy<Sink>>) =
+        Router(routing, kafkaPublisherMap, ClientContext(), mock())
+
+private val perf3gppTopic = "PERF_PERF"
+private val perf3gppSinkMock = mock<KafkaSink>()
+private val default3gppRoute = Route(PERF3GPP.domainName, perf3gppSinkMock)
+
+private val syslogTopic = "SYS_LOG"
+private val syslogSinkMock = mock<KafkaSink>()
+private val defaultSyslogRoute = Route(SYSLOG.domainName, syslogSinkMock)
+
+private val defaultRouting = listOf(default3gppRoute, defaultSyslogRoute)
+
+private val messageSinkMock = mock<Sink>()
+private val defaultPartition = None // routed-message fixtures below carry no explicit partition
+
+private val perf3gppMessage = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame())
+private val routedPerf3GppMessage = RoutedMessage(perf3gppMessage, perf3gppTopic, defaultPartition)
+private val successfullyConsumedPerf3gppMessage = SuccessfullyConsumedMessage(routedPerf3GppMessage)
+
+private val syslogMessage = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame())
+private val routedSyslogMessage = RoutedMessage(syslogMessage, syslogTopic, defaultPartition)
+private val successfullyConsumedSyslogMessage = SuccessfullyConsumedMessage(routedSyslogMessage)
\ No newline at end of file
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt
similarity index 92%
rename from sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
rename to sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt
index 571a668..8616ce0 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImplTest.kt
@@ -36,6 +36,7 @@
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
 import reactor.core.publisher.Flux
+
 import reactor.core.publisher.Mono
 import reactor.retry.Retry
 import reactor.test.StepVerifier
@@ -64,8 +65,8 @@
                             .expectNoEvent(waitTime)
                 }
             }
-
         }
+
         given("valid configuration from cbs") {
             val configProvider = constructConfigurationProvider(cbsClientMock, healthStateProvider)
 
@@ -76,18 +77,23 @@
 
                     StepVerifier.create(configProvider().take(1))
                             .consumeNextWith {
-                                val receivedSink1 = it.elementAt(0)
-                                val receivedSink2 = it.elementAt(1)
+                                val route1 = it.elementAt(0)
+                                val route2 = it.elementAt(1)
+                                val receivedSink1 = route1.sink
+                                val receivedSink2 = route2.sink
 
+                                assertThat(route1.domain).isEqualTo(PERF3GPP_REGIONAL)
                                 assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1)
                                 assertThat(receivedSink1.bootstrapServers())
                                         .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060")
                                 assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP")
 
+                                assertThat(route2.domain).isEqualTo(PERF3GPP_CENTRAL)
                                 assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2)
                                 assertThat(receivedSink2.bootstrapServers())
                                         .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060")
                                 assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")
+
                             }.verifyComplete()
                 }
             }
@@ -120,6 +126,10 @@
 
 })
 
+
+val PERF3GPP_REGIONAL = "perf3gpp_regional"
+val PERF3GPP_CENTRAL = "perf3gpp_central"
+
 private val aafCredentials1 = ImmutableAafCredentials.builder()
         .username("client")
         .password("very secure password")
@@ -133,7 +143,7 @@
 private val validConfiguration = JsonParser().parse("""
 {
     "streams_publishes": {
-        "perf3gpp_regional": {
+        "$PERF3GPP_REGIONAL": {
             "type": "kafka",
             "aaf_credentials": {
                 "username": "client",
@@ -144,7 +154,7 @@
                 "topic_name": "REG_HVVES_PERF3GPP"
             }
         },
-        "perf3gpp_central": {
+        "$PERF3GPP_CENTRAL": {
             "type": "kafka",
             "aaf_credentials": {
                 "username": "other_client",
@@ -161,7 +171,7 @@
 private val invalidConfiguration = JsonParser().parse("""
 {
     "streams_publishes": {
-        "perf3gpp_regional": {
+        "$PERF3GPP_REGIONAL": {
             "type": "kafka",
             "aaf_credentials": {
                 "username": "client",
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
deleted file mode 100644
index 1e3f2e7..0000000
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProviderTest.kt
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-
-import arrow.syntax.collections.tail
-import com.nhaarman.mockitokotlin2.mock
-import com.nhaarman.mockitokotlin2.verify
-import org.assertj.core.api.Assertions.assertThat
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.routing
-import org.onap.dcae.collectors.veshv.domain.VesMessage
-import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.ves.VesEventOuterClass
-import reactor.kafka.sender.KafkaSender
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since December 2018
- */
-internal object KafkaSinkProviderTest : Spek({
-    describe("non functional requirements") {
-        given("sample configuration") {
-            val config = CollectorConfiguration(
-                    dummyMode = false,
-                    maxRequestSizeBytes = 1024 * 1024,
-                    kafkaServers = "localhost:9090",
-                    routing = routing { }.build())
-
-            val cut = KafkaSinkProvider(config)
-
-            on("sample clients") {
-                val clients = listOf(
-                        ClientContext(),
-                        ClientContext(),
-                        ClientContext(),
-                        ClientContext())
-
-                it("should create only one instance of KafkaSender") {
-                    val sinks = clients.map(cut::invoke)
-                    val firstSink = sinks[0]
-                    val restOfSinks = sinks.tail()
-
-                    assertThat(restOfSinks).isNotEmpty
-                    assertThat(restOfSinks).allSatisfy { sink ->
-                        assertThat(firstSink.usesSameSenderAs(sink))
-                                .describedAs("$sink.kafkaSender should be same as $firstSink.kafkaSender")
-                                .isTrue()
-                    }
-                }
-            }
-        }
-
-        given("dummy KafkaSender") {
-            val kafkaSender: KafkaSender<VesEventOuterClass.CommonEventHeader, VesMessage> = mock()
-            val cut = KafkaSinkProvider(kafkaSender)
-
-            on("close") {
-                cut.close().unsafeRunSync()
-
-                it("should close KafkaSender") {
-                    verify(kafkaSender).close()
-                }
-            }
-        }
-    }
-})
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
index a6b32ed..92719e9 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -33,10 +33,10 @@
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
-import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidListenerVersion
 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
@@ -92,7 +92,7 @@
 
     describe("Messages sent metrics") {
         it("should gather info for each topic separately") {
-            val sut = vesHvWithAlwaysSuccessfulSink(configWithTwoDomainsToOneTopicRouting)
+            val sut = vesHvWithAlwaysSuccessfulSink(twoDomainsToOneTopicRouting)
 
             sut.handleConnection(
                     vesWireFrameMessage(PERF3GPP),
@@ -107,8 +107,8 @@
             assertThat(metrics.messagesOnTopic(PERF3GPP_TOPIC))
                     .describedAs("messagesSentToTopic $PERF3GPP_TOPIC metric")
                     .isEqualTo(2)
-            assertThat(metrics.messagesOnTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC))
-                    .describedAs("messagesSentToTopic $MEASUREMENTS_FOR_VF_SCALING_TOPIC metric")
+            assertThat(metrics.messagesOnTopic(ALTERNATE_PERF3GPP_TOPIC))
+                    .describedAs("messagesSentToTopic $ALTERNATE_PERF3GPP_TOPIC metric")
                     .isEqualTo(1)
         }
     }
@@ -130,7 +130,7 @@
 
     describe("Messages dropped metrics") {
         it("should gather metrics for invalid messages") {
-            val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
+            val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
 
             sut.handleConnection(
                     messageWithInvalidWireFrameHeader(),
@@ -146,7 +146,7 @@
         }
 
         it("should gather metrics for route not found") {
-            val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
+            val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
 
             sut.handleConnection(
                     vesWireFrameMessage(domain = PERF3GPP),
@@ -160,7 +160,7 @@
         }
 
         it("should gather metrics for sing errors") {
-            val sut = vesHvWithAlwaysFailingSink(configWithBasicRouting)
+            val sut = vesHvWithAlwaysFailingSink(basicRouting)
 
             sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP))
 
@@ -171,7 +171,7 @@
         }
 
         it("should gather summed metrics for dropped messages") {
-            val sut = vesHvWithAlwaysSuccessfulSink(configWithBasicRouting)
+            val sut = vesHvWithAlwaysSuccessfulSink(basicRouting)
 
             sut.handleConnection(
                     vesWireFrameMessage(domain = PERF3GPP),
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index 50fe098..61a9a35 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -34,7 +34,7 @@
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
 import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
@@ -57,7 +57,7 @@
         it("should handle multiple clients in reasonable time") {
             val sink = CountingSink()
             val sut = Sut(sink)
-            sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+            sut.configurationProvider.updateConfiguration(basicRouting)
 
             val numMessages: Long = 300_000
             val runs = 4
@@ -79,7 +79,7 @@
             val durationSec = durationMs / 1000.0
             val throughput = sink.count / durationSec
             logger.info { "Processed $runs connections each containing $numMessages msgs." }
-            logger.info { "Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s" }
+            logger.info { "Forwarded ${sink.count / ONE_MILLION}M msgs in $durationSec seconds, that is $throughput msgs/s" }
             assertThat(sink.count)
                     .describedAs("should send all events")
                     .isEqualTo(runs * numMessages)
@@ -88,7 +88,7 @@
         it("should disconnect on transmission errors") {
             val sink = CountingSink()
             val sut = Sut(sink)
-            sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+            sut.configurationProvider.updateConfiguration(basicRouting)
 
             val numMessages: Long = 100_000
             val timeout = Duration.ofSeconds(30)
@@ -159,7 +159,7 @@
 })
 
 
-private const val ONE_MILION = 1_000_000.0
+private const val ONE_MILLION = 1_000_000.0
 private val rand = Random()
 private val generatorsFactory = MessageGeneratorFactory(MAX_PAYLOAD_SIZE_BYTES)
 
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index eb3ba26..ec54060 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -38,8 +38,9 @@
 import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState
 import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
+import org.onap.dcae.collectors.veshv.utils.Closeable
+import org.onap.dcaegen2.services.sdk.model.streams.SinkStream
 import reactor.core.publisher.Flux
 import java.time.Duration
 import java.util.concurrent.atomic.AtomicBoolean
@@ -48,7 +49,7 @@
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-class Sut(sink: Sink = StoringSink()) : AutoCloseable {
+class Sut(sink: Sink = StoringSink()) : Closeable {
     val configurationProvider = FakeConfigurationProvider()
     val healthStateProvider = FakeHealthState()
     val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
@@ -60,7 +61,9 @@
             sinkProvider,
             metrics,
             MAX_PAYLOAD_SIZE_BYTES,
-            healthStateProvider)
+            healthStateProvider
+    )
+
     private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
 
     val collector: Collector
@@ -68,51 +71,52 @@
             throw IllegalStateException("Collector not available.")
         }
 
-    override fun close() {
-        collectorProvider.close().unsafeRunSync()
+
+    fun handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
+        collector.handleConnection(Flux.fromArray(packets)).block(timeout)
+        return sink.sentMessages
     }
 
+    fun handleConnection(vararg packets: ByteBuf) {
+        collector.handleConnection(Flux.fromArray(packets)).block(timeout)
+    }
+
+    override fun close() = collectorProvider.close()
+
     companion object {
         const val MAX_PAYLOAD_SIZE_BYTES = 1024
     }
 }
 
-
 class DummySinkProvider(private val sink: Sink) : SinkProvider {
-    private val active = AtomicBoolean(true)
+    private val sinkInitialized = AtomicBoolean(false)
 
-    override fun invoke(ctx: ClientContext) = sink
-
-    override fun close() = IO {
-        active.set(false)
+    override fun invoke(stream: SinkStream, ctx: ClientContext) = lazy {
+        sinkInitialized.set(true)
+        sink
     }
 
-    val closed get() = !active.get()
-
+    override fun close() =
+            if (sinkInitialized.get()) {
+                sink.close()
+            } else {
+                IO.unit
+            }
 }
 
 private val timeout = Duration.ofSeconds(10)
 
-fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
-    collector.handleConnection(Flux.fromArray(packets)).block(timeout)
-    return sink.sentMessages
-}
-
-fun Sut.handleConnection(vararg packets: ByteBuf) {
-    collector.handleConnection(Flux.fromArray(packets)).block(timeout)
-}
-
-fun vesHvWithAlwaysSuccessfulSink(kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut =
+fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut =
         Sut(AlwaysSuccessfulSink()).apply {
-            configurationProvider.updateConfiguration(kafkaSinks)
+            configurationProvider.updateConfiguration(routing)
         }
 
-fun vesHvWithAlwaysFailingSink(kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut =
+fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut =
         Sut(AlwaysFailingSink()).apply {
-            configurationProvider.updateConfiguration(kafkaSinks)
+            configurationProvider.updateConfiguration(routing)
         }
 
-fun vesHvWithDelayingSink(delay: Duration, kafkaSinks: Sequence<KafkaSink> = configWithBasicRouting): Sut =
+fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut =
         Sut(DelayingSink(delay)).apply {
-            configurationProvider.updateConfiguration(kafkaSinks)
+            configurationProvider.updateConfiguration(routing)
         }
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 21c5c18..5d215fc 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.tests.component
 
+import arrow.core.None
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
@@ -30,13 +31,12 @@
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithBasicRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithDifferentRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithEmptyRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.configWithTwoDomainsToOneTopicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.alternativeRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting
+import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
 import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
@@ -65,12 +65,28 @@
                     .hasSize(2)
         }
 
+        it("should create sink lazily") {
+            val (sut, sink) = vesHvWithStoringSink()
+
+            // just connecting should not create sink
+            sut.handleConnection()
+            sut.close().unsafeRunSync()
+
+            // then
+            assertThat(sink.closed).isFalse()
+        }
+
         it("should close sink when closing collector provider") {
-            val (sut, _) = vesHvWithStoringSink()
+            val (sut, sink) = vesHvWithStoringSink()
+            // given Sink initialized
+            // Note: as StoringSink is (hopefully) created lazily, "valid" ves message needs to be sent
+            sut.handleConnection(vesWireFrameMessage(PERF3GPP))
 
-            sut.close()
+            // when
+            sut.close().unsafeRunSync()
 
-            assertThat(sut.sinkProvider.closed).isTrue()
+            // then
+            assertThat(sink.closed).isTrue()
         }
     }
 
@@ -145,14 +161,14 @@
             assertThat(messages).describedAs("number of routed messages").hasSize(1)
 
             val msg = messages[0]
-            assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
-            assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
+            assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
+            assertThat(msg.partition).describedAs("routed message partition").isEqualTo(None)
         }
 
         it("should be able to direct 2 messages from different domains to one topic") {
             val (sut, sink) = vesHvWithStoringSink()
 
-            sut.configurationProvider.updateConfiguration(configWithTwoDomainsToOneTopicRouting)
+            sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicRouting)
 
             val messages = sut.handleConnection(sink,
                     vesWireFrameMessage(PERF3GPP),
@@ -161,14 +177,14 @@
 
             assertThat(messages).describedAs("number of routed messages").hasSize(3)
 
-            assertThat(messages[0].topic).describedAs("first message topic")
+            assertThat(messages[0].targetTopic).describedAs("first message topic")
                     .isEqualTo(PERF3GPP_TOPIC)
 
-            assertThat(messages[1].topic).describedAs("second message topic")
+            assertThat(messages[1].targetTopic).describedAs("second message topic")
                     .isEqualTo(PERF3GPP_TOPIC)
 
-            assertThat(messages[2].topic).describedAs("last message topic")
-                    .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
+            assertThat(messages[2].targetTopic).describedAs("last message topic")
+                    .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
         }
 
         it("should drop message if route was not found") {
@@ -181,7 +197,7 @@
             assertThat(messages).describedAs("number of routed messages").hasSize(1)
 
             val msg = messages[0]
-            assertThat(msg.topic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
+            assertThat(msg.targetTopic).describedAs("routed message topic").isEqualTo(PERF3GPP_TOPIC)
             assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
         }
     }
@@ -205,7 +221,7 @@
             it("should update collector") {
                 val firstCollector = sut.collector
 
-                sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
+                sut.configurationProvider.updateConfiguration(alternativeRouting)
                 val collectorAfterUpdate = sut.collector
 
                 assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
@@ -213,21 +229,21 @@
 
             it("should start routing messages") {
 
-                sut.configurationProvider.updateConfiguration(configWithEmptyRouting)
+                sut.configurationProvider.updateConfiguration(emptyRouting)
 
                 val messages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
                 assertThat(messages).isEmpty()
 
-                sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+                sut.configurationProvider.updateConfiguration(basicRouting)
 
                 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
                 assertThat(messagesAfterUpdate).hasSize(1)
                 val message = messagesAfterUpdate[0]
 
-                assertThat(message.topic).describedAs("routed message topic after configuration's change")
+                assertThat(message.targetTopic).describedAs("routed message topic after configuration's change")
                         .isEqualTo(PERF3GPP_TOPIC)
                 assertThat(message.partition).describedAs("routed message partition")
-                        .isEqualTo(0)
+                        .isEqualTo(None)
             }
 
             it("should change domain routing") {
@@ -236,22 +252,22 @@
                 assertThat(messages).hasSize(1)
                 val firstMessage = messages[0]
 
-                assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
+                assertThat(firstMessage.targetTopic).describedAs("routed message topic on initial configuration")
                         .isEqualTo(PERF3GPP_TOPIC)
                 assertThat(firstMessage.partition).describedAs("routed message partition")
-                        .isEqualTo(0)
+                        .isEqualTo(None)
 
 
-                sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
+                sut.configurationProvider.updateConfiguration(alternativeRouting)
 
                 val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
                 assertThat(messagesAfterUpdate).hasSize(2)
                 val secondMessage = messagesAfterUpdate[1]
 
-                assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
+                assertThat(secondMessage.targetTopic).describedAs("routed message topic after configuration's change")
                         .isEqualTo(ALTERNATE_PERF3GPP_TOPIC)
                 assertThat(secondMessage.partition).describedAs("routed message partition")
-                        .isEqualTo(0)
+                        .isEqualTo(None)
             }
 
             it("should update routing for each client sending one message") {
@@ -261,7 +277,7 @@
 
                 Flux.range(0, messagesAmount).doOnNext {
                     if (it == messagesForEachTopic) {
-                        sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
+                        sut.configurationProvider.updateConfiguration(alternativeRouting)
                     }
                 }.doOnNext {
                     sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP))
@@ -269,8 +285,8 @@
 
 
                 val messages = sink.sentMessages
-                val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
-                val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
+                val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC }
+                val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC }
 
                 assertThat(messages.size).isEqualTo(messagesAmount)
                 assertThat(messagesForEachTopic)
@@ -287,7 +303,7 @@
                 val incomingMessages = Flux.range(0, messageStreamSize)
                         .doOnNext {
                             if (it == pivot) {
-                                sut.configurationProvider.updateConfiguration(configWithDifferentRouting)
+                                sut.configurationProvider.updateConfiguration(alternativeRouting)
                                 println("config changed")
                             }
                         }
@@ -297,8 +313,8 @@
                 sut.collector.handleConnection(incomingMessages).block(defaultTimeout)
 
                 val messages = sink.sentMessages
-                val firstTopicMessagesCount = messages.count { it.topic == PERF3GPP_TOPIC }
-                val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_PERF3GPP_TOPIC }
+                val firstTopicMessagesCount = messages.count { it.targetTopic == PERF3GPP_TOPIC }
+                val secondTopicMessagesCount = messages.count { it.targetTopic == ALTERNATE_PERF3GPP_TOPIC }
 
                 assertThat(messages.size).isEqualTo(messageStreamSize)
                 assertThat(firstTopicMessagesCount)
@@ -320,7 +336,7 @@
         given("failed configuration change") {
             val (sut, _) = vesHvWithStoringSink()
             sut.configurationProvider.shouldThrowExceptionOnConfigUpdate(true)
-            sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+            sut.configurationProvider.updateConfiguration(basicRouting)
 
             it("should mark the application unhealthy ") {
                 assertThat(sut.healthStateProvider.currentHealth)
@@ -349,6 +365,6 @@
 private fun vesHvWithStoringSink(): Pair<Sut, StoringSink> {
     val sink = StoringSink()
     val sut = Sut(sink)
-    sut.configurationProvider.updateConfiguration(configWithBasicRouting)
+    sut.configurationProvider.updateConfiguration(basicRouting)
     return Pair(sut, sink)
 }
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
index 6599d40..c465fd9 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -20,67 +20,21 @@
 package org.onap.dcae.collectors.veshv.tests.fakes
 
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
 import reactor.core.publisher.FluxProcessor
 import reactor.core.publisher.UnicastProcessor
 import reactor.retry.RetryExhaustedException
 
 
-const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP"
-const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "HV_VES_MEAS_FOR_VF_SCALING"
-const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
-const val SAMPLE_BOOTSTRAP_SERVERS = "dmaap-mr-kafka-0:6060,dmaap-mr-kafka-1:6060"
-
-val configWithBasicRouting = sequenceOf(
-    ImmutableKafkaSink.builder()
-            .name(PERF3GPP.domainName)
-            .topicName(PERF3GPP_TOPIC)
-            .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
-            .build()
-)
-
-val configWithTwoDomainsToOneTopicRouting = sequenceOf(
-        ImmutableKafkaSink.builder()
-                .name(PERF3GPP.domainName)
-                .topicName(PERF3GPP_TOPIC)
-                .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
-                .build(),
-        ImmutableKafkaSink.builder()
-                .name(HEARTBEAT.domainName)
-                .topicName(PERF3GPP_TOPIC)
-                .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
-                .build(),
-        ImmutableKafkaSink.builder()
-                .name(MEASUREMENT.domainName)
-                .topicName(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
-                .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
-                .build()
-)
-
-val configWithDifferentRouting = sequenceOf(
-                ImmutableKafkaSink.builder()
-                        .name(PERF3GPP.domainName)
-                        .topicName(ALTERNATE_PERF3GPP_TOPIC)
-                        .bootstrapServers(SAMPLE_BOOTSTRAP_SERVERS)
-                        .build()
-        )
-
-val configWithEmptyRouting = emptySequence<KafkaSink>()
-
-
 class FakeConfigurationProvider : ConfigurationProvider {
     private var shouldThrowException = false
-    private val configStream: FluxProcessor<Sequence<KafkaSink>, Sequence<KafkaSink>> = UnicastProcessor.create()
+    private val configStream: FluxProcessor<Routing, Routing> = UnicastProcessor.create()
 
-    fun updateConfiguration(kafkaSinkSequence: Sequence<KafkaSink>) =
+    fun updateConfiguration(routing: Routing) =
             if (shouldThrowException) {
                 configStream.onError(RetryExhaustedException("I'm so tired"))
             } else {
-                configStream.onNext(kafkaSinkSequence)
+                configStream.onNext(routing)
             }
 
 
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
index b599a07..a450b79 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -54,7 +54,7 @@
 
     override fun notifyMessageSent(msg: RoutedMessage) {
         messagesSentCount++
-        messagesSentToTopic.compute(msg.topic) { k, _ ->
+        messagesSentToTopic.compute(msg.targetTopic) { k, _ ->
             messagesSentToTopic[k]?.inc() ?: 1
         }
         lastProcessingTimeMicros = Duration.between(msg.message.wtpFrame.receivedAt, Instant.now()).toNanos() / 1000.0
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt
new file mode 100644
index 0000000..e9914ef
--- /dev/null
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt
@@ -0,0 +1,60 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.fakes
+
+import org.onap.dcae.collectors.veshv.config.api.model.Route
+import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink
+
+const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP"
+const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
+const val KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"
+const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024
+
+private val perf3gppKafkaSink = ImmutableKafkaSink.builder()
+        .name("PERF3GPP")
+        .bootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
+        .topicName(PERF3GPP_TOPIC)
+        .maxPayloadSizeBytes(MAX_PAYLOAD_SIZE_BYTES)
+        .build()
+private val alternativeKafkaSink = ImmutableKafkaSink.builder()
+        .name("ALTERNATE")
+        .bootstrapServers(KAFKA_BOOTSTRAP_SERVERS)
+        .topicName(ALTERNATE_PERF3GPP_TOPIC)
+        .maxPayloadSizeBytes(MAX_PAYLOAD_SIZE_BYTES)
+        .build()
+
+
+val basicRouting: Routing = listOf(
+        Route(VesEventDomain.PERF3GPP.domainName, perf3gppKafkaSink)
+)
+
+val alternativeRouting: Routing = listOf(
+        Route(VesEventDomain.PERF3GPP.domainName, alternativeKafkaSink)
+)
+
+val twoDomainsToOneTopicRouting: Routing = listOf(
+        Route(VesEventDomain.PERF3GPP.domainName, perf3gppKafkaSink),
+        Route(VesEventDomain.HEARTBEAT.domainName, perf3gppKafkaSink),
+        Route(VesEventDomain.MEASUREMENT.domainName, alternativeKafkaSink)
+)
+
+val emptyRouting: Routing = emptyList()
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
index 51f724e..160defd 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.tests.fakes
 
+import arrow.effects.IO
 import org.onap.dcae.collectors.veshv.boundary.Sink
 import org.onap.dcae.collectors.veshv.model.ConsumedMessage
 import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
@@ -30,6 +31,7 @@
 import java.time.Duration
 import java.util.*
 import java.util.concurrent.ConcurrentLinkedDeque
+import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.atomic.AtomicLong
 
 /**
@@ -38,6 +40,8 @@
  */
 class StoringSink : Sink {
     private val sent: Deque<RoutedMessage> = ConcurrentLinkedDeque()
+    private val active = AtomicBoolean(true)
+    val closed get() = !active.get()
 
     val sentMessages: List<RoutedMessage>
         get() = sent.toList()
@@ -45,6 +49,13 @@
     override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> {
         return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage)
     }
+
+    /*
+    * TODO: the flag is flipped eagerly here because if the code looked like:
+    * ```IO { active.set(false) }```
+    * the tests wouldn't pass, even though `.unsafeRunSync()` is called (see HvVesSpec)
+    */
+    override fun close() = active.set(false).run { IO.unit }
 }
 
 /**
diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt
index e4d147b..04f9be6 100644
--- a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt
+++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,4 +19,9 @@
  */
 package org.onap.dcae.collectors.veshv.domain
 
-data class RoutedMessage(val topic: String, val partition: Int, val message: VesMessage)
+import arrow.core.Option
+
+
+data class RoutedMessage(val message: VesMessage,
+                         val targetTopic: String,
+                         val partition: Option<Int>)
diff --git a/sources/hv-collector-main/src/main/docker/base.json b/sources/hv-collector-main/src/main/docker/base.json
index 7f88cb6..67576c8 100644
--- a/sources/hv-collector-main/src/main/docker/base.json
+++ b/sources/hv-collector-main/src/main/docker/base.json
@@ -12,7 +12,6 @@
   "security": {
   },
   "collector": {
-    "dummyMode": false,
     "maxRequestSizeBytes": 1048576,
     "kafkaServers": [
       "message-router-kafka:9092"
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
index 2fb4476..c04c2c9 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
@@ -2,7 +2,7 @@
  * ============LICENSE_START=======================================================
  * dcaegen2-collectors-veshv
  * ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
  * ================================================================================
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -103,7 +103,7 @@
     override fun notifyMessageSent(msg: RoutedMessage) {
         val now = Instant.now()
         sentMessages.increment()
-        sentMessagesByTopic(msg.topic).increment()
+        sentMessagesByTopic(msg.targetTopic).increment()
 
         processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, now))
         totalLatency.record(Duration.between(epochMicroToInstant(msg.message.header.lastEpochMicrosec), now))
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index d15dcce..aed4d92 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -23,6 +23,7 @@
 import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
 import org.onap.dcae.collectors.veshv.factory.CollectorFactory
 import org.onap.dcae.collectors.veshv.factory.ServerFactory
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
 import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
 import org.onap.dcae.collectors.veshv.model.ServiceContext
@@ -59,9 +60,10 @@
     private fun initializeCollectorFactory(config: HvVesConfiguration): CollectorFactory =
             CollectorFactory(
                     AdapterFactory.configurationProvider(config.cbs),
-                    AdapterFactory.sinkCreatorFactory(config.collector),
+                    AdapterFactory.sinkCreatorFactory(),
                     MicrometerMetrics.INSTANCE,
-                    config.server.maxPayloadSizeBytes
+                    config.server.maxPayloadSizeBytes,
+                    HealthState.INSTANCE
             )
 
     private fun logServerStarted(handle: ServerHandle) =
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
index e452a5f..f260f15 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.main
 
+import arrow.core.Option
 import arrow.core.Try
 import io.micrometer.core.instrument.Counter
 import io.micrometer.core.instrument.Gauge
@@ -44,6 +45,7 @@
 import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
 import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
 import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
+import org.onap.ves.VesEventOuterClass
 import java.time.Instant
 import java.time.temporal.Temporal
 import java.util.concurrent.TimeUnit
@@ -383,23 +385,24 @@
 })
 
 fun routedMessage(topic: String, partition: Int = 0) =
-        vesEvent().let { evt ->
-            RoutedMessage(topic, partition,
-                    VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
-        }
+        vesEvent().run { toRoutedMessage(topic, partition) }
 
 fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) =
-        vesEvent().let { evt ->
-            RoutedMessage(topic, partition,
-                    VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt)))
-        }
+        vesEvent().run { toRoutedMessage(topic, partition, receivedAt) }
 
 fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
-        vesEvent().let { evt ->
-            val builder = evt.toBuilder()
+        vesEvent().run {
+            val builder = toBuilder()
             builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
-            builder.build()
-        }.let { evt ->
-            RoutedMessage(topic, partition,
-                    VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
-        }
\ No newline at end of file
+            builder.build().toRoutedMessage(topic, partition)
+        }
+
+private fun VesEventOuterClass.VesEvent.toRoutedMessage(topic: String,
+                                                        partition: Int,
+                                                        receivedAt: Temporal = Instant.now()) =
+        RoutedMessage(
+                VesMessage(this.commonEventHeader, wireProtocolFrame(this).copy(receivedAt = receivedAt)),
+                topic,
+                Option.just(partition)
+        )
+