Merge "Remove Ratpack dependency for HV-VES health checks"
diff --git a/docker-compose.yml b/docker-compose.yml
index 7bd84f5..33aedec 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,6 +1,5 @@
-version: "2"
+version: "3.4"
services:
-
zookeeper:
image: wurstmeister/zookeeper
ports:
@@ -29,14 +28,22 @@
command: ["-server", "-bootstrap", "-ui-dir", "/ui"]
ves-hv-collector:
- image: onap/ves-hv-collector
+ image: nexus3.onap.org:10003/onap/ves-hv-collector:latest
# build:
# context: hv-collector-main
# dockerfile: Dockerfile
ports:
- "6060:6060"
- "6061:6061/tcp"
- command: ["--listen-port", "6061","--config-url", "http://consul:8500/v1/kv/veshv-config"]
+ command: ["--listen-port", "6061",
+ "--health-check-api-port", "6060",
+ "--config-url", "http://consul:8500/v1/kv/veshv-config"]
+ healthcheck:
+ test: curl -f http://localhost:6060/health/ready || exit 1
+ interval: 10s
+ timeout: 3s
+ retries: 3
+ start_period: 20s
depends_on:
- kafka
- consul
@@ -44,7 +51,7 @@
- ./ssl/:/etc/ves-hv/
xnf-simulator:
- image: onap/ves-hv-collector-xnf-simulator
+ image: nexus3.onap.org:10003/onap/ves-hv-collector-xnf-simulator
# build:
# context: hv-collector-xnf-simulator
# dockerfile: Dockerfile
@@ -57,7 +64,7 @@
- ./ssl/:/etc/ves-hv/
dcae-app-simulator:
- image: onap/ves-hv-collector-dcae-simulator
+ image: nexus3.onap.org:10003/onap/ves-hv-collector-dcae-simulator
# build:
# context: hv-collector-dcae-app-simulator
# dockerfile: Dockerfile
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index ff99717..6c256b7 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -22,6 +22,7 @@
import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -34,8 +35,3 @@
interface Server {
fun start(): IO<ServerHandle>
}
-
-abstract class ServerHandle(val host: String, val port: Int) {
- abstract fun shutdown(): IO<Unit>
- abstract fun await(): IO<Unit>
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 3e652b9..a400ff3 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -25,8 +25,8 @@
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
@@ -42,7 +42,7 @@
class CollectorFactory(val configuration: ConfigurationProvider,
private val sinkProvider: SinkProvider,
private val metrics: Metrics,
- private val healthStateProvider: HealthStateProvider = HealthStateProvider.INSTANCE) {
+ private val healthState: HealthState = HealthState.INSTANCE) {
fun createVesHvCollectorProvider(): CollectorProvider {
val collector: AtomicReference<Collector> = AtomicReference()
@@ -50,11 +50,11 @@
.map(this::createVesHvCollector)
.doOnNext {
logger.info("Using updated configuration for new connections")
- healthStateProvider.changeState(HealthState.HEALTHY)
+ healthState.changeState(HealthDescription.HEALTHY)
}
.doOnError {
logger.error("Failed to acquire configuration from consul")
- healthStateProvider.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
+ healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
}
.subscribe(collector::set)
return collector::get
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index 8146303..7de2830 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -20,8 +20,8 @@
package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -46,7 +46,7 @@
private val url: String,
private val firstRequestDelay: Duration,
private val requestInterval: Duration,
- private val healthStateProvider: HealthStateProvider,
+ private val healthState: HealthState,
retrySpec: Retry<Any>
) : ConfigurationProvider {
@@ -55,7 +55,7 @@
private val retry = retrySpec
.doOnRetry {
logger.warn("Could not get fresh configuration", it.exception())
- healthStateProvider.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
+ healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
}
constructor(http: HttpAdapter,
@@ -64,7 +64,7 @@
params.configurationUrl,
params.firstRequestDelay,
params.requestInterval,
- HealthStateProvider.INSTANCE,
+ HealthState.INSTANCE,
Retry.any<Any>()
.retryMax(MAX_RETRIES)
.fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index c28b151..f858d95 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -24,15 +24,15 @@
import io.netty.handler.ssl.SslContext
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Server
-import org.onap.dcae.collectors.veshv.boundary.ServerHandle
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.reactivestreams.Publisher
import reactor.core.publisher.Mono
import reactor.ipc.netty.NettyInbound
import reactor.ipc.netty.NettyOutbound
import reactor.ipc.netty.options.ServerOptions
-import reactor.ipc.netty.tcp.BlockingNettyContext
import reactor.ipc.netty.tcp.TcpServer
import java.time.Duration
import java.util.function.BiFunction
@@ -94,16 +94,6 @@
return this
}
- private class NettyServerHandle(val ctx: BlockingNettyContext) : ServerHandle(ctx.host, ctx.port) {
- override fun shutdown() = IO {
- ctx.shutdown()
- }
-
- override fun await() = IO<Unit> {
- ctx.context.channel().closeFuture().sync()
- }
- }
-
companion object {
private val logger = Logger(NettyTcpServer::class)
}
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
index f9a9ba6..7885892 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -28,8 +28,8 @@
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.mockito.Mockito
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
import reactor.core.publisher.Mono
import reactor.retry.Retry
@@ -47,7 +47,7 @@
describe("Consul configuration provider") {
val httpAdapterMock: HttpAdapter = mock()
- val healthStateProvider = HealthStateProvider.INSTANCE
+ val healthStateProvider = HealthState.INSTANCE
given("valid resource url") {
val validUrl = "http://valid-url/"
@@ -98,7 +98,7 @@
it("should update the health state"){
StepVerifier.create(healthStateProvider().take(iterationCount))
.expectNextCount(iterationCount - 1)
- .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
+ .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
.verifyComplete()
}
}
@@ -109,7 +109,7 @@
private fun constructConsulConfigProvider(url: String,
httpAdapter: HttpAdapter,
- healthStateProvider: HealthStateProvider,
+ healthState: HealthState,
iterationCount: Long = 1
): ConsulConfigurationProvider {
@@ -122,7 +122,7 @@
url,
firstRequestDelay,
requestInterval,
- healthStateProvider,
+ healthState,
retry
)
}
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index e7b7770..e9b7057 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -28,7 +28,7 @@
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
-import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthStateProvider
+import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState
import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
import reactor.core.publisher.Flux
@@ -40,7 +40,7 @@
*/
class Sut(sink: Sink = StoringSink()) {
val configurationProvider = FakeConfigurationProvider()
- val healthStateProvider = FakeHealthStateProvider()
+ val healthStateProvider = FakeHealthState()
val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
private val metrics = FakeMetrics()
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 493517b..a9f3e9a 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -24,7 +24,7 @@
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVRANMEAS_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
@@ -351,7 +351,7 @@
it("should mark the application healthy") {
assertThat(sut.healthStateProvider.currentHealth)
.describedAs("application health state")
- .isEqualTo(HealthState.HEALTHY)
+ .isEqualTo(HealthDescription.HEALTHY)
}
}
@@ -363,7 +363,7 @@
it("should mark the application unhealthy ") {
assertThat(sut.healthStateProvider.currentHealth)
.describedAs("application health state")
- .isEqualTo(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
+ .isEqualTo(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
}
}
}
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt
similarity index 77%
rename from hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt
rename to hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt
index 230a728..c25771b 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt
@@ -19,19 +19,19 @@
*/
package org.onap.dcae.collectors.veshv.tests.fakes
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
import reactor.core.publisher.Flux
-class FakeHealthStateProvider : HealthStateProvider {
+class FakeHealthState : HealthState {
- lateinit var currentHealth: HealthState
+ lateinit var currentHealth: HealthDescription
- override fun changeState(healthState: HealthState) {
- currentHealth = healthState
+ override fun changeState(healthDescription: HealthDescription) {
+ currentHealth = healthDescription
}
- override fun invoke(): Flux<HealthState> {
+ override fun invoke(): Flux<HealthDescription> {
throw NotImplementedError()
}
}
diff --git a/hv-collector-health-check/pom.xml b/hv-collector-health-check/pom.xml
index 1e77adb..0951587 100644
--- a/hv-collector-health-check/pom.xml
+++ b/hv-collector-health-check/pom.xml
@@ -50,8 +50,8 @@
<artifactId>kotlin-stdlib-jdk8</artifactId>
</dependency>
<dependency>
- <groupId>io.ratpack</groupId>
- <artifactId>ratpack-core</artifactId>
+ <groupId>io.projectreactor.ipc</groupId>
+ <artifactId>reactor-netty</artifactId>
</dependency>
<dependency>
<groupId>io.arrow-kt</groupId>
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthCheckApiServer.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthCheckApiServer.kt
deleted file mode 100644
index b21d187..0000000
--- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthCheckApiServer.kt
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.healthcheck.api
-
-import arrow.effects.IO
-import ratpack.handling.Chain
-import ratpack.server.RatpackServer
-import ratpack.server.ServerConfig
-import java.util.concurrent.atomic.AtomicReference
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since August 2018
- */
-class HealthCheckApiServer(private val healthStateProvider: HealthStateProvider) {
-
- private val healthState: AtomicReference<HealthState> = AtomicReference(HealthState.STARTING)
-
- fun start(port: Int): IO<RatpackServer> = IO {
- healthStateProvider().subscribe(healthState::set)
- RatpackServer
- .start {
- it
- .serverConfig(ServerConfig.embedded().port(port).development(false))
- .handlers(this::configureHandlers)
- }
- }
-
- private fun configureHandlers(chain: Chain) {
- chain
- .get("healthcheck") { ctx ->
- healthState.get().run {
- ctx.response.status(responseCode).send(message)
- }
- }
- }
-}
\ No newline at end of file
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthDescription.kt
similarity index 73%
rename from hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt
rename to hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthDescription.kt
index 5cc09cc..8c69406 100644
--- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt
+++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthDescription.kt
@@ -19,21 +19,14 @@
*/
package org.onap.dcae.collectors.veshv.healthcheck.api
-import org.onap.dcae.collectors.veshv.healthcheck.impl.HealthStateProviderImpl
-import reactor.core.publisher.Flux
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since August 2018
*/
-interface HealthStateProvider {
-
- operator fun invoke(): Flux<HealthState>
- fun changeState(healthState: HealthState)
-
- companion object {
- val INSTANCE: HealthStateProvider by lazy {
- HealthStateProviderImpl()
- }
- }
+enum class HealthDescription(val message: String, val status: HealthStatus) {
+ HEALTHY("Healthy", HealthStatus.UP),
+ STARTING("Collector is starting", HealthStatus.OUT_OF_SERVICE),
+ RETRYING_FOR_CONSUL_CONFIGURATION("Consul configuration not available. Retrying.", HealthStatus.OUT_OF_SERVICE),
+ CONSUL_CONFIGURATION_NOT_FOUND("Consul configuration not found", HealthStatus.DOWN)
}
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt
index 3dddf1e..853cc00 100644
--- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt
+++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt
@@ -19,16 +19,21 @@
*/
package org.onap.dcae.collectors.veshv.healthcheck.api
-import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.OK
-import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.SERVICE_UNAVAILABLE
+import org.onap.dcae.collectors.veshv.healthcheck.impl.HealthStateImpl
+import reactor.core.publisher.Flux
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since August 2018
*/
-enum class HealthState(val message: String, val responseCode: Int) {
- HEALTHY("Healthy", OK),
- STARTING("Collector is starting", SERVICE_UNAVAILABLE),
- WAITING_FOR_CONSUL_CONFIGURATION("Waiting for consul configuration", SERVICE_UNAVAILABLE),
- CONSUL_CONFIGURATION_NOT_FOUND("Consul configuration not found", SERVICE_UNAVAILABLE)
+interface HealthState {
+
+ operator fun invoke(): Flux<HealthDescription>
+ fun changeState(healthDescription: HealthDescription)
+
+ companion object {
+ val INSTANCE: HealthState by lazy {
+ HealthStateImpl()
+ }
+ }
}
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt
similarity index 71%
copy from hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt
copy to hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt
index 5cc09cc..79fc932 100644
--- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt
+++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt
@@ -19,21 +19,16 @@
*/
package org.onap.dcae.collectors.veshv.healthcheck.api
-import org.onap.dcae.collectors.veshv.healthcheck.impl.HealthStateProviderImpl
-import reactor.core.publisher.Flux
+import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.OK
+import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.SERVICE_UNAVAILABLE
/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
*/
-interface HealthStateProvider {
-
- operator fun invoke(): Flux<HealthState>
- fun changeState(healthState: HealthState)
-
- companion object {
- val INSTANCE: HealthStateProvider by lazy {
- HealthStateProviderImpl()
- }
- }
+enum class HealthStatus(val httpResponseStatus: Int) {
+ UP(OK),
+ DOWN(SERVICE_UNAVAILABLE),
+ OUT_OF_SERVICE(SERVICE_UNAVAILABLE),
+ UNKNOWN(SERVICE_UNAVAILABLE)
}
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
new file mode 100644
index 0000000..7e9efac
--- /dev/null
+++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
@@ -0,0 +1,60 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.healthcheck.factory
+
+import arrow.effects.IO
+import io.netty.handler.codec.http.HttpResponseStatus
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.ipc.netty.http.server.HttpServer
+import reactor.ipc.netty.http.server.HttpServerRequest
+import reactor.ipc.netty.http.server.HttpServerResponse
+import java.util.concurrent.atomic.AtomicReference
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since August 2018
+ */
+class HealthCheckApiServer(private val healthState: HealthState, private val port: Int) {
+
+ private val healthDescription: AtomicReference<HealthDescription> = AtomicReference(HealthDescription.STARTING)
+
+ fun start(): IO<ServerHandle> = IO {
+ healthState().subscribe(healthDescription::set)
+ val ctx = HttpServer.create(port).startRouter { routes ->
+ routes.get("/health/ready", ::readinessHandler)
+ routes.get("/health/alive", ::livenessHandler)
+ }
+ NettyServerHandle(ctx)
+ }
+
+ private fun readinessHandler(req: HttpServerRequest, resp: HttpServerResponse) =
+ healthDescription.get().run {
+ resp.status(status.httpResponseStatus).sendString(Flux.just(status.toString(), "\n", message))
+ }
+
+ private fun livenessHandler(req: HttpServerRequest, resp: HttpServerResponse) =
+ resp.status(HttpResponseStatus.NOT_IMPLEMENTED).sendString(Mono.just("Not implemented yet"))
+
+}
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImpl.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateImpl.kt
similarity index 74%
rename from hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImpl.kt
rename to hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateImpl.kt
index 5056d2d..c273f0a 100644
--- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImpl.kt
+++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateImpl.kt
@@ -19,8 +19,8 @@
*/
package org.onap.dcae.collectors.veshv.healthcheck.impl
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxProcessor
import reactor.core.publisher.UnicastProcessor
@@ -29,11 +29,11 @@
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since August 2018
*/
-internal class HealthStateProviderImpl : HealthStateProvider {
+internal class HealthStateImpl : HealthState {
- private val healthStateStream: FluxProcessor<HealthState, HealthState> = UnicastProcessor.create()
+ private val healthDescriptionStream: FluxProcessor<HealthDescription, HealthDescription> = UnicastProcessor.create()
- override fun invoke(): Flux<HealthState> = healthStateStream
+ override fun invoke(): Flux<HealthDescription> = healthDescriptionStream
- override fun changeState(healthState: HealthState) = healthStateStream.onNext(healthState)
+ override fun changeState(healthDescription: HealthDescription) = healthDescriptionStream.onNext(healthDescription)
}
diff --git a/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt b/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt
index e9c487b..e3fced2 100644
--- a/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt
+++ b/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt
@@ -21,10 +21,9 @@
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import reactor.test.StepVerifier
/**
@@ -33,20 +32,20 @@
*/
object HealthStateProviderImplTest : Spek({
describe("Health state provider") {
- val healthStateProviderImpl = HealthStateProviderImpl()
+ val healthStateProviderImpl = HealthStateImpl()
on("health state update") {
- healthStateProviderImpl.changeState(HealthState.HEALTHY)
- healthStateProviderImpl.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
- healthStateProviderImpl.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
- healthStateProviderImpl.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
+ healthStateProviderImpl.changeState(HealthDescription.HEALTHY)
+ healthStateProviderImpl.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
+ healthStateProviderImpl.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
+ healthStateProviderImpl.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
it("should push new health state to the subscriber") {
StepVerifier
.create(healthStateProviderImpl().take(4))
- .expectNext(HealthState.HEALTHY)
- .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
- .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
- .expectNext(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
+ .expectNext(HealthDescription.HEALTHY)
+ .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
+ .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
+ .expectNext(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
.verifyComplete()
}
}
diff --git a/hv-collector-main/pom.xml b/hv-collector-main/pom.xml
index 0e95628..af64ced 100644
--- a/hv-collector-main/pom.xml
+++ b/hv-collector-main/pom.xml
@@ -99,10 +99,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>io.ratpack</groupId>
- <artifactId>ratpack-core</artifactId>
- </dependency>
- <dependency>
<groupId>io.arrow-kt</groupId>
<artifactId>arrow-core</artifactId>
</dependency>
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index dc92228..a84a39a 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -19,13 +19,12 @@
*/
package org.onap.dcae.collectors.veshv.main
-import org.onap.dcae.collectors.veshv.boundary.Server
-import org.onap.dcae.collectors.veshv.boundary.ServerHandle
-import org.onap.dcae.collectors.veshv.factory.CollectorFactory
-import org.onap.dcae.collectors.veshv.factory.ServerFactory
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthCheckApiServer
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
-import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
+import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.monad
+import arrow.typeclasses.binding
+import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer
+import org.onap.dcae.collectors.veshv.main.servers.VesServer
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
@@ -38,13 +37,7 @@
fun main(args: Array<String>) =
ArgVesHvConfiguration().parse(args)
.mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
- .map(::startHealthCheckApiServer)
- .map(::createServer)
- .map {
- it.start()
- .map(::logServerStarted)
- .flatMap(ServerHandle::await)
- }
+ .map(::startAndAwaitServers)
.unsafeRunEitherSync(
{ ex ->
logger.error("Failed to start a server", ex)
@@ -53,24 +46,9 @@
{ logger.info("Gentle shutdown") }
)
-private fun createServer(config: ServerConfiguration): Server {
- val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink()
- val collectorProvider = CollectorFactory(
- AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
- sink,
- MicrometerMetrics()
- ).createVesHvCollectorProvider()
-
- return ServerFactory.createNettyTcpServer(config, collectorProvider)
-}
-
-private fun logServerStarted(handle: ServerHandle): ServerHandle = handle.also {
- logger.info("HighVolume VES Collector is up and listening on ${it.host}:${it.port}")
-}
-
-private fun startHealthCheckApiServer(config: ServerConfiguration): ServerConfiguration = config.apply {
- HealthCheckApiServer(HealthStateProvider.INSTANCE)
- .start(healthCheckApiPort)
- .unsafeRunSync()
- .also { logger.info("Health check api server started on port ${it.bindPort}") }
-}
+private fun startAndAwaitServers(config: ServerConfiguration) =
+ IO.monad().binding {
+ HealthCheckServer.start(config).bind()
+ VesServer.start(config).bind()
+ .await().bind()
+ }.fix()
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
new file mode 100644
index 0000000..04fc021
--- /dev/null
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
@@ -0,0 +1,39 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main.servers
+
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+object HealthCheckServer : ServerStarter() {
+ override fun startServer(config: ServerConfiguration) = createHealthCheckServer(config).start()
+
+ private fun createHealthCheckServer(config: ServerConfiguration) =
+ HealthCheckApiServer(HealthState.INSTANCE, config.healthCheckApiPort)
+
+ override fun serverStartedMessage(handle: ServerHandle) =
+ "Health check server is up and listening on ${handle.host}:${handle.port}"
+}
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
new file mode 100644
index 0000000..5c6f127
--- /dev/null
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
@@ -0,0 +1,42 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main.servers
+
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+abstract class ServerStarter {
+ fun start(config: ServerConfiguration): IO<ServerHandle> =
+ startServer(config)
+ .map { logger.info(serverStartedMessage(it)); it }
+
+ protected abstract fun startServer(config: ServerConfiguration): IO<ServerHandle>
+ protected abstract fun serverStartedMessage(handle: ServerHandle): String
+
+ companion object {
+ private val logger = Logger(ServerStarter::class)
+ }
+}
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
new file mode 100644
index 0000000..fbf8936
--- /dev/null
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -0,0 +1,51 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main.servers
+
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.boundary.Server
+import org.onap.dcae.collectors.veshv.factory.CollectorFactory
+import org.onap.dcae.collectors.veshv.factory.ServerFactory
+import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
+import org.onap.dcae.collectors.veshv.main.MicrometerMetrics
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+object VesServer : ServerStarter() {
+ override fun startServer(config: ServerConfiguration): IO<ServerHandle> = createVesServer(config).start()
+
+ private fun createVesServer(config: ServerConfiguration): Server {
+ val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink()
+ val collectorProvider = CollectorFactory(
+ AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
+ sink,
+ MicrometerMetrics()
+ ).createVesHvCollectorProvider()
+
+ return ServerFactory.createNettyTcpServer(config, collectorProvider)
+ }
+
+ override fun serverStartedMessage(handle: ServerHandle) =
+ "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}"
+}
\ No newline at end of file
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt
index d20ffac..081dd0d 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt
@@ -23,7 +23,7 @@
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since August 2018
*/
-class Status{
+class Status {
companion object {
const val OK = 200
const val SERVICE_UNAVAILABLE = 503
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
similarity index 60%
copy from hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt
copy to hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
index 5cc09cc..bb924f2 100644
--- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
@@ -17,23 +17,30 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.healthcheck.api
+package org.onap.dcae.collectors.veshv.utils
-import org.onap.dcae.collectors.veshv.healthcheck.impl.HealthStateProviderImpl
-import reactor.core.publisher.Flux
+import arrow.effects.IO
+import reactor.ipc.netty.tcp.BlockingNettyContext
/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
*/
-interface HealthStateProvider {
+abstract class ServerHandle(val host: String, val port: Int) {
+ abstract fun shutdown(): IO<Unit>
+ abstract fun await(): IO<Unit>
+}
- operator fun invoke(): Flux<HealthState>
- fun changeState(healthState: HealthState)
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+class NettyServerHandle(private val ctx: BlockingNettyContext) : ServerHandle(ctx.host, ctx.port) {
+ override fun shutdown() = IO {
+ ctx.shutdown()
+ }
- companion object {
- val INSTANCE: HealthStateProvider by lazy {
- HealthStateProviderImpl()
- }
+ override fun await() = IO<Unit> {
+ ctx.context.channel().closeFuture().sync()
}
}
diff --git a/pom.xml b/pom.xml
index 127e4bd..7656be3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@
<docker-image.namespace>onap</docker-image.namespace>
<docker-image.name>${project.groupId}.${project.artifactId}</docker-image.name>
<docker.http_proxy></docker.http_proxy>
+
</properties>
<build>
@@ -121,6 +122,7 @@
<version>${kotlin.version}</version>
<configuration>
<jvmTarget>1.8</jvmTarget>
+ <experimentalCoroutines>enable</experimentalCoroutines>
</configuration>
<executions>
<execution>