blob: 9c3d8a412054e32655d1c0f580177bb71a79e33c [file] [log] [blame]
BjornMagnussonXAc5655db2023-03-17 14:55:16 +01001// ============LICENSE_START===============================================
2// Copyright (C) 2023 Nordix Foundation. All rights reserved.
3// ========================================================================
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15// ============LICENSE_END=================================================
16//
17
18package main
19
20import (
21 "bytes"
22 "compress/gzip"
23 "context"
24 "crypto/tls"
25 "encoding/json"
26 "encoding/xml"
27 "errors"
28 "fmt"
29 "io"
30 "net"
31 "os/signal"
32 "reflect"
33 "strings"
34 "sync"
35 "syscall"
36
37 "net/http"
38 "os"
39 "runtime"
40 "strconv"
41 "time"
42
43 "github.com/google/uuid"
44 "golang.org/x/oauth2/clientcredentials"
45
46 log "github.com/sirupsen/logrus"
47
48 "github.com/gorilla/mux"
49
50 "net/http/pprof"
51
52 "github.com/confluentinc/confluent-kafka-go/kafka"
53 influxdb2 "github.com/influxdata/influxdb-client-go/v2"
54 jsoniter "github.com/json-iterator/go"
55 "github.com/minio/minio-go/v7"
56 "github.com/minio/minio-go/v7/pkg/credentials"
57)
58
59//== Constants ==//
60
61const http_port = 80
62const https_port = 443
63const config_file = "application_configuration.json"
64const server_crt = "server.crt"
65const server_key = "server.key"
66
67const producer_name = "kafka-producer"
68
69const registration_delay_short = 2
70const registration_delay_long = 120
71
72const mutexLocked = 1
73
74const (
75 Init AppStates = iota
76 Running
77 Terminating
78)
79
80const reader_queue_length = 100 //Per type job
81const writer_queue_length = 100 //Per info job
82const parallelism_limiter = 100 //For all jobs
83
84// This are optional - set if using SASL protocol is used towards kafka
85var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
86var creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET")
87var creds_client_id = os.Getenv("CREDS_CLIENT_ID")
88var creds_service_url = os.Getenv("AUTH_SERVICE_URL")
89
90//== Types ==//
91
92type AppStates int64
93
94type FilterParameters struct {
95 MeasuredEntityDns []string `json:"measuredEntityDns"`
96 MeasTypes []string `json:"measTypes"`
97 MeasObjClass []string `json:"measObjClass"`
98 MeasObjInstIds []string `json:"measObjInstIds"`
99}
100
101type InfoJobDataType struct {
102 InfoJobData struct {
103 KafkaOutputTopic string `json:"kafkaOutputTopic"`
104
105 DbUrl string `json:"db-url"`
106 DbOrg string `json:"db-org"`
107 DbBucket string `json:"db-bucket"`
108 DbToken string `json:"db-token"`
109
110 FilterParams FilterParameters `json:"filter"`
111 } `json:"info_job_data"`
112 InfoJobIdentity string `json:"info_job_identity"`
113 InfoTypeIdentity string `json:"info_type_identity"`
114 LastUpdated string `json:"last_updated"`
115 Owner string `json:"owner"`
116 TargetURI string `json:"target_uri"`
117}
118
119// Type for an infojob
120type InfoJobRecord struct {
121 job_info InfoJobDataType
122 output_topic string
123
124 statistics *InfoJobStats
125}
126
127// Type for an infojob
128type TypeJobRecord struct {
129 InfoType string
130 InputTopic string
131 data_in_channel chan *KafkaPayload
132 reader_control chan ReaderControl
133 job_control chan JobControl
134 groupId string
135 clientId string
136
137 statistics *TypeJobStats
138}
139
140// Type for controlling the topic reader
141type ReaderControl struct {
142 command string
143}
144
145// Type for controlling the topic writer
146type WriterControl struct {
147 command string
148}
149
150// Type for controlling the job
151type JobControl struct {
152 command string
153 filter Filter
154}
155
156type KafkaPayload struct {
157 msg *kafka.Message
158 topic string
159 jobid string
160}
161
162type FilterMaps struct {
163 sourceNameMap map[string]bool
164 measObjClassMap map[string]bool
165 measObjInstIdsMap map[string]bool
166 measTypesMap map[string]bool
167}
168
169type InfluxJobParameters struct {
170 DbUrl string
171 DbOrg string
172 DbBucket string
173 DbToken string
174}
175
176type Filter struct {
177 JobId string
178 OutputTopic string
179 filter FilterMaps
180
181 influxParameters InfluxJobParameters
182}
183
184// Type for info job statistics
185type InfoJobStats struct {
186 out_msg_cnt int
187 out_data_vol int64
188}
189
190// Type for type job statistics
191type TypeJobStats struct {
192 in_msg_cnt int
193 in_data_vol int64
194}
195
196// == API Datatypes ==//
197// Type for supported data types
198type DataType struct {
199 ID string `json:"id"`
200 KafkaInputTopic string `json:"kafkaInputTopic"`
201 InputJobType string `json:inputJobType`
202 InputJobDefinition struct {
203 KafkaOutputTopic string `json:kafkaOutputTopic`
204 } `json:inputJobDefinition`
205
206 ext_job *[]byte
207 ext_job_created bool
208 ext_job_id string
209}
210
211type DataTypes struct {
212 ProdDataTypes []DataType `json:"types"`
213}
214
215type Minio_buckets struct {
216 Buckets map[string]bool
217}
218
219//== External data types ==//
220
221// // Data type for event xml file download
222type XmlFileEventHeader struct {
223 ProductName string `json:"productName"`
224 VendorName string `json:"vendorName"`
225 Location string `json:"location"`
226 Compression string `json:"compression"`
227 SourceName string `json:"sourceName"`
228 FileFormatType string `json:"fileFormatType"`
229 FileFormatVersion string `json:"fileFormatVersion"`
230 StartEpochMicrosec int64 `json:"startEpochMicrosec"`
231 LastEpochMicrosec int64 `json:"lastEpochMicrosec"`
232 Name string `json:"name"`
233 ChangeIdentifier string `json:"changeIdentifier"`
234 InternalLocation string `json:"internalLocation"`
235 TimeZoneOffset string `json:"timeZoneOffset"`
236 //ObjectStoreBucket string `json:"objectStoreBucket"`
237}
238
239// Data types for input xml file
240type MeasCollecFile struct {
241 XMLName xml.Name `xml:"measCollecFile"`
242 Text string `xml:",chardata"`
243 Xmlns string `xml:"xmlns,attr"`
244 Xsi string `xml:"xsi,attr"`
245 SchemaLocation string `xml:"schemaLocation,attr"`
246 FileHeader struct {
247 Text string `xml:",chardata"`
248 FileFormatVersion string `xml:"fileFormatVersion,attr"`
249 VendorName string `xml:"vendorName,attr"`
250 DnPrefix string `xml:"dnPrefix,attr"`
251 FileSender struct {
252 Text string `xml:",chardata"`
253 LocalDn string `xml:"localDn,attr"`
254 ElementType string `xml:"elementType,attr"`
255 } `xml:"fileSender"`
256 MeasCollec struct {
257 Text string `xml:",chardata"`
258 BeginTime string `xml:"beginTime,attr"`
259 } `xml:"measCollec"`
260 } `xml:"fileHeader"`
261 MeasData struct {
262 Text string `xml:",chardata"`
263 ManagedElement struct {
264 Text string `xml:",chardata"`
265 LocalDn string `xml:"localDn,attr"`
266 SwVersion string `xml:"swVersion,attr"`
267 } `xml:"managedElement"`
268 MeasInfo []struct {
269 Text string `xml:",chardata"`
270 MeasInfoId string `xml:"measInfoId,attr"`
271 Job struct {
272 Text string `xml:",chardata"`
273 JobId string `xml:"jobId,attr"`
274 } `xml:"job"`
275 GranPeriod struct {
276 Text string `xml:",chardata"`
277 Duration string `xml:"duration,attr"`
278 EndTime string `xml:"endTime,attr"`
279 } `xml:"granPeriod"`
280 RepPeriod struct {
281 Text string `xml:",chardata"`
282 Duration string `xml:"duration,attr"`
283 } `xml:"repPeriod"`
284 MeasType []struct {
285 Text string `xml:",chardata"`
286 P string `xml:"p,attr"`
287 } `xml:"measType"`
288 MeasValue []struct {
289 Text string `xml:",chardata"`
290 MeasObjLdn string `xml:"measObjLdn,attr"`
291 R []struct {
292 Text string `xml:",chardata"`
293 P string `xml:"p,attr"`
294 } `xml:"r"`
295 Suspect string `xml:"suspect"`
296 } `xml:"measValue"`
297 } `xml:"measInfo"`
298 } `xml:"measData"`
299 FileFooter struct {
300 Text string `xml:",chardata"`
301 MeasCollec struct {
302 Text string `xml:",chardata"`
303 EndTime string `xml:"endTime,attr"`
304 } `xml:"measCollec"`
305 } `xml:"fileFooter"`
306}
307
308// Data type for json file
309// Splitted in sevreal part to allow add/remove in lists
310type MeasResults struct {
311 P int `json:"p"`
312 SValue string `json:"sValue"`
313}
314
315type MeasValues struct {
316 MeasObjInstID string `json:"measObjInstId"`
317 SuspectFlag string `json:"suspectFlag"`
318 MeasResultsList []MeasResults `json:"measResults"`
319}
320
321type SMeasTypes struct {
322 SMeasType string `json:"sMeasTypesList"`
323}
324
325type MeasInfoList struct {
326 MeasInfoID struct {
327 SMeasInfoID string `json:"sMeasInfoId"`
328 } `json:"measInfoId"`
329 MeasTypes struct {
330 SMeasTypesList []string `json:"sMeasTypesList"`
331 } `json:"measTypes"`
332 MeasValuesList []MeasValues `json:"measValuesList"`
333}
334
335type PMJsonFile struct {
336 Event struct {
337 CommonEventHeader struct {
338 Domain string `json:"domain"`
339 EventID string `json:"eventId"`
340 Sequence int `json:"sequence"`
341 EventName string `json:"eventName"`
342 SourceName string `json:"sourceName"`
343 ReportingEntityName string `json:"reportingEntityName"`
344 Priority string `json:"priority"`
345 StartEpochMicrosec int64 `json:"startEpochMicrosec"`
346 LastEpochMicrosec int64 `json:"lastEpochMicrosec"`
347 Version string `json:"version"`
348 VesEventListenerVersion string `json:"vesEventListenerVersion"`
349 TimeZoneOffset string `json:"timeZoneOffset"`
350 } `json:"commonEventHeader"`
351 Perf3GppFields struct {
352 Perf3GppFieldsVersion string `json:"perf3gppFieldsVersion"`
353 MeasDataCollection struct {
354 GranularityPeriod int `json:"granularityPeriod"`
355 MeasuredEntityUserName string `json:"measuredEntityUserName"`
356 MeasuredEntityDn string `json:"measuredEntityDn"`
357 MeasuredEntitySoftwareVersion string `json:"measuredEntitySoftwareVersion"`
358 SMeasInfoList []MeasInfoList `json:"measInfoList"`
359 } `json:"measDataCollection"`
360 } `json:"perf3gppFields"`
361 } `json:"event"`
362}
363
364// Data type for converted json file message
365type FileDownloadedEvt struct {
366 Filename string `json:"filename"`
367}
368
369//== Variables ==//
370
371var AppState = Init
372
373// Lock for all internal data
374var datalock sync.Mutex
375
376var producer_instance_name string = producer_name
377
378// Keep all info type jobs, key == type id
379var TypeJobs map[string]TypeJobRecord = make(map[string]TypeJobRecord)
380
381// Keep all info jobs, key == job id
382var InfoJobs map[string]InfoJobRecord = make(map[string]InfoJobRecord)
383
384var InfoTypes DataTypes
385
386// Limiter - valid for all jobs
387var jobLimiterChan = make(chan struct{}, parallelism_limiter)
388
389// TODO: Config param?
390var bucket_location = "swe"
391
392var httpclient = &http.Client{}
393
394// == Env variables ==//
395var bootstrapserver = os.Getenv("KAFKA_SERVER")
396var files_volume = os.Getenv("FILES_VOLUME")
397var ics_server = os.Getenv("ICS")
398var self = os.Getenv("SELF")
399var filestore_user = os.Getenv("FILESTORE_USER")
400var filestore_pwd = os.Getenv("FILESTORE_PWD")
401var filestore_server = os.Getenv("FILESTORE_SERVER")
402
403var data_out_channel = make(chan *KafkaPayload, writer_queue_length)
404var writer_control = make(chan WriterControl, 1)
405
406var minio_bucketlist map[string]Minio_buckets = make(map[string]Minio_buckets)
407
408// == Main ==//
409func main() {
410
411 //log.SetLevel(log.InfoLevel)
412 log.SetLevel(log.TraceLevel)
413
414 log.Info("Server starting...")
415
416 if self == "" {
417 log.Panic("Env SELF not configured")
418 }
419 if bootstrapserver == "" {
420 log.Panic("Env KAFKA_SERVER not set")
421 }
422 if ics_server == "" {
423 log.Panic("Env ICS not set")
424 }
425 if os.Getenv("KP") != "" {
426 producer_instance_name = producer_instance_name + "-" + os.Getenv("KP")
427 }
428
429 rtr := mux.NewRouter()
430 rtr.HandleFunc("/callbacks/job/"+producer_instance_name, create_job)
431 rtr.HandleFunc("/callbacks/job/"+producer_instance_name+"/{job_id}", delete_job)
432 rtr.HandleFunc("/callbacks/supervision/"+producer_instance_name, supervise_producer)
433 rtr.HandleFunc("/statistics", statistics)
434 rtr.HandleFunc("/logging/{level}", logging_level)
435 rtr.HandleFunc("/logging", logging_level)
436 rtr.HandleFunc("/", alive)
437
438 //For perf/mem profiling
439 rtr.HandleFunc("/custom_debug_path/profile", pprof.Profile)
440
441 http.Handle("/", rtr)
442
443 http_server := &http.Server{Addr: ":" + strconv.Itoa(http_port), Handler: nil}
444
445 cer, err := tls.LoadX509KeyPair(server_crt, server_key)
446 if err != nil {
447 log.Error("Cannot load key and cert - %v\n", err)
448 return
449 }
450 config := &tls.Config{Certificates: []tls.Certificate{cer}}
451 https_server := &http.Server{Addr: ":" + strconv.Itoa(https_port), TLSConfig: config, Handler: nil}
452
453 //TODO: Make http on/off configurable
454 // Run http
455 go func() {
456 log.Info("Starting http service...")
457 err := http_server.ListenAndServe()
458 if err == http.ErrServerClosed { // graceful shutdown
459 log.Info("http server shutdown...")
460 } else if err != nil {
461 log.Error("http server error: %v\n", err)
462 }
463 }()
464
465 //TODO: Make https on/off configurable
466 // Run https
467 go func() {
468 log.Info("Starting https service...")
469 err := https_server.ListenAndServe()
470 if err == http.ErrServerClosed { // graceful shutdown
471 log.Info("https server shutdown...")
472 } else if err != nil {
473 log.Error("https server error: %v\n", err)
474 }
475 }()
476 check_tcp(strconv.Itoa(http_port))
477 check_tcp(strconv.Itoa(https_port))
478
479 go start_topic_writer(writer_control, data_out_channel)
480
481 //Setup proc for periodic type registration
482 var event_chan = make(chan int) //Channel for stopping the proc
483 go periodic_registration(event_chan)
484
485 //Wait for term/int signal do try to shut down gracefully
486 sigs := make(chan os.Signal, 1)
487 signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
488 go func() {
489 sig := <-sigs
490 fmt.Printf("Received signal %s - application will terminate\n", sig)
491 event_chan <- 0 // Stop periodic registration
492 datalock.Lock()
493 defer datalock.Unlock()
494 AppState = Terminating
495 http_server.Shutdown(context.Background())
496 https_server.Shutdown(context.Background())
497 // Stopping jobs
498 for key, _ := range TypeJobs {
499 log.Info("Stopping type job:", key)
500 for _, dp := range InfoTypes.ProdDataTypes {
501 if key == dp.ID {
502 remove_type_job(dp)
503 }
504 }
505 }
506 }()
507
508 AppState = Running
509
510 //Wait until all go routines has exited
511 runtime.Goexit()
512
513 fmt.Println("main routine exit")
514 fmt.Println("server stopped")
515}
516
517func check_tcp(port string) {
518 log.Info("Checking tcp port: ", port)
519 for true {
520 address := net.JoinHostPort("localhost", port)
521 // 3 second timeout
522 conn, err := net.DialTimeout("tcp", address, 3*time.Second)
523 if err != nil {
524 log.Info("Checking tcp port: ", port, " failed, retrying...")
525 } else {
526 if conn != nil {
527 log.Info("Checking tcp port: ", port, " - OK")
528 _ = conn.Close()
529 return
530 } else {
531 log.Info("Checking tcp port: ", port, " failed, retrying...")
532 }
533 }
534 }
535}
536
537//== Core functions ==//
538
539// Run periodic registration of producers
540func periodic_registration(evtch chan int) {
541 var delay int = 1
542 for {
543 select {
544 case msg := <-evtch:
545 if msg == 0 { // Stop thread
546 return
547 }
548 case <-time.After(time.Duration(delay) * time.Second):
549 ok := register_producer()
550 if ok {
551 delay = registration_delay_long
552 } else {
553 if delay < registration_delay_long {
554 delay += registration_delay_short
555 } else {
556 delay = registration_delay_short
557 }
558 }
559 }
560 }
561}
562
563func register_producer() bool {
564
565 log.Info("Registering producer: ", producer_instance_name)
566
567 file, err := os.ReadFile(config_file)
568 if err != nil {
569 log.Error("Cannot read config file: ", config_file)
570 log.Error("Registering producer: ", producer_instance_name, " - failed")
571 return false
572 }
573 data := DataTypes{}
574 err = jsoniter.Unmarshal([]byte(file), &data)
575 if err != nil {
576 log.Error("Cannot parse config file: ", config_file)
577 log.Error("Registering producer: ", producer_instance_name, " - failed")
578 return false
579 }
580 var new_type_names []string
581
582 for i := 0; i < len(data.ProdDataTypes); i++ {
583 t1 := make(map[string]interface{})
584 t2 := make(map[string]interface{})
585
586 t2["schema"] = "http://json-schema.org/draft-07/schema#"
587 t2["title"] = data.ProdDataTypes[i].ID
588 t2["description"] = data.ProdDataTypes[i].ID
589 t2["type"] = "object"
590
591 t1["info_job_data_schema"] = t2
592
593 json, err := json.Marshal(t1)
594 if err != nil {
595 log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID)
596 log.Error("Registering producer: ", producer_instance_name, " - failed")
597 return false
598 } else {
599 //TODO: http/https should be configurable
600 ok := send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
601 if !ok {
602 log.Error("Cannot register type: ", data.ProdDataTypes[i].ID)
603 log.Error("Registering producer: ", producer_instance_name, " - failed")
604 return false
605 }
606 new_type_names = append(new_type_names, data.ProdDataTypes[i].ID)
607 }
608
609 }
610
611 log.Debug("Registering types: ", new_type_names)
612 m := make(map[string]interface{})
613 m["supported_info_types"] = new_type_names
614 //TODO: http/https should be configurable
615 m["info_job_callback_url"] = "http://" + self + "/callbacks/job/" + producer_instance_name
616 //TODO: http/https should be configurable
617 m["info_producer_supervision_callback_url"] = "http://" + self + "/callbacks/supervision/" + producer_instance_name
618
619 json, err := json.Marshal(m)
620 if err != nil {
621 log.Error("Cannot create json for producer: ", producer_instance_name)
622 log.Error("Registering producer: ", producer_instance_name, " - failed")
623 return false
624 }
625 ok := send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-producers/"+producer_instance_name, true, creds_grant_type != "")
626 if !ok {
627 log.Error("Cannot register producer: ", producer_instance_name)
628 log.Error("Registering producer: ", producer_instance_name, " - failed")
629 return false
630 }
631 datalock.Lock()
632 defer datalock.Unlock()
633
634 var current_type_names []string
635 for _, v := range InfoTypes.ProdDataTypes {
636 current_type_names = append(current_type_names, v.ID)
637 if contains_str(new_type_names, v.ID) {
638 //Type exist
639 log.Debug("Type ", v.ID, " exists")
640 create_ext_job(v)
641 } else {
642 //Type is removed
643 log.Info("Removing type job for type: ", v.ID, " Type not in configuration")
644 remove_type_job(v)
645 }
646 }
647
648 for _, v := range data.ProdDataTypes {
649 if contains_str(current_type_names, v.ID) {
650 //Type exist
651 log.Debug("Type ", v.ID, " exists")
652 create_ext_job(v)
653 } else {
654 //Type is new
655 log.Info("Adding type job for type: ", v.ID, " Type added to configuration")
656 start_type_job(v)
657 }
658 }
659
660 InfoTypes = data
661 log.Debug("Datatypes: ", InfoTypes)
662
663 log.Info("Registering producer: ", producer_instance_name, " - OK")
664 return true
665}
666
667func remove_type_job(dp DataType) {
668 log.Info("Removing type job: ", dp.ID)
669 j, ok := TypeJobs[dp.ID]
670 if ok {
671 j.reader_control <- ReaderControl{"EXIT"}
672 }
673
674 if dp.ext_job_created == true {
675 dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType)
676 //TODO: http/https should be configurable
677 ok := send_http_request(*dp.ext_job, http.MethodDelete, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "")
678 if !ok {
679 log.Error("Cannot delete job: ", dp.ext_job_id)
680 }
681 dp.ext_job_created = false
682 dp.ext_job = nil
683 }
684
685}
686
687func start_type_job(dp DataType) {
688 log.Info("Starting type job: ", dp.ID)
689 job_record := TypeJobRecord{}
690
691 job_record.job_control = make(chan JobControl, 1)
692 job_record.reader_control = make(chan ReaderControl, 1)
693 job_record.data_in_channel = make(chan *KafkaPayload, reader_queue_length)
694 job_record.InfoType = dp.ID
695 job_record.InputTopic = dp.KafkaInputTopic
696 job_record.groupId = "kafka-procon-" + dp.ID
697 job_record.clientId = dp.ID + "-" + os.Getenv("KP")
698 var stats TypeJobStats
699 job_record.statistics = &stats
700
701 switch dp.ID {
702 case "xml-file-data-to-filestore":
703 go start_job_xml_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, "", "pm-files-json")
704 case "xml-file-data":
705 go start_job_xml_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, files_volume, "")
706 case "json-file-data-from-filestore":
707 go start_job_json_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, true)
708 case "json-file-data":
709 go start_job_json_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, false)
710 case "json-file-data-from-filestore-to-influx":
711 go start_job_json_file_data_influx(dp.ID, job_record.job_control, job_record.data_in_channel, true)
712 // case "json-data-to-influx":
713 // go start_job_json_data_influx(dp.ID, job_record.job_control, job_record.data_in_channel)
714
715 default:
716 }
717
718 go start_topic_reader(dp.KafkaInputTopic, dp.ID, job_record.reader_control, job_record.data_in_channel, job_record.groupId, job_record.clientId, &stats)
719
720 TypeJobs[dp.ID] = job_record
721 log.Debug("Type job input type: ", dp.InputJobType)
722 create_ext_job(dp)
723}
724
725func create_ext_job(dp DataType) {
726 if dp.InputJobType != "" {
727 jb := make(map[string]interface{})
728 jb["info_type_id"] = dp.InputJobType
729 jb["job_owner"] = "console" //TODO:
730 jb["status_notification_uri"] = "http://callback:80/post"
731 jb1 := make(map[string]interface{})
732 jb["job_definition"] = jb1
733 jb1["kafkaOutputTopic"] = dp.InputJobDefinition.KafkaOutputTopic
734
735 json, err := json.Marshal(jb)
736 dp.ext_job_created = false
737 dp.ext_job = nil
738 if err != nil {
739 log.Error("Cannot create json for type: ", dp.InputJobType)
740 return
741 }
742
743 dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType)
744 //TODO: http/https should be configurable
745 ok := false
746 for !ok {
747 ok = send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "")
748 if !ok {
749 log.Error("Cannot register job: ", dp.InputJobType)
750 //TODO: Restart after long time?
751 }
752 }
753 log.Debug("Registered job ok: ", dp.InputJobType)
754 dp.ext_job_created = true
755 dp.ext_job = &json
756 }
757}
758
759func remove_info_job(jobid string) {
760 log.Info("Removing info job: ", jobid)
761 filter := Filter{}
762 filter.JobId = jobid
763 jc := JobControl{}
764 jc.command = "REMOVE-FILTER"
765 jc.filter = filter
766 infoJob := InfoJobs[jobid]
767 typeJob := TypeJobs[infoJob.job_info.InfoTypeIdentity]
768 typeJob.job_control <- jc
769 delete(InfoJobs, jobid)
770
771}
772
773// == Helper functions ==//
774
775// Function to check the status of a mutex lock
776func MutexLocked(m *sync.Mutex) bool {
777 state := reflect.ValueOf(m).Elem().FieldByName("state")
778 return state.Int()&mutexLocked == mutexLocked
779}
780
781// Test if slice contains a string
782func contains_str(s []string, e string) bool {
783 for _, a := range s {
784 if a == e {
785 return true
786 }
787 }
788 return false
789}
790
791// Send a http request with json (json may be nil)
792func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool {
793
794 // set the HTTP method, url, and request body
795 var req *http.Request
796 var err error
797 if json == nil {
798 req, err = http.NewRequest(method, url, http.NoBody)
799 } else {
800 req, err = http.NewRequest(method, url, bytes.NewBuffer(json))
801 req.Header.Set("Content-Type", "application/json; charset=utf-8")
802 }
803 if err != nil {
804 log.Error("Cannot create http request, method: ", method, " url: ", url)
805 return false
806 }
807
808 if useAuth {
809 token, err := fetch_token()
810 if err != nil {
811 log.Error("Cannot fetch token for http request: ", err)
812 return false
813 }
814 req.Header.Set("Authorization", "Bearer "+token.TokenValue)
815 }
816
817 log.Debug("HTTP request: ", req)
818
819 log.Debug("Sending http request")
820 resp, err2 := httpclient.Do(req)
821 if err2 != nil {
822 log.Error("Http request error: ", err2)
823 log.Error("Cannot send http request method: ", method, " url: ", url)
824 } else {
825 if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
826 log.Debug("Accepted http status: ", resp.StatusCode)
827 resp.Body.Close()
828 return true
829 }
830 log.Debug("HTTP resp: ", resp)
831 resp.Body.Close()
832 }
833 return false
834}
835
836// // Send a http request with json (json may be nil)
837// func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool {
838
839// // set the HTTP method, url, and request body
840// var req *http.Request
841// var err error
842// if json == nil {
843// req, err = http.NewRequest(method, url, http.NoBody)
844// } else {
845// req, err = http.NewRequest(method, url, bytes.NewBuffer(json))
846// req.Header.Set("Content-Type", "application/json; charset=utf-8")
847// }
848// if err != nil {
849// log.Error("Cannot create http request, method: ", method, " url: ", url)
850// return false
851// }
852
853// if useAuth {
854// token, err := fetch_token()
855// if err != nil {
856// log.Error("Cannot fetch token for http request: ", err)
857// return false
858// }
859// req.Header.Set("Authorization", "Bearer "+token.TokenValue)
860// }
861
862// log.Debug("HTTP request: ", req)
863
864// retries := 1
865// if retry {
866// retries = 5
867// }
868// sleep_time := 1
869// for i := retries; i > 0; i-- {
870// log.Debug("Sending http request")
871// resp, err2 := httpclient.Do(req)
872// if err2 != nil {
873// log.Error("Http request error: ", err2)
874// log.Error("Cannot send http request method: ", method, " url: ", url, " - retries left: ", i-1)
875
876// time.Sleep(time.Duration(sleep_time) * time.Second)
877// sleep_time = 2 * sleep_time
878// } else {
879// if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
880// log.Debug("Accepted http status: ", resp.StatusCode)
881// resp.Body.Close()
882// return true
883// }
884// log.Debug("HTTP resp: ", resp)
885// resp.Body.Close()
886// }
887// }
888// return false
889// }
890
891// // Send a http request with json (json may be nil)
892// func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool {
893// // initialize http client
894// client := &http.Client{}
895
896// // set the HTTP method, url, and request body
897// var req *http.Request
898// var err error
899// if json == nil {
900// req, err = http.NewRequest(method, url, http.NoBody)
901// } else {
902// req, err = http.NewRequest(method, url, bytes.NewBuffer(json))
903// req.Header.Set("Content-Type", "application/json; charset=utf-8")
904// }
905// if err != nil {
906// log.Error("Cannot create http request method: ", method, " url: ", url)
907// return false
908// }
909
910// useAuth = false
911// if useAuth {
912// token, err := fetch_token()
913// if err != nil {
914// log.Error("Cannot fetch token for http request: ", err)
915// return false
916// }
917// req.Header.Add("Authorization", "Bearer "+token.TokenValue)
918// }
919// log.Debug("HTTP request: ", req)
920
921// b, berr := io.ReadAll(req.Body)
922// if berr == nil {
923// log.Debug("HTTP request body length: ", len(b))
924// } else {
925// log.Debug("HTTP request - cannot check body length: ", berr)
926// }
927// if json == nil {
928// log.Debug("HTTP request null json")
929// } else {
930// log.Debug("HTTP request json: ", string(json))
931// }
932// requestDump, cerr := httputil.DumpRequestOut(req, true)
933// if cerr != nil {
934// fmt.Println(cerr)
935// }
936// fmt.Println(string(requestDump))
937
938// retries := 1
939// if retry {
940// retries = 5
941// }
942// sleep_time := 1
943// for i := retries; i > 0; i-- {
944// resp, err2 := client.Do(req)
945// if err2 != nil {
946// log.Error("Http request error: ", err2)
947// log.Error("Cannot send http request method: ", method, " url: ", url, " - retries left: ", i)
948
949// time.Sleep(time.Duration(sleep_time) * time.Second)
950// sleep_time = 2 * sleep_time
951// } else {
952// if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
953// log.Debug("Accepted http status: ", resp.StatusCode)
954// defer resp.Body.Close()
955// return true
956// }
957// }
958// }
959// return false
960// }
961
962func fetch_token() (*kafka.OAuthBearerToken, error) {
963 log.Debug("Get token inline")
964 conf := &clientcredentials.Config{
965 ClientID: creds_client_id,
966 ClientSecret: creds_client_secret,
967 TokenURL: creds_service_url,
968 }
969 token, err := conf.Token(context.Background())
970 if err != nil {
971 log.Warning("Cannot fetch access token: ", err)
972 return nil, err
973 }
974 extensions := map[string]string{}
975 log.Debug("=====================================================")
976 log.Debug("token: ", token)
977 log.Debug("=====================================================")
978 log.Debug("TokenValue: ", token.AccessToken)
979 log.Debug("=====================================================")
980 log.Debug("Expiration: ", token.Expiry)
981 t := token.Expiry
982 // t := token.Expiry.Add(-time.Minute)
983 // log.Debug("Modified expiration: ", t)
984 oauthBearerToken := kafka.OAuthBearerToken{
985 TokenValue: token.AccessToken,
986 Expiration: t,
987 Extensions: extensions,
988 }
989
990 return &oauthBearerToken, nil
991}
992
993// Function to print memory details
994// https://pkg.go.dev/runtime#MemStats
995func PrintMemUsage() {
996 if log.IsLevelEnabled(log.DebugLevel) || log.IsLevelEnabled(log.TraceLevel) {
997 var m runtime.MemStats
998 runtime.ReadMemStats(&m)
999 fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc))
1000 fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc))
1001 fmt.Printf("\tSys = %v MiB", bToMb(m.Sys))
1002 fmt.Printf("\tNumGC = %v\n", m.NumGC)
1003 fmt.Printf("HeapSys = %v MiB", bToMb(m.HeapSys))
1004 fmt.Printf("\tStackSys = %v MiB", bToMb(m.StackSys))
1005 fmt.Printf("\tMSpanSys = %v MiB", bToMb(m.MSpanSys))
1006 fmt.Printf("\tMCacheSys = %v MiB", bToMb(m.MCacheSys))
1007 fmt.Printf("\tBuckHashSys = %v MiB", bToMb(m.BuckHashSys))
1008 fmt.Printf("\tGCSys = %v MiB", bToMb(m.GCSys))
1009 fmt.Printf("\tOtherSys = %v MiB\n", bToMb(m.OtherSys))
1010 }
1011}
1012
1013func bToMb(b uint64) uint64 {
1014 return b / 1024 / 1024
1015}
1016
1017func generate_uuid_from_type(s string) string {
1018 if len(s) > 16 {
1019 s = s[:16]
1020 }
1021 for len(s) < 16 {
1022 s = s + "0"
1023 }
1024 b := []byte(s)
1025 b = b[:16]
1026 uuid, _ := uuid.FromBytes(b)
1027 return uuid.String()
1028}
1029
1030// Write gzipped data to a Writer
1031func gzipWrite(w io.Writer, data *[]byte) error {
1032 gw, err1 := gzip.NewWriterLevel(w, gzip.BestSpeed)
1033
1034 if err1 != nil {
1035 return err1
1036 }
1037 defer gw.Close()
1038 _, err2 := gw.Write(*data)
1039 return err2
1040}
1041
1042// Write gunzipped data from Reader to a Writer
1043func gunzipReaderToWriter(w io.Writer, data io.Reader) error {
1044 gr, err1 := gzip.NewReader(data)
1045
1046 if err1 != nil {
1047 return err1
1048 }
1049 defer gr.Close()
1050 data2, err2 := io.ReadAll(gr)
1051 if err2 != nil {
1052 return err2
1053 }
1054 _, err3 := w.Write(data2)
1055 if err3 != nil {
1056 return err3
1057 }
1058 return nil
1059}
1060
1061func create_minio_bucket(mc *minio.Client, client_id string, bucket string) error {
1062 tctx := context.Background()
1063 err := mc.MakeBucket(tctx, bucket, minio.MakeBucketOptions{Region: bucket_location})
1064 if err != nil {
1065 // Check to see if we already own this bucket (which happens if you run this twice)
1066 exists, errBucketExists := mc.BucketExists(tctx, bucket)
1067 if errBucketExists == nil && exists {
1068 log.Debug("Already own bucket:", bucket)
1069 add_bucket(client_id, bucket)
1070 return nil
1071 } else {
1072 log.Error("Cannot create or check bucket ", bucket, " in minio client", err)
1073 return err
1074 }
1075 }
1076 log.Debug("Successfully created bucket: ", bucket)
1077 add_bucket(client_id, bucket)
1078 return nil
1079}
1080
1081func check_minio_bucket(mc *minio.Client, client_id string, bucket string) bool {
1082 ok := bucket_exist(client_id, bucket)
1083 if ok {
1084 return true
1085 }
1086 tctx := context.Background()
1087 exists, err := mc.BucketExists(tctx, bucket)
1088 if err == nil && exists {
1089 log.Debug("Already own bucket:", bucket)
1090 return true
1091 }
1092 log.Error("Bucket does not exist, bucket ", bucket, " in minio client", err)
1093 return false
1094}
1095
1096func add_bucket(minio_id string, bucket string) {
1097 datalock.Lock()
1098 defer datalock.Unlock()
1099
1100 b, ok := minio_bucketlist[minio_id]
1101 if !ok {
1102 b = Minio_buckets{}
1103 b.Buckets = make(map[string]bool)
1104 }
1105 b.Buckets[bucket] = true
1106 minio_bucketlist[minio_id] = b
1107}
1108
1109func bucket_exist(minio_id string, bucket string) bool {
1110 datalock.Lock()
1111 defer datalock.Unlock()
1112
1113 b, ok := minio_bucketlist[minio_id]
1114 if !ok {
1115 return false
1116 }
1117 _, ok = b.Buckets[bucket]
1118 return ok
1119}
1120
1121//== http api functions ==//
1122
1123// create/update job
1124func create_job(w http.ResponseWriter, req *http.Request) {
1125 log.Debug("Create job, http method: ", req.Method)
1126 if req.Method != http.MethodPost {
1127 log.Error("Create job, http method not allowed")
1128 w.WriteHeader(http.StatusMethodNotAllowed)
1129 return
1130 }
1131 ct := req.Header.Get("Content-Type")
1132 if ct != "application/json" {
1133 log.Error("Create job, bad content type")
1134 http.Error(w, "Bad content type", http.StatusBadRequest)
1135 return
1136 }
1137
1138 var t InfoJobDataType
1139 err := json.NewDecoder(req.Body).Decode(&t)
1140 if err != nil {
1141 log.Error("Create job, cannot parse json,", err)
1142 http.Error(w, "Cannot parse json", http.StatusBadRequest)
1143 return
1144 }
1145 log.Debug("Creating job, id: ", t.InfoJobIdentity)
1146 datalock.Lock()
1147 defer datalock.Unlock()
1148
1149 job_id := t.InfoJobIdentity
1150 job_record, job_found := InfoJobs[job_id]
1151 type_job_record, found_type := TypeJobs[t.InfoTypeIdentity]
1152 if !job_found {
1153 if !found_type {
1154 log.Error("Type ", t.InfoTypeIdentity, " does not exist")
1155 http.Error(w, "Type "+t.InfoTypeIdentity+" does not exist", http.StatusBadRequest)
1156 return
1157 }
1158 } else if t.InfoTypeIdentity != job_record.job_info.InfoTypeIdentity {
1159 log.Error("Job cannot change type")
1160 http.Error(w, "Job cannot change type", http.StatusBadRequest)
1161 return
1162 } else if t.InfoJobData.KafkaOutputTopic != job_record.job_info.InfoJobData.KafkaOutputTopic {
1163 log.Error("Job cannot change topic")
1164 http.Error(w, "Job cannot change topic", http.StatusBadRequest)
1165 return
1166 } else if !found_type {
1167 //Should never happen, if the type is removed then job is stopped
1168 log.Error("Type ", t.InfoTypeIdentity, " does not exist")
1169 http.Error(w, "Type "+t.InfoTypeIdentity+" does not exist", http.StatusBadRequest)
1170 return
1171 }
1172
1173 //TODO: Verify that job contains enough parameters...
1174
1175 if !job_found {
1176 job_record = InfoJobRecord{}
1177 job_record.job_info = t
1178 output_topic := t.InfoJobData.KafkaOutputTopic
1179 job_record.output_topic = t.InfoJobData.KafkaOutputTopic
1180 log.Debug("Starting infojob ", job_id, ", type ", t.InfoTypeIdentity, ", output topic", output_topic)
1181
1182 var stats InfoJobStats
1183 job_record.statistics = &stats
1184
1185 filter := Filter{}
1186 filter.JobId = job_id
1187 filter.OutputTopic = job_record.output_topic
1188
1189 jc := JobControl{}
1190
1191 jc.command = "ADD-FILTER"
1192
1193 //TODO: Refactor
1194 if t.InfoTypeIdentity == "json-file-data-from-filestore" || t.InfoTypeIdentity == "json-file-data" || t.InfoTypeIdentity == "json-file-data-from-filestore-to-influx" {
1195 fm := FilterMaps{}
1196 fm.sourceNameMap = make(map[string]bool)
1197 fm.measObjClassMap = make(map[string]bool)
1198 fm.measObjInstIdsMap = make(map[string]bool)
1199 fm.measTypesMap = make(map[string]bool)
1200 if t.InfoJobData.FilterParams.MeasuredEntityDns != nil {
1201 for _, v := range t.InfoJobData.FilterParams.MeasuredEntityDns {
1202 fm.sourceNameMap[v] = true
1203 }
1204 }
1205 if t.InfoJobData.FilterParams.MeasObjClass != nil {
1206 for _, v := range t.InfoJobData.FilterParams.MeasObjClass {
1207 fm.measObjClassMap[v] = true
1208 }
1209 }
1210 if t.InfoJobData.FilterParams.MeasObjInstIds != nil {
1211 for _, v := range t.InfoJobData.FilterParams.MeasObjInstIds {
1212 fm.measObjInstIdsMap[v] = true
1213 }
1214 }
1215 if t.InfoJobData.FilterParams.MeasTypes != nil {
1216 for _, v := range t.InfoJobData.FilterParams.MeasTypes {
1217 fm.measTypesMap[v] = true
1218 }
1219 }
1220 filter.filter = fm
1221 }
1222 if t.InfoTypeIdentity == "json-file-data-from-filestore-to-influx" {
1223 influxparam := InfluxJobParameters{}
1224 influxparam.DbUrl = t.InfoJobData.DbUrl
1225 influxparam.DbOrg = t.InfoJobData.DbOrg
1226 influxparam.DbBucket = t.InfoJobData.DbBucket
1227 influxparam.DbToken = t.InfoJobData.DbToken
1228 filter.influxParameters = influxparam
1229 }
1230
1231 jc.filter = filter
1232 InfoJobs[job_id] = job_record
1233
1234 type_job_record.job_control <- jc
1235
1236 } else {
1237 //TODO
1238 //Update job
1239 }
1240}
1241
1242// delete job
1243func delete_job(w http.ResponseWriter, req *http.Request) {
1244 if req.Method != http.MethodDelete {
1245 w.WriteHeader(http.StatusMethodNotAllowed)
1246 return
1247 }
1248 datalock.Lock()
1249 defer datalock.Unlock()
1250
1251 vars := mux.Vars(req)
1252
1253 if id, ok := vars["job_id"]; ok {
1254 if _, ok := InfoJobs[id]; ok {
1255 remove_info_job(id)
1256 w.WriteHeader(http.StatusNoContent)
1257 log.Info("Job ", id, " deleted")
1258 return
1259 }
1260 }
1261 w.WriteHeader(http.StatusNotFound)
1262}
1263
1264// job supervision
1265func supervise_job(w http.ResponseWriter, req *http.Request) {
1266 if req.Method != http.MethodGet {
1267 w.WriteHeader(http.StatusMethodNotAllowed)
1268 return
1269 }
1270 datalock.Lock()
1271 defer datalock.Unlock()
1272
1273 vars := mux.Vars(req)
1274
1275 log.Debug("Supervising, job: ", vars["job_id"])
1276 if id, ok := vars["job_id"]; ok {
1277 if _, ok := InfoJobs[id]; ok {
1278 log.Debug("Supervision ok, job", id)
1279 return
1280 }
1281 }
1282 w.WriteHeader(http.StatusNotFound)
1283}
1284
1285// producer supervision
1286func supervise_producer(w http.ResponseWriter, req *http.Request) {
1287 if req.Method != http.MethodGet {
1288 w.WriteHeader(http.StatusMethodNotAllowed)
1289 return
1290 }
1291
1292 w.WriteHeader(http.StatusOK)
1293}
1294
1295// producer statictics, all jobs
1296func statistics(w http.ResponseWriter, req *http.Request) {
1297 if req.Method != http.MethodGet {
1298 w.WriteHeader(http.StatusMethodNotAllowed)
1299 return
1300 }
1301 m := make(map[string]interface{})
1302 log.Debug("producer statictics, locked? ", MutexLocked(&datalock))
1303 datalock.Lock()
1304 defer datalock.Unlock()
1305 req.Header.Set("Content-Type", "application/json; charset=utf-8")
1306 m["number-of-jobs"] = len(InfoJobs)
1307 m["number-of-types"] = len(InfoTypes.ProdDataTypes)
1308 qm := make(map[string]interface{})
1309 m["jobs"] = qm
1310 for key, elem := range InfoJobs {
1311 jm := make(map[string]interface{})
1312 qm[key] = jm
1313 jm["type"] = elem.job_info.InfoTypeIdentity
1314 typeJob := TypeJobs[elem.job_info.InfoTypeIdentity]
1315 jm["groupId"] = typeJob.groupId
1316 jm["clientID"] = typeJob.clientId
1317 jm["input topic"] = typeJob.InputTopic
1318 jm["input queue length - job ("+fmt.Sprintf("%v", cap(typeJob.data_in_channel))+")"] = len(typeJob.data_in_channel)
1319 jm["output topic"] = elem.output_topic
1320 jm["output queue length - producer ("+fmt.Sprintf("%v", cap(data_out_channel))+")"] = len(data_out_channel)
1321 jm["msg_in (type)"] = typeJob.statistics.in_msg_cnt
1322 jm["msg_out (job)"] = elem.statistics.out_msg_cnt
1323
1324 }
1325 json, err := json.Marshal(m)
1326 if err != nil {
1327 w.WriteHeader(http.StatusInternalServerError)
1328 log.Error("Cannot marshal statistics json")
1329 return
1330 }
1331 _, err = w.Write(json)
1332 if err != nil {
1333 w.WriteHeader(http.StatusInternalServerError)
1334 log.Error("Cannot send statistics json")
1335 return
1336 }
1337}
1338
1339// Simple alive check
1340func alive(w http.ResponseWriter, req *http.Request) {
1341 //Alive check
1342}
1343
1344// Get/Set logging level
1345func logging_level(w http.ResponseWriter, req *http.Request) {
1346 vars := mux.Vars(req)
1347 if level, ok := vars["level"]; ok {
1348 if req.Method == http.MethodPut {
1349 switch level {
1350 case "trace":
1351 log.SetLevel(log.TraceLevel)
1352 case "debug":
1353 log.SetLevel(log.DebugLevel)
1354 case "info":
1355 log.SetLevel(log.InfoLevel)
1356 case "warn":
1357 log.SetLevel(log.WarnLevel)
1358 case "error":
1359 log.SetLevel(log.ErrorLevel)
1360 case "fatal":
1361 log.SetLevel(log.FatalLevel)
1362 case "panic":
1363 log.SetLevel(log.PanicLevel)
1364 default:
1365 w.WriteHeader(http.StatusNotFound)
1366 }
1367 } else {
1368 w.WriteHeader(http.StatusMethodNotAllowed)
1369 }
1370 } else {
1371 if req.Method == http.MethodGet {
1372 msg := "none"
1373 if log.IsLevelEnabled(log.PanicLevel) {
1374 msg = "panic"
1375 } else if log.IsLevelEnabled(log.FatalLevel) {
1376 msg = "fatal"
1377 } else if log.IsLevelEnabled(log.ErrorLevel) {
1378 msg = "error"
1379 } else if log.IsLevelEnabled(log.WarnLevel) {
1380 msg = "warn"
1381 } else if log.IsLevelEnabled(log.InfoLevel) {
1382 msg = "info"
1383 } else if log.IsLevelEnabled(log.DebugLevel) {
1384 msg = "debug"
1385 } else if log.IsLevelEnabled(log.TraceLevel) {
1386 msg = "trace"
1387 }
1388 w.Header().Set("Content-Type", "application/text")
1389 w.Write([]byte(msg))
1390 } else {
1391 w.WriteHeader(http.StatusMethodNotAllowed)
1392 }
1393 }
1394}
1395
1396func start_topic_reader(topic string, type_id string, control_ch chan ReaderControl, data_ch chan *KafkaPayload, gid string, cid string, stats *TypeJobStats) {
1397
1398 log.Info("Topic reader starting, topic: ", topic, " for type: ", type_id)
1399
1400 topic_ok := false
1401 var c *kafka.Consumer = nil
1402 running := true
1403
1404 for topic_ok == false {
1405
1406 select {
1407 case reader_ctrl := <-control_ch:
1408 if reader_ctrl.command == "EXIT" {
1409 log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
1410 //TODO: Stop consumer if present?
1411 data_ch <- nil //Signal to job handler
1412 running = false
1413 return
1414 }
1415 case <-time.After(1 * time.Second):
1416 if !running {
1417 return
1418 }
1419 if c == nil {
1420 c = create_kafka_consumer(type_id, gid, cid)
1421 if c == nil {
1422 log.Info("Cannot start consumer on topic: ", topic, " for type: ", type_id, " - retrying")
1423 } else {
1424 log.Info("Consumer started on topic: ", topic, " for type: ", type_id)
1425 }
1426 }
1427 if c != nil && topic_ok == false {
1428 err := c.SubscribeTopics([]string{topic}, nil)
1429 if err != nil {
1430 log.Info("Topic reader cannot start subscribing on topic: ", topic, " for type: ", type_id, " - retrying -- error details: ", err)
1431 } else {
1432 log.Info("Topic reader subscribing on topic: ", topic, " for type: ", type_id)
1433 topic_ok = true
1434 }
1435 }
1436 }
1437 }
1438 log.Info("Topic reader ready on topic: ", topic, " for type: ", type_id)
1439
1440 var event_chan = make(chan int)
1441 go func() {
1442 for {
1443 select {
1444 case evt := <-c.Events():
1445 switch evt.(type) {
1446 case kafka.OAuthBearerTokenRefresh:
1447 log.Debug("New consumer token needed: ", evt)
1448 token, err := fetch_token()
1449 if err != nil {
1450 log.Warning("Cannot cannot fetch token: ", err)
1451 c.SetOAuthBearerTokenFailure(err.Error())
1452 } else {
1453 setTokenError := c.SetOAuthBearerToken(*token)
1454 if setTokenError != nil {
1455 log.Warning("Cannot cannot set token: ", setTokenError)
1456 c.SetOAuthBearerTokenFailure(setTokenError.Error())
1457 }
1458 }
1459 default:
1460 //TODO: Handle these?
1461 log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String())
1462 }
1463
1464 case msg := <-event_chan:
1465 if msg == 0 {
1466 return
1467 }
1468 case <-time.After(1 * time.Second):
1469 if !running {
1470 return
1471 }
1472 }
1473 }
1474 }()
1475
1476 go func() {
1477 for {
1478 //maxDur := 1 * time.Second
1479 for {
1480 select {
1481 case reader_ctrl := <-control_ch:
1482 if reader_ctrl.command == "EXIT" {
1483 event_chan <- 0
1484 log.Debug("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
1485 data_ch <- nil //Signal to job handler
1486 defer c.Close()
1487 return
1488 }
1489 default:
1490
1491 ev := c.Poll(1000)
1492 if ev == nil {
1493 log.Debug("Topic Reader for type: ", type_id, " Nothing to consume on topic: ", topic)
1494 continue
1495 }
1496 switch e := ev.(type) {
1497 case *kafka.Message:
1498 var kmsg KafkaPayload
1499 kmsg.msg = e
1500
1501 c.Commit() //TODO: Ok?
1502
1503 //TODO: Check for exception
1504 data_ch <- &kmsg
1505 stats.in_msg_cnt++
1506 log.Debug("Reader msg: ", &kmsg)
1507 log.Debug("Reader - data_ch ", data_ch)
1508 case kafka.Error:
1509 fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
1510
1511 case kafka.OAuthBearerTokenRefresh:
1512 log.Debug("New consumer token needed: ", ev)
1513 token, err := fetch_token()
1514 if err != nil {
1515 log.Warning("Cannot cannot fetch token: ", err)
1516 c.SetOAuthBearerTokenFailure(err.Error())
1517 } else {
1518 setTokenError := c.SetOAuthBearerToken(*token)
1519 if setTokenError != nil {
1520 log.Warning("Cannot cannot set token: ", setTokenError)
1521 c.SetOAuthBearerTokenFailure(setTokenError.Error())
1522 }
1523 }
1524 default:
1525 fmt.Printf("Ignored %v\n", e)
1526 }
1527
1528 // orig code
1529 // msg, err := c.ReadMessage(maxDur)
1530 // if err == nil {
1531 // var kmsg KafkaPayload
1532 // kmsg.msg = msg
1533
1534 // c.Commit() //TODO: Ok?
1535
1536 // //TODO: Check for exception
1537 // data_ch <- &kmsg
1538 // stats.in_msg_cnt++
1539 // log.Debug("Reader msg: ", &kmsg)
1540 // log.Debug("Reader - data_ch ", data_ch)
1541 // } else {
1542 // log.Debug("Topic Reader for type: ", type_id, " Nothing to consume on topic: ", topic, ", reason: ", err)
1543 // }
1544
1545 }
1546 }
1547 }
1548 }()
1549}
1550
1551func start_topic_writer(control_ch chan WriterControl, data_ch chan *KafkaPayload) {
1552
1553 var kafka_producer *kafka.Producer
1554
1555 running := true
1556 log.Info("Topic writer starting")
1557
1558 // Wait for kafka producer to become available - and be prepared to exit the writer
1559 for kafka_producer == nil {
1560 select {
1561 case writer_ctl := <-control_ch:
1562 if writer_ctl.command == "EXIT" {
1563 //ignore cmd
1564 }
1565 default:
1566 kafka_producer = start_producer()
1567 if kafka_producer == nil {
1568 log.Debug("Could not start kafka producer - retrying")
1569 time.Sleep(1 * time.Second)
1570 } else {
1571 log.Debug("Kafka producer started")
1572 //defer kafka_producer.Close()
1573 }
1574 }
1575 }
1576
1577 var event_chan = make(chan int)
1578 go func() {
1579 for {
1580 select {
1581 case evt := <-kafka_producer.Events():
1582 //TODO: Handle this? Probably yes, look if the msg was delivered and if not, resend?
1583 switch evt.(type) {
1584 case *kafka.Message:
1585 m := evt.(*kafka.Message)
1586
1587 if m.TopicPartition.Error != nil {
1588 log.Debug("Dumping topic writer event, failed: ", m.TopicPartition.Error)
1589 } else {
1590 log.Debug("Dumping topic writer event, message to topic: ", *m.TopicPartition.Topic, " at offset: ", m.TopicPartition.Offset, " at partition: ", m.TopicPartition.Partition)
1591 }
1592 case kafka.Error:
1593 log.Debug("Dumping topic writer event, error: ", evt)
1594 case kafka.OAuthBearerTokenRefresh:
1595 log.Debug("New producer token needed: ", evt)
1596 token, err := fetch_token()
1597 if err != nil {
1598 log.Warning("Cannot cannot fetch token: ", err)
1599 kafka_producer.SetOAuthBearerTokenFailure(err.Error())
1600 } else {
1601 setTokenError := kafka_producer.SetOAuthBearerToken(*token)
1602 if setTokenError != nil {
1603 log.Warning("Cannot cannot set token: ", setTokenError)
1604 kafka_producer.SetOAuthBearerTokenFailure(setTokenError.Error())
1605 }
1606 }
1607 default:
1608 log.Debug("Dumping topic writer event, unknown: ", evt)
1609 }
1610
1611 case msg := <-event_chan:
1612 if msg == 0 {
1613 return
1614 }
1615 case <-time.After(1 * time.Second):
1616 if !running {
1617 return
1618 }
1619 }
1620 }
1621 }()
1622 go func() {
1623 for {
1624 select {
1625 case writer_ctl := <-control_ch:
1626 if writer_ctl.command == "EXIT" {
1627 // ignore - wait for channel signal
1628 }
1629
1630 case kmsg := <-data_ch:
1631 if kmsg == nil {
1632 event_chan <- 0
1633 // TODO: Close producer?
1634 log.Info("Topic writer stopped by channel signal - start_topic_writer")
1635 defer kafka_producer.Close()
1636 return
1637 }
1638
1639 retries := 10
1640 msg_ok := false
1641 var err error
1642 for retry := 1; retry <= retries && msg_ok == false; retry++ {
1643 err = kafka_producer.Produce(&kafka.Message{
1644 TopicPartition: kafka.TopicPartition{Topic: &kmsg.topic, Partition: kafka.PartitionAny},
1645 Value: kmsg.msg.Value, Key: kmsg.msg.Key}, nil)
1646
1647 if err == nil {
1648 incr_out_msg_cnt(kmsg.jobid)
1649 msg_ok = true
1650 log.Debug("Topic writer, msg sent ok on topic: ", kmsg.topic)
1651 } else {
1652 log.Info("Topic writer failed to send message on topic: ", kmsg.topic, " - Retrying. Error details: ", err)
1653 time.Sleep(time.Duration(retry) * time.Second)
1654 }
1655 }
1656 if !msg_ok {
1657 //TODO: Retry sending msg?
1658 log.Error("Topic writer failed to send message on topic: ", kmsg.topic, " - Msg discarded. Error details: ", err)
1659 }
1660 case <-time.After(1000 * time.Millisecond):
1661 if !running {
1662 return
1663 }
1664 }
1665 }
1666 }()
1667}
1668
1669func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consumer {
1670 var cm kafka.ConfigMap
1671 if creds_grant_type == "" {
1672 log.Info("Creating kafka SASL plain text consumer for type: ", type_id)
1673 cm = kafka.ConfigMap{
1674 "bootstrap.servers": bootstrapserver,
1675 "group.id": gid,
1676 "client.id": cid,
1677 "auto.offset.reset": "latest",
1678 "enable.auto.commit": false,
1679 //"auto.commit.interval.ms": 5000,
1680 }
1681 } else {
1682 log.Info("Creating kafka SASL plain text consumer for type: ", type_id)
1683 cm = kafka.ConfigMap{
1684 "bootstrap.servers": bootstrapserver,
1685 "group.id": gid,
1686 "client.id": cid,
1687 "auto.offset.reset": "latest",
1688 "enable.auto.commit": false,
1689 "sasl.mechanism": "OAUTHBEARER",
1690 "security.protocol": "SASL_PLAINTEXT",
1691 }
1692 }
1693 c, err := kafka.NewConsumer(&cm)
1694
1695 //TODO: How to handle autocommit or commit message by message
1696 //TODO: Make arg to kafka configurable
1697
1698 if err != nil {
1699 log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err)
1700 return nil
1701 }
1702
1703 //c.Commit()
1704 log.Info("Created kafka consumer for type: ", type_id, " OK")
1705 return c
1706}
1707
1708// Start kafka producer
1709func start_producer() *kafka.Producer {
1710 log.Info("Creating kafka producer")
1711
1712 var cm kafka.ConfigMap
1713 if creds_grant_type == "" {
1714 log.Info("Creating kafka SASL plain text producer")
1715 cm = kafka.ConfigMap{
1716 "bootstrap.servers": bootstrapserver,
1717 }
1718 } else {
1719 log.Info("Creating kafka SASL plain text producer")
1720 cm = kafka.ConfigMap{
1721 "bootstrap.servers": bootstrapserver,
1722 "sasl.mechanism": "OAUTHBEARER",
1723 "security.protocol": "SASL_PLAINTEXT",
1724 }
1725 }
1726
1727 p, err := kafka.NewProducer(&cm)
1728 if err != nil {
1729 log.Error("Cannot create kafka producer,", err)
1730 return nil
1731 }
1732 return p
1733}
1734
1735func start_adminclient() *kafka.AdminClient {
1736 log.Info("Creating kafka admin client")
1737 a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
1738 if err != nil {
1739 log.Error("Cannot create kafka admin client,", err)
1740 return nil
1741 }
1742 return a
1743}
1744
1745func create_minio_client(id string) (*minio.Client, *error) {
1746 log.Debug("Get minio client")
1747 minio_client, err := minio.New(filestore_server, &minio.Options{
1748 Secure: false,
1749 Creds: credentials.NewStaticV4(filestore_user, filestore_pwd, ""),
1750 })
1751 if err != nil {
1752 log.Error("Cannot create minio client, ", err)
1753 return nil, &err
1754 }
1755 return minio_client, nil
1756}
1757
1758func incr_out_msg_cnt(jobid string) {
1759 j, ok := InfoJobs[jobid]
1760 if ok {
1761 j.statistics.out_msg_cnt++
1762 }
1763}
1764
1765func start_job_xml_file_data(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, data_out_channel chan *KafkaPayload, fvolume string, fsbucket string) {
1766
1767 log.Info("Type job", type_id, " started")
1768
1769 filters := make(map[string]Filter)
1770 topic_list := make(map[string]string)
1771 var mc *minio.Client
1772 const mc_id = "mc_" + "start_job_xml_file_data"
1773 running := true
1774 for {
1775 select {
1776 case job_ctl := <-control_ch:
1777 log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
1778 switch job_ctl.command {
1779 case "EXIT":
1780 //ignore cmd - handled by channel signal
1781 case "ADD-FILTER":
1782 filters[job_ctl.filter.JobId] = job_ctl.filter
1783 log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
1784
1785 tmp_topic_list := make(map[string]string)
1786 for k, v := range topic_list {
1787 tmp_topic_list[k] = v
1788 }
1789 tmp_topic_list[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic
1790 topic_list = tmp_topic_list
1791 case "REMOVE-FILTER":
1792 log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
1793
1794 tmp_topic_list := make(map[string]string)
1795 for k, v := range topic_list {
1796 tmp_topic_list[k] = v
1797 }
1798 delete(tmp_topic_list, job_ctl.filter.JobId)
1799 topic_list = tmp_topic_list
1800 }
1801
1802 case msg := <-data_in_ch:
1803 if msg == nil {
1804 log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data")
1805
1806 running = false
1807 return
1808 }
1809 if fsbucket != "" && fvolume == "" {
1810 if mc == nil {
1811 var err *error
1812 mc, err = create_minio_client(mc_id)
1813 if err != nil {
1814 log.Debug("Cannot create minio client for type job: ", type_id)
1815 }
1816 }
1817 }
1818 //TODO: Sort processed file conversions in order (FIFO)
1819 jobLimiterChan <- struct{}{}
1820 go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket, mc, mc_id)
1821
1822 case <-time.After(1 * time.Second):
1823 if !running {
1824 return
1825 }
1826 }
1827 }
1828 //}()
1829}
1830
1831func run_xml_job(type_id string, msg *KafkaPayload, outputCompression string, data_out_channel chan *KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string, mc *minio.Client, mc_id string) {
1832 defer func() {
1833 <-jobLimiterChan
1834 }()
1835 PrintMemUsage()
1836
1837 if fvolume == "" && fsbucket == "" {
1838 log.Error("Type job: ", type_id, " cannot run, neither file volume nor filestore is set - discarding message")
1839 return
1840 } else if (fvolume != "") && (fsbucket != "") {
1841 log.Error("Type job: ", type_id, " cannot run with output to both file volume and filestore bucket - discarding message")
1842 return
1843 }
1844
1845 start := time.Now()
1846 var evt_data XmlFileEventHeader
1847
1848 err := jsoniter.Unmarshal(msg.msg.Value, &evt_data)
1849 if err != nil {
1850 log.Error("Cannot parse XmlFileEventHeader for type job: ", type_id, " - discarding message, error details", err)
1851 return
1852 }
1853 log.Debug("Unmarshal file-collect event for type job: ", type_id, " time: ", time.Since(start).String())
1854
1855 var reader io.Reader
1856
1857 //TODO -> config
1858 INPUTBUCKET := "ropfiles"
1859
1860 filename := ""
1861 if fvolume != "" {
1862 filename = fvolume + "/" + evt_data.Name
1863 fi, err := os.Open(filename)
1864
1865 if err != nil {
1866 log.Error("File ", filename, " - cannot be opened for type job: ", type_id, " - discarding message, error details: ", err)
1867 return
1868 }
1869 defer fi.Close()
1870 reader = fi
1871 //} else if evt_data.ObjectStoreBucket != "" {
1872 } else {
1873 filename = evt_data.Name
1874 if mc != nil {
1875 tctx := context.Background()
1876 mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{})
1877 if err != nil {
1878 log.Error("Cannot get: ", filename, " for type job: ", type_id, " - discarding message, error details: ", err)
1879 return
1880 }
1881 if mr == nil {
1882 log.Error("Cannot get: ", filename, " for type job: ", type_id, " - minio get object returned null reader - discarding message, error details: ", err)
1883 return
1884 }
1885 reader = mr
1886 defer mr.Close()
1887 } else {
1888 log.Error("Cannot get: ", filename, " for type job: ", type_id, " - no minio client - discarding message")
1889 return
1890 }
1891 }
1892
1893 if reader == nil {
1894 log.Error("Cannot get: ", filename, " - null reader")
1895 return
1896 }
1897 var file_bytes []byte
1898 if strings.HasSuffix(filename, "gz") {
1899 start := time.Now()
1900 var buf3 bytes.Buffer
1901 errb := gunzipReaderToWriter(&buf3, reader)
1902 if errb != nil {
1903 log.Error("Cannot gunzip file ", filename, " - discarding message, ", errb)
1904 return
1905 }
1906 file_bytes = buf3.Bytes()
1907 log.Debug("Gunzip file: ", filename, "time:", time.Since(start).String(), "len: ", len(file_bytes))
1908
1909 } else {
1910 var buf3 bytes.Buffer
1911 _, err2 := io.Copy(&buf3, reader)
1912 if err2 != nil {
1913 log.Error("File ", filename, " - cannot be read, discarding message, ", err)
1914 return
1915 }
1916 file_bytes = buf3.Bytes()
1917 }
1918 start = time.Now()
1919 b, err := xml_to_json_conv(&file_bytes, &evt_data)
1920 if err != nil {
1921 log.Error("Cannot convert file ", evt_data.Name, " - discarding message, ", err)
1922 return
1923 }
1924 log.Debug("Converted file to json: ", filename, " time", time.Since(start).String(), "len;", len(b))
1925
1926 new_fn := evt_data.Name + os.Getenv("KP") + ".json"
1927 if outputCompression == "gz" {
1928 new_fn = new_fn + ".gz"
1929 start = time.Now()
1930 var buf bytes.Buffer
1931 err = gzipWrite(&buf, &b)
1932 if err != nil {
1933 log.Error("Cannot gzip file ", new_fn, " - discarding message, ", err)
1934 return
1935 }
1936 b = buf.Bytes()
1937 log.Debug("Gzip file: ", new_fn, " time: ", time.Since(start).String(), "len:", len(file_bytes))
1938
1939 }
1940 start = time.Now()
1941
1942 if fvolume != "" {
1943 //Store on disk
1944 err = os.WriteFile(fvolume+"/"+new_fn, b, 0644)
1945 if err != nil {
1946 log.Error("Cannot write file ", new_fn, " - discarding message,", err)
1947 return
1948 }
1949 log.Debug("Write file to disk: "+new_fn, "time: ", time.Since(start).String(), " len: ", len(file_bytes))
1950 } else if fsbucket != "" {
1951 // Store in minio
1952 objectName := new_fn
1953 if mc != nil {
1954
1955 contentType := "application/json"
1956 if strings.HasSuffix(objectName, ".gz") {
1957 contentType = "application/gzip"
1958 }
1959
1960 // Upload the xml file with PutObject
1961 r := bytes.NewReader(b)
1962 tctx := context.Background()
1963 if check_minio_bucket(mc, mc_id, fsbucket) == false {
1964 err := create_minio_bucket(mc, mc_id, fsbucket)
1965 if err != nil {
1966 log.Error("Cannot create bucket: ", fsbucket, ", ", err)
1967 return
1968 }
1969 }
1970 ok := false
1971 for i := 1; i < 64 && ok == false; i = i * 2 {
1972 info, err := mc.PutObject(tctx, fsbucket, objectName, r, int64(len(b)), minio.PutObjectOptions{ContentType: contentType})
1973 if err != nil {
1974
1975 if i == 1 {
1976 log.Warn("Cannot upload (first attempt): ", objectName, ", ", err)
1977 } else {
1978 log.Warn("Cannot upload (retry): ", objectName, ", ", err)
1979 }
1980 time.Sleep(time.Duration(i) * time.Second)
1981 } else {
1982 log.Debug("Store ", objectName, " in filestore, time: ", time.Since(start).String())
1983 log.Debug("Successfully uploaded: ", objectName, " of size:", info.Size)
1984 ok = true
1985 }
1986 }
1987 if !ok {
1988 log.Error("Cannot upload : ", objectName, ", ", err)
1989 }
1990 } else {
1991 log.Error("Cannot upload: ", objectName, ", no client")
1992 }
1993 }
1994
1995 start = time.Now()
1996 if fvolume == "" {
1997 var fde FileDownloadedEvt
1998 fde.Filename = new_fn
1999 j, err := jsoniter.Marshal(fde)
2000
2001 if err != nil {
2002 log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err)
2003 return
2004 }
2005 msg.msg.Value = j
2006 } else {
2007 var fde FileDownloadedEvt
2008 fde.Filename = new_fn
2009 j, err := jsoniter.Marshal(fde)
2010
2011 if err != nil {
2012 log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err)
2013 return
2014 }
2015 msg.msg.Value = j
2016 }
2017 msg.msg.Key = []byte("\"" + evt_data.SourceName + "\"")
2018 log.Debug("Marshal file-collect event ", time.Since(start).String())
2019
2020 for k, v := range topic_list {
2021 var kmsg *KafkaPayload = new(KafkaPayload)
2022 kmsg.msg = msg.msg
2023 kmsg.topic = v
2024 kmsg.jobid = k
2025 data_out_channel <- kmsg
2026 }
2027}
2028
2029func xml_to_json_conv(f_byteValue *[]byte, xfeh *XmlFileEventHeader) ([]byte, error) {
2030 var f MeasCollecFile
2031 start := time.Now()
2032 err := xml.Unmarshal(*f_byteValue, &f)
2033 if err != nil {
2034 return nil, errors.New("Cannot unmarshal xml-file")
2035 }
2036 log.Debug("Unmarshal xml file XmlFileEvent: ", time.Since(start).String())
2037
2038 start = time.Now()
2039 var pmfile PMJsonFile
2040
2041 //TODO: Fill in more values
2042 pmfile.Event.Perf3GppFields.Perf3GppFieldsVersion = "1.0"
2043 pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
2044 pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityUserName = ""
2045 pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityDn = f.FileHeader.FileSender.LocalDn
2046 pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntitySoftwareVersion = f.MeasData.ManagedElement.SwVersion
2047
2048 for _, it := range f.MeasData.MeasInfo {
2049 var mili MeasInfoList
2050 mili.MeasInfoID.SMeasInfoID = it.MeasInfoId
2051 for _, jt := range it.MeasType {
2052 mili.MeasTypes.SMeasTypesList = append(mili.MeasTypes.SMeasTypesList, jt.Text)
2053 }
2054 for _, jt := range it.MeasValue {
2055 var mv MeasValues
2056 mv.MeasObjInstID = jt.MeasObjLdn
2057 mv.SuspectFlag = jt.Suspect
2058 if jt.Suspect == "" {
2059 mv.SuspectFlag = "false"
2060 }
2061 for _, kt := range jt.R {
2062 ni, _ := strconv.Atoi(kt.P)
2063 nv := kt.Text
2064 mr := MeasResults{ni, nv}
2065 mv.MeasResultsList = append(mv.MeasResultsList, mr)
2066 }
2067 mili.MeasValuesList = append(mili.MeasValuesList, mv)
2068 }
2069
2070 pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = append(pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList, mili)
2071 }
2072
2073 pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
2074
2075 //TODO: Fill more values
2076 pmfile.Event.CommonEventHeader.Domain = "" //xfeh.Domain
2077 pmfile.Event.CommonEventHeader.EventID = "" //xfeh.EventID
2078 pmfile.Event.CommonEventHeader.Sequence = 0 //xfeh.Sequence
2079 pmfile.Event.CommonEventHeader.EventName = "" //xfeh.EventName
2080 pmfile.Event.CommonEventHeader.SourceName = xfeh.SourceName
2081 pmfile.Event.CommonEventHeader.ReportingEntityName = "" //xfeh.ReportingEntityName
2082 pmfile.Event.CommonEventHeader.Priority = "" //xfeh.Priority
2083 pmfile.Event.CommonEventHeader.StartEpochMicrosec = xfeh.StartEpochMicrosec
2084 pmfile.Event.CommonEventHeader.LastEpochMicrosec = xfeh.LastEpochMicrosec
2085 pmfile.Event.CommonEventHeader.Version = "" //xfeh.Version
2086 pmfile.Event.CommonEventHeader.VesEventListenerVersion = "" //xfeh.VesEventListenerVersion
2087 pmfile.Event.CommonEventHeader.TimeZoneOffset = xfeh.TimeZoneOffset
2088
2089 log.Debug("Convert xml to json : ", time.Since(start).String())
2090
2091 start = time.Now()
2092 json, err := jsoniter.Marshal(pmfile)
2093 log.Debug("Marshal json : ", time.Since(start).String())
2094
2095 if err != nil {
2096 return nil, errors.New("Cannot marshal converted json")
2097 }
2098 return json, nil
2099}
2100
2101func start_job_json_file_data(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, data_out_channel chan *KafkaPayload, objectstore bool) {
2102
2103 log.Info("Type job", type_id, " started")
2104
2105 filters := make(map[string]Filter)
2106 filterParams_list := make(map[string]FilterMaps)
2107 // ch_list := make(map[string]chan *KafkaPayload)
2108 topic_list := make(map[string]string)
2109 var mc *minio.Client
2110 const mc_id = "mc_" + "start_job_json_file_data"
2111 running := true
2112 for {
2113 select {
2114 case job_ctl := <-control_ch:
2115 log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
2116 switch job_ctl.command {
2117 case "EXIT":
2118 //ignore cmd - handled by channel signal
2119 case "ADD-FILTER":
2120 //TODO: Refactor...
2121 filters[job_ctl.filter.JobId] = job_ctl.filter
2122 log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
2123
2124 tmp_filterParams_list := make(map[string]FilterMaps)
2125 for k, v := range filterParams_list {
2126 tmp_filterParams_list[k] = v
2127 }
2128 tmp_filterParams_list[job_ctl.filter.JobId] = job_ctl.filter.filter
2129 filterParams_list = tmp_filterParams_list
2130
2131 tmp_topic_list := make(map[string]string)
2132 for k, v := range topic_list {
2133 tmp_topic_list[k] = v
2134 }
2135 tmp_topic_list[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic
2136 topic_list = tmp_topic_list
2137 case "REMOVE-FILTER":
2138 //TODO: Refactor...
2139 log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
2140
2141 tmp_filterParams_list := make(map[string]FilterMaps)
2142 for k, v := range filterParams_list {
2143 tmp_filterParams_list[k] = v
2144 }
2145 delete(tmp_filterParams_list, job_ctl.filter.JobId)
2146 filterParams_list = tmp_filterParams_list
2147
2148 tmp_topic_list := make(map[string]string)
2149 for k, v := range topic_list {
2150 tmp_topic_list[k] = v
2151 }
2152 delete(tmp_topic_list, job_ctl.filter.JobId)
2153 topic_list = tmp_topic_list
2154 }
2155
2156 case msg := <-data_in_ch:
2157 if msg == nil {
2158 log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data")
2159
2160 running = false
2161 return
2162 }
2163 if objectstore {
2164 if mc == nil {
2165 var err *error
2166 mc, err = create_minio_client(mc_id)
2167 if err != nil {
2168 log.Debug("Cannot create minio client for type job: ", type_id)
2169 }
2170 }
2171 }
2172 //TODO: Sort processed file conversions in order (FIFO)
2173 jobLimiterChan <- struct{}{}
2174 go run_json_job(type_id, msg, "", filterParams_list, topic_list, data_out_channel, jobLimiterChan, mc, mc_id, objectstore)
2175
2176 case <-time.After(1 * time.Second):
2177 if !running {
2178 return
2179 }
2180 }
2181 }
2182}
2183
2184func run_json_job(type_id string, msg *KafkaPayload, outputCompression string, filterList map[string]FilterMaps, topic_list map[string]string, data_out_channel chan *KafkaPayload, jobLimiterChan chan struct{}, mc *minio.Client, mc_id string, objectstore bool) {
2185
2186 //Release job limit
2187 defer func() {
2188 <-jobLimiterChan
2189 }()
2190
2191 PrintMemUsage()
2192
2193 var evt_data FileDownloadedEvt
2194 err := jsoniter.Unmarshal(msg.msg.Value, &evt_data)
2195 if err != nil {
2196 log.Error("Cannot parse FileDownloadedEvt for type job: ", type_id, " - discarding message, error details: ", err)
2197 return
2198 }
2199 log.Debug("FileDownloadedEvt for job: ", type_id, " file: ", evt_data.Filename)
2200
2201 var reader io.Reader
2202
2203 //TODO -> config
2204 //INPUTBUCKET := "json-file-ready"
2205 INPUTBUCKET := "pm-files-json"
2206 filename := ""
2207 if objectstore == false {
2208 filename = files_volume + "/" + evt_data.Filename
2209 fi, err := os.Open(filename)
2210
2211 if err != nil {
2212 log.Error("File: ", filename, "for type job: ", type_id, " - cannot be opened -discarding message - error details: ", err)
2213 return
2214 }
2215 defer fi.Close()
2216 reader = fi
2217 } else {
2218 filename = "/" + evt_data.Filename
2219 if mc != nil {
2220 if check_minio_bucket(mc, mc_id, INPUTBUCKET) == false {
2221 log.Error("Bucket not available for reading in type job: ", type_id, "bucket: ", INPUTBUCKET)
2222 return
2223 }
2224 tctx := context.Background()
2225 mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{})
2226 if err != nil {
2227 log.Error("Obj: ", filename, "for type job: ", type_id, " - cannot be fetched from bucket: ", INPUTBUCKET, " - discarding message, error details: ", err)
2228 return
2229 }
2230 reader = mr
2231 defer mr.Close()
2232 } else {
2233 log.Error("Cannot get obj: ", filename, "for type job: ", type_id, " from bucket: ", INPUTBUCKET, " - no client")
2234 return
2235 }
2236 }
2237
2238 var data *[]byte
2239 if strings.HasSuffix(filename, "gz") {
2240 start := time.Now()
2241 var buf2 bytes.Buffer
2242 errb := gunzipReaderToWriter(&buf2, reader)
2243 if errb != nil {
2244 log.Error("Cannot decompress file/obj ", filename, "for type job: ", type_id, " - discarding message, error details", errb)
2245 return
2246 }
2247 d := buf2.Bytes()
2248 data = &d
2249 log.Debug("Decompress file/obj ", filename, "for type job: ", type_id, " time:", time.Since(start).String())
2250 } else {
2251
2252 start := time.Now()
2253 d, err := io.ReadAll(reader)
2254 if err != nil {
2255 log.Error("Cannot read file/obj: ", filename, "for type job: ", type_id, " - discarding message, error details", err)
2256 return
2257 }
2258 data = &d
2259
2260 log.Debug("Read file/obj: ", filename, "for type job: ", type_id, " time: ", time.Since(start).String())
2261 }
2262
2263 // The code was moved to inside the below for loop - a deep copy of PM PMJsonFile is need to keep it outside
2264 // var pmfile PMJsonFile
2265 // start := time.Now()
2266 // err = jsoniter.Unmarshal(*data, &pmfile)
2267 // log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
2268
2269 // if err != nil {
2270 // log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err)
2271 // return
2272 // }
2273 for k, v := range filterList {
2274
2275 var pmfile PMJsonFile
2276 start := time.Now()
2277 err = jsoniter.Unmarshal(*data, &pmfile)
2278 log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
2279
2280 if err != nil {
2281 log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err)
2282 return
2283 }
2284
2285 var kmsg *KafkaPayload = new(KafkaPayload)
2286 kmsg.msg = new(kafka.Message)
2287 kmsg.msg.Key = []byte("\"" + pmfile.Event.CommonEventHeader.SourceName + "\"")
2288 log.Debug("topic:", topic_list[k])
2289 log.Debug("sourceNameMap:", v.sourceNameMap)
2290 log.Debug("measObjClassMap:", v.measObjClassMap)
2291 log.Debug("measObjInstIdsMap:", v.measObjInstIdsMap)
2292 log.Debug("measTypesMap:", v.measTypesMap)
2293 //BMX if len(v.sourceNameMap) > 0 || len(v.measObjInstIdsMap) > 0 || len(v.measTypesMap) > 0 || len(v.measObjClassMap) > 0 {
2294 b := json_pm_filter_to_byte(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap)
2295 if b == nil {
2296 log.Info("File/obj ", filename, "for job: ", k, " - empty after filering, discarding message ")
2297 return
2298 }
2299 kmsg.msg.Value = *b
2300 //BMX}
2301
2302 // if outputCompression == "json.gz" {
2303 // start := time.Now()
2304 // var buf bytes.Buffer
2305 // err := gzipWrite(&buf, &kmsg.msg.Value)
2306 // if err != nil {
2307 // log.Error("Cannot compress file/obj ", filename, "for job: ", job_id, " - discarding message, error details", err)
2308 // return
2309
2310 // }
2311 // kmsg.msg.Value = buf.Bytes()
2312 // log.Debug("Compress file/obj ", filename, "for job: ", job_id, " time:", time.Since(start).String())
2313 // }
2314 kmsg.topic = topic_list[k]
2315 kmsg.jobid = k
2316
2317 data_out_channel <- kmsg
2318 }
2319
2320}
2321
2322func json_pm_filter_to_byte(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *[]byte {
2323
2324 if json_pm_filter_to_obj(resource, data, sourceNameMap, measObjClassMap, measObjInstIdsMap, measTypesMap) == nil {
2325 return nil
2326 }
2327 start := time.Now()
2328 j, err := jsoniter.Marshal(&data)
2329
2330 log.Debug("Json marshal obj: ", resource, " time: ", time.Since(start).String())
2331
2332 if err != nil {
2333 log.Error("Msg could not be marshalled discarding message, obj: ", resource, ", error details: ", err)
2334 return nil
2335 }
2336
2337 log.Debug("Filtered json obj: ", resource, " len: ", len(j))
2338 return &j
2339}
2340
2341func json_pm_filter_to_obj(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *PMJsonFile {
2342 filter_req := true
2343 start := time.Now()
2344 if len(sourceNameMap) != 0 {
2345 if !sourceNameMap[data.Event.CommonEventHeader.SourceName] {
2346 filter_req = false
2347 return nil
2348 }
2349 }
2350 if filter_req {
2351 modified := false
2352 var temp_mil []MeasInfoList
2353 for _, zz := range data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList {
2354
2355 check_cntr := false
2356 var cnt_flags []bool
2357 if len(measTypesMap) > 0 {
2358 c_cntr := 0
2359 var temp_mtl []string
2360 for _, v := range zz.MeasTypes.SMeasTypesList {
2361 if measTypesMap[v] {
2362 cnt_flags = append(cnt_flags, true)
2363 c_cntr++
2364 temp_mtl = append(temp_mtl, v)
2365 } else {
2366 cnt_flags = append(cnt_flags, false)
2367 }
2368 }
2369 if c_cntr > 0 {
2370 check_cntr = true
2371 zz.MeasTypes.SMeasTypesList = temp_mtl
2372 } else {
2373 modified = true
2374 continue
2375 }
2376 }
2377 keep := false
2378 var temp_mvl []MeasValues
2379 for _, yy := range zz.MeasValuesList {
2380 keep_class := false
2381 keep_inst := false
2382 keep_cntr := false
2383
2384 dna := strings.Split(yy.MeasObjInstID, ",")
2385 instName := dna[len(dna)-1]
2386 cls := strings.Split(dna[len(dna)-1], "=")[0]
2387
2388 if len(measObjClassMap) > 0 {
2389 if measObjClassMap[cls] {
2390 keep_class = true
2391 }
2392 } else {
2393 keep_class = true
2394 }
2395
2396 if len(measObjInstIdsMap) > 0 {
2397 if measObjInstIdsMap[instName] {
2398 keep_inst = true
2399 }
2400 } else {
2401 keep_inst = true
2402 }
2403
2404 if check_cntr {
2405 var temp_mrl []MeasResults
2406 cnt_p := 1
2407 for _, v := range yy.MeasResultsList {
2408 if cnt_flags[v.P-1] {
2409 v.P = cnt_p
2410 cnt_p++
2411 temp_mrl = append(temp_mrl, v)
2412 }
2413 }
2414 yy.MeasResultsList = temp_mrl
2415 keep_cntr = true
2416 } else {
2417 keep_cntr = true
2418 }
2419 if keep_class && keep_cntr && keep_inst {
2420 keep = true
2421 temp_mvl = append(temp_mvl, yy)
2422 }
2423 }
2424 if keep {
2425 zz.MeasValuesList = temp_mvl
2426 temp_mil = append(temp_mil, zz)
2427 modified = true
2428 }
2429
2430 }
2431 //Only if modified
2432 if modified {
2433 if len(temp_mil) == 0 {
2434 log.Debug("Msg filtered, nothing found, discarding, obj: ", resource)
2435 return nil
2436 }
2437 data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = temp_mil
2438 }
2439 }
2440 log.Debug("Filter: ", time.Since(start).String())
2441 return data
2442}
2443
2444// func json_pm_filter(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *[]byte {
2445
2446// filter_req := true
2447// start := time.Now()
2448// if len(sourceNameMap) != 0 {
2449// if !sourceNameMap[data.Event.CommonEventHeader.SourceName] {
2450// filter_req = false
2451// return nil
2452// }
2453// }
2454// if filter_req {
2455// modified := false
2456// var temp_mil []MeasInfoList
2457// for _, zz := range data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList {
2458
2459// check_cntr := false
2460// var cnt_flags []bool
2461// if len(measTypesMap) > 0 {
2462// c_cntr := 0
2463// var temp_mtl []string
2464// for _, v := range zz.MeasTypes.SMeasTypesList {
2465// if measTypesMap[v] {
2466// cnt_flags = append(cnt_flags, true)
2467// c_cntr++
2468// temp_mtl = append(temp_mtl, v)
2469// } else {
2470// cnt_flags = append(cnt_flags, false)
2471// }
2472// }
2473// if c_cntr > 0 {
2474// check_cntr = true
2475// zz.MeasTypes.SMeasTypesList = temp_mtl
2476// } else {
2477// modified = true
2478// continue
2479// }
2480// }
2481// keep := false
2482// var temp_mvl []MeasValues
2483// for _, yy := range zz.MeasValuesList {
2484// keep_class := false
2485// keep_inst := false
2486// keep_cntr := false
2487
2488// dna := strings.Split(yy.MeasObjInstID, ",")
2489// instName := dna[len(dna)-1]
2490// cls := strings.Split(dna[len(dna)-1], "=")[0]
2491
2492// if len(measObjClassMap) > 0 {
2493// if measObjClassMap[cls] {
2494// keep_class = true
2495// }
2496// } else {
2497// keep_class = true
2498// }
2499
2500// if len(measObjInstIdsMap) > 0 {
2501// if measObjInstIdsMap[instName] {
2502// keep_inst = true
2503// }
2504// } else {
2505// keep_inst = true
2506// }
2507
2508// if check_cntr {
2509// var temp_mrl []MeasResults
2510// cnt_p := 1
2511// for _, v := range yy.MeasResultsList {
2512// if cnt_flags[v.P-1] {
2513// v.P = cnt_p
2514// cnt_p++
2515// temp_mrl = append(temp_mrl, v)
2516// }
2517// }
2518// yy.MeasResultsList = temp_mrl
2519// keep_cntr = true
2520// } else {
2521// keep_cntr = true
2522// }
2523// if keep_class && keep_cntr && keep_inst {
2524// keep = true
2525// temp_mvl = append(temp_mvl, yy)
2526// }
2527// }
2528// if keep {
2529// zz.MeasValuesList = temp_mvl
2530// temp_mil = append(temp_mil, zz)
2531// modified = true
2532// }
2533
2534// }
2535// //Only if modified
2536// if modified {
2537// if len(temp_mil) == 0 {
2538// log.Debug("Msg filtered, nothing found, discarding, obj: ", resource)
2539// return nil
2540// }
2541// data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = temp_mil
2542// }
2543// }
2544// log.Debug("Filter: ", time.Since(start).String())
2545
2546// start = time.Now()
2547// j, err := jsoniter.Marshal(&data)
2548
2549// log.Debug("Json marshal obj: ", resource, " time: ", time.Since(start).String())
2550
2551// if err != nil {
2552// log.Error("Msg could not be marshalled discarding message, obj: ", resource, ", error details: ", err)
2553// return nil
2554// }
2555
2556// log.Debug("Filtered json obj: ", resource, " len: ", len(j))
2557// return &j
2558// }
2559
2560func start_job_json_file_data_influx(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, objectstore bool) {
2561
2562 log.Info("Type job", type_id, " started")
2563 log.Debug("influx job ch ", data_in_ch)
2564 filters := make(map[string]Filter)
2565 filterParams_list := make(map[string]FilterMaps)
2566 influx_job_params := make(map[string]InfluxJobParameters)
2567 var mc *minio.Client
2568 const mc_id = "mc_" + "start_job_json_file_data_influx"
2569 running := true
2570 for {
2571 select {
2572 case job_ctl := <-control_ch:
2573 log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
2574 switch job_ctl.command {
2575 case "EXIT":
2576 //ignore cmd - handled by channel signal
2577 case "ADD-FILTER":
2578 //TODO: Refactor...
2579 filters[job_ctl.filter.JobId] = job_ctl.filter
2580 log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
2581 log.Debug(job_ctl.filter)
2582 tmp_filterParams_list := make(map[string]FilterMaps)
2583 for k, v := range filterParams_list {
2584 tmp_filterParams_list[k] = v
2585 }
2586 tmp_filterParams_list[job_ctl.filter.JobId] = job_ctl.filter.filter
2587 filterParams_list = tmp_filterParams_list
2588
2589 tmp_influx_job_params := make(map[string]InfluxJobParameters)
2590 for k, v := range influx_job_params {
2591 tmp_influx_job_params[k] = v
2592 }
2593 tmp_influx_job_params[job_ctl.filter.JobId] = job_ctl.filter.influxParameters
2594 influx_job_params = tmp_influx_job_params
2595
2596 case "REMOVE-FILTER":
2597 //TODO: Refactor...
2598 log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
2599
2600 tmp_filterParams_list := make(map[string]FilterMaps)
2601 for k, v := range filterParams_list {
2602 tmp_filterParams_list[k] = v
2603 }
2604 delete(tmp_filterParams_list, job_ctl.filter.JobId)
2605 filterParams_list = tmp_filterParams_list
2606
2607 tmp_influx_job_params := make(map[string]InfluxJobParameters)
2608 for k, v := range influx_job_params {
2609 tmp_influx_job_params[k] = v
2610 }
2611 delete(tmp_influx_job_params, job_ctl.filter.JobId)
2612 influx_job_params = tmp_influx_job_params
2613 }
2614
2615 case msg := <-data_in_ch:
2616 log.Debug("Data reveived - influx")
2617 if msg == nil {
2618 log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data")
2619
2620 running = false
2621 return
2622 }
2623 if objectstore {
2624 if mc == nil {
2625 var err *error
2626 mc, err = create_minio_client(mc_id)
2627 if err != nil {
2628 log.Debug("Cannot create minio client for type job: ", type_id)
2629 }
2630 }
2631 }
2632 //TODO: Sort processed file conversions in order (FIFO)
2633 jobLimiterChan <- struct{}{}
2634 go run_json_file_data_job_influx(type_id, msg, filterParams_list, influx_job_params, jobLimiterChan, mc, mc_id, objectstore)
2635
2636 case <-time.After(1 * time.Second):
2637 if !running {
2638 return
2639 }
2640 }
2641 }
2642}
2643
2644func run_json_file_data_job_influx(type_id string, msg *KafkaPayload, filterList map[string]FilterMaps, influxList map[string]InfluxJobParameters, jobLimiterChan chan struct{}, mc *minio.Client, mc_id string, objectstore bool) {
2645
2646 log.Debug("run_json_file_data_job_influx")
2647 //Release job limit
2648 defer func() {
2649 <-jobLimiterChan
2650 }()
2651
2652 PrintMemUsage()
2653
2654 var evt_data FileDownloadedEvt
2655 err := jsoniter.Unmarshal(msg.msg.Value, &evt_data)
2656 if err != nil {
2657 log.Error("Cannot parse FileDownloadedEvt for type job: ", type_id, " - discarding message, error details: ", err)
2658 return
2659 }
2660 log.Debug("FileDownloadedEvt for job: ", type_id, " file: ", evt_data.Filename)
2661
2662 var reader io.Reader
2663
2664 //TODO -> config
2665 //INPUTBUCKET := "json-file-ready"
2666 INPUTBUCKET := "pm-files-json"
2667 filename := ""
2668 if objectstore == false {
2669 filename = files_volume + "/" + evt_data.Filename
2670 fi, err := os.Open(filename)
2671
2672 if err != nil {
2673 log.Error("File: ", filename, "for type job: ", type_id, " - cannot be opened -discarding message - error details: ", err)
2674 return
2675 }
2676 defer fi.Close()
2677 reader = fi
2678 } else {
2679 filename = "/" + evt_data.Filename
2680 if mc != nil {
2681 if check_minio_bucket(mc, mc_id, INPUTBUCKET) == false {
2682 log.Error("Bucket not available for reading in type job: ", type_id, "bucket: ", INPUTBUCKET)
2683 return
2684 }
2685 tctx := context.Background()
2686 mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{})
2687 if err != nil {
2688 log.Error("Obj: ", filename, "for type job: ", type_id, " - cannot be fetched from bucket: ", INPUTBUCKET, " - discarding message, error details: ", err)
2689 return
2690 }
2691 reader = mr
2692 defer mr.Close()
2693 } else {
2694 log.Error("Cannot get obj: ", filename, "for type job: ", type_id, " from bucket: ", INPUTBUCKET, " - no client")
2695 return
2696 }
2697 }
2698
2699 var data *[]byte
2700 if strings.HasSuffix(filename, "gz") {
2701 start := time.Now()
2702 var buf2 bytes.Buffer
2703 errb := gunzipReaderToWriter(&buf2, reader)
2704 if errb != nil {
2705 log.Error("Cannot decompress file/obj ", filename, "for type job: ", type_id, " - discarding message, error details", errb)
2706 return
2707 }
2708 d := buf2.Bytes()
2709 data = &d
2710 log.Debug("Decompress file/obj ", filename, "for type job: ", type_id, " time:", time.Since(start).String())
2711 } else {
2712
2713 start := time.Now()
2714 d, err := io.ReadAll(reader)
2715 if err != nil {
2716 log.Error("Cannot read file/obj: ", filename, "for type job: ", type_id, " - discarding message, error details", err)
2717 return
2718 }
2719 data = &d
2720
2721 log.Debug("Read file/obj: ", filename, "for type job: ", type_id, " time: ", time.Since(start).String())
2722 }
2723 // The code was moved to inside the below for loop - a deep copy of PM PMJsonFile is need to keep it outside
2724 // var pmfile PMJsonFile
2725 // start := time.Now()
2726 // err = jsoniter.Unmarshal(*data, &pmfile)
2727 // log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
2728
2729 // if err != nil {
2730 // log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err)
2731 // return
2732 // }
2733 for k, v := range filterList {
2734
2735 var pmfile PMJsonFile
2736 start := time.Now()
2737 err = jsoniter.Unmarshal(*data, &pmfile)
2738 log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
2739
2740 if err != nil {
2741 log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err)
2742 return
2743 }
2744
2745 if len(v.sourceNameMap) > 0 || len(v.measObjInstIdsMap) > 0 || len(v.measTypesMap) > 0 || len(v.measObjClassMap) > 0 {
2746 b := json_pm_filter_to_obj(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap)
2747 if b == nil {
2748 log.Info("File/obj ", filename, "for job: ", k, " - empty after filering, discarding message ")
2749 return
2750 }
2751
2752 }
2753 fluxParms := influxList[k]
2754 log.Debug("Influxdb params: ", fluxParms)
2755 client := influxdb2.NewClient(fluxParms.DbUrl, fluxParms.DbToken)
2756 writeAPI := client.WriteAPIBlocking(fluxParms.DbOrg, fluxParms.DbBucket)
2757
2758 // fmt.Println(pmfile.Event.CommonEventHeader.StartEpochMicrosec)
2759 // tUnix := pmfile.Event.CommonEventHeader.StartEpochMicrosec / int64(time.Millisecond)
2760 // tUnixNanoRemainder := (pmfile.Event.CommonEventHeader.StartEpochMicrosec % int64(time.Millisecond)) * int64(time.Microsecond)
2761 // timeT := time.Unix(tUnix, tUnixNanoRemainder)
2762 // fmt.Println(timeT)
2763 // fmt.Println("======================")
2764 for _, zz := range pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList {
2765 ctr_names := make(map[string]string)
2766 for cni, cn := range zz.MeasTypes.SMeasTypesList {
2767 ctr_names[string(cni+1)] = cn
2768 }
2769 for _, xx := range zz.MeasValuesList {
2770 log.Debug("Measurement: ", xx.MeasObjInstID)
2771 log.Debug("Suspect flag: ", xx.SuspectFlag)
2772 p := influxdb2.NewPointWithMeasurement(xx.MeasObjInstID)
2773 p.AddField("suspectflag", xx.SuspectFlag)
2774 p.AddField("granularityperiod", pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod)
2775 p.AddField("timezone", pmfile.Event.CommonEventHeader.TimeZoneOffset)
2776 for _, yy := range xx.MeasResultsList {
2777 pi := string(yy.P)
2778 pv := yy.SValue
2779 pn := ctr_names[pi]
2780 log.Debug("Counter: ", pn, " Value: ", pv)
2781 pv_i, err := strconv.Atoi(pv)
2782 if err == nil {
2783 p.AddField(pn, pv_i)
2784 } else {
2785 p.AddField(pn, pv)
2786 }
2787 }
2788 //p.SetTime(timeT)
2789 log.Debug("StartEpochMicrosec from common event header: ", pmfile.Event.CommonEventHeader.StartEpochMicrosec)
2790 log.Debug("Set time: ", time.Unix(pmfile.Event.CommonEventHeader.StartEpochMicrosec/1000000, 0))
2791 p.SetTime(time.Unix(pmfile.Event.CommonEventHeader.StartEpochMicrosec/1000000, 0))
2792 err := writeAPI.WritePoint(context.Background(), p)
2793 if err != nil {
2794 log.Error("Db write error: ", err)
2795 }
2796 }
2797
2798 }
2799 client.Close()
2800 }
2801
2802}