| // ============LICENSE_START=============================================== |
| // Copyright (C) 2022 Nordix Foundation. All rights reserved. |
| // ======================================================================== |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| // ============LICENSE_END================================================= |
| // |
| |
| package main |
| |
| import ( |
| "bytes" |
| "encoding/json" |
| "fmt" |
| "net/http" |
| "os" |
| "runtime" |
| "time" |
| |
| "github.com/confluentinc/confluent-kafka-go/kafka" |
| log "github.com/sirupsen/logrus" |
| ) |
| |
// Process configuration, read from the environment once at start-up.
var (
	// rapp_id is the value of env RAPPID; it is part of the job id and of
	// the kafka group/client id.
	rapp_id = os.Getenv("RAPPID")

	// bootstrapserver is the value of env KAFKA_SERVER, handed to the kafka
	// consumer as "bootstrap.servers".
	bootstrapserver = os.Getenv("KAFKA_SERVER")

	// topic is the value of env TOPIC: the kafka topic named in the job
	// definition and the topic the consumer subscribes to.
	topic = os.Getenv("TOPIC")

	// ics_server is the value of env ICS, used as the host part of the
	// info-job registration url.
	ics_server = os.Getenv("ICS")

	// job_type is the value of env JOBTYPE and selects which info job is
	// registered.
	job_type = os.Getenv("JOBTYPE")

	// gid is the kafka consumer "group.id".
	gid = "pm-rapp-" + job_type + "-" + rapp_id

	// cid is the kafka consumer "client.id"; it is the same as the group id.
	cid = gid
)
| |
| // == Main ==// |
| func main() { |
| |
| log.SetLevel(log.InfoLevel) |
| |
| log.Info("Server starting...") |
| |
| if rapp_id == "" { |
| log.Error("Env RAPPID not set") |
| os.Exit(1) |
| } |
| |
| if bootstrapserver == "" { |
| log.Error("Env KAFKA_SERVER not set") |
| os.Exit(1) |
| } |
| |
| if topic == "" { |
| log.Error("Env TOPIC not set") |
| os.Exit(1) |
| } |
| |
| if ics_server == "" { |
| log.Error("Env ICS not set") |
| os.Exit(1) |
| } |
| |
| if job_type == "" { |
| log.Error("Env JOBTYPE not set") |
| os.Exit(1) |
| } |
| |
| jobid := "rapp-job-" + job_type + "-" + rapp_id |
| |
| json_str := "" |
| switch job_type { |
| case "PmData": |
| json_str = "{\"info_type_id\": \"PmData\", \"job_owner\": \"console\",\"status_notification_uri\": \"http://callback:80/post\",\"job_definition\": {\"kafkaOutputTopic\":\"" + topic + "\",\"filterType\":\"pmdata\",\"filter\":{\"measTypes\":[\"pmCounterNumber101\"]}}}" |
| case "json-file-data": |
| json_str = "{\"info_type_id\": \"json-file-data\", \"job_owner\": \"console\",\"status_notification_uri\": \"http://callback:80/post\",\"job_definition\": {\"kafkaOutputTopic\":\"" + topic + "\",\"filterType\":\"pmdata\",\"filter\":{\"measTypes\":[\"pmCounterNumber101\"]}}}" |
| case "json-file-data-from-filestore": |
| json_str = "{\"info_type_id\": \"json-file-data-from-filestore\", \"job_owner\": \"console\",\"status_notification_uri\": \"http://callback:80/post\",\"job_definition\": {\"kafkaOutputTopic\":\"" + topic + "\",\"filterType\":\"pmdata\",\"filter\":{\"measTypes\":[\"pmCounterNumber101\"]}}}" |
| default: |
| log.Error("Unknown job type: ", job_type) |
| os.Exit(1) |
| } |
| |
| ok := false |
| for !ok { |
| ok = send_http_request([]byte(json_str), http.MethodPut, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+jobid) |
| if !ok { |
| log.Info("Failed to register job: ", jobid, " - retrying") |
| time.Sleep(time.Second) |
| } |
| } |
| |
| go read_kafka_messages() |
| |
| //Wait until all go routines has exited |
| runtime.Goexit() |
| |
| log.Warn("main routine exit") |
| log.Warn("server is stopping...") |
| } |
| |
| func send_http_request(json []byte, method string, url string) bool { |
| |
| client := &http.Client{} |
| |
| var req *http.Request |
| var err error |
| req, err = http.NewRequest(method, url, bytes.NewBuffer(json)) |
| if err != nil { |
| log.Error("Cannot create http request method: ", method, " url: ", url) |
| return false |
| } |
| req.Header.Set("Content-Type", "application/json; charset=utf-8") |
| |
| resp, err2 := client.Do(req) |
| if err2 != nil { |
| log.Error("Cannot send http request, method: ", method, "url: ", url) |
| } else { |
| if resp.StatusCode == 200 || resp.StatusCode == 201 { |
| return true |
| } |
| } |
| return false |
| |
| } |
| |
| func read_kafka_messages() { |
| var c *kafka.Consumer = nil |
| log.Info("Creating kafka consumer ") |
| var err error |
| for c == nil { |
| c, err = kafka.NewConsumer(&kafka.ConfigMap{ |
| "bootstrap.servers": bootstrapserver, |
| "group.id": gid, |
| "client.id": cid, |
| "auto.offset.reset": "latest", |
| }) |
| if err != nil { |
| log.Warning("Cannot create kafka consumer - retyring") |
| } |
| } |
| |
| log.Info("Creating kafka consumer - ok") |
| log.Info("Start subscribing to topic: ", topic) |
| topic_ok := false |
| for !topic_ok { |
| err = c.SubscribeTopics([]string{topic}, nil) |
| if err != nil { |
| log.Info("Topic reader cannot start subscribing on topic: ", topic, " - retrying -- error details: ", err) |
| } else { |
| log.Info("Topic reader subscribing on topic: ", topic) |
| topic_ok = true |
| } |
| } |
| |
| maxDur := 1 * time.Second |
| for { |
| msg, err := c.ReadMessage(maxDur) |
| if err == nil { |
| buf := &bytes.Buffer{} |
| if err := json.Indent(buf, msg.Value, "", " "); err != nil { |
| log.Warning("Received msg not json?") |
| } else { |
| fmt.Println(buf.String()) |
| } |
| c.Commit() |
| } else { |
| log.Debug(" Nothing to consume on topic: ", topic, ", reason: ", err) |
| } |
| } |
| } |