Fix NPE when getting Consul configuration

No initial value for AtomicReference was provided hence we had a little
race condition.

Retry when consul returns error.

Change-Id: Ie38ca7fbf445123e98ee94703eba501bb5233fab
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index b52f959..7ce49a8 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -28,9 +28,12 @@
 import org.onap.dcae.collectors.veshv.impl.Router
 import org.onap.dcae.collectors.veshv.impl.VesDecoder
 import org.onap.dcae.collectors.veshv.impl.VesHvCollector
+import org.onap.dcae.collectors.veshv.impl.adapters.ConsulConfigurationProvider
 import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import reactor.core.publisher.Flux
+import org.onap.dcae.collectors.veshv.model.routing
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.ves.VesEventV5
 import java.util.concurrent.atomic.AtomicReference
 
 /**
@@ -42,15 +45,32 @@
                        private val metrics: Metrics) {
 
     fun createVesHvCollectorProvider(): CollectorProvider {
-        val collector: AtomicReference<Collector> = AtomicReference()
-        createVesHvCollector().subscribe(collector::set)
+        val initialValue = createVesHvCollector(defaultConfiguration())
+        val collector: AtomicReference<Collector> = AtomicReference(initialValue)
+        configuration()
+                .map(this::createVesHvCollector)
+                .doOnNext { logger.info("Using updated configuration for new connections") }
+                .doOnError {
+                    logger.error("Shutting down", it)
+                    // TODO: create Health class
+                    // It should monitor all incidents and expose the results for the
+                    // container health check mechanism
+                    System.exit(ERROR_CODE)
+                }
+                .subscribe(collector::set)
         return collector::get
     }
 
-    private fun createVesHvCollector(): Flux<Collector> =
-            configuration()
-                    .doOnError { System.exit(ERROR_CODE) }
-                    .map(this::createVesHvCollector)
+    private fun defaultConfiguration() =
+            CollectorConfiguration(
+                    kafkaBootstrapServers = "kafka:9092",
+                    routing = routing {
+                        defineRoute {
+                            fromDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS)
+                            toTopic("ves_hvRanMeas")
+                            withFixedPartitioning()
+                        }
+                    }.build())
 
     private fun createVesHvCollector(config: CollectorConfiguration): Collector {
         return VesHvCollector(
@@ -62,7 +82,8 @@
     }
 
     companion object {
-        const val ERROR_CODE = 3
+        private const val ERROR_CODE = 3
+        private val logger = Logger(CollectorFactory::class)
     }
 }
 
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 11a0e9b..7248db6 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -36,11 +36,8 @@
 
     fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
             ConsulConfigurationProvider(
-                    configurationProviderParams.configurationUrl,
                     httpAdapter(),
-                    configurationProviderParams.firstRequestDelay,
-                    configurationProviderParams.requestInterval
-            )
+                    configurationProviderParams)
 
     fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
 }
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index aca0e7e..6f04c95 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -21,12 +21,14 @@
 
 import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
 import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.routing
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
+import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber
 import org.slf4j.LoggerFactory
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
+import reactor.retry.Jitter
+import reactor.retry.Retry
 import java.io.StringReader
 import java.time.Duration
 import java.util.*
@@ -39,41 +41,40 @@
  * @author Jakub Dudycz <jakub.dudycz@nokia.com>
  * @since May 2018
  */
-internal class ConsulConfigurationProvider(private val url: String,
-                                           private val http: HttpAdapter,
+internal class ConsulConfigurationProvider(private val http: HttpAdapter,
+                                           private val url: String,
                                            private val firstRequestDelay: Duration,
-                                           private val requestInterval: Duration
+                                           private val requestInterval: Duration,
+                                           retrySpec: Retry<Any>
 ) : ConfigurationProvider {
 
     private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
+    private val retry = retrySpec
+            .doOnRetry {
+                logger.warn("Could not get fresh configuration", it.exception())
+            }
+
+    constructor(http: HttpAdapter, params: ConfigurationProviderParams) : this(
+            http,
+            params.configurationUrl,
+            params.firstRequestDelay,
+            params.requestInterval,
+            Retry.any<Any>()
+                    .retryMax(MAX_RETRIES)
+                    .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
+                    .jitter(Jitter.random()))
 
     override fun invoke(): Flux<CollectorConfiguration> =
-            Flux.concat(createDefaultConfigurationFlux(), createConsulFlux())
+            Flux.interval(firstRequestDelay, requestInterval)
+                    .flatMap { askForConfig() }
+                    .map(::parseJsonResponse)
+                    .map(::extractEncodedConfiguration)
+                    .flatMap(::filterDifferentValues)
+                    .map(::decodeConfiguration)
+                    .map(::createCollectorConfiguration)
+                    .retryWhen(retry)
 
-    private fun createDefaultConfigurationFlux(): Mono<CollectorConfiguration> = Mono.just(
-            CollectorConfiguration(
-                    kafkaBootstrapServers = "kafka:9092",
-                    routing = routing {
-                        defineRoute {
-                            fromDomain(HVRANMEAS)
-                            toTopic("ves_hvRanMeas")
-                            withFixedPartitioning()
-                        }
-                    }.build())
-    ).doOnNext { logger.info("Applied default configuration") }
-
-    private fun createConsulFlux(): Flux<CollectorConfiguration> = Flux
-            .interval(firstRequestDelay, requestInterval)
-            .flatMap { http.get(url) }
-            .doOnError {
-                logger.error("Encountered an error " +
-                        "when trying to acquire configuration from consul. Shutting down..")
-            }
-            .map(::parseJsonResponse)
-            .map(::extractEncodedConfiguration)
-            .flatMap(::filterDifferentValues)
-            .map(::decodeConfiguration)
-            .map(::createCollectorConfiguration)
+    private fun askForConfig(): Mono<String> = http.get(url)
 
     private fun parseJsonResponse(responseString: String): JsonObject =
             Json.createReader(StringReader(responseString)).readArray().first().asJsonObject()
@@ -118,7 +119,9 @@
     }
 
     companion object {
-        private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java)
+        private const val MAX_RETRIES = 5
+        private const val BACKOFF_INTERVAL_FACTOR = 30L
+        private val logger = Logger(ConsulConfigurationProvider::class)
     }
 }
 
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
index f4c527a..1626c02 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -28,6 +28,7 @@
 import org.mockito.Mockito
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
 import reactor.core.publisher.Mono
+import reactor.retry.Retry
 import reactor.test.StepVerifier
 import java.time.Duration
 import java.util.*
@@ -42,27 +43,24 @@
     val httpAdapterMock: HttpAdapter = mock()
     val firstRequestDelay = Duration.ofMillis(1)
     val requestInterval = Duration.ofMillis(1)
+    val retry = Retry.onlyIf<Any> { it.iteration() < 2 }.fixedBackoff(Duration.ofNanos(1))
 
     given("valid resource url") {
 
         val validUrl = "http://valid-url/"
-        val consulConfigProvider = ConsulConfigurationProvider(validUrl, httpAdapterMock, firstRequestDelay, requestInterval)
+        val consulConfigProvider = ConsulConfigurationProvider(
+                httpAdapterMock,
+                validUrl,
+                firstRequestDelay,
+                requestInterval,
+                retry)
 
         whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap()))
                 .thenReturn(Mono.just(constructConsulResponse()))
 
-        it("should use default configuration at the beginning, " +
-                "then apply received configuration") {
+        it("should use received configuration") {
 
-            StepVerifier.create(consulConfigProvider().take(2))
-                    .consumeNextWith {
-
-                        assertEquals("kafka:9092", it.kafkaBootstrapServers)
-
-                        val route1 = it.routing.routes[0]
-                        assertEquals(Domain.HVRANMEAS, route1.domain)
-                        assertEquals("ves_hvRanMeas", route1.targetTopic)
-                    }
+            StepVerifier.create(consulConfigProvider().take(1))
                     .consumeNextWith {
 
                         assertEquals("kafka:9093", it.kafkaBootstrapServers)
@@ -81,23 +79,19 @@
     given("invalid resource url") {
 
         val invalidUrl = "http://invalid-url/"
-        val consulConfigProvider = ConsulConfigurationProvider(invalidUrl, httpAdapterMock, firstRequestDelay, requestInterval)
+        val consulConfigProvider = ConsulConfigurationProvider(
+                httpAdapterMock,
+                invalidUrl,
+                firstRequestDelay,
+                requestInterval,
+                retry)
 
         whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap()))
                 .thenReturn(Mono.error(RuntimeException("Test exception")))
 
-        it("should use default configuration at the beginning, then should interrupt the flux") {
+        it("should interrupt the flux") {
 
             StepVerifier.create(consulConfigProvider())
-                    .consumeNextWith {
-
-
-                        assertEquals("kafka:9092", it.kafkaBootstrapServers)
-
-                        val route1 = it.routing.routes[0]
-                        assertEquals(Domain.HVRANMEAS, route1.domain)
-                        assertEquals("ves_hvRanMeas", route1.targetTopic)
-                    }
                     .verifyErrorMessage("Test exception")
         }
     }
@@ -106,18 +100,18 @@
 fun constructConsulResponse(): String {
 
     val config = """{
-	"kafkaBootstrapServers": "kafka:9093",
-	"routing": [
-		    {
-		    	"fromDomain": 1,
-		    	"toTopic": "test-topic-1"
-		    },
-		    {
-		    	"fromDomain": 2,
-		    	"toTopic": "test-topic-2"
-	    	}
+    "kafkaBootstrapServers": "kafka:9093",
+    "routing": [
+            {
+                "fromDomain": 1,
+                "toTopic": "test-topic-1"
+            },
+            {
+                "fromDomain": 2,
+                "toTopic": "test-topic-2"
+            }
     ]
-    }"""
+}"""
 
     val encodedValue = String(Base64.getEncoder().encode(config.toByteArray()))
 
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
index cfc44bc..aa3fdc3 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
@@ -34,5 +34,4 @@
 
     override fun notifyMessageSent(topic: String) {
     }
-
 }
\ No newline at end of file