| // - |
| // ========================LICENSE_START================================= |
| // O-RAN-SC |
| // %% |
| // Copyright (C) 2021: Nordix Foundation |
| // %% |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| // ========================LICENSE_END=================================== |
| // |
| |
| package jobs |
| |
import (
	"bytes"
	"encoding/json"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	log "github.com/sirupsen/logrus"
	"oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
	"oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
	"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
)
| |
| type TypeData struct { |
| Identity string `json:"id"` |
| jobsHandler *jobsHandler |
| } |
| |
// sourceType names the kind of data source that feeds a type's jobs.
type sourceType string

// The data sources a type can be configured with.
const (
	dMaaPSource sourceType = "dmaap"
	kafkaSource sourceType = "kafka"
)
| |
| type JobInfo struct { |
| Owner string `json:"owner"` |
| LastUpdated string `json:"last_updated"` |
| InfoJobIdentity string `json:"info_job_identity"` |
| TargetUri string `json:"target_uri"` |
| InfoJobData Parameters `json:"info_job_data"` |
| InfoTypeIdentity string `json:"info_type_identity"` |
| sourceType sourceType |
| } // @name JobInfo |
| |
| type JobTypesManager interface { |
| LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition |
| GetSupportedTypes() []string |
| } |
| |
| //go:generate mockery --name JobsManager |
| type JobsManager interface { |
| AddJobFromRESTCall(JobInfo) error |
| DeleteJobFromRESTCall(jobId string) |
| } |
| |
| type JobsManagerImpl struct { |
| allTypes map[string]TypeData |
| pollClient restclient.HTTPClient |
| mrAddress string |
| kafkaFactory kafkaclient.KafkaFactory |
| distributeClient restclient.HTTPClient |
| } |
| |
| func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, kafkaFactory kafkaclient.KafkaFactory, distributeClient restclient.HTTPClient) *JobsManagerImpl { |
| return &JobsManagerImpl{ |
| allTypes: make(map[string]TypeData), |
| pollClient: pollClient, |
| mrAddress: mrAddr, |
| kafkaFactory: kafkaFactory, |
| distributeClient: distributeClient, |
| } |
| } |
| |
| func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error { |
| if err := jm.validateJobInfo(ji); err == nil { |
| typeData := jm.allTypes[ji.InfoTypeIdentity] |
| ji.sourceType = typeData.jobsHandler.sourceType |
| typeData.jobsHandler.addJobCh <- ji |
| log.Debug("Added job: ", ji) |
| return nil |
| } else { |
| return err |
| } |
| } |
| |
| func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) { |
| for _, typeData := range jm.allTypes { |
| log.Debugf("Deleting job %v from type %v", jobId, typeData.Identity) |
| typeData.jobsHandler.deleteJobCh <- jobId |
| } |
| log.Debug("Deleted job: ", jobId) |
| } |
| |
| func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error { |
| if _, ok := jm.allTypes[ji.InfoTypeIdentity]; !ok { |
| return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity) |
| } |
| if ji.InfoJobIdentity == "" { |
| return fmt.Errorf("missing required job identity: %v", ji) |
| } |
| // Temporary for when there are only REST callbacks needed |
| if ji.TargetUri == "" { |
| return fmt.Errorf("missing required target URI: %v", ji) |
| } |
| return nil |
| } |
| |
| func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition { |
| for _, typeDef := range types { |
| if typeDef.DMaaPTopicURL == "" && typeDef.KafkaInputTopic == "" { |
| log.Fatal("DMaaPTopicURL or KafkaInputTopic must be defined for type: ", typeDef.Identity) |
| } |
| jm.allTypes[typeDef.Identity] = TypeData{ |
| Identity: typeDef.Identity, |
| jobsHandler: newJobsHandler(typeDef, jm.mrAddress, jm.kafkaFactory, jm.pollClient, jm.distributeClient), |
| } |
| } |
| return types |
| } |
| |
| func (jm *JobsManagerImpl) GetSupportedTypes() []string { |
| supportedTypes := []string{} |
| for k := range jm.allTypes { |
| supportedTypes = append(supportedTypes, k) |
| } |
| return supportedTypes |
| } |
| |
| func (jm *JobsManagerImpl) StartJobsForAllTypes() { |
| for _, jobType := range jm.allTypes { |
| |
| go jobType.jobsHandler.startPollingAndDistribution() |
| |
| } |
| } |
| |
| type jobsHandler struct { |
| mu sync.Mutex |
| typeId string |
| sourceType sourceType |
| pollingAgent pollingAgent |
| jobs map[string]job |
| addJobCh chan JobInfo |
| deleteJobCh chan string |
| distributeClient restclient.HTTPClient |
| } |
| |
| func newJobsHandler(typeDef config.TypeDefinition, mRAddress string, kafkaFactory kafkaclient.KafkaFactory, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler { |
| pollingAgent := createPollingAgent(typeDef, mRAddress, pollClient, kafkaFactory, typeDef.KafkaInputTopic) |
| sourceType := kafkaSource |
| if typeDef.DMaaPTopicURL != "" { |
| sourceType = dMaaPSource |
| } |
| return &jobsHandler{ |
| typeId: typeDef.Identity, |
| sourceType: sourceType, |
| pollingAgent: pollingAgent, |
| jobs: make(map[string]job), |
| addJobCh: make(chan JobInfo), |
| deleteJobCh: make(chan string), |
| distributeClient: distributeClient, |
| } |
| } |
| |
| func (jh *jobsHandler) startPollingAndDistribution() { |
| go func() { |
| for { |
| jh.pollAndDistributeMessages() |
| } |
| }() |
| |
| go func() { |
| for { |
| jh.monitorManagementChannels() |
| } |
| }() |
| } |
| |
| func (jh *jobsHandler) pollAndDistributeMessages() { |
| log.Debugf("Processing jobs for type: %v", jh.typeId) |
| messagesBody, error := jh.pollingAgent.pollMessages() |
| if error != nil { |
| log.Warn("Error getting data from source. Cause: ", error) |
| time.Sleep(time.Minute) // Must wait before trying to call data source again |
| return |
| } |
| jh.distributeMessages(messagesBody) |
| } |
| |
| func (jh *jobsHandler) distributeMessages(messages []byte) { |
| if string(messages) != "[]" && len(messages) > 0 { // MR returns an empty array if there are no messages. |
| log.Debug("Distributing messages: ", string(messages)) |
| jh.mu.Lock() |
| defer jh.mu.Unlock() |
| for _, job := range jh.jobs { |
| if len(job.messagesChannel) < cap(job.messagesChannel) { |
| job.messagesChannel <- messages |
| } else { |
| jh.emptyMessagesBuffer(job) |
| } |
| } |
| } |
| } |
| |
| func (jh *jobsHandler) emptyMessagesBuffer(job job) { |
| log.Debug("Emptying message queue for job: ", job.jobInfo.InfoJobIdentity) |
| out: |
| for { |
| select { |
| case <-job.messagesChannel: |
| default: |
| break out |
| } |
| } |
| } |
| |
| func (jh *jobsHandler) monitorManagementChannels() { |
| select { |
| case addedJob := <-jh.addJobCh: |
| jh.addJob(addedJob) |
| case deletedJob := <-jh.deleteJobCh: |
| jh.deleteJob(deletedJob) |
| } |
| } |
| |
| func (jh *jobsHandler) addJob(addedJob JobInfo) { |
| jh.mu.Lock() |
| log.Debug("Add job: ", addedJob) |
| newJob := newJob(addedJob, jh.distributeClient) |
| go newJob.start() |
| jh.jobs[addedJob.InfoJobIdentity] = newJob |
| jh.mu.Unlock() |
| } |
| |
| func (jh *jobsHandler) deleteJob(deletedJob string) { |
| jh.mu.Lock() |
| log.Debug("Delete job: ", deletedJob) |
| j, exist := jh.jobs[deletedJob] |
| if exist { |
| j.controlChannel <- struct{}{} |
| delete(jh.jobs, deletedJob) |
| } |
| jh.mu.Unlock() |
| } |
| |
// pollingAgent fetches messages from a data source.
type pollingAgent interface {
	// pollMessages returns the fetched payload or the error that prevented
	// the fetch.
	pollMessages() ([]byte, error)
}
| |
| func createPollingAgent(typeDef config.TypeDefinition, mRAddress string, pollClient restclient.HTTPClient, kafkaFactory kafkaclient.KafkaFactory, topicID string) pollingAgent { |
| if typeDef.DMaaPTopicURL != "" { |
| return dMaaPPollingAgent{ |
| messageRouterURL: mRAddress + typeDef.DMaaPTopicURL, |
| pollClient: pollClient, |
| } |
| } else { |
| return newKafkaPollingAgent(kafkaFactory, typeDef.KafkaInputTopic) |
| } |
| } |
| |
| type dMaaPPollingAgent struct { |
| messageRouterURL string |
| pollClient restclient.HTTPClient |
| } |
| |
| func (pa dMaaPPollingAgent) pollMessages() ([]byte, error) { |
| return restclient.Get(pa.messageRouterURL, pa.pollClient) |
| } |
| |
| type kafkaPollingAgent struct { |
| kafkaClient kafkaclient.KafkaClient |
| } |
| |
| func newKafkaPollingAgent(kafkaFactory kafkaclient.KafkaFactory, topicID string) kafkaPollingAgent { |
| c, err := kafkaFactory.NewKafkaClient(topicID) |
| if err != nil { |
| log.Fatalf("Cannot create Kafka client for topic: %v, error details: %v\n", topicID, err) |
| } |
| return kafkaPollingAgent{ |
| kafkaClient: c, |
| } |
| } |
| |
| func (pa kafkaPollingAgent) pollMessages() ([]byte, error) { |
| msg, err := pa.kafkaClient.ReadMessage() |
| if err == nil { |
| return msg, nil |
| } else { |
| if isKafkaTimedOutError(err) { |
| return []byte(""), nil |
| } |
| return nil, err |
| } |
| } |
| |
| func isKafkaTimedOutError(err error) bool { |
| kafkaErr, ok := err.(kafka.Error) |
| return ok && kafkaErr.Code() == kafka.ErrTimedOut |
| } |
| |
| type job struct { |
| jobInfo JobInfo |
| client restclient.HTTPClient |
| messagesChannel chan []byte |
| controlChannel chan struct{} |
| } |
| |
| func newJob(j JobInfo, c restclient.HTTPClient) job { |
| |
| return job{ |
| jobInfo: j, |
| client: c, |
| messagesChannel: make(chan []byte, 10), |
| controlChannel: make(chan struct{}), |
| } |
| } |
| |
// Parameters holds the job specific settings supplied with a job.
type Parameters struct {
	BufferTimeout BufferTimeout `json:"bufferTimeout"`
} // @name Parameters

// BufferTimeout configures buffering of messages. A job is treated as
// buffered only when both values are greater than zero.
type BufferTimeout struct {
	// The maximum number of messages to buffer before sending to consumer.
	MaxSize int `json:"maxSize"`
	// The maximum time to wait before sending to consumer if the MaxSize has not been reached.
	MaxTimeMilliseconds int64 `json:"maxTimeMilliseconds"`
} // @name BufferTimeout
| |
| func (j *job) start() { |
| if j.isJobBuffered() { |
| j.startReadingMessagesBuffered() |
| } else { |
| j.startReadingSingleMessages() |
| } |
| } |
| |
| func (j *job) startReadingSingleMessages() { |
| out: |
| for { |
| select { |
| case <-j.controlChannel: |
| log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity) |
| break out |
| case msg := <-j.messagesChannel: |
| j.sendMessagesToConsumer(msg) |
| } |
| } |
| } |
| |
| func (j *job) startReadingMessagesBuffered() { |
| out: |
| for { |
| select { |
| case <-j.controlChannel: |
| log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity) |
| break out |
| default: |
| msgs := j.read(j.jobInfo.InfoJobData.BufferTimeout) |
| if len(msgs) > 0 { |
| j.sendMessagesToConsumer(msgs) |
| } |
| } |
| } |
| } |
| |
| func (j *job) read(bufferParams BufferTimeout) []byte { |
| wg := sync.WaitGroup{} |
| wg.Add(bufferParams.MaxSize) |
| rawMsgs := make([][]byte, 0, bufferParams.MaxSize) |
| c := make(chan struct{}) |
| go func() { |
| i := 0 |
| out: |
| for { |
| select { |
| case <-c: |
| break out |
| case msg := <-j.messagesChannel: |
| rawMsgs = append(rawMsgs, msg) |
| i++ |
| wg.Done() |
| if i == bufferParams.MaxSize { |
| break out |
| } |
| } |
| } |
| }() |
| j.waitTimeout(&wg, time.Duration(bufferParams.MaxTimeMilliseconds)*time.Millisecond) |
| close(c) |
| return getAsJSONArray(rawMsgs) |
| } |
| |
// getAsJSONArray renders rawMsgs as a JSON array in which every message is a
// JSON string, e.g. ["first","second"]. An empty input yields an empty,
// non-nil byte slice rather than "[]".
func getAsJSONArray(rawMsgs [][]byte) []byte {
	if len(rawMsgs) == 0 {
		return []byte("")
	}
	var elements strings.Builder
	for i, rawMsg := range rawMsgs {
		elements.WriteString(makeIntoString(rawMsg))
		elements.WriteString(addSeparatorIfNeeded("", i, len(rawMsgs)))
	}
	return []byte(wrapInJSONArray(elements.String()))
}

// makeIntoString returns rawMsg as a JSON string literal. Quotes,
// backslashes, line breaks and other control characters are escaped so that
// the result is always valid JSON; HTML characters are left as they are.
func makeIntoString(rawMsg []byte) string {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	enc.SetEscapeHTML(false)
	if err := enc.Encode(string(rawMsg)); err != nil {
		// Not expected for a plain string; fall back to quoting only.
		return `"` + strings.ReplaceAll(string(rawMsg), "\"", "\\\"") + `"`
	}
	// Encode terminates its output with a newline that is not part of the value.
	return strings.TrimSuffix(buf.String(), "\n")
}

// addSeparatorIfNeeded appends a comma to strings unless position is the
// last index of a sequence with the given length.
func addSeparatorIfNeeded(strings string, position, length int) string {
	if position < length-1 {
		return strings + ","
	}
	return strings
}

// wrapInJSONArray encloses strings in square brackets.
func wrapInJSONArray(strings string) string {
	return "[" + strings + "]"
}
| |
| func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { |
| c := make(chan struct{}) |
| go func() { |
| defer close(c) |
| wg.Wait() |
| }() |
| select { |
| case <-c: |
| return false // completed normally |
| case <-time.After(timeout): |
| return true // timed out |
| } |
| } |
| |
| func (j *job) sendMessagesToConsumer(messages []byte) { |
| log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity) |
| contentType := restclient.ContentTypeJSON |
| if j.isJobKafka() && !j.isJobBuffered() { |
| contentType = restclient.ContentTypePlain |
| } |
| if postErr := restclient.Post(j.jobInfo.TargetUri, messages, contentType, j.client); postErr != nil { |
| log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr) |
| return |
| } |
| log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner) |
| } |
| |
| func (j *job) isJobBuffered() bool { |
| return j.jobInfo.InfoJobData.BufferTimeout.MaxSize > 0 && j.jobInfo.InfoJobData.BufferTimeout.MaxTimeMilliseconds > 0 |
| } |
| |
| func (j *job) isJobKafka() bool { |
| return j.jobInfo.sourceType == kafkaSource |
| } |