Calculate throughput for legacy batch read data operations

- stress the cps-and-ncmp module with batch read data operations
- to that end, use the shared-iterations executor for the async
  http requests
- in parallel, consume the expected number of messages
  as fast as possible (see the sketch below)
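
A minimal sketch of the produce/consume pairing as a standalone k6
script; the constants, Reader setup, and consume() call mirror the new
code under k6-tests/ncmp, while the produce()/consume() names and the
elided request body are illustrative only:

    import { Reader } from 'k6/x/kafka';

    const NUMBER_OF_REQUESTS = 1000; // LEGACY_BATCH_THROUGHPUT_TEST_NUMBER_OF_REQUESTS
    const BATCH_SIZE = 200;          // LEGACY_BATCH_THROUGHPUT_TEST_BATCH_SIZE

    export const options = {
        scenarios: {
            produce: { // two VUs share the 1000 iterations between them
                executor: 'shared-iterations',
                exec: 'produce',
                vus: 2,
                iterations: NUMBER_OF_REQUESTS,
            },
            consume: { // a single VU drains the topic exactly once
                executor: 'per-vu-iterations',
                exec: 'consume',
                vus: 1,
                iterations: 1,
            },
        },
    };

    export function produce() {
        // POST one async batch-read request covering BATCH_SIZE CM handles
    }

    export function consume() {
        const reader = new Reader({ brokers: ['localhost:9092'], topic: 'legacy_batch_topic' });
        const expected = NUMBER_OF_REQUESTS * BATCH_SIZE; // 200,000 messages
        let consumed = 0;
        const start = Date.now();
        while (consumed < expected) {
            consumed += reader.consume({ limit: 1000 }).length;
        }
        console.log(`consumed ${expected} messages in ${(Date.now() - start) / 1000}s`);
    }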

Issue-ID: CPS-2268

Change-Id: I1b6724479dac7391cbb6407fda52d15176aa8985
Signed-off-by: halil.cakal <halil.cakal@est.tech>
diff --git a/k6-tests/ncmp/common/passthrough-crud.js b/k6-tests/ncmp/common/passthrough-crud.js
index 0cd96ad..86fcef6 100644
--- a/k6-tests/ncmp/common/passthrough-crud.js
+++ b/k6-tests/ncmp/common/passthrough-crud.js
@@ -23,8 +23,8 @@
     performPostRequest,
     performGetRequest,
     NCMP_BASE_URL,
-    TOPIC_DATA_OPERATIONS_BATCH_READ,
-    TOTAL_CM_HANDLES
+    LEGACY_BATCH_TOPIC_NAME,
+    TOTAL_CM_HANDLES,
 } from './utils.js';
 
 export function passthroughRead(useAlternateId) {
@@ -46,8 +46,8 @@
     return performPostRequest(url, payload, 'passthroughWrite');
 }
 
-export function batchRead(cmHandleIds) {
-    const url = `${NCMP_BASE_URL}/ncmp/v1/data?topic=${TOPIC_DATA_OPERATIONS_BATCH_READ}`;
+export function legacyBatchRead(cmHandleIds) {
+    const url = `${NCMP_BASE_URL}/ncmp/v1/data?topic=${LEGACY_BATCH_TOPIC_NAME}`;
     const payload = JSON.stringify({
         "operations": [
             {
diff --git a/k6-tests/ncmp/common/utils.js b/k6-tests/ncmp/common/utils.js
index e6d9c92..5d19056 100644
--- a/k6-tests/ncmp/common/utils.js
+++ b/k6-tests/ncmp/common/utils.js
@@ -26,8 +26,9 @@
 export const READ_DATA_FOR_CM_HANDLE_DELAY_MS = 300; // must have same value as in docker-compose.yml
 export const WRITE_DATA_FOR_CM_HANDLE_DELAY_MS = 670; // must have same value as in docker-compose.yml
 export const CONTENT_TYPE_JSON_PARAM = {'Content-Type': 'application/json'};
-export const DATA_OPERATION_READ_BATCH_SIZE = 200;
-export const TOPIC_DATA_OPERATIONS_BATCH_READ = 'topic-data-operations-batch-read';
+export const LEGACY_BATCH_THROUGHPUT_TEST_BATCH_SIZE = 200;
+export const LEGACY_BATCH_THROUGHPUT_TEST_NUMBER_OF_REQUESTS = 1000;
+export const LEGACY_BATCH_TOPIC_NAME = 'legacy_batch_topic';
 export const KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092'];
 export const MODULE_SET_TAGS = ['tagA','tagB','tagC',' tagD']
 
@@ -92,7 +93,7 @@
         makeSummaryCsvLine('5b', 'NCMP overhead for Synchronous single CM-handle pass-through read with alternate id', 'milliseconds', 'ncmp_overhead_passthrough_read_alt_id', data, options),
         makeSummaryCsvLine('6a', 'NCMP overhead for Synchronous single CM-handle pass-through write', 'milliseconds', 'ncmp_overhead_passthrough_write', data, options),
         makeSummaryCsvLine('6b', 'NCMP overhead for Synchronous single CM-handle pass-through write with alternate id', 'milliseconds', 'ncmp_overhead_passthrough_write_alt_id', data, options),
-        makeSummaryCsvLine('7', 'Data operations batch read', 'events/second', 'data_operations_batch_read_cmhandles_per_second', data, options),
+        makeSummaryCsvLine('7', 'Legacy batch read operation', 'events/second', 'legacy_batch_read_cmhandles_per_second', data, options),
     ];
     return summaryCsvLines.join('\n') + '\n';
 }
diff --git a/k6-tests/ncmp/ncmp-kpi.js b/k6-tests/ncmp/ncmp-kpi.js
index 1d084f2..05500a6 100644
--- a/k6-tests/ncmp/ncmp-kpi.js
+++ b/k6-tests/ncmp/ncmp-kpi.js
@@ -23,12 +23,13 @@
 import { Reader } from 'k6/x/kafka';
 import {
     TOTAL_CM_HANDLES, READ_DATA_FOR_CM_HANDLE_DELAY_MS, WRITE_DATA_FOR_CM_HANDLE_DELAY_MS,
-    makeCustomSummaryReport, makeBatchOfCmHandleIds, DATA_OPERATION_READ_BATCH_SIZE,
-    TOPIC_DATA_OPERATIONS_BATCH_READ, KAFKA_BOOTSTRAP_SERVERS, REGISTRATION_BATCH_SIZE
+    makeCustomSummaryReport, makeBatchOfCmHandleIds, LEGACY_BATCH_THROUGHPUT_TEST_BATCH_SIZE,
+    LEGACY_BATCH_TOPIC_NAME, KAFKA_BOOTSTRAP_SERVERS, REGISTRATION_BATCH_SIZE,
+    LEGACY_BATCH_THROUGHPUT_TEST_NUMBER_OF_REQUESTS
 } from './common/utils.js';
 import { createCmHandles, deleteCmHandles, waitForAllCmHandlesToBeReady } from './common/cmhandle-crud.js';
 import { executeCmHandleSearch, executeCmHandleIdSearch } from './common/search-base.js';
-import { passthroughRead, passthroughReadWithAltId, passthroughWrite, batchRead } from './common/passthrough-crud.js';
+import { passthroughRead, passthroughWrite, legacyBatchRead } from './common/passthrough-crud.js';
 
 let cmHandlesCreatedPerSecondTrend = new Trend('cmhandles_created_per_second', false);
 let cmHandlesDeletedPerSecondTrend = new Trend('cmhandles_deleted_per_second', false);
@@ -38,11 +39,11 @@
 let passthroughWriteNcmpOverheadTrendWithAlternateId = new Trend('ncmp_overhead_passthrough_write_alt_id', true);
 let idSearchDurationTrend = new Trend('id_search_duration', true);
 let cmSearchDurationTrend = new Trend('cm_search_duration', true);
-let dataOperationsBatchReadCmHandlePerSecondTrend = new Trend('data_operations_batch_read_cmhandles_per_second', false);
+let legacyBatchReadCmHandlesPerSecondTrend = new Trend('legacy_batch_read_cmhandles_per_second', false);
 
-const reader = new Reader({
+const legacyBatchEventReader = new Reader({
     brokers: KAFKA_BOOTSTRAP_SERVERS,
-    topic: TOPIC_DATA_OPERATIONS_BATCH_READ,
+    topic: LEGACY_BATCH_TOPIC_NAME,
 });
 
 const DURATION = '15m';
@@ -51,57 +52,55 @@
     setupTimeout: '8m',
     teardownTimeout: '6m',
     scenarios: {
-        passthrough_read: {
+        passthrough_read_scenario: {
             executor: 'constant-vus',
-            exec: 'executePassthroughReadScenario',
+            exec: 'passthroughReadScenario',
             vus: 4,
             duration: DURATION,
         },
-        passthrough_read_alt_id: {
+        passthrough_read_alt_id_scenario: {
             executor: 'constant-vus',
-            exec: 'executePassthroughReadAltIdScenario',
+            exec: 'passthroughReadAltIdScenario',
             vus: 4,
             duration: DURATION,
         },
-        passthrough_write: {
+        passthrough_write_scenario: {
             executor: 'constant-vus',
-            exec: 'executePassthroughWriteScenario',
+            exec: 'passthroughWriteScenario',
             vus: 4,
             duration: DURATION,
         },
-        passthrough_write_alt_id: {
+        passthrough_write_alt_id_scenario: {
             executor: 'constant-vus',
-            exec: 'executePassthroughWriteAltIdScenario',
+            exec: 'passthroughWriteAltIdScenario',
             vus: 4,
             duration: DURATION,
         },
-        cm_handle_id_search: {
+        cm_handle_id_search_scenario: {
             executor: 'constant-vus',
-            exec: 'executeCmHandleIdSearchScenario',
+            exec: 'cmHandleIdSearchScenario',
             vus: 5,
             duration: DURATION,
         },
-        cm_handle_search: {
+        cm_handle_search_scenario: {
             executor: 'constant-vus',
-            exec: 'executeCmHandleSearchScenario',
+            exec: 'cmHandleSearchScenario',
             vus: 5,
             duration: DURATION,
         },
-        data_operation_send_async_http_request: {
-            executor: 'constant-arrival-rate',
-            exec: 'data_operation_send_async_http_request',
-            duration: DURATION,
-            rate: 1,
-            timeUnit: '1s',
-            preAllocatedVUs: 1,
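+        // two VUs share the 1000 iterations; each iteration sends one async batch-read request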
+        legacy_batch_produce_scenario: {
+            executor: 'shared-iterations',
+            exec: 'legacyBatchProduceScenario',
+            vus: 2,
+            iterations: LEGACY_BATCH_THROUGHPUT_TEST_NUMBER_OF_REQUESTS,
+            maxDuration: DURATION,
         },
-        data_operation_async_batch_read: {
-            executor: 'constant-arrival-rate',
-            exec: 'data_operation_async_batch_read',
-            duration: DURATION,
-            rate: 1,
-            timeUnit: '1s',
-            preAllocatedVUs: 1,
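+        // a single VU runs once, consuming until all expected messages have arrived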
+        legacy_batch_consume_scenario: {
+            executor: 'per-vu-iterations',
+            exec: 'legacyBatchConsumeScenario',
+            vus: 1,
+            iterations: 1,
+            maxDuration: DURATION,
         }
     },
     thresholds: {
@@ -114,7 +113,7 @@
         'ncmp_overhead_passthrough_write_alt_id': ['avg <= 40'],
         'id_search_duration': ['avg <= 2000'],
         'cm_search_duration': ['avg <= 15000'],
-        'data_operations_batch_read_cmhandles_per_second': ['avg >= 150'],
+        'legacy_batch_read_cmhandles_per_second': ['avg >= 150'],
     },
 };
 
@@ -156,7 +155,7 @@
     cmHandlesDeletedPerSecondTrend.add(DEREGISTERED_CM_HANDLES / totalDeregistrationTimeInSeconds);
 }
 
-export function executePassthroughReadScenario() {
+export function passthroughReadScenario() {
     const response = passthroughRead(false);
     if (check(response, { 'passthrough read status equals 200': (r) => r.status === 200 })) {
         const overhead = response.timings.duration - READ_DATA_FOR_CM_HANDLE_DELAY_MS;
@@ -164,7 +163,7 @@
     }
 }
 
-export function executePassthroughReadAltIdScenario() {
+export function passthroughReadAltIdScenario() {
     const response = passthroughRead(true);
     if (check(response, { 'passthrough read with alternate Id status equals 200': (r) => r.status === 200 })) {
         const overhead = response.timings.duration - READ_DATA_FOR_CM_HANDLE_DELAY_MS;
@@ -172,7 +171,7 @@
     }
 }
 
-export function executePassthroughWriteScenario() {
+export function passthroughWriteScenario() {
     const response = passthroughWrite(false);
     if (check(response, { 'passthrough write status equals 201': (r) => r.status === 201 })) {
         const overhead = response.timings.duration - WRITE_DATA_FOR_CM_HANDLE_DELAY_MS;
@@ -180,7 +179,7 @@
     }
 }
 
-export function executePassthroughWriteAltIdScenario() {
+export function passthroughWriteAltIdScenario() {
     const response = passthroughWrite(true);
     if (check(response, { 'passthrough write with alternate Id status equals 201': (r) => r.status === 201 })) {
         const overhead = response.timings.duration - WRITE_DATA_FOR_CM_HANDLE_DELAY_MS;
@@ -188,7 +187,7 @@
     }
 }
 
-export function executeCmHandleIdSearchScenario() {
+export function cmHandleIdSearchScenario() {
     const response = executeCmHandleIdSearch('module-and-properties');
     if (check(response, { 'CM handle ID search status equals 200': (r) => r.status === 200 })
      && check(response, { 'CM handle ID search returned expected CM-handles': (r) => r.json('#') === TOTAL_CM_HANDLES })) {
@@ -196,7 +195,7 @@
     }
 }
 
-export function executeCmHandleSearchScenario() {
+export function cmHandleSearchScenario() {
     const response = executeCmHandleSearch('module-and-properties');
     if (check(response, { 'CM handle search status equals 200': (r) => r.status === 200 })
      && check(response, { 'CM handle search returned expected CM-handles': (r) => r.json('#') === TOTAL_CM_HANDLES })) {
@@ -204,18 +203,31 @@
     }
 }
 
-export function data_operation_send_async_http_request() {
-    const nextBatchOfCmHandleIds = makeBatchOfCmHandleIds(DATA_OPERATION_READ_BATCH_SIZE, 0);
-    const response = batchRead(nextBatchOfCmHandleIds);
+export function legacyBatchProduceScenario() {
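+    // batch number 0: every request reads the same first batch of CM handle ids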
+    const nextBatchOfCmHandleIds = makeBatchOfCmHandleIds(LEGACY_BATCH_THROUGHPUT_TEST_BATCH_SIZE, 0);
+    const response = legacyBatchRead(nextBatchOfCmHandleIds);
     check(response, { 'data operation batch read status equals 200': (r) => r.status === 200 });
 }
 
-export function data_operation_async_batch_read() {
+export function legacyBatchConsumeScenario() {
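+    // drain everything produced above: 1000 requests x 200 CM handles = 200,000 messages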
+    const TOTAL_MESSAGES_TO_CONSUME = LEGACY_BATCH_THROUGHPUT_TEST_NUMBER_OF_REQUESTS * LEGACY_BATCH_THROUGHPUT_TEST_BATCH_SIZE;
     try {
-        let messages = reader.consume({ limit: DATA_OPERATION_READ_BATCH_SIZE });
-        dataOperationsBatchReadCmHandlePerSecondTrend.add(messages.length);
+        let messagesConsumed = 0;
+        let startTime = Date.now();
+
+        while (messagesConsumed < TOTAL_MESSAGES_TO_CONSUME) {
+            let messages = legacyBatchEventReader.consume({ limit: 1000 });
+
+            if (messages.length > 0) {
+                messagesConsumed += messages.length;
+            }
+        }
+
+        let endTime = Date.now();
+        const timeToConsumeMessagesInSeconds = (endTime - startTime) / 1000.0;
+        legacyBatchReadCmHandlesPerSecondTrend.add(TOTAL_MESSAGES_TO_CONSUME / timeToConsumeMessagesInSeconds);
     } catch (error) {
-        dataOperationsBatchReadCmHandlePerSecondTrend.add(0);
+        legacyBatchReadCmHandlesPerSecondTrend.add(0);
         console.error(error);
     }
 }