Merge "Refactor OpenAPI Policy Executor"
diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml
index 6f08071..83494d6 100644
--- a/cps-application/src/main/resources/application.yml
+++ b/cps-application/src/main/resources/application.yml
@@ -160,6 +160,7 @@
endpoint: ${ONAP_OTEL_EXPORTER_ENDPOINT:http://onap-otel-collector:4317}
protocol: ${ONAP_OTEL_EXPORTER_PROTOCOL:grpc}
enabled: ${ONAP_TRACING_ENABLED:false}
+ excluded-observation-names: ${ONAP_EXCLUDED_OBSERVATION_NAMES:tasks.scheduled.execution}
# Actuator
management:
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/OpenTelemetryConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/OpenTelemetryConfig.java
index cff3187..a6a82b7 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/OpenTelemetryConfig.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/OpenTelemetryConfig.java
@@ -26,7 +26,11 @@
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.sdk.extension.trace.jaeger.sampler.JaegerRemoteSampler;
import io.opentelemetry.sdk.trace.samplers.Sampler;
+import jakarta.annotation.PostConstruct;
import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.actuate.autoconfigure.observation.ObservationRegistryCustomizer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
@@ -37,11 +41,14 @@
import org.springframework.util.AntPathMatcher;
import org.springframework.util.PathMatcher;
+/**
+ * Configuration class for setting up OpenTelemetry tracing in a Spring Boot application.
+ * This class provides beans for OTLP exporters (gRPC and HTTP), a Jaeger remote sampler,
+ * and customizes the ObservationRegistry to exclude certain endpoints from being observed.
+ */
@Configuration
public class OpenTelemetryConfig {
- public static final int JAEGER_REMOTE_SAMPLER_POLLING_INTERVAL_IN_SECOND = 30;
-
@Value("${spring.application.name:cps-application}")
private String serviceId;
@@ -51,9 +58,29 @@
@Value("${cps.tracing.sampler.jaeger_remote.endpoint:http://onap-otel-collector:14250}")
private String jaegerRemoteSamplerUrl;
+ @Value("${cps.tracing.excluded-observation-names:tasks.scheduled.execution}")
+ private String excludedObservationNamesAsCsv;
+
+ private static final int JAEGER_REMOTE_SAMPLER_POLLING_INTERVAL_IN_SECONDS = 30;
+
+ private List<String> excludedObservationNames;
+
/**
- * OTLP Exporter with Grpc exporter protocol.
- */
+ * Initializes the excludedObservationNames after the bean's properties have been set.
+ * This method is called by the Spring container during bean initialization.
+ */
+ @PostConstruct
+ public void init() {
+     // Split the comma-separated property once at start-up; whitespace around each name is dropped.
+     final String[] rawNames = excludedObservationNamesAsCsv.split(",");
+     excludedObservationNames = Arrays.stream(rawNames).map(String::trim).collect(Collectors.toList());
+ }
+
+ /**
+ * Creates an OTLP Exporter with gRPC protocol.
+ *
+ * @return OtlpGrpcSpanExporter bean if tracing is enabled and the exporter protocol is gRPC
+ */
@Bean
@ConditionalOnExpression(
"${cps.tracing.enabled} && 'grpc'.equals('${cps.tracing.exporter.protocol}')")
@@ -62,7 +89,9 @@
}
/**
- * OTLP Exporter with HTTP exporter protocol.
+ * Creates an OTLP Exporter with HTTP protocol.
+ *
+ * @return OtlpHttpSpanExporter bean if tracing is enabled and the exporter protocol is HTTP
*/
@Bean
@ConditionalOnExpression(
@@ -72,39 +101,40 @@
}
/**
- * Jaeger Remote Sampler.
+ * Creates a Jaeger Remote Sampler.
+ *
+ * @return JaegerRemoteSampler bean if tracing is enabled
*/
@Bean
@ConditionalOnProperty("cps.tracing.enabled")
public JaegerRemoteSampler createJaegerRemoteSampler() {
return JaegerRemoteSampler.builder()
- .setEndpoint(jaegerRemoteSamplerUrl)
- .setPollingInterval(Duration.ofSeconds(JAEGER_REMOTE_SAMPLER_POLLING_INTERVAL_IN_SECOND))
- .setInitialSampler(Sampler.alwaysOff())
- .setServiceName(serviceId)
- .build();
+ .setEndpoint(jaegerRemoteSamplerUrl)
+ .setPollingInterval(Duration.ofSeconds(JAEGER_REMOTE_SAMPLER_POLLING_INTERVAL_IN_SECONDS))
+ .setInitialSampler(Sampler.alwaysOff())
+ .setServiceName(serviceId)
+ .build();
}
/**
- * Excluding /actuator/** endpoints.
- */
+ * Customizes the ObservationRegistry to exclude /actuator/** endpoints and the configured observation names.
+ *
+ * @return ObservationRegistryCustomizer bean if tracing is enabled
+ */
@Bean
@ConditionalOnProperty("cps.tracing.enabled")
- ObservationRegistryCustomizer<ObservationRegistry> skipActuatorEndpointsFromObservation() {
+ public ObservationRegistryCustomizer<ObservationRegistry> skipActuatorEndpointsFromObservation() {
final PathMatcher pathMatcher = new AntPathMatcher("/");
return registry ->
- registry.observationConfig().observationPredicate(observationPredicate(pathMatcher));
+ registry.observationConfig().observationPredicate(observationPredicate(pathMatcher));
}
- /**
- * Excluding /actuator/** endpoints.
- */
- static ObservationPredicate observationPredicate(final PathMatcher pathMatcher) {
- return (name, context) -> {
+ private ObservationPredicate observationPredicate(final PathMatcher pathMatcher) {
+ return (observationName, context) -> {
if (context instanceof ServerRequestObservationContext observationContext) {
return !pathMatcher.match("/actuator/**", observationContext.getCarrier().getRequestURI());
} else {
- return true;
+ return !excludedObservationNames.contains(observationName);
}
};
}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/OpenTelemetryCmNotificationSubscriptionConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/OpenTelemetryCmNotificationSubscriptionConfigSpec.groovy
deleted file mode 100644
index 0f69069..0000000
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/OpenTelemetryCmNotificationSubscriptionConfigSpec.groovy
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.config
-
-import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter
-import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter
-import io.opentelemetry.sdk.extension.trace.jaeger.sampler.JaegerRemoteSampler
-import org.spockframework.spring.SpringBean
-import org.springframework.boot.actuate.autoconfigure.observation.ObservationRegistryCustomizer
-import spock.lang.Shared
-import spock.lang.Specification
-
-class OpenTelemetryConfigSpec extends Specification{
-
- @Shared
- @SpringBean
- OpenTelemetryConfig openTelemetryConfig = new OpenTelemetryConfig()
-
- def setupSpec() {
- openTelemetryConfig.tracingExporterEndpointUrl="http://tracingExporterEndpointUrl"
- openTelemetryConfig.jaegerRemoteSamplerUrl="http://jaegerremotesamplerurl"
- openTelemetryConfig.serviceId ="cps-application"
- }
-
- def 'OpenTelemetryConfig Construction.'() {
- expect: 'the system can create an instance'
- new OpenTelemetryConfig() != null
- }
-
- def 'OTLP Exporter creation with Grpc protocol'(){
- when: 'an OTLP exporter is created'
- def result = openTelemetryConfig.createOtlpExporterGrpc()
- then: 'an OTLP Exporter is created'
- assert result instanceof OtlpGrpcSpanExporter
- }
-
- def 'OTLP Exporter creation with HTTP protocol'(){
- when: 'an OTLP exporter is created'
- def result = openTelemetryConfig.createOtlpExporterHttp()
- then: 'an OTLP Exporter is created'
- assert result instanceof OtlpHttpSpanExporter
- and:
- assert result.builder.endpoint=="http://tracingExporterEndpointUrl"
- }
-
- def 'Jaeger Remote Sampler Creation'(){
- when: 'an OTLP exporter is created'
- def result = openTelemetryConfig.createJaegerRemoteSampler()
- then: 'an OTLP Exporter is created'
- assert result instanceof JaegerRemoteSampler
- and:
- assert result.delegate.type=="remoteSampling"
- and:
- assert result.delegate.url.toString().startsWith("http://jaegerremotesamplerurl")
- }
-
- def 'Skipping Acutator endpoints'(){
- when: 'an OTLP exporter is created'
- def result = openTelemetryConfig.skipActuatorEndpointsFromObservation()
- then: 'an OTLP Exporter is created'
- assert result instanceof ObservationRegistryCustomizer
- }
-}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/OpenTelemetryConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/OpenTelemetryConfigSpec.groovy
new file mode 100644
index 0000000..cbff731
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/OpenTelemetryConfigSpec.groovy
@@ -0,0 +1,113 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.config
+
+import io.micrometer.observation.ObservationPredicate
+import io.micrometer.observation.ObservationRegistry
+import io.micrometer.observation.ObservationRegistry.ObservationConfig
+import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter
+import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter
+import io.opentelemetry.sdk.extension.trace.jaeger.sampler.JaegerRemoteSampler
+import org.springframework.beans.factory.annotation.Value
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.http.server.observation.ServerRequestObservationContext
+import org.springframework.mock.web.MockHttpServletRequest
+import org.springframework.util.AntPathMatcher
+import spock.lang.Specification
+
+// Verifies the beans and the observation predicate provided by OpenTelemetryConfig.
+@SpringBootTest(classes = [OpenTelemetryConfig])
+class OpenTelemetryConfigSpec extends Specification {
+
+    def objectUnderTest
+
+    @Value('${cps.tracing.exporter.endpoint}')
+    def tracingExporterEndpointUrl
+
+    @Value('${cps.tracing.sampler.jaeger_remote.endpoint}')
+    def jaegerRemoteSamplerUrl
+
+    def setup() {
+        objectUnderTest = new OpenTelemetryConfig(serviceId: 'sample-app',
+            tracingExporterEndpointUrl: tracingExporterEndpointUrl,
+            jaegerRemoteSamplerUrl: jaegerRemoteSamplerUrl,
+            excludedObservationNames: ['excluded-task-name']) // set directly: init() is not run for a manually created instance
+    }
+
+    def 'OTLP exporter creation with Grpc protocol'() {
+        when: 'an OTLP exporter is created'
+            def result = objectUnderTest.createOtlpExporterGrpc()
+        then: 'expected an instance of OtlpGrpcSpanExporter'
+            assert result instanceof OtlpGrpcSpanExporter
+    }
+
+    def 'OTLP exporter creation with HTTP protocol'() {
+        when: 'an OTLP exporter is created'
+            def result = objectUnderTest.createOtlpExporterHttp()
+        then: 'an OTLP Exporter is created'
+            assert result instanceof OtlpHttpSpanExporter
+        and: 'the endpoint is correctly set'
+            assert result.builder.endpoint == 'http://exporter-test-url'
+    }
+
+    def 'Jaeger Remote Sampler Creation'() {
+        when: 'a Jaeger remote sampler is created'
+            def result = objectUnderTest.createJaegerRemoteSampler()
+        then: 'the result is an instance of JaegerRemoteSampler'
+            assert result instanceof JaegerRemoteSampler
+        and: 'the sampler type is correct'
+            assert result.delegate.type == 'remoteSampling'
+        and: 'the sampler endpoint is correctly set'
+            assert result.delegate.url.toString().startsWith('http://jaeger-remote-test-url')
+    }
+
+    def 'Skipping actuator endpoints'() {
+        given: 'a mocked observation registry and config'
+            def observationRegistry = Mock(ObservationRegistry.class)
+            def observationConfig = Mock(ObservationConfig.class)
+            observationRegistry.observationConfig() >> observationConfig
+        when: 'an observation registry customizer is created and applied'
+            def result = objectUnderTest.skipActuatorEndpointsFromObservation()
+            result.customize(observationRegistry)
+        then: 'the observation predicate is set correctly'
+            1 * observationConfig.observationPredicate(_) >> { ObservationPredicate observationPredicate ->
+                def mockedHttpServletRequest = new MockHttpServletRequest('GET', requestUrl)
+                def serverRequestObservationContext = new ServerRequestObservationContext(mockedHttpServletRequest, null)
+                // the registered predicate must reject actuator requests and accept all others
+                assert observationPredicate.test('some-name', serverRequestObservationContext) == expectedPredicate
+            }
+        where: 'the following parameters are used'
+            scenario         | requestUrl  || expectedPredicate
+            'an actuator'    | '/actuator' || false
+            'a non actuator' | '/some-api' || true
+    }
+
+    def 'Observation predicate is configured to filter out excluded tasks by name'() {
+        given: 'an observation predicate created with a path matcher'
+            def observationPredicate = objectUnderTest.observationPredicate(new AntPathMatcher('/'))
+        expect: 'only names on the exclusion list are rejected'
+            assert observationPredicate.test(taskName, null) == expectedPredicate
+        where: 'the following parameters are used'
+            taskName                 || expectedPredicate
+            'excluded-task-name'     || false
+            'non-excluded-task-name' || true
+    }
+}
diff --git a/cps-ncmp-service/src/test/resources/application.yml b/cps-ncmp-service/src/test/resources/application.yml
index f0790dd..759de83 100644
--- a/cps-ncmp-service/src/test/resources/application.yml
+++ b/cps-ncmp-service/src/test/resources/application.yml
@@ -16,6 +16,15 @@
# SPDX-License-Identifier: Apache-2.0
# ============LICENSE_END=========================================================
+cps:
+  tracing:
+    sampler:
+      jaeger_remote:
+        endpoint: http://jaeger-remote-test-url # lower case, matching the value asserted in OpenTelemetryConfigSpec
+    exporter:
+      endpoint: http://exporter-test-url
+    enabled: true
+
spring:
kafka:
producer:
diff --git a/docker-compose/docker-compose.yml b/docker-compose/docker-compose.yml
index 86afe78..5af325a 100644
--- a/docker-compose/docker-compose.yml
+++ b/docker-compose/docker-compose.yml
@@ -20,6 +20,7 @@
### docker-compose --profile dmi-service up -d -> run CPS services incl. dmi-plugin ###
### docker-compose --profile dmi-stub --profile monitoring up -d -> run CPS with stubbed dmi-plugin (for registration performance testing)
+ ### docker-compose --profile dmi-stub --profile tracing up -d -> run CPS with stubbed dmi-plugin (for OpenTelemetry tracing testing: set ONAP_TRACING_ENABLED to "true"; the jaeger-service UI is then available in a browser at "http://localhost:16686")
### to disable notifications make notification.enabled to false & comment out kafka/zookeeper services ###
dbpostgresql:
@@ -54,6 +55,9 @@
DMI_PASSWORD: ${DMI_PASSWORD:-cpsr0cks!}
KAFKA_BOOTSTRAP_SERVER: kafka:29092
notification.enabled: 'true'
+ ONAP_TRACING_ENABLED: 'false'
+ ONAP_OTEL_SAMPLER_JAEGER_REMOTE_ENDPOINT: http://jaeger-service:14250
+ ONAP_OTEL_EXPORTER_ENDPOINT: http://jaeger-service:4317
restart: unless-stopped
depends_on:
- dbpostgresql
@@ -181,5 +185,26 @@
profiles:
- monitoring
+ kafka-ui:
+ container_name: kafka-ui
+ image: provectuslabs/kafka-ui:latest
+ ports:
+ - 8089:8080
+ environment:
+ DYNAMIC_CONFIG_ENABLED: 'true'
+ KAFKA_CLUSTERS_0_NAME: 'cps-kafka-local'
+ KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
+ profiles:
+ - monitoring
+
+ jaeger-service:
+ container_name: jaeger-service
+ image: jaegertracing/all-in-one:latest
+ ports:
+ - 16686:16686
+ restart: unless-stopped
+ profiles:
+ - tracing
+
volumes:
grafana:
diff --git a/k6-tests/README.md b/k6-tests/README.md
index 0fdebcf..9a385e1 100644
--- a/k6-tests/README.md
+++ b/k6-tests/README.md
@@ -4,8 +4,7 @@
k6 tests are written in JavaScript.
## k6 installation
-Follow the instructions in the [k6 installation guide](https://grafana.com/docs/k6/latest/set-up/install-k6/)
-to get started.
+Follow the instructions in the [build from source guide](https://github.com/mostafa/xk6-kafka) to build a k6 binary that includes the xk6-kafka extension (the KPI script imports `k6/x/kafka`).
## Running the k6 test suites
Simply run the main script. (The script assumes k6 and docker-compose have been installed).
diff --git a/k6-tests/ncmp/common/passthrough-crud.js b/k6-tests/ncmp/common/passthrough-crud.js
index 43a215f..76bda4e 100644
--- a/k6-tests/ncmp/common/passthrough-crud.js
+++ b/k6-tests/ncmp/common/passthrough-crud.js
@@ -19,7 +19,12 @@
*/
import http from 'k6/http';
-import { NCMP_BASE_URL, CONTENT_TYPE_JSON_PARAM, getRandomCmHandleId } from './utils.js';
+import {
+ CONTENT_TYPE_JSON_PARAM,
+ getRandomCmHandleId,
+ NCMP_BASE_URL,
+ TOPIC_DATA_OPERATIONS_BATCH_READ
+} from './utils.js';
export function passthroughRead() {
const cmHandleId = getRandomCmHandleId();
@@ -40,3 +45,21 @@
const response = http.post(url, JSON.stringify(body), CONTENT_TYPE_JSON_PARAM);
return response;
}
+
+export function batchRead(cmHandleIds) {
+ const url = `${NCMP_BASE_URL}/ncmp/v1/data?topic=${TOPIC_DATA_OPERATIONS_BATCH_READ}`
+ const payload = {
+ "operations": [
+ {
+ "resourceIdentifier": "parent/child",
+ "targetIds": cmHandleIds,
+ "datastore": "ncmp-datastore:passthrough-operational",
+ "options": "(fields=schemas/schema)",
+ "operationId": "12",
+ "operation": "read"
+ }
+ ]
+ };
+ const response = http.post(url, JSON.stringify(payload), CONTENT_TYPE_JSON_PARAM);
+ return response;
+}
\ No newline at end of file
diff --git a/k6-tests/ncmp/common/utils.js b/k6-tests/ncmp/common/utils.js
index 0f3b8d9..f24edc5 100644
--- a/k6-tests/ncmp/common/utils.js
+++ b/k6-tests/ncmp/common/utils.js
@@ -25,6 +25,9 @@
export const READ_DATA_FOR_CM_HANDLE_DELAY_MS = 300; // must have same value as in docker-compose.yml
export const WRITE_DATA_FOR_CM_HANDLE_DELAY_MS = 670; // must have same value as in docker-compose.yml
export const CONTENT_TYPE_JSON_PARAM = { headers: {'Content-Type': 'application/json'} };
+export const DATA_OPERATION_READ_BATCH_SIZE = 200;
+export const TOPIC_DATA_OPERATIONS_BATCH_READ = 'topic-data-operations-batch-read';
+export const KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092'];
export function recordTimeInSeconds(functionToExecute) {
const startTimeInMillis = Date.now();
@@ -65,6 +68,7 @@
makeSummaryCsvLine('5b', 'NCMP overhead for Synchronous single CM-handle pass-through read', 'milliseconds', 'ncmp_overhead_passthrough_read', data, options),
makeSummaryCsvLine('6a', 'Synchronous single CM-handle pass-through write', 'requests/second', 'http_reqs{scenario:passthrough_write}', data, options),
makeSummaryCsvLine('6b', 'NCMP overhead for Synchronous single CM-handle pass-through write', 'milliseconds', 'ncmp_overhead_passthrough_write', data, options),
+ makeSummaryCsvLine('7', 'Data operations batch read', 'events/second', 'data_operations_batch_read_cmhandles_per_second', data, options),
];
return summaryCsvLines.join('\n') + '\n';
}
diff --git a/k6-tests/ncmp/ncmp-kpi.js b/k6-tests/ncmp/ncmp-kpi.js
index b4c476e..8ff9ec5 100644
--- a/k6-tests/ncmp/ncmp-kpi.js
+++ b/k6-tests/ncmp/ncmp-kpi.js
@@ -20,16 +20,28 @@
import { check } from 'k6';
import { Gauge, Trend } from 'k6/metrics';
-import { TOTAL_CM_HANDLES, READ_DATA_FOR_CM_HANDLE_DELAY_MS, WRITE_DATA_FOR_CM_HANDLE_DELAY_MS,
- makeCustomSummaryReport, recordTimeInSeconds } from './common/utils.js';
+import {
+ TOTAL_CM_HANDLES, READ_DATA_FOR_CM_HANDLE_DELAY_MS, WRITE_DATA_FOR_CM_HANDLE_DELAY_MS,
+ makeCustomSummaryReport, recordTimeInSeconds, makeBatchOfCmHandleIds, DATA_OPERATION_READ_BATCH_SIZE,
+ TOPIC_DATA_OPERATIONS_BATCH_READ, KAFKA_BOOTSTRAP_SERVERS
+} from './common/utils.js';
import { registerAllCmHandles, deregisterAllCmHandles } from './common/cmhandle-crud.js';
import { executeCmHandleSearch, executeCmHandleIdSearch } from './common/search-base.js';
-import { passthroughRead, passthroughWrite } from './common/passthrough-crud.js';
+import { passthroughRead, passthroughWrite, batchRead } from './common/passthrough-crud.js';
+import {
+ Reader,
+} from 'k6/x/kafka';
let cmHandlesCreatedPerSecondGauge = new Gauge('cmhandles_created_per_second');
let cmHandlesDeletedPerSecondGauge = new Gauge('cmhandles_deleted_per_second');
let passthroughReadNcmpOverheadTrend = new Trend('ncmp_overhead_passthrough_read');
let passthroughWriteNcmpOverheadTrend = new Trend('ncmp_overhead_passthrough_write');
+let dataOperationsBatchReadCmHandlePerSecondTrend = new Trend('data_operations_batch_read_cmhandles_per_second');
+
+const reader = new Reader({
+ brokers: KAFKA_BOOTSTRAP_SERVERS,
+ topic: TOPIC_DATA_OPERATIONS_BATCH_READ,
+});
const DURATION = '15m';
@@ -61,6 +73,22 @@
vus: 3,
duration: DURATION,
},
+ data_operation_send_async_http_request: {
+ executor: 'constant-arrival-rate',
+ exec: 'data_operation_send_async_http_request',
+ duration: DURATION,
+ rate: 1,
+ timeUnit: '1s',
+ preAllocatedVUs: 1,
+ },
+ data_operation_async_batch_read: {
+ executor: 'constant-arrival-rate',
+ exec: 'data_operation_async_batch_read',
+ duration: DURATION,
+ rate: 1,
+ timeUnit: '1s',
+ preAllocatedVUs: 1,
+ }
},
thresholds: {
'cmhandles_created_per_second': ['value >= 22'],
@@ -75,6 +103,9 @@
'http_req_failed{scenario:cm_search_module}': ['rate == 0'],
'http_req_failed{scenario:passthrough_read}': ['rate == 0'],
'http_req_failed{scenario:passthrough_write}': ['rate == 0'],
+ 'http_req_failed{scenario:data_operation_send_async_http_request}': ['rate == 0'],
+ 'kafka_reader_error_count{scenario:data_operation_consume_kafka_responses}': ['count == 0'],
+ 'data_operations_batch_read_cmhandles_per_second': ['avg >= 150'],
},
};
@@ -114,6 +145,22 @@
check(JSON.parse(response.body), { 'module search returned expected CM-handles': (arr) => arr.length === TOTAL_CM_HANDLES });
}
+export function data_operation_send_async_http_request() {
+ const nextBatchOfCmHandleIds = makeBatchOfCmHandleIds(DATA_OPERATION_READ_BATCH_SIZE,1);
+ const response = batchRead(nextBatchOfCmHandleIds)
+ check(response, { 'data operation batch read status equals 200': (r) => r.status === 200 });
+}
+
+export function data_operation_async_batch_read() {
+ try {
+ let messages = reader.consume({ limit: DATA_OPERATION_READ_BATCH_SIZE });
+ dataOperationsBatchReadCmHandlePerSecondTrend.add(messages.length);
+ } catch (error) {
+ dataOperationsBatchReadCmHandlePerSecondTrend.add(0);
+ console.error(error);
+ }
+}
+
export function handleSummary(data) {
return {
stdout: makeCustomSummaryReport(data, options),