Enable setting log level from command line

Change-Id: I8397e0134d254cd5b6be79ed2b847ce265fc775c
Signed-off-by: Izabela Zawadzka <izabela.zawadzka@nokia.com>
Issue-ID: DCAEGEN2-1045
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
index 7e5044f..52aa270 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
@@ -20,6 +20,7 @@
 package org.onap.dcae.collectors.veshv.model
 
 import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
 import java.net.InetSocketAddress
 import java.time.Duration
 
@@ -28,11 +29,14 @@
  * @since May 2018
  */
 data class ServerConfiguration(
-        val serverListenAddress: InetSocketAddress,
-        val kafkaConfiguration: KafkaConfiguration,
-        val configurationProviderParams: ConfigurationProviderParams,
-        val securityConfiguration: SecurityConfiguration,
-        val idleTimeout: Duration,
-        val healthCheckApiListenAddress: InetSocketAddress,
-        val maximumPayloadSizeBytes: Int,
-        val dummyMode: Boolean = false)
+    val serverListenAddress: InetSocketAddress,
+    val kafkaConfiguration: KafkaConfiguration,
+    val configurationProviderParams: ConfigurationProviderParams,
+    val securityConfiguration: SecurityConfiguration,
+    val idleTimeout: Duration,
+    val healthCheckApiListenAddress: InetSocketAddress,
+    val maximumPayloadSizeBytes: Int,
+    val logLevel: LogLevel,
+    val dummyMode: Boolean = false
+)
+
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
index ae87f1c..2311b2b 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
@@ -21,6 +21,7 @@
 
 import arrow.core.Option
 import arrow.core.fix
+import arrow.core.getOrElse
 import arrow.instances.option.monad.monad
 import arrow.typeclasses.binding
 import org.apache.commons.cli.CommandLine
@@ -30,7 +31,7 @@
 import org.onap.dcae.collectors.veshv.model.KafkaConfiguration
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
 import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
-import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
+import org.onap.dcae.collectors.veshv.utils.commandline.*
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_FIRST_REQUEST_DELAY
@@ -45,73 +46,90 @@
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE
 import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD
-import org.onap.dcae.collectors.veshv.utils.commandline.hasOption
-import org.onap.dcae.collectors.veshv.utils.commandline.intValue
-import org.onap.dcae.collectors.veshv.utils.commandline.longValue
-import org.onap.dcae.collectors.veshv.utils.commandline.stringValue
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LOG_LEVEL
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import java.net.InetSocketAddress
 import java.time.Duration
 
+
 internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) {
     override val cmdLineOptionsList = listOf(
-            KAFKA_SERVERS,
-            HEALTH_CHECK_API_PORT,
-            LISTEN_PORT,
-            CONSUL_CONFIG_URL,
-            CONSUL_FIRST_REQUEST_DELAY,
-            CONSUL_REQUEST_INTERVAL,
-            SSL_DISABLE,
-            KEY_STORE_FILE,
-            KEY_STORE_PASSWORD,
-            TRUST_STORE_FILE,
-            TRUST_STORE_PASSWORD,
-            IDLE_TIMEOUT_SEC,
-            MAXIMUM_PAYLOAD_SIZE_BYTES,
-            DUMMY_MODE
+        KAFKA_SERVERS,
+        HEALTH_CHECK_API_PORT,
+        LISTEN_PORT,
+        CONSUL_CONFIG_URL,
+        CONSUL_FIRST_REQUEST_DELAY,
+        CONSUL_REQUEST_INTERVAL,
+        SSL_DISABLE,
+        KEY_STORE_FILE,
+        KEY_STORE_PASSWORD,
+        TRUST_STORE_FILE,
+        TRUST_STORE_PASSWORD,
+        IDLE_TIMEOUT_SEC,
+        MAXIMUM_PAYLOAD_SIZE_BYTES,
+        DUMMY_MODE,
+        LOG_LEVEL
     )
 
     override fun getConfiguration(cmdLine: CommandLine): Option<ServerConfiguration> =
-            Option.monad().binding {
-                val healthCheckApiPort = cmdLine.intValue(
-                        HEALTH_CHECK_API_PORT,
-                        DefaultValues.HEALTH_CHECK_API_PORT
-                )
-                val kafkaServers = cmdLine.stringValue(KAFKA_SERVERS).bind()
-                val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
-                val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
-                val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES,
-                        DefaultValues.MAX_PAYLOAD_SIZE_BYTES)
-                val dummyMode = cmdLine.hasOption(DUMMY_MODE)
-                val security = createSecurityConfiguration(cmdLine).bind()
-                val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind()
-                ServerConfiguration(
-                        serverListenAddress = InetSocketAddress(listenPort),
-                        kafkaConfiguration = KafkaConfiguration(kafkaServers),
-                        healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort),
-                        configurationProviderParams = configurationProviderParams,
-                        securityConfiguration = security,
-                        idleTimeout = Duration.ofSeconds(idleTimeoutSec),
-                        maximumPayloadSizeBytes = maxPayloadSizeBytes,
-                        dummyMode = dummyMode)
-            }.fix()
+        Option.monad().binding {
+            val healthCheckApiPort = cmdLine.intValue(
+                HEALTH_CHECK_API_PORT,
+                DefaultValues.HEALTH_CHECK_API_PORT
+            )
+            val kafkaServers = cmdLine.stringValue(KAFKA_SERVERS).bind()
+            val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
+            val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
+            val maxPayloadSizeBytes = cmdLine.intValue(
+                MAXIMUM_PAYLOAD_SIZE_BYTES,
+                DefaultValues.MAX_PAYLOAD_SIZE_BYTES
+            )
+            val dummyMode = cmdLine.hasOption(DUMMY_MODE)
+            val security = createSecurityConfiguration(cmdLine).bind()
+            val logLevel = cmdLine.stringValue(LOG_LEVEL, DefaultValues.LOG_LEVEL)
+            val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind()
+            ServerConfiguration(
+                serverListenAddress = InetSocketAddress(listenPort),
+                kafkaConfiguration = KafkaConfiguration(kafkaServers),
+                healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort),
+                configurationProviderParams = configurationProviderParams,
+                securityConfiguration = security,
+                idleTimeout = Duration.ofSeconds(idleTimeoutSec),
+                maximumPayloadSizeBytes = maxPayloadSizeBytes,
+                dummyMode = dummyMode,
+                logLevel = determineLogLevel(logLevel)
+            )
+        }.fix()
+
 
     private fun createConfigurationProviderParams(cmdLine: CommandLine): Option<ConfigurationProviderParams> =
-            Option.monad().binding {
-                val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL).bind()
-                val firstRequestDelay = cmdLine.longValue(
-                        CONSUL_FIRST_REQUEST_DELAY,
-                        DefaultValues.CONSUL_FIRST_REQUEST_DELAY
-                )
-                val requestInterval = cmdLine.longValue(
-                        CONSUL_REQUEST_INTERVAL,
-                        DefaultValues.CONSUL_REQUEST_INTERVAL
-                )
-                ConfigurationProviderParams(
-                        configUrl,
-                        Duration.ofSeconds(firstRequestDelay),
-                        Duration.ofSeconds(requestInterval)
-                )
-            }.fix()
+        Option.monad().binding {
+            val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL).bind()
+            val firstRequestDelay = cmdLine.longValue(
+                CONSUL_FIRST_REQUEST_DELAY,
+                DefaultValues.CONSUL_FIRST_REQUEST_DELAY
+            )
+            val requestInterval = cmdLine.longValue(
+                CONSUL_REQUEST_INTERVAL,
+                DefaultValues.CONSUL_REQUEST_INTERVAL
+            )
+            ConfigurationProviderParams(
+                configUrl,
+                Duration.ofSeconds(firstRequestDelay),
+                Duration.ofSeconds(requestInterval)
+            )
+        }.fix()
+
+    /** Parses [logLevel]; an unknown name is logged as a warning and the default level is used instead. */
+    private fun determineLogLevel(logLevel: String): LogLevel =
+        LogLevel.optionFromString(logLevel).getOrElse {
+            logger.warn {
+                "Failed to parse $logLevel as $LOG_LEVEL command line. " +
+                        "Using default log level (${DefaultValues.LOG_LEVEL})"
+            }
+            LogLevel.valueOf(DefaultValues.LOG_LEVEL)
+        }
 
     internal object DefaultValues {
         const val HEALTH_CHECK_API_PORT = 6060
@@ -119,5 +137,10 @@
         const val CONSUL_REQUEST_INTERVAL = 5L
         const val IDLE_TIMEOUT_SEC = 60L
         const val MAX_PAYLOAD_SIZE_BYTES = WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES
+        val LOG_LEVEL = LogLevel.INFO.name
+    }
+
+    companion object {
+        private val logger = Logger(ArgVesHvConfiguration::class)
     }
 }
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index 5c9566c..c29c5d1 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -31,25 +31,28 @@
 import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 
-private val logger = Logger("org.onap.dcae.collectors.veshv.main")
-private const val PROGRAM_NAME = "java org.onap.dcae.collectors.veshv.main.MainKt"
+private const val VESHV_PACKAGE = "org.onap.dcae.collectors.veshv"
+private val logger = Logger("$VESHV_PACKAGE.main")
+private const val PROGRAM_NAME = "java $VESHV_PACKAGE.main.MainKt"
 
 fun main(args: Array<String>) =
-        ArgVesHvConfiguration().parse(args)
-                .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
-                .map(::startAndAwaitServers)
-                .unsafeRunEitherSync(
-                        { ex ->
-                            logger.withError { log("Failed to start a server", ex) }
-                            ExitFailure(1)
-                        },
-                        { logger.info { "Gentle shutdown" } }
-                )
+    ArgVesHvConfiguration().parse(args)
+        .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
+        .map(::startAndAwaitServers)
+        .unsafeRunEitherSync(
+            { ex ->
+                logger.withError { log("Failed to start a server", ex) }
+                ExitFailure(1)
+            },
+            { logger.info { "Gentle shutdown" } }
+        )
 
 private fun startAndAwaitServers(config: ServerConfiguration) =
-        IO.monad().binding {
-            logger.info { "Using configuration: $config" }
-            HealthCheckServer.start(config).bind()
-            VesServer.start(config).bind()
-                    .await().bind()
-        }.fix()
+    IO.monad().binding {
+        Logger.setLogLevel(VESHV_PACKAGE, config.logLevel)
+        logger.info { "Using configuration: $config" }
+        HealthCheckServer.start(config).bind()
+        VesServer.start(config).bind()
+            .await().bind()
+    }.fix()
+
diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml
index c88b8aa..b54dc36 100644
--- a/sources/hv-collector-main/src/main/resources/logback.xml
+++ b/sources/hv-collector-main/src/main/resources/logback.xml
@@ -84,10 +84,6 @@
         </rollingPolicy>
     </appender>
 
-    <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/>
-    <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="TRACE"/>
-    <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="TRACE"/>
-    <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="TRACE"/>
     <logger name="reactor.netty" level="WARN"/>
     <logger name="io.netty" level="DEBUG"/>
     <logger name="io.netty.util" level="WARN"/>
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
index 9dddeca..03bf44f 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
@@ -30,6 +30,7 @@
 import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure
 import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess
 import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentError
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
 import java.time.Duration
 import kotlin.test.assertNotNull
 
@@ -47,6 +48,7 @@
     val listenPort = "6969"
     val keyStorePassword = "kspass"
     val trustStorePassword = "tspass"
+    val logLevel = LogLevel.DEBUG.name
 
     beforeEachTest {
         cut = ArgVesHvConfiguration()
@@ -58,16 +60,17 @@
 
             beforeEachTest {
                 result = cut.parseExpectingSuccess(
-                        "--kafka-bootstrap-servers", kafkaBootstrapServers,
-                        "--health-check-api-port", healthCheckApiPort,
-                        "--listen-port", listenPort,
-                        "--config-url", configurationUrl,
-                        "--first-request-delay", firstRequestDelay,
-                        "--request-interval", requestInterval,
-                        "--key-store", "/tmp/keys.p12",
-                        "--trust-store", "/tmp/trust.p12",
-                        "--key-store-password", keyStorePassword,
-                        "--trust-store-password", trustStorePassword
+                    "--kafka-bootstrap-servers", kafkaBootstrapServers,
+                    "--health-check-api-port", healthCheckApiPort,
+                    "--listen-port", listenPort,
+                    "--config-url", configurationUrl,
+                    "--first-request-delay", firstRequestDelay,
+                    "--request-interval", requestInterval,
+                    "--key-store", "/tmp/keys.p12",
+                    "--trust-store", "/tmp/trust.p12",
+                    "--key-store-password", keyStorePassword,
+                    "--trust-store-password", trustStorePassword,
+                    "--log-level", logLevel
                 )
             }
 
@@ -94,17 +97,17 @@
 
             it("should set proper first consul request delay") {
                 assertThat(result.configurationProviderParams.firstRequestDelay)
-                        .isEqualTo(Duration.ofSeconds(firstRequestDelay.toLong()))
+                    .isEqualTo(Duration.ofSeconds(firstRequestDelay.toLong()))
             }
 
             it("should set proper consul request interval") {
                 assertThat(result.configurationProviderParams.requestInterval)
-                        .isEqualTo(Duration.ofSeconds(requestInterval.toLong()))
+                    .isEqualTo(Duration.ofSeconds(requestInterval.toLong()))
             }
 
             it("should set proper config url") {
                 assertThat(result.configurationProviderParams.configurationUrl)
-                        .isEqualTo(configurationUrl)
+                    .isEqualTo(configurationUrl)
             }
 
             it("should set proper security configuration") {
@@ -116,29 +119,78 @@
                 assertThat(keys.keyStorePassword).isEqualTo(keyStorePassword.toCharArray())
                 assertThat(keys.trustStorePassword).isEqualTo(trustStorePassword.toCharArray())
             }
+
+            it("should set proper log level") {
+                assertThat(result.logLevel).isEqualTo(LogLevel.DEBUG)
+            }
         }
 
         describe("required parameter is absent") {
             on("missing listen port") {
                 it("should throw exception") {
-                    assertThat(cut.parseExpectingFailure(
+                    assertThat(
+                        cut.parseExpectingFailure(
                             "--config-url", configurationUrl,
                             "--ssl-disable",
                             "--first-request-delay", firstRequestDelay,
-                            "--request-interval", requestInterval)
+                            "--request-interval", requestInterval
+                        )
                     ).isInstanceOf(WrongArgumentError::class.java)
                 }
             }
             on("missing configuration url") {
                 it("should throw exception") {
-                    assertThat(cut.parseExpectingFailure(
+                    assertThat(
+                        cut.parseExpectingFailure(
                             "--listen-port", listenPort,
                             "--ssl-disable",
                             "--first-request-delay", firstRequestDelay,
-                            "--request-interval", requestInterval)
+                            "--request-interval", requestInterval
+                        )
                     ).isInstanceOf(WrongArgumentError::class.java)
                 }
             }
         }
+
+        describe("correct log level not provided") {
+            on("missing log level") {
+                it("should set default INFO value") {
+                    val config = cut.parseExpectingSuccess(
+                        "--kafka-bootstrap-servers", kafkaBootstrapServers,
+                        "--health-check-api-port", healthCheckApiPort,
+                        "--listen-port", listenPort,
+                        "--config-url", configurationUrl,
+                        "--first-request-delay", firstRequestDelay,
+                        "--request-interval", requestInterval,
+                        "--key-store", "/tmp/keys.p12",
+                        "--trust-store", "/tmp/trust.p12",
+                        "--key-store-password", keyStorePassword,
+                        "--trust-store-password", trustStorePassword
+                    )
+
+                    assertThat(config.logLevel).isEqualTo(LogLevel.INFO)
+                }
+            }
+
+            on("incorrect log level") {
+                it("should set default INFO value") {
+                    val config = cut.parseExpectingSuccess(
+                        "--kafka-bootstrap-servers", kafkaBootstrapServers,
+                        "--health-check-api-port", healthCheckApiPort,
+                        "--listen-port", listenPort,
+                        "--config-url", configurationUrl,
+                        "--first-request-delay", firstRequestDelay,
+                        "--request-interval", requestInterval,
+                        "--key-store", "/tmp/keys.p12",
+                        "--trust-store", "/tmp/trust.p12",
+                        "--key-store-password", keyStorePassword,
+                        "--trust-store-password", trustStorePassword,
+                        "--log-level", "1"
+                    )
+
+                    assertThat(config.logLevel).isEqualTo(LogLevel.INFO)
+                }
+            }
+        }
     }
 })
\ No newline at end of file
diff --git a/sources/hv-collector-utils/pom.xml b/sources/hv-collector-utils/pom.xml
index 2490767..5bd2472 100644
--- a/sources/hv-collector-utils/pom.xml
+++ b/sources/hv-collector-utils/pom.xml
@@ -105,7 +105,7 @@
         <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>
-            <scope>test</scope>
+            <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.assertj</groupId>
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
index 9439bff..e869901 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
@@ -23,118 +23,144 @@
 
 
 enum class CommandLineOption(val option: Option, val required: Boolean = false) {
-    HEALTH_CHECK_API_PORT(Option.builder("H")
+    HEALTH_CHECK_API_PORT(
+        Option.builder("H")
             .longOpt("health-check-api-port")
             .hasArg()
             .desc("Health check rest api listen port")
             .build()
     ),
-    LISTEN_PORT(Option.builder("p")
+    LISTEN_PORT(
+        Option.builder("p")
             .longOpt("listen-port")
             .hasArg()
             .desc("Listen port")
             .build(),
-            required = true
+        required = true
     ),
-    CONSUL_CONFIG_URL(Option.builder("c")
+    CONSUL_CONFIG_URL(
+        Option.builder("c")
             .longOpt("config-url")
             .hasArg()
             .desc("URL of ves configuration on consul")
             .build(),
-            required = true
+        required = true
     ),
-    CONSUL_FIRST_REQUEST_DELAY(Option.builder("d")
+    CONSUL_FIRST_REQUEST_DELAY(
+        Option.builder("d")
             .longOpt("first-request-delay")
             .hasArg()
             .desc("Delay of first request to consul in seconds")
             .build()
     ),
-    CONSUL_REQUEST_INTERVAL(Option.builder("I")
+    CONSUL_REQUEST_INTERVAL(
+        Option.builder("I")
             .longOpt("request-interval")
             .hasArg()
             .desc("Interval of consul configuration requests in seconds")
             .build()
     ),
-    VES_HV_PORT(Option.builder("v")
+    VES_HV_PORT(
+        Option.builder("v")
             .longOpt("ves-port")
             .hasArg()
             .desc("VesHvCollector port")
             .build(),
-            required = true
+        required = true
     ),
-    VES_HV_HOST(Option.builder("h")
+    VES_HV_HOST(
+        Option.builder("h")
             .longOpt("ves-host")
             .hasArg()
             .desc("VesHvCollector host")
             .build(),
-            required = true
+        required = true
     ),
-    KAFKA_SERVERS(Option.builder("s")
+    KAFKA_SERVERS(
+        Option.builder("s")
             .longOpt("kafka-bootstrap-servers")
             .hasArg()
             .desc("Comma-separated Kafka bootstrap servers in <host>:<port> format")
             .build(),
-            required = true
+        required = true
     ),
-    KAFKA_TOPICS(Option.builder("f")
+    KAFKA_TOPICS(
+        Option.builder("f")
             .longOpt("kafka-topics")
             .hasArg()
             .desc("Comma-separated Kafka topics")
             .build(),
-            required = true
+        required = true
     ),
-    SSL_DISABLE(Option.builder("l")
+    SSL_DISABLE(
+        Option.builder("l")
             .longOpt("ssl-disable")
             .desc("Disable SSL encryption")
             .build()
     ),
-    KEY_STORE_FILE(Option.builder("k")
+    KEY_STORE_FILE(
+        Option.builder("k")
             .longOpt("key-store")
             .hasArg()
             .desc("Key store in PKCS12 format")
             .build()
     ),
-    KEY_STORE_PASSWORD(Option.builder("kp")
+    KEY_STORE_PASSWORD(
+        Option.builder("kp")
             .longOpt("key-store-password")
             .hasArg()
             .desc("Key store password")
             .build()
     ),
-    TRUST_STORE_FILE(Option.builder("t")
+    TRUST_STORE_FILE(
+        Option.builder("t")
             .longOpt("trust-store")
             .hasArg()
             .desc("File with trusted certificate bundle in PKCS12 format")
             .build()
     ),
-    TRUST_STORE_PASSWORD(Option.builder("tp")
+    TRUST_STORE_PASSWORD(
+        Option.builder("tp")
             .longOpt("trust-store-password")
             .hasArg()
             .desc("Trust store password")
             .build()
     ),
-    IDLE_TIMEOUT_SEC(Option.builder("i")
+    IDLE_TIMEOUT_SEC(
+        Option.builder("i")
             .longOpt("idle-timeout-sec")
             .hasArg()
-            .desc("""Idle timeout for remote hosts. After given time without any data exchange the
-                |connection might be closed.""".trimMargin())
+            .desc(
+                """Idle timeout for remote hosts. After given time without any data exchange the
+                |connection might be closed.""".trimMargin()
+            )
             .build()
     ),
-    MAXIMUM_PAYLOAD_SIZE_BYTES(Option.builder("m")
+    MAXIMUM_PAYLOAD_SIZE_BYTES(
+        Option.builder("m")
             .longOpt("max-payload-size")
             .hasArg()
             .desc("Maximum supported payload size in bytes")
             .build()
     ),
-    DUMMY_MODE(Option.builder("u")
+    LOG_LEVEL(
+        Option.builder("ll")
+            .longOpt("log-level")
+            .hasArg()
+            .desc("Log level")
+            .build()
+    ),
+    DUMMY_MODE(
+        Option.builder("u")
             .longOpt("dummy")
             .desc("If present will start in dummy mode (dummy external services)")
             .build()
     );
 
     fun environmentVariableName(prefix: String = DEFAULT_ENV_PREFIX): String =
-            option.longOpt.toUpperCase().replace('-', '_').let { mainPart ->
-                "${prefix}_${mainPart}"
-            }
+        // Locale.ROOT: the JVM default locale must not alter the variable name (e.g. Turkish 'i' -> 'İ').
+        option.longOpt.toUpperCase(java.util.Locale.ROOT).replace('-', '_')
+            .let { mainPart -> "${prefix}_${mainPart}" }
 
     companion object {
         private const val DEFAULT_ENV_PREFIX = "VESHV"
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LogLevel.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LogLevel.kt
new file mode 100644
index 0000000..ec782bf
--- /dev/null
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LogLevel.kt
@@ -0,0 +1,53 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.logging
+
+import arrow.core.Try
+import ch.qos.logback.classic.Level
+import java.util.Locale
+
+/**
+ * Application log levels, each paired with its logback [Level].
+ */
+enum class LogLevel(private val logbackLevel: Level) {
+    ERROR(Level.ERROR),
+    WARN(Level.WARN),
+    INFO(Level.INFO),
+    DEBUG(Level.DEBUG),
+    TRACE(Level.TRACE);
+
+    /** Returns the logback [Level] paired with this entry. */
+    operator fun invoke() = logbackLevel
+
+    companion object {
+        /**
+         * Looks up a level by name, ignoring case.
+         *
+         * Upper-casing uses [Locale.ROOT] so the lookup does not depend on the JVM default
+         * locale (under a Turkish locale "info" would otherwise become "İNFO" and not match).
+         *
+         * @param level level name, e.g. "debug" or "DEBUG"
+         * @return the matching entry, or an empty option when the name is unknown
+         */
+        fun optionFromString(level: String) = Try {
+            valueOf(level.toUpperCase(Locale.ROOT))
+        }.toOption()
+    }
+}
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
index 938ba79..82ce50a 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
@@ -19,6 +19,7 @@
  */
 package org.onap.dcae.collectors.veshv.utils.logging
 
+import ch.qos.logback.classic.LoggerContext
 import kotlin.reflect.KClass
 import org.slf4j.LoggerFactory
 import org.slf4j.MDC
@@ -41,86 +42,92 @@
     fun withError(block: AtLevelLogger.() -> Unit) = errorLogger.block()
 
     fun withError(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
-            errorLogger.withMdc(mdc, block)
+        errorLogger.withMdc(mdc, block)
 
     fun error(message: () -> String) = errorLogger.run {
         log(message())
     }
 
     fun error(mdc: MappedDiagnosticContext, message: () -> String) =
-            errorLogger.withMdc(mdc) { log(message()) }
+        errorLogger.withMdc(mdc) { log(message()) }
 
     fun error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
-            errorLogger.withMdc(mdc) { log(marker, message()) }
+        errorLogger.withMdc(mdc) { log(marker, message()) }
 
     // WARN
 
     fun withWarn(block: AtLevelLogger.() -> Unit) = warnLogger.block()
 
     fun withWarn(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
-            warnLogger.withMdc(mdc, block)
+        warnLogger.withMdc(mdc, block)
 
     fun warn(message: () -> String) = warnLogger.run {
         log(message())
     }
 
     fun warn(mdc: MappedDiagnosticContext, message: () -> String) =
-            warnLogger.withMdc(mdc) { log(message()) }
+        warnLogger.withMdc(mdc) { log(message()) }
 
     fun warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
-            warnLogger.withMdc(mdc) { log(marker, message()) }
+        warnLogger.withMdc(mdc) { log(marker, message()) }
 
     // INFO
 
     fun withInfo(block: AtLevelLogger.() -> Unit) = infoLogger.block()
 
     fun withInfo(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
-            infoLogger.withMdc(mdc, block)
+        infoLogger.withMdc(mdc, block)
 
     fun info(message: () -> String) = infoLogger.run {
         log(message())
     }
 
     fun info(mdc: MappedDiagnosticContext, message: () -> String) =
-            infoLogger.withMdc(mdc) { log(message()) }
+        infoLogger.withMdc(mdc) { log(message()) }
 
     fun info(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
-            infoLogger.withMdc(mdc) { log(marker, message()) }
+        infoLogger.withMdc(mdc) { log(marker, message()) }
 
     // DEBUG
 
     fun withDebug(block: AtLevelLogger.() -> Unit) = debugLogger.block()
 
     fun withDebug(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
-            debugLogger.withMdc(mdc, block)
+        debugLogger.withMdc(mdc, block)
 
     fun debug(message: () -> String) = debugLogger.run {
         log(message())
     }
 
     fun debug(mdc: MappedDiagnosticContext, message: () -> String) =
-            debugLogger.withMdc(mdc) { log(message()) }
+        debugLogger.withMdc(mdc) { log(message()) }
 
     fun debug(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
-            debugLogger.withMdc(mdc) { log(marker, message()) }
+        debugLogger.withMdc(mdc) { log(marker, message()) }
 
     // TRACE
 
     fun withTrace(block: AtLevelLogger.() -> Unit) = traceLogger.block()
 
     fun withTrace(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) =
-            traceLogger.withMdc(mdc, block)
+        traceLogger.withMdc(mdc, block)
 
     fun trace(message: () -> String) = traceLogger.run {
         log(message())
     }
 
     fun trace(mdc: MappedDiagnosticContext, message: () -> String) =
-            traceLogger.withMdc(mdc) { log(message()) }
+        traceLogger.withMdc(mdc) { log(message()) }
 
     fun trace(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) =
-            traceLogger.withMdc(mdc) { log(marker, message()) }
+        traceLogger.withMdc(mdc) { log(marker, message()) }
 
+    companion object {
+        /** Sets [level] on the logback logger named [packageName]; no-op when SLF4J is not bound to logback. */
+        fun setLogLevel(packageName: String, level: LogLevel) {
+            (LoggerFactory.getILoggerFactory() as? LoggerContext)?.getLogger(packageName)?.level = level()
+        }
+    }
 }
 
 abstract class AtLevelLogger {
@@ -183,9 +190,9 @@
     }
 
     override fun log(marker: Marker, message: String) =
-            withAdditionalMdc(marker.mdc) {
-                logger.error(marker.slf4jMarker, message)
-            }
+        withAdditionalMdc(marker.mdc) {
+            logger.error(marker.slf4jMarker, message)
+        }
 }
 
 @Suppress("SuboptimalLoggerUsage")
@@ -199,9 +206,9 @@
     }
 
     override fun log(marker: Marker, message: String) =
-            withAdditionalMdc(marker.mdc) {
-                logger.warn(marker.slf4jMarker, message)
-            }
+        withAdditionalMdc(marker.mdc) {
+            logger.warn(marker.slf4jMarker, message)
+        }
 }
 
 @Suppress("SuboptimalLoggerUsage")
@@ -215,9 +222,9 @@
     }
 
     override fun log(marker: Marker, message: String) =
-            withAdditionalMdc(marker.mdc) {
-                logger.info(marker.slf4jMarker, message)
-            }
+        withAdditionalMdc(marker.mdc) {
+            logger.info(marker.slf4jMarker, message)
+        }
 }
 
 @Suppress("SuboptimalLoggerUsage")
@@ -231,9 +238,9 @@
     }
 
     override fun log(marker: Marker, message: String) =
-            withAdditionalMdc(marker.mdc) {
-                logger.debug(marker.slf4jMarker, message)
-            }
+        withAdditionalMdc(marker.mdc) {
+            logger.debug(marker.slf4jMarker, message)
+        }
 }
 
 @Suppress("SuboptimalLoggerUsage")
@@ -247,7 +254,7 @@
     }
 
     override fun log(marker: Marker, message: String) =
-            withAdditionalMdc(marker.mdc) {
-                logger.trace(marker.slf4jMarker, message)
-            }
+        withAdditionalMdc(marker.mdc) {
+            logger.trace(marker.slf4jMarker, message)
+        }
 }