Refactor Go code

Issue-ID: NONRTRIC-606
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: Icbd97b8dbf0c3b015e2c864d4d4dd3581d5ade9b
diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go
index 9b7b1dd..dfd2505 100644
--- a/dmaap-mediator-producer/internal/config/config.go
+++ b/dmaap-mediator-producer/internal/config/config.go
@@ -28,7 +28,7 @@
 )
 
 type Config struct {
-	LogLevel               string
+	LogLevel               log.Level
 	InfoProducerHost       string
 	InfoProducerPort       int
 	InfoCoordinatorAddress string
@@ -36,15 +36,9 @@
 	MRPort                 int
 }
 
-type ProducerRegistrationInfo struct {
-	InfoProducerSupervisionCallbackUrl string   `json:"info_producer_supervision_callback_url"`
-	SupportedInfoTypes                 []string `json:"supported_info_types"`
-	InfoJobCallbackUrl                 string   `json:"info_job_callback_url"`
-}
-
 func New() *Config {
 	return &Config{
-		LogLevel:               getEnv("LOG_LEVEL", "Info"),
+		LogLevel:               getLogLevel(),
 		InfoProducerHost:       getEnv("INFO_PRODUCER_HOST", ""),
 		InfoProducerPort:       getEnvAsInt("INFO_PRODUCER_PORT", 8085),
 		InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
@@ -71,3 +65,13 @@
 
 	return defaultVal
 }
+
+func getLogLevel() log.Level {
+	logLevelStr := getEnv("LOG_LEVEL", "Info")
+	if loglevel, err := log.ParseLevel(logLevelStr); err == nil {
+		return loglevel
+	} else {
+		log.Warnf("Invalid log level: %v. Log level will be Info!", logLevelStr)
+		return log.InfoLevel
+	}
+}
diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go
index 0fcbdd3..fc64e57 100644
--- a/dmaap-mediator-producer/internal/config/config_test.go
+++ b/dmaap-mediator-producer/internal/config/config_test.go
@@ -31,6 +31,7 @@
 )
 
 func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
+	assertions := require.New(t)
 	os.Setenv("LOG_LEVEL", "Debug")
 	os.Setenv("INFO_PRODUCER_HOST", "producerHost")
 	os.Setenv("INFO_PRODUCER_PORT", "8095")
@@ -41,16 +42,16 @@
 		os.Clearenv()
 	})
 	wantConfig := Config{
-		LogLevel:               "Debug",
+		LogLevel:               log.DebugLevel,
 		InfoProducerHost:       "producerHost",
 		InfoProducerPort:       8095,
 		InfoCoordinatorAddress: "infoCoordAddr",
 		MRHost:                 "mrHost",
 		MRPort:                 3908,
 	}
-	if got := New(); !reflect.DeepEqual(got, &wantConfig) {
-		t.Errorf("New() = %v, want %v", got, &wantConfig)
-	}
+	got := New()
+
+	assertions.Equal(&wantConfig, got)
 }
 
 func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T) {
@@ -64,7 +65,7 @@
 		os.Clearenv()
 	})
 	wantConfig := Config{
-		LogLevel:               "Info",
+		LogLevel:               log.InfoLevel,
 		InfoProducerHost:       "",
 		InfoProducerPort:       8085,
 		InfoCoordinatorAddress: "http://enrichmentservice:8083",
@@ -78,16 +79,29 @@
 	assertions.Contains(logString, "Invalid int value: wrong for variable: INFO_PRODUCER_PORT. Default value: 8085 will be used")
 }
 
-func TestNew_envVarsNotSetConfigContainDefaultValues(t *testing.T) {
+func TestNew_envFaultyLogLevelConfigContainDefaultValues(t *testing.T) {
+	assertions := require.New(t)
+	var buf bytes.Buffer
+	log.SetOutput(&buf)
+
+	os.Setenv("LOG_LEVEL", "wrong")
+	t.Cleanup(func() {
+		log.SetOutput(os.Stderr)
+		os.Clearenv()
+	})
+
 	wantConfig := Config{
-		LogLevel:               "Info",
+		LogLevel:               log.InfoLevel,
 		InfoProducerHost:       "",
 		InfoProducerPort:       8085,
 		InfoCoordinatorAddress: "http://enrichmentservice:8083",
 		MRHost:                 "http://message-router.onap",
 		MRPort:                 3904,
 	}
-	if got := New(); !reflect.DeepEqual(got, &wantConfig) {
-		t.Errorf("New() = %v, want %v", got, &wantConfig)
-	}
+
+	got := New()
+
+	assertions.Equal(&wantConfig, got)
+	logString := buf.String()
+	assertions.Contains(logString, "Invalid log level: wrong. Log level will be Info!")
 }
diff --git a/dmaap-mediator-producer/internal/config/registrator.go b/dmaap-mediator-producer/internal/config/registrator.go
index db46c54..83ed43f 100644
--- a/dmaap-mediator-producer/internal/config/registrator.go
+++ b/dmaap-mediator-producer/internal/config/registrator.go
@@ -27,7 +27,6 @@
 
 	log "github.com/sirupsen/logrus"
 
-	"oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
 	"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
 )
 
@@ -35,25 +34,38 @@
 const registerProducerPath = "/data-producer/v1/info-producers/"
 const typeSchema = `{"type": "object","properties": {},"additionalProperties": false}`
 
+type TypeDefinition struct {
+	Id            string `json:"id"`
+	DmaapTopicURL string `json:"dmaapTopicUrl"`
+}
+
+type ProducerRegistrationInfo struct {
+	InfoProducerSupervisionCallbackUrl string   `json:"info_producer_supervision_callback_url"`
+	SupportedInfoTypes                 []string `json:"supported_info_types"`
+	InfoJobCallbackUrl                 string   `json:"info_job_callback_url"`
+}
+
 type Registrator interface {
-	RegisterTypes(types []*jobs.TypeData) error
+	RegisterTypes(types []TypeDefinition) error
 	RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo)
 }
 
 type RegistratorImpl struct {
 	infoCoordinatorAddress string
+	httpClient             restclient.HTTPClient
 }
 
-func NewRegistratorImpl(infoCoordAddr string) *RegistratorImpl {
+func NewRegistratorImpl(infoCoordAddr string, client restclient.HTTPClient) *RegistratorImpl {
 	return &RegistratorImpl{
 		infoCoordinatorAddress: infoCoordAddr,
+		httpClient:             client,
 	}
 }
 
-func (r RegistratorImpl) RegisterTypes(jobTypes []jobs.TypeData) error {
+func (r RegistratorImpl) RegisterTypes(jobTypes []TypeDefinition) error {
 	for _, jobType := range jobTypes {
 		body := fmt.Sprintf(`{"info_job_data_schema": %v}`, typeSchema)
-		if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.TypeId), []byte(body)); error != nil {
+		if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.Id), []byte(body), r.httpClient); error != nil {
 			return error
 		}
 		log.Debugf("Registered type: %v", jobType)
@@ -63,7 +75,7 @@
 
 func (r RegistratorImpl) RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo) error {
 	if body, marshalErr := json.Marshal(producerInfo); marshalErr == nil {
-		if putErr := restclient.Put(r.infoCoordinatorAddress+registerProducerPath+url.PathEscape(producerId), []byte(body)); putErr != nil {
+		if putErr := restclient.Put(r.infoCoordinatorAddress+registerProducerPath+url.PathEscape(producerId), []byte(body), r.httpClient); putErr != nil {
 			return putErr
 		}
 		log.Debugf("Registered producer: %v", producerId)
diff --git a/dmaap-mediator-producer/internal/config/registrator_test.go b/dmaap-mediator-producer/internal/config/registrator_test.go
index 353e9de..2cffa2c 100644
--- a/dmaap-mediator-producer/internal/config/registrator_test.go
+++ b/dmaap-mediator-producer/internal/config/registrator_test.go
@@ -27,28 +27,24 @@
 
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
-	"oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
-	"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
-	"oransc.org/nonrtric/dmaapmediatorproducer/mocks"
+	"oransc.org/nonrtric/dmaapmediatorproducer/mocks/httpclient"
 )
 
 func TestRegisterTypes(t *testing.T) {
 	assertions := require.New(t)
 
-	clientMock := mocks.HTTPClient{}
+	clientMock := httpclient.HTTPClient{}
 
 	clientMock.On("Do", mock.Anything).Return(&http.Response{
 		StatusCode: http.StatusCreated,
 	}, nil)
 
-	restclient.Client = &clientMock
-
-	type1 := jobs.TypeData{
-		TypeId: "Type1",
+	type1 := TypeDefinition{
+		Id: "Type1",
 	}
-	types := []jobs.TypeData{type1}
+	types := []TypeDefinition{type1}
 
-	r := NewRegistratorImpl("http://localhost:9990")
+	r := NewRegistratorImpl("http://localhost:9990", &clientMock)
 	err := r.RegisterTypes(types)
 
 	assertions.Nil(err)
@@ -71,21 +67,19 @@
 func TestRegisterProducer(t *testing.T) {
 	assertions := require.New(t)
 
-	clientMock := mocks.HTTPClient{}
+	clientMock := httpclient.HTTPClient{}
 
 	clientMock.On("Do", mock.Anything).Return(&http.Response{
 		StatusCode: http.StatusCreated,
 	}, nil)
 
-	restclient.Client = &clientMock
-
 	producer := ProducerRegistrationInfo{
 		InfoProducerSupervisionCallbackUrl: "supervisionCallbackUrl",
 		SupportedInfoTypes:                 []string{"type1"},
 		InfoJobCallbackUrl:                 "jobCallbackUrl",
 	}
 
-	r := NewRegistratorImpl("http://localhost:9990")
+	r := NewRegistratorImpl("http://localhost:9990", &clientMock)
 	err := r.RegisterProducer("Producer1", &producer)
 
 	assertions.Nil(err)
diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go
index e5a1070..7b21b00 100644
--- a/dmaap-mediator-producer/internal/jobs/jobs.go
+++ b/dmaap-mediator-producer/internal/jobs/jobs.go
@@ -27,17 +27,10 @@
 	"sync"
 
 	log "github.com/sirupsen/logrus"
+	"oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
 	"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
 )
 
-type TypeDefinitions struct {
-	Types []TypeDefinition `json:"types"`
-}
-type TypeDefinition struct {
-	Id            string `json:"id"`
-	DmaapTopicURL string `json:"dmaapTopicUrl"`
-}
-
 type TypeData struct {
 	TypeId        string `json:"id"`
 	DMaaPTopicURL string `json:"dmaapTopicUrl"`
@@ -53,33 +46,38 @@
 	InfoTypeIdentity string      `json:"info_type_identity"`
 }
 
+type JobTypeHandler interface {
+	GetTypes() ([]config.TypeDefinition, error)
+	GetSupportedTypes() []string
+}
+
 type JobHandler interface {
 	AddJob(JobInfo) error
 	DeleteJob(jobId string)
 }
 
-var (
-	mu         sync.Mutex
-	configFile = "configs/type_config.json"
-	Handler    JobHandler
-	allTypes   = make(map[string]TypeData)
-)
-
-func init() {
-	Handler = newJobHandlerImpl()
+type JobHandlerImpl struct {
+	mu               sync.Mutex
+	configFile       string
+	allTypes         map[string]TypeData
+	pollClient       restclient.HTTPClient
+	distributeClient restclient.HTTPClient
 }
 
-type jobHandlerImpl struct{}
-
-func newJobHandlerImpl() *jobHandlerImpl {
-	return &jobHandlerImpl{}
+func NewJobHandlerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *JobHandlerImpl {
+	return &JobHandlerImpl{
+		configFile:       typeConfigFilePath,
+		allTypes:         make(map[string]TypeData),
+		pollClient:       pollClient,
+		distributeClient: distributeClient,
+	}
 }
 
-func (jh *jobHandlerImpl) AddJob(ji JobInfo) error {
-	mu.Lock()
-	defer mu.Unlock()
-	if err := validateJobInfo(ji); err == nil {
-		jobs := allTypes[ji.InfoTypeIdentity].Jobs
+func (jh *JobHandlerImpl) AddJob(ji JobInfo) error {
+	jh.mu.Lock()
+	defer jh.mu.Unlock()
+	if err := jh.validateJobInfo(ji); err == nil {
+		jobs := jh.allTypes[ji.InfoTypeIdentity].Jobs
 		jobs[ji.InfoJobIdentity] = ji
 		log.Debug("Added job: ", ji)
 		return nil
@@ -88,17 +86,17 @@
 	}
 }
 
-func (jh *jobHandlerImpl) DeleteJob(jobId string) {
-	mu.Lock()
-	defer mu.Unlock()
-	for _, typeData := range allTypes {
+func (jh *JobHandlerImpl) DeleteJob(jobId string) {
+	jh.mu.Lock()
+	defer jh.mu.Unlock()
+	for _, typeData := range jh.allTypes {
 		delete(typeData.Jobs, jobId)
 	}
 	log.Debug("Deleted job: ", jobId)
 }
 
-func validateJobInfo(ji JobInfo) error {
-	if _, ok := allTypes[ji.InfoTypeIdentity]; !ok {
+func (jh *JobHandlerImpl) validateJobInfo(ji JobInfo) error {
+	if _, ok := jh.allTypes[ji.InfoTypeIdentity]; !ok {
 		return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
 	}
 	if ji.InfoJobIdentity == "" {
@@ -111,86 +109,75 @@
 	return nil
 }
 
-func GetTypes() ([]TypeData, error) {
-	mu.Lock()
-	defer mu.Unlock()
-	types := make([]TypeData, 0, 1)
-	typeDefsByte, err := os.ReadFile(configFile)
+func (jh *JobHandlerImpl) GetTypes() ([]config.TypeDefinition, error) {
+	jh.mu.Lock()
+	defer jh.mu.Unlock()
+	typeDefsByte, err := os.ReadFile(jh.configFile)
 	if err != nil {
 		return nil, err
 	}
-	typeDefs := TypeDefinitions{}
+	typeDefs := struct {
+		Types []config.TypeDefinition `json:"types"`
+	}{}
 	err = json.Unmarshal(typeDefsByte, &typeDefs)
 	if err != nil {
 		return nil, err
 	}
 	for _, typeDef := range typeDefs.Types {
-		typeInfo := TypeData{
+		jh.allTypes[typeDef.Id] = TypeData{
 			TypeId:        typeDef.Id,
 			DMaaPTopicURL: typeDef.DmaapTopicURL,
 			Jobs:          make(map[string]JobInfo),
 		}
-		if _, ok := allTypes[typeInfo.TypeId]; !ok {
-			allTypes[typeInfo.TypeId] = typeInfo
-		}
-		types = append(types, typeInfo)
 	}
-	return types, nil
+	return typeDefs.Types, nil
 }
 
-func GetSupportedTypes() []string {
-	mu.Lock()
-	defer mu.Unlock()
+func (jh *JobHandlerImpl) GetSupportedTypes() []string {
+	jh.mu.Lock()
+	defer jh.mu.Unlock()
 	supportedTypes := []string{}
-	for k := range allTypes {
+	for k := range jh.allTypes {
 		supportedTypes = append(supportedTypes, k)
 	}
 	return supportedTypes
 }
 
-func AddJob(job JobInfo) error {
-	return Handler.AddJob(job)
-}
-
-func DeleteJob(jobId string) {
-	Handler.DeleteJob(jobId)
-}
-
-func RunJobs(mRAddress string) {
+func (jh *JobHandlerImpl) RunJobs(mRAddress string) {
 	for {
-		pollAndDistributeMessages(mRAddress)
+		jh.pollAndDistributeMessages(mRAddress)
 	}
 }
 
-func pollAndDistributeMessages(mRAddress string) {
-	for typeId, typeInfo := range allTypes {
+func (jh *JobHandlerImpl) pollAndDistributeMessages(mRAddress string) {
+	jh.mu.Lock()
+	defer jh.mu.Unlock()
+	for typeId, typeInfo := range jh.allTypes {
 		log.Debugf("Processing jobs for type: %v", typeId)
-		messagesBody, error := restclient.Get(fmt.Sprintf("%v/%v", mRAddress, typeInfo.DMaaPTopicURL))
+		messagesBody, error := restclient.Get(fmt.Sprintf("%v/%v", mRAddress, typeInfo.DMaaPTopicURL), jh.pollClient)
 		if error != nil {
 			log.Warnf("Error getting data from MR. Cause: %v", error)
 			continue
 		}
-		distributeMessages(messagesBody, typeInfo)
+		jh.distributeMessages(messagesBody, typeInfo)
 	}
 }
 
-func distributeMessages(messages []byte, typeInfo TypeData) {
+func (jh *JobHandlerImpl) distributeMessages(messages []byte, typeInfo TypeData) {
 	if len(messages) > 2 {
-		mu.Lock()
 		for _, jobInfo := range typeInfo.Jobs {
-			go sendMessagesToConsumer(messages, jobInfo)
+			go jh.sendMessagesToConsumer(messages, jobInfo)
 		}
-		mu.Unlock()
 	}
 }
 
-func sendMessagesToConsumer(messages []byte, jobInfo JobInfo) {
+func (jh *JobHandlerImpl) sendMessagesToConsumer(messages []byte, jobInfo JobInfo) {
 	log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity)
-	if postErr := restclient.Post(jobInfo.TargetUri, messages); postErr != nil {
+	if postErr := restclient.Post(jobInfo.TargetUri, messages, jh.distributeClient); postErr != nil {
 		log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
 	}
 }
 
-func clearAll() {
-	allTypes = make(map[string]TypeData)
+func (jh *JobHandlerImpl) clearAll() {
+	jh.allTypes = make(map[string]TypeData)
 }
diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go
index 6f29227..b301c2f 100644
--- a/dmaap-mediator-producer/internal/jobs/jobs_test.go
+++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go
@@ -31,7 +31,6 @@
 	"time"
 
 	"github.com/stretchr/testify/require"
-	"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
 )
 
 const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
@@ -42,31 +41,31 @@
 	if err != nil {
 		t.Errorf("Unable to create temporary directory for types due to: %v", err)
 	}
+	fname := filepath.Join(typesDir, "type_config.json")
+	handlerUnderTest := NewJobHandlerImpl(fname, nil, nil)
 	t.Cleanup(func() {
 		os.RemoveAll(typesDir)
-		clearAll()
+		handlerUnderTest.clearAll()
 	})
-	fname := filepath.Join(typesDir, "type_config.json")
-	configFile = fname
 	if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
 		t.Errorf("Unable to create temporary config file for types due to: %v", err)
 	}
-	types, err := GetTypes()
-	wantedType := TypeData{
-		TypeId:        "type1",
-		DMaaPTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
-		Jobs:          make(map[string]JobInfo),
+	types, err := handlerUnderTest.GetTypes()
+	wantedType := TypeDefinition{
+		Id:            "type1",
+		DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
 	}
-	wantedTypes := []TypeData{wantedType}
+	wantedTypes := []TypeDefinition{wantedType}
 	assertions.EqualValues(wantedTypes, types)
 	assertions.Nil(err)
 
-	supportedTypes := GetSupportedTypes()
+	supportedTypes := handlerUnderTest.GetSupportedTypes()
 	assertions.EqualValues([]string{"type1"}, supportedTypes)
 }
 
 func TestAddJobWhenTypeIsSupported_shouldAddJobToAllJobsMap(t *testing.T) {
 	assertions := require.New(t)
+	handlerUnderTest := NewJobHandlerImpl("", nil, nil)
 	wantedJob := JobInfo{
 		Owner:            "owner",
 		LastUpdated:      "now",
@@ -75,66 +74,72 @@
 		InfoJobData:      "{}",
 		InfoTypeIdentity: "type1",
 	}
-	allTypes["type1"] = TypeData{
+	handlerUnderTest.allTypes["type1"] = TypeData{
 		TypeId: "type1",
 		Jobs:   map[string]JobInfo{"job1": wantedJob},
 	}
 	t.Cleanup(func() {
-		clearAll()
+		handlerUnderTest.clearAll()
 	})
 
-	err := AddJob(wantedJob)
+	err := handlerUnderTest.AddJob(wantedJob)
 	assertions.Nil(err)
-	assertions.Equal(1, len(allTypes["type1"].Jobs))
-	assertions.Equal(wantedJob, allTypes["type1"].Jobs["job1"])
+	assertions.Equal(1, len(handlerUnderTest.allTypes["type1"].Jobs))
+	assertions.Equal(wantedJob, handlerUnderTest.allTypes["type1"].Jobs["job1"])
 }
 
 func TestAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
 	assertions := require.New(t)
+	handlerUnderTest := NewJobHandlerImpl("", nil, nil)
 	jobInfo := JobInfo{
 		InfoTypeIdentity: "type1",
 	}
 
-	err := AddJob(jobInfo)
+	err := handlerUnderTest.AddJob(jobInfo)
 	assertions.NotNil(err)
 	assertions.Equal("type not supported: type1", err.Error())
 }
 
 func TestAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
 	assertions := require.New(t)
-	allTypes["type1"] = TypeData{
+	handlerUnderTest := NewJobHandlerImpl("", nil, nil)
+	handlerUnderTest.allTypes["type1"] = TypeData{
 		TypeId: "type1",
 	}
 	t.Cleanup(func() {
-		clearAll()
+		handlerUnderTest.clearAll()
 	})
+
 	jobInfo := JobInfo{
 		InfoTypeIdentity: "type1",
 	}
-
-	err := AddJob(jobInfo)
+	err := handlerUnderTest.AddJob(jobInfo)
 	assertions.NotNil(err)
 	assertions.Equal("missing required job identity: {    <nil> type1}", err.Error())
 }
 
 func TestAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
 	assertions := require.New(t)
-	allTypes["type1"] = TypeData{
+	handlerUnderTest := NewJobHandlerImpl("", nil, nil)
+	handlerUnderTest.allTypes["type1"] = TypeData{
 		TypeId: "type1",
 	}
+	t.Cleanup(func() {
+		handlerUnderTest.clearAll()
+	})
+
 	jobInfo := JobInfo{
 		InfoTypeIdentity: "type1",
 		InfoJobIdentity:  "job1",
 	}
-
-	err := AddJob(jobInfo)
+	err := handlerUnderTest.AddJob(jobInfo)
 	assertions.NotNil(err)
 	assertions.Equal("missing required target URI: {  job1  <nil> type1}", err.Error())
-	clearAll()
 }
 
 func TestDeleteJob(t *testing.T) {
 	assertions := require.New(t)
+	handlerUnderTest := NewJobHandlerImpl("", nil, nil)
 	jobToKeep := JobInfo{
 		InfoJobIdentity:  "job1",
 		InfoTypeIdentity: "type1",
@@ -143,52 +148,44 @@
 		InfoJobIdentity:  "job2",
 		InfoTypeIdentity: "type1",
 	}
-	allTypes["type1"] = TypeData{
+	handlerUnderTest.allTypes["type1"] = TypeData{
 		TypeId: "type1",
 		Jobs:   map[string]JobInfo{"job1": jobToKeep, "job2": jobToDelete},
 	}
 	t.Cleanup(func() {
-		clearAll()
+		handlerUnderTest.clearAll()
 	})
 
-	DeleteJob("job2")
-	assertions.Equal(1, len(allTypes["type1"].Jobs))
-	assertions.Equal(jobToKeep, allTypes["type1"].Jobs["job1"])
+	handlerUnderTest.DeleteJob("job2")
+	assertions.Equal(1, len(handlerUnderTest.allTypes["type1"].Jobs))
+	assertions.Equal(jobToKeep, handlerUnderTest.allTypes["type1"].Jobs["job1"])
 }
 
 func TestPollAndDistributeMessages(t *testing.T) {
 	assertions := require.New(t)
-	jobInfo := JobInfo{
-		InfoTypeIdentity: "type1",
-		InfoJobIdentity:  "job1",
-		TargetUri:        "http://consumerHost/target",
-	}
-	allTypes["type1"] = TypeData{
-		TypeId:        "type1",
-		DMaaPTopicURL: "topicUrl",
-		Jobs:          map[string]JobInfo{"job1": jobInfo},
-	}
-	t.Cleanup(func() {
-		clearAll()
-	})
 
 	wg := sync.WaitGroup{}
-	wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute
 	messages := `[{"message": {"data": "data"}}]`
-	clientMock := NewTestClient(func(req *http.Request) *http.Response {
+	pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
 		if req.URL.String() == "http://mrAddr/topicUrl" {
 			assertions.Equal(req.Method, "GET")
-			wg.Done()
+			wg.Done() // Signal that the poll call has been made
 			return &http.Response{
 				StatusCode: 200,
 				Body:       ioutil.NopCloser(bytes.NewReader([]byte(messages))),
 				Header:     make(http.Header), // Must be set to non-nil value or it panics
 			}
-		} else if req.URL.String() == "http://consumerHost/target" {
+		}
+		t.Error("Wrong call to client: ", req)
+		t.Fail()
+		return nil
+	})
+	distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
+		if req.URL.String() == "http://consumerHost/target" {
 			assertions.Equal(req.Method, "POST")
 			assertions.Equal(messages, getBodyAsString(req))
 			assertions.Equal("application/json; charset=utf-8", req.Header.Get("Content-Type"))
-			wg.Done()
+			wg.Done() // Signal that the distribution call has been made
 			return &http.Response{
 				StatusCode: 200,
 				Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
@@ -199,10 +196,23 @@
 		t.Fail()
 		return nil
 	})
+	handlerUnderTest := NewJobHandlerImpl("", pollClientMock, distributeClientMock)
+	jobInfo := JobInfo{
+		InfoTypeIdentity: "type1",
+		InfoJobIdentity:  "job1",
+		TargetUri:        "http://consumerHost/target",
+	}
+	handlerUnderTest.allTypes["type1"] = TypeData{
+		TypeId:        "type1",
+		DMaaPTopicURL: "topicUrl",
+		Jobs:          map[string]JobInfo{"job1": jobInfo},
+	}
+	t.Cleanup(func() {
+		handlerUnderTest.clearAll()
+	})
 
-	restclient.Client = clientMock
-
-	pollAndDistributeMessages("http://mrAddr")
+	wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute
+	handlerUnderTest.pollAndDistributeMessages("http://mrAddr")
 
 	if waitTimeout(&wg, 100*time.Millisecond) {
 		t.Error("Not all calls to server were made")
diff --git a/dmaap-mediator-producer/internal/restclient/HTTPClient.go b/dmaap-mediator-producer/internal/restclient/HTTPClient.go
index a783f7e..2b3a0cf 100644
--- a/dmaap-mediator-producer/internal/restclient/HTTPClient.go
+++ b/dmaap-mediator-producer/internal/restclient/HTTPClient.go
@@ -43,47 +43,35 @@
 	return fmt.Sprintf("Request failed due to error response with status: %v and body: %v", pe.StatusCode, string(pe.Body))
 }
 
-var (
-	Client HTTPClient
-)
-
-func init() {
-	Client = &http.Client{}
-}
-
-func Get(url string) ([]byte, error) {
-	if response, err := Client.Get(url); err == nil {
-		defer response.Body.Close()
-		if responseData, err := io.ReadAll(response.Body); err == nil {
-			if isResponseSuccess(response.StatusCode) {
+func Get(url string, client HTTPClient) ([]byte, error) {
+	if response, err := client.Get(url); err == nil {
+		if isResponseSuccess(response.StatusCode) {
+			defer response.Body.Close()
+			if responseData, err := io.ReadAll(response.Body); err == nil {
 				return responseData, nil
 			} else {
-				requestError := RequestError{
-					StatusCode: response.StatusCode,
-					Body:       responseData,
-				}
-				return nil, requestError
+				return nil, err
 			}
 		} else {
-			return nil, err
+			return nil, getRequestError(response)
 		}
 	} else {
 		return nil, err
 	}
 }
 
-func Put(url string, body []byte) error {
-	return do(http.MethodPut, url, body)
+func Put(url string, body []byte, client HTTPClient) error {
+	return do(http.MethodPut, url, body, client)
 }
 
-func Post(url string, body []byte) error {
-	return do(http.MethodPost, url, body)
+func Post(url string, body []byte, client HTTPClient) error {
+	return do(http.MethodPost, url, body, client)
 }
 
-func do(method string, url string, body []byte) error {
+func do(method string, url string, body []byte, client HTTPClient) error {
 	if req, reqErr := http.NewRequest(method, url, bytes.NewBuffer(body)); reqErr == nil {
 		req.Header.Set("Content-Type", "application/json; charset=utf-8")
-		if response, respErr := Client.Do(req); respErr == nil {
+		if response, respErr := client.Do(req); respErr == nil {
 			if isResponseSuccess(response.StatusCode) {
 				return nil
 			} else {
diff --git a/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go b/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go
index 3727e6a..7fe3cc7 100644
--- a/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go
+++ b/dmaap-mediator-producer/internal/restclient/HTTPClient_test.go
@@ -23,58 +23,55 @@
 import (
 	"bytes"
 	"errors"
+	"fmt"
 	"io/ioutil"
 	"net/http"
-	"reflect"
 	"testing"
 
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
-	"oransc.org/nonrtric/dmaapmediatorproducer/mocks"
+	"oransc.org/nonrtric/dmaapmediatorproducer/mocks/httpclient"
 )
 
-func TestGet(t *testing.T) {
-	clientMock := mocks.HTTPClient{}
-
-	clientMock.On("Get", "http://testOk").Return(&http.Response{
-		StatusCode: http.StatusOK,
-		Body:       ioutil.NopCloser(bytes.NewReader([]byte("Response"))),
-	}, nil)
-
-	clientMock.On("Get", "http://testNotOk").Return(&http.Response{
+func TestRequestError_Error(t *testing.T) {
+	assertions := require.New(t)
+	actualError := RequestError{
 		StatusCode: http.StatusBadRequest,
-		Body:       ioutil.NopCloser(bytes.NewReader([]byte("Bad Response"))),
-	}, nil)
-
-	clientMock.On("Get", "http://testError").Return(nil, errors.New("Failed Request"))
-
-	Client = &clientMock
-
+		Body:       []byte("error"),
+	}
+	assertions.Equal("Request failed due to error response with status: 400 and body: error", actualError.Error())
+}
+func TestGet(t *testing.T) {
+	assertions := require.New(t)
 	type args struct {
-		url string
+		url              string
+		mockReturnStatus int
+		mockReturnBody   string
+		mockReturnError  error
 	}
 	tests := []struct {
 		name        string
 		args        args
 		want        []byte
-		wantErr     bool
 		wantedError error
 	}{
 		{
 			name: "Test Get with OK response",
 			args: args{
-				url: "http://testOk",
+				url:              "http://testOk",
+				mockReturnStatus: http.StatusOK,
+				mockReturnBody:   "Response",
 			},
-			want:    []byte("Response"),
-			wantErr: false,
+			want: []byte("Response"),
 		},
 		{
 			name: "Test Get with Not OK response",
 			args: args{
-				url: "http://testNotOk",
+				url:              "http://testNotOk",
+				mockReturnStatus: http.StatusBadRequest,
+				mockReturnBody:   "Bad Response",
 			},
-			want:    nil,
-			wantErr: true,
+			want: nil,
 			wantedError: RequestError{
 				StatusCode: http.StatusBadRequest,
 				Body:       []byte("Bad Response"),
@@ -83,40 +80,38 @@
 		{
 			name: "Test Get with error",
 			args: args{
-				url: "http://testError",
+				url:             "http://testError",
+				mockReturnError: errors.New("Failed Request"),
 			},
 			want:        nil,
-			wantErr:     true,
 			wantedError: errors.New("Failed Request"),
 		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			got, err := Get(tt.args.url)
-			if (err != nil) != tt.wantErr {
-				t.Errorf("Get() error = %v, wantErr %v", err, tt.wantErr)
-				return
-			}
-			if !reflect.DeepEqual(got, tt.want) {
-				t.Errorf("Get() = %v, want %v", got, tt.want)
-			}
-			if tt.wantErr && err.Error() != tt.wantedError.Error() {
-				t.Errorf("Get() error = %v, wantedError % v", err, tt.wantedError.Error())
-			}
+			clientMock := httpclient.HTTPClient{}
+			clientMock.On("Get", tt.args.url).Return(&http.Response{
+				StatusCode: tt.args.mockReturnStatus,
+				Body:       ioutil.NopCloser(bytes.NewReader([]byte(tt.args.mockReturnBody))),
+			}, tt.args.mockReturnError)
+
+			got, err := Get(tt.args.url, &clientMock)
+			assertions.Equal(tt.wantedError, err, tt.name)
+			assertions.Equal(tt.want, got, tt.name)
+			clientMock.AssertCalled(t, "Get", tt.args.url)
 		})
 	}
 }
 
 func TestPutOk(t *testing.T) {
 	assertions := require.New(t)
-	clientMock := mocks.HTTPClient{}
+	clientMock := httpclient.HTTPClient{}
 
 	clientMock.On("Do", mock.Anything).Return(&http.Response{
 		StatusCode: http.StatusOK,
 	}, nil)
 
-	Client = &clientMock
-	if err := Put("http://localhost:9990", []byte("body")); err != nil {
+	if err := Put("http://localhost:9990", []byte("body"), &clientMock); err != nil {
 		t.Errorf("Put() error = %v, did not want error", err)
 	}
 	var actualRequest *http.Request
@@ -134,31 +129,76 @@
 	clientMock.AssertNumberOfCalls(t, "Do", 1)
 }
 
-func TestPutBadResponse(t *testing.T) {
+func TestPostOk(t *testing.T) {
 	assertions := require.New(t)
-	clientMock := mocks.HTTPClient{}
+	clientMock := httpclient.HTTPClient{}
 
 	clientMock.On("Do", mock.Anything).Return(&http.Response{
-		StatusCode: http.StatusBadRequest,
-		Body:       ioutil.NopCloser(bytes.NewReader([]byte("Bad Request"))),
+		StatusCode: http.StatusOK,
 	}, nil)
 
-	Client = &clientMock
-	err := Put("url", []byte("body"))
-	assertions.NotNil("Put() error = %v, wanted error", err)
-	expectedErrorMessage := "Request failed due to error response with status: 400 and body: Bad Request"
-	assertions.Equal(expectedErrorMessage, err.Error())
+	if err := Post("http://localhost:9990", []byte("body"), &clientMock); err != nil {
+		t.Errorf("Put() error = %v, did not want error", err)
+	}
+	var actualRequest *http.Request
+	clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool {
+		actualRequest = req
+		return true
+	}))
+	assertions.Equal(http.MethodPost, actualRequest.Method)
+	assertions.Equal("http", actualRequest.URL.Scheme)
+	assertions.Equal("localhost:9990", actualRequest.URL.Host)
+	assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
+	body, _ := ioutil.ReadAll(actualRequest.Body)
+	expectedBody := []byte("body")
+	assertions.Equal(expectedBody, body)
+	clientMock.AssertNumberOfCalls(t, "Do", 1)
 }
 
-func TestPutError(t *testing.T) {
+func Test_doErrorCases(t *testing.T) {
 	assertions := require.New(t)
-	clientMock := mocks.HTTPClient{}
-
-	clientMock.On("Do", mock.Anything).Return(nil, errors.New("Failed Request"))
-
-	Client = &clientMock
-	err := Put("url", []byte("body"))
-	assertions.NotNil("Put() error = %v, wanted error", err)
-	expectedErrorMessage := "Failed Request"
-	assertions.Equal(expectedErrorMessage, err.Error())
+	type args struct {
+		url              string
+		mockReturnStatus int
+		mockReturnBody   []byte
+		mockReturnError  error
+	}
+	tests := []struct {
+		name    string
+		args    args
+		wantErr error
+	}{
+		{
+			name: "Bad request should get RequestError",
+			args: args{
+				url:              "badRequest",
+				mockReturnStatus: http.StatusBadRequest,
+				mockReturnBody:   []byte("bad request"),
+				mockReturnError:  nil,
+			},
+			wantErr: RequestError{
+				StatusCode: http.StatusBadRequest,
+				Body:       []byte("bad request"),
+			},
+		},
+		{
+			name: "Server unavailable should get error",
+			args: args{
+				url:             "serverUnavailable",
+				mockReturnError: fmt.Errorf("Server unavailable"),
+			},
+			wantErr: fmt.Errorf("Server unavailable"),
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			clientMock := httpclient.HTTPClient{}
+			clientMock.On("Do", mock.Anything).Return(&http.Response{
+				StatusCode: tt.args.mockReturnStatus,
+				Body:       ioutil.NopCloser(bytes.NewReader(tt.args.mockReturnBody)),
+			}, tt.args.mockReturnError)
+			err := do("PUT", tt.args.url, nil, &clientMock)
+			assertions.Equal(tt.wantErr, err, tt.name)
+		})
+	}
 }
diff --git a/dmaap-mediator-producer/internal/server/server.go b/dmaap-mediator-producer/internal/server/server.go
index d07b730..5861cbe 100644
--- a/dmaap-mediator-producer/internal/server/server.go
+++ b/dmaap-mediator-producer/internal/server/server.go
@@ -35,19 +35,32 @@
 const jobIdToken = "infoJobId"
 const deleteJobPath = AddJobPath + "/{" + jobIdToken + "}"
 
-func NewRouter() *mux.Router {
+type ProducerCallbackHandler struct {
+	jobHandler jobs.JobHandler
+}
+
+func NewProducerCallbackHandler(jh jobs.JobHandler) *ProducerCallbackHandler {
+	return &ProducerCallbackHandler{
+		jobHandler: jh,
+	}
+}
+
+func NewRouter(jh jobs.JobHandler) *mux.Router {
+	callbackHandler := NewProducerCallbackHandler(jh)
 	r := mux.NewRouter()
 	r.HandleFunc(StatusPath, statusHandler).Methods(http.MethodGet).Name("status")
-	r.HandleFunc(AddJobPath, addInfoJobHandler).Methods(http.MethodPost).Name("add")
-	r.HandleFunc(deleteJobPath, deleteInfoJobHandler).Methods(http.MethodDelete).Name("delete")
+	r.HandleFunc(AddJobPath, callbackHandler.addInfoJobHandler).Methods(http.MethodPost).Name("add")
+	r.HandleFunc(deleteJobPath, callbackHandler.deleteInfoJobHandler).Methods(http.MethodDelete).Name("delete")
 	r.NotFoundHandler = &notFoundHandler{}
 	r.MethodNotAllowedHandler = &methodNotAllowedHandler{}
 	return r
 }
 
-func statusHandler(w http.ResponseWriter, r *http.Request) {}
+func statusHandler(w http.ResponseWriter, r *http.Request) {
+	// Just respond OK to show the server is alive for now. Might be extended later.
+}
 
-func addInfoJobHandler(w http.ResponseWriter, r *http.Request) {
+func (h *ProducerCallbackHandler) addInfoJobHandler(w http.ResponseWriter, r *http.Request) {
 	b, readErr := ioutil.ReadAll(r.Body)
 	if readErr != nil {
 		http.Error(w, fmt.Sprintf("Unable to read body due to: %v", readErr), http.StatusBadRequest)
@@ -58,12 +71,12 @@
 		http.Error(w, fmt.Sprintf("Invalid json body. Cause: %v", unmarshalErr), http.StatusBadRequest)
 		return
 	}
-	if err := jobs.AddJob(jobInfo); err != nil {
+	if err := h.jobHandler.AddJob(jobInfo); err != nil {
 		http.Error(w, fmt.Sprintf("Invalid job info. Cause: %v", err), http.StatusBadRequest)
 	}
 }
 
-func deleteInfoJobHandler(w http.ResponseWriter, r *http.Request) {
+func (h *ProducerCallbackHandler) deleteInfoJobHandler(w http.ResponseWriter, r *http.Request) {
 	vars := mux.Vars(r)
 	id, ok := vars[jobIdToken]
 	if !ok {
@@ -71,7 +84,7 @@
 		return
 	}
 
-	jobs.DeleteJob(id)
+	h.jobHandler.DeleteJob(id)
 }
 
 type notFoundHandler struct{}
diff --git a/dmaap-mediator-producer/internal/server/server_test.go b/dmaap-mediator-producer/internal/server/server_test.go
index 5943914..0888507 100644
--- a/dmaap-mediator-producer/internal/server/server_test.go
+++ b/dmaap-mediator-producer/internal/server/server_test.go
@@ -39,7 +39,7 @@
 
 func TestNewRouter(t *testing.T) {
 	assertions := require.New(t)
-	r := NewRouter()
+	r := NewRouter(nil)
 	statusRoute := r.Get("status")
 	assertions.NotNil(statusRoute)
 	supportedMethods, err := statusRoute.GetMethods()
@@ -63,7 +63,6 @@
 	responseRecorder := httptest.NewRecorder()
 	handler.ServeHTTP(responseRecorder, newRequest("GET", "/wrong", nil, t))
 	assertions.Equal(http.StatusNotFound, responseRecorder.Code)
-
 	assertions.Contains(responseRecorder.Body.String(), "404 not found.")
 
 	methodNotAllowedHandler := r.MethodNotAllowedHandler
@@ -71,7 +70,6 @@
 	responseRecorder = httptest.NewRecorder()
 	handler.ServeHTTP(responseRecorder, newRequest(http.MethodPut, "/status", nil, t))
 	assertions.Equal(http.StatusMethodNotAllowed, responseRecorder.Code)
-
 	assertions.Contains(responseRecorder.Body.String(), "Method is not supported.")
 }
 
@@ -88,51 +86,39 @@
 
 func TestAddInfoJobHandler(t *testing.T) {
 	assertions := require.New(t)
-	jobHandlerMock := jobhandler.JobHandler{}
-
-	goodJobInfo := jobs.JobInfo{
-		Owner:            "owner",
-		LastUpdated:      "now",
-		InfoJobIdentity:  "jobId",
-		TargetUri:        "target",
-		InfoJobData:      "{}",
-		InfoTypeIdentity: "type",
-	}
-	badJobInfo := jobs.JobInfo{
-		Owner: "bad",
-	}
-	jobHandlerMock.On("AddJob", goodJobInfo).Return(nil)
-	jobHandlerMock.On("AddJob", badJobInfo).Return(errors.New("error"))
-	jobs.Handler = &jobHandlerMock
 
 	type args struct {
-		responseRecorder *httptest.ResponseRecorder
-		r                *http.Request
+		job        jobs.JobInfo
+		mockReturn error
 	}
 	tests := []struct {
 		name         string
 		args         args
 		wantedStatus int
 		wantedBody   string
-		assertFunc   assertMockFunk
 	}{
 		{
 			name: "AddInfoJobHandler with correct path and method, should return OK",
 			args: args{
-				responseRecorder: httptest.NewRecorder(),
-				r:                newRequest(http.MethodPost, "/jobs", &goodJobInfo, t),
+				job: jobs.JobInfo{
+					Owner:            "owner",
+					LastUpdated:      "now",
+					InfoJobIdentity:  "jobId",
+					TargetUri:        "target",
+					InfoJobData:      "{}",
+					InfoTypeIdentity: "type",
+				},
 			},
 			wantedStatus: http.StatusOK,
 			wantedBody:   "",
-			assertFunc: func(mock *jobhandler.JobHandler) {
-				mock.AssertCalled(t, "AddJob", goodJobInfo)
-			},
 		},
 		{
 			name: "AddInfoJobHandler with incorrect job info, should return BadRequest",
 			args: args{
-				responseRecorder: httptest.NewRecorder(),
-				r:                newRequest(http.MethodPost, "/jobs", &badJobInfo, t),
+				job: jobs.JobInfo{
+					Owner: "bad",
+				},
+				mockReturn: errors.New("error"),
 			},
 			wantedStatus: http.StatusBadRequest,
 			wantedBody:   "Invalid job info. Cause: error",
@@ -140,15 +126,19 @@
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			handler := http.HandlerFunc(addInfoJobHandler)
-			handler.ServeHTTP(tt.args.responseRecorder, tt.args.r)
-			assertions.Equal(tt.wantedStatus, tt.args.responseRecorder.Code, tt.name)
+			jobHandlerMock := jobhandler.JobHandler{}
+			jobHandlerMock.On("AddJob", tt.args.job).Return(tt.args.mockReturn)
+			callbackHandlerUnderTest := NewProducerCallbackHandler(&jobHandlerMock)
 
-			assertions.Contains(tt.args.responseRecorder.Body.String(), tt.wantedBody, tt.name)
+			handler := http.HandlerFunc(callbackHandlerUnderTest.addInfoJobHandler)
+			responseRecorder := httptest.NewRecorder()
+			r := newRequest(http.MethodPost, "/jobs", &tt.args.job, t)
 
-			if tt.assertFunc != nil {
-				tt.assertFunc(&jobHandlerMock)
-			}
+			handler.ServeHTTP(responseRecorder, r)
+
+			assertions.Equal(tt.wantedStatus, responseRecorder.Code, tt.name)
+			assertions.Contains(responseRecorder.Body.String(), tt.wantedBody, tt.name)
+			jobHandlerMock.AssertCalled(t, "AddJob", tt.args.job)
 		})
 	}
 }
@@ -158,11 +148,11 @@
 	jobHandlerMock := jobhandler.JobHandler{}
 
 	jobHandlerMock.On("DeleteJob", mock.Anything).Return(nil)
-	jobs.Handler = &jobHandlerMock
+	callbackHandlerUnderTest := NewProducerCallbackHandler(&jobHandlerMock)
 
 	responseRecorder := httptest.NewRecorder()
 	r := mux.SetURLVars(newRequest(http.MethodDelete, "/jobs/", nil, t), map[string]string{"infoJobId": "job1"})
-	handler := http.HandlerFunc(deleteInfoJobHandler)
+	handler := http.HandlerFunc(callbackHandlerUnderTest.deleteInfoJobHandler)
 	handler.ServeHTTP(responseRecorder, r)
 	assertions.Equal(http.StatusOK, responseRecorder.Result().StatusCode)
 
@@ -171,8 +161,6 @@
 	jobHandlerMock.AssertCalled(t, "DeleteJob", "job1")
 }
 
-type assertMockFunk func(mock *jobhandler.JobHandler)
-
 func newRequest(method string, url string, jobInfo *jobs.JobInfo, t *testing.T) *http.Request {
 	var body io.Reader
 	if jobInfo != nil {
diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go
index 15207ec..beeb995 100644
--- a/dmaap-mediator-producer/main.go
+++ b/dmaap-mediator-producer/main.go
@@ -24,49 +24,45 @@
 	"fmt"
 	"net/http"
 	"sync"
+	"time"
 
 	log "github.com/sirupsen/logrus"
 	"oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
 	"oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
+	"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
 	"oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
 )
 
+const timeoutHTTPClient = time.Second * 5
+const timeoutPollClient = time.Second * 15
+
 var configuration *config.Config
-var callbackAddress string
+var httpClient restclient.HTTPClient
+var jobHandler *jobs.JobHandlerImpl
 
 func init() {
 	configuration = config.New()
-	if loglevel, err := log.ParseLevel(configuration.LogLevel); err == nil {
-		log.SetLevel(loglevel)
-	} else {
-		log.Warnf("Invalid log level: %v. Log level will be Info!", configuration.LogLevel)
-	}
-
-	log.Debug("Initializing DMaaP Mediator Producer")
-	if configuration.InfoProducerHost == "" {
-		log.Fatal("Missing INFO_PRODUCER_SUPERVISION_CALLBACK_HOST")
-	}
-	callbackAddress = fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort)
-
-	registrator := config.NewRegistratorImpl(configuration.InfoCoordinatorAddress)
-	if types, err := jobs.GetTypes(); err == nil {
-		if regErr := registrator.RegisterTypes(types); regErr != nil {
-			log.Fatalf("Unable to register all types due to: %v", regErr)
-		}
-	} else {
-		log.Fatalf("Unable to get types to register due to: %v", err)
-	}
-	producer := config.ProducerRegistrationInfo{
-		InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusPath,
-		SupportedInfoTypes:                 jobs.GetSupportedTypes(),
-		InfoJobCallbackUrl:                 callbackAddress + server.AddJobPath,
-	}
-	if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
-		log.Fatalf("Unable to register producer due to: %v", err)
-	}
 }
 
 func main() {
+	log.SetLevel(configuration.LogLevel)
+	log.Debug("Initializing DMaaP Mediator Producer")
+	if err := validateConfiguration(configuration); err != nil {
+		log.Fatalf("Stopping producer due to error: %v", err)
+	}
+	callbackAddress := fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort)
+
+	httpClient = &http.Client{
+		Timeout: timeoutHTTPClient,
+	}
+	pollClient := &http.Client{
+		Timeout: timeoutPollClient,
+	}
+	jobHandler = jobs.NewJobHandlerImpl("configs/type_config.json", pollClient, httpClient)
+	if err := registerTypesAndProducer(jobHandler, configuration.InfoCoordinatorAddress, callbackAddress); err != nil {
+		log.Fatalf("Stopping producer due to: %v", err)
+	}
+
 	log.Debug("Starting DMaaP Mediator Producer")
 	wg := new(sync.WaitGroup)
 
@@ -75,13 +71,13 @@
 
 	log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
 	go func() {
-		r := server.NewRouter()
+		r := server.NewRouter(jobHandler)
 		log.Warn(http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), r))
 		wg.Done()
 	}()
 
 	go func() {
-		jobs.RunJobs(fmt.Sprintf("%v:%v", configuration.MRHost, configuration.MRPort))
+		jobHandler.RunJobs(fmt.Sprintf("%v:%v", configuration.MRHost, configuration.MRPort))
 		wg.Done()
 	}()
 
@@ -89,3 +85,30 @@
 	wg.Wait()
 	log.Debug("Stopping DMaaP Mediator Producer")
 }
+
+func validateConfiguration(configuration *config.Config) error {
+	if configuration.InfoProducerHost == "" {
+		return fmt.Errorf("missing INFO_PRODUCER_HOST")
+	}
+	return nil
+}
+
+func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAddress string, callbackAddress string) error {
+	registrator := config.NewRegistratorImpl(infoCoordinatorAddress, httpClient)
+	if types, err := jobHandler.GetTypes(); err == nil {
+		if regErr := registrator.RegisterTypes(types); regErr != nil {
+			return fmt.Errorf("unable to register all types due to: %v", regErr)
+		}
+	} else {
+		return fmt.Errorf("unable to get types to register due to: %v", err)
+	}
+	producer := config.ProducerRegistrationInfo{
+		InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusPath,
+		SupportedInfoTypes:                 jobHandler.GetSupportedTypes(),
+		InfoJobCallbackUrl:                 callbackAddress + server.AddJobPath,
+	}
+	if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
+		return fmt.Errorf("unable to register producer due to: %v", err)
+	}
+	return nil
+}
diff --git a/dmaap-mediator-producer/mocks/HTTPClient.go b/dmaap-mediator-producer/mocks/httpclient/HTTPClient.go
similarity index 93%
rename from dmaap-mediator-producer/mocks/HTTPClient.go
rename to dmaap-mediator-producer/mocks/httpclient/HTTPClient.go
index 3037798..ab399df 100644
--- a/dmaap-mediator-producer/mocks/HTTPClient.go
+++ b/dmaap-mediator-producer/mocks/httpclient/HTTPClient.go
@@ -1,6 +1,6 @@
-// Code generated by mockery v1.0.0. DO NOT EDIT.
+// Code generated by mockery v2.9.3. DO NOT EDIT.
 
-package mocks
+package httpclient
 
 import (
 	http "net/http"
diff --git a/dmaap-mediator-producer/simulator/consumersimulator.go b/dmaap-mediator-producer/simulator/consumersimulator.go
index 144f56f..03da6f4 100644
--- a/dmaap-mediator-producer/simulator/consumersimulator.go
+++ b/dmaap-mediator-producer/simulator/consumersimulator.go
@@ -26,18 +26,25 @@
 	"fmt"
 	"io"
 	http "net/http"
+	"time"
 
 	"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
 )
 
+var httpClient http.Client
+
 func main() {
+	httpClient = http.Client{
+		Timeout: time.Second * 5,
+	}
 	port := flag.Int("port", 40935, "The port this consumer will listen on")
 	flag.Parse()
 	http.HandleFunc("/jobs", handleData)
 
+	registerJob(*port)
+
 	fmt.Print("Starting consumer on port: ", *port)
 	http.ListenAndServe(fmt.Sprintf(":%v", *port), nil)
-	registerJob(*port)
 }
 
 func registerJob(port int) {
@@ -49,7 +56,7 @@
 	}{fmt.Sprintf("test%v", port), fmt.Sprintf("http://localhost:%v/jobs", port), "STD_Fault_Messages", "{}"}
 	fmt.Print("Registering consumer: ", jobInfo)
 	body, _ := json.Marshal(jobInfo)
-	putErr := restclient.Put(fmt.Sprintf("http://localhost:8083/data-consumer/v1/info-jobs/job%v", port), body)
+	putErr := restclient.Put(fmt.Sprintf("http://localhost:8083/data-consumer/v1/info-jobs/job%v", port), body, &httpClient)
 	if putErr != nil {
 		fmt.Printf("Unable to register consumer: %v", putErr)
 	}