blob: e1ef405e727c9c823d9864b23359aaafbad88886 [file] [log] [blame]
elinuxhenrike30dbe32022-02-28 16:10:48 +01001// -
2// ========================LICENSE_START=================================
3// O-RAN-SC
4// %%
5// Copyright (C) 2021: Nordix Foundation
6// %%
7// Licensed under the Apache License, Version 2.0 (the "License");
8// you may not use this file except in compliance with the License.
9// You may obtain a copy of the License at
10//
11// http://www.apache.org/licenses/LICENSE-2.0
12//
13// Unless required by applicable law or agreed to in writing, software
14// distributed under the License is distributed on an "AS IS" BASIS,
15// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16// See the License for the specific language governing permissions and
17// limitations under the License.
18// ========================LICENSE_END===================================
19//
20
21package jobs
22
23import (
24 "fmt"
25 "strings"
26 "sync"
27 "time"
28
29 "github.com/confluentinc/confluent-kafka-go/kafka"
30 log "github.com/sirupsen/logrus"
31 "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
32 "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
33 "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
34)
35
36type TypeData struct {
37 Identity string `json:"id"`
38 jobsHandler *jobsHandler
39}
40
41type sourceType string
42
43const dMaaPSource = sourceType("dmaap")
44const kafkaSource = sourceType("kafka")
45
46type JobInfo struct {
47 Owner string `json:"owner"`
48 LastUpdated string `json:"last_updated"`
49 InfoJobIdentity string `json:"info_job_identity"`
50 TargetUri string `json:"target_uri"`
51 InfoJobData Parameters `json:"info_job_data"`
52 InfoTypeIdentity string `json:"info_type_identity"`
53 sourceType sourceType
ychaconc7ae5b12023-02-10 13:59:59 +010054} // @name JobInfo
elinuxhenrike30dbe32022-02-28 16:10:48 +010055
56type JobTypesManager interface {
57 LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition
58 GetSupportedTypes() []string
59}
60
elinuxhenrikeeb62e52022-03-09 09:41:59 +010061//go:generate mockery --name JobsManager
elinuxhenrike30dbe32022-02-28 16:10:48 +010062type JobsManager interface {
63 AddJobFromRESTCall(JobInfo) error
64 DeleteJobFromRESTCall(jobId string)
65}
66
67type JobsManagerImpl struct {
68 allTypes map[string]TypeData
69 pollClient restclient.HTTPClient
70 mrAddress string
71 kafkaFactory kafkaclient.KafkaFactory
72 distributeClient restclient.HTTPClient
73}
74
75func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, kafkaFactory kafkaclient.KafkaFactory, distributeClient restclient.HTTPClient) *JobsManagerImpl {
76 return &JobsManagerImpl{
77 allTypes: make(map[string]TypeData),
78 pollClient: pollClient,
79 mrAddress: mrAddr,
80 kafkaFactory: kafkaFactory,
81 distributeClient: distributeClient,
82 }
83}
84
85func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error {
86 if err := jm.validateJobInfo(ji); err == nil {
87 typeData := jm.allTypes[ji.InfoTypeIdentity]
88 ji.sourceType = typeData.jobsHandler.sourceType
89 typeData.jobsHandler.addJobCh <- ji
90 log.Debug("Added job: ", ji)
91 return nil
92 } else {
93 return err
94 }
95}
96
97func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) {
98 for _, typeData := range jm.allTypes {
99 log.Debugf("Deleting job %v from type %v", jobId, typeData.Identity)
100 typeData.jobsHandler.deleteJobCh <- jobId
101 }
102 log.Debug("Deleted job: ", jobId)
103}
104
105func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error {
106 if _, ok := jm.allTypes[ji.InfoTypeIdentity]; !ok {
107 return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
108 }
109 if ji.InfoJobIdentity == "" {
110 return fmt.Errorf("missing required job identity: %v", ji)
111 }
112 // Temporary for when there are only REST callbacks needed
113 if ji.TargetUri == "" {
114 return fmt.Errorf("missing required target URI: %v", ji)
115 }
116 return nil
117}
118
119func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition {
120 for _, typeDef := range types {
121 if typeDef.DMaaPTopicURL == "" && typeDef.KafkaInputTopic == "" {
122 log.Fatal("DMaaPTopicURL or KafkaInputTopic must be defined for type: ", typeDef.Identity)
123 }
124 jm.allTypes[typeDef.Identity] = TypeData{
125 Identity: typeDef.Identity,
126 jobsHandler: newJobsHandler(typeDef, jm.mrAddress, jm.kafkaFactory, jm.pollClient, jm.distributeClient),
127 }
128 }
129 return types
130}
131
132func (jm *JobsManagerImpl) GetSupportedTypes() []string {
133 supportedTypes := []string{}
134 for k := range jm.allTypes {
135 supportedTypes = append(supportedTypes, k)
136 }
137 return supportedTypes
138}
139
140func (jm *JobsManagerImpl) StartJobsForAllTypes() {
141 for _, jobType := range jm.allTypes {
142
143 go jobType.jobsHandler.startPollingAndDistribution()
144
145 }
146}
147
148type jobsHandler struct {
149 mu sync.Mutex
150 typeId string
151 sourceType sourceType
152 pollingAgent pollingAgent
153 jobs map[string]job
154 addJobCh chan JobInfo
155 deleteJobCh chan string
156 distributeClient restclient.HTTPClient
157}
158
159func newJobsHandler(typeDef config.TypeDefinition, mRAddress string, kafkaFactory kafkaclient.KafkaFactory, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
160 pollingAgent := createPollingAgent(typeDef, mRAddress, pollClient, kafkaFactory, typeDef.KafkaInputTopic)
161 sourceType := kafkaSource
162 if typeDef.DMaaPTopicURL != "" {
163 sourceType = dMaaPSource
164 }
165 return &jobsHandler{
166 typeId: typeDef.Identity,
167 sourceType: sourceType,
168 pollingAgent: pollingAgent,
169 jobs: make(map[string]job),
170 addJobCh: make(chan JobInfo),
171 deleteJobCh: make(chan string),
172 distributeClient: distributeClient,
173 }
174}
175
176func (jh *jobsHandler) startPollingAndDistribution() {
177 go func() {
178 for {
179 jh.pollAndDistributeMessages()
180 }
181 }()
182
183 go func() {
184 for {
185 jh.monitorManagementChannels()
186 }
187 }()
188}
189
190func (jh *jobsHandler) pollAndDistributeMessages() {
191 log.Debugf("Processing jobs for type: %v", jh.typeId)
192 messagesBody, error := jh.pollingAgent.pollMessages()
193 if error != nil {
194 log.Warn("Error getting data from source. Cause: ", error)
195 time.Sleep(time.Minute) // Must wait before trying to call data source again
196 return
197 }
198 jh.distributeMessages(messagesBody)
199}
200
201func (jh *jobsHandler) distributeMessages(messages []byte) {
ychaconc7ae5b12023-02-10 13:59:59 +0100202 if string(messages) != "[]" && len(messages) > 0 { // MR returns an empty array if there are no messages.
elinuxhenrike30dbe32022-02-28 16:10:48 +0100203 log.Debug("Distributing messages: ", string(messages))
204 jh.mu.Lock()
205 defer jh.mu.Unlock()
206 for _, job := range jh.jobs {
207 if len(job.messagesChannel) < cap(job.messagesChannel) {
208 job.messagesChannel <- messages
209 } else {
210 jh.emptyMessagesBuffer(job)
211 }
212 }
213 }
214}
215
216func (jh *jobsHandler) emptyMessagesBuffer(job job) {
217 log.Debug("Emptying message queue for job: ", job.jobInfo.InfoJobIdentity)
218out:
219 for {
220 select {
221 case <-job.messagesChannel:
222 default:
223 break out
224 }
225 }
226}
227
228func (jh *jobsHandler) monitorManagementChannels() {
229 select {
230 case addedJob := <-jh.addJobCh:
231 jh.addJob(addedJob)
232 case deletedJob := <-jh.deleteJobCh:
233 jh.deleteJob(deletedJob)
234 }
235}
236
237func (jh *jobsHandler) addJob(addedJob JobInfo) {
238 jh.mu.Lock()
239 log.Debug("Add job: ", addedJob)
240 newJob := newJob(addedJob, jh.distributeClient)
241 go newJob.start()
242 jh.jobs[addedJob.InfoJobIdentity] = newJob
243 jh.mu.Unlock()
244}
245
246func (jh *jobsHandler) deleteJob(deletedJob string) {
247 jh.mu.Lock()
248 log.Debug("Delete job: ", deletedJob)
249 j, exist := jh.jobs[deletedJob]
250 if exist {
251 j.controlChannel <- struct{}{}
252 delete(jh.jobs, deletedJob)
253 }
254 jh.mu.Unlock()
255}
256
257type pollingAgent interface {
258 pollMessages() ([]byte, error)
259}
260
261func createPollingAgent(typeDef config.TypeDefinition, mRAddress string, pollClient restclient.HTTPClient, kafkaFactory kafkaclient.KafkaFactory, topicID string) pollingAgent {
262 if typeDef.DMaaPTopicURL != "" {
263 return dMaaPPollingAgent{
264 messageRouterURL: mRAddress + typeDef.DMaaPTopicURL,
265 pollClient: pollClient,
266 }
267 } else {
268 return newKafkaPollingAgent(kafkaFactory, typeDef.KafkaInputTopic)
269 }
270}
271
272type dMaaPPollingAgent struct {
273 messageRouterURL string
274 pollClient restclient.HTTPClient
275}
276
277func (pa dMaaPPollingAgent) pollMessages() ([]byte, error) {
278 return restclient.Get(pa.messageRouterURL, pa.pollClient)
279}
280
281type kafkaPollingAgent struct {
282 kafkaClient kafkaclient.KafkaClient
283}
284
285func newKafkaPollingAgent(kafkaFactory kafkaclient.KafkaFactory, topicID string) kafkaPollingAgent {
elinuxhenrikeeb62e52022-03-09 09:41:59 +0100286 c, err := kafkaFactory.NewKafkaClient(topicID)
elinuxhenrike30dbe32022-02-28 16:10:48 +0100287 if err != nil {
288 log.Fatalf("Cannot create Kafka client for topic: %v, error details: %v\n", topicID, err)
289 }
290 return kafkaPollingAgent{
291 kafkaClient: c,
292 }
293}
294
295func (pa kafkaPollingAgent) pollMessages() ([]byte, error) {
296 msg, err := pa.kafkaClient.ReadMessage()
297 if err == nil {
298 return msg, nil
299 } else {
300 if isKafkaTimedOutError(err) {
301 return []byte(""), nil
302 }
303 return nil, err
304 }
305}
306
307func isKafkaTimedOutError(err error) bool {
308 kafkaErr, ok := err.(kafka.Error)
309 return ok && kafkaErr.Code() == kafka.ErrTimedOut
310}
311
312type job struct {
313 jobInfo JobInfo
314 client restclient.HTTPClient
315 messagesChannel chan []byte
316 controlChannel chan struct{}
317}
318
319func newJob(j JobInfo, c restclient.HTTPClient) job {
320
321 return job{
322 jobInfo: j,
323 client: c,
324 messagesChannel: make(chan []byte, 10),
325 controlChannel: make(chan struct{}),
326 }
327}
328
329type Parameters struct {
330 BufferTimeout BufferTimeout `json:"bufferTimeout"`
ychaconc7ae5b12023-02-10 13:59:59 +0100331} // @name Parameters
elinuxhenrike30dbe32022-02-28 16:10:48 +0100332
ychaconc7ae5b12023-02-10 13:59:59 +0100333// Parameters for buffering messages.
elinuxhenrike30dbe32022-02-28 16:10:48 +0100334type BufferTimeout struct {
ychaconc7ae5b12023-02-10 13:59:59 +0100335 // The maximum number of messages to buffer before sending to consumer.
336 MaxSize int `json:"maxSize"`
337 // The maximum time to wait before sending to consumer if the MaxSize has not been reached.
338 MaxTimeMilliseconds int64 `json:"maxTimeMilliseconds"`
339} // @name BufferTimeout
elinuxhenrike30dbe32022-02-28 16:10:48 +0100340
341func (j *job) start() {
342 if j.isJobBuffered() {
343 j.startReadingMessagesBuffered()
344 } else {
345 j.startReadingSingleMessages()
346 }
347}
348
349func (j *job) startReadingSingleMessages() {
350out:
351 for {
352 select {
353 case <-j.controlChannel:
354 log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
355 break out
356 case msg := <-j.messagesChannel:
357 j.sendMessagesToConsumer(msg)
358 }
359 }
360}
361
362func (j *job) startReadingMessagesBuffered() {
363out:
364 for {
365 select {
366 case <-j.controlChannel:
367 log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
368 break out
369 default:
370 msgs := j.read(j.jobInfo.InfoJobData.BufferTimeout)
371 if len(msgs) > 0 {
372 j.sendMessagesToConsumer(msgs)
373 }
374 }
375 }
376}
377
378func (j *job) read(bufferParams BufferTimeout) []byte {
379 wg := sync.WaitGroup{}
380 wg.Add(bufferParams.MaxSize)
381 rawMsgs := make([][]byte, 0, bufferParams.MaxSize)
382 c := make(chan struct{})
383 go func() {
384 i := 0
385 out:
386 for {
387 select {
388 case <-c:
389 break out
390 case msg := <-j.messagesChannel:
391 rawMsgs = append(rawMsgs, msg)
392 i++
393 wg.Done()
394 if i == bufferParams.MaxSize {
395 break out
396 }
397 }
398 }
399 }()
ychaconc7ae5b12023-02-10 13:59:59 +0100400 j.waitTimeout(&wg, time.Duration(bufferParams.MaxTimeMilliseconds)*time.Millisecond)
elinuxhenrike30dbe32022-02-28 16:10:48 +0100401 close(c)
402 return getAsJSONArray(rawMsgs)
403}
404
405func getAsJSONArray(rawMsgs [][]byte) []byte {
406 if len(rawMsgs) == 0 {
407 return []byte("")
408 }
409 strings := ""
410 for i := 0; i < len(rawMsgs); i++ {
411 strings = strings + makeIntoString(rawMsgs[i])
412 strings = addSeparatorIfNeeded(strings, i, len(rawMsgs))
413 }
414 return []byte(wrapInJSONArray(strings))
415}
416
417func makeIntoString(rawMsg []byte) string {
418 return `"` + strings.ReplaceAll(string(rawMsg), "\"", "\\\"") + `"`
419}
420
421func addSeparatorIfNeeded(strings string, position, length int) string {
422 if position < length-1 {
423 strings = strings + ","
424 }
425 return strings
426}
427
428func wrapInJSONArray(strings string) string {
429 return "[" + strings + "]"
430}
431
432func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
433 c := make(chan struct{})
434 go func() {
435 defer close(c)
436 wg.Wait()
437 }()
438 select {
439 case <-c:
440 return false // completed normally
441 case <-time.After(timeout):
442 return true // timed out
443 }
444}
445
446func (j *job) sendMessagesToConsumer(messages []byte) {
447 log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
448 contentType := restclient.ContentTypeJSON
449 if j.isJobKafka() && !j.isJobBuffered() {
450 contentType = restclient.ContentTypePlain
451 }
452 if postErr := restclient.Post(j.jobInfo.TargetUri, messages, contentType, j.client); postErr != nil {
453 log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
454 return
455 }
456 log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)
457}
458
459func (j *job) isJobBuffered() bool {
ychaconc7ae5b12023-02-10 13:59:59 +0100460 return j.jobInfo.InfoJobData.BufferTimeout.MaxSize > 0 && j.jobInfo.InfoJobData.BufferTimeout.MaxTimeMilliseconds > 0
elinuxhenrike30dbe32022-02-28 16:10:48 +0100461}
462
463func (j *job) isJobKafka() bool {
464 return j.jobInfo.sourceType == kafkaSource
465}