blob: 00289512341a84bd0e4b16a1a26016b51bf084f1 [file] [log] [blame]
// -
//
// ========================LICENSE_START=================================
// O-RAN-SC
// %%
// Copyright (C) 2023: Nordix Foundation. All rights reserved.
// Copyright (C) 2023 OpenInfra Foundation Europe. All rights reserved.
// %%
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ========================LICENSE_END===================================
package main
import (
"fmt"
"main/common/dataTypes"
"main/common/utils"
"main/components/kafkacollector"
"net/http"
"os"
"os/signal"
"runtime"
"sync"
"syscall"
"time"
jsoniter "github.com/json-iterator/go"
log "github.com/sirupsen/logrus"
)
var ics_server = os.Getenv("ICS")
var self = os.Getenv("SELF")
// This are optional - set if using SASL protocol is used towards kafka
var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
var bootstrapserver = os.Getenv("KAFKA_SERVER")
const config_file = "application_configuration.json"
const producer_name = "kafka-producer"
var producer_instance_name string = producer_name
const reader_queue_length = 100 //Per type job
const writer_queue_length = 100 //Per info job
var files_volume = os.Getenv("FILES_VOLUME")
var data_out_channel = make(chan *dataTypes.KafkaPayload, writer_queue_length)
var writer_control = make(chan dataTypes.WriterControl, 1)
const registration_delay_short = 2
const registration_delay_long = 120
const failedMessageLabel = " - failed"
//== Variables ==//
var AppState = Init
// Lock for all internal data
var datalock sync.Mutex
const (
Init dataTypes.AppStates = iota
Running
Terminating
)
const registeringProducer = "Registering producer: "
// == Main ==//
func main() {
//log.SetLevel(log.InfoLevel)
log.SetLevel(log.TraceLevel)
log.Info("Server starting...")
if self == "" {
log.Panic("Env SELF not configured")
}
if bootstrapserver == "" {
log.Panic("Env KAFKA_SERVER not set")
}
if ics_server == "" {
log.Panic("Env ICS not set")
}
if os.Getenv("KP") != "" {
producer_instance_name = producer_instance_name + "-" + os.Getenv("KP")
}
go kafkacollector.StartTopicWriter(writer_control, data_out_channel)
//Setup proc for periodic type registration
var eventChan = make(chan int) //Channel for stopping the proc
go periodicRegistration(eventChan)
//Wait for term/int signal do try to shut down gracefully
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
fmt.Printf("Received signal %s - application will terminate\n", sig)
eventChan <- 0 // Stop periodic registration
datalock.Lock()
defer datalock.Unlock()
AppState = Terminating
}()
AppState = Running
//Wait until all go routines has exited
runtime.Goexit()
fmt.Println("main routine exit")
fmt.Println("server stopped")
}
// == Core functions ==//
// Run periodic registration of producers
func periodicRegistration(evtch chan int) {
var delay int = 1
for {
select {
case msg := <-evtch:
if msg == 0 { // Stop thread
return
}
case <-time.After(time.Duration(delay) * time.Second):
ok := registerProducer()
if ok {
delay = registration_delay_long
} else {
if delay < registration_delay_long {
delay += registration_delay_short
} else {
delay = registration_delay_short
}
}
}
}
}
func registerProducer() bool {
log.Info(registeringProducer, producer_instance_name)
file, err := os.ReadFile(config_file)
if err != nil {
log.Error("Cannot read config file: ", config_file)
// NOSONAR
log.Error(registeringProducer, producer_instance_name, failedMessageLabel)
return false
}
data := dataTypes.DataTypes{}
err = jsoniter.Unmarshal([]byte(file), &data)
if err != nil {
log.Error("Cannot parse config file: ", config_file)
// NOSONAR
log.Error(registeringProducer, producer_instance_name, failedMessageLabel)
return false
}
var newTypeNames []string
for i := 0; i < len(data.ProdDataTypes); i++ {
t1 := make(map[string]interface{})
t2 := make(map[string]interface{})
t2["schema"] = "http://json-schema.org/draft-07/schema#"
t2["title"] = data.ProdDataTypes[i].ID
t2["description"] = data.ProdDataTypes[i].ID
t2["type"] = "object"
t1["info_job_data_schema"] = t2
json, err := jsoniter.Marshal(t1)
if err != nil {
log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID)
// NOSONAR
log.Error(registeringProducer, producer_instance_name, failedMessageLabel)
return false
} else {
ok := utils.SendHttpRequest(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
if !ok {
log.Error("Cannot register type: ", data.ProdDataTypes[i].ID)
// NOSONAR
log.Error(registeringProducer, producer_instance_name, failedMessageLabel)
return false
}
newTypeNames = append(newTypeNames, data.ProdDataTypes[i].ID)
}
}
log.Debug("Registering types: ", newTypeNames)
datalock.Lock()
defer datalock.Unlock()
for _, v := range data.ProdDataTypes {
log.Info("Adding type job for type: ", v.ID, " Type added to configuration")
startTypeJob(v)
}
dataTypes.InfoTypes = data
log.Debug("Datatypes: ", dataTypes.InfoTypes)
log.Info(registeringProducer, producer_instance_name, " - OK")
return true
}
func startTypeJob(dp dataTypes.DataType) {
log.Info("Starting type job: ", dp.ID)
jobRecord := dataTypes.TypeJobRecord{}
jobRecord.Job_control = make(chan dataTypes.JobControl, 1)
jobRecord.Reader_control = make(chan dataTypes.ReaderControl, 1)
jobRecord.Data_in_channel = make(chan *dataTypes.KafkaPayload, reader_queue_length)
jobRecord.InfoType = dp.ID
jobRecord.InputTopic = dp.KafkaInputTopic
jobRecord.GroupId = "kafka-procon-" + dp.ID
jobRecord.ClientId = dp.ID + "-" + os.Getenv("KP")
switch dp.ID {
case "xml-file-data-to-filestore":
go kafkacollector.StartJobXmlFileData(dp.ID, jobRecord.Job_control, jobRecord.Data_in_channel, data_out_channel, "", "pm-files-json")
case "xml-file-data":
go kafkacollector.StartJobXmlFileData(dp.ID, jobRecord.Job_control, jobRecord.Data_in_channel, data_out_channel, files_volume, "")
default:
}
go kafkacollector.StartTopicReader(dp.KafkaInputTopic, dp.ID, jobRecord.Reader_control, jobRecord.Data_in_channel, jobRecord.GroupId, jobRecord.ClientId)
dataTypes.TypeJobs[dp.ID] = jobRecord
log.Debug("Type job input type: ", dp.InputJobType)
}