Poll MR and send messages to consumers

Issue-ID: NONRTRIC-586
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: I7e261aedb1a528c193390f6a3e99a49d7783d35e
diff --git a/.gitignore b/.gitignore
index df309a1..5915080 100644
--- a/.gitignore
+++ b/.gitignore
@@ -18,3 +18,5 @@
 
 .vscode
 .factorypath
+
+coverage.*
diff --git a/dmaap-mediator-producer/configs/STD_Fault_Messages.json b/dmaap-mediator-producer/configs/STD_Fault_Messages.json
index b944802..258d743 100644
--- a/dmaap-mediator-producer/configs/STD_Fault_Messages.json
+++ b/dmaap-mediator-producer/configs/STD_Fault_Messages.json
@@ -1,7 +1,12 @@
 {
-  "$schema": "https://json-schema.org/draft/2019-09/schema",
-  "title": "STD_Fault_Messages",
-  "description": "Schema for job delivering fault messages from DMaaP Message Router",
-  "type": "object",
-  "properties": {}
+  "id": "STD_Fault_Messages",
+  "dmaapTopic": "unauthenticated.SEC_FAULT_OUTPUT",
+  "schema": {
+    "$schema": "https://json-schema.org/draft/2019-09/schema",
+    "title": "STD_Fault_Messages",
+    "description": "Schema for job delivering fault messages from DMaaP Message Router",
+    "type": "object",
+    "properties": {},
+    "additionalProperties": false
+  }
 }
\ No newline at end of file
diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go
index 3616c58..8a2784a 100644
--- a/dmaap-mediator-producer/internal/config/config.go
+++ b/dmaap-mediator-producer/internal/config/config.go
@@ -34,6 +34,8 @@
 	InfoJobCallbackHost                 string
 	InfoJobCallbackPort                 int
 	InfoCoordinatorAddress              string
+	MRHost                              string
+	MRPort                              int
 }
 
 type ProducerRegistrationInfo struct {
@@ -50,6 +52,8 @@
 		InfoJobCallbackHost:                 getEnv("INFO_JOB_CALLBACK_HOST", ""),
 		InfoJobCallbackPort:                 getEnvAsInt("INFO_JOB_CALLBACK_PORT", 8086),
 		InfoCoordinatorAddress:              getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
+		MRHost:                              getEnv("MR_HOST", "http://message-router.onap"),
+		MRPort:                              getEnvAsInt("MR_PORT", 3904),
 	}
 }
 
diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go
index 4a65dc0..1043027 100644
--- a/dmaap-mediator-producer/internal/config/config_test.go
+++ b/dmaap-mediator-producer/internal/config/config_test.go
@@ -37,6 +37,8 @@
 	os.Setenv("INFO_JOB_CALLBACK_HOST", "jobCallbackHost")
 	os.Setenv("INFO_JOB_CALLBACK_PORT", "8096")
 	os.Setenv("INFO_COORD_ADDR", "infoCoordAddr")
+	os.Setenv("MR_HOST", "mrHost")
+	os.Setenv("MR_PORT", "3908")
 	t.Cleanup(func() {
 		os.Clearenv()
 	})
@@ -47,6 +49,8 @@
 		InfoJobCallbackHost:                 "jobCallbackHost",
 		InfoJobCallbackPort:                 8096,
 		InfoCoordinatorAddress:              "infoCoordAddr",
+		MRHost:                              "mrHost",
+		MRPort:                              3908,
 	}
 	if got := New(); !reflect.DeepEqual(got, &wantConfig) {
 		t.Errorf("New() = %v, want %v", got, &wantConfig)
@@ -70,6 +74,8 @@
 		InfoJobCallbackHost:                 "",
 		InfoJobCallbackPort:                 8086,
 		InfoCoordinatorAddress:              "http://enrichmentservice:8083",
+		MRHost:                              "http://message-router.onap",
+		MRPort:                              3904,
 	}
 	if got := New(); !reflect.DeepEqual(got, &wantConfig) {
 		t.Errorf("New() = %v, want %v", got, &wantConfig)
@@ -86,6 +92,8 @@
 		InfoJobCallbackHost:                 "",
 		InfoJobCallbackPort:                 8086,
 		InfoCoordinatorAddress:              "http://enrichmentservice:8083",
+		MRHost:                              "http://message-router.onap",
+		MRPort:                              3904,
 	}
 	if got := New(); !reflect.DeepEqual(got, &wantConfig) {
 		t.Errorf("New() = %v, want %v", got, &wantConfig)
diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go
index 7347178..eec59c3 100644
--- a/dmaap-mediator-producer/internal/jobs/jobs.go
+++ b/dmaap-mediator-producer/internal/jobs/jobs.go
@@ -21,15 +21,22 @@
 package jobs
 
 import (
+	"encoding/json"
 	"fmt"
 	"os"
 	"path/filepath"
 	"strings"
+	"sync"
+
+	log "github.com/sirupsen/logrus"
+	"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
 )
 
 type Type struct {
-	TypeId string
-	Schema string
+	TypeId     string `json:"id"`
+	DMaaPTopic string `json:"dmaapTopic"`
+	Schema     string `json:"schema"`
+	Jobs       map[string]JobInfo
 }
 
 type JobInfo struct {
@@ -46,9 +53,10 @@
 }
 
 var (
+	mu      sync.Mutex
 	typeDir = "configs"
 	Handler JobHandler
-	allJobs = make(map[string]map[string]JobInfo)
+	allJobs = make(map[string]Type)
 )
 
 func init() {
@@ -62,8 +70,10 @@
 }
 
 func (jh *jobHandlerImpl) AddJob(ji JobInfo) error {
+	mu.Lock()
+	defer mu.Unlock()
 	if err := validateJobInfo(ji); err == nil {
-		jobs := allJobs[ji.InfoTypeIdentity]
+		jobs := allJobs[ji.InfoTypeIdentity].Jobs
 		jobs[ji.InfoJobIdentity] = ji
 		return nil
 	} else {
@@ -86,6 +96,8 @@
 }
 
 func GetTypes() ([]*Type, error) {
+	mu.Lock()
+	defer mu.Unlock()
 	types := make([]*Type, 0, 1)
 	err := filepath.Walk(typeDir,
 		func(path string, info os.FileInfo, err error) error {
@@ -106,6 +118,8 @@
 }
 
 func GetSupportedTypes() []string {
+	mu.Lock()
+	defer mu.Unlock()
 	supportedTypes := []string{}
 	for k := range allJobs {
 		supportedTypes = append(supportedTypes, k)
@@ -118,23 +132,63 @@
 }
 
 func getType(path string) (*Type, error) {
-	fileName := filepath.Base(path)
-	typeName := strings.TrimSuffix(fileName, filepath.Ext(fileName))
-
-	if typeSchema, err := os.ReadFile(path); err == nil {
-		typeInfo := Type{
-			TypeId: typeName,
-			Schema: string(typeSchema),
+	if typeDefinition, err := os.ReadFile(path); err == nil {
+		var dat map[string]interface{}
+		if marshalError := json.Unmarshal(typeDefinition, &dat); marshalError == nil {
+			schema, _ := json.Marshal(dat["schema"])
+			typeInfo := Type{
+				TypeId:     dat["id"].(string),
+				DMaaPTopic: dat["dmaapTopic"].(string),
+				Schema:     string(schema),
+				Jobs:       make(map[string]JobInfo),
+			}
+			if _, ok := allJobs[typeInfo.TypeId]; !ok {
+				allJobs[typeInfo.TypeId] = typeInfo
+			}
+			return &typeInfo, nil
+		} else {
+			return nil, marshalError
 		}
-		if _, ok := allJobs[typeName]; !ok {
-			allJobs[typeName] = make(map[string]JobInfo)
-		}
-		return &typeInfo, nil
 	} else {
 		return nil, err
 	}
 }
 
+func RunJobs(mRAddress string) {
+	for {
+		pollAndDistributeMessages(mRAddress)
+	}
+}
+
+func pollAndDistributeMessages(mRAddress string) {
+	for typeId, typeInfo := range allJobs {
+		log.Debugf("Processing jobs for type: %v", typeId)
+		messagesBody, error := restclient.Get(fmt.Sprintf("%v/events/%v/users/dmaapmediatorproducer", mRAddress, typeInfo.DMaaPTopic))
+		if error != nil {
+			log.Warnf("Error getting data from MR. Cause: %v", error)
+			continue
+		}
+		distributeMessages(messagesBody, typeInfo)
+	}
+}
+
+func distributeMessages(messages []byte, typeInfo Type) {
+	if len(messages) > 2 {
+		mu.Lock()
+		for _, jobInfo := range typeInfo.Jobs {
+			go sendMessagesToConsumer(messages, jobInfo)
+		}
+		mu.Unlock()
+	}
+}
+
+func sendMessagesToConsumer(messages []byte, jobInfo JobInfo) {
+	log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity)
+	if postErr := restclient.Post(jobInfo.TargetUri, messages); postErr != nil {
+		log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
+	}
+}
+
 func clearAll() {
-	allJobs = make(map[string]map[string]JobInfo)
+	allJobs = make(map[string]Type)
 }
diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go
index 0941033..b53d85e 100644
--- a/dmaap-mediator-producer/internal/jobs/jobs_test.go
+++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go
@@ -21,14 +21,21 @@
 package jobs
 
 import (
+	"bytes"
+	"io/ioutil"
+	"net/http"
 	"os"
 	"path/filepath"
 	"testing"
+	"time"
 
+	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
+	"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
+	"oransc.org/nonrtric/dmaapmediatorproducer/mocks"
 )
 
-const type1Schema = `{"title": "Type 1"}`
+const typeDefinition = `{"id": "type1", "dmaapTopic": "unauthenticated.SEC_FAULT_OUTPUT", "schema": {"title": "Type 1"}}`
 
 func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
 	assertions := require.New(t)
@@ -42,13 +49,15 @@
 	})
 	typeDir = typesDir
 	fname := filepath.Join(typesDir, "type1.json")
-	if err = os.WriteFile(fname, []byte(type1Schema), 0666); err != nil {
+	if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
 		t.Errorf("Unable to create temporary files for types due to: %v", err)
 	}
 	types, err := GetTypes()
 	wantedType := Type{
-		TypeId: "type1",
-		Schema: type1Schema,
+		TypeId:     "type1",
+		DMaaPTopic: "unauthenticated.SEC_FAULT_OUTPUT",
+		Schema:     `{"title":"Type 1"}`,
+		Jobs:       make(map[string]JobInfo),
 	}
 	wantedTypes := []*Type{&wantedType}
 	assertions.EqualValues(wantedTypes, types)
@@ -60,11 +69,7 @@
 
 func TestAddJobWhenTypeIsSupported_shouldAddJobToAllJobsMap(t *testing.T) {
 	assertions := require.New(t)
-	allJobs["type1"] = make(map[string]JobInfo)
-	t.Cleanup(func() {
-		clearAll()
-	})
-	jobInfo := JobInfo{
+	wantedJob := JobInfo{
 		Owner:            "owner",
 		LastUpdated:      "now",
 		InfoJobIdentity:  "job1",
@@ -72,11 +77,18 @@
 		InfoJobData:      "{}",
 		InfoTypeIdentity: "type1",
 	}
+	allJobs["type1"] = Type{
+		TypeId: "type1",
+		Jobs:   map[string]JobInfo{"job1": wantedJob},
+	}
+	t.Cleanup(func() {
+		clearAll()
+	})
 
-	err := AddJob(jobInfo)
+	err := AddJob(wantedJob)
 	assertions.Nil(err)
-	assertions.Equal(1, len(allJobs["type1"]))
-	assertions.Equal(jobInfo, allJobs["type1"]["job1"])
+	assertions.Equal(1, len(allJobs["type1"].Jobs))
+	assertions.Equal(wantedJob, allJobs["type1"].Jobs["job1"])
 }
 
 func TestAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
@@ -92,7 +104,9 @@
 
 func TestAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
 	assertions := require.New(t)
-	allJobs["type1"] = make(map[string]JobInfo)
+	allJobs["type1"] = Type{
+		TypeId: "type1",
+	}
 	t.Cleanup(func() {
 		clearAll()
 	})
@@ -107,7 +121,9 @@
 
 func TestAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
 	assertions := require.New(t)
-	allJobs["type1"] = make(map[string]JobInfo)
+	allJobs["type1"] = Type{
+		TypeId: "type1",
+	}
 	jobInfo := JobInfo{
 		InfoTypeIdentity: "type1",
 		InfoJobIdentity:  "job1",
@@ -118,3 +134,53 @@
 	assertions.Equal("missing required target URI: {  job1   type1}", err.Error())
 	clearAll()
 }
+
+func TestPollAndDistributeMessages(t *testing.T) {
+	assertions := require.New(t)
+	jobInfo := JobInfo{
+		InfoTypeIdentity: "type1",
+		InfoJobIdentity:  "job1",
+		TargetUri:        "http://consumerHost/target",
+	}
+	allJobs["type1"] = Type{
+		TypeId:     "type1",
+		DMaaPTopic: "topic",
+		Jobs:       map[string]JobInfo{"job1": jobInfo},
+	}
+	t.Cleanup(func() {
+		clearAll()
+	})
+
+	body := ioutil.NopCloser(bytes.NewReader([]byte(`[{"message": {"data": "data"}}]`)))
+	clientMock := mocks.HTTPClient{}
+	clientMock.On("Get", mock.Anything).Return(&http.Response{
+		StatusCode: http.StatusOK,
+		Body:       body,
+	}, nil)
+
+	clientMock.On("Do", mock.Anything).Return(&http.Response{
+		StatusCode: http.StatusOK,
+	}, nil)
+
+	restclient.Client = &clientMock
+
+	pollAndDistributeMessages("http://mrAddr")
+
+	time.Sleep(100 * time.Millisecond)
+
+	var actualRequest *http.Request
+	clientMock.AssertCalled(t, "Get", "http://mrAddr/events/topic/users/dmaapmediatorproducer")
+	clientMock.AssertNumberOfCalls(t, "Get", 1)
+
+	clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool {
+		actualRequest = req
+		return true
+	}))
+	assertions.Equal(http.MethodPost, actualRequest.Method)
+	assertions.Equal("consumerHost", actualRequest.URL.Host)
+	assertions.Equal("/target", actualRequest.URL.Path)
+	assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
+	actualBody, _ := ioutil.ReadAll(actualRequest.Body)
+	assertions.Equal([]byte(`[{"message": {"data": "data"}}]`), actualBody)
+	clientMock.AssertNumberOfCalls(t, "Do", 1)
+}
diff --git a/dmaap-mediator-producer/internal/restclient/HTTPClient.go b/dmaap-mediator-producer/internal/restclient/HTTPClient.go
index 78a02b6..a783f7e 100644
--- a/dmaap-mediator-producer/internal/restclient/HTTPClient.go
+++ b/dmaap-mediator-producer/internal/restclient/HTTPClient.go
@@ -73,7 +73,15 @@
 }
 
 func Put(url string, body []byte) error {
-	if req, reqErr := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(body)); reqErr == nil {
+	return do(http.MethodPut, url, body)
+}
+
+func Post(url string, body []byte) error {
+	return do(http.MethodPost, url, body)
+}
+
+func do(method string, url string, body []byte) error {
+	if req, reqErr := http.NewRequest(method, url, bytes.NewBuffer(body)); reqErr == nil {
 		req.Header.Set("Content-Type", "application/json; charset=utf-8")
 		if response, respErr := Client.Do(req); respErr == nil {
 			if isResponseSuccess(response.StatusCode) {
diff --git a/dmaap-mediator-producer/internal/server/server_test.go b/dmaap-mediator-producer/internal/server/server_test.go
index d221c93..444deba 100644
--- a/dmaap-mediator-producer/internal/server/server_test.go
+++ b/dmaap-mediator-producer/internal/server/server_test.go
@@ -32,7 +32,7 @@
 
 	"github.com/stretchr/testify/require"
 	"oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
-	"oransc.org/nonrtric/dmaapmediatorproducer/mocks"
+	"oransc.org/nonrtric/dmaapmediatorproducer/mocks/jobhandler"
 )
 
 func TestStatusHandler(t *testing.T) {
@@ -88,7 +88,7 @@
 
 func TestCreateInfoJobHandler(t *testing.T) {
 	assertions := require.New(t)
-	jobHandlerMock := mocks.JobHandler{}
+	jobHandlerMock := jobhandler.JobHandler{}
 
 	goodJobInfo := jobs.JobInfo{
 		Owner:            "owner",
diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go
index 3fe92dc..47e12e9 100644
--- a/dmaap-mediator-producer/main.go
+++ b/dmaap-mediator-producer/main.go
@@ -76,7 +76,7 @@
 	wg := new(sync.WaitGroup)
 
 	// add two goroutines to `wg` WaitGroup, one for each avilable server
-	wg.Add(2)
+	wg.Add(3)
 
 	log.Debugf("Starting status callback server at port %v", configuration.InfoProducerSupervisionCallbackPort)
 	go func() {
@@ -91,6 +91,11 @@
 		wg.Done()
 	}()
 
+	go func() {
+		jobs.RunJobs(fmt.Sprintf("%v:%v", configuration.MRHost, configuration.MRPort))
+		wg.Done()
+	}()
+
 	// wait until WaitGroup is done
 	wg.Wait()
 	log.Debug("Stopping DMaaP Mediator Producer")
diff --git a/dmaap-mediator-producer/mocks/JobHandler.go b/dmaap-mediator-producer/mocks/jobhandler/JobHandler.go
similarity index 85%
rename from dmaap-mediator-producer/mocks/JobHandler.go
rename to dmaap-mediator-producer/mocks/jobhandler/JobHandler.go
index 4914e4d..edb2bf5 100644
--- a/dmaap-mediator-producer/mocks/JobHandler.go
+++ b/dmaap-mediator-producer/mocks/jobhandler/JobHandler.go
@@ -1,10 +1,10 @@
 // Code generated by mockery v2.9.3. DO NOT EDIT.
 
-package mocks
+package jobhandler
 
 import (
 	mock "github.com/stretchr/testify/mock"
-	jobs "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
+	"oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
 )
 
 // JobHandler is an autogenerated mock type for the JobHandler type
diff --git a/dmaap-mediator-producer/simulator/consumersimulator.go b/dmaap-mediator-producer/simulator/consumersimulator.go
new file mode 100644
index 0000000..25421ae
--- /dev/null
+++ b/dmaap-mediator-producer/simulator/consumersimulator.go
@@ -0,0 +1,40 @@
+// -
+//   ========================LICENSE_START=================================
+//   O-RAN-SC
+//   %%
+//   Copyright (C) 2021: Nordix Foundation
+//   %%
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   ========================LICENSE_END===================================
+//
+
+package main
+
+import (
+	"fmt"
+	"io"
+	http "net/http"
+)
+
+func handleData(w http.ResponseWriter, req *http.Request) {
+	defer req.Body.Close()
+	if reqData, err := io.ReadAll(req.Body); err == nil {
+		fmt.Printf("Consumer received body: %v\n", string(reqData))
+	}
+}
+
+func main() {
+	http.HandleFunc("/jobs", handleData)
+
+	http.ListenAndServe(":40935", nil)
+}