Add k6 for legacy async batch passthrough read operation

- add kafka support to k6 codebase
- add two new tests: data operation batch passthrough read
   and data operation async batch read (from kafka)
- calculate the events/second via Trend
- add kafka ui support to docker-compose

Note: Before this commit being merged, the k6 executable
should have been compiled with the kafka extension of xk6-kafka

Issue-ID: CPS-2268

Change-Id: Ib7777b7bc9f15b210ea36d3541cba0e0c943f883
Signed-off-by: halil.cakal <halil.cakal@est.tech>
diff --git a/k6-tests/ncmp/ncmp-kpi.js b/k6-tests/ncmp/ncmp-kpi.js
index 96c6263..9bafff9 100644
--- a/k6-tests/ncmp/ncmp-kpi.js
+++ b/k6-tests/ncmp/ncmp-kpi.js
@@ -20,16 +20,28 @@
 
 import { check } from 'k6';
 import { Gauge, Trend } from 'k6/metrics';
-import { TOTAL_CM_HANDLES, READ_DATA_FOR_CM_HANDLE_DELAY_MS, WRITE_DATA_FOR_CM_HANDLE_DELAY_MS,
-         makeCustomSummaryReport, recordTimeInSeconds } from './common/utils.js';
+import {
+    TOTAL_CM_HANDLES, READ_DATA_FOR_CM_HANDLE_DELAY_MS, WRITE_DATA_FOR_CM_HANDLE_DELAY_MS,
+    makeCustomSummaryReport, recordTimeInSeconds, makeBatchOfCmHandleIds, DATA_OPERATION_READ_BATCH_SIZE,
+    TOPIC_DATA_OPERATIONS_BATCH_READ, KAFKA_BOOTSTRAP_SERVERS
+} from './common/utils.js';
 import { registerAllCmHandles, deregisterAllCmHandles } from './common/cmhandle-crud.js';
 import { executeCmHandleSearch, executeCmHandleIdSearch } from './common/search-base.js';
-import { passthroughRead, passthroughWrite } from './common/passthrough-crud.js';
+import { passthroughRead, passthroughWrite, batchRead } from './common/passthrough-crud.js';
+import {
+    Reader,
+} from 'k6/x/kafka';
 
 let cmHandlesCreatedPerSecondGauge = new Gauge('cmhandles_created_per_second');
 let cmHandlesDeletedPerSecondGauge = new Gauge('cmhandles_deleted_per_second');
 let passthroughReadNcmpOverheadTrend = new Trend('ncmp_overhead_passthrough_read');
 let passthroughWriteNcmpOverheadTrend = new Trend('ncmp_overhead_passthrough_write');
+let dataOperationsBatchReadCmHandlePerSecondTrend = new Trend('data_operations_batch_read_cmhandles_per_second');
+
+const reader = new Reader({
+    brokers: KAFKA_BOOTSTRAP_SERVERS,
+    topic: TOPIC_DATA_OPERATIONS_BATCH_READ,
+});
 
 const DURATION = '15m';
 
@@ -61,6 +73,22 @@
             vus: 3,
             duration: DURATION,
         },
+        data_operation_send_async_http_request: {
+            executor: 'constant-arrival-rate',
+            exec: 'data_operation_send_async_http_request',
+            duration: DURATION,
+            rate: 1,
+            timeUnit: '1s',
+            preAllocatedVUs: 1,
+        },
+        data_operation_async_batch_read: {
+            executor: 'constant-arrival-rate',
+            exec: 'data_operation_async_batch_read',
+            duration: DURATION,
+            rate: 1,
+            timeUnit: '1s',
+            preAllocatedVUs: 1,
+        }
     },
     thresholds: {
         'cmhandles_created_per_second': ['value >= 22'],
@@ -75,6 +103,9 @@
         'http_req_failed{scenario:cm_search_module}': ['rate == 0'],
         'http_req_failed{scenario:passthrough_read}': ['rate == 0'],
         'http_req_failed{scenario:passthrough_write}': ['rate == 0'],
+        'http_req_failed{scenario:data_operation_send_async_http_request}': ['rate == 0'],
+        'kafka_reader_error_count{scenario:data_operation_consume_kafka_responses}': ['count == 0'],
+        'data_operations_batch_read_cmhandles_per_second': ['avg >= 150'],
     },
 };
 
@@ -114,6 +145,22 @@
     check(JSON.parse(response.body), { 'module search returned expected CM-handles': (arr) => arr.length === TOTAL_CM_HANDLES });
 }
 
+export function data_operation_send_async_http_request() {
+    const nextBatchOfCmHandleIds = makeBatchOfCmHandleIds(DATA_OPERATION_READ_BATCH_SIZE,1);
+    const response = batchRead(nextBatchOfCmHandleIds)
+    check(response, { 'data operation batch read status equals 200': (r) => r.status === 200 });
+}
+
+export function data_operation_async_batch_read() {
+    try {
+        let messages = reader.consume({ limit: DATA_OPERATION_READ_BATCH_SIZE });
+        dataOperationsBatchReadCmHandlePerSecondTrend.add(messages.length);
+    } catch (error) {
+        dataOperationsBatchReadCmHandlePerSecondTrend.add(0);
+        console.error(error);
+    }
+}
+
 export function handleSummary(data) {
     return {
         stdout: makeCustomSummaryReport(data, options),