Register types DMaaP mediator producer

Issue-ID: NONRTRIC-583
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: I2f8536cb54a1a0e9a44adbd806af3b7ad8e90e61
diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go
new file mode 100644
index 0000000..6969f9f
--- /dev/null
+++ b/dmaap-mediator-producer/internal/config/config.go
@@ -0,0 +1,47 @@
+// -
+//   ========================LICENSE_START=================================
+//   O-RAN-SC
+//   %%
+//   Copyright (C) 2021: Nordix Foundation
+//   %%
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   ========================LICENSE_END===================================
+//
+
+package config
+
+import (
+	"os"
+)
+
+type Config struct {
+	LogLevel               string
+	JobResultUri           string
+	InfoCoordinatorAddress string
+}
+
+func New() *Config {
+	return &Config{
+		LogLevel:               getEnv("LOG_LEVEL", "Info"),
+		JobResultUri:           getEnv("JOB_RESULT_URI", ""),
+		InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
+	}
+}
+
+func getEnv(key string, defaultVal string) string {
+	if value, exists := os.LookupEnv(key); exists {
+		return value
+	}
+
+	return defaultVal
+}
diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go
new file mode 100644
index 0000000..f0106d0
--- /dev/null
+++ b/dmaap-mediator-producer/internal/config/config_test.go
@@ -0,0 +1,53 @@
+// -
+//   ========================LICENSE_START=================================
+//   O-RAN-SC
+//   %%
+//   Copyright (C) 2021: Nordix Foundation
+//   %%
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   ========================LICENSE_END===================================
+//
+
+package config
+
+import (
+	"os"
+	"reflect"
+	"testing"
+)
+
+func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
+	os.Setenv("LOG_LEVEL", "Debug")
+	os.Setenv("JOB_RESULT_URI", "testUrl")
+	os.Setenv("INFO_COORD_ADDR", "testAddr")
+	defer os.Clearenv()
+	wantConfig := Config{
+		LogLevel:               "Debug",
+		JobResultUri:           "testUrl",
+		InfoCoordinatorAddress: "testAddr",
+	}
+	if got := New(); !reflect.DeepEqual(got, &wantConfig) {
+		t.Errorf("New() = %v, want %v", got, &wantConfig)
+	}
+}
+
+func TestNew_envVarsNotSetConfigContainDefaultValues(t *testing.T) {
+	wantConfig := Config{
+		LogLevel:               "Info",
+		JobResultUri:           "",
+		InfoCoordinatorAddress: "http://enrichmentservice:8083",
+	}
+	if got := New(); !reflect.DeepEqual(got, &wantConfig) {
+		t.Errorf("New() = %v, want %v", got, &wantConfig)
+	}
+}
diff --git a/dmaap-mediator-producer/internal/config/registrator.go b/dmaap-mediator-producer/internal/config/registrator.go
new file mode 100644
index 0000000..f846f9f
--- /dev/null
+++ b/dmaap-mediator-producer/internal/config/registrator.go
@@ -0,0 +1,58 @@
+// -
+//   ========================LICENSE_START=================================
+//   O-RAN-SC
+//   %%
+//   Copyright (C) 2021: Nordix Foundation
+//   %%
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   ========================LICENSE_END===================================
+//
+
+package config
+
+import (
+	"fmt"
+	"net/url"
+
+	log "github.com/sirupsen/logrus"
+
+	"oransc.org/nonrtric/dmaapmediatorproducer/internal/jobtypes"
+	"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
+)
+
+const registerTypePath = "/data-producer/v1/info-types/"
+
+type Registrator interface {
+	RegisterTypes(types []*jobtypes.Type) error
+}
+
+type RegistratorImpl struct {
+	infoCoordinatorAddress string
+}
+
+func NewRegistratorImpl(infoCoordAddr string) *RegistratorImpl {
+	return &RegistratorImpl{
+		infoCoordinatorAddress: infoCoordAddr,
+	}
+}
+
+func (r RegistratorImpl) RegisterTypes(jobTypes []*jobtypes.Type) error {
+	for _, jobType := range jobTypes {
+		body := fmt.Sprintf(`{"info_job_data_schema": %v}`, jobType.Schema)
+		if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.Name), []byte(body)); error != nil {
+			return error
+		}
+		log.Debugf("Registered type: %v", jobType)
+	}
+	return nil
+}
diff --git a/dmaap-mediator-producer/internal/config/registrator_test.go b/dmaap-mediator-producer/internal/config/registrator_test.go
new file mode 100644
index 0000000..d3dd3a0
--- /dev/null
+++ b/dmaap-mediator-producer/internal/config/registrator_test.go
@@ -0,0 +1,70 @@
+// -
+//   ========================LICENSE_START=================================
+//   O-RAN-SC
+//   %%
+//   Copyright (C) 2021: Nordix Foundation
+//   %%
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   ========================LICENSE_END===================================
+//
+
+package config
+
+import (
+	"io/ioutil"
+	"net/http"
+	"testing"
+
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
+	"oransc.org/nonrtric/dmaapmediatorproducer/internal/jobtypes"
+	"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
+	"oransc.org/nonrtric/dmaapmediatorproducer/mocks"
+)
+
+func TestRegisterTypes(t *testing.T) {
+	assertions := require.New(t)
+
+	clientMock := mocks.HTTPClient{}
+
+	clientMock.On("Do", mock.Anything).Return(&http.Response{
+		StatusCode: http.StatusCreated,
+	}, nil)
+
+	restclient.Client = &clientMock
+
+	type1 := jobtypes.Type{
+		Name:   "Type1",
+		Schema: `{"title": "Type 1"}`,
+	}
+	types := []*jobtypes.Type{&type1}
+
+	r := NewRegistratorImpl("http://localhost:9990")
+	err := r.RegisterTypes(types)
+
+	assertions.Nil(err)
+	var actualRequest *http.Request
+	clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool {
+		actualRequest = req
+		return true
+	}))
+	assertions.Equal(http.MethodPut, actualRequest.Method)
+	assertions.Equal("http", actualRequest.URL.Scheme)
+	assertions.Equal("localhost:9990", actualRequest.URL.Host)
+	assertions.Equal("/data-producer/v1/info-types/Type1", actualRequest.URL.Path)
+	assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
+	body, _ := ioutil.ReadAll(actualRequest.Body)
+	expectedBody := []byte(`{"info_job_data_schema": {"title": "Type 1"}}`)
+	assertions.Equal(expectedBody, body)
+	clientMock.AssertNumberOfCalls(t, "Do", 1)
+}
diff --git a/dmaap-mediator-producer/internal/jobtypes/jobtypes.go b/dmaap-mediator-producer/internal/jobtypes/jobtypes.go
new file mode 100644
index 0000000..14d837d
--- /dev/null
+++ b/dmaap-mediator-producer/internal/jobtypes/jobtypes.go
@@ -0,0 +1,68 @@
+// -
+//   ========================LICENSE_START=================================
+//   O-RAN-SC
+//   %%
+//   Copyright (C) 2021: Nordix Foundation
+//   %%
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   ========================LICENSE_END===================================
+//
+
+package jobtypes
+
+import (
+	"os"
+	"path/filepath"
+	"strings"
+)
+
+type Type struct {
+	Name   string
+	Schema string
+}
+
+var typeDir = "configs"
+
+func GetTypes() ([]*Type, error) {
+	types := make([]*Type, 0, 1)
+	err := filepath.Walk(typeDir,
+		func(path string, info os.FileInfo, err error) error {
+			if err != nil {
+				return err
+			}
+			if strings.Contains(path, ".json") {
+				if jobType, err := getType(path); err == nil {
+					types = append(types, jobType)
+				}
+			}
+			return nil
+		})
+	if err != nil {
+		return nil, err
+	}
+	return types, nil
+}
+
+func getType(path string) (*Type, error) {
+	fileName := filepath.Base(path)
+	typeName := strings.TrimSuffix(fileName, filepath.Ext(fileName))
+
+	if typeSchema, err := os.ReadFile(path); err == nil {
+		return &Type{
+			Name:   typeName,
+			Schema: string(typeSchema),
+		}, nil
+	} else {
+		return nil, err
+	}
+}
diff --git a/dmaap-mediator-producer/internal/jobtypes/jobtypes_test.go b/dmaap-mediator-producer/internal/jobtypes/jobtypes_test.go
new file mode 100644
index 0000000..5fdc378
--- /dev/null
+++ b/dmaap-mediator-producer/internal/jobtypes/jobtypes_test.go
@@ -0,0 +1,53 @@
+// -
+//   ========================LICENSE_START=================================
+//   O-RAN-SC
+//   %%
+//   Copyright (C) 2021: Nordix Foundation
+//   %%
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   ========================LICENSE_END===================================
+//
+
+package jobtypes
+
+import (
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+const type1Schema = `{"title": "Type 1"}`
+
+func TestGetTypes_filesOkShouldReturnSliceOfTypes(t *testing.T) {
+	assertions := require.New(t)
+	typesDir, err := os.MkdirTemp("", "configs")
+	if err != nil {
+		t.Errorf("Unable to create temporary directory for types due to: %v", err)
+	}
+	defer os.RemoveAll(typesDir)
+	typeDir = typesDir
+	fname := filepath.Join(typesDir, "type1.json")
+	if err = os.WriteFile(fname, []byte(type1Schema), 0666); err != nil {
+		t.Errorf("Unable to create temporary files for types due to: %v", err)
+	}
+	types, err := GetTypes()
+	wantedType := Type{
+		Name:   "type1",
+		Schema: type1Schema,
+	}
+	wantedTypes := []*Type{&wantedType}
+	assertions.EqualValues(wantedTypes, types)
+	assertions.Nil(err)
+}
diff --git a/dmaap-mediator-producer/internal/restclient/HTTPClient.go b/dmaap-mediator-producer/internal/restclient/HTTPClient.go
new file mode 100644
index 0000000..c6eb24c
--- /dev/null
+++ b/dmaap-mediator-producer/internal/restclient/HTTPClient.go
@@ -0,0 +1,103 @@
+// -
+//   ========================LICENSE_START=================================
+//   O-RAN-SC
+//   %%
+//   Copyright (C) 2021: Nordix Foundation
+//   %%
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   ========================LICENSE_END===================================
+//
+
+package restclient
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"net/http"
+)
+
+// HTTPClient interface
+type HTTPClient interface {
+	Get(url string) (*http.Response, error)
+
+	Do(*http.Request) (*http.Response, error)
+}
+
+type RequestError struct {
+	StatusCode int
+	Body       []byte
+}
+
+func (pe RequestError) Error() string {
+	return fmt.Sprintf("Request failed due to error response with status: %v and body: %v", pe.StatusCode, string(pe.Body))
+}
+
+var (
+	Client HTTPClient
+)
+
+func init() {
+	Client = &http.Client{}
+}
+
+func Get(url string) ([]byte, error) {
+	if response, err := Client.Get(url); err == nil {
+		defer response.Body.Close()
+		if responseData, err := io.ReadAll(response.Body); err == nil {
+			if isResponseSuccess(response.StatusCode) {
+				return responseData, nil
+			} else {
+				requestError := RequestError{
+					StatusCode: response.StatusCode,
+					Body:       responseData,
+				}
+				return nil, requestError
+			}
+		} else {
+			return nil, err
+		}
+	} else {
+		return nil, err
+	}
+}
+
+func Put(url string, body []byte) error {
+	if req, reqErr := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(body)); reqErr == nil {
+		req.Header.Set("Content-Type", "application/json; charset=utf-8")
+		if response, respErr := Client.Do(req); respErr == nil {
+			if isResponseSuccess(response.StatusCode) {
+				return nil
+			} else {
+				return getRequestError(response)
+			}
+		} else {
+			return respErr
+		}
+	} else {
+		return reqErr
+	}
+}
+
+func isResponseSuccess(statusCode int) bool {
+	return statusCode >= http.StatusOK && statusCode <= 299
+}
+
+func getRequestError(response *http.Response) RequestError {
+	responseData, _ := io.ReadAll(response.Body)
+	putError := RequestError{
+		StatusCode: response.StatusCode,
+		Body:       responseData,
+	}
+	return putError
+}
diff --git a/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go b/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go
new file mode 100644
index 0000000..3727e6a
--- /dev/null
+++ b/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go
@@ -0,0 +1,164 @@
+// -
+//   ========================LICENSE_START=================================
+//   O-RAN-SC
+//   %%
+//   Copyright (C) 2021: Nordix Foundation
+//   %%
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   ========================LICENSE_END===================================
+//
+
+package restclient
+
+import (
+	"bytes"
+	"errors"
+	"io/ioutil"
+	"net/http"
+	"reflect"
+	"testing"
+
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
+	"oransc.org/nonrtric/dmaapmediatorproducer/mocks"
+)
+
+func TestGet(t *testing.T) {
+	clientMock := mocks.HTTPClient{}
+
+	clientMock.On("Get", "http://testOk").Return(&http.Response{
+		StatusCode: http.StatusOK,
+		Body:       ioutil.NopCloser(bytes.NewReader([]byte("Response"))),
+	}, nil)
+
+	clientMock.On("Get", "http://testNotOk").Return(&http.Response{
+		StatusCode: http.StatusBadRequest,
+		Body:       ioutil.NopCloser(bytes.NewReader([]byte("Bad Response"))),
+	}, nil)
+
+	clientMock.On("Get", "http://testError").Return(nil, errors.New("Failed Request"))
+
+	Client = &clientMock
+
+	type args struct {
+		url string
+	}
+	tests := []struct {
+		name        string
+		args        args
+		want        []byte
+		wantErr     bool
+		wantedError error
+	}{
+		{
+			name: "Test Get with OK response",
+			args: args{
+				url: "http://testOk",
+			},
+			want:    []byte("Response"),
+			wantErr: false,
+		},
+		{
+			name: "Test Get with Not OK response",
+			args: args{
+				url: "http://testNotOk",
+			},
+			want:    nil,
+			wantErr: true,
+			wantedError: RequestError{
+				StatusCode: http.StatusBadRequest,
+				Body:       []byte("Bad Response"),
+			},
+		},
+		{
+			name: "Test Get with error",
+			args: args{
+				url: "http://testError",
+			},
+			want:        nil,
+			wantErr:     true,
+			wantedError: errors.New("Failed Request"),
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := Get(tt.args.url)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("Get() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("Get() = %v, want %v", got, tt.want)
+			}
+			if tt.wantErr && err.Error() != tt.wantedError.Error() {
+				t.Errorf("Get() error = %v, wantedError % v", err, tt.wantedError.Error())
+			}
+		})
+	}
+}
+
+func TestPutOk(t *testing.T) {
+	assertions := require.New(t)
+	clientMock := mocks.HTTPClient{}
+
+	clientMock.On("Do", mock.Anything).Return(&http.Response{
+		StatusCode: http.StatusOK,
+	}, nil)
+
+	Client = &clientMock
+	if err := Put("http://localhost:9990", []byte("body")); err != nil {
+		t.Errorf("Put() error = %v, did not want error", err)
+	}
+	var actualRequest *http.Request
+	clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool {
+		actualRequest = req
+		return true
+	}))
+	assertions.Equal(http.MethodPut, actualRequest.Method)
+	assertions.Equal("http", actualRequest.URL.Scheme)
+	assertions.Equal("localhost:9990", actualRequest.URL.Host)
+	assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
+	body, _ := ioutil.ReadAll(actualRequest.Body)
+	expectedBody := []byte("body")
+	assertions.Equal(expectedBody, body)
+	clientMock.AssertNumberOfCalls(t, "Do", 1)
+}
+
+func TestPutBadResponse(t *testing.T) {
+	assertions := require.New(t)
+	clientMock := mocks.HTTPClient{}
+
+	clientMock.On("Do", mock.Anything).Return(&http.Response{
+		StatusCode: http.StatusBadRequest,
+		Body:       ioutil.NopCloser(bytes.NewReader([]byte("Bad Request"))),
+	}, nil)
+
+	Client = &clientMock
+	err := Put("url", []byte("body"))
+	assertions.NotNil("Put() error = %v, wanted error", err)
+	expectedErrorMessage := "Request failed due to error response with status: 400 and body: Bad Request"
+	assertions.Equal(expectedErrorMessage, err.Error())
+}
+
+func TestPutError(t *testing.T) {
+	assertions := require.New(t)
+	clientMock := mocks.HTTPClient{}
+
+	clientMock.On("Do", mock.Anything).Return(nil, errors.New("Failed Request"))
+
+	Client = &clientMock
+	err := Put("url", []byte("body"))
+	assertions.NotNil("Put() error = %v, wanted error", err)
+	expectedErrorMessage := "Failed Request"
+	assertions.Equal(expectedErrorMessage, err.Error())
+}