# Copyright 2021 Xoriant Corporation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14#
15
16from confluent_kafka.admin import AdminClient
17from confluent_kafka import Consumer
18from app_config import AppConfig
19import logging
20import sys
21
22
class EventConsumer:
    """Pull batches of events from a Kafka topic for a single consumer.

    The broker address and the logger are obtained from ``AppConfig`` when an
    instance is constructed.
    """

    # Class-level fallbacks; __init__ replaces both on every instance.
    broker = ""
    logger = logging.getLogger()

    # Soft cap on the accumulated ``sys.getsizeof`` total of one batch. This
    # is the in-memory size of the decoded strings, not the payload byte count.
    MAX_CONTENT_SIZE = 300000

    def __init__(self):
        appConfig = AppConfig()
        self.logger = appConfig.getLogger()
        self.broker = appConfig.getKafkaBroker()

    def consumeEvents(self, topic, consumergroup, consumerid, limit, timeout):
        """Read up to ``limit`` events from ``topic`` and return them as strings.

        A new consumer is created for every call, using ``consumergroup`` as
        ``group.id`` and ``consumerid`` as ``group.instance.id``. Messages are
        polled one at a time and the offset is committed after each message.

        Reading stops as soon as one of the following holds:

        * ``limit`` events have been collected,
        * the accumulated size exceeds ``MAX_CONTENT_SIZE`` (checked before
          each poll, so the batch may overshoot by one event),
        * a poll returns nothing within ``timeout`` seconds,
        * a message carries an error, or
        * an exception is raised while reading (it is logged, not re-raised).

        Args:
            topic: Name of the topic to subscribe to.
            consumergroup: Kafka consumer group id.
            consumerid: Kafka group instance id.
            limit: Maximum number of events to return; anything ``int()``
                accepts.
            timeout: Poll timeout in seconds; anything ``int()`` accepts.

        Returns:
            A list with the events read so far. Each event is the message
            value decoded as UTF-8 with every single quote replaced by a
            double quote.
        """
        self.logger.debug(
            "topic=%s, consumergroup=%s, consumerid=%s, limit=%s, timeout=%s ",
            topic, consumergroup, consumerid, limit, timeout)
        consumer_config = {
            'bootstrap.servers': self.broker,
            'group.id': consumergroup,
            'group.instance.id': consumerid,
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': 'false',
        }

        consumer = Consumer(consumer_config)
        event_list = []

        try:
            # Subscribing inside the try guarantees the consumer is closed
            # even when the subscription itself fails; that error still
            # propagates to the caller.
            consumer.subscribe([topic])
            try:
                # Convert once instead of on every loop iteration. Kept
                # inside the try so an invalid value is logged and yields an
                # empty result rather than an exception.
                max_events = int(limit)
                poll_timeout = int(timeout)
                content_size = 0

                # Every successful poll appends exactly one event, so the
                # list length doubles as the poll counter.
                while (len(event_list) != max_events
                       and content_size <= self.MAX_CONTENT_SIZE):
                    # read single message at a time
                    msg = consumer.poll(timeout=poll_timeout)
                    if msg is None:
                        self.logger.debug("No records ")
                        break
                    if msg.error():
                        self.logger.error(
                            "Error reading message : %s", msg.error())
                        break
                    event = msg.value().decode('utf8').replace("'", '"')
                    content_size += sys.getsizeof(event)
                    event_list.append(event)
                    consumer.commit()
            except Exception as ex:
                # Best effort: report the failure and hand back what was read.
                self.logger.error(
                    'Failed to get event information due to unexpected reason! %s',
                    ex)
        finally:
            self.logger.debug("closing consumer")
            consumer.close()

        # Returning after (not inside) the finally clause keeps
        # KeyboardInterrupt/SystemExit from being silently discarded.
        return event_list
76
77
class TopicConsumer:
    """Answer topic queries using the Kafka broker's cluster metadata.

    The broker address and the logger are obtained from ``AppConfig`` when an
    instance is constructed.
    """

    # Class-level fallbacks; __init__ replaces broker and logger per instance.
    broker = ""
    logger = logging.getLogger()
    # Seconds to wait for the broker to answer a metadata request.
    timeout = 10

    def __init__(self):
        appConfig = AppConfig()
        self.logger = appConfig.getLogger()
        self.broker = appConfig.getKafkaBroker()

    def _listTopicNames(self):
        """Return the names of all topics reported by the broker.

        Any exception raised by the admin client propagates to the caller.
        """
        adminClient = AdminClient({"bootstrap.servers": self.broker})
        metadata = adminClient.list_topics(timeout=self.timeout)
        return list(metadata.topics)

    def getTopics(self):
        """Return ``{'topics': [name, ...]}`` for all topics on the broker.

        Returns:
            The dictionary described above, or ``None`` when the lookup
            fails (the failure is logged).
        """
        try:
            return {'topics': self._listTopicNames()}
        except Exception as ex:
            self.logger.error(
                'Failed to get topic information due to unexpected reason! %s',
                ex)
            return None

    def listAllTopics(self):
        """Return every topic with placeholder ownership information.

        Returns:
            ``{'topics': [{'topicName': name, 'owner': '', 'txenabled':
            False}, ...]}``, or ``None`` when the lookup fails (the failure
            is logged).
        """
        try:
            topic_list = [
                {'topicName': name, 'owner': '', 'txenabled': False}
                for name in self._listTopicNames()
            ]
            return {'topics': topic_list}
        except Exception as ex:
            self.logger.error(
                'Failed to get list of topic information due to unexpected reason! %s',
                ex)
            return None

    def getTopicDetails(self, topic):
        """Return a detail record for ``topic`` if the broker knows it.

        Args:
            topic: Name of the topic to look up.

        Returns:
            A dictionary with ``name``, ``owner``, ``description``,
            ``readerAcl`` and ``writerAcl`` when the topic exists; the string
            ``"Topic [<topic>] does not exists"`` when it does not; ``None``
            when the lookup fails (the failure is logged).
        """
        try:
            if topic in self._listTopicNames():
                return {
                    'name': topic,
                    'owner': '',
                    'description': '',
                    'readerAcl': {"enabled": True, "users": []},
                    'writerAcl': {"enabled": True, "users": []},
                }

            self.logger.debug("Topic {} does not exists! ".format(topic))
            return "Topic [" + topic + "] does not exists"
        except Exception as ex:
            self.logger.error(
                'Failed to get topic detail due to unexpected reason! %s', ex)
            return None