Add Kafka jobs to DMaaP Mediator Producer
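
Support consuming messages from Kafka topics in addition to DMaaP.
A type is handled as a Kafka type when its entry in type_config.json
contains a kafkaInputTopic, for example:

  {"id": "Kafka_TestTopic", "kafkaInputTopic": "TestTopic"}

The Kafka bootstrap servers are configured through the new
KAFKA_BOOTSTRAP_SERVERS environment variable (default localhost:9092).
A Kafka job can ask for buffered distribution by providing bufferTimeout
parameters in its job data, for example (values are illustrative):

  {"bufferTimeout": {"maxSize": 100, "maxTimeMiliseconds": 1000}}

Messages are then collected until maxSize messages have arrived or
maxTimeMiliseconds has passed, whichever comes first, and sent to the
consumer in one batch.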

Issue-ID: NONRTRIC-702
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: Ia7123240ad91449209afc4769b850cb0c8d9598f
diff --git a/dmaap-mediator-producer/configs/typeSchemaDmaap.json b/dmaap-mediator-producer/configs/typeSchemaDmaap.json
new file mode 100644
index 0000000..a50b236
--- /dev/null
+++ b/dmaap-mediator-producer/configs/typeSchemaDmaap.json
@@ -0,0 +1,10 @@
+{
+  "$schema": "http://json-schema.org/draft-04/schema#",
+  "type": "object",
+  "properties": {
+    "filter": {
+       "type": "string"
+     }
+  },
+  "additionalProperties": false
+}
diff --git a/dmaap-mediator-producer/configs/typeSchemaKafka.json b/dmaap-mediator-producer/configs/typeSchemaKafka.json
new file mode 100644
index 0000000..dcd40f9
--- /dev/null
+++ b/dmaap-mediator-producer/configs/typeSchemaKafka.json
@@ -0,0 +1,26 @@
+{
+  "$schema": "http://json-schema.org/draft-04/schema#",
+  "type": "object",
+  "properties": {
+    "filter": {
+      "type": "string"
+    },
+    "bufferTimeout": {
+      "type": "object",
+      "properties": {
+        "maxSize": {
+          "type": "integer"
+        },
+        "maxTimeMiliseconds": {
+          "type": "integer"
+        }
+      },
+      "additionalProperties": false,
+      "required": [
+        "maxSize",
+        "maxTimeMiliseconds"
+      ]
+    }
+  },
+  "additionalProperties": false
+}
\ No newline at end of file
diff --git a/dmaap-mediator-producer/configs/type_config.json b/dmaap-mediator-producer/configs/type_config.json
index f75d0e4..1149669 100644
--- a/dmaap-mediator-producer/configs/type_config.json
+++ b/dmaap-mediator-producer/configs/type_config.json
@@ -4,6 +4,10 @@
       {
         "id": "STD_Fault_Messages",
         "dmaapTopicUrl": "/events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages"
+      },
+      {
+        "id": "Kafka_TestTopic",
+        "kafkaInputTopic": "TestTopic"
       }
   ]
 }
\ No newline at end of file
diff --git a/dmaap-mediator-producer/go.mod b/dmaap-mediator-producer/go.mod
index eaaecf7..e701c01 100644
--- a/dmaap-mediator-producer/go.mod
+++ b/dmaap-mediator-producer/go.mod
@@ -10,6 +10,7 @@
 )
 
 require (
+	github.com/confluentinc/confluent-kafka-go v1.8.2 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
diff --git a/dmaap-mediator-producer/go.sum b/dmaap-mediator-producer/go.sum
index 4b3557b..cf2c90f 100644
--- a/dmaap-mediator-producer/go.sum
+++ b/dmaap-mediator-producer/go.sum
@@ -1,3 +1,5 @@
+github.com/confluentinc/confluent-kafka-go v1.8.2 h1:PBdbvYpyOdFLehj8j+9ba7FL4c4Moxn79gy9cYKxG5E=
+github.com/confluentinc/confluent-kafka-go v1.8.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go
index e03c40a..7582e9c 100644
--- a/dmaap-mediator-producer/internal/config/config.go
+++ b/dmaap-mediator-producer/internal/config/config.go
@@ -24,6 +24,7 @@
 	"encoding/json"
 	"fmt"
 	"os"
+	"path/filepath"
 	"strconv"
 
 	log "github.com/sirupsen/logrus"
@@ -35,6 +36,7 @@
 	InfoProducerPort       int
 	InfoCoordinatorAddress string
 	DMaaPMRAddress         string
+	KafkaBootstrapServers  string
 	ProducerCertPath       string
 	ProducerKeyPath        string
 }
@@ -45,6 +47,7 @@
 		InfoProducerPort:       getEnvAsInt("INFO_PRODUCER_PORT", 8085),
 		InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "https://informationservice:8434"),
 		DMaaPMRAddress:         getEnv("DMAAP_MR_ADDR", "https://message-router.onap:3905"),
+		KafkaBootstrapServers:  getEnv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
 		ProducerCertPath:       getEnv("PRODUCER_CERT_PATH", "security/producer.crt"),
 		ProducerKeyPath:        getEnv("PRODUCER_KEY_PATH", "security/producer.key"),
 		LogLevel:               getLogLevel(),
@@ -83,8 +86,8 @@
 	}
 }
 
-func GetJobTypesFromConfiguration(configFile string) ([]TypeDefinition, error) {
-	typeDefsByte, err := os.ReadFile(configFile)
+func GetJobTypesFromConfiguration(configFolder string) ([]TypeDefinition, error) {
+	typeDefsByte, err := os.ReadFile(filepath.Join(configFolder, "type_config.json"))
 	if err != nil {
 		return nil, err
 	}
@@ -96,5 +99,35 @@
 		return nil, err
 	}
 
+	kafkaTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaKafka.json"))
+	if err != nil {
+		return nil, err
+	}
+
+	dMaaPTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaDmaap.json"))
+	if err != nil {
+		return nil, err
+	}
+
+	for i, typeDef := range typeDefs.Types {
+		if typeDef.IsKafkaType() {
+			typeDefs.Types[i].TypeSchema = kafkaTypeSchema
+		} else {
+			typeDefs.Types[i].TypeSchema = dMaaPTypeSchema
+		}
+	}
 	return typeDefs.Types, nil
 }
+
+func getTypeSchema(schemaFile string) (interface{}, error) {
+	typeDefsByte, err := os.ReadFile(schemaFile)
+	if err != nil {
+		return nil, err
+	}
+	var schema interface{}
+	err = json.Unmarshal(typeDefsByte, &schema)
+	if err != nil {
+		return nil, err
+	}
+	return schema, nil
+}
diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go
index faf5900..e66a818 100644
--- a/dmaap-mediator-producer/internal/config/config_test.go
+++ b/dmaap-mediator-producer/internal/config/config_test.go
@@ -22,6 +22,7 @@
 
 import (
 	"bytes"
+	"encoding/json"
 	"os"
 	"path/filepath"
 	"testing"
@@ -37,6 +38,7 @@
 	os.Setenv("INFO_PRODUCER_PORT", "8095")
 	os.Setenv("INFO_COORD_ADDR", "infoCoordAddr")
 	os.Setenv("DMAAP_MR_ADDR", "mrHost:3908")
+	os.Setenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9093")
 	os.Setenv("PRODUCER_CERT_PATH", "cert")
 	os.Setenv("PRODUCER_KEY_PATH", "key")
 	t.Cleanup(func() {
@@ -48,6 +50,7 @@
 		InfoProducerPort:       8095,
 		InfoCoordinatorAddress: "infoCoordAddr",
 		DMaaPMRAddress:         "mrHost:3908",
+		KafkaBootstrapServers:  "localhost:9093",
 		ProducerCertPath:       "cert",
 		ProducerKeyPath:        "key",
 	}
@@ -72,6 +75,7 @@
 		InfoProducerPort:       8085,
 		InfoCoordinatorAddress: "https://informationservice:8434",
 		DMaaPMRAddress:         "https://message-router.onap:3905",
+		KafkaBootstrapServers:  "localhost:9092",
 		ProducerCertPath:       "security/producer.crt",
 		ProducerKeyPath:        "security/producer.key",
 	}
@@ -98,6 +102,7 @@
 		InfoProducerPort:       8085,
 		InfoCoordinatorAddress: "https://informationservice:8434",
 		DMaaPMRAddress:         "https://message-router.onap:3905",
+		KafkaBootstrapServers:  "localhost:9092",
 		ProducerCertPath:       "security/producer.crt",
 		ProducerKeyPath:        "security/producer.key",
 	}
@@ -109,7 +114,17 @@
 	assertions.Contains(logString, "Invalid log level: wrong. Log level will be Info!")
 }
 
-const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
+const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}, {"id": "type2", "kafkaInputTopic": "TestTopic"}]}`
+const typeSchemaFileContent = `{
+	"$schema": "http://json-schema.org/draft-04/schema#",
+	"type": "object",
+	"properties": {
+	  "filter": {
+		 "type": "string"
+	   }
+	},
+	"additionalProperties": false
+  }`
 
 func TestGetTypesFromConfiguration_fileOkShouldReturnSliceOfTypeDefinitions(t *testing.T) {
 	assertions := require.New(t)
@@ -124,14 +139,30 @@
 	if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
 		t.Errorf("Unable to create temporary config file for types due to: %v", err)
 	}
-
-	types, err := GetJobTypesFromConfiguration(fname)
-
-	wantedType := TypeDefinition{
-		Id:            "type1",
-		DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
+	fname = filepath.Join(typesDir, "typeSchemaDmaap.json")
+	if err = os.WriteFile(fname, []byte(typeSchemaFileContent), 0666); err != nil {
+		t.Errorf("Unable to create temporary schema file for DMaaP type due to: %v", err)
 	}
-	wantedTypes := []TypeDefinition{wantedType}
+	fname = filepath.Join(typesDir, "typeSchemaKafka.json")
+	if err = os.WriteFile(fname, []byte(typeSchemaFileContent), 0666); err != nil {
+		t.Errorf("Unable to create temporary schema file for Kafka type due to: %v", err)
+	}
+	var typeSchemaObj interface{}
+	json.Unmarshal([]byte(typeSchemaFileContent), &typeSchemaObj)
+
+	types, err := GetJobTypesFromConfiguration(typesDir)
+
+	wantedDMaaPType := TypeDefinition{
+		Identity:      "type1",
+		DMaaPTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
+		TypeSchema:    typeSchemaObj,
+	}
+	wantedKafkaType := TypeDefinition{
+		Identity:        "type2",
+		KafkaInputTopic: "TestTopic",
+		TypeSchema:      typeSchemaObj,
+	}
+	wantedTypes := []TypeDefinition{wantedDMaaPType, wantedKafkaType}
 	assertions.EqualValues(wantedTypes, types)
 	assertions.Nil(err)
 }
diff --git a/dmaap-mediator-producer/internal/config/registrator.go b/dmaap-mediator-producer/internal/config/registrator.go
index 83ed43f..bac14e6 100644
--- a/dmaap-mediator-producer/internal/config/registrator.go
+++ b/dmaap-mediator-producer/internal/config/registrator.go
@@ -32,11 +32,20 @@
 
 const registerTypePath = "/data-producer/v1/info-types/"
 const registerProducerPath = "/data-producer/v1/info-producers/"
-const typeSchema = `{"type": "object","properties": {},"additionalProperties": false}`
 
 type TypeDefinition struct {
-	Id            string `json:"id"`
-	DmaapTopicURL string `json:"dmaapTopicUrl"`
+	Identity        string `json:"id"`
+	DMaaPTopicURL   string `json:"dmaapTopicUrl"`
+	KafkaInputTopic string `json:"kafkaInputTopic"`
+	TypeSchema      interface{}
+}
+
+func (td TypeDefinition) IsKafkaType() bool {
+	return td.KafkaInputTopic != ""
+}
+
+func (td TypeDefinition) IsDMaaPType() bool {
+	return td.DMaaPTopicURL != ""
 }
 
 type ProducerRegistrationInfo struct {
@@ -64,8 +73,8 @@
 
 func (r RegistratorImpl) RegisterTypes(jobTypes []TypeDefinition) error {
 	for _, jobType := range jobTypes {
-		body := fmt.Sprintf(`{"info_job_data_schema": %v}`, typeSchema)
-		if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.Id), []byte(body), r.httpClient); error != nil {
+		body := fmt.Sprintf(`{"info_job_data_schema": %v}`, jobType.TypeSchema)
+		if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.Identity), []byte(body), r.httpClient); error != nil {
 			return error
 		}
 		log.Debugf("Registered type: %v", jobType)
diff --git a/dmaap-mediator-producer/internal/config/registrator_test.go b/dmaap-mediator-producer/internal/config/registrator_test.go
index 324aed0..d1b61d8 100644
--- a/dmaap-mediator-producer/internal/config/registrator_test.go
+++ b/dmaap-mediator-producer/internal/config/registrator_test.go
@@ -40,7 +40,8 @@
 	}, nil)
 
 	type1 := TypeDefinition{
-		Id: "Type1",
+		Identity:   "Type1",
+		TypeSchema: `{"type": "object","properties": {},"additionalProperties": false}`,
 	}
 	types := []TypeDefinition{type1}
 
diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go
index 867894f..3ef5ca3 100644
--- a/dmaap-mediator-producer/internal/jobs/jobs.go
+++ b/dmaap-mediator-producer/internal/jobs/jobs.go
@@ -25,24 +25,25 @@
 	"sync"
 	"time"
 
+	"github.com/confluentinc/confluent-kafka-go/kafka"
 	log "github.com/sirupsen/logrus"
 	"oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
+	"oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
 	"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
 )
 
 type TypeData struct {
-	TypeId        string `json:"id"`
-	DMaaPTopicURL string `json:"dmaapTopicUrl"`
-	jobsHandler   *jobsHandler
+	Identity    string `json:"id"`
+	jobsHandler *jobsHandler
 }
 
 type JobInfo struct {
-	Owner            string      `json:"owner"`
-	LastUpdated      string      `json:"last_updated"`
-	InfoJobIdentity  string      `json:"info_job_identity"`
-	TargetUri        string      `json:"target_uri"`
-	InfoJobData      interface{} `json:"info_job_data"`
-	InfoTypeIdentity string      `json:"info_type_identity"`
+	Owner            string     `json:"owner"`
+	LastUpdated      string     `json:"last_updated"`
+	InfoJobIdentity  string     `json:"info_job_identity"`
+	TargetUri        string     `json:"target_uri"`
+	InfoJobData      Parameters `json:"info_job_data"`
+	InfoTypeIdentity string     `json:"info_type_identity"`
 }
 
 type JobTypesManager interface {
@@ -59,14 +60,16 @@
 	allTypes         map[string]TypeData
 	pollClient       restclient.HTTPClient
 	mrAddress        string
+	kafkaFactory     kafkaclient.KafkaFactory
 	distributeClient restclient.HTTPClient
 }
 
-func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
+func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, kafkaFactory kafkaclient.KafkaFactory, distributeClient restclient.HTTPClient) *JobsManagerImpl {
 	return &JobsManagerImpl{
 		allTypes:         make(map[string]TypeData),
 		pollClient:       pollClient,
 		mrAddress:        mrAddr,
+		kafkaFactory:     kafkaFactory,
 		distributeClient: distributeClient,
 	}
 }
@@ -84,7 +87,7 @@
 
 func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) {
 	for _, typeData := range jm.allTypes {
-		log.Debugf("Deleting job %v from type %v", jobId, typeData.TypeId)
+		log.Debugf("Deleting job %v from type %v", jobId, typeData.Identity)
 		typeData.jobsHandler.deleteJobCh <- jobId
 	}
 	log.Debug("Deleted job: ", jobId)
@@ -106,10 +109,12 @@
 
 func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition {
 	for _, typeDef := range types {
-		jm.allTypes[typeDef.Id] = TypeData{
-			TypeId:        typeDef.Id,
-			DMaaPTopicURL: typeDef.DmaapTopicURL,
-			jobsHandler:   newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient),
+		if typeDef.DMaaPTopicURL == "" && typeDef.KafkaInputTopic == "" {
+			log.Fatal("DMaaPTopicURL or KafkaInputTopic must be defined for type: ", typeDef.Identity)
+		}
+		jm.allTypes[typeDef.Identity] = TypeData{
+			Identity:    typeDef.Identity,
+			jobsHandler: newJobsHandler(typeDef, jm.mrAddress, jm.kafkaFactory, jm.pollClient, jm.distributeClient),
 		}
 	}
 	return types
@@ -126,7 +131,7 @@
 func (jm *JobsManagerImpl) StartJobsForAllTypes() {
 	for _, jobType := range jm.allTypes {
 
-		go jobType.jobsHandler.startPollingAndDistribution(jm.mrAddress)
+		go jobType.jobsHandler.startPollingAndDistribution()
 
 	}
 }
@@ -134,30 +139,29 @@
 type jobsHandler struct {
 	mu               sync.Mutex
 	typeId           string
-	topicUrl         string
+	pollingAgent     pollingAgent
 	jobs             map[string]job
 	addJobCh         chan JobInfo
 	deleteJobCh      chan string
-	pollClient       restclient.HTTPClient
 	distributeClient restclient.HTTPClient
 }
 
-func newJobsHandler(typeId string, topicURL string, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
+func newJobsHandler(typeDef config.TypeDefinition, mRAddress string, kafkaFactory kafkaclient.KafkaFactory, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
+	pollingAgent := createPollingAgent(typeDef, mRAddress, pollClient, kafkaFactory)
 	return &jobsHandler{
-		typeId:           typeId,
-		topicUrl:         topicURL,
+		typeId:           typeDef.Identity,
+		pollingAgent:     pollingAgent,
 		jobs:             make(map[string]job),
 		addJobCh:         make(chan JobInfo),
 		deleteJobCh:      make(chan string),
-		pollClient:       pollClient,
 		distributeClient: distributeClient,
 	}
 }
 
-func (jh *jobsHandler) startPollingAndDistribution(mRAddress string) {
+func (jh *jobsHandler) startPollingAndDistribution() {
 	go func() {
 		for {
-			jh.pollAndDistributeMessages(mRAddress)
+			jh.pollAndDistributeMessages()
 		}
 	}()
 
@@ -168,19 +172,20 @@
 	}()
 }
 
-func (jh *jobsHandler) pollAndDistributeMessages(mRAddress string) {
+func (jh *jobsHandler) pollAndDistributeMessages() {
 	log.Debugf("Processing jobs for type: %v", jh.typeId)
-	messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient)
+	messagesBody, error := jh.pollingAgent.pollMessages()
 	if error != nil {
-		log.Warn("Error getting data from MR. Cause: ", error)
-		time.Sleep(time.Minute) // Must wait before trying to call MR again
+		log.Warn("Error getting data from source. Cause: ", error)
+		time.Sleep(time.Minute) // Must wait before trying to call data source again
+		return
 	}
-	log.Debug("Received messages: ", string(messagesBody))
 	jh.distributeMessages(messagesBody)
 }
 
 func (jh *jobsHandler) distributeMessages(messages []byte) {
-	if len(messages) > 2 {
+	if string(messages) != "[]" && len(messages) > 0 { // MR returns an empty array if there are no messages.
+		log.Debug("Distributing messages: ", string(messages))
 		jh.mu.Lock()
 		defer jh.mu.Unlock()
 		for _, job := range jh.jobs {
@@ -234,6 +239,61 @@
 	jh.mu.Unlock()
 }
 
+type pollingAgent interface {
+	pollMessages() ([]byte, error)
+}
+
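+// createPollingAgent creates a polling agent for the type: a DMaaP agent when the
+// type has a DMaaP topic URL, otherwise a Kafka agent consuming the type's input topic.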
+func createPollingAgent(typeDef config.TypeDefinition, mRAddress string, pollClient restclient.HTTPClient, kafkaFactory kafkaclient.KafkaFactory) pollingAgent {
+	if typeDef.DMaaPTopicURL != "" {
+		return dMaaPPollingAgent{
+			messageRouterURL: mRAddress + typeDef.DMaaPTopicURL,
+			pollClient:       pollClient,
+		}
+	} else {
+		return newKafkaPollingAgent(kafkaFactory, typeDef.KafkaInputTopic)
+	}
+}
+
+type dMaaPPollingAgent struct {
+	messageRouterURL string
+	pollClient       restclient.HTTPClient
+}
+
+func (pa dMaaPPollingAgent) pollMessages() ([]byte, error) {
+	return restclient.Get(pa.messageRouterURL, pa.pollClient)
+}
+
+type kafkaPollingAgent struct {
+	kafkaClient kafkaclient.KafkaClient
+}
+
+func newKafkaPollingAgent(kafkaFactory kafkaclient.KafkaFactory, topicID string) kafkaPollingAgent {
+	c, err := kafkaclient.NewKafkaClient(kafkaFactory, topicID)
+	if err != nil {
+		log.Fatalf("Cannot create Kafka client for topic: %v, error details: %v", topicID, err)
+	}
+	return kafkaPollingAgent{
+		kafkaClient: c,
+	}
+}
+
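+// pollMessages reads one message from Kafka. A timed out read just means there was
+// nothing to consume, so it is reported as an empty result instead of an error.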
+func (pa kafkaPollingAgent) pollMessages() ([]byte, error) {
+	msg, err := pa.kafkaClient.ReadMessage()
+	if err == nil {
+		return msg, nil
+	} else {
+		if isKafkaTimedOutError(err) {
+			return []byte(""), nil
+		}
+		return nil, err
+	}
+}
+
+func isKafkaTimedOutError(err error) bool {
+	kafkaErr, ok := err.(kafka.Error)
+	return ok && kafkaErr.Code() == kafka.ErrTimedOut
+}
+
 type job struct {
 	jobInfo         JobInfo
 	client          restclient.HTTPClient
@@ -242,6 +302,7 @@
 }
 
 func newJob(j JobInfo, c restclient.HTTPClient) job {
+
 	return job{
 		jobInfo:         j,
 		client:          c,
@@ -250,7 +311,24 @@
 	}
 }
 
+type Parameters struct {
+	BufferTimeout BufferTimeout `json:"bufferTimeout"`
+}
+
+type BufferTimeout struct {
+	MaxSize            int   `json:"maxSize"`
+	MaxTimeMiliseconds int64 `json:"maxTimeMiliseconds"`
+}
+
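+// start distributes each message as it arrives, unless buffering parameters are
+// provided, in which case messages are batched before distribution.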
 func (j *job) start() {
+	if j.jobInfo.InfoJobData.BufferTimeout.MaxSize == 0 {
+		j.startReadingSingleMessages()
+	} else {
+		j.startReadingMessagesBuffered()
+	}
+}
+
+func (j *job) startReadingSingleMessages() {
 out:
 	for {
 		select {
@@ -263,10 +341,68 @@
 	}
 }
 
+func (j *job) startReadingMessagesBuffered() {
+out:
+	for {
+		select {
+		case <-j.controlChannel:
+			log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
+			break out
+		default:
+			msgs := j.read(j.jobInfo.InfoJobData.BufferTimeout)
+			if len(msgs) > 0 {
+				j.sendMessagesToConsumer(msgs)
+			}
+		}
+	}
+}
+
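+// read collects messages from the job's channel until bufferParams.MaxSize messages
+// have arrived or bufferParams.MaxTimeMiliseconds has passed, whichever comes first.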
+func (j *job) read(bufferParams BufferTimeout) []byte {
+	wg := sync.WaitGroup{}
+	wg.Add(bufferParams.MaxSize)
+	var msgs []byte
+	c := make(chan struct{})
+	go func() {
+		i := 0
+	out:
+		for {
+			select {
+			case <-c:
+				break out
+			case msg := <-j.messagesChannel:
+				i++
+				msgs = append(msgs, msg...)
+				wg.Done()
+				if i == bufferParams.MaxSize {
+					break out
+				}
+			}
+		}
+	}()
+	j.waitTimeout(&wg, time.Duration(bufferParams.MaxTimeMiliseconds)*time.Millisecond)
+	close(c)
+	return msgs
+}
+
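+// waitTimeout waits for the WaitGroup to complete or the timeout to expire.
+// It returns true if the wait timed out.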
+func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
+	c := make(chan struct{})
+	go func() {
+		defer close(c)
+		wg.Wait()
+	}()
+	select {
+	case <-c:
+		return false // completed normally
+	case <-time.After(timeout):
+		return true // timed out
+	}
+}
+
 func (j *job) sendMessagesToConsumer(messages []byte) {
 	log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
 	if postErr := restclient.Post(j.jobInfo.TargetUri, messages, j.client); postErr != nil {
 		log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
+		return
 	}
 	log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)
 }
diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go
index 30b4ffd..ab1165c 100644
--- a/dmaap-mediator-producer/internal/jobs/jobs_test.go
+++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go
@@ -22,52 +22,60 @@
 
 import (
 	"bytes"
+	"fmt"
 	"io/ioutil"
 	"net/http"
+	"strconv"
 	"sync"
 	"testing"
 	"time"
 
+	"github.com/confluentinc/confluent-kafka-go/kafka"
+	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 	"oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
+	"oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
+	"oransc.org/nonrtric/dmaapmediatorproducer/mocks"
 )
 
-const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
-
-func TestJobsManagerGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
+func TestJobsManagerGetTypes_shouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
 	assertions := require.New(t)
 
-	managerUnderTest := NewJobsManagerImpl(nil, "", nil)
+	managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
 
-	wantedType := config.TypeDefinition{
-		Id:            "type1",
-		DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
+	wantedDMaaPType := config.TypeDefinition{
+		Identity:      "type1",
+		DMaaPTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
 	}
-	wantedTypes := []config.TypeDefinition{wantedType}
+	wantedKafkaType := config.TypeDefinition{
+		Identity:        "type2",
+		KafkaInputTopic: "topic",
+	}
+	wantedTypes := []config.TypeDefinition{wantedDMaaPType, wantedKafkaType}
 
 	types := managerUnderTest.LoadTypesFromConfiguration(wantedTypes)
 
 	assertions.EqualValues(wantedTypes, types)
 
 	supportedTypes := managerUnderTest.GetSupportedTypes()
-	assertions.EqualValues([]string{"type1"}, supportedTypes)
+	assertions.ElementsMatch([]string{"type1", "type2"}, supportedTypes)
 }
 
 func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
 	assertions := require.New(t)
-	managerUnderTest := NewJobsManagerImpl(nil, "", nil)
+	managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
 	wantedJob := JobInfo{
 		Owner:            "owner",
 		LastUpdated:      "now",
 		InfoJobIdentity:  "job1",
 		TargetUri:        "target",
-		InfoJobData:      "{}",
+		InfoJobData:      Parameters{},
 		InfoTypeIdentity: "type1",
 	}
 	jobsHandler := jobsHandler{
 		addJobCh: make(chan JobInfo)}
 	managerUnderTest.allTypes["type1"] = TypeData{
-		TypeId:      "type1",
+		Identity:    "type1",
 		jobsHandler: &jobsHandler,
 	}
 
@@ -83,7 +91,7 @@
 
 func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
 	assertions := require.New(t)
-	managerUnderTest := NewJobsManagerImpl(nil, "", nil)
+	managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
 	jobInfo := JobInfo{
 		InfoTypeIdentity: "type1",
 	}
@@ -95,9 +103,9 @@
 
 func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
 	assertions := require.New(t)
-	managerUnderTest := NewJobsManagerImpl(nil, "", nil)
+	managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
 	managerUnderTest.allTypes["type1"] = TypeData{
-		TypeId: "type1",
+		Identity: "type1",
 	}
 
 	jobInfo := JobInfo{
@@ -105,14 +113,14 @@
 	}
 	err := managerUnderTest.AddJobFromRESTCall(jobInfo)
 	assertions.NotNil(err)
-	assertions.Equal("missing required job identity: {    <nil> type1}", err.Error())
+	assertions.Equal("missing required job identity: {    {{0 0}} type1}", err.Error())
 }
 
 func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
 	assertions := require.New(t)
-	managerUnderTest := NewJobsManagerImpl(nil, "", nil)
+	managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
 	managerUnderTest.allTypes["type1"] = TypeData{
-		TypeId: "type1",
+		Identity: "type1",
 	}
 
 	jobInfo := JobInfo{
@@ -121,16 +129,16 @@
 	}
 	err := managerUnderTest.AddJobFromRESTCall(jobInfo)
 	assertions.NotNil(err)
-	assertions.Equal("missing required target URI: {  job1  <nil> type1}", err.Error())
+	assertions.Equal("missing required target URI: {  job1  {{0 0}} type1}", err.Error())
 }
 
 func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
 	assertions := require.New(t)
-	managerUnderTest := NewJobsManagerImpl(nil, "", nil)
+	managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
 	jobsHandler := jobsHandler{
 		deleteJobCh: make(chan string)}
 	managerUnderTest.allTypes["type1"] = TypeData{
-		TypeId:      "type1",
+		Identity:    "type1",
 		jobsHandler: &jobsHandler,
 	}
 
@@ -139,21 +147,21 @@
 	assertions.Equal("job2", <-jobsHandler.deleteJobCh)
 }
 
-func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T) {
+func TestStartJobsManagerAddDMaaPJob_shouldStartPollAndDistributeMessages(t *testing.T) {
 	assertions := require.New(t)
 
 	called := false
-	messages := `[{"message": {"data": "data"}}]`
+	dMaaPMessages := `[{"message": {"data": "dmaap"}}]`
 	pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
 		if req.URL.String() == "http://mrAddr/topicUrl" {
 			assertions.Equal(req.Method, "GET")
 			body := "[]"
 			if !called {
 				called = true
-				body = messages
+				body = dMaaPMessages
 			}
 			return &http.Response{
-				StatusCode: 200,
+				StatusCode: http.StatusOK,
 				Body:       ioutil.NopCloser(bytes.NewReader([]byte(body))),
 				Header:     make(http.Header), // Must be set to non-nil value or it panics
 			}
@@ -165,9 +173,9 @@
 
 	wg := sync.WaitGroup{}
 	distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
-		if req.URL.String() == "http://consumerHost/target" {
+		if req.URL.String() == "http://consumerHost/dmaaptarget" {
 			assertions.Equal(req.Method, "POST")
-			assertions.Equal(messages, getBodyAsString(req, t))
+			assertions.Equal(dMaaPMessages, getBodyAsString(req, t))
 			assertions.Equal("application/json", req.Header.Get("Content-Type"))
 			wg.Done()
 			return &http.Response{
@@ -180,25 +188,88 @@
 		t.Fail()
 		return nil
 	})
-	jobsHandler := newJobsHandler("type1", "/topicUrl", pollClientMock, distributeClientMock)
-
-	jobsManager := NewJobsManagerImpl(pollClientMock, "http://mrAddr", distributeClientMock)
-	jobsManager.allTypes["type1"] = TypeData{
+	dMaaPTypeDef := config.TypeDefinition{
+		Identity:      "type1",
 		DMaaPTopicURL: "/topicUrl",
-		TypeId:        "type1",
-		jobsHandler:   jobsHandler,
+	}
+	dMaaPJobsHandler := newJobsHandler(dMaaPTypeDef, "http://mrAddr", nil, pollClientMock, distributeClientMock)
+
+	jobsManager := NewJobsManagerImpl(pollClientMock, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, distributeClientMock)
+	jobsManager.allTypes["type1"] = TypeData{
+		Identity:    "type1",
+		jobsHandler: dMaaPJobsHandler,
+	}
+	jobsManager.StartJobsForAllTypes()
+
+	dMaaPJobInfo := JobInfo{
+		InfoTypeIdentity: "type1",
+		InfoJobIdentity:  "job1",
+		TargetUri:        "http://consumerHost/dmaaptarget",
+	}
+
+	wg.Add(1) // Wait till the distribution has happened
+	err := jobsManager.AddJobFromRESTCall(dMaaPJobInfo)
+	assertions.Nil(err)
+
+	if waitTimeout(&wg, 2*time.Second) {
+		t.Error("Not all calls to server were made")
+		t.Fail()
+	}
+}
+
+func TestStartJobsManagerAddKafkaJob_shouldStartPollAndDistributeMessages(t *testing.T) {
+	assertions := require.New(t)
+
+	kafkaMessages := `1`
+	wg := sync.WaitGroup{}
+	distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
+		if req.URL.String() == "http://consumerHost/kafkatarget" {
+			assertions.Equal(req.Method, "POST")
+			assertions.Equal(kafkaMessages, getBodyAsString(req, t))
+			assertions.Equal("application/json", req.Header.Get("Content-Type"))
+			wg.Done()
+			return &http.Response{
+				StatusCode: 200,
+				Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
+				Header:     make(http.Header), // Must be set to non-nil value or it panics
+			}
+		}
+		t.Error("Wrong call to client: ", req)
+		t.Fail()
+		return nil
+	})
+
+	kafkaTypeDef := config.TypeDefinition{
+		Identity:        "type2",
+		KafkaInputTopic: "topic",
+	}
+	kafkaFactoryMock := mocks.KafkaFactory{}
+	kafkaConsumerMock := mocks.KafkaConsumer{}
+	kafkaConsumerMock.On("Commit").Return([]kafka.TopicPartition{}, error(nil))
+	kafkaConsumerMock.On("Subscribe", mock.Anything).Return(error(nil))
+	kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(&kafka.Message{
+		Value: []byte(kafkaMessages),
+	}, error(nil)).Once()
+	kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(nil, fmt.Errorf("Just to stop"))
+	kafkaFactoryMock.On("NewKafkaConsumer", mock.Anything).Return(kafkaConsumerMock, nil)
+	kafkaJobsHandler := newJobsHandler(kafkaTypeDef, "", kafkaFactoryMock, nil, distributeClientMock)
+
+	jobsManager := NewJobsManagerImpl(nil, "", kafkaFactoryMock, distributeClientMock)
+	jobsManager.allTypes["type2"] = TypeData{
+		Identity:    "type2",
+		jobsHandler: kafkaJobsHandler,
 	}
 
 	jobsManager.StartJobsForAllTypes()
 
-	jobInfo := JobInfo{
-		InfoTypeIdentity: "type1",
-		InfoJobIdentity:  "job1",
-		TargetUri:        "http://consumerHost/target",
+	kafkaJobInfo := JobInfo{
+		InfoTypeIdentity: "type2",
+		InfoJobIdentity:  "job2",
+		TargetUri:        "http://consumerHost/kafkatarget",
 	}
 
 	wg.Add(1) // Wait till the distribution has happened
-	err := jobsManager.AddJobFromRESTCall(jobInfo)
+	err := jobsManager.AddJobFromRESTCall(kafkaJobInfo)
 	assertions.Nil(err)
 
 	if waitTimeout(&wg, 2*time.Second) {
@@ -210,7 +281,11 @@
 func TestJobsHandlerDeleteJob_shouldDeleteJobFromJobsMap(t *testing.T) {
 	jobToDelete := newJob(JobInfo{}, nil)
 	go jobToDelete.start()
-	jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil)
+	typeDef := config.TypeDefinition{
+		Identity:      "type1",
+		DMaaPTopicURL: "/topicUrl",
+	}
+	jobsHandler := newJobsHandler(typeDef, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, nil, nil)
 	jobsHandler.jobs["job1"] = jobToDelete
 
 	go jobsHandler.monitorManagementChannels()
@@ -233,7 +308,11 @@
 		InfoJobIdentity: "job",
 	}, nil)
 
-	jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil)
+	typeDef := config.TypeDefinition{
+		Identity:      "type1",
+		DMaaPTopicURL: "/topicUrl",
+	}
+	jobsHandler := newJobsHandler(typeDef, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, nil, nil)
 	jobsHandler.jobs["job1"] = job
 
 	fillMessagesBuffer(job.messagesChannel)
@@ -243,6 +322,143 @@
 	require.New(t).Len(job.messagesChannel, 0)
 }
 
+func TestKafkaPollingAgentTimedOut_shouldResultInEmptyMessages(t *testing.T) {
+	assertions := require.New(t)
+
+	kafkaFactoryMock := mocks.KafkaFactory{}
+	kafkaConsumerMock := mocks.KafkaConsumer{}
+	kafkaConsumerMock.On("Commit").Return([]kafka.TopicPartition{}, error(nil))
+	kafkaConsumerMock.On("Subscribe", mock.Anything).Return(error(nil))
+	kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(nil, kafka.NewError(kafka.ErrTimedOut, "", false))
+	kafkaFactoryMock.On("NewKafkaConsumer", mock.Anything).Return(kafkaConsumerMock, nil)
+
+	pollingAgentUnderTest := newKafkaPollingAgent(kafkaFactoryMock, "")
+	messages, err := pollingAgentUnderTest.pollMessages()
+
+	assertions.Equal([]byte(""), messages)
+	assertions.Nil(err)
+}
+
+func TestJobWithoutParameters_shouldSendOneMessageAtATime(t *testing.T) {
+	assertions := require.New(t)
+
+	wg := sync.WaitGroup{}
+	messageNo := 1
+	distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
+		if req.URL.String() == "http://consumerHost/target" {
+			assertions.Equal(req.Method, "POST")
+			assertions.Equal(fmt.Sprint("message", messageNo), getBodyAsString(req, t))
+			messageNo++
+			assertions.Equal("application/json", req.Header.Get("Content-Type"))
+			wg.Done()
+			return &http.Response{
+				StatusCode: 200,
+				Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
+				Header:     make(http.Header), // Must be set to non-nil value or it panics
+			}
+		}
+		t.Error("Wrong call to client: ", req)
+		t.Fail()
+		return nil
+	})
+
+	jobUnderTest := newJob(JobInfo{TargetUri: "http://consumerHost/target"}, distributeClientMock)
+
+	wg.Add(2)
+	go jobUnderTest.start()
+
+	jobUnderTest.messagesChannel <- []byte("message1")
+	jobUnderTest.messagesChannel <- []byte("message2")
+
+	if waitTimeout(&wg, 2*time.Second) {
+		t.Error("Not all calls to server were made")
+		t.Fail()
+	}
+}
+
+func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) {
+	assertions := require.New(t)
+
+	wg := sync.WaitGroup{}
+	distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
+		if req.URL.String() == "http://consumerHost/target" {
+			assertions.Equal(req.Method, "POST")
+			assertions.Equal("12", getBodyAsString(req, t))
+			assertions.Equal("application/json", req.Header.Get("Content-Type"))
+			wg.Done()
+			return &http.Response{
+				StatusCode: 200,
+				Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
+				Header:     make(http.Header), // Must be set to non-nil value or it panics
+			}
+		}
+		t.Error("Wrong call to client: ", req)
+		t.Fail()
+		return nil
+	})
+
+	jobUnderTest := newJob(JobInfo{
+		TargetUri: "http://consumerHost/target",
+		InfoJobData: Parameters{
+			BufferTimeout: BufferTimeout{
+				MaxSize:            5,
+				MaxTimeMiliseconds: 200,
+			},
+		},
+	}, distributeClientMock)
+
+	wg.Add(1)
+	go jobUnderTest.start()
+
+	go func() {
+		jobUnderTest.messagesChannel <- []byte("1")
+		jobUnderTest.messagesChannel <- []byte("2")
+	}()
+
+	if waitTimeout(&wg, 2*time.Second) {
+		t.Error("Not all calls to server were made")
+		t.Fail()
+	}
+}
+
+func TestJobReadMoreThanBufferSizeMessages_shouldOnlyReturnMaxSizeNoOfMessages(t *testing.T) {
+	assertions := require.New(t)
+
+	jobUnderTest := newJob(JobInfo{}, nil)
+
+	go func() {
+		for i := 0; i < 4; i++ {
+			jobUnderTest.messagesChannel <- []byte(strconv.Itoa(i))
+		}
+	}()
+
+	msgs := jobUnderTest.read(BufferTimeout{
+		MaxSize:            2,
+		MaxTimeMiliseconds: 200,
+	})
+
+	assertions.Equal([]byte("01"), msgs)
+}
+
+func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t *testing.T) {
+	assertions := require.New(t)
+
+	jobUnderTest := newJob(JobInfo{}, nil)
+
+	go func() {
+		for i := 0; i < 4; i++ {
+			time.Sleep(10 * time.Millisecond)
+			jobUnderTest.messagesChannel <- []byte(strconv.Itoa(i))
+		}
+	}()
+
+	msgs := jobUnderTest.read(BufferTimeout{
+		MaxSize:            2,
+		MaxTimeMiliseconds: 30,
+	})
+
+	assertions.Equal([]byte("01"), msgs)
+}
+
 func fillMessagesBuffer(mc chan []byte) {
 	for i := 0; i < cap(mc); i++ {
 		mc <- []byte("msg")
diff --git a/dmaap-mediator-producer/internal/kafkaclient/kafkaclient.go b/dmaap-mediator-producer/internal/kafkaclient/kafkaclient.go
new file mode 100644
index 0000000..16abcb4
--- /dev/null
+++ b/dmaap-mediator-producer/internal/kafkaclient/kafkaclient.go
@@ -0,0 +1,94 @@
+// -
+//   ========================LICENSE_START=================================
+//   O-RAN-SC
+//   %%
+//   Copyright (C) 2021: Nordix Foundation
+//   %%
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   ========================LICENSE_END===================================
+//
+
+package kafkaclient
+
+import (
+	"time"
+
+	"github.com/confluentinc/confluent-kafka-go/kafka"
+)
+
+type KafkaFactory interface {
+	NewKafkaConsumer(topicID string) (KafkaConsumer, error)
+}
+
+type KafkaFactoryImpl struct {
+	BootstrapServer string
+}
+
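+// NewKafkaConsumer creates a consumer that joins the producer's consumer group and
+// starts from the earliest offset when no committed offset is available.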
+func (kf KafkaFactoryImpl) NewKafkaConsumer(topicID string) (KafkaConsumer, error) {
+	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
+		"bootstrap.servers": kf.BootstrapServer,
+		"group.id":          "dmaap-mediator-producer",
+		"auto.offset.reset": "earliest",
+	})
+	if err != nil {
+		return nil, err
+	}
+	return KafkaConsumerImpl{consumer: consumer}, nil
+}
+
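+// NewKafkaClient creates a client with a consumer, from the given factory,
+// subscribed to the given topic.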
+func NewKafkaClient(factory KafkaFactory, topicID string) (KafkaClient, error) {
+	consumer, err := factory.NewKafkaConsumer(topicID)
+	if err != nil {
+		return KafkaClient{}, err
+	}
+	consumer.Commit()
+	err = consumer.Subscribe(topicID)
+	if err != nil {
+		return KafkaClient{}, err
+	}
+	return KafkaClient{consumer: consumer}, nil
+}
+
+type KafkaClient struct {
+	consumer KafkaConsumer
+}
+
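+// ReadMessage polls the consumer for up to one second and returns the value of the
+// received message, if any.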
+func (kc KafkaClient) ReadMessage() ([]byte, error) {
+	msg, err := kc.consumer.ReadMessage(time.Second)
+	if err != nil {
+		return nil, err
+	}
+	return msg.Value, nil
+}
+
+type KafkaConsumer interface {
+	Commit() ([]kafka.TopicPartition, error)
+	Subscribe(topic string) (err error)
+	ReadMessage(timeout time.Duration) (*kafka.Message, error)
+}
+
+type KafkaConsumerImpl struct {
+	consumer *kafka.Consumer
+}
+
+func (kc KafkaConsumerImpl) Commit() ([]kafka.TopicPartition, error) {
+	return kc.consumer.Commit()
+}
+
+func (kc KafkaConsumerImpl) Subscribe(topic string) error {
+	return kc.consumer.Subscribe(topic, nil)
+}
+
+func (kc KafkaConsumerImpl) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
+	return kc.consumer.ReadMessage(timeout)
+}
diff --git a/dmaap-mediator-producer/internal/server/server_test.go b/dmaap-mediator-producer/internal/server/server_test.go
index 6248c22..6fe4d7a 100644
--- a/dmaap-mediator-producer/internal/server/server_test.go
+++ b/dmaap-mediator-producer/internal/server/server_test.go
@@ -34,7 +34,7 @@
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 	"oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
-	"oransc.org/nonrtric/dmaapmediatorproducer/mocks/jobhandler"
+	"oransc.org/nonrtric/dmaapmediatorproducer/mocks/jobshandler"
 )
 
 func TestNewRouter(t *testing.T) {
@@ -88,7 +88,7 @@
 	assertions.Equal("/admin/log", path)
 }
 
-func TestAddInfoJobHandler(t *testing.T) {
+func TestAddInfoJobToJobsHandler(t *testing.T) {
 	assertions := require.New(t)
 
 	type args struct {
@@ -102,21 +102,21 @@
 		wantedBody   string
 	}{
 		{
-			name: "AddInfoJobHandler with correct job, should return OK",
+			name: "AddInfoJobToJobsHandler with correct job, should return OK",
 			args: args{
 				job: jobs.JobInfo{
 					Owner:            "owner",
 					LastUpdated:      "now",
 					InfoJobIdentity:  "jobId",
 					TargetUri:        "target",
-					InfoJobData:      "{}",
+					InfoJobData:      jobs.Parameters{},
 					InfoTypeIdentity: "type",
 				},
 			},
 			wantedStatus: http.StatusOK,
 		},
 		{
-			name: "AddInfoJobHandler with incorrect job info, should return BadRequest",
+			name: "AddInfoJobToJobsHandler with incorrect job info, should return BadRequest",
 			args: args{
 				job: jobs.JobInfo{
 					Owner: "bad",
@@ -129,10 +129,10 @@
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			jobHandlerMock := jobhandler.JobHandler{}
-			jobHandlerMock.On("AddJobFromRESTCall", tt.args.job).Return(tt.args.mockReturn)
+			jobsHandlerMock := jobshandler.JobsHandler{}
+			jobsHandlerMock.On("AddJobFromRESTCall", tt.args.job).Return(tt.args.mockReturn)
 
-			callbackHandlerUnderTest := NewProducerCallbackHandler(&jobHandlerMock)
+			callbackHandlerUnderTest := NewProducerCallbackHandler(&jobsHandlerMock)
 
 			handler := http.HandlerFunc(callbackHandlerUnderTest.addInfoJobHandler)
 			responseRecorder := httptest.NewRecorder()
@@ -142,17 +142,17 @@
 
 			assertions.Equal(tt.wantedStatus, responseRecorder.Code, tt.name)
 			assertions.Contains(responseRecorder.Body.String(), tt.wantedBody, tt.name)
-			jobHandlerMock.AssertCalled(t, "AddJobFromRESTCall", tt.args.job)
+			jobsHandlerMock.AssertCalled(t, "AddJobFromRESTCall", tt.args.job)
 		})
 	}
 }
 
 func TestDeleteJob(t *testing.T) {
 	assertions := require.New(t)
-	jobHandlerMock := jobhandler.JobHandler{}
-	jobHandlerMock.On("DeleteJobFromRESTCall", mock.Anything).Return(nil)
+	jobsHandlerMock := jobshandler.JobsHandler{}
+	jobsHandlerMock.On("DeleteJobFromRESTCall", mock.Anything).Return(nil)
 
-	callbackHandlerUnderTest := NewProducerCallbackHandler(&jobHandlerMock)
+	callbackHandlerUnderTest := NewProducerCallbackHandler(&jobsHandlerMock)
 
 	responseRecorder := httptest.NewRecorder()
 	r := mux.SetURLVars(newRequest(http.MethodDelete, "/jobs/", nil, t), map[string]string{"infoJobId": "job1"})
@@ -162,7 +162,7 @@
 
 	assertions.Equal("", responseRecorder.Body.String())
 
-	jobHandlerMock.AssertCalled(t, "DeleteJobFromRESTCall", "job1")
+	jobsHandlerMock.AssertCalled(t, "DeleteJobFromRESTCall", "job1")
 }
 
 func TestSetLogLevel(t *testing.T) {
diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go
index 1aabdda..819ffa9 100644
--- a/dmaap-mediator-producer/main.go
+++ b/dmaap-mediator-producer/main.go
@@ -29,6 +29,7 @@
 	log "github.com/sirupsen/logrus"
 	"oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
 	"oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
+	"oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
 	"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
 	"oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
 )
@@ -55,9 +56,12 @@
 	} else {
 		log.Fatalf("Stopping producer due to error: %v", err)
 	}
-	retryClient := restclient.CreateRetryClient(cert)
 
-	jobsManager := jobs.NewJobsManagerImpl(retryClient, configuration.DMaaPMRAddress, restclient.CreateClientWithoutRetry(cert, 10*time.Second))
+	retryClient := restclient.CreateRetryClient(cert)
+	kafkaFactory := kafkaclient.KafkaFactoryImpl{BootstrapServer: configuration.KafkaBootstrapServers}
+	distributionClient := restclient.CreateClientWithoutRetry(cert, 10*time.Second)
+
+	jobsManager := jobs.NewJobsManagerImpl(retryClient, configuration.DMaaPMRAddress, kafkaFactory, distributionClient)
 	go startCallbackServer(jobsManager, callbackAddress)
 
 	if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
@@ -78,11 +82,14 @@
 	if configuration.ProducerCertPath == "" || configuration.ProducerKeyPath == "" {
 		return fmt.Errorf("missing PRODUCER_CERT and/or PRODUCER_KEY")
 	}
+	if configuration.DMaaPMRAddress == "" && configuration.KafkaBootstrapServers == "" {
+		return fmt.Errorf("at least one of DMAAP_MR_ADDR or KAFKA_BOOTSRAP_SERVERS must be provided")
+	}
 	return nil
 }
 func registerTypesAndProducer(jobTypesHandler jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
 	registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client)
-	configTypes, err := config.GetJobTypesFromConfiguration("configs/type_config.json")
+	configTypes, err := config.GetJobTypesFromConfiguration("configs")
 	if err != nil {
 		return fmt.Errorf("unable to register all types due to: %v", err)
 	}
diff --git a/dmaap-mediator-producer/mocks/KafkaConsumer.go b/dmaap-mediator-producer/mocks/KafkaConsumer.go
new file mode 100644
index 0000000..8ae0893
--- /dev/null
+++ b/dmaap-mediator-producer/mocks/KafkaConsumer.go
@@ -0,0 +1,76 @@
+// Code generated by mockery v1.0.0. DO NOT EDIT.
+
+package mocks
+
+import (
+	kafka "github.com/confluentinc/confluent-kafka-go/kafka"
+
+	mock "github.com/stretchr/testify/mock"
+
+	time "time"
+)
+
+// KafkaConsumer is an autogenerated mock type for the KafkaConsumer type
+type KafkaConsumer struct {
+	mock.Mock
+}
+
+// Commit provides a mock function with given fields:
+func (_m KafkaConsumer) Commit() ([]kafka.TopicPartition, error) {
+	ret := _m.Called()
+
+	var r0 []kafka.TopicPartition
+	if rf, ok := ret.Get(0).(func() []kafka.TopicPartition); ok {
+		r0 = rf()
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).([]kafka.TopicPartition)
+		}
+	}
+
+	var r1 error
+	if rf, ok := ret.Get(1).(func() error); ok {
+		r1 = rf()
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// ReadMessage provides a mock function with given fields: timeout
+func (_m KafkaConsumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
+	ret := _m.Called(timeout)
+
+	var r0 *kafka.Message
+	if rf, ok := ret.Get(0).(func(time.Duration) *kafka.Message); ok {
+		r0 = rf(timeout)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(*kafka.Message)
+		}
+	}
+
+	var r1 error
+	if rf, ok := ret.Get(1).(func(time.Duration) error); ok {
+		r1 = rf(timeout)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// Subscribe provides a mock function with given fields: topic
+func (_m KafkaConsumer) Subscribe(topic string) error {
+	ret := _m.Called(topic)
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func(string) error); ok {
+		r0 = rf(topic)
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
+}
diff --git a/dmaap-mediator-producer/mocks/KafkaFactory.go b/dmaap-mediator-producer/mocks/KafkaFactory.go
new file mode 100644
index 0000000..f05457a
--- /dev/null
+++ b/dmaap-mediator-producer/mocks/KafkaFactory.go
@@ -0,0 +1,36 @@
+// Code generated by mockery v1.0.0. DO NOT EDIT.
+
+package mocks
+
+import (
+	mock "github.com/stretchr/testify/mock"
+	"oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
+)
+
+// KafkaFactory is an autogenerated mock type for the KafkaFactory type
+type KafkaFactory struct {
+	mock.Mock
+}
+
+// NewKafkaConsumer provides a mock function with given fields: topicID
+func (_m KafkaFactory) NewKafkaConsumer(topicID string) (kafkaclient.KafkaConsumer, error) {
+	ret := _m.Called(topicID)
+
+	var r0 kafkaclient.KafkaConsumer
+	if rf, ok := ret.Get(0).(func(string) kafkaclient.KafkaConsumer); ok {
+		r0 = rf(topicID)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(kafkaclient.KafkaConsumer)
+		}
+	}
+
+	var r1 error
+	if rf, ok := ret.Get(1).(func(string) error); ok {
+		r1 = rf(topicID)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
diff --git a/dmaap-mediator-producer/mocks/jobhandler/JobHandler.go b/dmaap-mediator-producer/mocks/jobshandler/JobsHandler.go
similarity index 66%
rename from dmaap-mediator-producer/mocks/jobhandler/JobHandler.go
rename to dmaap-mediator-producer/mocks/jobshandler/JobsHandler.go
index ad20752..271590f 100644
--- a/dmaap-mediator-producer/mocks/jobhandler/JobHandler.go
+++ b/dmaap-mediator-producer/mocks/jobshandler/JobsHandler.go
@@ -1,19 +1,19 @@
 // Code generated by mockery v2.9.3. DO NOT EDIT.
 
-package jobhandler
+package jobshandler
 
 import (
 	mock "github.com/stretchr/testify/mock"
 	jobs "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
 )
 
-// JobHandler is an autogenerated mock type for the JobHandler type
-type JobHandler struct {
+// JobsHandler is an autogenerated mock type for the JobsHandler type
+type JobsHandler struct {
 	mock.Mock
 }
 
 // AddJob provides a mock function with given fields: _a0
-func (_m *JobHandler) AddJobFromRESTCall(_a0 jobs.JobInfo) error {
+func (_m *JobsHandler) AddJobFromRESTCall(_a0 jobs.JobInfo) error {
 	ret := _m.Called(_a0)
 
 	var r0 error
@@ -27,6 +27,6 @@
 }
 
 // DeleteJob provides a mock function with given fields: jobId
-func (_m *JobHandler) DeleteJobFromRESTCall(jobId string) {
+func (_m *JobsHandler) DeleteJobFromRESTCall(jobId string) {
 	_m.Called(jobId)
 }