# Copyright 2019 AT&T Intellectual Property. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import ssl

from kafka import KafkaConsumer
from kafka import KafkaProducer
from kafka import TopicPartition
from robot import utils
from robot.api.deco import keyword


class KafkaKeywords(object):
    """Robot Framework keywords for producing to and consuming from Kafka.

    Connection settings are registered under an alias by ``connect`` and are
    looked up again by that alias each time a producer or consumer is built.
    A new Kafka client is created for every ``produce`` / ``consume`` call.
    """

    # Settings stored by ``connect`` and forwarded unchanged to every
    # KafkaProducer / KafkaConsumer constructed by this library.
    _CLIENT_SETTING_KEYS = (
        'bootstrap_servers',
        'sasl_plain_username',
        'sasl_plain_password',
        'security_protocol',
        'ssl_context',
        'sasl_mechanism',
    )

    def __init__(self):
        super(KafkaKeywords, self).__init__()
        self._cache = utils.ConnectionCache('No Kafka Environments created')

    @keyword
    def connect(self, alias, kafka_host, sasl_user, sasl_password):
        """Register the settings for a Kafka server under ``alias``.

        No client is created here; the settings are only stored in the
        connection cache so that later keywords can refer to them by alias.

        Args:
            alias: name used by the other keywords to select these settings.
            kafka_host: value passed to the clients as ``bootstrap_servers``.
            sasl_user: SASL PLAIN user name.
            sasl_password: SASL PLAIN password.
        """
        client = {
            "bootstrap_servers": kafka_host,
            "sasl_plain_username": sasl_user,
            "sasl_plain_password": sasl_password,
            "security_protocol": 'SASL_PLAINTEXT',
            "ssl_context": ssl.create_default_context(),
            "sasl_mechanism": 'PLAIN'
        }
        self._cache.register(client, alias=alias)

    @keyword
    def produce(self, alias, topic, key, value):
        """Send ``value`` with ``key`` to ``topic`` using the ``alias`` settings.

        Args:
            alias: alias previously registered with ``connect``.
            topic: destination topic; must be non-empty.
            key: message key, passed through to ``KafkaProducer.send``.
            value: message value; must be non-empty.

        Returns:
            Whatever ``KafkaProducer.send`` returns for the message.

        Raises:
            AssertionError: if ``topic`` or ``value`` is empty.
        """
        # Raised explicitly (rather than via ``assert``) so the checks are
        # not stripped when Python runs with -O.
        if not topic:
            raise AssertionError('topic is required')
        if not value:
            raise AssertionError('value is required')

        # NOTE(review): the producer is neither flushed nor closed here, so
        # delivery relies on the producer's own background sending - confirm
        # whether callers need an explicit flush before changing this.
        producer = self._get_producer(alias)
        return producer.send(topic, value=value, key=key)

    def _client_settings(self, alias):
        """Return a dict of the client keyword arguments stored for ``alias``."""
        cache = self._cache.switch(alias)
        return {name: cache[name] for name in self._CLIENT_SETTING_KEYS}

    def _get_producer(self, alias):
        """Build a new KafkaProducer from the settings stored for ``alias``."""
        return KafkaProducer(request_timeout_ms=5000,
                             **self._client_settings(alias))

    @keyword
    def consume(self, alias, topic_name, consumer_group=None):
        """Read one message from ``topic_name`` and return its value.

        Args:
            alias: alias previously registered with ``connect``.
            topic_name: topic to read from; must be non-empty.
            consumer_group: consumer group id; defaults to ``topic_name``.

        Returns:
            The ``value`` of the first message the consumer yields, or
            ``None`` if the consumer yields nothing.

        Raises:
            AssertionError: if ``topic_name`` is empty.
        """
        if not topic_name:
            raise AssertionError('topic_name is required')

        consumer = self._get_consumer(alias, topic_name, consumer_group)
        try:
            # A default is supplied so an exhausted iterator yields None
            # instead of raising StopIteration.
            msg = next(consumer, None)
        finally:
            # Always release the consumer, even when reading fails.
            consumer.close(autocommit=True)

        if msg is None:
            return None
        return msg.value

    def _get_consumer(self, alias, topic_name, consumer_group=None,
                      set_offset_to_earliest=True, partitions=(0, 1, 2)):
        """Build a KafkaConsumer assigned to partitions of ``topic_name``.

        Args:
            alias: alias previously registered with ``connect``.
            topic_name: topic to assign; must be non-empty.
            consumer_group: consumer group id; defaults to ``topic_name``.
            set_offset_to_earliest: seek to the beginning of the assigned
                partitions when true, otherwise seek to their end.
            partitions: partition numbers of ``topic_name`` to assign.

        Returns:
            The positioned KafkaConsumer; the caller is responsible for
            closing it.

        Raises:
            AssertionError: if ``topic_name`` is empty.
        """
        if not topic_name:
            raise AssertionError('topic_name is required')

        # Default to the topic as group name.
        group_id = consumer_group or topic_name

        # NOTE(review): 10001 ms is presumably chosen to sit just above the
        # client's session timeout - confirm against the kafka-python version
        # in use.
        consumer = KafkaConsumer(group_id=group_id,
                                 request_timeout_ms=10001,
                                 **self._client_settings(alias))

        topic = str(topic_name)
        consumer.assign([TopicPartition(topic, number)
                         for number in partitions])
        consumer.poll()

        if set_offset_to_earliest:
            consumer.seek_to_beginning()
        else:
            consumer.seek_to_end()

        # Result intentionally unused; the call is kept from the original
        # sequence (presumably to refresh topic metadata - confirm).
        consumer.topics()

        return consumer