# ============LICENSE_START====================================
# csit-dcaegen2-collectors-hv-ves
# =========================================================
# Copyright (C) 2019 Nokia. All rights reserved.
# =========================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=====================================
18
19import docker
20from robot.api import logger
21
# Docker image used to locate the Kafka container, and the in-network
# addresses of the broker and of ZooKeeper.
KAFKA_IMAGE_FULL_NAME = "wurstmeister/kafka"
KAFKA_ADDRESS = "kafka:9092"
ZOOKEEPER_ADDRESS = "zookeeper:2181"

# Shell commands executed inside the Kafka container. The two templates keep
# a single "%s" placeholder that is filled with the topic name at call time.
LIST_TOPICS_COMMAND = "kafka-topics.sh --list --zookeeper " + ZOOKEEPER_ADDRESS
TOPIC_STATUS_COMMAND = (
    "kafka-run-class.sh kafka.tools.GetOffsetShell"
    " --broker-list {broker} --topic %s --time -1"
).format(broker=KAFKA_ADDRESS)
DELETE_TOPIC_COMMAND = (
    "kafka-topics.sh --zookeeper {zookeeper} --delete --topic %s"
).format(zookeeper=ZOOKEEPER_ADDRESS)
29
30
class KafkaLibrary:
    """Robot Framework keywords for inspecting and resetting Kafka topics.

    Every command is run through ``exec_run`` on the first Docker container
    whose image ancestor is ``KAFKA_IMAGE_FULL_NAME``.
    """

    def log_kafka_status(self):
        """Log the status of every topic and then delete it.

        The ``__consumer_offsets`` topic is skipped. The Docker client is
        always closed, even when one of the steps raises.

        Raises:
            IndexError: if no container based on ``KAFKA_IMAGE_FULL_NAME``
                is found.
        """
        docker_client = docker.from_env()
        try:
            containers = docker_client.containers.list(
                filters={"ancestor": KAFKA_IMAGE_FULL_NAME}, all=True)
            if not containers:
                # Same exception type as the bare ``[0]`` lookup would raise,
                # but with a message that explains what is missing.
                raise IndexError(
                    "No Docker container found for image " + KAFKA_IMAGE_FULL_NAME)
            kafka = containers[0]

            topics = self.get_topics(kafka)
            logger.info("Topics initialized in Kafka cluster: " + str(topics))
            for topic in topics:
                if topic == "__consumer_offsets":
                    # kafka-internal topic, ignore it
                    continue

                self.log_topic_status(kafka, topic)
                self.reset_topic(kafka, topic)
        finally:
            docker_client.close()

    def get_topics(self, kafka):
        """Return the topic names reported by ``LIST_TOPICS_COMMAND``.

        Args:
            kafka: container object exposing ``exec_run``.

        Returns:
            A list of text topic names; empty when the command reports a
            non-zero exit code, so that error output is never mistaken for
            topic names.
        """
        exit_code, output = kafka.exec_run(LIST_TOPICS_COMMAND)
        if exit_code:
            logger.warn("Listing Kafka topics failed (exit code %s): %s"
                        % (exit_code, self._to_text(output)))
            return []
        return self._parse_topic_list(output)

    def log_topic_status(self, kafka, topic):
        """Log the output of ``TOPIC_STATUS_COMMAND`` for ``topic``."""
        _, topic_status = kafka.exec_run(TOPIC_STATUS_COMMAND % topic)
        logger.info("Messages on topic: " + self._to_text(topic_status))

    def reset_topic(self, kafka, topic):
        """Delete ``topic`` via ``DELETE_TOPIC_COMMAND`` and log the command output."""
        logger.info("Removing topic " + str(
            topic) + " (note that it will be recreated by dcae-app-simulator/hv-ves-collector, however the offset will be reseted)")
        _, output = kafka.exec_run(DELETE_TOPIC_COMMAND % topic)
        logger.info(self._to_text(output))

    @staticmethod
    def _to_text(output):
        """Convert command output to text.

        Bytes are decoded as UTF-8 (undecodable bytes are replaced), ``None``
        becomes an empty string, and text is returned unchanged.
        """
        if output is None:
            return ""
        if isinstance(output, bytes):
            return output.decode("utf-8", "replace")
        return output

    @staticmethod
    def _parse_topic_list(output):
        """Split raw command output into stripped, non-empty topic names."""
        text = KafkaLibrary._to_text(output)
        return [line.strip() for line in text.splitlines() if line.strip()]