Merge "Fix Common Event Header fields validation"
diff --git a/hv-collector-analysis/pom.xml b/hv-collector-analysis/pom.xml
index 560cc58..9ace275 100644
--- a/hv-collector-analysis/pom.xml
+++ b/hv-collector-analysis/pom.xml
@@ -32,7 +32,7 @@
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>hv-collector-analysis</artifactId>
- <version>4.0.0-SNAPSHOT</version>
+ <version>1.1.0-SNAPSHOT</version>
<description>VES HighVolume Collector :: Code analysis configuration</description>
<build>
diff --git a/hv-collector-core/pom.xml b/hv-collector-core/pom.xml
index ddfa3ed..a813529 100644
--- a/hv-collector-core/pom.xml
+++ b/hv-collector-core/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>ves-hv-collector</artifactId>
- <version>4.0.0-SNAPSHOT</version>
+ <version>1.1.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
@@ -111,7 +111,7 @@
<artifactId>reactor-extra</artifactId>
</dependency>
<dependency>
- <groupId>io.projectreactor.ipc</groupId>
+ <groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
</dependency>
<dependency>
@@ -127,36 +127,6 @@
<artifactId>javax.json</artifactId>
<scope>runtime</scope>
</dependency>
-
-
- <dependency>
- <groupId>com.nhaarman</groupId>
- <artifactId>mockito-kotlin</artifactId>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.assertj</groupId>
- <artifactId>assertj-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.kotlin</groupId>
- <artifactId>kotlin-test</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.spek</groupId>
- <artifactId>spek-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.spek</groupId>
- <artifactId>spek-junit-platform-engine</artifactId>
- </dependency>
- <dependency>
- <groupId>io.projectreactor</groupId>
- <artifactId>reactor-test</artifactId>
- </dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 07b5c82..78afe9f 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -23,7 +23,7 @@
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
-import reactor.ipc.netty.http.client.HttpClient
+import reactor.netty.http.client.HttpClient
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index d08ad9e..af4bbaa 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -125,7 +125,7 @@
}
companion object {
- private const val MAX_RETRIES = 5
+ private const val MAX_RETRIES = 5L
private const val BACKOFF_INTERVAL_FACTOR = 30L
private val logger = Logger(ConsulConfigurationProvider::class)
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
index 4503955..1672158 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
@@ -19,10 +19,11 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters
+import io.netty.handler.codec.http.HttpStatusClass
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
-import reactor.core.publisher.toMono
-import reactor.ipc.netty.http.client.HttpClient
+import reactor.netty.http.client.HttpClient
+import java.lang.IllegalStateException
import java.nio.charset.Charset
/**
@@ -34,14 +35,18 @@
private val logger = LoggerFactory.getLogger(HttpAdapter::class.java)
open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient
- .get(url + createQueryString(queryParams))
+ .get()
+ .uri(url + createQueryString(queryParams))
+ .responseSingle { response, content ->
+ if (response.status().codeClass() == HttpStatusClass.SUCCESS)
+ content.asString()
+ else
+ Mono.error(IllegalStateException("$url ${response.status().code()} ${response.status().reasonPhrase()}"))
+ }
.doOnError {
logger.error("Failed to get resource on path: $url (${it.localizedMessage})")
logger.debug("Nested exception:", it)
}
- .flatMap { it.receiveContent().toMono() }
- .map { it.content().toString(Charset.defaultCharset()) }
-
private fun createQueryString(params: Map<String, Any>): String {
if (params.isEmpty())
@@ -57,7 +62,7 @@
}
- return builder.removeSuffix("&").toString()
+ return builder.removeSuffix("&").toString()
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 7a47cfc..e535300 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -19,9 +19,8 @@
*/
package org.onap.dcae.collectors.veshv.impl.socket
-import arrow.core.Option
+import arrow.core.getOrElse
import arrow.effects.IO
-import io.netty.handler.ssl.SslContext
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
@@ -29,15 +28,13 @@
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.reactivestreams.Publisher
import reactor.core.publisher.Mono
-import reactor.ipc.netty.ByteBufFlux
-import reactor.ipc.netty.NettyInbound
-import reactor.ipc.netty.NettyOutbound
-import reactor.ipc.netty.options.ServerOptions
-import reactor.ipc.netty.tcp.TcpServer
+import reactor.netty.ByteBufFlux
+import reactor.netty.Connection
+import reactor.netty.NettyInbound
+import reactor.netty.NettyOutbound
+import reactor.netty.tcp.TcpServer
import java.time.Duration
-import java.util.function.BiFunction
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -48,63 +45,65 @@
private val collectorProvider: CollectorProvider) : Server {
override fun start(): IO<ServerHandle> = IO {
- val ctx = TcpServer.builder()
- .options(this::configureServer)
- .build()
- .start(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { input, _ ->
- handleConnection(input)
- })
- NettyServerHandle(ctx)
+ val tcpServer = TcpServer.create()
+ .addressSupplier { serverConfig.serverListenAddress }
+ .configureSsl()
+ .handle(this::handleConnection)
+
+ NettyServerHandle(tcpServer.bindNow())
}
- private fun configureServer(opts: ServerOptions.Builder<*>) {
- val sslContext: Option<SslContext> = sslContextFactory.createSslContext(serverConfig.securityConfiguration)
- if (sslContext.isDefined()) opts.sslContext(sslContext.orNull())
- opts.port(serverConfig.listenPort)
- }
+ private fun TcpServer.configureSsl() =
+ sslContextFactory
+ .createSslContext(serverConfig.securityConfiguration)
+ .map { sslContext ->
+ this.secure { b -> b.sslContext(sslContext) }
+ }.getOrElse { this }
- private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> =
+ private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
collectorProvider().fold(
{
- logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." }
+ nettyInbound.withConnection { conn ->
+ logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." }
+ }
Mono.empty()
},
{
- logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" }
- val allocator = nettyInbound.context().channel().alloc()
- it.handleConnection(allocator, createDataStream(nettyInbound))
+ nettyInbound.withConnection { conn ->
+ logger.info { "Handling connection from ${conn.address()}" }
+ conn.configureIdleTimeout(serverConfig.idleTimeout)
+ .logConnectionClosed()
+ }
+ it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound))
}
)
-
- fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
- .configureIdleTimeout(serverConfig.idleTimeout)
- .logConnectionClosed()
+ private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
.receive()
.retain()
- private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound {
+ private fun Connection.configureIdleTimeout(timeout: Duration): Connection {
onReadIdle(timeout.toMillis()) {
logger.info {
- "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${remoteAddress()}..."
+ "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
}
disconnectClient()
}
return this
}
- private fun NettyInbound.disconnectClient() {
- context().channel().close().addListener {
+ private fun Connection.disconnectClient() {
+ channel().close().addListener {
if (it.isSuccess)
- logger.debug { "Channel (${remoteAddress()}) closed successfully." }
+ logger.debug { "Channel (${address()}) closed successfully." }
else
logger.warn("Channel close failed", it.cause())
}
}
- private fun NettyInbound.logConnectionClosed(): NettyInbound {
- context().onClose {
- logger.info("Connection from ${remoteAddress()} has been closed")
+ private fun Connection.logConnectionClosed(): Connection {
+ onTerminate().subscribe {
+ logger.info("Connection from ${address()} has been closed")
}
return this
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
index 7a7d934..8511768 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
@@ -20,6 +20,7 @@
package org.onap.dcae.collectors.veshv.model
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import java.net.InetSocketAddress
import java.time.Duration
/**
@@ -27,10 +28,10 @@
* @since May 2018
*/
data class ServerConfiguration(
- val listenPort: Int,
+ val serverListenAddress: InetSocketAddress,
val configurationProviderParams: ConfigurationProviderParams,
val securityConfiguration: SecurityConfiguration,
val idleTimeout: Duration,
- val healthCheckApiPort: Int,
+ val healthCheckApiListenAddress: InetSocketAddress,
val maximumPayloadSizeBytes: Int,
val dummyMode: Boolean = false)
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
index 9a6889c..7a1a4cd 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -19,9 +19,9 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters
-import com.nhaarman.mockito_kotlin.eq
-import com.nhaarman.mockito_kotlin.mock
-import com.nhaarman.mockito_kotlin.whenever
+import com.nhaarman.mockitokotlin2.eq
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
index 123d8f7..91457fa 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
@@ -19,21 +19,15 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters
-import com.nhaarman.mockito_kotlin.mock
-import com.nhaarman.mockito_kotlin.verify
-import com.nhaarman.mockito_kotlin.whenever
-import io.netty.buffer.Unpooled
-import io.netty.handler.codec.http.HttpContent
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
-import reactor.ipc.netty.http.client.HttpClient
-import reactor.ipc.netty.http.client.HttpClientResponse
+import reactor.netty.http.client.HttpClient
+import reactor.netty.http.server.HttpServer
import reactor.test.StepVerifier
-import java.nio.charset.Charset
+import reactor.test.test
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -42,66 +36,51 @@
internal object HttpAdapterTest : Spek({
describe("HttpAdapter") {
- val httpClientMock: HttpClient = mock()
- val httpAdapter = HttpAdapter(httpClientMock)
+ val httpServer = HttpServer.create()
+ .host("127.0.0.1")
+ .route { routes ->
+ routes.get("/url") { req, resp ->
+ resp.sendString(Mono.just(req.uri()))
+ }
+ }
+ .bindNow()
+ val baseUrl = "http://${httpServer.host()}:${httpServer.port()}"
+ val httpAdapter = HttpAdapter(HttpClient.create().baseUrl(baseUrl))
+
+ afterGroup {
+ httpServer.disposeNow()
+ }
given("url without query params") {
- val initialUrl = "http://test-url"
- whenever(httpClientMock.get(initialUrl)).thenReturn(Mono.empty())
+ val url = "/url"
it("should not append query string") {
- httpAdapter.get(initialUrl)
- verify(httpClientMock).get(initialUrl)
+ httpAdapter.get(url).test()
+ .expectNext(url)
+ .verifyComplete()
}
}
given("url with query params") {
- val queryParams = mapOf(Pair("key", "value"))
- val initialUrl = "http://test-url"
- val expectedUrl = "http://test-url?key=value"
- whenever(httpClientMock.get(expectedUrl)).thenReturn(Mono.empty())
+ val queryParams = mapOf(Pair("p", "the-value"))
+ val url = "/url"
- it("should parse them to query string and append to url") {
- httpAdapter.get(initialUrl, queryParams)
- verify(httpClientMock).get(expectedUrl)
+ it("should add them as query string to the url") {
+ httpAdapter.get(url, queryParams).test()
+ .expectNext("/url?p=the-value")
+ .verifyComplete()
}
}
- given("valid resource url") {
- val validUrl = "http://valid-url/"
- val responseContent = """{"key1": "value1", "key2": "value2"}"""
- val httpResponse = createHttpResponseMock(responseContent)
- whenever(httpClientMock.get(validUrl)).thenReturn(Mono.just(httpResponse))
-
- it("should return response string") {
- StepVerifier
- .create(httpAdapter.get(validUrl))
- .expectNext(responseContent)
- }
- }
-
- given("invalid resource url") {
- val invalidUrl = "http://invalid-url/"
- val exceptionMessage = "Test exception"
- whenever(httpClientMock.get(invalidUrl)).thenReturn(Mono.error(Exception(exceptionMessage)))
+ given("invalid url") {
+ val invalidUrl = "/wtf"
it("should interrupt the flux") {
StepVerifier
.create(httpAdapter.get(invalidUrl))
- .verifyErrorMessage(exceptionMessage)
+ .verifyError()
}
}
}
-})
-
-fun createHttpResponseMock(content: String): HttpClientResponse {
- val responseMock: HttpClientResponse = mock()
- val contentMock: HttpContent = mock()
- val contentByteBuff = Unpooled.copiedBuffer(content, Charset.defaultCharset())
-
- whenever(responseMock.receiveContent()).thenReturn(Flux.just(contentMock))
- whenever(contentMock.content()).thenReturn(contentByteBuff)
-
- return responseMock
-}
+})
\ No newline at end of file
diff --git a/hv-collector-coverage/pom.xml b/hv-collector-coverage/pom.xml
index a6f3c8d..f1e5c74 100644
--- a/hv-collector-coverage/pom.xml
+++ b/hv-collector-coverage/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>ves-hv-collector</artifactId>
- <version>4.0.0-SNAPSHOT</version>
+ <version>1.1.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
diff --git a/hv-collector-ct/pom.xml b/hv-collector-ct/pom.xml
index 2482d73..07da24b 100644
--- a/hv-collector-ct/pom.xml
+++ b/hv-collector-ct/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>ves-hv-collector</artifactId>
- <version>4.0.0-SNAPSHOT</version>
+ <version>1.1.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
diff --git a/hv-collector-dcae-app-simulator/pom.xml b/hv-collector-dcae-app-simulator/pom.xml
index f0a8771..82e99f9 100644
--- a/hv-collector-dcae-app-simulator/pom.xml
+++ b/hv-collector-dcae-app-simulator/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>ves-hv-collector</artifactId>
- <version>4.0.0-SNAPSHOT</version>
+ <version>1.1.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
@@ -141,34 +141,10 @@
<artifactId>kotlin-stdlib-jdk8</artifactId>
</dependency>
<dependency>
- <groupId>com.nhaarman</groupId>
- <artifactId>mockito-kotlin</artifactId>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- </dependency>
- <dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
</dependency>
<dependency>
- <groupId>org.jetbrains.kotlin</groupId>
- <artifactId>kotlin-test</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.spek</groupId>
- <artifactId>spek-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.spek</groupId>
- <artifactId>spek-junit-platform-engine</artifactId>
- </dependency>
- <dependency>
- <groupId>io.projectreactor</groupId>
- <artifactId>reactor-test</artifactId>
- </dependency>
- <dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>runtime</scope>
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
index 83dceb6..17eeb5b 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
@@ -19,10 +19,9 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config
-import arrow.core.ForOption
import arrow.core.Option
import arrow.core.fix
-import arrow.instances.extensions
+import arrow.instances.option.monad.monad
import arrow.typeclasses.binding
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
@@ -45,26 +44,24 @@
)
override fun getConfiguration(cmdLine: CommandLine): Option<DcaeAppSimConfiguration> =
- ForOption extensions {
- binding {
- val listenPort = cmdLine
- .intValue(LISTEN_PORT)
- .bind()
- val maxPayloadSizeBytes = cmdLine
- .intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
- val kafkaBootstrapServers = cmdLine
- .stringValue(KAFKA_SERVERS)
- .bind()
- val kafkaTopics = cmdLine
- .stringValue(KAFKA_TOPICS)
- .map { it.split(",").toSet() }
- .bind()
+ Option.monad().binding {
+ val listenPort = cmdLine
+ .intValue(LISTEN_PORT)
+ .bind()
+ val maxPayloadSizeBytes = cmdLine
+ .intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
+ val kafkaBootstrapServers = cmdLine
+ .stringValue(KAFKA_SERVERS)
+ .bind()
+ val kafkaTopics = cmdLine
+ .stringValue(KAFKA_TOPICS)
+ .map { it.split(",").toSet() }
+ .bind()
- DcaeAppSimConfiguration(
- listenPort,
- maxPayloadSizeBytes,
- kafkaBootstrapServers,
- kafkaTopics)
- }.fix()
- }
+ DcaeAppSimConfiguration(
+ listenPort,
+ maxPayloadSizeBytes,
+ kafkaBootstrapServers,
+ kafkaTopics)
+ }.fix()
}
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
index aceb746..e1641cb 100644
--- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
+++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
@@ -24,20 +24,19 @@
import arrow.core.Some
import arrow.effects.IO
import com.google.protobuf.ByteString
-import com.nhaarman.mockito_kotlin.any
-import com.nhaarman.mockito_kotlin.eq
-import com.nhaarman.mockito_kotlin.mock
-import com.nhaarman.mockito_kotlin.never
-import com.nhaarman.mockito_kotlin.verify
-import com.nhaarman.mockito_kotlin.whenever
+import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.eq
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.never
+import com.nhaarman.mockitokotlin2.verify
+import com.nhaarman.mockitokotlin2.whenever
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.mockito.ArgumentMatchers.anySet
-import org.mockito.Mockito
-import org.onap.ves.VesEventOuterClass.VesEvent
import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.VesEvent
import java.util.concurrent.ConcurrentLinkedQueue
/**
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
index 5e3090a..a631be7 100644
--- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
+++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
@@ -22,9 +22,9 @@
import arrow.core.Either
import arrow.core.Right
import com.google.protobuf.ByteString
-import com.nhaarman.mockito_kotlin.any
-import com.nhaarman.mockito_kotlin.mock
-import com.nhaarman.mockito_kotlin.whenever
+import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.fail
import org.jetbrains.spek.api.Spek
diff --git a/hv-collector-domain/pom.xml b/hv-collector-domain/pom.xml
index 95b8d84..e03de3c 100644
--- a/hv-collector-domain/pom.xml
+++ b/hv-collector-domain/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>ves-hv-collector</artifactId>
- <version>4.0.0-SNAPSHOT</version>
+ <version>1.1.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
@@ -96,7 +96,7 @@
<artifactId>kotlin-stdlib-jdk8</artifactId>
</dependency>
<dependency>
- <groupId>io.projectreactor.ipc</groupId>
+ <groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
</dependency>
<dependency>
diff --git a/hv-collector-health-check/pom.xml b/hv-collector-health-check/pom.xml
index 45fa2e0..6ecdc95 100644
--- a/hv-collector-health-check/pom.xml
+++ b/hv-collector-health-check/pom.xml
@@ -19,7 +19,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>ves-hv-collector</artifactId>
- <version>4.0.0-SNAPSHOT</version>
+ <version>1.1.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
@@ -50,7 +50,7 @@
<artifactId>kotlin-stdlib-jdk8</artifactId>
</dependency>
<dependency>
- <groupId>io.projectreactor.ipc</groupId>
+ <groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
</dependency>
<dependency>
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
index 753f73e..b4d7c14 100644
--- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
+++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
@@ -27,26 +27,30 @@
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
-import reactor.ipc.netty.http.server.HttpServer
-import reactor.ipc.netty.http.server.HttpServerRequest
-import reactor.ipc.netty.http.server.HttpServerResponse
+import reactor.netty.http.server.HttpServer
+import reactor.netty.http.server.HttpServerRequest
+import reactor.netty.http.server.HttpServerResponse
+import java.net.SocketAddress
import java.util.concurrent.atomic.AtomicReference
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since August 2018
*/
-class HealthCheckApiServer(private val healthState: HealthState, private val port: Int) {
+class HealthCheckApiServer(private val healthState: HealthState,
+ private val listenAddress: SocketAddress) {
private val healthDescription: AtomicReference<HealthDescription> = AtomicReference(HealthDescription.STARTING)
fun start(): IO<ServerHandle> = IO {
healthState().subscribe(healthDescription::set)
- val ctx = HttpServer.create(port).startRouter { routes ->
- routes.get("/health/ready", ::readinessHandler)
- routes.get("/health/alive", ::livenessHandler)
- }
- NettyServerHandle(ctx)
+ val ctx = HttpServer.create()
+ .tcpConfiguration { it.addressSupplier { listenAddress } }
+ .route { routes ->
+ routes.get("/health/ready", ::readinessHandler)
+ routes.get("/health/alive", ::livenessHandler)
+ }
+ NettyServerHandle(ctx.bindNow())
}
private fun readinessHandler(req: HttpServerRequest, resp: HttpServerResponse) =
@@ -55,6 +59,6 @@
}
private fun livenessHandler(req: HttpServerRequest, resp: HttpServerResponse) =
- resp.status(HttpResponseStatus.NOT_IMPLEMENTED).sendString(Mono.just("Not implemented yet"))
+ resp.status(HttpResponseStatus.NOT_IMPLEMENTED).sendString(Mono.just("Not implemented yet"))
}
diff --git a/hv-collector-main/pom.xml b/hv-collector-main/pom.xml
index fda519c..9e7101b 100644
--- a/hv-collector-main/pom.xml
+++ b/hv-collector-main/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>ves-hv-collector</artifactId>
- <version>4.0.0-SNAPSHOT</version>
+ <version>1.1.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
@@ -137,30 +137,6 @@
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-jmx</artifactId>
</dependency>
- <dependency>
- <groupId>org.assertj</groupId>
- <artifactId>assertj-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.kotlin</groupId>
- <artifactId>kotlin-test</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.spek</groupId>
- <artifactId>spek-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.spek</groupId>
- <artifactId>spek-junit-platform-engine</artifactId>
- </dependency>
- <dependency>
- <groupId>com.nhaarman</groupId>
- <artifactId>mockito-kotlin</artifactId>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- </dependency>
</dependencies>
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
index 0f38219..81d916d 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
@@ -19,10 +19,8 @@
*/
package org.onap.dcae.collectors.veshv.main
-import arrow.core.ForOption
import arrow.core.Option
import arrow.core.fix
-import arrow.instances.extensions
import arrow.instances.option.monad.monad
import arrow.typeclasses.binding
import org.apache.commons.cli.CommandLine
@@ -49,6 +47,7 @@
import org.onap.dcae.collectors.veshv.utils.commandline.intValue
import org.onap.dcae.collectors.veshv.utils.commandline.longValue
import org.onap.dcae.collectors.veshv.utils.commandline.stringValue
+import java.net.InetSocketAddress
import java.time.Duration
internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) {
@@ -81,8 +80,8 @@
val security = createSecurityConfiguration(cmdLine).bind()
val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind()
ServerConfiguration(
- healthCheckApiPort = healthCheckApiPort,
- listenPort = listenPort,
+ serverListenAddress = InetSocketAddress(listenPort),
+ healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort),
configurationProviderParams = configurationProviderParams,
securityConfiguration = security,
idleTimeout = Duration.ofSeconds(idleTimeoutSec),
@@ -91,24 +90,22 @@
}.fix()
private fun createConfigurationProviderParams(cmdLine: CommandLine): Option<ConfigurationProviderParams> =
- ForOption extensions {
- binding {
- val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL).bind()
- val firstRequestDelay = cmdLine.longValue(
- CONSUL_FIRST_REQUEST_DELAY,
- DefaultValues.CONSUL_FIRST_REQUEST_DELAY
- )
- val requestInterval = cmdLine.longValue(
- CONSUL_REQUEST_INTERVAL,
- DefaultValues.CONSUL_REQUEST_INTERVAL
- )
- ConfigurationProviderParams(
- configUrl,
- Duration.ofSeconds(firstRequestDelay),
- Duration.ofSeconds(requestInterval)
- )
- }.fix()
- }
+ Option.monad().binding {
+ val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL).bind()
+ val firstRequestDelay = cmdLine.longValue(
+ CONSUL_FIRST_REQUEST_DELAY,
+ DefaultValues.CONSUL_FIRST_REQUEST_DELAY
+ )
+ val requestInterval = cmdLine.longValue(
+ CONSUL_REQUEST_INTERVAL,
+ DefaultValues.CONSUL_REQUEST_INTERVAL
+ )
+ ConfigurationProviderParams(
+ configUrl,
+ Duration.ofSeconds(firstRequestDelay),
+ Duration.ofSeconds(requestInterval)
+ )
+ }.fix()
internal object DefaultValues {
const val HEALTH_CHECK_API_PORT = 6060
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
index 04fc021..ae59da6 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
@@ -32,7 +32,9 @@
override fun startServer(config: ServerConfiguration) = createHealthCheckServer(config).start()
private fun createHealthCheckServer(config: ServerConfiguration) =
- HealthCheckApiServer(HealthState.INSTANCE, config.healthCheckApiPort)
+ HealthCheckApiServer(
+ HealthState.INSTANCE,
+ config.healthCheckApiListenAddress)
override fun serverStartedMessage(handle: ServerHandle) =
"Health check server is up and listening on ${handle.host}:${handle.port}"
diff --git a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
index 0cf0bb2..1aac6a0 100644
--- a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
+++ b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.main
-import arrow.core.identity
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
@@ -70,12 +69,21 @@
)
}
- it("should set proper health check api port") {
- assertThat(result.healthCheckApiPort).isEqualTo(healthCheckApiPort.toInt())
+ it("should set proper listen port") {
+ assertThat(result.serverListenAddress.port).isEqualTo(listenPort.toInt())
}
- it("should set proper listen port") {
- assertThat(result.listenPort).isEqualTo(listenPort.toInt())
+
+ it("should set default listen address") {
+ assertThat(result.serverListenAddress.address.hostAddress).isEqualTo("0.0.0.0")
+ }
+
+ it("should set proper health check api port") {
+ assertThat(result.healthCheckApiListenAddress.port).isEqualTo(healthCheckApiPort.toInt())
+ }
+
+ it("should set default health check api address") {
+ assertThat(result.healthCheckApiListenAddress.address.hostAddress).isEqualTo("0.0.0.0")
}
it("should set proper first consul request delay") {
diff --git a/hv-collector-ssl/pom.xml b/hv-collector-ssl/pom.xml
index 1afa318..e9cbdc2 100644
--- a/hv-collector-ssl/pom.xml
+++ b/hv-collector-ssl/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>ves-hv-collector</artifactId>
- <version>4.0.0-SNAPSHOT</version>
+ <version>1.1.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
diff --git a/hv-collector-test-utils/pom.xml b/hv-collector-test-utils/pom.xml
index 8e666a9..9dad2ac 100644
--- a/hv-collector-test-utils/pom.xml
+++ b/hv-collector-test-utils/pom.xml
@@ -14,7 +14,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>ves-hv-collector</artifactId>
- <version>4.0.0-SNAPSHOT</version>
+ <version>1.1.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
@@ -56,5 +56,30 @@
<artifactId>assertj-core</artifactId>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-test</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.spek</groupId>
+ <artifactId>spek-api</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.spek</groupId>
+ <artifactId>spek-junit-platform-engine</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-test</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.nhaarman.mockitokotlin2</groupId>
+ <artifactId>mockito-kotlin</artifactId>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/hv-collector-utils/pom.xml b/hv-collector-utils/pom.xml
index 868dd95..d38ccb9 100644
--- a/hv-collector-utils/pom.xml
+++ b/hv-collector-utils/pom.xml
@@ -37,7 +37,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>ves-hv-collector</artifactId>
- <version>4.0.0-SNAPSHOT</version>
+ <version>1.1.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
@@ -103,14 +103,6 @@
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
- <groupId>com.nhaarman</groupId>
- <artifactId>mockito-kotlin</artifactId>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- </dependency>
- <dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
</dependency>
@@ -131,6 +123,10 @@
<artifactId>reactor-test</artifactId>
</dependency>
<dependency>
+ <groupId>com.nhaarman.mockitokotlin2</groupId>
+ <artifactId>mockito-kotlin</artifactId>
+ </dependency>
+ <dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
index bb924f2..bdb63b6 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
@@ -20,7 +20,7 @@
package org.onap.dcae.collectors.veshv.utils
import arrow.effects.IO
-import reactor.ipc.netty.tcp.BlockingNettyContext
+import reactor.netty.DisposableServer
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -35,12 +35,12 @@
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
*/
-class NettyServerHandle(private val ctx: BlockingNettyContext) : ServerHandle(ctx.host, ctx.port) {
+class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.host(), ctx.port()) {
override fun shutdown() = IO {
- ctx.shutdown()
+ ctx.disposeNow()
}
override fun await() = IO<Unit> {
- ctx.context.channel().closeFuture().sync()
+ ctx.channel().closeFuture().sync()
}
}
diff --git a/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt
index b98131c..462aabe 100644
--- a/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt
+++ b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt
@@ -19,10 +19,11 @@
*/
package org.onap.dcae.collectors.veshv.utils.logging
-import com.nhaarman.mockito_kotlin.mock
-import com.nhaarman.mockito_kotlin.verify
-import com.nhaarman.mockito_kotlin.verifyNoMoreInteractions
-import com.nhaarman.mockito_kotlin.whenever
+import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.verify
+import com.nhaarman.mockitokotlin2.verifyNoMoreInteractions
+import com.nhaarman.mockitokotlin2.whenever
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
diff --git a/hv-collector-ves-message-generator/pom.xml b/hv-collector-ves-message-generator/pom.xml
index 87728f6..dae42e2 100644
--- a/hv-collector-ves-message-generator/pom.xml
+++ b/hv-collector-ves-message-generator/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>ves-hv-collector</artifactId>
- <version>4.0.0-SNAPSHOT</version>
+ <version>1.1.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
index cc1d16f..fa39ed1 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
@@ -31,9 +31,8 @@
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVALID_WIRE_FRAME
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.TOO_BIG_PAYLOAD
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
-import org.onap.ves.VesEventOuterClass.VesEvent
import org.onap.ves.VesEventOuterClass.CommonEventHeader
-
+import org.onap.ves.VesEventOuterClass.VesEvent
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.nio.charset.Charset
@@ -54,10 +53,17 @@
private fun createMessageFlux(parameters: MessageParameters): Flux<WireFrameMessage> =
Mono.fromCallable { createMessage(parameters.commonEventHeader, parameters.messageType) }
.let {
- if (parameters.amount < 0)
- it.repeat()
- else
- it.repeat(parameters.amount)
+ when {
+ parameters.amount < 0 ->
+ // repeat forever
+ it.repeat()
+ parameters.amount == 0L ->
+ // do not generate any message
+ Flux.empty()
+ else ->
+ // send original message and additional amount-1 messages
+ it.repeat(parameters.amount - 1)
+ }
}
private fun createMessage(commonEventHeader: CommonEventHeader, messageType: MessageType): WireFrameMessage =
diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt
index ee76b78..e2aec7d 100644
--- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt
+++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt
@@ -50,6 +50,7 @@
val maxPayloadSizeBytes = 1024
val generator = MessageGeneratorImpl(PayloadGenerator(), maxPayloadSizeBytes)
given("single message parameters") {
+
on("messages amount not specified in parameters") {
it("should create infinite flux") {
val limit = 1000L
@@ -64,6 +65,20 @@
.verifyComplete()
}
}
+
+ on("messages amount = 0 specified in parameters") {
+ it("should create empty message flux") {
+ generator
+ .createMessageFlux(listOf(MessageParameters(
+ commonHeader(PERF3GPP),
+ MessageType.VALID,
+ 0
+ )))
+ .test()
+ .verifyComplete()
+ }
+ }
+
on("messages amount specified in parameters") {
it("should create message flux of specified size") {
generator
@@ -77,6 +92,7 @@
.verifyComplete()
}
}
+
on("message type requesting valid message") {
it("should create flux of valid messages with given domain") {
generator
@@ -94,6 +110,7 @@
.verifyComplete()
}
}
+
on("message type requesting too big payload") {
it("should create flux of messages with given domain and payload exceeding threshold") {
@@ -112,6 +129,7 @@
.verifyComplete()
}
}
+
on("message type requesting invalid GPB data ") {
it("should create flux of messages with invalid payload") {
generator
@@ -130,6 +148,7 @@
.verifyComplete()
}
}
+
on("message type requesting invalid wire frame ") {
it("should create flux of messages with invalid version") {
generator
@@ -148,6 +167,7 @@
.verifyComplete()
}
}
+
on("message type requesting fixed payload") {
it("should create flux of valid messages with fixed payload") {
generator
diff --git a/hv-collector-xnf-simulator/pom.xml b/hv-collector-xnf-simulator/pom.xml
index 85ef090..6526915 100644
--- a/hv-collector-xnf-simulator/pom.xml
+++ b/hv-collector-xnf-simulator/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>ves-hv-collector</artifactId>
- <version>4.0.0-SNAPSHOT</version>
+ <version>1.1.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
@@ -139,34 +139,6 @@
</dependency>
-->
<dependency>
- <groupId>com.nhaarman</groupId>
- <artifactId>mockito-kotlin</artifactId>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.assertj</groupId>
- <artifactId>assertj-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.kotlin</groupId>
- <artifactId>kotlin-test</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.spek</groupId>
- <artifactId>spek-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.spek</groupId>
- <artifactId>spek-junit-platform-engine</artifactId>
- </dependency>
- <dependency>
- <groupId>io.projectreactor</groupId>
- <artifactId>reactor-test</artifactId>
- </dependency>
- <dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>runtime</scope>
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
index 7a280c1..8df416c 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
@@ -35,8 +35,8 @@
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.ReplayProcessor
-import reactor.ipc.netty.NettyOutbound
-import reactor.ipc.netty.tcp.TcpClient
+import reactor.netty.NettyOutbound
+import reactor.netty.tcp.TcpClient
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -44,13 +44,12 @@
*/
class VesHvClient(private val configuration: SimulatorConfiguration) {
- private val client: TcpClient = TcpClient.builder()
- .options { opts ->
- opts.host(configuration.vesHost)
- .port(configuration.vesPort)
- .sslContext(createSslContext(configuration.security).orNull())
+ // TCP client targeting configuration.vesHost:configuration.vesPort.
+ private val client: TcpClient = TcpClient.create()
+ .host(configuration.vesHost)
+ .port(configuration.vesPort)
+ .let { tcpClient ->
+ // Wrap the client with TLS only when createSslContext yields a context.
+ // With no context the client is returned unchanged instead of calling
+ // secure() with a spec that was never given an SslContext.
+ createSslContext(configuration.security).fold(
+ { tcpClient },
+ { sslContext -> tcpClient.secure { sslSpec -> sslSpec.sslContext(sslContext) } })
}
- .build()
fun sendIo(messages: Flux<WireFrameMessage>) =
sendRx(messages).then(Mono.just(Unit)).asIo()
@@ -58,7 +57,8 @@
private fun sendRx(messages: Flux<WireFrameMessage>): Mono<Void> {
val complete = ReplayProcessor.create<Void>(1)
client
- .newHandler { _, output -> handler(complete, messages, output) }
+ .handle { _, output -> handler(complete, messages, output) }
+ .connect()
.doOnError {
logger.info("Failed to connect to VesHvCollector on " +
"${configuration.vesHost}:${configuration.vesPort}")
@@ -94,12 +94,12 @@
private fun createSslContext(config: SecurityConfiguration): Option<SslContext> =
ClientSslContextFactory().createSslContext(config)
- private fun NettyOutbound.logConnectionClosed(): NettyOutbound {
- context().onClose {
- logger.info { "Connection to ${context().address()} has been closed" }
- }
- return this
- }
+ /**
+ * Registers a callback that logs the peer address once the connection behind this outbound
+ * is disposed.
+ *
+ * @return the result of `withConnection`, so the call can stay inside an outbound chain
+ */
+ private fun NettyOutbound.logConnectionClosed() =
+ withConnection { conn ->
+ // onTerminate() is a Mono<Void>: it completes without emitting a value, so a value
+ // consumer passed to subscribe { } would never be invoked. onDispose(Disposable)
+ // runs the callback when the channel is closed, like the former onClose hook.
+ conn.onDispose {
+ logger.info { "Connection to ${conn.address()} has been closed" }
+ }
+ }
companion object {
private val logger = Logger(VesHvClient::class)
diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
index 2a78ed5..95510e7 100644
--- a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
+++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
@@ -23,10 +23,9 @@
import arrow.core.None
import arrow.core.Right
import arrow.effects.IO
-import com.nhaarman.mockito_kotlin.any
-import com.nhaarman.mockito_kotlin.mock
-import com.nhaarman.mockito_kotlin.whenever
-import com.sun.xml.internal.messaging.saaj.util.ByteInputStream
+import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
@@ -39,6 +38,7 @@
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
import reactor.core.publisher.Flux
+import java.io.ByteArrayInputStream
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -60,7 +60,7 @@
describe("startSimulation") {
it("should fail when empty input stream") {
// given
- val emptyInputStream = ByteInputStream()
+ val emptyInputStream = ByteArrayInputStream(byteArrayOf())
// when
val result = cut.startSimulation(emptyInputStream)
diff --git a/pom.xml b/pom.xml
index 12fad22..f54e4c1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,13 +32,13 @@
<parent>
<groupId>org.onap.oparent</groupId>
<artifactId>oparent</artifactId>
- <version>1.2.0</version>
+ <version>1.2.1</version>
<relativePath/>
</parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>ves-hv-collector</artifactId>
- <version>4.0.0-SNAPSHOT</version>
+ <version>1.1.0-SNAPSHOT</version>
<name>dcaegen2-collectors-veshv</name>
<description>VES HighVolume Collector</description>
<packaging>pom</packaging>
@@ -343,7 +343,7 @@
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>hv-collector-analysis</artifactId>
- <version>4.0.0-SNAPSHOT</version>
+ <version>1.1.0-SNAPSHOT</version>
</dependency>
</dependencies>
</plugin>
@@ -586,7 +586,7 @@
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<!-- remember to update netty native bindings versions -->
- <version>Bismuth-SR11</version>
+ <version>Californium-SR2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
@@ -674,21 +674,9 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>com.nhaarman</groupId>
+ <groupId>com.nhaarman.mockitokotlin2</groupId>
<artifactId>mockito-kotlin</artifactId>
- <version>1.5.0</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <version>2.18.3</version>
+ <version>2.0.0</version>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/version.properties b/version.properties
index 967829d..7b8b963 100644
--- a/version.properties
+++ b/version.properties
@@ -1,5 +1,5 @@
-major=4
-minor=0
+major=1
+minor=1
patch=0
base_version=${major}.${minor}.${patch}
release_version=${base_version}