Add kafka support in Policy CSIT

Issue-ID: POLICY-4402
Signed-off-by: zrrmmua <ramesh.murugan.iyer@est.tech>
Change-Id: I802c19a3c9817d304164eba634adb8c119aa4ced
diff --git a/csit/resources/tests/apex-pdp-common.robot b/csit/resources/tests/apex-pdp-common.robot
index 8ae63af..e645831 100644
--- a/csit/resources/tests/apex-pdp-common.robot
+++ b/csit/resources/tests/apex-pdp-common.robot
@@ -27,8 +27,8 @@
 
 CheckLogMessage
     [Documentation]    Read log messages received and check for expected content.
-    [Arguments]    ${status}    ${expectedMsg}
-    ${result}=     CheckTopic     APEX-CL-MGT    ${status}
+    [Arguments]    ${topic}    ${status}    ${expectedMsg}
+    ${result}=     CheckKafkaTopic     ${topic}    ${status}
     Should Contain    ${result}    ${expectedMsg}
 
 ValidatePolicyExecution
diff --git a/csit/resources/tests/apex-pdp-test.robot b/csit/resources/tests/apex-pdp-test.robot
index b023226..5e4ea34 100644
--- a/csit/resources/tests/apex-pdp-test.robot
+++ b/csit/resources/tests/apex-pdp-test.robot
@@ -31,17 +31,17 @@
      CreatePolicy  /policy/api/v1/policytypes/onap.policies.native.Apex/versions/1.0.0/policies  200  ${postjson}  ${policyName}  1.0.0
      DeployPolicy
      Wait Until Keyword Succeeds    2 min    5 sec    QueryPolicyStatus  ${policyName}  defaultGroup  apex  ${pdpName}  onap.policies.native.Apex
-     GetTopic     APEX-CL-MGT
-     Wait Until Keyword Succeeds    2 min    5 sec    TriggerAndVerifyTestPnfPolicy
+     GetKafkaTopic    apex-cl-mgt
+     Wait Until Keyword Succeeds    2 min    5 sec    TriggerAndVerifyTestPnfPolicy    apex-cl-mgt
 
-ExecuteApexTestVnfPolicy
-     Set Test Variable    ${policyName}    onap.policies.apex.vnf.Test
-     ${postjson}=  Get File  ${CURDIR}/data/${policyName}.json
-     CreatePolicy  /policy/api/v1/policytypes/onap.policies.native.Apex/versions/1.0.0/policies  200  ${postjson}  ${policyName}  1.0.0
-     DeployPolicy
-     Wait Until Keyword Succeeds    2 min    5 sec    QueryPolicyStatus  ${policyName}  defaultGroup  apex  ${pdpName}  onap.policies.native.Apex
-     GetTopic     APEX-CL-MGT
-     Wait Until Keyword Succeeds    2 min    5 sec    TriggerAndVerifyTestVnfPolicy
+#ExecuteApexTestVnfPolicy
+#     Set Test Variable    ${policyName}    onap.policies.apex.vnf.Test
+#     ${postjson}=  Get File  ${CURDIR}/data/${policyName}.json
+#     CreatePolicy  /policy/api/v1/policytypes/onap.policies.native.Apex/versions/1.0.0/policies  200  ${postjson}  ${policyName}  1.0.0
+#     DeployPolicy
+#     Wait Until Keyword Succeeds    2 min    5 sec    QueryPolicyStatus  ${policyName}  defaultGroup  apex  ${pdpName}  onap.policies.native.Apex
+#     GetTopic     apex-cl-mgt
+#     Wait Until Keyword Succeeds    2 min    5 sec    TriggerAndVerifyTestVnfPolicy
 
 ExecuteApexTestPnfPolicyWithMetadataSet
       Set Test Variable    ${policyName}    onap.policies.apex.pnf.metadataSet.Test
@@ -51,17 +51,17 @@
       CreateNodeTemplate  /policy/api/v1/nodetemplates  200  ${postjson}  1
       DeployPolicy
       Wait Until Keyword Succeeds    2 min    5 sec    QueryPolicyStatus  ${policyName}  defaultGroup  apex  ${pdpName}  onap.policies.native.Apex
-      GetTopic     APEX-CL-MGT2
-      Wait Until Keyword Succeeds    2 min    5 sec    TriggerAndVerifyTestPnfPolicy
+      GetKafkaTopic     apex-cl-mgt2
+      Wait Until Keyword Succeeds    2 min    5 sec    TriggerAndVerifyTestPnfPolicy    apex-cl-mgt2
 
 Metrics
      [Documentation]  Verify policy-apex-pdp is exporting prometheus metrics
      ${auth}=  PolicyAdminAuth
      ${resp}=  PerformGetRequest  ${APEX_IP}  /metrics  200  null  ${auth}
-     Should Contain  ${resp.text}  pdpa_policy_deployments_total{operation="deploy",status="TOTAL",} 4.0
-     Should Contain  ${resp.text}  pdpa_policy_deployments_total{operation="deploy",status="SUCCESS",} 4.0
-     Should Contain  ${resp.text}  pdpa_policy_executions_total{status="SUCCESS",} 3.0
-     Should Contain  ${resp.text}  pdpa_policy_executions_total{status="TOTAL",} 3.0
+     Should Contain  ${resp.text}  pdpa_policy_deployments_total{operation="deploy",status="TOTAL",} 3.0
+     Should Contain  ${resp.text}  pdpa_policy_deployments_total{operation="deploy",status="SUCCESS",} 3.0
+     Should Contain  ${resp.text}  pdpa_policy_executions_total{status="SUCCESS",} 6.0
+     Should Contain  ${resp.text}  pdpa_policy_executions_total{status="TOTAL",} 6.0
      Should Match  ${resp.text}  *pdpa_engine_event_executions{engine_instance_id="NSOApexEngine-*:0.0.1",}*
      Should Match  ${resp.text}  *pdpa_engine_event_executions{engine_instance_id="MyApexEngine-*:0.0.1",}*
      Should Match  ${resp.text}  *pdpa_engine_state{engine_instance_id=*,} 2.0*
@@ -78,18 +78,16 @@
 
 TriggerAndVerifyTestPnfPolicy
     [Documentation]    Send TestPnf policy trigger event to DMaaP and read notifications to verify policy execution
-    Create Session   apexSession  http://${DMAAP_IP}   max_retries=1
+    [Arguments]    ${topic}
     ${data}=    Get Binary File     ${CURDIR}/data/VesEventForPnfPolicy.json
-    &{headers}=  Create Dictionary    Content-Type=application/json    Accept=application/json
-    ${resp}=    POST On Session    apexSession    /events/unauthenticated.DCAE_CL_OUTPUT    data=${data}   headers=${headers}
-    Should Be Equal As Strings    ${resp.status_code}   200
-    Run Keyword    CheckLogMessage    ACTIVE    VES event has been received. Going to fetch details from AAI.
-    Run Keyword    CheckLogMessage    SUCCESS    Received response from AAI successfully. Hostname in AAI matches with the one in Ves event. Going to make the update-config request to CDS.
-    Run Keyword    CheckLogMessage    FINAL_SUCCESS    Successfully processed the VES event. Hostname is updated.
+    ${resp}=    Run Process    ${CURDIR}/kafka_producer.py    unauthenticated.dcae_cl_output    ${data}
+    Run Keyword    CheckLogMessage    ${topic}    ACTIVE    VES event has been received. Going to fetch details from AAI.
+    Run Keyword    CheckLogMessage    ${topic}    SUCCESS    Received response from AAI successfully. Hostname in AAI matches with the one in Ves event. Going to make the update-config request to CDS.
+    Run Keyword    CheckLogMessage    ${topic}    FINAL_SUCCESS    Successfully processed the VES event. Hostname is updated.
 
 TriggerAndVerifyTestVnfPolicy
     [Documentation]    Send TestVnf policy trigger event to DMaaP and read notifications to verify policy execution
-    Create Session   apexSession  http://${DMAAP_IP}   max_retries=1
+    Create Session   apexSession  http://${KAFKA_IP}   max_retries=1
     ${data}=    Get Binary File     ${CURDIR}/data/VesEventForVnfPolicy.json
     &{headers}=  Create Dictionary    Content-Type=application/json    Accept=application/json
     ${resp}=    POST On Session    apexSession    /events/unauthenticated.DCAE_POLICY_EXAMPLE_OUTPUT    data=${data}   headers=${headers}
diff --git a/csit/resources/tests/apex-slas.robot b/csit/resources/tests/apex-slas.robot
index 408b0ad..4191bb2 100644
--- a/csit/resources/tests/apex-slas.robot
+++ b/csit/resources/tests/apex-slas.robot
@@ -23,13 +23,10 @@
     CreatePolicy  /policy/api/v1/policytypes/onap.policies.native.Apex/versions/1.0.0/policies  200  ${postjson}  ${policyName}  1.0.0
     DeployPolicy
     Wait Until Keyword Succeeds    2 min    5 sec    QueryPolicyStatus  ${policyName}  defaultGroup  apex  ${pdpName}  onap.policies.native.Apex
-    GetTopic     APEX-CL-MGT
-    Create Session   apexSession  http://${DMAAP_IP}   max_retries=1
+    GetKafkaTopic     apex-cl-mgt
     ${data}=    Get Binary File     ${CURDIR}/data/VesEventForPnfPolicy.json
-    &{headers}=  Create Dictionary    Content-Type=application/json    Accept=application/json
     ${eventStartTime}=  Get Current Date
-    ${resp}=    POST On Session    apexSession    /events/unauthenticated.DCAE_CL_OUTPUT    data=${data}   headers=${headers}
-    Should Be Equal As Strings    ${resp.status_code}   200
+    ${resp}=    Run Process    ${CURDIR}/kafka_producer.py    unauthenticated.dcae_cl_output    ${data}
     ${eventEndTime}=  Get Current Date
     ValidateEventExecution    ${eventStartTime}  ${eventEndTime}  10
 
@@ -53,13 +50,10 @@
     CreateNodeTemplate  /policy/api/v1/nodetemplates  200  ${postjson}  1
     DeployPolicy
     Wait Until Keyword Succeeds    2 min    5 sec    QueryPolicyStatus  ${policyName}  defaultGroup  apex  ${pdpName}  onap.policies.native.Apex
-    GetTopic     APEX-CL-MGT2
-    Create Session   apexSession  http://${DMAAP_IP}   max_retries=1
+    GetKafkaTopic     apex-cl-mgt2
     ${data}=    Get Binary File     ${CURDIR}/data/VesEventForVnfPolicy.json
-    &{headers}=  Create Dictionary    Content-Type=application/json    Accept=application/json
     ${eventStartTime}=  Get Current Date
-    ${resp}=    POST On Session    apexSession    /events/unauthenticated.DCAE_POLICY_EXAMPLE_OUTPUT    data=${data}   headers=${headers}
-    Should Be Equal As Strings    ${resp.status_code}   200
+    ${resp}=    Run Process    ${CURDIR}/kafka_producer.py    unauthenticated.dcae_policy_example_output    ${data}
     ${eventEndTime}=  Get Current Date
     ValidateEventExecution    ${eventStartTime}  ${eventEndTime}  0.2
 
diff --git a/csit/resources/tests/common-library.robot b/csit/resources/tests/common-library.robot
index f5db8e0..531572d 100644
--- a/csit/resources/tests/common-library.robot
+++ b/csit/resources/tests/common-library.robot
@@ -150,9 +150,9 @@
 
 CheckKafkaTopic
     [Arguments]    ${topic}    ${expected_status}
-    ${resp}=    Run Process    ${CURDIR}/kafka_consumer.py    ${topic}    30    ${expected_status}
-    Log to console  Received response from kafka ${resp.stdout}
-    Should Contain    ${resp.text}    ${expected_status}
+    ${resp}=    Run Process    ${CURDIR}/kafka_consumer.py    ${topic}    60    ${expected_status}
+    Should Contain    ${resp.stdout}    ${expected_status}
+    [Return]    ${resp.stdout}
 
 GetKafkaTopic
     [Arguments]    ${topic}
diff --git a/csit/resources/tests/data/onap.policies.apex.pnf.Test.json b/csit/resources/tests/data/onap.policies.apex.pnf.Test.json
index 0552293..fc9b521 100644
--- a/csit/resources/tests/data/onap.policies.apex.pnf.Test.json
+++ b/csit/resources/tests/data/onap.policies.apex.pnf.Test.json
@@ -32,7 +32,7 @@
                 "taskParameters": [
                   {
                     "key": "logUrl",
-                    "value": "http://message-router:3904/events/APEX-CL-MGT"
+                    "value": "http://localhost:8082/topics/apex-cl-mgt"
                   }
                 ]
               },
@@ -2775,10 +2775,23 @@
             "eventInputParameters": {
               "SimpleCL_DCAEConsumer": {
                 "carrierTechnologyParameters": {
-                  "carrierTechnology": "RESTCLIENT",
-                  "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restclient.RestClientCarrierTechnologyParameters",
+                  "carrierTechnology": "KAFKA",
+                  "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.kafka.KafkaCarrierTechnologyParameters",
                   "parameters": {
-                    "url": "http://message-router:3904/events/unauthenticated.DCAE_CL_OUTPUT/cl/apex?timeout=30000"
+                    "bootstrapServers": "kafka:9092",
+                    "groupId": "apex-grp",
+                    "enableAutoCommit": true,
+                    "autoCommitTime": 1000,
+                    "sessionTimeout": 30000,
+                    "consumerPollTime": 100,
+                    "consumerTopicList": [
+                      "unauthenticated.dcae_cl_output"
+                    ],
+                    "keyDeserializer": "org.apache.kafka.common.serialization.StringDeserializer",
+                    "valueDeserializer": "org.apache.kafka.common.serialization.StringDeserializer",
+                    "kafkaProperties": [
+
+                    ]
                   }
                 },
                 "eventProtocolParameters": {
@@ -2876,10 +2889,21 @@
               },
               "SimpleCL_logOutputter": {
                 "carrierTechnologyParameters": {
-                  "carrierTechnology": "RESTCLIENT",
-                  "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restclient.RestClientCarrierTechnologyParameters",
+                  "carrierTechnology": "KAFKA",
+                  "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.kafka.KafkaCarrierTechnologyParameters",
                   "parameters": {
-                    "url": "http://message-router:3904/events/APEX-CL-MGT"
+                    "bootstrapServers": "kafka:9092",
+                    "acks": "all",
+                    "retries": 0,
+                    "batchSize": 16384,
+                    "lingerTime": 1,
+                    "bufferMemory": 33554432,
+                    "producerTopic": "apex-cl-mgt",
+                    "keySerializer": "org.apache.kafka.common.serialization.StringSerializer",
+                    "valueSerializer": "org.apache.kafka.common.serialization.StringSerializer",
+                    "kafkaProperties": [
+
+                    ]
                   }
                 },
                 "eventProtocolParameters": {
diff --git a/csit/resources/tests/data/onap.policies.apex.pnf.metadataSet.Test.json b/csit/resources/tests/data/onap.policies.apex.pnf.metadataSet.Test.json
index 20338ff..d9cd87e 100644
--- a/csit/resources/tests/data/onap.policies.apex.pnf.metadataSet.Test.json
+++ b/csit/resources/tests/data/onap.policies.apex.pnf.metadataSet.Test.json
@@ -36,7 +36,7 @@
                 "taskParameters": [
                   {
                     "key": "logUrl",
-                    "value": "http://message-router:3904/events/APEX-CL-MGT2"
+                    "value": "http://kafka:9092/topics/apex-cl-mgt2"
                   }
                 ]
               }
@@ -44,10 +44,23 @@
             "eventInputParameters": {
               "SimpleCL_DCAEConsumer2": {
                 "carrierTechnologyParameters": {
-                  "carrierTechnology": "RESTCLIENT",
-                  "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restclient.RestClientCarrierTechnologyParameters",
+                  "carrierTechnology": "KAFKA",
+                  "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.kafka.KafkaCarrierTechnologyParameters",
                   "parameters": {
-                    "url": "http://message-router:3904/events/unauthenticated.DCAE_CL_OUTPUT/cl/apex?timeout=30000"
+                    "bootstrapServers": "kafka:9092",
+                    "groupId": "apex-grp2",
+                    "enableAutoCommit": true,
+                    "autoCommitTime": 1000,
+                    "sessionTimeout": 30000,
+                    "consumerPollTime": 100,
+                    "consumerTopicList": [
+                      "unauthenticated.dcae_cl_output"
+                    ],
+                    "keyDeserializer": "org.apache.kafka.common.serialization.StringDeserializer",
+                    "valueDeserializer": "org.apache.kafka.common.serialization.StringDeserializer",
+                    "kafkaProperties": [
+
+                    ]
                   }
                 },
                 "eventProtocolParameters": {
@@ -145,10 +158,21 @@
               },
               "SimpleCL_logOutputter2": {
                 "carrierTechnologyParameters": {
-                  "carrierTechnology": "RESTCLIENT",
-                  "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.restclient.RestClientCarrierTechnologyParameters",
+                  "carrierTechnology": "KAFKA",
+                  "parameterClassName": "org.onap.policy.apex.plugins.event.carrier.kafka.KafkaCarrierTechnologyParameters",
                   "parameters": {
-                    "url": "http://message-router:3904/events/APEX-CL-MGT2"
+                    "bootstrapServers": "kafka:9092",
+                    "acks": "all",
+                    "retries": 0,
+                    "batchSize": 16384,
+                    "lingerTime": 1,
+                    "bufferMemory": 33554432,
+                    "producerTopic": "apex-cl-mgt2",
+                    "keySerializer": "org.apache.kafka.common.serialization.StringSerializer",
+                    "valueSerializer": "org.apache.kafka.common.serialization.StringSerializer",
+                    "kafkaProperties": [
+
+                    ]
                   }
                 },
                 "eventProtocolParameters": {
diff --git a/csit/resources/tests/distribution-test.robot b/csit/resources/tests/distribution-test.robot
index b8b4924..c85be4f 100644
--- a/csit/resources/tests/distribution-test.robot
+++ b/csit/resources/tests/distribution-test.robot
@@ -20,11 +20,11 @@
     [Documentation]  Verify policy-distribution is exporting prometheus metrics after execution
     ${hcauth}=  PolicyAdminAuth
     ${resp}=  PerformGetRequest  ${DISTRIBUTION_IP}  /metrics  200  null  ${hcauth}
-    Should Contain  ${resp.text}  total_distribution_received_count_total 1.0
-    Should Contain  ${resp.text}  distribution_success_count_total 1.0
+    Should Contain  ${resp.text}  total_distribution_received_count_total 2.0
+    Should Contain  ${resp.text}  distribution_success_count_total 2.0
     Should Contain  ${resp.text}  distribution_failure_count_total 0.0
-    Should Contain  ${resp.text}  total_download_received_count_total 1.0
-    Should Contain  ${resp.text}  download_success_count_total 1.0
+    Should Contain  ${resp.text}  total_download_received_count_total 2.0
+    Should Contain  ${resp.text}  download_success_count_total 2.0
     Should Contain  ${resp.text}  download_failure_count_total 0.0
 
 *** Keywords ***
diff --git a/csit/resources/tests/drools-applications-test.robot b/csit/resources/tests/drools-applications-test.robot
index 81ec65e..5c6a1b5 100644
--- a/csit/resources/tests/drools-applications-test.robot
+++ b/csit/resources/tests/drools-applications-test.robot
@@ -29,8 +29,8 @@
 
 MakeTopics
     [Documentation]    Creates the Policy topics
-    GetTopic     POLICY-PDP-PAP
-    GetTopic     POLICY-CL-MGT
+    GetKafkaTopic     policy-pdp-pap
+    GetKafkaTopic     policy-cl-mgt
 
 CreateVcpeXacmlPolicy
     [Documentation]    Create VCPE Policy for Xacml
@@ -59,27 +59,20 @@
 DeployXacmlPolicies
     [Documentation]    Deploys the Policies to Xacml
     PerformPostRequest  /policy/pap/v1/pdps/deployments/batch  null  ${POLICY_PAP_IP}  deploy.xacml.policies.json  ${CURDIR}/data  json  202
-    ${result}=    CheckTopic     POLICY-PDP-PAP    PDP_UPDATE
-    Sleep    5s
-    ${result}=    CheckTopic     POLICY-PDP-PAP    ACTIVE
-    Should Contain    ${result}    responseTo
-    Should Contain    ${result}    xacml
-    Should Contain    ${result}    restart
-    Should Contain    ${result}    onap.restart.tca
+    Sleep  5s
+    ${result}=    CheckKafkaTopic     policy-notification    onap.vfirewall.tca
+    Should Contain    ${result}    deployed-policies
     Should Contain    ${result}    onap.scaleout.tca
-    Should Contain    ${result}    onap.vfirewall.tca
+    Should Contain    ${result}    onap.restart.tca
 
 DeployDroolsPolicies
     [Documentation]    Deploys the Policies to Drools
     PerformPostRequest  /policy/pap/v1/pdps/deployments/batch  null  ${POLICY_PAP_IP}  deploy.drools.policies.json  ${CURDIR}/data  json  202
-    ${result}=    CheckTopic     POLICY-PDP-PAP    PDP_UPDATE
-    Sleep    5s
-    ${result}=    CheckTopic     POLICY-PDP-PAP    ACTIVE
-    Should Contain    ${result}    responseTo
-    Should Contain    ${result}    drools
-    Should Contain    ${result}    operational.restart
+    Sleep  5s
+    ${result}=    CheckKafkaTopic    policy-notification    operational.modifyconfig
+    Should Contain    ${result}    deployed-policies
     Should Contain    ${result}    operational.scaleout
-    Should Contain    ${result}    operational.modifyconfig
+    Should Contain    ${result}    operational.restart
 
 #VcpeExecute
 #    [Documentation]    Executes VCPE Policy
@@ -179,7 +172,7 @@
     ${data}=    Get File    ${file}
     Create Session   session  http://${DMAAP_IP}   max_retries=1
     ${headers}=  Create Dictionary  Content-Type=application/json
-    ${resp}=  POST On Session    session    /events/unauthenticated.DCAE_CL_OUTPUT    headers=${headers}    data=${data}
+    ${resp}=  POST On Session    session    /events/unauthenticated.dcae_cl_output    headers=${headers}    data=${data}
     Log    Response from dmaap ${resp.text}
     Status Should Be    OK
     [Return]    ${resp.text}
diff --git a/csit/resources/tests/kafka_consumer.py b/csit/resources/tests/kafka_consumer.py
new file mode 100755
index 0000000..595e3db
--- /dev/null
+++ b/csit/resources/tests/kafka_consumer.py
@@ -0,0 +1,66 @@
+#!/usr/bin/env python3
+#
+# ============LICENSE_START====================================================
+#  Copyright (C) 2023-2024 Nordix Foundation.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END======================================================
+
+# Python utility to fetch kafka topic and look for required messages.
+# Command-line arguments, in order: {topic_name} {timeout in seconds} {expected value}.
+
+
+from confluent_kafka import Consumer, KafkaError, KafkaException
+import sys
+import time
+
+
+def consume_kafka_topic(topic, expected_values, timeout):
+    """Print the first message on `topic` that contains the text `expected_values`.
+    Polls for at most `timeout` seconds: sys.exit(200) on a match, sys.exit(404) at partition
+    EOF, KafkaException on other consumer errors, silent return once the timeout elapses.
+    """
+    consumer = Consumer({
+        'bootstrap.servers': 'localhost:29092',
+        'group.id': 'testgrp',
+        'auto.offset.reset': 'earliest'
+    })
+    consumer.subscribe([topic])
+    try:
+        start_time = time.time()
+        while time.time() - start_time < timeout:
+            msg = consumer.poll(1.0)
+            if msg is None:
+                continue
+            if msg.error():
+                # The error-code constants are attributes of KafkaError, not KafkaException.
+                if msg.error().code() != KafkaError._PARTITION_EOF:
+                    raise KafkaException(msg.error())
+                sys.stderr.write(f"Reached end of topic {msg.topic()} / partition {msg.partition()}\n")
+                print('ERROR')
+                sys.exit(404)
+            message = msg.value().decode('utf-8')
+            if expected_values in message:
+                print(message)
+                sys.exit(200)
+    finally:
+        consumer.close()
+
+
+if __name__ == '__main__':
+    # Usage: kafka_consumer.py <topic_name> <timeout_seconds> <expected_text>
+    # Keyword arguments are used because the command-line order differs from the
+    # parameter order of consume_kafka_topic().
+    consume_kafka_topic(topic=sys.argv[1], timeout=int(sys.argv[2]), expected_values=sys.argv[3])
diff --git a/csit/resources/tests/kafka_producer.py b/csit/resources/tests/kafka_producer.py
new file mode 100755
index 0000000..e6f01c2
--- /dev/null
+++ b/csit/resources/tests/kafka_producer.py
@@ -0,0 +1,41 @@
+#!/usr/bin/env python3
+#
+# ============LICENSE_START====================================================
+#  Copyright (C) 2023-2024 Nordix Foundation.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END======================================================
+
+# Python utility to produce a message on a kafka topic
+# Accepts the arguments {topic_name} and {message}
+
+from confluent_kafka import Producer
+import sys
+
+def post_to_kafka(topic, message):
+    """Publish `message` (UTF-8 encoded) to `topic` on localhost:29092 and print the outcome."""
+    reports = []  # on_delivery appends None for a delivered message, else the delivery error
+    producer = Producer({'bootstrap.servers': 'localhost:29092'})
+    try:
+        producer.produce(topic, value=message.encode('utf-8'), on_delivery=lambda err, _msg: reports.append(err))
+        producer.flush()  # produce() only enqueues; the delivery report is served during flush()
+        if not reports or reports[-1] is not None:
+            raise RuntimeError(reports[-1] if reports else 'no delivery report received')
+        print('Message posted to Kafka topic: {}'.format(topic))
+    except Exception as e:
+        print('Failed to post message: {}'.format(str(e)))
+
+if __name__ == '__main__':
+    post_to_kafka(topic=sys.argv[1], message=sys.argv[2])  # argv: <topic_name> <message>
diff --git a/csit/resources/tests/make_topics.py b/csit/resources/tests/make_topics.py
new file mode 100755
index 0000000..64a230e
--- /dev/null
+++ b/csit/resources/tests/make_topics.py
@@ -0,0 +1,41 @@
+#!/usr/bin/env python3
+#
+# ============LICENSE_START====================================================
+#  Copyright (C) 2023-2024 Nordix Foundation.
+# =============================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END======================================================
+
+# Python utility to create a new kafka topic
+# Accepts the argument {topic_name}
+
+from confluent_kafka.admin import AdminClient, NewTopic
+import sys
+
+def create_topic(bootstrap_servers, topic_name, num_partitions=2, replication_factor=2):
+    """Create `topic_name` via the Kafka admin API and wait for the result; failures go to stderr."""
+    admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})
+    topic = NewTopic(topic_name, num_partitions=num_partitions, replication_factor=replication_factor)
+    # create_topics() is asynchronous: block on each returned future so the request finishes before exit.
+    for name, future in admin_client.create_topics([topic]).items():
+        if future.exception() is not None:
+            sys.stderr.write(f"Failed to create topic {name}: {future.exception()}\n")
+
+
+if __name__ == '__main__':
+    # Usage: make_topics.py <topic_name>
+    # The broker address is hard-coded; the topic name is the first command-line argument.
+    # create_topic() is called with its default partition and replication settings.
+    create_topic('localhost:29092', sys.argv[1])
diff --git a/csit/resources/tests/pap-test.robot b/csit/resources/tests/pap-test.robot
index 8232843..74d299a 100644
--- a/csit/resources/tests/pap-test.robot
+++ b/csit/resources/tests/pap-test.robot
@@ -47,6 +47,7 @@
 
 Consolidated Healthcheck
     [Documentation]  Verify policy consolidated health check
+    sleep  20
     ${resp}=  GetReq  /policy/pap/v1/components/healthcheck
     Should Be Equal As Strings  ${resp.json()['healthy']}  True
 
diff --git a/csit/resources/tests/policy-clamp-test.robot b/csit/resources/tests/policy-clamp-test.robot
index d5fec49..10f9389 100644
--- a/csit/resources/tests/policy-clamp-test.robot
+++ b/csit/resources/tests/policy-clamp-test.robot
@@ -97,6 +97,7 @@
 QueryPolicyTypes
      [Documentation]    Verify the new policy types created
      ${auth}=    Create List    policyadmin    zb!XztG34
+     sleep  10
      Log    Creating session http://${POLICY_API_IP}}:6969
      ${session}=    Create Session      policy  http://${POLICY_API_IP}   auth=${auth}
      ${headers}=  Create Dictionary     Accept=application/json    Content-Type=application/json
diff --git a/csit/resources/tests/xacml-pdp-test.robot b/csit/resources/tests/xacml-pdp-test.robot
index 331a7fa..e500a04 100644
--- a/csit/resources/tests/xacml-pdp-test.robot
+++ b/csit/resources/tests/xacml-pdp-test.robot
@@ -19,7 +19,7 @@
 
 MakeTopics
     [Documentation]    Creates the Policy topics
-    GetTopic   POLICY-PDP-PAP
+    GetKafkaTopic   policy-pdp-pap
 
 ExecuteXacmlPolicy
     CreateMonitorPolicy
@@ -56,12 +56,9 @@
     ${postjson}=  Get file  ${CURDIR}/data/vCPE.policy.input.tosca.deploy.json
     ${policyadmin}=  PolicyAdminAuth
     PerformPostRequest  ${POLICY_PAP_IP}  /policy/pap/v1/pdps/policies  202  ${postjson}  null  ${policyadmin}
-    ${result}=     CheckTopic    POLICY-PDP-PAP    PDP_UPDATE
-    Sleep    5s
-    ${result}=     CheckTopic    POLICY-PDP-PAP    ACTIVE
-    Should Contain    ${result}    responseTo
-    Should Contain    ${result}    xacml
-    Should Contain    ${result}    onap.restart.tca
+    sleep  20s
+    ${result}=     CheckKafkaTopic    policy-notification     onap.restart.tca
+    Should Contain    ${result}    deployed-policies
 
 GetAbbreviatedDecisionResult
     [Documentation]    Get Decision with abbreviated results from Policy Xacml PDP