Merge configurations

- changed temporarily HV-VES default log level to DEBUG as in current
implementation we are applying LogLevel defined in configuration file only
if we successfully retrieve one from configuration-module, which means
that inside of this module we are logging on default level (from logback
file). This should be fixed in future work
- reduced log level on SDK's CbsClientImpl as it's logging frequency was
too high

Change-Id: If50df18df099c34bfc36d39b045140f9b9ad87f6
Issue-ID: DCAEGEN2-1347
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
diff --git a/development/bin/consul.sh b/development/bin/consul.sh
index 39f0bde..2a6bc0f 100755
--- a/development/bin/consul.sh
+++ b/development/bin/consul.sh
@@ -59,13 +59,16 @@
 DOMAIN=${1:-perf3gpp}
 TOPIC=${2:-HV_VES_PERF3GPP}
 
-CONFIGURATION="
-{
-    \"collector.routing\":
-        [{
-            \"fromDomain\": \"${DOMAIN}\",
-            \"toTopic\": \"${TOPIC}\"
-        }]
+CONFIGURATION="{
+  "streams_publishes": {
+    "${DOMAIN}": {
+      "type": "kafka",
+      "kafka_info": {
+        "bootstrap_servers": "message-router-kafka:9092",
+        "topic_name": "${TOPIC}"
+      }
+    }
+  }
 }"
 CONFIGURATION_ENDPOINT=localhost:8500/v1/kv/veshv-config
 
diff --git a/development/docker-compose.yml b/development/docker-compose.yml
index e85b520..d135e8b 100644
--- a/development/docker-compose.yml
+++ b/development/docker-compose.yml
@@ -43,12 +43,15 @@
     image: docker.io/consul:1.0.6
     restart: on-failure
     command: ["kv", "put", "-http-addr=http://consul-server:8500", "dcae-hv-ves-collector", '{
-                                              "collector.routing": [
-                                                {
-                                                  "fromDomain": "perf3gpp",
-                                                  "toTopic": "HV_VES_PERF3GPP"
+                                              "streams_publishes": {
+                                                "perf3gpp": {
+                                                  "type": "kafka",
+                                                  "kafka_info": {
+                                                    "bootstrap_servers": "message-router-kafka:9092",
+                                                    "topic_name": "HV_VES_PERF3GPP"
+                                                  }
                                                 }
-                                              ]
+                                              }
                                             }'
     ]
     depends_on:
@@ -74,6 +77,7 @@
       - "6061:6061/tcp"
     environment:
       JAVA_OPTS: "-Dio.netty.leakDetection.level=paranoid -Dlogback.configurationFile=/etc/ONAP/dcae-hv-ves-collector/logback.xml"
+      VESHV_CONFIGURATION_FILE: "/etc/ves-hv/configuration/base.json"
       CONSUL_HOST: "consul-server"
       CONFIG_BINDING_SERVICE: "CBS"
       HOSTNAME: "dcae-hv-ves-collector"
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
index 9684484..ccce62a 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.config.api
 
+import arrow.core.getOrElse
 import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
 import org.onap.dcae.collectors.veshv.config.api.model.MissingArgumentException
 import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
@@ -27,41 +28,56 @@
 import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator
 import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader
 import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser
-import org.onap.dcae.collectors.veshv.utils.arrow.rightOrThrow
+import org.onap.dcae.collectors.veshv.config.impl.PartialConfiguration
 import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
 import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
 import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
 
 class ConfigurationModule {
 
     private val cmd = HvVesCommandLineParser()
     private val configReader = FileConfigurationReader()
     private val configValidator = ConfigurationValidator()
+    private val merger = ConfigurationMerger()
 
     fun healthCheckPort(args: Array<String>): Int = cmd.getHealthcheckPort(args)
 
     fun hvVesConfigurationUpdates(args: Array<String>,
                                   configStateListener: ConfigurationStateListener,
                                   mdc: MappedDiagnosticContext): Flux<HvVesConfiguration> =
-            Flux.just(cmd.getConfigurationFile(args))
-                    .throwOnLeft { MissingArgumentException(it.message, it.cause) }
-                    .map { it.reader().use(configReader::loadConfig) }
+            Mono.just(cmd.getConfigurationFile(args))
+                    .throwOnLeft(::MissingArgumentException)
+                    .map {
+                        logger.info { "Using base configuration file: ${it.absolutePath}" }
+                        it.reader().use(configReader::loadConfig)
+                    }
                     .cache()
-                    .flatMap { basePartialConfig ->
-                        val baseConfig = configValidator.validate(basePartialConfig)
-                                .rightOrThrow { ValidationException(it.message) }
-                        val cbsConfigProvider = CbsConfigurationProvider(
-                                CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
-                                baseConfig.cbs,
-                                configStateListener,
-                                mdc)
-                        val merger = ConfigurationMerger()
-                        cbsConfigProvider()
+                    .flatMapMany { basePartialConfig ->
+                        cbsConfigurationProvider(basePartialConfig, configStateListener, mdc)
+                                .invoke()
                                 .map { merger.merge(basePartialConfig, it) }
-                                .map { configValidator.validate(it) }
-                                .throwOnLeft { ValidationException(it.message) }
+                                .map(configValidator::validate)
+                                .throwOnLeft()
                     }
 
+    private fun cbsConfigurationProvider(basePartialConfig: PartialConfiguration,
+                                         configStateListener: ConfigurationStateListener,
+                                         mdc: MappedDiagnosticContext): CbsConfigurationProvider =
+            CbsConfigurationProvider(
+                    CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
+                    cbsConfigurationFrom(basePartialConfig),
+                    configStateListener,
+                    mdc)
+
+    private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) =
+            configValidator.validatedCbsConfiguration(basePartialConfig)
+                    .getOrElse { throw ValidationException("Invalid CBS section defined in configuration file") }
+
+    companion object {
+        private val logger = Logger(ConfigurationModule::class)
+    }
 }
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt
index 2fc2982..bea7cc5 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt
@@ -19,6 +19,8 @@
  */
 package org.onap.dcae.collectors.veshv.config.api.model
 
-class MissingArgumentException(message: String, cause: Throwable?) : RuntimeException(message, cause)
+import org.onap.dcae.collectors.veshv.commandline.WrongArgumentError
+
+class MissingArgumentException(err: WrongArgumentError) : RuntimeException(err.message, err.cause)
 
 class ValidationException(message: String) : RuntimeException(message)
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
index f044492..63d590a 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
@@ -74,8 +74,6 @@
                                      updateOption: Option<PartialCollectorConfig>) =
             applyUpdate(baseOption, updateOption) { base, update ->
                 PartialCollectorConfig(
-                        base.maxRequestSizeBytes.updateToGivenOrNone(update.maxRequestSizeBytes),
-                        base.kafkaServers.updateToGivenOrNone(update.kafkaServers),
                         base.routing.updateToGivenOrNone(update.routing)
                 )
             }
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
index 3e599b5..ead5655 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
@@ -27,13 +27,13 @@
 import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
 import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
 import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
 import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding
 import org.onap.dcae.collectors.veshv.utils.arrow.mapBinding
+import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
 import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import java.net.InetSocketAddress
-import java.time.Duration
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -41,33 +41,35 @@
  */
 internal class ConfigurationValidator {
 
-    fun validate(partialConfig: PartialConfiguration)
-            : Either<ValidationError, HvVesConfiguration> = binding {
-        val logLevel = determineLogLevel(partialConfig.logLevel)
+    fun validate(partialConfig: PartialConfiguration) =
+            logger.info { "About to validate configuration: $partialConfig" }.let {
+                binding {
+                    val logLevel = determineLogLevel(partialConfig.logLevel)
 
-        val serverConfiguration = partialConfig.server.bind()
-                .let { createServerConfiguration(it).bind() }
+                    val serverConfiguration = validatedServerConfiguration(partialConfig)
+                            .doOnEmpty { logger.debug { "Cannot bind server configuration" } }
+                            .bind()
 
-        val cbsConfiguration = partialConfig.cbs.bind()
-                .let { createCbsConfiguration(it).bind() }
+                    val cbsConfiguration = validatedCbsConfiguration(partialConfig)
+                            .doOnEmpty { logger.debug { "Cannot bind cbs configuration" } }
+                            .bind()
 
-        val securityConfiguration = SecurityConfiguration(partialConfig.security.bind().keys)
+                    val securityConfiguration = SecurityConfiguration(partialConfig.security.bind().keys)
 
-// TOD0: retrieve when ConfigurationMerger is implemented
-//        val collectorConfiguration = partialConfig.collector.bind()
-//                .let { createCollectorConfig(it).bind() }
+                    val collectorConfiguration = validatedCollectorConfig(partialConfig)
+                            .doOnEmpty { logger.debug { "Cannot bind collector configuration" } }
+                            .bind()
 
-        HvVesConfiguration(
-                serverConfiguration,
-                cbsConfiguration,
-                securityConfiguration,
-// TOD0: swap when ConfigurationMerger is implemented
-//                    collectorConfiguration
-                CollectorConfiguration(emptyList()),
-// end TOD0
-                logLevel
-        )
-    }.toEither { ValidationError("Some required configuration options are missing") }
+                    HvVesConfiguration(
+                            serverConfiguration,
+                            cbsConfiguration,
+                            securityConfiguration,
+                            collectorConfiguration,
+                            logLevel
+                    )
+                }.toEither { ValidationException("Some required configuration options are missing") }
+            }
+
 
     private fun determineLogLevel(logLevel: Option<LogLevel>) =
             logLevel.getOrElse {
@@ -78,40 +80,38 @@
                 DEFAULT_LOG_LEVEL
             }
 
-    private fun createServerConfiguration(partial: PartialServerConfig) =
+    private fun validatedServerConfiguration(partial: PartialConfiguration) =
             partial.mapBinding {
-                ServerConfiguration(
-                        it.listenPort.bind(),
-                        it.idleTimeoutSec.bind(),
-                        it.maxPayloadSizeBytes.bind()
-                )
+                partial.server.bind().let {
+                    ServerConfiguration(
+                            it.listenPort.bind(),
+                            it.idleTimeoutSec.bind(),
+                            it.maxPayloadSizeBytes.bind()
+                    )
+                }
             }
 
-    private fun createCbsConfiguration(partial: PartialCbsConfig) =
+    fun validatedCbsConfiguration(partial: PartialConfiguration) =
             partial.mapBinding {
-                CbsConfiguration(
-                        it.firstRequestDelaySec.bind(),
-                        it.requestIntervalSec.bind()
-                )
+                it.cbs.bind().let {
+                    CbsConfiguration(
+                            it.firstRequestDelaySec.bind(),
+                            it.requestIntervalSec.bind()
+                    )
+                }
             }
 
-// TOD0: retrieve when ConfigurationMerger is implemented
-//    private fun createCollectorConfig(partial: PartialCollectorConfig) =
-//            partial.mapBinding {
-//                CollectorConfiguration(
-//                        it.maxRequestSizeBytes.bind(),
-//                        toKafkaServersString(it.kafkaServers.bind()),
-//                        it.routing.bind()
-//                )
-//            }
-
-    private fun toKafkaServersString(kafkaServers: List<InetSocketAddress>): String =
-            kafkaServers.joinToString(",") { "${it.hostName}:${it.port}" }
+    private fun validatedCollectorConfig(partial: PartialConfiguration) =
+            partial.mapBinding {
+                partial.collector.bind().let {
+                    CollectorConfiguration(
+                            it.routing.bind()
+                    )
+                }
+            }
 
     companion object {
         val DEFAULT_LOG_LEVEL = LogLevel.INFO
         private val logger = Logger(ConfigurationValidator::class)
     }
 }
-
-data class ValidationError(val message: String, val cause: Option<Throwable> = None)
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt
index 9513107..f6ae5be 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt
@@ -24,6 +24,7 @@
 import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.DurationOfSecondsAdapter
 import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.OptionAdapter
 import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.SecurityAdapter
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
 
 import java.io.Reader
 import java.time.Duration
@@ -41,4 +42,9 @@
 
     fun loadConfig(input: Reader): PartialConfiguration =
             gson.fromJson(input, PartialConfiguration::class.java)
+                    .also { logger.info { "Successfully read file and parsed json to configuration: $it" } }
+
+    companion object {
+        private val logger = Logger(FileConfigurationReader::class)
+    }
 }
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt
index c1a9829..c6730a4 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt
@@ -45,7 +45,7 @@
                 it.stringValue(CONFIGURATION_FILE).map(::File)
             }.toEither {
                 WrongArgumentError(
-                        message = "Unexpected error when parsing command line arguments",
+                        message = "Base configuration filepath missing on command line",
                         cmdLineOptionsList = cmdLineOptionsList)
             }
 
@@ -53,8 +53,7 @@
             parse(args) {
                 it.intValue(HEALTH_CHECK_API_PORT)
             }.getOrElse {
-                logger.info { "Healthcheck port missing on command line," +
-                        " using default: $DEFAULT_HEALTHCHECK_PORT" }
+                logger.info { "Healthcheck port missing on command line, using default: $DEFAULT_HEALTHCHECK_PORT" }
                 DEFAULT_HEALTHCHECK_PORT
             }
 
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
index f3c149c..b4e1bf6 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
@@ -53,7 +53,5 @@
 internal data class PartialSecurityConfig(val keys: Option<SecurityKeys> = None)
 
 internal data class PartialCollectorConfig(
-        val maxRequestSizeBytes: Option<Int> = None,
-        val kafkaServers: Option<List<InetSocketAddress>> = None, // TOD0: remove properties and simplify this part
         val routing: Option<Routing> = None
 )
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
index 4b89488..55d06cd 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
@@ -64,8 +64,6 @@
                             Some(mock())
                     )),
                     Some(PartialCollectorConfig(
-                            Some(4),
-                            Some(emptyList()),
                             someFromEmptyRouting
                     )),
                     None
@@ -103,8 +101,6 @@
                             securityKeys
                     )),
                     Some(PartialCollectorConfig(
-                            Some(4),
-                            Some(emptyList()),
                             someFromEmptyRouting
                     )),
                     Some(LogLevel.INFO)
@@ -152,8 +148,6 @@
                             securityKeys
                     )),
                     Some(PartialCollectorConfig(
-                            Some(4),
-                            Some(emptyList()),
                             someFromEmptyRouting
                     )),
                     Some(LogLevel.INFO)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 433e4d5..618b818 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -57,8 +57,6 @@
                     .transform(::filterInvalidWireFrame)
                     .transform(::decodeProtobufPayload)
                     .transform(::filterInvalidProtobufMessages)
-                    // TOD0: try to remove new flux creation in Sink interface to avoid two calls to handleErrors here
-                    .handleErrors()
                     .transform(::route)
                     .handleErrors()
                     .doFinally { releaseBuffersMemory() }
@@ -106,14 +104,14 @@
         }
     }
 
+    private fun releaseBuffersMemory() = wireChunkDecoder.release()
+            .also { logger.debug { "Released buffer memory after handling message stream" } }
+
     private fun <T> Flux<T>.handleErrors(): Flux<T> = onErrorResume {
         metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it))
         logger.handleReactiveStreamError(clientContext, it)
     }
 
-    private fun releaseBuffersMemory() = wireChunkDecoder.release()
-            .also { logger.debug { "Released buffer memory after handling message stream" } }
-
     private fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> MessageEither): Flux<T> =
             filterFailedWithLog(logger, clientContext::fullMdc, predicate)
 
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
index 92719e9..430f798 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
@@ -33,6 +33,7 @@
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE
 import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
+import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
@@ -159,7 +160,7 @@
                     .isEqualTo(1)
         }
 
-        it("should gather metrics for sing errors") {
+        it("should gather metrics for sink errors") {
             val sut = vesHvWithAlwaysFailingSink(basicRouting)
 
             sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP))
@@ -190,7 +191,7 @@
         given("rejection causes") {
             mapOf(
                     ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE to
-                            messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1),
+                            messageWithPayloadOfSize(MAX_PAYLOAD_SIZE_BYTES + 1),
                     ClientRejectionCause.INVALID_WIRE_FRAME_MARKER to garbageFrame()
             ).forEach { cause, vesMessage ->
                 on("cause $cause") {
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 1217c47..f79c2e4 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -77,7 +77,7 @@
     override fun close() = collectorProvider.close()
 
     companion object {
-        const val MAX_PAYLOAD_SIZE_BYTES = 1024
+        const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024
     }
 }
 
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 6a718ee..2430c74 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -23,7 +23,6 @@
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
 import org.onap.dcae.collectors.veshv.config.api.model.Routing
@@ -31,21 +30,17 @@
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
 import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
 import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
 import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.alternativeRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting
 import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
 import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
 import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
 import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
 import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
 import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
-import reactor.core.publisher.Flux
-import java.time.Duration
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -97,7 +92,7 @@
             val (sut, sink) = vesHvWithStoringSink()
             val validMessage = vesWireFrameMessage(PERF3GPP)
             val msgWithInvalidFrame = messageWithInvalidWireFrameHeader()
-            val msgWithTooBigPayload = messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
+            val msgWithTooBigPayload = messageWithPayloadOfSize(MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
             val expectedRefCnt = 0
 
             val handledEvents = sut.handleConnection(
@@ -208,7 +203,7 @@
 
             val handledMessages = sut.handleConnection(sink,
                     vesWireFrameMessage(PERF3GPP, "first"),
-                    messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
+                    messageWithPayloadOfSize(MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
                     vesWireFrameMessage(PERF3GPP))
 
             assertThat(handledMessages).hasSize(1)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt
index e9914ef..8956e81 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt
@@ -27,7 +27,7 @@
 const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP"
 const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
 const val KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"
-const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024
+private const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024
 
 private val perf3gppKafkaSink = ImmutableKafkaSink.builder()
         .name("PERF3GPP")
diff --git a/sources/hv-collector-main/Dockerfile b/sources/hv-collector-main/Dockerfile
index a1e89a7..cfd4a7b 100644
--- a/sources/hv-collector-main/Dockerfile
+++ b/sources/hv-collector-main/Dockerfile
@@ -1,6 +1,6 @@
 FROM docker.io/openjdk:11-jre-slim
 
-LABEL copyright="Copyright (C) 2018 NOKIA"
+LABEL copyright="Copyright (C) 2018-2019 NOKIA"
 LABEL license.name="The Apache Software License, Version 2.0"
 LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0"
 LABEL maintainer="Nokia Wroclaw ONAP Team"
@@ -19,4 +19,4 @@
 COPY src/main/docker/*.sh ./
 COPY src/main/docker/base.json /etc/ves-hv/configuration/base.json
 
-COPY target/hv-collector-main-*.jar ./
+COPY target/hv-collector-main-*.jar ./
\ No newline at end of file
diff --git a/sources/hv-collector-main/src/main/docker/base.json b/sources/hv-collector-main/src/main/docker/base.json
index 67576c8..e0b9c45 100644
--- a/sources/hv-collector-main/src/main/docker/base.json
+++ b/sources/hv-collector-main/src/main/docker/base.json
@@ -10,17 +10,5 @@
     "requestIntervalSec": 5
   },
   "security": {
-  },
-  "collector": {
-    "maxRequestSizeBytes": 1048576,
-    "kafkaServers": [
-      "message-router-kafka:9092"
-    ],
-    "routing": [
-      {
-        "fromDomain": "perf3gpp",
-        "toTopic": "HV_VES_PERF3GPP"
-      }
-    ]
   }
 }
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index 22d8000..dc207ef 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -29,7 +29,6 @@
 import org.onap.dcae.collectors.veshv.model.ServiceContext
 import org.onap.dcae.collectors.veshv.utils.ServerHandle
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.neverComplete
 import org.onap.dcae.collectors.veshv.utils.registerShutdownHook
 import reactor.core.scheduler.Schedulers
 import java.util.concurrent.atomic.AtomicReference
@@ -57,20 +56,20 @@
                 HealthState.INSTANCE.changeState(HealthDescription.HEALTHY)
             }
             .doOnError {
-                logger.error(ServiceContext::mdc) { "Failed to acquire configuration ${it.message}" }
-                logger.debug(ServiceContext::mdc) { "Detailed stack trace: $it" }
+                logger.error(ServiceContext::mdc) { "Failed to create configuration: ${it.message}" }
+                logger.withDebug(ServiceContext::mdc) { log("Detailed stack trace: ", it) }
                 HealthState.INSTANCE.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
             }
             .doOnNext(::startServer)
             .doOnError(::logServerStartFailed)
-            .neverComplete() // TODO: remove after merging configuration stream with cbs
+            .then()
             .block()
 }
 
 private fun startServer(config: HvVesConfiguration) {
     stopRunningServer()
     Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel)
-    logger.info { "Using configuration: $config" }
+    logger.debug(ServiceContext::mdc) { "Configuration: $config" }
 
     VesServer.start(config).let {
         registerShutdownHook { shutdownGracefully(it) }
diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml
index 40f3c8a..21c1fa3 100644
--- a/sources/hv-collector-main/src/main/resources/logback.xml
+++ b/sources/hv-collector-main/src/main/resources/logback.xml
@@ -20,9 +20,9 @@
 -->
 <configuration>
     <property name="COMPONENT_NAME"
-            value="dcae-hv-ves-collector"/>
+              value="dcae-hv-ves-collector"/>
     <property name="COMPONENT_SHORT_NAME"
-            value="hv-ves"/>
+              value="hv-ves"/>
 
     <property name="LOG_FILENAME" value="${COMPONENT_SHORT_NAME}"/>
     <property name="LOG_PATH" value="/var/log/ONAP/${COMPONENT_NAME}"/>
@@ -57,7 +57,8 @@
 | ${p_mdc}\t
 | ${p_thr}%n"/>
 
-    <property name="ONAP_LOG_PATTERN" value="%nopexception${p_log}|${p_tim}|${p_lvl}|${p_msg}|${p_mdc}|${p_exc}|${p_mak}|${p_thr}%n"/>
+    <property name="ONAP_LOG_PATTERN"
+              value="%nopexception${p_log}|${p_tim}|${p_lvl}|${p_msg}|${p_mdc}|${p_exc}|${p_mak}|${p_thr}%n"/>
 
     <property name="ONAP_LOG_PATTERN_FROM_WIKI" value="%nopexception%logger
 |%date{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC}
@@ -76,7 +77,7 @@
     </appender>
 
     <appender name="ROLLING-FILE"
-            class="ch.qos.logback.core.rolling.RollingFileAppender">
+              class="ch.qos.logback.core.rolling.RollingFileAppender">
         <encoder>
             <pattern>${ONAP_LOG_PATTERN}</pattern>
         </encoder>
@@ -93,8 +94,10 @@
     <logger name="io.netty" level="INFO"/>
     <logger name="io.netty.util" level="WARN"/>
     <logger name="org.apache.kafka" level="INFO"/>
+    <logger name="org.onap.dcaegen2.services.sdk" level="INFO"/>
+    <logger name="org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsClientImpl" level="WARN"/>
 
-    <root level="INFO">
+    <root level="DEBUG">
         <appender-ref ref="CONSOLE"/>
         <appender-ref ref="ROLLING-FILE"/>
     </root>
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
index d5b33b9..47b3d55 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
@@ -34,6 +34,7 @@
 import arrow.typeclasses.MonadContinuation
 import arrow.typeclasses.binding
 import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
 import java.util.concurrent.atomic.AtomicReference
 
 /**
@@ -57,8 +58,12 @@
 
 fun <A, B> Either<A, B>.rightOrThrow(mapper: (A) -> Throwable) = fold({ throw mapper(it) }, ::identity)
 
+fun <A : Exception, B> Flux<Either<A, B>>.throwOnLeft(): Flux<B> = map { it.rightOrThrow() }
+
 fun <A, B> Flux<Either<A, B>>.throwOnLeft(f: (A) -> Exception): Flux<B> = map { it.rightOrThrow(f) }
 
+fun <A, B> Mono<Either<A, B>>.throwOnLeft(f: (A) -> Exception): Mono<B> = map { it.rightOrThrow(f) }
+
 fun <A> AtomicReference<A>.getOption() = Option.fromNullable(get())
 
 fun <A> Option.Companion.fromNullablesChain(firstValue: A?, vararg nextValues: () -> A?): Option<A> =
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/reactive.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/reactive.kt
deleted file mode 100644
index aaa598d..0000000
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/reactive.kt
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.utils
-
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-
-fun <T> Flux<T>.neverComplete(): Mono<Void> = then(Mono.never<T>()).then()
\ No newline at end of file