Use SASL auth in kafka connections

Change-Id: I55a9289901a6a44f3d07a3cf4e5a028399a5d0dc
Issue-ID: DCAEGEN2-1448
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
diff --git a/development/docker-compose.yml b/development/docker-compose.yml
index 9272c61..1319e39 100644
--- a/development/docker-compose.yml
+++ b/development/docker-compose.yml
@@ -6,22 +6,37 @@
   #
 
   message-router-zookeeper:
-    image: wurstmeister/zookeeper
+    image: nexus3.onap.org:10001/onap/dmaap/zookeeper:4.0.0
     ports:
       - "2181:2181"
 
-  message-router-kafka:
-    #    image: nexus3.onap.org:10001/onap/dmaap/kafka01101:0.0.1
-    image: wurstmeister/kafka
+  message-router-kafka-0:
+    image: nexus3.onap.org:10001/onap/dmaap/kafka111:0.0.6
     ports:
       - "9092:9092"
+      - "9093:9093"
+#    command: "start-kafka.sh"
     environment:
-      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
-      KAFKA_ZOOKEEPER_CONNECT: "message-router-zookeeper:2181"
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT"
-      KAFKA_ADVERTISED_LISTENERS: "INTERNAL_PLAINTEXT://message-router-kafka:9092"
-      KAFKA_LISTENERS: "INTERNAL_PLAINTEXT://0.0.0.0:9092"
-      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL_PLAINTEXT"
+      HOST_IP:                                     127.0.0.1
+      KAFKA_BROKER_ID:                             0
+      ENDPOINT_PORT:                               30490
+      KAFKA_ZOOKEEPER_CONNECT:                     "message-router-zookeeper:2181"
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE:             "true"
+      KAFKA_DELETE_TOPIC_ENABLE:                   "true"
+
+      KAFKA_LISTENERS:                             "INTERNAL_SASL_PLAINTEXT://0.0.0.0:9092,EXTERNAL_SASL_PLAINTEXT://0.0.0.0:9093"
+      KAFKA_ADVERTISED_LISTENERS:                  "INTERNAL_SASL_PLAINTEXT://:9092,EXTERNAL_SASL_PLAINTEXT://:9093"
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP:        "INTERNAL_SASL_PLAINTEXT:SASL_PLAINTEXT,EXTERNAL_SASL_PLAINTEXT:SASL_PLAINTEXT"
+      KAFKA_INTER_BROKER_LISTENER_NAME:            "INTERNAL_SASL_PLAINTEXT"
+      KAFKA_SASL_ENABLED_MECHANISMS:               "PLAIN"
+      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL:  "PLAIN"
+      KAFKA_AUTHORIZER_CLASS_NAME:                 "org.onap.dmaap.kafkaAuthorize.KafkaCustomAuthorizer"
+
+      aaf_locate_url:                              https://aaf-locate:8095
+      KAFKA_LOG_DIRS:                              /opt/kafka/data
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR:      1
+      KAFKA_DEFAULT_REPLICATION_FACTOR:            1
+      KAFKA_NUM_PARTITIONS:                        1
     volumes:
       - /var/run/docker.sock:/var/run/docker.sock
     depends_on:
@@ -47,7 +62,7 @@
                                                 "perf3gpp": {
                                                   "type": "kafka",
                                                   "kafka_info": {
-                                                    "bootstrap_servers": "message-router-kafka:9092",
+                                                    "bootstrap_servers": "message-router-kafka-0:9093",
                                                     "topic_name": "HV_VES_PERF3GPP"
                                                   }
                                                 }
@@ -88,7 +103,7 @@
       retries: 3
       start_period: 15s
     depends_on:
-      - message-router-kafka
+      - message-router-kafka-0
       - config-binding-service
     volumes:
       - ./configuration/:/etc/ves-hv/configuration/
@@ -129,10 +144,10 @@
     ports:
       - "6064:6064/tcp"
     command: ["--listen-port", "6064",
-              "--kafka-bootstrap-servers", "message-router-kafka:9092",
+              "--kafka-bootstrap-servers", "message-router-kafka-0:9092",
               "--kafka-topics", "HV_VES_PERF3GPP"]
     depends_on:
-      - message-router-kafka
+      - message-router-kafka-0
 
   #
   # Monitoring
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt
index 40de8c5..b16ad10 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/kafka.kt
@@ -19,7 +19,11 @@
  */
 package org.onap.dcae.collectors.veshv.impl
 
+import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.plain.internals.PlainSaslServer
 import org.onap.dcae.collectors.veshv.domain.VesMessage
 import org.onap.dcae.collectors.veshv.impl.adapters.kafka.ProtobufSerializer
 import org.onap.dcae.collectors.veshv.impl.adapters.kafka.VesMessageSerializer
@@ -34,6 +38,12 @@
 private const val BUFFER_MEMORY_MULTIPLIER = 32
 private const val MINIMUM_BUFFER_MEMORY = 32 * 1024 * 1024
 
+private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule"
+private const val USERNAME = "admin"
+private const val PASSWORD = "admin_secret"
+private const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;"
+private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name
+
 internal fun createKafkaSender(sinkStream: SinkStream) =
         (sinkStream as KafkaSink).let { kafkaSink ->
             KafkaSender.create(SenderOptions.create<CommonEventHeader, VesMessage>()
@@ -45,6 +55,9 @@
                     .producerProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1)
                     .producerProperty(ProducerConfig.RETRIES_CONFIG, 1)
                     .producerProperty(ProducerConfig.ACKS_CONFIG, "1")
+                    .producerProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SASL_PLAINTEXT)
+                    .producerProperty(SaslConfigs.SASL_MECHANISM, PlainSaslServer.PLAIN_MECHANISM)
+                    .producerProperty(SaslConfigs.SASL_JAAS_CONFIG, JAAS_CONFIG)
                     .stopOnError(false)
             )
         }
diff --git a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
index b91e7a1..b5b692d 100644
--- a/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/adapters/KafkaSource.kt
@@ -19,7 +19,11 @@
  */
 package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters
 
+import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.config.SaslConfigs
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.plain.internals.PlainSaslServer.PLAIN_MECHANISM
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.onap.dcae.collectors.veshv.utils.logging.Logger
 import reactor.core.publisher.Flux
@@ -40,10 +44,17 @@
     companion object {
         private val logger = Logger(KafkaSource::class)
 
+        private const val LOGIN_MODULE_CLASS = "org.apache.kafka.common.security.plain.PlainLoginModule"
+        private const val USERNAME = "admin"
+        private const val PASSWORD = "admin_secret"
+        private const val JAAS_CONFIG = "$LOGIN_MODULE_CLASS required username=$USERNAME password=$PASSWORD;"
+        private val SASL_PLAINTEXT = (SecurityProtocol.SASL_PLAINTEXT as Enum<SecurityProtocol>).name
+
         fun create(bootstrapServers: String, topics: Set<String>): KafkaSource {
             return KafkaSource(KafkaReceiver.create(createReceiverOptions(bootstrapServers, topics)))
         }
 
+
         fun createReceiverOptions(bootstrapServers: String,
                                   topics: Set<String>): ReceiverOptions<ByteArray, ByteArray>? {
             val props = mapOf<String, Any>(
@@ -52,7 +63,11 @@
                     ConsumerConfig.GROUP_ID_CONFIG to "hv-collector-simulators",
                     ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
                     ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to ByteArrayDeserializer::class.java,
-                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest"
+                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest",
+
+                    CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SASL_PLAINTEXT,
+                    SaslConfigs.SASL_MECHANISM to PLAIN_MECHANISM,
+                    SaslConfigs.SASL_JAAS_CONFIG to JAAS_CONFIG
             )
             return ReceiverOptions.create<ByteArray, ByteArray>(props)
                     .addAssignListener { partitions -> logger.debug { "Partitions assigned $partitions" } }