// -
// ========================LICENSE_START=================================
// O-RAN-SC
// %%
// Copyright (C) 2021: Nordix Foundation
// %%
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ========================LICENSE_END===================================
//

package jobs

import (
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	log "github.com/sirupsen/logrus"
	"oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
	"oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
	"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
)

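// TypeData ties a configured info type to the jobsHandler that polls its
// source and distributes its messages.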
type TypeData struct {
	Identity    string `json:"id"`
	jobsHandler *jobsHandler
}

type sourceType string

const (
	dMaaPSource = sourceType("dmaap")
	kafkaSource = sourceType("kafka")
)

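// JobInfo holds the data of an info job as it is received in the job creation
// REST call.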
type JobInfo struct {
	Owner            string     `json:"owner"`
	LastUpdated      string     `json:"last_updated"`
	InfoJobIdentity  string     `json:"info_job_identity"`
	TargetUri        string     `json:"target_uri"`
	InfoJobData      Parameters `json:"info_job_data"`
	InfoTypeIdentity string     `json:"info_type_identity"`
	sourceType       sourceType
} // @name JobInfo

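// JobTypesManager loads the supported types from the configuration and lists
// their identities.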
type JobTypesManager interface {
	LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition
	GetSupportedTypes() []string
}

//go:generate mockery --name JobsManager
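// JobsManager handles the jobs that are added and deleted through REST calls.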
type JobsManager interface {
	AddJobFromRESTCall(JobInfo) error
	DeleteJobFromRESTCall(jobId string)
}

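// JobsManagerImpl implements both the JobTypesManager and the JobsManager
// interfaces, holding one jobsHandler per supported type.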
type JobsManagerImpl struct {
	allTypes         map[string]TypeData
	pollClient       restclient.HTTPClient
	mrAddress        string
	kafkaFactory     kafkaclient.KafkaFactory
	distributeClient restclient.HTTPClient
}

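// NewJobsManagerImpl creates a JobsManagerImpl that polls with pollClient from
// the Message Router at mrAddr or from Kafka through kafkaFactory, and
// distributes with distributeClient. A minimal usage sketch (the address and
// the origin of typeDefs and kafkaFactory are illustrative, not prescribed by
// this package):
//
//	jm := NewJobsManagerImpl(&http.Client{}, "http://mr:3904", kafkaFactory, &http.Client{})
//	jm.LoadTypesFromConfiguration(typeDefs)
//	jm.StartJobsForAllTypes()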
func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, kafkaFactory kafkaclient.KafkaFactory, distributeClient restclient.HTTPClient) *JobsManagerImpl {
	return &JobsManagerImpl{
		allTypes:         make(map[string]TypeData),
		pollClient:       pollClient,
		mrAddress:        mrAddr,
		kafkaFactory:     kafkaFactory,
		distributeClient: distributeClient,
	}
}

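// AddJobFromRESTCall validates the job and passes it on to the jobsHandler of
// its type. It returns an error if the type is not supported or if the job
// identity or target URI is missing.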
func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error {
	if err := jm.validateJobInfo(ji); err != nil {
		return err
	}
	typeData := jm.allTypes[ji.InfoTypeIdentity]
	ji.sourceType = typeData.jobsHandler.sourceType
	typeData.jobsHandler.addJobCh <- ji
	log.Debug("Added job: ", ji)
	return nil
}

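// DeleteJobFromRESTCall passes the job identity to the jobsHandlers of all
// types, since the call does not tell which type the job belongs to.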
func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) {
	for _, typeData := range jm.allTypes {
		log.Debugf("Deleting job %v from type %v", jobId, typeData.Identity)
		typeData.jobsHandler.deleteJobCh <- jobId
	}
	log.Debug("Deleted job: ", jobId)
}

func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error {
	if _, ok := jm.allTypes[ji.InfoTypeIdentity]; !ok {
		return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
	}
	if ji.InfoJobIdentity == "" {
		return fmt.Errorf("missing required job identity: %v", ji)
	}
	// Temporary check, needed as long as only REST callbacks are supported.
	if ji.TargetUri == "" {
		return fmt.Errorf("missing required target URI: %v", ji)
	}
	return nil
}

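// LoadTypesFromConfiguration registers the given type definitions and creates
// a jobsHandler for each. A type must define a DMaaP topic URL or a Kafka
// input topic, otherwise the application exits fatally.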
func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition {
	for _, typeDef := range types {
		if typeDef.DMaaPTopicURL == "" && typeDef.KafkaInputTopic == "" {
			log.Fatal("DMaaPTopicURL or KafkaInputTopic must be defined for type: ", typeDef.Identity)
		}
		jm.allTypes[typeDef.Identity] = TypeData{
			Identity:    typeDef.Identity,
			jobsHandler: newJobsHandler(typeDef, jm.mrAddress, jm.kafkaFactory, jm.pollClient, jm.distributeClient),
		}
	}
	return types
}

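// GetSupportedTypes returns the identities of all loaded types.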
func (jm *JobsManagerImpl) GetSupportedTypes() []string {
	supportedTypes := []string{}
	for k := range jm.allTypes {
		supportedTypes = append(supportedTypes, k)
	}
	return supportedTypes
}

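// StartJobsForAllTypes starts polling and distribution for every loaded type,
// each in a goroutine of its own.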
func (jm *JobsManagerImpl) StartJobsForAllTypes() {
	for _, jobType := range jm.allTypes {
		go jobType.jobsHandler.startPollingAndDistribution()
	}
}

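// jobsHandler polls the source of one type and distributes each poll result to
// all jobs of that type. Jobs are added and deleted through addJobCh and
// deleteJobCh, and the mutex protects the jobs map from concurrent access.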
type jobsHandler struct {
	mu               sync.Mutex
	typeId           string
	sourceType       sourceType
	pollingAgent     pollingAgent
	jobs             map[string]job
	addJobCh         chan JobInfo
	deleteJobCh      chan string
	distributeClient restclient.HTTPClient
}

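// newJobsHandler creates the handler for one type. The source is considered to
// be DMaaP if a DMaaP topic URL is defined, and Kafka otherwise.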
func newJobsHandler(typeDef config.TypeDefinition, mRAddress string, kafkaFactory kafkaclient.KafkaFactory, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
	pollingAgent := createPollingAgent(typeDef, mRAddress, pollClient, kafkaFactory, typeDef.KafkaInputTopic)
	sourceType := kafkaSource
	if typeDef.DMaaPTopicURL != "" {
		sourceType = dMaaPSource
	}
	return &jobsHandler{
		typeId:           typeDef.Identity,
		sourceType:       sourceType,
		pollingAgent:     pollingAgent,
		jobs:             make(map[string]job),
		addJobCh:         make(chan JobInfo),
		deleteJobCh:      make(chan string),
		distributeClient: distributeClient,
	}
}

func (jh *jobsHandler) startPollingAndDistribution() {
	go func() {
		for {
			jh.pollAndDistributeMessages()
		}
	}()

	go func() {
		for {
			jh.monitorManagementChannels()
		}
	}()
}

func (jh *jobsHandler) pollAndDistributeMessages() {
	log.Debugf("Processing jobs for type: %v", jh.typeId)
	messagesBody, err := jh.pollingAgent.pollMessages()
	if err != nil {
		log.Warn("Error getting data from source. Cause: ", err)
		time.Sleep(time.Minute) // Must wait before trying to call the data source again.
		return
	}
	jh.distributeMessages(messagesBody)
}

func (jh *jobsHandler) distributeMessages(messages []byte) {
	if string(messages) != "[]" && len(messages) > 0 { // MR returns an empty array if there are no messages.
		log.Debug("Distributing messages: ", string(messages))
		jh.mu.Lock()
		defer jh.mu.Unlock()
		for _, job := range jh.jobs {
			if len(job.messagesChannel) < cap(job.messagesChannel) {
				job.messagesChannel <- messages
			} else {
				// The job cannot keep up, so its queue is emptied and the current messages are dropped.
				jh.emptyMessagesBuffer(job)
			}
		}
	}
}

func (jh *jobsHandler) emptyMessagesBuffer(job job) {
	log.Debug("Emptying message queue for job: ", job.jobInfo.InfoJobIdentity)
out:
	for {
		select {
		case <-job.messagesChannel:
		default:
			break out
		}
	}
}

func (jh *jobsHandler) monitorManagementChannels() {
	select {
	case addedJob := <-jh.addJobCh:
		jh.addJob(addedJob)
	case deletedJob := <-jh.deleteJobCh:
		jh.deleteJob(deletedJob)
	}
}

func (jh *jobsHandler) addJob(addedJob JobInfo) {
	jh.mu.Lock()
	log.Debug("Add job: ", addedJob)
	newJob := newJob(addedJob, jh.distributeClient)
	go newJob.start()
	jh.jobs[addedJob.InfoJobIdentity] = newJob
	jh.mu.Unlock()
}

func (jh *jobsHandler) deleteJob(deletedJob string) {
	jh.mu.Lock()
	log.Debug("Delete job: ", deletedJob)
	j, exist := jh.jobs[deletedJob]
	if exist {
		j.controlChannel <- struct{}{}
		delete(jh.jobs, deletedJob)
	}
	jh.mu.Unlock()
}

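// pollingAgent abstracts the message source so that DMaaP and Kafka topics can
// be polled the same way.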
type pollingAgent interface {
	pollMessages() ([]byte, error)
}

func createPollingAgent(typeDef config.TypeDefinition, mRAddress string, pollClient restclient.HTTPClient, kafkaFactory kafkaclient.KafkaFactory, topicID string) pollingAgent {
	if typeDef.DMaaPTopicURL != "" {
		return dMaaPPollingAgent{
			messageRouterURL: mRAddress + typeDef.DMaaPTopicURL,
			pollClient:       pollClient,
		}
	}
	return newKafkaPollingAgent(kafkaFactory, topicID)
}

type dMaaPPollingAgent struct {
	messageRouterURL string
	pollClient       restclient.HTTPClient
}

func (pa dMaaPPollingAgent) pollMessages() ([]byte, error) {
	return restclient.Get(pa.messageRouterURL, pa.pollClient)
}

type kafkaPollingAgent struct {
	kafkaClient kafkaclient.KafkaClient
}

func newKafkaPollingAgent(kafkaFactory kafkaclient.KafkaFactory, topicID string) kafkaPollingAgent {
	c, err := kafkaFactory.NewKafkaClient(topicID)
	if err != nil {
		log.Fatalf("Cannot create Kafka client for topic: %v, error details: %v\n", topicID, err)
	}
	return kafkaPollingAgent{
		kafkaClient: c,
	}
}

func (pa kafkaPollingAgent) pollMessages() ([]byte, error) {
	msg, err := pa.kafkaClient.ReadMessage()
	if err == nil {
		return msg, nil
	}
	// A timed out read only means that no messages are available, so it is not treated as an error.
	if isKafkaTimedOutError(err) {
		return []byte(""), nil
	}
	return nil, err
}

func isKafkaTimedOutError(err error) bool {
	kafkaErr, ok := err.(kafka.Error)
	return ok && kafkaErr.Code() == kafka.ErrTimedOut
}

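// job distributes the messages received on messagesChannel to the consumer at
// the job's target URI, until it is told to stop through controlChannel.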
type job struct {
	jobInfo         JobInfo
	client          restclient.HTTPClient
	messagesChannel chan []byte
	controlChannel  chan struct{}
}

func newJob(j JobInfo, c restclient.HTTPClient) job {
	return job{
		jobInfo:         j,
		client:          c,
		messagesChannel: make(chan []byte, 10),
		controlChannel:  make(chan struct{}),
	}
}

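// Parameters maps the info_job_data of a job. If both maxSize and
// maxTimeMiliseconds are set to values greater than zero, messages to the job
// are buffered. An illustrative example of the JSON it is decoded from (the
// values are made up):
//
//	{
//	  "bufferTimeout": {
//	    "maxSize": 100,
//	    "maxTimeMiliseconds": 1000
//	  }
//	}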
type Parameters struct {
	BufferTimeout BufferTimeout `json:"bufferTimeout"`
} // @name Parameters

type BufferTimeout struct {
	MaxSize            int   `json:"maxSize"`
	MaxTimeMiliseconds int64 `json:"maxTimeMiliseconds"`
} // @name BufferTimeout

func (j *job) start() {
	if j.isJobBuffered() {
		j.startReadingMessagesBuffered()
	} else {
		j.startReadingSingleMessages()
	}
}

func (j *job) startReadingSingleMessages() {
out:
	for {
		select {
		case <-j.controlChannel:
			log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
			break out
		case msg := <-j.messagesChannel:
			j.sendMessagesToConsumer(msg)
		}
	}
}

func (j *job) startReadingMessagesBuffered() {
out:
	for {
		select {
		case <-j.controlChannel:
			log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
			break out
		default:
			msgs := j.read(j.jobInfo.InfoJobData.BufferTimeout)
			if len(msgs) > 0 {
				j.sendMessagesToConsumer(msgs)
			}
		}
	}
}

func (j *job) read(bufferParams BufferTimeout) []byte {
	wg := sync.WaitGroup{}
	wg.Add(bufferParams.MaxSize)
	rawMsgs := make([][]byte, 0, bufferParams.MaxSize)
	var mu sync.Mutex // Guards rawMsgs, which the collector goroutine below appends to.
	c := make(chan struct{})
	go func() {
		i := 0
	out:
		for {
			select {
			case <-c:
				break out
			case msg := <-j.messagesChannel:
				mu.Lock()
				rawMsgs = append(rawMsgs, msg)
				mu.Unlock()
				i++
				wg.Done()
				if i == bufferParams.MaxSize {
					break out
				}
			}
		}
	}()
	// Wait until either MaxSize messages have been collected or the buffer timeout expires.
	j.waitTimeout(&wg, time.Duration(bufferParams.MaxTimeMiliseconds)*time.Millisecond)
	close(c)
	mu.Lock()
	defer mu.Unlock()
	return getAsJSONArray(rawMsgs)
}

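// getAsJSONArray joins the raw messages into a JSON array of escaped strings.
// As an illustration, the two messages {"a": 1} and b are joined into
// ["{\"a\": 1}","b"].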
func getAsJSONArray(rawMsgs [][]byte) []byte {
	if len(rawMsgs) == 0 {
		return []byte("")
	}
	jsonStr := ""
	for i := 0; i < len(rawMsgs); i++ {
		jsonStr = jsonStr + makeIntoString(rawMsgs[i])
		jsonStr = addSeparatorIfNeeded(jsonStr, i, len(rawMsgs))
	}
	return []byte(wrapInJSONArray(jsonStr))
}

func makeIntoString(rawMsg []byte) string {
	// Escape backslashes before quotes so that the message becomes a valid JSON
	// string value; escaping quotes alone would break messages containing backslashes.
	escaped := strings.ReplaceAll(string(rawMsg), `\`, `\\`)
	escaped = strings.ReplaceAll(escaped, `"`, `\"`)
	return `"` + escaped + `"`
}

func addSeparatorIfNeeded(jsonStr string, position, length int) string {
	if position < length-1 {
		jsonStr = jsonStr + ","
	}
	return jsonStr
}

func wrapInJSONArray(jsonStr string) string {
	return "[" + jsonStr + "]"
}

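// waitTimeout waits for the WaitGroup, but at most for the given timeout. It
// returns true if the timeout was reached and false if the WaitGroup completed
// normally in time.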
func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	c := make(chan struct{})
	go func() {
		defer close(c)
		wg.Wait()
	}()
	select {
	case <-c:
		return false // completed normally
	case <-time.After(timeout):
		return true // timed out
	}
}

func (j *job) sendMessagesToConsumer(messages []byte) {
	log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
	contentType := restclient.ContentTypeJSON
	if j.isJobKafka() && !j.isJobBuffered() {
		// A single message from Kafka is passed through as-is, so it is not necessarily JSON.
		contentType = restclient.ContentTypePlain
	}
	if postErr := restclient.Post(j.jobInfo.TargetUri, messages, contentType, j.client); postErr != nil {
		log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
		return
	}
	log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)
}

func (j *job) isJobBuffered() bool {
	return j.jobInfo.InfoJobData.BufferTimeout.MaxSize > 0 && j.jobInfo.InfoJobData.BufferTimeout.MaxTimeMiliseconds > 0
}

func (j *job) isJobKafka() bool {
	return j.jobInfo.sourceType == kafkaSource
}