Improve DMaaP Mediator Producer

Issue-ID: NONRTRIC-586
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: Iab915f878874b687b1c1b2effc05293582fb254c
diff --git a/dmaap-mediator-producer/.gitignore b/dmaap-mediator-producer/.gitignore
index 0d08f66..9f5396c 100644
--- a/dmaap-mediator-producer/.gitignore
+++ b/dmaap-mediator-producer/.gitignore
@@ -4,3 +4,4 @@
 main
 dmaapmediatorproducer
 __debug_bin*
+simulator
diff --git a/dmaap-mediator-producer/configs/STD_Fault_Messages.json b/dmaap-mediator-producer/configs/STD_Fault_Messages.json
deleted file mode 100644
index 258d743..0000000
--- a/dmaap-mediator-producer/configs/STD_Fault_Messages.json
+++ /dev/null
@@ -1,12 +0,0 @@
-{
-  "id": "STD_Fault_Messages",
-  "dmaapTopic": "unauthenticated.SEC_FAULT_OUTPUT",
-  "schema": {
-    "$schema": "https://json-schema.org/draft/2019-09/schema",
-    "title": "STD_Fault_Messages",
-    "description": "Schema for job delivering fault messages from DMaaP Message Router",
-    "type": "object",
-    "properties": {},
-    "additionalProperties": false
-  }
-}
\ No newline at end of file
diff --git a/dmaap-mediator-producer/configs/type_config.json b/dmaap-mediator-producer/configs/type_config.json
new file mode 100644
index 0000000..983d0f3
--- /dev/null
+++ b/dmaap-mediator-producer/configs/type_config.json
@@ -0,0 +1,9 @@
+{
+  "types":
+    [
+      {
+        "id": "STD_Fault_Messages",
+        "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages"
+      }
+  ]
+}
\ No newline at end of file
diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go
index 8a2784a..9b7b1dd 100644
--- a/dmaap-mediator-producer/internal/config/config.go
+++ b/dmaap-mediator-producer/internal/config/config.go
@@ -28,14 +28,12 @@
 )
 
 type Config struct {
-	LogLevel                            string
-	InfoProducerSupervisionCallbackHost string
-	InfoProducerSupervisionCallbackPort int
-	InfoJobCallbackHost                 string
-	InfoJobCallbackPort                 int
-	InfoCoordinatorAddress              string
-	MRHost                              string
-	MRPort                              int
+	LogLevel               string
+	InfoProducerHost       string
+	InfoProducerPort       int
+	InfoCoordinatorAddress string
+	MRHost                 string
+	MRPort                 int
 }
 
 type ProducerRegistrationInfo struct {
@@ -46,14 +44,12 @@
 
 func New() *Config {
 	return &Config{
-		LogLevel:                            getEnv("LOG_LEVEL", "Info"),
-		InfoProducerSupervisionCallbackHost: getEnv("INFO_PRODUCER_SUPERVISION_CALLBACK_HOST", ""),
-		InfoProducerSupervisionCallbackPort: getEnvAsInt("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", 8085),
-		InfoJobCallbackHost:                 getEnv("INFO_JOB_CALLBACK_HOST", ""),
-		InfoJobCallbackPort:                 getEnvAsInt("INFO_JOB_CALLBACK_PORT", 8086),
-		InfoCoordinatorAddress:              getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
-		MRHost:                              getEnv("MR_HOST", "http://message-router.onap"),
-		MRPort:                              getEnvAsInt("MR_PORT", 3904),
+		LogLevel:               getEnv("LOG_LEVEL", "Info"),
+		InfoProducerHost:       getEnv("INFO_PRODUCER_HOST", ""),
+		InfoProducerPort:       getEnvAsInt("INFO_PRODUCER_PORT", 8085),
+		InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
+		MRHost:                 getEnv("MR_HOST", "http://message-router.onap"),
+		MRPort:                 getEnvAsInt("MR_PORT", 3904),
 	}
 }
 
diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go
index 1043027..0fcbdd3 100644
--- a/dmaap-mediator-producer/internal/config/config_test.go
+++ b/dmaap-mediator-producer/internal/config/config_test.go
@@ -32,10 +32,8 @@
 
 func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
 	os.Setenv("LOG_LEVEL", "Debug")
-	os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_HOST", "supervisionCallbackHost")
-	os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "8095")
-	os.Setenv("INFO_JOB_CALLBACK_HOST", "jobCallbackHost")
-	os.Setenv("INFO_JOB_CALLBACK_PORT", "8096")
+	os.Setenv("INFO_PRODUCER_HOST", "producerHost")
+	os.Setenv("INFO_PRODUCER_PORT", "8095")
 	os.Setenv("INFO_COORD_ADDR", "infoCoordAddr")
 	os.Setenv("MR_HOST", "mrHost")
 	os.Setenv("MR_PORT", "3908")
@@ -43,14 +41,12 @@
 		os.Clearenv()
 	})
 	wantConfig := Config{
-		LogLevel:                            "Debug",
-		InfoProducerSupervisionCallbackHost: "supervisionCallbackHost",
-		InfoProducerSupervisionCallbackPort: 8095,
-		InfoJobCallbackHost:                 "jobCallbackHost",
-		InfoJobCallbackPort:                 8096,
-		InfoCoordinatorAddress:              "infoCoordAddr",
-		MRHost:                              "mrHost",
-		MRPort:                              3908,
+		LogLevel:               "Debug",
+		InfoProducerHost:       "producerHost",
+		InfoProducerPort:       8095,
+		InfoCoordinatorAddress: "infoCoordAddr",
+		MRHost:                 "mrHost",
+		MRPort:                 3908,
 	}
 	if got := New(); !reflect.DeepEqual(got, &wantConfig) {
 		t.Errorf("New() = %v, want %v", got, &wantConfig)
@@ -62,38 +58,34 @@
 	var buf bytes.Buffer
 	log.SetOutput(&buf)
 
-	os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "wrong")
+	os.Setenv("INFO_PRODUCER_PORT", "wrong")
 	t.Cleanup(func() {
 		log.SetOutput(os.Stderr)
 		os.Clearenv()
 	})
 	wantConfig := Config{
-		LogLevel:                            "Info",
-		InfoProducerSupervisionCallbackHost: "",
-		InfoProducerSupervisionCallbackPort: 8085,
-		InfoJobCallbackHost:                 "",
-		InfoJobCallbackPort:                 8086,
-		InfoCoordinatorAddress:              "http://enrichmentservice:8083",
-		MRHost:                              "http://message-router.onap",
-		MRPort:                              3904,
+		LogLevel:               "Info",
+		InfoProducerHost:       "",
+		InfoProducerPort:       8085,
+		InfoCoordinatorAddress: "http://enrichmentservice:8083",
+		MRHost:                 "http://message-router.onap",
+		MRPort:                 3904,
 	}
 	if got := New(); !reflect.DeepEqual(got, &wantConfig) {
 		t.Errorf("New() = %v, want %v", got, &wantConfig)
 	}
 	logString := buf.String()
-	assertions.Contains(logString, "Invalid int value: wrong for variable: INFO_PRODUCER_SUPERVISION_CALLBACK_PORT. Default value: 8085 will be used")
+	assertions.Contains(logString, "Invalid int value: wrong for variable: INFO_PRODUCER_PORT. Default value: 8085 will be used")
 }
 
 func TestNew_envVarsNotSetConfigContainDefaultValues(t *testing.T) {
 	wantConfig := Config{
-		LogLevel:                            "Info",
-		InfoProducerSupervisionCallbackHost: "",
-		InfoProducerSupervisionCallbackPort: 8085,
-		InfoJobCallbackHost:                 "",
-		InfoJobCallbackPort:                 8086,
-		InfoCoordinatorAddress:              "http://enrichmentservice:8083",
-		MRHost:                              "http://message-router.onap",
-		MRPort:                              3904,
+		LogLevel:               "Info",
+		InfoProducerHost:       "",
+		InfoProducerPort:       8085,
+		InfoCoordinatorAddress: "http://enrichmentservice:8083",
+		MRHost:                 "http://message-router.onap",
+		MRPort:                 3904,
 	}
 	if got := New(); !reflect.DeepEqual(got, &wantConfig) {
 		t.Errorf("New() = %v, want %v", got, &wantConfig)
diff --git a/dmaap-mediator-producer/internal/config/registrator.go b/dmaap-mediator-producer/internal/config/registrator.go
index 37225ed..db46c54 100644
--- a/dmaap-mediator-producer/internal/config/registrator.go
+++ b/dmaap-mediator-producer/internal/config/registrator.go
@@ -33,9 +33,10 @@
 
 const registerTypePath = "/data-producer/v1/info-types/"
 const registerProducerPath = "/data-producer/v1/info-producers/"
+const typeSchema = `{"type": "object","properties": {},"additionalProperties": false}`
 
 type Registrator interface {
-	RegisterTypes(types []*jobs.Type) error
+	RegisterTypes(types []jobs.TypeData) error
 	RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo)
 }
 
@@ -49,9 +50,9 @@
 	}
 }
 
-func (r RegistratorImpl) RegisterTypes(jobTypes []*jobs.Type) error {
+func (r RegistratorImpl) RegisterTypes(jobTypes []jobs.TypeData) error {
 	for _, jobType := range jobTypes {
-		body := fmt.Sprintf(`{"info_job_data_schema": %v}`, jobType.Schema)
+		body := fmt.Sprintf(`{"info_job_data_schema": %v}`, typeSchema)
 		if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.TypeId), []byte(body)); error != nil {
 			return error
 		}
diff --git a/dmaap-mediator-producer/internal/config/registrator_test.go b/dmaap-mediator-producer/internal/config/registrator_test.go
index a89c43f..353e9de 100644
--- a/dmaap-mediator-producer/internal/config/registrator_test.go
+++ b/dmaap-mediator-producer/internal/config/registrator_test.go
@@ -43,11 +43,10 @@
 
 	restclient.Client = &clientMock
 
-	type1 := jobs.Type{
+	type1 := jobs.TypeData{
 		TypeId: "Type1",
-		Schema: `{"title": "Type 1"}`,
 	}
-	types := []*jobs.Type{&type1}
+	types := []jobs.TypeData{type1}
 
 	r := NewRegistratorImpl("http://localhost:9990")
 	err := r.RegisterTypes(types)
@@ -64,7 +63,7 @@
 	assertions.Equal("/data-producer/v1/info-types/Type1", actualRequest.URL.Path)
 	assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
 	body, _ := ioutil.ReadAll(actualRequest.Body)
-	expectedBody := []byte(`{"info_job_data_schema": {"title": "Type 1"}}`)
+	expectedBody := []byte(`{"info_job_data_schema": {"type": "object","properties": {},"additionalProperties": false}}`)
 	assertions.Equal(expectedBody, body)
 	clientMock.AssertNumberOfCalls(t, "Do", 1)
 }
diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go
index eec59c3..09d3891 100644
--- a/dmaap-mediator-producer/internal/jobs/jobs.go
+++ b/dmaap-mediator-producer/internal/jobs/jobs.go
@@ -24,28 +24,33 @@
 	"encoding/json"
 	"fmt"
 	"os"
-	"path/filepath"
-	"strings"
 	"sync"
 
 	log "github.com/sirupsen/logrus"
 	"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
 )
 
-type Type struct {
-	TypeId     string `json:"id"`
-	DMaaPTopic string `json:"dmaapTopic"`
-	Schema     string `json:"schema"`
-	Jobs       map[string]JobInfo
+type TypeDefinitions struct {
+	Types []TypeDefinition `json:"types"`
+}
+type TypeDefinition struct {
+	Id            string `json:"id"`
+	DmaapTopicURL string `json:"dmaapTopicUrl"`
+}
+
+type TypeData struct {
+	TypeId        string `json:"id"`
+	DMaaPTopicURL string `json:"dmaapTopicUrl"`
+	Jobs          map[string]JobInfo
 }
 
 type JobInfo struct {
-	Owner            string `json:"owner"`
-	LastUpdated      string `json:"last_updated"`
-	InfoJobIdentity  string `json:"info_job_identity"`
-	TargetUri        string `json:"target_uri"`
-	InfoJobData      string `json:"info_job_data"`
-	InfoTypeIdentity string `json:"info_type_identity"`
+	Owner            string      `json:"owner"`
+	LastUpdated      string      `json:"last_updated"`
+	InfoJobIdentity  string      `json:"info_job_identity"`
+	TargetUri        string      `json:"target_uri"`
+	InfoJobData      interface{} `json:"info_job_data"`
+	InfoTypeIdentity string      `json:"info_type_identity"`
 }
 
 type JobHandler interface {
@@ -53,10 +58,10 @@
 }
 
 var (
-	mu      sync.Mutex
-	typeDir = "configs"
-	Handler JobHandler
-	allJobs = make(map[string]Type)
+	mu         sync.Mutex
+	configFile = "configs/type_config.json"
+	Handler    JobHandler
+	allTypes   = make(map[string]TypeData)
 )
 
 func init() {
@@ -73,8 +78,9 @@
 	mu.Lock()
 	defer mu.Unlock()
 	if err := validateJobInfo(ji); err == nil {
-		jobs := allJobs[ji.InfoTypeIdentity].Jobs
+		jobs := allTypes[ji.InfoTypeIdentity].Jobs
 		jobs[ji.InfoJobIdentity] = ji
+		log.Debug("Added job: ", ji)
 		return nil
 	} else {
 		return err
@@ -82,7 +88,7 @@
 }
 
 func validateJobInfo(ji JobInfo) error {
-	if _, ok := allJobs[ji.InfoTypeIdentity]; !ok {
+	if _, ok := allTypes[ji.InfoTypeIdentity]; !ok {
 		return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
 	}
 	if ji.InfoJobIdentity == "" {
@@ -95,25 +101,30 @@
 	return nil
 }
 
-func GetTypes() ([]*Type, error) {
+func GetTypes() ([]TypeData, error) {
 	mu.Lock()
 	defer mu.Unlock()
-	types := make([]*Type, 0, 1)
-	err := filepath.Walk(typeDir,
-		func(path string, info os.FileInfo, err error) error {
-			if err != nil {
-				return err
-			}
-			if strings.Contains(path, ".json") {
-				if jobType, err := getType(path); err == nil {
-					types = append(types, jobType)
-				}
-			}
-			return nil
-		})
+	types := make([]TypeData, 0, 1)
+	typeDefsByte, err := os.ReadFile(configFile)
 	if err != nil {
 		return nil, err
 	}
+	typeDefs := TypeDefinitions{}
+	err = json.Unmarshal(typeDefsByte, &typeDefs)
+	if err != nil {
+		return nil, err
+	}
+	for _, typeDef := range typeDefs.Types {
+		typeInfo := TypeData{
+			TypeId:        typeDef.Id,
+			DMaaPTopicURL: typeDef.DmaapTopicURL,
+			Jobs:          make(map[string]JobInfo),
+		}
+		if _, ok := allTypes[typeInfo.TypeId]; !ok {
+			allTypes[typeInfo.TypeId] = typeInfo
+		}
+		types = append(types, typeInfo)
+	}
 	return types, nil
 }
 
@@ -121,7 +132,7 @@
 	mu.Lock()
 	defer mu.Unlock()
 	supportedTypes := []string{}
-	for k := range allJobs {
+	for k := range allTypes {
 		supportedTypes = append(supportedTypes, k)
 	}
 	return supportedTypes
@@ -131,29 +142,6 @@
 	return Handler.AddJob(job)
 }
 
-func getType(path string) (*Type, error) {
-	if typeDefinition, err := os.ReadFile(path); err == nil {
-		var dat map[string]interface{}
-		if marshalError := json.Unmarshal(typeDefinition, &dat); marshalError == nil {
-			schema, _ := json.Marshal(dat["schema"])
-			typeInfo := Type{
-				TypeId:     dat["id"].(string),
-				DMaaPTopic: dat["dmaapTopic"].(string),
-				Schema:     string(schema),
-				Jobs:       make(map[string]JobInfo),
-			}
-			if _, ok := allJobs[typeInfo.TypeId]; !ok {
-				allJobs[typeInfo.TypeId] = typeInfo
-			}
-			return &typeInfo, nil
-		} else {
-			return nil, marshalError
-		}
-	} else {
-		return nil, err
-	}
-}
-
 func RunJobs(mRAddress string) {
 	for {
 		pollAndDistributeMessages(mRAddress)
@@ -161,9 +149,9 @@
 }
 
 func pollAndDistributeMessages(mRAddress string) {
-	for typeId, typeInfo := range allJobs {
+	for typeId, typeInfo := range allTypes {
 		log.Debugf("Processing jobs for type: %v", typeId)
-		messagesBody, error := restclient.Get(fmt.Sprintf("%v/events/%v/users/dmaapmediatorproducer", mRAddress, typeInfo.DMaaPTopic))
+		messagesBody, error := restclient.Get(fmt.Sprintf("%v/%v", mRAddress, typeInfo.DMaaPTopicURL))
 		if error != nil {
 			log.Warnf("Error getting data from MR. Cause: %v", error)
 			continue
@@ -172,7 +160,7 @@
 	}
 }
 
-func distributeMessages(messages []byte, typeInfo Type) {
+func distributeMessages(messages []byte, typeInfo TypeData) {
 	if len(messages) > 2 {
 		mu.Lock()
 		for _, jobInfo := range typeInfo.Jobs {
@@ -190,5 +178,5 @@
 }
 
 func clearAll() {
-	allJobs = make(map[string]Type)
+	allTypes = make(map[string]TypeData)
 }
diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go
index b53d85e..3bb2578 100644
--- a/dmaap-mediator-producer/internal/jobs/jobs_test.go
+++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go
@@ -26,16 +26,15 @@
 	"net/http"
 	"os"
 	"path/filepath"
+	"sync"
 	"testing"
 	"time"
 
-	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 	"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
-	"oransc.org/nonrtric/dmaapmediatorproducer/mocks"
 )
 
-const typeDefinition = `{"id": "type1", "dmaapTopic": "unauthenticated.SEC_FAULT_OUTPUT", "schema": {"title": "Type 1"}}`
+const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
 
 func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
 	assertions := require.New(t)
@@ -47,19 +46,18 @@
 		os.RemoveAll(typesDir)
 		clearAll()
 	})
-	typeDir = typesDir
-	fname := filepath.Join(typesDir, "type1.json")
+	fname := filepath.Join(typesDir, "type_config.json")
+	configFile = fname
 	if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
-		t.Errorf("Unable to create temporary files for types due to: %v", err)
+		t.Errorf("Unable to create temporary config file for types due to: %v", err)
 	}
 	types, err := GetTypes()
-	wantedType := Type{
-		TypeId:     "type1",
-		DMaaPTopic: "unauthenticated.SEC_FAULT_OUTPUT",
-		Schema:     `{"title":"Type 1"}`,
-		Jobs:       make(map[string]JobInfo),
+	wantedType := TypeData{
+		TypeId:        "type1",
+		DMaaPTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
+		Jobs:          make(map[string]JobInfo),
 	}
-	wantedTypes := []*Type{&wantedType}
+	wantedTypes := []TypeData{wantedType}
 	assertions.EqualValues(wantedTypes, types)
 	assertions.Nil(err)
 
@@ -77,7 +75,7 @@
 		InfoJobData:      "{}",
 		InfoTypeIdentity: "type1",
 	}
-	allJobs["type1"] = Type{
+	allTypes["type1"] = TypeData{
 		TypeId: "type1",
 		Jobs:   map[string]JobInfo{"job1": wantedJob},
 	}
@@ -87,8 +85,8 @@
 
 	err := AddJob(wantedJob)
 	assertions.Nil(err)
-	assertions.Equal(1, len(allJobs["type1"].Jobs))
-	assertions.Equal(wantedJob, allJobs["type1"].Jobs["job1"])
+	assertions.Equal(1, len(allTypes["type1"].Jobs))
+	assertions.Equal(wantedJob, allTypes["type1"].Jobs["job1"])
 }
 
 func TestAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
@@ -104,7 +102,7 @@
 
 func TestAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
 	assertions := require.New(t)
-	allJobs["type1"] = Type{
+	allTypes["type1"] = TypeData{
 		TypeId: "type1",
 	}
 	t.Cleanup(func() {
@@ -116,12 +114,12 @@
 
 	err := AddJob(jobInfo)
 	assertions.NotNil(err)
-	assertions.Equal("missing required job identity: {     type1}", err.Error())
+	assertions.Equal("missing required job identity: {    <nil> type1}", err.Error())
 }
 
 func TestAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
 	assertions := require.New(t)
-	allJobs["type1"] = Type{
+	allTypes["type1"] = TypeData{
 		TypeId: "type1",
 	}
 	jobInfo := JobInfo{
@@ -131,10 +129,9 @@
 
 	err := AddJob(jobInfo)
 	assertions.NotNil(err)
-	assertions.Equal("missing required target URI: {  job1   type1}", err.Error())
+	assertions.Equal("missing required target URI: {  job1  <nil> type1}", err.Error())
 	clearAll()
 }
-
 func TestPollAndDistributeMessages(t *testing.T) {
 	assertions := require.New(t)
 	jobInfo := JobInfo{
@@ -142,45 +139,84 @@
 		InfoJobIdentity:  "job1",
 		TargetUri:        "http://consumerHost/target",
 	}
-	allJobs["type1"] = Type{
-		TypeId:     "type1",
-		DMaaPTopic: "topic",
-		Jobs:       map[string]JobInfo{"job1": jobInfo},
+	allTypes["type1"] = TypeData{
+		TypeId:        "type1",
+		DMaaPTopicURL: "topicUrl",
+		Jobs:          map[string]JobInfo{"job1": jobInfo},
 	}
 	t.Cleanup(func() {
 		clearAll()
 	})
 
-	body := ioutil.NopCloser(bytes.NewReader([]byte(`[{"message": {"data": "data"}}]`)))
-	clientMock := mocks.HTTPClient{}
-	clientMock.On("Get", mock.Anything).Return(&http.Response{
-		StatusCode: http.StatusOK,
-		Body:       body,
-	}, nil)
+	wg := sync.WaitGroup{}
+	wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute
+	messages := `[{"message": {"data": "data"}}]`
+	clientMock := NewTestClient(func(req *http.Request) *http.Response {
+		if req.URL.String() == "http://mrAddr/topicUrl" {
+			assertions.Equal(req.Method, "GET")
+			wg.Done()
+			return &http.Response{
+				StatusCode: 200,
+				Body:       ioutil.NopCloser(bytes.NewReader([]byte(messages))),
+				Header:     make(http.Header), // Must be set to non-nil value or it panics
+			}
+		} else if req.URL.String() == "http://consumerHost/target" {
+			assertions.Equal(req.Method, "POST")
+			assertions.Equal(messages, getBodyAsString(req))
+			assertions.Equal("application/json; charset=utf-8", req.Header.Get("Content-Type"))
+			wg.Done()
+			return &http.Response{
+				StatusCode: 200,
+				Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
+				Header:     make(http.Header), // Must be set to non-nil value or it panics
+			}
+		}
+		t.Error("Wrong call to client: ", req)
+		t.Fail()
+		return nil
+	})
 
-	clientMock.On("Do", mock.Anything).Return(&http.Response{
-		StatusCode: http.StatusOK,
-	}, nil)
-
-	restclient.Client = &clientMock
+	restclient.Client = clientMock
 
 	pollAndDistributeMessages("http://mrAddr")
 
-	time.Sleep(100 * time.Millisecond)
+	if waitTimeout(&wg, 100*time.Millisecond) {
+		t.Error("Not all calls to server were made")
+		t.Fail()
+	}
+}
 
-	var actualRequest *http.Request
-	clientMock.AssertCalled(t, "Get", "http://mrAddr/events/topic/users/dmaapmediatorproducer")
-	clientMock.AssertNumberOfCalls(t, "Get", 1)
+type RoundTripFunc func(req *http.Request) *http.Response
 
-	clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool {
-		actualRequest = req
-		return true
-	}))
-	assertions.Equal(http.MethodPost, actualRequest.Method)
-	assertions.Equal("consumerHost", actualRequest.URL.Host)
-	assertions.Equal("/target", actualRequest.URL.Path)
-	assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
-	actualBody, _ := ioutil.ReadAll(actualRequest.Body)
-	assertions.Equal([]byte(`[{"message": {"data": "data"}}]`), actualBody)
-	clientMock.AssertNumberOfCalls(t, "Do", 1)
+func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
+	return f(req), nil
+}
+
+// NewTestClient returns an *http.Client whose Transport is replaced by fn, to avoid making real calls
+func NewTestClient(fn RoundTripFunc) *http.Client {
+	return &http.Client{
+		Transport: RoundTripFunc(fn),
+	}
+}
+
+// waitTimeout waits for the waitgroup for the specified max timeout.
+// Returns true if waiting timed out.
+func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
+	c := make(chan struct{})
+	go func() {
+		defer close(c)
+		wg.Wait()
+	}()
+	select {
+	case <-c:
+		return false // completed normally
+	case <-time.After(timeout):
+		return true // timed out
+	}
+}
+
+func getBodyAsString(req *http.Request) string {
+	buf := new(bytes.Buffer)
+	buf.ReadFrom(req.Body)
+	return buf.String()
 }
diff --git a/dmaap-mediator-producer/internal/server/server.go b/dmaap-mediator-producer/internal/server/server.go
index c3a1331..0b5e5b8 100644
--- a/dmaap-mediator-producer/internal/server/server.go
+++ b/dmaap-mediator-producer/internal/server/server.go
@@ -29,8 +29,11 @@
 	"oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
 )
 
+const StatusCallbackPath = "/status"
+const JobsCallbackPath = "/jobs"
+
 func StatusHandler(w http.ResponseWriter, r *http.Request) {
-	if r.URL.Path != "/" {
+	if r.URL.Path != StatusCallbackPath {
 		http.Error(w, "404 not found.", http.StatusNotFound)
 		return
 	}
@@ -44,7 +47,7 @@
 }
 
 func CreateInfoJobHandler(w http.ResponseWriter, r *http.Request) {
-	if r.URL.Path != "/producer_simulator/info_job" {
+	if r.URL.Path != JobsCallbackPath {
 		http.Error(w, "404 not found.", http.StatusNotFound)
 		return
 	}
@@ -68,14 +71,3 @@
 		http.Error(w, fmt.Sprintf("Invalid job info. Cause: %v", err), http.StatusBadRequest)
 	}
 }
-
-func CreateServer(port int, handlerFunc func(http.ResponseWriter, *http.Request)) *http.Server {
-
-	mux := http.NewServeMux()
-	mux.HandleFunc("/", handlerFunc)
-	server := http.Server{
-		Addr:    fmt.Sprintf(":%v", port), // :{port}
-		Handler: mux,
-	}
-	return &server
-}
diff --git a/dmaap-mediator-producer/internal/server/server_test.go b/dmaap-mediator-producer/internal/server/server_test.go
index 444deba..a4b19c4 100644
--- a/dmaap-mediator-producer/internal/server/server_test.go
+++ b/dmaap-mediator-producer/internal/server/server_test.go
@@ -51,7 +51,7 @@
 			name: "StatusHandler with correct path and method, should return OK",
 			args: args{
 				responseRecorder: httptest.NewRecorder(),
-				r:                newRequest("GET", "/", nil, t),
+				r:                newRequest("GET", "/status", nil, t),
 			},
 			wantedStatus: http.StatusOK,
 			wantedBody:   "All is well!",
@@ -69,7 +69,7 @@
 			name: "StatusHandler with incorrect method, should return MethodNotAllowed",
 			args: args{
 				responseRecorder: httptest.NewRecorder(),
-				r:                newRequest("PUT", "/", nil, t),
+				r:                newRequest("PUT", "/status", nil, t),
 			},
 			wantedStatus: http.StatusMethodNotAllowed,
 			wantedBody:   "Method is not supported.\n",
@@ -119,7 +119,7 @@
 			name: "CreateInfoJobHandler with correct path and method, should return OK",
 			args: args{
 				responseRecorder: httptest.NewRecorder(),
-				r:                newRequest("POST", "/producer_simulator/info_job", &goodJobInfo, t),
+				r:                newRequest("POST", "/jobs", &goodJobInfo, t),
 			},
 			wantedStatus: http.StatusOK,
 			wantedBody:   "",
@@ -128,7 +128,7 @@
 			name: "CreateInfoJobHandler with incorrect job info, should return BadRequest",
 			args: args{
 				responseRecorder: httptest.NewRecorder(),
-				r:                newRequest("POST", "/producer_simulator/info_job", &badJobInfo, t),
+				r:                newRequest("POST", "/jobs", &badJobInfo, t),
 			},
 			wantedStatus: http.StatusBadRequest,
 			wantedBody:   "Invalid job info. Cause: error",
@@ -146,7 +146,7 @@
 			name: "CreateInfoJobHandler with incorrect method, should return MethodNotAllowed",
 			args: args{
 				responseRecorder: httptest.NewRecorder(),
-				r:                newRequest("PUT", "/producer_simulator/info_job", nil, t),
+				r:                newRequest("PUT", "/jobs", nil, t),
 			},
 			wantedStatus: http.StatusMethodNotAllowed,
 			wantedBody:   "Method is not supported.",
diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go
index 47e12e9..79fcb6b 100644
--- a/dmaap-mediator-producer/main.go
+++ b/dmaap-mediator-producer/main.go
@@ -22,6 +22,7 @@
 
 import (
 	"fmt"
+	"net/http"
 	"sync"
 
 	log "github.com/sirupsen/logrus"
@@ -31,8 +32,7 @@
 )
 
 var configuration *config.Config
-var supervisionCallbackAddress string
-var jobInfoCallbackAddress string
+var callbackAddress string
 
 func init() {
 	configuration = config.New()
@@ -43,15 +43,10 @@
 	}
 
 	log.Debug("Initializing DMaaP Mediator Producer")
-	if configuration.InfoProducerSupervisionCallbackHost == "" {
-		log.Fatal("Missing INFO_PRODUCER_SUPERVISION_CALLBACK_HOST")
+	if configuration.InfoProducerHost == "" {
+		log.Fatal("Missing INFO_PRODUCER_HOST")
 	}
-	supervisionCallbackAddress = fmt.Sprintf("%v:%v", configuration.InfoProducerSupervisionCallbackHost, configuration.InfoProducerSupervisionCallbackPort)
-
-	if configuration.InfoJobCallbackHost == "" {
-		log.Fatal("Missing INFO_JOB_CALLBACK_HOST")
-	}
-	jobInfoCallbackAddress = fmt.Sprintf("%v:%v", configuration.InfoJobCallbackHost, configuration.InfoJobCallbackPort)
+	callbackAddress = fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort)
 
 	registrator := config.NewRegistratorImpl(configuration.InfoCoordinatorAddress)
 	if types, err := jobs.GetTypes(); err == nil {
@@ -62,9 +57,9 @@
 		log.Fatalf("Unable to get types to register due to: %v", err)
 	}
 	producer := config.ProducerRegistrationInfo{
-		InfoProducerSupervisionCallbackUrl: supervisionCallbackAddress,
+		InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusCallbackPath,
 		SupportedInfoTypes:                 jobs.GetSupportedTypes(),
-		InfoJobCallbackUrl:                 jobInfoCallbackAddress,
+		InfoJobCallbackUrl:                 callbackAddress + server.JobsCallbackPath,
 	}
 	if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
 		log.Fatalf("Unable to register producer due to: %v", err)
@@ -75,19 +70,14 @@
 	log.Debug("Starting DMaaP Mediator Producer")
 	wg := new(sync.WaitGroup)
 
-	// add two goroutines to `wg` WaitGroup, one for each avilable server
-	wg.Add(3)
+	// add two to the `wg` WaitGroup counter, one for each goroutine being waited on
+	wg.Add(2)
 
-	log.Debugf("Starting status callback server at port %v", configuration.InfoProducerSupervisionCallbackPort)
+	log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
 	go func() {
-		server := server.CreateServer(configuration.InfoProducerSupervisionCallbackPort, server.StatusHandler)
-		log.Warn(server.ListenAndServe())
-		wg.Done()
-	}()
-
-	go func() {
-		server := server.CreateServer(configuration.InfoJobCallbackPort, server.CreateInfoJobHandler)
-		log.Warn(server.ListenAndServe())
+		http.HandleFunc(server.StatusCallbackPath, server.StatusHandler)
+		http.HandleFunc(server.JobsCallbackPath, server.CreateInfoJobHandler)
+		log.Warn(http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), nil))
 		wg.Done()
 	}()
 
diff --git a/dmaap-mediator-producer/simulator/consumersimulator.go b/dmaap-mediator-producer/simulator/consumersimulator.go
index 25421ae..144f56f 100644
--- a/dmaap-mediator-producer/simulator/consumersimulator.go
+++ b/dmaap-mediator-producer/simulator/consumersimulator.go
@@ -21,20 +21,43 @@
 package main
 
 import (
+	"encoding/json"
+	"flag"
 	"fmt"
 	"io"
 	http "net/http"
+
+	"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
 )
 
+func main() {
+	port := flag.Int("port", 40935, "The port this consumer will listen on")
+	flag.Parse()
+	http.HandleFunc("/jobs", handleData)
+	// Register the job before serving: ListenAndServe blocks and only returns on failure.
+	registerJob(*port)
+	fmt.Printf("Starting consumer on port: %v\n", *port)
+	fmt.Println(http.ListenAndServe(fmt.Sprintf(":%v", *port), nil))
+}
+
+func registerJob(port int) {
+	jobInfo := struct {
+		JobOwner      string `json:"job_owner"`
+		JobResultUri  string `json:"job_result_uri"`
+		InfoTypeId    string `json:"info_type_id"`
+		JobDefinition string `json:"job_definition"`
+	}{fmt.Sprintf("test%v", port), fmt.Sprintf("http://localhost:%v/jobs", port), "STD_Fault_Messages", "{}"}
+	fmt.Print("Registering consumer: ", jobInfo)
+	body, _ := json.Marshal(jobInfo)
+	putErr := restclient.Put(fmt.Sprintf("http://localhost:8083/data-consumer/v1/info-jobs/job%v", port), body)
+	if putErr != nil {
+		fmt.Printf("Unable to register consumer: %v", putErr)
+	}
+}
+
 func handleData(w http.ResponseWriter, req *http.Request) {
 	defer req.Body.Close()
 	if reqData, err := io.ReadAll(req.Body); err == nil {
-		fmt.Printf("Consumer received body: %v\n", string(reqData))
+		fmt.Println("Consumer received body:", string(reqData))
 	}
 }
-
-func main() {
-	http.HandleFunc("/jobs", handleData)
-
-	http.ListenAndServe(":40935", nil)
-}