Add k6 for legacy async batch passthrough read operation
- add kafka support to k6 codebase
- add two new tests: data operation batch passthrough read
and data operation async batch read (from kafka)
- calculate the events/second via Trend
- add kafka ui support to docker-compose
Note: Before this commit being merged, the k6 executable
should have been compiled with the kafka extension of xk6-kafka
Issue-ID: CPS-2268
Change-Id: Ib7777b7bc9f15b210ea36d3541cba0e0c943f883
Signed-off-by: halil.cakal <halil.cakal@est.tech>
diff --git a/docker-compose/docker-compose.yml b/docker-compose/docker-compose.yml
index 86afe78..f537cfe 100644
--- a/docker-compose/docker-compose.yml
+++ b/docker-compose/docker-compose.yml
@@ -181,5 +181,17 @@
profiles:
- monitoring
+ kafka-ui:
+ container_name: kafka-ui
+ image: provectuslabs/kafka-ui:latest
+ ports:
+ - 8089:8080
+ environment:
+ DYNAMIC_CONFIG_ENABLED: 'true'
+ KAFKA_CLUSTERS_0_NAME: 'cps-kafka-local'
+ KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
+ profiles:
+ - monitoring
+
volumes:
grafana:
diff --git a/k6-tests/README.md b/k6-tests/README.md
index 0fdebcf..9a385e1 100644
--- a/k6-tests/README.md
+++ b/k6-tests/README.md
@@ -4,8 +4,7 @@
k6 tests are written in JavaScript.
## k6 installation
-Follow the instructions in the [k6 installation guide](https://grafana.com/docs/k6/latest/set-up/install-k6/)
-to get started.
+Follow the instructions in the [build from source guide](https://github.com/mostafa/xk6-kafka) to get started.
## Running the k6 test suites
Simply run the main script. (The script assumes k6 and docker-compose have been installed).
diff --git a/k6-tests/ncmp/common/passthrough-crud.js b/k6-tests/ncmp/common/passthrough-crud.js
index 43a215f..76bda4e 100644
--- a/k6-tests/ncmp/common/passthrough-crud.js
+++ b/k6-tests/ncmp/common/passthrough-crud.js
@@ -19,7 +19,12 @@
*/
import http from 'k6/http';
-import { NCMP_BASE_URL, CONTENT_TYPE_JSON_PARAM, getRandomCmHandleId } from './utils.js';
+import {
+ CONTENT_TYPE_JSON_PARAM,
+ getRandomCmHandleId,
+ NCMP_BASE_URL,
+ TOPIC_DATA_OPERATIONS_BATCH_READ
+} from './utils.js';
export function passthroughRead() {
const cmHandleId = getRandomCmHandleId();
@@ -40,3 +45,21 @@
const response = http.post(url, JSON.stringify(body), CONTENT_TYPE_JSON_PARAM);
return response;
}
+
+export function batchRead(cmHandleIds) {
+ const url = `${NCMP_BASE_URL}/ncmp/v1/data?topic=${TOPIC_DATA_OPERATIONS_BATCH_READ}`
+ const payload = {
+ "operations": [
+ {
+ "resourceIdentifier": "parent/child",
+ "targetIds": cmHandleIds,
+ "datastore": "ncmp-datastore:passthrough-operational",
+ "options": "(fields=schemas/schema)",
+ "operationId": "12",
+ "operation": "read"
+ }
+ ]
+ };
+ const response = http.post(url, JSON.stringify(payload), CONTENT_TYPE_JSON_PARAM);
+ return response;
+}
\ No newline at end of file
diff --git a/k6-tests/ncmp/common/utils.js b/k6-tests/ncmp/common/utils.js
index 0f3b8d9..f24edc5 100644
--- a/k6-tests/ncmp/common/utils.js
+++ b/k6-tests/ncmp/common/utils.js
@@ -25,6 +25,9 @@
export const READ_DATA_FOR_CM_HANDLE_DELAY_MS = 300; // must have same value as in docker-compose.yml
export const WRITE_DATA_FOR_CM_HANDLE_DELAY_MS = 670; // must have same value as in docker-compose.yml
export const CONTENT_TYPE_JSON_PARAM = { headers: {'Content-Type': 'application/json'} };
+export const DATA_OPERATION_READ_BATCH_SIZE = 200;
+export const TOPIC_DATA_OPERATIONS_BATCH_READ = 'topic-data-operations-batch-read';
+export const KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092'];
export function recordTimeInSeconds(functionToExecute) {
const startTimeInMillis = Date.now();
@@ -65,6 +68,7 @@
makeSummaryCsvLine('5b', 'NCMP overhead for Synchronous single CM-handle pass-through read', 'milliseconds', 'ncmp_overhead_passthrough_read', data, options),
makeSummaryCsvLine('6a', 'Synchronous single CM-handle pass-through write', 'requests/second', 'http_reqs{scenario:passthrough_write}', data, options),
makeSummaryCsvLine('6b', 'NCMP overhead for Synchronous single CM-handle pass-through write', 'milliseconds', 'ncmp_overhead_passthrough_write', data, options),
+ makeSummaryCsvLine('7', 'Data operations batch read', 'events/second', 'data_operations_batch_read_cmhandles_per_second', data, options),
];
return summaryCsvLines.join('\n') + '\n';
}
diff --git a/k6-tests/ncmp/ncmp-kpi.js b/k6-tests/ncmp/ncmp-kpi.js
index 96c6263..9bafff9 100644
--- a/k6-tests/ncmp/ncmp-kpi.js
+++ b/k6-tests/ncmp/ncmp-kpi.js
@@ -20,16 +20,28 @@
import { check } from 'k6';
import { Gauge, Trend } from 'k6/metrics';
-import { TOTAL_CM_HANDLES, READ_DATA_FOR_CM_HANDLE_DELAY_MS, WRITE_DATA_FOR_CM_HANDLE_DELAY_MS,
- makeCustomSummaryReport, recordTimeInSeconds } from './common/utils.js';
+import {
+ TOTAL_CM_HANDLES, READ_DATA_FOR_CM_HANDLE_DELAY_MS, WRITE_DATA_FOR_CM_HANDLE_DELAY_MS,
+ makeCustomSummaryReport, recordTimeInSeconds, makeBatchOfCmHandleIds, DATA_OPERATION_READ_BATCH_SIZE,
+ TOPIC_DATA_OPERATIONS_BATCH_READ, KAFKA_BOOTSTRAP_SERVERS
+} from './common/utils.js';
import { registerAllCmHandles, deregisterAllCmHandles } from './common/cmhandle-crud.js';
import { executeCmHandleSearch, executeCmHandleIdSearch } from './common/search-base.js';
-import { passthroughRead, passthroughWrite } from './common/passthrough-crud.js';
+import { passthroughRead, passthroughWrite, batchRead } from './common/passthrough-crud.js';
+import {
+ Reader,
+} from 'k6/x/kafka';
let cmHandlesCreatedPerSecondGauge = new Gauge('cmhandles_created_per_second');
let cmHandlesDeletedPerSecondGauge = new Gauge('cmhandles_deleted_per_second');
let passthroughReadNcmpOverheadTrend = new Trend('ncmp_overhead_passthrough_read');
let passthroughWriteNcmpOverheadTrend = new Trend('ncmp_overhead_passthrough_write');
+let dataOperationsBatchReadCmHandlePerSecondTrend = new Trend('data_operations_batch_read_cmhandles_per_second');
+
+const reader = new Reader({
+ brokers: KAFKA_BOOTSTRAP_SERVERS,
+ topic: TOPIC_DATA_OPERATIONS_BATCH_READ,
+});
const DURATION = '15m';
@@ -61,6 +73,22 @@
vus: 3,
duration: DURATION,
},
+ data_operation_send_async_http_request: {
+ executor: 'constant-arrival-rate',
+ exec: 'data_operation_send_async_http_request',
+ duration: DURATION,
+ rate: 1,
+ timeUnit: '1s',
+ preAllocatedVUs: 1,
+ },
+ data_operation_async_batch_read: {
+ executor: 'constant-arrival-rate',
+ exec: 'data_operation_async_batch_read',
+ duration: DURATION,
+ rate: 1,
+ timeUnit: '1s',
+ preAllocatedVUs: 1,
+ }
},
thresholds: {
'cmhandles_created_per_second': ['value >= 22'],
@@ -75,6 +103,9 @@
'http_req_failed{scenario:cm_search_module}': ['rate == 0'],
'http_req_failed{scenario:passthrough_read}': ['rate == 0'],
'http_req_failed{scenario:passthrough_write}': ['rate == 0'],
+ 'http_req_failed{scenario:data_operation_send_async_http_request}': ['rate == 0'],
+ 'kafka_reader_error_count{scenario:data_operation_consume_kafka_responses}': ['count == 0'],
+ 'data_operations_batch_read_cmhandles_per_second': ['avg >= 150'],
},
};
@@ -114,6 +145,22 @@
check(JSON.parse(response.body), { 'module search returned expected CM-handles': (arr) => arr.length === TOTAL_CM_HANDLES });
}
+export function data_operation_send_async_http_request() {
+ const nextBatchOfCmHandleIds = makeBatchOfCmHandleIds(DATA_OPERATION_READ_BATCH_SIZE,1);
+ const response = batchRead(nextBatchOfCmHandleIds)
+ check(response, { 'data operation batch read status equals 200': (r) => r.status === 200 });
+}
+
+export function data_operation_async_batch_read() {
+ try {
+ let messages = reader.consume({ limit: DATA_OPERATION_READ_BATCH_SIZE });
+ dataOperationsBatchReadCmHandlePerSecondTrend.add(messages.length);
+ } catch (error) {
+ dataOperationsBatchReadCmHandlePerSecondTrend.add(0);
+ console.error(error);
+ }
+}
+
export function handleSummary(data) {
return {
stdout: makeCustomSummaryReport(data, options),