Improve DCAE APP Simulator coverage

Also there was a need to refactor the code, because application logic
was placed inside Ratpack handlers.

Change-Id: Iba3d4d039a98ba88e0dba580c1b7726b53440538
Issue-ID: DCAEGEN2-732
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
diff --git a/hv-collector-dcae-app-simulator/pom.xml b/hv-collector-dcae-app-simulator/pom.xml
index 47f71ba..ce4a271 100644
--- a/hv-collector-dcae-app-simulator/pom.xml
+++ b/hv-collector-dcae-app-simulator/pom.xml
@@ -19,8 +19,8 @@
   ~ ============LICENSE_END=========================================================
   -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
     <licenses>
@@ -105,6 +105,14 @@
             <artifactId>arrow-effects</artifactId>
         </dependency>
         <dependency>
+            <groupId>io.arrow-kt</groupId>
+            <artifactId>arrow-effects-reactor</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.arrow-kt</groupId>
+            <artifactId>arrow-syntax</artifactId>
+        </dependency>
+        <dependency>
             <groupId>io.ratpack</groupId>
             <artifactId>ratpack-core</artifactId>
         </dependency>
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
new file mode 100644
index 0000000..262e05b
--- /dev/null
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
@@ -0,0 +1,74 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
+
+import arrow.core.getOrElse
+import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.monadError
+import arrow.typeclasses.bindingCatch
+import org.onap.dcae.collectors.veshv.utils.arrow.getOption
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import java.io.InputStream
+import java.util.concurrent.atomic.AtomicReference
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
+                       private val messageStreamValidation: MessageStreamValidation = MessageStreamValidation()) {
+    private val consumerState: AtomicReference<ConsumerStateProvider> = AtomicReference()
+
+    fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString))
+
+    fun listenToTopics(topics: Set<String>): IO<Unit> = IO.monadError().bindingCatch {
+        if (topics.any { it.isBlank() })
+            throw IllegalArgumentException("Topic list cannot contain empty elements")
+        if (topics.isEmpty())
+            throw IllegalArgumentException("Topic list cannot be empty")
+
+        logger.info("Received new configuration. Creating consumer for topics: $topics")
+        consumerState.set(consumerFactory.createConsumerForTopics(topics).bind())
+    }.fix()
+
+    fun state() = consumerState.getOption().map { it.currentState() }
+
+    fun resetState(): IO<Unit> = consumerState.getOption().fold(
+            { IO.unit },
+            { it.reset() }
+    )
+
+    fun validate(jsonDescription: InputStream) = messageStreamValidation.validate(jsonDescription, currentMessages())
+
+    private fun currentMessages(): List<ByteArray> =
+            consumerState.getOption()
+                    .map { it.currentState().consumedMessages }
+                    .getOrElse(::emptyList)
+
+    private fun extractTopics(topicsString: String): Set<String> =
+            topicsString.substringAfter("=")
+                    .split(",")
+                    .toSet()
+
+    companion object {
+        private val logger = Logger(DcaeAppSimulator::class)
+    }
+}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
new file mode 100644
index 0000000..239f710
--- /dev/null
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
@@ -0,0 +1,85 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
+
+import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.monadError
+import arrow.typeclasses.bindingCatch
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.utils.arrow.asIo
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+import org.onap.ves.VesEventV5
+import java.io.InputStream
+import javax.json.Json
+
+class MessageStreamValidation(
+        private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE,
+        private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) {
+
+    fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> =
+            IO.monadError().bindingCatch {
+                val messageParams = parseMessageParams(jsonDescription)
+                val expectedEvents = generateEvents(messageParams).bind()
+                val actualEvents = decodeConsumedEvents(consumedMessages)
+                if (shouldValidatePayloads(messageParams)) {
+                    expectedEvents == actualEvents
+                } else {
+                    validateHeaders(actualEvents, expectedEvents)
+                }
+            }.fix()
+
+    private fun parseMessageParams(input: InputStream): List<MessageParameters> {
+        val expectations = Json.createReader(input).readArray()
+        val messageParams = messageParametersParser.parse(expectations)
+
+        if (messageParams.isEmpty())
+            throw IllegalArgumentException("Message param list cannot be empty")
+
+        return messageParams
+    }
+
+    private fun shouldValidatePayloads(parameters: List<MessageParameters>) =
+            parameters.all { it.messageType == MessageType.FIXED_PAYLOAD }
+
+
+    private fun validateHeaders(actual: List<VesEventV5.VesEvent>, expected: List<VesEventV5.VesEvent>): Boolean {
+        val consumedHeaders = actual.map { it.commonEventHeader }
+        val generatedHeaders = expected.map { it.commonEventHeader }
+        return generatedHeaders == consumedHeaders
+    }
+
+
+    private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventV5.VesEvent>> =
+            messageGenerator.createMessageFlux(parameters)
+                    .map(PayloadWireFrameMessage::payload)
+                    .map(ByteData::unsafeAsArray)
+                    .map(VesEventV5.VesEvent::parseFrom)
+                    .collectList()
+                    .asIo()
+
+    private fun decodeConsumedEvents(consumedMessages: List<ByteArray>) =
+            consumedMessages.map(VesEventV5.VesEvent::parseFrom)
+
+}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/ApiServer.kt
new file mode 100644
index 0000000..6c830b9
--- /dev/null
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/ApiServer.kt
@@ -0,0 +1,129 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
+
+import arrow.core.Left
+import arrow.core.Right
+import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.monad
+import arrow.typeclasses.binding
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import ratpack.exec.Promise
+import ratpack.handling.Chain
+import ratpack.handling.Context
+import ratpack.http.Response
+import ratpack.server.RatpackServer
+import ratpack.server.ServerConfig
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+class ApiServer(private val simulator: DcaeAppSimulator) {
+
+
+    fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> =
+            simulator.listenToTopics(kafkaTopics).map {
+                RatpackServer.start { server ->
+                    server.serverConfig(ServerConfig.embedded().port(port))
+                            .handlers(::setupHandlers)
+                }
+            }
+
+    private fun setupHandlers(chain: Chain) {
+        chain
+                .put("configuration/topics") { ctx ->
+                    val operation = ctx.bodyIo().flatMap { body ->
+                        simulator.listenToTopics(body.text)
+                    }
+                    ctx.response.sendOrError(operation)
+
+                }
+                .delete("messages") { ctx ->
+                    ctx.response.contentType(CONTENT_TEXT)
+                    ctx.response.sendOrError(simulator.resetState())
+                }
+                .get("messages/all/count") { ctx ->
+                    simulator.state().fold(
+                            { ctx.response.status(STATUS_NOT_FOUND) },
+                            {
+                                ctx.response
+                                        .contentType(CONTENT_TEXT)
+                                        .send(it.messagesCount.toString())
+                            })
+                }
+                .post("messages/all/validate") { ctx ->
+                    val responseStatus = IO.monad().binding {
+                        val body = ctx.bodyIo().bind()
+                        val isValid = simulator.validate(body.inputStream).bind()
+                        if (isValid)
+                            STATUS_OK
+                        else
+                            STATUS_BAD_REQUEST
+                    }.fix()
+
+                    ctx.response.sendStatusOrError(responseStatus)
+                }
+                .get("healthcheck") { ctx ->
+                    ctx.response.status(STATUS_OK).send()
+                }
+    }
+
+    private fun Context.bodyIo() = request.body.asIo()
+
+    private fun <T> Promise<T>.asIo(): IO<T> = IO.async { emitResult ->
+        onError {
+            emitResult(Left(it))
+        }.then { result ->
+            emitResult(Right(result))
+        }
+    }
+
+    private fun Response.sendOrError(responseStatus: IO<Unit>) {
+        sendStatusOrError(responseStatus.map { STATUS_OK })
+    }
+
+    private fun Response.sendStatusOrError(responseStatus: IO<Int>) {
+        responseStatus.unsafeRunAsync { cb ->
+            cb.fold(
+                    { err ->
+                        logger.warn("Error occurred. Sending HTTP$STATUS_INTERNAL_SERVER_ERROR.", err)
+                        status(ApiServer.STATUS_INTERNAL_SERVER_ERROR)
+                                .send(CONTENT_TEXT, err.message)
+                    },
+                    {
+                        status(it).send()
+                    }
+            )
+        }
+    }
+
+    companion object {
+        private val logger = Logger(ApiServer::class)
+        private const val CONTENT_TEXT = "text/plain"
+
+        private const val STATUS_OK = 200
+        private const val STATUS_BAD_REQUEST = 400
+        private const val STATUS_NOT_FOUND = 404
+        private const val STATUS_INTERNAL_SERVER_ERROR = 500
+    }
+}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
similarity index 62%
rename from hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt
rename to hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
index d53609c..1596517 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
@@ -17,15 +17,16 @@
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
 
 import arrow.effects.IO
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.Consumer
+import org.onap.dcae.collectors.veshv.utils.arrow.evaluateIo
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.kafka.receiver.KafkaReceiver
 import reactor.kafka.receiver.ReceiverOptions
-import java.util.*
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -35,7 +36,7 @@
 
     fun start(): IO<Consumer> = IO {
         val consumer = Consumer()
-        receiver.receive().subscribe(consumer::update)
+        receiver.receive().map(consumer::update).evaluateIo().subscribe()
         consumer
     }
 
@@ -43,18 +44,22 @@
         private val logger = Logger(KafkaSource::class)
 
         fun create(bootstrapServers: String, topics: Set<String>): KafkaSource {
-            val props = HashMap<String, Any>()
-            props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
-            props[ConsumerConfig.CLIENT_ID_CONFIG] = "hv-collector-dcae-app-simulator"
-            props[ConsumerConfig.GROUP_ID_CONFIG] = "hv-collector-simulators"
-            props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
-            props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
-            props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
-            val receiverOptions = ReceiverOptions.create<ByteArray, ByteArray>(props)
+            return KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics)))
+        }
+
+        fun createReceiverOptions(bootstrapServers: String, topics: Set<String>): ReceiverOptions<ByteArray, ByteArray>? {
+            val props = mapOf<String, Any>(
+                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
+                    ConsumerConfig.CLIENT_ID_CONFIG to "hv-collector-dcae-app-simulator",
+                    ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-simulators",
+                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
+                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
+                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest"
+            )
+            return ReceiverOptions.create<ByteArray, ByteArray>(props)
                     .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } }
                     .addRevokeListener { partitions -> logger.debug { "Partitions revoked $partitions" } }
                     .subscription(topics)
-            return KafkaSource(KafkaReceiver.create(receiverOptions))
         }
     }
 }
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
similarity index 97%
rename from hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt
rename to hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
index 065cdf9..d5f5560 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfiguration.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
@@ -17,7 +17,7 @@
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.config
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config
 
 import arrow.core.ForOption
 import arrow.core.Option
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/DcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt
similarity index 93%
rename from hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/DcaeAppSimConfiguration.kt
rename to hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt
index 5bd2d15..c114313 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/DcaeAppSimConfiguration.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt
@@ -17,7 +17,7 @@
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.config
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config
 
 data class DcaeAppSimConfiguration(
         val apiPort: Int,
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
similarity index 84%
rename from hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
rename to hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
index 08bb149..1eefdbd 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/consumer.kt
@@ -17,9 +17,10 @@
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
 
 import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.KafkaSource
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.kafka.receiver.ReceiverRecord
 import java.util.concurrent.ConcurrentLinkedQueue
@@ -28,7 +29,7 @@
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since June 2018
  */
-class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>){
+class ConsumerState(private val messages: ConcurrentLinkedQueue<ByteArray>) {
     val messagesCount: Int by lazy {
         messages.size
     }
@@ -53,19 +54,17 @@
         consumedMessages.clear()
     }
 
-    fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
+    fun update(record: ReceiverRecord<ByteArray, ByteArray>) = IO<Unit> {
         logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
         consumedMessages.add(record.value())
     }
 
-
     companion object {
         private val logger = Logger(Consumer::class)
     }
 }
 
 class ConsumerFactory(private val kafkaBootstrapServers: String) {
-    fun createConsumerForTopics(kafkaTopics: Set<String>): ConsumerStateProvider {
-        return KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start().unsafeRunSync()
-    }
+    fun createConsumerForTopics(kafkaTopics: Set<String>): IO<Consumer> =
+            KafkaSource.create(kafkaBootstrapServers, kafkaTopics.toSet()).start()
 }
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
index 9f84fc4..a65a268 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
@@ -20,10 +20,12 @@
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp
 
 import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.ArgDcaeAppSimConfiguration
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.DcaeAppSimConfiguration
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote.ApiServer
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppSimConfiguration
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.ApiServer
 import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
 import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
 import org.onap.dcae.collectors.veshv.utils.arrow.void
@@ -50,7 +52,7 @@
 
 
 private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
-    return ApiServer(ConsumerFactory(config.kafkaBootstrapServers))
+    return ApiServer(DcaeAppSimulator(ConsumerFactory(config.kafkaBootstrapServers)))
             .start(config.apiPort, config.kafkaTopics)
             .void()
-}
\ No newline at end of file
+}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
deleted file mode 100644
index cd25813..0000000
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote
-
-import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerFactory
-import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.FIXED_PAYLOAD
-import org.onap.ves.VesEventV5.VesEvent
-import ratpack.handling.Chain
-import ratpack.handling.Context
-import ratpack.server.RatpackServer
-import ratpack.server.ServerConfig
-import reactor.core.publisher.Mono
-import javax.json.Json
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-class ApiServer(private val consumerFactory: ConsumerFactory,
-                private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
-
-    private lateinit var consumerState: ConsumerStateProvider
-
-    fun start(port: Int, kafkaTopics: Set<String>): IO<RatpackServer> = IO {
-        consumerState = consumerFactory.createConsumerForTopics(kafkaTopics)
-        RatpackServer.start { server ->
-            server.serverConfig(ServerConfig.embedded().port(port))
-                    .handlers(this::setupHandlers)
-        }
-    }
-
-    private fun setupHandlers(chain: Chain) {
-        chain
-                .put("configuration/topics") { ctx ->
-                    ctx.request.body.then { it ->
-                        val topics = extractTopics(it.text)
-                        logger.info("Received new configuration. Creating consumer for topics: $topics")
-                        consumerState = consumerFactory.createConsumerForTopics(topics)
-                        ctx.response
-                                .status(STATUS_OK)
-                                .send()
-                    }
-
-                }
-                .delete("messages") { ctx ->
-                    ctx.response.contentType(CONTENT_TEXT)
-                    consumerState.reset()
-                            .unsafeRunAsync {
-                                it.fold(
-                                        { ctx.response.status(STATUS_INTERNAL_SERVER_ERROR) },
-                                        { ctx.response.status(STATUS_OK) }
-                                ).send()
-                            }
-                }
-                .get("messages/all/count") { ctx ->
-                    val state = consumerState.currentState()
-                    ctx.response
-                            .contentType(CONTENT_TEXT)
-                            .send(state.messagesCount.toString())
-                }
-                .post("messages/all/validate") { ctx ->
-                    ctx.request.body
-                            .map { Json.createReader(it.inputStream).readArray() }
-                            .map { messageParametersParser.parse(it) }
-                            .map { generateEvents(ctx, it) }
-                            .then { (generatedEvents, shouldValidatePayloads) ->
-                                generatedEvents
-                                        .doOnSuccess { sendResponse(ctx, it, shouldValidatePayloads) }
-                                        .block()
-                            }
-                }
-                .get("healthcheck") { ctx ->
-                    ctx.response.status(STATUS_OK).send()
-                }
-    }
-
-    private fun generateEvents(ctx: Context, parameters: List<MessageParameters>):
-            Pair<Mono<List<VesEvent>>, Boolean> = Pair(
-
-            doGenerateEvents(parameters).doOnError {
-                logger.error("Error occurred when generating messages: $it")
-                ctx.response
-                        .status(STATUS_INTERNAL_SERVER_ERROR)
-                        .send()
-            },
-            parameters.all { it.messageType == FIXED_PAYLOAD }
-    )
-
-    private fun doGenerateEvents(parameters: List<MessageParameters>): Mono<List<VesEvent>> = MessageGenerator.INSTANCE
-            .createMessageFlux(parameters)
-            .map(PayloadWireFrameMessage::payload)
-            .map { decode(it.unsafeAsArray()) }
-            .collectList()
-
-
-    private fun decode(bytes: ByteArray): VesEvent = VesEvent.parseFrom(bytes)
-
-
-    private fun sendResponse(ctx: Context,
-                             generatedEvents: List<VesEvent>,
-                             shouldValidatePayloads: Boolean) =
-            resolveResponseStatusCode(
-                    generated = generatedEvents,
-                    consumed = decodeConsumedEvents(),
-                    validatePayloads = shouldValidatePayloads
-            ).let { ctx.response.status(it).send() }
-
-
-    private fun decodeConsumedEvents(): List<VesEvent> = consumerState
-            .currentState()
-            .consumedMessages
-            .map(::decode)
-
-
-    private fun resolveResponseStatusCode(generated: List<VesEvent>,
-                                          consumed: List<VesEvent>,
-                                          validatePayloads: Boolean): Int =
-            if (validatePayloads) {
-                if (generated == consumed) STATUS_OK else STATUS_BAD_REQUEST
-            } else {
-                validateHeaders(consumed, generated)
-            }
-
-    private fun validateHeaders(consumed: List<VesEvent>, generated: List<VesEvent>): Int {
-        val consumedHeaders = consumed.map { it.commonEventHeader }
-        val generatedHeaders = generated.map { it.commonEventHeader }
-        return if (generatedHeaders == consumedHeaders) STATUS_OK else STATUS_BAD_REQUEST
-    }
-
-    private fun extractTopics(it: String): Set<String> =
-            it.substringAfter("=")
-                    .split(",")
-                    .toSet()
-
-    companion object {
-        private val logger = Logger(ApiServer::class)
-        private const val CONTENT_TEXT = "text/plain"
-
-        private const val STATUS_OK = 200
-        private const val STATUS_BAD_REQUEST = 400
-        private const val STATUS_INTERNAL_SERVER_ERROR = 500
-    }
-}
-
-
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt
new file mode 100644
index 0000000..debe955
--- /dev/null
+++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/ConsumerTest.kt
@@ -0,0 +1,83 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import reactor.kafka.receiver.ReceiverRecord
+
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+internal class ConsumerTest : Spek({
+
+    lateinit var cut: Consumer
+
+    beforeEachTest {
+        cut = Consumer()
+    }
+
+    describe("Consumer which holds the state of received Kafka records") {
+        it("should contain empty state in the beginning") {
+            assertEmptyState(cut)
+        }
+
+        describe("update") {
+            val value = byteArrayOf(2)
+
+            beforeEachTest {
+                cut.update(receiverRecord(
+                        topic = "topic",
+                        key = byteArrayOf(1),
+                        value = value
+                )).unsafeRunSync()
+            }
+
+            it("should contain one message if it was updated once") {
+                assertState(cut, value)
+            }
+
+            it("should contain empty state message if it was reset after update") {
+                cut.reset().unsafeRunSync()
+                assertEmptyState(cut)
+            }
+        }
+    }
+})
+
+fun assertEmptyState(cut: Consumer) {
+    assertState(cut)
+}
+
+fun assertState(cut: Consumer, vararg values: ByteArray) {
+    assertThat(cut.currentState().consumedMessages)
+            .containsOnly(*values)
+    assertThat(cut.currentState().messagesCount)
+            .isEqualTo(values.size)
+}
+
+fun receiverRecord(topic: String, key: ByteArray, value: ByteArray) =
+        ReceiverRecord(ConsumerRecord(topic, 1, 100L, key, value), null)
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
new file mode 100644
index 0000000..c0ba581
--- /dev/null
+++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
@@ -0,0 +1,184 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
+
+import arrow.core.Left
+import arrow.core.None
+import arrow.core.Some
+import arrow.effects.IO
+import com.google.protobuf.ByteString
+import com.nhaarman.mockito_kotlin.any
+import com.nhaarman.mockito_kotlin.eq
+import com.nhaarman.mockito_kotlin.mock
+import com.nhaarman.mockito_kotlin.never
+import com.nhaarman.mockito_kotlin.verify
+import com.nhaarman.mockito_kotlin.whenever
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.mockito.ArgumentMatchers.anySet
+import org.mockito.Mockito
+import org.onap.ves.VesEventV5.VesEvent
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import java.util.concurrent.ConcurrentLinkedQueue
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+internal class DcaeAppSimulatorTest : Spek({
+    lateinit var consumerFactory: ConsumerFactory
+    lateinit var messageStreamValidation: MessageStreamValidation
+    lateinit var consumer: Consumer
+    lateinit var cut: DcaeAppSimulator
+
+    beforeEachTest {
+        consumerFactory = mock()
+        messageStreamValidation = mock()
+        consumer = mock()
+        cut = DcaeAppSimulator(consumerFactory, messageStreamValidation)
+
+        whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(IO.just(consumer))
+    }
+
+    fun consumerState(vararg messages: ByteArray) = ConsumerState(ConcurrentLinkedQueue(messages.toList()))
+
+    describe("listenToTopics") {
+        val topics = setOf("hvMeas", "faults")
+
+        it("should fail when topic list is empty") {
+            val result = cut.listenToTopics(setOf()).attempt().unsafeRunSync()
+            assertThat(result.isLeft()).isTrue()
+        }
+
+        it("should fail when topic list contains empty strings") {
+            val result = cut.listenToTopics(setOf("hvMeas", " ", "faults")).attempt().unsafeRunSync()
+            assertThat(result.isLeft()).isTrue()
+        }
+
+        it("should subscribe to given topics") {
+            cut.listenToTopics(topics).unsafeRunSync()
+            verify(consumerFactory).createConsumerForTopics(topics)
+        }
+
+        it("should subscribe to given topics when called with comma separated list") {
+            cut.listenToTopics("hvMeas,faults").unsafeRunSync()
+            verify(consumerFactory).createConsumerForTopics(topics)
+        }
+
+        it("should handle errors") {
+            // given
+            val error = RuntimeException("WTF")
+            whenever(consumerFactory.createConsumerForTopics(anySet()))
+                    .thenReturn(IO.raiseError(error))
+
+            // when
+            val result = cut.listenToTopics("hvMeas").attempt().unsafeRunSync()
+
+            // then
+            assertThat(result).isEqualTo(Left(error))
+        }
+    }
+
+    describe("state") {
+
+        it("should return None when topics hasn't been initialized") {
+            assertThat(cut.state()).isEqualTo(None)
+        }
+
+        describe("when topics are initialized") {
+            beforeEachTest {
+                cut.listenToTopics("hvMeas").unsafeRunSync()
+            }
+
+            it("should return some state when it has been set") {
+                val state = consumerState()
+                whenever(consumer.currentState()).thenReturn(state)
+
+                assertThat(cut.state()).isEqualTo(Some(state))
+            }
+        }
+    }
+
+    describe("resetState") {
+        it("should do nothing when topics hasn't been initialized") {
+            cut.resetState().unsafeRunSync()
+            verify(consumer, never()).reset()
+        }
+
+        describe("when topics are initialized") {
+            beforeEachTest {
+                cut.listenToTopics("hvMeas").unsafeRunSync()
+            }
+
+            it("should reset the state") {
+                // given
+                whenever(consumer.reset()).thenReturn(IO.unit)
+
+                // when
+                cut.resetState().unsafeRunSync()
+
+                // then
+                verify(consumer).reset()
+            }
+        }
+    }
+
+    describe("validate") {
+        beforeEachTest {
+            whenever(messageStreamValidation.validate(any(), any())).thenReturn(IO.just(true))
+        }
+
+        it("should use empty list when consumer is unavailable") {
+            // when
+            val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync()
+
+            // then
+            verify(messageStreamValidation).validate(any(), eq(emptyList()))
+            assertThat(result).isTrue()
+        }
+
+        it("should delegate to MessageStreamValidation") {
+            // given
+            cut.listenToTopics("hvMeas").unsafeRunSync()
+            whenever(consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray()))
+
+            // when
+            val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync()
+
+            // then
+            verify(messageStreamValidation).validate(any(), any())
+            assertThat(result).isTrue()
+        }
+    }
+})
+
+
+private const val DUMMY_EVENT_ID = "aaa"
+private const val DUMMY_PAYLOAD = "payload"
+
+private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
+    return VesEvent.newBuilder()
+            .setCommonEventHeader(CommonEventHeader.newBuilder()
+                    .setEventId(eventId))
+            .setHvRanMeasFields(ByteString.copyFrom(payload.toByteArray()))
+            .build()
+}
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
new file mode 100644
index 0000000..0bdd115
--- /dev/null
+++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
@@ -0,0 +1,224 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
+
+import arrow.core.Either
+import arrow.core.Left
+import arrow.core.None
+import arrow.core.Some
+import arrow.effects.IO
+import javax.json.stream.JsonParsingException
+import com.google.protobuf.ByteString
+import com.nhaarman.mockito_kotlin.any
+import com.nhaarman.mockito_kotlin.mock
+import com.nhaarman.mockito_kotlin.never
+import com.nhaarman.mockito_kotlin.verify
+import com.nhaarman.mockito_kotlin.whenever
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assertions.fail
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.mockito.ArgumentMatchers.anyList
+import org.mockito.ArgumentMatchers.anySet
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
+import org.onap.ves.VesEventV5.VesEvent
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import reactor.core.publisher.Flux
+import java.util.concurrent.ConcurrentLinkedQueue
+import javax.json.Json
+import javax.json.JsonArray
+import javax.json.JsonValue
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+internal class MessageStreamValidationTest : Spek({
+    lateinit var messageParametersParser: MessageParametersParser
+    lateinit var messageGenerator: MessageGenerator
+    lateinit var cut: MessageStreamValidation
+
+    beforeEachTest {
+        messageParametersParser = mock()
+        messageGenerator = mock()
+        cut = MessageStreamValidation(messageParametersParser, messageGenerator)
+    }
+
+    fun givenParsedMessageParameters(vararg params: MessageParameters) {
+        whenever(messageParametersParser.parse(any())).thenReturn(params.toList())
+    }
+
+    describe("validate") {
+
+        it("should return error when JSON is invalid") {
+            // when
+            val result = cut.validate("[{invalid json}]".byteInputStream(), listOf()).attempt().unsafeRunSync()
+
+            // then
+            when(result) {
+                is Either.Left -> assertThat(result.a).isInstanceOf(JsonParsingException::class.java)
+                else -> fail("validation should fail")
+            }
+        }
+
+        it("should return error when message param list is empty") {
+            // given
+            givenParsedMessageParameters()
+
+            // when
+            val result = cut.validate(sampleJsonAsStream(), listOf()).attempt().unsafeRunSync()
+
+            // then
+            assertThat(result.isLeft()).isTrue()
+        }
+
+        describe("when validating headers only") {
+            it("should return true when messages are the same") {
+                // given
+                val jsonAsStream = sampleJsonAsStream()
+                val event = vesEvent()
+                val generatedWireProtocolFrame = PayloadWireFrameMessage(event.toByteArray())
+                val receivedMessageBytes = event.toByteArray()
+
+                givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.VALID, 1))
+                whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
+
+                // when
+                val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
+
+                // then
+                assertThat(result).isTrue()
+            }
+
+            it("should return true when messages differ with payload only") {
+                // given
+                val jsonAsStream = sampleJsonAsStream()
+                val generatedEvent = vesEvent(payload = "payload A")
+                val receivedEvent = vesEvent(payload = "payload B")
+                val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
+                val receivedMessageBytes = receivedEvent.toByteArray()
+
+                givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
+                whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
+
+                // when
+                val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
+
+                // then
+                assertThat(result).isTrue()
+            }
+
+            it("should return false when messages are different") {
+                // given
+                val jsonAsStream = sampleJsonAsStream()
+                val generatedEvent = vesEvent()
+                val receivedEvent = vesEvent(eventId = "bbb")
+                val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
+                val receivedMessageBytes = receivedEvent.toByteArray()
+
+                givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
+                whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
+
+                // when
+                val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
+
+                // then
+                assertThat(result).isFalse()
+            }
+        }
+
+        describe("when validating whole messages") {
+            it("should return true when messages are the same") {
+                // given
+                val jsonAsStream = sampleJsonAsStream()
+                val event = vesEvent()
+                val generatedWireProtocolFrame = PayloadWireFrameMessage(event.toByteArray())
+                val receivedMessageBytes = event.toByteArray()
+
+                givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
+                whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
+
+                // when
+                val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
+
+                // then
+                assertThat(result).isTrue()
+            }
+
+            it("should return false when messages differ with payload only") {
+                // given
+                val jsonAsStream = sampleJsonAsStream()
+                val generatedEvent = vesEvent(payload = "payload A")
+                val receivedEvent = vesEvent(payload = "payload B")
+                val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
+                val receivedMessageBytes = receivedEvent.toByteArray()
+
+                givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
+                whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
+
+                // when
+                val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
+
+                // then
+                assertThat(result).isFalse()
+            }
+
+            it("should return false when messages are different") {
+                // given
+                val jsonAsStream = sampleJsonAsStream()
+                val generatedEvent = vesEvent()
+                val receivedEvent = vesEvent("bbb")
+                val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
+                val receivedMessageBytes = receivedEvent.toByteArray()
+
+                givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
+                whenever(messageGenerator.createMessageFlux(anyList())).thenReturn(Flux.just(generatedWireProtocolFrame))
+
+                // when
+                val result = cut.validate(jsonAsStream, listOf(receivedMessageBytes)).unsafeRunSync()
+
+                // then
+                assertThat(result).isFalse()
+            }
+        }
+    }
+})
+
+
+
+private const val DUMMY_EVENT_ID = "aaa"
+private const val DUMMY_PAYLOAD = "payload"
+
+private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_PAYLOAD): VesEvent {
+    return VesEvent.newBuilder()
+            .setCommonEventHeader(CommonEventHeader.newBuilder()
+                    .setEventId(eventId))
+            .setHvRanMeasFields(ByteString.copyFrom(payload.toByteArray()))
+            .build()
+}
+
+private const val sampleJsonArray = """["headersOnly"]"""
+
+private fun sampleJsonAsStream() = sampleJsonArray.byteInputStream()
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt
new file mode 100644
index 0000000..de74f62
--- /dev/null
+++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSourceTest.kt
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
+
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
+ * @since August 2018
+ */
+internal class KafkaSourceTest : Spek({
+    val servers = "kafka1:9080,kafka2:9080"
+    val topics = setOf("topic1", "topic2")
+
+    describe("receiver options") {
+        val options = KafkaSource.createReceiverOptions(servers, topics)!!.toImmutable()
+
+        fun verifyProperty(key: String, expectedValue: Any) {
+            it("should have $key option set") {
+                assertThat(options.consumerProperty(key))
+                        .isEqualTo(expectedValue)
+            }
+        }
+
+        verifyProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers)
+        verifyProperty(ConsumerConfig.CLIENT_ID_CONFIG, "hv-collector-dcae-app-simulator")
+        verifyProperty(ConsumerConfig.GROUP_ID_CONFIG, "hv-collector-simulators")
+        verifyProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java)
+        verifyProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer::class.java)
+        verifyProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+    }
+})
\ No newline at end of file
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfigurationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt
similarity index 98%
rename from hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfigurationTest.kt
rename to hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt
index 7d88793..e7a22fc 100644
--- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgDcaeAppSimConfigurationTest.kt
+++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfigurationTest.kt
@@ -17,7 +17,7 @@
  * limitations under the License.
  * ============LICENSE_END=========================================================
  */
-package org.onap.dcae.collectors.veshv.simulators.dcaeapp.config
+package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config
 
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
diff --git a/hv-collector-dcae-app-simulator/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/hv-collector-dcae-app-simulator/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 0000000..ca6ee9c
--- /dev/null
+++ b/hv-collector-dcae-app-simulator/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1 @@
+mock-maker-inline
\ No newline at end of file
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
index b2e4250..c61ab26 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
@@ -31,7 +31,7 @@
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since June 2018
  */
-class WireFrameEncoder(private val allocator: ByteBufAllocator) {
+class WireFrameEncoder(private val allocator: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
 
     fun encode(frame: PayloadWireFrameMessage): ByteBuf {
         val bb = allocator.buffer(PayloadWireFrameMessage.HEADER_SIZE + frame.payload.size())
diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
index 89d1f32..fa63c36 100644
--- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
+++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
@@ -40,7 +40,7 @@
  */
 object WireFrameCodecsTest : Spek({
     val payloadAsString = "coffeebabe"
-    val encoder = WireFrameEncoder(UnpooledByteBufAllocator.DEFAULT)
+    val encoder = WireFrameEncoder()
     val decoder = WireFrameDecoder()
 
     fun createSampleFrame() =
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
index 39964c1..844d18d 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
@@ -20,12 +20,15 @@
 package org.onap.dcae.collectors.veshv.utils.arrow
 
 import arrow.core.Either
+import arrow.core.Option
 import arrow.core.identity
+import java.util.concurrent.atomic.AtomicReference
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since July 2018
  */
 
-
 fun <A> Either<A, A>.flatten() = fold(::identity, ::identity)
+
+fun <A> AtomicReference<A>.getOption() = Option.fromNullable(get())
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt
index e37b0d7..cef537e 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/effects.kt
@@ -20,7 +20,11 @@
 package org.onap.dcae.collectors.veshv.utils.arrow
 
 import arrow.core.Either
+import arrow.core.Left
+import arrow.core.Right
 import arrow.effects.IO
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
 import kotlin.system.exitProcess
 
 /**
@@ -47,3 +51,15 @@
 
 
 fun IO<Any>.void() = map { Unit }
+
+fun <T> Mono<T>.asIo() = IO.async<T> { proc ->
+    subscribe({ proc(Right(it)) }, { proc(Left(it)) })
+}
+
+fun <T> Flux<IO<T>>.evaluateIo(): Flux<T> =
+        flatMap { io ->
+            io.attempt().unsafeRunSync().fold(
+                    { Flux.error<T>(it) },
+                    { Flux.just<T>(it) }
+            )
+        }
diff --git a/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/CoreKtTest.kt b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/CoreKtTest.kt
new file mode 100644
index 0000000..585851b
--- /dev/null
+++ b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/CoreKtTest.kt
@@ -0,0 +1,63 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.arrow
+
+import arrow.core.None
+import arrow.core.Some
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import java.util.concurrent.atomic.AtomicReference
+
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
+ * @since August 2018
+ */
+internal class CoreKtTest: Spek({
+    describe("AtomicReference.getOption") {
+        given("empty atomic reference") {
+            val atomicReference = AtomicReference<String>()
+
+            on("getOption") {
+                val result = atomicReference.getOption()
+
+                it("should be None") {
+                    assertThat(result).isEqualTo(None)
+                }
+            }
+        }
+        given("non-empty atomic reference") {
+            val initialValue = "reksio"
+            val atomicReference = AtomicReference(initialValue)
+
+            on("getOption") {
+                val result = atomicReference.getOption()
+
+                it("should be Some($initialValue)") {
+                    assertThat(result).isEqualTo(Some(initialValue))
+                }
+            }
+        }
+    }
+})
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 7ab6503..03dac1f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -560,6 +560,11 @@
                 <version>${arrow.version}</version>
             </dependency>
             <dependency>
+                <groupId>io.arrow-kt</groupId>
+                <artifactId>arrow-effects-reactor</artifactId>
+                <version>${arrow.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>ch.qos.logback</groupId>
                 <artifactId>logback-classic</artifactId>
                 <version>1.3.0-alpha4</version>