Improve DMaaP Mediator Producer
Issue-ID: NONRTRIC-586
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: Iab915f878874b687b1c1b2effc05293582fb254c
diff --git a/dmaap-mediator-producer/.gitignore b/dmaap-mediator-producer/.gitignore
index 0d08f66..9f5396c 100644
--- a/dmaap-mediator-producer/.gitignore
+++ b/dmaap-mediator-producer/.gitignore
@@ -4,3 +4,4 @@
main
dmaapmediatorproducer
__debug_bin*
+simulator
diff --git a/dmaap-mediator-producer/configs/STD_Fault_Messages.json b/dmaap-mediator-producer/configs/STD_Fault_Messages.json
deleted file mode 100644
index 258d743..0000000
--- a/dmaap-mediator-producer/configs/STD_Fault_Messages.json
+++ /dev/null
@@ -1,12 +0,0 @@
-{
- "id": "STD_Fault_Messages",
- "dmaapTopic": "unauthenticated.SEC_FAULT_OUTPUT",
- "schema": {
- "$schema": "https://json-schema.org/draft/2019-09/schema",
- "title": "STD_Fault_Messages",
- "description": "Schema for job delivering fault messages from DMaaP Message Router",
- "type": "object",
- "properties": {},
- "additionalProperties": false
- }
-}
\ No newline at end of file
diff --git a/dmaap-mediator-producer/configs/type_config.json b/dmaap-mediator-producer/configs/type_config.json
new file mode 100644
index 0000000..983d0f3
--- /dev/null
+++ b/dmaap-mediator-producer/configs/type_config.json
@@ -0,0 +1,9 @@
+{
+ "types":
+ [
+ {
+ "id": "STD_Fault_Messages",
+ "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go
index 8a2784a..9b7b1dd 100644
--- a/dmaap-mediator-producer/internal/config/config.go
+++ b/dmaap-mediator-producer/internal/config/config.go
@@ -28,14 +28,12 @@
)
type Config struct {
- LogLevel string
- InfoProducerSupervisionCallbackHost string
- InfoProducerSupervisionCallbackPort int
- InfoJobCallbackHost string
- InfoJobCallbackPort int
- InfoCoordinatorAddress string
- MRHost string
- MRPort int
+ LogLevel string
+ InfoProducerHost string
+ InfoProducerPort int
+ InfoCoordinatorAddress string
+ MRHost string
+ MRPort int
}
type ProducerRegistrationInfo struct {
@@ -46,14 +44,12 @@
func New() *Config {
return &Config{
- LogLevel: getEnv("LOG_LEVEL", "Info"),
- InfoProducerSupervisionCallbackHost: getEnv("INFO_PRODUCER_SUPERVISION_CALLBACK_HOST", ""),
- InfoProducerSupervisionCallbackPort: getEnvAsInt("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", 8085),
- InfoJobCallbackHost: getEnv("INFO_JOB_CALLBACK_HOST", ""),
- InfoJobCallbackPort: getEnvAsInt("INFO_JOB_CALLBACK_PORT", 8086),
- InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
- MRHost: getEnv("MR_HOST", "http://message-router.onap"),
- MRPort: getEnvAsInt("MR_PORT", 3904),
+ LogLevel: getEnv("LOG_LEVEL", "Info"),
+ InfoProducerHost: getEnv("INFO_PRODUCER_HOST", ""),
+ InfoProducerPort: getEnvAsInt("INFO_PRODUCER_PORT", 8085),
+ InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
+ MRHost: getEnv("MR_HOST", "http://message-router.onap"),
+ MRPort: getEnvAsInt("MR_PORT", 3904),
}
}
diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go
index 1043027..0fcbdd3 100644
--- a/dmaap-mediator-producer/internal/config/config_test.go
+++ b/dmaap-mediator-producer/internal/config/config_test.go
@@ -32,10 +32,8 @@
func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
os.Setenv("LOG_LEVEL", "Debug")
- os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_HOST", "supervisionCallbackHost")
- os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "8095")
- os.Setenv("INFO_JOB_CALLBACK_HOST", "jobCallbackHost")
- os.Setenv("INFO_JOB_CALLBACK_PORT", "8096")
+ os.Setenv("INFO_PRODUCER_HOST", "producerHost")
+ os.Setenv("INFO_PRODUCER_PORT", "8095")
os.Setenv("INFO_COORD_ADDR", "infoCoordAddr")
os.Setenv("MR_HOST", "mrHost")
os.Setenv("MR_PORT", "3908")
@@ -43,14 +41,12 @@
os.Clearenv()
})
wantConfig := Config{
- LogLevel: "Debug",
- InfoProducerSupervisionCallbackHost: "supervisionCallbackHost",
- InfoProducerSupervisionCallbackPort: 8095,
- InfoJobCallbackHost: "jobCallbackHost",
- InfoJobCallbackPort: 8096,
- InfoCoordinatorAddress: "infoCoordAddr",
- MRHost: "mrHost",
- MRPort: 3908,
+ LogLevel: "Debug",
+ InfoProducerHost: "producerHost",
+ InfoProducerPort: 8095,
+ InfoCoordinatorAddress: "infoCoordAddr",
+ MRHost: "mrHost",
+ MRPort: 3908,
}
if got := New(); !reflect.DeepEqual(got, &wantConfig) {
t.Errorf("New() = %v, want %v", got, &wantConfig)
@@ -62,38 +58,34 @@
var buf bytes.Buffer
log.SetOutput(&buf)
- os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "wrong")
+ os.Setenv("INFO_PRODUCER_PORT", "wrong")
t.Cleanup(func() {
log.SetOutput(os.Stderr)
os.Clearenv()
})
wantConfig := Config{
- LogLevel: "Info",
- InfoProducerSupervisionCallbackHost: "",
- InfoProducerSupervisionCallbackPort: 8085,
- InfoJobCallbackHost: "",
- InfoJobCallbackPort: 8086,
- InfoCoordinatorAddress: "http://enrichmentservice:8083",
- MRHost: "http://message-router.onap",
- MRPort: 3904,
+ LogLevel: "Info",
+ InfoProducerHost: "",
+ InfoProducerPort: 8085,
+ InfoCoordinatorAddress: "http://enrichmentservice:8083",
+ MRHost: "http://message-router.onap",
+ MRPort: 3904,
}
if got := New(); !reflect.DeepEqual(got, &wantConfig) {
t.Errorf("New() = %v, want %v", got, &wantConfig)
}
logString := buf.String()
- assertions.Contains(logString, "Invalid int value: wrong for variable: INFO_PRODUCER_SUPERVISION_CALLBACK_PORT. Default value: 8085 will be used")
+ assertions.Contains(logString, "Invalid int value: wrong for variable: INFO_PRODUCER_PORT. Default value: 8085 will be used")
}
func TestNew_envVarsNotSetConfigContainDefaultValues(t *testing.T) {
wantConfig := Config{
- LogLevel: "Info",
- InfoProducerSupervisionCallbackHost: "",
- InfoProducerSupervisionCallbackPort: 8085,
- InfoJobCallbackHost: "",
- InfoJobCallbackPort: 8086,
- InfoCoordinatorAddress: "http://enrichmentservice:8083",
- MRHost: "http://message-router.onap",
- MRPort: 3904,
+ LogLevel: "Info",
+ InfoProducerHost: "",
+ InfoProducerPort: 8085,
+ InfoCoordinatorAddress: "http://enrichmentservice:8083",
+ MRHost: "http://message-router.onap",
+ MRPort: 3904,
}
if got := New(); !reflect.DeepEqual(got, &wantConfig) {
t.Errorf("New() = %v, want %v", got, &wantConfig)
diff --git a/dmaap-mediator-producer/internal/config/registrator.go b/dmaap-mediator-producer/internal/config/registrator.go
index 37225ed..db46c54 100644
--- a/dmaap-mediator-producer/internal/config/registrator.go
+++ b/dmaap-mediator-producer/internal/config/registrator.go
@@ -33,9 +33,10 @@
const registerTypePath = "/data-producer/v1/info-types/"
const registerProducerPath = "/data-producer/v1/info-producers/"
+const typeSchema = `{"type": "object","properties": {},"additionalProperties": false}`
type Registrator interface {
- RegisterTypes(types []*jobs.Type) error
+ RegisterTypes(types []*jobs.TypeData) error
RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo)
}
@@ -49,9 +50,9 @@
}
}
-func (r RegistratorImpl) RegisterTypes(jobTypes []*jobs.Type) error {
+func (r RegistratorImpl) RegisterTypes(jobTypes []jobs.TypeData) error {
for _, jobType := range jobTypes {
- body := fmt.Sprintf(`{"info_job_data_schema": %v}`, jobType.Schema)
+ body := fmt.Sprintf(`{"info_job_data_schema": %v}`, typeSchema)
if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.TypeId), []byte(body)); error != nil {
return error
}
diff --git a/dmaap-mediator-producer/internal/config/registrator_test.go b/dmaap-mediator-producer/internal/config/registrator_test.go
index a89c43f..353e9de 100644
--- a/dmaap-mediator-producer/internal/config/registrator_test.go
+++ b/dmaap-mediator-producer/internal/config/registrator_test.go
@@ -43,11 +43,10 @@
restclient.Client = &clientMock
- type1 := jobs.Type{
+ type1 := jobs.TypeData{
TypeId: "Type1",
- Schema: `{"title": "Type 1"}`,
}
- types := []*jobs.Type{&type1}
+ types := []jobs.TypeData{type1}
r := NewRegistratorImpl("http://localhost:9990")
err := r.RegisterTypes(types)
@@ -64,7 +63,7 @@
assertions.Equal("/data-producer/v1/info-types/Type1", actualRequest.URL.Path)
assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
body, _ := ioutil.ReadAll(actualRequest.Body)
- expectedBody := []byte(`{"info_job_data_schema": {"title": "Type 1"}}`)
+ expectedBody := []byte(`{"info_job_data_schema": {"type": "object","properties": {},"additionalProperties": false}}`)
assertions.Equal(expectedBody, body)
clientMock.AssertNumberOfCalls(t, "Do", 1)
}
diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go
index eec59c3..09d3891 100644
--- a/dmaap-mediator-producer/internal/jobs/jobs.go
+++ b/dmaap-mediator-producer/internal/jobs/jobs.go
@@ -24,28 +24,33 @@
"encoding/json"
"fmt"
"os"
- "path/filepath"
- "strings"
"sync"
log "github.com/sirupsen/logrus"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
)
-type Type struct {
- TypeId string `json:"id"`
- DMaaPTopic string `json:"dmaapTopic"`
- Schema string `json:"schema"`
- Jobs map[string]JobInfo
+type TypeDefinitions struct {
+ Types []TypeDefinition `json:"types"`
+}
+type TypeDefinition struct {
+ Id string `json:"id"`
+ DmaapTopicURL string `json:"dmaapTopicUrl"`
+}
+
+type TypeData struct {
+ TypeId string `json:"id"`
+ DMaaPTopicURL string `json:"dmaapTopicUrl"`
+ Jobs map[string]JobInfo
}
type JobInfo struct {
- Owner string `json:"owner"`
- LastUpdated string `json:"last_updated"`
- InfoJobIdentity string `json:"info_job_identity"`
- TargetUri string `json:"target_uri"`
- InfoJobData string `json:"info_job_data"`
- InfoTypeIdentity string `json:"info_type_identity"`
+ Owner string `json:"owner"`
+ LastUpdated string `json:"last_updated"`
+ InfoJobIdentity string `json:"info_job_identity"`
+ TargetUri string `json:"target_uri"`
+ InfoJobData interface{} `json:"info_job_data"`
+ InfoTypeIdentity string `json:"info_type_identity"`
}
type JobHandler interface {
@@ -53,10 +58,10 @@
}
var (
- mu sync.Mutex
- typeDir = "configs"
- Handler JobHandler
- allJobs = make(map[string]Type)
+ mu sync.Mutex
+ configFile = "configs/type_config.json"
+ Handler JobHandler
+ allTypes = make(map[string]TypeData)
)
func init() {
@@ -73,8 +78,9 @@
mu.Lock()
defer mu.Unlock()
if err := validateJobInfo(ji); err == nil {
- jobs := allJobs[ji.InfoTypeIdentity].Jobs
+ jobs := allTypes[ji.InfoTypeIdentity].Jobs
jobs[ji.InfoJobIdentity] = ji
+ log.Debug("Added job: ", ji)
return nil
} else {
return err
@@ -82,7 +88,7 @@
}
func validateJobInfo(ji JobInfo) error {
- if _, ok := allJobs[ji.InfoTypeIdentity]; !ok {
+ if _, ok := allTypes[ji.InfoTypeIdentity]; !ok {
return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
}
if ji.InfoJobIdentity == "" {
@@ -95,25 +101,30 @@
return nil
}
-func GetTypes() ([]*Type, error) {
+func GetTypes() ([]TypeData, error) {
mu.Lock()
defer mu.Unlock()
- types := make([]*Type, 0, 1)
- err := filepath.Walk(typeDir,
- func(path string, info os.FileInfo, err error) error {
- if err != nil {
- return err
- }
- if strings.Contains(path, ".json") {
- if jobType, err := getType(path); err == nil {
- types = append(types, jobType)
- }
- }
- return nil
- })
+ types := make([]TypeData, 0, 1)
+ typeDefsByte, err := os.ReadFile(configFile)
if err != nil {
return nil, err
}
+ typeDefs := TypeDefinitions{}
+ err = json.Unmarshal(typeDefsByte, &typeDefs)
+ if err != nil {
+ return nil, err
+ }
+ for _, typeDef := range typeDefs.Types {
+ typeInfo := TypeData{
+ TypeId: typeDef.Id,
+ DMaaPTopicURL: typeDef.DmaapTopicURL,
+ Jobs: make(map[string]JobInfo),
+ }
+ if _, ok := allTypes[typeInfo.TypeId]; !ok {
+ allTypes[typeInfo.TypeId] = typeInfo
+ }
+ types = append(types, typeInfo)
+ }
return types, nil
}
@@ -121,7 +132,7 @@
mu.Lock()
defer mu.Unlock()
supportedTypes := []string{}
- for k := range allJobs {
+ for k := range allTypes {
supportedTypes = append(supportedTypes, k)
}
return supportedTypes
@@ -131,29 +142,6 @@
return Handler.AddJob(job)
}
-func getType(path string) (*Type, error) {
- if typeDefinition, err := os.ReadFile(path); err == nil {
- var dat map[string]interface{}
- if marshalError := json.Unmarshal(typeDefinition, &dat); marshalError == nil {
- schema, _ := json.Marshal(dat["schema"])
- typeInfo := Type{
- TypeId: dat["id"].(string),
- DMaaPTopic: dat["dmaapTopic"].(string),
- Schema: string(schema),
- Jobs: make(map[string]JobInfo),
- }
- if _, ok := allJobs[typeInfo.TypeId]; !ok {
- allJobs[typeInfo.TypeId] = typeInfo
- }
- return &typeInfo, nil
- } else {
- return nil, marshalError
- }
- } else {
- return nil, err
- }
-}
-
func RunJobs(mRAddress string) {
for {
pollAndDistributeMessages(mRAddress)
@@ -161,9 +149,9 @@
}
func pollAndDistributeMessages(mRAddress string) {
- for typeId, typeInfo := range allJobs {
+ for typeId, typeInfo := range allTypes {
log.Debugf("Processing jobs for type: %v", typeId)
- messagesBody, error := restclient.Get(fmt.Sprintf("%v/events/%v/users/dmaapmediatorproducer", mRAddress, typeInfo.DMaaPTopic))
+ messagesBody, error := restclient.Get(fmt.Sprintf("%v/%v", mRAddress, typeInfo.DMaaPTopicURL))
if error != nil {
log.Warnf("Error getting data from MR. Cause: %v", error)
continue
@@ -172,7 +160,7 @@
}
}
-func distributeMessages(messages []byte, typeInfo Type) {
+func distributeMessages(messages []byte, typeInfo TypeData) {
if len(messages) > 2 {
mu.Lock()
for _, jobInfo := range typeInfo.Jobs {
@@ -190,5 +178,5 @@
}
func clearAll() {
- allJobs = make(map[string]Type)
+ allTypes = make(map[string]TypeData)
}
diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go
index b53d85e..3bb2578 100644
--- a/dmaap-mediator-producer/internal/jobs/jobs_test.go
+++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go
@@ -26,16 +26,15 @@
"net/http"
"os"
"path/filepath"
+ "sync"
"testing"
"time"
- "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
- "oransc.org/nonrtric/dmaapmediatorproducer/mocks"
)
-const typeDefinition = `{"id": "type1", "dmaapTopic": "unauthenticated.SEC_FAULT_OUTPUT", "schema": {"title": "Type 1"}}`
+const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
assertions := require.New(t)
@@ -47,19 +46,18 @@
os.RemoveAll(typesDir)
clearAll()
})
- typeDir = typesDir
- fname := filepath.Join(typesDir, "type1.json")
+ fname := filepath.Join(typesDir, "type_config.json")
+ configFile = fname
if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
- t.Errorf("Unable to create temporary files for types due to: %v", err)
+ t.Errorf("Unable to create temporary config file for types due to: %v", err)
}
types, err := GetTypes()
- wantedType := Type{
- TypeId: "type1",
- DMaaPTopic: "unauthenticated.SEC_FAULT_OUTPUT",
- Schema: `{"title":"Type 1"}`,
- Jobs: make(map[string]JobInfo),
+ wantedType := TypeData{
+ TypeId: "type1",
+ DMaaPTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
+ Jobs: make(map[string]JobInfo),
}
- wantedTypes := []*Type{&wantedType}
+ wantedTypes := []TypeData{wantedType}
assertions.EqualValues(wantedTypes, types)
assertions.Nil(err)
@@ -77,7 +75,7 @@
InfoJobData: "{}",
InfoTypeIdentity: "type1",
}
- allJobs["type1"] = Type{
+ allTypes["type1"] = TypeData{
TypeId: "type1",
Jobs: map[string]JobInfo{"job1": wantedJob},
}
@@ -87,8 +85,8 @@
err := AddJob(wantedJob)
assertions.Nil(err)
- assertions.Equal(1, len(allJobs["type1"].Jobs))
- assertions.Equal(wantedJob, allJobs["type1"].Jobs["job1"])
+ assertions.Equal(1, len(allTypes["type1"].Jobs))
+ assertions.Equal(wantedJob, allTypes["type1"].Jobs["job1"])
}
func TestAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
@@ -104,7 +102,7 @@
func TestAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
assertions := require.New(t)
- allJobs["type1"] = Type{
+ allTypes["type1"] = TypeData{
TypeId: "type1",
}
t.Cleanup(func() {
@@ -116,12 +114,12 @@
err := AddJob(jobInfo)
assertions.NotNil(err)
- assertions.Equal("missing required job identity: { type1}", err.Error())
+ assertions.Equal("missing required job identity: { <nil> type1}", err.Error())
}
func TestAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
assertions := require.New(t)
- allJobs["type1"] = Type{
+ allTypes["type1"] = TypeData{
TypeId: "type1",
}
jobInfo := JobInfo{
@@ -131,10 +129,9 @@
err := AddJob(jobInfo)
assertions.NotNil(err)
- assertions.Equal("missing required target URI: { job1 type1}", err.Error())
+ assertions.Equal("missing required target URI: { job1 <nil> type1}", err.Error())
clearAll()
}
-
func TestPollAndDistributeMessages(t *testing.T) {
assertions := require.New(t)
jobInfo := JobInfo{
@@ -142,45 +139,84 @@
InfoJobIdentity: "job1",
TargetUri: "http://consumerHost/target",
}
- allJobs["type1"] = Type{
- TypeId: "type1",
- DMaaPTopic: "topic",
- Jobs: map[string]JobInfo{"job1": jobInfo},
+ allTypes["type1"] = TypeData{
+ TypeId: "type1",
+ DMaaPTopicURL: "topicUrl",
+ Jobs: map[string]JobInfo{"job1": jobInfo},
}
t.Cleanup(func() {
clearAll()
})
- body := ioutil.NopCloser(bytes.NewReader([]byte(`[{"message": {"data": "data"}}]`)))
- clientMock := mocks.HTTPClient{}
- clientMock.On("Get", mock.Anything).Return(&http.Response{
- StatusCode: http.StatusOK,
- Body: body,
- }, nil)
+ wg := sync.WaitGroup{}
+ wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute
+ messages := `[{"message": {"data": "data"}}]`
+ clientMock := NewTestClient(func(req *http.Request) *http.Response {
+ if req.URL.String() == "http://mrAddr/topicUrl" {
+ assertions.Equal(req.Method, "GET")
+ wg.Done()
+ return &http.Response{
+ StatusCode: 200,
+ Body: ioutil.NopCloser(bytes.NewReader([]byte(messages))),
+ Header: make(http.Header), // Must be set to non-nil value or it panics
+ }
+ } else if req.URL.String() == "http://consumerHost/target" {
+ assertions.Equal(req.Method, "POST")
+ assertions.Equal(messages, getBodyAsString(req))
+ assertions.Equal("application/json; charset=utf-8", req.Header.Get("Content-Type"))
+ wg.Done()
+ return &http.Response{
+ StatusCode: 200,
+ Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
+ Header: make(http.Header), // Must be set to non-nil value or it panics
+ }
+ }
+ t.Error("Wrong call to client: ", req)
+ t.Fail()
+ return nil
+ })
- clientMock.On("Do", mock.Anything).Return(&http.Response{
- StatusCode: http.StatusOK,
- }, nil)
-
- restclient.Client = &clientMock
+ restclient.Client = clientMock
pollAndDistributeMessages("http://mrAddr")
- time.Sleep(100 * time.Millisecond)
+ if waitTimeout(&wg, 100*time.Millisecond) {
+ t.Error("Not all calls to server were made")
+ t.Fail()
+ }
+}
- var actualRequest *http.Request
- clientMock.AssertCalled(t, "Get", "http://mrAddr/events/topic/users/dmaapmediatorproducer")
- clientMock.AssertNumberOfCalls(t, "Get", 1)
+type RoundTripFunc func(req *http.Request) *http.Response
- clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool {
- actualRequest = req
- return true
- }))
- assertions.Equal(http.MethodPost, actualRequest.Method)
- assertions.Equal("consumerHost", actualRequest.URL.Host)
- assertions.Equal("/target", actualRequest.URL.Path)
- assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
- actualBody, _ := ioutil.ReadAll(actualRequest.Body)
- assertions.Equal([]byte(`[{"message": {"data": "data"}}]`), actualBody)
- clientMock.AssertNumberOfCalls(t, "Do", 1)
+func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
+ return f(req), nil
+}
+
+//NewTestClient returns *http.Client with Transport replaced to avoid making real calls
+func NewTestClient(fn RoundTripFunc) *http.Client {
+ return &http.Client{
+ Transport: RoundTripFunc(fn),
+ }
+}
+
+// waitTimeout waits for the waitgroup for the specified max timeout.
+// Returns true if waiting timed out.
+func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
+ c := make(chan struct{})
+ go func() {
+ defer close(c)
+ wg.Wait()
+ }()
+ select {
+ case <-c:
+ return false // completed normally
+ case <-time.After(timeout):
+ return true // timed out
+ }
+}
+
+func getBodyAsString(req *http.Request) string {
+ buf := new(bytes.Buffer)
+ buf.ReadFrom(req.Body)
+ return buf.String()
}
diff --git a/dmaap-mediator-producer/internal/server/server.go b/dmaap-mediator-producer/internal/server/server.go
index c3a1331..0b5e5b8 100644
--- a/dmaap-mediator-producer/internal/server/server.go
+++ b/dmaap-mediator-producer/internal/server/server.go
@@ -29,8 +29,11 @@
"oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
)
+const StatusCallbackPath = "/status"
+const JobsCallbackPath = "/jobs"
+
func StatusHandler(w http.ResponseWriter, r *http.Request) {
- if r.URL.Path != "/" {
+ if r.URL.Path != StatusCallbackPath {
http.Error(w, "404 not found.", http.StatusNotFound)
return
}
@@ -44,7 +47,7 @@
}
func CreateInfoJobHandler(w http.ResponseWriter, r *http.Request) {
- if r.URL.Path != "/producer_simulator/info_job" {
+ if r.URL.Path != JobsCallbackPath {
http.Error(w, "404 not found.", http.StatusNotFound)
return
}
@@ -68,14 +71,3 @@
http.Error(w, fmt.Sprintf("Invalid job info. Cause: %v", err), http.StatusBadRequest)
}
}
-
-func CreateServer(port int, handlerFunc func(http.ResponseWriter, *http.Request)) *http.Server {
-
- mux := http.NewServeMux()
- mux.HandleFunc("/", handlerFunc)
- server := http.Server{
- Addr: fmt.Sprintf(":%v", port), // :{port}
- Handler: mux,
- }
- return &server
-}
diff --git a/dmaap-mediator-producer/internal/server/server_test.go b/dmaap-mediator-producer/internal/server/server_test.go
index 444deba..a4b19c4 100644
--- a/dmaap-mediator-producer/internal/server/server_test.go
+++ b/dmaap-mediator-producer/internal/server/server_test.go
@@ -51,7 +51,7 @@
name: "StatusHandler with correct path and method, should return OK",
args: args{
responseRecorder: httptest.NewRecorder(),
- r: newRequest("GET", "/", nil, t),
+ r: newRequest("GET", "/status", nil, t),
},
wantedStatus: http.StatusOK,
wantedBody: "All is well!",
@@ -69,7 +69,7 @@
name: "StatusHandler with incorrect method, should return MethodNotAllowed",
args: args{
responseRecorder: httptest.NewRecorder(),
- r: newRequest("PUT", "/", nil, t),
+ r: newRequest("PUT", "/status", nil, t),
},
wantedStatus: http.StatusMethodNotAllowed,
wantedBody: "Method is not supported.\n",
@@ -119,7 +119,7 @@
name: "CreateInfoJobHandler with correct path and method, should return OK",
args: args{
responseRecorder: httptest.NewRecorder(),
- r: newRequest("POST", "/producer_simulator/info_job", &goodJobInfo, t),
+ r: newRequest("POST", "/jobs", &goodJobInfo, t),
},
wantedStatus: http.StatusOK,
wantedBody: "",
@@ -128,7 +128,7 @@
name: "CreateInfoJobHandler with incorrect job info, should return BadRequest",
args: args{
responseRecorder: httptest.NewRecorder(),
- r: newRequest("POST", "/producer_simulator/info_job", &badJobInfo, t),
+ r: newRequest("POST", "/jobs", &badJobInfo, t),
},
wantedStatus: http.StatusBadRequest,
wantedBody: "Invalid job info. Cause: error",
@@ -146,7 +146,7 @@
name: "CreateInfoJobHandler with incorrect method, should return MethodNotAllowed",
args: args{
responseRecorder: httptest.NewRecorder(),
- r: newRequest("PUT", "/producer_simulator/info_job", nil, t),
+ r: newRequest("PUT", "/jobs", nil, t),
},
wantedStatus: http.StatusMethodNotAllowed,
wantedBody: "Method is not supported.",
diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go
index 47e12e9..79fcb6b 100644
--- a/dmaap-mediator-producer/main.go
+++ b/dmaap-mediator-producer/main.go
@@ -22,6 +22,7 @@
import (
"fmt"
+ "net/http"
"sync"
log "github.com/sirupsen/logrus"
@@ -31,8 +32,7 @@
)
var configuration *config.Config
-var supervisionCallbackAddress string
-var jobInfoCallbackAddress string
+var callbackAddress string
func init() {
configuration = config.New()
@@ -43,15 +43,10 @@
}
log.Debug("Initializing DMaaP Mediator Producer")
- if configuration.InfoProducerSupervisionCallbackHost == "" {
+ if configuration.InfoProducerHost == "" {
log.Fatal("Missing INFO_PRODUCER_SUPERVISION_CALLBACK_HOST")
}
- supervisionCallbackAddress = fmt.Sprintf("%v:%v", configuration.InfoProducerSupervisionCallbackHost, configuration.InfoProducerSupervisionCallbackPort)
-
- if configuration.InfoJobCallbackHost == "" {
- log.Fatal("Missing INFO_JOB_CALLBACK_HOST")
- }
- jobInfoCallbackAddress = fmt.Sprintf("%v:%v", configuration.InfoJobCallbackHost, configuration.InfoJobCallbackPort)
+ callbackAddress = fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort)
registrator := config.NewRegistratorImpl(configuration.InfoCoordinatorAddress)
if types, err := jobs.GetTypes(); err == nil {
@@ -62,9 +57,9 @@
log.Fatalf("Unable to get types to register due to: %v", err)
}
producer := config.ProducerRegistrationInfo{
- InfoProducerSupervisionCallbackUrl: supervisionCallbackAddress,
+ InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusCallbackPath,
SupportedInfoTypes: jobs.GetSupportedTypes(),
- InfoJobCallbackUrl: jobInfoCallbackAddress,
+ InfoJobCallbackUrl: callbackAddress + server.JobsCallbackPath,
}
if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
log.Fatalf("Unable to register producer due to: %v", err)
@@ -75,19 +70,14 @@
log.Debug("Starting DMaaP Mediator Producer")
wg := new(sync.WaitGroup)
- // add two goroutines to `wg` WaitGroup, one for each avilable server
- wg.Add(3)
+ // add two goroutines to `wg` WaitGroup, one for each running go routine
+ wg.Add(2)
- log.Debugf("Starting status callback server at port %v", configuration.InfoProducerSupervisionCallbackPort)
+ log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
go func() {
- server := server.CreateServer(configuration.InfoProducerSupervisionCallbackPort, server.StatusHandler)
- log.Warn(server.ListenAndServe())
- wg.Done()
- }()
-
- go func() {
- server := server.CreateServer(configuration.InfoJobCallbackPort, server.CreateInfoJobHandler)
- log.Warn(server.ListenAndServe())
+ http.HandleFunc(server.StatusCallbackPath, server.StatusHandler)
+ http.HandleFunc(server.JobsCallbackPath, server.CreateInfoJobHandler)
+ log.Warn(http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), nil))
wg.Done()
}()
diff --git a/dmaap-mediator-producer/simulator/consumersimulator.go b/dmaap-mediator-producer/simulator/consumersimulator.go
index 25421ae..144f56f 100644
--- a/dmaap-mediator-producer/simulator/consumersimulator.go
+++ b/dmaap-mediator-producer/simulator/consumersimulator.go
@@ -21,20 +21,43 @@
package main
import (
+ "encoding/json"
+ "flag"
"fmt"
"io"
http "net/http"
+
+ "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
)
+func main() {
+ port := flag.Int("port", 40935, "The port this consumer will listen on")
+ flag.Parse()
+ http.HandleFunc("/jobs", handleData)
+
+ fmt.Print("Starting consumer on port: ", *port)
+ http.ListenAndServe(fmt.Sprintf(":%v", *port), nil)
+ registerJob(*port)
+}
+
+func registerJob(port int) {
+ jobInfo := struct {
+ JobOwner string `json:"job_owner"`
+ JobResultUri string `json:"job_result_uri"`
+ InfoTypeId string `json:"info_type_id"`
+ JobDefinition string `json:"job_definition"`
+ }{fmt.Sprintf("test%v", port), fmt.Sprintf("http://localhost:%v/jobs", port), "STD_Fault_Messages", "{}"}
+ fmt.Print("Registering consumer: ", jobInfo)
+ body, _ := json.Marshal(jobInfo)
+ putErr := restclient.Put(fmt.Sprintf("http://localhost:8083/data-consumer/v1/info-jobs/job%v", port), body)
+ if putErr != nil {
+ fmt.Printf("Unable to register consumer: %v", putErr)
+ }
+}
+
func handleData(w http.ResponseWriter, req *http.Request) {
defer req.Body.Close()
if reqData, err := io.ReadAll(req.Body); err == nil {
- fmt.Printf("Consumer received body: %v\n", string(reqData))
+ fmt.Print("Consumer received body: ", string(reqData))
}
}
-
-func main() {
- http.HandleFunc("/jobs", handleData)
-
- http.ListenAndServe(":40935", nil)
-}