Component tests for consul configuration updates

Added few component test cases for updating configuration

Closes ONAP-464

Change-Id: Id8dba1d1cf4bf641a65e27d2a257fb5c26ee2bbc
Signed-off-by: Jakub Dudycz <jdudycz@nokia.com>
Issue-ID: DCAEGEN2-601
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index aa5810d..246fc7e 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -23,11 +23,10 @@
 import org.jetbrains.spek.api.Spek
 import org.jetbrains.spek.api.dsl.describe
 import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.Routing
-import org.onap.dcae.collectors.veshv.model.routing
 import org.onap.dcae.collectors.veshv.tests.fakes.*
 import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import reactor.core.publisher.Flux
+import java.time.Duration
 
 /**
  * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -50,7 +49,6 @@
     }
 
     describe("Memory management") {
-
         it("should release memory for each handled and dropped message") {
             val sink = StoringSink()
             val sut = Sut(sink)
@@ -160,7 +158,7 @@
             assertThat(messages.get(2).topic).describedAs("last message topic")
                     .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
         }
-        
+
         it("should drop message if route was not found") {
             val sink = StoringSink()
             val sut = Sut(sink)
@@ -178,6 +176,139 @@
         }
     }
 
+    describe("configuration update") {
+
+        val defaultTimeout = Duration.ofSeconds(10)
+
+        it("should update collector on configuration change") {
+            val sink = StoringSink()
+            val sut = Sut(sink)
+
+            sut.configurationProvider.updateConfiguration(basicConfiguration)
+            val firstCollector = sut.collector
+
+            sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+            val collectorAfterUpdate = sut.collector
+
+            assertThat(collectorAfterUpdate).isNotSameAs(firstCollector)
+
+        }
+
+        it("should start routing messages on configuration change") {
+            val sink = StoringSink()
+            val sut = Sut(sink)
+
+            sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
+
+            val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+            assertThat(messages).isEmpty()
+
+            sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+            val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+            assertThat(messagesAfterUpdate).hasSize(1)
+            val message = messagesAfterUpdate[0]
+
+            assertThat(message.topic).describedAs("routed message topic after configuration's change")
+                    .isEqualTo(HVRANMEAS_TOPIC)
+            assertThat(message.partition).describedAs("routed message partition")
+                    .isEqualTo(0)
+        }
+
+        it("should change domain routing on configuration change") {
+            val sink = StoringSink()
+            val sut = Sut(sink)
+
+            sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+            val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+            assertThat(messages).hasSize(1)
+            val firstMessage = messages[0]
+
+            assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
+                    .isEqualTo(HVRANMEAS_TOPIC)
+            assertThat(firstMessage.partition).describedAs("routed message partition")
+                    .isEqualTo(0)
+
+
+            sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+
+            val messagesAfterUpdate = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+            assertThat(messagesAfterUpdate).hasSize(2)
+            val secondMessage = messagesAfterUpdate[1]
+
+            assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
+                    .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC)
+            assertThat(secondMessage.partition).describedAs("routed message partition")
+                    .isEqualTo(0)
+        }
+
+        it("should update routing for each client sending one message") {
+            val sink = StoringSink()
+            val sut = Sut(sink)
+
+            sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+            val messagesAmount = 10
+            val messagesForEachTopic = 5
+
+            Flux.range(0, messagesAmount).doOnNext {
+                if (it == messagesForEachTopic) {
+                    sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+                }
+            }.doOnNext {
+                sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
+            }.then().block(defaultTimeout)
+
+
+            val messages = sink.sentMessages
+            val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
+            val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+
+            assertThat(messages.size).isEqualTo(messagesAmount)
+            assertThat(messagesForEachTopic)
+                    .describedAs("amount of messages routed to each topic")
+                    .isEqualTo(firstTopicMessagesCount)
+                    .isEqualTo(secondTopicMessagesCount)
+        }
+
+
+        it("should not update routing for client sending continuous stream of messages") {
+            val sink = StoringSink()
+            val sut = Sut(sink)
+
+            sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+            val messageStreamSize = 10
+            val pivot = 5
+
+            val incomingMessages = Flux.range(0, messageStreamSize)
+                    .doOnNext {
+                        if (it == pivot) {
+                            sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
+                            println("config changed")
+                        }
+                    }
+                    .map { vesMessage(Domain.HVRANMEAS) }
+
+
+            sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
+
+            val messages = sink.sentMessages
+            val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
+            val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+
+            assertThat(messages.size).isEqualTo(messageStreamSize)
+            assertThat(firstTopicMessagesCount)
+                    .describedAs("amount of messages routed to first topic")
+                    .isEqualTo(messageStreamSize)
+
+            assertThat(secondTopicMessagesCount)
+                    .describedAs("amount of messages routed to second topic")
+                    .isEqualTo(0)
+        }
+    }
+
     describe("request validation") {
         it("should reject message with payload greater than 1 MiB and all subsequent messages") {
             val sink = StoringSink()
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
index 47c31c6..b89113f 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -29,6 +29,7 @@
 
 const val HVRANMEAS_TOPIC = "ves_hvRanMeas"
 const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "ves_hvMeasForVfScaling"
+const val ALTERNATE_HVRANMEAS_TOPIC = "ves_alternateHvRanMeas"
 
 val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
         kafkaBootstrapServers = "localhost:9969",
@@ -63,6 +64,24 @@
 )
 
 
+val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfiguration(
+        kafkaBootstrapServers = "localhost:9969",
+        routing = routing {
+            defineRoute {
+                fromDomain(Domain.HVRANMEAS)
+                toTopic(ALTERNATE_HVRANMEAS_TOPIC)
+                withFixedPartitioning()
+            }
+        }.build()
+)
+
+
+val configurationWithoutRouting: CollectorConfiguration = CollectorConfiguration(
+        kafkaBootstrapServers = "localhost:9969",
+        routing = routing {
+        }.build()
+)
+
 class FakeConfigurationProvider : ConfigurationProvider {
     private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create()
 
@@ -71,4 +90,4 @@
     }
 
     override fun invoke() = configStream
-}
\ No newline at end of file
+}