Harmonize logging and add new logs
- corrected docker-compose consul url
Change-Id: I78df868e0dd51008ef39d01553e6a0a3b8273a54
Issue-ID: DCAEGEN2-1003
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
diff --git a/docker-compose.yml b/docker-compose.yml
index 2590928..4015b08 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -39,7 +39,7 @@
"-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"]
command: ["--listen-port", "6061",
"--health-check-api-port", "6060",
- "--config-url", "http://consul:8500/v1/kv/veshv-config",
+ "--config-url", "http://consul:8500/v1/kv/veshv-config?raw=true",
"--key-store-password", "onaponap",
"--trust-store-password", "onaponap"]
healthcheck:
diff --git a/pom.xml b/pom.xml
index 5a83fc8..621c5b2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,7 +81,6 @@
<docker-image.name>${project.groupId}.${project.artifactId}</docker-image.name>
<docker-image.latest>1.1-SNAPSHOT</docker-image.latest>
<docker.http_proxy/>
-
</properties>
<build>
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
index fb94907..9394075 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
@@ -19,20 +19,38 @@
*/
package org.onap.dcae.collectors.veshv.impl
+import arrow.core.Either
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors
import org.onap.dcae.collectors.veshv.domain.vesEventListenerVersionRegex
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.ves.VesEventOuterClass.CommonEventHeader
+typealias ValidationFailMessage = () -> String
+typealias ValidationSuccessMessage = () -> String
+typealias ValidationResult = Either<ValidationFailMessage, ValidationSuccessMessage>
+
internal object MessageValidator {
- fun isValid(message: VesMessage): Boolean {
- return allMandatoryFieldsArePresent(message.header)
- }
+ fun validateFrameMessage(message: WireFrameMessage): ValidationResult =
+ message.validate().fold({
+ Either.left { "Invalid wire frame header, reason: ${it.message}" }
+ }, {
+ Either.right { "Wire frame header is valid" }
+ })
+
+ fun validateProtobufMessage(message: VesMessage): ValidationResult =
+ if (message.isValid()) {
+ Either.right { "Protocol buffers message is valid" }
+ } else {
+ Either.left { "Unsupported protocol buffers message." }
+ }
+
+ fun VesMessage.isValid() = allMandatoryFieldsArePresent(this.header)
+ .and(vesEventListenerVersionRegex.matches(header.vesEventListenerVersion))
private fun allMandatoryFieldsArePresent(header: CommonEventHeader) =
headerRequiredFieldDescriptors
.all { fieldDescriptor -> header.hasField(fieldDescriptor) }
- .and(vesEventListenerVersionRegex.matches(header.vesEventListenerVersion))
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
index 1d43588..c670e1d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
@@ -20,7 +20,6 @@
package org.onap.dcae.collectors.veshv.impl
import arrow.core.Try
-import arrow.core.Option
import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.ves.VesEventOuterClass.VesEvent
@@ -31,9 +30,9 @@
*/
internal class VesDecoder {
- fun decode(bytes: ByteData): Option<VesMessage> =
+ fun decode(bytes: ByteData): Try<VesMessage> =
Try {
val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
VesMessage(decodedHeader, bytes)
- }.toOption()
+ }
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 2f12e0c..4176de9 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -19,17 +19,20 @@
*/
package org.onap.dcae.collectors.veshv.impl
-import arrow.core.Option
+import arrow.core.Either
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.filterFailedWithLog
+import org.onap.dcae.collectors.veshv.utils.logging.filterEmptyWithLog
import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -49,9 +52,9 @@
wireChunkDecoderSupplier(alloc).let { wireDecoder ->
dataStream
.transform { decodeWireFrame(it, wireDecoder) }
- .filter(WireFrameMessage::isValid)
- .transform(::decodePayload)
- .filter(VesMessage::isValid)
+ .transform(::filterInvalidWireFrame)
+ .transform(::decodeProtobufPayload)
+ .transform(::filterInvalidProtobufMessages)
.transform(::routeMessage)
.onErrorResume { logger.handleReactiveStreamError(it) }
.doFinally { releaseBuffersMemory(wireDecoder) }
@@ -63,26 +66,38 @@
.concatMap(decoder::decode)
.doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
- private fun decodePayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
+ private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux
+ .filterFailedWithLog(MessageValidator::validateFrameMessage)
+
+ private fun decodeProtobufPayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
.map(WireFrameMessage::payload)
- .map(protobufDecoder::decode)
- .flatMap { omitWhenNone(it) }
+ .flatMap(::decodePayload)
+
+ private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder
+ .decode(rawPayload)
+ .filterFailedWithLog(logger,
+ { "Ves event header decoded successfully" },
+ { "Failed to decode ves event header, reason: ${it.message}" })
+
+ private fun filterInvalidProtobufMessages(flux: Flux<VesMessage>): Flux<VesMessage> = flux
+ .filterFailedWithLog(MessageValidator::validateProtobufMessage)
private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
.flatMap(this::findRoute)
.compose(sink::send)
.doOnNext { metrics.notifyMessageSent(it.topic) }
-
- private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNone((router::findDestination)(msg))
-
- private fun <V> omitWhenNone(it: Option<V>): Mono<V> = it.fold(
- {
- logger.info("ommiting the message" + 5)
- Mono.empty() },
- { Mono.just(it) })
+ private fun findRoute(msg: VesMessage) = router
+ .findDestination(msg)
+ .filterEmptyWithLog(logger,
+ { "Found route for message: ${it.topic}, partition: ${it.partition}" },
+ { "Could not find route for message" })
private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
+ .also { logger.debug("Released buffer memory after handling message stream") }
+
+ fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> Either<() -> String, () -> String>) =
+ filterFailedWithLog(logger, predicate)
companion object {
private val logger = Logger(VesHvCollector::class)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index ec7c60c..cea8a7e 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -82,8 +82,10 @@
private fun filterDifferentValues(configurationString: String) =
hashOf(configurationString).let {
if (it == lastConfigurationHash.get()) {
+ logger.trace { "No change detected in consul configuration" }
Mono.empty()
} else {
+ logger.info { "Obtained new configuration from consul:\n${configurationString}" }
lastConfigurationHash.set(it)
Mono.just(configurationString)
}
@@ -95,7 +97,6 @@
Json.createReader(StringReader(responseString)).readObject()
private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
- logger.info { "Obtained new configuration from consul:\n${configuration}" }
val routing = configuration.getJsonArray("collector.routing")
return CollectorConfiguration(
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
index a0c2241..c4d6c87 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -25,6 +25,7 @@
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderRecord
import reactor.kafka.sender.SenderResult
@@ -40,8 +41,14 @@
override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
val records = messages.map(this::vesToKafkaRecord)
val result = sender.send(records)
- .doOnNext(::logException)
- .filter(::isSuccessful)
+ .doOnNext {
+ if (it.isSuccessful()) {
+ Mono.just(it)
+ } else {
+ logger.warn(it.exception()) { "Failed to send message to Kafka" }
+ Mono.empty<SenderResult<RoutedMessage>>()
+ }
+ }
.map { it.correlationMetadata() }
return if (logger.traceEnabled) {
@@ -61,12 +68,6 @@
msg)
}
- private fun logException(senderResult: SenderResult<out Any>) {
- if (senderResult.exception() != null) {
- logger.warn(senderResult.exception()) { "Failed to send message to Kafka" }
- }
- }
-
private fun logSentMessage(sentMsg: RoutedMessage) {
logger.trace {
val msgNum = sentMessages.incrementAndGet()
@@ -74,7 +75,7 @@
}
}
- private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null
+ private fun SenderResult<out Any>.isSuccessful() = exception() == null
companion object {
val logger = Logger(KafkaSink::class)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index e535300..0b2997f 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -57,8 +57,12 @@
sslContextFactory
.createSslContext(serverConfig.securityConfiguration)
.map { sslContext ->
+ logger.info("Collector configured with SSL enabled")
this.secure { b -> b.sslContext(sslContext) }
- }.getOrElse { this }
+ }.getOrElse {
+ logger.info("Collector configured with SSL disabled")
+ this
+ }
private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
collectorProvider().fold(
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
index f5bfcce..1965d78 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
@@ -20,13 +20,10 @@
package org.onap.dcae.collectors.veshv.model
import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.impl.MessageValidator
import org.onap.ves.VesEventOuterClass.CommonEventHeader
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData) {
- fun isValid(): Boolean = MessageValidator.isValid(this)
-}
+data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
index bab95c5..437614a 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
@@ -20,12 +20,21 @@
package org.onap.dcae.collectors.veshv.model
import arrow.core.Option
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventOuterClass.CommonEventHeader
data class Routing(val routes: List<Route>) {
fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
- Option.fromNullable(routes.find { it.applies(commonHeader) })
+ Option.fromNullable(routes.find { it.applies(commonHeader) }).also {
+ if (it.isEmpty()) {
+ logger.debug { "No route is defined for domain: ${commonHeader.domain}" }
+ }
+ }
+
+ companion object {
+ private val logger = Logger(Routing::class)
+ }
}
data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
index 3090042..60bd767 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
@@ -19,21 +19,29 @@
*/
package org.onap.dcae.collectors.veshv.impl
+import arrow.core.Either.Companion.left
+import arrow.core.Either.Companion.right
+import com.nhaarman.mockitokotlin2.doReturn
+import com.nhaarman.mockitokotlin2.mock
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.InvalidMajorVersion
import org.onap.dcae.collectors.veshv.domain.VesEventDomain
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
import org.onap.ves.VesEventOuterClass.CommonEventHeader.*
+import kotlin.test.assertTrue
internal object MessageValidatorTest : Spek({
- given("Message validator") {
+ describe("Message validator") {
val cut = MessageValidator
on("ves hv message including header with fully initialized fields") {
@@ -41,29 +49,35 @@
it("should accept message with fully initialized message header") {
val vesMessage = VesMessage(commonHeader, vesEventBytes(commonHeader))
- assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isTrue()
+ with(cut) {
+ assertThat(vesMessage.isValid()).describedAs("message validation result").isTrue()
+ }
}
- VesEventDomain.values()
- .forEach { domain ->
- it("should accept message with $domain domain") {
- val header = commonHeader(domain)
- val vesMessage = VesMessage(header, vesEventBytes(header))
- assertThat(cut.isValid(vesMessage))
- .isTrue()
- }
+ VesEventDomain.values().forEach { domain ->
+ it("should accept message with $domain domain") {
+ val header = commonHeader(domain)
+ val vesMessage = VesMessage(header, vesEventBytes(header))
+ with(cut) {
+ assertThat(vesMessage.isValid()).describedAs("message validation result").isTrue()
}
+ }
+ }
}
on("ves hv message bytes") {
val vesMessage = VesMessage(getDefaultInstance(), ByteData.EMPTY)
it("should not accept message with default header") {
- assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
+ with(cut) {
+ assertThat(vesMessage.isValid()).describedAs("message validation result").isFalse()
+ }
}
}
val priorityTestCases = mapOf(
Priority.PRIORITY_NOT_PROVIDED to false,
+ Priority.LOW to true,
+ Priority.MEDIUM to true,
Priority.HIGH to true
)
@@ -73,8 +87,10 @@
val vesMessage = VesMessage(commonEventHeader, vesEventBytes(commonEventHeader))
it("should resolve validation result") {
- assertThat(cut.isValid(vesMessage)).describedAs("message validation results")
- .isEqualTo(expectedResult)
+ with(cut) {
+ assertThat(vesMessage.isValid()).describedAs("message validation results")
+ .isEqualTo(expectedResult)
+ }
}
}
}
@@ -90,7 +106,9 @@
it("should not accept not fully initialized message header") {
val vesMessage = VesMessage(commonHeader, rawMessageBytes)
- assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
+ with(cut) {
+ assertThat(vesMessage.isValid()).describedAs("message validation result").isFalse()
+ }
}
}
@@ -101,7 +119,9 @@
it("should not accept message header") {
val vesMessage = VesMessage(commonHeader, rawMessageBytes)
- assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
+ with(cut) {
+ assertThat(vesMessage.isValid()).describedAs("message validation result").isFalse()
+ }
}
}
@@ -111,7 +131,10 @@
it("should not accept message header") {
val vesMessage = VesMessage(commonHeader, rawMessageBytes)
- assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
+
+ with(cut) {
+ assertThat(vesMessage.isValid()).describedAs("message validation result").isFalse()
+ }
}
}
@@ -121,8 +144,60 @@
it("should not accept message header") {
val vesMessage = VesMessage(commonHeader, rawMessageBytes)
- assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
+
+ with(cut) {
+ assertThat(vesMessage.isValid()).describedAs("message validation result").isFalse()
+ }
}
}
+
+ describe("validating messages and converting to Either of string for validation result") {
+ given("WireFrameMessage") {
+ on("valid message as input") {
+ val wireFrameMessage = WireFrameMessage("lets pretend it's valid".toByteArray())
+ val mockedWireFrameMessage = mock<WireFrameMessage> {
+ on { validate() } doReturn right(wireFrameMessage)
+ }
+
+ it("should be right") {
+ assertTrue(cut.validateFrameMessage(mockedWireFrameMessage).isRight())
+ }
+ }
+
+ on("invalid message as input") {
+ val mockedWireFrameMessage = mock<WireFrameMessage> {
+ on { validate() } doReturn left(InvalidMajorVersion(99))
+ }
+
+ it("should be left") {
+ assertTrue(cut.validateFrameMessage(mockedWireFrameMessage).isLeft())
+ }
+ }
+ }
+
+ given("VesEvent") {
+ with(cut) {
+ on("valid message as input") {
+ val commonHeader = commonHeader()
+ val rawMessageBytes = vesEventBytes(commonHeader)
+ val vesMessage = VesMessage(commonHeader, rawMessageBytes)
+
+ it("should be right") {
+ assertTrue(validateProtobufMessage(vesMessage).isRight())
+ }
+ }
+ }
+ on("invalid message as input") {
+ val commonHeader = newBuilder().build()
+ val rawMessageBytes = vesEventBytes(commonHeader)
+ val vesMessage = VesMessage(commonHeader, rawMessageBytes)
+
+ it("should be left") {
+ assertTrue(cut.validateProtobufMessage(vesMessage).isLeft())
+ }
+ }
+ }
+
+ }
}
})
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
index 8950a55..cdee92c 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
@@ -20,6 +20,8 @@
package org.onap.dcae.collectors.veshv.impl
import arrow.core.Option
+import arrow.core.Try
+import arrow.core.success
import com.google.protobuf.ByteString
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
@@ -30,6 +32,7 @@
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
+import reactor.test.test
import java.nio.charset.Charset
import kotlin.test.assertTrue
import kotlin.test.fail
@@ -68,7 +71,7 @@
}
})
-private fun <A> assertFailedWithError(option: Option<A>) =
- option.exists {
+private fun <A> assertFailedWithError(t: Try<A>) =
+ t.exists {
fail("Error expected")
- }
+ }
\ No newline at end of file
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml b/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml
index 48da3b1..09ac357 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml
+++ b/sources/hv-collector-dcae-app-simulator/src/main/resources/logback.xml
@@ -27,7 +27,7 @@
</appender>
<logger name="org.onap.dcae.collectors.veshv" level="INFO"/>
- <!--<logger name="reactor.ipc.netty" level="DEBUG"/>-->
+ <!--<logger name="reactor.netty" level="DEBUG"/>-->
<root level="INFO">
<appender-ref ref="CONSOLE"/>
diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/PayloadContentType.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/PayloadContentType.kt
index 7cbf353..e4a1dd8 100644
--- a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/PayloadContentType.kt
+++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/PayloadContentType.kt
@@ -27,7 +27,7 @@
GOOGLE_PROTOCOL_BUFFER(0x0001);
companion object {
- private val hexValues = PayloadContentType.values().map { it.hexValue }
+ val hexValues = PayloadContentType.values().map { it.hexValue }
fun isValidHexValue(hex: Int) = hexValues.contains(hex)
}
diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
index 0d55ceb..4d60d62 100644
--- a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
+++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
@@ -46,3 +46,19 @@
object MissingWireFrameHeaderBytes : MissingWireFrameBytes("readable bytes < header size")
object MissingWireFramePayloadBytes : MissingWireFrameBytes("readable bytes < payload size")
object EmptyWireFrame : MissingWireFrameBytes("empty wire frame")
+
+// WireFrameMessage validation exceptions
+
+sealed class WireFrameMessageValidationError(val message: String)
+
+class InvalidMajorVersion(actualVersion: Short) : WireFrameMessageValidationError(
+ "Invalid major version in wire frame header. " +
+ "Expected ${WireFrameMessage.SUPPORTED_VERSION_MAJOR} but was $actualVersion")
+
+class UnsupportedPayloadContentType(actualType: Int) : WireFrameMessageValidationError(
+ "Invalid content type in wire frame header. " +
+ "Expected one of ${PayloadContentType.hexValues}, but was $actualType")
+
+class NotMatchingPayloadSize(definedInHeader: Int, actual: Int) : WireFrameMessageValidationError(
+ "Payload size does not match one defined in wire frame header.\n" +
+ "Defined in header: $definedInHeader, but was: $actual")
diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
index de37b14..1257c6b 100644
--- a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
+++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
@@ -19,6 +19,11 @@
*/
package org.onap.dcae.collectors.veshv.domain
+import arrow.core.Either
+import arrow.core.Either.Companion.left
+import arrow.core.Either.Companion.right
+
+
/**
* Wire frame structure is presented bellow using ASN.1 notation. Please note that official supported specification
* should be available on
@@ -62,10 +67,13 @@
PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
payload.size)
- fun isValid(): Boolean =
- versionMajor == SUPPORTED_VERSION_MAJOR
- && PayloadContentType.isValidHexValue(payloadType)
- && payload.size() == payloadSize
+ fun validate(): Either<WireFrameMessageValidationError, WireFrameMessage> =
+ when {
+ versionMajor != SUPPORTED_VERSION_MAJOR -> left(InvalidMajorVersion(versionMajor))
+ !PayloadContentType.isValidHexValue(payloadType) -> left(UnsupportedPayloadContentType(payloadType))
+ payload.size() != payloadSize -> left(NotMatchingPayloadSize(payload.size(), payloadSize))
+ else -> right(this)
+ }
companion object {
const val MARKER_BYTE: Short = 0xAA
diff --git a/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
index f17a79b..f8fbc0a 100644
--- a/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
+++ b/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
@@ -60,7 +60,7 @@
payloadSize = 0)
it("should fail validation") {
- assertThat(input.isValid()).isFalse()
+ input.validate().assertFailedWithError { it.isInstanceOf(InvalidMajorVersion::class.java) }
}
}
@@ -73,7 +73,7 @@
payloadSize = 0)
it("should pass validation") {
- assertThat(input.isValid()).isTrue()
+ assertTrue(input.validate().isRight())
}
}
@@ -86,7 +86,7 @@
payloadSize = 0)
it("should fail validation") {
- assertThat(input.isValid()).isFalse()
+ input.validate().assertFailedWithError { it.isInstanceOf(UnsupportedPayloadContentType::class.java) }
}
}
@@ -99,7 +99,7 @@
payloadSize = 1)
it("should fail validation") {
- assertThat(input.isValid()).isFalse()
+ input.validate().assertFailedWithError { it.isInstanceOf(NotMatchingPayloadSize::class.java) }
}
}
@@ -112,7 +112,7 @@
payloadSize = 8)
it("should fail validation") {
- assertThat(input.isValid()).isFalse()
+ input.validate().assertFailedWithError { it.isInstanceOf(NotMatchingPayloadSize::class.java) }
}
}
@@ -126,7 +126,7 @@
payloadSize = payload.size)
it("should pass validation") {
- assertThat(input.isValid()).isTrue()
+ assertTrue(input.validate().isRight())
}
}
@@ -214,7 +214,7 @@
.writeByte(0xAB)
val decoded = decoder.decodeFirst(buff).getMessageOrFail()
- assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
+ assertTrue(decoded.validate().isRight(), "should be valid")
assertThat(buff.readableBytes()).isEqualTo(1)
}
}
diff --git a/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt b/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
index f12d9ac..ac28200 100644
--- a/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
+++ b/sources/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
@@ -25,6 +25,7 @@
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.netty.http.server.HttpServer
@@ -55,9 +56,15 @@
private fun readinessHandler(_req: HttpServerRequest, resp: HttpServerResponse) =
healthDescription.get().run {
+ logger.debug { "HV-VES status: $status, $message" }
resp.status(status.httpResponseStatus.number).sendString(Flux.just(status.toString(), "\n", message))
}
private fun livenessHandler(_req: HttpServerRequest, resp: HttpServerResponse) =
resp.status(HttpResponseStatus.NOT_IMPLEMENTED).sendString(Mono.just("Not implemented yet"))
+
+ companion object {
+ private val logger = Logger(HealthCheckApiServer::class)
+ }
+
}
diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml
index c76ff21..bee0dae 100644
--- a/sources/hv-collector-main/src/main/resources/logback.xml
+++ b/sources/hv-collector-main/src/main/resources/logback.xml
@@ -36,11 +36,14 @@
</rollingPolicy>
</appender>
- <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
- <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="DEBUG"/>
- <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="DEBUG"/>
- <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="DEBUG"/>
- <!--<logger name="reactor.ipc.netty" level="DEBUG"/>-->
+ <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/>
+ <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="TRACE"/>
+ <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="TRACE"/>
+ <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="TRACE"/>
+ <logger name="reactor.netty" level="WARN"/>
+ <logger name="io.netty" level="DEBUG"/>
+ <logger name="io.netty.util" level="WARN"/>
+ <logger name="org.apache.kafka" level="WARN"/>
<root level="INFO">
<appender-ref ref="CONSOLE"/>
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
index 714702d..e8ec254 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/reactive_logging.kt
@@ -19,10 +19,48 @@
*/
package org.onap.dcae.collectors.veshv.utils.logging
+import arrow.core.Either
+import arrow.core.Option
+import arrow.core.Try
import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
fun <T> Logger.handleReactiveStreamError(ex: Throwable, returnFlux: Flux<T> = Flux.empty()): Flux<T> {
- logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})")
+ logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.localizedMessage})")
logger.debug("Detailed stack trace", ex)
return returnFlux
}
+
+
+fun <T> Try<T>.filterFailedWithLog(logger: Logger,
+ acceptedMsg: (T) -> String,
+ rejectedMsg: (Throwable) -> String): Flux<T> =
+ fold({
+ logger.warn(rejectedMsg(it))
+ Flux.empty<T>()
+ }, {
+ logger.trace { acceptedMsg(it) }
+ Flux.just(it)
+ })
+
+fun <T> Option<T>.filterEmptyWithLog(logger: Logger,
+ acceptedMsg: (T) -> String,
+ rejectedMsg: () -> String): Flux<T> =
+ fold({
+ logger.warn(rejectedMsg)
+ Flux.empty<T>()
+ }, {
+ logger.trace { acceptedMsg(it) }
+ Flux.just(it)
+ })
+
+fun <T> Flux<T>.filterFailedWithLog(logger: Logger, predicate: (T) -> Either<() -> String, () -> String>) =
+ flatMap { t ->
+ predicate(t).fold({
+ logger.warn(it)
+ Mono.empty<T>()
+ }, {
+ logger.trace(it)
+ Mono.just<T>(t)
+ })
+ }
diff --git a/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt
new file mode 100644
index 0000000..0f359df
--- /dev/null
+++ b/sources/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/ReactiveLoggingTest.kt
@@ -0,0 +1,120 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.logging
+
+import arrow.core.Either
+import arrow.core.Failure
+import arrow.core.Option
+import arrow.core.Try
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import reactor.core.publisher.Flux
+import reactor.test.test
+import kotlin.test.fail
+
+class ReactiveLoggingTest : Spek({
+
+ describe("filtering with log message") {
+ val logger = Logger("React")
+ val event = 5
+
+ describe("Try") {
+ given("successful Try") {
+ val cut = Try.just(event)
+
+ it("should not filter stream event and log accepted message") {
+ cut.filterFailedWithLog(logger, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE)
+ .test()
+ .expectNext(event)
+ .verifyComplete()
+ }
+ }
+
+ given("failed Try") {
+ val e = Exception()
+ val cut = Failure(e)
+ it("should filter stream event and log rejected message") {
+ cut.filterFailedWithLog(logger, ACCEPTED_MESSAGE, FAILED_WITH_EXCEPTION_MESSAGE)
+ .test()
+ .verifyComplete()
+ }
+ }
+ }
+
+ describe("Option") {
+ given("Option with content") {
+ val cut = Option.just(event)
+
+ it("should not filter stream event and log accepted message") {
+ cut.filterEmptyWithLog(logger, ACCEPTED_MESSAGE, FAILED_MESSAGE)
+ .test()
+ .expectNext(event)
+ .verifyComplete()
+ }
+ }
+
+ given("empty Option") {
+ val cut = Option.empty<Int>()
+ it("should filter stream event and log rejected message") {
+ cut.filterEmptyWithLog(logger, ACCEPTED_MESSAGE, FAILED_MESSAGE)
+ .test()
+ .verifyComplete()
+ }
+ }
+ }
+
+
+ describe("Either") {
+ given("successful Either (right)") {
+ val cut = Flux.just(event)
+
+ it("should not filter stream event and log accepted message") {
+ cut.filterFailedWithLog(logger, right())
+ .test()
+ .expectNext(event)
+ .verifyComplete()
+ }
+ }
+
+ given("failed Either (left)") {
+ val cut = Flux.just(event)
+
+ it("should filter stream event and log rejected message") {
+ cut.filterFailedWithLog(logger, left())
+ .test()
+ .verifyComplete()
+ }
+ }
+ }
+ }
+})
+
+
+val ACCEPTED_MESSAGE: (Int) -> String = { "SUCCESS" }
+val FAILED_MESSAGE: () -> String = { "FAILED" }
+val FAILED_WITH_EXCEPTION_MESSAGE: (Throwable) -> String = { "FAILED" }
+
+private fun right(): (Int) -> Either<() -> String, () -> String> =
+ { Either.cond(true, { { "SUCCESS" } }, { fail() }) }
+
+private fun left(): (Int) -> Either<() -> String, () -> String> =
+ { Either.cond(false, { fail() }, { FAILED_MESSAGE }) }
diff --git a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt
index e2aec7d..930f020 100644
--- a/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt
+++ b/sources/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt
@@ -40,6 +40,7 @@
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import org.onap.ves.VesEventOuterClass.VesEvent
import reactor.test.test
+import kotlin.test.assertTrue
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -103,7 +104,7 @@
)))
.test()
.assertNext {
- assertThat(it.isValid()).isTrue()
+ assertTrue(it.validate().isRight())
assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)
}
@@ -122,7 +123,7 @@
)))
.test()
.assertNext {
- assertThat(it.isValid()).isTrue()
+ assertTrue(it.validate().isRight())
assertThat(it.payloadSize).isGreaterThan(maxPayloadSizeBytes)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
}
@@ -140,7 +141,7 @@
)))
.test()
.assertNext {
- assertThat(it.isValid()).isTrue()
+ assertTrue(it.validate().isRight())
assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
.isThrownBy { extractCommonEventHeader(it.payload) }
@@ -159,7 +160,7 @@
)))
.test()
.assertNext {
- assertThat(it.isValid()).isFalse()
+ assertTrue(it.validate().isLeft())
assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
assertThat(it.versionMajor).isNotEqualTo(WireFrameMessage.SUPPORTED_VERSION_MINOR)
@@ -178,7 +179,7 @@
)))
.test()
.assertNext {
- assertThat(it.isValid()).isTrue()
+ assertTrue(it.validate().isRight())
assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
assertThat(extractEventFields(it.payload).size()).isEqualTo(MessageGenerator.FIXED_PAYLOAD_SIZE)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)