Register producer DMaaP mediator producer

Issue-ID: NONRTRIC-584
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: I7f9d8e6a2f68d13e91706722d171b5f6874bae78
diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go
index 6969f9f..a3e3a11 100644
--- a/dmaap-mediator-producer/internal/config/config.go
+++ b/dmaap-mediator-producer/internal/config/config.go
@@ -25,16 +25,24 @@
 )
 
 type Config struct {
-	LogLevel               string
-	JobResultUri           string
-	InfoCoordinatorAddress string
+	LogLevel                           string
+	InfoJobCallbackUrl                 string
+	InfoCoordinatorAddress             string
+	InfoProducerSupervisionCallbackUrl string
+}
+
+type ProducerRegistrationInfo struct {
+	InfoProducerSupervisionCallbackUrl string   `json:"info_producer_supervision_callback_url"`
+	SupportedInfoTypes                 []string `json:"supported_info_types"`
+	InfoJobCallbackUrl                 string   `json:"info_job_callback_url"`
 }
 
 func New() *Config {
 	return &Config{
-		LogLevel:               getEnv("LOG_LEVEL", "Info"),
-		JobResultUri:           getEnv("JOB_RESULT_URI", ""),
-		InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
+		LogLevel:                           getEnv("LOG_LEVEL", "Info"),
+		InfoJobCallbackUrl:                 getEnv("INFO_JOB_CALLBACK_URL", ""),
+		InfoCoordinatorAddress:             getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
+		InfoProducerSupervisionCallbackUrl: getEnv("INFO_PRODUCER_SUPERVISION_CALLBACK_URL", ""),
 	}
 }
 
diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go
index f0106d0..88ba1d8 100644
--- a/dmaap-mediator-producer/internal/config/config_test.go
+++ b/dmaap-mediator-producer/internal/config/config_test.go
@@ -28,13 +28,15 @@
 
 func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
 	os.Setenv("LOG_LEVEL", "Debug")
-	os.Setenv("JOB_RESULT_URI", "testUrl")
-	os.Setenv("INFO_COORD_ADDR", "testAddr")
+	os.Setenv("INFO_JOB_CALLBACK_URL", "jobCallbackUrl")
+	os.Setenv("INFO_COORD_ADDR", "infoCoordAddr")
+	os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_URL", "supervisionCallbackUrl")
 	defer os.Clearenv()
 	wantConfig := Config{
-		LogLevel:               "Debug",
-		JobResultUri:           "testUrl",
-		InfoCoordinatorAddress: "testAddr",
+		LogLevel:                           "Debug",
+		InfoJobCallbackUrl:                 "jobCallbackUrl",
+		InfoCoordinatorAddress:             "infoCoordAddr",
+		InfoProducerSupervisionCallbackUrl: "supervisionCallbackUrl",
 	}
 	if got := New(); !reflect.DeepEqual(got, &wantConfig) {
 		t.Errorf("New() = %v, want %v", got, &wantConfig)
@@ -43,9 +45,10 @@
 
 func TestNew_envVarsNotSetConfigContainDefaultValues(t *testing.T) {
 	wantConfig := Config{
-		LogLevel:               "Info",
-		JobResultUri:           "",
-		InfoCoordinatorAddress: "http://enrichmentservice:8083",
+		LogLevel:                           "Info",
+		InfoJobCallbackUrl:                 "",
+		InfoCoordinatorAddress:             "http://enrichmentservice:8083",
+		InfoProducerSupervisionCallbackUrl: "",
 	}
 	if got := New(); !reflect.DeepEqual(got, &wantConfig) {
 		t.Errorf("New() = %v, want %v", got, &wantConfig)
diff --git a/dmaap-mediator-producer/internal/config/registrator.go b/dmaap-mediator-producer/internal/config/registrator.go
index f846f9f..eaf8752 100644
--- a/dmaap-mediator-producer/internal/config/registrator.go
+++ b/dmaap-mediator-producer/internal/config/registrator.go
@@ -21,6 +21,7 @@
 package config
 
 import (
+	"encoding/json"
 	"fmt"
 	"net/url"
 
@@ -31,9 +32,11 @@
 )
 
 const registerTypePath = "/data-producer/v1/info-types/"
+const registerProducerPath = "/data-producer/v1/info-producers/"
 
 type Registrator interface {
 	RegisterTypes(types []*jobtypes.Type) error
+	RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo)
 }
 
 type RegistratorImpl struct {
@@ -49,10 +52,22 @@
 func (r RegistratorImpl) RegisterTypes(jobTypes []*jobtypes.Type) error {
 	for _, jobType := range jobTypes {
 		body := fmt.Sprintf(`{"info_job_data_schema": %v}`, jobType.Schema)
-		if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.Name), []byte(body)); error != nil {
+		if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.TypeId), []byte(body)); error != nil {
 			return error
 		}
 		log.Debugf("Registered type: %v", jobType)
 	}
 	return nil
 }
+
+func (r RegistratorImpl) RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo) error {
+	if body, marshalErr := json.Marshal(producerInfo); marshalErr == nil {
+		if putErr := restclient.Put(r.infoCoordinatorAddress+registerProducerPath+url.PathEscape(producerId), []byte(body)); putErr != nil {
+			return putErr
+		}
+		log.Debugf("Registered producer: %v", producerId)
+		return nil
+	} else {
+		return marshalErr
+	}
+}
diff --git a/dmaap-mediator-producer/internal/config/registrator_test.go b/dmaap-mediator-producer/internal/config/registrator_test.go
index d3dd3a0..94dc684 100644
--- a/dmaap-mediator-producer/internal/config/registrator_test.go
+++ b/dmaap-mediator-producer/internal/config/registrator_test.go
@@ -44,7 +44,7 @@
 	restclient.Client = &clientMock
 
 	type1 := jobtypes.Type{
-		Name:   "Type1",
+		TypeId: "Type1",
 		Schema: `{"title": "Type 1"}`,
 	}
 	types := []*jobtypes.Type{&type1}
@@ -68,3 +68,40 @@
 	assertions.Equal(expectedBody, body)
 	clientMock.AssertNumberOfCalls(t, "Do", 1)
 }
+
+func TestRegisterProducer(t *testing.T) {
+	assertions := require.New(t)
+
+	clientMock := mocks.HTTPClient{}
+
+	clientMock.On("Do", mock.Anything).Return(&http.Response{
+		StatusCode: http.StatusCreated,
+	}, nil)
+
+	restclient.Client = &clientMock
+
+	producer := ProducerRegistrationInfo{
+		InfoProducerSupervisionCallbackUrl: "supervisionCallbackUrl",
+		SupportedInfoTypes:                 []string{"type1"},
+		InfoJobCallbackUrl:                 "jobCallbackUrl",
+	}
+
+	r := NewRegistratorImpl("http://localhost:9990")
+	err := r.RegisterProducer("Producer1", &producer)
+
+	assertions.Nil(err)
+	var actualRequest *http.Request
+	clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool {
+		actualRequest = req
+		return true
+	}))
+	assertions.Equal(http.MethodPut, actualRequest.Method)
+	assertions.Equal("http", actualRequest.URL.Scheme)
+	assertions.Equal("localhost:9990", actualRequest.URL.Host)
+	assertions.Equal("/data-producer/v1/info-producers/Producer1", actualRequest.URL.Path)
+	assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
+	body, _ := ioutil.ReadAll(actualRequest.Body)
+	expectedBody := []byte(`{"info_producer_supervision_callback_url":"supervisionCallbackUrl","supported_info_types":["type1"],"info_job_callback_url":"jobCallbackUrl"}`)
+	assertions.Equal(expectedBody, body)
+	clientMock.AssertNumberOfCalls(t, "Do", 1)
+}