blob: 99d742a75532814090385a5aaaac54bfe4f8ce67 [file] [log] [blame]
// ============LICENSE_START===============================================
// Copyright (C) 2022 Nordix Foundation. All rights reserved.
// ========================================================================
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ============LICENSE_END=================================================
//
package main
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"os"
"runtime"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
log "github.com/sirupsen/logrus"
)
// Mandatory configuration, read from the environment at startup and
// validated in main().
var (
	rapp_id         = os.Getenv("RAPPID")       // unique id of this rapp instance
	bootstrapserver = os.Getenv("KAFKA_SERVER") // kafka bootstrap server host:port
	topic           = os.Getenv("TOPIC")        // kafka topic to consume from
	ics_server      = os.Getenv("ICS")          // ICS host for info-job registration
	job_type        = os.Getenv("JOBTYPE")      // info type id of the job to register
)

// Kafka consumer group id and client id, derived from the job type and rapp id.
var (
	gid = "pm-rapp-" + job_type + "-" + rapp_id
	cid = gid
)
// == Main ==//
// == Main ==//
// main validates the environment configuration, registers an info job with
// ICS (retrying until it succeeds), then starts the Kafka reader goroutine
// and parks the main goroutine forever.
func main() {
	log.SetLevel(log.InfoLevel)
	log.Info("Server starting...")

	// All configuration is mandatory - fail fast on anything missing.
	for _, envVar := range []struct{ name, value string }{
		{"RAPPID", rapp_id},
		{"KAFKA_SERVER", bootstrapserver},
		{"TOPIC", topic},
		{"ICS", ics_server},
		{"JOBTYPE", job_type},
	} {
		if envVar.value == "" {
			log.Error("Env " + envVar.name + " not set")
			os.Exit(1)
		}
	}

	jobid := "rapp-job-" + job_type + "-" + rapp_id

	// The three supported job types use identical job definitions - only the
	// info_type_id differs, and it always equals the job type itself, so one
	// template covers all cases.
	switch job_type {
	case "PmData", "json-file-data", "json-file-data-from-filestore":
	default:
		log.Error("Unknown job type: ", job_type)
		os.Exit(1)
	}
	json_str := fmt.Sprintf(`{"info_type_id": "%s", "job_owner": "console","status_notification_uri": "http://callback:80/post","job_definition": {"kafkaOutputTopic":"%s","filterType":"pmdata","filter":{"measTypes":["pmCounterNumber101"]}}}`, job_type, topic)

	// Register the job with ICS, retrying once per second until it succeeds
	// (ICS may not be up yet when this rapp starts).
	ok := false
	for !ok {
		ok = send_http_request([]byte(json_str), http.MethodPut, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+jobid)
		if !ok {
			log.Info("Failed to register job: ", jobid, " - retrying")
			time.Sleep(time.Second)
		}
	}

	go read_kafka_messages()

	// Park the main goroutine; the process keeps running on the reader
	// goroutine. Goexit never returns, so nothing may follow it.
	runtime.Goexit()
}
// send_http_request sends the given json payload to url with the given http
// method and a json content-type header. It returns true only when the server
// answers 200 or 201; any transport error or other status yields false.
func send_http_request(json []byte, method string, url string) bool {
	req, err := http.NewRequest(method, url, bytes.NewBuffer(json))
	if err != nil {
		log.Error("Cannot create http request method: ", method, " url: ", url)
		return false
	}
	req.Header.Set("Content-Type", "application/json; charset=utf-8")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		log.Error("Cannot send http request, method: ", method, "url: ", url)
		return false
	}
	// The response body must always be closed, otherwise the underlying
	// connection/file descriptor leaks on every call.
	defer resp.Body.Close()

	return resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusCreated
}
func read_kafka_messages() {
var c *kafka.Consumer = nil
log.Info("Creating kafka consumer ")
var err error
for c == nil {
c, err = kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": bootstrapserver,
"group.id": gid,
"client.id": cid,
"auto.offset.reset": "latest",
})
if err != nil {
log.Warning("Cannot create kafka consumer - retyring")
}
}
log.Info("Creating kafka consumer - ok")
log.Info("Start subscribing to topic: ", topic)
topic_ok := false
for !topic_ok {
err = c.SubscribeTopics([]string{topic}, nil)
if err != nil {
log.Info("Topic reader cannot start subscribing on topic: ", topic, " - retrying -- error details: ", err)
} else {
log.Info("Topic reader subscribing on topic: ", topic)
topic_ok = true
}
}
maxDur := 1 * time.Second
for {
msg, err := c.ReadMessage(maxDur)
if err == nil {
buf := &bytes.Buffer{}
if err := json.Indent(buf, msg.Value, "", " "); err != nil {
log.Warning("Received msg not json?")
} else {
fmt.Println(buf.String())
}
c.Commit()
} else {
log.Debug(" Nothing to consume on topic: ", topic, ", reason: ", err)
}
}
}