Add verification flow in CSIT: Cm Data Subscription Create

- Add Confluent Kafka Library support for Robot Framework
- Add capability of testing for Kafka flows
- Add CSIT test: Cm Data Notifications Subscription Create
  complying with CloudEvents
- Change DMI version to latest for CSIT
- Change DMI service name for Kafka topic extension
- Change kafka listeners in docker compose

Issue-ID: CPS-1613
Change-Id: Ic22cb36fdcee0f5c2bd6ff5c1876747aef563f8b
Signed-off-by: halil.cakal <halil.cakal@est.tech>
diff --git a/csit/data/subscription-notification/cmSubscriptionNcmpInEventForCsit.json b/csit/data/subscription-notification/cmSubscriptionNcmpInEventForCsit.json
new file mode 100644
index 0000000..be749ad
--- /dev/null
+++ b/csit/data/subscription-notification/cmSubscriptionNcmpInEventForCsit.json
@@ -0,0 +1,20 @@
+{
+  "data": {
+    "subscription": {
+      "clientID": "SCO-9989752",
+      "name": "cm-subscription-001"
+    },
+    "dataType": {
+      "dataspace": "ALL",
+      "dataCategory": "CM",
+      "dataProvider": "CM-SERVICE"
+    },
+    "predicates": {
+      "targets": [
+        "CMHandle1"
+      ],
+      "datastore": "ncmp-datastore:passthrough-running",
+      "datastore-xpath-filter": "//_3gpp-nr-nrm-gnbdufunction:GNBDUFunction/_3gpp-nr-nrm-nrcelldu:NRCellDU/ | //_3gpp-nr-nrm-gnbcuupfunction:GNBCUUPFunction// | //_3gpp-nr-nrm-gnbcucpfunction:GNBCUCPFunction/_3gpp-nr-nrm-nrcelldu:NRCellCU// | //_3gpp-nr-nrm-nrsectorcarrier:NRSectorCarrier//"
+    }
+  }
+}
\ No newline at end of file
diff --git a/csit/data/subscription-notification/cmSubscriptionNcmpOutEventForCsit.json b/csit/data/subscription-notification/cmSubscriptionNcmpOutEventForCsit.json
new file mode 100644
index 0000000..71fee37
--- /dev/null
+++ b/csit/data/subscription-notification/cmSubscriptionNcmpOutEventForCsit.json
@@ -0,0 +1,10 @@
+{
+  "data":{
+    "statusCode":1,
+    "statusMessage":"successfully applied subscription",
+    "additionalInfo":{
+      "rejected":[],
+      "pending":[]
+    }
+  }
+}
\ No newline at end of file
diff --git a/csit/data/subscription-notification/createCmHandleRequestBody.json b/csit/data/subscription-notification/createCmHandleRequestBody.json
new file mode 100644
index 0000000..642ea62
--- /dev/null
+++ b/csit/data/subscription-notification/createCmHandleRequestBody.json
@@ -0,0 +1,17 @@
+{
+  "dmiDataPlugin":"dminame1",
+  "dmiModelPlugin":"x",
+  "createdCmHandles": [
+    {
+      "cmHandle": "CMHandle1",
+      "cmHandleProperties": {
+        "Books": "Social Media"
+      },
+      "publicCmHandleProperties": {
+        "Color": "yellow",
+        "Size": "small",
+        "Shape": "cube"
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/csit/plans/cps/setup.sh b/csit/plans/cps/setup.sh
index bdea019..f997d28 100755
--- a/csit/plans/cps/setup.sh
+++ b/csit/plans/cps/setup.sh
@@ -129,4 +129,4 @@
 
 ###################### ROBOT Configurations ##########################
 # Pass variables required for Robot test suites in ROBOT_VARIABLES
-ROBOT_VARIABLES="-v CPS_CORE_HOST:$CPS_CORE_HOST -v CPS_CORE_PORT:$CPS_CORE_PORT -v DMI_HOST:$LOCAL_IP -v DMI_PORT:$DMI_PORT -v DMI_CSIT_STUB_HOST:$LOCAL_IP -v DMI_CSIT_STUB_PORT:$DMI_DEMO_STUB_PORT -v DMI_AUTH_ENABLED:$DMI_AUTH_ENABLED -v CPS_CORE_MANAGEMENT_PORT:$CPS_CORE_MANAGEMENT_PORT -v DATADIR:$WORKSPACE/data --exitonfailure"
+ROBOT_VARIABLES="-v CPS_CORE_HOST:$CPS_CORE_HOST -v CPS_CORE_PORT:$CPS_CORE_PORT -v DMI_HOST:$LOCAL_IP -v DMI_PORT:$DMI_PORT -v DMI_CSIT_STUB_HOST:$LOCAL_IP -v DMI_CSIT_STUB_PORT:$DMI_DEMO_STUB_PORT -v DMI_AUTH_ENABLED:$DMI_AUTH_ENABLED -v CPS_CORE_MANAGEMENT_PORT:$CPS_CORE_MANAGEMENT_PORT -v DATADIR:$WORKSPACE/data -v DATADIR_SUBS_NOTIFICATION:$WORKSPACE/data/subscription-notification --exitonfailure"
\ No newline at end of file
diff --git a/csit/plans/cps/testplan.txt b/csit/plans/cps/testplan.txt
index cca11fb..1775ad3 100644
--- a/csit/plans/cps/testplan.txt
+++ b/csit/plans/cps/testplan.txt
@@ -1,5 +1,5 @@
 # ============LICENSE_START=======================================================
-# Copyright (C) 2021-2022 Nordix Foundation
+# Copyright (C) 2021-2023 Nordix Foundation
 # ================================================================================
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
 cps-data
 cps-model-sync
 cps-data-sync
+cps-subscriptions
 ncmp-passthrough
 cm-handle-query
 cps-data-operations
\ No newline at end of file
diff --git a/csit/pylibs.txt b/csit/pylibs.txt
index 9fee634..32bfa6f 100644
--- a/csit/pylibs.txt
+++ b/csit/pylibs.txt
@@ -9,6 +9,7 @@
 robotframework-selenium2library==3.0.0
 robotframework-extendedselenium2library
 robotframework-sshlibrary
+robotframework-confluentkafkalibrary
 scapy
 # Module jsonpath is needed by current AAA idmlite suite.
 jsonpath-rw
diff --git a/csit/tests/cps-data-operations/cps-data-operations.robot b/csit/tests/cps-data-operations/cps-data-operations.robot
index 451fb0a..b2912ee 100644
--- a/csit/tests/cps-data-operations/cps-data-operations.robot
+++ b/csit/tests/cps-data-operations/cps-data-operations.robot
@@ -53,7 +53,7 @@
         Sleep                            5                         wait some time to get published a message to the client topic
 
 Consume cloud event from client topic
-    ${group_id}=         Create Consumer     port=19092               auto_offset_reset=earliest
+    ${group_id}=         Create Consumer     auto_offset_reset=earliest
     Subscribe Topic      topics=${topic}     group_id=${group_id}
     ${messages}=         Poll                group_id=${group_id}     only_value=false
     ${event}                        Set Variable                      ${messages}[0]
diff --git a/csit/tests/cps-subscriptions/cps-subscription-notification.robot b/csit/tests/cps-subscriptions/cps-subscription-notification.robot
new file mode 100644
index 0000000..b0e8665
--- /dev/null
+++ b/csit/tests/cps-subscriptions/cps-subscription-notification.robot
@@ -0,0 +1,86 @@
+# ============LICENSE_START=======================================================
+# Copyright (C) 2023 Nordix Foundation.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# SPDX-License-Identifier: Apache-2.0
+# ============LICENSE_END=========================================================
+
+
+*** Settings ***
+Library  ConfluentKafkaLibrary
+Library  Collections
+Library  OperatingSystem
+Library  RequestsLibrary
+
+Suite Setup  Starting Test
+
+*** Variables ***
+
+${auth}                 Basic Y3BzdXNlcjpjcHNyMGNrcyE=
+${basePath}             /ncmpInventory/v1/ch
+
+
+*** Test Cases ***
+Create CM Handle
+    Create Session                  CPS_URL                     http://${CPS_CORE_HOST}:${CPS_CORE_PORT}
+    ${headers}                      Create Dictionary           Content-Type=application/json   Authorization=${auth}
+    ${jsonData}=                    Get Binary File             ${DATADIR_SUBS_NOTIFICATION}${/}createCmHandleRequestBody.json
+    ${response}=                    POST On Session             CPS_URL   ${basePath}   headers=${headers}   data=${jsonData}
+    Should Be Equal As Strings      ${response.status_code}     200
+    Sleep                           5                           wait some time to get updated in the db
+
+Verify Kafka flow for Subscription Creation Notification
+    ${group_id}=                    Create Consumer
+    Subscribe Topic                 group_id=${group_id}        topics=${RESPONSE_TOPIC}
+    Wait Until Keyword Succeeds     10x  3s                     All Messages Are Produced and Consumed                  ${group_id}
+    [Teardown]                      Basic Teardown              ${group_id}
+
+*** Keywords ***
+Starting Test
+    Set Suite Variable              ${REQUEST_TOPIC}            subscription
+    Set Suite Variable              ${RESPONSE_TOPIC}           subscription-response
+    ${ncmpOutEventJson}=            Get File                    ${DATADIR_SUBS_NOTIFICATION}${/}cmSubscriptionNcmpOutEventForCsit.json
+    ${ncmpOutEventJson}=            Evaluate                    json.loads("""${ncmpOutEventJson}""")                   json
+    Set Suite Variable              ${ncmpOutEventJsonGlobal}   ${ncmpOutEventJson}
+    ${thread}=                      Start Consumer Threaded     topics=test
+    Set Suite Variable              ${MAIN_THREAD}              ${thread}
+    ${producer_group_id}=           Create Producer
+    Set Suite Variable              ${PRODUCER_ID}              ${producer_group_id}
+    ${ncmpInEventJson}=             Get File                    ${DATADIR_SUBS_NOTIFICATION}${/}cmSubscriptionNcmpInEventForCsit.json         encoding=UTF-8
+    Set Suite Variable              ${ncmpInEventJsonGlobal}    ${ncmpInEventJson}
+    ${headers}=                     Create Dictionary           ce_specversion=1.0  ce_id=some-event-id  ce_source=some-resource  ce_type=subscriptionCreated  ce_correlationid=test-cmhandle1
+    Set Suite Variable              ${headersGlobal}            ${headers}
+
+All Messages Are Produced and Consumed
+    [Arguments]                     ${GROUP_ID}
+    Produce                         group_id=${PRODUCER_ID}     topic=${REQUEST_TOPIC}    value=${ncmpInEventJsonGlobal}    headers=${headersGlobal}
+    Sleep                           10sec
+    ${result}=                      Poll                        group_id=${GROUP_ID}      only_value=False
+    ${headers}                      Set Variable                      ${result[0].headers()}
+    ${value}                        Set Variable                      ${result[0].value()}
+    ${valueAsDict}=                 Evaluate                          json.loads("""${value}""")                              json
+    ${specVersionHeaderValue}       Set Variable                      ${headers[1][1]}
+    ${sourceHeaderValue}            Set Variable                      ${headers[3][1]}
+    ${typeHeaderValue}              Set Variable                      ${headers[4][1]}
+    ${correlationIdHeaderValue}     Set Variable                      ${headers[6][1]}
+    Dictionaries Should Be Equal    ${valueAsDict}                    ${ncmpOutEventJsonGlobal}
+    Should Be Equal As Strings      ${specVersionHeaderValue}         1.0
+    Should Be Equal As Strings      ${sourceHeaderValue}              NCMP
+    Should Be Equal As Strings      ${typeHeaderValue}                subscriptionCreatedStatus
+    Should Be Equal As Strings      ${correlationIdHeaderValue}       SCO-9989752cm-subscription-001
+
+Basic Teardown
+    [Arguments]                     ${group_id}
+    Unsubscribe                     ${group_id}
+    Close Consumer                  ${group_id}
\ No newline at end of file
diff --git a/docker-compose/docker-compose.yml b/docker-compose/docker-compose.yml
index 23f34b4..bb286fd 100644
--- a/docker-compose/docker-compose.yml
+++ b/docker-compose/docker-compose.yml
@@ -46,7 +46,7 @@
       DB_PASSWORD: ${DB_PASSWORD:-cps}
       DMI_USERNAME: ${DMI_USERNAME:-cpsuser}
       DMI_PASSWORD: ${DMI_PASSWORD:-cpsr0cks!}
-      KAFKA_BOOTSTRAP_SERVER: kafka:9092
+      KAFKA_BOOTSTRAP_SERVER: kafka:29092
       notification.enabled: 'true'
       notification.async.executor.time-out-value-in-ms: 2000
       NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*'
@@ -69,14 +69,14 @@
     image: confluentinc/cp-kafka:6.2.1
     container_name: kafka
     ports:
-      - "19092:19092"
+      - '9092:9092'
     depends_on:
       - zookeeper
     environment:
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,CONNECTIONS_FROM_HOST://localhost:19092
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,CONNECTIONS_FROM_HOST://localhost:9092
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
 
   ncmp-dmi-plugin:
@@ -97,9 +97,10 @@
       SDNC_USERNAME: ${SDNC_USERNAME:-admin}
       SDNC_PASSWORD: ${SDNC_PASSWORD:-Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U}
       DMI_SERVICE_URL: ${DMI_SERVICE_URL:-http://ncmp-dmi-plugin:8783}
+      DMI_SERVICE_NAME: ${DMI_SERVICE_NAME:-dminame1}
       DMI_USERNAME: ${DMI_USERNAME:-cpsuser}
       DMI_PASSWORD: ${DMI_PASSWORD:-cpsr0cks!}
-      KAFKA_BOOTSTRAP_SERVER: kafka:9092
+      KAFKA_BOOTSTRAP_SERVER: kafka:29092
       notification.data-updated.enabled: 'true'
       NOTIFICATION_DATASPACE_FILTER_PATTERNS: '.*'
     restart: unless-stopped
@@ -112,7 +113,7 @@
     ports:
       - ${DMI_DEMO_STUB_PORT:-8784}:8092
     environment:
-      KAFKA_BOOTSTRAP_SERVER: kafka:9092
+      KAFKA_BOOTSTRAP_SERVER: kafka:29092
       NCMP_CONSUMER_GROUP_ID: ncmp-group
       NCMP_ASYNC_M2M_TOPIC: ncmp-async-m2m
     restart: unless-stopped