| // ============LICENSE_START=============================================== |
| // Copyright (C) 2023 Nordix Foundation. All rights reserved. |
| // ======================================================================== |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| // ============LICENSE_END================================================= |
| // |
| |
| package main |
| |
| import ( |
| "bytes" |
| "compress/gzip" |
| "context" |
| "crypto/tls" |
| "encoding/json" |
| "encoding/xml" |
| "errors" |
| "fmt" |
| "io" |
| "net" |
| "os/signal" |
| "reflect" |
| "strings" |
| "sync" |
| "syscall" |
| |
| "net/http" |
| "os" |
| "runtime" |
| "strconv" |
| "time" |
| |
| "github.com/google/uuid" |
| "golang.org/x/oauth2/clientcredentials" |
| |
| log "github.com/sirupsen/logrus" |
| |
| "github.com/gorilla/mux" |
| |
| "net/http/pprof" |
| |
| "github.com/confluentinc/confluent-kafka-go/kafka" |
| influxdb2 "github.com/influxdata/influxdb-client-go/v2" |
| jsoniter "github.com/json-iterator/go" |
| "github.com/minio/minio-go/v7" |
| "github.com/minio/minio-go/v7/pkg/credentials" |
| ) |
| |
//== Constants ==//

// Listening ports, configuration file name and TLS key material file names.
const (
	http_port   = 80
	https_port  = 443
	config_file = "application_configuration.json"
	server_crt  = "server.crt"
	server_key  = "server.key"
)

// Base name of the producer, main may append a suffix taken from env KP.
const producer_name = "kafka-producer"

// Delays, in seconds, between registration attempts (see periodic_registration).
const (
	registration_delay_short = 2
	registration_delay_long  = 120
)

// Bit mask applied to the mutex state word in MutexLocked.
const mutexLocked = 1
| |
| const ( |
| Init AppStates = iota |
| Running |
| Terminating |
| ) |
| |
// Channel capacities and the size of the global job limiter.
const (
	reader_queue_length = 100 // Per type job
	writer_queue_length = 100 // Per info job
	parallelism_limiter = 100 // For all jobs
)
| |
// These are optional - set them when the SASL protocol is used towards kafka.
// The values are read once from the environment at program start.
var (
	creds_grant_type    = os.Getenv("CREDS_GRANT_TYPE")
	creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET")
	creds_client_id     = os.Getenv("CREDS_CLIENT_ID")
	creds_service_url   = os.Getenv("AUTH_SERVICE_URL")
)
| |
//== Types ==//

// AppStates is the integer type of the application state constants
// (Init, Running, Terminating).
type AppStates int64
| |
// FilterParameters is the json form of the "filter" object of an info job.
// create_job turns each list into a lookup map (see FilterMaps).
type FilterParameters struct {
	MeasuredEntityDns []string `json:"measuredEntityDns"`
	MeasTypes         []string `json:"measTypes"`
	MeasObjClass      []string `json:"measObjClass"`
	MeasObjInstIds    []string `json:"measObjInstIds"`
}
| |
| type InfoJobDataType struct { |
| InfoJobData struct { |
| KafkaOutputTopic string `json:"kafkaOutputTopic"` |
| |
| DbUrl string `json:"db-url"` |
| DbOrg string `json:"db-org"` |
| DbBucket string `json:"db-bucket"` |
| DbToken string `json:"db-token"` |
| |
| FilterParams FilterParameters `json:"filter"` |
| } `json:"info_job_data"` |
| InfoJobIdentity string `json:"info_job_identity"` |
| InfoTypeIdentity string `json:"info_type_identity"` |
| LastUpdated string `json:"last_updated"` |
| Owner string `json:"owner"` |
| TargetURI string `json:"target_uri"` |
| } |
| |
| // Type for an infojob |
| type InfoJobRecord struct { |
| job_info InfoJobDataType |
| output_topic string |
| |
| statistics *InfoJobStats |
| } |
| |
| // Type for an infojob |
| type TypeJobRecord struct { |
| InfoType string |
| InputTopic string |
| data_in_channel chan *KafkaPayload |
| reader_control chan ReaderControl |
| job_control chan JobControl |
| groupId string |
| clientId string |
| |
| statistics *TypeJobStats |
| } |
| |
type (
	// Type for controlling the topic reader
	ReaderControl struct {
		command string
	}

	// Type for controlling the topic writer
	WriterControl struct {
		command string
	}
)
| |
| // Type for controlling the job |
| type JobControl struct { |
| command string |
| filter Filter |
| } |
| |
| type KafkaPayload struct { |
| msg *kafka.Message |
| topic string |
| jobid string |
| } |
| |
// FilterMaps holds the filter lists of an info job as lookup maps.
// create_job sets an entry to true for every value given in FilterParameters.
type FilterMaps struct {
	sourceNameMap     map[string]bool // From FilterParameters.MeasuredEntityDns
	measObjClassMap   map[string]bool // From FilterParameters.MeasObjClass
	measObjInstIdsMap map[string]bool // From FilterParameters.MeasObjInstIds
	measTypesMap      map[string]bool // From FilterParameters.MeasTypes
}
| |
// InfluxJobParameters holds the influx db parameters of an info job,
// copied from the "db-*" members of the job data in create_job.
type InfluxJobParameters struct {
	DbUrl    string
	DbOrg    string
	DbBucket string
	DbToken  string
}
| |
| type Filter struct { |
| JobId string |
| OutputTopic string |
| filter FilterMaps |
| |
| influxParameters InfluxJobParameters |
| } |
| |
type (
	// Type for info job statistics
	InfoJobStats struct {
		out_msg_cnt  int
		out_data_vol int64
	}

	// Type for type job statistics
	TypeJobStats struct {
		in_msg_cnt  int
		in_data_vol int64
	}
)
| |
// == API Datatypes ==//

// Type for supported data types, one entry of the "types" list in the
// configuration file read by register_producer.
//
// The json tags on InputJobType and InputJobDefinition were written without
// quotes in earlier versions, which made them malformed and ignored - the
// keys then had to be matched via the Go field names. They are now proper
// tags using the same camel case key names.
type DataType struct {
	ID              string `json:"id"`
	KafkaInputTopic string `json:"kafkaInputTopic"`
	// Info type of an external job to create for this type, empty if none
	// (see create_ext_job)
	InputJobType       string `json:"inputJobType"`
	InputJobDefinition struct {
		KafkaOutputTopic string `json:"kafkaOutputTopic"`
	} `json:"inputJobDefinition"`

	// Bookkeeping for the external job, not part of the json
	ext_job         *[]byte
	ext_job_created bool
	ext_job_id      string
}
| |
| type DataTypes struct { |
| ProdDataTypes []DataType `json:"types"` |
| } |
| |
// Minio_buckets is the set of bucket names recorded for one minio client,
// maintained by add_bucket and queried by bucket_exist.
type Minio_buckets struct {
	Buckets map[string]bool
}
| |
//== External data types ==//

// Data type for event xml file download
type XmlFileEventHeader struct {
	ProductName        string `json:"productName"`
	VendorName         string `json:"vendorName"`
	Location           string `json:"location"`
	Compression        string `json:"compression"`
	SourceName         string `json:"sourceName"`
	FileFormatType     string `json:"fileFormatType"`
	FileFormatVersion  string `json:"fileFormatVersion"`
	StartEpochMicrosec int64  `json:"startEpochMicrosec"`
	LastEpochMicrosec  int64  `json:"lastEpochMicrosec"`
	Name               string `json:"name"`
	ChangeIdentifier   string `json:"changeIdentifier"`
	InternalLocation   string `json:"internalLocation"`
	TimeZoneOffset     string `json:"timeZoneOffset"`
	//ObjectStoreBucket  string `json:"objectStoreBucket"`
}
| |
// Data types for input xml file
//
// The struct mirrors the element tree rooted at <measCollecFile>. Every
// level keeps its character data in Text and its attributes in fields
// tagged ",attr".
type MeasCollecFile struct {
	XMLName        xml.Name `xml:"measCollecFile"`
	Text           string   `xml:",chardata"`
	Xmlns          string   `xml:"xmlns,attr"`
	Xsi            string   `xml:"xsi,attr"`
	SchemaLocation string   `xml:"schemaLocation,attr"`

	// <fileHeader>
	FileHeader struct {
		Text              string `xml:",chardata"`
		FileFormatVersion string `xml:"fileFormatVersion,attr"`
		VendorName        string `xml:"vendorName,attr"`
		DnPrefix          string `xml:"dnPrefix,attr"`
		FileSender        struct {
			Text        string `xml:",chardata"`
			LocalDn     string `xml:"localDn,attr"`
			ElementType string `xml:"elementType,attr"`
		} `xml:"fileSender"`
		MeasCollec struct {
			Text      string `xml:",chardata"`
			BeginTime string `xml:"beginTime,attr"`
		} `xml:"measCollec"`
	} `xml:"fileHeader"`

	// <measData>
	MeasData struct {
		Text           string `xml:",chardata"`
		ManagedElement struct {
			Text      string `xml:",chardata"`
			LocalDn   string `xml:"localDn,attr"`
			SwVersion string `xml:"swVersion,attr"`
		} `xml:"managedElement"`
		// One entry per <measInfo> element
		MeasInfo []struct {
			Text       string `xml:",chardata"`
			MeasInfoId string `xml:"measInfoId,attr"`
			Job        struct {
				Text  string `xml:",chardata"`
				JobId string `xml:"jobId,attr"`
			} `xml:"job"`
			GranPeriod struct {
				Text     string `xml:",chardata"`
				Duration string `xml:"duration,attr"`
				EndTime  string `xml:"endTime,attr"`
			} `xml:"granPeriod"`
			RepPeriod struct {
				Text     string `xml:",chardata"`
				Duration string `xml:"duration,attr"`
			} `xml:"repPeriod"`
			// One entry per <measType> element, P is its "p" attribute
			MeasType []struct {
				Text string `xml:",chardata"`
				P    string `xml:"p,attr"`
			} `xml:"measType"`
			// One entry per <measValue> element
			MeasValue []struct {
				Text       string `xml:",chardata"`
				MeasObjLdn string `xml:"measObjLdn,attr"`
				// One entry per <r> element, P is its "p" attribute
				R []struct {
					Text string `xml:",chardata"`
					P    string `xml:"p,attr"`
				} `xml:"r"`
				Suspect string `xml:"suspect"`
			} `xml:"measValue"`
		} `xml:"measInfo"`
	} `xml:"measData"`

	// <fileFooter>
	FileFooter struct {
		Text       string `xml:",chardata"`
		MeasCollec struct {
			Text    string `xml:",chardata"`
			EndTime string `xml:"endTime,attr"`
		} `xml:"measCollec"`
	} `xml:"fileFooter"`
}
| |
// Data types for json file
// Split in several parts to allow add/remove in lists
type (
	// One measurement result, P is the value of json key "p"
	MeasResults struct {
		P      int    `json:"p"`
		SValue string `json:"sValue"`
	}

	// The results of one measured object instance
	MeasValues struct {
		MeasObjInstID   string        `json:"measObjInstId"`
		SuspectFlag     string        `json:"suspectFlag"`
		MeasResultsList []MeasResults `json:"measResults"`
	}

	SMeasTypes struct {
		SMeasType string `json:"sMeasTypesList"`
	}

	// One entry of "measInfoList": id, type names and values
	MeasInfoList struct {
		MeasInfoID struct {
			SMeasInfoID string `json:"sMeasInfoId"`
		} `json:"measInfoId"`
		MeasTypes struct {
			SMeasTypesList []string `json:"sMeasTypesList"`
		} `json:"measTypes"`
		MeasValuesList []MeasValues `json:"measValuesList"`
	}
)
| |
| type PMJsonFile struct { |
| Event struct { |
| CommonEventHeader struct { |
| Domain string `json:"domain"` |
| EventID string `json:"eventId"` |
| Sequence int `json:"sequence"` |
| EventName string `json:"eventName"` |
| SourceName string `json:"sourceName"` |
| ReportingEntityName string `json:"reportingEntityName"` |
| Priority string `json:"priority"` |
| StartEpochMicrosec int64 `json:"startEpochMicrosec"` |
| LastEpochMicrosec int64 `json:"lastEpochMicrosec"` |
| Version string `json:"version"` |
| VesEventListenerVersion string `json:"vesEventListenerVersion"` |
| TimeZoneOffset string `json:"timeZoneOffset"` |
| } `json:"commonEventHeader"` |
| Perf3GppFields struct { |
| Perf3GppFieldsVersion string `json:"perf3gppFieldsVersion"` |
| MeasDataCollection struct { |
| GranularityPeriod int `json:"granularityPeriod"` |
| MeasuredEntityUserName string `json:"measuredEntityUserName"` |
| MeasuredEntityDn string `json:"measuredEntityDn"` |
| MeasuredEntitySoftwareVersion string `json:"measuredEntitySoftwareVersion"` |
| SMeasInfoList []MeasInfoList `json:"measInfoList"` |
| } `json:"measDataCollection"` |
| } `json:"perf3gppFields"` |
| } `json:"event"` |
| } |
| |
// Data type for converted json file message, carries only the file name
type FileDownloadedEvt struct {
	Filename string `json:"filename"`
}
| |
| //== Variables ==// |
| |
| var AppState = Init |
| |
| // Lock for all internal data |
| var datalock sync.Mutex |
| |
| var producer_instance_name string = producer_name |
| |
| // Keep all info type jobs, key == type id |
| var TypeJobs map[string]TypeJobRecord = make(map[string]TypeJobRecord) |
| |
| // Keep all info jobs, key == job id |
| var InfoJobs map[string]InfoJobRecord = make(map[string]InfoJobRecord) |
| |
| var InfoTypes DataTypes |
| |
| // Limiter - valid for all jobs |
| var jobLimiterChan = make(chan struct{}, parallelism_limiter) |
| |
| // TODO: Config param? |
| var bucket_location = "swe" |
| |
| var httpclient = &http.Client{} |
| |
| // == Env variables ==// |
| var bootstrapserver = os.Getenv("KAFKA_SERVER") |
| var files_volume = os.Getenv("FILES_VOLUME") |
| var ics_server = os.Getenv("ICS") |
| var self = os.Getenv("SELF") |
| var filestore_user = os.Getenv("FILESTORE_USER") |
| var filestore_pwd = os.Getenv("FILESTORE_PWD") |
| var filestore_server = os.Getenv("FILESTORE_SERVER") |
| |
| var data_out_channel = make(chan *KafkaPayload, writer_queue_length) |
| var writer_control = make(chan WriterControl, 1) |
| |
| var minio_bucketlist map[string]Minio_buckets = make(map[string]Minio_buckets) |
| |
| // == Main ==// |
| func main() { |
| |
| //log.SetLevel(log.InfoLevel) |
| log.SetLevel(log.TraceLevel) |
| |
| log.Info("Server starting...") |
| |
| if self == "" { |
| log.Panic("Env SELF not configured") |
| } |
| if bootstrapserver == "" { |
| log.Panic("Env KAFKA_SERVER not set") |
| } |
| if ics_server == "" { |
| log.Panic("Env ICS not set") |
| } |
| if os.Getenv("KP") != "" { |
| producer_instance_name = producer_instance_name + "-" + os.Getenv("KP") |
| } |
| |
| rtr := mux.NewRouter() |
| rtr.HandleFunc("/callbacks/job/"+producer_instance_name, create_job) |
| rtr.HandleFunc("/callbacks/job/"+producer_instance_name+"/{job_id}", delete_job) |
| rtr.HandleFunc("/callbacks/supervision/"+producer_instance_name, supervise_producer) |
| rtr.HandleFunc("/statistics", statistics) |
| rtr.HandleFunc("/logging/{level}", logging_level) |
| rtr.HandleFunc("/logging", logging_level) |
| rtr.HandleFunc("/", alive) |
| |
| //For perf/mem profiling |
| rtr.HandleFunc("/custom_debug_path/profile", pprof.Profile) |
| |
| http.Handle("/", rtr) |
| |
| http_server := &http.Server{Addr: ":" + strconv.Itoa(http_port), Handler: nil} |
| |
| cer, err := tls.LoadX509KeyPair(server_crt, server_key) |
| if err != nil { |
| log.Error("Cannot load key and cert - ", err) |
| return |
| } |
| config := &tls.Config{Certificates: []tls.Certificate{cer}} |
| https_server := &http.Server{Addr: ":" + strconv.Itoa(https_port), TLSConfig: config, Handler: nil} |
| |
| // Run http |
| go func() { |
| log.Info("Starting http service...") |
| err := http_server.ListenAndServe() |
| if err == http.ErrServerClosed { // graceful shutdown |
| log.Info("http server shutdown...") |
| } else if err != nil { |
| log.Error("http server error: ", err) |
| } |
| }() |
| |
| // Run https |
| go func() { |
| log.Info("Starting https service...") |
| err := https_server.ListenAndServe() |
| if err == http.ErrServerClosed { // graceful shutdown |
| log.Info("https server shutdown...") |
| } else if err != nil { |
| log.Error("https server error: ", err) |
| } |
| }() |
| check_tcp(strconv.Itoa(http_port)) |
| check_tcp(strconv.Itoa(https_port)) |
| |
| go start_topic_writer(writer_control, data_out_channel) |
| |
| //Setup proc for periodic type registration |
| var event_chan = make(chan int) //Channel for stopping the proc |
| go periodic_registration(event_chan) |
| |
| //Wait for term/int signal do try to shut down gracefully |
| sigs := make(chan os.Signal, 1) |
| signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) |
| go func() { |
| sig := <-sigs |
| fmt.Printf("Received signal %s - application will terminate\n", sig) |
| event_chan <- 0 // Stop periodic registration |
| datalock.Lock() |
| defer datalock.Unlock() |
| AppState = Terminating |
| http_server.Shutdown(context.Background()) |
| https_server.Shutdown(context.Background()) |
| // Stopping jobs |
| for key, _ := range TypeJobs { |
| log.Info("Stopping type job:", key) |
| for _, dp := range InfoTypes.ProdDataTypes { |
| if key == dp.ID { |
| remove_type_job(dp) |
| } |
| } |
| } |
| }() |
| |
| AppState = Running |
| |
| //Wait until all go routines has exited |
| runtime.Goexit() |
| |
| fmt.Println("main routine exit") |
| fmt.Println("server stopped") |
| } |
| |
| func check_tcp(port string) { |
| log.Info("Checking tcp port: ", port) |
| for true { |
| address := net.JoinHostPort("localhost", port) |
| // 3 second timeout |
| conn, err := net.DialTimeout("tcp", address, 3*time.Second) |
| if err != nil { |
| log.Info("Checking tcp port: ", port, " failed, retrying...") |
| } else { |
| if conn != nil { |
| log.Info("Checking tcp port: ", port, " - OK") |
| _ = conn.Close() |
| return |
| } else { |
| log.Info("Checking tcp port: ", port, " failed, retrying...") |
| } |
| } |
| } |
| } |
| |
| //== Core functions ==// |
| |
| // Run periodic registration of producers |
| func periodic_registration(evtch chan int) { |
| var delay int = 1 |
| for { |
| select { |
| case msg := <-evtch: |
| if msg == 0 { // Stop thread |
| return |
| } |
| case <-time.After(time.Duration(delay) * time.Second): |
| ok := register_producer() |
| if ok { |
| delay = registration_delay_long |
| } else { |
| if delay < registration_delay_long { |
| delay += registration_delay_short |
| } else { |
| delay = registration_delay_short |
| } |
| } |
| } |
| } |
| } |
| |
| func register_producer() bool { |
| |
| log.Info("Registering producer: ", producer_instance_name) |
| |
| file, err := os.ReadFile(config_file) |
| if err != nil { |
| log.Error("Cannot read config file: ", config_file) |
| log.Error("Registering producer: ", producer_instance_name, " - failed") |
| return false |
| } |
| data := DataTypes{} |
| err = jsoniter.Unmarshal([]byte(file), &data) |
| if err != nil { |
| log.Error("Cannot parse config file: ", config_file) |
| log.Error("Registering producer: ", producer_instance_name, " - failed") |
| return false |
| } |
| var new_type_names []string |
| |
| for i := 0; i < len(data.ProdDataTypes); i++ { |
| t1 := make(map[string]interface{}) |
| t2 := make(map[string]interface{}) |
| |
| t2["schema"] = "http://json-schema.org/draft-07/schema#" |
| t2["title"] = data.ProdDataTypes[i].ID |
| t2["description"] = data.ProdDataTypes[i].ID |
| t2["type"] = "object" |
| |
| t1["info_job_data_schema"] = t2 |
| |
| json, err := json.Marshal(t1) |
| if err != nil { |
| log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID) |
| log.Error("Registering producer: ", producer_instance_name, " - failed") |
| return false |
| } else { |
| ok := send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "") |
| if !ok { |
| log.Error("Cannot register type: ", data.ProdDataTypes[i].ID) |
| log.Error("Registering producer: ", producer_instance_name, " - failed") |
| return false |
| } |
| new_type_names = append(new_type_names, data.ProdDataTypes[i].ID) |
| } |
| |
| } |
| |
| log.Debug("Registering types: ", new_type_names) |
| m := make(map[string]interface{}) |
| m["supported_info_types"] = new_type_names |
| m["info_job_callback_url"] = "http://" + self + "/callbacks/job/" + producer_instance_name |
| m["info_producer_supervision_callback_url"] = "http://" + self + "/callbacks/supervision/" + producer_instance_name |
| |
| json, err := json.Marshal(m) |
| if err != nil { |
| log.Error("Cannot create json for producer: ", producer_instance_name) |
| log.Error("Registering producer: ", producer_instance_name, " - failed") |
| return false |
| } |
| ok := send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-producers/"+producer_instance_name, true, creds_grant_type != "") |
| if !ok { |
| log.Error("Cannot register producer: ", producer_instance_name) |
| log.Error("Registering producer: ", producer_instance_name, " - failed") |
| return false |
| } |
| datalock.Lock() |
| defer datalock.Unlock() |
| |
| var current_type_names []string |
| for _, v := range InfoTypes.ProdDataTypes { |
| current_type_names = append(current_type_names, v.ID) |
| if contains_str(new_type_names, v.ID) { |
| //Type exist |
| log.Debug("Type ", v.ID, " exists") |
| create_ext_job(v) |
| } else { |
| //Type is removed |
| log.Info("Removing type job for type: ", v.ID, " Type not in configuration") |
| remove_type_job(v) |
| } |
| } |
| |
| for _, v := range data.ProdDataTypes { |
| if contains_str(current_type_names, v.ID) { |
| //Type exist |
| log.Debug("Type ", v.ID, " exists") |
| create_ext_job(v) |
| } else { |
| //Type is new |
| log.Info("Adding type job for type: ", v.ID, " Type added to configuration") |
| start_type_job(v) |
| } |
| } |
| |
| InfoTypes = data |
| log.Debug("Datatypes: ", InfoTypes) |
| |
| log.Info("Registering producer: ", producer_instance_name, " - OK") |
| return true |
| } |
| |
| func remove_type_job(dp DataType) { |
| log.Info("Removing type job: ", dp.ID) |
| j, ok := TypeJobs[dp.ID] |
| if ok { |
| j.reader_control <- ReaderControl{"EXIT"} |
| } |
| |
| if dp.ext_job_created == true { |
| dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType) |
| ok := send_http_request(*dp.ext_job, http.MethodDelete, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "") |
| if !ok { |
| log.Error("Cannot delete job: ", dp.ext_job_id) |
| } |
| dp.ext_job_created = false |
| dp.ext_job = nil |
| } |
| |
| } |
| |
| func start_type_job(dp DataType) { |
| log.Info("Starting type job: ", dp.ID) |
| job_record := TypeJobRecord{} |
| |
| job_record.job_control = make(chan JobControl, 1) |
| job_record.reader_control = make(chan ReaderControl, 1) |
| job_record.data_in_channel = make(chan *KafkaPayload, reader_queue_length) |
| job_record.InfoType = dp.ID |
| job_record.InputTopic = dp.KafkaInputTopic |
| job_record.groupId = "kafka-procon-" + dp.ID |
| job_record.clientId = dp.ID + "-" + os.Getenv("KP") |
| var stats TypeJobStats |
| job_record.statistics = &stats |
| |
| switch dp.ID { |
| case "xml-file-data-to-filestore": |
| go start_job_xml_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, "", "pm-files-json") |
| case "xml-file-data": |
| go start_job_xml_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, files_volume, "") |
| case "json-file-data-from-filestore": |
| go start_job_json_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, true) |
| case "json-file-data": |
| go start_job_json_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, false) |
| case "json-file-data-from-filestore-to-influx": |
| go start_job_json_file_data_influx(dp.ID, job_record.job_control, job_record.data_in_channel, true) |
| |
| default: |
| } |
| |
| go start_topic_reader(dp.KafkaInputTopic, dp.ID, job_record.reader_control, job_record.data_in_channel, job_record.groupId, job_record.clientId, &stats) |
| |
| TypeJobs[dp.ID] = job_record |
| log.Debug("Type job input type: ", dp.InputJobType) |
| create_ext_job(dp) |
| } |
| |
| func create_ext_job(dp DataType) { |
| if dp.InputJobType != "" { |
| jb := make(map[string]interface{}) |
| jb["info_type_id"] = dp.InputJobType |
| jb["job_owner"] = "console" //TODO: |
| jb["status_notification_uri"] = "http://callback:80/post" |
| jb1 := make(map[string]interface{}) |
| jb["job_definition"] = jb1 |
| jb1["kafkaOutputTopic"] = dp.InputJobDefinition.KafkaOutputTopic |
| |
| json, err := json.Marshal(jb) |
| dp.ext_job_created = false |
| dp.ext_job = nil |
| if err != nil { |
| log.Error("Cannot create json for type: ", dp.InputJobType) |
| return |
| } |
| |
| dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType) |
| ok := false |
| for !ok { |
| ok = send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "") |
| if !ok { |
| log.Error("Cannot register job: ", dp.InputJobType) |
| } |
| } |
| log.Debug("Registered job ok: ", dp.InputJobType) |
| dp.ext_job_created = true |
| dp.ext_job = &json |
| } |
| } |
| |
| func remove_info_job(jobid string) { |
| log.Info("Removing info job: ", jobid) |
| filter := Filter{} |
| filter.JobId = jobid |
| jc := JobControl{} |
| jc.command = "REMOVE-FILTER" |
| jc.filter = filter |
| infoJob := InfoJobs[jobid] |
| typeJob := TypeJobs[infoJob.job_info.InfoTypeIdentity] |
| typeJob.job_control <- jc |
| delete(InfoJobs, jobid) |
| |
| } |
| |
| // == Helper functions ==// |
| |
| // Function to check the status of a mutex lock |
| func MutexLocked(m *sync.Mutex) bool { |
| state := reflect.ValueOf(m).Elem().FieldByName("state") |
| return state.Int()&mutexLocked == mutexLocked |
| } |
| |
// Test if slice contains a string
// Returns true if any element of s equals e, false otherwise (also for a
// nil or empty slice).
func contains_str(s []string, e string) bool {
	for i := 0; i < len(s); i++ {
		if s[i] == e {
			return true
		}
	}
	return false
}
| |
| // Send a http request with json (json may be nil) |
| func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool { |
| |
| // set the HTTP method, url, and request body |
| var req *http.Request |
| var err error |
| if json == nil { |
| req, err = http.NewRequest(method, url, http.NoBody) |
| } else { |
| req, err = http.NewRequest(method, url, bytes.NewBuffer(json)) |
| req.Header.Set("Content-Type", "application/json; charset=utf-8") |
| } |
| if err != nil { |
| log.Error("Cannot create http request, method: ", method, " url: ", url) |
| return false |
| } |
| |
| if useAuth { |
| token, err := fetch_token() |
| if err != nil { |
| log.Error("Cannot fetch token for http request: ", err) |
| return false |
| } |
| req.Header.Set("Authorization", "Bearer "+token.TokenValue) |
| } |
| |
| log.Debug("HTTP request: ", req) |
| |
| log.Debug("Sending http request") |
| resp, err2 := httpclient.Do(req) |
| if err2 != nil { |
| log.Error("Http request error: ", err2) |
| log.Error("Cannot send http request method: ", method, " url: ", url) |
| } else { |
| if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 { |
| log.Debug("Accepted http status: ", resp.StatusCode) |
| resp.Body.Close() |
| return true |
| } |
| log.Debug("HTTP resp: ", resp) |
| resp.Body.Close() |
| } |
| return false |
| } |
| |
| func fetch_token() (*kafka.OAuthBearerToken, error) { |
| log.Debug("Get token inline") |
| conf := &clientcredentials.Config{ |
| ClientID: creds_client_id, |
| ClientSecret: creds_client_secret, |
| TokenURL: creds_service_url, |
| } |
| token, err := conf.Token(context.Background()) |
| if err != nil { |
| log.Warning("Cannot fetch access token: ", err) |
| return nil, err |
| } |
| extensions := map[string]string{} |
| log.Debug("=====================================================") |
| log.Debug("token: ", token) |
| log.Debug("=====================================================") |
| log.Debug("TokenValue: ", token.AccessToken) |
| log.Debug("=====================================================") |
| log.Debug("Expiration: ", token.Expiry) |
| t := token.Expiry |
| oauthBearerToken := kafka.OAuthBearerToken{ |
| TokenValue: token.AccessToken, |
| Expiration: t, |
| Extensions: extensions, |
| } |
| |
| return &oauthBearerToken, nil |
| } |
| |
| // Function to print memory details |
| // https://pkg.go.dev/runtime#MemStats |
| func PrintMemUsage() { |
| if log.IsLevelEnabled(log.DebugLevel) || log.IsLevelEnabled(log.TraceLevel) { |
| var m runtime.MemStats |
| runtime.ReadMemStats(&m) |
| fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc)) |
| fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc)) |
| fmt.Printf("\tSys = %v MiB", bToMb(m.Sys)) |
| fmt.Printf("\tNumGC = %v\n", m.NumGC) |
| fmt.Printf("HeapSys = %v MiB", bToMb(m.HeapSys)) |
| fmt.Printf("\tStackSys = %v MiB", bToMb(m.StackSys)) |
| fmt.Printf("\tMSpanSys = %v MiB", bToMb(m.MSpanSys)) |
| fmt.Printf("\tMCacheSys = %v MiB", bToMb(m.MCacheSys)) |
| fmt.Printf("\tBuckHashSys = %v MiB", bToMb(m.BuckHashSys)) |
| fmt.Printf("\tGCSys = %v MiB", bToMb(m.GCSys)) |
| fmt.Printf("\tOtherSys = %v MiB\n", bToMb(m.OtherSys)) |
| } |
| } |
| |
// bToMb converts a number of bytes to whole MiB, rounding down.
func bToMb(b uint64) uint64 {
	const mib_shift = 20 // 1 MiB == 1 << 20 bytes
	return b >> mib_shift
}
| |
| func generate_uuid_from_type(s string) string { |
| if len(s) > 16 { |
| s = s[:16] |
| } |
| for len(s) < 16 { |
| s = s + "0" |
| } |
| b := []byte(s) |
| b = b[:16] |
| uuid, _ := uuid.FromBytes(b) |
| return uuid.String() |
| } |
| |
// Write gzipped data to a Writer
//
// The data is compressed with level gzip.BestSpeed and written to w.
// Returns the first error from creating the gzip writer, from writing the
// data or from closing the gzip writer.
//
// The error from Close must be reported: small amounts of data are held in
// the compressor and written to w, together with the gzip footer, only when
// the gzip writer is closed. Ignoring it would report success for an
// incomplete stream.
func gzipWrite(w io.Writer, data *[]byte) error {
	gw, err := gzip.NewWriterLevel(w, gzip.BestSpeed)
	if err != nil {
		return err
	}
	if _, err := gw.Write(*data); err != nil {
		// The write error is the one to report, close only to clean up
		_ = gw.Close()
		return err
	}
	return gw.Close()
}
| |
// Write gunzipped data from Reader to a Writer
//
// All of data is decompressed in memory before anything is written to w,
// so nothing is written if the input is not a valid gzip stream. Returns
// the first error from opening, reading or writing, nil on success.
func gunzipReaderToWriter(w io.Writer, data io.Reader) error {
	zr, err := gzip.NewReader(data)
	if err != nil {
		return err
	}
	defer zr.Close()

	plain, err := io.ReadAll(zr)
	if err == nil {
		_, err = w.Write(plain)
	}
	return err
}
| |
| func create_minio_bucket(mc *minio.Client, client_id string, bucket string) error { |
| tctx := context.Background() |
| err := mc.MakeBucket(tctx, bucket, minio.MakeBucketOptions{Region: bucket_location}) |
| if err != nil { |
| // Check to see if we already own this bucket (which happens if you run this twice) |
| exists, errBucketExists := mc.BucketExists(tctx, bucket) |
| if errBucketExists == nil && exists { |
| log.Debug("Already own bucket:", bucket) |
| add_bucket(client_id, bucket) |
| return nil |
| } else { |
| log.Error("Cannot create or check bucket ", bucket, " in minio client", err) |
| return err |
| } |
| } |
| log.Debug("Successfully created bucket: ", bucket) |
| add_bucket(client_id, bucket) |
| return nil |
| } |
| |
| func check_minio_bucket(mc *minio.Client, client_id string, bucket string) bool { |
| ok := bucket_exist(client_id, bucket) |
| if ok { |
| return true |
| } |
| tctx := context.Background() |
| exists, err := mc.BucketExists(tctx, bucket) |
| if err == nil && exists { |
| log.Debug("Already own bucket:", bucket) |
| return true |
| } |
| log.Error("Bucket does not exist, bucket ", bucket, " in minio client", err) |
| return false |
| } |
| |
| func add_bucket(minio_id string, bucket string) { |
| datalock.Lock() |
| defer datalock.Unlock() |
| |
| b, ok := minio_bucketlist[minio_id] |
| if !ok { |
| b = Minio_buckets{} |
| b.Buckets = make(map[string]bool) |
| } |
| b.Buckets[bucket] = true |
| minio_bucketlist[minio_id] = b |
| } |
| |
| func bucket_exist(minio_id string, bucket string) bool { |
| datalock.Lock() |
| defer datalock.Unlock() |
| |
| b, ok := minio_bucketlist[minio_id] |
| if !ok { |
| return false |
| } |
| _, ok = b.Buckets[bucket] |
| return ok |
| } |
| |
| //== http api functions ==// |
| |
| // create/update job |
| func create_job(w http.ResponseWriter, req *http.Request) { |
| log.Debug("Create job, http method: ", req.Method) |
| if req.Method != http.MethodPost { |
| log.Error("Create job, http method not allowed") |
| w.WriteHeader(http.StatusMethodNotAllowed) |
| return |
| } |
| ct := req.Header.Get("Content-Type") |
| if ct != "application/json" { |
| log.Error("Create job, bad content type") |
| http.Error(w, "Bad content type", http.StatusBadRequest) |
| return |
| } |
| |
| var t InfoJobDataType |
| err := json.NewDecoder(req.Body).Decode(&t) |
| if err != nil { |
| log.Error("Create job, cannot parse json,", err) |
| http.Error(w, "Cannot parse json", http.StatusBadRequest) |
| return |
| } |
| log.Debug("Creating job, id: ", t.InfoJobIdentity) |
| datalock.Lock() |
| defer datalock.Unlock() |
| |
| job_id := t.InfoJobIdentity |
| job_record, job_found := InfoJobs[job_id] |
| type_job_record, found_type := TypeJobs[t.InfoTypeIdentity] |
| if !job_found { |
| if !found_type { |
| log.Error("Type ", t.InfoTypeIdentity, " does not exist") |
| http.Error(w, "Type "+t.InfoTypeIdentity+" does not exist", http.StatusBadRequest) |
| return |
| } |
| } else if t.InfoTypeIdentity != job_record.job_info.InfoTypeIdentity { |
| log.Error("Job cannot change type") |
| http.Error(w, "Job cannot change type", http.StatusBadRequest) |
| return |
| } else if t.InfoJobData.KafkaOutputTopic != job_record.job_info.InfoJobData.KafkaOutputTopic { |
| log.Error("Job cannot change topic") |
| http.Error(w, "Job cannot change topic", http.StatusBadRequest) |
| return |
| } else if !found_type { |
| //Should never happen, if the type is removed then job is stopped |
| log.Error("Type ", t.InfoTypeIdentity, " does not exist") |
| http.Error(w, "Type "+t.InfoTypeIdentity+" does not exist", http.StatusBadRequest) |
| return |
| } |
| |
| if !job_found { |
| job_record = InfoJobRecord{} |
| job_record.job_info = t |
| output_topic := t.InfoJobData.KafkaOutputTopic |
| job_record.output_topic = t.InfoJobData.KafkaOutputTopic |
| log.Debug("Starting infojob ", job_id, ", type ", t.InfoTypeIdentity, ", output topic", output_topic) |
| |
| var stats InfoJobStats |
| job_record.statistics = &stats |
| |
| filter := Filter{} |
| filter.JobId = job_id |
| filter.OutputTopic = job_record.output_topic |
| |
| jc := JobControl{} |
| |
| jc.command = "ADD-FILTER" |
| |
| if t.InfoTypeIdentity == "json-file-data-from-filestore" || t.InfoTypeIdentity == "json-file-data" || t.InfoTypeIdentity == "json-file-data-from-filestore-to-influx" { |
| fm := FilterMaps{} |
| fm.sourceNameMap = make(map[string]bool) |
| fm.measObjClassMap = make(map[string]bool) |
| fm.measObjInstIdsMap = make(map[string]bool) |
| fm.measTypesMap = make(map[string]bool) |
| if t.InfoJobData.FilterParams.MeasuredEntityDns != nil { |
| for _, v := range t.InfoJobData.FilterParams.MeasuredEntityDns { |
| fm.sourceNameMap[v] = true |
| } |
| } |
| if t.InfoJobData.FilterParams.MeasObjClass != nil { |
| for _, v := range t.InfoJobData.FilterParams.MeasObjClass { |
| fm.measObjClassMap[v] = true |
| } |
| } |
| if t.InfoJobData.FilterParams.MeasObjInstIds != nil { |
| for _, v := range t.InfoJobData.FilterParams.MeasObjInstIds { |
| fm.measObjInstIdsMap[v] = true |
| } |
| } |
| if t.InfoJobData.FilterParams.MeasTypes != nil { |
| for _, v := range t.InfoJobData.FilterParams.MeasTypes { |
| fm.measTypesMap[v] = true |
| } |
| } |
| filter.filter = fm |
| } |
| if t.InfoTypeIdentity == "json-file-data-from-filestore-to-influx" { |
| influxparam := InfluxJobParameters{} |
| influxparam.DbUrl = t.InfoJobData.DbUrl |
| influxparam.DbOrg = t.InfoJobData.DbOrg |
| influxparam.DbBucket = t.InfoJobData.DbBucket |
| influxparam.DbToken = t.InfoJobData.DbToken |
| filter.influxParameters = influxparam |
| } |
| |
| jc.filter = filter |
| InfoJobs[job_id] = job_record |
| |
| type_job_record.job_control <- jc |
| |
| } else { |
| //TODO |
| //Update job |
| } |
| } |
| |
| // delete job |
| func delete_job(w http.ResponseWriter, req *http.Request) { |
| if req.Method != http.MethodDelete { |
| w.WriteHeader(http.StatusMethodNotAllowed) |
| return |
| } |
| datalock.Lock() |
| defer datalock.Unlock() |
| |
| vars := mux.Vars(req) |
| |
| if id, ok := vars["job_id"]; ok { |
| if _, ok := InfoJobs[id]; ok { |
| remove_info_job(id) |
| w.WriteHeader(http.StatusNoContent) |
| log.Info("Job ", id, " deleted") |
| return |
| } |
| } |
| w.WriteHeader(http.StatusNotFound) |
| } |
| |
| // job supervision |
| func supervise_job(w http.ResponseWriter, req *http.Request) { |
| if req.Method != http.MethodGet { |
| w.WriteHeader(http.StatusMethodNotAllowed) |
| return |
| } |
| datalock.Lock() |
| defer datalock.Unlock() |
| |
| vars := mux.Vars(req) |
| |
| log.Debug("Supervising, job: ", vars["job_id"]) |
| if id, ok := vars["job_id"]; ok { |
| if _, ok := InfoJobs[id]; ok { |
| log.Debug("Supervision ok, job", id) |
| return |
| } |
| } |
| w.WriteHeader(http.StatusNotFound) |
| } |
| |
// supervise_producer is the HTTP handler used to check that the producer is up.
// It answers 200 to GET and 405 to every other method; no body is written.
func supervise_producer(w http.ResponseWriter, req *http.Request) {
	status := http.StatusOK
	if req.Method != http.MethodGet {
		status = http.StatusMethodNotAllowed
	}
	w.WriteHeader(status)
}
| |
| // producer statistics, all jobs |
| func statistics(w http.ResponseWriter, req *http.Request) { |
| if req.Method != http.MethodGet { |
| w.WriteHeader(http.StatusMethodNotAllowed) |
| return |
| } |
| m := make(map[string]interface{}) |
| log.Debug("producer statictics, locked? ", MutexLocked(&datalock)) |
| datalock.Lock() |
| defer datalock.Unlock() |
| req.Header.Set("Content-Type", "application/json; charset=utf-8") |
| m["number-of-jobs"] = len(InfoJobs) |
| m["number-of-types"] = len(InfoTypes.ProdDataTypes) |
| qm := make(map[string]interface{}) |
| m["jobs"] = qm |
| for key, elem := range InfoJobs { |
| jm := make(map[string]interface{}) |
| qm[key] = jm |
| jm["type"] = elem.job_info.InfoTypeIdentity |
| typeJob := TypeJobs[elem.job_info.InfoTypeIdentity] |
| jm["groupId"] = typeJob.groupId |
| jm["clientID"] = typeJob.clientId |
| jm["input topic"] = typeJob.InputTopic |
| jm["input queue length - job ("+fmt.Sprintf("%v", cap(typeJob.data_in_channel))+")"] = len(typeJob.data_in_channel) |
| jm["output topic"] = elem.output_topic |
| jm["output queue length - producer ("+fmt.Sprintf("%v", cap(data_out_channel))+")"] = len(data_out_channel) |
| jm["msg_in (type)"] = typeJob.statistics.in_msg_cnt |
| jm["msg_out (job)"] = elem.statistics.out_msg_cnt |
| |
| } |
| json, err := json.Marshal(m) |
| if err != nil { |
| w.WriteHeader(http.StatusInternalServerError) |
| log.Error("Cannot marshal statistics json") |
| return |
| } |
| _, err = w.Write(json) |
| if err != nil { |
| w.WriteHeader(http.StatusInternalServerError) |
| log.Error("Cannot send statistics json") |
| return |
| } |
| } |
| |
// alive is the liveness probe handler. It accepts any method and answers with
// an empty 200 response.
func alive(w http.ResponseWriter, req *http.Request) {
	// Spell out the status net/http would send anyway for an untouched response.
	w.WriteHeader(http.StatusOK)
}
| |
| // Get/Set logging level |
| func logging_level(w http.ResponseWriter, req *http.Request) { |
| vars := mux.Vars(req) |
| if level, ok := vars["level"]; ok { |
| if req.Method == http.MethodPut { |
| switch level { |
| case "trace": |
| log.SetLevel(log.TraceLevel) |
| case "debug": |
| log.SetLevel(log.DebugLevel) |
| case "info": |
| log.SetLevel(log.InfoLevel) |
| case "warn": |
| log.SetLevel(log.WarnLevel) |
| case "error": |
| log.SetLevel(log.ErrorLevel) |
| case "fatal": |
| log.SetLevel(log.FatalLevel) |
| case "panic": |
| log.SetLevel(log.PanicLevel) |
| default: |
| w.WriteHeader(http.StatusNotFound) |
| } |
| } else { |
| w.WriteHeader(http.StatusMethodNotAllowed) |
| } |
| } else { |
| if req.Method == http.MethodGet { |
| msg := "none" |
| if log.IsLevelEnabled(log.PanicLevel) { |
| msg = "panic" |
| } else if log.IsLevelEnabled(log.FatalLevel) { |
| msg = "fatal" |
| } else if log.IsLevelEnabled(log.ErrorLevel) { |
| msg = "error" |
| } else if log.IsLevelEnabled(log.WarnLevel) { |
| msg = "warn" |
| } else if log.IsLevelEnabled(log.InfoLevel) { |
| msg = "info" |
| } else if log.IsLevelEnabled(log.DebugLevel) { |
| msg = "debug" |
| } else if log.IsLevelEnabled(log.TraceLevel) { |
| msg = "trace" |
| } |
| w.Header().Set("Content-Type", "application/text") |
| w.Write([]byte(msg)) |
| } else { |
| w.WriteHeader(http.StatusMethodNotAllowed) |
| } |
| } |
| } |
| |
| func start_topic_reader(topic string, type_id string, control_ch chan ReaderControl, data_ch chan *KafkaPayload, gid string, cid string, stats *TypeJobStats) { |
| |
| log.Info("Topic reader starting, topic: ", topic, " for type: ", type_id) |
| |
| topic_ok := false |
| var c *kafka.Consumer = nil |
| running := true |
| |
| for topic_ok == false { |
| |
| select { |
| case reader_ctrl := <-control_ch: |
| if reader_ctrl.command == "EXIT" { |
| log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped") |
| data_ch <- nil //Signal to job handler |
| running = false |
| return |
| } |
| case <-time.After(1 * time.Second): |
| if !running { |
| return |
| } |
| if c == nil { |
| c = create_kafka_consumer(type_id, gid, cid) |
| if c == nil { |
| log.Info("Cannot start consumer on topic: ", topic, " for type: ", type_id, " - retrying") |
| } else { |
| log.Info("Consumer started on topic: ", topic, " for type: ", type_id) |
| } |
| } |
| if c != nil && topic_ok == false { |
| err := c.SubscribeTopics([]string{topic}, nil) |
| if err != nil { |
| log.Info("Topic reader cannot start subscribing on topic: ", topic, " for type: ", type_id, " - retrying -- error details: ", err) |
| } else { |
| log.Info("Topic reader subscribing on topic: ", topic, " for type: ", type_id) |
| topic_ok = true |
| } |
| } |
| } |
| } |
| log.Info("Topic reader ready on topic: ", topic, " for type: ", type_id) |
| |
| var event_chan = make(chan int) |
| go func() { |
| for { |
| select { |
| case evt := <-c.Events(): |
| switch evt.(type) { |
| case kafka.OAuthBearerTokenRefresh: |
| log.Debug("New consumer token needed: ", evt) |
| token, err := fetch_token() |
| if err != nil { |
| log.Warning("Cannot cannot fetch token: ", err) |
| c.SetOAuthBearerTokenFailure(err.Error()) |
| } else { |
| setTokenError := c.SetOAuthBearerToken(*token) |
| if setTokenError != nil { |
| log.Warning("Cannot cannot set token: ", setTokenError) |
| c.SetOAuthBearerTokenFailure(setTokenError.Error()) |
| } |
| } |
| default: |
| log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String()) |
| } |
| |
| case msg := <-event_chan: |
| if msg == 0 { |
| return |
| } |
| case <-time.After(1 * time.Second): |
| if !running { |
| return |
| } |
| } |
| } |
| }() |
| |
| go func() { |
| for { |
| for { |
| select { |
| case reader_ctrl := <-control_ch: |
| if reader_ctrl.command == "EXIT" { |
| event_chan <- 0 |
| log.Debug("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped") |
| data_ch <- nil //Signal to job handler |
| defer c.Close() |
| return |
| } |
| default: |
| |
| ev := c.Poll(1000) |
| if ev == nil { |
| log.Debug("Topic Reader for type: ", type_id, " Nothing to consume on topic: ", topic) |
| continue |
| } |
| switch e := ev.(type) { |
| case *kafka.Message: |
| var kmsg KafkaPayload |
| kmsg.msg = e |
| |
| c.Commit() |
| |
| data_ch <- &kmsg |
| stats.in_msg_cnt++ |
| log.Debug("Reader msg: ", &kmsg) |
| log.Debug("Reader - data_ch ", data_ch) |
| case kafka.Error: |
| fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e) |
| |
| case kafka.OAuthBearerTokenRefresh: |
| log.Debug("New consumer token needed: ", ev) |
| token, err := fetch_token() |
| if err != nil { |
| log.Warning("Cannot cannot fetch token: ", err) |
| c.SetOAuthBearerTokenFailure(err.Error()) |
| } else { |
| setTokenError := c.SetOAuthBearerToken(*token) |
| if setTokenError != nil { |
| log.Warning("Cannot cannot set token: ", setTokenError) |
| c.SetOAuthBearerTokenFailure(setTokenError.Error()) |
| } |
| } |
| default: |
| fmt.Printf("Ignored %v\n", e) |
| } |
| } |
| } |
| } |
| }() |
| } |
| |
| func start_topic_writer(control_ch chan WriterControl, data_ch chan *KafkaPayload) { |
| |
| var kafka_producer *kafka.Producer |
| |
| running := true |
| log.Info("Topic writer starting") |
| |
| // Wait for kafka producer to become available - and be prepared to exit the writer |
| for kafka_producer == nil { |
| select { |
| case writer_ctl := <-control_ch: |
| if writer_ctl.command == "EXIT" { |
| //ignore cmd |
| } |
| default: |
| kafka_producer = start_producer() |
| if kafka_producer == nil { |
| log.Debug("Could not start kafka producer - retrying") |
| time.Sleep(1 * time.Second) |
| } else { |
| log.Debug("Kafka producer started") |
| } |
| } |
| } |
| |
| var event_chan = make(chan int) |
| go func() { |
| for { |
| select { |
| case evt := <-kafka_producer.Events(): |
| switch evt.(type) { |
| case *kafka.Message: |
| m := evt.(*kafka.Message) |
| |
| if m.TopicPartition.Error != nil { |
| log.Debug("Dumping topic writer event, failed: ", m.TopicPartition.Error) |
| } else { |
| log.Debug("Dumping topic writer event, message to topic: ", *m.TopicPartition.Topic, " at offset: ", m.TopicPartition.Offset, " at partition: ", m.TopicPartition.Partition) |
| } |
| case kafka.Error: |
| log.Debug("Dumping topic writer event, error: ", evt) |
| case kafka.OAuthBearerTokenRefresh: |
| log.Debug("New producer token needed: ", evt) |
| token, err := fetch_token() |
| if err != nil { |
| log.Warning("Cannot cannot fetch token: ", err) |
| kafka_producer.SetOAuthBearerTokenFailure(err.Error()) |
| } else { |
| setTokenError := kafka_producer.SetOAuthBearerToken(*token) |
| if setTokenError != nil { |
| log.Warning("Cannot cannot set token: ", setTokenError) |
| kafka_producer.SetOAuthBearerTokenFailure(setTokenError.Error()) |
| } |
| } |
| default: |
| log.Debug("Dumping topic writer event, unknown: ", evt) |
| } |
| |
| case msg := <-event_chan: |
| if msg == 0 { |
| return |
| } |
| case <-time.After(1 * time.Second): |
| if !running { |
| return |
| } |
| } |
| } |
| }() |
| go func() { |
| for { |
| select { |
| case writer_ctl := <-control_ch: |
| if writer_ctl.command == "EXIT" { |
| // ignore - wait for channel signal |
| } |
| |
| case kmsg := <-data_ch: |
| if kmsg == nil { |
| event_chan <- 0 |
| log.Info("Topic writer stopped by channel signal - start_topic_writer") |
| defer kafka_producer.Close() |
| return |
| } |
| |
| retries := 10 |
| msg_ok := false |
| var err error |
| for retry := 1; retry <= retries && msg_ok == false; retry++ { |
| err = kafka_producer.Produce(&kafka.Message{ |
| TopicPartition: kafka.TopicPartition{Topic: &kmsg.topic, Partition: kafka.PartitionAny}, |
| Value: kmsg.msg.Value, Key: kmsg.msg.Key}, nil) |
| |
| if err == nil { |
| incr_out_msg_cnt(kmsg.jobid) |
| msg_ok = true |
| log.Debug("Topic writer, msg sent ok on topic: ", kmsg.topic) |
| } else { |
| log.Info("Topic writer failed to send message on topic: ", kmsg.topic, " - Retrying. Error details: ", err) |
| time.Sleep(time.Duration(retry) * time.Second) |
| } |
| } |
| if !msg_ok { |
| log.Error("Topic writer failed to send message on topic: ", kmsg.topic, " - Msg discarded. Error details: ", err) |
| } |
| case <-time.After(1000 * time.Millisecond): |
| if !running { |
| return |
| } |
| } |
| } |
| }() |
| } |
| |
| func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consumer { |
| var cm kafka.ConfigMap |
| if creds_grant_type == "" { |
| log.Info("Creating kafka SASL plain text consumer for type: ", type_id) |
| cm = kafka.ConfigMap{ |
| "bootstrap.servers": bootstrapserver, |
| "group.id": gid, |
| "client.id": cid, |
| "auto.offset.reset": "latest", |
| "enable.auto.commit": false, |
| } |
| } else { |
| log.Info("Creating kafka SASL plain text consumer for type: ", type_id) |
| cm = kafka.ConfigMap{ |
| "bootstrap.servers": bootstrapserver, |
| "group.id": gid, |
| "client.id": cid, |
| "auto.offset.reset": "latest", |
| "enable.auto.commit": false, |
| "sasl.mechanism": "OAUTHBEARER", |
| "security.protocol": "SASL_PLAINTEXT", |
| } |
| } |
| c, err := kafka.NewConsumer(&cm) |
| |
| if err != nil { |
| log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err) |
| return nil |
| } |
| |
| log.Info("Created kafka consumer for type: ", type_id, " OK") |
| return c |
| } |
| |
| // Start kafka producer |
| func start_producer() *kafka.Producer { |
| log.Info("Creating kafka producer") |
| |
| var cm kafka.ConfigMap |
| if creds_grant_type == "" { |
| log.Info("Creating kafka SASL plain text producer") |
| cm = kafka.ConfigMap{ |
| "bootstrap.servers": bootstrapserver, |
| } |
| } else { |
| log.Info("Creating kafka SASL plain text producer") |
| cm = kafka.ConfigMap{ |
| "bootstrap.servers": bootstrapserver, |
| "sasl.mechanism": "OAUTHBEARER", |
| "security.protocol": "SASL_PLAINTEXT", |
| } |
| } |
| |
| p, err := kafka.NewProducer(&cm) |
| if err != nil { |
| log.Error("Cannot create kafka producer,", err) |
| return nil |
| } |
| return p |
| } |
| |
| func start_adminclient() *kafka.AdminClient { |
| log.Info("Creating kafka admin client") |
| a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver}) |
| if err != nil { |
| log.Error("Cannot create kafka admin client,", err) |
| return nil |
| } |
| return a |
| } |
| |
| func create_minio_client(id string) (*minio.Client, *error) { |
| log.Debug("Get minio client") |
| minio_client, err := minio.New(filestore_server, &minio.Options{ |
| Secure: false, |
| Creds: credentials.NewStaticV4(filestore_user, filestore_pwd, ""), |
| }) |
| if err != nil { |
| log.Error("Cannot create minio client, ", err) |
| return nil, &err |
| } |
| return minio_client, nil |
| } |
| |
| func incr_out_msg_cnt(jobid string) { |
| j, ok := InfoJobs[jobid] |
| if ok { |
| j.statistics.out_msg_cnt++ |
| } |
| } |
| |
| func start_job_xml_file_data(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, data_out_channel chan *KafkaPayload, fvolume string, fsbucket string) { |
| |
| log.Info("Type job", type_id, " started") |
| |
| filters := make(map[string]Filter) |
| topic_list := make(map[string]string) |
| var mc *minio.Client |
| const mc_id = "mc_" + "start_job_xml_file_data" |
| running := true |
| for { |
| select { |
| case job_ctl := <-control_ch: |
| log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command) |
| switch job_ctl.command { |
| case "EXIT": |
| //ignore cmd - handled by channel signal |
| case "ADD-FILTER": |
| filters[job_ctl.filter.JobId] = job_ctl.filter |
| log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId) |
| |
| tmp_topic_list := make(map[string]string) |
| for k, v := range topic_list { |
| tmp_topic_list[k] = v |
| } |
| tmp_topic_list[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic |
| topic_list = tmp_topic_list |
| case "REMOVE-FILTER": |
| log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId) |
| |
| tmp_topic_list := make(map[string]string) |
| for k, v := range topic_list { |
| tmp_topic_list[k] = v |
| } |
| delete(tmp_topic_list, job_ctl.filter.JobId) |
| topic_list = tmp_topic_list |
| } |
| |
| case msg := <-data_in_ch: |
| if msg == nil { |
| log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data") |
| |
| running = false |
| return |
| } |
| if fsbucket != "" && fvolume == "" { |
| if mc == nil { |
| var err *error |
| mc, err = create_minio_client(mc_id) |
| if err != nil { |
| log.Debug("Cannot create minio client for type job: ", type_id) |
| } |
| } |
| } |
| jobLimiterChan <- struct{}{} |
| go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket, mc, mc_id) |
| |
| case <-time.After(1 * time.Second): |
| if !running { |
| return |
| } |
| } |
| } |
| } |
| |
| func run_xml_job(type_id string, msg *KafkaPayload, outputCompression string, data_out_channel chan *KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string, mc *minio.Client, mc_id string) { |
| defer func() { |
| <-jobLimiterChan |
| }() |
| PrintMemUsage() |
| |
| if fvolume == "" && fsbucket == "" { |
| log.Error("Type job: ", type_id, " cannot run, neither file volume nor filestore is set - discarding message") |
| return |
| } else if (fvolume != "") && (fsbucket != "") { |
| log.Error("Type job: ", type_id, " cannot run with output to both file volume and filestore bucket - discarding message") |
| return |
| } |
| |
| start := time.Now() |
| var evt_data XmlFileEventHeader |
| |
| err := jsoniter.Unmarshal(msg.msg.Value, &evt_data) |
| if err != nil { |
| log.Error("Cannot parse XmlFileEventHeader for type job: ", type_id, " - discarding message, error details", err) |
| return |
| } |
| log.Debug("Unmarshal file-collect event for type job: ", type_id, " time: ", time.Since(start).String()) |
| |
| var reader io.Reader |
| |
| INPUTBUCKET := "ropfiles" |
| |
| filename := "" |
| if fvolume != "" { |
| filename = fvolume + "/" + evt_data.Name |
| fi, err := os.Open(filename) |
| |
| if err != nil { |
| log.Error("File ", filename, " - cannot be opened for type job: ", type_id, " - discarding message, error details: ", err) |
| return |
| } |
| defer fi.Close() |
| reader = fi |
| } else { |
| filename = evt_data.Name |
| if mc != nil { |
| tctx := context.Background() |
| mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{}) |
| if err != nil { |
| log.Error("Cannot get: ", filename, " for type job: ", type_id, " - discarding message, error details: ", err) |
| return |
| } |
| if mr == nil { |
| log.Error("Cannot get: ", filename, " for type job: ", type_id, " - minio get object returned null reader - discarding message, error details: ", err) |
| return |
| } |
| reader = mr |
| defer mr.Close() |
| } else { |
| log.Error("Cannot get: ", filename, " for type job: ", type_id, " - no minio client - discarding message") |
| return |
| } |
| } |
| |
| if reader == nil { |
| log.Error("Cannot get: ", filename, " - null reader") |
| return |
| } |
| var file_bytes []byte |
| if strings.HasSuffix(filename, "gz") { |
| start := time.Now() |
| var buf3 bytes.Buffer |
| errb := gunzipReaderToWriter(&buf3, reader) |
| if errb != nil { |
| log.Error("Cannot gunzip file ", filename, " - discarding message, ", errb) |
| return |
| } |
| file_bytes = buf3.Bytes() |
| log.Debug("Gunzip file: ", filename, "time:", time.Since(start).String(), "len: ", len(file_bytes)) |
| |
| } else { |
| var buf3 bytes.Buffer |
| _, err2 := io.Copy(&buf3, reader) |
| if err2 != nil { |
| log.Error("File ", filename, " - cannot be read, discarding message, ", err) |
| return |
| } |
| file_bytes = buf3.Bytes() |
| } |
| start = time.Now() |
| b, err := xml_to_json_conv(&file_bytes, &evt_data) |
| if err != nil { |
| log.Error("Cannot convert file ", evt_data.Name, " - discarding message, ", err) |
| return |
| } |
| log.Debug("Converted file to json: ", filename, " time", time.Since(start).String(), "len;", len(b)) |
| |
| new_fn := evt_data.Name + os.Getenv("KP") + ".json" |
| if outputCompression == "gz" { |
| new_fn = new_fn + ".gz" |
| start = time.Now() |
| var buf bytes.Buffer |
| err = gzipWrite(&buf, &b) |
| if err != nil { |
| log.Error("Cannot gzip file ", new_fn, " - discarding message, ", err) |
| return |
| } |
| b = buf.Bytes() |
| log.Debug("Gzip file: ", new_fn, " time: ", time.Since(start).String(), "len:", len(file_bytes)) |
| |
| } |
| start = time.Now() |
| |
| if fvolume != "" { |
| //Store on disk |
| err = os.WriteFile(fvolume+"/"+new_fn, b, 0644) |
| if err != nil { |
| log.Error("Cannot write file ", new_fn, " - discarding message,", err) |
| return |
| } |
| log.Debug("Write file to disk: "+new_fn, "time: ", time.Since(start).String(), " len: ", len(file_bytes)) |
| } else if fsbucket != "" { |
| // Store in minio |
| objectName := new_fn |
| if mc != nil { |
| |
| contentType := "application/json" |
| if strings.HasSuffix(objectName, ".gz") { |
| contentType = "application/gzip" |
| } |
| |
| // Upload the xml file with PutObject |
| r := bytes.NewReader(b) |
| tctx := context.Background() |
| if check_minio_bucket(mc, mc_id, fsbucket) == false { |
| err := create_minio_bucket(mc, mc_id, fsbucket) |
| if err != nil { |
| log.Error("Cannot create bucket: ", fsbucket, ", ", err) |
| return |
| } |
| } |
| ok := false |
| for i := 1; i < 64 && ok == false; i = i * 2 { |
| info, err := mc.PutObject(tctx, fsbucket, objectName, r, int64(len(b)), minio.PutObjectOptions{ContentType: contentType}) |
| if err != nil { |
| |
| if i == 1 { |
| log.Warn("Cannot upload (first attempt): ", objectName, ", ", err) |
| } else { |
| log.Warn("Cannot upload (retry): ", objectName, ", ", err) |
| } |
| time.Sleep(time.Duration(i) * time.Second) |
| } else { |
| log.Debug("Store ", objectName, " in filestore, time: ", time.Since(start).String()) |
| log.Debug("Successfully uploaded: ", objectName, " of size:", info.Size) |
| ok = true |
| } |
| } |
| if !ok { |
| log.Error("Cannot upload : ", objectName, ", ", err) |
| } |
| } else { |
| log.Error("Cannot upload: ", objectName, ", no client") |
| } |
| } |
| |
| start = time.Now() |
| if fvolume == "" { |
| var fde FileDownloadedEvt |
| fde.Filename = new_fn |
| j, err := jsoniter.Marshal(fde) |
| |
| if err != nil { |
| log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err) |
| return |
| } |
| msg.msg.Value = j |
| } else { |
| var fde FileDownloadedEvt |
| fde.Filename = new_fn |
| j, err := jsoniter.Marshal(fde) |
| |
| if err != nil { |
| log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err) |
| return |
| } |
| msg.msg.Value = j |
| } |
| msg.msg.Key = []byte("\"" + evt_data.SourceName + "\"") |
| log.Debug("Marshal file-collect event ", time.Since(start).String()) |
| |
| for k, v := range topic_list { |
| var kmsg *KafkaPayload = new(KafkaPayload) |
| kmsg.msg = msg.msg |
| kmsg.topic = v |
| kmsg.jobid = k |
| data_out_channel <- kmsg |
| } |
| } |
| |
| func xml_to_json_conv(f_byteValue *[]byte, xfeh *XmlFileEventHeader) ([]byte, error) { |
| var f MeasCollecFile |
| start := time.Now() |
| err := xml.Unmarshal(*f_byteValue, &f) |
| if err != nil { |
| return nil, errors.New("Cannot unmarshal xml-file") |
| } |
| log.Debug("Unmarshal xml file XmlFileEvent: ", time.Since(start).String()) |
| |
| start = time.Now() |
| var pmfile PMJsonFile |
| |
| pmfile.Event.Perf3GppFields.Perf3GppFieldsVersion = "1.0" |
| pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900 |
| pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityUserName = "" |
| pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityDn = f.FileHeader.FileSender.LocalDn |
| pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntitySoftwareVersion = f.MeasData.ManagedElement.SwVersion |
| |
| for _, it := range f.MeasData.MeasInfo { |
| var mili MeasInfoList |
| mili.MeasInfoID.SMeasInfoID = it.MeasInfoId |
| for _, jt := range it.MeasType { |
| mili.MeasTypes.SMeasTypesList = append(mili.MeasTypes.SMeasTypesList, jt.Text) |
| } |
| for _, jt := range it.MeasValue { |
| var mv MeasValues |
| mv.MeasObjInstID = jt.MeasObjLdn |
| mv.SuspectFlag = jt.Suspect |
| if jt.Suspect == "" { |
| mv.SuspectFlag = "false" |
| } |
| for _, kt := range jt.R { |
| ni, _ := strconv.Atoi(kt.P) |
| nv := kt.Text |
| mr := MeasResults{ni, nv} |
| mv.MeasResultsList = append(mv.MeasResultsList, mr) |
| } |
| mili.MeasValuesList = append(mili.MeasValuesList, mv) |
| } |
| |
| pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = append(pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList, mili) |
| } |
| |
| pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900 |
| |
| //TODO: Fill more values |
| pmfile.Event.CommonEventHeader.Domain = "" //xfeh.Domain |
| pmfile.Event.CommonEventHeader.EventID = "" //xfeh.EventID |
| pmfile.Event.CommonEventHeader.Sequence = 0 //xfeh.Sequence |
| pmfile.Event.CommonEventHeader.EventName = "" //xfeh.EventName |
| pmfile.Event.CommonEventHeader.SourceName = xfeh.SourceName |
| pmfile.Event.CommonEventHeader.ReportingEntityName = "" //xfeh.ReportingEntityName |
| pmfile.Event.CommonEventHeader.Priority = "" //xfeh.Priority |
| pmfile.Event.CommonEventHeader.StartEpochMicrosec = xfeh.StartEpochMicrosec |
| pmfile.Event.CommonEventHeader.LastEpochMicrosec = xfeh.LastEpochMicrosec |
| pmfile.Event.CommonEventHeader.Version = "" //xfeh.Version |
| pmfile.Event.CommonEventHeader.VesEventListenerVersion = "" //xfeh.VesEventListenerVersion |
| pmfile.Event.CommonEventHeader.TimeZoneOffset = xfeh.TimeZoneOffset |
| |
| log.Debug("Convert xml to json : ", time.Since(start).String()) |
| |
| start = time.Now() |
| json, err := jsoniter.Marshal(pmfile) |
| log.Debug("Marshal json : ", time.Since(start).String()) |
| |
| if err != nil { |
| return nil, errors.New("Cannot marshal converted json") |
| } |
| return json, nil |
| } |
| |
| func start_job_json_file_data(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, data_out_channel chan *KafkaPayload, objectstore bool) { |
| |
| log.Info("Type job", type_id, " started") |
| |
| filters := make(map[string]Filter) |
| filterParams_list := make(map[string]FilterMaps) |
| topic_list := make(map[string]string) |
| var mc *minio.Client |
| const mc_id = "mc_" + "start_job_json_file_data" |
| running := true |
| for { |
| select { |
| case job_ctl := <-control_ch: |
| log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command) |
| switch job_ctl.command { |
| case "EXIT": |
| case "ADD-FILTER": |
| filters[job_ctl.filter.JobId] = job_ctl.filter |
| log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId) |
| |
| tmp_filterParams_list := make(map[string]FilterMaps) |
| for k, v := range filterParams_list { |
| tmp_filterParams_list[k] = v |
| } |
| tmp_filterParams_list[job_ctl.filter.JobId] = job_ctl.filter.filter |
| filterParams_list = tmp_filterParams_list |
| |
| tmp_topic_list := make(map[string]string) |
| for k, v := range topic_list { |
| tmp_topic_list[k] = v |
| } |
| tmp_topic_list[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic |
| topic_list = tmp_topic_list |
| case "REMOVE-FILTER": |
| log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId) |
| |
| tmp_filterParams_list := make(map[string]FilterMaps) |
| for k, v := range filterParams_list { |
| tmp_filterParams_list[k] = v |
| } |
| delete(tmp_filterParams_list, job_ctl.filter.JobId) |
| filterParams_list = tmp_filterParams_list |
| |
| tmp_topic_list := make(map[string]string) |
| for k, v := range topic_list { |
| tmp_topic_list[k] = v |
| } |
| delete(tmp_topic_list, job_ctl.filter.JobId) |
| topic_list = tmp_topic_list |
| } |
| |
| case msg := <-data_in_ch: |
| if msg == nil { |
| log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data") |
| |
| running = false |
| return |
| } |
| if objectstore { |
| if mc == nil { |
| var err *error |
| mc, err = create_minio_client(mc_id) |
| if err != nil { |
| log.Debug("Cannot create minio client for type job: ", type_id) |
| } |
| } |
| } |
| //TODO: Sort processed file conversions in order (FIFO) |
| jobLimiterChan <- struct{}{} |
| go run_json_job(type_id, msg, "", filterParams_list, topic_list, data_out_channel, jobLimiterChan, mc, mc_id, objectstore) |
| |
| case <-time.After(1 * time.Second): |
| if !running { |
| return |
| } |
| } |
| } |
| } |
| |
| func run_json_job(type_id string, msg *KafkaPayload, outputCompression string, filterList map[string]FilterMaps, topic_list map[string]string, data_out_channel chan *KafkaPayload, jobLimiterChan chan struct{}, mc *minio.Client, mc_id string, objectstore bool) { |
| |
| //Release job limit |
| defer func() { |
| <-jobLimiterChan |
| }() |
| |
| PrintMemUsage() |
| |
| var evt_data FileDownloadedEvt |
| err := jsoniter.Unmarshal(msg.msg.Value, &evt_data) |
| if err != nil { |
| log.Error("Cannot parse FileDownloadedEvt for type job: ", type_id, " - discarding message, error details: ", err) |
| return |
| } |
| log.Debug("FileDownloadedEvt for job: ", type_id, " file: ", evt_data.Filename) |
| |
| var reader io.Reader |
| |
| INPUTBUCKET := "pm-files-json" |
| filename := "" |
| if objectstore == false { |
| filename = files_volume + "/" + evt_data.Filename |
| fi, err := os.Open(filename) |
| |
| if err != nil { |
| log.Error("File: ", filename, "for type job: ", type_id, " - cannot be opened -discarding message - error details: ", err) |
| return |
| } |
| defer fi.Close() |
| reader = fi |
| } else { |
| filename = "/" + evt_data.Filename |
| if mc != nil { |
| if check_minio_bucket(mc, mc_id, INPUTBUCKET) == false { |
| log.Error("Bucket not available for reading in type job: ", type_id, "bucket: ", INPUTBUCKET) |
| return |
| } |
| tctx := context.Background() |
| mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{}) |
| if err != nil { |
| log.Error("Obj: ", filename, "for type job: ", type_id, " - cannot be fetched from bucket: ", INPUTBUCKET, " - discarding message, error details: ", err) |
| return |
| } |
| reader = mr |
| defer mr.Close() |
| } else { |
| log.Error("Cannot get obj: ", filename, "for type job: ", type_id, " from bucket: ", INPUTBUCKET, " - no client") |
| return |
| } |
| } |
| |
| var data *[]byte |
| if strings.HasSuffix(filename, "gz") { |
| start := time.Now() |
| var buf2 bytes.Buffer |
| errb := gunzipReaderToWriter(&buf2, reader) |
| if errb != nil { |
| log.Error("Cannot decompress file/obj ", filename, "for type job: ", type_id, " - discarding message, error details", errb) |
| return |
| } |
| d := buf2.Bytes() |
| data = &d |
| log.Debug("Decompress file/obj ", filename, "for type job: ", type_id, " time:", time.Since(start).String()) |
| } else { |
| |
| start := time.Now() |
| d, err := io.ReadAll(reader) |
| if err != nil { |
| log.Error("Cannot read file/obj: ", filename, "for type job: ", type_id, " - discarding message, error details", err) |
| return |
| } |
| data = &d |
| |
| log.Debug("Read file/obj: ", filename, "for type job: ", type_id, " time: ", time.Since(start).String()) |
| } |
| |
| for k, v := range filterList { |
| |
| var pmfile PMJsonFile |
| start := time.Now() |
| err = jsoniter.Unmarshal(*data, &pmfile) |
| log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String()) |
| |
| if err != nil { |
| log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err) |
| return |
| } |
| |
| var kmsg *KafkaPayload = new(KafkaPayload) |
| kmsg.msg = new(kafka.Message) |
| kmsg.msg.Key = []byte("\"" + pmfile.Event.CommonEventHeader.SourceName + "\"") |
| log.Debug("topic:", topic_list[k]) |
| log.Debug("sourceNameMap:", v.sourceNameMap) |
| log.Debug("measObjClassMap:", v.measObjClassMap) |
| log.Debug("measObjInstIdsMap:", v.measObjInstIdsMap) |
| log.Debug("measTypesMap:", v.measTypesMap) |
| |
| b := json_pm_filter_to_byte(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap) |
| if b == nil { |
| log.Info("File/obj ", filename, "for job: ", k, " - empty after filering, discarding message ") |
| return |
| } |
| kmsg.msg.Value = *b |
| |
| kmsg.topic = topic_list[k] |
| kmsg.jobid = k |
| |
| data_out_channel <- kmsg |
| } |
| |
| } |
| |
| func json_pm_filter_to_byte(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *[]byte { |
| |
| if json_pm_filter_to_obj(resource, data, sourceNameMap, measObjClassMap, measObjInstIdsMap, measTypesMap) == nil { |
| return nil |
| } |
| start := time.Now() |
| j, err := jsoniter.Marshal(&data) |
| |
| log.Debug("Json marshal obj: ", resource, " time: ", time.Since(start).String()) |
| |
| if err != nil { |
| log.Error("Msg could not be marshalled discarding message, obj: ", resource, ", error details: ", err) |
| return nil |
| } |
| |
| log.Debug("Filtered json obj: ", resource, " len: ", len(j)) |
| return &j |
| } |
| |
| func json_pm_filter_to_obj(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *PMJsonFile { |
| filter_req := true |
| start := time.Now() |
| if len(sourceNameMap) != 0 { |
| if !sourceNameMap[data.Event.CommonEventHeader.SourceName] { |
| filter_req = false |
| return nil |
| } |
| } |
| if filter_req { |
| modified := false |
| var temp_mil []MeasInfoList |
| for _, zz := range data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList { |
| |
| check_cntr := false |
| var cnt_flags []bool |
| if len(measTypesMap) > 0 { |
| c_cntr := 0 |
| var temp_mtl []string |
| for _, v := range zz.MeasTypes.SMeasTypesList { |
| if measTypesMap[v] { |
| cnt_flags = append(cnt_flags, true) |
| c_cntr++ |
| temp_mtl = append(temp_mtl, v) |
| } else { |
| cnt_flags = append(cnt_flags, false) |
| } |
| } |
| if c_cntr > 0 { |
| check_cntr = true |
| zz.MeasTypes.SMeasTypesList = temp_mtl |
| } else { |
| modified = true |
| continue |
| } |
| } |
| keep := false |
| var temp_mvl []MeasValues |
| for _, yy := range zz.MeasValuesList { |
| keep_class := false |
| keep_inst := false |
| keep_cntr := false |
| |
| dna := strings.Split(yy.MeasObjInstID, ",") |
| instName := dna[len(dna)-1] |
| cls := strings.Split(dna[len(dna)-1], "=")[0] |
| |
| if len(measObjClassMap) > 0 { |
| if measObjClassMap[cls] { |
| keep_class = true |
| } |
| } else { |
| keep_class = true |
| } |
| |
| if len(measObjInstIdsMap) > 0 { |
| if measObjInstIdsMap[instName] { |
| keep_inst = true |
| } |
| } else { |
| keep_inst = true |
| } |
| |
| if check_cntr { |
| var temp_mrl []MeasResults |
| cnt_p := 1 |
| for _, v := range yy.MeasResultsList { |
| if cnt_flags[v.P-1] { |
| v.P = cnt_p |
| cnt_p++ |
| temp_mrl = append(temp_mrl, v) |
| } |
| } |
| yy.MeasResultsList = temp_mrl |
| keep_cntr = true |
| } else { |
| keep_cntr = true |
| } |
| if keep_class && keep_cntr && keep_inst { |
| keep = true |
| temp_mvl = append(temp_mvl, yy) |
| } |
| } |
| if keep { |
| zz.MeasValuesList = temp_mvl |
| temp_mil = append(temp_mil, zz) |
| modified = true |
| } |
| |
| } |
| //Only if modified |
| if modified { |
| if len(temp_mil) == 0 { |
| log.Debug("Msg filtered, nothing found, discarding, obj: ", resource) |
| return nil |
| } |
| data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = temp_mil |
| } |
| } |
| log.Debug("Filter: ", time.Since(start).String()) |
| return data |
| } |
| |
| func start_job_json_file_data_influx(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, objectstore bool) { |
| |
| log.Info("Type job", type_id, " started") |
| log.Debug("influx job ch ", data_in_ch) |
| filters := make(map[string]Filter) |
| filterParams_list := make(map[string]FilterMaps) |
| influx_job_params := make(map[string]InfluxJobParameters) |
| var mc *minio.Client |
| const mc_id = "mc_" + "start_job_json_file_data_influx" |
| running := true |
| for { |
| select { |
| case job_ctl := <-control_ch: |
| log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command) |
| switch job_ctl.command { |
| case "EXIT": |
| //ignore cmd - handled by channel signal |
| case "ADD-FILTER": |
| |
| filters[job_ctl.filter.JobId] = job_ctl.filter |
| log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId) |
| log.Debug(job_ctl.filter) |
| tmp_filterParams_list := make(map[string]FilterMaps) |
| for k, v := range filterParams_list { |
| tmp_filterParams_list[k] = v |
| } |
| tmp_filterParams_list[job_ctl.filter.JobId] = job_ctl.filter.filter |
| filterParams_list = tmp_filterParams_list |
| |
| tmp_influx_job_params := make(map[string]InfluxJobParameters) |
| for k, v := range influx_job_params { |
| tmp_influx_job_params[k] = v |
| } |
| tmp_influx_job_params[job_ctl.filter.JobId] = job_ctl.filter.influxParameters |
| influx_job_params = tmp_influx_job_params |
| |
| case "REMOVE-FILTER": |
| |
| log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId) |
| |
| tmp_filterParams_list := make(map[string]FilterMaps) |
| for k, v := range filterParams_list { |
| tmp_filterParams_list[k] = v |
| } |
| delete(tmp_filterParams_list, job_ctl.filter.JobId) |
| filterParams_list = tmp_filterParams_list |
| |
| tmp_influx_job_params := make(map[string]InfluxJobParameters) |
| for k, v := range influx_job_params { |
| tmp_influx_job_params[k] = v |
| } |
| delete(tmp_influx_job_params, job_ctl.filter.JobId) |
| influx_job_params = tmp_influx_job_params |
| } |
| |
| case msg := <-data_in_ch: |
| log.Debug("Data reveived - influx") |
| if msg == nil { |
| log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data") |
| |
| running = false |
| return |
| } |
| if objectstore { |
| if mc == nil { |
| var err *error |
| mc, err = create_minio_client(mc_id) |
| if err != nil { |
| log.Debug("Cannot create minio client for type job: ", type_id) |
| } |
| } |
| } |
| |
| jobLimiterChan <- struct{}{} |
| go run_json_file_data_job_influx(type_id, msg, filterParams_list, influx_job_params, jobLimiterChan, mc, mc_id, objectstore) |
| |
| case <-time.After(1 * time.Second): |
| if !running { |
| return |
| } |
| } |
| } |
| } |
| |
| func run_json_file_data_job_influx(type_id string, msg *KafkaPayload, filterList map[string]FilterMaps, influxList map[string]InfluxJobParameters, jobLimiterChan chan struct{}, mc *minio.Client, mc_id string, objectstore bool) { |
| |
| log.Debug("run_json_file_data_job_influx") |
| //Release job limit |
| defer func() { |
| <-jobLimiterChan |
| }() |
| |
| PrintMemUsage() |
| |
| var evt_data FileDownloadedEvt |
| err := jsoniter.Unmarshal(msg.msg.Value, &evt_data) |
| if err != nil { |
| log.Error("Cannot parse FileDownloadedEvt for type job: ", type_id, " - discarding message, error details: ", err) |
| return |
| } |
| log.Debug("FileDownloadedEvt for job: ", type_id, " file: ", evt_data.Filename) |
| |
| var reader io.Reader |
| |
| INPUTBUCKET := "pm-files-json" |
| filename := "" |
| if objectstore == false { |
| filename = files_volume + "/" + evt_data.Filename |
| fi, err := os.Open(filename) |
| |
| if err != nil { |
| log.Error("File: ", filename, "for type job: ", type_id, " - cannot be opened -discarding message - error details: ", err) |
| return |
| } |
| defer fi.Close() |
| reader = fi |
| } else { |
| filename = "/" + evt_data.Filename |
| if mc != nil { |
| if check_minio_bucket(mc, mc_id, INPUTBUCKET) == false { |
| log.Error("Bucket not available for reading in type job: ", type_id, "bucket: ", INPUTBUCKET) |
| return |
| } |
| tctx := context.Background() |
| mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{}) |
| if err != nil { |
| log.Error("Obj: ", filename, "for type job: ", type_id, " - cannot be fetched from bucket: ", INPUTBUCKET, " - discarding message, error details: ", err) |
| return |
| } |
| reader = mr |
| defer mr.Close() |
| } else { |
| log.Error("Cannot get obj: ", filename, "for type job: ", type_id, " from bucket: ", INPUTBUCKET, " - no client") |
| return |
| } |
| } |
| |
| var data *[]byte |
| if strings.HasSuffix(filename, "gz") { |
| start := time.Now() |
| var buf2 bytes.Buffer |
| errb := gunzipReaderToWriter(&buf2, reader) |
| if errb != nil { |
| log.Error("Cannot decompress file/obj ", filename, "for type job: ", type_id, " - discarding message, error details", errb) |
| return |
| } |
| d := buf2.Bytes() |
| data = &d |
| log.Debug("Decompress file/obj ", filename, "for type job: ", type_id, " time:", time.Since(start).String()) |
| } else { |
| |
| start := time.Now() |
| d, err := io.ReadAll(reader) |
| if err != nil { |
| log.Error("Cannot read file/obj: ", filename, "for type job: ", type_id, " - discarding message, error details", err) |
| return |
| } |
| data = &d |
| |
| log.Debug("Read file/obj: ", filename, "for type job: ", type_id, " time: ", time.Since(start).String()) |
| } |
| for k, v := range filterList { |
| |
| var pmfile PMJsonFile |
| start := time.Now() |
| err = jsoniter.Unmarshal(*data, &pmfile) |
| log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String()) |
| |
| if err != nil { |
| log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err) |
| return |
| } |
| |
| if len(v.sourceNameMap) > 0 || len(v.measObjInstIdsMap) > 0 || len(v.measTypesMap) > 0 || len(v.measObjClassMap) > 0 { |
| b := json_pm_filter_to_obj(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap) |
| if b == nil { |
| log.Info("File/obj ", filename, "for job: ", k, " - empty after filering, discarding message ") |
| return |
| } |
| |
| } |
| fluxParms := influxList[k] |
| log.Debug("Influxdb params: ", fluxParms) |
| client := influxdb2.NewClient(fluxParms.DbUrl, fluxParms.DbToken) |
| writeAPI := client.WriteAPIBlocking(fluxParms.DbOrg, fluxParms.DbBucket) |
| |
| for _, zz := range pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList { |
| ctr_names := make(map[string]string) |
| for cni, cn := range zz.MeasTypes.SMeasTypesList { |
| ctr_names[strconv.Itoa(cni+1)] = cn |
| } |
| for _, xx := range zz.MeasValuesList { |
| log.Debug("Measurement: ", xx.MeasObjInstID) |
| log.Debug("Suspect flag: ", xx.SuspectFlag) |
| p := influxdb2.NewPointWithMeasurement(xx.MeasObjInstID) |
| p.AddField("suspectflag", xx.SuspectFlag) |
| p.AddField("granularityperiod", pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod) |
| p.AddField("timezone", pmfile.Event.CommonEventHeader.TimeZoneOffset) |
| for _, yy := range xx.MeasResultsList { |
| pi := strconv.Itoa(yy.P) |
| pv := yy.SValue |
| pn := ctr_names[pi] |
| log.Debug("Counter: ", pn, " Value: ", pv) |
| pv_i, err := strconv.Atoi(pv) |
| if err == nil { |
| p.AddField(pn, pv_i) |
| } else { |
| p.AddField(pn, pv) |
| } |
| } |
| //p.SetTime(timeT) |
| log.Debug("StartEpochMicrosec from common event header: ", pmfile.Event.CommonEventHeader.StartEpochMicrosec) |
| log.Debug("Set time: ", time.Unix(pmfile.Event.CommonEventHeader.StartEpochMicrosec/1000000, 0)) |
| p.SetTime(time.Unix(pmfile.Event.CommonEventHeader.StartEpochMicrosec/1000000, 0)) |
| err := writeAPI.WritePoint(context.Background(), p) |
| if err != nil { |
| log.Error("Db write error: ", err) |
| } |
| } |
| |
| } |
| client.Close() |
| } |
| |
| } |