blob: d37b0d287ed0f066563d2be1366a24d9a7a44d99 [file] [log] [blame]
// ============LICENSE_START===============================================
// Copyright (C) 2023 Nordix Foundation. All rights reserved.
// ========================================================================
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ============LICENSE_END=================================================
//
package main
import (
"bytes"
"compress/gzip"
"context"
"crypto/tls"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"io"
"net"
"os/signal"
"reflect"
"strings"
"sync"
"syscall"
"net/http"
"os"
"runtime"
"strconv"
"time"
"github.com/google/uuid"
"golang.org/x/oauth2/clientcredentials"
log "github.com/sirupsen/logrus"
"github.com/gorilla/mux"
"net/http/pprof"
"github.com/confluentinc/confluent-kafka-go/kafka"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
jsoniter "github.com/json-iterator/go"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)
//== Constants ==//
// Port of the plain http listener started in main
const http_port = 80
// Port of the https listener started in main
const https_port = 443
// File read by register_producer to find the supported info types
const config_file = "application_configuration.json"
// Certificate and key file loaded in main for the https listener
const server_crt = "server.crt"
const server_key = "server.key"
// Base name of the producer, main appends "-" and the value of env KP when KP is set
const producer_name = "kafka-producer"
// Delays in seconds used by periodic_registration.
// The short delay is the step added after a failed registration,
// the long delay is the wait after a successful registration
const registration_delay_short = 2
const registration_delay_long = 120
// Bit mask applied by MutexLocked to the state field of a mutex
const mutexLocked = 1
// Application states, the current state is kept in AppState
const (
Init AppStates = iota
Running
Terminating
)
// Buffer sizes of the channels created for the jobs
const reader_queue_length = 100 //Per type job
const writer_queue_length = 100 //Per info job
// Capacity of jobLimiterChan
const parallelism_limiter = 100 //For all jobs
// These are optional - set if the SASL protocol is used towards kafka
// A non-empty CREDS_GRANT_TYPE also makes the http requests sent to ICS carry a bearer token
var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
// Client secret, client id and token url used by fetch_token
var creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET")
var creds_client_id = os.Getenv("CREDS_CLIENT_ID")
var creds_service_url = os.Getenv("AUTH_SERVICE_URL")
//== Types ==//
// State of the application, one of Init, Running or Terminating
type AppStates int64
// Filter lists given in the "filter" object of the info job data.
// create_job converts each list to a lookup map in FilterMaps
type FilterParameters struct {
MeasuredEntityDns []string `json:"measuredEntityDns"`
MeasTypes []string `json:"measTypes"`
MeasObjClass []string `json:"measObjClass"`
MeasObjInstIds []string `json:"measObjInstIds"`
}
// Json body of a job callback, decoded by create_job
type InfoJobDataType struct {
// Job specific parameters
InfoJobData struct {
// Output topic of the job, cannot be changed for an existing job
KafkaOutputTopic string `json:"kafkaOutputTopic"`
// Influx db parameters, copied to InfluxJobParameters by create_job
DbUrl string `json:"db-url"`
DbOrg string `json:"db-org"`
DbBucket string `json:"db-bucket"`
DbToken string `json:"db-token"`
FilterParams FilterParameters `json:"filter"`
} `json:"info_job_data"`
// Id of the job, key in InfoJobs
InfoJobIdentity string `json:"info_job_identity"`
// Id of the type of the job, key in TypeJobs
InfoTypeIdentity string `json:"info_type_identity"`
LastUpdated string `json:"last_updated"`
Owner string `json:"owner"`
TargetURI string `json:"target_uri"`
}
// Type for an infojob
type InfoJobRecord struct {
// Job data as received in create_job
job_info InfoJobDataType
// Copy of the kafka output topic in job_info
output_topic string
// Counters for the job, shown by the statistics handler
statistics *InfoJobStats
}
// Type for a type job
type TypeJobRecord struct {
// Id of the info type
InfoType string
// Kafka topic read by the topic reader of the type
InputTopic string
// Messages from the topic reader to the job, a nil message is sent when the reader exits
data_in_channel chan *KafkaPayload
// Commands to the topic reader, e.g. "EXIT"
reader_control chan ReaderControl
// Commands to the job, e.g. "ADD-FILTER" and "REMOVE-FILTER"
job_control chan JobControl
// Kafka group id and client id given to the topic reader
groupId string
clientId string
// Counters for the type, shown by the statistics handler
statistics *TypeJobStats
}
// Type for controlling the topic reader
// The command "EXIT" stops the reader
type ReaderControl struct {
command string
}
// Type for controlling the topic writer
type WriterControl struct {
command string
}
// Type for controlling the job
// The commands sent in this file are "ADD-FILTER" and "REMOVE-FILTER",
// the filter identifies the info job the command applies to
type JobControl struct {
command string
filter Filter
}
// Type for a message passed on the data channels between reader, jobs and writer
type KafkaPayload struct {
msg *kafka.Message
topic string
jobid string
}
// Lookup maps built by create_job from the lists in FilterParameters
type FilterMaps struct {
sourceNameMap map[string]bool
measObjClassMap map[string]bool
measObjInstIdsMap map[string]bool
measTypesMap map[string]bool
}
// Influx db parameters of a job, copied by create_job from the info job data
type InfluxJobParameters struct {
DbUrl string
DbOrg string
DbBucket string
DbToken string
}
// Type for the filter of one info job, sent to the type job in a JobControl
type Filter struct {
// Id of the info job owning the filter
JobId string
// Kafka output topic of the info job
OutputTopic string
// Only set by create_job for the json-file-data types
filter FilterMaps
// Only set by create_job for the type json-file-data-from-filestore-to-influx
influxParameters InfluxJobParameters
}
// Type for info job statistics
// Counts messages and data volume sent out for the job
type InfoJobStats struct {
out_msg_cnt int
out_data_vol int64
}
// Type for type job statistics
// Counts messages and data volume received for the type
type TypeJobStats struct {
in_msg_cnt int
in_data_vol int64
}
// == API Datatypes ==//
// Type for supported data types
// One entry of the "types" list in the configuration file
type DataType struct {
	// Id of the info type, also the key in TypeJobs
	ID string `json:"id"`
	// Kafka topic the type job reads from
	KafkaInputTopic string `json:"kafkaInputTopic"`
	// Optional id of the type this type consumes, create_ext_job registers a job for it when set
	InputJobType string `json:"inputJobType"`
	// Job definition sent by create_ext_job for the input job
	InputJobDefinition struct {
		KafkaOutputTopic string `json:"kafkaOutputTopic"`
	} `json:"inputJobDefinition"`
	// State of the external job, not part of the configuration file
	ext_job         *[]byte
	ext_job_created bool
	ext_job_id      string
}
// Type for the content of the configuration file, the list of supported data types
type DataTypes struct {
ProdDataTypes []DataType `json:"types"`
}
// Type for the set of known buckets of one minio client, key == bucket name
type Minio_buckets struct {
Buckets map[string]bool
}
//== External data types ==//
// Data type for event xml file download
// Header fields of the event, all mapped to json fields with the same name in camel case
type XmlFileEventHeader struct {
ProductName string `json:"productName"`
VendorName string `json:"vendorName"`
Location string `json:"location"`
Compression string `json:"compression"`
SourceName string `json:"sourceName"`
FileFormatType string `json:"fileFormatType"`
FileFormatVersion string `json:"fileFormatVersion"`
StartEpochMicrosec int64 `json:"startEpochMicrosec"`
LastEpochMicrosec int64 `json:"lastEpochMicrosec"`
Name string `json:"name"`
ChangeIdentifier string `json:"changeIdentifier"`
InternalLocation string `json:"internalLocation"`
TimeZoneOffset string `json:"timeZoneOffset"`
//ObjectStoreBucket string `json:"objectStoreBucket"`
}
// Data types for input xml file
// Mirrors the xml document with the root element measCollecFile.
// Fields tagged ",attr" are read from xml attributes and the Text fields
// tagged ",chardata" hold the character data of the element
type MeasCollecFile struct {
XMLName xml.Name `xml:"measCollecFile"`
Text string `xml:",chardata"`
Xmlns string `xml:"xmlns,attr"`
Xsi string `xml:"xsi,attr"`
SchemaLocation string `xml:"schemaLocation,attr"`
// Element fileHeader
FileHeader struct {
Text string `xml:",chardata"`
FileFormatVersion string `xml:"fileFormatVersion,attr"`
VendorName string `xml:"vendorName,attr"`
DnPrefix string `xml:"dnPrefix,attr"`
FileSender struct {
Text string `xml:",chardata"`
LocalDn string `xml:"localDn,attr"`
ElementType string `xml:"elementType,attr"`
} `xml:"fileSender"`
MeasCollec struct {
Text string `xml:",chardata"`
BeginTime string `xml:"beginTime,attr"`
} `xml:"measCollec"`
} `xml:"fileHeader"`
// Element measData
MeasData struct {
Text string `xml:",chardata"`
ManagedElement struct {
Text string `xml:",chardata"`
LocalDn string `xml:"localDn,attr"`
SwVersion string `xml:"swVersion,attr"`
} `xml:"managedElement"`
// One entry per measInfo element
MeasInfo []struct {
Text string `xml:",chardata"`
MeasInfoId string `xml:"measInfoId,attr"`
Job struct {
Text string `xml:",chardata"`
JobId string `xml:"jobId,attr"`
} `xml:"job"`
GranPeriod struct {
Text string `xml:",chardata"`
Duration string `xml:"duration,attr"`
EndTime string `xml:"endTime,attr"`
} `xml:"granPeriod"`
RepPeriod struct {
Text string `xml:",chardata"`
Duration string `xml:"duration,attr"`
} `xml:"repPeriod"`
// One entry per measType element, P is the attribute p of the element
MeasType []struct {
Text string `xml:",chardata"`
P string `xml:"p,attr"`
} `xml:"measType"`
// One entry per measValue element
MeasValue []struct {
Text string `xml:",chardata"`
MeasObjLdn string `xml:"measObjLdn,attr"`
// One entry per r element, P is the attribute p of the element
R []struct {
Text string `xml:",chardata"`
P string `xml:"p,attr"`
} `xml:"r"`
Suspect string `xml:"suspect"`
} `xml:"measValue"`
} `xml:"measInfo"`
} `xml:"measData"`
// Element fileFooter
FileFooter struct {
Text string `xml:",chardata"`
MeasCollec struct {
Text string `xml:",chardata"`
EndTime string `xml:"endTime,attr"`
} `xml:"measCollec"`
} `xml:"fileFooter"`
}
// Data type for json file
// Split into several parts to allow add/remove in lists
// One measurement result, P is an integer index and SValue the value as a string
type MeasResults struct {
P int `json:"p"`
SValue string `json:"sValue"`
}
// The measurement results of one measured object instance
type MeasValues struct {
MeasObjInstID string `json:"measObjInstId"`
SuspectFlag string `json:"suspectFlag"`
MeasResultsList []MeasResults `json:"measResults"`
}
// A single measurement type
type SMeasTypes struct {
SMeasType string `json:"sMeasTypesList"`
}
// One entry of the measInfoList, holding the id, the measurement types and the values
type MeasInfoList struct {
MeasInfoID struct {
SMeasInfoID string `json:"sMeasInfoId"`
} `json:"measInfoId"`
MeasTypes struct {
SMeasTypesList []string `json:"sMeasTypesList"`
} `json:"measTypes"`
MeasValuesList []MeasValues `json:"measValuesList"`
}
// Data type for the json file, an event with a common header and the perf3gppFields
type PMJsonFile struct {
Event struct {
// Json object commonEventHeader
CommonEventHeader struct {
Domain string `json:"domain"`
EventID string `json:"eventId"`
Sequence int `json:"sequence"`
EventName string `json:"eventName"`
SourceName string `json:"sourceName"`
ReportingEntityName string `json:"reportingEntityName"`
Priority string `json:"priority"`
StartEpochMicrosec int64 `json:"startEpochMicrosec"`
LastEpochMicrosec int64 `json:"lastEpochMicrosec"`
Version string `json:"version"`
VesEventListenerVersion string `json:"vesEventListenerVersion"`
TimeZoneOffset string `json:"timeZoneOffset"`
} `json:"commonEventHeader"`
// Json object perf3gppFields
Perf3GppFields struct {
Perf3GppFieldsVersion string `json:"perf3gppFieldsVersion"`
MeasDataCollection struct {
GranularityPeriod int `json:"granularityPeriod"`
MeasuredEntityUserName string `json:"measuredEntityUserName"`
MeasuredEntityDn string `json:"measuredEntityDn"`
MeasuredEntitySoftwareVersion string `json:"measuredEntitySoftwareVersion"`
// List of measurement info, one entry per MeasInfoList
SMeasInfoList []MeasInfoList `json:"measInfoList"`
} `json:"measDataCollection"`
} `json:"perf3gppFields"`
} `json:"event"`
}
// Data type for converted json file message
// Holds only the name of the file
type FileDownloadedEvt struct {
Filename string `json:"filename"`
}
//== Variables ==//
// Current state of the application, set to Running by main and to Terminating at shutdown
var AppState = Init
// Lock for all internal data
var datalock sync.Mutex
// Name of this producer instance, main appends "-" and the value of env KP when KP is set
var producer_instance_name string = producer_name
// Keep all info type jobs, key == type id
var TypeJobs map[string]TypeJobRecord = make(map[string]TypeJobRecord)
// Keep all info jobs, key == job id
var InfoJobs map[string]InfoJobRecord = make(map[string]InfoJobRecord)
// The types read from the configuration file at the last successful registration
var InfoTypes DataTypes
// Limiter - valid for all jobs
var jobLimiterChan = make(chan struct{}, parallelism_limiter)
// TODO: Config param?
// Region given when a minio bucket is created
var bucket_location = "swe"
// Http client shared by all outgoing http requests
var httpclient = &http.Client{}
// == Env variables ==//
// KAFKA_SERVER, ICS and SELF are mandatory, main panics when any of them is empty
var bootstrapserver = os.Getenv("KAFKA_SERVER")
var files_volume = os.Getenv("FILES_VOLUME")
var ics_server = os.Getenv("ICS")
var self = os.Getenv("SELF")
var filestore_user = os.Getenv("FILESTORE_USER")
var filestore_pwd = os.Getenv("FILESTORE_PWD")
var filestore_server = os.Getenv("FILESTORE_SERVER")
// Channel from the jobs to the topic writer started in main
var data_out_channel = make(chan *KafkaPayload, writer_queue_length)
// Control channel to the topic writer started in main
var writer_control = make(chan WriterControl, 1)
// Known buckets per minio client, key == minio client id. Guarded by datalock
var minio_bucketlist map[string]Minio_buckets = make(map[string]Minio_buckets)
// == Main ==//
// main validates the environment, registers the http callbacks, starts the
// http and https listeners, the topic writer and the periodic registration
// and then parks until the process is stopped.
// A SIGINT/SIGTERM stops the periodic registration, shuts the listeners down
// and removes all type jobs.
func main() {

	//log.SetLevel(log.InfoLevel)
	log.SetLevel(log.TraceLevel)

	log.Info("Server starting...")

	if self == "" {
		log.Panic("Env SELF not configured")
	}
	if bootstrapserver == "" {
		log.Panic("Env KAFKA_SERVER not set")
	}
	if ics_server == "" {
		log.Panic("Env ICS not set")
	}
	if os.Getenv("KP") != "" {
		producer_instance_name = producer_instance_name + "-" + os.Getenv("KP")
	}

	rtr := mux.NewRouter()
	rtr.HandleFunc("/callbacks/job/"+producer_instance_name, create_job)
	rtr.HandleFunc("/callbacks/job/"+producer_instance_name+"/{job_id}", delete_job)
	rtr.HandleFunc("/callbacks/supervision/"+producer_instance_name, supervise_producer)
	rtr.HandleFunc("/statistics", statistics)
	rtr.HandleFunc("/logging/{level}", logging_level)
	rtr.HandleFunc("/logging", logging_level)
	rtr.HandleFunc("/", alive)

	//For perf/mem profiling
	rtr.HandleFunc("/custom_debug_path/profile", pprof.Profile)

	http.Handle("/", rtr)

	http_server := &http.Server{Addr: ":" + strconv.Itoa(http_port), Handler: nil}

	cer, err := tls.LoadX509KeyPair(server_crt, server_key)
	if err != nil {
		log.Error("Cannot load key and cert - ", err)
		return
	}
	config := &tls.Config{Certificates: []tls.Certificate{cer}}
	https_server := &http.Server{Addr: ":" + strconv.Itoa(https_port), TLSConfig: config, Handler: nil}

	// Run http
	go func() {
		log.Info("Starting http service...")
		err := http_server.ListenAndServe()
		if err == http.ErrServerClosed { // graceful shutdown
			log.Info("http server shutdown...")
		} else if err != nil {
			log.Error("http server error: ", err)
		}
	}()

	// Run https
	go func() {
		log.Info("Starting https service...")
		// ListenAndServe would serve plain http on the https port.
		// The certificate is already in TLSConfig so no file names are given
		err := https_server.ListenAndServeTLS("", "")
		if err == http.ErrServerClosed { // graceful shutdown
			log.Info("https server shutdown...")
		} else if err != nil {
			log.Error("https server error: ", err)
		}
	}()
	check_tcp(strconv.Itoa(http_port))
	check_tcp(strconv.Itoa(https_port))

	go start_topic_writer(writer_control, data_out_channel)

	//Setup proc for periodic type registration
	var event_chan = make(chan int) //Channel for stopping the proc
	go periodic_registration(event_chan)

	//Wait for term/int signal do try to shut down gracefully
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		sig := <-sigs
		fmt.Printf("Received signal %s - application will terminate\n", sig)
		event_chan <- 0 // Stop periodic registration
		datalock.Lock()
		defer datalock.Unlock()
		AppState = Terminating
		if err := http_server.Shutdown(context.Background()); err != nil {
			log.Error("http server shutdown error: ", err)
		}
		if err := https_server.Shutdown(context.Background()); err != nil {
			log.Error("https server shutdown error: ", err)
		}
		// Stopping jobs
		for key := range TypeJobs {
			log.Info("Stopping type job:", key)
			for _, dp := range InfoTypes.ProdDataTypes {
				if key == dp.ID {
					remove_type_job(dp)
				}
			}
		}
	}()

	AppState = Running

	//Wait until all go routines has exited
	// Goexit never returns, the prints below are only kept for reference
	runtime.Goexit()

	fmt.Println("main routine exit")
	fmt.Println("server stopped")
}
// check_tcp blocks until a tcp connection to the given port on localhost can
// be opened. Each attempt has a 3 second timeout and failed attempts are
// retried immediately.
func check_tcp(port string) {
	log.Info("Checking tcp port: ", port)
	address := net.JoinHostPort("localhost", port)
	for {
		conn, err := net.DialTimeout("tcp", address, 3*time.Second)
		if err == nil && conn != nil {
			log.Info("Checking tcp port: ", port, " - OK")
			_ = conn.Close()
			return
		}
		log.Info("Checking tcp port: ", port, " failed, retrying...")
	}
}
//== Core functions ==//
// Run periodic registration of producers
// The first attempt is made after one second. After a successful registration
// the next one is made after registration_delay_long seconds. After a failure
// the wait grows by registration_delay_short seconds per attempt, and restarts
// at registration_delay_short once it has reached registration_delay_long.
// The function returns when 0 is received on evtch.
func periodic_registration(evtch chan int) {
	wait_seconds := 1
	for {
		timeout := time.After(time.Duration(wait_seconds) * time.Second)
		select {
		case cmd := <-evtch:
			if cmd == 0 { // Stop thread
				return
			}
		case <-timeout:
			switch {
			case register_producer():
				wait_seconds = registration_delay_long
			case wait_seconds < registration_delay_long:
				wait_seconds += registration_delay_short
			default:
				wait_seconds = registration_delay_short
			}
		}
	}
}
// register_producer reads the configuration file, registers every configured
// type and then the producer itself in ICS. When that succeeded the type jobs
// are aligned with the configuration: jobs of removed types are stopped and
// jobs of new types are started. Returns true when the registration succeeded.
func register_producer() bool {
	log.Info("Registering producer: ", producer_instance_name)

	// Every failure ends with the same log line and result
	failed := func() bool {
		log.Error("Registering producer: ", producer_instance_name, " - failed")
		return false
	}

	content, err := os.ReadFile(config_file)
	if err != nil {
		log.Error("Cannot read config file: ", config_file)
		return failed()
	}
	data := DataTypes{}
	if err = jsoniter.Unmarshal([]byte(content), &data); err != nil {
		log.Error("Cannot parse config file: ", config_file)
		return failed()
	}

	// Register each type with a minimal schema
	var new_type_names []string
	for _, configured := range data.ProdDataTypes {
		schema := map[string]interface{}{
			"schema":      "http://json-schema.org/draft-07/schema#",
			"title":       configured.ID,
			"description": configured.ID,
			"type":        "object",
		}
		type_body, err := json.Marshal(map[string]interface{}{"info_job_data_schema": schema})
		if err != nil {
			log.Error("Cannot create json for type: ", configured.ID)
			return failed()
		}
		type_url := "http://" + ics_server + "/data-producer/v1/info-types/" + configured.ID
		if !send_http_request(type_body, http.MethodPut, type_url, true, creds_grant_type != "") {
			log.Error("Cannot register type: ", configured.ID)
			return failed()
		}
		new_type_names = append(new_type_names, configured.ID)
	}

	// Register the producer with all its types
	log.Debug("Registering types: ", new_type_names)
	producer := map[string]interface{}{
		"supported_info_types":                   new_type_names,
		"info_job_callback_url":                  "http://" + self + "/callbacks/job/" + producer_instance_name,
		"info_producer_supervision_callback_url": "http://" + self + "/callbacks/supervision/" + producer_instance_name,
	}
	producer_body, err := json.Marshal(producer)
	if err != nil {
		log.Error("Cannot create json for producer: ", producer_instance_name)
		return failed()
	}
	producer_url := "http://" + ics_server + "/data-producer/v1/info-producers/" + producer_instance_name
	if !send_http_request(producer_body, http.MethodPut, producer_url, true, creds_grant_type != "") {
		log.Error("Cannot register producer: ", producer_instance_name)
		return failed()
	}

	datalock.Lock()
	defer datalock.Unlock()

	// Types known since the previous registration
	var current_type_names []string
	for _, known := range InfoTypes.ProdDataTypes {
		current_type_names = append(current_type_names, known.ID)
		if !contains_str(new_type_names, known.ID) {
			//Type is removed
			log.Info("Removing type job for type: ", known.ID, " Type not in configuration")
			remove_type_job(known)
			continue
		}
		//Type exist
		log.Debug("Type ", known.ID, " exists")
		create_ext_job(known)
	}

	// Types in the configuration just read
	for _, configured := range data.ProdDataTypes {
		if !contains_str(current_type_names, configured.ID) {
			//Type is new
			log.Info("Adding type job for type: ", configured.ID, " Type added to configuration")
			start_type_job(configured)
			continue
		}
		//Type exist
		log.Debug("Type ", configured.ID, " exists")
		create_ext_job(configured)
	}

	InfoTypes = data
	log.Debug("Datatypes: ", InfoTypes)
	log.Info("Registering producer: ", producer_instance_name, " - OK")
	return true
}
// remove_type_job asks the topic reader of the type to exit and, when an
// external job has been created for the type, deletes that job in ICS.
func remove_type_job(dp DataType) {
	log.Info("Removing type job: ", dp.ID)
	if type_job, found := TypeJobs[dp.ID]; found {
		type_job.reader_control <- ReaderControl{"EXIT"}
	}

	if !dp.ext_job_created {
		return
	}
	dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType)
	job_url := "http://" + ics_server + "/data-consumer/v1/info-jobs/" + dp.ext_job_id
	if deleted := send_http_request(*dp.ext_job, http.MethodDelete, job_url, true, creds_grant_type != ""); !deleted {
		log.Error("Cannot delete job: ", dp.ext_job_id)
	}
	// NOTE(review): dp is passed by value, so these resets only change the local copy
	dp.ext_job_created = false
	dp.ext_job = nil
}
// start_type_job creates the channels and the record of a type job, starts
// the job matching the type id plus a topic reader for the input topic, and
// stores the record in TypeJobs. Finally the external job of the type is
// created. Type ids without a matching job only get the topic reader.
func start_type_job(dp DataType) {
	log.Info("Starting type job: ", dp.ID)

	stats := new(TypeJobStats)
	job_record := TypeJobRecord{
		InfoType:        dp.ID,
		InputTopic:      dp.KafkaInputTopic,
		data_in_channel: make(chan *KafkaPayload, reader_queue_length),
		reader_control:  make(chan ReaderControl, 1),
		job_control:     make(chan JobControl, 1),
		groupId:         "kafka-procon-" + dp.ID,
		clientId:        dp.ID + "-" + os.Getenv("KP"),
		statistics:      stats,
	}

	ctrl := job_record.job_control
	data_in := job_record.data_in_channel
	switch dp.ID {
	case "xml-file-data-to-filestore":
		go start_job_xml_file_data(dp.ID, ctrl, data_in, data_out_channel, "", "pm-files-json")
	case "xml-file-data":
		go start_job_xml_file_data(dp.ID, ctrl, data_in, data_out_channel, files_volume, "")
	case "json-file-data-from-filestore":
		go start_job_json_file_data(dp.ID, ctrl, data_in, data_out_channel, true)
	case "json-file-data":
		go start_job_json_file_data(dp.ID, ctrl, data_in, data_out_channel, false)
	case "json-file-data-from-filestore-to-influx":
		go start_job_json_file_data_influx(dp.ID, ctrl, data_in, true)
	default:
	}

	go start_topic_reader(dp.KafkaInputTopic, dp.ID, job_record.reader_control, data_in, job_record.groupId, job_record.clientId, stats)

	TypeJobs[dp.ID] = job_record
	log.Debug("Type job input type: ", dp.InputJobType)
	create_ext_job(dp)
}
// create_ext_job registers a consumer job in ICS for the input job type of
// dp. Nothing is done when dp has no input job type. The registration is
// retried until it succeeds, with a pause of registration_delay_short
// seconds between the attempts so that an unavailable ICS is not flooded
// with requests.
func create_ext_job(dp DataType) {
	if dp.InputJobType == "" {
		return
	}

	job := map[string]interface{}{
		"info_type_id":            dp.InputJobType,
		"job_owner":               "console", //TODO:
		"status_notification_uri": "http://callback:80/post",
		"job_definition": map[string]interface{}{
			"kafkaOutputTopic": dp.InputJobDefinition.KafkaOutputTopic,
		},
	}
	payload, err := json.Marshal(job)
	// NOTE(review): dp is passed by value, so the ext_job fields set in this
	// function only change the local copy
	dp.ext_job_created = false
	dp.ext_job = nil
	if err != nil {
		log.Error("Cannot create json for type: ", dp.InputJobType)
		return
	}

	dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType)
	job_url := "http://" + ics_server + "/data-consumer/v1/info-jobs/" + dp.ext_job_id
	for !send_http_request(payload, http.MethodPut, job_url, true, creds_grant_type != "") {
		log.Error("Cannot register job: ", dp.InputJobType)
		time.Sleep(time.Duration(registration_delay_short) * time.Second)
	}
	log.Debug("Registered job ok: ", dp.InputJobType)
	dp.ext_job_created = true
	dp.ext_job = &payload
}
// remove_info_job tells the type job of the info job to remove the filter
// of the job and then deletes the job from InfoJobs.
func remove_info_job(jobid string) {
	log.Info("Removing info job: ", jobid)

	type_id := InfoJobs[jobid].job_info.InfoTypeIdentity
	remove_cmd := JobControl{
		command: "REMOVE-FILTER",
		filter:  Filter{JobId: jobid},
	}
	TypeJobs[type_id].job_control <- remove_cmd

	delete(InfoJobs, jobid)
}
// == Helper functions ==//
// Function to check the status of a mutex lock
// The state is read with reflection from the unexported state field of the
// mutex. Older Go versions keep that field directly in sync.Mutex, newer
// versions keep it in a nested field named mu, so both layouts are tried.
// Returns false when m is nil or when no state field can be found.
// The result is only a snapshot and intended for debug output.
func MutexLocked(m *sync.Mutex) bool {
	if m == nil {
		return false
	}
	mutex := reflect.ValueOf(m).Elem()
	state := mutex.FieldByName("state")
	if !state.IsValid() {
		if inner := mutex.FieldByName("mu"); inner.IsValid() && inner.Kind() == reflect.Struct {
			state = inner.FieldByName("state")
		}
	}
	// Calling Int on an invalid or non integer value would panic
	if !state.IsValid() || state.Kind() != reflect.Int32 {
		return false
	}
	return state.Int()&mutexLocked == mutexLocked
}
// Test if slice contains a string
// Returns true when e is equal to at least one element of s
func contains_str(s []string, e string) bool {
	for i := 0; i < len(s); i++ {
		if s[i] == e {
			return true
		}
	}
	return false
}
// Send a http request with json (json may be nil)
// The content type header is only set when json is given. When useAuth is
// set, a token is fetched and sent as bearer token. The parameter retry is
// currently not used.
// Returns true when the response status is 200, 201 or 204, and false for
// any other status or when the request cannot be created or sent.
func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool {

	// set the HTTP method, url, and request body
	var req *http.Request
	var err error
	if json == nil {
		req, err = http.NewRequest(method, url, http.NoBody)
	} else {
		req, err = http.NewRequest(method, url, bytes.NewBuffer(json))
	}
	// req is nil when the request could not be created, so check before using it
	if err != nil {
		log.Error("Cannot create http request, method: ", method, " url: ", url)
		return false
	}
	if json != nil {
		req.Header.Set("Content-Type", "application/json; charset=utf-8")
	}

	if useAuth {
		token, err := fetch_token()
		if err != nil {
			log.Error("Cannot fetch token for http request: ", err)
			return false
		}
		req.Header.Set("Authorization", "Bearer "+token.TokenValue)
	}

	// Only method and url are logged, the request headers may hold the bearer token
	log.Debug("HTTP request: ", method, " ", url)

	log.Debug("Sending http request")
	resp, err := httpclient.Do(req)
	if err != nil {
		log.Error("Http request error: ", err)
		log.Error("Cannot send http request method: ", method, " url: ", url)
		return false
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK, http.StatusCreated, http.StatusNoContent:
		log.Debug("Accepted http status: ", resp.StatusCode)
		return true
	}
	log.Debug("HTTP resp: ", resp)
	return false
}
// fetch_token fetches an access token with the client credentials flow,
// using the client id, client secret and token url from the environment.
// Returns the token value and its expiration as a kafka bearer token, or the
// error from the token request. The token value is a credential and is
// therefore not written to the log.
func fetch_token() (*kafka.OAuthBearerToken, error) {
	log.Debug("Get token inline")
	conf := &clientcredentials.Config{
		ClientID:     creds_client_id,
		ClientSecret: creds_client_secret,
		TokenURL:     creds_service_url,
	}
	token, err := conf.Token(context.Background())
	if err != nil {
		log.Warning("Cannot fetch access token: ", err)
		return nil, err
	}

	log.Debug("=====================================================")
	log.Debug("Expiration: ", token.Expiry)
	log.Debug("=====================================================")

	return &kafka.OAuthBearerToken{
		TokenValue: token.AccessToken,
		Expiration: token.Expiry,
		Extensions: map[string]string{},
	}, nil
}
// Function to print memory details
// https://pkg.go.dev/runtime#MemStats
// Prints two lines with memory statistics to stdout, sizes are in MiB.
// Nothing is printed unless the debug or trace log level is enabled.
func PrintMemUsage() {
	if !log.IsLevelEnabled(log.DebugLevel) && !log.IsLevelEnabled(log.TraceLevel) {
		return
	}
	var stats runtime.MemStats
	runtime.ReadMemStats(&stats)

	fmt.Printf("Alloc = %v MiB\tTotalAlloc = %v MiB\tSys = %v MiB\tNumGC = %v\n",
		bToMb(stats.Alloc), bToMb(stats.TotalAlloc), bToMb(stats.Sys), stats.NumGC)
	fmt.Printf("HeapSys = %v MiB\tStackSys = %v MiB\tMSpanSys = %v MiB\tMCacheSys = %v MiB\tBuckHashSys = %v MiB\tGCSys = %v MiB\tOtherSys = %v MiB\n",
		bToMb(stats.HeapSys), bToMb(stats.StackSys), bToMb(stats.MSpanSys), bToMb(stats.MCacheSys),
		bToMb(stats.BuckHashSys), bToMb(stats.GCSys), bToMb(stats.OtherSys))
}
// bToMb converts a number of bytes to whole MiB, rounding down.
func bToMb(b uint64) uint64 {
	const bytes_per_mib = 1024 * 1024
	return b / bytes_per_mib
}
// generate_uuid_from_type builds a uuid string from the first 16 bytes of s.
// A shorter s is padded at the end with the character '0'.
func generate_uuid_from_type(s string) string {
	const uuid_size = 16
	raw := make([]byte, uuid_size)
	// copy takes at most uuid_size bytes, the rest is filled with '0'
	for i := copy(raw, s); i < uuid_size; i++ {
		raw[i] = '0'
	}
	id, _ := uuid.FromBytes(raw)
	return id.String()
}
// Write gzipped data to a Writer
// The data is compressed with the level gzip.BestSpeed. Returns the error
// from creating the gzip writer, from writing the data or from closing the
// gzip writer. The close error is returned too since the compressed data may
// not reach w until the gzip writer is closed.
func gzipWrite(w io.Writer, data *[]byte) error {
	gw, err := gzip.NewWriterLevel(w, gzip.BestSpeed)
	if err != nil {
		return err
	}
	if _, err := gw.Write(*data); err != nil {
		// The write error is the one to report, closing is only cleanup here
		_ = gw.Close()
		return err
	}
	return gw.Close()
}
// Write gunzipped data from Reader to a Writer
// All data is uncompressed before anything is written to w. Returns the
// error from creating the gzip reader, from reading or from writing.
func gunzipReaderToWriter(w io.Writer, data io.Reader) error {
	zr, err := gzip.NewReader(data)
	if err != nil {
		return err
	}
	defer zr.Close()

	plain, err := io.ReadAll(zr)
	if err != nil {
		return err
	}
	_, err = w.Write(plain)
	return err
}
// create_minio_bucket creates the bucket in the region bucket_location and
// records it for the client with add_bucket. A failed creation is accepted
// when the bucket turns out to exist already. Otherwise the error from the
// creation is returned.
func create_minio_bucket(mc *minio.Client, client_id string, bucket string) error {
	ctx := context.Background()
	err := mc.MakeBucket(ctx, bucket, minio.MakeBucketOptions{Region: bucket_location})
	if err == nil {
		log.Debug("Successfully created bucket: ", bucket)
		add_bucket(client_id, bucket)
		return nil
	}

	// Check to see if we already own this bucket (which happens if you run this twice)
	owned, check_err := mc.BucketExists(ctx, bucket)
	if check_err != nil || !owned {
		log.Error("Cannot create or check bucket ", bucket, " in minio client", err)
		return err
	}
	log.Debug("Already own bucket:", bucket)
	add_bucket(client_id, bucket)
	return nil
}
// check_minio_bucket returns true when the bucket is recorded for the client
// or, if not recorded, when the minio server reports that the bucket exists.
func check_minio_bucket(mc *minio.Client, client_id string, bucket string) bool {
	if bucket_exist(client_id, bucket) {
		return true
	}

	found, err := mc.BucketExists(context.Background(), bucket)
	if err != nil || !found {
		log.Error("Bucket does not exist, bucket ", bucket, " in minio client", err)
		return false
	}
	log.Debug("Already own bucket:", bucket)
	return true
}
// add_bucket records the bucket for the minio client in minio_bucketlist.
// The entry of the client is created when it does not exist.
func add_bucket(minio_id string, bucket string) {
	datalock.Lock()
	defer datalock.Unlock()

	entry, known := minio_bucketlist[minio_id]
	if !known {
		entry = Minio_buckets{Buckets: make(map[string]bool)}
	}
	entry.Buckets[bucket] = true
	minio_bucketlist[minio_id] = entry
}
// bucket_exist returns true when the bucket is recorded for the minio client
// in minio_bucketlist.
func bucket_exist(minio_id string, bucket string) bool {
	datalock.Lock()
	defer datalock.Unlock()

	if entry, known := minio_bucketlist[minio_id]; known {
		_, present := entry.Buckets[bucket]
		return present
	}
	return false
}
//== http api functions ==//
// create/update job
// Handles POST with content type application/json and a body holding an
// InfoJobDataType. A new job is stored in InfoJobs and its filter is sent to
// the type job with the command ADD-FILTER. A request for an existing job is
// validated (type and topic cannot change) but the job is not yet updated.
// Answers 405 for other methods and 400 for a bad content type, a body that
// cannot be parsed, an unknown type or a changed type or topic.
func create_job(w http.ResponseWriter, req *http.Request) {
	log.Debug("Create job, http method: ", req.Method)
	if req.Method != http.MethodPost {
		log.Error("Create job, http method not allowed")
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	if req.Header.Get("Content-Type") != "application/json" {
		log.Error("Create job, bad content type")
		http.Error(w, "Bad content type", http.StatusBadRequest)
		return
	}

	var t InfoJobDataType
	if err := json.NewDecoder(req.Body).Decode(&t); err != nil {
		log.Error("Create job, cannot parse json,", err)
		http.Error(w, "Cannot parse json", http.StatusBadRequest)
		return
	}
	log.Debug("Creating job, id: ", t.InfoJobIdentity)
	datalock.Lock()
	defer datalock.Unlock()

	job_id := t.InfoJobIdentity
	existing_job, job_found := InfoJobs[job_id]
	type_job_record, found_type := TypeJobs[t.InfoTypeIdentity]

	switch {
	case !job_found && !found_type:
		log.Error("Type ", t.InfoTypeIdentity, " does not exist")
		http.Error(w, "Type "+t.InfoTypeIdentity+" does not exist", http.StatusBadRequest)
		return
	case !job_found:
		// New job for a known type, created below
	case t.InfoTypeIdentity != existing_job.job_info.InfoTypeIdentity:
		log.Error("Job cannot change type")
		http.Error(w, "Job cannot change type", http.StatusBadRequest)
		return
	case t.InfoJobData.KafkaOutputTopic != existing_job.job_info.InfoJobData.KafkaOutputTopic:
		log.Error("Job cannot change topic")
		http.Error(w, "Job cannot change topic", http.StatusBadRequest)
		return
	case !found_type:
		//Should never happen, if the type is removed then job is stopped
		log.Error("Type ", t.InfoTypeIdentity, " does not exist")
		http.Error(w, "Type "+t.InfoTypeIdentity+" does not exist", http.StatusBadRequest)
		return
	}

	if job_found {
		//TODO
		//Update job
		return
	}

	output_topic := t.InfoJobData.KafkaOutputTopic
	log.Debug("Starting infojob ", job_id, ", type ", t.InfoTypeIdentity, ", output topic", output_topic)
	new_job := InfoJobRecord{
		job_info:     t,
		output_topic: output_topic,
		statistics:   &InfoJobStats{},
	}

	filter := Filter{JobId: job_id, OutputTopic: output_topic}

	// Turns a list into a lookup map, a missing list gives an empty map
	to_set := func(items []string) map[string]bool {
		set := make(map[string]bool)
		for _, item := range items {
			set[item] = true
		}
		return set
	}

	switch t.InfoTypeIdentity {
	case "json-file-data-from-filestore", "json-file-data", "json-file-data-from-filestore-to-influx":
		params := t.InfoJobData.FilterParams
		filter.filter = FilterMaps{
			sourceNameMap:     to_set(params.MeasuredEntityDns),
			measObjClassMap:   to_set(params.MeasObjClass),
			measObjInstIdsMap: to_set(params.MeasObjInstIds),
			measTypesMap:      to_set(params.MeasTypes),
		}
	}
	if t.InfoTypeIdentity == "json-file-data-from-filestore-to-influx" {
		filter.influxParameters = InfluxJobParameters{
			DbUrl:    t.InfoJobData.DbUrl,
			DbOrg:    t.InfoJobData.DbOrg,
			DbBucket: t.InfoJobData.DbBucket,
			DbToken:  t.InfoJobData.DbToken,
		}
	}

	InfoJobs[job_id] = new_job
	type_job_record.job_control <- JobControl{command: "ADD-FILTER", filter: filter}
}
// delete job
// Handles DELETE for the job given by the path variable job_id.
// Answers 204 when the job was removed, 404 when the job is not known and
// 405 for other methods.
func delete_job(w http.ResponseWriter, req *http.Request) {
	if req.Method != http.MethodDelete {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	datalock.Lock()
	defer datalock.Unlock()

	id, has_id := mux.Vars(req)["job_id"]
	if _, known := InfoJobs[id]; !has_id || !known {
		w.WriteHeader(http.StatusNotFound)
		return
	}
	remove_info_job(id)
	w.WriteHeader(http.StatusNoContent)
	log.Info("Job ", id, " deleted")
}
// job supervision
// Handles GET for the job given by the path variable job_id.
// Answers with the default status 200 when the job is known, 404 when it is
// not known and 405 for other methods.
func supervise_job(w http.ResponseWriter, req *http.Request) {
	if req.Method != http.MethodGet {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	datalock.Lock()
	defer datalock.Unlock()

	vars := mux.Vars(req)
	log.Debug("Supervising, job: ", vars["job_id"])

	id, has_id := vars["job_id"]
	if _, known := InfoJobs[id]; has_id && known {
		log.Debug("Supervision ok, job", id)
		return
	}
	w.WriteHeader(http.StatusNotFound)
}
// producer supervision
// Answers 200 for GET and 405 for all other methods
func supervise_producer(w http.ResponseWriter, req *http.Request) {
	status := http.StatusOK
	if req.Method != http.MethodGet {
		status = http.StatusMethodNotAllowed
	}
	w.WriteHeader(status)
}
// producer statistics, all jobs
// Handles GET and answers with a json object holding the number of jobs and
// types plus, for each info job, its type, topics, queue lengths and message
// counters. Answers 405 for other methods and 500 when the json cannot be
// created.
func statistics(w http.ResponseWriter, req *http.Request) {
	if req.Method != http.MethodGet {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	m := make(map[string]interface{})
	log.Debug("producer statictics, locked? ", MutexLocked(&datalock))
	datalock.Lock()
	defer datalock.Unlock()
	m["number-of-jobs"] = len(InfoJobs)
	m["number-of-types"] = len(InfoTypes.ProdDataTypes)
	qm := make(map[string]interface{})
	m["jobs"] = qm
	for key, elem := range InfoJobs {
		jm := make(map[string]interface{})
		qm[key] = jm
		jm["type"] = elem.job_info.InfoTypeIdentity
		typeJob := TypeJobs[elem.job_info.InfoTypeIdentity]
		jm["groupId"] = typeJob.groupId
		jm["clientID"] = typeJob.clientId
		jm["input topic"] = typeJob.InputTopic
		jm["input queue length - job ("+fmt.Sprintf("%v", cap(typeJob.data_in_channel))+")"] = len(typeJob.data_in_channel)
		jm["output topic"] = elem.output_topic
		jm["output queue length - producer ("+fmt.Sprintf("%v", cap(data_out_channel))+")"] = len(data_out_channel)
		// The counters are pointers, they are nil when the type job is not in TypeJobs
		if typeJob.statistics != nil {
			jm["msg_in (type)"] = typeJob.statistics.in_msg_cnt
		}
		if elem.statistics != nil {
			jm["msg_out (job)"] = elem.statistics.out_msg_cnt
		}
	}
	body, err := json.Marshal(m)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		log.Error("Cannot marshal statistics json")
		return
	}
	// The content type belongs to the response, it must be set before the body is written
	w.Header().Set("Content-Type", "application/json; charset=utf-8")
	if _, err = w.Write(body); err != nil {
		// The status is already sent at this point, so the error can only be logged
		log.Error("Cannot send statistics json")
	}
}
// Simple alive check
// Nothing is written, so the default status 200 with an empty body is sent
func alive(_ http.ResponseWriter, _ *http.Request) {
}
// logging_level gets or sets the logging level of the producer.
//
// With a "level" path variable the request must be a PUT (405 otherwise) and
// the level must be one of trace, debug, info, warn, error, fatal or panic
// (404 otherwise); the logger is switched to that level.
//
// Without a "level" path variable the request must be a GET (405 otherwise)
// and the current level name is returned as text.
func logging_level(w http.ResponseWriter, req *http.Request) {
	vars := mux.Vars(req)
	if level, ok := vars["level"]; ok {
		if req.Method != http.MethodPut {
			w.WriteHeader(http.StatusMethodNotAllowed)
			return
		}
		switch level {
		case "trace":
			log.SetLevel(log.TraceLevel)
		case "debug":
			log.SetLevel(log.DebugLevel)
		case "info":
			log.SetLevel(log.InfoLevel)
		case "warn":
			log.SetLevel(log.WarnLevel)
		case "error":
			log.SetLevel(log.ErrorLevel)
		case "fatal":
			log.SetLevel(log.FatalLevel)
		case "panic":
			log.SetLevel(log.PanicLevel)
		default:
			w.WriteHeader(http.StatusNotFound)
		}
		return
	}

	if req.Method != http.MethodGet {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	// A level is "enabled" when the configured level is at least as verbose,
	// so the most verbose level has to be tested first. Testing panic first
	// would match for every configured level.
	msg := "none"
	switch {
	case log.IsLevelEnabled(log.TraceLevel):
		msg = "trace"
	case log.IsLevelEnabled(log.DebugLevel):
		msg = "debug"
	case log.IsLevelEnabled(log.InfoLevel):
		msg = "info"
	case log.IsLevelEnabled(log.WarnLevel):
		msg = "warn"
	case log.IsLevelEnabled(log.ErrorLevel):
		msg = "error"
	case log.IsLevelEnabled(log.FatalLevel):
		msg = "fatal"
	case log.IsLevelEnabled(log.PanicLevel):
		msg = "panic"
	}
	w.Header().Set("Content-Type", "application/text")
	if _, err := w.Write([]byte(msg)); err != nil {
		log.Error("Cannot send logging level, ", err)
	}
}
// start_topic_reader sets up a kafka consumer for one topic and starts the
// goroutines that forward received messages.
//
// topic is the kafka topic to subscribe to, type_id is used in log output and
// passed to create_kafka_consumer together with gid and cid. Commands are
// received on control_ch, received messages are sent on data_ch and
// stats.in_msg_cnt is incremented for each forwarded message.
//
// The function blocks until the consumer is created and subscribed (retrying
// once per second) or an "EXIT" command is received. After a successful
// subscription it starts two goroutines and returns.
func start_topic_reader(topic string, type_id string, control_ch chan ReaderControl, data_ch chan *KafkaPayload, gid string, cid string, stats *TypeJobStats) {
log.Info("Topic reader starting, topic: ", topic, " for type: ", type_id)
topic_ok := false
var c *kafka.Consumer = nil
running := true
// Phase 1: create the consumer and subscribe, retrying every second,
// while still reacting to an "EXIT" command.
for topic_ok == false {
select {
case reader_ctrl := <-control_ch:
if reader_ctrl.command == "EXIT" {
log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
data_ch <- nil //Signal to job handler
running = false
return
}
case <-time.After(1 * time.Second):
// NOTE(review): running is only set to false right before the return
// above, so this check does not appear to ever be true.
if !running {
return
}
if c == nil {
c = create_kafka_consumer(type_id, gid, cid)
if c == nil {
log.Info("Cannot start consumer on topic: ", topic, " for type: ", type_id, " - retrying")
} else {
log.Info("Consumer started on topic: ", topic, " for type: ", type_id)
}
}
if c != nil && topic_ok == false {
err := c.SubscribeTopics([]string{topic}, nil)
if err != nil {
log.Info("Topic reader cannot start subscribing on topic: ", topic, " for type: ", type_id, " - retrying -- error details: ", err)
} else {
log.Info("Topic reader subscribing on topic: ", topic, " for type: ", type_id)
topic_ok = true
}
}
}
}
log.Info("Topic reader ready on topic: ", topic, " for type: ", type_id)
// event_chan is used by the polling goroutine to stop the event goroutine
// (value 0 means stop).
var event_chan = make(chan int)
// Goroutine 1: handles events from the consumer's Events() channel. A token
// refresh request is answered with a token from fetch_token, all other
// events are only logged on debug level.
go func() {
for {
select {
case evt := <-c.Events():
switch evt.(type) {
case kafka.OAuthBearerTokenRefresh:
log.Debug("New consumer token needed: ", evt)
token, err := fetch_token()
if err != nil {
log.Warning("Cannot cannot fetch token: ", err)
c.SetOAuthBearerTokenFailure(err.Error())
} else {
setTokenError := c.SetOAuthBearerToken(*token)
if setTokenError != nil {
log.Warning("Cannot cannot set token: ", setTokenError)
c.SetOAuthBearerTokenFailure(setTokenError.Error())
}
}
default:
log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String())
}
case msg := <-event_chan:
if msg == 0 {
return
}
case <-time.After(1 * time.Second):
// NOTE(review): running is still true when this goroutine is started
// and is not modified afterwards in the visible code.
if !running {
return
}
}
}
}()
// Goroutine 2: polls the consumer and forwards messages on data_ch until an
// "EXIT" command is received on control_ch.
go func() {
// The inner loop is only left by return, so the outer loop never starts a
// second iteration.
for {
for {
select {
case reader_ctrl := <-control_ch:
if reader_ctrl.command == "EXIT" {
// Stop the event goroutine first (blocks until it receives the value)
event_chan <- 0
log.Debug("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
data_ch <- nil //Signal to job handler
// Deferred call runs when this goroutine function returns on the next line
defer c.Close()
return
}
default:
// No command pending - poll with a timeout of 1000 ms
ev := c.Poll(1000)
if ev == nil {
log.Debug("Topic Reader for type: ", type_id, " Nothing to consume on topic: ", topic)
continue
}
switch e := ev.(type) {
case *kafka.Message:
var kmsg KafkaPayload
kmsg.msg = e
// The commit is done before the message is handed over on data_ch and
// the result of the commit is not checked
c.Commit()
data_ch <- &kmsg
stats.in_msg_cnt++
log.Debug("Reader msg: ", &kmsg)
log.Debug("Reader - data_ch ", data_ch)
case kafka.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
case kafka.OAuthBearerTokenRefresh:
// Same token handling as in the event goroutine
log.Debug("New consumer token needed: ", ev)
token, err := fetch_token()
if err != nil {
log.Warning("Cannot cannot fetch token: ", err)
c.SetOAuthBearerTokenFailure(err.Error())
} else {
setTokenError := c.SetOAuthBearerToken(*token)
if setTokenError != nil {
log.Warning("Cannot cannot set token: ", setTokenError)
c.SetOAuthBearerTokenFailure(setTokenError.Error())
}
}
default:
fmt.Printf("Ignored %v\n", e)
}
}
}
}
}()
}
// start_topic_writer creates the kafka producer and starts the goroutines
// that write messages to kafka.
//
// Commands are received on control_ch ("EXIT" is read but ignored) and the
// payloads to send are received on data_ch. A nil payload on data_ch stops
// the writer.
//
// The function blocks until start_producer returns a producer (retrying once
// per second), then starts two goroutines and returns.
func start_topic_writer(control_ch chan WriterControl, data_ch chan *KafkaPayload) {
var kafka_producer *kafka.Producer
// NOTE(review): running is never set to false in this function, so the
// "!running" checks below do not appear to ever be true.
running := true
log.Info("Topic writer starting")
// Wait for kafka producer to become available - and be prepared to exit the writer
// (an "EXIT" command received here is consumed and ignored)
for kafka_producer == nil {
select {
case writer_ctl := <-control_ch:
if writer_ctl.command == "EXIT" {
//ignore cmd
}
default:
kafka_producer = start_producer()
if kafka_producer == nil {
log.Debug("Could not start kafka producer - retrying")
time.Sleep(1 * time.Second)
} else {
log.Debug("Kafka producer started")
}
}
}
// event_chan is used by the sending goroutine to stop the event goroutine
// (value 0 means stop).
var event_chan = make(chan int)
// Goroutine 1: handles events from the producer's Events() channel - delivery
// reports and errors are logged on debug level, a token refresh request is
// answered with a token from fetch_token.
go func() {
for {
select {
case evt := <-kafka_producer.Events():
switch evt.(type) {
case *kafka.Message:
m := evt.(*kafka.Message)
if m.TopicPartition.Error != nil {
log.Debug("Dumping topic writer event, failed: ", m.TopicPartition.Error)
} else {
log.Debug("Dumping topic writer event, message to topic: ", *m.TopicPartition.Topic, " at offset: ", m.TopicPartition.Offset, " at partition: ", m.TopicPartition.Partition)
}
case kafka.Error:
log.Debug("Dumping topic writer event, error: ", evt)
case kafka.OAuthBearerTokenRefresh:
log.Debug("New producer token needed: ", evt)
token, err := fetch_token()
if err != nil {
log.Warning("Cannot cannot fetch token: ", err)
kafka_producer.SetOAuthBearerTokenFailure(err.Error())
} else {
setTokenError := kafka_producer.SetOAuthBearerToken(*token)
if setTokenError != nil {
log.Warning("Cannot cannot set token: ", setTokenError)
kafka_producer.SetOAuthBearerTokenFailure(setTokenError.Error())
}
}
default:
log.Debug("Dumping topic writer event, unknown: ", evt)
}
case msg := <-event_chan:
if msg == 0 {
return
}
case <-time.After(1 * time.Second):
if !running {
return
}
}
}
}()
// Goroutine 2: takes payloads from data_ch and produces them on the topic
// given in each payload.
go func() {
for {
select {
case writer_ctl := <-control_ch:
if writer_ctl.command == "EXIT" {
// ignore - wait for channel signal
}
case kmsg := <-data_ch:
if kmsg == nil {
// A nil payload is the stop signal: stop the event goroutine (blocks
// until it receives the value) and close the producer when this
// goroutine function returns.
event_chan <- 0
log.Info("Topic writer stopped by channel signal - start_topic_writer")
defer kafka_producer.Close()
return
}
// Up to 10 attempts, sleeping 1s, 2s, 3s ... after each failed attempt.
// The message is discarded if all attempts fail.
retries := 10
msg_ok := false
var err error
for retry := 1; retry <= retries && msg_ok == false; retry++ {
err = kafka_producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &kmsg.topic, Partition: kafka.PartitionAny},
Value: kmsg.msg.Value, Key: kmsg.msg.Key}, nil)
if err == nil {
// Counted as sent as soon as Produce accepts the message
incr_out_msg_cnt(kmsg.jobid)
msg_ok = true
log.Debug("Topic writer, msg sent ok on topic: ", kmsg.topic)
} else {
log.Info("Topic writer failed to send message on topic: ", kmsg.topic, " - Retrying. Error details: ", err)
time.Sleep(time.Duration(retry) * time.Second)
}
}
if !msg_ok {
log.Error("Topic writer failed to send message on topic: ", kmsg.topic, " - Msg discarded. Error details: ", err)
}
case <-time.After(1000 * time.Millisecond):
if !running {
return
}
}
}
}()
}
// create_kafka_consumer creates a kafka consumer with group id gid and client
// id cid against the configured bootstrap server. type_id is only used in log
// output.
//
// Auto commit is disabled and the offset reset policy is "latest". When a
// credentials grant type is configured, OAUTHBEARER over SASL_PLAINTEXT is
// added to the configuration.
//
// Returns nil if the consumer cannot be created.
func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consumer {
	log.Info("Creating kafka SASL plain text consumer for type: ", type_id)

	// Settings common to both variants
	cm := kafka.ConfigMap{
		"bootstrap.servers":  bootstrapserver,
		"group.id":           gid,
		"client.id":          cid,
		"auto.offset.reset":  "latest",
		"enable.auto.commit": false,
	}
	if creds_grant_type != "" {
		cm["sasl.mechanism"] = "OAUTHBEARER"
		cm["security.protocol"] = "SASL_PLAINTEXT"
	}

	consumer, err := kafka.NewConsumer(&cm)
	if err == nil {
		log.Info("Created kafka consumer for type: ", type_id, " OK")
		return consumer
	}
	log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err)
	return nil
}
// start_producer creates a kafka producer against the configured bootstrap
// server. When a credentials grant type is configured, OAUTHBEARER over
// SASL_PLAINTEXT is added to the configuration.
//
// Returns nil if the producer cannot be created.
func start_producer() *kafka.Producer {
	log.Info("Creating kafka producer")
	log.Info("Creating kafka SASL plain text producer")

	cm := kafka.ConfigMap{
		"bootstrap.servers": bootstrapserver,
	}
	if creds_grant_type != "" {
		cm["sasl.mechanism"] = "OAUTHBEARER"
		cm["security.protocol"] = "SASL_PLAINTEXT"
	}

	producer, err := kafka.NewProducer(&cm)
	if err == nil {
		return producer
	}
	log.Error("Cannot create kafka producer,", err)
	return nil
}
// start_adminclient creates a kafka admin client against the configured
// bootstrap server. Returns nil if the client cannot be created.
func start_adminclient() *kafka.AdminClient {
	log.Info("Creating kafka admin client")
	cfg := kafka.ConfigMap{"bootstrap.servers": bootstrapserver}
	admin, err := kafka.NewAdminClient(&cfg)
	if err == nil {
		return admin
	}
	log.Error("Cannot create kafka admin client,", err)
	return nil
}
// create_minio_client creates a minio client for the configured filestore
// server using static V4 credentials (filestore user/password) over a
// non-TLS connection. The id parameter is not used by this function.
//
// Returns the client and nil on success, or nil and a pointer to the error
// on failure.
func create_minio_client(id string) (*minio.Client, *error) {
	log.Debug("Get minio client")
	opts := minio.Options{
		Secure: false,
		Creds:  credentials.NewStaticV4(filestore_user, filestore_pwd, ""),
	}
	client, err := minio.New(filestore_server, &opts)
	if err == nil {
		return client, nil
	}
	log.Error("Cannot create minio client, ", err)
	return nil, &err
}
// incr_out_msg_cnt increments the out message counter of the info job with
// the given id. Unknown job ids are silently ignored.
func incr_out_msg_cnt(jobid string) {
	if job, found := InfoJobs[jobid]; found {
		job.statistics.out_msg_cnt++
	}
}
// start_job_xml_file_data is the main loop of the type job that converts xml
// pm files.
//
// Filter commands received on control_ch maintain the job id -> output topic
// map. Every message received on data_in_ch is handed to run_xml_job in a new
// goroutine, limited by jobLimiterChan. A nil message on data_in_ch stops the
// loop. fvolume and fsbucket are passed on to run_xml_job; a minio client is
// created on demand when only fsbucket is set.
func start_job_xml_file_data(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, data_out_channel chan *KafkaPayload, fvolume string, fsbucket string) {
	log.Info("Type job", type_id, " started")

	const mc_id = "mc_" + "start_job_xml_file_data"
	var mc *minio.Client
	filters := map[string]Filter{}
	topic_list := map[string]string{}
	running := true

	for {
		select {
		case job_ctl := <-control_ch:
			log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
			switch job_ctl.command {
			case "EXIT":
				// Nothing to do - stopping is handled by the nil message on data_in_ch
			case "ADD-FILTER":
				filters[job_ctl.filter.JobId] = job_ctl.filter
				log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
				// A new map is built for every change - maps already handed to
				// running job goroutines are never modified.
				updated := make(map[string]string, len(topic_list)+1)
				for id, topic := range topic_list {
					updated[id] = topic
				}
				updated[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic
				topic_list = updated
			case "REMOVE-FILTER":
				log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
				remaining := make(map[string]string, len(topic_list))
				for id, topic := range topic_list {
					if id != job_ctl.filter.JobId {
						remaining[id] = topic
					}
				}
				topic_list = remaining
			}

		case msg := <-data_in_ch:
			if msg == nil {
				log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data")
				running = false
				return
			}
			// The minio client is only needed when storing in a bucket
			if fsbucket != "" && fvolume == "" && mc == nil {
				var err *error
				mc, err = create_minio_client(mc_id)
				if err != nil {
					log.Debug("Cannot create minio client for type job: ", type_id)
				}
			}
			// Blocks while the maximum number of parallel jobs is running
			jobLimiterChan <- struct{}{}
			go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket, mc, mc_id)

		case <-time.After(1 * time.Second):
			if !running {
				return
			}
		}
	}
}
// run_xml_job converts one xml pm file to json and publishes a
// FileDownloadedEvt for the result on every topic in topic_list.
//
// msg carries a json encoded XmlFileEventHeader naming the file. The file is
// read from fvolume (disk) or, when fsbucket is set, from the "ropfiles"
// bucket using mc. Exactly one of fvolume and fsbucket must be set. Files
// with a name ending with "gz" are decompressed before conversion. The
// result is stored as <name><env KP>.json, gzipped with an additional ".gz"
// suffix when outputCompression is "gz", on disk or in bucket fsbucket.
//
// On any error before the store step the message is discarded. One slot of
// jobLimiterChan is released when the function returns.
func run_xml_job(type_id string, msg *KafkaPayload, outputCompression string, data_out_channel chan *KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string, mc *minio.Client, mc_id string) {
	// Release job limit
	defer func() {
		<-jobLimiterChan
	}()
	PrintMemUsage()

	if fvolume == "" && fsbucket == "" {
		log.Error("Type job: ", type_id, " cannot run, neither file volume nor filestore is set - discarding message")
		return
	} else if (fvolume != "") && (fsbucket != "") {
		log.Error("Type job: ", type_id, " cannot run with output to both file volume and filestore bucket - discarding message")
		return
	}

	start := time.Now()
	var evt_data XmlFileEventHeader
	err := jsoniter.Unmarshal(msg.msg.Value, &evt_data)
	if err != nil {
		log.Error("Cannot parse XmlFileEventHeader for type job: ", type_id, " - discarding message, error details", err)
		return
	}
	log.Debug("Unmarshal file-collect event for type job: ", type_id, " time: ", time.Since(start).String())

	// Open the input, either from disk or from the filestore
	const inputBucket = "ropfiles"
	var reader io.Reader
	filename := ""
	if fvolume != "" {
		filename = fvolume + "/" + evt_data.Name
		fi, err := os.Open(filename)
		if err != nil {
			log.Error("File ", filename, " - cannot be opened for type job: ", type_id, " - discarding message, error details: ", err)
			return
		}
		defer fi.Close()
		reader = fi
	} else {
		filename = evt_data.Name
		if mc == nil {
			log.Error("Cannot get: ", filename, " for type job: ", type_id, " - no minio client - discarding message")
			return
		}
		mr, err := mc.GetObject(context.Background(), inputBucket, filename, minio.GetObjectOptions{})
		if err != nil {
			log.Error("Cannot get: ", filename, " for type job: ", type_id, " - discarding message, error details: ", err)
			return
		}
		if mr == nil {
			log.Error("Cannot get: ", filename, " for type job: ", type_id, " - minio get object returned null reader - discarding message, error details: ", err)
			return
		}
		defer mr.Close()
		reader = mr
	}

	// Read the complete input, decompressing if needed
	var file_bytes []byte
	if strings.HasSuffix(filename, "gz") {
		start := time.Now()
		var buf3 bytes.Buffer
		errb := gunzipReaderToWriter(&buf3, reader)
		if errb != nil {
			log.Error("Cannot gunzip file ", filename, " - discarding message, ", errb)
			return
		}
		file_bytes = buf3.Bytes()
		log.Debug("Gunzip file: ", filename, "time:", time.Since(start).String(), "len: ", len(file_bytes))
	} else {
		var buf3 bytes.Buffer
		_, err2 := io.Copy(&buf3, reader)
		if err2 != nil {
			// Log the copy error itself, not the (nil) error from the parse step
			log.Error("File ", filename, " - cannot be read, discarding message, ", err2)
			return
		}
		file_bytes = buf3.Bytes()
	}

	start = time.Now()
	b, err := xml_to_json_conv(&file_bytes, &evt_data)
	if err != nil {
		log.Error("Cannot convert file ", evt_data.Name, " - discarding message, ", err)
		return
	}
	log.Debug("Converted file to json: ", filename, " time", time.Since(start).String(), "len;", len(b))

	new_fn := evt_data.Name + os.Getenv("KP") + ".json"
	if outputCompression == "gz" {
		new_fn = new_fn + ".gz"
		start = time.Now()
		var buf bytes.Buffer
		err = gzipWrite(&buf, &b)
		if err != nil {
			log.Error("Cannot gzip file ", new_fn, " - discarding message, ", err)
			return
		}
		b = buf.Bytes()
		log.Debug("Gzip file: ", new_fn, " time: ", time.Since(start).String(), "len:", len(file_bytes))
	}

	start = time.Now()
	if fvolume != "" {
		//Store on disk
		err = os.WriteFile(fvolume+"/"+new_fn, b, 0644)
		if err != nil {
			log.Error("Cannot write file ", new_fn, " - discarding message,", err)
			return
		}
		log.Debug("Write file to disk: "+new_fn, "time: ", time.Since(start).String(), " len: ", len(file_bytes))
	} else if fsbucket != "" {
		// Store in minio
		objectName := new_fn
		if mc != nil {
			contentType := "application/json"
			if strings.HasSuffix(objectName, ".gz") {
				contentType = "application/gzip"
			}
			tctx := context.Background()
			if check_minio_bucket(mc, mc_id, fsbucket) == false {
				err := create_minio_bucket(mc, mc_id, fsbucket)
				if err != nil {
					log.Error("Cannot create bucket: ", fsbucket, ", ", err)
					return
				}
			}
			// Upload with PutObject, retrying with doubling delay (1, 2, 4 ... 32 s)
			ok := false
			var uploadErr error
			for i := 1; i < 64 && !ok; i = i * 2 {
				// A fresh reader for each attempt so that a retry never depends
				// on how much a failed attempt consumed.
				r := bytes.NewReader(b)
				info, err := mc.PutObject(tctx, fsbucket, objectName, r, int64(len(b)), minio.PutObjectOptions{ContentType: contentType})
				if err != nil {
					uploadErr = err
					if i == 1 {
						log.Warn("Cannot upload (first attempt): ", objectName, ", ", err)
					} else {
						log.Warn("Cannot upload (retry): ", objectName, ", ", err)
					}
					time.Sleep(time.Duration(i) * time.Second)
				} else {
					log.Debug("Store ", objectName, " in filestore, time: ", time.Since(start).String())
					log.Debug("Successfully uploaded: ", objectName, " of size:", info.Size)
					ok = true
				}
			}
			if !ok {
				// NOTE(review): the event below is still published although the
				// object could not be stored - confirm that this is intended.
				log.Error("Cannot upload : ", objectName, ", ", uploadErr)
			}
		} else {
			log.Error("Cannot upload: ", objectName, ", no client")
		}
	}

	// Replace the payload with an event pointing to the converted file
	start = time.Now()
	var fde FileDownloadedEvt
	fde.Filename = new_fn
	j, err := jsoniter.Marshal(fde)
	if err != nil {
		log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err)
		return
	}
	msg.msg.Value = j
	msg.msg.Key = []byte("\"" + evt_data.SourceName + "\"")
	log.Debug("Marshal file-collect event ", time.Since(start).String())

	// One payload per job, all sharing the same kafka message
	for k, v := range topic_list {
		kmsg := new(KafkaPayload)
		kmsg.msg = msg.msg
		kmsg.topic = v
		kmsg.jobid = k
		data_out_channel <- kmsg
	}
}
// xml_to_json_conv converts the xml pm file in f_byteValue to the json pm
// file format and returns the marshalled json.
//
// Measurement infos, types, values and results are copied from the xml file.
// The granularity period is fixed to 900 and a missing suspect flag is set to
// "false". SourceName, StartEpochMicrosec, LastEpochMicrosec and
// TimeZoneOffset in the common event header are taken from xfeh, the
// remaining header fields are left empty.
//
// An error, including the underlying cause, is returned if the xml cannot be
// unmarshalled or the result cannot be marshalled.
func xml_to_json_conv(f_byteValue *[]byte, xfeh *XmlFileEventHeader) ([]byte, error) {
	var f MeasCollecFile
	start := time.Now()
	if err := xml.Unmarshal(*f_byteValue, &f); err != nil {
		return nil, errors.New("Cannot unmarshal xml-file: " + err.Error())
	}
	log.Debug("Unmarshal xml file XmlFileEvent: ", time.Since(start).String())

	start = time.Now()
	var pmfile PMJsonFile
	pmfile.Event.Perf3GppFields.Perf3GppFieldsVersion = "1.0"

	mdc := &pmfile.Event.Perf3GppFields.MeasDataCollection
	mdc.GranularityPeriod = 900
	mdc.MeasuredEntityUserName = ""
	mdc.MeasuredEntityDn = f.FileHeader.FileSender.LocalDn
	mdc.MeasuredEntitySoftwareVersion = f.MeasData.ManagedElement.SwVersion

	for _, it := range f.MeasData.MeasInfo {
		var mili MeasInfoList
		mili.MeasInfoID.SMeasInfoID = it.MeasInfoId
		for _, jt := range it.MeasType {
			mili.MeasTypes.SMeasTypesList = append(mili.MeasTypes.SMeasTypesList, jt.Text)
		}
		for _, jt := range it.MeasValue {
			var mv MeasValues
			mv.MeasObjInstID = jt.MeasObjLdn
			mv.SuspectFlag = jt.Suspect
			if jt.Suspect == "" {
				mv.SuspectFlag = "false"
			}
			for _, kt := range jt.R {
				// A non-numeric position is deliberately not treated as an
				// error; Atoi then yields position 0.
				ni, _ := strconv.Atoi(kt.P)
				mv.MeasResultsList = append(mv.MeasResultsList, MeasResults{ni, kt.Text})
			}
			mili.MeasValuesList = append(mili.MeasValuesList, mv)
		}
		mdc.SMeasInfoList = append(mdc.SMeasInfoList, mili)
	}

	//TODO: Fill more values
	hdr := &pmfile.Event.CommonEventHeader
	hdr.Domain = ""                  //xfeh.Domain
	hdr.EventID = ""                 //xfeh.EventID
	hdr.Sequence = 0                 //xfeh.Sequence
	hdr.EventName = ""               //xfeh.EventName
	hdr.SourceName = xfeh.SourceName
	hdr.ReportingEntityName = ""     //xfeh.ReportingEntityName
	hdr.Priority = ""                //xfeh.Priority
	hdr.StartEpochMicrosec = xfeh.StartEpochMicrosec
	hdr.LastEpochMicrosec = xfeh.LastEpochMicrosec
	hdr.Version = ""                 //xfeh.Version
	hdr.VesEventListenerVersion = "" //xfeh.VesEventListenerVersion
	hdr.TimeZoneOffset = xfeh.TimeZoneOffset
	log.Debug("Convert xml to json : ", time.Since(start).String())

	start = time.Now()
	out, err := jsoniter.Marshal(pmfile)
	log.Debug("Marshal json : ", time.Since(start).String())
	if err != nil {
		return nil, errors.New("Cannot marshal converted json: " + err.Error())
	}
	return out, nil
}
// start_job_json_file_data is the main loop of the type job that filters json
// pm files and sends the result to kafka.
//
// Filter commands received on control_ch maintain the per job filter
// parameters and output topics. Every message received on data_in_ch is
// handed to run_json_job in a new goroutine, limited by jobLimiterChan. A nil
// message on data_in_ch stops the loop. When objectstore is true a minio
// client is created on demand and passed to the jobs.
func start_job_json_file_data(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, data_out_channel chan *KafkaPayload, objectstore bool) {
	log.Info("Type job", type_id, " started")

	const mc_id = "mc_" + "start_job_json_file_data"
	var mc *minio.Client
	filters := make(map[string]Filter)
	filterParams_list := make(map[string]FilterMaps)
	topic_list := make(map[string]string)
	running := true

	for {
		select {
		case job_ctl := <-control_ch:
			log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
			// The maps are replaced by modified copies on each change - maps
			// already handed to running job goroutines are never modified.
			switch job_ctl.command {
			case "EXIT":
				//ignore cmd - handled by channel signal
			case "ADD-FILTER":
				filters[job_ctl.filter.JobId] = job_ctl.filter
				log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)

				tmp_filterParams_list := make(map[string]FilterMaps, len(filterParams_list)+1)
				for k, v := range filterParams_list {
					tmp_filterParams_list[k] = v
				}
				tmp_filterParams_list[job_ctl.filter.JobId] = job_ctl.filter.filter
				filterParams_list = tmp_filterParams_list

				tmp_topic_list := make(map[string]string, len(topic_list)+1)
				for k, v := range topic_list {
					tmp_topic_list[k] = v
				}
				tmp_topic_list[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic
				topic_list = tmp_topic_list
			case "REMOVE-FILTER":
				log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)

				tmp_filterParams_list := make(map[string]FilterMaps, len(filterParams_list))
				for k, v := range filterParams_list {
					tmp_filterParams_list[k] = v
				}
				delete(tmp_filterParams_list, job_ctl.filter.JobId)
				filterParams_list = tmp_filterParams_list

				tmp_topic_list := make(map[string]string, len(topic_list))
				for k, v := range topic_list {
					tmp_topic_list[k] = v
				}
				delete(tmp_topic_list, job_ctl.filter.JobId)
				topic_list = tmp_topic_list
			}

		case msg := <-data_in_ch:
			if msg == nil {
				// Name this function in the log (the message previously named
				// the xml job loop).
				log.Info("Type job ", type_id, " stopped by channel signal - start_job_json_file_data")
				running = false
				return
			}
			if objectstore && mc == nil {
				var err *error
				mc, err = create_minio_client(mc_id)
				if err != nil {
					log.Debug("Cannot create minio client for type job: ", type_id)
				}
			}
			//TODO: Sort processed file conversions in order (FIFO)
			// Blocks while the maximum number of parallel jobs is running
			jobLimiterChan <- struct{}{}
			go run_json_job(type_id, msg, "", filterParams_list, topic_list, data_out_channel, jobLimiterChan, mc, mc_id, objectstore)

		case <-time.After(1 * time.Second):
			if !running {
				return
			}
		}
	}
}
// run_json_job reads one json pm file, applies the filter of every job in
// filterList and sends each non-empty result to the topic of that job.
//
// msg carries a json encoded FileDownloadedEvt naming the file. The file is
// read from the files volume or, when objectstore is true, from bucket
// "pm-files-json" using mc. Files with a name ending with "gz" are
// decompressed. The file is unmarshalled once per job since filtering
// modifies the unmarshalled object.
//
// A job whose filter leaves nothing is skipped; the remaining jobs are still
// served. On read or parse errors the message is discarded. One slot of
// jobLimiterChan is released when the function returns. outputCompression is
// not used by this function.
func run_json_job(type_id string, msg *KafkaPayload, outputCompression string, filterList map[string]FilterMaps, topic_list map[string]string, data_out_channel chan *KafkaPayload, jobLimiterChan chan struct{}, mc *minio.Client, mc_id string, objectstore bool) {
	//Release job limit
	defer func() {
		<-jobLimiterChan
	}()
	PrintMemUsage()

	var evt_data FileDownloadedEvt
	err := jsoniter.Unmarshal(msg.msg.Value, &evt_data)
	if err != nil {
		log.Error("Cannot parse FileDownloadedEvt for type job: ", type_id, " - discarding message, error details: ", err)
		return
	}
	log.Debug("FileDownloadedEvt for job: ", type_id, " file: ", evt_data.Filename)

	// Open the input, either from disk or from the object store
	const inputBucket = "pm-files-json"
	var reader io.Reader
	filename := ""
	if !objectstore {
		filename = files_volume + "/" + evt_data.Filename
		fi, err := os.Open(filename)
		if err != nil {
			log.Error("File: ", filename, "for type job: ", type_id, " - cannot be opened -discarding message - error details: ", err)
			return
		}
		defer fi.Close()
		reader = fi
	} else {
		filename = "/" + evt_data.Filename
		if mc == nil {
			log.Error("Cannot get obj: ", filename, "for type job: ", type_id, " from bucket: ", inputBucket, " - no client")
			return
		}
		if check_minio_bucket(mc, mc_id, inputBucket) == false {
			log.Error("Bucket not available for reading in type job: ", type_id, "bucket: ", inputBucket)
			return
		}
		mr, err := mc.GetObject(context.Background(), inputBucket, filename, minio.GetObjectOptions{})
		if err != nil {
			log.Error("Obj: ", filename, "for type job: ", type_id, " - cannot be fetched from bucket: ", inputBucket, " - discarding message, error details: ", err)
			return
		}
		defer mr.Close()
		reader = mr
	}

	// Read the complete input, decompressing if needed
	var data []byte
	if strings.HasSuffix(filename, "gz") {
		start := time.Now()
		var buf2 bytes.Buffer
		errb := gunzipReaderToWriter(&buf2, reader)
		if errb != nil {
			log.Error("Cannot decompress file/obj ", filename, "for type job: ", type_id, " - discarding message, error details", errb)
			return
		}
		data = buf2.Bytes()
		log.Debug("Decompress file/obj ", filename, "for type job: ", type_id, " time:", time.Since(start).String())
	} else {
		start := time.Now()
		d, err := io.ReadAll(reader)
		if err != nil {
			log.Error("Cannot read file/obj: ", filename, "for type job: ", type_id, " - discarding message, error details", err)
			return
		}
		data = d
		log.Debug("Read file/obj: ", filename, "for type job: ", type_id, " time: ", time.Since(start).String())
	}

	for k, v := range filterList {
		var pmfile PMJsonFile
		start := time.Now()
		err = jsoniter.Unmarshal(data, &pmfile)
		log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
		if err != nil {
			// The same data is used for every job, so no job can succeed
			log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err)
			return
		}

		log.Debug("topic:", topic_list[k])
		log.Debug("sourceNameMap:", v.sourceNameMap)
		log.Debug("measObjClassMap:", v.measObjClassMap)
		log.Debug("measObjInstIdsMap:", v.measObjInstIdsMap)
		log.Debug("measTypesMap:", v.measTypesMap)
		b := json_pm_filter_to_byte(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap)
		if b == nil {
			// Only this job has nothing to send - the other jobs have their
			// own filters and must still be served.
			log.Info("File/obj ", filename, "for job: ", k, " - empty after filering, discarding message ")
			continue
		}

		kmsg := new(KafkaPayload)
		kmsg.msg = new(kafka.Message)
		kmsg.msg.Key = []byte("\"" + pmfile.Event.CommonEventHeader.SourceName + "\"")
		kmsg.msg.Value = *b
		kmsg.topic = topic_list[k]
		kmsg.jobid = k
		data_out_channel <- kmsg
	}
}
// json_pm_filter_to_byte filters data in place with json_pm_filter_to_obj and
// returns the filtered object marshalled to json. resource is only used in
// log output.
//
// Returns nil if the filter result is empty or if the result cannot be
// marshalled.
func json_pm_filter_to_byte(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *[]byte {
	filtered := json_pm_filter_to_obj(resource, data, sourceNameMap, measObjClassMap, measObjInstIdsMap, measTypesMap)
	if filtered == nil {
		return nil
	}

	start := time.Now()
	j, err := jsoniter.Marshal(&data)
	log.Debug("Json marshal obj: ", resource, " time: ", time.Since(start).String())
	if err == nil {
		log.Debug("Filtered json obj: ", resource, " len: ", len(j))
		return &j
	}
	log.Error("Msg could not be marshalled discarding message, obj: ", resource, ", error details: ", err)
	return nil
}
// json_pm_filter_to_obj filters the pm data in data in place and returns
// data, or nil when nothing is left. resource is only used in log output.
// An empty filter map means that the corresponding filter is not applied.
//
//   - sourceNameMap: the source name of the file must be in the map,
//     otherwise nil is returned.
//   - measTypesMap: only the listed measurement types (counters) are kept.
//     Results of removed counters are removed and the positions of the
//     remaining results are renumbered from 1. A measurement info without
//     any listed counter is removed.
//   - measObjClassMap / measObjInstIdsMap: a measurement value is kept only
//     if the class (text before "=") and the name of the last, comma
//     separated, part of its object instance id are in the respective map.
//
// NOTE(review): when no measurement info keeps any value and none was
// removed by the counter filter, the list is left as it was - confirm that
// this is the intended result for class/instance filters matching nothing.
func json_pm_filter_to_obj(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *PMJsonFile {
	start := time.Now()
	if len(sourceNameMap) != 0 && !sourceNameMap[data.Event.CommonEventHeader.SourceName] {
		return nil
	}

	modified := false
	var temp_mil []MeasInfoList
	for _, zz := range data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList {
		// cnt_flags[i] tells if the counter at position i+1 is kept
		check_cntr := false
		var cnt_flags []bool
		if len(measTypesMap) > 0 {
			var temp_mtl []string
			for _, v := range zz.MeasTypes.SMeasTypesList {
				wanted := measTypesMap[v]
				cnt_flags = append(cnt_flags, wanted)
				if wanted {
					temp_mtl = append(temp_mtl, v)
				}
			}
			if len(temp_mtl) == 0 {
				// No requested counter in this measurement info - drop it
				modified = true
				continue
			}
			check_cntr = true
			zz.MeasTypes.SMeasTypesList = temp_mtl
		}

		keep := false
		var temp_mvl []MeasValues
		for _, yy := range zz.MeasValuesList {
			dna := strings.Split(yy.MeasObjInstID, ",")
			instName := dna[len(dna)-1]
			cls := strings.Split(instName, "=")[0]

			keep_class := len(measObjClassMap) == 0 || measObjClassMap[cls]
			keep_inst := len(measObjInstIdsMap) == 0 || measObjInstIdsMap[instName]

			if check_cntr {
				var temp_mrl []MeasResults
				cnt_p := 1
				for _, v := range yy.MeasResultsList {
					// A position without a matching counter (e.g. 0 from a
					// non-numeric position in the source file) is dropped
					// instead of causing an index out of range panic.
					if v.P < 1 || v.P > len(cnt_flags) {
						continue
					}
					if cnt_flags[v.P-1] {
						v.P = cnt_p
						cnt_p++
						temp_mrl = append(temp_mrl, v)
					}
				}
				yy.MeasResultsList = temp_mrl
			}

			if keep_class && keep_inst {
				keep = true
				temp_mvl = append(temp_mvl, yy)
			}
		}
		if keep {
			zz.MeasValuesList = temp_mvl
			temp_mil = append(temp_mil, zz)
			modified = true
		}
	}

	//Only if modified
	if modified {
		if len(temp_mil) == 0 {
			log.Debug("Msg filtered, nothing found, discarding, obj: ", resource)
			return nil
		}
		data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = temp_mil
	}
	log.Debug("Filter: ", time.Since(start).String())
	return data
}
// start_job_json_file_data_influx is the main loop of the type job that
// filters json pm files and stores the result in influx db.
//
// Filter commands received on control_ch maintain the per job filter
// parameters and influx parameters. Every message received on data_in_ch is
// handed to run_json_file_data_job_influx in a new goroutine, limited by
// jobLimiterChan. A nil message on data_in_ch stops the loop. When
// objectstore is true a minio client is created on demand and passed to the
// jobs.
func start_job_json_file_data_influx(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, objectstore bool) {
	log.Info("Type job", type_id, " started")
	log.Debug("influx job ch ", data_in_ch)

	const mc_id = "mc_" + "start_job_json_file_data_influx"
	var mc *minio.Client
	filters := make(map[string]Filter)
	filterParams_list := make(map[string]FilterMaps)
	influx_job_params := make(map[string]InfluxJobParameters)
	running := true

	for {
		select {
		case job_ctl := <-control_ch:
			log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
			// The maps are replaced by modified copies on each change - maps
			// already handed to running job goroutines are never modified.
			switch job_ctl.command {
			case "EXIT":
				//ignore cmd - handled by channel signal
			case "ADD-FILTER":
				filters[job_ctl.filter.JobId] = job_ctl.filter
				log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
				log.Debug(job_ctl.filter)

				tmp_filterParams_list := make(map[string]FilterMaps, len(filterParams_list)+1)
				for k, v := range filterParams_list {
					tmp_filterParams_list[k] = v
				}
				tmp_filterParams_list[job_ctl.filter.JobId] = job_ctl.filter.filter
				filterParams_list = tmp_filterParams_list

				tmp_influx_job_params := make(map[string]InfluxJobParameters, len(influx_job_params)+1)
				for k, v := range influx_job_params {
					tmp_influx_job_params[k] = v
				}
				tmp_influx_job_params[job_ctl.filter.JobId] = job_ctl.filter.influxParameters
				influx_job_params = tmp_influx_job_params
			case "REMOVE-FILTER":
				log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)

				tmp_filterParams_list := make(map[string]FilterMaps, len(filterParams_list))
				for k, v := range filterParams_list {
					tmp_filterParams_list[k] = v
				}
				delete(tmp_filterParams_list, job_ctl.filter.JobId)
				filterParams_list = tmp_filterParams_list

				tmp_influx_job_params := make(map[string]InfluxJobParameters, len(influx_job_params))
				for k, v := range influx_job_params {
					tmp_influx_job_params[k] = v
				}
				delete(tmp_influx_job_params, job_ctl.filter.JobId)
				influx_job_params = tmp_influx_job_params
			}

		case msg := <-data_in_ch:
			log.Debug("Data reveived - influx")
			if msg == nil {
				// Name this function in the log (the message previously named
				// the xml job loop).
				log.Info("Type job ", type_id, " stopped by channel signal - start_job_json_file_data_influx")
				running = false
				return
			}
			if objectstore && mc == nil {
				var err *error
				mc, err = create_minio_client(mc_id)
				if err != nil {
					log.Debug("Cannot create minio client for type job: ", type_id)
				}
			}
			// Blocks while the maximum number of parallel jobs is running
			jobLimiterChan <- struct{}{}
			go run_json_file_data_job_influx(type_id, msg, filterParams_list, influx_job_params, jobLimiterChan, mc, mc_id, objectstore)

		case <-time.After(1 * time.Second):
			if !running {
				return
			}
		}
	}
}
// run_json_file_data_job_influx handles one FileDownloadedEvt message for a type
// job: it loads the referenced json PM file (optionally gzip compressed), and for
// every job in filterList it applies the job's filter and writes the resulting
// measurements as points to the InfluxDB instance configured for that job.
//
// Parameters:
//   - type_id: id of the type job, used in log messages only.
//   - msg: kafka payload; msg.msg.Value holds the json encoded FileDownloadedEvt.
//   - filterList: filter settings keyed by job id; every entry is processed.
//   - influxList: InfluxDB connection parameters, looked up with the same job id.
//   - jobLimiterChan: job limiter; one element is received from it when this
//     function returns, releasing the slot taken before the goroutine was started.
//   - mc, mc_id: minio client and its id, used only when objectstore is true.
//   - objectstore: when true the file is fetched from bucket "pm-files-json",
//     otherwise it is opened from the files_volume directory.
//
// Nothing is returned. Errors are logged; read/parse errors discard the whole
// message, while a job whose filter leaves nothing (or that has no InfluxDB
// parameters) is skipped without affecting the other jobs.
func run_json_file_data_job_influx(type_id string, msg *KafkaPayload, filterList map[string]FilterMaps, influxList map[string]InfluxJobParameters, jobLimiterChan chan struct{}, mc *minio.Client, mc_id string, objectstore bool) {

	log.Debug("run_json_file_data_job_influx")
	//Release job limit
	defer func() {
		<-jobLimiterChan
	}()

	PrintMemUsage()

	var evt_data FileDownloadedEvt
	if err := jsoniter.Unmarshal(msg.msg.Value, &evt_data); err != nil {
		log.Error("Cannot parse FileDownloadedEvt for type job: ", type_id, " - discarding message, error details: ", err)
		return
	}
	log.Debug("FileDownloadedEvt for job: ", type_id, " file: ", evt_data.Filename)

	const inputBucket = "pm-files-json"

	// Select the input source. The deferred Close calls keep the source open
	// until the function returns, i.e. until all data has been read.
	var reader io.Reader
	var filename string
	if !objectstore {
		filename = files_volume + "/" + evt_data.Filename
		fi, err := os.Open(filename)
		if err != nil {
			log.Error("File: ", filename, " for type job: ", type_id, " - cannot be opened - discarding message - error details: ", err)
			return
		}
		defer fi.Close()
		reader = fi
	} else {
		filename = "/" + evt_data.Filename
		if mc == nil {
			log.Error("Cannot get obj: ", filename, " for type job: ", type_id, " from bucket: ", inputBucket, " - no client")
			return
		}
		if !check_minio_bucket(mc, mc_id, inputBucket) {
			log.Error("Bucket not available for reading in type job: ", type_id, " bucket: ", inputBucket)
			return
		}
		mr, err := mc.GetObject(context.Background(), inputBucket, filename, minio.GetObjectOptions{})
		if err != nil {
			log.Error("Obj: ", filename, " for type job: ", type_id, " - cannot be fetched from bucket: ", inputBucket, " - discarding message, error details: ", err)
			return
		}
		defer mr.Close()
		reader = mr
	}

	// Read the complete (decompressed) content into memory; it is decoded once
	// per job below.
	var data []byte
	readStart := time.Now()
	if strings.HasSuffix(filename, "gz") {
		var buf bytes.Buffer
		if err := gunzipReaderToWriter(&buf, reader); err != nil {
			log.Error("Cannot decompress file/obj ", filename, " for type job: ", type_id, " - discarding message, error details: ", err)
			return
		}
		data = buf.Bytes()
		log.Debug("Decompress file/obj ", filename, " for type job: ", type_id, " time: ", time.Since(readStart).String())
	} else {
		d, err := io.ReadAll(reader)
		if err != nil {
			log.Error("Cannot read file/obj: ", filename, " for type job: ", type_id, " - discarding message, error details: ", err)
			return
		}
		data = d
		log.Debug("Read file/obj: ", filename, " for type job: ", type_id, " time: ", time.Since(readStart).String())
	}

	for k, v := range filterList {
		// A fresh copy is decoded for every job. NOTE(review): presumably
		// json_pm_filter_to_obj modifies pmfile in place (only its nil-ness is
		// checked here and pmfile is used afterwards) - confirm.
		var pmfile PMJsonFile
		parseStart := time.Now()
		err := jsoniter.Unmarshal(data, &pmfile)
		log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(parseStart).String())
		if err != nil {
			// The same bytes are decoded for every job, so no other job can succeed either.
			log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details: ", err)
			return
		}

		if len(v.sourceNameMap) > 0 || len(v.measObjInstIdsMap) > 0 || len(v.measTypesMap) > 0 || len(v.measObjClassMap) > 0 {
			if json_pm_filter_to_obj(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap) == nil {
				// Only this job has nothing to write - the remaining jobs must still be served.
				log.Info("File/obj ", filename, " for job: ", k, " - empty after filtering, skipping job")
				continue
			}
		}

		fluxParms, ok := influxList[k]
		if !ok {
			log.Error("No influxdb params for job: ", k, " - skipping job")
			continue
		}
		// The token is deliberately left out of the log.
		log.Debug("Influxdb params - url: ", fluxParms.DbUrl, " org: ", fluxParms.DbOrg, " bucket: ", fluxParms.DbBucket)

		client := influxdb2.NewClient(fluxParms.DbUrl, fluxParms.DbToken)
		writeAPI := client.WriteAPIBlocking(fluxParms.DbOrg, fluxParms.DbBucket)

		// All points of a file share the start time of the common event header,
		// truncated to whole seconds.
		startEpochMicrosec := pmfile.Event.CommonEventHeader.StartEpochMicrosec
		pointTime := time.Unix(startEpochMicrosec/1000000, 0)

		for _, zz := range pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList {
			// Counter names are referenced from the results by their 1-based position.
			ctr_names := make(map[int]string, len(zz.MeasTypes.SMeasTypesList))
			for cni, cn := range zz.MeasTypes.SMeasTypesList {
				ctr_names[cni+1] = cn
			}
			for _, xx := range zz.MeasValuesList {
				log.Debug("Measurement: ", xx.MeasObjInstID)
				log.Debug("Suspect flag: ", xx.SuspectFlag)
				p := influxdb2.NewPointWithMeasurement(xx.MeasObjInstID)
				p.AddField("suspectflag", xx.SuspectFlag)
				p.AddField("granularityperiod", pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod)
				p.AddField("timezone", pmfile.Event.CommonEventHeader.TimeZoneOffset)
				for _, yy := range xx.MeasResultsList {
					pv := yy.SValue
					pn := ctr_names[yy.P]
					log.Debug("Counter: ", pn, " Value: ", pv)
					// Store the value as an integer when it parses as one, otherwise as a string.
					if pv_i, err := strconv.Atoi(pv); err == nil {
						p.AddField(pn, pv_i)
					} else {
						p.AddField(pn, pv)
					}
				}
				log.Debug("StartEpochMicrosec from common event header: ", startEpochMicrosec)
				log.Debug("Set time: ", pointTime)
				p.SetTime(pointTime)
				// A failed write is logged only; the remaining points are still attempted.
				if err := writeAPI.WritePoint(context.Background(), p); err != nil {
					log.Error("Db write error: ", err)
				}
			}
		}
		client.Close()
	}
}