# Copyright 2019 AT&T Intellectual Property. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import ssl

from kafka import KafkaConsumer
from kafka import KafkaProducer
from robot.api.deco import keyword
from robot import utils


class KafkaKeywords(object):
    """ Utilities useful for Kafka consuming and producing """

    def __init__(self):
        super(KafkaKeywords, self).__init__()
        self._cache = utils.ConnectionCache('No Kafka Environments created')

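    # connect() only records the connection settings in the ConnectionCache
    # under the given alias; actual KafkaProducer/KafkaConsumer clients are
    # created lazily by _get_producer/_get_consumer when a keyword runs.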
    @keyword
    def connect(self, alias, kafka_host, sasl_user, sasl_password):
        """Connect to the specified Kafka server."""
        client = {
            "bootstrap_servers": kafka_host,
            "sasl_plain_username": sasl_user,
            "sasl_plain_password": sasl_password,
            "security_protocol": 'SASL_SSL',
            "ssl_context": ssl.create_default_context(),
            "sasl_mechanism": 'PLAIN'
        }
        self._cache.register(client, alias=alias)

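    # NOTE: no key/value serializers are configured on the producer, so
    # kafka-python expects both 'key' and 'value' to already be bytes.
    # send() is asynchronous and returns a FutureRecordMetadata.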
    @keyword
    def produce(self, alias, topic, key, value):
        assert topic
        assert value

        producer = self._get_producer(alias)
        return producer.send(topic, value=value, key=key)

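    # A fresh KafkaProducer is built from the cached settings on every call;
    # instances are not pooled or explicitly closed.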
    def _get_producer(self, alias):
        cache = self._cache.switch(alias)
        prod = KafkaProducer(bootstrap_servers=cache['bootstrap_servers'],
                             sasl_plain_username=cache['sasl_plain_username'],
                             sasl_plain_password=cache['sasl_plain_password'],
                             security_protocol=cache['security_protocol'],
                             ssl_context=cache['ssl_context'],
                             sasl_mechanism=cache['sasl_mechanism'],
                             request_timeout_ms=5000)
        return prod

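    # Blocks until a single message arrives on the topic, commits the offset
    # on close, and returns the raw message value (bytes, since no value
    # deserializer is configured).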
    @keyword
    def consume(self, alias, topic_name, consumer_group=None):
        assert topic_name

        consumer = self._get_consumer(alias, topic_name, consumer_group)
        msg = next(consumer)
        consumer.close(autocommit=True)
        if msg is None:
            return None
        else:
            return msg.value

    def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=False):
        assert topic_name

        # default to the topic as group name
        if consumer_group:
            cgn = consumer_group
        else:
            cgn = topic_name

        cache = self._cache.switch(alias)

        consumer = KafkaConsumer(bootstrap_servers=cache['bootstrap_servers'],
                                 sasl_plain_username=cache['sasl_plain_username'],
                                 sasl_plain_password=cache['sasl_plain_password'],
                                 security_protocol=cache['security_protocol'],
                                 ssl_context=cache['ssl_context'],
                                 sasl_mechanism=cache['sasl_mechanism'],
                                 group_id=cgn,
                                 request_timeout_ms=5000)

        # force an initial metadata fetch, then subscribe to the requested topic
        consumer.topics()
        consumer.subscribe([topic_name])

        # seek_to_beginning/seek_to_end only act on partitions that are
        # currently assigned, so poll until the group assignment is complete
        for _ in range(10):
            if consumer.assignment():
                break
            consumer.poll(timeout_ms=500)

        if set_offset_to_earliest:
            consumer.seek_to_beginning()
        else:
            consumer.seek_to_end()

        return consumer