Register types in DMaaP mediator producer
Issue-ID: NONRTRIC-583
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: I2f8536cb54a1a0e9a44adbd806af3b7ad8e90e61
diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go
new file mode 100644
index 0000000..6969f9f
--- /dev/null
+++ b/dmaap-mediator-producer/internal/config/config.go
@@ -0,0 +1,47 @@
+// -
+// ========================LICENSE_START=================================
+// O-RAN-SC
+// %%
+// Copyright (C) 2021: Nordix Foundation
+// %%
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+package config
+
+import (
+ "os"
+)
+
+// Config holds the settings of the producer. New fills it in from the process
+// environment.
+type Config struct {
+	// LogLevel is read from the LOG_LEVEL environment variable.
+	LogLevel string
+	// JobResultUri is read from the JOB_RESULT_URI environment variable.
+	JobResultUri string
+	// InfoCoordinatorAddress is read from the INFO_COORD_ADDR environment
+	// variable.
+	InfoCoordinatorAddress string
+}
+
+// New builds a Config from the environment variables LOG_LEVEL,
+// JOB_RESULT_URI and INFO_COORD_ADDR. A variable that is not set is replaced
+// by its built-in default: "Info", the empty string and
+// "http://enrichmentservice:8083" respectively.
+func New() *Config {
+	cfg := new(Config)
+	cfg.LogLevel = getEnv("LOG_LEVEL", "Info")
+	cfg.JobResultUri = getEnv("JOB_RESULT_URI", "")
+	cfg.InfoCoordinatorAddress = getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083")
+	return cfg
+}
+
+// getEnv returns the value of the environment variable key, or defaultVal
+// when the variable is not present in the environment. A variable that is
+// present but empty yields the empty string, not defaultVal.
+func getEnv(key string, defaultVal string) string {
+	value, exists := os.LookupEnv(key)
+	if !exists {
+		return defaultVal
+	}
+	return value
+}
diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go
new file mode 100644
index 0000000..f0106d0
--- /dev/null
+++ b/dmaap-mediator-producer/internal/config/config_test.go
@@ -0,0 +1,53 @@
+// -
+// ========================LICENSE_START=================================
+// O-RAN-SC
+// %%
+// Copyright (C) 2021: Nordix Foundation
+// %%
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+package config
+
+import (
+ "os"
+ "reflect"
+ "testing"
+)
+
+// TestNew_envVarsSetConfigContainSetValues verifies that New picks up the
+// values of the configuration environment variables when they are set.
+func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
+	envVars := map[string]string{
+		"LOG_LEVEL":       "Debug",
+		"JOB_RESULT_URI":  "testUrl",
+		"INFO_COORD_ADDR": "testAddr",
+	}
+	for key, value := range envVars {
+		key := key // Pin the loop variable for the cleanup closure.
+		previous, wasSet := os.LookupEnv(key)
+		if err := os.Setenv(key, value); err != nil {
+			t.Fatalf("Unable to set %v due to: %v", key, err)
+		}
+		// Put back exactly what was there before. Clearing the whole process
+		// environment would also remove unrelated variables (PATH, HOME, ...)
+		// for every test that runs afterwards.
+		t.Cleanup(func() {
+			if wasSet {
+				os.Setenv(key, previous)
+				return
+			}
+			os.Unsetenv(key)
+		})
+	}
+
+	wantConfig := Config{
+		LogLevel:               "Debug",
+		JobResultUri:           "testUrl",
+		InfoCoordinatorAddress: "testAddr",
+	}
+	if got := New(); !reflect.DeepEqual(got, &wantConfig) {
+		t.Errorf("New() = %v, want %v", got, &wantConfig)
+	}
+}
+
+// TestNew_envVarsNotSetConfigContainDefaultValues verifies that New falls back
+// to the default values when the configuration environment variables are not
+// set.
+func TestNew_envVarsNotSetConfigContainDefaultValues(t *testing.T) {
+	// Make sure the variables really are absent, whatever the environment the
+	// test is started from, and put them back when the test is done.
+	for _, key := range []string{"LOG_LEVEL", "JOB_RESULT_URI", "INFO_COORD_ADDR"} {
+		key := key // Pin the loop variable for the cleanup closure.
+		previous, wasSet := os.LookupEnv(key)
+		if !wasSet {
+			continue
+		}
+		if err := os.Unsetenv(key); err != nil {
+			t.Fatalf("Unable to unset %v due to: %v", key, err)
+		}
+		t.Cleanup(func() { os.Setenv(key, previous) })
+	}
+
+	wantConfig := Config{
+		LogLevel:               "Info",
+		JobResultUri:           "",
+		InfoCoordinatorAddress: "http://enrichmentservice:8083",
+	}
+	if got := New(); !reflect.DeepEqual(got, &wantConfig) {
+		t.Errorf("New() = %v, want %v", got, &wantConfig)
+	}
+}
diff --git a/dmaap-mediator-producer/internal/config/registrator.go b/dmaap-mediator-producer/internal/config/registrator.go
new file mode 100644
index 0000000..f846f9f
--- /dev/null
+++ b/dmaap-mediator-producer/internal/config/registrator.go
@@ -0,0 +1,58 @@
+// -
+// ========================LICENSE_START=================================
+// O-RAN-SC
+// %%
+// Copyright (C) 2021: Nordix Foundation
+// %%
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+package config
+
+import (
+ "fmt"
+ "net/url"
+
+ log "github.com/sirupsen/logrus"
+
+ "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobtypes"
+ "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
+)
+
+// registerTypePath is the path that RegisterTypes appends to the address of
+// the information coordinator; the escaped name of the type follows it.
+const registerTypePath = "/data-producer/v1/info-types/"
+
+// Registrator registers job types.
+type Registrator interface {
+	// RegisterTypes registers the given types and returns an error when that
+	// fails.
+	RegisterTypes(types []*jobtypes.Type) error
+}
+
+// RegistratorImpl registers job types by sending them over HTTP to an
+// information coordinator.
+type RegistratorImpl struct {
+	// infoCoordinatorAddress is the prefix of every registration URL.
+	infoCoordinatorAddress string
+}
+
+// NewRegistratorImpl returns a RegistratorImpl that sends its registrations to
+// the information coordinator at infoCoordAddr.
+func NewRegistratorImpl(infoCoordAddr string) *RegistratorImpl {
+	registrator := RegistratorImpl{infoCoordinatorAddress: infoCoordAddr}
+	return &registrator
+}
+
+// RegisterTypes registers every type in jobTypes with the information
+// coordinator, one HTTP PUT per type. The request body is a JSON object whose
+// "info_job_data_schema" member is the schema of the type, inserted verbatim.
+//
+// Registration stops at the first failing request and the error of that
+// request is returned unchanged; types earlier in the slice stay registered.
+// nil is returned when all requests succeed.
+func (r RegistratorImpl) RegisterTypes(jobTypes []*jobtypes.Type) error {
+	typesURL := r.infoCoordinatorAddress + registerTypePath
+	for _, jobType := range jobTypes {
+		body := fmt.Sprintf(`{"info_job_data_schema": %v}`, jobType.Schema)
+		// The type name becomes the last path segment, so it has to be escaped.
+		if err := restclient.Put(typesURL+url.PathEscape(jobType.Name), []byte(body)); err != nil {
+			return err
+		}
+		log.Debugf("Registered type: %v", jobType)
+	}
+	return nil
+}
diff --git a/dmaap-mediator-producer/internal/config/registrator_test.go b/dmaap-mediator-producer/internal/config/registrator_test.go
new file mode 100644
index 0000000..d3dd3a0
--- /dev/null
+++ b/dmaap-mediator-producer/internal/config/registrator_test.go
@@ -0,0 +1,70 @@
+// -
+// ========================LICENSE_START=================================
+// O-RAN-SC
+// %%
+// Copyright (C) 2021: Nordix Foundation
+// %%
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+package config
+
+import (
+ "io/ioutil"
+ "net/http"
+ "testing"
+
+ "github.com/stretchr/testify/mock"
+ "github.com/stretchr/testify/require"
+ "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobtypes"
+ "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
+ "oransc.org/nonrtric/dmaapmediatorproducer/mocks"
+)
+
+// TestRegisterTypes verifies that RegisterTypes sends one PUT request with the
+// expected URL, content type and body for a single type.
+func TestRegisterTypes(t *testing.T) {
+	assertions := require.New(t)
+
+	clientMock := mocks.HTTPClient{}
+
+	clientMock.On("Do", mock.Anything).Return(&http.Response{
+		StatusCode: http.StatusCreated,
+	}, nil)
+
+	// restclient.Client is package-level state, so the original client is put
+	// back once the test is done.
+	originalClient := restclient.Client
+	t.Cleanup(func() { restclient.Client = originalClient })
+	restclient.Client = &clientMock
+
+	type1 := jobtypes.Type{
+		Name:   "Type1",
+		Schema: `{"title": "Type 1"}`,
+	}
+	types := []*jobtypes.Type{&type1}
+
+	r := NewRegistratorImpl("http://localhost:9990")
+	err := r.RegisterTypes(types)
+
+	assertions.Nil(err)
+	// The matcher accepts every request; it is only used to get hold of the
+	// request that was passed to the mock.
+	var actualRequest *http.Request
+	clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool {
+		actualRequest = req
+		return true
+	}))
+	assertions.Equal(http.MethodPut, actualRequest.Method)
+	assertions.Equal("http", actualRequest.URL.Scheme)
+	assertions.Equal("localhost:9990", actualRequest.URL.Host)
+	assertions.Equal("/data-producer/v1/info-types/Type1", actualRequest.URL.Path)
+	assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
+	body, err := ioutil.ReadAll(actualRequest.Body)
+	assertions.Nil(err)
+	expectedBody := []byte(`{"info_job_data_schema": {"title": "Type 1"}}`)
+	assertions.Equal(expectedBody, body)
+	clientMock.AssertNumberOfCalls(t, "Do", 1)
+}
diff --git a/dmaap-mediator-producer/internal/jobtypes/jobtypes.go b/dmaap-mediator-producer/internal/jobtypes/jobtypes.go
new file mode 100644
index 0000000..14d837d
--- /dev/null
+++ b/dmaap-mediator-producer/internal/jobtypes/jobtypes.go
@@ -0,0 +1,68 @@
+// -
+// ========================LICENSE_START=================================
+// O-RAN-SC
+// %%
+// Copyright (C) 2021: Nordix Foundation
+// %%
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+package jobtypes
+
+import (
+ "os"
+ "path/filepath"
+ "strings"
+)
+
+// Type describes one job type.
+type Type struct {
+	// Name identifies the type. getType sets it to the name of the definition
+	// file without its extension.
+	Name string
+	// Schema is the schema of the type. getType sets it to the content of the
+	// definition file.
+	Schema string
+}
+
+// typeDir is the directory that GetTypes walks for type definitions. It is a
+// variable so that the package tests can point it at a temporary directory.
+var typeDir = "configs"
+
+// GetTypes walks typeDir, including its sub directories, and returns one Type
+// for every file whose extension is ".json".
+//
+// Only the extension of the file itself is considered. Directories are never
+// treated as type definitions, and neither are files that merely have ".json"
+// somewhere else in their path.
+//
+// An error from walking the tree or from reading a definition file aborts the
+// walk; nil and that error are returned. When no definition is found the
+// returned slice is empty, not nil.
+func GetTypes() ([]*Type, error) {
+	types := make([]*Type, 0, 1)
+	err := filepath.Walk(typeDir, func(path string, info os.FileInfo, walkErr error) error {
+		if walkErr != nil {
+			return walkErr
+		}
+		if info.IsDir() || filepath.Ext(path) != ".json" {
+			return nil
+		}
+		// A definition that cannot be read is reported instead of being left
+		// out silently, so a missing type does not go unnoticed.
+		jobType, err := getType(path)
+		if err != nil {
+			return err
+		}
+		types = append(types, jobType)
+		return nil
+	})
+	if err != nil {
+		return nil, err
+	}
+	return types, nil
+}
+
+// getType reads the file at path and returns it as a Type. The name of the
+// type is the base name of the file with its extension removed and the schema
+// is the content of the file. The error from reading the file is returned
+// as is.
+func getType(path string) (*Type, error) {
+	typeSchema, err := os.ReadFile(path)
+	if err != nil {
+		return nil, err
+	}
+
+	base := filepath.Base(path)
+	jobType := Type{
+		Name:   strings.TrimSuffix(base, filepath.Ext(base)),
+		Schema: string(typeSchema),
+	}
+	return &jobType, nil
+}
diff --git a/dmaap-mediator-producer/internal/jobtypes/jobtypes_test.go b/dmaap-mediator-producer/internal/jobtypes/jobtypes_test.go
new file mode 100644
index 0000000..5fdc378
--- /dev/null
+++ b/dmaap-mediator-producer/internal/jobtypes/jobtypes_test.go
@@ -0,0 +1,53 @@
+// -
+// ========================LICENSE_START=================================
+// O-RAN-SC
+// %%
+// Copyright (C) 2021: Nordix Foundation
+// %%
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+package jobtypes
+
+import (
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+// type1Schema is the content of the type definition file used by the tests.
+const type1Schema = `{"title": "Type 1"}`
+
+// TestGetTypes_filesOkShouldReturnSliceOfTypes verifies that GetTypes returns
+// a Type for a definition file placed in the type directory.
+func TestGetTypes_filesOkShouldReturnSliceOfTypes(t *testing.T) {
+	assertions := require.New(t)
+	typesDir, err := os.MkdirTemp("", "configs")
+	if err != nil {
+		// Nothing below can work without the directory, so stop right away.
+		t.Fatalf("Unable to create temporary directory for types due to: %v", err)
+	}
+	defer os.RemoveAll(typesDir)
+
+	// typeDir is package-level state; put the original value back afterwards.
+	originalTypeDir := typeDir
+	defer func() { typeDir = originalTypeDir }()
+	typeDir = typesDir
+
+	fname := filepath.Join(typesDir, "type1.json")
+	if err = os.WriteFile(fname, []byte(type1Schema), 0666); err != nil {
+		t.Fatalf("Unable to create temporary files for types due to: %v", err)
+	}
+
+	types, err := GetTypes()
+
+	// Check the error first, it explains a wrong result.
+	assertions.Nil(err)
+	wantedType := Type{
+		Name:   "type1",
+		Schema: type1Schema,
+	}
+	wantedTypes := []*Type{&wantedType}
+	assertions.EqualValues(wantedTypes, types)
+}
diff --git a/dmaap-mediator-producer/internal/restclient/HTTPClient.go b/dmaap-mediator-producer/internal/restclient/HTTPClient.go
new file mode 100644
index 0000000..c6eb24c
--- /dev/null
+++ b/dmaap-mediator-producer/internal/restclient/HTTPClient.go
@@ -0,0 +1,103 @@
+// -
+// ========================LICENSE_START=================================
+// O-RAN-SC
+// %%
+// Copyright (C) 2021: Nordix Foundation
+// %%
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+package restclient
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "net/http"
+)
+
+// HTTPClient interface
+// HTTPClient is the part of an HTTP client that this package relies on.
+// *http.Client satisfies it, and the tests replace it with a mock.
+type HTTPClient interface {
+	// Get issues a GET request to url.
+	Get(url string) (*http.Response, error)
+
+	// Do sends the given request.
+	Do(*http.Request) (*http.Response, error)
+}
+
+// RequestError is the error that Get and Put return when the server answers
+// with a status code outside the 2xx range.
+type RequestError struct {
+	// StatusCode is the status code of the response.
+	StatusCode int
+	// Body is the body of the response.
+	Body []byte
+}
+
+func (pe RequestError) Error() string {
+ return fmt.Sprintf("Request failed due to error response with status: %v and body: %v", pe.StatusCode, string(pe.Body))
+}
+
+var (
+ Client HTTPClient
+)
+
+// init makes a default http.Client the Client of the package.
+func init() {
+	Client = new(http.Client)
+}
+
+// Get sends a GET request to url with Client and returns the body of the
+// response.
+//
+// An error from sending the request or from reading the body is returned as
+// is. When the status code of the response is outside the 2xx range, a
+// RequestError with the status code and the body is returned instead of the
+// body.
+func Get(url string) ([]byte, error) {
+	response, err := Client.Get(url)
+	if err != nil {
+		return nil, err
+	}
+	defer response.Body.Close()
+
+	responseData, err := io.ReadAll(response.Body)
+	if err != nil {
+		return nil, err
+	}
+	if !isResponseSuccess(response.StatusCode) {
+		return nil, RequestError{
+			StatusCode: response.StatusCode,
+			Body:       responseData,
+		}
+	}
+	return responseData, nil
+}
+
+// Put sends body to url in a PUT request with Client. The content type of the
+// request is "application/json; charset=utf-8".
+//
+// An error from creating or sending the request is returned as is. When the
+// status code of the response is outside the 2xx range, a RequestError with
+// the status code and the body of the response is returned. nil is returned
+// for a 2xx response.
+func Put(url string, body []byte) error {
+	req, err := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(body))
+	if err != nil {
+		return err
+	}
+	req.Header.Set("Content-Type", "application/json; charset=utf-8")
+
+	response, err := Client.Do(req)
+	if err != nil {
+		return err
+	}
+	// The body has to be closed, otherwise the connection is leaked. An
+	// http.Client always provides a body, but another HTTPClient, such as a
+	// mock in a test, may leave it nil.
+	if response.Body != nil {
+		defer response.Body.Close()
+	}
+
+	if isResponseSuccess(response.StatusCode) {
+		return nil
+	}
+	return getRequestError(response)
+}
+
+// isResponseSuccess reports whether statusCode is in the range 200-299.
+func isResponseSuccess(statusCode int) bool {
+	if statusCode < http.StatusOK {
+		return false
+	}
+	return statusCode < http.StatusMultipleChoices
+}
+
+// getRequestError builds a RequestError from the status code and the body of
+// response. The body is read, but not closed.
+func getRequestError(response *http.Response) RequestError {
+	// A failure to read the body is ignored on purpose. The error then carries
+	// whatever part of the body could be read.
+	responseData, _ := io.ReadAll(response.Body)
+	return RequestError{
+		StatusCode: response.StatusCode,
+		Body:       responseData,
+	}
+}
diff --git a/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go b/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go
new file mode 100644
index 0000000..3727e6a
--- /dev/null
+++ b/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go
@@ -0,0 +1,164 @@
+// -
+// ========================LICENSE_START=================================
+// O-RAN-SC
+// %%
+// Copyright (C) 2021: Nordix Foundation
+// %%
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+package restclient
+
+import (
+ "bytes"
+ "errors"
+ "io/ioutil"
+ "net/http"
+ "reflect"
+ "testing"
+
+ "github.com/stretchr/testify/mock"
+ "github.com/stretchr/testify/require"
+ "oransc.org/nonrtric/dmaapmediatorproducer/mocks"
+)
+
+// TestGet verifies Get for a successful response, for an error response and
+// for a request that fails altogether.
+func TestGet(t *testing.T) {
+	clientMock := mocks.HTTPClient{}
+
+	clientMock.On("Get", "http://testOk").Return(&http.Response{
+		StatusCode: http.StatusOK,
+		Body:       ioutil.NopCloser(bytes.NewReader([]byte("Response"))),
+	}, nil)
+
+	clientMock.On("Get", "http://testNotOk").Return(&http.Response{
+		StatusCode: http.StatusBadRequest,
+		Body:       ioutil.NopCloser(bytes.NewReader([]byte("Bad Response"))),
+	}, nil)
+
+	clientMock.On("Get", "http://testError").Return(nil, errors.New("Failed Request"))
+
+	// Client is package-level state; put the original client back afterwards.
+	originalClient := Client
+	t.Cleanup(func() { Client = originalClient })
+	Client = &clientMock
+
+	type args struct {
+		url string
+	}
+	tests := []struct {
+		name        string
+		args        args
+		want        []byte
+		wantErr     bool
+		wantedError error
+	}{
+		{
+			name: "Test Get with OK response",
+			args: args{
+				url: "http://testOk",
+			},
+			want:    []byte("Response"),
+			wantErr: false,
+		},
+		{
+			name: "Test Get with Not OK response",
+			args: args{
+				url: "http://testNotOk",
+			},
+			want:    nil,
+			wantErr: true,
+			wantedError: RequestError{
+				StatusCode: http.StatusBadRequest,
+				Body:       []byte("Bad Response"),
+			},
+		},
+		{
+			name: "Test Get with error",
+			args: args{
+				url: "http://testError",
+			},
+			want:        nil,
+			wantErr:     true,
+			wantedError: errors.New("Failed Request"),
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := Get(tt.args.url)
+			if (err != nil) != tt.wantErr {
+				t.Errorf("Get() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("Get() = %v, want %v", got, tt.want)
+			}
+			// wantedError is only set, and only looked at, for the cases that
+			// expect an error.
+			if tt.wantErr && err.Error() != tt.wantedError.Error() {
+				t.Errorf("Get() error = %v, wantedError %v", err, tt.wantedError.Error())
+			}
+		})
+	}
+}
+
+// TestPutOk verifies that Put sends a PUT request with the expected URL,
+// content type and body, and returns no error for a 200 response.
+func TestPutOk(t *testing.T) {
+	assertions := require.New(t)
+	clientMock := mocks.HTTPClient{}
+
+	clientMock.On("Do", mock.Anything).Return(&http.Response{
+		StatusCode: http.StatusOK,
+	}, nil)
+
+	// Client is package-level state; put the original client back afterwards.
+	originalClient := Client
+	t.Cleanup(func() { Client = originalClient })
+	Client = &clientMock
+
+	err := Put("http://localhost:9990", []byte("body"))
+	assertions.Nil(err, "Put() error = %v, did not want error", err)
+
+	// The matcher accepts every request; it is only used to get hold of the
+	// request that was passed to the mock.
+	var actualRequest *http.Request
+	clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool {
+		actualRequest = req
+		return true
+	}))
+	assertions.Equal(http.MethodPut, actualRequest.Method)
+	assertions.Equal("http", actualRequest.URL.Scheme)
+	assertions.Equal("localhost:9990", actualRequest.URL.Host)
+	assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
+	body, err := ioutil.ReadAll(actualRequest.Body)
+	assertions.Nil(err)
+	expectedBody := []byte("body")
+	assertions.Equal(expectedBody, body)
+	clientMock.AssertNumberOfCalls(t, "Do", 1)
+}
+
+// TestPutBadResponse verifies that Put returns an error carrying the status
+// code and the body of the response when the server answers with 400.
+func TestPutBadResponse(t *testing.T) {
+	assertions := require.New(t)
+	clientMock := mocks.HTTPClient{}
+
+	clientMock.On("Do", mock.Anything).Return(&http.Response{
+		StatusCode: http.StatusBadRequest,
+		Body:       ioutil.NopCloser(bytes.NewReader([]byte("Bad Request"))),
+	}, nil)
+
+	// Client is package-level state; put the original client back afterwards.
+	originalClient := Client
+	t.Cleanup(func() { Client = originalClient })
+	Client = &clientMock
+
+	err := Put("url", []byte("body"))
+	// The value under test goes first, the message and its arguments follow.
+	// With the message first the assertion can never fail.
+	assertions.NotNil(err, "Put() error = %v, wanted error", err)
+	expectedErrorMessage := "Request failed due to error response with status: 400 and body: Bad Request"
+	assertions.Equal(expectedErrorMessage, err.Error())
+}
+
+// TestPutError verifies that Put returns the error of the client when the
+// request cannot be sent.
+func TestPutError(t *testing.T) {
+	assertions := require.New(t)
+	clientMock := mocks.HTTPClient{}
+
+	clientMock.On("Do", mock.Anything).Return(nil, errors.New("Failed Request"))
+
+	// Client is package-level state; put the original client back afterwards.
+	originalClient := Client
+	t.Cleanup(func() { Client = originalClient })
+	Client = &clientMock
+
+	err := Put("url", []byte("body"))
+	// The value under test goes first, the message and its arguments follow.
+	// With the message first the assertion can never fail.
+	assertions.NotNil(err, "Put() error = %v, wanted error", err)
+	expectedErrorMessage := "Failed Request"
+	assertions.Equal(expectedErrorMessage, err.Error())
+}