blob: 4113a421d98851c51dfd9a3b945bd39fa6b74a96 [file] [log] [blame]
BjornMagnussonXAc5655db2023-03-17 14:55:16 +01001// ============LICENSE_START===============================================
2// Copyright (C) 2023 Nordix Foundation. All rights reserved.
3// ========================================================================
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15// ============LICENSE_END=================================================
16//
17
18package main
19
20import (
21 "bytes"
22 "compress/gzip"
23 "context"
24 "encoding/json"
25 "fmt"
26 "io"
27 "io/ioutil"
28 "net/http"
29 "net/http/pprof"
30 "os"
31 "os/signal"
32 "runtime"
33 "strconv"
34 "strings"
35 "syscall"
36 "time"
37
38 "github.com/confluentinc/confluent-kafka-go/kafka"
39 "github.com/gorilla/mux"
40 jsoniter "github.com/json-iterator/go"
41 log "github.com/sirupsen/logrus"
42 "golang.org/x/oauth2/clientcredentials"
43)
44
// JobDefinition is the Go representation of the job definition JSON document.
// The JSON keys are fixed by the struct tags below; field order determines the
// order of the keys when the value is marshalled again.
type JobDefinition struct {
	// Top level job attributes.
	InfoTypeID   string `json:"info_type_id"`
	JobOwner     string `json:"job_owner"`
	JobResultURI string `json:"job_result_uri"`

	// JobDefinition holds the type specific part of the job.
	JobDefinition struct {
		KafkaOutputTopic string `json:"kafkaOutputTopic"`
		FilterType       string `json:"filterType"`
		// Filter is kept as raw JSON so it is passed through untouched.
		Filter json.RawMessage `json:"filter"`

		// DeliveryInfo tells where the job output is to be delivered.
		DeliveryInfo struct {
			Topic            string `json:"topic"`
			BootStrapServers string `json:"bootStrapServers"`
		} `json:"deliveryInfo"`
	} `json:"job_definition"`
}
59
// jobdef is the path of the job definition file read at startup.
const jobdef = "/config/jobDefinition.json"

// Settings read from the environment when the program is initialised.
var (
	rapp_id         = os.Getenv("APPID")
	rapp_ns         = os.Getenv("APPNS")
	bootstrapserver = os.Getenv("KAFKA_SERVER")
	topic           = os.Getenv("TOPIC")
	ics_server      = os.Getenv("ICS")
	jwt_file        = os.Getenv("JWT_FILE")
	ssl_path        = os.Getenv("SSLPATH")
	gzipped_data    = os.Getenv("GZIP")
	log_payload     = os.Getenv("LOG_PAYLOAD")
)

// These are optional - only needed if the rapp is fetching the token itself
// instead of relying on the side car.
var (
	creds_grant_type    = os.Getenv("CREDS_GRANT_TYPE")
	creds_client_secret = os.Getenv("CREDS_CLIENT_SECRET")
	creds_client_id     = os.Getenv("CREDS_CLIENT_ID")
	creds_service_url   = os.Getenv("AUTH_SERVICE_URL")
)

// Mutable runtime state shared by the handlers and the consumer routines.
var (
	// Kafka consumer group id and client id.
	gid = ""
	cid = "cid-0"

	// Message counters.
	msg_count           int = 0
	msg_corrupted_count int = 0

	// Values reported by the statistics endpoint; placeholders until set.
	jobid         = "<not-set>"
	consumer_type = "<not-set>"

	// Latest access token, empty until one has been fetched.
	currentToken = ""

	// Application state reported by the status endpoint.
	appStatus = "INIT"

	// Message rate, recalculated periodically.
	msg_per_sec int = 0

	// Shared client for all outgoing http requests.
	httpclient = &http.Client{}
)
102
103// == Main ==//
104func main() {
105
106 log.SetLevel(log.InfoLevel)
107 log.SetLevel(log.DebugLevel)
108
109 log.Info("Server starting...")
110
111 if creds_service_url != "" {
112 log.Warn("Disabling jwt retrieval from side car")
113 jwt_file = ""
114 }
115
116 if rapp_id == "" {
117 log.Error("Env APPID not set")
118 os.Exit(1)
119 }
120
121 if rapp_ns == "" {
122 log.Error("Env APPNS not set")
123 os.Exit(1)
124 }
125
126 if bootstrapserver == "" {
127 log.Error("Env KAFKA_SERVER not set")
128 os.Exit(1)
129 }
130
131 if topic == "" {
132 log.Error("Env TOPIC not set")
133 os.Exit(1)
134 }
135
136 if ics_server == "" {
137 log.Error("Env ICS not set")
138 os.Exit(1)
139 }
140
141 rtr := mux.NewRouter()
142 rtr.HandleFunc("/statistics", statistics)
143 rtr.HandleFunc("/status", status)
144 rtr.HandleFunc("/logging/{level}", logging_level)
145 rtr.HandleFunc("/logging", logging_level)
146 rtr.HandleFunc("/", alive)
147
148 //For perf/mem profiling
149 rtr.HandleFunc("/custom_debug_path/profile", pprof.Profile)
150
151 http.Handle("/", rtr)
152
153 fileBytes, err := os.ReadFile(jobdef)
154 if err != nil {
155 log.Error("Cannot read job defintion file: ", jobdef, err)
156 os.Exit(1)
157 }
158 fmt.Println("FROM FILE")
159 fmt.Println(string(fileBytes))
160
161 job_json := JobDefinition{}
162 err = jsoniter.Unmarshal([]byte(fileBytes), &job_json)
163 if err != nil {
164 log.Error("Cannot parse job defintion file: ", jobdef, err)
165 os.Exit(1)
166 }
167 job_type := job_json.InfoTypeID
168 job_json.JobDefinition.KafkaOutputTopic = topic
169 job_json.JobDefinition.DeliveryInfo.Topic = topic
170 job_json.JobDefinition.DeliveryInfo.BootStrapServers = bootstrapserver
171
172 gid = "pm-rapp-" + job_type + "-" + rapp_id
173
174 jobid = "rapp-job-" + job_type + "-" + rapp_id
175
176 json_bytes, err := json.Marshal(job_json)
177 if err != nil {
178 log.Error("Cannot marshal job json", err)
179 os.Exit(1)
180 }
181
182 json_str := string(json_bytes)
183
184 if strings.HasPrefix(bootstrapserver, "http://") {
185 if creds_service_url != "" {
186 consumer_type = "accesstoken strimzi bridge consumer"
187 retrive_token_strimzi()
188 }
189 } else {
190 go read_kafka_messages()
191 }
192
193 ok := false
194 if ics_server != "" {
195 for !ok {
196 log.Debug("Registring job: ", jobid, " json: ", json_str)
197 ok, _ = send_http_request([]byte(json_str), http.MethodPut, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+jobid, "", currentToken, 0, false)
198 if !ok {
199 log.Info("Failed to register job: ", jobid, " - retrying")
200 time.Sleep(time.Second)
201 }
202 }
203 } else {
204 log.Info("No job registered - read from topic only")
205 }
206 if strings.HasPrefix(bootstrapserver, "http://") {
207 go read_bridge_messages()
208 }
209
210 go calc_average()
211
212 http_port := "80"
213 http_server := &http.Server{Addr: ":" + http_port, Handler: nil}
214
215 sigs := make(chan os.Signal, 1)
216 signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
217 go func() {
218 fmt.Println("Setting handler for signal sigint and sigterm")
219 sig := <-sigs
220 appStatus = "TERMINATING"
221 fmt.Printf("Received signal %s - application will terminate\n", sig)
222
223 if strings.HasPrefix(bootstrapserver, "http://") {
224 log.Debug("stopping strimzi consumer for job: ", jobid)
225 ok, _ = send_http_request(nil, http.MethodDelete, bootstrapserver+"/consumers/"+gid+"/instances/"+cid, "", currentToken, 0, false)
226 if !ok {
227 log.Info("Failed to delete consumer "+cid+" in group: ", gid, " - retrying")
228 }
229 }
230
231 ok := false
232 if ics_server != "" {
233 for !ok {
234 log.Debug("stopping job: ", jobid, " json: ", json_str)
235 ok, _ = send_http_request(nil, http.MethodDelete, "http://"+ics_server+"/data-consumer/v1/info-jobs/"+jobid, "", currentToken, 0, false)
236 if !ok {
237 log.Info("Failed to stop job: ", jobid, " - retrying")
238 time.Sleep(time.Second)
239 }
240 }
241 }
242 http_server.Shutdown(context.Background())
243 }()
244 appStatus = "RUNNING"
245 log.Info("Starting http service...")
246 err = http_server.ListenAndServe()
247 if err == http.ErrServerClosed { // graceful shutdown
248 log.Info("http server shutdown...")
249 os.Exit(1)
250 } else if err != nil {
251 log.Error("http server error: ", err)
252 log.Info("http server shutdown...")
253 os.Exit(1)
254 }
255
256 //Wait until all go routines has exited
257 runtime.Goexit()
258
259 log.Warn("main routine exit")
260 log.Warn("server is stopping...")
261}
262
// alive is the liveness probe handler. It deliberately writes nothing: the
// http server then answers with 200 and an empty body.
func alive(_ http.ResponseWriter, _ *http.Request) {
}
267
268// Get/Set logging level
269func logging_level(w http.ResponseWriter, req *http.Request) {
270 vars := mux.Vars(req)
271 if level, ok := vars["level"]; ok {
272 if req.Method == http.MethodPut {
273 switch level {
274 case "trace":
275 log.SetLevel(log.TraceLevel)
276 case "debug":
277 log.SetLevel(log.DebugLevel)
278 case "info":
279 log.SetLevel(log.InfoLevel)
280 case "warn":
281 log.SetLevel(log.WarnLevel)
282 case "error":
283 log.SetLevel(log.ErrorLevel)
284 case "fatal":
285 log.SetLevel(log.FatalLevel)
286 case "panic":
287 log.SetLevel(log.PanicLevel)
288 default:
289 w.WriteHeader(http.StatusNotFound)
290 }
291 } else {
292 w.WriteHeader(http.StatusMethodNotAllowed)
293 }
294 } else {
295 if req.Method == http.MethodGet {
296 msg := "none"
297 if log.IsLevelEnabled(log.PanicLevel) {
298 msg = "panic"
299 } else if log.IsLevelEnabled(log.FatalLevel) {
300 msg = "fatal"
301 } else if log.IsLevelEnabled(log.ErrorLevel) {
302 msg = "error"
303 } else if log.IsLevelEnabled(log.WarnLevel) {
304 msg = "warn"
305 } else if log.IsLevelEnabled(log.InfoLevel) {
306 msg = "info"
307 } else if log.IsLevelEnabled(log.DebugLevel) {
308 msg = "debug"
309 } else if log.IsLevelEnabled(log.TraceLevel) {
310 msg = "trace"
311 }
312 w.Header().Set("Content-Type", "application/text")
313 w.Write([]byte(msg))
314 } else {
315 w.WriteHeader(http.StatusMethodNotAllowed)
316 }
317 }
318}
319
320// Get app state
321func status(w http.ResponseWriter, req *http.Request) {
322 if req.Method != http.MethodGet {
323 w.WriteHeader(http.StatusMethodNotAllowed)
324 return
325 }
326
327 _, err := w.Write([]byte(appStatus))
328 if err != nil {
329 w.WriteHeader(http.StatusInternalServerError)
330 log.Error("Cannot send statistics json")
331 return
332 }
333}
334
335// producer statictics, all jobs
336func statistics(w http.ResponseWriter, req *http.Request) {
337 if req.Method != http.MethodGet {
338 w.WriteHeader(http.StatusMethodNotAllowed)
339 return
340 }
341 m := make(map[string]interface{})
342 log.Debug("rapp statictics")
343
344 req.Header.Set("Content-Type", "application/json; charset=utf-8")
345 m["number-of-messages"] = strconv.Itoa(msg_count)
346 m["number-of-corrupted-messages"] = strconv.Itoa(msg_corrupted_count)
347 m["job id"] = jobid
348 m["group id"] = gid
349 m["client id"] = cid
350 m["kafka consumer type"] = consumer_type
351 m["server"] = bootstrapserver
352 m["topic"] = topic
353 m["messages per sec"] = msg_per_sec
354
355 json, err := json.Marshal(m)
356 if err != nil {
357 w.WriteHeader(http.StatusInternalServerError)
358 log.Error("Cannot marshal statistics json")
359 return
360 }
361 _, err = w.Write(json)
362 if err != nil {
363 w.WriteHeader(http.StatusInternalServerError)
364 log.Error("Cannot send statistics json")
365 return
366 }
367}
368
369func calc_average() {
370
371 for true {
372 v := msg_count
373 time.Sleep(60 * time.Second)
374 msg_per_sec = (msg_count - v) / 60
375 }
376}
377
378func send_http_request(jsonData []byte, method string, url string, contentType string, accessToken string, alt_ok_response int, returnJson bool) (bool, map[string]interface{}) {
379
380 var req *http.Request
381 var err error
382 if jsonData != nil {
383 req, err = http.NewRequest(method, url, bytes.NewBuffer(jsonData))
384 if err != nil {
385 log.Error("Cannot create http request method: ", method, " url: ", url)
386 return false, nil
387 }
388 if contentType == "" {
389 req.Header.Set("Content-Type", "application/json; charset=utf-8")
390 } else {
391 req.Header.Set("Content-Type", contentType)
392 }
393 } else {
394 req, err = http.NewRequest(method, url, nil)
395 if err != nil {
396 log.Error("Cannot create http request method: ", method, " url: ", url)
397 return false, nil
398 }
399 }
400 if jwt_file != "" || creds_service_url != "" {
401 if accessToken != "" {
402 req.Header.Add("authorization", accessToken)
403 } else {
404 log.Error("Cannot create http request for url: ", url, " - token missing")
405 return false, nil
406 }
407 }
408 log.Debug("Http request: ", req)
409 resp, err2 := httpclient.Do(req)
410 if err2 != nil {
411 log.Error("Cannot send http request, method: ", method, "url: ", url)
412 } else {
413 if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
414
415 if returnJson {
416 defer resp.Body.Close()
417 body, err3 := ioutil.ReadAll(resp.Body)
418 if err3 != nil {
419 log.Error("Cannot read body, method: ", method, ", url: ", url, " resp: ", resp.StatusCode)
420 return false, nil
421 } else {
422 var responseJson map[string]interface{}
423 err := json.Unmarshal(body, &responseJson)
424 if err != nil {
425 log.Error("Received msg not json? - cannot unmarshal")
426 return false, nil
427 }
428 fmt.Println(string(body))
429 log.Debug("Accepted response code: ", resp.StatusCode)
430 return true, responseJson
431 }
432 }
433
434 log.Debug("Accepted response code: ", resp.StatusCode)
435 return true, nil
436 } else {
437 if alt_ok_response != 0 && resp.StatusCode == alt_ok_response {
438
439 if returnJson {
440 defer resp.Body.Close()
441 body, err3 := ioutil.ReadAll(resp.Body)
442 if err3 != nil {
443 log.Error("Cannot read body, method: ", method, ", url: ", url, " resp: ", resp.StatusCode)
444 return false, nil
445 } else {
446 var responseJson map[string]interface{}
447 err := json.Unmarshal(body, &responseJson)
448 if err != nil {
449 log.Error("Received msg not json? - cannot unmarshal")
450 return false, nil
451 }
452 fmt.Println(string(body))
453 log.Debug("Accepted alternative response code: ", resp.StatusCode)
454 return true, responseJson
455 }
456 }
457 } else {
458 log.Error("Bad response, method: ", method, " url: ", url, " resp: ", resp.StatusCode)
459 }
460 }
461 }
462 return false, nil
463
464}
465
466func retrive_token_strimzi() {
467 log.Debug("Get token inline - strimzi comm")
468
469 conf := &clientcredentials.Config{
470 ClientID: creds_client_id,
471 ClientSecret: creds_client_secret,
472 TokenURL: creds_service_url,
473 }
474 var modExpiry = time.Now()
475 ok := false
476 for !ok {
477 token, err := conf.Token(context.Background())
478 if err != nil {
479 log.Warning("Cannot fetch access token: ", err, " - retrying ")
480 time.Sleep(time.Second)
481 continue
482 }
483 log.Debug("token: ", token)
484 log.Debug("TokenValue: ", token.AccessToken)
485 log.Debug("Expiration: ", token.Expiry)
486 modExpiry = token.Expiry.Add(-time.Minute)
487 log.Debug("Modified expiration: ", modExpiry)
488 currentToken = token.AccessToken
489 ok = true
490 }
491 log.Debug("Initial token ok")
492 diff := modExpiry.Sub(time.Now())
493 go func() {
494 select {
495 case <-time.After(diff):
496 for !ok {
497 token, err := conf.Token(context.Background())
498 if err != nil {
499 log.Warning("Cannot fetch access token: ", err, " - retrying ")
500 time.Sleep(time.Second)
501 continue
502 }
503 log.Debug("token: ", token)
504 log.Debug("TokenValue: ", token.AccessToken)
505 log.Debug("Expiration: ", token.Expiry)
506 modExpiry = token.Expiry.Add(-time.Minute)
507 log.Debug("Modified expiration: ", modExpiry)
508 currentToken = token.AccessToken
509 ok = true
510 }
511 diff = modExpiry.Sub(time.Now())
512 }
513 }()
514}
515
516func retrive_token(c *kafka.Consumer) {
517 log.Debug("Get token inline")
518 conf := &clientcredentials.Config{
519 ClientID: creds_client_id,
520 ClientSecret: creds_client_secret,
521 TokenURL: creds_service_url,
522 }
523 token, err := conf.Token(context.Background())
524 if err != nil {
525 log.Warning("Cannot fetch access token: ", err)
526 c.SetOAuthBearerTokenFailure(err.Error())
527 return
528 }
529 extensions := map[string]string{}
530 log.Debug("token: ", token)
531 log.Debug("TokenValue: ", token.AccessToken)
532 log.Debug("Expiration: ", token.Expiry)
533 t := token.Expiry.Add(-time.Minute)
534 log.Debug("Modified expiration: ", t)
535 oauthBearerToken := kafka.OAuthBearerToken{
536 TokenValue: token.AccessToken,
537 Expiration: t,
538 Extensions: extensions,
539 }
540 log.Debug("Setting new token to consumer")
541 setTokenError := c.SetOAuthBearerToken(oauthBearerToken)
542 currentToken = token.AccessToken
543 if setTokenError != nil {
544 log.Warning("Cannot cannot set token in client: ", setTokenError)
545 c.SetOAuthBearerTokenFailure(setTokenError.Error())
546 }
547}
548
// gzipWrite compresses *data with gzip (best speed) and writes the complete
// stream to w.
//
// The error from closing the gzip writer is returned as well: the compressed
// data and the gzip trailer are flushed on close, so a failure there means
// that w holds a truncated stream. A nil data pointer is reported as an error.
func gzipWrite(w io.Writer, data *[]byte) error {
	if data == nil {
		return fmt.Errorf("gzipWrite: no data")
	}

	gw, err := gzip.NewWriterLevel(w, gzip.BestSpeed)
	if err != nil {
		return err
	}

	if _, err := gw.Write(*data); err != nil {
		// Best effort close - the write error is the one worth reporting.
		gw.Close()
		return err
	}
	return gw.Close()
}
559
560func read_bridge_messages() {
561
562 consumer_type = "unsecure strimzi bridge consumer"
563 if creds_service_url != "" {
564 consumer_type = "accesstoken strimzi bridge consumer"
565 }
566 ok := false
567 log.Debug("Cleaning consumer "+cid+" in group: ", gid)
568 ok, _ = send_http_request(nil, http.MethodDelete, bootstrapserver+"/consumers/"+gid+"/instances/"+cid, "", currentToken, 0, false)
569 if !ok {
570 log.Info("Failed to delete consumer "+cid+" in group: ", gid, " - it may not exist - ok")
571 }
572 var bridge_base_url = ""
573 ok = false
574 json_str := "{\"name\": \"" + cid + "\", \"auto.offset.reset\": \"latest\",\"format\": \"json\"}"
575 for !ok {
576 log.Debug("Creating consumer "+cid+" in group: ", gid)
577 var respJson map[string]interface{}
578 ok, respJson = send_http_request([]byte(json_str), http.MethodPost, bootstrapserver+"/consumers/"+gid, "application/vnd.kafka.v2+json", currentToken, 409, true) //409 if consumer already exists
579 if ok {
580 bridge_base_url = fmt.Sprintf("%s", respJson["base_uri"])
581 } else {
582 log.Info("Failed create consumer "+cid+" in group: ", gid, " - retrying")
583 time.Sleep(time.Second)
584 }
585 }
586
587 ok = false
588 json_str = "{\"topics\": [\"" + topic + "\"]}"
589
590 for !ok {
591 log.Debug("Subscribing to topic: ", topic)
592 ok, _ = send_http_request([]byte(json_str), http.MethodPost, bridge_base_url+"/subscription", "application/vnd.kafka.v2+json", currentToken, 0, false)
593 if !ok {
594 log.Info("Failed subscribe to topic: ", topic, " - retrying")
595 time.Sleep(time.Second)
596 }
597 }
598
599 for true {
600 log.Debug("Reading messages on topic: ", topic)
601
602 var req *http.Request
603 var err error
604 url := bridge_base_url + "/records"
605
606 req, err = http.NewRequest(http.MethodGet, url, nil)
607 if err != nil {
608 log.Error("Cannot create http request method: GET, url: ", url)
609 time.Sleep(1 * time.Second)
610 continue
611 }
612 req.Header.Set("accept", "application/vnd.kafka.json.v2+json")
613
614 if creds_service_url != "" {
615 if currentToken != "" {
616 req.Header.Add("authorization", currentToken)
617 } else {
618 log.Error("Cannot create http request for url: ", url, " - token missing")
619 time.Sleep(1 * time.Second)
620 continue
621 }
622 }
623
624 values := req.URL.Query()
625 values.Add("timeout", "10000")
626 req.URL.RawQuery = values.Encode()
627
628 log.Debug(req)
629
630 resp, err2 := httpclient.Do(req)
631 if err2 != nil {
632 log.Error("Cannot send http request, method: GET, url: ", url)
633 time.Sleep(1 * time.Second)
634 continue
635 } else {
636 body, err := ioutil.ReadAll(resp.Body)
637 resp.Body.Close()
638 if resp.StatusCode == 200 || resp.StatusCode == 201 || resp.StatusCode == 204 {
639 log.Debug("Accepted response code: ", resp.StatusCode)
640
641 if err != nil {
642 log.Error("Cannot read body, method: GET, url: ", url, " resp: ", resp.StatusCode)
643 } else {
644 var responseJson []interface{}
645 err := json.Unmarshal(body, &responseJson)
646 if err != nil {
647 log.Error("Received msg not json? - cannot unmarshal")
648 msg_corrupted_count++
649 } else {
650 if len(responseJson) == 0 {
651 log.Debug("No message")
652 continue
653 }
654 for _, item := range responseJson {
655 j, err := json.MarshalIndent(item, "", " ")
656 if err != nil {
657 log.Error("Message in array not json? - cannot unmarshal")
658 msg_corrupted_count++
659 } else {
660 msg_count++
661 if log_payload != "" {
662 fmt.Println("Message: " + string(j))
663 }
664 }
665 }
666 }
667 }
668
669 log.Debug("Commiting message")
670 ok, _ = send_http_request(nil, http.MethodPost, bridge_base_url+"/offsets", "", currentToken, 0, false)
671 if !ok {
672 log.Info("Failed to commit message")
673 }
674
675 } else {
676 log.Error("Bad response, method: GET, url: ", url, " resp: ", resp.StatusCode)
677 log.Error("Bad response, data: ", string(body))
678 }
679 }
680 }
681
682}
683
684func read_kafka_messages() {
685 var c *kafka.Consumer = nil
686 log.Info("Creating kafka consumer...")
687 var err error
688 for c == nil {
689 if jwt_file == "" && creds_service_url == "" {
690 if ssl_path == "" {
691 log.Info("unsecure consumer")
692 consumer_type = "kafka unsecure consumer"
693 c, err = kafka.NewConsumer(&kafka.ConfigMap{
694 "bootstrap.servers": bootstrapserver,
695 "group.id": gid,
696 "client.id": cid,
697 "auto.offset.reset": "latest",
698 })
699 } else {
700 log.Info("ssl consumer")
701 consumer_type = "kafka ssl consumer"
702 c, err = kafka.NewConsumer(&kafka.ConfigMap{
703 "bootstrap.servers": bootstrapserver,
704 "group.id": gid,
705 "client.id": cid,
706 "auto.offset.reset": "latest",
707 "security.protocol": "SSL",
708 "ssl.key.location": ssl_path + "/clt.key",
709 "ssl.certificate.location": ssl_path + "/clt.crt",
710 "ssl.ca.location": ssl_path + "/ca.crt",
711 })
712 }
713 } else {
714 if ssl_path != "" {
715 panic("SSL cannot be configued with JWT_FILE or RAPP_AUTH_SERVICE_URL")
716 }
717 log.Info("sasl consumer")
718 consumer_type = "kafka sasl unsecure consumer"
719 c, err = kafka.NewConsumer(&kafka.ConfigMap{
720 "bootstrap.servers": bootstrapserver,
721 "group.id": gid,
722 "client.id": cid,
723 "auto.offset.reset": "latest",
724 "sasl.mechanism": "OAUTHBEARER",
725 "security.protocol": "SASL_PLAINTEXT",
726 })
727 }
728 if err != nil {
729 log.Warning("Cannot create kafka consumer - retrying, error: ", err)
730 time.Sleep(1 * time.Second)
731 }
732 }
733
734 log.Info("Creating kafka consumer - ok")
735 log.Info("Start subscribing to topic: ", topic)
736 topic_ok := false
737 for !topic_ok {
738 err = c.SubscribeTopics([]string{topic}, nil)
739 if err != nil {
740 log.Info("Topic reader cannot start subscribing on topic: ", topic, " - retrying -- error details: ", err)
741 } else {
742 log.Info("Topic reader subscribing on topic: ", topic)
743 topic_ok = true
744 }
745 }
746
747 fileModTime := time.Now()
748 for {
749 if jwt_file != "" {
750 fileInfo, err := os.Stat(jwt_file)
751 if err == nil {
752 if fileModTime != fileInfo.ModTime() {
753 log.Debug("JWT file is updated")
754 fileModTime = fileInfo.ModTime()
755 fileBytes, err := ioutil.ReadFile(jwt_file)
756 if err != nil {
757 log.Error("JWT file read error: ", err)
758 } else {
759 fileString := string(fileBytes)
760 log.Info("JWT: ", fileString)
761 t := time.Now()
762 t15 := time.Second * 300
763 t = t.Add(t15)
764 oauthBearerToken := kafka.OAuthBearerToken{
765 TokenValue: fileString,
766 Expiration: t,
767 }
768 log.Debug("Setting new token to consumer")
769 setTokenError := c.SetOAuthBearerToken(oauthBearerToken)
770 if setTokenError != nil {
771 log.Warning("Cannot cannot set token in client: ", setTokenError)
772 }
773 }
774 } else {
775 log.Debug("JWT file not updated - OK")
776 }
777 } else {
778 log.Error("JWT does not exist: ", err)
779 }
780 }
781 ev := c.Poll(1000)
782 if ev == nil {
783 log.Debug(" Nothing to consume on topic: ", topic)
784 continue
785 }
786 switch e := ev.(type) {
787 case *kafka.Message:
788 var pdata *[]byte = &e.Value
789 if gzipped_data != "" {
790 var buf bytes.Buffer
791 err = gzipWrite(&buf, pdata)
792 if err != nil {
793 log.Warning("Cannot unzip data")
794 pdata = nil
795 } else {
796 *pdata = buf.Bytes()
797 fmt.Println("Unzipped data")
798 }
799 }
800 if pdata != nil {
801 buf := &bytes.Buffer{}
802
803 if err := json.Indent(buf, *pdata, "", " "); err != nil {
804 log.Warning("Received msg not json?")
805 } else {
806 fmt.Println(buf.String())
807 msg_count++
808 fmt.Println("Number of received json msgs: " + strconv.Itoa(msg_count))
809 }
810 }
811 c.Commit()
812 case kafka.Error:
813 fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
814
815 case kafka.OAuthBearerTokenRefresh:
816 if jwt_file == "" {
817 oart, ok := ev.(kafka.OAuthBearerTokenRefresh)
818 fmt.Println(oart)
819 if !ok {
820 continue
821 }
822 retrive_token(c)
823 }
824 default:
825 fmt.Printf("Ignored %v\n", e)
826 }
827
828 }
829}