KafkaKeywords update

KafkaKeywords update

Issue-ID: DCAEGEN2-565
Signed-off-by: marekpl <marek.pondel@nokia.com>
Change-Id: I21f13a43347c510f1cb45437ae659e9b41e3f9ac
diff --git a/robotframework-onap/ONAPLibrary/KafkaKeywords.py b/robotframework-onap/ONAPLibrary/KafkaKeywords.py
index c178bc8..f5adce5 100644
--- a/robotframework-onap/ONAPLibrary/KafkaKeywords.py
+++ b/robotframework-onap/ONAPLibrary/KafkaKeywords.py
@@ -71,7 +71,7 @@
         else:
             return msg.value
 
-    def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=True):
+    def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=False):
         assert topic_name
 
         # default to the topic as group name
@@ -91,14 +91,15 @@
                                  group_id=cgn,
                                  request_timeout_ms=10001)
 
-	consumer.assign([TopicPartition(str(topic_name), 0),TopicPartition(str(topic_name), 1),TopicPartition(str(topic_name), 2)])
-        consumer.poll()
+	partitions = [TopicPartition(str(topic_name), 0), TopicPartition(str(topic_name), 1), TopicPartition(str(topic_name), 2)]
+	consumer.assign(partitions)
+	last = consumer.end_offsets(partitions)
+	offset = max(last.values())
 
         if set_offset_to_earliest:
             consumer.seek_to_beginning()
         else:
-            consumer.seek_to_end()
-
-        consumer.topics()
+            for tp in partitions:
+		consumer.seek(tp, offset - 1)
 
         return consumer