Added Kafka metrics for CDS workers

Added counters to gather metrics on CDS Kafka workers.
This will enable us to get metrics on how many messages we consume from and produce to Kafka.
For consumers we count how many messages we consume and how many failed, i.e. consumed but not able to be processed (parsing error).
For producers we count how many messages we produce and how many failed, i.e. failed to be pushed to the cluster (unavailable brokers, network error, ...).
Relocated metrics tag constants to BlueprintConstants so that they can be used by any CDS module.
If they make sense for other metrics then they should be shared.

Issue-ID: CCSDK-3155
Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca>
Change-Id: Iad6aba588766f655f3a74cd626e0f74e29188f96
diff --git a/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
index 5f51394..9f74daa 100644
--- a/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritization/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
@@ -1,5 +1,6 @@
 /*
  * Copyright © 2018-2019 AT&T Intellectual Property.
+ * Modifications Copyright © 2021 Bell Canada.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,6 +17,7 @@
 
 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
 
+import io.micrometer.core.instrument.MeterRegistry
 import io.mockk.coEvery
 import io.mockk.every
 import io.mockk.mockk
@@ -49,6 +51,7 @@
 import org.onap.ccsdk.cds.controllerblueprints.core.service.BlueprintDependencyService
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.test.autoconfigure.orm.jpa.DataJpaTest
+import org.springframework.boot.test.mock.mockito.MockBean
 import org.springframework.context.ApplicationContext
 import org.springframework.test.annotation.DirtiesContext
 import org.springframework.test.context.ContextConfiguration
@@ -120,6 +123,9 @@
     @Autowired
     lateinit var messagePrioritizationStateService: MessagePrioritizationStateService
 
+    @MockBean
+    lateinit var meterRegistry: MeterRegistry
+
     @Before
     fun setup() {
         BlueprintDependencyService.inject(applicationContext)
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BlueprintConstants.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BlueprintConstants.kt
index fdd1d5d..cfe4360 100644
--- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BlueprintConstants.kt
+++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BlueprintConstants.kt
@@ -248,4 +248,12 @@
     const val NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION = "component-resource-resolution"
     const val NODE_TEMPLATE_TYPE_DG = "dg-generic"
     const val PROPERTY_DG_DEPENDENCY_NODE_TEMPLATE = "dependency-node-templates"
+
+    // TAGS
+    const val METRIC_TAG_BP_NAME = "blueprint_name"
+    const val METRIC_TAG_BP_VERSION = "blueprint_version"
+    const val METRIC_TAG_BP_ACTION = "blueprint_action"
+    const val METRIC_TAG_BP_STATUS = "status"
+    const val METRIC_TAG_BP_OUTCOME = "outcome"
+    const val METRIC_TAG_TOPIC = "topic"
 }
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageMetricConstants.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageMetricConstants.kt
new file mode 100644
index 0000000..be84f47
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BlueprintMessageMetricConstants.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright © 2021 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message
+
+object BlueprintMessageMetricConstants {
+
+    private const val METRIC_PREFIX = "cds.kafka"
+
+    private const val PRODUCED_MESSAGES_PREFIX = "$METRIC_PREFIX.produced.messages"
+    private const val CONSUMED_MESSAGES_PREFIX = "$METRIC_PREFIX.consumed.messages"
+
+    // COUNTERS
+    const val KAFKA_PRODUCED_MESSAGES_COUNTER = "$PRODUCED_MESSAGES_PREFIX.total"
+    const val KAFKA_PRODUCED_MESSAGES_ERROR_COUNTER = "$PRODUCED_MESSAGES_PREFIX.error"
+
+    const val KAFKA_CONSUMED_MESSAGES_COUNTER = "$CONSUMED_MESSAGES_PREFIX.total"
+    const val KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER = "$CONSUMED_MESSAGES_PREFIX.error"
+}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt
index 9e0c537..cb0bc32 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageLibPropertyService.kt
@@ -1,6 +1,6 @@
 /*
  *  Copyright © 2019 IBM.
- *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
+ *  Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
 import com.fasterxml.jackson.databind.JsonNode
+import io.micrometer.core.instrument.MeterRegistry
 import org.onap.ccsdk.cds.blueprintsprocessor.core.BlueprintPropertiesService
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageProducerProperties
@@ -36,17 +37,20 @@
 import org.springframework.stereotype.Service
 
 @Service(MessageLibConstants.SERVICE_BLUEPRINT_MESSAGE_LIB_PROPERTY)
-open class BlueprintMessageLibPropertyService(private var bluePrintPropertiesService: BlueprintPropertiesService) {
+open class BlueprintMessageLibPropertyService(
+    private var bluePrintPropertiesService: BlueprintPropertiesService,
+    private val meterRegistry: MeterRegistry
+) {
 
     fun blueprintMessageProducerService(jsonNode: JsonNode): BlueprintMessageProducerService {
         val messageClientProperties = messageProducerProperties(jsonNode)
-        return KafkaMessageProducerService(messageClientProperties)
+        return KafkaMessageProducerService(messageClientProperties, meterRegistry)
     }
 
     fun blueprintMessageProducerService(selector: String): BlueprintMessageProducerService {
         val prefix = "${MessageLibConstants.PROPERTY_MESSAGE_PRODUCER_PREFIX}$selector"
         val messageClientProperties = messageProducerProperties(prefix)
-        return KafkaMessageProducerService(messageClientProperties)
+        return KafkaMessageProducerService(messageClientProperties, meterRegistry)
     }
 
     fun messageProducerProperties(prefix: String): MessageProducerProperties {
@@ -184,17 +188,20 @@
                 /** Message Consumer */
                 MessageLibConstants.TYPE_KAFKA_BASIC_AUTH -> {
                     return KafkaMessageConsumerService(
-                        messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties
+                        messageConsumerProperties as KafkaBasicAuthMessageConsumerProperties,
+                        meterRegistry
                     )
                 }
                 MessageLibConstants.TYPE_KAFKA_SSL_AUTH -> {
                     return KafkaMessageConsumerService(
-                        messageConsumerProperties as KafkaSslAuthMessageConsumerProperties
+                        messageConsumerProperties as KafkaSslAuthMessageConsumerProperties,
+                        meterRegistry
                     )
                 }
                 MessageLibConstants.TYPE_KAFKA_SCRAM_SSL_AUTH -> {
                     return KafkaMessageConsumerService(
-                        messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties
+                        messageConsumerProperties as KafkaScramSslAuthMessageConsumerProperties,
+                        meterRegistry
                     )
                 }
                 /** Stream Consumer */
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
index af689a1..004b476 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
@@ -17,6 +17,7 @@
 
 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
+import io.micrometer.core.instrument.MeterRegistry
 import kotlinx.coroutines.channels.Channel
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.launch
@@ -24,13 +25,16 @@
 import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import java.time.Duration
 import kotlin.concurrent.thread
 
 open class KafkaMessageConsumerService(
-    private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties
+    private val messageConsumerProperties: KafkaBasicAuthMessageConsumerProperties,
+    private val meterRegistry: MeterRegistry
 ) :
     BlueprintMessageConsumerService {
 
@@ -78,6 +82,10 @@
                     runBlocking {
                         consumerRecords?.forEach { consumerRecord ->
                             launch {
+                                meterRegistry.counter(
+                                    BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_COUNTER,
+                                    BlueprintMessageUtils.kafkaMetricTag(consumerRecord.topic())
+                                ).increment()
                                 /** execute the command block */
                                 if (!channel.isClosedForSend) {
                                     channel.send(consumerRecord)
@@ -89,6 +97,10 @@
                                             "key(${consumerRecord.key()})"
                                     )
                                 } else {
+                                    meterRegistry.counter(
+                                        BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER,
+                                        BlueprintMessageUtils.kafkaMetricTag(consumerRecord.topic())
+                                    ).increment()
                                     log.error("Channel is closed to receive message")
                                 }
                             }
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
index 88b0dfa..21fd84d 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
@@ -18,6 +18,7 @@
 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
 import com.fasterxml.jackson.databind.node.ObjectNode
+import io.micrometer.core.instrument.MeterRegistry
 import org.apache.commons.lang.builder.ToStringBuilder
 import org.apache.kafka.clients.producer.Callback
 import org.apache.kafka.clients.producer.KafkaProducer
@@ -26,14 +27,17 @@
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
 import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
 import org.slf4j.LoggerFactory
 import java.nio.charset.Charset
 
 class KafkaMessageProducerService(
-    private val messageProducerProperties: MessageProducerProperties
+    private val messageProducerProperties: MessageProducerProperties,
+    private val meterRegistry: MeterRegistry
 ) :
     BlueprintMessageProducerService {
 
@@ -76,9 +80,17 @@
             headers.forEach { (key, value) -> recordHeaders.add(RecordHeader(key, value.toByteArray())) }
         }
         val callback = Callback { metadata, exception ->
-            if (exception != null)
+            meterRegistry.counter(
+                BlueprintMessageMetricConstants.KAFKA_PRODUCED_MESSAGES_COUNTER,
+                BlueprintMessageUtils.kafkaMetricTag(topic)
+            ).increment()
+            if (exception != null) {
+                meterRegistry.counter(
+                    BlueprintMessageMetricConstants.KAFKA_PRODUCED_MESSAGES_ERROR_COUNTER,
+                    BlueprintMessageUtils.kafkaMetricTag(topic)
+                ).increment()
                 log.error("Couldn't publish ${clonedMessage::class.simpleName} ${getMessageLogData(clonedMessage)}.", exception)
-            else {
+            } else {
                 val message = "${clonedMessage::class.simpleName} published : topic(${metadata.topic()}) " +
                     "partition(${metadata.partition()}) " +
                     "offset(${metadata.offset()}) ${getMessageLogData(clonedMessage)}."
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt
new file mode 100644
index 0000000..7431998
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtils.kt
@@ -0,0 +1,29 @@
+/*
+ * Copyright © 2021 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.utils
+
+import io.micrometer.core.instrument.Tag
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
+
+class BlueprintMessageUtils {
+    companion object {
+        fun kafkaMetricTag(topic: String): MutableList<Tag> =
+            mutableListOf(
+                Tag.of(BlueprintConstants.METRIC_TAG_TOPIC, topic)
+            )
+    }
+}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index f240f76..fb53ff4 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -1,6 +1,6 @@
 /*
  *  Copyright © 2019 IBM.
- *  Modifications Copyright © 2018-2019 AT&T Intellectual Property.
+ *  Modifications Copyright © 2018-2021 AT&T, Bell Canada Intellectual Property.
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
 
 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
+import io.micrometer.core.instrument.MeterRegistry
 import io.mockk.every
 import io.mockk.spyk
 import kotlinx.coroutines.channels.consumeEach
@@ -47,6 +48,7 @@
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
 import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.mock.mockito.MockBean
 import org.springframework.test.annotation.DirtiesContext
 import org.springframework.test.context.ContextConfiguration
 import org.springframework.test.context.TestPropertySource
@@ -100,6 +102,9 @@
     @Autowired
     lateinit var bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService
 
+    @MockBean
+    lateinit var meterRegistry: MeterRegistry
+
     @Test
     fun testKafkaBasicAuthConsumerService() {
         runBlocking {
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
index 2293cee..1490a33 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
@@ -1,5 +1,6 @@
 /*
  *  Copyright © 2019 IBM.
+ *  Modifications Copyright © 2021 Bell Canada.
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -16,6 +17,7 @@
 
 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
+import io.micrometer.core.instrument.MeterRegistry
 import io.mockk.every
 import io.mockk.mockk
 import io.mockk.spyk
@@ -37,6 +39,7 @@
 import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageLibConfiguration
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageLibConstants
 import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.mock.mockito.MockBean
 import org.springframework.test.annotation.DirtiesContext
 import org.springframework.test.context.ContextConfiguration
 import org.springframework.test.context.TestPropertySource
@@ -74,6 +77,9 @@
     @Autowired
     lateinit var bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService
 
+    @MockBean
+    lateinit var meterRegistry: MeterRegistry
+
     @Test
     fun testKafkaScramSslAuthProducerService() {
         runBlocking {
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
index 70968ef..b93cd42 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
@@ -1,5 +1,6 @@
 /*
  * Copyright © 2018-2019 AT&T Intellectual Property.
+ * Modifications Copyright © 2021 Bell Canada.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,6 +17,7 @@
 
 package org.onap.ccsdk.cds.blueprintsprocessor.message.service
 
+import io.micrometer.core.instrument.MeterRegistry
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.runBlocking
@@ -32,6 +34,7 @@
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
 import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.mock.mockito.MockBean
 import org.springframework.test.annotation.DirtiesContext
 import org.springframework.test.context.ContextConfiguration
 import org.springframework.test.context.TestPropertySource
@@ -74,6 +77,9 @@
     @Autowired
     lateinit var bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService
 
+    @MockBean
+    lateinit var meterRegistry: MeterRegistry
+
     @Test
     fun testProperties() {
         val blueprintMessageConsumerService = bluePrintMessageLibPropertyService.blueprintMessageConsumerService("stream-consumer")
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt
new file mode 100644
index 0000000..849a411
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/utils/BlueprintMessageUtilsTest.kt
@@ -0,0 +1,35 @@
+/*
+ * Copyright © 2021 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.message.utils
+
+import io.micrometer.core.instrument.Tag
+import org.junit.Test
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
+import kotlin.test.assertEquals
+
+class BlueprintMessageUtilsTest {
+
+    @Test
+    fun testKafkaMetricTag() {
+        val expected = mutableListOf<Tag>(
+            Tag.of(BlueprintConstants.METRIC_TAG_TOPIC, "my-topic")
+        )
+        val tags = BlueprintMessageUtils.kafkaMetricTag("my-topic")
+
+        assertEquals(expected, tags)
+    }
+}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt
index 0ab38c6..661e76b 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumer.kt
@@ -17,13 +17,16 @@
 
 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
 
+import io.micrometer.core.instrument.MeterRegistry
 import kotlinx.coroutines.GlobalScope
 import kotlinx.coroutines.channels.consumeEach
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.runBlocking
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.message.BlueprintMessageMetricConstants
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageLibPropertyService
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageConsumerService
+import org.onap.ccsdk.cds.blueprintsprocessor.message.utils.BlueprintMessageUtils
 import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintProcessorException
 import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsType
 import org.onap.ccsdk.cds.controllerblueprints.core.logger
@@ -43,8 +46,9 @@
 )
 @Service
 open class BlueprintProcessingKafkaConsumer(
-    private val bluePrintMessageLibPropertyService: BlueprintMessageLibPropertyService,
-    private val executionServiceHandler: ExecutionServiceHandler
+    private val blueprintMessageLibPropertyService: BlueprintMessageLibPropertyService,
+    private val executionServiceHandler: ExecutionServiceHandler,
+    private val meterRegistry: MeterRegistry
 ) {
 
     val log = logger(BlueprintProcessingKafkaConsumer::class)
@@ -69,7 +73,7 @@
 
             /** Get the Message Consumer Service **/
             blueprintMessageConsumerService = try {
-                bluePrintMessageLibPropertyService
+                blueprintMessageLibPropertyService
                     .blueprintMessageConsumerService(CONSUMER_SELECTOR)
             } catch (e: BlueprintProcessorException) {
                 val errorMsg = "Failed creating Kafka consumer message service."
@@ -83,7 +87,7 @@
 
             /** Get the Message Producer Service **/
             val blueprintMessageProducerService = try {
-                bluePrintMessageLibPropertyService
+                blueprintMessageLibPropertyService
                     .blueprintMessageProducerService(PRODUCER_SELECTOR)
             } catch (e: BlueprintProcessorException) {
                 val errorMsg = "Failed creating Kafka producer message service."
@@ -117,6 +121,10 @@
                             val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
                             blueprintMessageProducerService.sendMessage(key, executionServiceOutput)
                         } catch (e: Exception) {
+                            meterRegistry.counter(
+                                BlueprintMessageMetricConstants.KAFKA_CONSUMED_MESSAGES_ERROR_COUNTER,
+                                BlueprintMessageUtils.kafkaMetricTag(message.topic())
+                            ).increment()
                             log.error("failed in processing the consumed message : $message", e)
                         } finally {
                             ph.arriveAndDeregister()
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt
index 97c7324..7c8f4ed 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/SelfServiceMetricConstants.kt
@@ -1,3 +1,19 @@
+/*
+ * Copyright © 2021 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
 
 object SelfServiceMetricConstants {
@@ -6,13 +22,6 @@
 
     private const val PROCESS_PREFIX = "$METRICS_PREFIX.process"
 
-    // TAGS
-    const val TAG_BP_NAME = "blueprint_name"
-    const val TAG_BP_VERSION = "blueprint_version"
-    const val TAG_BP_ACTION = "blueprint_action"
-    const val TAG_BP_STATUS = "status"
-    const val TAG_BP_OUTCOME = "outcome"
-
     // COUNTERS
     const val COUNTER_PROCESS = "$PROCESS_PREFIX.counter"
 
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/Utils.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtils.kt
similarity index 76%
rename from ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/Utils.kt
rename to ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtils.kt
index aa29383..c04410a 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/Utils.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtils.kt
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2019 Bell Canada.
+ * Copyright © 2021 Bell Canada.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -18,7 +18,7 @@
 import io.micrometer.core.instrument.Tag
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
 import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
-import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
 import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintException
 import org.springframework.http.HttpStatus
 import org.springframework.http.codec.multipart.FilePart
@@ -68,19 +68,19 @@
 fun cbaMetricTags(executionServiceInput: ExecutionServiceInput): MutableList<Tag> =
     executionServiceInput.actionIdentifiers.let {
         mutableListOf(
-            Tag.of(SelfServiceMetricConstants.TAG_BP_NAME, it.blueprintName),
-            Tag.of(SelfServiceMetricConstants.TAG_BP_VERSION, it.blueprintVersion),
-            Tag.of(SelfServiceMetricConstants.TAG_BP_ACTION, it.actionName)
+            Tag.of(BlueprintConstants.METRIC_TAG_BP_NAME, it.blueprintName), // `it` is executionServiceInput.actionIdentifiers
+            Tag.of(BlueprintConstants.METRIC_TAG_BP_VERSION, it.blueprintVersion),
+            Tag.of(BlueprintConstants.METRIC_TAG_BP_ACTION, it.actionName) // request side yields exactly these three tags
         )
     }
 
 fun cbaMetricTags(executionServiceOutput: ExecutionServiceOutput): MutableList<Tag> =
     executionServiceOutput.let {
         mutableListOf(
-            Tag.of(SelfServiceMetricConstants.TAG_BP_NAME, it.actionIdentifiers.blueprintName),
-            Tag.of(SelfServiceMetricConstants.TAG_BP_VERSION, it.actionIdentifiers.blueprintVersion),
-            Tag.of(SelfServiceMetricConstants.TAG_BP_ACTION, it.actionIdentifiers.actionName),
-            Tag.of(SelfServiceMetricConstants.TAG_BP_STATUS, it.status.code.toString()),
-            Tag.of(SelfServiceMetricConstants.TAG_BP_OUTCOME, it.status.message)
+            Tag.of(BlueprintConstants.METRIC_TAG_BP_NAME, it.actionIdentifiers.blueprintName), // `it` is the output itself
+            Tag.of(BlueprintConstants.METRIC_TAG_BP_VERSION, it.actionIdentifiers.blueprintVersion),
+            Tag.of(BlueprintConstants.METRIC_TAG_BP_ACTION, it.actionIdentifiers.actionName),
+            Tag.of(BlueprintConstants.METRIC_TAG_BP_STATUS, it.status.code.toString()), // status code passed as text
+            Tag.of(BlueprintConstants.METRIC_TAG_BP_OUTCOME, it.status.message) // outcome tag = raw status message
         )
     }
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt
index 3a5cebc..56cc691 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BlueprintProcessingKafkaConsumerTest.kt
@@ -1,5 +1,6 @@
 /*
  *  Copyright © 2019 IBM.
+ *  Modifications Copyright © 2021 Bell Canada.
  *
  *  Licensed under the Apache License, Version 2.0 (the "License");
  *  you may not use this file except in compliance with the License.
@@ -65,7 +66,8 @@
 
             val bluePrintProcessingKafkaConsumer = BlueprintProcessingKafkaConsumer(
                 bluePrintMessageLibPropertyService,
-                executionServiceHandle
+                executionServiceHandle,
+                meterRegistry
             )
 
             launch {
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtilsTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtilsTest.kt
new file mode 100644
index 0000000..10db349
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/BlueprintProcessingUtilsTest.kt
@@ -0,0 +1,96 @@
+/*
+ * Copyright © 2021 Bell Canada.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils
+
+import io.micrometer.core.instrument.Tag
+import org.junit.Assert.assertEquals
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ActionIdentifiers
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
+import org.onap.ccsdk.cds.controllerblueprints.core.BlueprintConstants
+import org.springframework.http.HttpStatus
+import org.springframework.test.context.junit4.SpringRunner
+
+// Tests for determineHttpStatusCode and for both cbaMetricTags overloads.
+@RunWith(SpringRunner::class)
+class BlueprintProcessingUtilsTest {
+
+    // Codes 200 and 500 are expected to resolve to OK and INTERNAL_SERVER_ERROR.
+    @Test
+    fun `valid Http status codes should be produced for valid parameters`() {
+        assertEquals(HttpStatus.OK, determineHttpStatusCode(200))
+        assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, determineHttpStatusCode(500))
+    }
+
+    // An unknown code (999999) is expected to map to INTERNAL_SERVER_ERROR.
+    @Test
+    fun `Http status code 500 should be produced for any invalid parameter`() {
+        assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, determineHttpStatusCode(999999))
+    }
+
+    // Request side: blueprint name, version and action tags, in that order.
+    @Test
+    fun testCbaMetricExecutionInputTags() {
+        val identifiers = ActionIdentifiers().apply {
+            blueprintName = "bpName"
+            blueprintVersion = "1.0.0"
+            actionName = "bpAction"
+        }
+        val request = ExecutionServiceInput().apply { actionIdentifiers = identifiers }
+
+        val actual = cbaMetricTags(request)
+
+        val expected = mutableListOf(
+            Tag.of(BlueprintConstants.METRIC_TAG_BP_NAME, identifiers.blueprintName),
+            Tag.of(BlueprintConstants.METRIC_TAG_BP_VERSION, identifiers.blueprintVersion),
+            Tag.of(BlueprintConstants.METRIC_TAG_BP_ACTION, identifiers.actionName)
+        )
+        assertEquals(expected, actual)
+    }
+
+    // Response side: the three identifier tags followed by status code and outcome message.
+    @Test
+    fun testCbaMetricExecutionOutputTags() {
+        val identifiers = ActionIdentifiers().apply {
+            blueprintName = "bpName"
+            blueprintVersion = "1.0.0"
+            actionName = "bpAction"
+        }
+        val outcome = Status().apply {
+            code = 200
+            message = "success"
+        }
+        val response = ExecutionServiceOutput().apply {
+            actionIdentifiers = identifiers
+            status = outcome
+        }
+
+        val actual = cbaMetricTags(response)
+
+        val expected = mutableListOf(
+            Tag.of(BlueprintConstants.METRIC_TAG_BP_NAME, identifiers.blueprintName),
+            Tag.of(BlueprintConstants.METRIC_TAG_BP_VERSION, identifiers.blueprintVersion),
+            Tag.of(BlueprintConstants.METRIC_TAG_BP_ACTION, identifiers.actionName),
+            Tag.of(BlueprintConstants.METRIC_TAG_BP_STATUS, outcome.code.toString()),
+            Tag.of(BlueprintConstants.METRIC_TAG_BP_OUTCOME, outcome.message)
+        )
+        assertEquals(expected, actual)
+    }
+}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/UtilsTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/UtilsTest.kt
deleted file mode 100644
index 2238968..0000000
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/utils/UtilsTest.kt
+++ /dev/null
@@ -1,80 +0,0 @@
-package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils
-
-import io.micrometer.core.instrument.Tag
-import org.junit.Assert.assertEquals
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ActionIdentifiers
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
-import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
-import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.SelfServiceMetricConstants
-import org.springframework.http.HttpStatus
-import org.springframework.test.context.junit4.SpringRunner
-
-@RunWith(SpringRunner::class)
-class UtilsTest {
-
-    @Test
-    fun `valid Http status codes should be produced for valid parameters`() {
-        val httpStatusCode200 = determineHttpStatusCode(200)
-        assertEquals(HttpStatus.OK, httpStatusCode200)
-
-        val httpStatusCode500 = determineHttpStatusCode(500)
-        assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, httpStatusCode500)
-    }
-
-    @Test
-    fun `Http status code 500 should be produced for any invalid parameter`() {
-        val nonExistentHttpStatusCode = determineHttpStatusCode(999999)
-        assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, nonExistentHttpStatusCode)
-    }
-
-    @Test
-    fun testCbaMetricExecutionInputTags() {
-        val executionServiceInput = ExecutionServiceInput().apply {
-            actionIdentifiers = ActionIdentifiers().apply {
-                blueprintName = "bpName"
-                blueprintVersion = "1.0.0"
-                actionName = "bpAction"
-            }
-        }
-
-        val expectedTags = mutableListOf(
-            Tag.of(SelfServiceMetricConstants.TAG_BP_NAME, executionServiceInput.actionIdentifiers.blueprintName),
-            Tag.of(SelfServiceMetricConstants.TAG_BP_VERSION, executionServiceInput.actionIdentifiers.blueprintVersion),
-            Tag.of(SelfServiceMetricConstants.TAG_BP_ACTION, executionServiceInput.actionIdentifiers.actionName)
-        )
-
-        val metricTag = cbaMetricTags(executionServiceInput)
-
-        assertEquals(expectedTags, metricTag)
-    }
-
-    @Test
-    fun testCbaMetricExecutionOutputTags() {
-        val executionServiceOutput = ExecutionServiceOutput().apply {
-            actionIdentifiers = ActionIdentifiers().apply {
-                blueprintName = "bpName"
-                blueprintVersion = "1.0.0"
-                actionName = "bpAction"
-            }
-            status = Status().apply {
-                code = 200
-                message = "success"
-            }
-        }
-
-        val expectedTags = mutableListOf(
-            Tag.of(SelfServiceMetricConstants.TAG_BP_NAME, executionServiceOutput.actionIdentifiers.blueprintName),
-            Tag.of(SelfServiceMetricConstants.TAG_BP_VERSION, executionServiceOutput.actionIdentifiers.blueprintVersion),
-            Tag.of(SelfServiceMetricConstants.TAG_BP_ACTION, executionServiceOutput.actionIdentifiers.actionName),
-            Tag.of(SelfServiceMetricConstants.TAG_BP_STATUS, executionServiceOutput.status.code.toString()),
-            Tag.of(SelfServiceMetricConstants.TAG_BP_OUTCOME, executionServiceOutput.status.message)
-        )
-
-        val metricTag = cbaMetricTags(executionServiceOutput)
-
-        assertEquals(expectedTags, metricTag)
-    }
-}
diff --git a/ms/blueprintsprocessor/parent/pom.xml b/ms/blueprintsprocessor/parent/pom.xml
index 6bc5652..b0c60a4 100755
--- a/ms/blueprintsprocessor/parent/pom.xml
+++ b/ms/blueprintsprocessor/parent/pom.xml
@@ -686,6 +686,12 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-validation</artifactId>
         </dependency>
+
+        <!-- Micrometer Prometheus -->
+        <dependency>
+            <groupId>io.micrometer</groupId>
+            <artifactId>micrometer-registry-prometheus</artifactId>
+        </dependency>
     </dependencies>
 
     <repositories>