blob: e5a1070b9df923ceecfb6f36c7c0f59564c64c3b [file] [log] [blame]
elinuxhenrikcce95ff2021-09-05 17:27:02 +02001// -
2// ========================LICENSE_START=================================
3// O-RAN-SC
4// %%
5// Copyright (C) 2021: Nordix Foundation
6// %%
7// Licensed under the Apache License, Version 2.0 (the "License");
8// you may not use this file except in compliance with the License.
9// You may obtain a copy of the License at
10//
11// http://www.apache.org/licenses/LICENSE-2.0
12//
13// Unless required by applicable law or agreed to in writing, software
14// distributed under the License is distributed on an "AS IS" BASIS,
15// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16// See the License for the specific language governing permissions and
17// limitations under the License.
18// ========================LICENSE_END===================================
19//
20
elinuxhenrik63a42ca2021-09-06 22:16:24 +020021package jobs
elinuxhenrikcce95ff2021-09-05 17:27:02 +020022
23import (
elinuxhenrik28038562021-09-21 15:43:11 +020024 "encoding/json"
elinuxhenrikb1fb1d82021-09-07 02:58:52 +020025 "fmt"
elinuxhenrikcce95ff2021-09-05 17:27:02 +020026 "os"
elinuxhenrik28038562021-09-21 15:43:11 +020027 "sync"
28
29 log "github.com/sirupsen/logrus"
30 "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
elinuxhenrikcce95ff2021-09-05 17:27:02 +020031)
32
elinuxhenrik382870d2021-09-23 11:09:09 +020033type TypeDefinitions struct {
34 Types []TypeDefinition `json:"types"`
35}
36type TypeDefinition struct {
37 Id string `json:"id"`
38 DmaapTopicURL string `json:"dmaapTopicUrl"`
39}
40
41type TypeData struct {
42 TypeId string `json:"id"`
43 DMaaPTopicURL string `json:"dmaapTopicUrl"`
44 Jobs map[string]JobInfo
elinuxhenrikcce95ff2021-09-05 17:27:02 +020045}
46
elinuxhenrik63a42ca2021-09-06 22:16:24 +020047type JobInfo struct {
elinuxhenrik382870d2021-09-23 11:09:09 +020048 Owner string `json:"owner"`
49 LastUpdated string `json:"last_updated"`
50 InfoJobIdentity string `json:"info_job_identity"`
51 TargetUri string `json:"target_uri"`
52 InfoJobData interface{} `json:"info_job_data"`
53 InfoTypeIdentity string `json:"info_type_identity"`
elinuxhenrik63a42ca2021-09-06 22:16:24 +020054}
55
56type JobHandler interface {
57 AddJob(JobInfo) error
elinuxhenrikfe61c612021-09-24 15:08:47 +020058 DeleteJob(jobId string)
elinuxhenrik63a42ca2021-09-06 22:16:24 +020059}
60
61var (
elinuxhenrik382870d2021-09-23 11:09:09 +020062 mu sync.Mutex
63 configFile = "configs/type_config.json"
64 Handler JobHandler
65 allTypes = make(map[string]TypeData)
elinuxhenrik63a42ca2021-09-06 22:16:24 +020066)
67
68func init() {
69 Handler = newJobHandlerImpl()
70}
71
72type jobHandlerImpl struct{}
73
74func newJobHandlerImpl() *jobHandlerImpl {
75 return &jobHandlerImpl{}
76}
77
78func (jh *jobHandlerImpl) AddJob(ji JobInfo) error {
elinuxhenrik28038562021-09-21 15:43:11 +020079 mu.Lock()
80 defer mu.Unlock()
elinuxhenrikb1fb1d82021-09-07 02:58:52 +020081 if err := validateJobInfo(ji); err == nil {
elinuxhenrik382870d2021-09-23 11:09:09 +020082 jobs := allTypes[ji.InfoTypeIdentity].Jobs
elinuxhenrikb1fb1d82021-09-07 02:58:52 +020083 jobs[ji.InfoJobIdentity] = ji
elinuxhenrik382870d2021-09-23 11:09:09 +020084 log.Debug("Added job: ", ji)
elinuxhenrikb1fb1d82021-09-07 02:58:52 +020085 return nil
86 } else {
87 return err
88 }
89}
90
elinuxhenrikfe61c612021-09-24 15:08:47 +020091func (jh *jobHandlerImpl) DeleteJob(jobId string) {
92 mu.Lock()
93 defer mu.Unlock()
94 for _, typeData := range allTypes {
95 delete(typeData.Jobs, jobId)
96 }
97 log.Debug("Deleted job: ", jobId)
98}
99
elinuxhenrikb1fb1d82021-09-07 02:58:52 +0200100func validateJobInfo(ji JobInfo) error {
elinuxhenrik382870d2021-09-23 11:09:09 +0200101 if _, ok := allTypes[ji.InfoTypeIdentity]; !ok {
elinuxhenrikb1fb1d82021-09-07 02:58:52 +0200102 return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
103 }
104 if ji.InfoJobIdentity == "" {
105 return fmt.Errorf("missing required job identity: %v", ji)
106 }
107 // Temporary for when there are only REST callbacks needed
108 if ji.TargetUri == "" {
109 return fmt.Errorf("missing required target URI: %v", ji)
elinuxhenrik63a42ca2021-09-06 22:16:24 +0200110 }
111 return nil
112}
elinuxhenrikcce95ff2021-09-05 17:27:02 +0200113
elinuxhenrik382870d2021-09-23 11:09:09 +0200114func GetTypes() ([]TypeData, error) {
elinuxhenrik28038562021-09-21 15:43:11 +0200115 mu.Lock()
116 defer mu.Unlock()
elinuxhenrik382870d2021-09-23 11:09:09 +0200117 types := make([]TypeData, 0, 1)
118 typeDefsByte, err := os.ReadFile(configFile)
elinuxhenrikcce95ff2021-09-05 17:27:02 +0200119 if err != nil {
120 return nil, err
121 }
elinuxhenrik382870d2021-09-23 11:09:09 +0200122 typeDefs := TypeDefinitions{}
123 err = json.Unmarshal(typeDefsByte, &typeDefs)
124 if err != nil {
125 return nil, err
126 }
127 for _, typeDef := range typeDefs.Types {
128 typeInfo := TypeData{
129 TypeId: typeDef.Id,
130 DMaaPTopicURL: typeDef.DmaapTopicURL,
131 Jobs: make(map[string]JobInfo),
132 }
133 if _, ok := allTypes[typeInfo.TypeId]; !ok {
134 allTypes[typeInfo.TypeId] = typeInfo
135 }
136 types = append(types, typeInfo)
137 }
elinuxhenrikcce95ff2021-09-05 17:27:02 +0200138 return types, nil
139}
140
elinuxhenrika77cd652021-09-06 10:56:21 +0200141func GetSupportedTypes() []string {
elinuxhenrik28038562021-09-21 15:43:11 +0200142 mu.Lock()
143 defer mu.Unlock()
elinuxhenrik63a42ca2021-09-06 22:16:24 +0200144 supportedTypes := []string{}
elinuxhenrik382870d2021-09-23 11:09:09 +0200145 for k := range allTypes {
elinuxhenrik63a42ca2021-09-06 22:16:24 +0200146 supportedTypes = append(supportedTypes, k)
147 }
elinuxhenrika77cd652021-09-06 10:56:21 +0200148 return supportedTypes
149}
150
elinuxhenrik63a42ca2021-09-06 22:16:24 +0200151func AddJob(job JobInfo) error {
152 return Handler.AddJob(job)
153}
154
elinuxhenrikfe61c612021-09-24 15:08:47 +0200155func DeleteJob(jobId string) {
156 Handler.DeleteJob(jobId)
157}
158
elinuxhenrik28038562021-09-21 15:43:11 +0200159func RunJobs(mRAddress string) {
160 for {
161 pollAndDistributeMessages(mRAddress)
162 }
163}
164
165func pollAndDistributeMessages(mRAddress string) {
elinuxhenrik382870d2021-09-23 11:09:09 +0200166 for typeId, typeInfo := range allTypes {
elinuxhenrik28038562021-09-21 15:43:11 +0200167 log.Debugf("Processing jobs for type: %v", typeId)
elinuxhenrik382870d2021-09-23 11:09:09 +0200168 messagesBody, error := restclient.Get(fmt.Sprintf("%v/%v", mRAddress, typeInfo.DMaaPTopicURL))
elinuxhenrik28038562021-09-21 15:43:11 +0200169 if error != nil {
170 log.Warnf("Error getting data from MR. Cause: %v", error)
171 continue
172 }
173 distributeMessages(messagesBody, typeInfo)
174 }
175}
176
elinuxhenrik382870d2021-09-23 11:09:09 +0200177func distributeMessages(messages []byte, typeInfo TypeData) {
elinuxhenrik28038562021-09-21 15:43:11 +0200178 if len(messages) > 2 {
179 mu.Lock()
180 for _, jobInfo := range typeInfo.Jobs {
181 go sendMessagesToConsumer(messages, jobInfo)
182 }
183 mu.Unlock()
184 }
185}
186
187func sendMessagesToConsumer(messages []byte, jobInfo JobInfo) {
188 log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity)
189 if postErr := restclient.Post(jobInfo.TargetUri, messages); postErr != nil {
190 log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
191 }
192}
193
elinuxhenrikb1fb1d82021-09-07 02:58:52 +0200194func clearAll() {
elinuxhenrik382870d2021-09-23 11:09:09 +0200195 allTypes = make(map[string]TypeData)
elinuxhenrikb1fb1d82021-09-07 02:58:52 +0200196}