ktimoney | 0bb219e | 2023-06-08 15:20:53 +0100 | [diff] [blame] | 1 | // - |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 2 | // |
ktimoney | 0bb219e | 2023-06-08 15:20:53 +0100 | [diff] [blame] | 3 | // ========================LICENSE_START================================= |
| 4 | // O-RAN-SC |
| 5 | // %% |
JohnKeeney | c6e2de2 | 2023-12-13 19:15:47 +0000 | [diff] [blame] | 6 | // Copyright (C) 2023: Nordix Foundation. All rights reserved. |
| 7 | // Copyright (C) 2023 OpenInfra Foundation Europe. All rights reserved. |
ktimoney | 0bb219e | 2023-06-08 15:20:53 +0100 | [diff] [blame] | 8 | // %% |
| 9 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 10 | // you may not use this file except in compliance with the License. |
| 11 | // You may obtain a copy of the License at |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 12 | // |
ktimoney | 0bb219e | 2023-06-08 15:20:53 +0100 | [diff] [blame] | 13 | // http://www.apache.org/licenses/LICENSE-2.0 |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 14 | // |
ktimoney | 0bb219e | 2023-06-08 15:20:53 +0100 | [diff] [blame] | 15 | // Unless required by applicable law or agreed to in writing, software |
| 16 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 17 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 18 | // See the License for the specific language governing permissions and |
| 19 | // limitations under the License. |
| 20 | // ========================LICENSE_END=================================== |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 21 | package main |
| 22 | |
| 23 | import ( |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 24 | "fmt" |
ktimoney | 0bb219e | 2023-06-08 15:20:53 +0100 | [diff] [blame] | 25 | "main/common/dataTypes" |
| 26 | "main/common/utils" |
| 27 | "main/components/kafkacollector" |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 28 | "net/http" |
| 29 | "os" |
ktimoney | 0bb219e | 2023-06-08 15:20:53 +0100 | [diff] [blame] | 30 | "os/signal" |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 31 | "runtime" |
ktimoney | 0bb219e | 2023-06-08 15:20:53 +0100 | [diff] [blame] | 32 | "sync" |
| 33 | "syscall" |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 34 | "time" |
JohnKeeney | c6e2de2 | 2023-12-13 19:15:47 +0000 | [diff] [blame] | 35 | |
| 36 | jsoniter "github.com/json-iterator/go" |
| 37 | log "github.com/sirupsen/logrus" |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 38 | ) |
| 39 | |
ktimoney | 0bb219e | 2023-06-08 15:20:53 +0100 | [diff] [blame] | 40 | var ics_server = os.Getenv("ICS") |
| 41 | var self = os.Getenv("SELF") |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 42 | |
| 43 | // This are optional - set if using SASL protocol is used towards kafka |
| 44 | var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE") |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 45 | |
ktimoney | 0bb219e | 2023-06-08 15:20:53 +0100 | [diff] [blame] | 46 | var bootstrapserver = os.Getenv("KAFKA_SERVER") |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 47 | |
ktimoney | 0bb219e | 2023-06-08 15:20:53 +0100 | [diff] [blame] | 48 | const config_file = "application_configuration.json" |
| 49 | const producer_name = "kafka-producer" |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 50 | |
ktimoney | 0bb219e | 2023-06-08 15:20:53 +0100 | [diff] [blame] | 51 | var producer_instance_name string = producer_name |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 52 | |
ktimoney | 0bb219e | 2023-06-08 15:20:53 +0100 | [diff] [blame] | 53 | const reader_queue_length = 100 //Per type job |
| 54 | const writer_queue_length = 100 //Per info job |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 55 | |
ktimoney | 0bb219e | 2023-06-08 15:20:53 +0100 | [diff] [blame] | 56 | var files_volume = os.Getenv("FILES_VOLUME") |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 57 | |
ktimoney | 0bb219e | 2023-06-08 15:20:53 +0100 | [diff] [blame] | 58 | var data_out_channel = make(chan *dataTypes.KafkaPayload, writer_queue_length) |
| 59 | var writer_control = make(chan dataTypes.WriterControl, 1) |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 60 | |
ktimoney | 0bb219e | 2023-06-08 15:20:53 +0100 | [diff] [blame] | 61 | const registration_delay_short = 2 |
| 62 | const registration_delay_long = 120 |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 63 | |
ambrishest | ba3389b | 2023-10-10 07:41:14 +0100 | [diff] [blame] | 64 | const failedMessageLabel = " - failed" |
| 65 | |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 66 | //== Variables ==// |
| 67 | |
| 68 | var AppState = Init |
| 69 | |
| 70 | // Lock for all internal data |
| 71 | var datalock sync.Mutex |
| 72 | |
ktimoney | 0bb219e | 2023-06-08 15:20:53 +0100 | [diff] [blame] | 73 | const ( |
| 74 | Init dataTypes.AppStates = iota |
| 75 | Running |
| 76 | Terminating |
| 77 | ) |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 78 | |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 79 | const registeringProducer = "Registering producer: " |
| 80 | |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 81 | // == Main ==// |
| 82 | func main() { |
| 83 | |
| 84 | //log.SetLevel(log.InfoLevel) |
| 85 | log.SetLevel(log.TraceLevel) |
| 86 | |
| 87 | log.Info("Server starting...") |
| 88 | |
| 89 | if self == "" { |
| 90 | log.Panic("Env SELF not configured") |
| 91 | } |
| 92 | if bootstrapserver == "" { |
| 93 | log.Panic("Env KAFKA_SERVER not set") |
| 94 | } |
| 95 | if ics_server == "" { |
| 96 | log.Panic("Env ICS not set") |
| 97 | } |
| 98 | if os.Getenv("KP") != "" { |
| 99 | producer_instance_name = producer_instance_name + "-" + os.Getenv("KP") |
| 100 | } |
| 101 | |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 102 | go kafkacollector.StartTopicWriter(writer_control, data_out_channel) |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 103 | |
| 104 | //Setup proc for periodic type registration |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 105 | var eventChan = make(chan int) //Channel for stopping the proc |
| 106 | go periodicRegistration(eventChan) |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 107 | |
| 108 | //Wait for term/int signal do try to shut down gracefully |
| 109 | sigs := make(chan os.Signal, 1) |
| 110 | signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) |
| 111 | go func() { |
| 112 | sig := <-sigs |
| 113 | fmt.Printf("Received signal %s - application will terminate\n", sig) |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 114 | eventChan <- 0 // Stop periodic registration |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 115 | datalock.Lock() |
| 116 | defer datalock.Unlock() |
| 117 | AppState = Terminating |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 118 | }() |
| 119 | |
| 120 | AppState = Running |
| 121 | |
| 122 | //Wait until all go routines has exited |
| 123 | runtime.Goexit() |
| 124 | |
| 125 | fmt.Println("main routine exit") |
| 126 | fmt.Println("server stopped") |
| 127 | } |
| 128 | |
ktimoney | 0bb219e | 2023-06-08 15:20:53 +0100 | [diff] [blame] | 129 | // == Core functions ==// |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 130 | // Run periodic registration of producers |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 131 | func periodicRegistration(evtch chan int) { |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 132 | var delay int = 1 |
| 133 | for { |
| 134 | select { |
| 135 | case msg := <-evtch: |
| 136 | if msg == 0 { // Stop thread |
| 137 | return |
| 138 | } |
| 139 | case <-time.After(time.Duration(delay) * time.Second): |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 140 | ok := registerProducer() |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 141 | if ok { |
| 142 | delay = registration_delay_long |
| 143 | } else { |
| 144 | if delay < registration_delay_long { |
| 145 | delay += registration_delay_short |
| 146 | } else { |
| 147 | delay = registration_delay_short |
| 148 | } |
| 149 | } |
| 150 | } |
| 151 | } |
| 152 | } |
| 153 | |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 154 | func registerProducer() bool { |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 155 | |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 156 | log.Info(registeringProducer, producer_instance_name) |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 157 | |
| 158 | file, err := os.ReadFile(config_file) |
| 159 | if err != nil { |
| 160 | log.Error("Cannot read config file: ", config_file) |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 161 | // NOSONAR |
ambrishest | ba3389b | 2023-10-10 07:41:14 +0100 | [diff] [blame] | 162 | log.Error(registeringProducer, producer_instance_name, failedMessageLabel) |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 163 | return false |
| 164 | } |
ktimoney | 0bb219e | 2023-06-08 15:20:53 +0100 | [diff] [blame] | 165 | data := dataTypes.DataTypes{} |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 166 | err = jsoniter.Unmarshal([]byte(file), &data) |
| 167 | if err != nil { |
| 168 | log.Error("Cannot parse config file: ", config_file) |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 169 | // NOSONAR |
ambrishest | ba3389b | 2023-10-10 07:41:14 +0100 | [diff] [blame] | 170 | log.Error(registeringProducer, producer_instance_name, failedMessageLabel) |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 171 | return false |
| 172 | } |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 173 | var newTypeNames []string |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 174 | |
| 175 | for i := 0; i < len(data.ProdDataTypes); i++ { |
| 176 | t1 := make(map[string]interface{}) |
| 177 | t2 := make(map[string]interface{}) |
| 178 | |
| 179 | t2["schema"] = "http://json-schema.org/draft-07/schema#" |
| 180 | t2["title"] = data.ProdDataTypes[i].ID |
| 181 | t2["description"] = data.ProdDataTypes[i].ID |
| 182 | t2["type"] = "object" |
| 183 | |
| 184 | t1["info_job_data_schema"] = t2 |
| 185 | |
ktimoney | 0bb219e | 2023-06-08 15:20:53 +0100 | [diff] [blame] | 186 | json, err := jsoniter.Marshal(t1) |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 187 | if err != nil { |
| 188 | log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID) |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 189 | // NOSONAR |
ambrishest | ba3389b | 2023-10-10 07:41:14 +0100 | [diff] [blame] | 190 | log.Error(registeringProducer, producer_instance_name, failedMessageLabel) |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 191 | return false |
| 192 | } else { |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 193 | ok := utils.SendHttpRequest(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "") |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 194 | if !ok { |
| 195 | log.Error("Cannot register type: ", data.ProdDataTypes[i].ID) |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 196 | // NOSONAR |
ambrishest | ba3389b | 2023-10-10 07:41:14 +0100 | [diff] [blame] | 197 | log.Error(registeringProducer, producer_instance_name, failedMessageLabel) |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 198 | return false |
| 199 | } |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 200 | newTypeNames = append(newTypeNames, data.ProdDataTypes[i].ID) |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 201 | } |
| 202 | |
| 203 | } |
| 204 | |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 205 | log.Debug("Registering types: ", newTypeNames) |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 206 | datalock.Lock() |
| 207 | defer datalock.Unlock() |
| 208 | |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 209 | for _, v := range data.ProdDataTypes { |
ktimoney | 0bb219e | 2023-06-08 15:20:53 +0100 | [diff] [blame] | 210 | log.Info("Adding type job for type: ", v.ID, " Type added to configuration") |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 211 | startTypeJob(v) |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 212 | } |
| 213 | |
ktimoney | 0bb219e | 2023-06-08 15:20:53 +0100 | [diff] [blame] | 214 | dataTypes.InfoTypes = data |
| 215 | log.Debug("Datatypes: ", dataTypes.InfoTypes) |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 216 | log.Info(registeringProducer, producer_instance_name, " - OK") |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 217 | return true |
| 218 | } |
| 219 | |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 220 | func startTypeJob(dp dataTypes.DataType) { |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 221 | log.Info("Starting type job: ", dp.ID) |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 222 | jobRecord := dataTypes.TypeJobRecord{} |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 223 | |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 224 | jobRecord.Job_control = make(chan dataTypes.JobControl, 1) |
| 225 | jobRecord.Reader_control = make(chan dataTypes.ReaderControl, 1) |
| 226 | jobRecord.Data_in_channel = make(chan *dataTypes.KafkaPayload, reader_queue_length) |
| 227 | jobRecord.InfoType = dp.ID |
| 228 | jobRecord.InputTopic = dp.KafkaInputTopic |
| 229 | jobRecord.GroupId = "kafka-procon-" + dp.ID |
| 230 | jobRecord.ClientId = dp.ID + "-" + os.Getenv("KP") |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 231 | |
| 232 | switch dp.ID { |
| 233 | case "xml-file-data-to-filestore": |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 234 | go kafkacollector.StartJobXmlFileData(dp.ID, jobRecord.Job_control, jobRecord.Data_in_channel, data_out_channel, "", "pm-files-json") |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 235 | case "xml-file-data": |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 236 | go kafkacollector.StartJobXmlFileData(dp.ID, jobRecord.Job_control, jobRecord.Data_in_channel, data_out_channel, files_volume, "") |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 237 | default: |
| 238 | } |
| 239 | |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 240 | go kafkacollector.StartTopicReader(dp.KafkaInputTopic, dp.ID, jobRecord.Reader_control, jobRecord.Data_in_channel, jobRecord.GroupId, jobRecord.ClientId) |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 241 | |
ambrishest | 6e3dc81 | 2023-10-04 16:47:32 +0100 | [diff] [blame] | 242 | dataTypes.TypeJobs[dp.ID] = jobRecord |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 243 | log.Debug("Type job input type: ", dp.InputJobType) |
BjornMagnussonXA | c5655db | 2023-03-17 14:55:16 +0100 | [diff] [blame] | 244 | } |