// -
//
// ========================LICENSE_START=================================
// O-RAN-SC
// %%
// Copyright (C) 2023: Nordix Foundation. All rights reserved.
// Copyright (C) 2023 OpenInfra Foundation Europe. All rights reserved.
// %%
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ========================LICENSE_END===================================
package main

import (
    "fmt"
    "main/common/dataTypes"
    "main/common/utils"
    "main/components/kafkacollector"
    "net/http"
    "os"
    "os/signal"
    "runtime"
    "sync"
    "syscall"
    "time"

    jsoniter "github.com/json-iterator/go"
    log "github.com/sirupsen/logrus"
)

var ics_server = os.Getenv("ICS")
var self = os.Getenv("SELF")
// These are optional - set if the SASL protocol is used towards Kafka
var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")

var bootstrapserver = os.Getenv("KAFKA_SERVER")

const config_file = "application_configuration.json"
const producer_name = "kafka-producer"

var producer_instance_name string = producer_name

const reader_queue_length = 100 //Per type job
const writer_queue_length = 100 //Per info job

var files_volume = os.Getenv("FILES_VOLUME")

var data_out_channel = make(chan *dataTypes.KafkaPayload, writer_queue_length)
var writer_control = make(chan dataTypes.WriterControl, 1)

const registration_delay_short = 2
const registration_delay_long = 120
const failedMessageLabel = " - failed"

//== Variables ==//

var AppState = Init

// Lock for all internal data
var datalock sync.Mutex
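// Application states - the current state is kept in AppState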
const (
    Init dataTypes.AppStates = iota
    Running
    Terminating
)

const registeringProducer = "Registering producer: "
// == Main ==//
func main() {

    //log.SetLevel(log.InfoLevel)
    log.SetLevel(log.TraceLevel)

    log.Info("Server starting...")

    if self == "" {
        log.Panic("Env SELF not configured")
    }
    if bootstrapserver == "" {
        log.Panic("Env KAFKA_SERVER not set")
    }
    if ics_server == "" {
        log.Panic("Env ICS not set")
    }
    if os.Getenv("KP") != "" {
        producer_instance_name = producer_instance_name + "-" + os.Getenv("KP")
    }
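    // Start the shared topic writer. Judging by its name and parameters, it consumes
    // KafkaPayload messages from data_out_channel and publishes them to Kafka,
    // controlled via writer_control (behaviour assumed, implemented in kafkacollector).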
    go kafkacollector.StartTopicWriter(writer_control, data_out_channel)

    //Set up proc for periodic type registration
    var eventChan = make(chan int) //Channel for stopping the proc
    go periodicRegistration(eventChan)

    //Wait for term/int signal to try to shut down gracefully
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        sig := <-sigs
        fmt.Printf("Received signal %s - application will terminate\n", sig)
        eventChan <- 0 // Stop periodic registration
        datalock.Lock()
        defer datalock.Unlock()
        AppState = Terminating
    }()

    AppState = Running
    //Terminate the main goroutine but keep the other go routines running
    runtime.Goexit()

    fmt.Println("main routine exit")
    fmt.Println("server stopped")
}

// == Core functions ==//
// Run periodic registration of producers
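// Registration is retried, with the delay growing in registration_delay_short second steps
// up to registration_delay_long, until it succeeds; after a successful registration it is
// refreshed every registration_delay_long seconds (a failure then restarts the short retry cycle).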
func periodicRegistration(evtch chan int) {
    var delay int = 1
    for {
        select {
        case msg := <-evtch:
            if msg == 0 { // Stop thread
                return
            }
        case <-time.After(time.Duration(delay) * time.Second):
            ok := registerProducer()
            if ok {
                delay = registration_delay_long
            } else {
                if delay < registration_delay_long {
                    delay += registration_delay_short
                } else {
                    delay = registration_delay_short
                }
            }
        }
    }
}

func registerProducer() bool {

    log.Info(registeringProducer, producer_instance_name)

    file, err := os.ReadFile(config_file)
    if err != nil {
        log.Error("Cannot read config file: ", config_file)
        // NOSONAR
        log.Error(registeringProducer, producer_instance_name, failedMessageLabel)
        return false
    }
    data := dataTypes.DataTypes{}
    err = jsoniter.Unmarshal([]byte(file), &data)
    if err != nil {
        log.Error("Cannot parse config file: ", config_file)
        // NOSONAR
        log.Error(registeringProducer, producer_instance_name, failedMessageLabel)
        return false
    }
    var newTypeNames []string
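    // For each produced data type in the configuration, build a minimal JSON object
    // schema (draft-07) for the type's job data and register the type in ICS via
    // PUT /data-producer/v1/info-types/{typeId}.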
    for i := 0; i < len(data.ProdDataTypes); i++ {
        t1 := make(map[string]interface{})
        t2 := make(map[string]interface{})

        t2["schema"] = "http://json-schema.org/draft-07/schema#"
        t2["title"] = data.ProdDataTypes[i].ID
        t2["description"] = data.ProdDataTypes[i].ID
        t2["type"] = "object"

        t1["info_job_data_schema"] = t2

        json, err := jsoniter.Marshal(t1)
        if err != nil {
            log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID)
            // NOSONAR
            log.Error(registeringProducer, producer_instance_name, failedMessageLabel)
            return false
        } else {
            ok := utils.SendHttpRequest(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
            if !ok {
                log.Error("Cannot register type: ", data.ProdDataTypes[i].ID)
                // NOSONAR
                log.Error(registeringProducer, producer_instance_name, failedMessageLabel)
                return false
            }
            newTypeNames = append(newTypeNames, data.ProdDataTypes[i].ID)
        }

    }

    log.Debug("Registering types: ", newTypeNames)
    datalock.Lock()
    defer datalock.Unlock()

    for _, v := range data.ProdDataTypes {
        log.Info("Adding type job for type: ", v.ID, " Type added to configuration")
        startTypeJob(v)
    }

    dataTypes.InfoTypes = data
    log.Debug("Datatypes: ", dataTypes.InfoTypes)
    log.Info(registeringProducer, producer_instance_name, " - OK")
    return true
}

func startTypeJob(dp dataTypes.DataType) {
    log.Info("Starting type job: ", dp.ID)
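    // The TypeJobRecord holds the per-type control and data channels together with the
    // Kafka consumer group/client ids used when reading the type's input topic.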
    jobRecord := dataTypes.TypeJobRecord{}

    jobRecord.Job_control = make(chan dataTypes.JobControl, 1)
    jobRecord.Reader_control = make(chan dataTypes.ReaderControl, 1)
    jobRecord.Data_in_channel = make(chan *dataTypes.KafkaPayload, reader_queue_length)
    jobRecord.InfoType = dp.ID
    jobRecord.InputTopic = dp.KafkaInputTopic
    jobRecord.GroupId = "kafka-procon-" + dp.ID
    jobRecord.ClientId = dp.ID + "-" + os.Getenv("KP")
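    // Start the processing job for the known type ids. Both jobs convert XML PM file data;
    // judging by the arguments, "xml-file-data-to-filestore" appears to send the converted
    // output towards the "pm-files-json" filestore destination, while "xml-file-data" appears
    // to use the mounted files volume (assumed from parameter usage in kafkacollector).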
    switch dp.ID {
    case "xml-file-data-to-filestore":
        go kafkacollector.StartJobXmlFileData(dp.ID, jobRecord.Job_control, jobRecord.Data_in_channel, data_out_channel, "", "pm-files-json")
    case "xml-file-data":
        go kafkacollector.StartJobXmlFileData(dp.ID, jobRecord.Job_control, jobRecord.Data_in_channel, data_out_channel, files_volume, "")
    default:
    }
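    // Start the reader that consumes the type's Kafka input topic and feeds the received
    // payloads into Data_in_channel (assumed from the kafkacollector naming).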
    go kafkacollector.StartTopicReader(dp.KafkaInputTopic, dp.ID, jobRecord.Reader_control, jobRecord.Data_in_channel, jobRecord.GroupId, jobRecord.ClientId)

    dataTypes.TypeJobs[dp.ID] = jobRecord
    log.Debug("Type job input type: ", dp.InputJobType)
}
BjornMagnussonXAc5655db2023-03-17 14:55:16 +0100244}