Merge "Adding Modeling Concepts documentation from Confluence to RDT"
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
index 7e6bf68..af8d902 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
@@ -260,6 +260,7 @@
                     val headers: MutableMap<String, String> = hashMapOf()
                     headers["id"] = it.id
                     blueprintMessageProducerService.sendMessageNB(
+                        key = "mykey",
                         message = it.asJsonString(false),
                         headers = headers
                     )
@@ -272,6 +273,7 @@
                         val headers: MutableMap<String, String> = hashMapOf()
                         headers["id"] = it.id
                         blueprintMessageProducerService.sendMessageNB(
+                            key = "mykey",
                             message = it.asJsonString(false),
                             headers = headers
                         )
@@ -284,6 +286,7 @@
                         val headers: MutableMap<String, String> = hashMapOf()
                         headers["id"] = it.id
                         blueprintMessageProducerService.sendMessageNB(
+                            key = "mykey",
                             message = it.asJsonString(false),
                             headers = headers
                         )
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
index cb78f41..76662d4 100644
--- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
+++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
@@ -242,4 +242,6 @@
     const val PROPERTY_CLUSTER_CONFIG_FILE = "CLUSTER_CONFIG_FILE"
 
     const val NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION = "component-resource-resolution"
+    const val NODE_TEMPLATE_TYPE_DG = "dg-generic"
+    const val PROPERTY_DG_DEPENDENCY_NODE_TEMPLATE = "dependency-node-templates"
 }
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
index f74abcd..311d35c 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
@@ -19,6 +19,7 @@
 
 import kotlinx.coroutines.channels.Channel
 import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.ConsumerRecords
 import org.apache.kafka.streams.Topology
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
@@ -29,15 +30,15 @@
 
 interface BlueprintMessageConsumerService {
 
-    suspend fun subscribe(): Channel<String> {
+    suspend fun subscribe(): Channel<ConsumerRecord<String, ByteArray>> {
         return subscribe(null)
     }
 
     /** Subscribe to the Kafka channel with [additionalConfig] */
-    suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String>
+    suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>>
 
     /** Subscribe to the Kafka channel with [additionalConfig] for dynamic [topics]*/
-    suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>? = null): Channel<String>
+    suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>? = null): Channel<ConsumerRecord<String, ByteArray>>
 
     /** Consume and execute dynamic function [consumerFunction] */
     suspend fun consume(consumerFunction: ConsumerFunction) {
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
index cdc65a1..66d3a5b 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
@@ -17,34 +17,19 @@
 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
 import kotlinx.coroutines.runBlocking
+import java.util.UUID
 
 interface BlueprintMessageProducerService {
 
-    fun sendMessage(message: Any): Boolean {
-        return sendMessage(message = message, headers = null)
+    fun sendMessage(key: String = UUID.randomUUID().toString(), message: Any, headers: MutableMap<String, String>? = null): Boolean = runBlocking {
+        sendMessageNB(key, message, headers)
     }
 
-    fun sendMessage(topic: String, message: Any): Boolean {
-        return sendMessage(topic, message, null)
+    fun sendMessage(key: String = UUID.randomUUID().toString(), topic: String, message: Any, headers: MutableMap<String, String>? = null): Boolean = runBlocking {
+        sendMessageNB(key, topic, message, headers)
     }
 
-    fun sendMessage(message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking {
-        sendMessageNB(message = message, headers = headers)
-    }
+    suspend fun sendMessageNB(key: String = UUID.randomUUID().toString(), message: Any, headers: MutableMap<String, String>? = null): Boolean
 
-    fun sendMessage(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking {
-        sendMessageNB(topic, message, headers)
-    }
-
-    suspend fun sendMessageNB(message: Any): Boolean {
-        return sendMessageNB(message = message, headers = null)
-    }
-
-    suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean
-
-    suspend fun sendMessageNB(topic: String, message: Any): Boolean {
-        return sendMessageNB(topic, message, null)
-    }
-
-    suspend fun sendMessageNB(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean
+    suspend fun sendMessageNB(key: String = UUID.randomUUID().toString(), topic: String, message: Any, headers: MutableMap<String, String>? = null): Boolean
 }
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
index cdcd419..a0932e9 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
@@ -19,13 +19,12 @@
 
 import kotlinx.coroutines.channels.Channel
 import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
 import kotlinx.coroutines.runBlocking
 import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import java.nio.charset.Charset
 import java.time.Duration
 import kotlin.concurrent.thread
 
@@ -35,7 +34,7 @@
     BlueprintMessageConsumerService {
 
     val log = logger(KafkaMessageConsumerService::class)
-    val channel = Channel<String>()
+    val channel = Channel<ConsumerRecord<String, ByteArray>>()
     var kafkaConsumer: Consumer<String, ByteArray>? = null
 
     @Volatile
@@ -49,14 +48,14 @@
         return KafkaConsumer(configProperties)
     }
 
-    override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> {
+    override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
         /** get to topic names */
         val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() }
         check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" }
         return subscribe(consumerTopic, additionalConfig)
     }
 
-    override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<String> {
+    override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
         /** Create Kafka consumer */
         kafkaConsumer = kafkaConsumer(additionalConfig)
 
@@ -78,14 +77,10 @@
                     runBlocking {
                         consumerRecords?.forEach { consumerRecord ->
                             /** execute the command block */
-                            consumerRecord.value()?.let {
-                                launch {
-                                    if (!channel.isClosedForSend) {
-                                        channel.send(String(it, Charset.defaultCharset()))
-                                    } else {
-                                        log.error("Channel is closed to receive message")
-                                    }
-                                }
+                            if (!channel.isClosedForSend) {
+                                channel.send(consumerRecord)
+                            } else {
+                                log.error("Channel is closed to receive message")
                             }
                         }
                     }
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
index 8958d4f..59e9192 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
@@ -29,7 +29,6 @@
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
-import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
 import org.slf4j.LoggerFactory
 import java.nio.charset.Charset
 
@@ -48,17 +47,13 @@
         const val MAX_ERR_MSG_LEN = 128
     }
 
-    override suspend fun sendMessageNB(message: Any): Boolean {
+    override suspend fun sendMessageNB(key: String, message: Any, headers: MutableMap<String, String>?): Boolean {
         checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
-        return sendMessageNB(messageProducerProperties.topic!!, message)
-    }
-
-    override suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean {
-        checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
-        return sendMessageNB(messageProducerProperties.topic!!, message, headers)
+        return sendMessageNB(key, messageProducerProperties.topic!!, message, headers)
     }
 
     override suspend fun sendMessageNB(
+        key: String,
         topic: String,
         message: Any,
         headers: MutableMap<String, String>?
@@ -73,7 +68,7 @@
             else -> clonedMessage.asJsonString().toByteArray(Charset.defaultCharset())
         }
 
-        val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage)
+        val record = ProducerRecord<String, ByteArray>(topic, key, byteArrayMessage)
         val recordHeaders = record.headers()
         messageLoggerService.messageProducing(recordHeaders)
         headers?.let {
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt
index 60f2dfa..4340e48 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt
@@ -17,6 +17,7 @@
 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
 import kotlinx.coroutines.channels.Channel
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.streams.KafkaStreams
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
 import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
@@ -39,11 +40,11 @@
         return configProperties
     }
 
-    override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> {
+    override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
         throw BluePrintProcessorException("not implemented")
     }
 
-    override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<String> {
+    override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
         throw BluePrintProcessorException("not implemented")
     }
 
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index fdf6e48..77bdbe4 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -51,6 +51,7 @@
 import org.springframework.test.context.ContextConfiguration
 import org.springframework.test.context.TestPropertySource
 import org.springframework.test.context.junit4.SpringRunner
+import java.nio.charset.Charset
 import kotlin.test.assertEquals
 import kotlin.test.assertNotNull
 import kotlin.test.assertTrue
@@ -133,9 +134,14 @@
 
             every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
             val channel = spyBlueprintMessageConsumerService.subscribe(null)
+            var i = 0
             launch {
                 channel.consumeEach {
-                    assertTrue(it.startsWith("I am message"), "failed to get the actual message")
+                    ++i
+                    val key = it.key()
+                    val value = String(it.value(), Charset.defaultCharset())
+                    assertTrue(value.startsWith("I am message"), "failed to get the actual message")
+                    assertEquals("key_$i", key)
                 }
             }
             delay(10)
@@ -268,6 +274,7 @@
                     val headers: MutableMap<String, String> = hashMapOf()
                     headers["id"] = it.toString()
                     blueprintMessageProducerService.sendMessageNB(
+                        key = "mykey",
                         message = "this is my message($it)",
                         headers = headers
                     )
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
index 537dab1..881f0b4 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
@@ -88,7 +88,7 @@
 
             every { spyBluePrintMessageProducerService.messageTemplate(any()) } returns mockKafkaTemplate
 
-            val response = spyBluePrintMessageProducerService.sendMessage("Testing message")
+            val response = spyBluePrintMessageProducerService.sendMessage("mykey", "Testing message")
             assertTrue(response, "failed to get command response")
         }
     }
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
index c30ab9b..44990ae 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
@@ -132,6 +132,7 @@
                     val headers: MutableMap<String, String> = hashMapOf()
                     headers["id"] = it.toString()
                     blueprintMessageProducerService.sendMessageNB(
+                        key = "mykey",
                         message = "this is my message($it)",
                         headers = headers
                     )
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
index 1f3dd65..1ccf230 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
@@ -31,6 +31,8 @@
 import org.springframework.boot.context.event.ApplicationReadyEvent
 import org.springframework.context.event.EventListener
 import org.springframework.stereotype.Service
+import java.nio.charset.Charset
+import java.util.UUID
 import java.util.concurrent.Phaser
 import javax.annotation.PreDestroy
 
@@ -95,10 +97,12 @@
                     launch {
                         try {
                             ph.register()
-                            log.trace("Consumed Message : $message")
-                            val executionServiceInput = message.jsonAsType<ExecutionServiceInput>()
+                            val key = message.key() ?: UUID.randomUUID().toString()
+                            val value = String(message.value(), Charset.defaultCharset())
+                            log.trace("Consumed Message : key($key) value($value)")
+                            val executionServiceInput = value.jsonAsType<ExecutionServiceInput>()
                             val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
-                            blueprintMessageProducerService.sendMessage(executionServiceOutput)
+                            blueprintMessageProducerService.sendMessage(key, executionServiceOutput)
                         } catch (e: Exception) {
                             log.error("failed in processing the consumed message : $message", e)
                         } finally {
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
index fca7398..145c37b 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
@@ -17,6 +17,7 @@
 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
 
 import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.ArrayNode
 import com.fasterxml.jackson.databind.node.ObjectNode
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
@@ -27,6 +28,8 @@
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
 import org.onap.ccsdk.cds.controllerblueprints.core.common.ApplicationConstants
 import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintContext
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintRuntimeService
 import org.onap.ccsdk.cds.controllerblueprints.core.service.PropertyAssignmentService
 import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils
 import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
@@ -40,6 +43,13 @@
 /**
  * Audit service used to produce execution service input and output message
  * sent into dedicated kafka topics.
+ *
+ * @param bluePrintMessageLibPropertyService Service used to instantiate audit service producers
+ * @param blueprintsProcessorCatalogService Service used to get the base path of the current CBA executed
+ *
+ * @property inputInstance Request Kafka Producer instance
+ * @property outputInstance Response Kafka Producer instance
+ * @property log Audit Service logger
  */
 @ConditionalOnProperty(
         name = ["blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable"],
@@ -68,12 +78,14 @@
      * Publish execution input into a kafka topic.
      * The correlation UUID is used to link the input to its output.
      * Sensitive data within the request are hidden.
+     * @param executionServiceInput Audited BP request
      */
     override suspend fun publishExecutionInput(executionServiceInput: ExecutionServiceInput) {
         val secureExecutionServiceInput = hideSensitiveData(executionServiceInput)
+        val key = secureExecutionServiceInput.actionIdentifiers.blueprintName
         try {
             this.inputInstance = this.getInputInstance(INPUT_SELECTOR)
-            this.inputInstance!!.sendMessage(secureExecutionServiceInput)
+            this.inputInstance!!.sendMessage(key, secureExecutionServiceInput)
         } catch (e: Exception) {
             var errMsg =
                     if (e.message != null) "ERROR : ${e.message}"
@@ -86,32 +98,38 @@
      * Publish execution output into a kafka topic.
      * The correlation UUID is used to link the output to its input.
      * A correlation UUID is added to link the input to its output.
+     * @param correlationUUID UUID used to link the audited response to its audited request
+     * @param executionServiceOutput Audited BP response
      */
     override suspend fun publishExecutionOutput(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) {
         executionServiceOutput.correlationUUID = correlationUUID
+        val key = executionServiceOutput.actionIdentifiers.blueprintName
         try {
             this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR)
-            this.outputInstance!!.sendMessage(executionServiceOutput)
+            this.outputInstance!!.sendMessage(key, executionServiceOutput)
         } catch (e: Exception) {
             var errMsg =
-                if (e.message != null) "ERROR : $e"
-                else "ERROR : Failed to send execution request to Kafka."
+                    if (e.message != null) "ERROR : $e"
+                    else "ERROR : Failed to send execution request to Kafka."
             log.error(errMsg)
         }
     }
 
     /**
-     * Return the input kafka producer instance using a selector.
+     * Return the input kafka producer instance using a [selector] if not already instantiated.
+     * @param selector Selector to retrieve request kafka producer configuration
      */
     private fun getInputInstance(selector: String): BlueprintMessageProducerService = inputInstance ?: createInstance(selector)
 
     /**
-     * Return the output kafka producer instance using a selector.
+     * Return the output kafka producer instance using a [selector] if not already instantiated.
+     * @param selector Selector to retrieve response kafka producer configuration
      */
     private fun getOutputInstance(selector: String): BlueprintMessageProducerService = outputInstance ?: createInstance(selector)
 
     /**
-     * Create a kafka producer instance.
+     * Create a kafka producer instance using a [selector].
+     * @param selector Selector to retrieve kafka producer configuration
      */
     private fun createInstance(selector: String): BlueprintMessageProducerService {
         log.info("Setting up message producer($selector)...")
@@ -119,9 +137,10 @@
     }
 
     /**
-     * Hide sensitive data in the request.
+     * Hide sensitive data in the [executionServiceInput].
      * Sensitive data are declared in the resource resolution mapping using
      * the property metadata "log-protect" set to true.
+     * @param executionServiceInput BP Execution Request where data needs to be hidden
      */
     private suspend fun hideSensitiveData(
         executionServiceInput: ExecutionServiceInput
@@ -153,60 +172,105 @@
                 val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
                 val blueprintContext = blueprintRuntimeService.bluePrintContext()
 
-                /** Looking for node templates defined as component-resource-resolution */
-                val nodeTemplates = blueprintContext.nodeTemplates()
-                nodeTemplates!!.forEach { nodeTemplate ->
-                    val nodeTemplateName = nodeTemplate.key
-                    val nodeTemplateType = blueprintContext.nodeTemplateByName(nodeTemplateName).type
-                    if (nodeTemplateType == BluePrintConstants.NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION) {
-                        val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName)
-                        val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName)
+                val workflowSteps = blueprintContext.workflowByName(workflowName).steps
+                checkNotNull(workflowSteps) { "Failed to get step(s) for workflow($workflowName)" }
+                workflowSteps.forEach { step ->
+                    val nodeTemplateName = step.value.target
+                    checkNotNull(nodeTemplateName) { "Failed to get node template target for workflow($workflowName), step($step)" }
+                    val nodeTemplate = blueprintContext.nodeTemplateByName(nodeTemplateName)
 
-                        val propertyAssignments: MutableMap<String, JsonNode> =
-                                blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName)
-                                        ?: hashMapOf()
-
-                        /** Getting values define in artifact-prefix-names */
-                        val input = executionServiceInput.payload.get("$workflowName-request")
-                        blueprintRuntimeService.assignWorkflowInputs(workflowName, input)
-                        val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES]
-                        val propertyAssignmentService = PropertyAssignmentService(blueprintRuntimeService)
-                        val artifactPrefixNamesNodeValue = propertyAssignmentService.resolveAssignmentExpression(
-                                BluePrintConstants.MODEL_DEFINITION_TYPE_NODE_TEMPLATE,
-                                nodeTemplateName,
-                                ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES,
-                                artifactPrefixNamesNode!!)
-
-                        val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNodeValue!!, String::class.java)
-
-                        /** Storing mapping entries with metadata log-protect set to true */
-                        val sensitiveParameters: List<String> = artifactPrefixNames
-                                .map { "$it-mapping" }
-                                .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) }
-                                .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) }
-                                .filter { PropertyDefinitionUtils.hasLogProtect(it.property) }
-                                .map { it.name }
-
-                        /** Hiding sensitive input parameters from the request */
-                        var workflowProperties: ObjectNode = clonedExecutionServiceInput.payload
-                                .path("$workflowName-request")
-                                .path("$workflowName-properties") as ObjectNode
-
-                        sensitiveParameters.forEach { sensitiveParameter ->
-                            if (workflowProperties.has(sensitiveParameter)) {
-                                workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive())
-                            }
+                    /** In case of a DG Node Template, we need to check its Node Template dependencies */
+                    if (nodeTemplate.type == BluePrintConstants.NODE_TEMPLATE_TYPE_DG) {
+                        val dependencyNodeTemplate = nodeTemplate.properties?.get(BluePrintConstants.PROPERTY_DG_DEPENDENCY_NODE_TEMPLATE) as ArrayNode
+                        dependencyNodeTemplate.forEach { dependencyNodeTemplateName ->
+                            clonedExecutionServiceInput = hideSensitiveDataFromResourceResolution(
+                                    blueprintRuntimeService,
+                                    blueprintContext,
+                                    clonedExecutionServiceInput,
+                                    workflowName,
+                                    dependencyNodeTemplateName.asText()
+                            )
                         }
+                    } else {
+                        clonedExecutionServiceInput = hideSensitiveDataFromResourceResolution(
+                                blueprintRuntimeService,
+                                blueprintContext,
+                                clonedExecutionServiceInput,
+                                workflowName,
+                                nodeTemplateName
+                        )
                     }
                 }
             }
         } catch (e: Exception) {
-        val errMsg = "ERROR : Couldn't hide sensitive data in the execution request."
-        log.error(errMsg, e)
-        clonedExecutionServiceInput.payload.replace(
-                "$workflowName-request",
-                "$errMsg $e".asJsonPrimitive())
+            val errMsg = "ERROR : Couldn't hide sensitive data in the execution request."
+            log.error(errMsg, e)
+            clonedExecutionServiceInput.payload.replace(
+                    "$workflowName-request",
+                    "$errMsg $e".asJsonPrimitive())
         }
         return clonedExecutionServiceInput
     }
+
+    /**
+     * Hide sensitive data in [executionServiceInput] if the given [nodeTemplateName] is a
+     * resource resolution component.
+     * @param blueprintRuntimeService Current blueprint runtime service
+     * @param blueprintContext Current blueprint runtime context
+     * @param executionServiceInput BP Execution Request where data needs to be hidden
+     * @param workflowName Current workflow being executed
+     * @param nodeTemplateName Node template to check for sensitive data
+     * @return [executionServiceInput] with sensitive inputs replaced by a generic string
+     */
+    private suspend fun hideSensitiveDataFromResourceResolution(
+        blueprintRuntimeService: BluePrintRuntimeService<MutableMap<String, JsonNode>>,
+        blueprintContext: BluePrintContext,
+        executionServiceInput: ExecutionServiceInput,
+        workflowName: String,
+        nodeTemplateName: String
+    ): ExecutionServiceInput {
+
+        val nodeTemplate = blueprintContext.nodeTemplateByName(nodeTemplateName)
+        if (nodeTemplate.type == BluePrintConstants.NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION) {
+            val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName)
+            val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName)
+
+            val propertyAssignments: MutableMap<String, JsonNode> =
+                    blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName)
+                            ?: hashMapOf()
+
+            /** Getting values defined in artifact-prefix-names */
+            val input = executionServiceInput.payload.get("$workflowName-request")
+            blueprintRuntimeService.assignWorkflowInputs(workflowName, input)
+            val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES]
+            val propertyAssignmentService = PropertyAssignmentService(blueprintRuntimeService)
+            val artifactPrefixNamesNodeValue = propertyAssignmentService.resolveAssignmentExpression(
+                    BluePrintConstants.MODEL_DEFINITION_TYPE_NODE_TEMPLATE,
+                    nodeTemplateName,
+                    ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES,
+                    artifactPrefixNamesNode!!)
+
+            val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNodeValue!!, String::class.java)
+
+            /** Storing mapping entries with metadata log-protect set to true */
+            val sensitiveParameters: List<String> = artifactPrefixNames
+                    .map { "$it-mapping" }
+                    .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) }
+                    .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) }
+                    .filter { PropertyDefinitionUtils.hasLogProtect(it.property) }
+                    .map { it.name }
+
+            /** Hiding sensitive input parameters from the request */
+            var workflowProperties: ObjectNode = executionServiceInput.payload
+                    .path("$workflowName-request")
+                    .path("$workflowName-properties") as ObjectNode
+
+            sensitiveParameters.forEach { sensitiveParameter ->
+                if (workflowProperties.has(sensitiveParameter)) {
+                    workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive())
+                }
+            }
+        }
+        return executionServiceInput
+    }
 }