// ============LICENSE_START===============================================
// Copyright (C) 2022 Nordix Foundation. All rights reserved.
// ========================================================================
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ============LICENSE_END=================================================
//
17
18package main
19
20import (
21 "bytes"
22 "encoding/json"
23 "fmt"
24 "net/http"
25 "os"
26 "runtime"
27 "time"
28
29 "github.com/confluentinc/confluent-kafka-go/kafka"
30 log "github.com/sirupsen/logrus"
31)
32
// Settings taken from the environment when the program starts. main
// terminates the process if any of the five environment values is empty.
var (
	// rapp_id is the value of env RAPPID. It is part of the kafka group id
	// and of the id of the job registered in ICS.
	rapp_id = os.Getenv("RAPPID")

	// bootstrapserver is the value of env KAFKA_SERVER, passed to the kafka
	// consumer as "bootstrap.servers".
	bootstrapserver = os.Getenv("KAFKA_SERVER")

	// topic is the value of env TOPIC: the kafka topic the registered job
	// writes to and the consumer subscribes to.
	topic = os.Getenv("TOPIC")

	// ics_server is the value of env ICS, used as the host part of the job
	// registration url.
	ics_server = os.Getenv("ICS")

	// job_type is the value of env JOBTYPE and selects the info type of the
	// registered job.
	job_type = os.Getenv("JOBTYPE")

	// gid is the kafka consumer "group.id"; cid is the "client.id" and is
	// always equal to gid.
	gid = "pm-rapp-" + job_type + "-" + rapp_id
	cid = gid
)
45
46// == Main ==//
47func main() {
48
49 log.SetLevel(log.InfoLevel)
50
51 log.Info("Server starting...")
52
53 if rapp_id == "" {
54 log.Error("Env RAPPID not set")
55 os.Exit(1)
56 }
57
58 if bootstrapserver == "" {
59 log.Error("Env KAFKA_SERVER not set")
60 os.Exit(1)
61 }
62
63 if topic == "" {
64 log.Error("Env TOPIC not set")
65 os.Exit(1)
66 }
67
68 if ics_server == "" {
69 log.Error("Env ICS not set")
70 os.Exit(1)
71 }
72
73 if job_type == "" {
74 log.Error("Env JOBTYPE not set")
75 os.Exit(1)
76 }
77
78 jobid := "rapp-job-" + job_type + "-" + rapp_id
79
80 json_str := ""
81 switch job_type {
82 case "PmData":
83 json_str = "{\"info_type_id\": \"PmData\", \"job_owner\": \"console\",\"status_notification_uri\": \"http://callback:80/post\",\"job_definition\": {\"kafkaOutputTopic\":\"" + topic + "\",\"filterType\":\"pmdata\",\"filter\":{\"measTypes\":[\"pmCounterNumber101\"]}}}"
84 case "json-file-data":
85 json_str = "{\"info_type_id\": \"json-file-data\", \"job_owner\": \"console\",\"status_notification_uri\": \"http://callback:80/post\",\"job_definition\": {\"kafkaOutputTopic\":\"" + topic + "\",\"filterType\":\"pmdata\",\"filter\":{\"measTypes\":[\"pmCounterNumber101\"]}}}"
86 case "json-file-data-from-filestore":
87 json_str = "{\"info_type_id\": \"json-file-data-from-filestore\", \"job_owner\": \"console\",\"status_notification_uri\": \"http://callback:80/post\",\"job_definition\": {\"kafkaOutputTopic\":\"" + topic + "\",\"filterType\":\"pmdata\",\"filter\":{\"measTypes\":[\"pmCounterNumber101\"]}}}"
88 default:
89 log.Error("Unknown job type: ", job_type)
90 os.Exit(1)
91 }
92
93 ok := false
94 for !ok {
95 ok = send_http_request([]byte(json_str), http.MethodPut, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+jobid)
96 if !ok {
97 log.Info("Failed to register job: ", jobid, " - retrying")
98 time.Sleep(time.Second)
99 }
100 }
101
102 go read_kafka_messages()
103
104 //Wait until all go routines has exited
105 runtime.Goexit()
106
107 log.Warn("main routine exit")
108 log.Warn("server is stopping...")
109}
110
111func send_http_request(json []byte, method string, url string) bool {
112
113 client := &http.Client{}
114
115 var req *http.Request
116 var err error
117 req, err = http.NewRequest(method, url, bytes.NewBuffer(json))
118 if err != nil {
119 log.Error("Cannot create http request method: ", method, " url: ", url)
120 return false
121 }
122 req.Header.Set("Content-Type", "application/json; charset=utf-8")
123
124 resp, err2 := client.Do(req)
125 if err2 != nil {
126 log.Error("Cannot send http request, method: ", method, "url: ", url)
127 } else {
128 if resp.StatusCode == 200 || resp.StatusCode == 201 {
129 return true
130 }
131 }
132 return false
133
134}
135
136func read_kafka_messages() {
137 var c *kafka.Consumer = nil
138 log.Info("Creating kafka consumer ")
139 var err error
140 for c == nil {
141 c, err = kafka.NewConsumer(&kafka.ConfigMap{
142 "bootstrap.servers": bootstrapserver,
143 "group.id": gid,
144 "client.id": cid,
145 "auto.offset.reset": "latest",
146 })
147 if err != nil {
148 log.Warning("Cannot create kafka consumer - retyring")
149 }
150 }
151
152 log.Info("Creating kafka consumer - ok")
153 log.Info("Start subscribing to topic: ", topic)
154 topic_ok := false
155 for !topic_ok {
156 err = c.SubscribeTopics([]string{topic}, nil)
157 if err != nil {
158 log.Info("Topic reader cannot start subscribing on topic: ", topic, " - retrying -- error details: ", err)
159 } else {
160 log.Info("Topic reader subscribing on topic: ", topic)
161 topic_ok = true
162 }
163 }
164
165 maxDur := 1 * time.Second
166 for {
167 msg, err := c.ReadMessage(maxDur)
168 if err == nil {
169 buf := &bytes.Buffer{}
170 if err := json.Indent(buf, msg.Value, "", " "); err != nil {
171 log.Warning("Received msg not json?")
172 } else {
173 fmt.Println(buf.String())
174 }
175 c.Commit()
176 } else {
177 log.Debug(" Nothing to consume on topic: ", topic, ", reason: ", err)
178 }
179 }
180}