Message prioritization error handling

Error handling for message processor and forward errors to output sink

Optimize and expose message prioritization state service dependencies

Issue-ID: CCSDK-1917
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: Iee04811871de5306ba1f7e37d6e6c78f1a969181
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt
index d89f713..d114da5 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/AbstractTopologyComponents.kt
@@ -34,7 +34,7 @@
         this.processorContext = context
         /** Get the State service to update in store */
         this.messagePrioritizationStateService = BluePrintDependencyService
-                .instance(MessagePrioritizationStateService::class)
+                .messagePrioritizationStateService()
 
     }
 
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt
index ef9d5a0..967cc19 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumer.kt
@@ -19,6 +19,7 @@
 import org.apache.kafka.common.serialization.Serdes
 import org.apache.kafka.streams.Topology
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.topology.MessagePrioritizationSerde
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils.MessageProcessorUtils.bluePrintProcessorSupplier
 import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaStreamsBasicAuthConsumerProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
 import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
@@ -74,9 +75,12 @@
                         Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
                         MessagePrioritizationConstants.PROCESSOR_PRIORITIZE)
 
+                /** To receive completed and error messages */
                 topology.addSink(MessagePrioritizationConstants.SINK_OUTPUT,
                         prioritizationConfiguration.outputTopic,
                         Serdes.String().serializer(), MessagePrioritizationSerde().serializer(),
+                        MessagePrioritizationConstants.PROCESSOR_PRIORITIZE,
+                        MessagePrioritizationConstants.PROCESSOR_AGGREGATE,
                         MessagePrioritizationConstants.PROCESSOR_OUTPUT)
 
                 // Output will be sent to the group-output topic from Processor API
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
index d874cef..3358a56 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationData.kt
@@ -28,8 +28,8 @@
     EXPIRED("expired"),
     PRIORITIZED("prioritized"),
     AGGREGATED("aggregated"),
-    IGNORED("ignored"),
     COMPLETED("completed"),
+    ERROR("error")
 }
 
 open class PrioritizationConfiguration : Serializable {
@@ -59,7 +59,6 @@
     lateinit var id: String
     var group: String? = null
     var state: String? = null
-    var notifyMessage: String? = null
 }
 
 data class CorrelationCheckResponse(var message: String? = null,
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
index 94fedf4..ec061ad 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizeExtensions.kt
@@ -16,21 +16,27 @@
 
 package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization
 
-import org.apache.kafka.streams.processor.ProcessorSupplier
 import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractComponentFunction
 import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
 
 
-fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration)
-        : ProcessorSupplier<K, V> {
-    return ProcessorSupplier<K, V> {
-        // Dynamically resolve the Prioritization Processor
-        val processorInstance = BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name)
-        processorInstance.prioritizationConfiguration = prioritizationConfiguration
-        processorInstance
-    }
-}
+/**
+ * Register the MessagePrioritizationStateService and exposed dependency
+ */
+fun BluePrintDependencyService.messagePrioritizationStateService(): MessagePrioritizationStateService =
+        instance(MessagePrioritizationStateService::class)
 
+/**
+ * Expose messagePrioritizationStateService to AbstractComponentFunction
+ */
+fun AbstractComponentFunction.messagePrioritizationStateService() =
+        BluePrintDependencyService.messagePrioritizationStateService()
+
+/**
+ * MessagePrioritization correlation extensions
+ */
 fun MessagePrioritization.toFormatedCorrelation(): String {
     val ascendingKey = this.correlationId!!.split(",")
             .map { it.trim() }.sorted().joinToString(",")
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt
new file mode 100644
index 0000000..382cb9c
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/api/MessagePrioritizationApi.kt
@@ -0,0 +1,55 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.api
+
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.UpdateStateRequest
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.db.MessagePrioritization
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.service.MessagePrioritizationStateService
+import org.onap.ccsdk.cds.blueprintsprocessor.rest.service.monoMdc
+import org.springframework.http.MediaType
+import org.springframework.web.bind.annotation.*
+
+@RestController
+@RequestMapping(value = ["/api/v1/message-prioritization"])
+open class MessagePrioritizationApi(private val messagePrioritizationStateService: MessagePrioritizationStateService) {
+
+    @GetMapping(path = ["/ping"], produces = [MediaType.APPLICATION_JSON_VALUE])
+    @ResponseBody
+    fun ping(): String = "Success"
+
+
+    @GetMapping(path = ["/{id}"], produces = [MediaType.APPLICATION_JSON_VALUE])
+    @ResponseBody
+    fun messagePrioritization(@PathVariable(value = "id") id: String) = monoMdc {
+        messagePrioritizationStateService.getMessage(id)
+    }
+
+    @PostMapping(path = ["/"], produces = [MediaType.APPLICATION_JSON_VALUE],
+            consumes = [MediaType.APPLICATION_JSON_VALUE])
+    @ResponseBody
+    fun saveMessagePrioritization(@RequestBody messagePrioritization: MessagePrioritization) = monoMdc {
+        messagePrioritizationStateService.saveMessage(messagePrioritization)
+    }
+
+    @PostMapping(path = ["/update-state"], produces = [MediaType.APPLICATION_JSON_VALUE],
+            consumes = [MediaType.APPLICATION_JSON_VALUE])
+    fun updateMessagePrioritizationState(@RequestBody updateMessageState: UpdateStateRequest) =
+            monoMdc {
+                messagePrioritizationStateService.setMessageState(updateMessageState.id,
+                        updateMessageState.state!!)
+            }
+}
\ No newline at end of file
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt
index 307d932..69c8107 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/MessagePrioritizationRepositories.kt
@@ -71,19 +71,27 @@
 
     @Modifying
     @Transactional
-    @Query("UPDATE MessagePrioritization pm SET pm.state = :state WHERE pm.id = :id")
-    fun setStatusForMessageId(id: String, state: String): Int
+    @Query("UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " +
+            "WHERE id = :id")
+    fun setStateForMessageId(id: String, state: String, currentDate: Date): Int
 
     @Modifying
     @Transactional
-    @Query("UPDATE MessagePrioritization pm SET pm.state = :state WHERE pm.id IN :ids")
-    fun setStatusForMessageIds(ids: List<String>, state: String): Int
+    @Query("UPDATE MessagePrioritization SET state = :state, updatedDate = :currentDate " +
+            "WHERE id IN :ids")
+    fun setStateForMessageIds(ids: List<String>, state: String, currentDate: Date): Int
 
     @Modifying
     @Transactional
-    @Query("UPDATE MessagePrioritization pm SET pm.aggregatedMessageIds = :aggregatedMessageIds " +
-            "WHERE pm.id = :id")
-    fun setAggregatedMessageIds(id: String, aggregatedMessageIds: String): Int
+    @Query("UPDATE MessagePrioritization SET state = :state, error = :error, updatedDate = :currentDate " +
+            "WHERE id = :id")
+    fun setStateAndErrorForMessageId(id: String, state: String, error: String, currentDate: Date): Int
+
+    @Modifying
+    @Transactional
+    @Query("UPDATE MessagePrioritization SET state = :state, " +
+            "aggregatedMessageIds = :aggregatedMessageIds, updatedDate = :currentDate WHERE id = :id")
+    fun setStateAndAggregatedMessageIds(id: String, state: String, aggregatedMessageIds: String, currentDate: Date): Int
 
     @Modifying
     @Transactional
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt
index 4973cdf..1825f91 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/db/PrioritizationMessageEntity.kt
@@ -49,6 +49,10 @@
     var message: String? = null
 
     @Lob
+    @Column(name = "error", nullable = true)
+    var error: String? = null
+
+    @Lob
     @Column(name = "aggregated_message_ids", nullable = true)
     var aggregatedMessageIds: String? = null
 
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt
index e4369fc..8424226 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/service/MessagePrioritizationStateService.kt
@@ -33,15 +33,20 @@
 
     suspend fun getMessage(id: String): MessagePrioritization
 
+    suspend fun getMessages(ids: List<String>): List<MessagePrioritization>?
+
     suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>?
 
-    suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int): List<MessagePrioritization>?
+    suspend fun getMessageForStatesNotExpiredIn(group: String, states: List<String>, count: Int)
+            : List<MessagePrioritization>?
 
-    suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int): List<MessagePrioritization>?
+    suspend fun getMessageForStatesExpired(group: String, states: List<String>, count: Int)
+            : List<MessagePrioritization>?
 
     suspend fun getExpiredMessages(group: String, expiryDate: Date, count: Int): List<MessagePrioritization>?
 
-    suspend fun getCorrelatedMessages(group: String, states: List<String>, types: List<String>?, correlationIds: String): List<MessagePrioritization>?
+    suspend fun getCorrelatedMessages(group: String, states: List<String>, types: List<String>?,
+                                      correlationIds: String): List<MessagePrioritization>?
 
     suspend fun updateMessagesState(ids: List<String>, state: String)
 
@@ -51,7 +56,9 @@
 
     suspend fun setMessagesState(ids: List<String>, state: String)
 
-    suspend fun updateMessageStateAndGroupedIds(id: String, state: String, groupedIds: List<String>): MessagePrioritization
+    suspend fun setMessageStateANdError(id: String, state: String, error: String)
+
+    suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>)
 
     suspend fun deleteMessage(id: String)
 
@@ -64,7 +71,8 @@
 
 @Service
 open class MessagePrioritizationStateServiceImpl(
-        private val prioritizationMessageRepository: PrioritizationMessageRepository) : MessagePrioritizationStateService {
+        private val prioritizationMessageRepository: PrioritizationMessageRepository)
+    : MessagePrioritizationStateService {
 
     private val log = logger(MessagePrioritizationStateServiceImpl::class)
 
@@ -82,6 +90,10 @@
                 ?: throw BluePrintProcessorException("couldn't find message for id($id)")
     }
 
+    override suspend fun getMessages(ids: List<String>): List<MessagePrioritization>? {
+        return prioritizationMessageRepository.findAllById(ids)
+    }
+
     override suspend fun getExpiryEligibleMessages(count: Int): List<MessagePrioritization>? {
         return prioritizationMessageRepository
                 .findByStateInAndExpiredDate(arrayListOf(MessageState.NEW.name, MessageState.WAIT.name),
@@ -115,6 +127,7 @@
         }
     }
 
+    @Transactional
     override suspend fun updateMessagesState(ids: List<String>, state: String) {
         ids.forEach {
             val updated = updateMessageState(it, state)
@@ -124,12 +137,17 @@
 
     @Transactional
     override suspend fun setMessageState(id: String, state: String) {
-        prioritizationMessageRepository.setStatusForMessageId(id, state)
+        prioritizationMessageRepository.setStateForMessageId(id, state, Date())
     }
 
     @Transactional
     override suspend fun setMessagesState(ids: List<String>, state: String) {
-        prioritizationMessageRepository.setStatusForMessageIds(ids, state)
+        prioritizationMessageRepository.setStateForMessageIds(ids, state, Date())
+    }
+
+    @Transactional
+    override suspend fun setMessageStateANdError(id: String, state: String, error: String) {
+        prioritizationMessageRepository.setStateAndErrorForMessageId(id, state, error, Date())
     }
 
     @Transactional
@@ -141,16 +159,10 @@
         return saveMessage(updateMessage)
     }
 
-    override suspend fun updateMessageStateAndGroupedIds(id: String, state: String, groupedMessageIds: List<String>)
-            : MessagePrioritization {
-
-        val groupedIds = groupedMessageIds.joinToString(",")
-        val updateMessage = getMessage(id).apply {
-            this.updatedDate = Date()
-            this.state = state
-            this.aggregatedMessageIds = groupedIds
-        }
-        return saveMessage(updateMessage)
+    @Transactional
+    override suspend fun setMessageStateAndAggregatedIds(id: String, state: String, aggregatedIds: List<String>) {
+        val groupedIds = aggregatedIds.joinToString(",")
+        prioritizationMessageRepository.setStateAndAggregatedMessageIds(id, state, groupedIds, Date())
     }
 
     override suspend fun deleteMessage(id: String) {
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt
index 8dd4019..45f5c77 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessageAggregateProcessor.kt
@@ -32,14 +32,36 @@
         log.info("@@@@@ received in aggregation processor key($key), value($value)")
         val ids = value.split(",").map { it.trim() }
         if (!ids.isNullOrEmpty()) {
-            if (ids.size == 1) {
-                processorContext.forward(key, ids.first(), To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT))
-            } else {
-                /** Implement Aggregation logic in overridden class, If necessary,
-                 Populate New Message and Update status with Prioritized, Forward the message to next processor */
-                handleAggregation(ids)
-                /** Update all messages to Aggregated state */
-                messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name)
+            try {
+                if (ids.size == 1) {
+                    processorContext.forward(key, ids.first(), To.child(MessagePrioritizationConstants.PROCESSOR_OUTPUT))
+                } else {
+                    /** Implement Aggregation logic in overridden class, If necessary,
+                    Populate New Message and Update status with Prioritized, Forward the message to next processor */
+                    handleAggregation(ids)
+                    /** Update all messages to Aggregated state */
+                    messagePrioritizationStateService.setMessagesState(ids, MessageState.AGGREGATED.name)
+                }
+            } catch (e: Exception) {
+                val error = "failed in Aggregate message($ids) : ${e.message}"
+                log.error(error, e)
+                val storeMessages = messagePrioritizationStateService.getMessages(ids)
+                if (!storeMessages.isNullOrEmpty()) {
+                    storeMessages.forEach { messagePrioritization ->
+                        try {
+                            /** Update the data store */
+                            messagePrioritizationStateService.setMessageStateANdError(messagePrioritization.id,
+                                    MessageState.ERROR.name, error)
+                            /** Publish to Error topic */
+                            this.processorContext.forward(messagePrioritization.id, messagePrioritization,
+                                    To.child(MessagePrioritizationConstants.SINK_OUTPUT))
+                        } catch (sendException: Exception) {
+                            log.error("failed to update/publish error message(${messagePrioritization.id}) : " +
+                                    "${sendException.message}", e)
+                        }
+
+                    }
+                }
             }
         }
     }
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt
index 5a5aa25..7dde265 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/topology/MessagePrioritizeProcessor.kt
@@ -41,12 +41,22 @@
 
     override suspend fun processNB(key: ByteArray, value: ByteArray) {
         log.info("***** received in prioritize processor key(${String(key)})")
-        val data = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
+        val messagePrioritize = JacksonUtils.readValue(String(value), MessagePrioritization::class.java)
                 ?: throw BluePrintProcessorException("failed to convert")
-        // Save the Message
-        messagePrioritizationStateService.saveMessage(data)
-        handleCorrelationAndNextStep(data)
-
+        try {
+            // Save the Message
+            messagePrioritizationStateService.saveMessage(messagePrioritize)
+            handleCorrelationAndNextStep(messagePrioritize)
+        } catch (e: Exception) {
+            messagePrioritize.error = "failed in Prioritize message(${messagePrioritize.id}) : ${e.message}"
+            log.error(messagePrioritize.error)
+            /** Update the data store */
+            messagePrioritizationStateService.setMessageStateANdError(messagePrioritize.id, MessageState.ERROR.name,
+                    messagePrioritize.error!!)
+            /** Publish to Output topic */
+            this.processorContext.forward(messagePrioritize.id, messagePrioritize,
+                    To.child(MessagePrioritizationConstants.SINK_OUTPUT))
+        }
     }
 
     override fun init(context: ProcessorContext) {
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
new file mode 100644
index 0000000..02614d8
--- /dev/null
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/utils/MessageProcessorUtils.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.utils
+
+import org.apache.kafka.streams.processor.ProcessorSupplier
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.AbstractMessagePrioritizeProcessor
+import org.onap.ccsdk.cds.blueprintsprocessor.functions.message.prioritization.PrioritizationConfiguration
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService
+
+object MessageProcessorUtils {
+
+    fun <K, V> bluePrintProcessorSupplier(name: String, prioritizationConfiguration: PrioritizationConfiguration)
+            : ProcessorSupplier<K, V> {
+        return ProcessorSupplier<K, V> {
+            // Dynamically resolve the Prioritization Processor
+            val processorInstance = BluePrintDependencyService.instance<AbstractMessagePrioritizeProcessor<K, V>>(name)
+            processorInstance.prioritizationConfiguration = prioritizationConfiguration
+            processorInstance
+        }
+    }
+
+}
\ No newline at end of file