Merge "Fix Common Event Header fields validation"
diff --git a/hv-collector-analysis/pom.xml b/hv-collector-analysis/pom.xml
index 560cc58..9ace275 100644
--- a/hv-collector-analysis/pom.xml
+++ b/hv-collector-analysis/pom.xml
@@ -32,7 +32,7 @@
 
     <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
     <artifactId>hv-collector-analysis</artifactId>
-    <version>4.0.0-SNAPSHOT</version>
+    <version>1.1.0-SNAPSHOT</version>
     <description>VES HighVolume Collector :: Code analysis configuration</description>
 
     <build>
diff --git a/hv-collector-core/pom.xml b/hv-collector-core/pom.xml
index ddfa3ed..a813529 100644
--- a/hv-collector-core/pom.xml
+++ b/hv-collector-core/pom.xml
@@ -33,7 +33,7 @@
     <parent>
         <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
         <artifactId>ves-hv-collector</artifactId>
-        <version>4.0.0-SNAPSHOT</version>
+        <version>1.1.0-SNAPSHOT</version>
         <relativePath>..</relativePath>
     </parent>
 
@@ -111,7 +111,7 @@
             <artifactId>reactor-extra</artifactId>
         </dependency>
         <dependency>
-            <groupId>io.projectreactor.ipc</groupId>
+            <groupId>io.projectreactor.netty</groupId>
             <artifactId>reactor-netty</artifactId>
         </dependency>
         <dependency>
@@ -127,36 +127,6 @@
             <artifactId>javax.json</artifactId>
             <scope>runtime</scope>
         </dependency>
-
-
-        <dependency>
-            <groupId>com.nhaarman</groupId>
-            <artifactId>mockito-kotlin</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.assertj</groupId>
-            <artifactId>assertj-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.jetbrains.kotlin</groupId>
-            <artifactId>kotlin-test</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.jetbrains.spek</groupId>
-            <artifactId>spek-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.jetbrains.spek</groupId>
-            <artifactId>spek-junit-platform-engine</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.projectreactor</groupId>
-            <artifactId>reactor-test</artifactId>
-        </dependency>
         <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 07b5c82..78afe9f 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -23,7 +23,7 @@
 import org.onap.dcae.collectors.veshv.boundary.SinkProvider
 import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
 import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
-import reactor.ipc.netty.http.client.HttpClient
+import reactor.netty.http.client.HttpClient
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index d08ad9e..af4bbaa 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -125,7 +125,7 @@
     }
 
     companion object {
-        private const val MAX_RETRIES = 5
+        private const val MAX_RETRIES = 5L
         private const val BACKOFF_INTERVAL_FACTOR = 30L
         private val logger = Logger(ConsulConfigurationProvider::class)
     }
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
index 4503955..1672158 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
@@ -19,10 +19,11 @@
  */
 package org.onap.dcae.collectors.veshv.impl.adapters
 
+import io.netty.handler.codec.http.HttpStatusClass
 import org.slf4j.LoggerFactory
 import reactor.core.publisher.Mono
-import reactor.core.publisher.toMono
-import reactor.ipc.netty.http.client.HttpClient
+import reactor.netty.http.client.HttpClient
+import java.lang.IllegalStateException
 import java.nio.charset.Charset
 
 /**
@@ -34,14 +35,18 @@
     private val logger = LoggerFactory.getLogger(HttpAdapter::class.java)
 
     open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient
-            .get(url + createQueryString(queryParams))
+            .get()
+            .uri(url + createQueryString(queryParams))
+            .responseSingle { response, content ->
+                if (response.status().codeClass() == HttpStatusClass.SUCCESS)
+                    content.asString()
+                else
+                    Mono.error(IllegalStateException("$url ${response.status().code()} ${response.status().reasonPhrase()}"))
+            }
             .doOnError {
                 logger.error("Failed to get resource on path: $url (${it.localizedMessage})")
                 logger.debug("Nested exception:", it)
             }
-            .flatMap { it.receiveContent().toMono() }
-            .map { it.content().toString(Charset.defaultCharset()) }
-
 
     private fun createQueryString(params: Map<String, Any>): String {
         if (params.isEmpty())
@@ -57,7 +62,7 @@
 
         }
 
-        return  builder.removeSuffix("&").toString()
+        return builder.removeSuffix("&").toString()
     }
 
 }
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 7a47cfc..e535300 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -19,9 +19,8 @@
  */
 package org.onap.dcae.collectors.veshv.impl.socket
 
-import arrow.core.Option
+import arrow.core.getOrElse
 import arrow.effects.IO
-import io.netty.handler.ssl.SslContext
 import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
 import org.onap.dcae.collectors.veshv.boundary.Server
 import org.onap.dcae.collectors.veshv.model.ServerConfiguration
@@ -29,15 +28,13 @@
 import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
 import org.onap.dcae.collectors.veshv.utils.ServerHandle
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.reactivestreams.Publisher
 import reactor.core.publisher.Mono
-import reactor.ipc.netty.ByteBufFlux
-import reactor.ipc.netty.NettyInbound
-import reactor.ipc.netty.NettyOutbound
-import reactor.ipc.netty.options.ServerOptions
-import reactor.ipc.netty.tcp.TcpServer
+import reactor.netty.ByteBufFlux
+import reactor.netty.Connection
+import reactor.netty.NettyInbound
+import reactor.netty.NettyOutbound
+import reactor.netty.tcp.TcpServer
 import java.time.Duration
-import java.util.function.BiFunction
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -48,63 +45,65 @@
                               private val collectorProvider: CollectorProvider) : Server {
 
     override fun start(): IO<ServerHandle> = IO {
-        val ctx = TcpServer.builder()
-                .options(this::configureServer)
-                .build()
-                .start(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { input, _ ->
-                    handleConnection(input)
-                })
-        NettyServerHandle(ctx)
+        val tcpServer = TcpServer.create()
+                .addressSupplier { serverConfig.serverListenAddress }
+                .configureSsl()
+                .handle(this::handleConnection)
+
+        NettyServerHandle(tcpServer.bindNow())
     }
 
-    private fun configureServer(opts: ServerOptions.Builder<*>) {
-        val sslContext: Option<SslContext> = sslContextFactory.createSslContext(serverConfig.securityConfiguration)
-        if (sslContext.isDefined()) opts.sslContext(sslContext.orNull())
-        opts.port(serverConfig.listenPort)
-    }
+    private fun TcpServer.configureSsl() =
+            sslContextFactory
+                    .createSslContext(serverConfig.securityConfiguration)
+                    .map { sslContext ->
+                        this.secure { b -> b.sslContext(sslContext) }
+                    }.getOrElse { this }
 
-    private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> =
+    private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
             collectorProvider().fold(
                     {
-                        logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." }
+                        nettyInbound.withConnection { conn ->
+                            logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." }
+                        }
                         Mono.empty()
                     },
                     {
-                        logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" }
-                        val allocator = nettyInbound.context().channel().alloc()
-                        it.handleConnection(allocator, createDataStream(nettyInbound))
+                        nettyInbound.withConnection { conn ->
+                            logger.info { "Handling connection from ${conn.address()}" }
+                            conn.configureIdleTimeout(serverConfig.idleTimeout)
+                                    .logConnectionClosed()
+                        }
+                        it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound))
                     }
             )
 
-
-    fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
-            .configureIdleTimeout(serverConfig.idleTimeout)
-            .logConnectionClosed()
+    private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
             .receive()
             .retain()
 
-    private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound {
+    private fun Connection.configureIdleTimeout(timeout: Duration): Connection {
         onReadIdle(timeout.toMillis()) {
             logger.info {
-                "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${remoteAddress()}..."
+                "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
             }
             disconnectClient()
         }
         return this
     }
 
-    private fun NettyInbound.disconnectClient() {
-        context().channel().close().addListener {
+    private fun Connection.disconnectClient() {
+        channel().close().addListener {
             if (it.isSuccess)
-                logger.debug { "Channel (${remoteAddress()}) closed successfully." }
+                logger.debug { "Channel (${address()}) closed successfully." }
             else
                 logger.warn("Channel close failed", it.cause())
         }
     }
 
-    private fun NettyInbound.logConnectionClosed(): NettyInbound {
-        context().onClose {
-            logger.info("Connection from ${remoteAddress()} has been closed")
+    private fun Connection.logConnectionClosed(): Connection {
+        onDispose { // onTerminate() is Mono<Void>, so a value consumer there never ran
+            logger.info("Connection from ${address()} has been closed")
         }
         return this
     }
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
index 7a7d934..8511768 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
@@ -20,6 +20,7 @@
 package org.onap.dcae.collectors.veshv.model
 
 import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import java.net.InetSocketAddress
 import java.time.Duration
 
 /**
@@ -27,10 +28,10 @@
  * @since May 2018
  */
 data class ServerConfiguration(
-        val listenPort: Int,
+        val serverListenAddress: InetSocketAddress,
         val configurationProviderParams: ConfigurationProviderParams,
         val securityConfiguration: SecurityConfiguration,
         val idleTimeout: Duration,
-        val healthCheckApiPort: Int,
+        val healthCheckApiListenAddress: InetSocketAddress,
         val maximumPayloadSizeBytes: Int,
         val dummyMode: Boolean = false)
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
index 9a6889c..7a1a4cd 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -19,9 +19,9 @@
  */
 package org.onap.dcae.collectors.veshv.impl.adapters
 
-import com.nhaarman.mockito_kotlin.eq
-import com.nhaarman.mockito_kotlin.mock
-import com.nhaarman.mockito_kotlin.whenever
+import com.nhaarman.mockitokotlin2.eq
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
index 123d8f7..91457fa 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
@@ -19,21 +19,15 @@
  */
 package org.onap.dcae.collectors.veshv.impl.adapters
 
-import com.nhaarman.mockito_kotlin.mock
-import com.nhaarman.mockito_kotlin.verify
-import com.nhaarman.mockito_kotlin.whenever
-import io.netty.buffer.Unpooled
-import io.netty.handler.codec.http.HttpContent
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.given
 import org.jetbrains.spek.api.dsl.it
-import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
-import reactor.ipc.netty.http.client.HttpClient
-import reactor.ipc.netty.http.client.HttpClientResponse
+import reactor.netty.http.client.HttpClient
+import reactor.netty.http.server.HttpServer
 import reactor.test.StepVerifier
-import java.nio.charset.Charset
+import reactor.test.test
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -42,66 +36,51 @@
 internal object HttpAdapterTest : Spek({
     describe("HttpAdapter") {
 
-        val httpClientMock: HttpClient = mock()
-        val httpAdapter = HttpAdapter(httpClientMock)
+        val httpServer = HttpServer.create()
+                .host("127.0.0.1")
+                .route { routes ->
+                    routes.get("/url") { req, resp ->
+                        resp.sendString(Mono.just(req.uri()))
+                    }
+                }
+                .bindNow()
+        val baseUrl = "http://${httpServer.host()}:${httpServer.port()}"
+        val httpAdapter = HttpAdapter(HttpClient.create().baseUrl(baseUrl))
+
+        afterGroup {
+            httpServer.disposeNow()
+        }
 
         given("url without query params") {
-            val initialUrl = "http://test-url"
-            whenever(httpClientMock.get(initialUrl)).thenReturn(Mono.empty())
+            val url = "/url"
 
             it("should not append query string") {
-                httpAdapter.get(initialUrl)
-                verify(httpClientMock).get(initialUrl)
+                httpAdapter.get(url).test()
+                        .expectNext(url)
+                        .verifyComplete()
             }
         }
 
         given("url with query params") {
-            val queryParams = mapOf(Pair("key", "value"))
-            val initialUrl = "http://test-url"
-            val expectedUrl = "http://test-url?key=value"
-            whenever(httpClientMock.get(expectedUrl)).thenReturn(Mono.empty())
+            val queryParams = mapOf(Pair("p", "the-value"))
+            val url = "/url"
 
-            it("should parse them to query string and append to url") {
-                httpAdapter.get(initialUrl, queryParams)
-                verify(httpClientMock).get(expectedUrl)
+            it("should add them as query string to the url") {
+                httpAdapter.get(url, queryParams).test()
+                        .expectNext("/url?p=the-value")
+                        .verifyComplete()
             }
         }
 
-        given("valid resource url") {
-            val validUrl = "http://valid-url/"
-            val responseContent = """{"key1": "value1", "key2": "value2"}"""
-            val httpResponse = createHttpResponseMock(responseContent)
-            whenever(httpClientMock.get(validUrl)).thenReturn(Mono.just(httpResponse))
-
-            it("should return response string") {
-                StepVerifier
-                        .create(httpAdapter.get(validUrl))
-                        .expectNext(responseContent)
-            }
-        }
-
-        given("invalid resource url") {
-            val invalidUrl = "http://invalid-url/"
-            val exceptionMessage = "Test exception"
-            whenever(httpClientMock.get(invalidUrl)).thenReturn(Mono.error(Exception(exceptionMessage)))
+        given("invalid url") {
+            val invalidUrl = "/wtf"
 
             it("should interrupt the flux") {
                 StepVerifier
                         .create(httpAdapter.get(invalidUrl))
-                        .verifyErrorMessage(exceptionMessage)
+                        .verifyError()
             }
         }
     }
 
-})
-
-fun createHttpResponseMock(content: String): HttpClientResponse {
-    val responseMock: HttpClientResponse = mock()
-    val contentMock: HttpContent = mock()
-    val contentByteBuff = Unpooled.copiedBuffer(content, Charset.defaultCharset())
-
-    whenever(responseMock.receiveContent()).thenReturn(Flux.just(contentMock))
-    whenever(contentMock.content()).thenReturn(contentByteBuff)
-
-    return responseMock
-}
+})
\ No newline at end of file
diff --git a/hv-collector-coverage/pom.xml b/hv-collector-coverage/pom.xml
index a6f3c8d..f1e5c74 100644
--- a/hv-collector-coverage/pom.xml
+++ b/hv-collector-coverage/pom.xml
@@ -33,7 +33,7 @@
     <parent>
         <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
         <artifactId>ves-hv-collector</artifactId>
-        <version>4.0.0-SNAPSHOT</version>
+        <version>1.1.0-SNAPSHOT</version>
         <relativePath>..</relativePath>
     </parent>
 
diff --git a/hv-collector-ct/pom.xml b/hv-collector-ct/pom.xml
index 2482d73..07da24b 100644
--- a/hv-collector-ct/pom.xml
+++ b/hv-collector-ct/pom.xml
@@ -33,7 +33,7 @@
     <parent>
         <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
         <artifactId>ves-hv-collector</artifactId>
-        <version>4.0.0-SNAPSHOT</version>
+        <version>1.1.0-SNAPSHOT</version>
         <relativePath>..</relativePath>
     </parent>
 
diff --git a/hv-collector-dcae-app-simulator/pom.xml b/hv-collector-dcae-app-simulator/pom.xml
index f0a8771..82e99f9 100644
--- a/hv-collector-dcae-app-simulator/pom.xml
+++ b/hv-collector-dcae-app-simulator/pom.xml
@@ -33,7 +33,7 @@
     <parent>
         <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
         <artifactId>ves-hv-collector</artifactId>
-        <version>4.0.0-SNAPSHOT</version>
+        <version>1.1.0-SNAPSHOT</version>
         <relativePath>..</relativePath>
     </parent>
 
@@ -141,34 +141,10 @@
             <artifactId>kotlin-stdlib-jdk8</artifactId>
         </dependency>
         <dependency>
-            <groupId>com.nhaarman</groupId>
-            <artifactId>mockito-kotlin</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
-        </dependency>
-        <dependency>
             <groupId>org.assertj</groupId>
             <artifactId>assertj-core</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.jetbrains.kotlin</groupId>
-            <artifactId>kotlin-test</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.jetbrains.spek</groupId>
-            <artifactId>spek-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.jetbrains.spek</groupId>
-            <artifactId>spek-junit-platform-engine</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.projectreactor</groupId>
-            <artifactId>reactor-test</artifactId>
-        </dependency>
-        <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>
             <scope>runtime</scope>
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
index 83dceb6..17eeb5b 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
@@ -19,10 +19,9 @@
 */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config
 
-import arrow.core.ForOption
 import arrow.core.Option
 import arrow.core.fix
-import arrow.instances.extensions
+import arrow.instances.option.monad.monad
 import arrow.typeclasses.binding
 import org.apache.commons.cli.CommandLine
 import org.apache.commons.cli.DefaultParser
@@ -45,26 +44,24 @@
     )
 
     override fun getConfiguration(cmdLine: CommandLine): Option<DcaeAppSimConfiguration> =
-            ForOption extensions {
-                binding {
-                    val listenPort = cmdLine
-                            .intValue(LISTEN_PORT)
-                            .bind()
-                    val maxPayloadSizeBytes = cmdLine
-                            .intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
-                    val kafkaBootstrapServers = cmdLine
-                            .stringValue(KAFKA_SERVERS)
-                            .bind()
-                    val kafkaTopics = cmdLine
-                            .stringValue(KAFKA_TOPICS)
-                            .map { it.split(",").toSet() }
-                            .bind()
+            Option.monad().binding {
+                val listenPort = cmdLine
+                        .intValue(LISTEN_PORT)
+                        .bind()
+                val maxPayloadSizeBytes = cmdLine
+                        .intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
+                val kafkaBootstrapServers = cmdLine
+                        .stringValue(KAFKA_SERVERS)
+                        .bind()
+                val kafkaTopics = cmdLine
+                        .stringValue(KAFKA_TOPICS)
+                        .map { it.split(",").toSet() }
+                        .bind()
 
-                    DcaeAppSimConfiguration(
-                            listenPort,
-                            maxPayloadSizeBytes,
-                            kafkaBootstrapServers,
-                            kafkaTopics)
-                }.fix()
-            }
+                DcaeAppSimConfiguration(
+                        listenPort,
+                        maxPayloadSizeBytes,
+                        kafkaBootstrapServers,
+                        kafkaTopics)
+            }.fix()
 }
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
index aceb746..e1641cb 100644
--- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
+++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
@@ -24,20 +24,19 @@
 import arrow.core.Some
 import arrow.effects.IO
 import com.google.protobuf.ByteString
-import com.nhaarman.mockito_kotlin.any
-import com.nhaarman.mockito_kotlin.eq
-import com.nhaarman.mockito_kotlin.mock
-import com.nhaarman.mockito_kotlin.never
-import com.nhaarman.mockito_kotlin.verify
-import com.nhaarman.mockito_kotlin.whenever
+import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.eq
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.never
+import com.nhaarman.mockitokotlin2.verify
+import com.nhaarman.mockitokotlin2.whenever
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
 import org.mockito.ArgumentMatchers.anySet
-import org.mockito.Mockito
-import org.onap.ves.VesEventOuterClass.VesEvent
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.VesEvent
 import java.util.concurrent.ConcurrentLinkedQueue
 
 /**
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
index 5e3090a..a631be7 100644
--- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
+++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
@@ -22,9 +22,9 @@
 import arrow.core.Either
 import arrow.core.Right
 import com.google.protobuf.ByteString
-import com.nhaarman.mockito_kotlin.any
-import com.nhaarman.mockito_kotlin.mock
-import com.nhaarman.mockito_kotlin.whenever
+import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
 import org.assertj.core.api.Assertions.assertThat
 import org.assertj.core.api.Assertions.fail
 import org.jetbrains.spek.api.Spek
diff --git a/hv-collector-domain/pom.xml b/hv-collector-domain/pom.xml
index 95b8d84..e03de3c 100644
--- a/hv-collector-domain/pom.xml
+++ b/hv-collector-domain/pom.xml
@@ -33,7 +33,7 @@
     <parent>
         <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
         <artifactId>ves-hv-collector</artifactId>
-        <version>4.0.0-SNAPSHOT</version>
+        <version>1.1.0-SNAPSHOT</version>
         <relativePath>..</relativePath>
     </parent>
 
@@ -96,7 +96,7 @@
             <artifactId>kotlin-stdlib-jdk8</artifactId>
         </dependency>
         <dependency>
-            <groupId>io.projectreactor.ipc</groupId>
+            <groupId>io.projectreactor.netty</groupId>
             <artifactId>reactor-netty</artifactId>
         </dependency>
         <dependency>
diff --git a/hv-collector-health-check/pom.xml b/hv-collector-health-check/pom.xml
index 45fa2e0..6ecdc95 100644
--- a/hv-collector-health-check/pom.xml
+++ b/hv-collector-health-check/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
         <artifactId>ves-hv-collector</artifactId>
-        <version>4.0.0-SNAPSHOT</version>
+        <version>1.1.0-SNAPSHOT</version>
         <relativePath>..</relativePath>
     </parent>
 
@@ -50,7 +50,7 @@
             <artifactId>kotlin-stdlib-jdk8</artifactId>
         </dependency>
         <dependency>
-            <groupId>io.projectreactor.ipc</groupId>
+            <groupId>io.projectreactor.netty</groupId>
             <artifactId>reactor-netty</artifactId>
         </dependency>
         <dependency>
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
index 753f73e..b4d7c14 100644
--- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
+++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
@@ -27,26 +27,30 @@
 import org.onap.dcae.collectors.veshv.utils.ServerHandle
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
-import reactor.ipc.netty.http.server.HttpServer
-import reactor.ipc.netty.http.server.HttpServerRequest
-import reactor.ipc.netty.http.server.HttpServerResponse
+import reactor.netty.http.server.HttpServer
+import reactor.netty.http.server.HttpServerRequest
+import reactor.netty.http.server.HttpServerResponse
+import java.net.SocketAddress
 import java.util.concurrent.atomic.AtomicReference
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since August 2018
  */
-class HealthCheckApiServer(private val healthState: HealthState, private val port: Int) {
+class HealthCheckApiServer(private val healthState: HealthState,
+                           private val listenAddress: SocketAddress) {
 
     private val healthDescription: AtomicReference<HealthDescription> = AtomicReference(HealthDescription.STARTING)
 
     fun start(): IO<ServerHandle> = IO {
         healthState().subscribe(healthDescription::set)
-        val ctx = HttpServer.create(port).startRouter { routes ->
-            routes.get("/health/ready", ::readinessHandler)
-            routes.get("/health/alive", ::livenessHandler)
-        }
-        NettyServerHandle(ctx)
+        val ctx = HttpServer.create()
+                .tcpConfiguration { it.addressSupplier { listenAddress } }
+                .route { routes ->
+                    routes.get("/health/ready", ::readinessHandler)
+                    routes.get("/health/alive", ::livenessHandler)
+                }
+        NettyServerHandle(ctx.bindNow())
     }
 
     private fun readinessHandler(req: HttpServerRequest, resp: HttpServerResponse) =
@@ -55,6 +59,6 @@
             }
 
     private fun livenessHandler(req: HttpServerRequest, resp: HttpServerResponse) =
-                resp.status(HttpResponseStatus.NOT_IMPLEMENTED).sendString(Mono.just("Not implemented yet"))
+            resp.status(HttpResponseStatus.NOT_IMPLEMENTED).sendString(Mono.just("Not implemented yet"))
 
 }
diff --git a/hv-collector-main/pom.xml b/hv-collector-main/pom.xml
index fda519c..9e7101b 100644
--- a/hv-collector-main/pom.xml
+++ b/hv-collector-main/pom.xml
@@ -33,7 +33,7 @@
     <parent>
         <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
         <artifactId>ves-hv-collector</artifactId>
-        <version>4.0.0-SNAPSHOT</version>
+        <version>1.1.0-SNAPSHOT</version>
         <relativePath>..</relativePath>
     </parent>
 
@@ -137,30 +137,6 @@
             <groupId>io.micrometer</groupId>
             <artifactId>micrometer-registry-jmx</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.assertj</groupId>
-            <artifactId>assertj-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.jetbrains.kotlin</groupId>
-            <artifactId>kotlin-test</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.jetbrains.spek</groupId>
-            <artifactId>spek-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.jetbrains.spek</groupId>
-            <artifactId>spek-junit-platform-engine</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>com.nhaarman</groupId>
-            <artifactId>mockito-kotlin</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
-        </dependency>
     </dependencies>
 
 
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
index 0f38219..81d916d 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
@@ -19,10 +19,8 @@
  */
 package org.onap.dcae.collectors.veshv.main
 
-import arrow.core.ForOption
 import arrow.core.Option
 import arrow.core.fix
-import arrow.instances.extensions
 import arrow.instances.option.monad.monad
 import arrow.typeclasses.binding
 import org.apache.commons.cli.CommandLine
@@ -49,6 +47,7 @@
 import org.onap.dcae.collectors.veshv.utils.commandline.intValue
 import org.onap.dcae.collectors.veshv.utils.commandline.longValue
 import org.onap.dcae.collectors.veshv.utils.commandline.stringValue
+import java.net.InetSocketAddress
 import java.time.Duration
 
 internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) {
@@ -81,8 +80,8 @@
                 val security = createSecurityConfiguration(cmdLine).bind()
                 val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind()
                 ServerConfiguration(
-                        healthCheckApiPort = healthCheckApiPort,
-                        listenPort = listenPort,
+                        serverListenAddress = InetSocketAddress(listenPort),
+                        healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort),
                         configurationProviderParams = configurationProviderParams,
                         securityConfiguration = security,
                         idleTimeout = Duration.ofSeconds(idleTimeoutSec),
@@ -91,24 +90,22 @@
             }.fix()
 
     private fun createConfigurationProviderParams(cmdLine: CommandLine): Option<ConfigurationProviderParams> =
-            ForOption extensions {
-                binding {
-                    val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL).bind()
-                    val firstRequestDelay = cmdLine.longValue(
-                            CONSUL_FIRST_REQUEST_DELAY,
-                            DefaultValues.CONSUL_FIRST_REQUEST_DELAY
-                    )
-                    val requestInterval = cmdLine.longValue(
-                            CONSUL_REQUEST_INTERVAL,
-                            DefaultValues.CONSUL_REQUEST_INTERVAL
-                    )
-                    ConfigurationProviderParams(
-                            configUrl,
-                            Duration.ofSeconds(firstRequestDelay),
-                            Duration.ofSeconds(requestInterval)
-                    )
-                }.fix()
-            }
+            Option.monad().binding {
+                val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL).bind()
+                val firstRequestDelay = cmdLine.longValue(
+                        CONSUL_FIRST_REQUEST_DELAY,
+                        DefaultValues.CONSUL_FIRST_REQUEST_DELAY
+                )
+                val requestInterval = cmdLine.longValue(
+                        CONSUL_REQUEST_INTERVAL,
+                        DefaultValues.CONSUL_REQUEST_INTERVAL
+                )
+                ConfigurationProviderParams(
+                        configUrl,
+                        Duration.ofSeconds(firstRequestDelay),
+                        Duration.ofSeconds(requestInterval)
+                )
+            }.fix()
 
     internal object DefaultValues {
         const val HEALTH_CHECK_API_PORT = 6060
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
index 04fc021..ae59da6 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
@@ -32,7 +32,9 @@
     override fun startServer(config: ServerConfiguration) = createHealthCheckServer(config).start()
 
     private fun createHealthCheckServer(config: ServerConfiguration) =
-            HealthCheckApiServer(HealthState.INSTANCE, config.healthCheckApiPort)
+            HealthCheckApiServer(
+                    HealthState.INSTANCE,
+                    config.healthCheckApiListenAddress)
 
     override fun serverStartedMessage(handle: ServerHandle) =
             "Health check server is up and listening on ${handle.host}:${handle.port}"
diff --git a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
index 0cf0bb2..1aac6a0 100644
--- a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
+++ b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
@@ -19,7 +19,6 @@
  */
 package org.onap.dcae.collectors.veshv.main
 
-import arrow.core.identity
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
@@ -70,12 +69,21 @@
                 )
             }
 
-            it("should set proper health check api port") {
-                assertThat(result.healthCheckApiPort).isEqualTo(healthCheckApiPort.toInt())
+            it("should set proper listen port") {
+                assertThat(result.serverListenAddress.port).isEqualTo(listenPort.toInt())
             }
 
-            it("should set proper listen port") {
-                assertThat(result.listenPort).isEqualTo(listenPort.toInt())
+
+            it("should set default listen address") {
+                assertThat(result.serverListenAddress.address.hostAddress).isEqualTo("0.0.0.0")
+            }
+
+            it("should set proper health check api port") {
+                assertThat(result.healthCheckApiListenAddress.port).isEqualTo(healthCheckApiPort.toInt())
+            }
+
+            it("should set default health check api address") {
+                assertThat(result.healthCheckApiListenAddress.address.hostAddress).isEqualTo("0.0.0.0")
             }
 
             it("should set proper first consul request delay") {
diff --git a/hv-collector-ssl/pom.xml b/hv-collector-ssl/pom.xml
index 1afa318..e9cbdc2 100644
--- a/hv-collector-ssl/pom.xml
+++ b/hv-collector-ssl/pom.xml
@@ -33,7 +33,7 @@
     <parent>
         <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
         <artifactId>ves-hv-collector</artifactId>
-        <version>4.0.0-SNAPSHOT</version>
+        <version>1.1.0-SNAPSHOT</version>
         <relativePath>..</relativePath>
     </parent>
 
diff --git a/hv-collector-test-utils/pom.xml b/hv-collector-test-utils/pom.xml
index 8e666a9..9dad2ac 100644
--- a/hv-collector-test-utils/pom.xml
+++ b/hv-collector-test-utils/pom.xml
@@ -14,7 +14,7 @@
     <parent>
         <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
         <artifactId>ves-hv-collector</artifactId>
-        <version>4.0.0-SNAPSHOT</version>
+        <version>1.1.0-SNAPSHOT</version>
         <relativePath>..</relativePath>
     </parent>
 
@@ -56,5 +56,30 @@
             <artifactId>assertj-core</artifactId>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>org.jetbrains.kotlin</groupId>
+            <artifactId>kotlin-test</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains.spek</groupId>
+            <artifactId>spek-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.jetbrains.spek</groupId>
+            <artifactId>spek-junit-platform-engine</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-test</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.nhaarman.mockitokotlin2</groupId>
+            <artifactId>mockito-kotlin</artifactId>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/hv-collector-utils/pom.xml b/hv-collector-utils/pom.xml
index 868dd95..d38ccb9 100644
--- a/hv-collector-utils/pom.xml
+++ b/hv-collector-utils/pom.xml
@@ -37,7 +37,7 @@
     <parent>
         <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
         <artifactId>ves-hv-collector</artifactId>
-        <version>4.0.0-SNAPSHOT</version>
+        <version>1.1.0-SNAPSHOT</version>
         <relativePath>..</relativePath>
     </parent>
 
@@ -103,14 +103,6 @@
             <artifactId>slf4j-api</artifactId>
         </dependency>
         <dependency>
-            <groupId>com.nhaarman</groupId>
-            <artifactId>mockito-kotlin</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
-        </dependency>
-        <dependency>
             <groupId>org.assertj</groupId>
             <artifactId>assertj-core</artifactId>
         </dependency>
@@ -131,6 +123,10 @@
             <artifactId>reactor-test</artifactId>
         </dependency>
         <dependency>
+            <groupId>com.nhaarman.mockitokotlin2</groupId>
+            <artifactId>mockito-kotlin</artifactId>
+        </dependency>
+        <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>
             <scope>test</scope>
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
index bb924f2..bdb63b6 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
@@ -20,7 +20,7 @@
 package org.onap.dcae.collectors.veshv.utils
 
 import arrow.effects.IO
-import reactor.ipc.netty.tcp.BlockingNettyContext
+import reactor.netty.DisposableServer
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -35,12 +35,12 @@
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
  * @since August 2018
  */
-class NettyServerHandle(private val ctx: BlockingNettyContext) : ServerHandle(ctx.host, ctx.port) {
+class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.host(), ctx.port()) {
     override fun shutdown() = IO {
-        ctx.shutdown()
+        ctx.disposeNow()
     }
 
     override fun await() = IO<Unit> {
-        ctx.context.channel().closeFuture().sync()
+        ctx.channel().closeFuture().sync()
     }
 }
diff --git a/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt
index b98131c..462aabe 100644
--- a/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt
+++ b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt
@@ -19,10 +19,11 @@
  */
 package org.onap.dcae.collectors.veshv.utils.logging
 
-import com.nhaarman.mockito_kotlin.mock
-import com.nhaarman.mockito_kotlin.verify
-import com.nhaarman.mockito_kotlin.verifyNoMoreInteractions
-import com.nhaarman.mockito_kotlin.whenever
+import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
+import com.nhaarman.mockitokotlin2.verifyNoMoreInteractions
+import com.nhaarman.mockitokotlin2.whenever
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
diff --git a/hv-collector-ves-message-generator/pom.xml b/hv-collector-ves-message-generator/pom.xml
index 87728f6..dae42e2 100644
--- a/hv-collector-ves-message-generator/pom.xml
+++ b/hv-collector-ves-message-generator/pom.xml
@@ -33,7 +33,7 @@
     <parent>
         <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
         <artifactId>ves-hv-collector</artifactId>
-        <version>4.0.0-SNAPSHOT</version>
+        <version>1.1.0-SNAPSHOT</version>
         <relativePath>..</relativePath>
     </parent>
 
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
index cc1d16f..fa39ed1 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
@@ -31,9 +31,8 @@
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVALID_WIRE_FRAME
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.TOO_BIG_PAYLOAD
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
-import org.onap.ves.VesEventOuterClass.VesEvent
 import org.onap.ves.VesEventOuterClass.CommonEventHeader
-
+import org.onap.ves.VesEventOuterClass.VesEvent
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 import java.nio.charset.Charset
@@ -54,10 +53,17 @@
     private fun createMessageFlux(parameters: MessageParameters): Flux<WireFrameMessage> =
             Mono.fromCallable { createMessage(parameters.commonEventHeader, parameters.messageType) }
                     .let {
-                        if (parameters.amount < 0)
-                            it.repeat()
-                        else
-                            it.repeat(parameters.amount)
+                        when {
+                            parameters.amount < 0 ->
+                                // repeat forever
+                                it.repeat()
+                            parameters.amount == 0L ->
+                                // do not generate any message
+                                Flux.empty()
+                            else ->
+                                // repeat(n) re-subscribes n more times, so amount - 1 yields amount messages in total
+                                it.repeat(parameters.amount - 1)
+                        }
                     }
 
     private fun createMessage(commonEventHeader: CommonEventHeader, messageType: MessageType): WireFrameMessage =
diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt
index ee76b78..e2aec7d 100644
--- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt
+++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt
@@ -50,6 +50,7 @@
         val maxPayloadSizeBytes = 1024
         val generator = MessageGeneratorImpl(PayloadGenerator(), maxPayloadSizeBytes)
         given("single message parameters") {
+
             on("messages amount not specified in parameters") {
                 it("should create infinite flux") {
                     val limit = 1000L
@@ -64,6 +65,20 @@
                             .verifyComplete()
                 }
             }
+
+            on("messages amount = 0 specified in parameters") {
+                it("should create empty message flux") {
+                    generator
+                            .createMessageFlux(listOf(MessageParameters(
+                                    commonHeader(PERF3GPP),
+                                    MessageType.VALID,
+                                    0
+                            )))
+                            .test()
+                            .verifyComplete()
+                }
+            }
+
             on("messages amount specified in parameters") {
                 it("should create message flux of specified size") {
                     generator
@@ -77,6 +92,7 @@
                             .verifyComplete()
                 }
             }
+
             on("message type requesting valid message") {
                 it("should create flux of valid messages with given domain") {
                     generator
@@ -94,6 +110,7 @@
                             .verifyComplete()
                 }
             }
+
             on("message type requesting too big payload") {
                 it("should create flux of messages with given domain and payload exceeding threshold") {
 
@@ -112,6 +129,7 @@
                             .verifyComplete()
                 }
             }
+
             on("message type requesting invalid GPB data ") {
                 it("should create flux of messages with invalid payload") {
                     generator
@@ -130,6 +148,7 @@
                             .verifyComplete()
                 }
             }
+
             on("message type requesting invalid wire frame ") {
                 it("should create flux of messages with invalid version") {
                     generator
@@ -148,6 +167,7 @@
                             .verifyComplete()
                 }
             }
+
             on("message type requesting fixed payload") {
                 it("should create flux of valid messages with fixed payload") {
                     generator
diff --git a/hv-collector-xnf-simulator/pom.xml b/hv-collector-xnf-simulator/pom.xml
index 85ef090..6526915 100644
--- a/hv-collector-xnf-simulator/pom.xml
+++ b/hv-collector-xnf-simulator/pom.xml
@@ -33,7 +33,7 @@
     <parent>
         <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
         <artifactId>ves-hv-collector</artifactId>
-        <version>4.0.0-SNAPSHOT</version>
+        <version>1.1.0-SNAPSHOT</version>
         <relativePath>..</relativePath>
     </parent>
 
@@ -139,34 +139,6 @@
         </dependency>
         -->
         <dependency>
-            <groupId>com.nhaarman</groupId>
-            <artifactId>mockito-kotlin</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.assertj</groupId>
-            <artifactId>assertj-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.jetbrains.kotlin</groupId>
-            <artifactId>kotlin-test</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.jetbrains.spek</groupId>
-            <artifactId>spek-api</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.jetbrains.spek</groupId>
-            <artifactId>spek-junit-platform-engine</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>io.projectreactor</groupId>
-            <artifactId>reactor-test</artifactId>
-        </dependency>
-        <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>
             <scope>runtime</scope>
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
index 7a280c1..8df416c 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
@@ -35,8 +35,8 @@
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
 import reactor.core.publisher.ReplayProcessor
-import reactor.ipc.netty.NettyOutbound
-import reactor.ipc.netty.tcp.TcpClient
+import reactor.netty.NettyOutbound
+import reactor.netty.tcp.TcpClient
 
 /**
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -44,13 +44,12 @@
  */
 class VesHvClient(private val configuration: SimulatorConfiguration) {
 
-    private val client: TcpClient = TcpClient.builder()
-            .options { opts ->
-                opts.host(configuration.vesHost)
-                        .port(configuration.vesPort)
-                        .sslContext(createSslContext(configuration.security).orNull())
+    private val client: TcpClient = TcpClient.create()
+            .host(configuration.vesHost)
+            .port(configuration.vesPort)
+            .let { tcp -> // secure() always enables TLS, so keep the plain client when no SslContext is configured
+                createSslContext(configuration.security).fold({ tcp }, { ctx -> tcp.secure { it.sslContext(ctx) } })
+            }
-            .build()
 
     fun sendIo(messages: Flux<WireFrameMessage>) =
             sendRx(messages).then(Mono.just(Unit)).asIo()
@@ -58,7 +57,8 @@
     private fun sendRx(messages: Flux<WireFrameMessage>): Mono<Void> {
         val complete = ReplayProcessor.create<Void>(1)
         client
-                .newHandler { _, output -> handler(complete, messages, output) }
+                .handle { _, output -> handler(complete, messages, output) }
+                .connect()
                 .doOnError {
                     logger.info("Failed to connect to VesHvCollector on " +
                             "${configuration.vesHost}:${configuration.vesPort}")
@@ -94,12 +94,12 @@
     private fun createSslContext(config: SecurityConfiguration): Option<SslContext> =
             ClientSslContextFactory().createSslContext(config)
 
-    private fun NettyOutbound.logConnectionClosed(): NettyOutbound {
-        context().onClose {
-            logger.info { "Connection to ${context().address()} has been closed" }
-        }
-        return this
-    }
+    private fun NettyOutbound.logConnectionClosed() =
+            withConnection { conn -> // dispose hook: an onNext consumer on a Mono<Void> would never be invoked
+                conn.onDispose {
+                    logger.info { "Connection to ${conn.address()} has been closed" }
+                }
+            }
 
     companion object {
         private val logger = Logger(VesHvClient::class)
diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
index 2a78ed5..95510e7 100644
--- a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
+++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
@@ -23,10 +23,9 @@
 import arrow.core.None
 import arrow.core.Right
 import arrow.effects.IO
-import com.nhaarman.mockito_kotlin.any
-import com.nhaarman.mockito_kotlin.mock
-import com.nhaarman.mockito_kotlin.whenever
-import com.sun.xml.internal.messaging.saaj.util.ByteInputStream
+import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
@@ -39,6 +38,7 @@
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
 import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
 import reactor.core.publisher.Flux
+import java.io.ByteArrayInputStream
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -60,7 +60,7 @@
     describe("startSimulation") {
         it("should fail when empty input stream") {
             // given
-            val emptyInputStream = ByteInputStream()
+            val emptyInputStream = ByteArrayInputStream(byteArrayOf())
 
             // when
             val result = cut.startSimulation(emptyInputStream)
diff --git a/pom.xml b/pom.xml
index 12fad22..f54e4c1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,13 +32,13 @@
     <parent>
         <groupId>org.onap.oparent</groupId>
         <artifactId>oparent</artifactId>
-        <version>1.2.0</version>
+        <version>1.2.1</version>
         <relativePath/>
     </parent>
 
     <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
     <artifactId>ves-hv-collector</artifactId>
-    <version>4.0.0-SNAPSHOT</version>
+    <version>1.1.0-SNAPSHOT</version>
     <name>dcaegen2-collectors-veshv</name>
     <description>VES HighVolume Collector</description>
     <packaging>pom</packaging>
@@ -343,7 +343,7 @@
                     <dependency>
                         <groupId>${project.groupId}</groupId>
                         <artifactId>hv-collector-analysis</artifactId>
-                        <version>4.0.0-SNAPSHOT</version>
+                        <version>1.1.0-SNAPSHOT</version>
                     </dependency>
                 </dependencies>
             </plugin>
@@ -586,7 +586,7 @@
                 <groupId>io.projectreactor</groupId>
                 <artifactId>reactor-bom</artifactId>
                 <!-- remember to update netty native bindings versions -->
-                <version>Bismuth-SR11</version>
+                <version>Californium-SR2</version>
                 <type>pom</type>
                 <scope>import</scope>
             </dependency>
@@ -674,21 +674,9 @@
                 <scope>test</scope>
             </dependency>
             <dependency>
-                <groupId>com.nhaarman</groupId>
+                <groupId>com.nhaarman.mockitokotlin2</groupId>
                 <artifactId>mockito-kotlin</artifactId>
-                <version>1.5.0</version>
-                <scope>test</scope>
-                <exclusions>
-                    <exclusion>
-                        <groupId>org.mockito</groupId>
-                        <artifactId>mockito-core</artifactId>
-                    </exclusion>
-                </exclusions>
-            </dependency>
-            <dependency>
-                <groupId>org.mockito</groupId>
-                <artifactId>mockito-core</artifactId>
-                <version>2.18.3</version>
+                <version>2.0.0</version>
                 <scope>test</scope>
             </dependency>
             <dependency>
diff --git a/version.properties b/version.properties
index 967829d..7b8b963 100644
--- a/version.properties
+++ b/version.properties
@@ -1,5 +1,5 @@
-major=4
-minor=0
+major=1
+minor=1
 patch=0
 base_version=${major}.${minor}.${patch}
 release_version=${base_version}