blob: fa173f4752a7cf7e504983b58c11e1da80ceb2ee [file] [log] [blame]
rameshiyer27f2609a32023-12-14 14:17:35 +00001#!/usr/bin/env python3
2#
3# ============LICENSE_START====================================================
rameshiyer27f2e4da72024-01-13 21:26:09 +00004# Copyright (C) 2023-2024 Nordix Foundation.
rameshiyer27f2609a32023-12-14 14:17:35 +00005# =============================================================================
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10# http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18# SPDX-License-Identifier: Apache-2.0
19# ============LICENSE_END======================================================
20
21# Python utility to fetch kafka topic and look for required messages.
22# Accepts the arguments {topic_name} and {list of expected values} and {timeout} to verify the kafka topic.
23
24
25from confluent_kafka import Consumer, KafkaException
26import sys
27import time
28
rameshiyer27f2e4da72024-01-13 21:26:09 +000029
rameshiyer2731c61d42024-01-21 14:24:03 +000030def consume_kafka_topic(topic, expected_values, timeout, bootstrap_server):
rameshiyer27f2609a32023-12-14 14:17:35 +000031 config = {
rameshiyer2731c61d42024-01-21 14:24:03 +000032 'bootstrap.servers': bootstrap_server,
rameshiyer27f2609a32023-12-14 14:17:35 +000033 'group.id': 'testgrp',
34 'auto.offset.reset': 'earliest'
35 }
36 consumer = Consumer(config)
37 consumer.subscribe([topic])
38 try:
39 start_time = time.time()
40 while time.time() - start_time < timeout:
41 msg = consumer.poll(1.0)
42 if msg is None:
43 continue
44 if msg.error():
45 if msg.error().code() == KafkaException._PARTITION_EOF:
46 sys.stderr.write(f"Reached end of topic {msg.topic()} / partition {msg.partition()}\n")
47 print('ERROR')
48 sys.exit(404)
49 else:
50 # Error
51 raise KafkaException(msg.error())
52 else:
53 # Message received
54 message = msg.value().decode('utf-8')
rameshiyer27f2e4da72024-01-13 21:26:09 +000055 if expected_values in message:
rameshiyer27f2609a32023-12-14 14:17:35 +000056 print(message)
57 sys.exit(200)
58 finally:
59 consumer.close()
60
rameshiyer27f2609a32023-12-14 14:17:35 +000061
62if __name__ == '__main__':
63 topic_name = sys.argv[1]
rameshiyer27f2e4da72024-01-13 21:26:09 +000064 timeout = int(sys.argv[2]) # timeout in seconds for verifying the kafka topic
65 expected_values = sys.argv[3]
rameshiyer2731c61d42024-01-21 14:24:03 +000066 bootstrap_server = sys.argv[4]
67 consume_kafka_topic(topic_name, expected_values, timeout, bootstrap_server)