Merge "Poll MR and send messages to consumers"
diff --git a/.gitignore b/.gitignore
index df309a1..5915080 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,3 +18,5 @@
.vscode
.factorypath
+
+coverage.*
diff --git a/dmaap-mediator-producer/configs/STD_Fault_Messages.json b/dmaap-mediator-producer/configs/STD_Fault_Messages.json
index b944802..258d743 100644
--- a/dmaap-mediator-producer/configs/STD_Fault_Messages.json
+++ b/dmaap-mediator-producer/configs/STD_Fault_Messages.json
@@ -1,7 +1,12 @@
{
- "$schema": "https://json-schema.org/draft/2019-09/schema",
- "title": "STD_Fault_Messages",
- "description": "Schema for job delivering fault messages from DMaaP Message Router",
- "type": "object",
- "properties": {}
+ "id": "STD_Fault_Messages",
+ "dmaapTopic": "unauthenticated.SEC_FAULT_OUTPUT",
+ "schema": {
+ "$schema": "https://json-schema.org/draft/2019-09/schema",
+ "title": "STD_Fault_Messages",
+ "description": "Schema for job delivering fault messages from DMaaP Message Router",
+ "type": "object",
+ "properties": {},
+ "additionalProperties": false
+ }
}
\ No newline at end of file
diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go
index 3616c58..8a2784a 100644
--- a/dmaap-mediator-producer/internal/config/config.go
+++ b/dmaap-mediator-producer/internal/config/config.go
@@ -34,6 +34,8 @@
InfoJobCallbackHost string
InfoJobCallbackPort int
InfoCoordinatorAddress string
+ MRHost string
+ MRPort int
}
type ProducerRegistrationInfo struct {
@@ -50,6 +52,8 @@
InfoJobCallbackHost: getEnv("INFO_JOB_CALLBACK_HOST", ""),
InfoJobCallbackPort: getEnvAsInt("INFO_JOB_CALLBACK_PORT", 8086),
InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
+ MRHost: getEnv("MR_HOST", "http://message-router.onap"),
+ MRPort: getEnvAsInt("MR_PORT", 3904),
}
}
diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go
index 4a65dc0..1043027 100644
--- a/dmaap-mediator-producer/internal/config/config_test.go
+++ b/dmaap-mediator-producer/internal/config/config_test.go
@@ -37,6 +37,8 @@
os.Setenv("INFO_JOB_CALLBACK_HOST", "jobCallbackHost")
os.Setenv("INFO_JOB_CALLBACK_PORT", "8096")
os.Setenv("INFO_COORD_ADDR", "infoCoordAddr")
+ os.Setenv("MR_HOST", "mrHost")
+ os.Setenv("MR_PORT", "3908")
t.Cleanup(func() {
os.Clearenv()
})
@@ -47,6 +49,8 @@
InfoJobCallbackHost: "jobCallbackHost",
InfoJobCallbackPort: 8096,
InfoCoordinatorAddress: "infoCoordAddr",
+ MRHost: "mrHost",
+ MRPort: 3908,
}
if got := New(); !reflect.DeepEqual(got, &wantConfig) {
t.Errorf("New() = %v, want %v", got, &wantConfig)
@@ -70,6 +74,8 @@
InfoJobCallbackHost: "",
InfoJobCallbackPort: 8086,
InfoCoordinatorAddress: "http://enrichmentservice:8083",
+ MRHost: "http://message-router.onap",
+ MRPort: 3904,
}
if got := New(); !reflect.DeepEqual(got, &wantConfig) {
t.Errorf("New() = %v, want %v", got, &wantConfig)
@@ -86,6 +92,8 @@
InfoJobCallbackHost: "",
InfoJobCallbackPort: 8086,
InfoCoordinatorAddress: "http://enrichmentservice:8083",
+ MRHost: "http://message-router.onap",
+ MRPort: 3904,
}
if got := New(); !reflect.DeepEqual(got, &wantConfig) {
t.Errorf("New() = %v, want %v", got, &wantConfig)
diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go
index 7347178..eec59c3 100644
--- a/dmaap-mediator-producer/internal/jobs/jobs.go
+++ b/dmaap-mediator-producer/internal/jobs/jobs.go
@@ -21,15 +21,22 @@
package jobs
import (
+ "encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
+ "sync"
+
+ log "github.com/sirupsen/logrus"
+ "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
)
type Type struct {
- TypeId string
- Schema string
+ TypeId string `json:"id"`
+ DMaaPTopic string `json:"dmaapTopic"`
+ Schema string `json:"schema"`
+ Jobs map[string]JobInfo
}
type JobInfo struct {
@@ -46,9 +53,10 @@
}
var (
+ mu sync.Mutex
typeDir = "configs"
Handler JobHandler
- allJobs = make(map[string]map[string]JobInfo)
+ allJobs = make(map[string]Type)
)
func init() {
@@ -62,8 +70,10 @@
}
func (jh *jobHandlerImpl) AddJob(ji JobInfo) error {
+ mu.Lock()
+ defer mu.Unlock()
if err := validateJobInfo(ji); err == nil {
- jobs := allJobs[ji.InfoTypeIdentity]
+ jobs := allJobs[ji.InfoTypeIdentity].Jobs
jobs[ji.InfoJobIdentity] = ji
return nil
} else {
@@ -86,6 +96,8 @@
}
func GetTypes() ([]*Type, error) {
+ mu.Lock()
+ defer mu.Unlock()
types := make([]*Type, 0, 1)
err := filepath.Walk(typeDir,
func(path string, info os.FileInfo, err error) error {
@@ -106,6 +118,8 @@
}
func GetSupportedTypes() []string {
+ mu.Lock()
+ defer mu.Unlock()
supportedTypes := []string{}
for k := range allJobs {
supportedTypes = append(supportedTypes, k)
@@ -118,23 +132,63 @@
}
func getType(path string) (*Type, error) {
- fileName := filepath.Base(path)
- typeName := strings.TrimSuffix(fileName, filepath.Ext(fileName))
-
- if typeSchema, err := os.ReadFile(path); err == nil {
- typeInfo := Type{
- TypeId: typeName,
- Schema: string(typeSchema),
+ if typeDefinition, err := os.ReadFile(path); err == nil {
+ var dat map[string]interface{}
+ if marshalError := json.Unmarshal(typeDefinition, &dat); marshalError == nil {
+ schema, _ := json.Marshal(dat["schema"])
+ typeInfo := Type{
+ TypeId: dat["id"].(string),
+ DMaaPTopic: dat["dmaapTopic"].(string),
+ Schema: string(schema),
+ Jobs: make(map[string]JobInfo),
+ }
+ if _, ok := allJobs[typeInfo.TypeId]; !ok {
+ allJobs[typeInfo.TypeId] = typeInfo
+ }
+ return &typeInfo, nil
+ } else {
+ return nil, marshalError
}
- if _, ok := allJobs[typeName]; !ok {
- allJobs[typeName] = make(map[string]JobInfo)
- }
- return &typeInfo, nil
} else {
return nil, err
}
}
+func RunJobs(mRAddress string) {
+ for {
+ pollAndDistributeMessages(mRAddress)
+ }
+}
+
+func pollAndDistributeMessages(mRAddress string) {
+ for typeId, typeInfo := range allJobs {
+ log.Debugf("Processing jobs for type: %v", typeId)
+ messagesBody, error := restclient.Get(fmt.Sprintf("%v/events/%v/users/dmaapmediatorproducer", mRAddress, typeInfo.DMaaPTopic))
+ if error != nil {
+ log.Warnf("Error getting data from MR. Cause: %v", error)
+ continue
+ }
+ distributeMessages(messagesBody, typeInfo)
+ }
+}
+
+func distributeMessages(messages []byte, typeInfo Type) {
+ if len(messages) > 2 {
+ mu.Lock()
+ for _, jobInfo := range typeInfo.Jobs {
+ go sendMessagesToConsumer(messages, jobInfo)
+ }
+ mu.Unlock()
+ }
+}
+
+func sendMessagesToConsumer(messages []byte, jobInfo JobInfo) {
+ log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity)
+ if postErr := restclient.Post(jobInfo.TargetUri, messages); postErr != nil {
+ log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
+ }
+}
+
func clearAll() {
- allJobs = make(map[string]map[string]JobInfo)
+ allJobs = make(map[string]Type)
}
diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go
index 0941033..b53d85e 100644
--- a/dmaap-mediator-producer/internal/jobs/jobs_test.go
+++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go
@@ -21,14 +21,21 @@
package jobs
import (
+ "bytes"
+ "io/ioutil"
+ "net/http"
"os"
"path/filepath"
"testing"
+ "time"
+ "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
+ "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
+ "oransc.org/nonrtric/dmaapmediatorproducer/mocks"
)
-const type1Schema = `{"title": "Type 1"}`
+const typeDefinition = `{"id": "type1", "dmaapTopic": "unauthenticated.SEC_FAULT_OUTPUT", "schema": {"title": "Type 1"}}`
func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
assertions := require.New(t)
@@ -42,13 +49,15 @@
})
typeDir = typesDir
fname := filepath.Join(typesDir, "type1.json")
- if err = os.WriteFile(fname, []byte(type1Schema), 0666); err != nil {
+ if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
t.Errorf("Unable to create temporary files for types due to: %v", err)
}
types, err := GetTypes()
wantedType := Type{
- TypeId: "type1",
- Schema: type1Schema,
+ TypeId: "type1",
+ DMaaPTopic: "unauthenticated.SEC_FAULT_OUTPUT",
+ Schema: `{"title":"Type 1"}`,
+ Jobs: make(map[string]JobInfo),
}
wantedTypes := []*Type{&wantedType}
assertions.EqualValues(wantedTypes, types)
@@ -60,11 +69,7 @@
func TestAddJobWhenTypeIsSupported_shouldAddJobToAllJobsMap(t *testing.T) {
assertions := require.New(t)
- allJobs["type1"] = make(map[string]JobInfo)
- t.Cleanup(func() {
- clearAll()
- })
- jobInfo := JobInfo{
+ wantedJob := JobInfo{
Owner: "owner",
LastUpdated: "now",
InfoJobIdentity: "job1",
@@ -72,11 +77,18 @@
InfoJobData: "{}",
InfoTypeIdentity: "type1",
}
+ allJobs["type1"] = Type{
+ TypeId: "type1",
+ Jobs: map[string]JobInfo{"job1": wantedJob},
+ }
+ t.Cleanup(func() {
+ clearAll()
+ })
- err := AddJob(jobInfo)
+ err := AddJob(wantedJob)
assertions.Nil(err)
- assertions.Equal(1, len(allJobs["type1"]))
- assertions.Equal(jobInfo, allJobs["type1"]["job1"])
+ assertions.Equal(1, len(allJobs["type1"].Jobs))
+ assertions.Equal(wantedJob, allJobs["type1"].Jobs["job1"])
}
func TestAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
@@ -92,7 +104,9 @@
func TestAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
assertions := require.New(t)
- allJobs["type1"] = make(map[string]JobInfo)
+ allJobs["type1"] = Type{
+ TypeId: "type1",
+ }
t.Cleanup(func() {
clearAll()
})
@@ -107,7 +121,9 @@
func TestAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
assertions := require.New(t)
- allJobs["type1"] = make(map[string]JobInfo)
+ allJobs["type1"] = Type{
+ TypeId: "type1",
+ }
jobInfo := JobInfo{
InfoTypeIdentity: "type1",
InfoJobIdentity: "job1",
@@ -118,3 +134,53 @@
assertions.Equal("missing required target URI: { job1 type1}", err.Error())
clearAll()
}
+
+func TestPollAndDistributeMessages(t *testing.T) {
+ assertions := require.New(t)
+ jobInfo := JobInfo{
+ InfoTypeIdentity: "type1",
+ InfoJobIdentity: "job1",
+ TargetUri: "http://consumerHost/target",
+ }
+ allJobs["type1"] = Type{
+ TypeId: "type1",
+ DMaaPTopic: "topic",
+ Jobs: map[string]JobInfo{"job1": jobInfo},
+ }
+ t.Cleanup(func() {
+ clearAll()
+ })
+
+ body := ioutil.NopCloser(bytes.NewReader([]byte(`[{"message": {"data": "data"}}]`)))
+ clientMock := mocks.HTTPClient{}
+ clientMock.On("Get", mock.Anything).Return(&http.Response{
+ StatusCode: http.StatusOK,
+ Body: body,
+ }, nil)
+
+ clientMock.On("Do", mock.Anything).Return(&http.Response{
+ StatusCode: http.StatusOK,
+ }, nil)
+
+ restclient.Client = &clientMock
+
+ pollAndDistributeMessages("http://mrAddr")
+
+ time.Sleep(100 * time.Millisecond)
+
+ var actualRequest *http.Request
+ clientMock.AssertCalled(t, "Get", "http://mrAddr/events/topic/users/dmaapmediatorproducer")
+ clientMock.AssertNumberOfCalls(t, "Get", 1)
+
+ clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool {
+ actualRequest = req
+ return true
+ }))
+ assertions.Equal(http.MethodPost, actualRequest.Method)
+ assertions.Equal("consumerHost", actualRequest.URL.Host)
+ assertions.Equal("/target", actualRequest.URL.Path)
+ assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
+ actualBody, _ := ioutil.ReadAll(actualRequest.Body)
+ assertions.Equal([]byte(`[{"message": {"data": "data"}}]`), actualBody)
+ clientMock.AssertNumberOfCalls(t, "Do", 1)
+}
diff --git a/dmaap-mediator-producer/internal/restclient/HTTPClient.go b/dmaap-mediator-producer/internal/restclient/HTTPClient.go
index 78a02b6..a783f7e 100644
--- a/dmaap-mediator-producer/internal/restclient/HTTPClient.go
+++ b/dmaap-mediator-producer/internal/restclient/HTTPClient.go
@@ -73,7 +73,15 @@
}
func Put(url string, body []byte) error {
- if req, reqErr := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(body)); reqErr == nil {
+ return do(http.MethodPut, url, body)
+}
+
+func Post(url string, body []byte) error {
+ return do(http.MethodPost, url, body)
+}
+
+func do(method string, url string, body []byte) error {
+ if req, reqErr := http.NewRequest(method, url, bytes.NewBuffer(body)); reqErr == nil {
req.Header.Set("Content-Type", "application/json; charset=utf-8")
if response, respErr := Client.Do(req); respErr == nil {
if isResponseSuccess(response.StatusCode) {
diff --git a/dmaap-mediator-producer/internal/server/server_test.go b/dmaap-mediator-producer/internal/server/server_test.go
index d221c93..444deba 100644
--- a/dmaap-mediator-producer/internal/server/server_test.go
+++ b/dmaap-mediator-producer/internal/server/server_test.go
@@ -32,7 +32,7 @@
"github.com/stretchr/testify/require"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
- "oransc.org/nonrtric/dmaapmediatorproducer/mocks"
+ "oransc.org/nonrtric/dmaapmediatorproducer/mocks/jobhandler"
)
func TestStatusHandler(t *testing.T) {
@@ -88,7 +88,7 @@
func TestCreateInfoJobHandler(t *testing.T) {
assertions := require.New(t)
- jobHandlerMock := mocks.JobHandler{}
+ jobHandlerMock := jobhandler.JobHandler{}
goodJobInfo := jobs.JobInfo{
Owner: "owner",
diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go
index 3fe92dc..47e12e9 100644
--- a/dmaap-mediator-producer/main.go
+++ b/dmaap-mediator-producer/main.go
@@ -76,7 +76,7 @@
wg := new(sync.WaitGroup)
// add two goroutines to `wg` WaitGroup, one for each avilable server
- wg.Add(2)
+ wg.Add(3)
log.Debugf("Starting status callback server at port %v", configuration.InfoProducerSupervisionCallbackPort)
go func() {
@@ -91,6 +91,11 @@
wg.Done()
}()
+ go func() {
+ jobs.RunJobs(fmt.Sprintf("%v:%v", configuration.MRHost, configuration.MRPort))
+ wg.Done()
+ }()
+
// wait until WaitGroup is done
wg.Wait()
log.Debug("Stopping DMaaP Mediator Producer")
diff --git a/dmaap-mediator-producer/mocks/JobHandler.go b/dmaap-mediator-producer/mocks/jobhandler/JobHandler.go
similarity index 85%
rename from dmaap-mediator-producer/mocks/JobHandler.go
rename to dmaap-mediator-producer/mocks/jobhandler/JobHandler.go
index 4914e4d..edb2bf5 100644
--- a/dmaap-mediator-producer/mocks/JobHandler.go
+++ b/dmaap-mediator-producer/mocks/jobhandler/JobHandler.go
@@ -1,10 +1,10 @@
// Code generated by mockery v2.9.3. DO NOT EDIT.
-package mocks
+package jobhandler
import (
mock "github.com/stretchr/testify/mock"
- jobs "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
+ "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
)
// JobHandler is an autogenerated mock type for the JobHandler type
diff --git a/dmaap-mediator-producer/simulator/consumersimulator.go b/dmaap-mediator-producer/simulator/consumersimulator.go
new file mode 100644
index 0000000..25421ae
--- /dev/null
+++ b/dmaap-mediator-producer/simulator/consumersimulator.go
@@ -0,0 +1,40 @@
+// -
+// ========================LICENSE_START=================================
+// O-RAN-SC
+// %%
+// Copyright (C) 2021: Nordix Foundation
+// %%
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+package main
+
+import (
+ "fmt"
+ "io"
+ http "net/http"
+)
+
+func handleData(w http.ResponseWriter, req *http.Request) {
+ defer req.Body.Close()
+ if reqData, err := io.ReadAll(req.Body); err == nil {
+ fmt.Printf("Consumer received body: %v\n", string(reqData))
+ }
+}
+
+func main() {
+ http.HandleFunc("/jobs", handleData)
+
+ http.ListenAndServe(":40935", nil)
+}