// ============LICENSE_START===============================================
//  Copyright (C) 2023 Nordix Foundation. All rights reserved.
//  ========================================================================
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//
//       http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
//  ============LICENSE_END=================================================
//

package main

import (
    "bytes"
    "compress/gzip"
    "context"
    "crypto/tls"
    "encoding/json"
    "encoding/xml"
    "errors"
    "fmt"
    "io"
    "net"
    "os/signal"
    "reflect"
    "strings"
    "sync"
    "syscall"

    "net/http"
    "os"
    "runtime"
    "strconv"
    "time"

    "github.com/google/uuid"
    "golang.org/x/oauth2/clientcredentials"

    log "github.com/sirupsen/logrus"

    "github.com/gorilla/mux"

    "net/http/pprof"

    "github.com/confluentinc/confluent-kafka-go/kafka"
    influxdb2 "github.com/influxdata/influxdb-client-go/v2"
    jsoniter "github.com/json-iterator/go"
    "github.com/minio/minio-go/v7"
    "github.com/minio/minio-go/v7/pkg/credentials"
)

//== Constants ==//

const http_port = 80
const https_port = 443
const config_file = "application_configuration.json"
const server_crt = "server.crt"
const server_key = "server.key"

const producer_name = "kafka-producer"

const registration_delay_short = 2
const registration_delay_long = 120

const mutexLocked = 1

const (
    Init AppStates = iota
    Running
    Terminating
)

const reader_queue_length = 100 //Per type job
const writer_queue_length = 100 //Per info job
const parallelism_limiter = 100 //For all jobs

// These are optional - only needed if the SASL protocol is used towards kafka
var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
var creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET")
var creds_client_id = os.Getenv("CREDS_CLIENT_ID")
var creds_service_url = os.Getenv("AUTH_SERVICE_URL")
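// Example configuration (assumed values, for illustration only - the grant
// type matches the clientcredentials flow used by fetch_token below):
//   CREDS_GRANT_TYPE=client_credentials
//   CREDS_CLIENT_ID=kafka-producer-pm
//   CREDS_CLIENT_SECRET=<secret>
//   AUTH_SERVICE_URL=http://keycloak:8080/realms/example/protocol/openid-connect/token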

//== Types ==//

type AppStates int64

type FilterParameters struct {
    MeasuredEntityDns []string `json:"measuredEntityDns"`
    MeasTypes         []string `json:"measTypes"`
    MeasObjClass      []string `json:"measObjClass"`
    MeasObjInstIds    []string `json:"measObjInstIds"`
}

type InfoJobDataType struct {
    InfoJobData struct {
        KafkaOutputTopic string `json:"kafkaOutputTopic"`

        DbUrl    string `json:"db-url"`
        DbOrg    string `json:"db-org"`
        DbBucket string `json:"db-bucket"`
        DbToken  string `json:"db-token"`

        FilterParams FilterParameters `json:"filter"`
    } `json:"info_job_data"`
    InfoJobIdentity  string `json:"info_job_identity"`
    InfoTypeIdentity string `json:"info_type_identity"`
    LastUpdated      string `json:"last_updated"`
    Owner            string `json:"owner"`
    TargetURI        string `json:"target_uri"`
}

// Type for an info job
type InfoJobRecord struct {
    job_info     InfoJobDataType
    output_topic string

    statistics *InfoJobStats
}

// Type for a type job
type TypeJobRecord struct {
    InfoType        string
    InputTopic      string
    data_in_channel chan *KafkaPayload
    reader_control  chan ReaderControl
    job_control     chan JobControl
    groupId         string
    clientId        string

    statistics *TypeJobStats
}

// Type for controlling the topic reader
type ReaderControl struct {
    command string
}

// Type for controlling the topic writer
type WriterControl struct {
    command string
}

// Type for controlling the job
type JobControl struct {
    command string
    filter  Filter
}

type KafkaPayload struct {
    msg   *kafka.Message
    topic string
    jobid string
}

type FilterMaps struct {
    sourceNameMap     map[string]bool
    measObjClassMap   map[string]bool
    measObjInstIdsMap map[string]bool
    measTypesMap      map[string]bool
}

type InfluxJobParameters struct {
    DbUrl    string
    DbOrg    string
    DbBucket string
    DbToken  string
}

type Filter struct {
    JobId       string
    OutputTopic string
    filter      FilterMaps

    influxParameters InfluxJobParameters
}

// Type for info job statistics
type InfoJobStats struct {
    out_msg_cnt  int
    out_data_vol int64
}

// Type for type job statistics
type TypeJobStats struct {
    in_msg_cnt  int
    in_data_vol int64
}

// == API Datatypes ==//
// Type for supported data types
type DataType struct {
    ID                 string `json:"id"`
    KafkaInputTopic    string `json:"kafkaInputTopic"`
    InputJobType       string `json:"inputJobType"`
    InputJobDefinition struct {
        KafkaOutputTopic string `json:"kafkaOutputTopic"`
    } `json:"inputJobDefinition"`

    ext_job         *[]byte
    ext_job_created bool
    ext_job_id      string
}

type DataTypes struct {
    ProdDataTypes []DataType `json:"types"`
}

type Minio_buckets struct {
    Buckets map[string]bool
}

//== External data types ==//

// Data type for event xml file download
type XmlFileEventHeader struct {
    ProductName        string `json:"productName"`
    VendorName         string `json:"vendorName"`
    Location           string `json:"location"`
    Compression        string `json:"compression"`
    SourceName         string `json:"sourceName"`
    FileFormatType     string `json:"fileFormatType"`
    FileFormatVersion  string `json:"fileFormatVersion"`
    StartEpochMicrosec int64  `json:"startEpochMicrosec"`
    LastEpochMicrosec  int64  `json:"lastEpochMicrosec"`
    Name               string `json:"name"`
    ChangeIdentifier   string `json:"changeIdentifier"`
    InternalLocation   string `json:"internalLocation"`
    TimeZoneOffset     string `json:"timeZoneOffset"`
    //ObjectStoreBucket string `json:"objectStoreBucket"`
}

// Data types for the input xml file
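// Layout sketch of the file this struct decodes (element and attribute names
// are taken from the xml tags below; values are illustrative only):
//   <measCollecFile xmlns="...">
//     <fileHeader fileFormatVersion="..." vendorName="..." dnPrefix="...">
//       <fileSender localDn="..." elementType="..."/>
//       <measCollec beginTime="..."/>
//     </fileHeader>
//     <measData>
//       <managedElement localDn="..." swVersion="..."/>
//       <measInfo measInfoId="...">
//         <job jobId="..."/>
//         <granPeriod duration="PT900S" endTime="..."/>
//         <repPeriod duration="PT900S"/>
//         <measType p="1">...</measType>
//         <measValue measObjLdn="...">
//           <r p="1">1113</r>
//           <suspect>false</suspect>
//         </measValue>
//       </measInfo>
//     </measData>
//     <fileFooter><measCollec endTime="..."/></fileFooter>
//   </measCollecFile>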
type MeasCollecFile struct {
    XMLName        xml.Name `xml:"measCollecFile"`
    Text           string   `xml:",chardata"`
    Xmlns          string   `xml:"xmlns,attr"`
    Xsi            string   `xml:"xsi,attr"`
    SchemaLocation string   `xml:"schemaLocation,attr"`
    FileHeader     struct {
        Text              string `xml:",chardata"`
        FileFormatVersion string `xml:"fileFormatVersion,attr"`
        VendorName        string `xml:"vendorName,attr"`
        DnPrefix          string `xml:"dnPrefix,attr"`
        FileSender        struct {
            Text        string `xml:",chardata"`
            LocalDn     string `xml:"localDn,attr"`
            ElementType string `xml:"elementType,attr"`
        } `xml:"fileSender"`
        MeasCollec struct {
            Text      string `xml:",chardata"`
            BeginTime string `xml:"beginTime,attr"`
        } `xml:"measCollec"`
    } `xml:"fileHeader"`
    MeasData struct {
        Text           string `xml:",chardata"`
        ManagedElement struct {
            Text      string `xml:",chardata"`
            LocalDn   string `xml:"localDn,attr"`
            SwVersion string `xml:"swVersion,attr"`
        } `xml:"managedElement"`
        MeasInfo []struct {
            Text       string `xml:",chardata"`
            MeasInfoId string `xml:"measInfoId,attr"`
            Job        struct {
                Text  string `xml:",chardata"`
                JobId string `xml:"jobId,attr"`
            } `xml:"job"`
            GranPeriod struct {
                Text     string `xml:",chardata"`
                Duration string `xml:"duration,attr"`
                EndTime  string `xml:"endTime,attr"`
            } `xml:"granPeriod"`
            RepPeriod struct {
                Text     string `xml:",chardata"`
                Duration string `xml:"duration,attr"`
            } `xml:"repPeriod"`
            MeasType []struct {
                Text string `xml:",chardata"`
                P    string `xml:"p,attr"`
            } `xml:"measType"`
            MeasValue []struct {
                Text       string `xml:",chardata"`
                MeasObjLdn string `xml:"measObjLdn,attr"`
                R          []struct {
                    Text string `xml:",chardata"`
                    P    string `xml:"p,attr"`
                } `xml:"r"`
                Suspect string `xml:"suspect"`
            } `xml:"measValue"`
        } `xml:"measInfo"`
    } `xml:"measData"`
    FileFooter struct {
        Text       string `xml:",chardata"`
        MeasCollec struct {
            Text    string `xml:",chardata"`
            EndTime string `xml:"endTime,attr"`
        } `xml:"measCollec"`
    } `xml:"fileFooter"`
}

// Data types for the json file
// Split into several parts to allow adding/removing items in lists
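// Message layout sketch (field names follow the json tags below; values are
// illustrative only):
//   {"event": {
//      "commonEventHeader": {"domain": "perf3gpp", "sourceName": "...", ...},
//      "perf3gppFields": {
//        "perf3gppFieldsVersion": "1.0",
//        "measDataCollection": {
//          "granularityPeriod": 900,
//          "measuredEntityDn": "...",
//          "measInfoList": [{
//            "measInfoId": {"sMeasInfoId": "..."},
//            "measTypes": {"sMeasTypesList": ["..."]},
//            "measValuesList": [{"measObjInstId": "...", "suspectFlag": "false",
//              "measResults": [{"p": 1, "sValue": "1113"}]}]}]}}}}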
type MeasResults struct {
    P      int    `json:"p"`
    SValue string `json:"sValue"`
}

type MeasValues struct {
    MeasObjInstID   string        `json:"measObjInstId"`
    SuspectFlag     string        `json:"suspectFlag"`
    MeasResultsList []MeasResults `json:"measResults"`
}

type SMeasTypes struct {
    SMeasType string `json:"sMeasTypesList"`
}

type MeasInfoList struct {
    MeasInfoID struct {
        SMeasInfoID string `json:"sMeasInfoId"`
    } `json:"measInfoId"`
    MeasTypes struct {
        SMeasTypesList []string `json:"sMeasTypesList"`
    } `json:"measTypes"`
    MeasValuesList []MeasValues `json:"measValuesList"`
}

type PMJsonFile struct {
    Event struct {
        CommonEventHeader struct {
            Domain                  string `json:"domain"`
            EventID                 string `json:"eventId"`
            Sequence                int    `json:"sequence"`
            EventName               string `json:"eventName"`
            SourceName              string `json:"sourceName"`
            ReportingEntityName     string `json:"reportingEntityName"`
            Priority                string `json:"priority"`
            StartEpochMicrosec      int64  `json:"startEpochMicrosec"`
            LastEpochMicrosec       int64  `json:"lastEpochMicrosec"`
            Version                 string `json:"version"`
            VesEventListenerVersion string `json:"vesEventListenerVersion"`
            TimeZoneOffset          string `json:"timeZoneOffset"`
        } `json:"commonEventHeader"`
        Perf3GppFields struct {
            Perf3GppFieldsVersion string `json:"perf3gppFieldsVersion"`
            MeasDataCollection    struct {
                GranularityPeriod             int            `json:"granularityPeriod"`
                MeasuredEntityUserName        string         `json:"measuredEntityUserName"`
                MeasuredEntityDn              string         `json:"measuredEntityDn"`
                MeasuredEntitySoftwareVersion string         `json:"measuredEntitySoftwareVersion"`
                SMeasInfoList                 []MeasInfoList `json:"measInfoList"`
            } `json:"measDataCollection"`
        } `json:"perf3gppFields"`
    } `json:"event"`
}

// Data type for converted json file message
type FileDownloadedEvt struct {
    Filename string `json:"filename"`
}

//== Variables ==//

var AppState = Init

// Lock for all internal data
var datalock sync.Mutex

var producer_instance_name string = producer_name

// Keep all info type jobs, key == type id
var TypeJobs map[string]TypeJobRecord = make(map[string]TypeJobRecord)

// Keep all info jobs, key == job id
var InfoJobs map[string]InfoJobRecord = make(map[string]InfoJobRecord)

var InfoTypes DataTypes

// Limiter - valid for all jobs
var jobLimiterChan = make(chan struct{}, parallelism_limiter)

// TODO: Config param?
var bucket_location = "swe"

var httpclient = &http.Client{}

// == Env variables ==//
var bootstrapserver = os.Getenv("KAFKA_SERVER")
var files_volume = os.Getenv("FILES_VOLUME")
var ics_server = os.Getenv("ICS")
var self = os.Getenv("SELF")
var filestore_user = os.Getenv("FILESTORE_USER")
var filestore_pwd = os.Getenv("FILESTORE_PWD")
var filestore_server = os.Getenv("FILESTORE_SERVER")

var data_out_channel = make(chan *KafkaPayload, writer_queue_length)
var writer_control = make(chan WriterControl, 1)

var minio_bucketlist map[string]Minio_buckets = make(map[string]Minio_buckets)

// == Main ==//
func main() {

    //log.SetLevel(log.InfoLevel)
    log.SetLevel(log.TraceLevel)

    log.Info("Server starting...")

    if self == "" {
        log.Panic("Env SELF not configured")
    }
    if bootstrapserver == "" {
        log.Panic("Env KAFKA_SERVER not set")
    }
    if ics_server == "" {
        log.Panic("Env ICS not set")
    }
    if os.Getenv("KP") != "" {
        producer_instance_name = producer_instance_name + "-" + os.Getenv("KP")
    }

    rtr := mux.NewRouter()
    rtr.HandleFunc("/callbacks/job/"+producer_instance_name, create_job)
    rtr.HandleFunc("/callbacks/job/"+producer_instance_name+"/{job_id}", delete_job)
    rtr.HandleFunc("/callbacks/supervision/"+producer_instance_name, supervise_producer)
    rtr.HandleFunc("/statistics", statistics)
    rtr.HandleFunc("/logging/{level}", logging_level)
    rtr.HandleFunc("/logging", logging_level)
    rtr.HandleFunc("/", alive)

    //For perf/mem profiling
    rtr.HandleFunc("/custom_debug_path/profile", pprof.Profile)

    http.Handle("/", rtr)

    http_server := &http.Server{Addr: ":" + strconv.Itoa(http_port), Handler: nil}

    cer, err := tls.LoadX509KeyPair(server_crt, server_key)
    if err != nil {
        log.Errorf("Cannot load key and cert - %v", err)
        return
    }
    config := &tls.Config{Certificates: []tls.Certificate{cer}}
    https_server := &http.Server{Addr: ":" + strconv.Itoa(https_port), TLSConfig: config, Handler: nil}

    //TODO: Make http on/off configurable
    // Run http
    go func() {
        log.Info("Starting http service...")
        err := http_server.ListenAndServe()
        if err == http.ErrServerClosed { // graceful shutdown
            log.Info("http server shutdown...")
        } else if err != nil {
            log.Errorf("http server error: %v", err)
        }
    }()

    //TODO: Make https on/off configurable
    // Run https
    go func() {
        log.Info("Starting https service...")
        err := https_server.ListenAndServeTLS(server_crt, server_key)
        if err == http.ErrServerClosed { // graceful shutdown
            log.Info("https server shutdown...")
        } else if err != nil {
            log.Errorf("https server error: %v", err)
        }
    }()
    check_tcp(strconv.Itoa(http_port))
    check_tcp(strconv.Itoa(https_port))

    go start_topic_writer(writer_control, data_out_channel)

    //Setup proc for periodic type registration
    var event_chan = make(chan int) //Channel for stopping the proc
    go periodic_registration(event_chan)

    //Wait for term/int signal to try to shut down gracefully
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        sig := <-sigs
        fmt.Printf("Received signal %s - application will terminate\n", sig)
        event_chan <- 0 // Stop periodic registration
        datalock.Lock()
        defer datalock.Unlock()
        AppState = Terminating
        http_server.Shutdown(context.Background())
        https_server.Shutdown(context.Background())
        // Stopping jobs
        for key := range TypeJobs {
            log.Info("Stopping type job: ", key)
            for _, dp := range InfoTypes.ProdDataTypes {
                if key == dp.ID {
                    remove_type_job(dp)
                }
            }
        }
    }()

    AppState = Running

    //Wait until all goroutines have exited
    runtime.Goexit()

    fmt.Println("main routine exit")
    fmt.Println("server stopped")
}

func check_tcp(port string) {
    log.Info("Checking tcp port: ", port)
    for {
        address := net.JoinHostPort("localhost", port)
        // 3 second timeout
        conn, err := net.DialTimeout("tcp", address, 3*time.Second)
        if err != nil {
            log.Info("Checking tcp port: ", port, " failed, retrying...")
        } else {
            if conn != nil {
                log.Info("Checking tcp port: ", port, " - OK")
                _ = conn.Close()
                return
            } else {
                log.Info("Checking tcp port: ", port, " failed, retrying...")
            }
        }
    }
}

//== Core functions ==//

// Run periodic registration of producers
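// Registration uses a simple backoff: on failure the delay grows in steps of
// registration_delay_short (2 s) up to registration_delay_long (120 s); after
// a successful registration the producer re-registers every
// registration_delay_long seconds to stay registered in ICS.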
func periodic_registration(evtch chan int) {
    var delay int = 1
    for {
        select {
        case msg := <-evtch:
            if msg == 0 { // Stop goroutine
                return
            }
        case <-time.After(time.Duration(delay) * time.Second):
            ok := register_producer()
            if ok {
                delay = registration_delay_long
            } else {
                if delay < registration_delay_long {
                    delay += registration_delay_short
                } else {
                    delay = registration_delay_short
                }
            }
        }
    }
}

func register_producer() bool {

    log.Info("Registering producer: ", producer_instance_name)

    file, err := os.ReadFile(config_file)
    if err != nil {
        log.Error("Cannot read config file: ", config_file)
        log.Error("Registering producer: ", producer_instance_name, " - failed")
        return false
    }
    data := DataTypes{}
    err = jsoniter.Unmarshal(file, &data)
    if err != nil {
        log.Error("Cannot parse config file: ", config_file)
        log.Error("Registering producer: ", producer_instance_name, " - failed")
        return false
    }
    var new_type_names []string

    for i := 0; i < len(data.ProdDataTypes); i++ {
        t1 := make(map[string]interface{})
        t2 := make(map[string]interface{})

        t2["schema"] = "http://json-schema.org/draft-07/schema#"
        t2["title"] = data.ProdDataTypes[i].ID
        t2["description"] = data.ProdDataTypes[i].ID
        t2["type"] = "object"

        t1["info_job_data_schema"] = t2

        json, err := json.Marshal(t1)
        if err != nil {
            log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID)
            log.Error("Registering producer: ", producer_instance_name, " - failed")
            return false
        } else {
            //TODO: http/https should be configurable
            ok := send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
            if !ok {
                log.Error("Cannot register type: ", data.ProdDataTypes[i].ID)
                log.Error("Registering producer: ", producer_instance_name, " - failed")
                return false
            }
            new_type_names = append(new_type_names, data.ProdDataTypes[i].ID)
        }

    }

    log.Debug("Registering types: ", new_type_names)
    m := make(map[string]interface{})
    m["supported_info_types"] = new_type_names
    //TODO: http/https should be configurable
    m["info_job_callback_url"] = "http://" + self + "/callbacks/job/" + producer_instance_name
    //TODO: http/https should be configurable
    m["info_producer_supervision_callback_url"] = "http://" + self + "/callbacks/supervision/" + producer_instance_name

    json, err := json.Marshal(m)
    if err != nil {
        log.Error("Cannot create json for producer: ", producer_instance_name)
        log.Error("Registering producer: ", producer_instance_name, " - failed")
        return false
    }
    ok := send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-producers/"+producer_instance_name, true, creds_grant_type != "")
    if !ok {
        log.Error("Cannot register producer: ", producer_instance_name)
        log.Error("Registering producer: ", producer_instance_name, " - failed")
        return false
    }
    datalock.Lock()
    defer datalock.Unlock()

    var current_type_names []string
    for _, v := range InfoTypes.ProdDataTypes {
        current_type_names = append(current_type_names, v.ID)
        if contains_str(new_type_names, v.ID) {
            //Type exists
            log.Debug("Type ", v.ID, " exists")
            create_ext_job(v)
        } else {
            //Type is removed
            log.Info("Removing type job for type: ", v.ID, " - type not in configuration")
            remove_type_job(v)
        }
    }

    for _, v := range data.ProdDataTypes {
        if contains_str(current_type_names, v.ID) {
            //Type exists
            log.Debug("Type ", v.ID, " exists")
            create_ext_job(v)
        } else {
            //Type is new
            log.Info("Adding type job for type: ", v.ID, " - type added to configuration")
            start_type_job(v)
        }
    }

    InfoTypes = data
    log.Debug("Datatypes: ", InfoTypes)

    log.Info("Registering producer: ", producer_instance_name, " - OK")
    return true
}

func remove_type_job(dp DataType) {
    log.Info("Removing type job: ", dp.ID)
    j, ok := TypeJobs[dp.ID]
    if ok {
        j.reader_control <- ReaderControl{"EXIT"}
    }

    if dp.ext_job_created {
        dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType)
        //TODO: http/https should be configurable
        ok := send_http_request(*dp.ext_job, http.MethodDelete, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "")
        if !ok {
            log.Error("Cannot delete job: ", dp.ext_job_id)
        }
        dp.ext_job_created = false
        dp.ext_job = nil
    }

}

func start_type_job(dp DataType) {
    log.Info("Starting type job: ", dp.ID)
    job_record := TypeJobRecord{}

    job_record.job_control = make(chan JobControl, 1)
    job_record.reader_control = make(chan ReaderControl, 1)
    job_record.data_in_channel = make(chan *KafkaPayload, reader_queue_length)
    job_record.InfoType = dp.ID
    job_record.InputTopic = dp.KafkaInputTopic
    job_record.groupId = "kafka-procon-" + dp.ID
    job_record.clientId = dp.ID + "-" + os.Getenv("KP")
    var stats TypeJobStats
    job_record.statistics = &stats

    switch dp.ID {
    case "xml-file-data-to-filestore":
        go start_job_xml_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, "", "pm-files-json")
    case "xml-file-data":
        go start_job_xml_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, files_volume, "")
    case "json-file-data-from-filestore":
        go start_job_json_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, true)
    case "json-file-data":
        go start_job_json_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, false)
    case "json-file-data-from-filestore-to-influx":
        go start_job_json_file_data_influx(dp.ID, job_record.job_control, job_record.data_in_channel, true)
    // case "json-data-to-influx":
    //     go start_job_json_data_influx(dp.ID, job_record.job_control, job_record.data_in_channel)

    default:
    }

    go start_topic_reader(dp.KafkaInputTopic, dp.ID, job_record.reader_control, job_record.data_in_channel, job_record.groupId, job_record.clientId, &stats)

    TypeJobs[dp.ID] = job_record
    log.Debug("Type job input type: ", dp.InputJobType)
    create_ext_job(dp)
}

func create_ext_job(dp DataType) {
    if dp.InputJobType != "" {
        jb := make(map[string]interface{})
        jb["info_type_id"] = dp.InputJobType
        jb["job_owner"] = "console" //TODO:
        jb["status_notification_uri"] = "http://callback:80/post"
        jb1 := make(map[string]interface{})
        jb["job_definition"] = jb1
        jb1["kafkaOutputTopic"] = dp.InputJobDefinition.KafkaOutputTopic

        json, err := json.Marshal(jb)
        dp.ext_job_created = false
        dp.ext_job = nil
        if err != nil {
            log.Error("Cannot create json for type: ", dp.InputJobType)
            return
        }

        dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType)
        //TODO: http/https should be configurable
        ok := false
        for !ok {
            ok = send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "")
            if !ok {
                log.Error("Cannot register job: ", dp.InputJobType)
                //TODO: Restart after long time?
            }
        }
        log.Debug("Registered job ok: ", dp.InputJobType)
        dp.ext_job_created = true
        dp.ext_job = &json
    }
}

func remove_info_job(jobid string) {
    log.Info("Removing info job: ", jobid)
    filter := Filter{}
    filter.JobId = jobid
    jc := JobControl{}
    jc.command = "REMOVE-FILTER"
    jc.filter = filter
    infoJob := InfoJobs[jobid]
    typeJob := TypeJobs[infoJob.job_info.InfoTypeIdentity]
    typeJob.job_control <- jc
    delete(InfoJobs, jobid)

}

// == Helper functions ==//

// Function to check the status of a mutex lock
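// Note: this peeks at the unexported 'state' field of sync.Mutex via
// reflection, which depends on runtime internals - intended for debug logging only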
func MutexLocked(m *sync.Mutex) bool {
    state := reflect.ValueOf(m).Elem().FieldByName("state")
    return state.Int()&mutexLocked == mutexLocked
}

// Test if slice contains a string
func contains_str(s []string, e string) bool {
    for _, a := range s {
        if a == e {
            return true
        }
    }
    return false
}

// Send a http request with json (json may be nil)
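// Note: the retry parameter is accepted but not used in this version.
// Usage sketch (illustrative url and job id):
//   ok := send_http_request(nil, http.MethodDelete,
//       "http://"+ics_server+"/data-consumer/v1/info-jobs/"+job_id, false, creds_grant_type != "")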
func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool {

    // set the HTTP method, url, and request body
    var req *http.Request
    var err error
    if json == nil {
        req, err = http.NewRequest(method, url, http.NoBody)
    } else {
        req, err = http.NewRequest(method, url, bytes.NewBuffer(json))
    }
    if err != nil {
        log.Error("Cannot create http request, method: ", method, " url: ", url)
        return false
    }
    if json != nil {
        req.Header.Set("Content-Type", "application/json; charset=utf-8")
    }

    if useAuth {
        token, err := fetch_token()
        if err != nil {
            log.Error("Cannot fetch token for http request: ", err)
            return false
        }
        req.Header.Set("Authorization", "Bearer "+token.TokenValue)
    }

    log.Debug("HTTP request: ", req)

    log.Debug("Sending http request")
    resp, err2 := httpclient.Do(req)
    if err2 != nil {
        log.Error("Http request error: ", err2)
        log.Error("Cannot send http request, method: ", method, " url: ", url)
    } else {
        if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
            log.Debug("Accepted http status: ", resp.StatusCode)
            resp.Body.Close()
            return true
        }
        log.Debug("HTTP resp: ", resp)
        resp.Body.Close()
    }
    return false
}

func fetch_token() (*kafka.OAuthBearerToken, error) {
    log.Debug("Get token inline")
    conf := &clientcredentials.Config{
        ClientID:     creds_client_id,
        ClientSecret: creds_client_secret,
        TokenURL:     creds_service_url,
    }
    token, err := conf.Token(context.Background())
    if err != nil {
        log.Warning("Cannot fetch access token: ", err)
        return nil, err
    }
    extensions := map[string]string{}
    log.Debug("=====================================================")
    log.Debug("token: ", token)
    log.Debug("=====================================================")
    log.Debug("TokenValue: ", token.AccessToken)
    log.Debug("=====================================================")
    log.Debug("Expiration: ", token.Expiry)
    t := token.Expiry
    // t := token.Expiry.Add(-time.Minute)
    // log.Debug("Modified expiration: ", t)
    oauthBearerToken := kafka.OAuthBearerToken{
        TokenValue: token.AccessToken,
        Expiration: t,
        Extensions: extensions,
    }

    return &oauthBearerToken, nil
}

// Function to print memory details
// https://pkg.go.dev/runtime#MemStats
func PrintMemUsage() {
    if log.IsLevelEnabled(log.DebugLevel) || log.IsLevelEnabled(log.TraceLevel) {
        var m runtime.MemStats
        runtime.ReadMemStats(&m)
        fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc))
        fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc))
        fmt.Printf("\tSys = %v MiB", bToMb(m.Sys))
        fmt.Printf("\tNumGC = %v\n", m.NumGC)
        fmt.Printf("HeapSys = %v MiB", bToMb(m.HeapSys))
        fmt.Printf("\tStackSys = %v MiB", bToMb(m.StackSys))
        fmt.Printf("\tMSpanSys = %v MiB", bToMb(m.MSpanSys))
        fmt.Printf("\tMCacheSys = %v MiB", bToMb(m.MCacheSys))
        fmt.Printf("\tBuckHashSys = %v MiB", bToMb(m.BuckHashSys))
        fmt.Printf("\tGCSys = %v MiB", bToMb(m.GCSys))
        fmt.Printf("\tOtherSys = %v MiB\n", bToMb(m.OtherSys))
    }
}

func bToMb(b uint64) uint64 {
    return b / 1024 / 1024
}

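// Create a deterministic uuid from a type name: the name is truncated, or
// padded with '0', to exactly 16 bytes and those bytes are used as the uuid.
// Sketch (illustrative type name): "json-file-data" is padded to
// "json-file-data00", so the same input always yields the same uuid.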
func generate_uuid_from_type(s string) string {
    if len(s) > 16 {
        s = s[:16]
    }
    for len(s) < 16 {
        s = s + "0"
    }
    b := []byte(s)
    b = b[:16]
    uuid, _ := uuid.FromBytes(b)
    return uuid.String()
}

// Write gzipped data to a Writer
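// Usage sketch:
//   var buf bytes.Buffer
//   if err := gzipWrite(&buf, &data); err != nil {
//       log.Error("gzip failed: ", err)
//   }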
func gzipWrite(w io.Writer, data *[]byte) error {
    gw, err1 := gzip.NewWriterLevel(w, gzip.BestSpeed)

    if err1 != nil {
        return err1
    }
    defer gw.Close()
    _, err2 := gw.Write(*data)
    return err2
}

// Write gunzipped data from a Reader to a Writer
func gunzipReaderToWriter(w io.Writer, data io.Reader) error {
    gr, err1 := gzip.NewReader(data)

    if err1 != nil {
        return err1
    }
    defer gr.Close()
    data2, err2 := io.ReadAll(gr)
    if err2 != nil {
        return err2
    }
    _, err3 := w.Write(data2)
    if err3 != nil {
        return err3
    }
    return nil
}

func create_minio_bucket(mc *minio.Client, client_id string, bucket string) error {
    tctx := context.Background()
    err := mc.MakeBucket(tctx, bucket, minio.MakeBucketOptions{Region: bucket_location})
    if err != nil {
        // Check to see if we already own this bucket (which happens if you run this twice)
        exists, errBucketExists := mc.BucketExists(tctx, bucket)
        if errBucketExists == nil && exists {
            log.Debug("Already own bucket: ", bucket)
            add_bucket(client_id, bucket)
            return nil
        } else {
            log.Error("Cannot create or check bucket ", bucket, " in minio client: ", err)
            return err
        }
    }
    log.Debug("Successfully created bucket: ", bucket)
    add_bucket(client_id, bucket)
    return nil
}

func check_minio_bucket(mc *minio.Client, client_id string, bucket string) bool {
    ok := bucket_exist(client_id, bucket)
    if ok {
        return true
    }
    tctx := context.Background()
    exists, err := mc.BucketExists(tctx, bucket)
    if err == nil && exists {
        log.Debug("Already own bucket: ", bucket)
        return true
    }
    log.Error("Bucket ", bucket, " does not exist in minio client: ", err)
    return false
}

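// Record a bucket in the in-memory cache of known buckets, keyed by minio client id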
func add_bucket(minio_id string, bucket string) {
    datalock.Lock()
    defer datalock.Unlock()

    b, ok := minio_bucketlist[minio_id]
    if !ok {
        b = Minio_buckets{}
        b.Buckets = make(map[string]bool)
    }
    b.Buckets[bucket] = true
    minio_bucketlist[minio_id] = b
}

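// Check the in-memory cache whether a bucket is already known for the given minio client id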
func bucket_exist(minio_id string, bucket string) bool {
    datalock.Lock()
    defer datalock.Unlock()

    b, ok := minio_bucketlist[minio_id]
    if !ok {
        return false
    }
    _, ok = b.Buckets[bucket]
    return ok
}

//== http api functions ==//

// create/update job
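// The request body is expected to follow InfoJobDataType. Sketch (field names
// from the json tags above; values illustrative only):
//   {
//     "info_job_identity": "job-1",
//     "info_type_identity": "json-file-data",
//     "owner": "console",
//     "target_uri": "...",
//     "info_job_data": {
//       "kafkaOutputTopic": "json-file-data-job-1",
//       "filter": {
//         "measuredEntityDns": [...], "measTypes": [...],
//         "measObjClass": [...], "measObjInstIds": [...]
//       }
//     }
//   }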
func create_job(w http.ResponseWriter, req *http.Request) {
    log.Debug("Create job, http method: ", req.Method)
    if req.Method != http.MethodPost {
        log.Error("Create job, http method not allowed")
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }
    ct := req.Header.Get("Content-Type")
    if ct != "application/json" {
        log.Error("Create job, bad content type")
        http.Error(w, "Bad content type", http.StatusBadRequest)
        return
    }

    var t InfoJobDataType
    err := json.NewDecoder(req.Body).Decode(&t)
    if err != nil {
        log.Error("Create job, cannot parse json, ", err)
        http.Error(w, "Cannot parse json", http.StatusBadRequest)
        return
    }
    log.Debug("Creating job, id: ", t.InfoJobIdentity)
    datalock.Lock()
    defer datalock.Unlock()

    job_id := t.InfoJobIdentity
    job_record, job_found := InfoJobs[job_id]
    type_job_record, found_type := TypeJobs[t.InfoTypeIdentity]
    if !job_found {
        if !found_type {
            log.Error("Type ", t.InfoTypeIdentity, " does not exist")
            http.Error(w, "Type "+t.InfoTypeIdentity+" does not exist", http.StatusBadRequest)
            return
        }
    } else if t.InfoTypeIdentity != job_record.job_info.InfoTypeIdentity {
        log.Error("Job cannot change type")
        http.Error(w, "Job cannot change type", http.StatusBadRequest)
        return
    } else if t.InfoJobData.KafkaOutputTopic != job_record.job_info.InfoJobData.KafkaOutputTopic {
        log.Error("Job cannot change topic")
        http.Error(w, "Job cannot change topic", http.StatusBadRequest)
        return
    } else if !found_type {
        //Should never happen - if the type is removed then the job is stopped
        log.Error("Type ", t.InfoTypeIdentity, " does not exist")
        http.Error(w, "Type "+t.InfoTypeIdentity+" does not exist", http.StatusBadRequest)
        return
    }

    //TODO: Verify that the job contains enough parameters...

    if !job_found {
        job_record = InfoJobRecord{}
        job_record.job_info = t
        output_topic := t.InfoJobData.KafkaOutputTopic
        job_record.output_topic = t.InfoJobData.KafkaOutputTopic
        log.Debug("Starting infojob ", job_id, ", type ", t.InfoTypeIdentity, ", output topic ", output_topic)

        var stats InfoJobStats
        job_record.statistics = &stats

        filter := Filter{}
        filter.JobId = job_id
        filter.OutputTopic = job_record.output_topic

        jc := JobControl{}

        jc.command = "ADD-FILTER"

        //TODO: Refactor
        if t.InfoTypeIdentity == "json-file-data-from-filestore" || t.InfoTypeIdentity == "json-file-data" || t.InfoTypeIdentity == "json-file-data-from-filestore-to-influx" {
            fm := FilterMaps{}
            fm.sourceNameMap = make(map[string]bool)
            fm.measObjClassMap = make(map[string]bool)
            fm.measObjInstIdsMap = make(map[string]bool)
            fm.measTypesMap = make(map[string]bool)
            if t.InfoJobData.FilterParams.MeasuredEntityDns != nil {
                for _, v := range t.InfoJobData.FilterParams.MeasuredEntityDns {
                    fm.sourceNameMap[v] = true
                }
            }
            if t.InfoJobData.FilterParams.MeasObjClass != nil {
                for _, v := range t.InfoJobData.FilterParams.MeasObjClass {
                    fm.measObjClassMap[v] = true
                }
            }
            if t.InfoJobData.FilterParams.MeasObjInstIds != nil {
                for _, v := range t.InfoJobData.FilterParams.MeasObjInstIds {
                    fm.measObjInstIdsMap[v] = true
                }
            }
            if t.InfoJobData.FilterParams.MeasTypes != nil {
                for _, v := range t.InfoJobData.FilterParams.MeasTypes {
                    fm.measTypesMap[v] = true
                }
            }
            filter.filter = fm
        }
        if t.InfoTypeIdentity == "json-file-data-from-filestore-to-influx" {
            influxparam := InfluxJobParameters{}
            influxparam.DbUrl = t.InfoJobData.DbUrl
            influxparam.DbOrg = t.InfoJobData.DbOrg
            influxparam.DbBucket = t.InfoJobData.DbBucket
            influxparam.DbToken = t.InfoJobData.DbToken
            filter.influxParameters = influxparam
        }

        jc.filter = filter
        InfoJobs[job_id] = job_record

        type_job_record.job_control <- jc

    } else {
        //TODO
        //Update job
    }
}

// delete job
func delete_job(w http.ResponseWriter, req *http.Request) {
    if req.Method != http.MethodDelete {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }
    datalock.Lock()
    defer datalock.Unlock()

    vars := mux.Vars(req)

    if id, ok := vars["job_id"]; ok {
        if _, ok := InfoJobs[id]; ok {
            remove_info_job(id)
            w.WriteHeader(http.StatusNoContent)
            log.Info("Job ", id, " deleted")
            return
        }
    }
    w.WriteHeader(http.StatusNotFound)
}

// job supervision
func supervise_job(w http.ResponseWriter, req *http.Request) {
    if req.Method != http.MethodGet {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }
    datalock.Lock()
    defer datalock.Unlock()

    vars := mux.Vars(req)

    log.Debug("Supervising, job: ", vars["job_id"])
    if id, ok := vars["job_id"]; ok {
        if _, ok := InfoJobs[id]; ok {
            log.Debug("Supervision ok, job ", id)
            return
        }
    }
    w.WriteHeader(http.StatusNotFound)
}

// producer supervision
func supervise_producer(w http.ResponseWriter, req *http.Request) {
    if req.Method != http.MethodGet {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    w.WriteHeader(http.StatusOK)
}

// producer statistics, all jobs
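// Response sketch (keys as built below, values illustrative only):
//   {"number-of-jobs": 1, "number-of-types": 5,
//    "jobs": {"job-1": {"type": "json-file-data", "groupId": "kafka-procon-json-file-data",
//             "input topic": "...", "output topic": "...",
//             "msg_in (type)": 0, "msg_out (job)": 0, ...}}}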
func statistics(w http.ResponseWriter, req *http.Request) {
    if req.Method != http.MethodGet {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }
    m := make(map[string]interface{})
    log.Debug("producer statistics, locked? ", MutexLocked(&datalock))
    datalock.Lock()
    defer datalock.Unlock()
    w.Header().Set("Content-Type", "application/json; charset=utf-8")
    m["number-of-jobs"] = len(InfoJobs)
    m["number-of-types"] = len(InfoTypes.ProdDataTypes)
    qm := make(map[string]interface{})
    m["jobs"] = qm
    for key, elem := range InfoJobs {
        jm := make(map[string]interface{})
        qm[key] = jm
        jm["type"] = elem.job_info.InfoTypeIdentity
        typeJob := TypeJobs[elem.job_info.InfoTypeIdentity]
        jm["groupId"] = typeJob.groupId
        jm["clientID"] = typeJob.clientId
        jm["input topic"] = typeJob.InputTopic
        jm["input queue length - job ("+fmt.Sprintf("%v", cap(typeJob.data_in_channel))+")"] = len(typeJob.data_in_channel)
        jm["output topic"] = elem.output_topic
        jm["output queue length - producer ("+fmt.Sprintf("%v", cap(data_out_channel))+")"] = len(data_out_channel)
        jm["msg_in (type)"] = typeJob.statistics.in_msg_cnt
        jm["msg_out (job)"] = elem.statistics.out_msg_cnt

    }
    json, err := json.Marshal(m)
    if err != nil {
        w.WriteHeader(http.StatusInternalServerError)
        log.Error("Cannot marshal statistics json")
        return
    }
    _, err = w.Write(json)
    if err != nil {
        w.WriteHeader(http.StatusInternalServerError)
        log.Error("Cannot send statistics json")
        return
    }
}

// Simple alive check
func alive(w http.ResponseWriter, req *http.Request) {
    //Alive check
}

// Get/Set logging level
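// PUT /logging/<level> sets the level (trace|debug|info|warn|error|fatal|panic),
// GET /logging returns the currently enabled level as plain text.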
func logging_level(w http.ResponseWriter, req *http.Request) {
    vars := mux.Vars(req)
    if level, ok := vars["level"]; ok {
        if req.Method == http.MethodPut {
            switch level {
            case "trace":
                log.SetLevel(log.TraceLevel)
            case "debug":
                log.SetLevel(log.DebugLevel)
            case "info":
                log.SetLevel(log.InfoLevel)
            case "warn":
                log.SetLevel(log.WarnLevel)
            case "error":
                log.SetLevel(log.ErrorLevel)
            case "fatal":
                log.SetLevel(log.FatalLevel)
            case "panic":
                log.SetLevel(log.PanicLevel)
            default:
                w.WriteHeader(http.StatusNotFound)
            }
        } else {
            w.WriteHeader(http.StatusMethodNotAllowed)
        }
    } else {
        if req.Method == http.MethodGet {
            msg := "none"
            // Check from the most verbose level down - IsLevelEnabled is true
            // for every level up to and including the configured one
            if log.IsLevelEnabled(log.TraceLevel) {
                msg = "trace"
            } else if log.IsLevelEnabled(log.DebugLevel) {
                msg = "debug"
            } else if log.IsLevelEnabled(log.InfoLevel) {
                msg = "info"
            } else if log.IsLevelEnabled(log.WarnLevel) {
                msg = "warn"
            } else if log.IsLevelEnabled(log.ErrorLevel) {
                msg = "error"
            } else if log.IsLevelEnabled(log.FatalLevel) {
                msg = "fatal"
            } else if log.IsLevelEnabled(log.PanicLevel) {
                msg = "panic"
            }
            w.Header().Set("Content-Type", "text/plain")
            w.Write([]byte(msg))
        } else {
            w.WriteHeader(http.StatusMethodNotAllowed)
        }
    }
}
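// Start a reader for one input topic. The function blocks until the topic can
// be subscribed (retrying once per second and obeying EXIT commands on
// control_ch), then spawns two goroutines: one that services consumer events
// such as OAuth token refresh, and one that polls messages and forwards them
// to data_ch. A nil payload on data_ch signals to the job handler that the
// reader has stopped.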
func start_topic_reader(topic string, type_id string, control_ch chan ReaderControl, data_ch chan *KafkaPayload, gid string, cid string, stats *TypeJobStats) {

    log.Info("Topic reader starting, topic: ", topic, " for type: ", type_id)

    topic_ok := false
    var c *kafka.Consumer = nil
    running := true

    for !topic_ok {

        select {
        case reader_ctrl := <-control_ch:
            if reader_ctrl.command == "EXIT" {
                log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
                //TODO: Stop consumer if present?
                data_ch <- nil //Signal to job handler
                running = false
                return
            }
        case <-time.After(1 * time.Second):
            if !running {
                return
            }
            if c == nil {
                c = create_kafka_consumer(type_id, gid, cid)
                if c == nil {
                    log.Info("Cannot start consumer on topic: ", topic, " for type: ", type_id, " - retrying")
                } else {
                    log.Info("Consumer started on topic: ", topic, " for type: ", type_id)
                }
            }
            if c != nil && !topic_ok {
                err := c.SubscribeTopics([]string{topic}, nil)
                if err != nil {
                    log.Info("Topic reader cannot start subscribing on topic: ", topic, " for type: ", type_id, " - retrying -- error details: ", err)
                } else {
                    log.Info("Topic reader subscribing on topic: ", topic, " for type: ", type_id)
                    topic_ok = true
                }
            }
        }
    }
    log.Info("Topic reader ready on topic: ", topic, " for type: ", type_id)

    var event_chan = make(chan int)
    go func() {
        for {
            select {
            case evt := <-c.Events():
                switch evt.(type) {
                case kafka.OAuthBearerTokenRefresh:
                    log.Debug("New consumer token needed: ", evt)
                    token, err := fetch_token()
                    if err != nil {
                        log.Warning("Cannot fetch token: ", err)
                        c.SetOAuthBearerTokenFailure(err.Error())
                    } else {
                        setTokenError := c.SetOAuthBearerToken(*token)
                        if setTokenError != nil {
                            log.Warning("Cannot set token: ", setTokenError)
                            c.SetOAuthBearerTokenFailure(setTokenError.Error())
                        }
                    }
                default:
                    //TODO: Handle these?
                    log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String())
                }

            case msg := <-event_chan:
                if msg == 0 {
                    return
                }
            case <-time.After(1 * time.Second):
                if !running {
                    return
                }
            }
        }
    }()

    go func() {
        for {
            for {
                select {
                case reader_ctrl := <-control_ch:
                    if reader_ctrl.command == "EXIT" {
                        event_chan <- 0
                        log.Debug("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
                        data_ch <- nil //Signal to job handler
                        defer c.Close()
                        return
                    }
                default:

                    ev := c.Poll(1000)
                    if ev == nil {
                        log.Debug("Topic Reader for type: ", type_id, " Nothing to consume on topic: ", topic)
                        continue
                    }
                    switch e := ev.(type) {
                    case *kafka.Message:
                        var kmsg KafkaPayload
                        kmsg.msg = e

                        c.Commit() //TODO: Ok?

                        //TODO: Check for exception
                        data_ch <- &kmsg
                        stats.in_msg_cnt++
                        log.Debug("Reader msg: ", &kmsg)
                        log.Debug("Reader - data_ch ", data_ch)
                    case kafka.Error:
                        fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)

                    case kafka.OAuthBearerTokenRefresh:
                        log.Debug("New consumer token needed: ", ev)
                        token, err := fetch_token()
                        if err != nil {
                            log.Warning("Cannot fetch token: ", err)
                            c.SetOAuthBearerTokenFailure(err.Error())
                        } else {
                            setTokenError := c.SetOAuthBearerToken(*token)
                            if setTokenError != nil {
                                log.Warning("Cannot set token: ", setTokenError)
                                c.SetOAuthBearerTokenFailure(setTokenError.Error())
                            }
                        }
                    default:
                        fmt.Printf("Ignored %v\n", e)
                    }

| 1544 | |
| 1545 | } |
| 1546 | } |
| 1547 | } |
| 1548 | }() |
| 1549 | } |
| 1550 | |
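// start_topic_writer waits for the kafka producer to become available and then
// starts two goroutines: one draining producer events (delivery reports, errors
// and OAuth token refresh requests) and one sending payloads received on data_ch,
// retrying failed sends, until a nil payload signals shutdown.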
| 1551 | func start_topic_writer(control_ch chan WriterControl, data_ch chan *KafkaPayload) { |
| 1552 | |
| 1553 | var kafka_producer *kafka.Producer |
| 1554 | |
| 1555 | running := true |
| 1556 | log.Info("Topic writer starting") |
| 1557 | |
| 1558 | // Wait for kafka producer to become available - and be prepared to exit the writer |
| 1559 | for kafka_producer == nil { |
| 1560 | select { |
| 1561 | case writer_ctl := <-control_ch: |
| 1562 | if writer_ctl.command == "EXIT" { |
| 1563 | //ignore cmd |
| 1564 | } |
| 1565 | default: |
| 1566 | kafka_producer = start_producer() |
| 1567 | if kafka_producer == nil { |
| 1568 | log.Debug("Could not start kafka producer - retrying") |
| 1569 | time.Sleep(1 * time.Second) |
| 1570 | } else { |
| 1571 | log.Debug("Kafka producer started") |
| 1572 | //defer kafka_producer.Close() |
| 1573 | } |
| 1574 | } |
| 1575 | } |
| 1576 | |
| 1577 | var event_chan = make(chan int) |
| 1578 | go func() { |
| 1579 | for { |
| 1580 | select { |
| 1581 | case evt := <-kafka_producer.Events(): |
| 1582 | //TODO: Handle this? Probably yes, look if the msg was delivered and if not, resend? |
| 1583 | switch evt.(type) { |
| 1584 | case *kafka.Message: |
| 1585 | m := evt.(*kafka.Message) |
| 1586 | |
| 1587 | if m.TopicPartition.Error != nil { |
| 1588 | log.Debug("Dumping topic writer event, failed: ", m.TopicPartition.Error) |
| 1589 | } else { |
| 1590 | log.Debug("Dumping topic writer event, message to topic: ", *m.TopicPartition.Topic, " at offset: ", m.TopicPartition.Offset, " at partition: ", m.TopicPartition.Partition) |
| 1591 | } |
| 1592 | case kafka.Error: |
| 1593 | log.Debug("Dumping topic writer event, error: ", evt) |
| 1594 | case kafka.OAuthBearerTokenRefresh: |
| 1595 | log.Debug("New producer token needed: ", evt) |
| 1596 | token, err := fetch_token() |
| 1597 | if err != nil { |
| 1598 | log.Warning("Cannot cannot fetch token: ", err) |
| 1599 | kafka_producer.SetOAuthBearerTokenFailure(err.Error()) |
| 1600 | } else { |
| 1601 | setTokenError := kafka_producer.SetOAuthBearerToken(*token) |
| 1602 | if setTokenError != nil { |
| 1603 | log.Warning("Cannot cannot set token: ", setTokenError) |
| 1604 | kafka_producer.SetOAuthBearerTokenFailure(setTokenError.Error()) |
| 1605 | } |
| 1606 | } |
| 1607 | default: |
| 1608 | log.Debug("Dumping topic writer event, unknown: ", evt) |
| 1609 | } |
| 1610 | |
| 1611 | case msg := <-event_chan: |
| 1612 | if msg == 0 { |
| 1613 | return |
| 1614 | } |
| 1615 | case <-time.After(1 * time.Second): |
| 1616 | if !running { |
| 1617 | return |
| 1618 | } |
| 1619 | } |
| 1620 | } |
| 1621 | }() |
| 1622 | go func() { |
| 1623 | for { |
| 1624 | select { |
| 1625 | case writer_ctl := <-control_ch: |
| 1626 | if writer_ctl.command == "EXIT" { |
| 1627 | // ignore - wait for channel signal |
| 1628 | } |
| 1629 | |
| 1630 | case kmsg := <-data_ch: |
| 1631 | if kmsg == nil { |
| 1632 | event_chan <- 0 |
| 1634 | log.Info("Topic writer stopped by channel signal - start_topic_writer") |
| 1635 | defer kafka_producer.Close() |
| 1636 | return |
| 1637 | } |
| 1638 | |
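// Retry the send up to 10 times with a linearly increasing backoff (1s, 2s, ... seconds)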
| 1639 | retries := 10 |
| 1640 | msg_ok := false |
| 1641 | var err error |
for retry := 1; retry <= retries && !msg_ok; retry++ {
| 1643 | err = kafka_producer.Produce(&kafka.Message{ |
| 1644 | TopicPartition: kafka.TopicPartition{Topic: &kmsg.topic, Partition: kafka.PartitionAny}, |
| 1645 | Value: kmsg.msg.Value, Key: kmsg.msg.Key}, nil) |
| 1646 | |
| 1647 | if err == nil { |
| 1648 | incr_out_msg_cnt(kmsg.jobid) |
| 1649 | msg_ok = true |
| 1650 | log.Debug("Topic writer, msg sent ok on topic: ", kmsg.topic) |
| 1651 | } else { |
| 1652 | log.Info("Topic writer failed to send message on topic: ", kmsg.topic, " - Retrying. Error details: ", err) |
| 1653 | time.Sleep(time.Duration(retry) * time.Second) |
| 1654 | } |
| 1655 | } |
| 1656 | if !msg_ok { |
| 1657 | //TODO: Retry sending msg? |
| 1658 | log.Error("Topic writer failed to send message on topic: ", kmsg.topic, " - Msg discarded. Error details: ", err) |
| 1659 | } |
| 1660 | case <-time.After(1000 * time.Millisecond): |
| 1661 | if !running { |
| 1662 | return |
| 1663 | } |
| 1664 | } |
| 1665 | } |
| 1666 | }() |
| 1667 | } |
| 1668 | |
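// create_kafka_consumer creates a kafka consumer for the given type using the
// supplied group id and client id. A plain text consumer is created when
// CREDS_GRANT_TYPE is unset, otherwise SASL_PLAINTEXT with OAUTHBEARER is used.
// Returns nil if the consumer cannot be created.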
| 1669 | func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consumer { |
| 1670 | var cm kafka.ConfigMap |
| 1671 | if creds_grant_type == "" { |
| 1672 | log.Info("Creating kafka SASL plain text consumer for type: ", type_id) |
| 1673 | cm = kafka.ConfigMap{ |
| 1674 | "bootstrap.servers": bootstrapserver, |
| 1675 | "group.id": gid, |
| 1676 | "client.id": cid, |
| 1677 | "auto.offset.reset": "latest", |
| 1678 | "enable.auto.commit": false, |
| 1679 | //"auto.commit.interval.ms": 5000, |
| 1680 | } |
| 1681 | } else { |
| 1682 | log.Info("Creating kafka SASL plain text consumer for type: ", type_id) |
| 1683 | cm = kafka.ConfigMap{ |
| 1684 | "bootstrap.servers": bootstrapserver, |
| 1685 | "group.id": gid, |
| 1686 | "client.id": cid, |
| 1687 | "auto.offset.reset": "latest", |
| 1688 | "enable.auto.commit": false, |
| 1689 | "sasl.mechanism": "OAUTHBEARER", |
| 1690 | "security.protocol": "SASL_PLAINTEXT", |
| 1691 | } |
| 1692 | } |
| 1693 | c, err := kafka.NewConsumer(&cm) |
| 1694 | |
| 1695 | //TODO: How to handle autocommit or commit message by message |
| 1696 | //TODO: Make arg to kafka configurable |
| 1697 | |
| 1698 | if err != nil { |
| 1699 | log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err) |
| 1700 | return nil |
| 1701 | } |
| 1702 | |
| 1704 | log.Info("Created kafka consumer for type: ", type_id, " OK") |
| 1705 | return c |
| 1706 | } |
| 1707 | |
// start_producer creates a kafka producer: plain text when CREDS_GRANT_TYPE is
// unset, otherwise SASL_PLAINTEXT with OAUTHBEARER. Returns nil on failure.
| 1709 | func start_producer() *kafka.Producer { |
| 1710 | log.Info("Creating kafka producer") |
| 1711 | |
| 1712 | var cm kafka.ConfigMap |
| 1713 | if creds_grant_type == "" { |
| 1714 | log.Info("Creating kafka SASL plain text producer") |
| 1715 | cm = kafka.ConfigMap{ |
| 1716 | "bootstrap.servers": bootstrapserver, |
| 1717 | } |
| 1718 | } else { |
| 1719 | log.Info("Creating kafka SASL plain text producer") |
| 1720 | cm = kafka.ConfigMap{ |
| 1721 | "bootstrap.servers": bootstrapserver, |
| 1722 | "sasl.mechanism": "OAUTHBEARER", |
| 1723 | "security.protocol": "SASL_PLAINTEXT", |
| 1724 | } |
| 1725 | } |
| 1726 | |
| 1727 | p, err := kafka.NewProducer(&cm) |
| 1728 | if err != nil { |
| 1729 | log.Error("Cannot create kafka producer,", err) |
| 1730 | return nil |
| 1731 | } |
| 1732 | return p |
| 1733 | } |
| 1734 | |
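// start_adminclient creates a kafka admin client, or returns nil on failure.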
| 1735 | func start_adminclient() *kafka.AdminClient { |
| 1736 | log.Info("Creating kafka admin client") |
| 1737 | a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver}) |
| 1738 | if err != nil { |
| 1739 | log.Error("Cannot create kafka admin client,", err) |
| 1740 | return nil |
| 1741 | } |
| 1742 | return a |
| 1743 | } |
| 1744 | |
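// create_minio_client creates a minio client for the configured filestore using
// static credentials, returning the client or an error. Note that the connection
// is made without TLS (Secure: false).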
func create_minio_client(id string) (*minio.Client, error) {
	log.Debug("Get minio client")
	minio_client, err := minio.New(filestore_server, &minio.Options{
		Secure: false,
		Creds:  credentials.NewStaticV4(filestore_user, filestore_pwd, ""),
	})
	if err != nil {
		log.Error("Cannot create minio client, ", err)
		return nil, err
	}
	return minio_client, nil
}
| 1757 | |
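// incr_out_msg_cnt increments the counter of sent messages for the given job, if the job exists.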
| 1758 | func incr_out_msg_cnt(jobid string) { |
| 1759 | j, ok := InfoJobs[jobid] |
| 1760 | if ok { |
| 1761 | j.statistics.out_msg_cnt++ |
| 1762 | } |
| 1763 | } |
| 1764 | |
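// start_job_xml_file_data runs the job loop for a type producing xml file events:
// it maintains the set of job filters and output topics, and spawns one
// run_xml_job per received file event, limited by jobLimiterChan.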
| 1765 | func start_job_xml_file_data(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, data_out_channel chan *KafkaPayload, fvolume string, fsbucket string) { |
| 1766 | |
| 1767 | log.Info("Type job", type_id, " started") |
| 1768 | |
| 1769 | filters := make(map[string]Filter) |
| 1770 | topic_list := make(map[string]string) |
| 1771 | var mc *minio.Client |
| 1772 | const mc_id = "mc_" + "start_job_xml_file_data" |
| 1773 | running := true |
| 1774 | for { |
| 1775 | select { |
| 1776 | case job_ctl := <-control_ch: |
| 1777 | log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command) |
| 1778 | switch job_ctl.command { |
| 1779 | case "EXIT": |
| 1780 | //ignore cmd - handled by channel signal |
| 1781 | case "ADD-FILTER": |
| 1782 | filters[job_ctl.filter.JobId] = job_ctl.filter |
| 1783 | log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId) |
| 1784 | |
| 1785 | tmp_topic_list := make(map[string]string) |
| 1786 | for k, v := range topic_list { |
| 1787 | tmp_topic_list[k] = v |
| 1788 | } |
| 1789 | tmp_topic_list[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic |
| 1790 | topic_list = tmp_topic_list |
| 1791 | case "REMOVE-FILTER": |
| 1792 | log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId) |
| 1793 | |
| 1794 | tmp_topic_list := make(map[string]string) |
| 1795 | for k, v := range topic_list { |
| 1796 | tmp_topic_list[k] = v |
| 1797 | } |
| 1798 | delete(tmp_topic_list, job_ctl.filter.JobId) |
| 1799 | topic_list = tmp_topic_list |
| 1800 | } |
| 1801 | |
| 1802 | case msg := <-data_in_ch: |
| 1803 | if msg == nil { |
| 1804 | log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data") |
| 1805 | |
| 1806 | running = false |
| 1807 | return |
| 1808 | } |
| 1809 | if fsbucket != "" && fvolume == "" { |
| 1810 | if mc == nil { |
var err error
| 1812 | mc, err = create_minio_client(mc_id) |
| 1813 | if err != nil { |
| 1814 | log.Debug("Cannot create minio client for type job: ", type_id) |
| 1815 | } |
| 1816 | } |
| 1817 | } |
| 1818 | //TODO: Sort processed file conversions in order (FIFO) |
| 1819 | jobLimiterChan <- struct{}{} |
| 1820 | go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket, mc, mc_id) |
| 1821 | |
| 1822 | case <-time.After(1 * time.Second): |
| 1823 | if !running { |
| 1824 | return |
| 1825 | } |
| 1826 | } |
| 1827 | } |
| 1828 | //}() |
| 1829 | } |
| 1830 | |
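// run_xml_job converts one collected xml file to the json PM format, optionally
// gzips the result, stores it on the file volume or in the minio filestore, and
// publishes a FileDownloadedEvt to the output topic of every active job.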
| 1831 | func run_xml_job(type_id string, msg *KafkaPayload, outputCompression string, data_out_channel chan *KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string, mc *minio.Client, mc_id string) { |
| 1832 | defer func() { |
| 1833 | <-jobLimiterChan |
| 1834 | }() |
| 1835 | PrintMemUsage() |
| 1836 | |
| 1837 | if fvolume == "" && fsbucket == "" { |
| 1838 | log.Error("Type job: ", type_id, " cannot run, neither file volume nor filestore is set - discarding message") |
| 1839 | return |
| 1840 | } else if (fvolume != "") && (fsbucket != "") { |
| 1841 | log.Error("Type job: ", type_id, " cannot run with output to both file volume and filestore bucket - discarding message") |
| 1842 | return |
| 1843 | } |
| 1844 | |
| 1845 | start := time.Now() |
| 1846 | var evt_data XmlFileEventHeader |
| 1847 | |
| 1848 | err := jsoniter.Unmarshal(msg.msg.Value, &evt_data) |
| 1849 | if err != nil { |
| 1850 | log.Error("Cannot parse XmlFileEventHeader for type job: ", type_id, " - discarding message, error details", err) |
| 1851 | return |
| 1852 | } |
| 1853 | log.Debug("Unmarshal file-collect event for type job: ", type_id, " time: ", time.Since(start).String()) |
| 1854 | |
| 1855 | var reader io.Reader |
| 1856 | |
| 1857 | //TODO -> config |
| 1858 | INPUTBUCKET := "ropfiles" |
| 1859 | |
| 1860 | filename := "" |
| 1861 | if fvolume != "" { |
| 1862 | filename = fvolume + "/" + evt_data.Name |
| 1863 | fi, err := os.Open(filename) |
| 1864 | |
| 1865 | if err != nil { |
| 1866 | log.Error("File ", filename, " - cannot be opened for type job: ", type_id, " - discarding message, error details: ", err) |
| 1867 | return |
| 1868 | } |
| 1869 | defer fi.Close() |
| 1870 | reader = fi |
| 1872 | } else { |
| 1873 | filename = evt_data.Name |
| 1874 | if mc != nil { |
| 1875 | tctx := context.Background() |
| 1876 | mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{}) |
| 1877 | if err != nil { |
| 1878 | log.Error("Cannot get: ", filename, " for type job: ", type_id, " - discarding message, error details: ", err) |
| 1879 | return |
| 1880 | } |
| 1881 | if mr == nil { |
| 1882 | log.Error("Cannot get: ", filename, " for type job: ", type_id, " - minio get object returned null reader - discarding message, error details: ", err) |
| 1883 | return |
| 1884 | } |
| 1885 | reader = mr |
| 1886 | defer mr.Close() |
| 1887 | } else { |
| 1888 | log.Error("Cannot get: ", filename, " for type job: ", type_id, " - no minio client - discarding message") |
| 1889 | return |
| 1890 | } |
| 1891 | } |
| 1892 | |
| 1893 | if reader == nil { |
| 1894 | log.Error("Cannot get: ", filename, " - null reader") |
| 1895 | return |
| 1896 | } |
| 1897 | var file_bytes []byte |
| 1898 | if strings.HasSuffix(filename, "gz") { |
| 1899 | start := time.Now() |
| 1900 | var buf3 bytes.Buffer |
| 1901 | errb := gunzipReaderToWriter(&buf3, reader) |
| 1902 | if errb != nil { |
| 1903 | log.Error("Cannot gunzip file ", filename, " - discarding message, ", errb) |
| 1904 | return |
| 1905 | } |
| 1906 | file_bytes = buf3.Bytes() |
| 1907 | log.Debug("Gunzip file: ", filename, "time:", time.Since(start).String(), "len: ", len(file_bytes)) |
| 1908 | |
| 1909 | } else { |
| 1910 | var buf3 bytes.Buffer |
| 1911 | _, err2 := io.Copy(&buf3, reader) |
| 1912 | if err2 != nil { |
| 1913 | log.Error("File ", filename, " - cannot be read, discarding message, ", err) |
| 1914 | return |
| 1915 | } |
| 1916 | file_bytes = buf3.Bytes() |
| 1917 | } |
| 1918 | start = time.Now() |
| 1919 | b, err := xml_to_json_conv(&file_bytes, &evt_data) |
| 1920 | if err != nil { |
| 1921 | log.Error("Cannot convert file ", evt_data.Name, " - discarding message, ", err) |
| 1922 | return |
| 1923 | } |
| 1924 | log.Debug("Converted file to json: ", filename, " time", time.Since(start).String(), "len;", len(b)) |
| 1925 | |
| 1926 | new_fn := evt_data.Name + os.Getenv("KP") + ".json" |
| 1927 | if outputCompression == "gz" { |
| 1928 | new_fn = new_fn + ".gz" |
| 1929 | start = time.Now() |
| 1930 | var buf bytes.Buffer |
| 1931 | err = gzipWrite(&buf, &b) |
| 1932 | if err != nil { |
| 1933 | log.Error("Cannot gzip file ", new_fn, " - discarding message, ", err) |
| 1934 | return |
| 1935 | } |
| 1936 | b = buf.Bytes() |
| 1937 | log.Debug("Gzip file: ", new_fn, " time: ", time.Since(start).String(), "len:", len(file_bytes)) |
| 1938 | |
| 1939 | } |
| 1940 | start = time.Now() |
| 1941 | |
| 1942 | if fvolume != "" { |
| 1943 | //Store on disk |
| 1944 | err = os.WriteFile(fvolume+"/"+new_fn, b, 0644) |
| 1945 | if err != nil { |
| 1946 | log.Error("Cannot write file ", new_fn, " - discarding message,", err) |
| 1947 | return |
| 1948 | } |
| 1949 | log.Debug("Write file to disk: "+new_fn, "time: ", time.Since(start).String(), " len: ", len(file_bytes)) |
| 1950 | } else if fsbucket != "" { |
| 1951 | // Store in minio |
| 1952 | objectName := new_fn |
| 1953 | if mc != nil { |
| 1954 | |
| 1955 | contentType := "application/json" |
| 1956 | if strings.HasSuffix(objectName, ".gz") { |
| 1957 | contentType = "application/gzip" |
| 1958 | } |
| 1959 | |
| 1960 | // Upload the xml file with PutObject |
| 1961 | r := bytes.NewReader(b) |
| 1962 | tctx := context.Background() |
if !check_minio_bucket(mc, mc_id, fsbucket) {
| 1964 | err := create_minio_bucket(mc, mc_id, fsbucket) |
| 1965 | if err != nil { |
| 1966 | log.Error("Cannot create bucket: ", fsbucket, ", ", err) |
| 1967 | return |
| 1968 | } |
| 1969 | } |
| 1970 | ok := false |
for i := 1; i < 64 && !ok; i = i * 2 {
| 1972 | info, err := mc.PutObject(tctx, fsbucket, objectName, r, int64(len(b)), minio.PutObjectOptions{ContentType: contentType}) |
| 1973 | if err != nil { |
| 1974 | |
| 1975 | if i == 1 { |
| 1976 | log.Warn("Cannot upload (first attempt): ", objectName, ", ", err) |
| 1977 | } else { |
| 1978 | log.Warn("Cannot upload (retry): ", objectName, ", ", err) |
| 1979 | } |
| 1980 | time.Sleep(time.Duration(i) * time.Second) |
| 1981 | } else { |
| 1982 | log.Debug("Store ", objectName, " in filestore, time: ", time.Since(start).String()) |
| 1983 | log.Debug("Successfully uploaded: ", objectName, " of size:", info.Size) |
| 1984 | ok = true |
| 1985 | } |
| 1986 | } |
| 1987 | if !ok { |
| 1988 | log.Error("Cannot upload : ", objectName, ", ", err) |
| 1989 | } |
| 1990 | } else { |
| 1991 | log.Error("Cannot upload: ", objectName, ", no client") |
| 1992 | } |
| 1993 | } |
| 1994 | |
| 1995 | start = time.Now() |
| 1996 | if fvolume == "" { |
| 1997 | var fde FileDownloadedEvt |
| 1998 | fde.Filename = new_fn |
| 1999 | j, err := jsoniter.Marshal(fde) |
| 2000 | |
| 2001 | if err != nil { |
| 2002 | log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err) |
| 2003 | return |
| 2004 | } |
| 2005 | msg.msg.Value = j |
| 2006 | } else { |
| 2007 | var fde FileDownloadedEvt |
| 2008 | fde.Filename = new_fn |
| 2009 | j, err := jsoniter.Marshal(fde) |
| 2010 | |
| 2011 | if err != nil { |
| 2012 | log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err) |
| 2013 | return |
| 2014 | } |
| 2015 | msg.msg.Value = j |
| 2016 | } |
| 2017 | msg.msg.Key = []byte("\"" + evt_data.SourceName + "\"") |
| 2018 | log.Debug("Marshal file-collect event ", time.Since(start).String()) |
| 2019 | |
| 2020 | for k, v := range topic_list { |
kmsg := new(KafkaPayload)
| 2022 | kmsg.msg = msg.msg |
| 2023 | kmsg.topic = v |
| 2024 | kmsg.jobid = k |
| 2025 | data_out_channel <- kmsg |
| 2026 | } |
| 2027 | } |
| 2028 | |
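// xml_to_json_conv unmarshals a 3GPP measurement collection xml file and converts
// it to the json PM file format, filling parts of the common event header from
// the XmlFileEventHeader. Some header fields are left empty (see TODOs).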
| 2029 | func xml_to_json_conv(f_byteValue *[]byte, xfeh *XmlFileEventHeader) ([]byte, error) { |
| 2030 | var f MeasCollecFile |
| 2031 | start := time.Now() |
| 2032 | err := xml.Unmarshal(*f_byteValue, &f) |
| 2033 | if err != nil { |
| 2034 | return nil, errors.New("Cannot unmarshal xml-file") |
| 2035 | } |
| 2036 | log.Debug("Unmarshal xml file XmlFileEvent: ", time.Since(start).String()) |
| 2037 | |
| 2038 | start = time.Now() |
| 2039 | var pmfile PMJsonFile |
| 2040 | |
| 2041 | //TODO: Fill in more values |
| 2042 | pmfile.Event.Perf3GppFields.Perf3GppFieldsVersion = "1.0" |
| 2043 | pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900 |
| 2044 | pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityUserName = "" |
| 2045 | pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityDn = f.FileHeader.FileSender.LocalDn |
| 2046 | pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntitySoftwareVersion = f.MeasData.ManagedElement.SwVersion |
| 2047 | |
| 2048 | for _, it := range f.MeasData.MeasInfo { |
| 2049 | var mili MeasInfoList |
| 2050 | mili.MeasInfoID.SMeasInfoID = it.MeasInfoId |
| 2051 | for _, jt := range it.MeasType { |
| 2052 | mili.MeasTypes.SMeasTypesList = append(mili.MeasTypes.SMeasTypesList, jt.Text) |
| 2053 | } |
| 2054 | for _, jt := range it.MeasValue { |
| 2055 | var mv MeasValues |
| 2056 | mv.MeasObjInstID = jt.MeasObjLdn |
| 2057 | mv.SuspectFlag = jt.Suspect |
| 2058 | if jt.Suspect == "" { |
| 2059 | mv.SuspectFlag = "false" |
| 2060 | } |
| 2061 | for _, kt := range jt.R { |
| 2062 | ni, _ := strconv.Atoi(kt.P) |
| 2063 | nv := kt.Text |
| 2064 | mr := MeasResults{ni, nv} |
| 2065 | mv.MeasResultsList = append(mv.MeasResultsList, mr) |
| 2066 | } |
| 2067 | mili.MeasValuesList = append(mili.MeasValuesList, mv) |
| 2068 | } |
| 2069 | |
| 2070 | pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = append(pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList, mili) |
| 2071 | } |
| 2072 | |
| 2074 | |
| 2075 | //TODO: Fill more values |
| 2076 | pmfile.Event.CommonEventHeader.Domain = "" //xfeh.Domain |
| 2077 | pmfile.Event.CommonEventHeader.EventID = "" //xfeh.EventID |
| 2078 | pmfile.Event.CommonEventHeader.Sequence = 0 //xfeh.Sequence |
| 2079 | pmfile.Event.CommonEventHeader.EventName = "" //xfeh.EventName |
| 2080 | pmfile.Event.CommonEventHeader.SourceName = xfeh.SourceName |
| 2081 | pmfile.Event.CommonEventHeader.ReportingEntityName = "" //xfeh.ReportingEntityName |
| 2082 | pmfile.Event.CommonEventHeader.Priority = "" //xfeh.Priority |
| 2083 | pmfile.Event.CommonEventHeader.StartEpochMicrosec = xfeh.StartEpochMicrosec |
| 2084 | pmfile.Event.CommonEventHeader.LastEpochMicrosec = xfeh.LastEpochMicrosec |
| 2085 | pmfile.Event.CommonEventHeader.Version = "" //xfeh.Version |
| 2086 | pmfile.Event.CommonEventHeader.VesEventListenerVersion = "" //xfeh.VesEventListenerVersion |
| 2087 | pmfile.Event.CommonEventHeader.TimeZoneOffset = xfeh.TimeZoneOffset |
| 2088 | |
| 2089 | log.Debug("Convert xml to json : ", time.Since(start).String()) |
| 2090 | |
| 2091 | start = time.Now() |
| 2092 | json, err := jsoniter.Marshal(pmfile) |
| 2093 | log.Debug("Marshal json : ", time.Since(start).String()) |
| 2094 | |
| 2095 | if err != nil { |
| 2096 | return nil, errors.New("Cannot marshal converted json") |
| 2097 | } |
| 2098 | return json, nil |
| 2099 | } |
| 2100 | |
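// start_job_json_file_data runs the job loop for a type producing json PM file
// events: it maintains per-job filters and output topics, and spawns one
// run_json_job per received file event, limited by jobLimiterChan.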
| 2101 | func start_job_json_file_data(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, data_out_channel chan *KafkaPayload, objectstore bool) { |
| 2102 | |
| 2103 | log.Info("Type job", type_id, " started") |
| 2104 | |
| 2105 | filters := make(map[string]Filter) |
| 2106 | filterParams_list := make(map[string]FilterMaps) |
| 2108 | topic_list := make(map[string]string) |
| 2109 | var mc *minio.Client |
| 2110 | const mc_id = "mc_" + "start_job_json_file_data" |
| 2111 | running := true |
| 2112 | for { |
| 2113 | select { |
| 2114 | case job_ctl := <-control_ch: |
| 2115 | log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command) |
| 2116 | switch job_ctl.command { |
| 2117 | case "EXIT": |
| 2118 | //ignore cmd - handled by channel signal |
| 2119 | case "ADD-FILTER": |
| 2120 | //TODO: Refactor... |
| 2121 | filters[job_ctl.filter.JobId] = job_ctl.filter |
| 2122 | log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId) |
| 2123 | |
| 2124 | tmp_filterParams_list := make(map[string]FilterMaps) |
| 2125 | for k, v := range filterParams_list { |
| 2126 | tmp_filterParams_list[k] = v |
| 2127 | } |
| 2128 | tmp_filterParams_list[job_ctl.filter.JobId] = job_ctl.filter.filter |
| 2129 | filterParams_list = tmp_filterParams_list |
| 2130 | |
| 2131 | tmp_topic_list := make(map[string]string) |
| 2132 | for k, v := range topic_list { |
| 2133 | tmp_topic_list[k] = v |
| 2134 | } |
| 2135 | tmp_topic_list[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic |
| 2136 | topic_list = tmp_topic_list |
| 2137 | case "REMOVE-FILTER": |
| 2138 | //TODO: Refactor... |
| 2139 | log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId) |
| 2140 | |
| 2141 | tmp_filterParams_list := make(map[string]FilterMaps) |
| 2142 | for k, v := range filterParams_list { |
| 2143 | tmp_filterParams_list[k] = v |
| 2144 | } |
| 2145 | delete(tmp_filterParams_list, job_ctl.filter.JobId) |
| 2146 | filterParams_list = tmp_filterParams_list |
| 2147 | |
| 2148 | tmp_topic_list := make(map[string]string) |
| 2149 | for k, v := range topic_list { |
| 2150 | tmp_topic_list[k] = v |
| 2151 | } |
| 2152 | delete(tmp_topic_list, job_ctl.filter.JobId) |
| 2153 | topic_list = tmp_topic_list |
| 2154 | } |
| 2155 | |
| 2156 | case msg := <-data_in_ch: |
| 2157 | if msg == nil { |
| 2158 | log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data") |
| 2159 | |
| 2160 | running = false |
| 2161 | return |
| 2162 | } |
| 2163 | if objectstore { |
| 2164 | if mc == nil { |
var err error
| 2166 | mc, err = create_minio_client(mc_id) |
| 2167 | if err != nil { |
| 2168 | log.Debug("Cannot create minio client for type job: ", type_id) |
| 2169 | } |
| 2170 | } |
| 2171 | } |
| 2172 | //TODO: Sort processed file conversions in order (FIFO) |
| 2173 | jobLimiterChan <- struct{}{} |
| 2174 | go run_json_job(type_id, msg, "", filterParams_list, topic_list, data_out_channel, jobLimiterChan, mc, mc_id, objectstore) |
| 2175 | |
| 2176 | case <-time.After(1 * time.Second): |
| 2177 | if !running { |
| 2178 | return |
| 2179 | } |
| 2180 | } |
| 2181 | } |
| 2182 | } |
| 2183 | |
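// run_json_job reads one json PM file, from the file volume or from minio,
// applies each job's filter to a fresh copy of the parsed file, and publishes
// the filtered result to the job's output topic.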
| 2184 | func run_json_job(type_id string, msg *KafkaPayload, outputCompression string, filterList map[string]FilterMaps, topic_list map[string]string, data_out_channel chan *KafkaPayload, jobLimiterChan chan struct{}, mc *minio.Client, mc_id string, objectstore bool) { |
| 2185 | |
| 2186 | //Release job limit |
| 2187 | defer func() { |
| 2188 | <-jobLimiterChan |
| 2189 | }() |
| 2190 | |
| 2191 | PrintMemUsage() |
| 2192 | |
| 2193 | var evt_data FileDownloadedEvt |
| 2194 | err := jsoniter.Unmarshal(msg.msg.Value, &evt_data) |
| 2195 | if err != nil { |
| 2196 | log.Error("Cannot parse FileDownloadedEvt for type job: ", type_id, " - discarding message, error details: ", err) |
| 2197 | return |
| 2198 | } |
| 2199 | log.Debug("FileDownloadedEvt for job: ", type_id, " file: ", evt_data.Filename) |
| 2200 | |
| 2201 | var reader io.Reader |
| 2202 | |
| 2203 | //TODO -> config |
| 2205 | INPUTBUCKET := "pm-files-json" |
| 2206 | filename := "" |
if !objectstore {
| 2208 | filename = files_volume + "/" + evt_data.Filename |
| 2209 | fi, err := os.Open(filename) |
| 2210 | |
| 2211 | if err != nil { |
| 2212 | log.Error("File: ", filename, "for type job: ", type_id, " - cannot be opened -discarding message - error details: ", err) |
| 2213 | return |
| 2214 | } |
| 2215 | defer fi.Close() |
| 2216 | reader = fi |
| 2217 | } else { |
| 2218 | filename = "/" + evt_data.Filename |
| 2219 | if mc != nil { |
if !check_minio_bucket(mc, mc_id, INPUTBUCKET) {
| 2221 | log.Error("Bucket not available for reading in type job: ", type_id, "bucket: ", INPUTBUCKET) |
| 2222 | return |
| 2223 | } |
| 2224 | tctx := context.Background() |
| 2225 | mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{}) |
| 2226 | if err != nil { |
| 2227 | log.Error("Obj: ", filename, "for type job: ", type_id, " - cannot be fetched from bucket: ", INPUTBUCKET, " - discarding message, error details: ", err) |
| 2228 | return |
| 2229 | } |
| 2230 | reader = mr |
| 2231 | defer mr.Close() |
| 2232 | } else { |
| 2233 | log.Error("Cannot get obj: ", filename, "for type job: ", type_id, " from bucket: ", INPUTBUCKET, " - no client") |
| 2234 | return |
| 2235 | } |
| 2236 | } |
| 2237 | |
| 2238 | var data *[]byte |
| 2239 | if strings.HasSuffix(filename, "gz") { |
| 2240 | start := time.Now() |
| 2241 | var buf2 bytes.Buffer |
| 2242 | errb := gunzipReaderToWriter(&buf2, reader) |
| 2243 | if errb != nil { |
| 2244 | log.Error("Cannot decompress file/obj ", filename, "for type job: ", type_id, " - discarding message, error details", errb) |
| 2245 | return |
| 2246 | } |
| 2247 | d := buf2.Bytes() |
| 2248 | data = &d |
| 2249 | log.Debug("Decompress file/obj ", filename, "for type job: ", type_id, " time:", time.Since(start).String()) |
| 2250 | } else { |
| 2251 | |
| 2252 | start := time.Now() |
| 2253 | d, err := io.ReadAll(reader) |
| 2254 | if err != nil { |
| 2255 | log.Error("Cannot read file/obj: ", filename, "for type job: ", type_id, " - discarding message, error details", err) |
| 2256 | return |
| 2257 | } |
| 2258 | data = &d |
| 2259 | |
| 2260 | log.Debug("Read file/obj: ", filename, "for type job: ", type_id, " time: ", time.Since(start).String()) |
| 2261 | } |
| 2262 | |
// The file is unmarshalled inside the loop below - a deep copy of PMJsonFile
// would be needed to keep a single parsed copy outside the loop.
| 2273 | for k, v := range filterList { |
| 2274 | |
| 2275 | var pmfile PMJsonFile |
| 2276 | start := time.Now() |
| 2277 | err = jsoniter.Unmarshal(*data, &pmfile) |
| 2278 | log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String()) |
| 2279 | |
| 2280 | if err != nil { |
| 2281 | log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err) |
| 2282 | return |
| 2283 | } |
| 2284 | |
kmsg := new(KafkaPayload)
| 2286 | kmsg.msg = new(kafka.Message) |
| 2287 | kmsg.msg.Key = []byte("\"" + pmfile.Event.CommonEventHeader.SourceName + "\"") |
| 2288 | log.Debug("topic:", topic_list[k]) |
| 2289 | log.Debug("sourceNameMap:", v.sourceNameMap) |
| 2290 | log.Debug("measObjClassMap:", v.measObjClassMap) |
| 2291 | log.Debug("measObjInstIdsMap:", v.measObjInstIdsMap) |
| 2292 | log.Debug("measTypesMap:", v.measTypesMap) |
b := json_pm_filter_to_byte(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap)
if b == nil {
	log.Info("File/obj ", filename, " for job: ", k, " - empty after filtering, discarding message")
	return
}
kmsg.msg.Value = *b
| 2301 | |
| 2314 | kmsg.topic = topic_list[k] |
| 2315 | kmsg.jobid = k |
| 2316 | |
| 2317 | data_out_channel <- kmsg |
| 2318 | } |
| 2319 | |
| 2320 | } |
| 2321 | |
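// json_pm_filter_to_byte filters a parsed PM file in place and marshals the
// result back to json. Returns nil if nothing remains after filtering or if
// marshalling fails.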
| 2322 | func json_pm_filter_to_byte(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *[]byte { |
| 2323 | |
| 2324 | if json_pm_filter_to_obj(resource, data, sourceNameMap, measObjClassMap, measObjInstIdsMap, measTypesMap) == nil { |
| 2325 | return nil |
| 2326 | } |
| 2327 | start := time.Now() |
| 2328 | j, err := jsoniter.Marshal(&data) |
| 2329 | |
| 2330 | log.Debug("Json marshal obj: ", resource, " time: ", time.Since(start).String()) |
| 2331 | |
| 2332 | if err != nil { |
| 2333 | log.Error("Msg could not be marshalled discarding message, obj: ", resource, ", error details: ", err) |
| 2334 | return nil |
| 2335 | } |
| 2336 | |
| 2337 | log.Debug("Filtered json obj: ", resource, " len: ", len(j)) |
| 2338 | return &j |
| 2339 | } |
| 2340 | |
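// json_pm_filter_to_obj filters a parsed PM file in place against the given
// source name, object class, object instance id and measurement type maps.
// An empty map means no filtering on that dimension. Returns nil if the source
// name does not match or if no measurement data remains after filtering.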
| 2341 | func json_pm_filter_to_obj(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *PMJsonFile { |
	start := time.Now()
	if len(sourceNameMap) != 0 && !sourceNameMap[data.Event.CommonEventHeader.SourceName] {
		return nil
	}
| 2351 | modified := false |
| 2352 | var temp_mil []MeasInfoList |
| 2353 | for _, zz := range data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList { |
| 2354 | |
| 2355 | check_cntr := false |
| 2356 | var cnt_flags []bool |
| 2357 | if len(measTypesMap) > 0 { |
| 2358 | c_cntr := 0 |
| 2359 | var temp_mtl []string |
| 2360 | for _, v := range zz.MeasTypes.SMeasTypesList { |
| 2361 | if measTypesMap[v] { |
| 2362 | cnt_flags = append(cnt_flags, true) |
| 2363 | c_cntr++ |
| 2364 | temp_mtl = append(temp_mtl, v) |
| 2365 | } else { |
| 2366 | cnt_flags = append(cnt_flags, false) |
| 2367 | } |
| 2368 | } |
| 2369 | if c_cntr > 0 { |
| 2370 | check_cntr = true |
| 2371 | zz.MeasTypes.SMeasTypesList = temp_mtl |
| 2372 | } else { |
| 2373 | modified = true |
| 2374 | continue |
| 2375 | } |
| 2376 | } |
| 2377 | keep := false |
| 2378 | var temp_mvl []MeasValues |
| 2379 | for _, yy := range zz.MeasValuesList { |
| 2380 | keep_class := false |
| 2381 | keep_inst := false |
| 2382 | keep_cntr := false |
| 2383 | |
| 2384 | dna := strings.Split(yy.MeasObjInstID, ",") |
| 2385 | instName := dna[len(dna)-1] |
| 2386 | cls := strings.Split(dna[len(dna)-1], "=")[0] |
| 2387 | |
| 2388 | if len(measObjClassMap) > 0 { |
| 2389 | if measObjClassMap[cls] { |
| 2390 | keep_class = true |
| 2391 | } |
| 2392 | } else { |
| 2393 | keep_class = true |
| 2394 | } |
| 2395 | |
| 2396 | if len(measObjInstIdsMap) > 0 { |
| 2397 | if measObjInstIdsMap[instName] { |
| 2398 | keep_inst = true |
| 2399 | } |
| 2400 | } else { |
| 2401 | keep_inst = true |
| 2402 | } |
| 2403 | |
| 2404 | if check_cntr { |
| 2405 | var temp_mrl []MeasResults |
| 2406 | cnt_p := 1 |
| 2407 | for _, v := range yy.MeasResultsList { |
| 2408 | if cnt_flags[v.P-1] { |
| 2409 | v.P = cnt_p |
| 2410 | cnt_p++ |
| 2411 | temp_mrl = append(temp_mrl, v) |
| 2412 | } |
| 2413 | } |
| 2414 | yy.MeasResultsList = temp_mrl |
| 2415 | keep_cntr = true |
| 2416 | } else { |
| 2417 | keep_cntr = true |
| 2418 | } |
| 2419 | if keep_class && keep_cntr && keep_inst { |
| 2420 | keep = true |
| 2421 | temp_mvl = append(temp_mvl, yy) |
| 2422 | } |
| 2423 | } |
| 2424 | if keep { |
| 2425 | zz.MeasValuesList = temp_mvl |
| 2426 | temp_mil = append(temp_mil, zz) |
| 2427 | modified = true |
| 2428 | } |
| 2429 | |
| 2430 | } |
| 2431 | //Only if modified |
| 2432 | if modified { |
| 2433 | if len(temp_mil) == 0 { |
| 2434 | log.Debug("Msg filtered, nothing found, discarding, obj: ", resource) |
| 2435 | return nil |
| 2436 | } |
| 2437 | data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = temp_mil |
| 2438 | } |
| 2439 | } |
| 2440 | log.Debug("Filter: ", time.Since(start).String()) |
| 2441 | return data |
| 2442 | } |
| 2443 | |
| 2559 | |
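// start_job_json_file_data_influx runs the job loop for a type writing json PM
// file events to InfluxDB: it maintains per-job filters and influx connection
// parameters, and spawns one run_json_file_data_job_influx per received file event.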
| 2560 | func start_job_json_file_data_influx(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, objectstore bool) { |
| 2561 | |
| 2562 | log.Info("Type job", type_id, " started") |
| 2563 | log.Debug("influx job ch ", data_in_ch) |
| 2564 | filters := make(map[string]Filter) |
| 2565 | filterParams_list := make(map[string]FilterMaps) |
| 2566 | influx_job_params := make(map[string]InfluxJobParameters) |
| 2567 | var mc *minio.Client |
| 2568 | const mc_id = "mc_" + "start_job_json_file_data_influx" |
| 2569 | running := true |
| 2570 | for { |
| 2571 | select { |
| 2572 | case job_ctl := <-control_ch: |
| 2573 | log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command) |
| 2574 | switch job_ctl.command { |
| 2575 | case "EXIT": |
| 2576 | //ignore cmd - handled by channel signal |
| 2577 | case "ADD-FILTER": |
| 2578 | //TODO: Refactor... |
| 2579 | filters[job_ctl.filter.JobId] = job_ctl.filter |
| 2580 | log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId) |
| 2581 | log.Debug(job_ctl.filter) |
| 2582 | tmp_filterParams_list := make(map[string]FilterMaps) |
| 2583 | for k, v := range filterParams_list { |
| 2584 | tmp_filterParams_list[k] = v |
| 2585 | } |
| 2586 | tmp_filterParams_list[job_ctl.filter.JobId] = job_ctl.filter.filter |
| 2587 | filterParams_list = tmp_filterParams_list |
| 2588 | |
| 2589 | tmp_influx_job_params := make(map[string]InfluxJobParameters) |
| 2590 | for k, v := range influx_job_params { |
| 2591 | tmp_influx_job_params[k] = v |
| 2592 | } |
| 2593 | tmp_influx_job_params[job_ctl.filter.JobId] = job_ctl.filter.influxParameters |
| 2594 | influx_job_params = tmp_influx_job_params |
| 2595 | |
| 2596 | case "REMOVE-FILTER": |
| 2597 | //TODO: Refactor... |
| 2598 | log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId) |
| 2599 | |
| 2600 | tmp_filterParams_list := make(map[string]FilterMaps) |
| 2601 | for k, v := range filterParams_list { |
| 2602 | tmp_filterParams_list[k] = v |
| 2603 | } |
| 2604 | delete(tmp_filterParams_list, job_ctl.filter.JobId) |
| 2605 | filterParams_list = tmp_filterParams_list |
| 2606 | |
| 2607 | tmp_influx_job_params := make(map[string]InfluxJobParameters) |
| 2608 | for k, v := range influx_job_params { |
| 2609 | tmp_influx_job_params[k] = v |
| 2610 | } |
| 2611 | delete(tmp_influx_job_params, job_ctl.filter.JobId) |
| 2612 | influx_job_params = tmp_influx_job_params |
| 2613 | } |
| 2614 | |
| 2615 | case msg := <-data_in_ch: |
| 2616 | log.Debug("Data reveived - influx") |
| 2617 | if msg == nil { |
| 2618 | log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data") |
| 2619 | |
| 2620 | running = false |
| 2621 | return |
| 2622 | } |
| 2623 | if objectstore { |
| 2624 | if mc == nil { |
var err error
| 2626 | mc, err = create_minio_client(mc_id) |
| 2627 | if err != nil { |
| 2628 | log.Debug("Cannot create minio client for type job: ", type_id) |
| 2629 | } |
| 2630 | } |
| 2631 | } |
| 2632 | //TODO: Sort processed file conversions in order (FIFO) |
| 2633 | jobLimiterChan <- struct{}{} |
| 2634 | go run_json_file_data_job_influx(type_id, msg, filterParams_list, influx_job_params, jobLimiterChan, mc, mc_id, objectstore) |
| 2635 | |
| 2636 | case <-time.After(1 * time.Second): |
| 2637 | if !running { |
| 2638 | return |
| 2639 | } |
| 2640 | } |
| 2641 | } |
| 2642 | } |
| 2643 | |
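// run_json_file_data_job_influx reads one json PM file, from the file volume or
// from minio, applies each job's filter, and writes the measurement values as
// points to the job's configured InfluxDB bucket.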
| 2644 | func run_json_file_data_job_influx(type_id string, msg *KafkaPayload, filterList map[string]FilterMaps, influxList map[string]InfluxJobParameters, jobLimiterChan chan struct{}, mc *minio.Client, mc_id string, objectstore bool) { |
| 2645 | |
| 2646 | log.Debug("run_json_file_data_job_influx") |
| 2647 | //Release job limit |
| 2648 | defer func() { |
| 2649 | <-jobLimiterChan |
| 2650 | }() |
| 2651 | |
| 2652 | PrintMemUsage() |
| 2653 | |
| 2654 | var evt_data FileDownloadedEvt |
| 2655 | err := jsoniter.Unmarshal(msg.msg.Value, &evt_data) |
| 2656 | if err != nil { |
| 2657 | log.Error("Cannot parse FileDownloadedEvt for type job: ", type_id, " - discarding message, error details: ", err) |
| 2658 | return |
| 2659 | } |
| 2660 | log.Debug("FileDownloadedEvt for job: ", type_id, " file: ", evt_data.Filename) |
| 2661 | |
| 2662 | var reader io.Reader |
| 2663 | |
| 2664 | //TODO -> config |
| 2666 | INPUTBUCKET := "pm-files-json" |
| 2667 | filename := "" |
if !objectstore {
| 2669 | filename = files_volume + "/" + evt_data.Filename |
| 2670 | fi, err := os.Open(filename) |
| 2671 | |
| 2672 | if err != nil { |
| 2673 | log.Error("File: ", filename, "for type job: ", type_id, " - cannot be opened -discarding message - error details: ", err) |
| 2674 | return |
| 2675 | } |
| 2676 | defer fi.Close() |
| 2677 | reader = fi |
| 2678 | } else { |
| 2679 | filename = "/" + evt_data.Filename |
| 2680 | if mc != nil { |
if !check_minio_bucket(mc, mc_id, INPUTBUCKET) {
| 2682 | log.Error("Bucket not available for reading in type job: ", type_id, "bucket: ", INPUTBUCKET) |
| 2683 | return |
| 2684 | } |
| 2685 | tctx := context.Background() |
| 2686 | mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{}) |
| 2687 | if err != nil { |
| 2688 | log.Error("Obj: ", filename, "for type job: ", type_id, " - cannot be fetched from bucket: ", INPUTBUCKET, " - discarding message, error details: ", err) |
| 2689 | return |
| 2690 | } |
| 2691 | reader = mr |
| 2692 | defer mr.Close() |
| 2693 | } else { |
| 2694 | log.Error("Cannot get obj: ", filename, "for type job: ", type_id, " from bucket: ", INPUTBUCKET, " - no client") |
| 2695 | return |
| 2696 | } |
| 2697 | } |
| 2698 | |
| 2699 | var data *[]byte |
| 2700 | if strings.HasSuffix(filename, "gz") { |
| 2701 | start := time.Now() |
| 2702 | var buf2 bytes.Buffer |
| 2703 | errb := gunzipReaderToWriter(&buf2, reader) |
| 2704 | if errb != nil { |
| 2705 | log.Error("Cannot decompress file/obj ", filename, "for type job: ", type_id, " - discarding message, error details", errb) |
| 2706 | return |
| 2707 | } |
| 2708 | d := buf2.Bytes() |
| 2709 | data = &d |
| 2710 | log.Debug("Decompress file/obj ", filename, "for type job: ", type_id, " time:", time.Since(start).String()) |
| 2711 | } else { |
| 2712 | |
| 2713 | start := time.Now() |
| 2714 | d, err := io.ReadAll(reader) |
| 2715 | if err != nil { |
| 2716 | log.Error("Cannot read file/obj: ", filename, "for type job: ", type_id, " - discarding message, error details", err) |
| 2717 | return |
| 2718 | } |
| 2719 | data = &d |
| 2720 | |
| 2721 | log.Debug("Read file/obj: ", filename, "for type job: ", type_id, " time: ", time.Since(start).String()) |
| 2722 | } |
// The file is unmarshalled inside the loop below - a deep copy of PMJsonFile
// would be needed to keep a single parsed copy outside the loop.
| 2733 | for k, v := range filterList { |
| 2734 | |
| 2735 | var pmfile PMJsonFile |
| 2736 | start := time.Now() |
| 2737 | err = jsoniter.Unmarshal(*data, &pmfile) |
| 2738 | log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String()) |
| 2739 | |
| 2740 | if err != nil { |
| 2741 | log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err) |
| 2742 | return |
| 2743 | } |
| 2744 | |
| 2745 | if len(v.sourceNameMap) > 0 || len(v.measObjInstIdsMap) > 0 || len(v.measTypesMap) > 0 || len(v.measObjClassMap) > 0 { |
| 2746 | b := json_pm_filter_to_obj(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap) |
| 2747 | if b == nil { |
| 2748 | log.Info("File/obj ", filename, "for job: ", k, " - empty after filering, discarding message ") |
| 2749 | return |
| 2750 | } |
| 2751 | |
| 2752 | } |
| 2753 | fluxParms := influxList[k] |
| 2754 | log.Debug("Influxdb params: ", fluxParms) |
| 2755 | client := influxdb2.NewClient(fluxParms.DbUrl, fluxParms.DbToken) |
| 2756 | writeAPI := client.WriteAPIBlocking(fluxParms.DbOrg, fluxParms.DbBucket) |
| 2757 | |
| 2764 | for _, zz := range pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList { |
| 2765 | ctr_names := make(map[string]string) |
| 2766 | for cni, cn := range zz.MeasTypes.SMeasTypesList { |
ctr_names[strconv.Itoa(cni+1)] = cn
| 2768 | } |
| 2769 | for _, xx := range zz.MeasValuesList { |
| 2770 | log.Debug("Measurement: ", xx.MeasObjInstID) |
| 2771 | log.Debug("Suspect flag: ", xx.SuspectFlag) |
| 2772 | p := influxdb2.NewPointWithMeasurement(xx.MeasObjInstID) |
| 2773 | p.AddField("suspectflag", xx.SuspectFlag) |
| 2774 | p.AddField("granularityperiod", pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod) |
| 2775 | p.AddField("timezone", pmfile.Event.CommonEventHeader.TimeZoneOffset) |
| 2776 | for _, yy := range xx.MeasResultsList { |
pi := strconv.Itoa(yy.P)
| 2778 | pv := yy.SValue |
| 2779 | pn := ctr_names[pi] |
| 2780 | log.Debug("Counter: ", pn, " Value: ", pv) |
| 2781 | pv_i, err := strconv.Atoi(pv) |
| 2782 | if err == nil { |
| 2783 | p.AddField(pn, pv_i) |
| 2784 | } else { |
| 2785 | p.AddField(pn, pv) |
| 2786 | } |
| 2787 | } |
| 2789 | log.Debug("StartEpochMicrosec from common event header: ", pmfile.Event.CommonEventHeader.StartEpochMicrosec) |
| 2790 | log.Debug("Set time: ", time.Unix(pmfile.Event.CommonEventHeader.StartEpochMicrosec/1000000, 0)) |
| 2791 | p.SetTime(time.Unix(pmfile.Event.CommonEventHeader.StartEpochMicrosec/1000000, 0)) |
| 2792 | err := writeAPI.WritePoint(context.Background(), p) |
| 2793 | if err != nil { |
| 2794 | log.Error("Db write error: ", err) |
| 2795 | } |
| 2796 | } |
| 2797 | |
| 2798 | } |
| 2799 | client.Close() |
| 2800 | } |
| 2801 | |
| 2802 | } |