Improve concurrency of message sending
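
Each info type's jobsHandler now pushes polled message batches onto a buffered
channel per job, and every job runs its own distribution goroutine. A slow or
unavailable consumer therefore no longer blocks polling or delivery to the
other jobs; if a job's buffer fills up, its backlog is dropped until the
consumer keeps up again.

Illustrative sketch of the pattern (simplified and self-contained; the real
implementation is in internal/jobs/jobs.go):

package main

import (
	"fmt"
	"time"
)

// job mirrors, in simplified form, the new job type in jobs.go.
type job struct {
	id              string
	messagesChannel chan []byte   // buffered so a slow consumer does not block polling
	controlChannel  chan struct{} // signalled when the job is deleted
}

func newJob(id string) job {
	return job{
		id:              id,
		messagesChannel: make(chan []byte, 10),
		controlChannel:  make(chan struct{}),
	}
}

// start runs the per-job distribution loop in its own goroutine.
func (j *job) start() {
	for {
		select {
		case <-j.controlChannel:
			return // job deleted, stop distributing
		case msg := <-j.messagesChannel:
			// The real code POSTs the messages to the consumer's target URI.
			fmt.Printf("job %v distributes %s\n", j.id, msg)
		}
	}
}

func main() {
	j := newJob("job1")
	go j.start()
	j.messagesChannel <- []byte(`[{"message": "data"}]`) // a jobsHandler pushes polled batches here
	time.Sleep(10 * time.Millisecond)
	j.controlChannel <- struct{}{} // deleteJob signals the goroutine to stop
}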

Issue-ID: NONRTRIC-635
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: I414740a918eb65abae94c91990ff39a3cfc7bd22
diff --git a/dmaap-mediator-producer/Dockerfile b/dmaap-mediator-producer/Dockerfile
index bc09fdc..1c7f45c 100644
--- a/dmaap-mediator-producer/Dockerfile
+++ b/dmaap-mediator-producer/Dockerfile
@@ -20,7 +20,7 @@
 ##
 ## Build
 ##
-FROM golang:1.17-bullseye AS build
+FROM nexus3.o-ran-sc.org:10001/golang:1.17-bullseye AS build
 WORKDIR /app
 COPY go.mod .
 COPY go.sum .
diff --git a/dmaap-mediator-producer/README.md b/dmaap-mediator-producer/README.md
index 90f8471..2fd7194 100644
--- a/dmaap-mediator-producer/README.md
+++ b/dmaap-mediator-producer/README.md
@@ -36,7 +36,7 @@
 
 At start-up the producer will register the configured job types in ICS and also register itself as a producer supporting these types. If ICS is unavailable, the producer will retry to connect indefinitely. The same goes for MR.
 
-Once the initial registration is done, the producer will constantly poll MR for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If no jobs for that type are registered, the messages will be discarded. If a consumer is unavailable for distribution, the messages will be discarded for that consumer.
+Once the initial registration is done, the producer will continuously poll MR for all configured job types. When receiving messages for a type, it will distribute them to all jobs registered for the type. If no jobs are registered for that type, the messages will be discarded. If a consumer is unavailable, the messages for that consumer will be discarded until it becomes available again.
 
 ## Development
 
diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go
index 1c42942..d4694bf 100644
--- a/dmaap-mediator-producer/internal/jobs/jobs.go
+++ b/dmaap-mediator-producer/internal/jobs/jobs.go
@@ -34,7 +34,7 @@
 type TypeData struct {
 	TypeId        string `json:"id"`
 	DMaaPTopicURL string `json:"dmaapTopicUrl"`
-	jobHandler    *jobHandler
+	jobsHandler   *jobsHandler
 }
 
 type JobInfo struct {
@@ -64,17 +64,6 @@
 	distributeClient restclient.HTTPClient
 }
 
-type jobHandler struct {
-	mu               sync.Mutex
-	typeId           string
-	topicUrl         string
-	jobs             map[string]JobInfo
-	addJobCh         chan JobInfo
-	deleteJobCh      chan string
-	pollClient       restclient.HTTPClient
-	distributeClient restclient.HTTPClient
-}
-
 func NewJobsManagerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
 	return &JobsManagerImpl{
 		configFile:       typeConfigFilePath,
@@ -88,7 +77,7 @@
 func (jm *JobsManagerImpl) AddJob(ji JobInfo) error {
 	if err := jm.validateJobInfo(ji); err == nil {
 		typeData := jm.allTypes[ji.InfoTypeIdentity]
-		typeData.jobHandler.addJobCh <- ji
+		typeData.jobsHandler.addJobCh <- ji
 		log.Debug("Added job: ", ji)
 		return nil
 	} else {
@@ -99,7 +88,7 @@
 func (jm *JobsManagerImpl) DeleteJob(jobId string) {
 	for _, typeData := range jm.allTypes {
 		log.Debugf("Deleting job %v from type %v", jobId, typeData.TypeId)
-		typeData.jobHandler.deleteJobCh <- jobId
+		typeData.jobsHandler.deleteJobCh <- jobId
 	}
 	log.Debug("Deleted job: ", jobId)
 }
@@ -131,21 +120,10 @@
 		return nil, err
 	}
 	for _, typeDef := range typeDefs.Types {
-		addCh := make(chan JobInfo)
-		deleteCh := make(chan string)
-		jh := jobHandler{
-			typeId:           typeDef.Id,
-			topicUrl:         typeDef.DmaapTopicURL,
-			jobs:             make(map[string]JobInfo),
-			addJobCh:         addCh,
-			deleteJobCh:      deleteCh,
-			pollClient:       jm.pollClient,
-			distributeClient: jm.distributeClient,
-		}
 		jm.allTypes[typeDef.Id] = TypeData{
 			TypeId:        typeDef.Id,
 			DMaaPTopicURL: typeDef.DmaapTopicURL,
-			jobHandler:    &jh,
+			jobsHandler:   newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient),
 		}
 	}
 	return typeDefs.Types, nil
@@ -162,12 +140,35 @@
 func (jm *JobsManagerImpl) StartJobs() {
 	for _, jobType := range jm.allTypes {
 
-		go jobType.jobHandler.start(jm.mrAddress)
+		go jobType.jobsHandler.start(jm.mrAddress)
 
 	}
 }
 
-func (jh *jobHandler) start(mRAddress string) {
+type jobsHandler struct {
+	mu               sync.Mutex
+	typeId           string
+	topicUrl         string
+	jobs             map[string]job
+	addJobCh         chan JobInfo
+	deleteJobCh      chan string
+	pollClient       restclient.HTTPClient
+	distributeClient restclient.HTTPClient
+}
+
+func newJobsHandler(typeId string, topicURL string, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
+	return &jobsHandler{
+		typeId:           typeId,
+		topicUrl:         topicURL,
+		jobs:             make(map[string]job),
+		addJobCh:         make(chan JobInfo),
+		deleteJobCh:      make(chan string),
+		pollClient:       pollClient,
+		distributeClient: distributeClient,
+	}
+}
+
+func (jh *jobsHandler) start(mRAddress string) {
 	go func() {
 		for {
 			jh.pollAndDistributeMessages(mRAddress)
@@ -181,45 +182,104 @@
 	}()
 }
 
-func (jh *jobHandler) pollAndDistributeMessages(mRAddress string) {
+func (jh *jobsHandler) pollAndDistributeMessages(mRAddress string) {
 	log.Debugf("Processing jobs for type: %v", jh.typeId)
 	messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient)
 	if error != nil {
-		log.Warnf("Error getting data from MR. Cause: %v", error)
+		log.Warn("Error getting data from MR. Cause: ", error)
 	}
-	log.Debugf("Received messages: %v", string(messagesBody))
+	log.Debug("Received messages: ", string(messagesBody))
 	jh.distributeMessages(messagesBody)
 }
 
-func (jh *jobHandler) distributeMessages(messages []byte) {
+func (jh *jobsHandler) distributeMessages(messages []byte) {
 	if len(messages) > 2 {
 		jh.mu.Lock()
 		defer jh.mu.Unlock()
-		for _, jobInfo := range jh.jobs {
-			go jh.sendMessagesToConsumer(messages, jobInfo)
+		for _, job := range jh.jobs {
+			if len(job.messagesChannel) < cap(job.messagesChannel) {
+				job.messagesChannel <- messages
+			} else {
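+				// The job's buffer is full, meaning its consumer is not keeping up.
+				// Drop the backlog for this job rather than blocking distribution to the other jobs.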
+				jh.emptyMessagesBuffer(job)
+			}
 		}
 	}
 }
 
-func (jh *jobHandler) sendMessagesToConsumer(messages []byte, jobInfo JobInfo) {
-	log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity)
-	if postErr := restclient.Post(jobInfo.TargetUri, messages, jh.distributeClient); postErr != nil {
-		log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
+func (jh *jobsHandler) emptyMessagesBuffer(job job) {
+	log.Debug("Emptying message queue for job: ", job.jobInfo.InfoJobIdentity)
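+	// Drain without blocking; the default case fires once the buffer is empty.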
+out:
+	for {
+		select {
+		case <-job.messagesChannel:
+		default:
+			break out
+		}
 	}
-	log.Debugf("Messages distributed to consumer: %v.", jobInfo.Owner)
 }
 
-func (jh *jobHandler) monitorManagementChannels() {
+func (jh *jobsHandler) monitorManagementChannels() {
 	select {
 	case addedJob := <-jh.addJobCh:
-		jh.mu.Lock()
-		log.Debugf("received %v from addJobCh\n", addedJob)
-		jh.jobs[addedJob.InfoJobIdentity] = addedJob
-		jh.mu.Unlock()
+		jh.addJob(addedJob)
 	case deletedJob := <-jh.deleteJobCh:
-		jh.mu.Lock()
-		log.Debugf("received %v from deleteJobCh\n", deletedJob)
-		delete(jh.jobs, deletedJob)
-		jh.mu.Unlock()
+		jh.deleteJob(deletedJob)
 	}
 }
+
+func (jh *jobsHandler) addJob(addedJob JobInfo) {
+	jh.mu.Lock()
+	log.Debug("Add job: ", addedJob)
+	newJob := newJob(addedJob, jh.distributeClient)
+	go newJob.start()
+	jh.jobs[addedJob.InfoJobIdentity] = newJob
+	jh.mu.Unlock()
+}
+
+func (jh *jobsHandler) deleteJob(deletedJob string) {
+	jh.mu.Lock()
+	log.Debug("Delete job: ", deletedJob)
+	j, exist := jh.jobs[deletedJob]
+	if exist {
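+		// Signal the job's goroutine to stop distributing before removing the job.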
+		j.controlChannel <- struct{}{}
+		delete(jh.jobs, deletedJob)
+	}
+	jh.mu.Unlock()
+}
+
+type job struct {
+	jobInfo         JobInfo
+	client          restclient.HTTPClient
+	messagesChannel chan []byte
+	controlChannel  chan struct{}
+}
+
+func newJob(j JobInfo, c restclient.HTTPClient) job {
+	return job{
+		jobInfo:         j,
+		client:          c,
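+		// Buffer up to 10 polled batches; jobsHandler.distributeMessages drops the backlog if it fills up.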
+		messagesChannel: make(chan []byte, 10),
+		controlChannel:  make(chan struct{}),
+	}
+}
+
+func (j *job) start() {
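+	// Runs in its own goroutine; a signal on controlChannel (sent by deleteJob) ends the loop.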
+out:
+	for {
+		select {
+		case <-j.controlChannel:
+			log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
+			break out
+		case msg := <-j.messagesChannel:
+			j.sendMessagesToConsumer(msg)
+		}
+	}
+}
+
+func (j *job) sendMessagesToConsumer(messages []byte) {
+	log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
+	if postErr := restclient.Post(j.jobInfo.TargetUri, messages, j.client); postErr != nil {
+		log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
+		return
+	}
+	log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)
+}
diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go
index 3651a13..066823d 100644
--- a/dmaap-mediator-producer/internal/jobs/jobs_test.go
+++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go
@@ -36,7 +36,7 @@
 
 const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
 
-func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
+func TestJobsManagerGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
 	assertions := require.New(t)
 	typesDir, err := os.MkdirTemp("", "configs")
 	if err != nil {
@@ -63,7 +63,7 @@
 	assertions.EqualValues([]string{"type1"}, supportedTypes)
 }
 
-func TestManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
+func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
 	assertions := require.New(t)
 	managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
 	wantedJob := JobInfo{
@@ -74,11 +74,11 @@
 		InfoJobData:      "{}",
 		InfoTypeIdentity: "type1",
 	}
-	jobHandler := jobHandler{
+	jobsHandler := jobsHandler{
 		addJobCh: make(chan JobInfo)}
 	managerUnderTest.allTypes["type1"] = TypeData{
-		TypeId:     "type1",
-		jobHandler: &jobHandler,
+		TypeId:      "type1",
+		jobsHandler: &jobsHandler,
 	}
 
 	var err error
@@ -87,11 +87,11 @@
 	}()
 
 	assertions.Nil(err)
-	addedJob := <-jobHandler.addJobCh
+	addedJob := <-jobsHandler.addJobCh
 	assertions.Equal(wantedJob, addedJob)
 }
 
-func TestManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
+func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
 	assertions := require.New(t)
 	managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
 	jobInfo := JobInfo{
@@ -103,7 +103,7 @@
 	assertions.Equal("type not supported: type1", err.Error())
 }
 
-func TestManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
+func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
 	assertions := require.New(t)
 	managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
 	managerUnderTest.allTypes["type1"] = TypeData{
@@ -118,7 +118,7 @@
 	assertions.Equal("missing required job identity: {    <nil> type1}", err.Error())
 }
 
-func TestManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
+func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
 	assertions := require.New(t)
 	managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
 	managerUnderTest.allTypes["type1"] = TypeData{
@@ -134,33 +134,37 @@
 	assertions.Equal("missing required target URI: {  job1  <nil> type1}", err.Error())
 }
 
-func TestManagerDeleteJob(t *testing.T) {
+func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
 	assertions := require.New(t)
 	managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
-	jobHandler := jobHandler{
+	jobsHandler := jobsHandler{
 		deleteJobCh: make(chan string)}
 	managerUnderTest.allTypes["type1"] = TypeData{
-		TypeId:     "type1",
-		jobHandler: &jobHandler,
+		TypeId:      "type1",
+		jobsHandler: &jobsHandler,
 	}
 
 	go managerUnderTest.DeleteJob("job2")
 
-	assertions.Equal("job2", <-jobHandler.deleteJobCh)
+	assertions.Equal("job2", <-jobsHandler.deleteJobCh)
 }
 
-func TestHandlerPollAndDistributeMessages(t *testing.T) {
+func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T) {
 	assertions := require.New(t)
 
-	wg := sync.WaitGroup{}
+	called := false
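+	// The poll mock returns messages on the first call only and an empty array afterwards,
+	// so exactly one distribution to the consumer is expected.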
 	messages := `[{"message": {"data": "data"}}]`
 	pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
 		if req.URL.String() == "http://mrAddr/topicUrl" {
 			assertions.Equal(req.Method, "GET")
-			wg.Done() // Signal that the poll call has been made
+			body := "[]"
+			if !called {
+				called = true
+				body = messages
+			}
 			return &http.Response{
 				StatusCode: 200,
-				Body:       ioutil.NopCloser(bytes.NewReader([]byte(messages))),
+				Body:       ioutil.NopCloser(bytes.NewReader([]byte(body))),
 				Header:     make(http.Header), // Must be set to non-nil value or it panics
 			}
 		}
@@ -168,12 +172,14 @@
 		t.Fail()
 		return nil
 	})
+
+	wg := sync.WaitGroup{}
 	distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
 		if req.URL.String() == "http://consumerHost/target" {
 			assertions.Equal(req.Method, "POST")
 			assertions.Equal(messages, getBodyAsString(req))
 			assertions.Equal("application/json", req.Header.Get("Content-Type"))
-			wg.Done() // Signal that the distribution call has been made
+			wg.Done()
 			return &http.Response{
 				StatusCode: 200,
 				Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
@@ -184,73 +190,72 @@
 		t.Fail()
 		return nil
 	})
+	jobsHandler := newJobsHandler("type1", "/topicUrl", pollClientMock, distributeClientMock)
+
+	jobsManager := NewJobsManagerImpl("", pollClientMock, "http://mrAddr", distributeClientMock)
+	jobsManager.allTypes["type1"] = TypeData{
+		DMaaPTopicURL: "/topicUrl",
+		TypeId:        "type1",
+		jobsHandler:   jobsHandler,
+	}
+
+	jobsManager.StartJobs()
 
 	jobInfo := JobInfo{
 		InfoTypeIdentity: "type1",
 		InfoJobIdentity:  "job1",
 		TargetUri:        "http://consumerHost/target",
 	}
-	handlerUnderTest := jobHandler{
-		topicUrl:         "/topicUrl",
-		jobs:             map[string]JobInfo{jobInfo.InfoJobIdentity: jobInfo},
-		pollClient:       pollClientMock,
-		distributeClient: distributeClientMock,
-	}
 
-	wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute
-	handlerUnderTest.pollAndDistributeMessages("http://mrAddr")
+	wg.Add(1) // Wait until the messages have been distributed to the consumer
+	jobsManager.AddJob(jobInfo)
 
-	if waitTimeout(&wg, 100*time.Millisecond) {
+	if waitTimeout(&wg, 2*time.Second) {
 		t.Error("Not all calls to server were made")
 		t.Fail()
 	}
 }
 
-func TestHandlerAddJob_shouldAddJobToJobsMap(t *testing.T) {
-	assertions := require.New(t)
+func TestJobsHandlerDeleteJob_shouldDeleteJobFromJobsMap(t *testing.T) {
+	jobToDelete := newJob(JobInfo{}, nil)
+	go jobToDelete.start()
+	jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil)
+	jobsHandler.jobs["job1"] = jobToDelete
 
-	jobInfo := JobInfo{
-		InfoTypeIdentity: "type1",
-		InfoJobIdentity:  "job1",
-		TargetUri:        "http://consumerHost/target",
+	go jobsHandler.monitorManagementChannels()
+
+	jobsHandler.deleteJobCh <- "job1"
+
+	deleted := false
+	for i := 0; i < 100; i++ {
+		if len(jobsHandler.jobs) == 0 {
+			deleted = true
+			break
+		}
+		time.Sleep(time.Microsecond) // Yield so the handler's goroutine gets a chance to process the delete
 	}
-
-	addCh := make(chan JobInfo)
-	handlerUnderTest := jobHandler{
-		mu:       sync.Mutex{},
-		jobs:     map[string]JobInfo{},
-		addJobCh: addCh,
-	}
-
-	go func() {
-		addCh <- jobInfo
-	}()
-
-	handlerUnderTest.monitorManagementChannels()
-
-	assertions.Len(handlerUnderTest.jobs, 1)
-	assertions.Equal(jobInfo, handlerUnderTest.jobs["job1"])
+	require.New(t).True(deleted, "Job not deleted")
 }
 
-func TestHandlerDeleteJob_shouldDeleteJobFromJobsMap(t *testing.T) {
-	assertions := require.New(t)
+func TestJobsHandlerEmptyJobMessageBufferWhenItIsFull(t *testing.T) {
+	job := newJob(JobInfo{
+		InfoJobIdentity: "job",
+	}, nil)
 
-	deleteCh := make(chan string)
-	handlerUnderTest := jobHandler{
-		mu: sync.Mutex{},
-		jobs: map[string]JobInfo{"job1": {
-			InfoJobIdentity: "job1",
-		}},
-		deleteJobCh: deleteCh,
+	jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil)
+	jobsHandler.jobs["job1"] = job
+
+	fillMessagesBuffer(job.messagesChannel)
+
+	jobsHandler.distributeMessages([]byte("sent msg"))
+
+	require.New(t).Len(job.messagesChannel, 0)
+}
+
+func fillMessagesBuffer(mc chan []byte) {
+	for i := 0; i < cap(mc); i++ {
+		mc <- []byte("msg")
 	}
-
-	go func() {
-		deleteCh <- "job1"
-	}()
-
-	handlerUnderTest.monitorManagementChannels()
-
-	assertions.Len(handlerUnderTest.jobs, 0)
 }
 
 type RoundTripFunc func(req *http.Request) *http.Response