hvves related utils update

hvves related utils update

Issue-ID: DCAEGEN2-565
Signed-off-by: marekpl <marek.pondel@nokia.com>
Change-Id: I97d10ef25671ceb6ea6a5987fe0e4f9f19504d39
diff --git a/robotframework-onap/ONAPLibrary/KafkaKeywords.py b/robotframework-onap/ONAPLibrary/KafkaKeywords.py
index 41ef976..0e15cfc 100644
--- a/robotframework-onap/ONAPLibrary/KafkaKeywords.py
+++ b/robotframework-onap/ONAPLibrary/KafkaKeywords.py
@@ -33,7 +33,7 @@
             "bootstrap_servers": kafka_host,
             "sasl_plain_username": sasl_user,
             "sasl_plain_password": sasl_password,
-            "security_protocol": 'SASL_SSL',
+            "security_protocol": 'SASL_PLAINTEXT',
             "ssl_context": ssl.create_default_context(),
             "sasl_mechanism": 'PLAIN'
         }
@@ -51,7 +51,7 @@
         cache = self._cache.switch(alias)
         prod = KafkaProducer(bootstrap_servers=cache['bootstrap_servers'],
                              sasl_plain_username=cache['sasl_plain_username'],
-                             sasl_plain_password=cache['sasl_password'],
+                             sasl_plain_password=cache['sasl_plain_password'],
                              security_protocol=cache['security_protocol'],
                              ssl_context=cache['ssl_context'],
                              sasl_mechanism=cache['sasl_mechanism'],
@@ -70,7 +70,7 @@
         else:
             return msg.value
 
-    def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=False):
+    def _get_consumer(self, alias, topic_name, consumer_group=None, set_offset_to_earliest=True):
         assert topic_name
 
         # default to the topic as group name
@@ -83,12 +83,15 @@
 
         consumer = KafkaConsumer(bootstrap_servers=cache['bootstrap_servers'],
                                  sasl_plain_username=cache['sasl_plain_username'],
-                                 sasl_plain_password=cache['sasl_password'],
+                                 sasl_plain_password=cache['sasl_plain_password'],
                                  security_protocol=cache['security_protocol'],
                                  ssl_context=cache['ssl_context'],
                                  sasl_mechanism=cache['sasl_mechanism'],
                                  group_id=cgn,
-                                 request_timeout_ms=5000)
+                                 request_timeout_ms=10001)
+
+	consumer.assign([TopicPartition(str(topic_name), 0),TopicPartition(str(topic_name), 1),TopicPartition(str(topic_name), 2)])
+        consumer.poll()
 
         if set_offset_to_earliest:
             consumer.seek_to_beginning()