[SDC] Add kafka native messaging

Change-Id: I5ab8f580947cbc264d94bec48a5e8b659dc44c08
Issue-ID: DMAAP-1787
Signed-off-by: david.mcweeney <david.mcweeney@est.tech>
diff --git a/catalog-be/pom.xml b/catalog-be/pom.xml
index 7b060c9..edd673c 100644
--- a/catalog-be/pom.xml
+++ b/catalog-be/pom.xml
@@ -1043,6 +1043,10 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+        </dependency>
 
     </dependencies>
 
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java
index d61e150..00d3fed 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java
@@ -20,6 +20,7 @@
 package org.openecomp.sdc.be.components.distribution.engine;
 
 import fj.data.Either;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.Executors;
@@ -27,7 +28,7 @@
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
+import org.openecomp.sdc.be.components.kafka.KafkaHandler;
 import org.openecomp.sdc.be.config.BeEcompErrorManager;
 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
 import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
@@ -60,6 +61,7 @@
     private AtomicBoolean status = null;
     private OperationalEnvironmentEntry environmentEntry;
     private CambriaHandler cambriaHandler = new CambriaHandler();
+    private KafkaHandler kafkaHandler = new KafkaHandler();
     private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
 
     public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName,
@@ -136,9 +138,7 @@
 
     @Override
     public void run() {
-        boolean result = false;
-        result = initFlow();
-        if (result) {
+        if (initFlow()) {
             this.stopTask();
             this.status.set(true);
             if (this.distributionEnginePollingTask != null) {
@@ -159,38 +159,45 @@
      * @return
      */
     public boolean initFlow() {
-        logger.trace("Start init flow for environment {}", this.envName);
-        Set<String> topicsList = null;
-        Either<Set<String>, CambriaErrorResponse> getTopicsRes = null;
-        getTopicsRes = cambriaHandler.getTopics(environmentEntry.getDmaapUebAddress().stream().collect(Collectors.toList()));
-        if (getTopicsRes.isRight()) {
-            CambriaErrorResponse status = getTopicsRes.right().value();
-            if (status.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) {
-                topicsList = new HashSet<>();
+        logger.info("Start init flow for environment {}", this.envName);
+        if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
+            Set<String> topicsList;
+            Either<Set<String>, CambriaErrorResponse> getTopicsRes;
+            getTopicsRes = cambriaHandler.getTopics(new ArrayList<>(environmentEntry.getDmaapUebAddress()));
+            if (getTopicsRes.isRight()) {
+                CambriaErrorResponse cambriaErrorResponse = getTopicsRes.right().value();
+                if (cambriaErrorResponse.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) {
+                    topicsList = new HashSet<>();
+                } else {
+                    BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW,
+                        "try retrieve list of topics from U-EB server");
+                    return false;
+                }
             } else {
-                BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server");
+                topicsList = getTopicsRes.left().value();
+            }
+            String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
+            logger.debug("Going to handle topic {}", notificationTopic);
+            if (!createNotificationTopicIfNotExists(topicsList, notificationTopic)) {
                 return false;
             }
+            CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic,
+                SubscriberTypeEnum.PRODUCER);
+            CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
+            if (createStatus != CambriaOperationStatus.OK) {
+                return false;
+            }
+            String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
+            logger.debug("Going to handle topic {}", statusTopic);
+            if (!createStatusTopicIfNotExists(topicsList, statusTopic)) {
+                return false;
+            }
+            CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
+            return registerConcumerStatus.getOperationStatus() == CambriaOperationStatus.OK;
         } else {
-            topicsList = getTopicsRes.left().value();
+            logger.info("Skipping DisributionEngineInitTask flow to use kafka native for distribution messaging");
+            return true;
         }
-        String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
-        logger.debug("Going to handle topic {}", notificationTopic);
-        if (!createNotificationTopicIfNotExists(topicsList, notificationTopic)) {
-            return false;
-        }
-        CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER);
-        CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
-        if (createStatus != CambriaOperationStatus.OK) {
-            return false;
-        }
-        String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
-        logger.debug("Going to handle topic {}", statusTopic);
-        if (!createStatusTopicIfNotExists(topicsList, statusTopic)) {
-            return false;
-        }
-        CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
-        return registerConcumerStatus.getOperationStatus() == CambriaOperationStatus.OK;
     }
 
     private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) {
@@ -281,4 +288,8 @@
     protected void setCambriaHandler(CambriaHandler cambriaHandler) {
         this.cambriaHandler = cambriaHandler;
     }
+
+    protected void setKafkaHandler(KafkaHandler kafkaHandler) {
+        this.kafkaHandler = kafkaHandler;
+    }
 }
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java
index 1246710..ab4400a 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java
@@ -29,6 +29,7 @@
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
 import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter;
+import org.openecomp.sdc.be.components.kafka.KafkaHandler;
 import org.openecomp.sdc.be.config.BeEcompErrorManager;
 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
 import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionStatusTopicConfig;
@@ -51,6 +52,7 @@
     private String consumerId;
     private String consumerGroup;
     private CambriaHandler cambriaHandler = new CambriaHandler();
+    private final KafkaHandler kafkaHandler = new KafkaHandler();
     private Gson gson = new GsonBuilder().setPrettyPrinting().create();
     private DistributionCompleteReporter distributionCompleteReporter;
     private ScheduledExecutorService scheduledPollingService = Executors
@@ -82,9 +84,12 @@
             fetchTimeoutInSec = 15;
         }
         try {
-            cambriaConsumer = cambriaHandler
-                .createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(),
-                    consumerId, consumerGroup, fetchTimeoutInSec * 1000);
+            if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
+                cambriaConsumer = cambriaHandler
+                    .createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(),
+                        environmentEntry.getUebSecretKey(),
+                        consumerId, consumerGroup, fetchTimeoutInSec * 1000);
+            }
             if (scheduledPollingService != null) {
                 logger.debug("Start Distribution Engine polling task. polling interval {} seconds", pollingIntervalInSec);
                 scheduledFuture = scheduledPollingService.scheduleAtFixedRate(this, 0, pollingIntervalInSec, TimeUnit.SECONDS);
@@ -119,14 +124,20 @@
     @Override
     public void run() {
         logger.trace("run() method. polling queue {}", topicName);
+        Either<Iterable<String>, CambriaErrorResponse> fetchResult;
         try {
             // init error
-            if (cambriaConsumer == null) {
-                BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
-                stopTask();
-                return;
+            if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
+                if (cambriaConsumer == null) {
+                    BeEcompErrorManager.getInstance()
+                        .logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
+                    stopTask();
+                    return;
+                }
+                fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer);
+            } else {
+                fetchResult = kafkaHandler.fetchFromTopic(topicName);
             }
-            Either<Iterable<String>, CambriaErrorResponse> fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer);
             // fetch error
             if (fetchResult.isRight()) {
                 CambriaErrorResponse errorResponse = fetchResult.right().value();
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java
index 0098eac..b93d485 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java
@@ -19,6 +19,7 @@
  */
 package org.openecomp.sdc.be.components.distribution.engine;
 
+import org.openecomp.sdc.be.components.kafka.KafkaHandler;
 import org.openecomp.sdc.be.config.ConfigurationManager;
 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
 import org.openecomp.sdc.be.dao.api.ActionStatus;
@@ -36,16 +37,26 @@
     private static final Logger logger = Logger.getLogger(DistributionNotificationSender.class.getName());
     @javax.annotation.Resource
     protected ComponentsUtils componentUtils;
-    private CambriaHandler cambriaHandler = new CambriaHandler();
-    private DistributionEngineConfiguration deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
+    private final CambriaHandler cambriaHandler = new CambriaHandler();
+
+    private final KafkaHandler kafkaHandler = new KafkaHandler();
+
+//
+    private final DistributionEngineConfiguration deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
 
     public ActionStatus sendNotification(String topicName, String distributionId, EnvironmentMessageBusData messageBusData,
                                          INotificationData notificationData, Service service, User modifier) {
         long startTime = System.currentTimeMillis();
-        CambriaErrorResponse status = cambriaHandler
-            .sendNotificationAndClose(topicName, messageBusData.getUebPublicKey(), messageBusData.getUebPrivateKey(),
-                messageBusData.getDmaaPuebEndpoints(), notificationData,
-                deConfiguration.getDistributionNotificationTopic().getMaxWaitingAfterSendingSeconds());
+        CambriaErrorResponse status;
+        if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
+            status = cambriaHandler
+                .sendNotificationAndClose(topicName, messageBusData.getUebPublicKey(), messageBusData.getUebPrivateKey(),
+                    messageBusData.getDmaaPuebEndpoints(), notificationData,
+                    deConfiguration.getDistributionNotificationTopic().getMaxWaitingAfterSendingSeconds());
+        }
+        else{
+            status = kafkaHandler.sendNotification(topicName, notificationData);
+        }
         logger.info("After publishing service {} of version {}. Status is {}", service.getName(), service.getVersion(), status.getHttpCode());
         auditDistributionNotification(
             new AuditDistributionNotificationBuilder().setTopicName(topicName).setDistributionId(distributionId).setStatus(status).setService(service)
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java
new file mode 100644
index 0000000..2a5590e
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java
@@ -0,0 +1,138 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.sdc.be.components.kafka;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import fj.data.Either;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.http.HttpStatus;
+import org.apache.kafka.common.KafkaException;
+import org.openecomp.sdc.be.components.distribution.engine.CambriaErrorResponse;
+import org.openecomp.sdc.be.components.distribution.engine.INotificationData;
+import org.openecomp.sdc.be.config.BeEcompErrorManager;
+import org.openecomp.sdc.be.config.ConfigurationManager;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
+import org.openecomp.sdc.common.log.wrappers.Logger;
+import org.springframework.stereotype.Component;
+
+/**
+ * Utility class that provides a handler for Kafka interactions
+ */
+@Component
+public class KafkaHandler {
+
+    private static final Logger log = Logger.getLogger(KafkaHandler.class.getName());
+    private final Gson gson = new Gson();
+
+    private SdcKafkaConsumer sdcKafkaConsumer;
+
+    private SdcKafkaProducer sdcKafkaProducer;
+
+    @Setter
+    private boolean isKafkaActive;
+
+    private DistributionEngineConfiguration deConfiguration;
+
+    public KafkaHandler(SdcKafkaConsumer sdcKafkaConsumer, SdcKafkaProducer sdcKafkaProducer, boolean isKafkaActive) {
+        this.sdcKafkaConsumer = sdcKafkaConsumer;
+        this.sdcKafkaProducer = sdcKafkaProducer;
+        this.isKafkaActive = isKafkaActive;
+    }
+
+    public KafkaHandler() {
+        isKafkaActive = Boolean.parseBoolean(System.getenv().getOrDefault("USE_KAFKA", "false"));
+        deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
+    }
+
+    /**
+     * @return a user configuration whether Kafka is active for this client
+     */
+    public Boolean isKafkaActive() {
+        return isKafkaActive;
+    }
+
+    /**
+     * @param topicName The topic from which messages will be fetched
+     * @return Either A list of messages from a specific topic, or a specific error response
+     */
+    public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(String topicName) {
+        try {
+            if(sdcKafkaConsumer == null){
+                sdcKafkaConsumer = new SdcKafkaConsumer(deConfiguration);
+            }
+            sdcKafkaConsumer.subscribe(topicName);
+            Iterable<String> messages = sdcKafkaConsumer.poll(topicName);
+            log.info("Returning messages from topic {}", topicName);
+            return Either.left(messages);
+        } catch (KafkaException e) {
+            BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("fetchFromTopic", e.getMessage());
+            log.error("Failed to fetch from kafka for topic: {}", topicName, e);
+            CambriaErrorResponse cambriaErrorResponse =
+                new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR,
+                    HttpStatus.SC_INTERNAL_SERVER_ERROR);
+            return Either.right(cambriaErrorResponse);
+        }
+    }
+
+    /**
+     * Publish notification message to a given topic and flush
+     *
+     * @param topicName The topic to which the message should be published
+     * @param data      The data to publish to the topic specified
+     * @return CambriaErrorResponse a status response on success or any errors thrown
+     */
+    public CambriaErrorResponse sendNotification(String topicName, INotificationData data) {
+        CambriaErrorResponse response;
+        if(sdcKafkaProducer == null){
+            sdcKafkaProducer = new SdcKafkaProducer(deConfiguration);
+        }
+       try {
+           String json = gson.toJson(data);
+           log.info("Before sending to topic {}", topicName);
+           sdcKafkaProducer.send(json, topicName);
+       }
+       catch(KafkaException e){
+           BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("sendNotification", e.getMessage());
+           log.error("Failed to send message . Exception {}", e.getMessage());
+
+           return new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
+       } catch (JsonSyntaxException e) {
+           BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("sendNotification", e.getMessage());
+           log.error("Failed to convert data to json: {}", data, e);
+
+            return new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
+        } finally {
+            try {
+                sdcKafkaProducer.flush();
+                response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
+            } catch (KafkaException | IllegalArgumentException e) {
+                BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("sendNotification", e.getMessage());
+                log.error("Failed to flush sdcKafkaProducer", e);
+
+                response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
+            }
+        }
+
+        return response;
+    }
+}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java
new file mode 100644
index 0000000..8879bf0
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java
@@ -0,0 +1,117 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.sdc.be.components.kafka;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+import org.openecomp.sdc.common.log.wrappers.Logger;
+
+/**
+ * Utility class that provides a KafkaConsumer to communicate with a kafka cluster
+ */
+public class SdcKafkaConsumer {
+
+    private static final Logger log = Logger.getLogger(SdcKafkaConsumer.class.getName());
+    private final DistributionEngineConfiguration deConfiguration;
+    private KafkaConsumer<String, String> kafkaConsumer;
+
+    /**
+     * Constructor setting up the KafkaConsumer from a predefined set of configurations
+     */
+    public SdcKafkaConsumer(DistributionEngineConfiguration deConfiguration){
+        log.info("Create SdcKafkaConsumer via constructor");
+        Properties properties = new Properties();
+        this.deConfiguration = deConfiguration;
+
+        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerId()+ "-consumer-" + UUID.randomUUID());
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,  "org.apache.kafka.common.serialization.StringDeserializer");
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerGroup());
+        properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers());
+        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+        properties.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
+        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+        properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
+
+        properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig());
+        kafkaConsumer = new KafkaConsumer<>(properties);
+    }
+
+    /**
+     *
+     * @param kafkaConsumer a kafkaConsumer to use within the class
+     * @param deConfiguration - Configuration to pass into the class
+     */
+    @VisibleForTesting
+    SdcKafkaConsumer(KafkaConsumer kafkaConsumer, DistributionEngineConfiguration deConfiguration){
+        this.deConfiguration = deConfiguration;
+        this.kafkaConsumer = kafkaConsumer;
+    }
+
+    /**
+     *
+     * @return the Sasl Jass Config
+     */
+    private String getKafkaSaslJaasConfig() {
+        String saslJaasConfFromEnv = System.getenv("SASL_JAAS_CONFIG");
+        if(saslJaasConfFromEnv != null) {
+            return saslJaasConfFromEnv;
+        } else {
+            throw new KafkaException("sasl.jaas.config not set for Kafka Consumer");
+        }
+    }
+
+    /**
+     *
+     * @param topic Topic in which to subscribe
+     */
+    public void subscribe(String topic) throws KafkaException {
+        if (!kafkaConsumer.subscription().contains(topic)) {
+            kafkaConsumer.subscribe(Collections.singleton(topic));
+        }
+    }
+
+    /**
+     *
+     * @return The list of messages for a specified topic, returned from the poll
+     */
+    public List<String> poll(String topicName) throws KafkaException {
+        log.info("SdcKafkaConsumer - polling for messages from Topic: {}", topicName);
+        List<String> msgs = new ArrayList<>();
+        ConsumerRecords<String, String> consumerRecordsForSpecificTopic = kafkaConsumer.poll(Duration.ofSeconds(deConfiguration.getDistributionStatusTopic().getPollingIntervalSec()));
+        for(ConsumerRecord<String, String> rec : consumerRecordsForSpecificTopic){
+            msgs.add(rec.value());
+        }
+        return msgs;
+    }
+}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java
new file mode 100644
index 0000000..bdc984d
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java
@@ -0,0 +1,98 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.sdc.be.components.kafka;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class that provides a KafkaProducer to communicate with a kafka cluster
+ */
+public class SdcKafkaProducer {
+    private static final Logger log = LoggerFactory.getLogger(SdcKafkaProducer.class.getName());
+
+    private KafkaProducer<String, String> kafkaProducer;
+
+    /**
+     * Constructor setting up the KafkaProducer from a predefined set of configurations
+     */
+    public SdcKafkaProducer(DistributionEngineConfiguration deConfiguration) {
+        log.info("Create SdcKafkaProducer via constructor");
+        Properties properties = new Properties();
+
+        properties.put(ProducerConfig.CLIENT_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerId() + "-producer-" + UUID.randomUUID());
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,  "org.apache.kafka.common.serialization.StringSerializer");
+        properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers());
+        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+        properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig());
+        properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
+        kafkaProducer = new KafkaProducer<>(properties);
+    }
+
+    /**
+     *
+     * @param kafkaProducer Setting a KafkaProducer to use within the sdcKafkaProducer class
+     */
+    @VisibleForTesting
+    SdcKafkaProducer(KafkaProducer kafkaProducer) {
+        this.kafkaProducer = kafkaProducer;
+    }
+
+    /**
+     * @return The Sasl Jaas Configuration
+     */
+    private static String getKafkaSaslJaasConfig() throws KafkaException {
+        String saslJaasConfFromEnv = System.getenv("SASL_JAAS_CONFIG");
+        if(saslJaasConfFromEnv != null) {
+            return saslJaasConfFromEnv;
+        } else {
+            throw new KafkaException("sasl.jaas.config not set for Kafka Consumer");
+        }
+    }
+
+    /**
+     * @param message A message to Send
+     * @param topicName The name of the topic to publish to
+     * @return The status of the send request
+     */
+    public void send(String message, String topicName) throws KafkaException {
+        ProducerRecord<String, String> kafkaMessagePayload = new ProducerRecord<>(topicName, "PartitionKey", message);
+        kafkaProducer.send(kafkaMessagePayload);
+    }
+
+    /**
+     * Kafka FLush operation
+     */
+    public void flush() throws KafkaException {
+        log.info("SdcKafkaProducer - flush");
+        kafkaProducer.flush();
+    }
+}
diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandlerTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandlerTest.java
index 980bb83..a91b246 100644
--- a/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandlerTest.java
+++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandlerTest.java
@@ -7,9 +7,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,6 +20,11 @@
 
 package org.openecomp.sdc.be.components.distribution.engine;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+
 import com.att.nsa.apiClient.credentials.ApiCredential;
 import com.att.nsa.apiClient.http.HttpException;
 import com.att.nsa.cambria.client.CambriaClient;
@@ -29,6 +34,14 @@
 import com.att.nsa.cambria.client.CambriaConsumer;
 import com.att.nsa.cambria.client.CambriaIdentityManager;
 import fj.data.Either;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.security.GeneralSecurityException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
 import mockit.Deencapsulation;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -45,20 +58,6 @@
 import org.openecomp.sdc.common.impl.ExternalConfiguration;
 import org.openecomp.sdc.common.impl.FSConfigurationSource;
 
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-
 @RunWith(MockitoJUnitRunner.class)
 public class CambriaHandlerTest extends BeConfDependentTest {
 
@@ -141,6 +140,7 @@
 
 	@Test
 	public void testGetTopics() throws Exception {
+
 		CambriaHandler testSubject;
 		List<String> hostSet = new LinkedList<>();
 		hostSet.add("mock");
diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTaskTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTaskTest.java
index d53476d..9c1af39 100644
--- a/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTaskTest.java
+++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTaskTest.java
@@ -7,9 +7,9 @@
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,6 +25,7 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
+import org.openecomp.sdc.be.components.kafka.KafkaHandler;
 import org.openecomp.sdc.be.config.ConfigurationManager;
 import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
 import org.openecomp.sdc.be.config.DistributionEngineConfiguration.CreateTopicConfig;
@@ -54,6 +55,8 @@
 
     private CambriaHandler cambriaHandler;
 
+    private KafkaHandler kafkaHandler;
+
     @BeforeEach
     public void setup() {
         ExternalConfiguration.setAppName("catalog-be");
@@ -65,6 +68,7 @@
 
         componentsUtils = Mockito.mock(ComponentsUtils.class);
         cambriaHandler = Mockito.mock(CambriaHandler.class);
+        kafkaHandler = Mockito.mock(KafkaHandler.class);
     }
 
     @Test
@@ -88,7 +92,7 @@
         assertEquals("check next retry interval reach max retry interval", initTask.getCurrentRetryInterval(), maxRetry);
 
     }
-    
+
     @Test
     void checkStartTask() {
 
@@ -100,10 +104,10 @@
         deConfiguration.setInitRetryIntervalSec(retry);
         deConfiguration.setInitMaxIntervalSec(maxRetry);
         DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
-        
+
         initTask.startTask();
     }
-    
+
     @Test
     void checkRestartTask() {
 
@@ -115,10 +119,10 @@
         deConfiguration.setInitRetryIntervalSec(retry);
         deConfiguration.setInitMaxIntervalSec(maxRetry);
         DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
-        
+
         initTask.restartTask();
     }
-    
+
     @Test
     void checkStopTask() {
 
@@ -130,12 +134,12 @@
         deConfiguration.setInitRetryIntervalSec(retry);
         deConfiguration.setInitMaxIntervalSec(maxRetry);
         DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
-        
+
         initTask.stopTask();
         initTask.startTask();
         initTask.stopTask();
     }
-    
+
     @Test
     void checkDestroy() {
 
@@ -147,10 +151,10 @@
         deConfiguration.setInitRetryIntervalSec(retry);
         deConfiguration.setInitMaxIntervalSec(maxRetry);
         DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
-        
+
         initTask.destroy();
     }
-    
+
     @Test
     void checkRun() {
 
@@ -193,10 +197,10 @@
         initTask.setCambriaHandler(cambriaHandler);
 
         boolean initFlow = initTask.initFlow();
-        
+
         initTask.run();
     }
-    
+
     @Test
     void testInitFlowScenarioSuccess() {
 
@@ -244,6 +248,20 @@
     }
 
     @Test
+    void testInitFlowSuccessKafkaEnabled(){
+        DistributionEngineConfiguration config = new DistributionEngineConfiguration();
+        config.setInitRetryIntervalSec(1);
+        config.setInitMaxIntervalSec(1);
+
+        when(kafkaHandler.isKafkaActive()).thenReturn(true);
+        DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, config, null, new AtomicBoolean(false), componentsUtils, null, null);
+        initTask.setKafkaHandler(kafkaHandler);
+
+        boolean initFlow = initTask.initFlow();
+        assertTrue("check init flow succeed", initFlow);
+    }
+
+    @Test
     void testInitFlowScenarioSuccessTopicsAlreadyExists() {
 
         String envName = "PrOD";
diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/KafkaHandlerTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/KafkaHandlerTest.java
new file mode 100644
index 0000000..91ee023
--- /dev/null
+++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/KafkaHandlerTest.java
@@ -0,0 +1,138 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.sdc.be.components.kafka;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
+import com.google.gson.JsonSyntaxException;
+import org.apache.kafka.common.KafkaException;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+import java.util.ArrayList;
+import fj.data.Either;
+import java.util.List;
+
+import org.openecomp.sdc.be.components.distribution.engine.CambriaErrorResponse;
+import org.openecomp.sdc.be.components.distribution.engine.NotificationDataImpl;
+import org.openecomp.sdc.be.components.distribution.engine.INotificationData;
+import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
+
+
+@ExtendWith(MockitoExtension.class)
+public class KafkaHandlerTest {
+
+    @Mock
+    private SdcKafkaConsumer mockSdcKafkaConsumer;
+
+    @Mock
+    private SdcKafkaProducer mockSdcKafkaProducer;
+
+    private KafkaHandler kafkaHandler;
+
+    @Test
+    public void testIsKafkaActiveTrue(){
+        KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
+        assertTrue(kafkaHandler.isKafkaActive());
+    }
+
+    @Test
+    public void testIsKafkaActiveFalse(){
+        KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
+        kafkaHandler.setKafkaActive(false);
+        assertFalse(kafkaHandler.isKafkaActive());
+    }
+
+    @Test
+    public void testFetchFromTopicSuccess(){
+        String testTopic = "testTopic";
+        List<String> mockedReturnedMessages = new ArrayList<>();
+        mockedReturnedMessages.add("message1");
+        mockedReturnedMessages.add("message2");
+        KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
+        when(mockSdcKafkaConsumer.poll(any())).thenReturn(mockedReturnedMessages);
+        Either<Iterable<String>, CambriaErrorResponse> response = kafkaHandler.fetchFromTopic(testTopic);
+        Iterable<String> actualReturnedMessages = response.left().value();
+        assertTrue(response.isLeft());
+        assertEquals(actualReturnedMessages, mockedReturnedMessages);
+    }
+
+    @Test
+    public void testFetchFromTopicFail(){
+        String testTopic = "testTopic";
+        KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
+        when(mockSdcKafkaConsumer.poll(any())).thenThrow(new KafkaException());
+        Either<Iterable<String>, CambriaErrorResponse> response = kafkaHandler.fetchFromTopic(testTopic);
+        CambriaErrorResponse responseValue = response.right().value();
+        assertTrue(response.isRight());
+        assertEquals(responseValue.getOperationStatus(), CambriaOperationStatus.INTERNAL_SERVER_ERROR);
+    }
+
+    @Test
+    public void testSendNotificationSuccess(){
+        String testTopic = "testTopic";
+        KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
+        INotificationData testData = new NotificationDataImpl();
+        CambriaErrorResponse response = kafkaHandler.sendNotification(testTopic, testData);
+        assertEquals(response.getOperationStatus(), CambriaOperationStatus.OK);
+        assertEquals(response.getHttpCode(), 200);
+    }
+
+    @Test
+    public void testSendNotificationKafkaException(){
+        String testTopic = "testTopic";
+        KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
+        INotificationData testData = new NotificationDataImpl();
+        doThrow(KafkaException.class).when(mockSdcKafkaProducer).send(any(), any());
+        CambriaErrorResponse response = kafkaHandler.sendNotification(testTopic, testData);
+        assertEquals(response.getOperationStatus(), CambriaOperationStatus.INTERNAL_SERVER_ERROR);
+        assertEquals(response.getHttpCode(), 500);
+    }
+
+    @Test
+    public void testSendNotificationJsonSyntaxException(){
+        String testTopic = "testTopic";
+        KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
+        INotificationData testData = new NotificationDataImpl();
+        doThrow(JsonSyntaxException.class).when(mockSdcKafkaProducer).send(any(), any());
+        CambriaErrorResponse response = kafkaHandler.sendNotification(testTopic, testData);
+        assertEquals(response.getOperationStatus(), CambriaOperationStatus.INTERNAL_SERVER_ERROR);
+        assertEquals(response.getHttpCode(), 500);
+    }
+
+    @Test
+    public void testSendNotificationFlushException(){
+        String testTopic = "testTopic";
+        KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
+        INotificationData testData = new NotificationDataImpl();
+        doThrow(KafkaException.class).when(mockSdcKafkaProducer).flush();
+        CambriaErrorResponse response = kafkaHandler.sendNotification(testTopic, testData);
+        assertEquals(response.getOperationStatus(), CambriaOperationStatus.INTERNAL_SERVER_ERROR);
+        assertEquals(response.getHttpCode(), 500);
+    }
+}
diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumerTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumerTest.java
new file mode 100644
index 0000000..0a4a834
--- /dev/null
+++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumerTest.java
@@ -0,0 +1,143 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.sdc.be.components.kafka;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.kafka.common.KafkaException;
+import org.junit.jupiter.api.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.when;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.Collection;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.jetbrains.annotations.NotNull;
+
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+
+public class SdcKafkaConsumerTest {
+
+    @Test
+    public void TestSubscribeSuccess(){
+        KafkaConsumer<byte[], byte[]> mockKafkaConsumer = Mockito.mock(KafkaConsumer.class);
+        SdcKafkaConsumer sdcKafkaConsumer = new SdcKafkaConsumer(mockKafkaConsumer, null);
+        ArgumentCaptor<Collections> captor = ArgumentCaptor.forClass(Collections.class);
+
+        String testTopics = "testTopic";
+        sdcKafkaConsumer.subscribe(testTopics);
+        verify(mockKafkaConsumer).subscribe((Collection<String>) captor.capture());
+    }
+
+    @Test
+    public void TestSubscribeAlreadySubscribed(){
+        KafkaConsumer<byte[], byte[]> mockKafkaConsumer = Mockito.mock(KafkaConsumer.class);
+        SdcKafkaConsumer sdcKafkaConsumer = new SdcKafkaConsumer(mockKafkaConsumer, null);
+        ArgumentCaptor<Collections> captor = ArgumentCaptor.forClass(Collections.class);
+
+
+        String testTopics = "testTopic";
+        Set<String> currentSubs = new HashSet<String>();
+        currentSubs.add(testTopics);
+        when(mockKafkaConsumer.subscription()).thenReturn(currentSubs);
+        sdcKafkaConsumer.subscribe(testTopics);
+        verify(mockKafkaConsumer, never()).subscribe((Collection<String>) captor.capture());
+    }
+
+    @Test
+    public void TestPollForMessagesForSpecificTopicSuccess(){
+        KafkaConsumer<byte[], byte[]> mockKafkaConsumer = Mockito.mock(KafkaConsumer.class);
+
+
+        String testTopic = "testTopic";
+
+        ConsumerRecords mockedPollResult = getTestConsumerRecords(testTopic);
+
+        when(mockKafkaConsumer.poll(any())).thenReturn(mockedPollResult);
+
+        DistributionEngineConfiguration config = getMockDistributionEngineConfiguration();
+
+        SdcKafkaConsumer sdcKafkaConsumer = new SdcKafkaConsumer(mockKafkaConsumer, config);
+
+        List<String> returned = sdcKafkaConsumer.poll(testTopic);
+        assertTrue(returned.size()==1);
+        assertTrue(returned.contains("testTopicValue"));
+    }
+
+    @Test
+    public void testSaslJaasConfigNotFound(){
+        assertThrows(
+            KafkaException.class,
+            () ->  new SdcKafkaConsumer(setTestDistributionEngineConfigs()),
+            "Sasl Jaas Config should not be found, so expected a KafkaException"
+        );
+    }
+
+    @NotNull
+    private DistributionEngineConfiguration getMockDistributionEngineConfiguration() {
+        DistributionEngineConfiguration config = new DistributionEngineConfiguration();
+        DistributionEngineConfiguration.DistributionStatusTopicConfig mockStatusTopic = new DistributionEngineConfiguration.DistributionStatusTopicConfig();
+        mockStatusTopic.setPollingIntervalSec(1);
+        config.setDistributionStatusTopic(mockStatusTopic);
+        return config;
+    }
+
+    @NotNull
+    private ConsumerRecords getTestConsumerRecords(String testTopics) {
+        Map map = new HashMap<Integer, ConsumerRecord>();
+
+        ConsumerRecord consumerRecord = new ConsumerRecord(testTopics, 0, 0, "", "testTopicValue");
+
+        List<ConsumerRecord> consumerRecordList = new ArrayList<>();
+        consumerRecordList.add(consumerRecord);
+        TopicPartition topicPartition = new TopicPartition(testTopics, 0);
+        map.put(topicPartition, consumerRecordList);
+
+        ConsumerRecords mockedPollResult = new ConsumerRecords(map);
+        return mockedPollResult;
+    }
+
+    private DistributionEngineConfiguration setTestDistributionEngineConfigs(){
+        DistributionEngineConfiguration.DistributionStatusTopicConfig dsTopic = new DistributionEngineConfiguration.DistributionStatusTopicConfig();
+        DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
+        String testBootstrapServers = "TestBootstrapServer";
+        dsTopic.setConsumerGroup("consumerGroup");
+        dsTopic.setConsumerId("consumerId");
+
+        deConfiguration.setKafkaBootStrapServers(testBootstrapServers);
+        deConfiguration.setDistributionStatusTopic(dsTopic);
+        return deConfiguration;
+    }
+}
diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducerTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducerTest.java
new file mode 100644
index 0000000..23322cc
--- /dev/null
+++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducerTest.java
@@ -0,0 +1,94 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.sdc.be.components.kafka;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import org.junit.jupiter.api.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.KafkaException;
+
+import org.openecomp.sdc.be.catalog.api.IStatus;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+
+public class SdcKafkaProducerTest {
+
+    @Test
+    public void TestSendSuccess(){
+        KafkaProducer<byte[], byte[]> mockKafkaProducer = Mockito.mock(KafkaProducer.class);
+        SdcKafkaProducer sdcKafkaProducer = new SdcKafkaProducer(mockKafkaProducer);
+        ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
+        sdcKafkaProducer.send("testMessage", "testTopic");
+
+
+        verify(mockKafkaProducer).send(captor.capture());
+    }
+
+    @Test
+    public void testFlushSuccess(){
+        KafkaProducer<byte[], byte[]> mockKafkaProducer = Mockito.mock(KafkaProducer.class);
+        SdcKafkaProducer sdcKafkaProducer = new SdcKafkaProducer(mockKafkaProducer);
+        sdcKafkaProducer.flush();
+
+        verify(mockKafkaProducer).flush();
+    }
+
+    @Test
+    public void testSendFail(){
+        KafkaProducer<byte[], byte[]> mockKafkaProducer = Mockito.mock(KafkaProducer.class);
+        SdcKafkaProducer sdcKafkaProducer = new SdcKafkaProducer(mockKafkaProducer);
+
+        when(mockKafkaProducer.send(any())).thenThrow(new KafkaException());
+
+        assertThrows(
+            KafkaException.class,
+            () ->   sdcKafkaProducer.send("testMessage", "testTopic"),
+            "Expected a KafkaException thrown on KafkaProducer Send");
+    }
+
+    @Test
+    public void testSaslJaasConfigNotFound(){
+        assertThrows(
+            KafkaException.class,
+            () ->  new SdcKafkaProducer(setTestDistributionEngineConfigs()),
+            "Sasl Jaas Config should not be found, so expected a KafkaException"
+        );
+    }
+
+    private DistributionEngineConfiguration setTestDistributionEngineConfigs(){
+        DistributionEngineConfiguration.DistributionStatusTopicConfig dStatusTopicConfig = new DistributionEngineConfiguration.DistributionStatusTopicConfig();
+        DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
+        deConfiguration.setKafkaBootStrapServers("TestBootstrapServer");
+        dStatusTopicConfig.setConsumerId("consumerId");
+
+        deConfiguration.setDistributionStatusTopic(dStatusTopicConfig);
+        deConfiguration.getDistributionStatusTopic().getConsumerId();
+        return deConfiguration;
+    }
+}
diff --git a/pom.xml b/pom.xml
index d75aec0..ce9b2bd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -208,6 +208,11 @@
     <dependencyManagement>
         <dependencies>
             <dependency>
+                <groupId>org.apache.kafka</groupId>
+                <artifactId>kafka-clients</artifactId>
+                <version>3.3.1</version>
+            </dependency>
+            <dependency>
                 <groupId>com.google.guava</groupId>
                 <artifactId>guava</artifactId>
                 <version>${guava.version}</version>