Remove unused code
Issue-ID: NONRTRIC-880
Change-Id: I5e9864f7ff89e7cca7b013632a554c9437377e30
Signed-off-by: ktimoney <kevin.timoney@est.tech>
(cherry picked from commit 0bb219edead30bb0459abd2cd2163326c97ca92d)
diff --git a/install/helm/nrt-base-1/charts/strimzi-kafka/templates/app-kafka.yaml b/install/helm/nrt-base-1/charts/strimzi-kafka/templates/app-kafka.yaml
index dcc0bb8..de55ff6 100644
--- a/install/helm/nrt-base-1/charts/strimzi-kafka/templates/app-kafka.yaml
+++ b/install/helm/nrt-base-1/charts/strimzi-kafka/templates/app-kafka.yaml
@@ -22,7 +22,7 @@
namespace: nonrtric
spec:
kafka:
- version: 3.3.1
+ version: 3.5.0
replicas: 1
listeners:
- name: plain
diff --git a/install/scripts/update_ics_job.sh b/install/scripts/update_ics_job.sh
new file mode 100755
index 0000000..b8788a6
--- /dev/null
+++ b/install/scripts/update_ics_job.sh
@@ -0,0 +1,52 @@
+#!/bin/bash
+
+# ============LICENSE_START===============================================
+# Copyright (C) 2023 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+# args: <job-id> <job-index-suffix> [<access-token>]
+# job file shall exist in file "".job.json"
+update_ics_job() {
+
+ ICS_PROXY_PORT=$(kubectl get svc -n nonrtric informationservice --output jsonpath='{.spec.ports[?(@.name=="http")].nodePort}')
+ echo "NodePort to ics: "$ICS_PROXY_PORT
+ JOB=$(<.job.json)
+ echo $JOB
+ retcode=1
+ echo "Updating job $1"
+ while [ $retcode -ne 0 ]; do
+ if [ -z "$2" ]; then
+ __bearer=""
+ else
+ __bearer="Authorization: Bearer $TOKEN"
+ fi
+ STAT=$(curl -s -X PUT -w '%{http_code}' -H accept:application/json -H Content-Type:application/json http://$KUBERNETESHOST:$ICS_PROXY_PORT/data-consumer/v1/info-jobs/$1 --data-binary @.job.json -H "$__bearer" )
+ retcode=$?
+ echo "curl return code: $retcode"
+ if [ $retcode -eq 0 ]; then
+ status=${STAT:${#STAT}-3}
+ echo "http status code: "$status
+ if [ "$status" == "200" ]; then
+ echo "Job created ok"
+ elif [ "$status" == "201" ]; then
+ echo "Job created ok"
+ else
+ retcode=1
+ fi
+ fi
+ sleep 1
+ done
+}
diff --git a/install/update-pm-log.sh b/install/update-pm-log.sh
new file mode 100755
index 0000000..c523fce
--- /dev/null
+++ b/install/update-pm-log.sh
@@ -0,0 +1,88 @@
+#!/bin/bash
+
+# ============LICENSE_START===============================================
+# Copyright (C) 2023 Nordix Foundation. All rights reserved.
+# ========================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=================================================
+#
+
+. scripts/kube_get_controlplane_host.sh
+
+# Generic error printout function
+# args: <numeric-response-code> <descriptive-string>
+check_error() {
+ if [ $1 -ne 0 ]; then
+ echo "Failed: $2"
+ echo "Exiting..."
+ exit 1
+ fi
+}
+
+export KUBERNETESHOST=$(kube_get_controlplane_host)
+if [ $? -ne 0 ]; then
+ echo $KUBERNETESHOST
+ echo "Exiting"
+ exit 1
+fi
+
+echo "Kubernetes control plane host: $KUBERNETESHOST"
+
+. scripts/kube_get_nodeport.sh
+. scripts/get_influxdb2_token.sh
+. scripts/create_influxdb2_bucket.sh
+. scripts/update_ics_job.sh
+
+echo "Installation of pm to influx job"
+
+echo " Retriving influxdb2 access token..."
+INFLUXDB2_TOKEN=$(get_influxdb2_token influxdb2-0 nonrtric)
+
+
+bucket=pm-logg-bucket
+echo "Creating bucket $bucket in influxdb2"
+create_influxdb2_bucket influxdb2-0 nonrtric $bucket
+
+. scripts/populate_keycloak.sh
+
+cid="console-setup"
+TOKEN=$(get_client_token nonrtric-realm $cid)
+
+JOB='{
+ "info_type_id": "PmData",
+ "job_owner": "console",
+ "job_definition": {
+ "filter": {
+ "sourceNames": ["node2-1"],
+ "measObjInstIds": [],
+ "measTypeSpecs": [
+ {
+ "measuredObjClass": "NRCellDU",
+ "measTypes": [
+ "pmCounterNumber102"
+ ]
+ }
+ ],
+ "measuredEntityDns": []
+ },
+ "deliveryInfo": {
+ "topic": "pmreports",
+ "bootStrapServers": "kafka-1-kafka-bootstrap.nonrtric:9097"
+ }
+ }
+ }'
+echo $JOB > .job.json
+update_ics_job pmlog $TOKEN
+
+echo "done"
+
diff --git a/pm-file-converter/Dockerfile b/pm-file-converter/Dockerfile
index 80867f2..6248698 100644
--- a/pm-file-converter/Dockerfile
+++ b/pm-file-converter/Dockerfile
@@ -20,6 +20,8 @@
WORKDIR /app
COPY main.go .
+ADD common common
+ADD components components
RUN go mod init main
RUN go mod tidy
diff --git a/pm-file-converter/common/dataTypes/dataTypes.go b/pm-file-converter/common/dataTypes/dataTypes.go
new file mode 100644
index 0000000..66b462c
--- /dev/null
+++ b/pm-file-converter/common/dataTypes/dataTypes.go
@@ -0,0 +1,230 @@
+// -
+//
+// ========================LICENSE_START=================================
+// O-RAN-SC
+// %%
+// Copyright (C) 2023: Nordix Foundation
+// %%
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+
+package dataTypes
+
+import (
+ "encoding/xml"
+ "github.com/confluentinc/confluent-kafka-go/kafka"
+)
+
+// // Data type for event xml file download
+type XmlFileEventHeader struct {
+ ProductName string `json:"productName"`
+ VendorName string `json:"vendorName"`
+ Location string `json:"location"`
+ Compression string `json:"compression"`
+ SourceName string `json:"sourceName"`
+ FileFormatType string `json:"fileFormatType"`
+ FileFormatVersion string `json:"fileFormatVersion"`
+ StartEpochMicrosec int64 `json:"startEpochMicrosec"`
+ LastEpochMicrosec int64 `json:"lastEpochMicrosec"`
+ Name string `json:"name"`
+ ChangeIdentifier string `json:"changeIdentifier"`
+ InternalLocation string `json:"internalLocation"`
+ TimeZoneOffset string `json:"timeZoneOffset"`
+ ObjectStoreBucket string `json:"objectStoreBucket"`
+}
+
+// Data types for input xml file
+type MeasCollecFile struct {
+ XMLName xml.Name `xml:"measCollecFile"`
+ Text string `xml:",chardata"`
+ Xmlns string `xml:"xmlns,attr"`
+ Xsi string `xml:"xsi,attr"`
+ SchemaLocation string `xml:"schemaLocation,attr"`
+ FileHeader struct {
+ Text string `xml:",chardata"`
+ FileFormatVersion string `xml:"fileFormatVersion,attr"`
+ VendorName string `xml:"vendorName,attr"`
+ DnPrefix string `xml:"dnPrefix,attr"`
+ FileSender struct {
+ Text string `xml:",chardata"`
+ LocalDn string `xml:"localDn,attr"`
+ ElementType string `xml:"elementType,attr"`
+ } `xml:"fileSender"`
+ MeasCollec struct {
+ Text string `xml:",chardata"`
+ BeginTime string `xml:"beginTime,attr"`
+ } `xml:"measCollec"`
+ } `xml:"fileHeader"`
+ MeasData struct {
+ Text string `xml:",chardata"`
+ ManagedElement struct {
+ Text string `xml:",chardata"`
+ LocalDn string `xml:"localDn,attr"`
+ SwVersion string `xml:"swVersion,attr"`
+ } `xml:"managedElement"`
+ MeasInfo []struct {
+ Text string `xml:",chardata"`
+ MeasInfoId string `xml:"measInfoId,attr"`
+ Job struct {
+ Text string `xml:",chardata"`
+ JobId string `xml:"jobId,attr"`
+ } `xml:"job"`
+ GranPeriod struct {
+ Text string `xml:",chardata"`
+ Duration string `xml:"duration,attr"`
+ EndTime string `xml:"endTime,attr"`
+ } `xml:"granPeriod"`
+ RepPeriod struct {
+ Text string `xml:",chardata"`
+ Duration string `xml:"duration,attr"`
+ } `xml:"repPeriod"`
+ MeasType []struct {
+ Text string `xml:",chardata"`
+ P string `xml:"p,attr"`
+ } `xml:"measType"`
+ MeasValue []struct {
+ Text string `xml:",chardata"`
+ MeasObjLdn string `xml:"measObjLdn,attr"`
+ R []struct {
+ Text string `xml:",chardata"`
+ P string `xml:"p,attr"`
+ } `xml:"r"`
+ Suspect string `xml:"suspect"`
+ } `xml:"measValue"`
+ } `xml:"measInfo"`
+ } `xml:"measData"`
+ FileFooter struct {
+ Text string `xml:",chardata"`
+ MeasCollec struct {
+ Text string `xml:",chardata"`
+ EndTime string `xml:"endTime,attr"`
+ } `xml:"measCollec"`
+ } `xml:"fileFooter"`
+}
+
+// Splitted in sevreal part to allow add/remove in lists
+type MeasResults struct {
+ P int `json:"p"`
+ SValue string `json:"sValue"`
+}
+
+type MeasValues struct {
+ MeasObjInstID string `json:"measObjInstId"`
+ SuspectFlag string `json:"suspectFlag"`
+ MeasResultsList []MeasResults `json:"measResults"`
+}
+
+type SMeasTypes struct {
+ SMeasType string `json:"sMeasTypesList"`
+}
+
+type MeasInfoList struct {
+ MeasInfoID struct {
+ SMeasInfoID string `json:"sMeasInfoId"`
+ } `json:"measInfoId"`
+ MeasTypes struct {
+ SMeasTypesList []string `json:"sMeasTypesList"`
+ } `json:"measTypes"`
+ MeasValuesList []MeasValues `json:"measValuesList"`
+}
+
+type PMJsonFile struct {
+ Event struct {
+ CommonEventHeader struct {
+ Domain string `json:"domain"`
+ EventID string `json:"eventId"`
+ Sequence int `json:"sequence"`
+ EventName string `json:"eventName"`
+ SourceName string `json:"sourceName"`
+ ReportingEntityName string `json:"reportingEntityName"`
+ Priority string `json:"priority"`
+ StartEpochMicrosec int64 `json:"startEpochMicrosec"`
+ LastEpochMicrosec int64 `json:"lastEpochMicrosec"`
+ Version string `json:"version"`
+ VesEventListenerVersion string `json:"vesEventListenerVersion"`
+ TimeZoneOffset string `json:"timeZoneOffset"`
+ } `json:"commonEventHeader"`
+ Perf3GppFields struct {
+ Perf3GppFieldsVersion string `json:"perf3gppFieldsVersion"`
+ MeasDataCollection struct {
+ GranularityPeriod int `json:"granularityPeriod"`
+ MeasuredEntityUserName string `json:"measuredEntityUserName"`
+ MeasuredEntityDn string `json:"measuredEntityDn"`
+ MeasuredEntitySoftwareVersion string `json:"measuredEntitySoftwareVersion"`
+ SMeasInfoList []MeasInfoList `json:"measInfoList"`
+ } `json:"measDataCollection"`
+ } `json:"perf3gppFields"`
+ } `json:"event"`
+}
+
+type FileDownloadedEvt struct {
+ Filename string `json:"filename"`
+}
+
+type KafkaPayload struct {
+ Msg *kafka.Message
+ Topic string
+}
+
+// Type for controlling the topic reader
+type ReaderControl struct {
+ Command string
+}
+
+// Type for controlling the topic writer
+type WriterControl struct {
+ Command string
+}
+
+// == API Datatypes ==//
+// Type for supported data types
+type DataType struct {
+ ID string `json:"id"`
+ KafkaInputTopic string `json:"kafkaInputTopic"`
+ InputJobType string `json:inputJobType`
+ InputJobDefinition struct {
+ KafkaOutputTopic string `json:kafkaOutputTopic`
+ } `json:inputJobDefinition`
+
+ Ext_job *[]byte
+ Ext_job_created bool
+ Ext_job_id string
+}
+
+// Type for controlling the job
+type JobControl struct {
+ Command string
+ //Filter Filter
+}
+
+type AppStates int64
+
+var InfoTypes DataTypes
+
+// Keep all info type jobs, key == type id
+var TypeJobs map[string]TypeJobRecord = make(map[string]TypeJobRecord)
+
+// Type for an infojob
+type TypeJobRecord struct {
+ InfoType string
+ InputTopic string
+ Data_in_channel chan *KafkaPayload
+ Reader_control chan ReaderControl
+ Job_control chan JobControl
+ GroupId string
+ ClientId string
+}
+
+type DataTypes struct {
+ ProdDataTypes []DataType `json:"types"`
+}
diff --git a/pm-file-converter/common/utils/utils.go b/pm-file-converter/common/utils/utils.go
new file mode 100644
index 0000000..5466d2c
--- /dev/null
+++ b/pm-file-converter/common/utils/utils.go
@@ -0,0 +1,74 @@
+// -
+//
+// ========================LICENSE_START=================================
+// O-RAN-SC
+// %%
+// Copyright (C) 2023: Nordix Foundation
+// %%
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+package utils
+
+import (
+ "bytes"
+ log "github.com/sirupsen/logrus"
+ "main/components/kafkacollector"
+ "net/http"
+)
+
+var httpclient = &http.Client{}
+
+// Send a http request with json (json may be nil)
+func Send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool {
+
+ // set the HTTP method, url, and request body
+ var req *http.Request
+ var err error
+ if json == nil {
+ req, err = http.NewRequest(method, url, http.NoBody)
+ } else {
+ req, err = http.NewRequest(method, url, bytes.NewBuffer(json))
+ req.Header.Set("Content-Type", "application/json; charset=utf-8")
+ }
+ if err != nil {
+ log.Error("Cannot create http request, method: ", method, " url: ", url)
+ return false
+ }
+
+ if useAuth {
+ token, err := kafkacollector.Fetch_token()
+ if err != nil {
+ log.Error("Cannot fetch token for http request: ", err)
+ return false
+ }
+ req.Header.Set("Authorization", "Bearer "+token.TokenValue)
+ }
+
+ log.Debug("HTTP request: ", req)
+
+ log.Debug("Sending http request")
+ resp, err2 := httpclient.Do(req)
+ if err2 != nil {
+ log.Error("Http request error: ", err2)
+ log.Error("Cannot send http request method: ", method, " url: ", url)
+ } else {
+ if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
+ log.Debug("Accepted http status: ", resp.StatusCode)
+ resp.Body.Close()
+ return true
+ }
+ log.Debug("HTTP resp: ", resp)
+ resp.Body.Close()
+ }
+ return false
+}
diff --git a/pm-file-converter/components/kafkacollector/kafkacollector.go b/pm-file-converter/components/kafkacollector/kafkacollector.go
new file mode 100644
index 0000000..d70114c
--- /dev/null
+++ b/pm-file-converter/components/kafkacollector/kafkacollector.go
@@ -0,0 +1,456 @@
+// -
+//
+// ========================LICENSE_START=================================
+// O-RAN-SC
+// %%
+// Copyright (C) 2023: Nordix Foundation
+// %%
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+package kafkacollector
+
+import (
+ "context"
+ "fmt"
+ "github.com/confluentinc/confluent-kafka-go/kafka"
+ jsoniter "github.com/json-iterator/go"
+ log "github.com/sirupsen/logrus"
+ "golang.org/x/oauth2/clientcredentials"
+ "main/common/dataTypes"
+ "main/components/miniocollector"
+ "os"
+ "time"
+)
+
+var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
+var bootstrapserver = os.Getenv("KAFKA_SERVER")
+var creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET")
+var creds_client_id = os.Getenv("CREDS_CLIENT_ID")
+var creds_service_url = os.Getenv("AUTH_SERVICE_URL")
+
+// Limiter - valid for all jobs
+const parallelism_limiter = 100 //For all jobs
+var jobLimiterChan = make(chan struct{}, parallelism_limiter)
+
+func Start_topic_reader(topic string, type_id string, control_ch chan dataTypes.ReaderControl, data_ch chan *dataTypes.KafkaPayload, gid string, cid string) {
+
+ log.Info("Topic reader starting, topic: ", topic, " for type: ", type_id)
+
+ topic_ok := false
+ var c *kafka.Consumer = nil
+ running := true
+
+ for topic_ok == false {
+
+ select {
+ case reader_ctrl := <-control_ch:
+ if reader_ctrl.Command == "EXIT" {
+ log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
+ data_ch <- nil //Signal to job handler
+ running = false
+ return
+ }
+ case <-time.After(1 * time.Second):
+ if !running {
+ return
+ }
+ if c == nil {
+ c = create_kafka_consumer(type_id, gid, cid)
+ if c == nil {
+ log.Info("Cannot start consumer on topic: ", topic, " for type: ", type_id, " - retrying")
+ } else {
+ log.Info("Consumer started on topic: ", topic, " for type: ", type_id)
+ }
+ }
+ if c != nil && topic_ok == false {
+ err := c.SubscribeTopics([]string{topic}, nil)
+ if err != nil {
+ log.Info("Topic reader cannot start subscribing on topic: ", topic, " for type: ", type_id, " - retrying -- error details: ", err)
+ } else {
+ log.Info("Topic reader subscribing on topic: ", topic, " for type: ", type_id)
+ topic_ok = true
+ }
+ }
+ }
+ }
+ log.Info("Topic reader ready on topic: ", topic, " for type: ", type_id)
+
+ var event_chan = make(chan int)
+ go func() {
+ for {
+ select {
+ case evt := <-c.Events():
+ switch evt.(type) {
+ case kafka.OAuthBearerTokenRefresh:
+ log.Debug("New consumer token needed: ", evt)
+ token, err := Fetch_token()
+ if err != nil {
+ log.Warning("Cannot cannot fetch token: ", err)
+ c.SetOAuthBearerTokenFailure(err.Error())
+ } else {
+ setTokenError := c.SetOAuthBearerToken(*token)
+ if setTokenError != nil {
+ log.Warning("Cannot cannot set token: ", setTokenError)
+ c.SetOAuthBearerTokenFailure(setTokenError.Error())
+ }
+ }
+ default:
+ log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String())
+ }
+
+ case msg := <-event_chan:
+ if msg == 0 {
+ return
+ }
+ case <-time.After(1 * time.Second):
+ if !running {
+ return
+ }
+ }
+ }
+ }()
+
+ go func() {
+ for {
+ for {
+ select {
+ case reader_ctrl := <-control_ch:
+ if reader_ctrl.Command == "EXIT" {
+ event_chan <- 0
+ log.Debug("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
+ data_ch <- nil //Signal to job handler
+ defer c.Close()
+ return
+ }
+ default:
+
+ ev := c.Poll(1000)
+ if ev == nil {
+ log.Debug("Topic Reader for type: ", type_id, " Nothing to consume on topic: ", topic)
+ continue
+ }
+ switch e := ev.(type) {
+ case *kafka.Message:
+ var kmsg dataTypes.KafkaPayload
+ kmsg.Msg = e
+
+ c.Commit()
+
+ data_ch <- &kmsg
+ log.Debug("Reader msg: ", &kmsg)
+ log.Debug("Reader - data_ch ", data_ch)
+ case kafka.Error:
+ fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
+
+ case kafka.OAuthBearerTokenRefresh:
+ log.Debug("New consumer token needed: ", ev)
+ token, err := Fetch_token()
+ if err != nil {
+ log.Warning("Cannot cannot fetch token: ", err)
+ c.SetOAuthBearerTokenFailure(err.Error())
+ } else {
+ setTokenError := c.SetOAuthBearerToken(*token)
+ if setTokenError != nil {
+ log.Warning("Cannot cannot set token: ", setTokenError)
+ c.SetOAuthBearerTokenFailure(setTokenError.Error())
+ }
+ }
+ default:
+ fmt.Printf("Ignored %v\n", e)
+ }
+ }
+ }
+ }
+ }()
+}
+
+func Start_topic_writer(control_ch chan dataTypes.WriterControl, data_ch chan *dataTypes.KafkaPayload) {
+
+ var kafka_producer *kafka.Producer
+
+ running := true
+ log.Info("Topic writer starting")
+
+ // Wait for kafka producer to become available - and be prepared to exit the writer
+ for kafka_producer == nil {
+ select {
+ case writer_ctl := <-control_ch:
+ if writer_ctl.Command == "EXIT" {
+ //ignore cmd
+ }
+ default:
+ kafka_producer = start_producer()
+ if kafka_producer == nil {
+ log.Debug("Could not start kafka producer - retrying")
+ time.Sleep(1 * time.Second)
+ } else {
+ log.Debug("Kafka producer started")
+ }
+ }
+ }
+
+ var event_chan = make(chan int)
+ go func() {
+ for {
+ select {
+ case evt := <-kafka_producer.Events():
+ switch evt.(type) {
+ case *kafka.Message:
+ m := evt.(*kafka.Message)
+
+ if m.TopicPartition.Error != nil {
+ log.Debug("Dumping topic writer event, failed: ", m.TopicPartition.Error)
+ } else {
+ log.Debug("Dumping topic writer event, message to topic: ", *m.TopicPartition.Topic, " at offset: ", m.TopicPartition.Offset, " at partition: ", m.TopicPartition.Partition)
+ }
+ case kafka.Error:
+ log.Debug("Dumping topic writer event, error: ", evt)
+ case kafka.OAuthBearerTokenRefresh:
+ log.Debug("New producer token needed: ", evt)
+ token, err := Fetch_token()
+ if err != nil {
+ log.Warning("Cannot cannot fetch token: ", err)
+ kafka_producer.SetOAuthBearerTokenFailure(err.Error())
+ } else {
+ setTokenError := kafka_producer.SetOAuthBearerToken(*token)
+ if setTokenError != nil {
+ log.Warning("Cannot cannot set token: ", setTokenError)
+ kafka_producer.SetOAuthBearerTokenFailure(setTokenError.Error())
+ }
+ }
+ default:
+ log.Debug("Dumping topic writer event, unknown: ", evt)
+ }
+
+ case msg := <-event_chan:
+ if msg == 0 {
+ return
+ }
+ case <-time.After(1 * time.Second):
+ if !running {
+ return
+ }
+ }
+ }
+ }()
+ go func() {
+ for {
+ select {
+ case writer_ctl := <-control_ch:
+ if writer_ctl.Command == "EXIT" {
+ // ignore - wait for channel signal
+ }
+
+ case kmsg := <-data_ch:
+ if kmsg == nil {
+ event_chan <- 0
+ log.Info("Topic writer stopped by channel signal - start_topic_writer")
+ defer kafka_producer.Close()
+ return
+ }
+
+ retries := 10
+ msg_ok := false
+ var err error
+ for retry := 1; retry <= retries && msg_ok == false; retry++ {
+ err = kafka_producer.Produce(&kafka.Message{
+ TopicPartition: kafka.TopicPartition{Topic: &kmsg.Topic, Partition: kafka.PartitionAny},
+ Value: kmsg.Msg.Value, Key: kmsg.Msg.Key}, nil)
+
+ if err == nil {
+ msg_ok = true
+ log.Debug("Topic writer, msg sent ok on topic: ", kmsg.Topic)
+ } else {
+ log.Info("Topic writer failed to send message on topic: ", kmsg.Topic, " - Retrying. Error details: ", err)
+ time.Sleep(time.Duration(retry) * time.Second)
+ }
+ }
+ if !msg_ok {
+ log.Error("Topic writer failed to send message on topic: ", kmsg.Topic, " - Msg discarded. Error details: ", err)
+ }
+ case <-time.After(1000 * time.Millisecond):
+ if !running {
+ return
+ }
+ }
+ }
+ }()
+}
+
+func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consumer {
+ var cm kafka.ConfigMap
+ if creds_grant_type == "" {
+ log.Info("Creating kafka plain text consumer for type: ", type_id)
+ cm = kafka.ConfigMap{
+ "bootstrap.servers": bootstrapserver,
+ "group.id": gid,
+ "client.id": cid,
+ "auto.offset.reset": "latest",
+ "enable.auto.commit": false,
+ }
+ } else {
+ log.Info("Creating kafka SASL plain text consumer for type: ", type_id)
+ cm = kafka.ConfigMap{
+ "bootstrap.servers": bootstrapserver,
+ "group.id": gid,
+ "client.id": cid,
+ "auto.offset.reset": "latest",
+ "enable.auto.commit": false,
+ "sasl.mechanism": "OAUTHBEARER",
+ "security.protocol": "SASL_PLAINTEXT",
+ }
+ }
+ c, err := kafka.NewConsumer(&cm)
+
+ if err != nil {
+ log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err)
+ return nil
+ }
+
+ log.Info("Created kafka consumer for type: ", type_id, " OK")
+ return c
+}
+
+// Start kafka producer
+func start_producer() *kafka.Producer {
+ log.Info("Creating kafka producer")
+
+ var cm kafka.ConfigMap
+ if creds_grant_type == "" {
+ log.Info("Creating kafka SASL plain text producer")
+ cm = kafka.ConfigMap{
+ "bootstrap.servers": bootstrapserver,
+ }
+ } else {
+ log.Info("Creating kafka SASL plain text producer")
+ cm = kafka.ConfigMap{
+ "bootstrap.servers": bootstrapserver,
+ "sasl.mechanism": "OAUTHBEARER",
+ "security.protocol": "SASL_PLAINTEXT",
+ }
+ }
+
+ p, err := kafka.NewProducer(&cm)
+ if err != nil {
+ log.Error("Cannot create kafka producer,", err)
+ return nil
+ }
+ return p
+}
+
+func Start_job_xml_file_data(type_id string, control_ch chan dataTypes.JobControl, data_in_ch chan *dataTypes.KafkaPayload, data_out_channel chan *dataTypes.KafkaPayload, fvolume string, fsbucket string) {
+
+ log.Info("Type job", type_id, " started")
+ topic_list := make(map[string]string)
+ topic_list[type_id] = "json-file-ready-kp"
+ topic_list["PmData"] = "json-file-ready-kpadp"
+ running := true
+ for {
+ select {
+ case job_ctl := <-control_ch:
+ log.Debug("Type job ", type_id, " new cmd received ", job_ctl.Command)
+ switch job_ctl.Command {
+ case "EXIT":
+ //ignore cmd - handled by channel signal
+ }
+
+ case msg := <-data_in_ch:
+ if msg == nil {
+ log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data")
+
+ running = false
+ return
+ }
+ jobLimiterChan <- struct{}{}
+ go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket)
+
+ case <-time.After(1 * time.Second):
+ if !running {
+ return
+ }
+ }
+ }
+}
+
+func run_xml_job(type_id string, msg *dataTypes.KafkaPayload, outputCompression string, data_out_channel chan *dataTypes.KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string) {
+ defer func() {
+ <-jobLimiterChan
+ }()
+ start := time.Now()
+ var evt_data dataTypes.XmlFileEventHeader
+
+ err := jsoniter.Unmarshal(msg.Msg.Value, &evt_data)
+ if err != nil {
+ log.Error("Cannot parse XmlFileEventHeader for type job: ", type_id, " - discarding message, error details", err)
+ return
+ }
+ log.Debug("Unmarshal file-collect event for type job: ", type_id, " time: ", time.Since(start).String())
+
+ start = time.Now()
+ new_fn := miniocollector.Xml_to_json_conv(&evt_data)
+ if err != nil {
+ log.Error("Cannot convert file ", evt_data.Name, " - discarding message, ", err)
+ return
+ }
+ log.Debug("Converted file to json: ", new_fn, " time", time.Since(start).String())
+
+ var fde dataTypes.FileDownloadedEvt
+ fde.Filename = new_fn
+ j, err := jsoniter.Marshal(fde)
+
+ if err != nil {
+ log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err)
+ return
+ }
+ msg.Msg.Value = j
+
+ msg.Msg.Key = []byte("\"" + evt_data.SourceName + "\"")
+ log.Debug("Marshal file-collect event ", time.Since(start).String())
+ log.Debug("Sending file-collect event to output topic(s)", len(topic_list))
+ for _, v := range topic_list {
+ fmt.Println("Output Topic: " + v)
+ var kmsg *dataTypes.KafkaPayload = new(dataTypes.KafkaPayload)
+ kmsg.Msg = msg.Msg
+ kmsg.Topic = v
+ data_out_channel <- kmsg
+ }
+}
+
+func Fetch_token() (*kafka.OAuthBearerToken, error) {
+ log.Debug("Get token inline")
+ conf := &clientcredentials.Config{
+ ClientID: creds_client_id,
+ ClientSecret: creds_client_secret,
+ TokenURL: creds_service_url,
+ }
+ token, err := conf.Token(context.Background())
+ if err != nil {
+ log.Warning("Cannot fetch access token: ", err)
+ return nil, err
+ }
+ extensions := map[string]string{}
+ log.Debug("=====================================================")
+ log.Debug("token: ", token)
+ log.Debug("=====================================================")
+ log.Debug("TokenValue: ", token.AccessToken)
+ log.Debug("=====================================================")
+ log.Debug("Expiration: ", token.Expiry)
+ t := token.Expiry
+ oauthBearerToken := kafka.OAuthBearerToken{
+ TokenValue: token.AccessToken,
+ Expiration: t,
+ Extensions: extensions,
+ }
+
+ return &oauthBearerToken, nil
+}
diff --git a/pm-file-converter/components/miniocollector/miniocollector.go b/pm-file-converter/components/miniocollector/miniocollector.go
new file mode 100644
index 0000000..1663194
--- /dev/null
+++ b/pm-file-converter/components/miniocollector/miniocollector.go
@@ -0,0 +1,151 @@
+// -
+//
+// ========================LICENSE_START=================================
+// O-RAN-SC
+// %%
+// Copyright (C) 2023: Nordix Foundation
+// %%
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+package miniocollector
+
+import (
+ "bytes"
+ "compress/gzip"
+ "context"
+ "fmt"
+ jsoniter "github.com/json-iterator/go"
+ "github.com/minio/minio-go/v7"
+ "github.com/minio/minio-go/v7/pkg/credentials"
+ log "github.com/sirupsen/logrus"
+ "io"
+ "main/common/dataTypes"
+ "main/components/xmltransform"
+ "net/url"
+ "os"
+ "strings"
+ "time"
+)
+
+func Xml_to_json_conv(evt_data *dataTypes.XmlFileEventHeader) string {
+ filestoreUser := os.Getenv("FILESTORE_USER")
+ filestorePwd := os.Getenv("FILESTORE_PWD")
+ filestoreServer := os.Getenv("FILESTORE_SERVER")
+
+ s3Client, err := minio.New(filestoreServer, &minio.Options{
+ Creds: credentials.NewStaticV4(filestoreUser, filestorePwd, ""),
+ Secure: false,
+ })
+ if err != nil {
+ log.Fatalln(err)
+ }
+ expiry := time.Second * 24 * 60 * 60 // 1 day.
+ objectName := evt_data.Name
+ bucketName := evt_data.ObjectStoreBucket
+ compresion := evt_data.Compression
+ reqParams := make(url.Values)
+
+ xmlh, err := jsoniter.Marshal(evt_data)
+ if err != nil {
+ fmt.Printf("Error: %s", err)
+ return ""
+ }
+
+ // Generate presigned GET url with lambda function
+ presignedURL, err := s3Client.PresignedGetObject(context.Background(), bucketName, objectName, expiry, reqParams)
+ if err != nil {
+ log.Fatalln(err)
+ }
+ file_bytes := xmltransform.Convert(presignedURL.String(), compresion, string(xmlh))
+ newObjectName := objectName + "kafka-producer-pm-xml2json-0.json.gz"
+ var buf bytes.Buffer
+ err = gzipWrite(&buf, &file_bytes)
+ upload_object(s3Client, buf.Bytes(), newObjectName, "pm-files-json")
+ fmt.Println("")
+
+ return newObjectName
+}
+
+func upload_object(mc *minio.Client, b []byte, objectName string, fsbucket string) {
+ contentType := "application/json"
+ if strings.HasSuffix(objectName, ".gz") {
+ contentType = "application/gzip"
+ }
+
+ // Upload the xml file with PutObject
+ r := bytes.NewReader(b)
+ tctx := context.Background()
+ if check_minio_bucket(mc, fsbucket) == false {
+ err := create_minio_bucket(mc, fsbucket)
+ if err != nil {
+ log.Error("Cannot create bucket: ", fsbucket, ", ", err)
+ return
+ }
+ }
+ ok := false
+ for i := 1; i < 64 && ok == false; i = i * 2 {
+ info, err := mc.PutObject(tctx, fsbucket, objectName, r, int64(len(b)), minio.PutObjectOptions{ContentType: contentType})
+ if err != nil {
+
+ if i == 1 {
+ log.Warn("Cannot upload (first attempt): ", objectName, ", ", err)
+ } else {
+ log.Warn("Cannot upload (retry): ", objectName, ", ", err)
+ }
+ time.Sleep(time.Duration(i) * time.Second)
+ } else {
+ log.Debug("Successfully uploaded: ", objectName, " of size:", info.Size)
+ }
+ }
+}
+
+func create_minio_bucket(mc *minio.Client, bucket string) error {
+ tctx := context.Background()
+ err := mc.MakeBucket(tctx, bucket, minio.MakeBucketOptions{})
+ if err != nil {
+ // Check to see if we already own this bucket (which happens if you run this twice)
+ exists, errBucketExists := mc.BucketExists(tctx, bucket)
+ if errBucketExists == nil && exists {
+ log.Debug("Already own bucket:", bucket)
+ return nil
+ } else {
+ log.Error("Cannot create or check bucket ", bucket, " in minio client", err)
+ return err
+ }
+ }
+ log.Debug("Successfully created bucket: ", bucket)
+ return nil
+}
+
+func check_minio_bucket(mc *minio.Client, bucket string) bool {
+ tctx := context.Background()
+ exists, err := mc.BucketExists(tctx, bucket)
+ if err == nil && exists {
+ log.Debug("Already own bucket:", bucket)
+ return true
+ }
+ log.Error("Bucket does not exist, bucket ", bucket, " in minio client", err)
+ return false
+}
+
+// Write gzipped data to a Writer
+func gzipWrite(w io.Writer, data *[]byte) error {
+ gw, err1 := gzip.NewWriterLevel(w, gzip.BestSpeed)
+
+ if err1 != nil {
+ return err1
+ }
+ defer gw.Close()
+ _, err2 := gw.Write(*data)
+ return err2
+}
diff --git a/pm-file-converter/components/xmltransform/xmltransform.go b/pm-file-converter/components/xmltransform/xmltransform.go
new file mode 100644
index 0000000..8e88d19
--- /dev/null
+++ b/pm-file-converter/components/xmltransform/xmltransform.go
@@ -0,0 +1,142 @@
+// -
+//
+// ========================LICENSE_START=================================
+// O-RAN-SC
+// %%
+// Copyright (C) 2023: Nordix Foundation
+// %%
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+
+package xmltransform
+
+import (
+ "bytes"
+ "compress/gzip"
+ "encoding/xml"
+ "errors"
+ "fmt"
+ jsoniter "github.com/json-iterator/go"
+ log "github.com/sirupsen/logrus"
+ "io"
+ "main/common/dataTypes"
+ "net/http"
+ "strconv"
+ "time"
+)
+
+func xml_to_json_conv(f_byteValue *[]byte, xfeh *dataTypes.XmlFileEventHeader) ([]byte, error) {
+ var f dataTypes.MeasCollecFile
+ start := time.Now()
+ err := xml.Unmarshal(*f_byteValue, &f)
+ if err != nil {
+ return nil, errors.New("Cannot unmarshal xml-file")
+ }
+ log.Debug("Unmarshal xml file XmlFileEvent: ", time.Since(start).String())
+
+ start = time.Now()
+ var pmfile dataTypes.PMJsonFile
+
+ //TODO: Fill in more values
+ pmfile.Event.Perf3GppFields.Perf3GppFieldsVersion = "1.0"
+ pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
+ pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityUserName = ""
+ pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityDn = f.FileHeader.FileSender.LocalDn
+ pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntitySoftwareVersion = f.MeasData.ManagedElement.SwVersion
+
+ for _, it := range f.MeasData.MeasInfo {
+ var mili dataTypes.MeasInfoList
+ mili.MeasInfoID.SMeasInfoID = it.MeasInfoId
+ for _, jt := range it.MeasType {
+ mili.MeasTypes.SMeasTypesList = append(mili.MeasTypes.SMeasTypesList, jt.Text)
+ }
+ for _, jt := range it.MeasValue {
+ var mv dataTypes.MeasValues
+ mv.MeasObjInstID = jt.MeasObjLdn
+ mv.SuspectFlag = jt.Suspect
+ if jt.Suspect == "" {
+ mv.SuspectFlag = "false"
+ }
+ for _, kt := range jt.R {
+ ni, _ := strconv.Atoi(kt.P)
+ nv := kt.Text
+ mr := dataTypes.MeasResults{ni, nv}
+ mv.MeasResultsList = append(mv.MeasResultsList, mr)
+ }
+ mili.MeasValuesList = append(mili.MeasValuesList, mv)
+ }
+
+ pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = append(pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList, mili)
+ }
+
+ pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
+
+ //TODO: Fill more values
+ pmfile.Event.CommonEventHeader.Domain = "" //xfeh.Domain
+ pmfile.Event.CommonEventHeader.EventID = "" //xfeh.EventID
+ pmfile.Event.CommonEventHeader.Sequence = 0 //xfeh.Sequence
+ pmfile.Event.CommonEventHeader.EventName = "" //xfeh.EventName
+ pmfile.Event.CommonEventHeader.SourceName = xfeh.SourceName
+ pmfile.Event.CommonEventHeader.ReportingEntityName = "" //xfeh.ReportingEntityName
+ pmfile.Event.CommonEventHeader.Priority = "" //xfeh.Priority
+ pmfile.Event.CommonEventHeader.StartEpochMicrosec = xfeh.StartEpochMicrosec
+ pmfile.Event.CommonEventHeader.LastEpochMicrosec = xfeh.LastEpochMicrosec
+ pmfile.Event.CommonEventHeader.Version = "" //xfeh.Version
+ pmfile.Event.CommonEventHeader.VesEventListenerVersion = "" //xfeh.VesEventListenerVersion
+ pmfile.Event.CommonEventHeader.TimeZoneOffset = xfeh.TimeZoneOffset
+
+ log.Debug("Convert xml to json : ", time.Since(start).String())
+
+ start = time.Now()
+ json, err := jsoniter.Marshal(pmfile)
+ log.Debug("Marshal json : ", time.Since(start).String())
+
+ if err != nil {
+ return nil, errors.New("Cannot marshal converted json")
+ }
+ return json, nil
+}
+
+func Convert(inputS3Url, compression, xmlFileEventHeader string) []byte {
+ evt_data := dataTypes.XmlFileEventHeader{}
+ jsoniter.Unmarshal([]byte(xmlFileEventHeader), &evt_data)
+
+ client := new(http.Client)
+
+ request, err := http.NewRequest("GET", inputS3Url, nil)
+ request.Header.Add("Accept-Encoding", "gzip")
+
+ response, err := client.Do(request)
+ defer response.Body.Close()
+
+ // Check that the server actually sent compressed data
+ var reader io.ReadCloser
+ switch compression {
+ case "gzip", "gz":
+ reader, err = gzip.NewReader(response.Body)
+ defer reader.Close()
+ default:
+ reader = response.Body
+ }
+
+ var buf3 bytes.Buffer
+ _, err2 := io.Copy(&buf3, reader)
+ if err2 != nil {
+ log.Error("Error reading response, discarding message, ", err)
+ return nil
+ }
+ file_bytes := buf3.Bytes()
+ fmt.Println("Converting to XML")
+ b, err := xml_to_json_conv(&file_bytes, &evt_data)
+ return b
+}
diff --git a/pm-file-converter/main.go b/pm-file-converter/main.go
index d37b0d2..b931a2a 100644
--- a/pm-file-converter/main.go
+++ b/pm-file-converter/main.go
@@ -1,370 +1,63 @@
-// ============LICENSE_START===============================================
-// Copyright (C) 2023 Nordix Foundation. All rights reserved.
-// ========================================================================
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
+// -
//
-// http://www.apache.org/licenses/LICENSE-2.0
+// ========================LICENSE_START=================================
+// O-RAN-SC
+// %%
+// Copyright (C) 2023: Nordix Foundation
+// %%
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-// ============LICENSE_END=================================================
+// http://www.apache.org/licenses/LICENSE-2.0
//
-
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
package main
import (
- "bytes"
- "compress/gzip"
- "context"
- "crypto/tls"
- "encoding/json"
- "encoding/xml"
- "errors"
"fmt"
- "io"
- "net"
- "os/signal"
- "reflect"
- "strings"
- "sync"
- "syscall"
-
+ jsoniter "github.com/json-iterator/go"
+ log "github.com/sirupsen/logrus"
+ "main/common/dataTypes"
+ "main/common/utils"
+ "main/components/kafkacollector"
"net/http"
"os"
+ "os/signal"
"runtime"
- "strconv"
+ "sync"
+ "syscall"
"time"
-
- "github.com/google/uuid"
- "golang.org/x/oauth2/clientcredentials"
-
- log "github.com/sirupsen/logrus"
-
- "github.com/gorilla/mux"
-
- "net/http/pprof"
-
- "github.com/confluentinc/confluent-kafka-go/kafka"
- influxdb2 "github.com/influxdata/influxdb-client-go/v2"
- jsoniter "github.com/json-iterator/go"
- "github.com/minio/minio-go/v7"
- "github.com/minio/minio-go/v7/pkg/credentials"
)
-//== Constants ==//
-
-const http_port = 80
-const https_port = 443
-const config_file = "application_configuration.json"
-const server_crt = "server.crt"
-const server_key = "server.key"
-
-const producer_name = "kafka-producer"
-
-const registration_delay_short = 2
-const registration_delay_long = 120
-
-const mutexLocked = 1
-
-const (
- Init AppStates = iota
- Running
- Terminating
-)
-
-const reader_queue_length = 100 //Per type job
-const writer_queue_length = 100 //Per info job
-const parallelism_limiter = 100 //For all jobs
+var ics_server = os.Getenv("ICS")
+var self = os.Getenv("SELF")
// This are optional - set if using SASL protocol is used towards kafka
var creds_grant_type = os.Getenv("CREDS_GRANT_TYPE")
-var creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET")
-var creds_client_id = os.Getenv("CREDS_CLIENT_ID")
-var creds_service_url = os.Getenv("AUTH_SERVICE_URL")
-//== Types ==//
+var bootstrapserver = os.Getenv("KAFKA_SERVER")
-type AppStates int64
+const config_file = "application_configuration.json"
+const producer_name = "kafka-producer"
-type FilterParameters struct {
- MeasuredEntityDns []string `json:"measuredEntityDns"`
- MeasTypes []string `json:"measTypes"`
- MeasObjClass []string `json:"measObjClass"`
- MeasObjInstIds []string `json:"measObjInstIds"`
-}
+var producer_instance_name string = producer_name
-type InfoJobDataType struct {
- InfoJobData struct {
- KafkaOutputTopic string `json:"kafkaOutputTopic"`
+const reader_queue_length = 100 //Per type job
+const writer_queue_length = 100 //Per info job
- DbUrl string `json:"db-url"`
- DbOrg string `json:"db-org"`
- DbBucket string `json:"db-bucket"`
- DbToken string `json:"db-token"`
+var files_volume = os.Getenv("FILES_VOLUME")
- FilterParams FilterParameters `json:"filter"`
- } `json:"info_job_data"`
- InfoJobIdentity string `json:"info_job_identity"`
- InfoTypeIdentity string `json:"info_type_identity"`
- LastUpdated string `json:"last_updated"`
- Owner string `json:"owner"`
- TargetURI string `json:"target_uri"`
-}
+var data_out_channel = make(chan *dataTypes.KafkaPayload, writer_queue_length)
+var writer_control = make(chan dataTypes.WriterControl, 1)
-// Type for an infojob
-type InfoJobRecord struct {
- job_info InfoJobDataType
- output_topic string
-
- statistics *InfoJobStats
-}
-
-// Type for an infojob
-type TypeJobRecord struct {
- InfoType string
- InputTopic string
- data_in_channel chan *KafkaPayload
- reader_control chan ReaderControl
- job_control chan JobControl
- groupId string
- clientId string
-
- statistics *TypeJobStats
-}
-
-// Type for controlling the topic reader
-type ReaderControl struct {
- command string
-}
-
-// Type for controlling the topic writer
-type WriterControl struct {
- command string
-}
-
-// Type for controlling the job
-type JobControl struct {
- command string
- filter Filter
-}
-
-type KafkaPayload struct {
- msg *kafka.Message
- topic string
- jobid string
-}
-
-type FilterMaps struct {
- sourceNameMap map[string]bool
- measObjClassMap map[string]bool
- measObjInstIdsMap map[string]bool
- measTypesMap map[string]bool
-}
-
-type InfluxJobParameters struct {
- DbUrl string
- DbOrg string
- DbBucket string
- DbToken string
-}
-
-type Filter struct {
- JobId string
- OutputTopic string
- filter FilterMaps
-
- influxParameters InfluxJobParameters
-}
-
-// Type for info job statistics
-type InfoJobStats struct {
- out_msg_cnt int
- out_data_vol int64
-}
-
-// Type for type job statistics
-type TypeJobStats struct {
- in_msg_cnt int
- in_data_vol int64
-}
-
-// == API Datatypes ==//
-// Type for supported data types
-type DataType struct {
- ID string `json:"id"`
- KafkaInputTopic string `json:"kafkaInputTopic"`
- InputJobType string `json:inputJobType`
- InputJobDefinition struct {
- KafkaOutputTopic string `json:kafkaOutputTopic`
- } `json:inputJobDefinition`
-
- ext_job *[]byte
- ext_job_created bool
- ext_job_id string
-}
-
-type DataTypes struct {
- ProdDataTypes []DataType `json:"types"`
-}
-
-type Minio_buckets struct {
- Buckets map[string]bool
-}
-
-//== External data types ==//
-
-// // Data type for event xml file download
-type XmlFileEventHeader struct {
- ProductName string `json:"productName"`
- VendorName string `json:"vendorName"`
- Location string `json:"location"`
- Compression string `json:"compression"`
- SourceName string `json:"sourceName"`
- FileFormatType string `json:"fileFormatType"`
- FileFormatVersion string `json:"fileFormatVersion"`
- StartEpochMicrosec int64 `json:"startEpochMicrosec"`
- LastEpochMicrosec int64 `json:"lastEpochMicrosec"`
- Name string `json:"name"`
- ChangeIdentifier string `json:"changeIdentifier"`
- InternalLocation string `json:"internalLocation"`
- TimeZoneOffset string `json:"timeZoneOffset"`
- //ObjectStoreBucket string `json:"objectStoreBucket"`
-}
-
-// Data types for input xml file
-type MeasCollecFile struct {
- XMLName xml.Name `xml:"measCollecFile"`
- Text string `xml:",chardata"`
- Xmlns string `xml:"xmlns,attr"`
- Xsi string `xml:"xsi,attr"`
- SchemaLocation string `xml:"schemaLocation,attr"`
- FileHeader struct {
- Text string `xml:",chardata"`
- FileFormatVersion string `xml:"fileFormatVersion,attr"`
- VendorName string `xml:"vendorName,attr"`
- DnPrefix string `xml:"dnPrefix,attr"`
- FileSender struct {
- Text string `xml:",chardata"`
- LocalDn string `xml:"localDn,attr"`
- ElementType string `xml:"elementType,attr"`
- } `xml:"fileSender"`
- MeasCollec struct {
- Text string `xml:",chardata"`
- BeginTime string `xml:"beginTime,attr"`
- } `xml:"measCollec"`
- } `xml:"fileHeader"`
- MeasData struct {
- Text string `xml:",chardata"`
- ManagedElement struct {
- Text string `xml:",chardata"`
- LocalDn string `xml:"localDn,attr"`
- SwVersion string `xml:"swVersion,attr"`
- } `xml:"managedElement"`
- MeasInfo []struct {
- Text string `xml:",chardata"`
- MeasInfoId string `xml:"measInfoId,attr"`
- Job struct {
- Text string `xml:",chardata"`
- JobId string `xml:"jobId,attr"`
- } `xml:"job"`
- GranPeriod struct {
- Text string `xml:",chardata"`
- Duration string `xml:"duration,attr"`
- EndTime string `xml:"endTime,attr"`
- } `xml:"granPeriod"`
- RepPeriod struct {
- Text string `xml:",chardata"`
- Duration string `xml:"duration,attr"`
- } `xml:"repPeriod"`
- MeasType []struct {
- Text string `xml:",chardata"`
- P string `xml:"p,attr"`
- } `xml:"measType"`
- MeasValue []struct {
- Text string `xml:",chardata"`
- MeasObjLdn string `xml:"measObjLdn,attr"`
- R []struct {
- Text string `xml:",chardata"`
- P string `xml:"p,attr"`
- } `xml:"r"`
- Suspect string `xml:"suspect"`
- } `xml:"measValue"`
- } `xml:"measInfo"`
- } `xml:"measData"`
- FileFooter struct {
- Text string `xml:",chardata"`
- MeasCollec struct {
- Text string `xml:",chardata"`
- EndTime string `xml:"endTime,attr"`
- } `xml:"measCollec"`
- } `xml:"fileFooter"`
-}
-
-// Data type for json file
-// Splitted in sevreal part to allow add/remove in lists
-type MeasResults struct {
- P int `json:"p"`
- SValue string `json:"sValue"`
-}
-
-type MeasValues struct {
- MeasObjInstID string `json:"measObjInstId"`
- SuspectFlag string `json:"suspectFlag"`
- MeasResultsList []MeasResults `json:"measResults"`
-}
-
-type SMeasTypes struct {
- SMeasType string `json:"sMeasTypesList"`
-}
-
-type MeasInfoList struct {
- MeasInfoID struct {
- SMeasInfoID string `json:"sMeasInfoId"`
- } `json:"measInfoId"`
- MeasTypes struct {
- SMeasTypesList []string `json:"sMeasTypesList"`
- } `json:"measTypes"`
- MeasValuesList []MeasValues `json:"measValuesList"`
-}
-
-type PMJsonFile struct {
- Event struct {
- CommonEventHeader struct {
- Domain string `json:"domain"`
- EventID string `json:"eventId"`
- Sequence int `json:"sequence"`
- EventName string `json:"eventName"`
- SourceName string `json:"sourceName"`
- ReportingEntityName string `json:"reportingEntityName"`
- Priority string `json:"priority"`
- StartEpochMicrosec int64 `json:"startEpochMicrosec"`
- LastEpochMicrosec int64 `json:"lastEpochMicrosec"`
- Version string `json:"version"`
- VesEventListenerVersion string `json:"vesEventListenerVersion"`
- TimeZoneOffset string `json:"timeZoneOffset"`
- } `json:"commonEventHeader"`
- Perf3GppFields struct {
- Perf3GppFieldsVersion string `json:"perf3gppFieldsVersion"`
- MeasDataCollection struct {
- GranularityPeriod int `json:"granularityPeriod"`
- MeasuredEntityUserName string `json:"measuredEntityUserName"`
- MeasuredEntityDn string `json:"measuredEntityDn"`
- MeasuredEntitySoftwareVersion string `json:"measuredEntitySoftwareVersion"`
- SMeasInfoList []MeasInfoList `json:"measInfoList"`
- } `json:"measDataCollection"`
- } `json:"perf3gppFields"`
- } `json:"event"`
-}
-
-// Data type for converted json file message
-type FileDownloadedEvt struct {
- Filename string `json:"filename"`
-}
+const registration_delay_short = 2
+const registration_delay_long = 120
//== Variables ==//
@@ -373,37 +66,11 @@
// Lock for all internal data
var datalock sync.Mutex
-var producer_instance_name string = producer_name
-
-// Keep all info type jobs, key == type id
-var TypeJobs map[string]TypeJobRecord = make(map[string]TypeJobRecord)
-
-// Keep all info jobs, key == job id
-var InfoJobs map[string]InfoJobRecord = make(map[string]InfoJobRecord)
-
-var InfoTypes DataTypes
-
-// Limiter - valid for all jobs
-var jobLimiterChan = make(chan struct{}, parallelism_limiter)
-
-// TODO: Config param?
-var bucket_location = "swe"
-
-var httpclient = &http.Client{}
-
-// == Env variables ==//
-var bootstrapserver = os.Getenv("KAFKA_SERVER")
-var files_volume = os.Getenv("FILES_VOLUME")
-var ics_server = os.Getenv("ICS")
-var self = os.Getenv("SELF")
-var filestore_user = os.Getenv("FILESTORE_USER")
-var filestore_pwd = os.Getenv("FILESTORE_PWD")
-var filestore_server = os.Getenv("FILESTORE_SERVER")
-
-var data_out_channel = make(chan *KafkaPayload, writer_queue_length)
-var writer_control = make(chan WriterControl, 1)
-
-var minio_bucketlist map[string]Minio_buckets = make(map[string]Minio_buckets)
+const (
+ Init dataTypes.AppStates = iota
+ Running
+ Terminating
+)
// == Main ==//
func main() {
@@ -426,55 +93,7 @@
producer_instance_name = producer_instance_name + "-" + os.Getenv("KP")
}
- rtr := mux.NewRouter()
- rtr.HandleFunc("/callbacks/job/"+producer_instance_name, create_job)
- rtr.HandleFunc("/callbacks/job/"+producer_instance_name+"/{job_id}", delete_job)
- rtr.HandleFunc("/callbacks/supervision/"+producer_instance_name, supervise_producer)
- rtr.HandleFunc("/statistics", statistics)
- rtr.HandleFunc("/logging/{level}", logging_level)
- rtr.HandleFunc("/logging", logging_level)
- rtr.HandleFunc("/", alive)
-
- //For perf/mem profiling
- rtr.HandleFunc("/custom_debug_path/profile", pprof.Profile)
-
- http.Handle("/", rtr)
-
- http_server := &http.Server{Addr: ":" + strconv.Itoa(http_port), Handler: nil}
-
- cer, err := tls.LoadX509KeyPair(server_crt, server_key)
- if err != nil {
- log.Error("Cannot load key and cert - ", err)
- return
- }
- config := &tls.Config{Certificates: []tls.Certificate{cer}}
- https_server := &http.Server{Addr: ":" + strconv.Itoa(https_port), TLSConfig: config, Handler: nil}
-
- // Run http
- go func() {
- log.Info("Starting http service...")
- err := http_server.ListenAndServe()
- if err == http.ErrServerClosed { // graceful shutdown
- log.Info("http server shutdown...")
- } else if err != nil {
- log.Error("http server error: ", err)
- }
- }()
-
- // Run https
- go func() {
- log.Info("Starting https service...")
- err := https_server.ListenAndServe()
- if err == http.ErrServerClosed { // graceful shutdown
- log.Info("https server shutdown...")
- } else if err != nil {
- log.Error("https server error: ", err)
- }
- }()
- check_tcp(strconv.Itoa(http_port))
- check_tcp(strconv.Itoa(https_port))
-
- go start_topic_writer(writer_control, data_out_channel)
+ go kafkacollector.Start_topic_writer(writer_control, data_out_channel)
//Setup proc for periodic type registration
var event_chan = make(chan int) //Channel for stopping the proc
@@ -490,17 +109,6 @@
datalock.Lock()
defer datalock.Unlock()
AppState = Terminating
- http_server.Shutdown(context.Background())
- https_server.Shutdown(context.Background())
- // Stopping jobs
- for key, _ := range TypeJobs {
- log.Info("Stopping type job:", key)
- for _, dp := range InfoTypes.ProdDataTypes {
- if key == dp.ID {
- remove_type_job(dp)
- }
- }
- }
}()
AppState = Running
@@ -512,28 +120,7 @@
fmt.Println("server stopped")
}
-func check_tcp(port string) {
- log.Info("Checking tcp port: ", port)
- for true {
- address := net.JoinHostPort("localhost", port)
- // 3 second timeout
- conn, err := net.DialTimeout("tcp", address, 3*time.Second)
- if err != nil {
- log.Info("Checking tcp port: ", port, " failed, retrying...")
- } else {
- if conn != nil {
- log.Info("Checking tcp port: ", port, " - OK")
- _ = conn.Close()
- return
- } else {
- log.Info("Checking tcp port: ", port, " failed, retrying...")
- }
- }
- }
-}
-
-//== Core functions ==//
-
+// == Core functions ==//
// Run periodic registration of producers
func periodic_registration(evtch chan int) {
var delay int = 1
@@ -568,7 +155,7 @@
log.Error("Registering producer: ", producer_instance_name, " - failed")
return false
}
- data := DataTypes{}
+ data := dataTypes.DataTypes{}
err = jsoniter.Unmarshal([]byte(file), &data)
if err != nil {
log.Error("Cannot parse config file: ", config_file)
@@ -588,13 +175,13 @@
t1["info_job_data_schema"] = t2
- json, err := json.Marshal(t1)
+ json, err := jsoniter.Marshal(t1)
if err != nil {
log.Error("Cannot create json for type: ", data.ProdDataTypes[i].ID)
log.Error("Registering producer: ", producer_instance_name, " - failed")
return false
} else {
- ok := send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
+ ok := utils.Send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-types/"+data.ProdDataTypes[i].ID, true, creds_grant_type != "")
if !ok {
log.Error("Cannot register type: ", data.ProdDataTypes[i].ID)
log.Error("Registering producer: ", producer_instance_name, " - failed")
@@ -606,1857 +193,42 @@
}
log.Debug("Registering types: ", new_type_names)
- m := make(map[string]interface{})
- m["supported_info_types"] = new_type_names
- m["info_job_callback_url"] = "http://" + self + "/callbacks/job/" + producer_instance_name
- m["info_producer_supervision_callback_url"] = "http://" + self + "/callbacks/supervision/" + producer_instance_name
-
- json, err := json.Marshal(m)
- if err != nil {
- log.Error("Cannot create json for producer: ", producer_instance_name)
- log.Error("Registering producer: ", producer_instance_name, " - failed")
- return false
- }
- ok := send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-producer/v1/info-producers/"+producer_instance_name, true, creds_grant_type != "")
- if !ok {
- log.Error("Cannot register producer: ", producer_instance_name)
- log.Error("Registering producer: ", producer_instance_name, " - failed")
- return false
- }
datalock.Lock()
defer datalock.Unlock()
- var current_type_names []string
- for _, v := range InfoTypes.ProdDataTypes {
- current_type_names = append(current_type_names, v.ID)
- if contains_str(new_type_names, v.ID) {
- //Type exist
- log.Debug("Type ", v.ID, " exists")
- create_ext_job(v)
- } else {
- //Type is removed
- log.Info("Removing type job for type: ", v.ID, " Type not in configuration")
- remove_type_job(v)
- }
- }
-
for _, v := range data.ProdDataTypes {
- if contains_str(current_type_names, v.ID) {
- //Type exist
- log.Debug("Type ", v.ID, " exists")
- create_ext_job(v)
- } else {
- //Type is new
- log.Info("Adding type job for type: ", v.ID, " Type added to configuration")
- start_type_job(v)
- }
+ log.Info("Adding type job for type: ", v.ID, " Type added to configuration")
+ start_type_job(v)
}
- InfoTypes = data
- log.Debug("Datatypes: ", InfoTypes)
-
+ dataTypes.InfoTypes = data
+ log.Debug("Datatypes: ", dataTypes.InfoTypes)
log.Info("Registering producer: ", producer_instance_name, " - OK")
return true
}
-func remove_type_job(dp DataType) {
- log.Info("Removing type job: ", dp.ID)
- j, ok := TypeJobs[dp.ID]
- if ok {
- j.reader_control <- ReaderControl{"EXIT"}
- }
-
- if dp.ext_job_created == true {
- dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType)
- ok := send_http_request(*dp.ext_job, http.MethodDelete, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "")
- if !ok {
- log.Error("Cannot delete job: ", dp.ext_job_id)
- }
- dp.ext_job_created = false
- dp.ext_job = nil
- }
-
-}
-
-func start_type_job(dp DataType) {
+func start_type_job(dp dataTypes.DataType) {
log.Info("Starting type job: ", dp.ID)
- job_record := TypeJobRecord{}
+ job_record := dataTypes.TypeJobRecord{}
- job_record.job_control = make(chan JobControl, 1)
- job_record.reader_control = make(chan ReaderControl, 1)
- job_record.data_in_channel = make(chan *KafkaPayload, reader_queue_length)
+ job_record.Job_control = make(chan dataTypes.JobControl, 1)
+ job_record.Reader_control = make(chan dataTypes.ReaderControl, 1)
+ job_record.Data_in_channel = make(chan *dataTypes.KafkaPayload, reader_queue_length)
job_record.InfoType = dp.ID
job_record.InputTopic = dp.KafkaInputTopic
- job_record.groupId = "kafka-procon-" + dp.ID
- job_record.clientId = dp.ID + "-" + os.Getenv("KP")
- var stats TypeJobStats
- job_record.statistics = &stats
+ job_record.GroupId = "kafka-procon-" + dp.ID
+ job_record.ClientId = dp.ID + "-" + os.Getenv("KP")
switch dp.ID {
case "xml-file-data-to-filestore":
- go start_job_xml_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, "", "pm-files-json")
+ go kafkacollector.Start_job_xml_file_data(dp.ID, job_record.Job_control, job_record.Data_in_channel, data_out_channel, "", "pm-files-json")
case "xml-file-data":
- go start_job_xml_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, files_volume, "")
- case "json-file-data-from-filestore":
- go start_job_json_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, true)
- case "json-file-data":
- go start_job_json_file_data(dp.ID, job_record.job_control, job_record.data_in_channel, data_out_channel, false)
- case "json-file-data-from-filestore-to-influx":
- go start_job_json_file_data_influx(dp.ID, job_record.job_control, job_record.data_in_channel, true)
-
+ go kafkacollector.Start_job_xml_file_data(dp.ID, job_record.Job_control, job_record.Data_in_channel, data_out_channel, files_volume, "")
default:
}
- go start_topic_reader(dp.KafkaInputTopic, dp.ID, job_record.reader_control, job_record.data_in_channel, job_record.groupId, job_record.clientId, &stats)
+ go kafkacollector.Start_topic_reader(dp.KafkaInputTopic, dp.ID, job_record.Reader_control, job_record.Data_in_channel, job_record.GroupId, job_record.ClientId)
- TypeJobs[dp.ID] = job_record
+ dataTypes.TypeJobs[dp.ID] = job_record
log.Debug("Type job input type: ", dp.InputJobType)
- create_ext_job(dp)
-}
-
-func create_ext_job(dp DataType) {
- if dp.InputJobType != "" {
- jb := make(map[string]interface{})
- jb["info_type_id"] = dp.InputJobType
- jb["job_owner"] = "console" //TODO:
- jb["status_notification_uri"] = "http://callback:80/post"
- jb1 := make(map[string]interface{})
- jb["job_definition"] = jb1
- jb1["kafkaOutputTopic"] = dp.InputJobDefinition.KafkaOutputTopic
-
- json, err := json.Marshal(jb)
- dp.ext_job_created = false
- dp.ext_job = nil
- if err != nil {
- log.Error("Cannot create json for type: ", dp.InputJobType)
- return
- }
-
- dp.ext_job_id = dp.InputJobType + "_" + generate_uuid_from_type(dp.InputJobType)
- ok := false
- for !ok {
- ok = send_http_request(json, http.MethodPut, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+dp.ext_job_id, true, creds_grant_type != "")
- if !ok {
- log.Error("Cannot register job: ", dp.InputJobType)
- }
- }
- log.Debug("Registered job ok: ", dp.InputJobType)
- dp.ext_job_created = true
- dp.ext_job = &json
- }
-}
-
-func remove_info_job(jobid string) {
- log.Info("Removing info job: ", jobid)
- filter := Filter{}
- filter.JobId = jobid
- jc := JobControl{}
- jc.command = "REMOVE-FILTER"
- jc.filter = filter
- infoJob := InfoJobs[jobid]
- typeJob := TypeJobs[infoJob.job_info.InfoTypeIdentity]
- typeJob.job_control <- jc
- delete(InfoJobs, jobid)
-
-}
-
-// == Helper functions ==//
-
-// Function to check the status of a mutex lock
-func MutexLocked(m *sync.Mutex) bool {
- state := reflect.ValueOf(m).Elem().FieldByName("state")
- return state.Int()&mutexLocked == mutexLocked
-}
-
-// Test if slice contains a string
-func contains_str(s []string, e string) bool {
- for _, a := range s {
- if a == e {
- return true
- }
- }
- return false
-}
-
-// Send a http request with json (json may be nil)
-func send_http_request(json []byte, method string, url string, retry bool, useAuth bool) bool {
-
- // set the HTTP method, url, and request body
- var req *http.Request
- var err error
- if json == nil {
- req, err = http.NewRequest(method, url, http.NoBody)
- } else {
- req, err = http.NewRequest(method, url, bytes.NewBuffer(json))
- req.Header.Set("Content-Type", "application/json; charset=utf-8")
- }
- if err != nil {
- log.Error("Cannot create http request, method: ", method, " url: ", url)
- return false
- }
-
- if useAuth {
- token, err := fetch_token()
- if err != nil {
- log.Error("Cannot fetch token for http request: ", err)
- return false
- }
- req.Header.Set("Authorization", "Bearer "+token.TokenValue)
- }
-
- log.Debug("HTTP request: ", req)
-
- log.Debug("Sending http request")
- resp, err2 := httpclient.Do(req)
- if err2 != nil {
- log.Error("Http request error: ", err2)
- log.Error("Cannot send http request method: ", method, " url: ", url)
- } else {
- if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
- log.Debug("Accepted http status: ", resp.StatusCode)
- resp.Body.Close()
- return true
- }
- log.Debug("HTTP resp: ", resp)
- resp.Body.Close()
- }
- return false
-}
-
-func fetch_token() (*kafka.OAuthBearerToken, error) {
- log.Debug("Get token inline")
- conf := &clientcredentials.Config{
- ClientID: creds_client_id,
- ClientSecret: creds_client_secret,
- TokenURL: creds_service_url,
- }
- token, err := conf.Token(context.Background())
- if err != nil {
- log.Warning("Cannot fetch access token: ", err)
- return nil, err
- }
- extensions := map[string]string{}
- log.Debug("=====================================================")
- log.Debug("token: ", token)
- log.Debug("=====================================================")
- log.Debug("TokenValue: ", token.AccessToken)
- log.Debug("=====================================================")
- log.Debug("Expiration: ", token.Expiry)
- t := token.Expiry
- oauthBearerToken := kafka.OAuthBearerToken{
- TokenValue: token.AccessToken,
- Expiration: t,
- Extensions: extensions,
- }
-
- return &oauthBearerToken, nil
-}
-
-// Function to print memory details
-// https://pkg.go.dev/runtime#MemStats
-func PrintMemUsage() {
- if log.IsLevelEnabled(log.DebugLevel) || log.IsLevelEnabled(log.TraceLevel) {
- var m runtime.MemStats
- runtime.ReadMemStats(&m)
- fmt.Printf("Alloc = %v MiB", bToMb(m.Alloc))
- fmt.Printf("\tTotalAlloc = %v MiB", bToMb(m.TotalAlloc))
- fmt.Printf("\tSys = %v MiB", bToMb(m.Sys))
- fmt.Printf("\tNumGC = %v\n", m.NumGC)
- fmt.Printf("HeapSys = %v MiB", bToMb(m.HeapSys))
- fmt.Printf("\tStackSys = %v MiB", bToMb(m.StackSys))
- fmt.Printf("\tMSpanSys = %v MiB", bToMb(m.MSpanSys))
- fmt.Printf("\tMCacheSys = %v MiB", bToMb(m.MCacheSys))
- fmt.Printf("\tBuckHashSys = %v MiB", bToMb(m.BuckHashSys))
- fmt.Printf("\tGCSys = %v MiB", bToMb(m.GCSys))
- fmt.Printf("\tOtherSys = %v MiB\n", bToMb(m.OtherSys))
- }
-}
-
-func bToMb(b uint64) uint64 {
- return b / 1024 / 1024
-}
-
-func generate_uuid_from_type(s string) string {
- if len(s) > 16 {
- s = s[:16]
- }
- for len(s) < 16 {
- s = s + "0"
- }
- b := []byte(s)
- b = b[:16]
- uuid, _ := uuid.FromBytes(b)
- return uuid.String()
-}
-
-// Write gzipped data to a Writer
-func gzipWrite(w io.Writer, data *[]byte) error {
- gw, err1 := gzip.NewWriterLevel(w, gzip.BestSpeed)
-
- if err1 != nil {
- return err1
- }
- defer gw.Close()
- _, err2 := gw.Write(*data)
- return err2
-}
-
-// Write gunzipped data from Reader to a Writer
-func gunzipReaderToWriter(w io.Writer, data io.Reader) error {
- gr, err1 := gzip.NewReader(data)
-
- if err1 != nil {
- return err1
- }
- defer gr.Close()
- data2, err2 := io.ReadAll(gr)
- if err2 != nil {
- return err2
- }
- _, err3 := w.Write(data2)
- if err3 != nil {
- return err3
- }
- return nil
-}
-
-func create_minio_bucket(mc *minio.Client, client_id string, bucket string) error {
- tctx := context.Background()
- err := mc.MakeBucket(tctx, bucket, minio.MakeBucketOptions{Region: bucket_location})
- if err != nil {
- // Check to see if we already own this bucket (which happens if you run this twice)
- exists, errBucketExists := mc.BucketExists(tctx, bucket)
- if errBucketExists == nil && exists {
- log.Debug("Already own bucket:", bucket)
- add_bucket(client_id, bucket)
- return nil
- } else {
- log.Error("Cannot create or check bucket ", bucket, " in minio client", err)
- return err
- }
- }
- log.Debug("Successfully created bucket: ", bucket)
- add_bucket(client_id, bucket)
- return nil
-}
-
-func check_minio_bucket(mc *minio.Client, client_id string, bucket string) bool {
- ok := bucket_exist(client_id, bucket)
- if ok {
- return true
- }
- tctx := context.Background()
- exists, err := mc.BucketExists(tctx, bucket)
- if err == nil && exists {
- log.Debug("Already own bucket:", bucket)
- return true
- }
- log.Error("Bucket does not exist, bucket ", bucket, " in minio client", err)
- return false
-}
-
-func add_bucket(minio_id string, bucket string) {
- datalock.Lock()
- defer datalock.Unlock()
-
- b, ok := minio_bucketlist[minio_id]
- if !ok {
- b = Minio_buckets{}
- b.Buckets = make(map[string]bool)
- }
- b.Buckets[bucket] = true
- minio_bucketlist[minio_id] = b
-}
-
-func bucket_exist(minio_id string, bucket string) bool {
- datalock.Lock()
- defer datalock.Unlock()
-
- b, ok := minio_bucketlist[minio_id]
- if !ok {
- return false
- }
- _, ok = b.Buckets[bucket]
- return ok
-}
-
-//== http api functions ==//
-
-// create/update job
-func create_job(w http.ResponseWriter, req *http.Request) {
- log.Debug("Create job, http method: ", req.Method)
- if req.Method != http.MethodPost {
- log.Error("Create job, http method not allowed")
- w.WriteHeader(http.StatusMethodNotAllowed)
- return
- }
- ct := req.Header.Get("Content-Type")
- if ct != "application/json" {
- log.Error("Create job, bad content type")
- http.Error(w, "Bad content type", http.StatusBadRequest)
- return
- }
-
- var t InfoJobDataType
- err := json.NewDecoder(req.Body).Decode(&t)
- if err != nil {
- log.Error("Create job, cannot parse json,", err)
- http.Error(w, "Cannot parse json", http.StatusBadRequest)
- return
- }
- log.Debug("Creating job, id: ", t.InfoJobIdentity)
- datalock.Lock()
- defer datalock.Unlock()
-
- job_id := t.InfoJobIdentity
- job_record, job_found := InfoJobs[job_id]
- type_job_record, found_type := TypeJobs[t.InfoTypeIdentity]
- if !job_found {
- if !found_type {
- log.Error("Type ", t.InfoTypeIdentity, " does not exist")
- http.Error(w, "Type "+t.InfoTypeIdentity+" does not exist", http.StatusBadRequest)
- return
- }
- } else if t.InfoTypeIdentity != job_record.job_info.InfoTypeIdentity {
- log.Error("Job cannot change type")
- http.Error(w, "Job cannot change type", http.StatusBadRequest)
- return
- } else if t.InfoJobData.KafkaOutputTopic != job_record.job_info.InfoJobData.KafkaOutputTopic {
- log.Error("Job cannot change topic")
- http.Error(w, "Job cannot change topic", http.StatusBadRequest)
- return
- } else if !found_type {
- //Should never happen, if the type is removed then job is stopped
- log.Error("Type ", t.InfoTypeIdentity, " does not exist")
- http.Error(w, "Type "+t.InfoTypeIdentity+" does not exist", http.StatusBadRequest)
- return
- }
-
- if !job_found {
- job_record = InfoJobRecord{}
- job_record.job_info = t
- output_topic := t.InfoJobData.KafkaOutputTopic
- job_record.output_topic = t.InfoJobData.KafkaOutputTopic
- log.Debug("Starting infojob ", job_id, ", type ", t.InfoTypeIdentity, ", output topic", output_topic)
-
- var stats InfoJobStats
- job_record.statistics = &stats
-
- filter := Filter{}
- filter.JobId = job_id
- filter.OutputTopic = job_record.output_topic
-
- jc := JobControl{}
-
- jc.command = "ADD-FILTER"
-
- if t.InfoTypeIdentity == "json-file-data-from-filestore" || t.InfoTypeIdentity == "json-file-data" || t.InfoTypeIdentity == "json-file-data-from-filestore-to-influx" {
- fm := FilterMaps{}
- fm.sourceNameMap = make(map[string]bool)
- fm.measObjClassMap = make(map[string]bool)
- fm.measObjInstIdsMap = make(map[string]bool)
- fm.measTypesMap = make(map[string]bool)
- if t.InfoJobData.FilterParams.MeasuredEntityDns != nil {
- for _, v := range t.InfoJobData.FilterParams.MeasuredEntityDns {
- fm.sourceNameMap[v] = true
- }
- }
- if t.InfoJobData.FilterParams.MeasObjClass != nil {
- for _, v := range t.InfoJobData.FilterParams.MeasObjClass {
- fm.measObjClassMap[v] = true
- }
- }
- if t.InfoJobData.FilterParams.MeasObjInstIds != nil {
- for _, v := range t.InfoJobData.FilterParams.MeasObjInstIds {
- fm.measObjInstIdsMap[v] = true
- }
- }
- if t.InfoJobData.FilterParams.MeasTypes != nil {
- for _, v := range t.InfoJobData.FilterParams.MeasTypes {
- fm.measTypesMap[v] = true
- }
- }
- filter.filter = fm
- }
- if t.InfoTypeIdentity == "json-file-data-from-filestore-to-influx" {
- influxparam := InfluxJobParameters{}
- influxparam.DbUrl = t.InfoJobData.DbUrl
- influxparam.DbOrg = t.InfoJobData.DbOrg
- influxparam.DbBucket = t.InfoJobData.DbBucket
- influxparam.DbToken = t.InfoJobData.DbToken
- filter.influxParameters = influxparam
- }
-
- jc.filter = filter
- InfoJobs[job_id] = job_record
-
- type_job_record.job_control <- jc
-
- } else {
- //TODO
- //Update job
- }
-}
-
-// delete job
-func delete_job(w http.ResponseWriter, req *http.Request) {
- if req.Method != http.MethodDelete {
- w.WriteHeader(http.StatusMethodNotAllowed)
- return
- }
- datalock.Lock()
- defer datalock.Unlock()
-
- vars := mux.Vars(req)
-
- if id, ok := vars["job_id"]; ok {
- if _, ok := InfoJobs[id]; ok {
- remove_info_job(id)
- w.WriteHeader(http.StatusNoContent)
- log.Info("Job ", id, " deleted")
- return
- }
- }
- w.WriteHeader(http.StatusNotFound)
-}
-
-// job supervision
-func supervise_job(w http.ResponseWriter, req *http.Request) {
- if req.Method != http.MethodGet {
- w.WriteHeader(http.StatusMethodNotAllowed)
- return
- }
- datalock.Lock()
- defer datalock.Unlock()
-
- vars := mux.Vars(req)
-
- log.Debug("Supervising, job: ", vars["job_id"])
- if id, ok := vars["job_id"]; ok {
- if _, ok := InfoJobs[id]; ok {
- log.Debug("Supervision ok, job", id)
- return
- }
- }
- w.WriteHeader(http.StatusNotFound)
-}
-
-// producer supervision
-func supervise_producer(w http.ResponseWriter, req *http.Request) {
- if req.Method != http.MethodGet {
- w.WriteHeader(http.StatusMethodNotAllowed)
- return
- }
-
- w.WriteHeader(http.StatusOK)
-}
-
-// producer statistics, all jobs
-func statistics(w http.ResponseWriter, req *http.Request) {
- if req.Method != http.MethodGet {
- w.WriteHeader(http.StatusMethodNotAllowed)
- return
- }
- m := make(map[string]interface{})
- log.Debug("producer statictics, locked? ", MutexLocked(&datalock))
- datalock.Lock()
- defer datalock.Unlock()
- req.Header.Set("Content-Type", "application/json; charset=utf-8")
- m["number-of-jobs"] = len(InfoJobs)
- m["number-of-types"] = len(InfoTypes.ProdDataTypes)
- qm := make(map[string]interface{})
- m["jobs"] = qm
- for key, elem := range InfoJobs {
- jm := make(map[string]interface{})
- qm[key] = jm
- jm["type"] = elem.job_info.InfoTypeIdentity
- typeJob := TypeJobs[elem.job_info.InfoTypeIdentity]
- jm["groupId"] = typeJob.groupId
- jm["clientID"] = typeJob.clientId
- jm["input topic"] = typeJob.InputTopic
- jm["input queue length - job ("+fmt.Sprintf("%v", cap(typeJob.data_in_channel))+")"] = len(typeJob.data_in_channel)
- jm["output topic"] = elem.output_topic
- jm["output queue length - producer ("+fmt.Sprintf("%v", cap(data_out_channel))+")"] = len(data_out_channel)
- jm["msg_in (type)"] = typeJob.statistics.in_msg_cnt
- jm["msg_out (job)"] = elem.statistics.out_msg_cnt
-
- }
- json, err := json.Marshal(m)
- if err != nil {
- w.WriteHeader(http.StatusInternalServerError)
- log.Error("Cannot marshal statistics json")
- return
- }
- _, err = w.Write(json)
- if err != nil {
- w.WriteHeader(http.StatusInternalServerError)
- log.Error("Cannot send statistics json")
- return
- }
-}
-
-// Simple alive check
-func alive(w http.ResponseWriter, req *http.Request) {
- //Alive check
-}
-
-// Get/Set logging level
-func logging_level(w http.ResponseWriter, req *http.Request) {
- vars := mux.Vars(req)
- if level, ok := vars["level"]; ok {
- if req.Method == http.MethodPut {
- switch level {
- case "trace":
- log.SetLevel(log.TraceLevel)
- case "debug":
- log.SetLevel(log.DebugLevel)
- case "info":
- log.SetLevel(log.InfoLevel)
- case "warn":
- log.SetLevel(log.WarnLevel)
- case "error":
- log.SetLevel(log.ErrorLevel)
- case "fatal":
- log.SetLevel(log.FatalLevel)
- case "panic":
- log.SetLevel(log.PanicLevel)
- default:
- w.WriteHeader(http.StatusNotFound)
- }
- } else {
- w.WriteHeader(http.StatusMethodNotAllowed)
- }
- } else {
- if req.Method == http.MethodGet {
- msg := "none"
- if log.IsLevelEnabled(log.PanicLevel) {
- msg = "panic"
- } else if log.IsLevelEnabled(log.FatalLevel) {
- msg = "fatal"
- } else if log.IsLevelEnabled(log.ErrorLevel) {
- msg = "error"
- } else if log.IsLevelEnabled(log.WarnLevel) {
- msg = "warn"
- } else if log.IsLevelEnabled(log.InfoLevel) {
- msg = "info"
- } else if log.IsLevelEnabled(log.DebugLevel) {
- msg = "debug"
- } else if log.IsLevelEnabled(log.TraceLevel) {
- msg = "trace"
- }
- w.Header().Set("Content-Type", "application/text")
- w.Write([]byte(msg))
- } else {
- w.WriteHeader(http.StatusMethodNotAllowed)
- }
- }
-}
-
-func start_topic_reader(topic string, type_id string, control_ch chan ReaderControl, data_ch chan *KafkaPayload, gid string, cid string, stats *TypeJobStats) {
-
- log.Info("Topic reader starting, topic: ", topic, " for type: ", type_id)
-
- topic_ok := false
- var c *kafka.Consumer = nil
- running := true
-
- for topic_ok == false {
-
- select {
- case reader_ctrl := <-control_ch:
- if reader_ctrl.command == "EXIT" {
- log.Info("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
- data_ch <- nil //Signal to job handler
- running = false
- return
- }
- case <-time.After(1 * time.Second):
- if !running {
- return
- }
- if c == nil {
- c = create_kafka_consumer(type_id, gid, cid)
- if c == nil {
- log.Info("Cannot start consumer on topic: ", topic, " for type: ", type_id, " - retrying")
- } else {
- log.Info("Consumer started on topic: ", topic, " for type: ", type_id)
- }
- }
- if c != nil && topic_ok == false {
- err := c.SubscribeTopics([]string{topic}, nil)
- if err != nil {
- log.Info("Topic reader cannot start subscribing on topic: ", topic, " for type: ", type_id, " - retrying -- error details: ", err)
- } else {
- log.Info("Topic reader subscribing on topic: ", topic, " for type: ", type_id)
- topic_ok = true
- }
- }
- }
- }
- log.Info("Topic reader ready on topic: ", topic, " for type: ", type_id)
-
- var event_chan = make(chan int)
- go func() {
- for {
- select {
- case evt := <-c.Events():
- switch evt.(type) {
- case kafka.OAuthBearerTokenRefresh:
- log.Debug("New consumer token needed: ", evt)
- token, err := fetch_token()
- if err != nil {
- log.Warning("Cannot cannot fetch token: ", err)
- c.SetOAuthBearerTokenFailure(err.Error())
- } else {
- setTokenError := c.SetOAuthBearerToken(*token)
- if setTokenError != nil {
- log.Warning("Cannot cannot set token: ", setTokenError)
- c.SetOAuthBearerTokenFailure(setTokenError.Error())
- }
- }
- default:
- log.Debug("Dumping topic reader event on topic: ", topic, " for type: ", type_id, " evt: ", evt.String())
- }
-
- case msg := <-event_chan:
- if msg == 0 {
- return
- }
- case <-time.After(1 * time.Second):
- if !running {
- return
- }
- }
- }
- }()
-
- go func() {
- for {
- for {
- select {
- case reader_ctrl := <-control_ch:
- if reader_ctrl.command == "EXIT" {
- event_chan <- 0
- log.Debug("Topic reader on topic: ", topic, " for type: ", type_id, " - stopped")
- data_ch <- nil //Signal to job handler
- defer c.Close()
- return
- }
- default:
-
- ev := c.Poll(1000)
- if ev == nil {
- log.Debug("Topic Reader for type: ", type_id, " Nothing to consume on topic: ", topic)
- continue
- }
- switch e := ev.(type) {
- case *kafka.Message:
- var kmsg KafkaPayload
- kmsg.msg = e
-
- c.Commit()
-
- data_ch <- &kmsg
- stats.in_msg_cnt++
- log.Debug("Reader msg: ", &kmsg)
- log.Debug("Reader - data_ch ", data_ch)
- case kafka.Error:
- fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
-
- case kafka.OAuthBearerTokenRefresh:
- log.Debug("New consumer token needed: ", ev)
- token, err := fetch_token()
- if err != nil {
- log.Warning("Cannot cannot fetch token: ", err)
- c.SetOAuthBearerTokenFailure(err.Error())
- } else {
- setTokenError := c.SetOAuthBearerToken(*token)
- if setTokenError != nil {
- log.Warning("Cannot cannot set token: ", setTokenError)
- c.SetOAuthBearerTokenFailure(setTokenError.Error())
- }
- }
- default:
- fmt.Printf("Ignored %v\n", e)
- }
- }
- }
- }
- }()
-}
-
-func start_topic_writer(control_ch chan WriterControl, data_ch chan *KafkaPayload) {
-
- var kafka_producer *kafka.Producer
-
- running := true
- log.Info("Topic writer starting")
-
- // Wait for kafka producer to become available - and be prepared to exit the writer
- for kafka_producer == nil {
- select {
- case writer_ctl := <-control_ch:
- if writer_ctl.command == "EXIT" {
- //ignore cmd
- }
- default:
- kafka_producer = start_producer()
- if kafka_producer == nil {
- log.Debug("Could not start kafka producer - retrying")
- time.Sleep(1 * time.Second)
- } else {
- log.Debug("Kafka producer started")
- }
- }
- }
-
- var event_chan = make(chan int)
- go func() {
- for {
- select {
- case evt := <-kafka_producer.Events():
- switch evt.(type) {
- case *kafka.Message:
- m := evt.(*kafka.Message)
-
- if m.TopicPartition.Error != nil {
- log.Debug("Dumping topic writer event, failed: ", m.TopicPartition.Error)
- } else {
- log.Debug("Dumping topic writer event, message to topic: ", *m.TopicPartition.Topic, " at offset: ", m.TopicPartition.Offset, " at partition: ", m.TopicPartition.Partition)
- }
- case kafka.Error:
- log.Debug("Dumping topic writer event, error: ", evt)
- case kafka.OAuthBearerTokenRefresh:
- log.Debug("New producer token needed: ", evt)
- token, err := fetch_token()
- if err != nil {
- log.Warning("Cannot cannot fetch token: ", err)
- kafka_producer.SetOAuthBearerTokenFailure(err.Error())
- } else {
- setTokenError := kafka_producer.SetOAuthBearerToken(*token)
- if setTokenError != nil {
- log.Warning("Cannot cannot set token: ", setTokenError)
- kafka_producer.SetOAuthBearerTokenFailure(setTokenError.Error())
- }
- }
- default:
- log.Debug("Dumping topic writer event, unknown: ", evt)
- }
-
- case msg := <-event_chan:
- if msg == 0 {
- return
- }
- case <-time.After(1 * time.Second):
- if !running {
- return
- }
- }
- }
- }()
- go func() {
- for {
- select {
- case writer_ctl := <-control_ch:
- if writer_ctl.command == "EXIT" {
- // ignore - wait for channel signal
- }
-
- case kmsg := <-data_ch:
- if kmsg == nil {
- event_chan <- 0
- log.Info("Topic writer stopped by channel signal - start_topic_writer")
- defer kafka_producer.Close()
- return
- }
-
- retries := 10
- msg_ok := false
- var err error
- for retry := 1; retry <= retries && msg_ok == false; retry++ {
- err = kafka_producer.Produce(&kafka.Message{
- TopicPartition: kafka.TopicPartition{Topic: &kmsg.topic, Partition: kafka.PartitionAny},
- Value: kmsg.msg.Value, Key: kmsg.msg.Key}, nil)
-
- if err == nil {
- incr_out_msg_cnt(kmsg.jobid)
- msg_ok = true
- log.Debug("Topic writer, msg sent ok on topic: ", kmsg.topic)
- } else {
- log.Info("Topic writer failed to send message on topic: ", kmsg.topic, " - Retrying. Error details: ", err)
- time.Sleep(time.Duration(retry) * time.Second)
- }
- }
- if !msg_ok {
- log.Error("Topic writer failed to send message on topic: ", kmsg.topic, " - Msg discarded. Error details: ", err)
- }
- case <-time.After(1000 * time.Millisecond):
- if !running {
- return
- }
- }
- }
- }()
-}
-
-func create_kafka_consumer(type_id string, gid string, cid string) *kafka.Consumer {
- var cm kafka.ConfigMap
- if creds_grant_type == "" {
- log.Info("Creating kafka SASL plain text consumer for type: ", type_id)
- cm = kafka.ConfigMap{
- "bootstrap.servers": bootstrapserver,
- "group.id": gid,
- "client.id": cid,
- "auto.offset.reset": "latest",
- "enable.auto.commit": false,
- }
- } else {
- log.Info("Creating kafka SASL plain text consumer for type: ", type_id)
- cm = kafka.ConfigMap{
- "bootstrap.servers": bootstrapserver,
- "group.id": gid,
- "client.id": cid,
- "auto.offset.reset": "latest",
- "enable.auto.commit": false,
- "sasl.mechanism": "OAUTHBEARER",
- "security.protocol": "SASL_PLAINTEXT",
- }
- }
- c, err := kafka.NewConsumer(&cm)
-
- if err != nil {
- log.Error("Cannot create kafka consumer for type: ", type_id, ", error details: ", err)
- return nil
- }
-
- log.Info("Created kafka consumer for type: ", type_id, " OK")
- return c
-}
-
-// Start kafka producer
-func start_producer() *kafka.Producer {
- log.Info("Creating kafka producer")
-
- var cm kafka.ConfigMap
- if creds_grant_type == "" {
- log.Info("Creating kafka SASL plain text producer")
- cm = kafka.ConfigMap{
- "bootstrap.servers": bootstrapserver,
- }
- } else {
- log.Info("Creating kafka SASL plain text producer")
- cm = kafka.ConfigMap{
- "bootstrap.servers": bootstrapserver,
- "sasl.mechanism": "OAUTHBEARER",
- "security.protocol": "SASL_PLAINTEXT",
- }
- }
-
- p, err := kafka.NewProducer(&cm)
- if err != nil {
- log.Error("Cannot create kafka producer,", err)
- return nil
- }
- return p
-}
-
-func start_adminclient() *kafka.AdminClient {
- log.Info("Creating kafka admin client")
- a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
- if err != nil {
- log.Error("Cannot create kafka admin client,", err)
- return nil
- }
- return a
-}
-
-func create_minio_client(id string) (*minio.Client, *error) {
- log.Debug("Get minio client")
- minio_client, err := minio.New(filestore_server, &minio.Options{
- Secure: false,
- Creds: credentials.NewStaticV4(filestore_user, filestore_pwd, ""),
- })
- if err != nil {
- log.Error("Cannot create minio client, ", err)
- return nil, &err
- }
- return minio_client, nil
-}
-
-func incr_out_msg_cnt(jobid string) {
- j, ok := InfoJobs[jobid]
- if ok {
- j.statistics.out_msg_cnt++
- }
-}
-
-func start_job_xml_file_data(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, data_out_channel chan *KafkaPayload, fvolume string, fsbucket string) {
-
- log.Info("Type job", type_id, " started")
-
- filters := make(map[string]Filter)
- topic_list := make(map[string]string)
- var mc *minio.Client
- const mc_id = "mc_" + "start_job_xml_file_data"
- running := true
- for {
- select {
- case job_ctl := <-control_ch:
- log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
- switch job_ctl.command {
- case "EXIT":
- //ignore cmd - handled by channel signal
- case "ADD-FILTER":
- filters[job_ctl.filter.JobId] = job_ctl.filter
- log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
-
- tmp_topic_list := make(map[string]string)
- for k, v := range topic_list {
- tmp_topic_list[k] = v
- }
- tmp_topic_list[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic
- topic_list = tmp_topic_list
- case "REMOVE-FILTER":
- log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
-
- tmp_topic_list := make(map[string]string)
- for k, v := range topic_list {
- tmp_topic_list[k] = v
- }
- delete(tmp_topic_list, job_ctl.filter.JobId)
- topic_list = tmp_topic_list
- }
-
- case msg := <-data_in_ch:
- if msg == nil {
- log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data")
-
- running = false
- return
- }
- if fsbucket != "" && fvolume == "" {
- if mc == nil {
- var err *error
- mc, err = create_minio_client(mc_id)
- if err != nil {
- log.Debug("Cannot create minio client for type job: ", type_id)
- }
- }
- }
- jobLimiterChan <- struct{}{}
- go run_xml_job(type_id, msg, "gz", data_out_channel, topic_list, jobLimiterChan, fvolume, fsbucket, mc, mc_id)
-
- case <-time.After(1 * time.Second):
- if !running {
- return
- }
- }
- }
-}
-
-func run_xml_job(type_id string, msg *KafkaPayload, outputCompression string, data_out_channel chan *KafkaPayload, topic_list map[string]string, jobLimiterChan chan struct{}, fvolume string, fsbucket string, mc *minio.Client, mc_id string) {
- defer func() {
- <-jobLimiterChan
- }()
- PrintMemUsage()
-
- if fvolume == "" && fsbucket == "" {
- log.Error("Type job: ", type_id, " cannot run, neither file volume nor filestore is set - discarding message")
- return
- } else if (fvolume != "") && (fsbucket != "") {
- log.Error("Type job: ", type_id, " cannot run with output to both file volume and filestore bucket - discarding message")
- return
- }
-
- start := time.Now()
- var evt_data XmlFileEventHeader
-
- err := jsoniter.Unmarshal(msg.msg.Value, &evt_data)
- if err != nil {
- log.Error("Cannot parse XmlFileEventHeader for type job: ", type_id, " - discarding message, error details", err)
- return
- }
- log.Debug("Unmarshal file-collect event for type job: ", type_id, " time: ", time.Since(start).String())
-
- var reader io.Reader
-
- INPUTBUCKET := "ropfiles"
-
- filename := ""
- if fvolume != "" {
- filename = fvolume + "/" + evt_data.Name
- fi, err := os.Open(filename)
-
- if err != nil {
- log.Error("File ", filename, " - cannot be opened for type job: ", type_id, " - discarding message, error details: ", err)
- return
- }
- defer fi.Close()
- reader = fi
- } else {
- filename = evt_data.Name
- if mc != nil {
- tctx := context.Background()
- mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{})
- if err != nil {
- log.Error("Cannot get: ", filename, " for type job: ", type_id, " - discarding message, error details: ", err)
- return
- }
- if mr == nil {
- log.Error("Cannot get: ", filename, " for type job: ", type_id, " - minio get object returned null reader - discarding message, error details: ", err)
- return
- }
- reader = mr
- defer mr.Close()
- } else {
- log.Error("Cannot get: ", filename, " for type job: ", type_id, " - no minio client - discarding message")
- return
- }
- }
-
- if reader == nil {
- log.Error("Cannot get: ", filename, " - null reader")
- return
- }
- var file_bytes []byte
- if strings.HasSuffix(filename, "gz") {
- start := time.Now()
- var buf3 bytes.Buffer
- errb := gunzipReaderToWriter(&buf3, reader)
- if errb != nil {
- log.Error("Cannot gunzip file ", filename, " - discarding message, ", errb)
- return
- }
- file_bytes = buf3.Bytes()
- log.Debug("Gunzip file: ", filename, "time:", time.Since(start).String(), "len: ", len(file_bytes))
-
- } else {
- var buf3 bytes.Buffer
- _, err2 := io.Copy(&buf3, reader)
- if err2 != nil {
- log.Error("File ", filename, " - cannot be read, discarding message, ", err)
- return
- }
- file_bytes = buf3.Bytes()
- }
- start = time.Now()
- b, err := xml_to_json_conv(&file_bytes, &evt_data)
- if err != nil {
- log.Error("Cannot convert file ", evt_data.Name, " - discarding message, ", err)
- return
- }
- log.Debug("Converted file to json: ", filename, " time", time.Since(start).String(), "len;", len(b))
-
- new_fn := evt_data.Name + os.Getenv("KP") + ".json"
- if outputCompression == "gz" {
- new_fn = new_fn + ".gz"
- start = time.Now()
- var buf bytes.Buffer
- err = gzipWrite(&buf, &b)
- if err != nil {
- log.Error("Cannot gzip file ", new_fn, " - discarding message, ", err)
- return
- }
- b = buf.Bytes()
- log.Debug("Gzip file: ", new_fn, " time: ", time.Since(start).String(), "len:", len(file_bytes))
-
- }
- start = time.Now()
-
- if fvolume != "" {
- //Store on disk
- err = os.WriteFile(fvolume+"/"+new_fn, b, 0644)
- if err != nil {
- log.Error("Cannot write file ", new_fn, " - discarding message,", err)
- return
- }
- log.Debug("Write file to disk: "+new_fn, "time: ", time.Since(start).String(), " len: ", len(file_bytes))
- } else if fsbucket != "" {
- // Store in minio
- objectName := new_fn
- if mc != nil {
-
- contentType := "application/json"
- if strings.HasSuffix(objectName, ".gz") {
- contentType = "application/gzip"
- }
-
- // Upload the xml file with PutObject
- r := bytes.NewReader(b)
- tctx := context.Background()
- if check_minio_bucket(mc, mc_id, fsbucket) == false {
- err := create_minio_bucket(mc, mc_id, fsbucket)
- if err != nil {
- log.Error("Cannot create bucket: ", fsbucket, ", ", err)
- return
- }
- }
- ok := false
- for i := 1; i < 64 && ok == false; i = i * 2 {
- info, err := mc.PutObject(tctx, fsbucket, objectName, r, int64(len(b)), minio.PutObjectOptions{ContentType: contentType})
- if err != nil {
-
- if i == 1 {
- log.Warn("Cannot upload (first attempt): ", objectName, ", ", err)
- } else {
- log.Warn("Cannot upload (retry): ", objectName, ", ", err)
- }
- time.Sleep(time.Duration(i) * time.Second)
- } else {
- log.Debug("Store ", objectName, " in filestore, time: ", time.Since(start).String())
- log.Debug("Successfully uploaded: ", objectName, " of size:", info.Size)
- ok = true
- }
- }
- if !ok {
- log.Error("Cannot upload : ", objectName, ", ", err)
- }
- } else {
- log.Error("Cannot upload: ", objectName, ", no client")
- }
- }
-
- start = time.Now()
- if fvolume == "" {
- var fde FileDownloadedEvt
- fde.Filename = new_fn
- j, err := jsoniter.Marshal(fde)
-
- if err != nil {
- log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err)
- return
- }
- msg.msg.Value = j
- } else {
- var fde FileDownloadedEvt
- fde.Filename = new_fn
- j, err := jsoniter.Marshal(fde)
-
- if err != nil {
- log.Error("Cannot marshal FileDownloadedEvt - discarding message, ", err)
- return
- }
- msg.msg.Value = j
- }
- msg.msg.Key = []byte("\"" + evt_data.SourceName + "\"")
- log.Debug("Marshal file-collect event ", time.Since(start).String())
-
- for k, v := range topic_list {
- var kmsg *KafkaPayload = new(KafkaPayload)
- kmsg.msg = msg.msg
- kmsg.topic = v
- kmsg.jobid = k
- data_out_channel <- kmsg
- }
-}
-
-func xml_to_json_conv(f_byteValue *[]byte, xfeh *XmlFileEventHeader) ([]byte, error) {
- var f MeasCollecFile
- start := time.Now()
- err := xml.Unmarshal(*f_byteValue, &f)
- if err != nil {
- return nil, errors.New("Cannot unmarshal xml-file")
- }
- log.Debug("Unmarshal xml file XmlFileEvent: ", time.Since(start).String())
-
- start = time.Now()
- var pmfile PMJsonFile
-
- pmfile.Event.Perf3GppFields.Perf3GppFieldsVersion = "1.0"
- pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
- pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityUserName = ""
- pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntityDn = f.FileHeader.FileSender.LocalDn
- pmfile.Event.Perf3GppFields.MeasDataCollection.MeasuredEntitySoftwareVersion = f.MeasData.ManagedElement.SwVersion
-
- for _, it := range f.MeasData.MeasInfo {
- var mili MeasInfoList
- mili.MeasInfoID.SMeasInfoID = it.MeasInfoId
- for _, jt := range it.MeasType {
- mili.MeasTypes.SMeasTypesList = append(mili.MeasTypes.SMeasTypesList, jt.Text)
- }
- for _, jt := range it.MeasValue {
- var mv MeasValues
- mv.MeasObjInstID = jt.MeasObjLdn
- mv.SuspectFlag = jt.Suspect
- if jt.Suspect == "" {
- mv.SuspectFlag = "false"
- }
- for _, kt := range jt.R {
- ni, _ := strconv.Atoi(kt.P)
- nv := kt.Text
- mr := MeasResults{ni, nv}
- mv.MeasResultsList = append(mv.MeasResultsList, mr)
- }
- mili.MeasValuesList = append(mili.MeasValuesList, mv)
- }
-
- pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = append(pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList, mili)
- }
-
- pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod = 900
-
- //TODO: Fill more values
- pmfile.Event.CommonEventHeader.Domain = "" //xfeh.Domain
- pmfile.Event.CommonEventHeader.EventID = "" //xfeh.EventID
- pmfile.Event.CommonEventHeader.Sequence = 0 //xfeh.Sequence
- pmfile.Event.CommonEventHeader.EventName = "" //xfeh.EventName
- pmfile.Event.CommonEventHeader.SourceName = xfeh.SourceName
- pmfile.Event.CommonEventHeader.ReportingEntityName = "" //xfeh.ReportingEntityName
- pmfile.Event.CommonEventHeader.Priority = "" //xfeh.Priority
- pmfile.Event.CommonEventHeader.StartEpochMicrosec = xfeh.StartEpochMicrosec
- pmfile.Event.CommonEventHeader.LastEpochMicrosec = xfeh.LastEpochMicrosec
- pmfile.Event.CommonEventHeader.Version = "" //xfeh.Version
- pmfile.Event.CommonEventHeader.VesEventListenerVersion = "" //xfeh.VesEventListenerVersion
- pmfile.Event.CommonEventHeader.TimeZoneOffset = xfeh.TimeZoneOffset
-
- log.Debug("Convert xml to json : ", time.Since(start).String())
-
- start = time.Now()
- json, err := jsoniter.Marshal(pmfile)
- log.Debug("Marshal json : ", time.Since(start).String())
-
- if err != nil {
- return nil, errors.New("Cannot marshal converted json")
- }
- return json, nil
-}
-
-func start_job_json_file_data(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, data_out_channel chan *KafkaPayload, objectstore bool) {
-
- log.Info("Type job", type_id, " started")
-
- filters := make(map[string]Filter)
- filterParams_list := make(map[string]FilterMaps)
- topic_list := make(map[string]string)
- var mc *minio.Client
- const mc_id = "mc_" + "start_job_json_file_data"
- running := true
- for {
- select {
- case job_ctl := <-control_ch:
- log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
- switch job_ctl.command {
- case "EXIT":
- case "ADD-FILTER":
- filters[job_ctl.filter.JobId] = job_ctl.filter
- log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
-
- tmp_filterParams_list := make(map[string]FilterMaps)
- for k, v := range filterParams_list {
- tmp_filterParams_list[k] = v
- }
- tmp_filterParams_list[job_ctl.filter.JobId] = job_ctl.filter.filter
- filterParams_list = tmp_filterParams_list
-
- tmp_topic_list := make(map[string]string)
- for k, v := range topic_list {
- tmp_topic_list[k] = v
- }
- tmp_topic_list[job_ctl.filter.JobId] = job_ctl.filter.OutputTopic
- topic_list = tmp_topic_list
- case "REMOVE-FILTER":
- log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
-
- tmp_filterParams_list := make(map[string]FilterMaps)
- for k, v := range filterParams_list {
- tmp_filterParams_list[k] = v
- }
- delete(tmp_filterParams_list, job_ctl.filter.JobId)
- filterParams_list = tmp_filterParams_list
-
- tmp_topic_list := make(map[string]string)
- for k, v := range topic_list {
- tmp_topic_list[k] = v
- }
- delete(tmp_topic_list, job_ctl.filter.JobId)
- topic_list = tmp_topic_list
- }
-
- case msg := <-data_in_ch:
- if msg == nil {
- log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data")
-
- running = false
- return
- }
- if objectstore {
- if mc == nil {
- var err *error
- mc, err = create_minio_client(mc_id)
- if err != nil {
- log.Debug("Cannot create minio client for type job: ", type_id)
- }
- }
- }
- //TODO: Sort processed file conversions in order (FIFO)
- jobLimiterChan <- struct{}{}
- go run_json_job(type_id, msg, "", filterParams_list, topic_list, data_out_channel, jobLimiterChan, mc, mc_id, objectstore)
-
- case <-time.After(1 * time.Second):
- if !running {
- return
- }
- }
- }
-}
-
-func run_json_job(type_id string, msg *KafkaPayload, outputCompression string, filterList map[string]FilterMaps, topic_list map[string]string, data_out_channel chan *KafkaPayload, jobLimiterChan chan struct{}, mc *minio.Client, mc_id string, objectstore bool) {
-
- //Release job limit
- defer func() {
- <-jobLimiterChan
- }()
-
- PrintMemUsage()
-
- var evt_data FileDownloadedEvt
- err := jsoniter.Unmarshal(msg.msg.Value, &evt_data)
- if err != nil {
- log.Error("Cannot parse FileDownloadedEvt for type job: ", type_id, " - discarding message, error details: ", err)
- return
- }
- log.Debug("FileDownloadedEvt for job: ", type_id, " file: ", evt_data.Filename)
-
- var reader io.Reader
-
- INPUTBUCKET := "pm-files-json"
- filename := ""
- if objectstore == false {
- filename = files_volume + "/" + evt_data.Filename
- fi, err := os.Open(filename)
-
- if err != nil {
- log.Error("File: ", filename, "for type job: ", type_id, " - cannot be opened -discarding message - error details: ", err)
- return
- }
- defer fi.Close()
- reader = fi
- } else {
- filename = "/" + evt_data.Filename
- if mc != nil {
- if check_minio_bucket(mc, mc_id, INPUTBUCKET) == false {
- log.Error("Bucket not available for reading in type job: ", type_id, "bucket: ", INPUTBUCKET)
- return
- }
- tctx := context.Background()
- mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{})
- if err != nil {
- log.Error("Obj: ", filename, "for type job: ", type_id, " - cannot be fetched from bucket: ", INPUTBUCKET, " - discarding message, error details: ", err)
- return
- }
- reader = mr
- defer mr.Close()
- } else {
- log.Error("Cannot get obj: ", filename, "for type job: ", type_id, " from bucket: ", INPUTBUCKET, " - no client")
- return
- }
- }
-
- var data *[]byte
- if strings.HasSuffix(filename, "gz") {
- start := time.Now()
- var buf2 bytes.Buffer
- errb := gunzipReaderToWriter(&buf2, reader)
- if errb != nil {
- log.Error("Cannot decompress file/obj ", filename, "for type job: ", type_id, " - discarding message, error details", errb)
- return
- }
- d := buf2.Bytes()
- data = &d
- log.Debug("Decompress file/obj ", filename, "for type job: ", type_id, " time:", time.Since(start).String())
- } else {
-
- start := time.Now()
- d, err := io.ReadAll(reader)
- if err != nil {
- log.Error("Cannot read file/obj: ", filename, "for type job: ", type_id, " - discarding message, error details", err)
- return
- }
- data = &d
-
- log.Debug("Read file/obj: ", filename, "for type job: ", type_id, " time: ", time.Since(start).String())
- }
-
- for k, v := range filterList {
-
- var pmfile PMJsonFile
- start := time.Now()
- err = jsoniter.Unmarshal(*data, &pmfile)
- log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
-
- if err != nil {
- log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err)
- return
- }
-
- var kmsg *KafkaPayload = new(KafkaPayload)
- kmsg.msg = new(kafka.Message)
- kmsg.msg.Key = []byte("\"" + pmfile.Event.CommonEventHeader.SourceName + "\"")
- log.Debug("topic:", topic_list[k])
- log.Debug("sourceNameMap:", v.sourceNameMap)
- log.Debug("measObjClassMap:", v.measObjClassMap)
- log.Debug("measObjInstIdsMap:", v.measObjInstIdsMap)
- log.Debug("measTypesMap:", v.measTypesMap)
-
- b := json_pm_filter_to_byte(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap)
- if b == nil {
- log.Info("File/obj ", filename, "for job: ", k, " - empty after filering, discarding message ")
- return
- }
- kmsg.msg.Value = *b
-
- kmsg.topic = topic_list[k]
- kmsg.jobid = k
-
- data_out_channel <- kmsg
- }
-
-}
-
-func json_pm_filter_to_byte(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *[]byte {
-
- if json_pm_filter_to_obj(resource, data, sourceNameMap, measObjClassMap, measObjInstIdsMap, measTypesMap) == nil {
- return nil
- }
- start := time.Now()
- j, err := jsoniter.Marshal(&data)
-
- log.Debug("Json marshal obj: ", resource, " time: ", time.Since(start).String())
-
- if err != nil {
- log.Error("Msg could not be marshalled discarding message, obj: ", resource, ", error details: ", err)
- return nil
- }
-
- log.Debug("Filtered json obj: ", resource, " len: ", len(j))
- return &j
-}
-
-func json_pm_filter_to_obj(resource string, data *PMJsonFile, sourceNameMap map[string]bool, measObjClassMap map[string]bool, measObjInstIdsMap map[string]bool, measTypesMap map[string]bool) *PMJsonFile {
- filter_req := true
- start := time.Now()
- if len(sourceNameMap) != 0 {
- if !sourceNameMap[data.Event.CommonEventHeader.SourceName] {
- filter_req = false
- return nil
- }
- }
- if filter_req {
- modified := false
- var temp_mil []MeasInfoList
- for _, zz := range data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList {
-
- check_cntr := false
- var cnt_flags []bool
- if len(measTypesMap) > 0 {
- c_cntr := 0
- var temp_mtl []string
- for _, v := range zz.MeasTypes.SMeasTypesList {
- if measTypesMap[v] {
- cnt_flags = append(cnt_flags, true)
- c_cntr++
- temp_mtl = append(temp_mtl, v)
- } else {
- cnt_flags = append(cnt_flags, false)
- }
- }
- if c_cntr > 0 {
- check_cntr = true
- zz.MeasTypes.SMeasTypesList = temp_mtl
- } else {
- modified = true
- continue
- }
- }
- keep := false
- var temp_mvl []MeasValues
- for _, yy := range zz.MeasValuesList {
- keep_class := false
- keep_inst := false
- keep_cntr := false
-
- dna := strings.Split(yy.MeasObjInstID, ",")
- instName := dna[len(dna)-1]
- cls := strings.Split(dna[len(dna)-1], "=")[0]
-
- if len(measObjClassMap) > 0 {
- if measObjClassMap[cls] {
- keep_class = true
- }
- } else {
- keep_class = true
- }
-
- if len(measObjInstIdsMap) > 0 {
- if measObjInstIdsMap[instName] {
- keep_inst = true
- }
- } else {
- keep_inst = true
- }
-
- if check_cntr {
- var temp_mrl []MeasResults
- cnt_p := 1
- for _, v := range yy.MeasResultsList {
- if cnt_flags[v.P-1] {
- v.P = cnt_p
- cnt_p++
- temp_mrl = append(temp_mrl, v)
- }
- }
- yy.MeasResultsList = temp_mrl
- keep_cntr = true
- } else {
- keep_cntr = true
- }
- if keep_class && keep_cntr && keep_inst {
- keep = true
- temp_mvl = append(temp_mvl, yy)
- }
- }
- if keep {
- zz.MeasValuesList = temp_mvl
- temp_mil = append(temp_mil, zz)
- modified = true
- }
-
- }
- //Only if modified
- if modified {
- if len(temp_mil) == 0 {
- log.Debug("Msg filtered, nothing found, discarding, obj: ", resource)
- return nil
- }
- data.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList = temp_mil
- }
- }
- log.Debug("Filter: ", time.Since(start).String())
- return data
-}
-
-func start_job_json_file_data_influx(type_id string, control_ch chan JobControl, data_in_ch chan *KafkaPayload, objectstore bool) {
-
- log.Info("Type job", type_id, " started")
- log.Debug("influx job ch ", data_in_ch)
- filters := make(map[string]Filter)
- filterParams_list := make(map[string]FilterMaps)
- influx_job_params := make(map[string]InfluxJobParameters)
- var mc *minio.Client
- const mc_id = "mc_" + "start_job_json_file_data_influx"
- running := true
- for {
- select {
- case job_ctl := <-control_ch:
- log.Debug("Type job ", type_id, " new cmd received ", job_ctl.command)
- switch job_ctl.command {
- case "EXIT":
- //ignore cmd - handled by channel signal
- case "ADD-FILTER":
-
- filters[job_ctl.filter.JobId] = job_ctl.filter
- log.Debug("Type job ", type_id, " updated, topic: ", job_ctl.filter.OutputTopic, " jobid: ", job_ctl.filter.JobId)
- log.Debug(job_ctl.filter)
- tmp_filterParams_list := make(map[string]FilterMaps)
- for k, v := range filterParams_list {
- tmp_filterParams_list[k] = v
- }
- tmp_filterParams_list[job_ctl.filter.JobId] = job_ctl.filter.filter
- filterParams_list = tmp_filterParams_list
-
- tmp_influx_job_params := make(map[string]InfluxJobParameters)
- for k, v := range influx_job_params {
- tmp_influx_job_params[k] = v
- }
- tmp_influx_job_params[job_ctl.filter.JobId] = job_ctl.filter.influxParameters
- influx_job_params = tmp_influx_job_params
-
- case "REMOVE-FILTER":
-
- log.Debug("Type job ", type_id, " removing job: ", job_ctl.filter.JobId)
-
- tmp_filterParams_list := make(map[string]FilterMaps)
- for k, v := range filterParams_list {
- tmp_filterParams_list[k] = v
- }
- delete(tmp_filterParams_list, job_ctl.filter.JobId)
- filterParams_list = tmp_filterParams_list
-
- tmp_influx_job_params := make(map[string]InfluxJobParameters)
- for k, v := range influx_job_params {
- tmp_influx_job_params[k] = v
- }
- delete(tmp_influx_job_params, job_ctl.filter.JobId)
- influx_job_params = tmp_influx_job_params
- }
-
- case msg := <-data_in_ch:
- log.Debug("Data reveived - influx")
- if msg == nil {
- log.Info("Type job ", type_id, " stopped by channel signal - start_job_xml_file_data")
-
- running = false
- return
- }
- if objectstore {
- if mc == nil {
- var err *error
- mc, err = create_minio_client(mc_id)
- if err != nil {
- log.Debug("Cannot create minio client for type job: ", type_id)
- }
- }
- }
-
- jobLimiterChan <- struct{}{}
- go run_json_file_data_job_influx(type_id, msg, filterParams_list, influx_job_params, jobLimiterChan, mc, mc_id, objectstore)
-
- case <-time.After(1 * time.Second):
- if !running {
- return
- }
- }
- }
-}
-
-func run_json_file_data_job_influx(type_id string, msg *KafkaPayload, filterList map[string]FilterMaps, influxList map[string]InfluxJobParameters, jobLimiterChan chan struct{}, mc *minio.Client, mc_id string, objectstore bool) {
-
- log.Debug("run_json_file_data_job_influx")
- //Release job limit
- defer func() {
- <-jobLimiterChan
- }()
-
- PrintMemUsage()
-
- var evt_data FileDownloadedEvt
- err := jsoniter.Unmarshal(msg.msg.Value, &evt_data)
- if err != nil {
- log.Error("Cannot parse FileDownloadedEvt for type job: ", type_id, " - discarding message, error details: ", err)
- return
- }
- log.Debug("FileDownloadedEvt for job: ", type_id, " file: ", evt_data.Filename)
-
- var reader io.Reader
-
- INPUTBUCKET := "pm-files-json"
- filename := ""
- if objectstore == false {
- filename = files_volume + "/" + evt_data.Filename
- fi, err := os.Open(filename)
-
- if err != nil {
- log.Error("File: ", filename, "for type job: ", type_id, " - cannot be opened -discarding message - error details: ", err)
- return
- }
- defer fi.Close()
- reader = fi
- } else {
- filename = "/" + evt_data.Filename
- if mc != nil {
- if check_minio_bucket(mc, mc_id, INPUTBUCKET) == false {
- log.Error("Bucket not available for reading in type job: ", type_id, "bucket: ", INPUTBUCKET)
- return
- }
- tctx := context.Background()
- mr, err := mc.GetObject(tctx, INPUTBUCKET, filename, minio.GetObjectOptions{})
- if err != nil {
- log.Error("Obj: ", filename, "for type job: ", type_id, " - cannot be fetched from bucket: ", INPUTBUCKET, " - discarding message, error details: ", err)
- return
- }
- reader = mr
- defer mr.Close()
- } else {
- log.Error("Cannot get obj: ", filename, "for type job: ", type_id, " from bucket: ", INPUTBUCKET, " - no client")
- return
- }
- }
-
- var data *[]byte
- if strings.HasSuffix(filename, "gz") {
- start := time.Now()
- var buf2 bytes.Buffer
- errb := gunzipReaderToWriter(&buf2, reader)
- if errb != nil {
- log.Error("Cannot decompress file/obj ", filename, "for type job: ", type_id, " - discarding message, error details", errb)
- return
- }
- d := buf2.Bytes()
- data = &d
- log.Debug("Decompress file/obj ", filename, "for type job: ", type_id, " time:", time.Since(start).String())
- } else {
-
- start := time.Now()
- d, err := io.ReadAll(reader)
- if err != nil {
- log.Error("Cannot read file/obj: ", filename, "for type job: ", type_id, " - discarding message, error details", err)
- return
- }
- data = &d
-
- log.Debug("Read file/obj: ", filename, "for type job: ", type_id, " time: ", time.Since(start).String())
- }
- for k, v := range filterList {
-
- var pmfile PMJsonFile
- start := time.Now()
- err = jsoniter.Unmarshal(*data, &pmfile)
- log.Debug("Json unmarshal obj: ", filename, " time: ", time.Since(start).String())
-
- if err != nil {
- log.Error("Msg could not be unmarshalled - discarding message, obj: ", filename, " - error details:", err)
- return
- }
-
- if len(v.sourceNameMap) > 0 || len(v.measObjInstIdsMap) > 0 || len(v.measTypesMap) > 0 || len(v.measObjClassMap) > 0 {
- b := json_pm_filter_to_obj(evt_data.Filename, &pmfile, v.sourceNameMap, v.measObjClassMap, v.measObjInstIdsMap, v.measTypesMap)
- if b == nil {
- log.Info("File/obj ", filename, "for job: ", k, " - empty after filering, discarding message ")
- return
- }
-
- }
- fluxParms := influxList[k]
- log.Debug("Influxdb params: ", fluxParms)
- client := influxdb2.NewClient(fluxParms.DbUrl, fluxParms.DbToken)
- writeAPI := client.WriteAPIBlocking(fluxParms.DbOrg, fluxParms.DbBucket)
-
- for _, zz := range pmfile.Event.Perf3GppFields.MeasDataCollection.SMeasInfoList {
- ctr_names := make(map[string]string)
- for cni, cn := range zz.MeasTypes.SMeasTypesList {
- ctr_names[strconv.Itoa(cni+1)] = cn
- }
- for _, xx := range zz.MeasValuesList {
- log.Debug("Measurement: ", xx.MeasObjInstID)
- log.Debug("Suspect flag: ", xx.SuspectFlag)
- p := influxdb2.NewPointWithMeasurement(xx.MeasObjInstID)
- p.AddField("suspectflag", xx.SuspectFlag)
- p.AddField("granularityperiod", pmfile.Event.Perf3GppFields.MeasDataCollection.GranularityPeriod)
- p.AddField("timezone", pmfile.Event.CommonEventHeader.TimeZoneOffset)
- for _, yy := range xx.MeasResultsList {
- pi := strconv.Itoa(yy.P)
- pv := yy.SValue
- pn := ctr_names[pi]
- log.Debug("Counter: ", pn, " Value: ", pv)
- pv_i, err := strconv.Atoi(pv)
- if err == nil {
- p.AddField(pn, pv_i)
- } else {
- p.AddField(pn, pv)
- }
- }
- //p.SetTime(timeT)
- log.Debug("StartEpochMicrosec from common event header: ", pmfile.Event.CommonEventHeader.StartEpochMicrosec)
- log.Debug("Set time: ", time.Unix(pmfile.Event.CommonEventHeader.StartEpochMicrosec/1000000, 0))
- p.SetTime(time.Unix(pmfile.Event.CommonEventHeader.StartEpochMicrosec/1000000, 0))
- err := writeAPI.WritePoint(context.Background(), p)
- if err != nil {
- log.Error("Db write error: ", err)
- }
- }
-
- }
- client.Close()
- }
-
}