Register producer DMaaP mediator producer
Issue-ID: NONRTRIC-584
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: I7f9d8e6a2f68d13e91706722d171b5f6874bae78
diff --git a/dmaap-mediator-producer/.gitignore b/dmaap-mediator-producer/.gitignore
index 567963e..0d08f66 100644
--- a/dmaap-mediator-producer/.gitignore
+++ b/dmaap-mediator-producer/.gitignore
@@ -3,3 +3,4 @@
coverage.*
main
dmaapmediatorproducer
+__debug_bin*
diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go
index 6969f9f..a3e3a11 100644
--- a/dmaap-mediator-producer/internal/config/config.go
+++ b/dmaap-mediator-producer/internal/config/config.go
@@ -25,16 +25,24 @@
)
type Config struct {
- LogLevel string
- JobResultUri string
- InfoCoordinatorAddress string
+ LogLevel string
+ InfoJobCallbackUrl string
+ InfoCoordinatorAddress string
+ InfoProducerSupervisionCallbackUrl string
+}
+
+type ProducerRegistrationInfo struct {
+ InfoProducerSupervisionCallbackUrl string `json:"info_producer_supervision_callback_url"`
+ SupportedInfoTypes []string `json:"supported_info_types"`
+ InfoJobCallbackUrl string `json:"info_job_callback_url"`
}
func New() *Config {
return &Config{
- LogLevel: getEnv("LOG_LEVEL", "Info"),
- JobResultUri: getEnv("JOB_RESULT_URI", ""),
- InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
+ LogLevel: getEnv("LOG_LEVEL", "Info"),
+ InfoJobCallbackUrl: getEnv("INFO_JOB_CALLBACK_URL", ""),
+ InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
+ InfoProducerSupervisionCallbackUrl: getEnv("INFO_PRODUCER_SUPERVISION_CALLBACK_URL", ""),
}
}
diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go
index f0106d0..88ba1d8 100644
--- a/dmaap-mediator-producer/internal/config/config_test.go
+++ b/dmaap-mediator-producer/internal/config/config_test.go
@@ -28,13 +28,15 @@
func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
os.Setenv("LOG_LEVEL", "Debug")
- os.Setenv("JOB_RESULT_URI", "testUrl")
- os.Setenv("INFO_COORD_ADDR", "testAddr")
+ os.Setenv("INFO_JOB_CALLBACK_URL", "jobCallbackUrl")
+ os.Setenv("INFO_COORD_ADDR", "infoCoordAddr")
+ os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_URL", "supervisionCallbackUrl")
defer os.Clearenv()
wantConfig := Config{
- LogLevel: "Debug",
- JobResultUri: "testUrl",
- InfoCoordinatorAddress: "testAddr",
+ LogLevel: "Debug",
+ InfoJobCallbackUrl: "jobCallbackUrl",
+ InfoCoordinatorAddress: "infoCoordAddr",
+ InfoProducerSupervisionCallbackUrl: "supervisionCallbackUrl",
}
if got := New(); !reflect.DeepEqual(got, &wantConfig) {
t.Errorf("New() = %v, want %v", got, &wantConfig)
@@ -43,9 +45,10 @@
func TestNew_envVarsNotSetConfigContainDefaultValues(t *testing.T) {
wantConfig := Config{
- LogLevel: "Info",
- JobResultUri: "",
- InfoCoordinatorAddress: "http://enrichmentservice:8083",
+ LogLevel: "Info",
+ InfoJobCallbackUrl: "",
+ InfoCoordinatorAddress: "http://enrichmentservice:8083",
+ InfoProducerSupervisionCallbackUrl: "",
}
if got := New(); !reflect.DeepEqual(got, &wantConfig) {
t.Errorf("New() = %v, want %v", got, &wantConfig)
diff --git a/dmaap-mediator-producer/internal/config/registrator.go b/dmaap-mediator-producer/internal/config/registrator.go
index f846f9f..eaf8752 100644
--- a/dmaap-mediator-producer/internal/config/registrator.go
+++ b/dmaap-mediator-producer/internal/config/registrator.go
@@ -21,6 +21,7 @@
package config
import (
+ "encoding/json"
"fmt"
"net/url"
@@ -31,9 +32,11 @@
)
const registerTypePath = "/data-producer/v1/info-types/"
+const registerProducerPath = "/data-producer/v1/info-producers/"
type Registrator interface {
RegisterTypes(types []*jobtypes.Type) error
+ RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo)
}
type RegistratorImpl struct {
@@ -49,10 +52,22 @@
func (r RegistratorImpl) RegisterTypes(jobTypes []*jobtypes.Type) error {
for _, jobType := range jobTypes {
body := fmt.Sprintf(`{"info_job_data_schema": %v}`, jobType.Schema)
- if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.Name), []byte(body)); error != nil {
+ if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.TypeId), []byte(body)); error != nil {
return error
}
log.Debugf("Registered type: %v", jobType)
}
return nil
}
+
+func (r RegistratorImpl) RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo) error {
+ if body, marshalErr := json.Marshal(producerInfo); marshalErr == nil {
+ if putErr := restclient.Put(r.infoCoordinatorAddress+registerProducerPath+url.PathEscape(producerId), []byte(body)); putErr != nil {
+ return putErr
+ }
+ log.Debugf("Registered producer: %v", producerId)
+ return nil
+ } else {
+ return marshalErr
+ }
+}
diff --git a/dmaap-mediator-producer/internal/config/registrator_test.go b/dmaap-mediator-producer/internal/config/registrator_test.go
index d3dd3a0..94dc684 100644
--- a/dmaap-mediator-producer/internal/config/registrator_test.go
+++ b/dmaap-mediator-producer/internal/config/registrator_test.go
@@ -44,7 +44,7 @@
restclient.Client = &clientMock
type1 := jobtypes.Type{
- Name: "Type1",
+ TypeId: "Type1",
Schema: `{"title": "Type 1"}`,
}
types := []*jobtypes.Type{&type1}
@@ -68,3 +68,40 @@
assertions.Equal(expectedBody, body)
clientMock.AssertNumberOfCalls(t, "Do", 1)
}
+
+func TestRegisterProducer(t *testing.T) {
+ assertions := require.New(t)
+
+ clientMock := mocks.HTTPClient{}
+
+ clientMock.On("Do", mock.Anything).Return(&http.Response{
+ StatusCode: http.StatusCreated,
+ }, nil)
+
+ restclient.Client = &clientMock
+
+ producer := ProducerRegistrationInfo{
+ InfoProducerSupervisionCallbackUrl: "supervisionCallbackUrl",
+ SupportedInfoTypes: []string{"type1"},
+ InfoJobCallbackUrl: "jobCallbackUrl",
+ }
+
+ r := NewRegistratorImpl("http://localhost:9990")
+ err := r.RegisterProducer("Producer1", &producer)
+
+ assertions.Nil(err)
+ var actualRequest *http.Request
+ clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool {
+ actualRequest = req
+ return true
+ }))
+ assertions.Equal(http.MethodPut, actualRequest.Method)
+ assertions.Equal("http", actualRequest.URL.Scheme)
+ assertions.Equal("localhost:9990", actualRequest.URL.Host)
+ assertions.Equal("/data-producer/v1/info-producers/Producer1", actualRequest.URL.Path)
+ assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
+ body, _ := ioutil.ReadAll(actualRequest.Body)
+ expectedBody := []byte(`{"info_producer_supervision_callback_url":"supervisionCallbackUrl","supported_info_types":["type1"],"info_job_callback_url":"jobCallbackUrl"}`)
+ assertions.Equal(expectedBody, body)
+ clientMock.AssertNumberOfCalls(t, "Do", 1)
+}
diff --git a/dmaap-mediator-producer/internal/jobtypes/jobtypes.go b/dmaap-mediator-producer/internal/jobtypes/jobtypes.go
index 14d837d..894c586 100644
--- a/dmaap-mediator-producer/internal/jobtypes/jobtypes.go
+++ b/dmaap-mediator-producer/internal/jobtypes/jobtypes.go
@@ -27,11 +27,12 @@
)
type Type struct {
- Name string
+ TypeId string
Schema string
}
var typeDir = "configs"
+var supportedTypes = make([]string, 0)
func GetTypes() ([]*Type, error) {
types := make([]*Type, 0, 1)
@@ -53,15 +54,21 @@
return types, nil
}
+func GetSupportedTypes() []string {
+ return supportedTypes
+}
+
func getType(path string) (*Type, error) {
fileName := filepath.Base(path)
typeName := strings.TrimSuffix(fileName, filepath.Ext(fileName))
if typeSchema, err := os.ReadFile(path); err == nil {
- return &Type{
- Name: typeName,
+ typeInfo := Type{
+ TypeId: typeName,
Schema: string(typeSchema),
- }, nil
+ }
+ supportedTypes = append(supportedTypes, typeName)
+ return &typeInfo, nil
} else {
return nil, err
}
diff --git a/dmaap-mediator-producer/internal/jobtypes/jobtypes_test.go b/dmaap-mediator-producer/internal/jobtypes/jobtypes_test.go
index 5fdc378..195fc4c 100644
--- a/dmaap-mediator-producer/internal/jobtypes/jobtypes_test.go
+++ b/dmaap-mediator-producer/internal/jobtypes/jobtypes_test.go
@@ -30,7 +30,7 @@
const type1Schema = `{"title": "Type 1"}`
-func TestGetTypes_filesOkShouldReturnSliceOfTypes(t *testing.T) {
+func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
assertions := require.New(t)
typesDir, err := os.MkdirTemp("", "configs")
if err != nil {
@@ -44,10 +44,13 @@
}
types, err := GetTypes()
wantedType := Type{
- Name: "type1",
+ TypeId: "type1",
Schema: type1Schema,
}
wantedTypes := []*Type{&wantedType}
assertions.EqualValues(wantedTypes, types)
assertions.Nil(err)
+
+ supportedTypes := GetSupportedTypes()
+ assertions.EqualValues([]string{"type1"}, supportedTypes)
}
diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go
index 240bdbd..d38496f 100644
--- a/dmaap-mediator-producer/main.go
+++ b/dmaap-mediator-producer/main.go
@@ -39,8 +39,11 @@
}
log.Debug("Initializing DMaaP Mediator Producer")
- if configuration.JobResultUri == "" {
- log.Fatal("Missing JOB_RESULT_URI")
+ if configuration.InfoJobCallbackUrl == "" {
+ log.Fatal("Missing INFO_JOB_CALLBACK_URL")
+ }
+ if configuration.InfoProducerSupervisionCallbackUrl == "" {
+ log.Fatal("Missing INFO_PRODUCER_SUPERVISION_CALLBACK_URL")
}
registrator := config.NewRegistratorImpl(configuration.InfoCoordinatorAddress)
@@ -51,6 +54,14 @@
} else {
log.Fatalf("Unable to get types to register due to: %v", err)
}
+ producer := config.ProducerRegistrationInfo{
+ InfoProducerSupervisionCallbackUrl: configuration.InfoProducerSupervisionCallbackUrl,
+ SupportedInfoTypes: jobtypes.GetSupportedTypes(),
+ InfoJobCallbackUrl: configuration.InfoJobCallbackUrl,
+ }
+ if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
+ log.Fatalf("Unable to register producer due to: %v", err)
+ }
}
func main() {