Harmonize logging and add new logs

- corrected docker-compose consul url

Change-Id: I78df868e0dd51008ef39d01553e6a0a3b8273a54
Issue-ID: DCAEGEN2-1003
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
diff --git a/docker-compose.yml b/docker-compose.yml
index 2590928..4015b08 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -39,7 +39,7 @@
                  "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"]
     command: ["--listen-port", "6061",
               "--health-check-api-port", "6060",
-              "--config-url", "http://consul:8500/v1/kv/veshv-config",
+              "--config-url", "http://consul:8500/v1/kv/veshv-config?raw=true",
               "--key-store-password", "onaponap",
               "--trust-store-password", "onaponap"]
     healthcheck:
diff --git a/pom.xml b/pom.xml
index 5a83fc8..621c5b2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,7 +81,6 @@
         <docker-image.name>${project.groupId}.${project.artifactId}</docker-image.name>
         <docker-image.latest>1.1-SNAPSHOT</docker-image.latest>
         <docker.http_proxy/>
-
     </properties>
 
     <build>
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
index fb94907..9394075 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
@@ -19,20 +19,38 @@
  */
 package org.onap.dcae.collectors.veshv.impl
 
+import arrow.core.Either
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors
 import org.onap.dcae.collectors.veshv.domain.vesEventListenerVersionRegex
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 
+typealias ValidationFailMessage = () -> String
+typealias ValidationSuccessMessage = () -> String
+typealias ValidationResult = Either<ValidationFailMessage, ValidationSuccessMessage>
+
 internal object MessageValidator {
 
-    fun isValid(message: VesMessage): Boolean {
-        return allMandatoryFieldsArePresent(message.header)
-    }
+    fun validateFrameMessage(message: WireFrameMessage): ValidationResult =
+            message.validate().fold({
+                Either.left { "Invalid wire frame header, reason: ${it.message}" }
+            }, {
+                Either.right { "Wire frame header is valid" }
+            })
+
+    fun validateProtobufMessage(message: VesMessage): ValidationResult =
+            if (message.isValid()) {
+                Either.right { "Protocol buffers message is valid" }
+            } else {
+                Either.left { "Unsupported protocol buffers message." }
+            }
+
+    fun VesMessage.isValid() = allMandatoryFieldsArePresent(this.header)
+            .and(vesEventListenerVersionRegex.matches(header.vesEventListenerVersion))
 
     private fun allMandatoryFieldsArePresent(header: CommonEventHeader) =
             headerRequiredFieldDescriptors
                     .all { fieldDescriptor -> header.hasField(fieldDescriptor) }
-                    .and(vesEventListenerVersionRegex.matches(header.vesEventListenerVersion))
 
 }
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
index 1d43588..c670e1d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
@@ -20,7 +20,6 @@
 package org.onap.dcae.collectors.veshv.impl
 
 import arrow.core.Try
-import arrow.core.Option
 import org.onap.dcae.collectors.veshv.domain.ByteData
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.ves.VesEventOuterClass.VesEvent
@@ -31,9 +30,9 @@
  */
 internal class VesDecoder {
 
-    fun decode(bytes: ByteData): Option<VesMessage> =
+    fun decode(bytes: ByteData): Try<VesMessage> =
             Try {
                 val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
                 VesMessage(decodedHeader, bytes)
-            }.toOption()
+            }
 }
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 2f12e0c..4176de9 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -19,17 +19,20 @@
  */
 package org.onap.dcae.collectors.veshv.impl
 
-import arrow.core.Option
+import arrow.core.Either
 import io.netty.buffer.ByteBuf
 import io.netty.buffer.ByteBufAllocator
 import org.onap.dcae.collectors.veshv.boundary.Collector
 import org.onap.dcae.collectors.veshv.boundary.Metrics
 import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.domain.ByteData
 import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
 import org.onap.dcae.collectors.veshv.model.RoutedMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
+import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog
 import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
@@ -49,9 +52,9 @@
             wireChunkDecoderSupplier(alloc).let { wireDecoder ->
                 dataStream
                         .transform { decodeWireFrame(it, wireDecoder) }
-                        .filter(WireFrameMessage::isValid)
-                        .transform(::decodePayload)
-                        .filter(VesMessage::isValid)
+                        .transform(::filterInvalidWireFrame)
+                        .transform(::decodeProtobufPayload)
+                        .transform(::filterInvalidProtobufMessages)
                         .transform(::routeMessage)
                         .onErrorResume { logger.handleReactiveStreamError(it) }
                         .doFinally { releaseBuffersMemory(wireDecoder) }
@@ -63,26 +66,38 @@
             .concatMap(decoder::decode)
             .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
 
-    private fun decodePayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
+    private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux
+            .filterFailedWithLog(MessageValidator::validateFrameMessage)
+
+    private fun decodeProtobufPayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
             .map(WireFrameMessage::payload)
-            .map(protobufDecoder::decode)
-            .flatMap { omitWhenNone(it) }
+            .flatMap(::decodePayload)
+
+    private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder
+            .decode(rawPayload)
+            .filterFailedWithLog(logger,
+                    { "Ves event header decoded successfully" },
+                    { "Failed to decode ves event header, reason: ${it.message}" })
+
+    private fun filterInvalidProtobufMessages(flux: Flux<VesMessage>): Flux<VesMessage> = flux
+            .filterFailedWithLog(MessageValidator::validateProtobufMessage)
 
     private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
             .flatMap(this::findRoute)
             .compose(sink::send)
             .doOnNext { metrics.notifyMessageSent(it.topic) }
 
-
-    private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNone((router::findDestination)(msg))
-
-    private fun <V> omitWhenNone(it: Option<V>): Mono<V> = it.fold(
-            {
-                logger.info("ommiting the message" + 5)
-                Mono.empty() },
-            { Mono.just(it) })
+    private fun findRoute(msg: VesMessage) = router
+            .findDestination(msg)
+            .filterEmptyWithLog(logger,
+                    { "Found route for message: ${it.topic}, partition: ${it.partition}" },
+                    { "Could not find route for message" })
 
     private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
+            .also { logger.debug("Released buffer memory after handling message stream") }
+
+    fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) =
+            filterFailedWithLog(logger, predicate)
 
     companion object {
         private val logger = Logger(VesHvCollector::class)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index ec7c60c..cea8a7e 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -82,8 +82,10 @@
     private fun filterDifferentValues(configurationString: String) =
             hashOf(configurationString).let {
                 if (it == lastConfigurationHash.get()) {
+                    logger.trace { "No change detected in consul configuration" }
                     Mono.empty()
                 } else {
+                    logger.info { "Obtained new configuration from consul:\n${configurationString}" }
                     lastConfigurationHash.set(it)
                     Mono.just(configurationString)
                 }
@@ -95,7 +97,6 @@
             Json.createReader(StringReader(responseString)).readObject()
 
     private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
-        logger.info { "Obtained new configuration from consul:\n${configuration}" }
         val routing = configuration.getJsonArray("collector.routing")
 
         return CollectorConfiguration(
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
index a0c2241..c4d6c87 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -25,6 +25,7 @@
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
 import reactor.kafka.sender.KafkaSender
 import reactor.kafka.sender.SenderRecord
 import reactor.kafka.sender.SenderResult
@@ -40,8 +41,14 @@
     override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
         val records = messages.map(this::vesToKafkaRecord)
         val result = sender.send(records)
-                .doOnNext(::logException)
-                .filter(::isSuccessful)
+                .doOnNext {
+                    if (it.isSuccessful()) {
+                        Mono.just(it)
+                    } else {
+                        logger.warn(it.exception()) { "Failed to send message to Kafka" }
+                        Mono.empty<SenderResult<RoutedMessage>>()
+                    }
+                }
                 .map { it.correlationMetadata() }
 
         return if (logger.traceEnabled) {
@@ -61,12 +68,6 @@
                 msg)
     }
 
-    private fun logException(senderResult: SenderResult<out Any>) {
-        if (senderResult.exception() != null) {
-            logger.warn(senderResult.exception()) { "Failed to send message to Kafka" }
-        }
-    }
-
     private fun logSentMessage(sentMsg: RoutedMessage) {
         logger.trace {
             val msgNum = sentMessages.incrementAndGet()
@@ -74,7 +75,7 @@
         }
     }
 
-    private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null
+    private fun SenderResult<out Any>.isSuccessful() = exception() == null
 
     companion object {
         val logger = Logger(KafkaSink::class)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index e535300..0b2997f 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -57,8 +57,12 @@
             sslContextFactory
                     .createSslContext(serverConfig.securityConfiguration)
                     .map { sslContext ->
+                        logger.info("Collector configured with SSL enabled")
                         this.secure { b -> b.sslContext(sslContext) }
-                    }.getOrElse { this }
+                    }.getOrElse {
+                        logger.info("Collector configured with SSL disabled")
+                        this
+                    }
 
     private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
             collectorProvider().fold(
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
index f5bfcce..1965d78 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
@@ -20,13 +20,10 @@
 package org.onap.dcae.collectors.veshv.model
 
 import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.impl.MessageValidator
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since May 2018
  */
-data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData) {
-    fun isValid(): Boolean = MessageValidator.isValid(this)
-}
+data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
index bab95c5..437614a 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
@@ -20,12 +20,21 @@
 package org.onap.dcae.collectors.veshv.model
 
 import arrow.core.Option
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 
 data class Routing(val routes: List<Route>) {
 
     fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
-            Option.fromNullable(routes.find { it.applies(commonHeader) })
+            Option.fromNullable(routes.find { it.applies(commonHeader) }).also {
+                if (it.isEmpty()) {
+                    logger.debug { "No route is defined for domain: ${commonHeader.domain}" }
+                }
+            }
+
+    companion object {
+        private val logger = Logger(Routing::class)
+    }
 }
 
 data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
index 3090042..60bd767 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
@@ -19,21 +19,29 @@
  */
 package org.onap.dcae.collectors.veshv.impl
 
+import arrow.core.Either.Companion.left
+import arrow.core.Either.Companion.right
+import com.nhaarman.mockitokotlin2.doReturn
+import com.nhaarman.mockitokotlin2.mock
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
 import org.jetbrains.spek.api.dsl.on
 import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.InvalidMajorVersion
 import org.onap.dcae.collectors.veshv.domain.VesEventDomain
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
 import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
 import org.onap.ves.VesEventOuterClass.CommonEventHeader.*
+import kotlin.test.assertTrue
 
 internal object MessageValidatorTest : Spek({
 
-    given("Message validator") {
+    describe("Message validator") {
         val cut = MessageValidator
 
         on("ves hv message including header with fully initialized fields") {
@@ -41,29 +49,35 @@
 
             it("should accept message with fully initialized message header") {
                 val vesMessage = VesMessage(commonHeader, vesEventBytes(commonHeader))
-                assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isTrue()
+                with(cut) {
+                    assertThat(vesMessage.isValid()).describedAs("message validation result").isTrue()
+                }
             }
 
-            VesEventDomain.values()
-                    .forEach { domain ->
-                        it("should accept message with $domain domain") {
-                            val header = commonHeader(domain)
-                            val vesMessage = VesMessage(header, vesEventBytes(header))
-                            assertThat(cut.isValid(vesMessage))
-                                    .isTrue()
-                        }
+            VesEventDomain.values().forEach { domain ->
+                it("should accept message with $domain domain") {
+                    val header = commonHeader(domain)
+                    val vesMessage = VesMessage(header, vesEventBytes(header))
+                    with(cut) {
+                        assertThat(vesMessage.isValid()).describedAs("message validation result").isTrue()
                     }
+                }
+            }
         }
 
         on("ves hv message bytes") {
             val vesMessage = VesMessage(getDefaultInstance(), ByteData.EMPTY)
             it("should not accept message with default header") {
-                assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
+                with(cut) {
+                    assertThat(vesMessage.isValid()).describedAs("message validation result").isFalse()
+                }
             }
         }
 
         val priorityTestCases = mapOf(
                 Priority.PRIORITY_NOT_PROVIDED to false,
+                Priority.LOW to true,
+                Priority.MEDIUM to true,
                 Priority.HIGH to true
         )
 
@@ -73,8 +87,10 @@
                 val vesMessage = VesMessage(commonEventHeader, vesEventBytes(commonEventHeader))
 
                 it("should resolve validation result") {
-                    assertThat(cut.isValid(vesMessage)).describedAs("message validation results")
-                            .isEqualTo(expectedResult)
+                    with(cut) {
+                        assertThat(vesMessage.isValid()).describedAs("message validation results")
+                                .isEqualTo(expectedResult)
+                    }
                 }
             }
         }
@@ -90,7 +106,9 @@
 
             it("should not accept not fully initialized message header") {
                 val vesMessage = VesMessage(commonHeader, rawMessageBytes)
-                assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
+                with(cut) {
+                    assertThat(vesMessage.isValid()).describedAs("message validation result").isFalse()
+                }
             }
         }
 
@@ -101,7 +119,9 @@
 
             it("should not accept message header") {
                 val vesMessage = VesMessage(commonHeader, rawMessageBytes)
-                assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
+                with(cut) {
+                    assertThat(vesMessage.isValid()).describedAs("message validation result").isFalse()
+                }
             }
         }
 
@@ -111,7 +131,10 @@
 
             it("should not accept message header") {
                 val vesMessage = VesMessage(commonHeader, rawMessageBytes)
-                assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
+
+                with(cut) {
+                    assertThat(vesMessage.isValid()).describedAs("message validation result").isFalse()
+                }
             }
         }
 
@@ -121,8 +144,60 @@
 
             it("should not accept message header") {
                 val vesMessage = VesMessage(commonHeader, rawMessageBytes)
-                assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
+
+                with(cut) {
+                    assertThat(vesMessage.isValid()).describedAs("message validation result").isFalse()
+                }
             }
         }
+
+        describe("validating messages and converting to Either of string for validation result") {
+            given("WireFrameMessage") {
+                on("valid message as input") {
+                    val wireFrameMessage = WireFrameMessage("lets pretend it's valid".toByteArray())
+                    val mockedWireFrameMessage = mock<WireFrameMessage> {
+                        on { validate() } doReturn right(wireFrameMessage)
+                    }
+
+                    it("should be right") {
+                        assertTrue(cut.validateFrameMessage(mockedWireFrameMessage).isRight())
+                    }
+                }
+
+                on("invalid message as input") {
+                    val mockedWireFrameMessage = mock<WireFrameMessage> {
+                        on { validate() } doReturn left(InvalidMajorVersion(99))
+                    }
+
+                    it("should be left") {
+                        assertTrue(cut.validateFrameMessage(mockedWireFrameMessage).isLeft())
+                    }
+                }
+            }
+
+            given("VesEvent") {
+                with(cut) {
+                    on("valid message as input") {
+                        val commonHeader = commonHeader()
+                        val rawMessageBytes = vesEventBytes(commonHeader)
+                        val vesMessage = VesMessage(commonHeader, rawMessageBytes)
+
+                        it("should be right") {
+                            assertTrue(validateProtobufMessage(vesMessage).isRight())
+                        }
+                    }
+                }
+                on("invalid message as input") {
+                    val commonHeader = newBuilder().build()
+                    val rawMessageBytes = vesEventBytes(commonHeader)
+                    val vesMessage = VesMessage(commonHeader, rawMessageBytes)
+
+                    it("should be left") {
+                        assertTrue(cut.validateProtobufMessage(vesMessage).isLeft())
+                    }
+                }
+            }
+
+        }
     }
 })
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
index 8950a55..cdee92c 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
@@ -20,6 +20,8 @@
 package org.onap.dcae.collectors.veshv.impl
 
 import arrow.core.Option
+import arrow.core.Try
+import arrow.core.success
 import com.google.protobuf.ByteString
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.given
@@ -30,6 +32,7 @@
 import org.onap.dcae.collectors.veshv.model.VesMessage
 import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
 import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
+import reactor.test.test
 import java.nio.charset.Charset
 import kotlin.test.assertTrue
 import kotlin.test.fail
@@ -68,7 +71,7 @@
     }
 })
 
-private fun <A> assertFailedWithError(option: Option<A>) =
-        option.exists {
+private fun <A> assertFailedWithError(t: Try<A>) =
+        t.exists {
             fail("Error expected")
-        }
+        }
\ No newline at end of file
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml b/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml
index 48da3b1..09ac357 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml
+++ b/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml
@@ -27,7 +27,7 @@
     </appender>
 
   <logger name="org.onap.dcae.collectors.veshv" level="INFO"/>
-  <!--<logger name="reactor.ipc.netty" level="DEBUG"/>-->
+  <!--<logger name="reactor.netty" level="DEBUG"/>-->
 
   <root level="INFO">
       <appender-ref ref="CONSOLE"/>
diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/PayloadContentType.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/PayloadContentType.kt
index 7cbf353..e4a1dd8 100644
--- a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/PayloadContentType.kt
+++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/PayloadContentType.kt
@@ -27,7 +27,7 @@
     GOOGLE_PROTOCOL_BUFFER(0x0001);
 
     companion object {
-        private val hexValues = PayloadContentType.values().map { it.hexValue }
+        val hexValues = PayloadContentType.values().map { it.hexValue }
 
         fun isValidHexValue(hex: Int) = hexValues.contains(hex)
     }
diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
index 0d55ceb..4d60d62 100644
--- a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
+++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
@@ -46,3 +46,19 @@
 object MissingWireFrameHeaderBytes : MissingWireFrameBytes("readable bytes < header size")
 object MissingWireFramePayloadBytes : MissingWireFrameBytes("readable bytes < payload size")
 object EmptyWireFrame : MissingWireFrameBytes("empty wire frame")
+
+// WireFrameMessage validation exceptions
+
+sealed class WireFrameMessageValidationError(val message: String)
+
+class InvalidMajorVersion(actualVersion: Short) : WireFrameMessageValidationError(
+        "Invalid major version in wire frame header. " +
+                "Expected ${WireFrameMessage.SUPPORTED_VERSION_MAJOR} but was $actualVersion")
+
+class UnsupportedPayloadContentType(actualType: Int) : WireFrameMessageValidationError(
+        "Invalid content type in wire frame header. " +
+                "Expected one of ${PayloadContentType.hexValues}, but was $actualType")
+
+class NotMatchingPayloadSize(definedInHeader: Int, actual: Int) : WireFrameMessageValidationError(
+        "Payload size does not match one defined in wire frame header.\n" +
+                "Defined in header: $definedInHeader, but was: $actual")
diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
index de37b14..1257c6b 100644
--- a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
+++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
@@ -19,6 +19,11 @@
  */
 package org.onap.dcae.collectors.veshv.domain
 
+import arrow.core.Either
+import arrow.core.Either.Companion.left
+import arrow.core.Either.Companion.right
+
+
 /**
  * Wire frame structure is presented bellow using ASN.1 notation. Please note that official supported specification
  * should be available on
@@ -62,10 +67,13 @@
             PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
             payload.size)
 
-    fun isValid(): Boolean =
-            versionMajor == SUPPORTED_VERSION_MAJOR
-                    && PayloadContentType.isValidHexValue(payloadType)
-                    && payload.size() == payloadSize
+    fun validate(): Either<WireFrameMessageValidationError, WireFrameMessage> =
+            when {
+                versionMajor != SUPPORTED_VERSION_MAJOR -> left(InvalidMajorVersion(versionMajor))
+                !PayloadContentType.isValidHexValue(payloadType) -> left(UnsupportedPayloadContentType(payloadType))
+                payload.size() != payloadSize -> left(NotMatchingPayloadSize(payload.size(), payloadSize))
+                else -> right(this)
+            }
 
     companion object {
         const val MARKER_BYTE: Short = 0xAA
diff --git a/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
index f17a79b..f8fbc0a 100644
--- a/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
+++ b/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
@@ -60,7 +60,7 @@
                     payloadSize = 0)
 
             it("should fail validation") {
-                assertThat(input.isValid()).isFalse()
+                input.validate().assertFailedWithError { it.isInstanceOf(InvalidMajorVersion::class.java) }
             }
         }
 
@@ -73,7 +73,7 @@
                     payloadSize = 0)
 
             it("should pass validation") {
-                assertThat(input.isValid()).isTrue()
+                assertTrue(input.validate().isRight())
             }
         }
 
@@ -86,7 +86,7 @@
                     payloadSize = 0)
 
             it("should fail validation") {
-                assertThat(input.isValid()).isFalse()
+                input.validate().assertFailedWithError { it.isInstanceOf(UnsupportedPayloadContentType::class.java) }
             }
         }
 
@@ -99,7 +99,7 @@
                     payloadSize = 1)
 
             it("should fail validation") {
-                assertThat(input.isValid()).isFalse()
+                input.validate().assertFailedWithError { it.isInstanceOf(NotMatchingPayloadSize::class.java) }
             }
         }
 
@@ -112,7 +112,7 @@
                     payloadSize = 8)
 
             it("should fail validation") {
-                assertThat(input.isValid()).isFalse()
+                input.validate().assertFailedWithError { it.isInstanceOf(NotMatchingPayloadSize::class.java) }
             }
         }
 
@@ -126,7 +126,7 @@
                     payloadSize = payload.size)
 
             it("should pass validation") {
-                assertThat(input.isValid()).isTrue()
+                assertTrue(input.validate().isRight())
             }
         }
 
@@ -214,7 +214,7 @@
                         .writeByte(0xAB)
                 val decoded = decoder.decodeFirst(buff).getMessageOrFail()
 
-                assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
+                assertTrue(decoded.validate().isRight(), "should be valid")
                 assertThat(buff.readableBytes()).isEqualTo(1)
             }
         }
diff --git a/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt b/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
index f12d9ac..ac28200 100644
--- a/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
+++ b/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
@@ -25,6 +25,7 @@
 import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
 import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
 import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 import reactor.netty.http.server.HttpServer
@@ -55,9 +56,15 @@
 
     private fun readinessHandler(_req: HttpServerRequest, resp: HttpServerResponse) =
             healthDescription.get().run {
+                logger.debug { "HV-VES status: $status, $message" }
                 resp.status(status.httpResponseStatus.number).sendString(Flux.just(status.toString(), "\n", message))
             }
 
     private fun livenessHandler(_req: HttpServerRequest, resp: HttpServerResponse) =
             resp.status(HttpResponseStatus.NOT_IMPLEMENTED).sendString(Mono.just("Not implemented yet"))
+
+    companion object {
+        private val logger = Logger(HealthCheckApiServer::class)
+    }
+
 }
diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml
index c76ff21..bee0dae 100644
--- a/sources/hv-collector-main/src/main/resources/logback.xml
+++ b/sources/hv-collector-main/src/main/resources/logback.xml
@@ -36,11 +36,14 @@
         </rollingPolicy>
     </appender>
 
-    <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
-    <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="DEBUG"/>
-    <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="DEBUG"/>
-    <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="DEBUG"/>
-    <!--<logger name="reactor.ipc.netty" level="DEBUG"/>-->
+    <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/>
+    <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="TRACE"/>
+    <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="TRACE"/>
+    <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="TRACE"/>
+    <logger name="reactor.netty" level="WARN"/>
+    <logger name="io.netty" level="DEBUG"/>
+    <logger name="io.netty.util" level="WARN"/>
+    <logger name="org.apache.kafka" level="WARN"/>
 
     <root level="INFO">
         <appender-ref ref="CONSOLE"/>
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
index 714702d..e8ec254 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
@@ -19,10 +19,48 @@
  */
 package org.onap.dcae.collectors.veshv.utils.logging
 
+import arrow.core.Either
+import arrow.core.Option
+import arrow.core.Try
 import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
 
 fun <T> Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux<T> = Flux.empty()): Flux<T> {
-    logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})")
+    logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})")
     logger.debug("Detailed stack trace", ex)
     return returnFlux
 }
+
+
+fun <T> Try<T>.filterFailedWithLog(logger: Logger,
+                                   acceptedMsg: (T) -> String,
+                                   rejectedMsg: (Throwable) -> String): Flux<T> =
+        fold({
+            logger.warn(rejectedMsg(it))
+            Flux.empty<T>()
+        }, {
+            logger.trace { acceptedMsg(it) }
+            Flux.just(it)
+        })
+
+fun <T> Option<T>.filterEmptyWithLog(logger: Logger,
+                                     acceptedMsg: (T) -> String,
+                                     rejectedMsg: () -> String): Flux<T> =
+        fold({
+            logger.warn(rejectedMsg)
+            Flux.empty<T>()
+        }, {
+            logger.trace { acceptedMsg(it) }
+            Flux.just(it)
+        })
+
+fun <T> Flux<T>.filterFailedWithLog(logger: Logger, predicate: (T) -> Either<() -> String, () -> String>) =
+        flatMap { t ->
+            predicate(t).fold({
+                logger.warn(it)
+                Mono.empty<T>()
+            }, {
+                logger.trace(it)
+                Mono.just<T>(t)
+            })
+        }
diff --git a/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt
new file mode 100644
index 0000000..0f359df
--- /dev/null
+++ b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt
@@ -0,0 +1,120 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.logging
+
+import arrow.core.Either
+import arrow.core.Failure
+import arrow.core.Option
+import arrow.core.Try
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import reactor.core.publisher.Flux
+import reactor.test.test
+import kotlin.test.fail
+
+class ReactiveLoggingTest : Spek({
+
+    describe("filtering with log message") {
+        val logger = Logger("React")
+        val event = 5
+
+        describe("Try") {
+            given("successful Try") {
+                val cut = Try.just(event)
+
+                it("should not filter stream event and log accepted message") {
+                    cut.filterFailedWithLog(logger, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE)
+                            .test()
+                            .expectNext(event)
+                            .verifyComplete()
+                }
+            }
+
+            given("failed Try") {
+                val e = Exception()
+                val cut = Failure(e)
+                it("should filter stream event and log rejected message") {
+                    cut.filterFailedWithLog(logger, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE)
+                            .test()
+                            .verifyComplete()
+                }
+            }
+        }
+
+        describe("Option") {
+            given("Option with content") {
+                val cut = Option.just(event)
+
+                it("should not filter stream event and log accepted message") {
+                    cut.filterEmptyWithLog(logger, ACCEPTED_MESSAGE, FAILED_MESSAGE)
+                            .test()
+                            .expectNext(event)
+                            .verifyComplete()
+                }
+            }
+
+            given("empty Option") {
+                val cut = Option.empty<Int>()
+                it("should filter stream event and log rejected message") {
+                    cut.filterEmptyWithLog(logger, ACCEPTED_MESSAGE, FAILED_MESSAGE)
+                            .test()
+                            .verifyComplete()
+                }
+            }
+        }
+
+
+        describe("Either") {
+            given("successful Either (right)") {
+                val cut = Flux.just(event)
+
+                it("should not filter stream event and log accepted message") {
+                    cut.filterFailedWithLog(logger, right())
+                            .test()
+                            .expectNext(event)
+                            .verifyComplete()
+                }
+            }
+
+            given("failed Either (left)") {
+                val cut = Flux.just(event)
+
+                it("should filter stream event and log rejected message") {
+                    cut.filterFailedWithLog(logger, left())
+                            .test()
+                            .verifyComplete()
+                }
+            }
+        }
+    }
+})
+
+
+val ACCEPTED_MESSAGE: (Int) -> String = { "SUCCESS" }
+val FAILED_MESSAGE: () -> String = { "FAILED" }
+val FAILED_WITH_EXCEPTION_MESSAGE: (Throwable) -> String = { "FAILED" }
+
+private fun right(): (Int) -> Either<() -> String, () -> String> =
+        { Either.cond(true, { { "SUCCESS" } }, { fail() }) }
+
+private fun left(): (Int) -> Either<() -> String, () -> String> =
+        { Either.cond(false, { fail() }, { FAILED_MESSAGE }) }
diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt
index e2aec7d..930f020 100644
--- a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt
+++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt
@@ -40,6 +40,7 @@
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
 import org.onap.ves.VesEventOuterClass.VesEvent
 import reactor.test.test
+import kotlin.test.assertTrue
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -103,7 +104,7 @@
                             )))
                             .test()
                             .assertNext {
-                                assertThat(it.isValid()).isTrue()
+                                assertTrue(it.validate().isRight())
                                 assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
                                 assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)
                             }
@@ -122,7 +123,7 @@
                             )))
                             .test()
                             .assertNext {
-                                assertThat(it.isValid()).isTrue()
+                                assertTrue(it.validate().isRight())
                                 assertThat(it.payloadSize).isGreaterThan(maxPayloadSizeBytes)
                                 assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
                             }
@@ -140,7 +141,7 @@
                             )))
                             .test()
                             .assertNext {
-                                assertThat(it.isValid()).isTrue()
+                                assertTrue(it.validate().isRight())
                                 assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
                                 assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
                                         .isThrownBy { extractCommonEventHeader(it.payload) }
@@ -159,7 +160,7 @@
                             )))
                             .test()
                             .assertNext {
-                                assertThat(it.isValid()).isFalse()
+                                assertTrue(it.validate().isLeft())
                                 assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
                                 assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
                                 assertThat(it.versionMajor).isNotEqualTo(WireFrameMessage.SUPPORTED_VERSION_MINOR)
@@ -178,7 +179,7 @@
                             )))
                             .test()
                             .assertNext {
-                                assertThat(it.isValid()).isTrue()
+                                assertTrue(it.validate().isRight())
                                 assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
                                 assertThat(extractEventFields(it.payload).size()).isEqualTo(MessageGenerator.FIXED_PAYLOAD_SIZE)
                                 assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)