Add kafka jobs to DMaaP Mediator Producer
Issue-ID: NONRTRIC-702
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: Ia7123240ad91449209afc4769b850cb0c8d9598f
diff --git a/dmaap-mediator-producer/configs/typeSchemaDmaap.json b/dmaap-mediator-producer/configs/typeSchemaDmaap.json
new file mode 100644
index 0000000..a50b236
--- /dev/null
+++ b/dmaap-mediator-producer/configs/typeSchemaDmaap.json
@@ -0,0 +1,10 @@
+{
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "type": "object",
+ "properties": {
+ "filter": {
+ "type": "string"
+ }
+ },
+ "additionalProperties": false
+}
diff --git a/dmaap-mediator-producer/configs/typeSchemaKafka.json b/dmaap-mediator-producer/configs/typeSchemaKafka.json
new file mode 100644
index 0000000..dcd40f9
--- /dev/null
+++ b/dmaap-mediator-producer/configs/typeSchemaKafka.json
@@ -0,0 +1,26 @@
+{
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "type": "object",
+ "properties": {
+ "filter": {
+ "type": "string"
+ },
+ "bufferTimeout": {
+ "type": "object",
+ "properties": {
+ "maxSize": {
+ "type": "integer"
+ },
+ "maxTimeMiliseconds": {
+ "type": "integer"
+ }
+ },
+ "additionalProperties": false,
+ "required": [
+ "maxSize",
+ "maxTimeMiliseconds"
+ ]
+ }
+ },
+ "additionalProperties": false
+}
\ No newline at end of file
diff --git a/dmaap-mediator-producer/configs/type_config.json b/dmaap-mediator-producer/configs/type_config.json
index f75d0e4..1149669 100644
--- a/dmaap-mediator-producer/configs/type_config.json
+++ b/dmaap-mediator-producer/configs/type_config.json
@@ -4,6 +4,10 @@
{
"id": "STD_Fault_Messages",
"dmaapTopicUrl": "/events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages"
+ },
+ {
+ "id": "Kafka_TestTopic",
+ "kafkaInputTopic": "TestTopic"
}
]
}
\ No newline at end of file
diff --git a/dmaap-mediator-producer/go.mod b/dmaap-mediator-producer/go.mod
index eaaecf7..e701c01 100644
--- a/dmaap-mediator-producer/go.mod
+++ b/dmaap-mediator-producer/go.mod
@@ -10,6 +10,7 @@
)
require (
+ github.com/confluentinc/confluent-kafka-go v1.8.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
diff --git a/dmaap-mediator-producer/go.sum b/dmaap-mediator-producer/go.sum
index 4b3557b..cf2c90f 100644
--- a/dmaap-mediator-producer/go.sum
+++ b/dmaap-mediator-producer/go.sum
@@ -1,3 +1,5 @@
+github.com/confluentinc/confluent-kafka-go v1.8.2 h1:PBdbvYpyOdFLehj8j+9ba7FL4c4Moxn79gy9cYKxG5E=
+github.com/confluentinc/confluent-kafka-go v1.8.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go
index e03c40a..7582e9c 100644
--- a/dmaap-mediator-producer/internal/config/config.go
+++ b/dmaap-mediator-producer/internal/config/config.go
@@ -24,6 +24,7 @@
"encoding/json"
"fmt"
"os"
+ "path/filepath"
"strconv"
log "github.com/sirupsen/logrus"
@@ -35,6 +36,7 @@
InfoProducerPort int
InfoCoordinatorAddress string
DMaaPMRAddress string
+ KafkaBootstrapServers string
ProducerCertPath string
ProducerKeyPath string
}
@@ -45,6 +47,7 @@
InfoProducerPort: getEnvAsInt("INFO_PRODUCER_PORT", 8085),
InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "https://informationservice:8434"),
DMaaPMRAddress: getEnv("DMAAP_MR_ADDR", "https://message-router.onap:3905"),
+ KafkaBootstrapServers: getEnv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
ProducerCertPath: getEnv("PRODUCER_CERT_PATH", "security/producer.crt"),
ProducerKeyPath: getEnv("PRODUCER_KEY_PATH", "security/producer.key"),
LogLevel: getLogLevel(),
@@ -83,8 +86,8 @@
}
}
-func GetJobTypesFromConfiguration(configFile string) ([]TypeDefinition, error) {
- typeDefsByte, err := os.ReadFile(configFile)
+func GetJobTypesFromConfiguration(configFolder string) ([]TypeDefinition, error) {
+ typeDefsByte, err := os.ReadFile(filepath.Join(configFolder, "type_config.json"))
if err != nil {
return nil, err
}
@@ -96,5 +99,35 @@
return nil, err
}
+ kafkaTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaKafka.json"))
+ if err != nil {
+ return nil, err
+ }
+
+ dMaaPTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaDmaap.json"))
+ if err != nil {
+ return nil, err
+ }
+
+ for i, typeDef := range typeDefs.Types {
+ if typeDef.IsKafkaType() {
+ typeDefs.Types[i].TypeSchema = kafkaTypeSchema
+ } else {
+ typeDefs.Types[i].TypeSchema = dMaaPTypeSchema
+ }
+ }
return typeDefs.Types, nil
}
+
+// getTypeSchema reads schemaFile and returns its content decoded as generic JSON.
+func getTypeSchema(schemaFile string) (interface{}, error) {
+	schemaBytes, err := os.ReadFile(schemaFile)
+	if err != nil {
+		return nil, err
+	}
+	var schema interface{}
+	if err = json.Unmarshal(schemaBytes, &schema); err != nil {
+		return nil, err
+	}
+	return schema, nil
+}
diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go
index faf5900..e66a818 100644
--- a/dmaap-mediator-producer/internal/config/config_test.go
+++ b/dmaap-mediator-producer/internal/config/config_test.go
@@ -22,6 +22,7 @@
import (
"bytes"
+ "encoding/json"
"os"
"path/filepath"
"testing"
@@ -37,6 +38,7 @@
os.Setenv("INFO_PRODUCER_PORT", "8095")
os.Setenv("INFO_COORD_ADDR", "infoCoordAddr")
os.Setenv("DMAAP_MR_ADDR", "mrHost:3908")
+ os.Setenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9093")
os.Setenv("PRODUCER_CERT_PATH", "cert")
os.Setenv("PRODUCER_KEY_PATH", "key")
t.Cleanup(func() {
@@ -48,6 +50,7 @@
InfoProducerPort: 8095,
InfoCoordinatorAddress: "infoCoordAddr",
DMaaPMRAddress: "mrHost:3908",
+ KafkaBootstrapServers: "localhost:9093",
ProducerCertPath: "cert",
ProducerKeyPath: "key",
}
@@ -72,6 +75,7 @@
InfoProducerPort: 8085,
InfoCoordinatorAddress: "https://informationservice:8434",
DMaaPMRAddress: "https://message-router.onap:3905",
+ KafkaBootstrapServers: "localhost:9092",
ProducerCertPath: "security/producer.crt",
ProducerKeyPath: "security/producer.key",
}
@@ -98,6 +102,7 @@
InfoProducerPort: 8085,
InfoCoordinatorAddress: "https://informationservice:8434",
DMaaPMRAddress: "https://message-router.onap:3905",
+ KafkaBootstrapServers: "localhost:9092",
ProducerCertPath: "security/producer.crt",
ProducerKeyPath: "security/producer.key",
}
@@ -109,7 +114,17 @@
assertions.Contains(logString, "Invalid log level: wrong. Log level will be Info!")
}
-const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
+const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}, {"id": "type2", "kafkaInputTopic": "TestTopic"}]}`
+const typeSchemaFileContent = `{
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "type": "object",
+ "properties": {
+ "filter": {
+ "type": "string"
+ }
+ },
+ "additionalProperties": false
+ }`
func TestGetTypesFromConfiguration_fileOkShouldReturnSliceOfTypeDefinitions(t *testing.T) {
assertions := require.New(t)
@@ -124,14 +139,30 @@
if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
t.Errorf("Unable to create temporary config file for types due to: %v", err)
}
-
- types, err := GetJobTypesFromConfiguration(fname)
-
- wantedType := TypeDefinition{
- Id: "type1",
- DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
+ fname = filepath.Join(typesDir, "typeSchemaDmaap.json")
+ if err = os.WriteFile(fname, []byte(typeSchemaFileContent), 0666); err != nil {
+ t.Errorf("Unable to create temporary schema file for DMaaP type due to: %v", err)
}
- wantedTypes := []TypeDefinition{wantedType}
+ fname = filepath.Join(typesDir, "typeSchemaKafka.json")
+ if err = os.WriteFile(fname, []byte(typeSchemaFileContent), 0666); err != nil {
+ t.Errorf("Unable to create temporary schema file for Kafka type due to: %v", err)
+ }
+ var typeSchemaObj interface{}
+ json.Unmarshal([]byte(typeSchemaFileContent), &typeSchemaObj)
+
+ types, err := GetJobTypesFromConfiguration(typesDir)
+
+ wantedDMaaPType := TypeDefinition{
+ Identity: "type1",
+ DMaaPTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
+ TypeSchema: typeSchemaObj,
+ }
+ wantedKafkaType := TypeDefinition{
+ Identity: "type2",
+ KafkaInputTopic: "TestTopic",
+ TypeSchema: typeSchemaObj,
+ }
+ wantedTypes := []TypeDefinition{wantedDMaaPType, wantedKafkaType}
assertions.EqualValues(wantedTypes, types)
assertions.Nil(err)
}
diff --git a/dmaap-mediator-producer/internal/config/registrator.go b/dmaap-mediator-producer/internal/config/registrator.go
index 83ed43f..bac14e6 100644
--- a/dmaap-mediator-producer/internal/config/registrator.go
+++ b/dmaap-mediator-producer/internal/config/registrator.go
@@ -32,11 +32,20 @@
const registerTypePath = "/data-producer/v1/info-types/"
const registerProducerPath = "/data-producer/v1/info-producers/"
-const typeSchema = `{"type": "object","properties": {},"additionalProperties": false}`
type TypeDefinition struct {
- Id string `json:"id"`
- DmaapTopicURL string `json:"dmaapTopicUrl"`
+ Identity string `json:"id"`
+ DMaaPTopicURL string `json:"dmaapTopicUrl"`
+ KafkaInputTopic string `json:"kafkaInputTopic"`
+ TypeSchema interface{}
+}
+
+func (td TypeDefinition) IsKafkaType() bool {
+ return td.KafkaInputTopic != ""
+}
+
+func (td TypeDefinition) IsDMaaPType() bool {
+ return td.DMaaPTopicURL != ""
}
type ProducerRegistrationInfo struct {
@@ -64,8 +73,8 @@
func (r RegistratorImpl) RegisterTypes(jobTypes []TypeDefinition) error {
for _, jobType := range jobTypes {
- body := fmt.Sprintf(`{"info_job_data_schema": %v}`, typeSchema)
- if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.Id), []byte(body), r.httpClient); error != nil {
+ body := fmt.Sprintf(`{"info_job_data_schema": %v}`, jobType.TypeSchema)
+ if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.Identity), []byte(body), r.httpClient); error != nil {
return error
}
log.Debugf("Registered type: %v", jobType)
diff --git a/dmaap-mediator-producer/internal/config/registrator_test.go b/dmaap-mediator-producer/internal/config/registrator_test.go
index 324aed0..d1b61d8 100644
--- a/dmaap-mediator-producer/internal/config/registrator_test.go
+++ b/dmaap-mediator-producer/internal/config/registrator_test.go
@@ -40,7 +40,8 @@
}, nil)
type1 := TypeDefinition{
- Id: "Type1",
+ Identity: "Type1",
+ TypeSchema: `{"type": "object","properties": {},"additionalProperties": false}`,
}
types := []TypeDefinition{type1}
diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go
index 867894f..3ef5ca3 100644
--- a/dmaap-mediator-producer/internal/jobs/jobs.go
+++ b/dmaap-mediator-producer/internal/jobs/jobs.go
@@ -25,24 +25,25 @@
"sync"
"time"
+ "github.com/confluentinc/confluent-kafka-go/kafka"
log "github.com/sirupsen/logrus"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
+ "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
)
type TypeData struct {
- TypeId string `json:"id"`
- DMaaPTopicURL string `json:"dmaapTopicUrl"`
- jobsHandler *jobsHandler
+ Identity string `json:"id"`
+ jobsHandler *jobsHandler
}
type JobInfo struct {
- Owner string `json:"owner"`
- LastUpdated string `json:"last_updated"`
- InfoJobIdentity string `json:"info_job_identity"`
- TargetUri string `json:"target_uri"`
- InfoJobData interface{} `json:"info_job_data"`
- InfoTypeIdentity string `json:"info_type_identity"`
+ Owner string `json:"owner"`
+ LastUpdated string `json:"last_updated"`
+ InfoJobIdentity string `json:"info_job_identity"`
+ TargetUri string `json:"target_uri"`
+ InfoJobData Parameters `json:"info_job_data"`
+ InfoTypeIdentity string `json:"info_type_identity"`
}
type JobTypesManager interface {
@@ -59,14 +60,16 @@
allTypes map[string]TypeData
pollClient restclient.HTTPClient
mrAddress string
+ kafkaFactory kafkaclient.KafkaFactory
distributeClient restclient.HTTPClient
}
-func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
+func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, kafkaFactory kafkaclient.KafkaFactory, distributeClient restclient.HTTPClient) *JobsManagerImpl {
return &JobsManagerImpl{
allTypes: make(map[string]TypeData),
pollClient: pollClient,
mrAddress: mrAddr,
+ kafkaFactory: kafkaFactory,
distributeClient: distributeClient,
}
}
@@ -84,7 +87,7 @@
func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) {
for _, typeData := range jm.allTypes {
- log.Debugf("Deleting job %v from type %v", jobId, typeData.TypeId)
+ log.Debugf("Deleting job %v from type %v", jobId, typeData.Identity)
typeData.jobsHandler.deleteJobCh <- jobId
}
log.Debug("Deleted job: ", jobId)
@@ -106,10 +109,12 @@
func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition {
for _, typeDef := range types {
- jm.allTypes[typeDef.Id] = TypeData{
- TypeId: typeDef.Id,
- DMaaPTopicURL: typeDef.DmaapTopicURL,
- jobsHandler: newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient),
+ if typeDef.DMaaPTopicURL == "" && typeDef.KafkaInputTopic == "" {
+ log.Fatal("DMaaPTopicURL or KafkaInputTopic must be defined for type: ", typeDef.Identity)
+ }
+ jm.allTypes[typeDef.Identity] = TypeData{
+ Identity: typeDef.Identity,
+ jobsHandler: newJobsHandler(typeDef, jm.mrAddress, jm.kafkaFactory, jm.pollClient, jm.distributeClient),
}
}
return types
@@ -126,7 +131,7 @@
func (jm *JobsManagerImpl) StartJobsForAllTypes() {
for _, jobType := range jm.allTypes {
- go jobType.jobsHandler.startPollingAndDistribution(jm.mrAddress)
+ go jobType.jobsHandler.startPollingAndDistribution()
}
}
@@ -134,30 +139,29 @@
type jobsHandler struct {
mu sync.Mutex
typeId string
- topicUrl string
+ pollingAgent pollingAgent
jobs map[string]job
addJobCh chan JobInfo
deleteJobCh chan string
- pollClient restclient.HTTPClient
distributeClient restclient.HTTPClient
}
-func newJobsHandler(typeId string, topicURL string, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
+func newJobsHandler(typeDef config.TypeDefinition, mRAddress string, kafkaFactory kafkaclient.KafkaFactory, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
+ pollingAgent := createPollingAgent(typeDef, mRAddress, pollClient, kafkaFactory, typeDef.KafkaInputTopic)
return &jobsHandler{
- typeId: typeId,
- topicUrl: topicURL,
+ typeId: typeDef.Identity,
+ pollingAgent: pollingAgent,
jobs: make(map[string]job),
addJobCh: make(chan JobInfo),
deleteJobCh: make(chan string),
- pollClient: pollClient,
distributeClient: distributeClient,
}
}
-func (jh *jobsHandler) startPollingAndDistribution(mRAddress string) {
+func (jh *jobsHandler) startPollingAndDistribution() {
go func() {
for {
- jh.pollAndDistributeMessages(mRAddress)
+ jh.pollAndDistributeMessages()
}
}()
@@ -168,19 +172,20 @@
}()
}
-func (jh *jobsHandler) pollAndDistributeMessages(mRAddress string) {
+func (jh *jobsHandler) pollAndDistributeMessages() {
log.Debugf("Processing jobs for type: %v", jh.typeId)
- messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient)
+ messagesBody, error := jh.pollingAgent.pollMessages()
if error != nil {
- log.Warn("Error getting data from MR. Cause: ", error)
- time.Sleep(time.Minute) // Must wait before trying to call MR again
+ log.Warn("Error getting data from source. Cause: ", error)
+ time.Sleep(time.Minute) // Must wait before trying to call data source again
+ return
}
- log.Debug("Received messages: ", string(messagesBody))
jh.distributeMessages(messagesBody)
}
func (jh *jobsHandler) distributeMessages(messages []byte) {
- if len(messages) > 2 {
+	if string(messages) != "[]" && len(messages) > 0 { // MR returns an empty array if there are no messages.
+ log.Debug("Distributing messages: ", string(messages))
jh.mu.Lock()
defer jh.mu.Unlock()
for _, job := range jh.jobs {
@@ -234,6 +239,61 @@
jh.mu.Unlock()
}
+type pollingAgent interface {
+ pollMessages() ([]byte, error)
+}
+
+// createPollingAgent picks the DMaaP agent when the type has a DMaaP topic URL, else the Kafka one.
+func createPollingAgent(typeDef config.TypeDefinition, mRAddress string, pollClient restclient.HTTPClient, kafkaFactory kafkaclient.KafkaFactory, topicID string) pollingAgent {
+	if typeDef.DMaaPTopicURL == "" {
+		return newKafkaPollingAgent(kafkaFactory, typeDef.KafkaInputTopic)
+	}
+	return dMaaPPollingAgent{
+		messageRouterURL: mRAddress + typeDef.DMaaPTopicURL,
+		pollClient:       pollClient,
+	}
+}
+
+type dMaaPPollingAgent struct {
+ messageRouterURL string
+ pollClient restclient.HTTPClient
+}
+
+func (pa dMaaPPollingAgent) pollMessages() ([]byte, error) {
+ return restclient.Get(pa.messageRouterURL, pa.pollClient)
+}
+
+type kafkaPollingAgent struct {
+ kafkaClient kafkaclient.KafkaClient
+}
+
+// newKafkaPollingAgent builds a polling agent backed by a Kafka client for topicID.
+// A client creation failure is reported with log.Fatalf rather than returned.
+func newKafkaPollingAgent(kafkaFactory kafkaclient.KafkaFactory, topicID string) kafkaPollingAgent {
+	client, err := kafkaclient.NewKafkaClient(kafkaFactory, topicID)
+	if err != nil {
+		log.Fatalf("Cannot create Kafka client for topic: %v, error details: %v\n", topicID, err)
+	}
+	return kafkaPollingAgent{kafkaClient: client}
+}
+
+// pollMessages reads one message from Kafka; a read timeout yields an empty payload and no error.
+func (pa kafkaPollingAgent) pollMessages() ([]byte, error) {
+	msg, err := pa.kafkaClient.ReadMessage()
+	if err == nil {
+		return msg, nil
+	}
+	if isKafkaTimedOutError(err) {
+		return []byte(""), nil
+	}
+	return nil, err
+}
+
+func isKafkaTimedOutError(err error) bool {
+ kafkaErr, ok := err.(kafka.Error)
+ return ok && kafkaErr.Code() == kafka.ErrTimedOut
+}
+
type job struct {
jobInfo JobInfo
client restclient.HTTPClient
@@ -242,6 +302,7 @@
}
func newJob(j JobInfo, c restclient.HTTPClient) job {
+
return job{
jobInfo: j,
client: c,
@@ -250,7 +311,24 @@
}
}
+type Parameters struct {
+ BufferTimeout BufferTimeout `json:"bufferTimeout"`
+}
+
+type BufferTimeout struct {
+ MaxSize int `json:"maxSize"`
+ MaxTimeMiliseconds int64 `json:"maxTimeMiliseconds"`
+}
+
func (j *job) start() {
+ if j.jobInfo.InfoJobData.BufferTimeout.MaxSize == 0 {
+ j.startReadingSingleMessages()
+ } else {
+ j.startReadingMessagesBuffered()
+ }
+}
+
+func (j *job) startReadingSingleMessages() {
out:
for {
select {
@@ -263,10 +341,68 @@
}
}
+// startReadingMessagesBuffered repeatedly reads a batch of messages using the job's
+// BufferTimeout and posts every non-empty batch, until controlChannel is signalled.
+func (j *job) startReadingMessagesBuffered() {
+	for {
+		select {
+		case <-j.controlChannel:
+			log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
+			return
+		default:
+			if batch := j.read(j.jobInfo.InfoJobData.BufferTimeout); len(batch) > 0 {
+				j.sendMessagesToConsumer(batch)
+			}
+		}
+	}
+}
+
+// read collects messages from the job's messagesChannel and returns them
+// concatenated in arrival order. It returns as soon as bufferParams.MaxSize
+// messages have been received or bufferParams.MaxTimeMiliseconds has elapsed,
+// whichever happens first; the result is nil if nothing arrived in time.
+//
+// Everything runs on the calling goroutine on purpose: collecting in a helper
+// goroutine that is only signalled to stop, never awaited, would let it append
+// to the returned slice concurrently (a data race) and silently drop a message
+// it picked up after the deadline.
+func (j *job) read(bufferParams BufferTimeout) []byte {
+	var msgs []byte
+	timer := time.NewTimer(time.Duration(bufferParams.MaxTimeMiliseconds) * time.Millisecond)
+	defer timer.Stop()
+	for received := 0; ; {
+		select {
+		case <-timer.C:
+			return msgs
+		case msg := <-j.messagesChannel:
+			msgs = append(msgs, msg...)
+			received++
+			if received == bufferParams.MaxSize {
+				return msgs
+			}
+		}
+	}
+}
+
+func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
+ c := make(chan struct{})
+ go func() {
+ defer close(c)
+ wg.Wait()
+ }()
+ select {
+ case <-c:
+ return false // completed normally
+ case <-time.After(timeout):
+ return true // timed out
+ }
+}
+
func (j *job) sendMessagesToConsumer(messages []byte) {
log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
if postErr := restclient.Post(j.jobInfo.TargetUri, messages, j.client); postErr != nil {
log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
+ return
}
log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)
}
diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go
index 30b4ffd..ab1165c 100644
--- a/dmaap-mediator-producer/internal/jobs/jobs_test.go
+++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go
@@ -22,52 +22,60 @@
import (
"bytes"
+ "fmt"
"io/ioutil"
"net/http"
+ "strconv"
"sync"
"testing"
"time"
+ "github.com/confluentinc/confluent-kafka-go/kafka"
+ "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
+ "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
+ "oransc.org/nonrtric/dmaapmediatorproducer/mocks"
)
-const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
-
-func TestJobsManagerGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
+func TestJobsManagerGetTypes_shouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
assertions := require.New(t)
- managerUnderTest := NewJobsManagerImpl(nil, "", nil)
+ managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
- wantedType := config.TypeDefinition{
- Id: "type1",
- DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
+ wantedDMaaPType := config.TypeDefinition{
+ Identity: "type1",
+ DMaaPTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
}
- wantedTypes := []config.TypeDefinition{wantedType}
+ wantedKafkaType := config.TypeDefinition{
+ Identity: "type2",
+ KafkaInputTopic: "topic",
+ }
+ wantedTypes := []config.TypeDefinition{wantedDMaaPType, wantedKafkaType}
types := managerUnderTest.LoadTypesFromConfiguration(wantedTypes)
assertions.EqualValues(wantedTypes, types)
supportedTypes := managerUnderTest.GetSupportedTypes()
- assertions.EqualValues([]string{"type1"}, supportedTypes)
+ assertions.ElementsMatch([]string{"type1", "type2"}, supportedTypes)
}
func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
assertions := require.New(t)
- managerUnderTest := NewJobsManagerImpl(nil, "", nil)
+ managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
wantedJob := JobInfo{
Owner: "owner",
LastUpdated: "now",
InfoJobIdentity: "job1",
TargetUri: "target",
- InfoJobData: "{}",
+ InfoJobData: Parameters{},
InfoTypeIdentity: "type1",
}
jobsHandler := jobsHandler{
addJobCh: make(chan JobInfo)}
managerUnderTest.allTypes["type1"] = TypeData{
- TypeId: "type1",
+ Identity: "type1",
jobsHandler: &jobsHandler,
}
@@ -83,7 +91,7 @@
func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
assertions := require.New(t)
- managerUnderTest := NewJobsManagerImpl(nil, "", nil)
+ managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
jobInfo := JobInfo{
InfoTypeIdentity: "type1",
}
@@ -95,9 +103,9 @@
func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
assertions := require.New(t)
- managerUnderTest := NewJobsManagerImpl(nil, "", nil)
+ managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
managerUnderTest.allTypes["type1"] = TypeData{
- TypeId: "type1",
+ Identity: "type1",
}
jobInfo := JobInfo{
@@ -105,14 +113,14 @@
}
err := managerUnderTest.AddJobFromRESTCall(jobInfo)
assertions.NotNil(err)
- assertions.Equal("missing required job identity: { <nil> type1}", err.Error())
+ assertions.Equal("missing required job identity: { {{0 0}} type1}", err.Error())
}
func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
assertions := require.New(t)
- managerUnderTest := NewJobsManagerImpl(nil, "", nil)
+ managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
managerUnderTest.allTypes["type1"] = TypeData{
- TypeId: "type1",
+ Identity: "type1",
}
jobInfo := JobInfo{
@@ -121,16 +129,16 @@
}
err := managerUnderTest.AddJobFromRESTCall(jobInfo)
assertions.NotNil(err)
- assertions.Equal("missing required target URI: { job1 <nil> type1}", err.Error())
+ assertions.Equal("missing required target URI: { job1 {{0 0}} type1}", err.Error())
}
func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
assertions := require.New(t)
- managerUnderTest := NewJobsManagerImpl(nil, "", nil)
+ managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
jobsHandler := jobsHandler{
deleteJobCh: make(chan string)}
managerUnderTest.allTypes["type1"] = TypeData{
- TypeId: "type1",
+ Identity: "type1",
jobsHandler: &jobsHandler,
}
@@ -139,21 +147,21 @@
assertions.Equal("job2", <-jobsHandler.deleteJobCh)
}
-func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T) {
+func TestStartJobsManagerAddDMaaPJob_shouldStartPollAndDistributeMessages(t *testing.T) {
assertions := require.New(t)
called := false
- messages := `[{"message": {"data": "data"}}]`
+ dMaaPMessages := `[{"message": {"data": "dmaap"}}]`
pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
if req.URL.String() == "http://mrAddr/topicUrl" {
assertions.Equal(req.Method, "GET")
body := "[]"
if !called {
called = true
- body = messages
+ body = dMaaPMessages
}
return &http.Response{
- StatusCode: 200,
+ StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader([]byte(body))),
Header: make(http.Header), // Must be set to non-nil value or it panics
}
@@ -165,9 +173,9 @@
wg := sync.WaitGroup{}
distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
- if req.URL.String() == "http://consumerHost/target" {
+ if req.URL.String() == "http://consumerHost/dmaaptarget" {
assertions.Equal(req.Method, "POST")
- assertions.Equal(messages, getBodyAsString(req, t))
+ assertions.Equal(dMaaPMessages, getBodyAsString(req, t))
assertions.Equal("application/json", req.Header.Get("Content-Type"))
wg.Done()
return &http.Response{
@@ -180,25 +188,88 @@
t.Fail()
return nil
})
- jobsHandler := newJobsHandler("type1", "/topicUrl", pollClientMock, distributeClientMock)
-
- jobsManager := NewJobsManagerImpl(pollClientMock, "http://mrAddr", distributeClientMock)
- jobsManager.allTypes["type1"] = TypeData{
+ dMaaPTypeDef := config.TypeDefinition{
+ Identity: "type1",
DMaaPTopicURL: "/topicUrl",
- TypeId: "type1",
- jobsHandler: jobsHandler,
+ }
+ dMaaPJobsHandler := newJobsHandler(dMaaPTypeDef, "http://mrAddr", nil, pollClientMock, distributeClientMock)
+
+ jobsManager := NewJobsManagerImpl(pollClientMock, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, distributeClientMock)
+ jobsManager.allTypes["type1"] = TypeData{
+ Identity: "type1",
+ jobsHandler: dMaaPJobsHandler,
+ }
+ jobsManager.StartJobsForAllTypes()
+
+ dMaaPJobInfo := JobInfo{
+ InfoTypeIdentity: "type1",
+ InfoJobIdentity: "job1",
+ TargetUri: "http://consumerHost/dmaaptarget",
+ }
+
+ wg.Add(1) // Wait till the distribution has happened
+ err := jobsManager.AddJobFromRESTCall(dMaaPJobInfo)
+ assertions.Nil(err)
+
+ if waitTimeout(&wg, 2*time.Second) {
+ t.Error("Not all calls to server were made")
+ t.Fail()
+ }
+}
+
+func TestStartJobsManagerAddKafkaJob_shouldStartPollAndDistributeMessages(t *testing.T) {
+ assertions := require.New(t)
+
+ kafkaMessages := `1`
+ wg := sync.WaitGroup{}
+ distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
+ if req.URL.String() == "http://consumerHost/kafkatarget" {
+ assertions.Equal(req.Method, "POST")
+ assertions.Equal(kafkaMessages, getBodyAsString(req, t))
+ assertions.Equal("application/json", req.Header.Get("Content-Type"))
+ wg.Done()
+ return &http.Response{
+ StatusCode: 200,
+ Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
+ Header: make(http.Header), // Must be set to non-nil value or it panics
+ }
+ }
+ t.Error("Wrong call to client: ", req)
+ t.Fail()
+ return nil
+ })
+
+ kafkaTypeDef := config.TypeDefinition{
+ Identity: "type2",
+ KafkaInputTopic: "topic",
+ }
+ kafkaFactoryMock := mocks.KafkaFactory{}
+ kafkaConsumerMock := mocks.KafkaConsumer{}
+ kafkaConsumerMock.On("Commit").Return([]kafka.TopicPartition{}, error(nil))
+ kafkaConsumerMock.On("Subscribe", mock.Anything).Return(error(nil))
+ kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(&kafka.Message{
+ Value: []byte(kafkaMessages),
+ }, error(nil)).Once()
+ kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(nil, fmt.Errorf("Just to stop"))
+ kafkaFactoryMock.On("NewKafkaConsumer", mock.Anything).Return(kafkaConsumerMock, nil)
+ kafkaJobsHandler := newJobsHandler(kafkaTypeDef, "", kafkaFactoryMock, nil, distributeClientMock)
+
+ jobsManager := NewJobsManagerImpl(nil, "", kafkaFactoryMock, distributeClientMock)
+ jobsManager.allTypes["type2"] = TypeData{
+ Identity: "type2",
+ jobsHandler: kafkaJobsHandler,
}
jobsManager.StartJobsForAllTypes()
- jobInfo := JobInfo{
- InfoTypeIdentity: "type1",
- InfoJobIdentity: "job1",
- TargetUri: "http://consumerHost/target",
+ kafkaJobInfo := JobInfo{
+ InfoTypeIdentity: "type2",
+ InfoJobIdentity: "job2",
+ TargetUri: "http://consumerHost/kafkatarget",
}
wg.Add(1) // Wait till the distribution has happened
- err := jobsManager.AddJobFromRESTCall(jobInfo)
+ err := jobsManager.AddJobFromRESTCall(kafkaJobInfo)
assertions.Nil(err)
if waitTimeout(&wg, 2*time.Second) {
@@ -210,7 +281,11 @@
func TestJobsHandlerDeleteJob_shouldDeleteJobFromJobsMap(t *testing.T) {
jobToDelete := newJob(JobInfo{}, nil)
go jobToDelete.start()
- jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil)
+ typeDef := config.TypeDefinition{
+ Identity: "type1",
+ DMaaPTopicURL: "/topicUrl",
+ }
+ jobsHandler := newJobsHandler(typeDef, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, nil, nil)
jobsHandler.jobs["job1"] = jobToDelete
go jobsHandler.monitorManagementChannels()
@@ -233,7 +308,11 @@
InfoJobIdentity: "job",
}, nil)
- jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil)
+ typeDef := config.TypeDefinition{
+ Identity: "type1",
+ DMaaPTopicURL: "/topicUrl",
+ }
+ jobsHandler := newJobsHandler(typeDef, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, nil, nil)
jobsHandler.jobs["job1"] = job
fillMessagesBuffer(job.messagesChannel)
@@ -243,6 +322,143 @@
require.New(t).Len(job.messagesChannel, 0)
}
+// TestKafkaPollingAgentTimedOut_shouldResultInEMptyMessages checks that when the
+// mocked consumer's ReadMessage reports kafka.ErrTimedOut, pollMessages returns
+// an empty byte slice and a nil error.
+func TestKafkaPollingAgentTimedOut_shouldResultInEMptyMessages(t *testing.T) {
+	check := require.New(t)
+
+	// Every read from the mocked consumer times out.
+	timedOut := kafka.NewError(kafka.ErrTimedOut, "", false)
+
+	consumerMock := mocks.KafkaConsumer{}
+	consumerMock.On("Commit").Return([]kafka.TopicPartition{}, error(nil))
+	consumerMock.On("Subscribe", mock.Anything).Return(error(nil))
+	consumerMock.On("ReadMessage", mock.Anything).Return(nil, timedOut)
+
+	// The consumer mock is handed over by value, so it must be fully configured
+	// before it is registered as the factory's return value.
+	factoryMock := mocks.KafkaFactory{}
+	factoryMock.On("NewKafkaConsumer", mock.Anything).Return(consumerMock, nil)
+
+	agent := newKafkaPollingAgent(factoryMock, "")
+	polled, pollErr := agent.pollMessages()
+
+	check.Equal([]byte(""), polled)
+	check.Nil(pollErr)
+}
+
+// TestJobWithoutParameters_shouldSendOneMessageAtATime verifies that a job
+// created without buffering parameters posts each message put on its messages
+// channel to the target URI as a request of its own: two messages are sent and
+// two POSTs, carrying "message1" and "message2" in that order, are expected.
+func TestJobWithoutParameters_shouldSendOneMessageAtATime(t *testing.T) {
+	assertions := require.New(t)
+
+	wg := sync.WaitGroup{}
+	messageNo := 1
+	distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
+		if req.URL.String() == "http://consumerHost/target" {
+			// testify expects (expected, actual); keeping that order makes
+			// failure messages read correctly.
+			assertions.Equal("POST", req.Method)
+			assertions.Equal(fmt.Sprint("message", messageNo), getBodyAsString(req, t))
+			messageNo++
+			assertions.Equal("application/json", req.Header.Get("Content-Type"))
+			wg.Done()
+			return &http.Response{
+				StatusCode: 200,
+				Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
+				Header:     make(http.Header), // Must be set to non-nil value or it panics
+			}
+		}
+		// t.Error already marks the test as failed; no separate t.Fail is needed.
+		t.Error("Wrong call to client: ", req)
+		return nil
+	})
+
+	jobUnderTest := newJob(JobInfo{TargetUri: "http://consumerHost/target"}, distributeClientMock)
+
+	wg.Add(2) // One Done per expected request
+	go jobUnderTest.start()
+
+	jobUnderTest.messagesChannel <- []byte("message1")
+	jobUnderTest.messagesChannel <- []byte("message2")
+
+	if waitTimeout(&wg, 2*time.Second) {
+		t.Error("Not all calls to server were made")
+	}
+}
+
+// TestJobWithBufferedParameters_shouldSendMessagesTogether verifies that a job
+// configured with a BufferTimeout (MaxSize 5, MaxTimeMiliseconds 200) delivers
+// the messages "1" and "2" to the target URI in one single POST whose body is
+// "12".
+func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) {
+	assertions := require.New(t)
+
+	wg := sync.WaitGroup{}
+	distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
+		if req.URL.String() == "http://consumerHost/target" {
+			// testify expects (expected, actual); keeping that order makes
+			// failure messages read correctly.
+			assertions.Equal("POST", req.Method)
+			assertions.Equal("12", getBodyAsString(req, t))
+			assertions.Equal("application/json", req.Header.Get("Content-Type"))
+			wg.Done()
+			return &http.Response{
+				StatusCode: 200,
+				Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
+				Header:     make(http.Header), // Must be set to non-nil value or it panics
+			}
+		}
+		// t.Error already marks the test as failed; no separate t.Fail is needed.
+		t.Error("Wrong call to client: ", req)
+		return nil
+	})
+
+	jobUnderTest := newJob(JobInfo{
+		TargetUri: "http://consumerHost/target",
+		InfoJobData: Parameters{
+			BufferTimeout: BufferTimeout{
+				MaxSize:            5,
+				MaxTimeMiliseconds: 200,
+			},
+		},
+	}, distributeClientMock)
+
+	wg.Add(1) // Exactly one combined request is expected
+	go jobUnderTest.start()
+
+	// Send from a separate goroutine so the test itself never blocks on the channel.
+	go func() {
+		jobUnderTest.messagesChannel <- []byte("1")
+		jobUnderTest.messagesChannel <- []byte("2")
+	}()
+
+	if waitTimeout(&wg, 2*time.Second) {
+		t.Error("Not all calls to server were made")
+	}
+}
+
+// TestJobReadMoreThanBufferSizeMessages_shouldOnlyReturnMaxSizeNoOfMessages
+// pushes four messages ("0" to "3") onto the job's messages channel and expects
+// read, limited to MaxSize 2, to return only the first two concatenated ("01").
+func TestJobReadMoreThanBufferSizeMessages_shouldOnlyReturnMaxSizeNoOfMessages(t *testing.T) {
+	check := require.New(t)
+
+	jobUnderTest := newJob(JobInfo{}, nil)
+	limits := BufferTimeout{
+		MaxSize:            2,
+		MaxTimeMiliseconds: 200,
+	}
+
+	// Produce more messages than the buffer is allowed to hold.
+	go func() {
+		for n := 0; n < 4; n++ {
+			payload := strconv.Itoa(n)
+			jobUnderTest.messagesChannel <- []byte(payload)
+		}
+	}()
+
+	received := jobUnderTest.read(limits)
+
+	check.Equal([]byte("01"), received)
+}
+// TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout sends
+// four messages ("0" to "3"), pausing 10 ms before each one, and expects read
+// with MaxSize 2 and MaxTimeMiliseconds 30 to return "01".
+//
+// NOTE(review): with MaxSize 2 the size limit may end the read before the 30 ms
+// timeout does — confirm that the timeout path is what this test exercises.
+func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t *testing.T) {
+	check := require.New(t)
+
+	jobUnderTest := newJob(JobInfo{}, nil)
+	limits := BufferTimeout{
+		MaxSize:            2,
+		MaxTimeMiliseconds: 30,
+	}
+
+	// Slow producer: one message roughly every 10 ms.
+	go func() {
+		for n := 0; n < 4; n++ {
+			time.Sleep(10 * time.Millisecond)
+			payload := strconv.Itoa(n)
+			jobUnderTest.messagesChannel <- []byte(payload)
+		}
+	}()
+
+	received := jobUnderTest.read(limits)
+
+	check.Equal([]byte("01"), received)
+}
+
func fillMessagesBuffer(mc chan []byte) {
for i := 0; i < cap(mc); i++ {
mc <- []byte("msg")
diff --git a/dmaap-mediator-producer/internal/kafkaclient/kafkaclient.go b/dmaap-mediator-producer/internal/kafkaclient/kafkaclient.go
new file mode 100644
index 0000000..16abcb4
--- /dev/null
+++ b/dmaap-mediator-producer/internal/kafkaclient/kafkaclient.go
@@ -0,0 +1,94 @@
+// -
+// ========================LICENSE_START=================================
+// O-RAN-SC
+// %%
+// Copyright (C) 2021: Nordix Foundation
+// %%
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+package kafkaclient
+
+import (
+ "time"
+
+ "github.com/confluentinc/confluent-kafka-go/kafka"
+)
+
+// KafkaFactory creates KafkaConsumer instances. It is the dependency that
+// NewKafkaClient uses to obtain its consumer.
+type KafkaFactory interface {
+ NewKafkaConsumer(topicID string) (KafkaConsumer, error)
+}
+
+// KafkaFactoryImpl is the KafkaFactory implementation that builds consumers with
+// the confluent-kafka-go library.
+type KafkaFactoryImpl struct {
+ BootstrapServer string // passed as the "bootstrap.servers" setting of every consumer created
+}
+
+// NewKafkaConsumer creates a Kafka consumer that connects to kf.BootstrapServer,
+// belongs to the consumer group "dmaap-mediator-producer" and uses "earliest" as
+// its offset reset policy. The consumer is returned wrapped in a
+// KafkaConsumerImpl.
+//
+// topicID is not used by this implementation; the returned consumer is not yet
+// subscribed to any topic. If the underlying consumer cannot be created, nil
+// and the error from the Kafka library are returned.
+func (kf KafkaFactoryImpl) NewKafkaConsumer(topicID string) (KafkaConsumer, error) {
+	settings := kafka.ConfigMap{
+		"bootstrap.servers": kf.BootstrapServer,
+		"group.id":          "dmaap-mediator-producer",
+		"auto.offset.reset": "earliest",
+	}
+	created, err := kafka.NewConsumer(&settings)
+	if err == nil {
+		return KafkaConsumerImpl{consumer: created}, nil
+	}
+	return nil, err
+}
+
+// NewKafkaClient obtains a consumer for topicID from factory, subscribes it to
+// that topic and returns it wrapped in a KafkaClient.
+//
+// If the consumer cannot be created or the subscription fails, a zero-value
+// KafkaClient is returned together with the error.
+func NewKafkaClient(factory KafkaFactory, topicID string) (KafkaClient, error) {
+	consumer, err := factory.NewKafkaConsumer(topicID)
+	if err != nil {
+		return KafkaClient{}, err
+	}
+	// The outcome of this initial Commit is deliberately discarded so that it
+	// can never prevent the client from being created.
+	// NOTE(review): nothing has been subscribed to or read at this point, so it
+	// is unclear what this Commit achieves — confirm that it is needed.
+	_, _ = consumer.Commit()
+	if err = consumer.Subscribe(topicID); err != nil {
+		return KafkaClient{}, err
+	}
+	return KafkaClient{consumer: consumer}, nil
+}
+
+// KafkaClient reads messages through the KafkaConsumer it wraps. Instances are
+// created with NewKafkaClient, which also subscribes the consumer to a topic.
+type KafkaClient struct {
+ consumer KafkaConsumer
+}
+
+// ReadMessage asks the wrapped consumer for the next message, using a timeout of
+// one second, and returns that message's Value. If the consumer reports an
+// error, nil and that error are returned unchanged.
+func (kc KafkaClient) ReadMessage() ([]byte, error) {
+	message, readErr := kc.consumer.ReadMessage(time.Second)
+	if readErr == nil {
+		return message.Value, nil
+	}
+	return nil, readErr
+}
+
+// KafkaConsumer is the subset of consumer operations this package relies on.
+// KafkaConsumerImpl implements it on top of *kafka.Consumer; having it as an
+// interface lets a different implementation be supplied through KafkaFactory.
+type KafkaConsumer interface {
+ Commit() ([]kafka.TopicPartition, error)
+ Subscribe(topic string) (err error)
+ ReadMessage(timeout time.Duration) (*kafka.Message, error)
+}
+
+// KafkaConsumerImpl implements KafkaConsumer by delegating every call to the
+// *kafka.Consumer it holds.
+type KafkaConsumerImpl struct {
+ consumer *kafka.Consumer
+}
+
+// Commit delegates to the wrapped *kafka.Consumer's Commit and passes its result
+// through unchanged.
+func (kc KafkaConsumerImpl) Commit() ([]kafka.TopicPartition, error) {
+	partitions, err := kc.consumer.Commit()
+	return partitions, err
+}
+
+// Subscribe subscribes the wrapped *kafka.Consumer to topic. nil is passed as
+// the second argument of the underlying call, i.e. no rebalance callback is
+// registered.
+func (kc KafkaConsumerImpl) Subscribe(topic string) error {
+	err := kc.consumer.Subscribe(topic, nil)
+	return err
+}
+
+// ReadMessage delegates to the wrapped *kafka.Consumer's ReadMessage with the
+// given timeout and passes its result through unchanged.
+func (kc KafkaConsumerImpl) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
+	message, err := kc.consumer.ReadMessage(timeout)
+	return message, err
+}
diff --git a/dmaap-mediator-producer/internal/server/server_test.go b/dmaap-mediator-producer/internal/server/server_test.go
index 6248c22..6fe4d7a 100644
--- a/dmaap-mediator-producer/internal/server/server_test.go
+++ b/dmaap-mediator-producer/internal/server/server_test.go
@@ -34,7 +34,7 @@
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
- "oransc.org/nonrtric/dmaapmediatorproducer/mocks/jobhandler"
+ "oransc.org/nonrtric/dmaapmediatorproducer/mocks/jobshandler"
)
func TestNewRouter(t *testing.T) {
@@ -88,7 +88,7 @@
assertions.Equal("/admin/log", path)
}
-func TestAddInfoJobHandler(t *testing.T) {
+func TestAddInfoJobToJobsHandler(t *testing.T) {
assertions := require.New(t)
type args struct {
@@ -102,21 +102,21 @@
wantedBody string
}{
{
- name: "AddInfoJobHandler with correct job, should return OK",
+ name: "AddInfoJobToJobsHandler with correct job, should return OK",
args: args{
job: jobs.JobInfo{
Owner: "owner",
LastUpdated: "now",
InfoJobIdentity: "jobId",
TargetUri: "target",
- InfoJobData: "{}",
+ InfoJobData: jobs.Parameters{},
InfoTypeIdentity: "type",
},
},
wantedStatus: http.StatusOK,
},
{
- name: "AddInfoJobHandler with incorrect job info, should return BadRequest",
+ name: "AddInfoJobToJobsHandler with incorrect job info, should return BadRequest",
args: args{
job: jobs.JobInfo{
Owner: "bad",
@@ -129,10 +129,10 @@
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- jobHandlerMock := jobhandler.JobHandler{}
- jobHandlerMock.On("AddJobFromRESTCall", tt.args.job).Return(tt.args.mockReturn)
+ jobsHandlerMock := jobshandler.JobsHandler{}
+ jobsHandlerMock.On("AddJobFromRESTCall", tt.args.job).Return(tt.args.mockReturn)
- callbackHandlerUnderTest := NewProducerCallbackHandler(&jobHandlerMock)
+ callbackHandlerUnderTest := NewProducerCallbackHandler(&jobsHandlerMock)
handler := http.HandlerFunc(callbackHandlerUnderTest.addInfoJobHandler)
responseRecorder := httptest.NewRecorder()
@@ -142,17 +142,17 @@
assertions.Equal(tt.wantedStatus, responseRecorder.Code, tt.name)
assertions.Contains(responseRecorder.Body.String(), tt.wantedBody, tt.name)
- jobHandlerMock.AssertCalled(t, "AddJobFromRESTCall", tt.args.job)
+ jobsHandlerMock.AssertCalled(t, "AddJobFromRESTCall", tt.args.job)
})
}
}
func TestDeleteJob(t *testing.T) {
assertions := require.New(t)
- jobHandlerMock := jobhandler.JobHandler{}
- jobHandlerMock.On("DeleteJobFromRESTCall", mock.Anything).Return(nil)
+ jobsHandlerMock := jobshandler.JobsHandler{}
+ jobsHandlerMock.On("DeleteJobFromRESTCall", mock.Anything).Return(nil)
- callbackHandlerUnderTest := NewProducerCallbackHandler(&jobHandlerMock)
+ callbackHandlerUnderTest := NewProducerCallbackHandler(&jobsHandlerMock)
responseRecorder := httptest.NewRecorder()
r := mux.SetURLVars(newRequest(http.MethodDelete, "/jobs/", nil, t), map[string]string{"infoJobId": "job1"})
@@ -162,7 +162,7 @@
assertions.Equal("", responseRecorder.Body.String())
- jobHandlerMock.AssertCalled(t, "DeleteJobFromRESTCall", "job1")
+ jobsHandlerMock.AssertCalled(t, "DeleteJobFromRESTCall", "job1")
}
func TestSetLogLevel(t *testing.T) {
diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go
index 1aabdda..819ffa9 100644
--- a/dmaap-mediator-producer/main.go
+++ b/dmaap-mediator-producer/main.go
@@ -29,6 +29,7 @@
log "github.com/sirupsen/logrus"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
+ "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
)
@@ -55,9 +56,12 @@
} else {
log.Fatalf("Stopping producer due to error: %v", err)
}
- retryClient := restclient.CreateRetryClient(cert)
- jobsManager := jobs.NewJobsManagerImpl(retryClient, configuration.DMaaPMRAddress, restclient.CreateClientWithoutRetry(cert, 10*time.Second))
+ retryClient := restclient.CreateRetryClient(cert)
+ kafkaFactory := kafkaclient.KafkaFactoryImpl{BootstrapServer: configuration.KafkaBootstrapServers}
+ distributionClient := restclient.CreateClientWithoutRetry(cert, 10*time.Second)
+
+ jobsManager := jobs.NewJobsManagerImpl(retryClient, configuration.DMaaPMRAddress, kafkaFactory, distributionClient)
go startCallbackServer(jobsManager, callbackAddress)
if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
@@ -78,11 +82,14 @@
if configuration.ProducerCertPath == "" || configuration.ProducerKeyPath == "" {
return fmt.Errorf("missing PRODUCER_CERT and/or PRODUCER_KEY")
}
+ if configuration.DMaaPMRAddress == "" && configuration.KafkaBootstrapServers == "" {
+ return fmt.Errorf("at least one of DMAAP_MR_ADDR or KAFKA_BOOTSRAP_SERVERS must be provided")
+ }
return nil
}
func registerTypesAndProducer(jobTypesHandler jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client)
- configTypes, err := config.GetJobTypesFromConfiguration("configs/type_config.json")
+ configTypes, err := config.GetJobTypesFromConfiguration("configs")
if err != nil {
return fmt.Errorf("unable to register all types due to: %v", err)
}
diff --git a/dmaap-mediator-producer/mocks/KafkaConsumer.go b/dmaap-mediator-producer/mocks/KafkaConsumer.go
new file mode 100644
index 0000000..8ae0893
--- /dev/null
+++ b/dmaap-mediator-producer/mocks/KafkaConsumer.go
@@ -0,0 +1,76 @@
+// Code generated by mockery v1.0.0. DO NOT EDIT.
+
+package mocks
+
+import (
+ kafka "github.com/confluentinc/confluent-kafka-go/kafka"
+
+ mock "github.com/stretchr/testify/mock"
+
+ time "time"
+)
+
+// KafkaConsumer is an autogenerated mock type for the KafkaConsumer type.
+// It embeds mock.Mock, through which expectations are configured and calls are
+// dispatched.
+// NOTE(review): the methods below use value receivers, so each call works on a
+// copy of the embedded mock.Mock — confirm this is intended.
+type KafkaConsumer struct {
+ mock.Mock
+}
+
+// Commit provides a mock function with no arguments.
+// The first configured return value may be a func() []kafka.TopicPartition, which
+// is invoked to produce the result, a []kafka.TopicPartition, or nil. The second
+// may be a func() error, which is invoked, or an error value.
+func (_m KafkaConsumer) Commit() ([]kafka.TopicPartition, error) {
+ ret := _m.Called()
+
+ var r0 []kafka.TopicPartition
+ if rf, ok := ret.Get(0).(func() []kafka.TopicPartition); ok {
+ r0 = rf()
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).([]kafka.TopicPartition)
+ }
+ }
+
+ var r1 error
+ if rf, ok := ret.Get(1).(func() error); ok {
+ r1 = rf()
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
+
+// ReadMessage provides a mock function with given fields: timeout.
+// The first configured return value may be a func(time.Duration) *kafka.Message,
+// which is invoked with timeout, a *kafka.Message, or nil. The second may be a
+// func(time.Duration) error, which is invoked with timeout, or an error value.
+func (_m KafkaConsumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
+ ret := _m.Called(timeout)
+
+ var r0 *kafka.Message
+ if rf, ok := ret.Get(0).(func(time.Duration) *kafka.Message); ok {
+ r0 = rf(timeout)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(*kafka.Message)
+ }
+ }
+
+ var r1 error
+ if rf, ok := ret.Get(1).(func(time.Duration) error); ok {
+ r1 = rf(timeout)
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
+
+// Subscribe provides a mock function with given fields: topic.
+// The configured return value may be a func(string) error, which is invoked with
+// topic, or an error value.
+func (_m KafkaConsumer) Subscribe(topic string) error {
+ ret := _m.Called(topic)
+
+ var r0 error
+ if rf, ok := ret.Get(0).(func(string) error); ok {
+ r0 = rf(topic)
+ } else {
+ r0 = ret.Error(0)
+ }
+
+ return r0
+}
diff --git a/dmaap-mediator-producer/mocks/KafkaFactory.go b/dmaap-mediator-producer/mocks/KafkaFactory.go
new file mode 100644
index 0000000..f05457a
--- /dev/null
+++ b/dmaap-mediator-producer/mocks/KafkaFactory.go
@@ -0,0 +1,36 @@
+// Code generated by mockery v1.0.0. DO NOT EDIT.
+
+package mocks
+
+import (
+ mock "github.com/stretchr/testify/mock"
+ "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
+)
+
+// KafkaFactory is an autogenerated mock type for the KafkaFactory type.
+// It embeds mock.Mock, through which expectations are configured and calls are
+// dispatched.
+// NOTE(review): NewKafkaConsumer uses a value receiver, so each call works on a
+// copy of the embedded mock.Mock — confirm this is intended.
+type KafkaFactory struct {
+ mock.Mock
+}
+
+// NewKafkaConsumer provides a mock function with given fields: topicID.
+// The first configured return value may be a func(string) kafkaclient.KafkaConsumer,
+// which is invoked with topicID, a kafkaclient.KafkaConsumer, or nil. The second
+// may be a func(string) error, which is invoked with topicID, or an error value.
+func (_m KafkaFactory) NewKafkaConsumer(topicID string) (kafkaclient.KafkaConsumer, error) {
+ ret := _m.Called(topicID)
+
+ var r0 kafkaclient.KafkaConsumer
+ if rf, ok := ret.Get(0).(func(string) kafkaclient.KafkaConsumer); ok {
+ r0 = rf(topicID)
+ } else {
+ if ret.Get(0) != nil {
+ r0 = ret.Get(0).(kafkaclient.KafkaConsumer)
+ }
+ }
+
+ var r1 error
+ if rf, ok := ret.Get(1).(func(string) error); ok {
+ r1 = rf(topicID)
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
diff --git a/dmaap-mediator-producer/mocks/jobhandler/JobHandler.go b/dmaap-mediator-producer/mocks/jobshandler/JobsHandler.go
similarity index 66%
rename from dmaap-mediator-producer/mocks/jobhandler/JobHandler.go
rename to dmaap-mediator-producer/mocks/jobshandler/JobsHandler.go
index ad20752..271590f 100644
--- a/dmaap-mediator-producer/mocks/jobhandler/JobHandler.go
+++ b/dmaap-mediator-producer/mocks/jobshandler/JobsHandler.go
@@ -1,19 +1,19 @@
// Code generated by mockery v2.9.3. DO NOT EDIT.
-package jobhandler
+package jobshandler
import (
mock "github.com/stretchr/testify/mock"
jobs "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
)
-// JobHandler is an autogenerated mock type for the JobHandler type
-type JobHandler struct {
+// JobsHandler is an autogenerated mock type for the JobsHandler type
+type JobsHandler struct {
mock.Mock
}
// AddJob provides a mock function with given fields: _a0
-func (_m *JobHandler) AddJobFromRESTCall(_a0 jobs.JobInfo) error {
+func (_m *JobsHandler) AddJobFromRESTCall(_a0 jobs.JobInfo) error {
ret := _m.Called(_a0)
var r0 error
@@ -27,6 +27,6 @@
}
// DeleteJob provides a mock function with given fields: jobId
-func (_m *JobHandler) DeleteJobFromRESTCall(jobId string) {
+func (_m *JobsHandler) DeleteJobFromRESTCall(jobId string) {
_m.Called(jobId)
}