Merge "Refactor OpenAPI Policy Executor"
diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml
index 6f08071..83494d6 100644
--- a/cps-application/src/main/resources/application.yml
+++ b/cps-application/src/main/resources/application.yml
@@ -160,6 +160,7 @@
             endpoint: ${ONAP_OTEL_EXPORTER_ENDPOINT:http://onap-otel-collector:4317}
             protocol: ${ONAP_OTEL_EXPORTER_PROTOCOL:grpc}
         enabled: ${ONAP_TRACING_ENABLED:false}
+        excluded-observation-names: ${ONAP_EXCLUDED_OBSERVATION_NAMES:tasks.scheduled.execution}
 
 # Actuator
 management:
diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/OpenTelemetryConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/OpenTelemetryConfig.java
index cff3187..a6a82b7 100644
--- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/OpenTelemetryConfig.java
+++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/OpenTelemetryConfig.java
@@ -26,7 +26,11 @@
 import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
 import io.opentelemetry.sdk.extension.trace.jaeger.sampler.JaegerRemoteSampler;
 import io.opentelemetry.sdk.trace.samplers.Sampler;
+import jakarta.annotation.PostConstruct;
 import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.actuate.autoconfigure.observation.ObservationRegistryCustomizer;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
@@ -37,11 +41,14 @@
 import org.springframework.util.AntPathMatcher;
 import org.springframework.util.PathMatcher;
 
+/**
+ * Configuration class for setting up OpenTelemetry tracing in a Spring Boot application.
+ * This class provides beans for OTLP exporters (gRPC and HTTP), a Jaeger remote sampler,
+ * and customizes the ObservationRegistry to exclude certain endpoints from being observed.
+ */
 @Configuration
 public class OpenTelemetryConfig {
 
-    public static final int JAEGER_REMOTE_SAMPLER_POLLING_INTERVAL_IN_SECOND = 30;
-
     @Value("${spring.application.name:cps-application}")
     private String serviceId;
 
@@ -51,9 +58,29 @@
     @Value("${cps.tracing.sampler.jaeger_remote.endpoint:http://onap-otel-collector:14250}")
     private String jaegerRemoteSamplerUrl;
 
+    @Value("${cps.tracing.excluded-observation-names:tasks.scheduled.execution}")
+    private String excludedObservationNamesAsCsv;
+
+    private static final int JAEGER_REMOTE_SAMPLER_POLLING_INTERVAL_IN_SECONDS = 30;
+
+    private List<String> excludedObservationNames;
+
     /**
-    * OTLP Exporter with Grpc exporter protocol.
-    */
+     * Initializes the excludedObservationNames after the bean's properties have been set.
+     * This method is called by the Spring container during bean initialization.
+     */
+    @PostConstruct
+    public void init() {
+        excludedObservationNames = Arrays.stream(excludedObservationNamesAsCsv.split(","))
+                .map(String::trim)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Creates an OTLP Exporter with gRPC protocol.
+     *
+     * @return OtlpGrpcSpanExporter bean if tracing is enabled and the exporter protocol is gRPC
+     */
     @Bean
     @ConditionalOnExpression(
         "${cps.tracing.enabled} && 'grpc'.equals('${cps.tracing.exporter.protocol}')")
@@ -62,7 +89,9 @@
     }
 
     /**
-     * OTLP Exporter with HTTP exporter protocol.
+     * Creates an OTLP Exporter with HTTP protocol.
+     *
+     * @return OtlpHttpSpanExporter bean if tracing is enabled and the exporter protocol is HTTP
      */
     @Bean
     @ConditionalOnExpression(
@@ -72,39 +101,40 @@
     }
 
     /**
-     * Jaeger Remote Sampler.
+     * Creates a Jaeger Remote Sampler.
+     *
+     * @return JaegerRemoteSampler bean if tracing is enabled
      */
     @Bean
     @ConditionalOnProperty("cps.tracing.enabled")
     public JaegerRemoteSampler createJaegerRemoteSampler() {
         return JaegerRemoteSampler.builder()
-          .setEndpoint(jaegerRemoteSamplerUrl)
-          .setPollingInterval(Duration.ofSeconds(JAEGER_REMOTE_SAMPLER_POLLING_INTERVAL_IN_SECOND))
-          .setInitialSampler(Sampler.alwaysOff())
-          .setServiceName(serviceId)
-          .build();
+                .setEndpoint(jaegerRemoteSamplerUrl)
+                .setPollingInterval(Duration.ofSeconds(JAEGER_REMOTE_SAMPLER_POLLING_INTERVAL_IN_SECONDS))
+                .setInitialSampler(Sampler.alwaysOff())
+                .setServiceName(serviceId)
+                .build();
     }
 
     /**
-   * Excluding /actuator/** endpoints.
-   */
+     * Customizes the ObservationRegistry to exclude /actuator/** endpoints from being observed.
+     *
+     * @return ObservationRegistryCustomizer bean if tracing is enabled
+     */
     @Bean
     @ConditionalOnProperty("cps.tracing.enabled")
-    ObservationRegistryCustomizer<ObservationRegistry> skipActuatorEndpointsFromObservation() {
+    public ObservationRegistryCustomizer<ObservationRegistry> skipActuatorEndpointsFromObservation() {
         final PathMatcher pathMatcher = new AntPathMatcher("/");
         return registry ->
-          registry.observationConfig().observationPredicate(observationPredicate(pathMatcher));
+                registry.observationConfig().observationPredicate(observationPredicate(pathMatcher));
     }
 
-    /**
-     * Excluding /actuator/** endpoints.
-     */
-    static ObservationPredicate observationPredicate(final PathMatcher pathMatcher) {
-        return (name, context) -> {
+    private ObservationPredicate observationPredicate(final PathMatcher pathMatcher) {
+        return (observationName, context) -> {
             if (context instanceof ServerRequestObservationContext observationContext) {
                 return !pathMatcher.match("/actuator/**", observationContext.getCarrier().getRequestURI());
             } else {
-                return true;
+                return !excludedObservationNames.contains(observationName);
             }
         };
     }
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/OpenTelemetryCmNotificationSubscriptionConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/OpenTelemetryCmNotificationSubscriptionConfigSpec.groovy
deleted file mode 100644
index 0f69069..0000000
--- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/OpenTelemetryCmNotificationSubscriptionConfigSpec.groovy
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * Copyright (C) 2024 Nordix Foundation
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * SPDX-License-Identifier: Apache-2.0
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.cps.ncmp.config
-
-import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter
-import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter
-import io.opentelemetry.sdk.extension.trace.jaeger.sampler.JaegerRemoteSampler
-import org.spockframework.spring.SpringBean
-import org.springframework.boot.actuate.autoconfigure.observation.ObservationRegistryCustomizer
-import spock.lang.Shared
-import spock.lang.Specification
-
-class OpenTelemetryConfigSpec extends Specification{
-
-    @Shared
-    @SpringBean
-    OpenTelemetryConfig openTelemetryConfig = new OpenTelemetryConfig()
-
-    def setupSpec() {
-        openTelemetryConfig.tracingExporterEndpointUrl="http://tracingExporterEndpointUrl"
-        openTelemetryConfig.jaegerRemoteSamplerUrl="http://jaegerremotesamplerurl"
-        openTelemetryConfig.serviceId ="cps-application"
-    }
-
-    def 'OpenTelemetryConfig Construction.'() {
-        expect: 'the system can create an instance'
-        new OpenTelemetryConfig() != null
-    }
-
-    def  'OTLP Exporter creation with Grpc protocol'(){
-        when: 'an OTLP exporter is created'
-            def result = openTelemetryConfig.createOtlpExporterGrpc()
-        then: 'an OTLP Exporter is created'
-            assert result instanceof OtlpGrpcSpanExporter
-    }
-
-    def  'OTLP Exporter creation with HTTP protocol'(){
-        when: 'an OTLP exporter is created'
-            def result = openTelemetryConfig.createOtlpExporterHttp()
-        then: 'an OTLP Exporter is created'
-            assert result instanceof OtlpHttpSpanExporter
-        and:
-            assert result.builder.endpoint=="http://tracingExporterEndpointUrl"
-    }
-
-    def  'Jaeger Remote Sampler Creation'(){
-        when: 'an OTLP exporter is created'
-            def result = openTelemetryConfig.createJaegerRemoteSampler()
-        then: 'an OTLP Exporter is created'
-            assert result instanceof JaegerRemoteSampler
-        and:
-            assert result.delegate.type=="remoteSampling"
-        and:
-            assert result.delegate.url.toString().startsWith("http://jaegerremotesamplerurl")
-    }
-
-    def  'Skipping Acutator endpoints'(){
-        when: 'an OTLP exporter is created'
-            def result = openTelemetryConfig.skipActuatorEndpointsFromObservation()
-        then: 'an OTLP Exporter is created'
-            assert result instanceof ObservationRegistryCustomizer
-    }
-}
diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/OpenTelemetryConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/OpenTelemetryConfigSpec.groovy
new file mode 100644
index 0000000..cbff731
--- /dev/null
+++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/OpenTelemetryConfigSpec.groovy
@@ -0,0 +1,113 @@
+/*
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2024 Nordix Foundation
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.cps.ncmp.config
+
+import io.micrometer.observation.ObservationPredicate
+import io.micrometer.observation.ObservationRegistry
+import io.micrometer.observation.ObservationRegistry.ObservationConfig
+import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter
+import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter
+import io.opentelemetry.sdk.extension.trace.jaeger.sampler.JaegerRemoteSampler
+import org.springframework.beans.factory.annotation.Value
+import org.springframework.boot.test.context.SpringBootTest
+import org.springframework.http.server.observation.ServerRequestObservationContext
+import org.springframework.mock.web.MockHttpServletRequest
+import org.springframework.util.AntPathMatcher
+import spock.lang.Specification
+
+@SpringBootTest(classes = [OpenTelemetryConfig])
+class OpenTelemetryConfigSpec extends Specification {
+
+    def objectUnderTest
+
+    @Value('${cps.tracing.exporter.endpoint}')
+    def tracingExporterEndpointUrl
+
+    @Value('${cps.tracing.sampler.jaeger_remote.endpoint}')
+    def jaegerRemoteSamplerUrl
+
+    def setup() {
+        objectUnderTest = new OpenTelemetryConfig(
+                serviceId: 'sample-app',
+                tracingExporterEndpointUrl: tracingExporterEndpointUrl,
+                jaegerRemoteSamplerUrl: jaegerRemoteSamplerUrl,
+                excludedObservationNames: ['excluded-task-name'])
+    }
+
+    def 'OTLP exporter creation with Grpc protocol'() {
+        when: 'an OTLP exporter is created'
+            def result = objectUnderTest.createOtlpExporterGrpc()
+        then: 'expected an instance of OtlpGrpcSpanExporter'
+            assert result instanceof OtlpGrpcSpanExporter
+    }
+
+    def 'OTLP exporter creation with HTTP protocol'() {
+        when: 'an OTLP exporter is created'
+            def result = objectUnderTest.createOtlpExporterHttp()
+        then: 'an OTLP Exporter is created'
+            assert result instanceof OtlpHttpSpanExporter
+        and: 'the endpoint is correctly set'
+            assert result.builder.endpoint == 'http://exporter-test-url'
+    }
+
+    def 'Jaeger Remote Sampler Creation'() {
+        when: 'a Jaeger remote sampler is created'
+            def result = objectUnderTest.createJaegerRemoteSampler()
+        then: 'a Jaeger remote sampler is created'
+            assert result instanceof JaegerRemoteSampler
+        and: 'the sampler type is correct'
+            assert result.delegate.type == 'remoteSampling'
+        and: 'the sampler endpoint is correctly set'
+            assert result.delegate.url.toString().startsWith('http://jaeger-remote-test-url')
+    }
+
+    def 'Skipping actuator endpoints'() {
+        given: 'a mocked observation registry and config'
+            def observationRegistry = Mock(ObservationRegistry.class)
+            def observationConfig = Mock(ObservationConfig.class)
+            observationRegistry.observationConfig() >> observationConfig
+        when: 'an observation registry customizer is created and applied'
+            def result = objectUnderTest.skipActuatorEndpointsFromObservation()
+            result.customize(observationRegistry)
+        then: 'the observation predicate is set correctly'
+            1 * observationConfig.observationPredicate(_) >> { ObservationPredicate observationPredicate ->
+                    def mockedHttpServletRequest = new MockHttpServletRequest(_ as String, requestUrl)
+                    def serverRequestObservationContext = new ServerRequestObservationContext(mockedHttpServletRequest, null)
+                and: 'expected predicate for endpoint'
+                    assert observationPredicate.test('some-name', serverRequestObservationContext) == expectedPredicate
+            }
+        where: 'the following parameters are used'
+            scenario         | requestUrl  || expectedPredicate
+            'an actuator'    | '/actuator' || false
+            'a non actuator' | '/some-api' || true
+    }
+
+    def 'Observation predicate is configured to filter out excluded tasks by name'() {
+        when: 'a path matcher and observation predicate'
+            def observationPredicate = objectUnderTest.observationPredicate(new AntPathMatcher('/'))
+        then: 'a task name is provided'
+            assert observationPredicate.test(taskName, null) == expectedPredicate
+        where: 'the following parameters are used'
+            taskName                 || expectedPredicate
+            'excluded-task-name'     || false
+            'non-excluded-task-name' || true
+    }
+}
diff --git a/cps-ncmp-service/src/test/resources/application.yml b/cps-ncmp-service/src/test/resources/application.yml
index f0790dd..759de83 100644
--- a/cps-ncmp-service/src/test/resources/application.yml
+++ b/cps-ncmp-service/src/test/resources/application.yml
@@ -16,6 +16,15 @@
 #  SPDX-License-Identifier: Apache-2.0
 #  ============LICENSE_END=========================================================
 
+cps:
+    tracing:
+        sampler:
+            jaeger_remote:
+                endpoint: http://jaeger-Remote-test-url
+        exporter:
+            endpoint: http://exporter-test-url
+        enabled: true
+
 spring:
     kafka:
         producer:
diff --git a/docker-compose/docker-compose.yml b/docker-compose/docker-compose.yml
index 86afe78..5af325a 100644
--- a/docker-compose/docker-compose.yml
+++ b/docker-compose/docker-compose.yml
@@ -20,6 +20,7 @@
 
   ### docker-compose --profile dmi-service up -d -> run CPS services incl. dmi-plugin ###
   ### docker-compose --profile dmi-stub --profile monitoring up -d -> run CPS with stubbed dmi-plugin (for registration performance testing)
+  ### docker-compose --profile dmi-stub --profile tracing up -d -> run CPS with stubbed dmi-plugin (for open telemetry tracing testing make ONAP_TRACING_ENABLED "true" later "http://localhost:16686" can be accessed from browser)
   ### to disable notifications make notification.enabled to false & comment out kafka/zookeeper services ###
 
   dbpostgresql:
@@ -54,6 +55,9 @@
       DMI_PASSWORD: ${DMI_PASSWORD:-cpsr0cks!}
       KAFKA_BOOTSTRAP_SERVER: kafka:29092
       notification.enabled: 'true'
+      ONAP_TRACING_ENABLED: 'false'
+      ONAP_OTEL_SAMPLER_JAEGER_REMOTE_ENDPOINT: http://jaeger-service:14250
+      ONAP_OTEL_EXPORTER_ENDPOINT: http://jaeger-service:4317
     restart: unless-stopped
     depends_on:
       - dbpostgresql
@@ -181,5 +185,26 @@
     profiles:
       - monitoring
 
+  kafka-ui:
+    container_name: kafka-ui
+    image: provectuslabs/kafka-ui:latest
+    ports:
+      - 8089:8080
+    environment:
+      DYNAMIC_CONFIG_ENABLED: 'true'
+      KAFKA_CLUSTERS_0_NAME: 'cps-kafka-local'
+      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
+    profiles:
+      - monitoring
+
+  jaeger-service:
+    container_name: jaeger-service
+    image: jaegertracing/all-in-one:latest
+    ports:
+      - 16686:16686
+    restart: unless-stopped
+    profiles:
+      - tracing
+
 volumes:
   grafana:
diff --git a/k6-tests/README.md b/k6-tests/README.md
index 0fdebcf..9a385e1 100644
--- a/k6-tests/README.md
+++ b/k6-tests/README.md
@@ -4,8 +4,7 @@
 k6 tests are written in JavaScript.
 
 ## k6 installation
-Follow the instructions in the [k6 installation guide](https://grafana.com/docs/k6/latest/set-up/install-k6/)
-to get started.
+Follow the instructions in the [build from source guide](https://github.com/mostafa/xk6-kafka) to get started.
 
 ## Running the k6 test suites
 Simply run the main script. (The script assumes k6 and docker-compose have been installed).
diff --git a/k6-tests/ncmp/common/passthrough-crud.js b/k6-tests/ncmp/common/passthrough-crud.js
index 43a215f..76bda4e 100644
--- a/k6-tests/ncmp/common/passthrough-crud.js
+++ b/k6-tests/ncmp/common/passthrough-crud.js
@@ -19,7 +19,12 @@
  */
 
 import http from 'k6/http';
-import { NCMP_BASE_URL, CONTENT_TYPE_JSON_PARAM, getRandomCmHandleId } from './utils.js';
+import {
+    CONTENT_TYPE_JSON_PARAM,
+    getRandomCmHandleId,
+    NCMP_BASE_URL,
+    TOPIC_DATA_OPERATIONS_BATCH_READ
+} from './utils.js';
 
 export function passthroughRead() {
     const cmHandleId = getRandomCmHandleId();
@@ -40,3 +45,21 @@
     const response = http.post(url, JSON.stringify(body), CONTENT_TYPE_JSON_PARAM);
     return response;
 }
+
+export function batchRead(cmHandleIds) {
+    const url = `${NCMP_BASE_URL}/ncmp/v1/data?topic=${TOPIC_DATA_OPERATIONS_BATCH_READ}`
+    const  payload = {
+        "operations": [
+            {
+                "resourceIdentifier": "parent/child",
+                "targetIds": cmHandleIds,
+                "datastore": "ncmp-datastore:passthrough-operational",
+                "options": "(fields=schemas/schema)",
+                "operationId": "12",
+                "operation": "read"
+            }
+        ]
+    };
+    const response = http.post(url, JSON.stringify(payload), CONTENT_TYPE_JSON_PARAM);
+    return response;
+}
\ No newline at end of file
diff --git a/k6-tests/ncmp/common/utils.js b/k6-tests/ncmp/common/utils.js
index 0f3b8d9..f24edc5 100644
--- a/k6-tests/ncmp/common/utils.js
+++ b/k6-tests/ncmp/common/utils.js
@@ -25,6 +25,9 @@
 export const READ_DATA_FOR_CM_HANDLE_DELAY_MS = 300; // must have same value as in docker-compose.yml
 export const WRITE_DATA_FOR_CM_HANDLE_DELAY_MS = 670; // must have same value as in docker-compose.yml
 export const CONTENT_TYPE_JSON_PARAM = { headers: {'Content-Type': 'application/json'} };
+export const DATA_OPERATION_READ_BATCH_SIZE = 200;
+export const TOPIC_DATA_OPERATIONS_BATCH_READ = 'topic-data-operations-batch-read';
+export const KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092'];
 
 export function recordTimeInSeconds(functionToExecute) {
     const startTimeInMillis = Date.now();
@@ -65,6 +68,7 @@
         makeSummaryCsvLine('5b', 'NCMP overhead for Synchronous single CM-handle pass-through read', 'milliseconds', 'ncmp_overhead_passthrough_read', data, options),
         makeSummaryCsvLine('6a', 'Synchronous single CM-handle pass-through write', 'requests/second', 'http_reqs{scenario:passthrough_write}', data, options),
         makeSummaryCsvLine('6b', 'NCMP overhead for Synchronous single CM-handle pass-through write', 'milliseconds', 'ncmp_overhead_passthrough_write', data, options),
+        makeSummaryCsvLine('7', 'Data operations batch read', 'events/second', 'data_operations_batch_read_cmhandles_per_second', data, options),
     ];
     return summaryCsvLines.join('\n') + '\n';
 }
diff --git a/k6-tests/ncmp/ncmp-kpi.js b/k6-tests/ncmp/ncmp-kpi.js
index b4c476e..8ff9ec5 100644
--- a/k6-tests/ncmp/ncmp-kpi.js
+++ b/k6-tests/ncmp/ncmp-kpi.js
@@ -20,16 +20,28 @@
 
 import { check } from 'k6';
 import { Gauge, Trend } from 'k6/metrics';
-import { TOTAL_CM_HANDLES, READ_DATA_FOR_CM_HANDLE_DELAY_MS, WRITE_DATA_FOR_CM_HANDLE_DELAY_MS,
-         makeCustomSummaryReport, recordTimeInSeconds } from './common/utils.js';
+import {
+    TOTAL_CM_HANDLES, READ_DATA_FOR_CM_HANDLE_DELAY_MS, WRITE_DATA_FOR_CM_HANDLE_DELAY_MS,
+    makeCustomSummaryReport, recordTimeInSeconds, makeBatchOfCmHandleIds, DATA_OPERATION_READ_BATCH_SIZE,
+    TOPIC_DATA_OPERATIONS_BATCH_READ, KAFKA_BOOTSTRAP_SERVERS
+} from './common/utils.js';
 import { registerAllCmHandles, deregisterAllCmHandles } from './common/cmhandle-crud.js';
 import { executeCmHandleSearch, executeCmHandleIdSearch } from './common/search-base.js';
-import { passthroughRead, passthroughWrite } from './common/passthrough-crud.js';
+import { passthroughRead, passthroughWrite, batchRead } from './common/passthrough-crud.js';
+import {
+    Reader,
+} from 'k6/x/kafka';
 
 let cmHandlesCreatedPerSecondGauge = new Gauge('cmhandles_created_per_second');
 let cmHandlesDeletedPerSecondGauge = new Gauge('cmhandles_deleted_per_second');
 let passthroughReadNcmpOverheadTrend = new Trend('ncmp_overhead_passthrough_read');
 let passthroughWriteNcmpOverheadTrend = new Trend('ncmp_overhead_passthrough_write');
+let dataOperationsBatchReadCmHandlePerSecondTrend = new Trend('data_operations_batch_read_cmhandles_per_second');
+
+const reader = new Reader({
+    brokers: KAFKA_BOOTSTRAP_SERVERS,
+    topic: TOPIC_DATA_OPERATIONS_BATCH_READ,
+});
 
 const DURATION = '15m';
 
@@ -61,6 +73,22 @@
             vus: 3,
             duration: DURATION,
         },
+        data_operation_send_async_http_request: {
+            executor: 'constant-arrival-rate',
+            exec: 'data_operation_send_async_http_request',
+            duration: DURATION,
+            rate: 1,
+            timeUnit: '1s',
+            preAllocatedVUs: 1,
+        },
+        data_operation_async_batch_read: {
+            executor: 'constant-arrival-rate',
+            exec: 'data_operation_async_batch_read',
+            duration: DURATION,
+            rate: 1,
+            timeUnit: '1s',
+            preAllocatedVUs: 1,
+        }
     },
     thresholds: {
         'cmhandles_created_per_second': ['value >= 22'],
@@ -75,6 +103,9 @@
         'http_req_failed{scenario:cm_search_module}': ['rate == 0'],
         'http_req_failed{scenario:passthrough_read}': ['rate == 0'],
         'http_req_failed{scenario:passthrough_write}': ['rate == 0'],
+        'http_req_failed{scenario:data_operation_send_async_http_request}': ['rate == 0'],
+        'kafka_reader_error_count{scenario:data_operation_consume_kafka_responses}': ['count == 0'],
+        'data_operations_batch_read_cmhandles_per_second': ['avg >= 150'],
     },
 };
 
@@ -114,6 +145,22 @@
     check(JSON.parse(response.body), { 'module search returned expected CM-handles': (arr) => arr.length === TOTAL_CM_HANDLES });
 }
 
+export function data_operation_send_async_http_request() {
+    const nextBatchOfCmHandleIds = makeBatchOfCmHandleIds(DATA_OPERATION_READ_BATCH_SIZE,1);
+    const response = batchRead(nextBatchOfCmHandleIds)
+    check(response, { 'data operation batch read status equals 200': (r) => r.status === 200 });
+}
+
+export function data_operation_async_batch_read() {
+    try {
+        let messages = reader.consume({ limit: DATA_OPERATION_READ_BATCH_SIZE });
+        dataOperationsBatchReadCmHandlePerSecondTrend.add(messages.length);
+    } catch (error) {
+        dataOperationsBatchReadCmHandlePerSecondTrend.add(0);
+        console.error(error);
+    }
+}
+
 export function handleSummary(data) {
     return {
         stdout: makeCustomSummaryReport(data, options),