Remove Dmaap configurations from CSITs

Issue-ID: POLICY-4880
Change-Id: I4f973ef1f7a173eaf5f196d9991d67d584e235fe
Signed-off-by: rameshiyer27 <ramesh.murugan.iyer@est.tech>
diff --git a/compose/config/drools-applications/custom/feature-lifecycle.properties b/compose/config/drools-applications/custom/feature-lifecycle.properties
index ec23bee..053cd06 100644
--- a/compose/config/drools-applications/custom/feature-lifecycle.properties
+++ b/compose/config/drools-applications/custom/feature-lifecycle.properties
@@ -32,10 +32,8 @@
 kafka.source.topics.policy-pdp-pap.effectiveTopic=${envd:POLICY_PDP_PAP_TOPIC}
 kafka.source.topics.policy-pdp-pap.apiKey=${envd:POLICY_PDP_PAP_API_KEY}
 kafka.source.topics.policy-pdp-pap.apiSecret=${envd:POLICY_PDP_PAP_API_SECRET}
-kafka.source.topics.policy-pdp-pap.https=${envd:DMAAP_HTTPS:false}
 
 kafka.sink.topics.policy-pdp-pap.servers=kafka:9092
 kafka.sink.topics.policy-pdp-pap.effectiveTopic=${envd:POLICY_PDP_PAP_TOPIC}
 kafka.sink.topics.policy-pdp-pap.apiKey=${envd:POLICY_PDP_PAP_API_KEY}
 kafka.sink.topics.policy-pdp-pap.apiSecret=${envd:POLICY_PDP_PAP_API_SECRET}
-kafka.sink.topics.policy-pdp-pap.https=${envd:DMAAP_HTTPS:false}
diff --git a/compose/config/drools-applications/env/base.conf b/compose/config/drools-applications/env/base.conf
index 0b92602..76e4257 100644
--- a/compose/config/drools-applications/env/base.conf
+++ b/compose/config/drools-applications/env/base.conf
@@ -70,7 +70,7 @@
 
 LIVENESS_CONTROLLERS=*
 
-# PDP-D DMaaP configuration channel
+# PDP-D Kafka configuration channel
 
 PDPD_CONFIGURATION_TOPIC=pdpd-configuration
 PDPD_CONFIGURATION_API_KEY=
@@ -101,16 +101,12 @@
 PDP_PASSWORD=zb!XztG34
 PDP_HTTPS=false
 
-# DCAE DMaaP
+# DCAE Kafka
 
 DCAE_TOPIC=unauthenticated.dcae_cl_output
 DCAE_SERVERS=kafka:9092
 DCAE_CONSUMER_GROUP=dcae.policy.shared
 
-# Open DMaaP
-
-DMAAP_SERVERS=kafka:9092
-DMAAP_HTTPS=false
 
 # AAI
 
@@ -126,7 +122,7 @@
 SO_HOST=so-sim
 SO_PORT=6669
 SO_CONTEXT_URI=
-SO_URL=http://message-router:6669/
+SO_URL=http://so-sim:6669/
 SO_USERNAME=InfraPortalClient
 SO_PASSWORD='password1$'
 
diff --git a/compose/config/drools-applications/env/feature-pooling-dmaap.conf b/compose/config/drools-applications/env/feature-pooling-kafka.conf
similarity index 100%
rename from compose/config/drools-applications/env/feature-pooling-dmaap.conf
rename to compose/config/drools-applications/env/feature-pooling-kafka.conf
diff --git a/compose/config/drools-pdp/custom/noop.pre.sh b/compose/config/drools-pdp/custom/noop.pre.sh
index 2be3398..7a7759f 100755
--- a/compose/config/drools-pdp/custom/noop.pre.sh
+++ b/compose/config/drools-pdp/custom/noop.pre.sh
@@ -16,8 +16,5 @@
 # limitations under the License.
 # ============LICENSE_END=========================================================
 
-sed -i "s/^dmaap/kafka/g" \
-    ${POLICY_HOME}/config/engine.properties \
-    ${POLICY_HOME}/config/feature-lifecycle.properties
 
 chmod 644 ${POLICY_HOME}/config/engine.properties ${POLICY_HOME}/config/feature-lifecycle.properties
diff --git a/compose/config/drools-pdp/env/base.conf b/compose/config/drools-pdp/env/base.conf
index d301d83..2625b76 100644
--- a/compose/config/drools-pdp/env/base.conf
+++ b/compose/config/drools-pdp/env/base.conf
@@ -67,7 +67,7 @@
 HTTP_SERVER_HTTPS=false
 PROMETHEUS=true
 
-# PDP-D DMaaP configuration channel
+# PDP-D Kafka configuration channel
 
 PDPD_CONFIGURATION_TOPIC=pdpd-configuration
 PDPD_CONFIGURATION_API_KEY=
@@ -100,16 +100,16 @@
 PDP_ENVIRONMENT=
 GUARD_DISABLED=false
 
-# DCAE DMaaP
+# DCAE Kafka
 
 DCAE_TOPIC=unauthenticated.dcae_cl_output
 DCAE_SERVERS=
 DCAE_CONSUMER_GROUP=dcae.policy.shared
 
-# Open DMaaP
+# kafka server
 
-DMAAP_SERVERS=kafka:9092
-DMAAP_HTTPS=false
+KAFKA_SERVERS=kafka:9092
+
 
 # AAI
 
diff --git a/compose/config/sim-all/simParameters.json b/compose/config/sim-all/simParameters.json
index 2bc5812..b54db25 100644
--- a/compose/config/sim-all/simParameters.json
+++ b/compose/config/sim-all/simParameters.json
@@ -1,17 +1,6 @@
 {
-    "dmaapProvider": {
-        "name": "DMaaP simulator",
-        "topicSweepSec": 300
-    },
     "restServers": [
         {
-            "name": "DMaaP simulator",
-            "providerClass": "org.onap.policy.models.sim.dmaap.rest.DmaapSimRestControllerV1",
-            "host": "0.0.0.0",
-            "port": 3904,
-            "https": false
-        },
-        {
             "name": "A&AI simulator",
             "providerClass": "org.onap.policy.simulators.AaiSimulatorJaxRs",
             "host": "0.0.0.0",
@@ -45,56 +34,7 @@
             "https": false
         }
     ],
-    "topicSinks": [
-        {
-            "topic": "appc-cl",
-            "servers": ["${HOST_NAME}"],
-            "topicCommInfrastructure": "DMAAP",
-            "useHttps": false,
-            "apiKey": "some-key",
-            "apiSecret": "some-secret"
-        },
-        {
-            "topic": "appc-lcm-write",
-            "servers": ["${HOST_NAME}"],
-            "topicCommInfrastructure": "DMAAP",
-            "useHttps": false,
-            "apiKey": "some-key",
-            "apiSecret": "some-secret"
-        }
-    ],
-    "topicSources": [
-        {
-            "topic": "appc-cl",
-            "servers": ["${HOST_NAME}"],
-            "topicCommInfrastructure": "DMAAP",
-            "useHttps": false,
-            "apiKey": "some-key",
-            "apiSecret": "some-secret"
-        },
-        {
-            "topic": "appc-lcm-read",
-            "servers": ["${HOST_NAME}"],
-            "topicCommInfrastructure": "DMAAP",
-            "useHttps": false,
-            "apiKey": "some-key",
-            "apiSecret": "some-secret"
-        }
-    ],
-    "topicServers": [
-        {
-            "name": "APPC Legacy simulator",
-            "providerClass": "org.onap.policy.simulators.AppcLegacyTopicServer",
-            "sink": "appc-cl",
-            "source": "appc-cl"
-        },
-        {
-            "name": "appc-lcm-simulator",
-            "providerClass": "org.onap.policy.simulators.AppcLcmTopicServer",
-            "sink": "appc-lcm-write",
-            "source": "appc-lcm-read"
-        }
-    ],
+
     "grpcServer": {
         "name": "CDS simulator",
         "providerClass": "org.onap.policy.simulators.CdsSimulator",
diff --git a/compose/docker-compose.pdp.scale.yml b/compose/docker-compose.pdp.scale.yml
index 53376ab..104de3c 100644
--- a/compose/docker-compose.pdp.scale.yml
+++ b/compose/docker-compose.pdp.scale.yml
@@ -23,6 +23,7 @@
     depends_on:
       - mariadb
       - simulator
+      - kafka
       - pap
     hostname: policy-apex-pdp
     expose:
@@ -39,7 +40,7 @@
     command: [
       '-c', '/opt/app/policy/apex-pdp/bin/apexOnapPf.sh -c /opt/app/policy/apex-pdp/etc/onappf/config/OnapPfConfig.json',
       'mariadb', '3306',
-      'message-router', '3904',
+      'kafka', '9092',
       'pap', '6969'
       ]
   nginx:
@@ -50,3 +51,27 @@
       - apexpdp
     ports:
        - ${APEX_PORT}:${APEX_PORT}
+
+  zookeeper:
+    image: confluentinc/cp-zookeeper:latest
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+    ports:
+      - 2181:2181
+
+  kafka:
+    image: confluentinc/cp-kafka:latest
+    container_name: kafka
+    depends_on:
+      - zookeeper
+    ports:
+      - 29092:29092
+      - 9092:9092
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
diff --git a/compose/docker-compose.postgres.yml b/compose/docker-compose.postgres.yml
index 4fe4002..1f32dba 100644
--- a/compose/docker-compose.postgres.yml
+++ b/compose/docker-compose.postgres.yml
@@ -73,7 +73,7 @@
       - 6670
       - 6680
     ports:
-      - ${DMAAP_PORT}:3904
+      - ${SIMULATOR_PORT}:3904
   api:
     image: ${CONTAINER_LOCATION}onap/policy-api:${POLICY_API_VERSION}
     container_name: policy-api
diff --git a/compose/docker-compose.yml b/compose/docker-compose.yml
index dbd6298..5effc25 100644
--- a/compose/docker-compose.yml
+++ b/compose/docker-compose.yml
@@ -73,7 +73,7 @@
        - 6670
        - 6680
       ports:
-       - ${DMAAP_PORT}:3904
+       - ${SIMULATOR_PORT}:6666
    api:
       image: ${CONTAINER_LOCATION}onap/policy-api:${POLICY_API_VERSION}
       container_name: policy-api
@@ -177,7 +177,7 @@
       env_file:
         - ./config/drools-applications/env/base.conf
         - ./config/drools-applications/env/feature-healthcheck.conf
-        - ./config/drools-applications/env/feature-pooling-dmaap.conf
+        - ./config/drools-applications/env/feature-pooling-kafka.conf
       entrypoint: /opt/app/policy/bin/wait_for_port.sh
       command: [
         '-c', '/opt/app/policy/bin/pdpd-cl-entrypoint.sh boot',
diff --git a/compose/export-ports.sh b/compose/export-ports.sh
index a55f755..1648863 100755
--- a/compose/export-ports.sh
+++ b/compose/export-ports.sh
@@ -43,7 +43,7 @@
 export SIM_PARTICIPANT_PORT=30011
 export DROOLS_APPS_PORT=30009
 export DROOLS_APPS_TELEMETRY_PORT=30219
-export DMAAP_PORT=30904
+export SIMULATOR_PORT=30904
 export KAFKA_PORT=9092
 export PROMETHEUS_PORT=30259
 export GRAFANA_PORT=30269
diff --git a/csit/resources/scripts/get-cluster-info.sh b/csit/resources/scripts/get-cluster-info.sh
index 272d296..ab0b27b 100755
--- a/csit/resources/scripts/get-cluster-info.sh
+++ b/csit/resources/scripts/get-cluster-info.sh
@@ -31,7 +31,7 @@
 export POLICY_PF_PARTICIPANT_PORT=30008
 export POLICY_HTTP_PARTICIPANT_PORT=30009
 export POLICY_K8S_PARTICIPANT_PORT=30010
-export DMAAP_PORT=30904
+export SIMULATOR_PORT=30904
 
 # Retrieve pod names
 function get_pod_names() {
@@ -113,7 +113,7 @@
 
 function setup_message_router_svc() {
   microk8s kubectl expose service message-router --name message-router-svc --type NodePort --protocol TCP --port 3904 --target-port 3904
-  microk8s kubectl patch service message-router-svc --namespace=default --type='json' --patch='[{"op": "replace", "path": "/spec/ports/0/nodePort", "value":'"$DMAAP_PORT"'}]'
+  microk8s kubectl patch service message-router-svc --namespace=default --type='json' --patch='[{"op": "replace", "path": "/spec/ports/0/nodePort", "value":'"$SIMULATOR_PORT"'}]'
 }
 
 ####MAIN###
diff --git a/csit/resources/scripts/run-test.sh b/csit/resources/scripts/run-test.sh
index 6bd7c07..34048e2 100755
--- a/csit/resources/scripts/run-test.sh
+++ b/csit/resources/scripts/run-test.sh
@@ -33,7 +33,6 @@
 POLICY_PDPX_IP=policy-xacml-pdp:${DEFAULT_PORT}
 POLICY_DROOLS_IP=policy-drools-pdp:9696
 DISTRIBUTION_IP=policy-distribution:6969
-DMAAP_IP=message-router:3904
 KAFKA_IP=kafka:9092
 APEX_EVENTS_IP=policy-apex-pdp:23324
 PROMETHEUS_IP=prometheus:9090
@@ -44,7 +43,7 @@
 export ROBOT_VARIABLES=
 ROBOT_VARIABLES="-v DATA:$DATA -v NODETEMPLATES:$NODETEMPLATES -v POLICY_API_IP:$POLICY_API_IP
 -v POLICY_RUNTIME_ACM_IP:$POLICY_RUNTIME_ACM_IP -v POLICY_PAP_IP:$POLICY_PAP_IP -v APEX_IP:$APEX_IP
--v APEX_EVENTS_IP:$APEX_EVENTS_IP -v DMAAP_IP:$DMAAP_IP  -v KAFKA_IP:$KAFKA_IP -v PROMETHEUS_IP:${PROMETHEUS_IP}
+-v APEX_EVENTS_IP:$APEX_EVENTS_IP -v KAFKA_IP:$KAFKA_IP -v PROMETHEUS_IP:${PROMETHEUS_IP}
 -v POLICY_PDPX_IP:$POLICY_PDPX_IP -v POLICY_DROOLS_IP:$POLICY_DROOLS_IP -v TEMP_FOLDER:${DIST_TEMP_FOLDER}
 -v DISTRIBUTION_IP:$DISTRIBUTION_IP -v CLAMP_K8S_TEST:$CLAMP_K8S_TEST"
 
diff --git a/csit/resources/scripts/setup-apex-pdp-large.sh b/csit/resources/scripts/setup-apex-pdp-large.sh
index 83d7884..1f55861 100644
--- a/csit/resources/scripts/setup-apex-pdp-large.sh
+++ b/csit/resources/scripts/setup-apex-pdp-large.sh
@@ -45,10 +45,8 @@
     sleep 10s
 done
 
-export DMAAP_IP="localhost:${DMAAP_PORT}"
 export SUITES="apex-slas-10.robot"
 
 ROBOT_VARIABLES="-v POLICY_PAP_IP:localhost:${PAP_PORT} -v POLICY_API_IP:localhost:${API_PORT}
 -v PROMETHEUS_IP:localhost:${PROMETHEUS_PORT} -v DATA:${DATA} -v NODETEMPLATES:${NODETEMPLATES}
--v APEX_IP:localhost:${APEX_PORT} -v DMAAP_IP:${DMAAP_IP}
--v APEX_EVENTS_IP:localhost:${APEX_PORT}"
+-v APEX_IP:localhost:${APEX_PORT} -v APEX_EVENTS_IP:localhost:${APEX_PORT}"
diff --git a/csit/resources/scripts/setup-apex-pdp-medium.sh b/csit/resources/scripts/setup-apex-pdp-medium.sh
index e1f3776..e7e0725 100644
--- a/csit/resources/scripts/setup-apex-pdp-medium.sh
+++ b/csit/resources/scripts/setup-apex-pdp-medium.sh
@@ -45,10 +45,8 @@
     sleep 10s
 done
 
-export DMAAP_IP="localhost:${DMAAP_PORT}"
 export SUITES="apex-slas-3.robot"
 
 ROBOT_VARIABLES="-v POLICY_PAP_IP:localhost:${PAP_PORT} -v POLICY_API_IP:localhost:${API_PORT}
 -v PROMETHEUS_IP:localhost:${PROMETHEUS_PORT} -v DATA:${DATA} -v NODETEMPLATES:${NODETEMPLATES}
--v APEX_IP:localhost:${APEX_PORT} -v DMAAP_IP:${DMAAP_IP}
--v APEX_EVENTS_IP:localhost:${APEX_PORT}"
+-v APEX_IP:localhost:${APEX_PORT} -v APEX_EVENTS_IP:localhost:${APEX_PORT}"
diff --git a/csit/resources/tests/apex-pdp-test.robot b/csit/resources/tests/apex-pdp-test.robot
index 5e4ea34..37bcff6 100644
--- a/csit/resources/tests/apex-pdp-test.robot
+++ b/csit/resources/tests/apex-pdp-test.robot
@@ -77,7 +77,7 @@
 *** Keywords ***
 
 TriggerAndVerifyTestPnfPolicy
-    [Documentation]    Send TestPnf policy trigger event to DMaaP and read notifications to verify policy execution
+    [Documentation]    Send TestPnf policy trigger event to Kafka and read notifications to verify policy execution
     [Arguments]    ${topic}
     ${data}=    Get Binary File     ${CURDIR}/data/VesEventForPnfPolicy.json
     ${resp}=    Run Process    ${CURDIR}/kafka_producer.py    unauthenticated.dcae_cl_output    ${data}
@@ -86,13 +86,11 @@
     Run Keyword    CheckLogMessage    ${topic}    FINAL_SUCCESS    Successfully processed the VES event. Hostname is updated.
 
 TriggerAndVerifyTestVnfPolicy
-    [Documentation]    Send TestVnf policy trigger event to DMaaP and read notifications to verify policy execution
-    Create Session   apexSession  http://${KAFKA_IP}   max_retries=1
+    [Documentation]    Send TestVnf policy trigger event to Kafka and read notifications to verify policy execution
+    [Arguments]    ${topic}
     ${data}=    Get Binary File     ${CURDIR}/data/VesEventForVnfPolicy.json
-    &{headers}=  Create Dictionary    Content-Type=application/json    Accept=application/json
-    ${resp}=    POST On Session    apexSession    /events/unauthenticated.DCAE_POLICY_EXAMPLE_OUTPUT    data=${data}   headers=${headers}
-    Should Be Equal As Strings    ${resp.status_code}   200
-    Run Keyword    CheckLogMessage    ACTIVE    VES event has been received. Going to fetch VNF details from AAI.
-    Run Keyword    CheckLogMessage    SUCCESS    VNF details are received from AAI successfully. Sending ConfigModify request to CDS.
-    Run Keyword    CheckLogMessage    SUCCESS    ConfigModify request is successful. Sending restart request to CDS.
-    Run Keyword    CheckLogMessage    FINAL_SUCCESS    Successfully processed the VES Event. Restart is complete.
+    ${resp}=    Run Process    ${CURDIR}/kafka_producer.py    unauthenticated.dcae_policy_example_output    ${data}
+    Run Keyword    CheckLogMessage    ${topic}    ACTIVE    VES event has been received. Going to fetch VNF details from AAI.
+    Run Keyword    CheckLogMessage    ${topic}    SUCCESS    VNF details are received from AAI successfully. Sending ConfigModify request to CDS.
+    Run Keyword    CheckLogMessage    ${topic}    SUCCESS    ConfigModify request is successful. Sending restart request to CDS.
+    Run Keyword    CheckLogMessage    ${topic}    FINAL_SUCCESS    Successfully processed the VES Event. Restart is complete.
diff --git a/csit/resources/tests/apex-slas-10.robot b/csit/resources/tests/apex-slas-10.robot
index 7639ce9..833bb83 100644
--- a/csit/resources/tests/apex-slas-10.robot
+++ b/csit/resources/tests/apex-slas-10.robot
@@ -25,14 +25,11 @@
     CreateNodeTemplate  /policy/api/v1/nodetemplates  200  ${postjson}  1
     DeployPolicy
     Wait Until Keyword Succeeds    2 min    5 sec    QueryPolicyStatus  ${policyName}  defaultGroup  apex  ${pdpName}  onap.policies.native.Apex
-    GetTopic     APEX-CL-MGT2
-    Create Session   apexSession  http://${DMAAP_IP}   max_retries=1
+    GetKafkaTopic     apex-cl-mgt2
     ${data}=    Get Binary File     ${CURDIR}/data/VesEventForVnfPolicy.json
-    &{headers}=  Create Dictionary    Content-Type=application/json    Accept=application/json
     ${eventStartTime}=  Get Current Date
-    ${resp}=    POST On Session    apexSession    /events/unauthenticated.DCAE_POLICY_EXAMPLE_OUTPUT    data=${data}   headers=${headers}
+    ${resp}=    Run Process    ${CURDIR}/kafka_producer.py    unauthenticated.dcae_policy_example_output    ${data}
     ${eventEndTime}=  Get Current Date
-    Should Be Equal As Strings    ${resp.status_code}   200
     ValidateEventExecution    ${eventStartTime}  ${eventEndTime}  100
 
 ValidatePolicyExecutionAndEventRateHighComplexity
@@ -58,14 +55,11 @@
     CreatePolicy  /policy/api/v1/policytypes/onap.policies.native.Apex/versions/1.0.0/policies  200  ${postjson}  ${policyName}  1.0.0
     DeployPolicy
     Wait Until Keyword Succeeds    2 min    5 sec    QueryPolicyStatus  ${policyName}  defaultGroup  apex  ${pdpName}  onap.policies.native.Apex
-    GetTopic     APEX-CL-MGT
-    Create Session   apexSession  http://${DMAAP_IP}   max_retries=1
+    GetKafkaTopic     apex-cl-mgt
     ${data}=    Get Binary File     ${CURDIR}/data/VesEventForPnfPolicy.json
-    &{headers}=  Create Dictionary    Content-Type=application/json    Accept=application/json
     ${eventStartTime}=  Get Current Date
-    ${resp}=    POST On Session    apexSession    /events/unauthenticated.DCAE_CL_OUTPUT    data=${data}   headers=${headers}
+    ${resp}=    Run Process    ${CURDIR}/kafka_producer.py    unauthenticated.dcae_cl_output    ${data}
     ${eventEndTime}=  Get Current Date
-    Should Be Equal As Strings    ${resp.status_code}   200
     ValidateEventExecution    ${eventStartTime}  ${eventEndTime}  10
 
 WaitForPrometheusServer
diff --git a/csit/resources/tests/apex-slas-3.robot b/csit/resources/tests/apex-slas-3.robot
index 13d5895..c37c1cd 100644
--- a/csit/resources/tests/apex-slas-3.robot
+++ b/csit/resources/tests/apex-slas-3.robot
@@ -23,13 +23,10 @@
     CreatePolicy  /policy/api/v1/policytypes/onap.policies.native.Apex/versions/1.0.0/policies  200  ${postjson}  ${policyName}  1.0.0
     DeployPolicy
     Wait Until Keyword Succeeds    2 min    5 sec    QueryPolicyStatus  ${policyName}  defaultGroup  apex  ${pdpName}  onap.policies.native.Apex
-    GetTopic     APEX-CL-MGT
-    Create Session   apexSession  http://${DMAAP_IP}   max_retries=1
+    GetKafkaTopic     apex-cl-mgt
     ${data}=    Get Binary File     ${CURDIR}/data/VesEventForPnfPolicy.json
-    &{headers}=  Create Dictionary    Content-Type=application/json    Accept=application/json
     ${eventStartTime}=  Get Current Date
-    ${resp}=    POST On Session    apexSession    /events/unauthenticated.DCAE_CL_OUTPUT    data=${data}   headers=${headers}
-    Should Be Equal As Strings    ${resp.status_code}   200
+    ${resp}=    Run Process    ${CURDIR}/kafka_producer.py    unauthenticated.dcae_cl_output    ${data}
     ${eventEndTime}=  Get Current Date
     ValidateEventExecution    ${eventStartTime}  ${eventEndTime}  30
 
@@ -42,13 +39,10 @@
     CreateNodeTemplate  /policy/api/v1/nodetemplates  200  ${postjson}  1
     DeployPolicy
     Wait Until Keyword Succeeds    2 min    5 sec    QueryPolicyStatus  ${policyName}  defaultGroup  apex  ${pdpName}  onap.policies.native.Apex
-    GetTopic     APEX-CL-MGT2
-    Create Session   apexSession  http://${DMAAP_IP}   max_retries=1
+    GetKafkaTopic     apex-cl-mgt2
     ${data}=    Get Binary File     ${CURDIR}/data/VesEventForVnfPolicy.json
-    &{headers}=  Create Dictionary    Content-Type=application/json    Accept=application/json
     ${eventStartTime}=  Get Current Date
-    ${resp}=    POST On Session    apexSession    /events/unauthenticated.DCAE_POLICY_EXAMPLE_OUTPUT    data=${data}   headers=${headers}
-    Should Be Equal As Strings    ${resp.status_code}   200
+    ${resp}=    Run Process    ${CURDIR}/kafka_producer.py    unauthenticated.dcae_policy_example_output    ${data}
     ${eventEndTime}=  Get Current Date
     ValidateEventExecution    ${eventStartTime}  ${eventEndTime}  0.6
 
diff --git a/csit/resources/tests/common-library.robot b/csit/resources/tests/common-library.robot
index 531572d..8693be1 100644
--- a/csit/resources/tests/common-library.robot
+++ b/csit/resources/tests/common-library.robot
@@ -131,23 +131,6 @@
     ${actualTime}=   Set Variable  ${rawNumber * ${1000}}
     Should Be True   ${actualTime} <= ${timeLimit}
 
-GetTopic
-    [Arguments]    ${topic}
-    Create Session   session  http://${DMAAP_IP}   max_retries=1
-    ${params}=  Create Dictionary    limit    1    timeout    0
-    ${resp}=    GET On Session    session    /events/${topic}/script/1    ${params}
-    Status Should Be    OK    ${resp}
-
-CheckTopic
-    [Arguments]    ${topic}    ${expected_status}
-    Create Session   session  http://${DMAAP_IP}   max_retries=1
-    ${params}=  Create Dictionary    limit    1
-    ${resp}=    GET On Session    session    /events/${topic}/script/1    ${params}
-    Log  Received response from dmaap ${resp.text}
-    Status Should Be    OK    ${resp}
-    Should Contain    ${resp.text}    ${expected_status}
-    [Return]    ${resp.text}
-
 CheckKafkaTopic
     [Arguments]    ${topic}    ${expected_status}
     ${resp}=    Run Process    ${CURDIR}/kafka_consumer.py    ${topic}    60    ${expected_status}
diff --git a/csit/resources/tests/drools-applications-test.robot b/csit/resources/tests/drools-applications-test.robot
index 5c6a1b5..4049700 100644
--- a/csit/resources/tests/drools-applications-test.robot
+++ b/csit/resources/tests/drools-applications-test.robot
@@ -170,9 +170,6 @@
 OnSet
     [Arguments]    ${file}
     ${data}=    Get File    ${file}
-    Create Session   session  http://${DMAAP_IP}   max_retries=1
-    ${headers}=  Create Dictionary  Content-Type=application/json
-    ${resp}=  POST On Session    session    /events/unauthenticateddcae_cl_output    headers=${headers}    data=${data}
-    Log    Response from dmaap ${resp.text}
-    Status Should Be    OK
-    [Return]    ${resp.text}
+    ${resp}=    Run Process    ${CURDIR}/kafka_producer.py    unauthenticated.dcae_cl_output    ${data}
+    Log    Response from kafka ${resp.stdout}
+    [Return]    ${resp.stdout}