blob: 9fc513a55482714870c2bd7a8fa66cc8a23b7fd4 [file] [log] [blame]
// -
// ========================LICENSE_START=================================
// O-RAN-SC
// %%
// Copyright (C) 2022: Nordix Foundation
// %%
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ========================LICENSE_END===================================
//
package main
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"flag"
"fmt"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/esapi"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"io/ioutil"
"net"
"net/http"
"net/url"
"rapps/utils/generatejwt"
"strings"
"time"
"log"
)
type Jwttoken struct {
Access_token string
Expires_in int
Refresh_expires_in int
Refresh_token string
Token_type string
Not_before_policy int
Session_state string
Scope string
}
var gatewayHost string
var gatewayPort string
var keycloakHost string
var keycloakPort string
var keycloakAlias string
var elasticHost string
var elasticPort string
var elasticAlias string
var securityEnabled string
var useGateway string
var role string
var rapp string
var methods string
var realmName string
var clientId string
var healthy bool = true
var ttime time.Time
var jwt Jwttoken
const (
namespace = "istio-nonrtric"
scope = "email"
client_assertion_type = "urn:ietf:params:oauth:client-assertion-type:jwt-bearer"
)
var (
reqDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "rapp_http_request_duration_seconds",
Help: "Duration of the last request call.",
Buckets: []float64{0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10},
}, []string{"app", "func", "handler", "method", "code"})
reqBytes = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: "rapp_bytes_summary",
Help: "Summary of bytes transferred over http",
}, []string{"app", "func", "handler", "method", "code"})
)
type MyDocument struct {
Timestamp string `json:"@timestamp"`
App string `json:"app"`
Func string `json:"func"`
Handler string `json:"handler"`
Method string `json:"method"`
Code string `json:"code"`
Bytes int64 `json:"bytes"`
}
var client *elasticsearch.Client
func connectToElasticsearch() *elasticsearch.Client {
clusterURLs := []string{"https://" + elasticAlias + ":" + elasticPort }
username := "elastic"
password := "secret"
cert, _ := ioutil.ReadFile("/ca/ca.crt")
dialer := &net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}
// client configuration
cfg := elasticsearch.Config{
Addresses: clusterURLs,
Username: username,
Password: password,
CACert: cert,
Transport: &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
//fmt.Println("address original =", addr)
if addr == elasticAlias+":"+elasticPort {
addr = elasticHost + ":" + elasticPort
//fmt.Println("address modified =", addr)
}
return dialer.DialContext(ctx, network, addr)
},
TLSClientConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
},
},
}
es, err := elasticsearch.NewClient(cfg)
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}
log.Println(elasticsearch.Version)
resp, err := es.Info()
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer resp.Body.Close()
log.Println(resp)
return es
}
func addEsIndex(app, fnc, hnd, meth, code string, bytes int64){
ts := time.Now().Format(time.RFC3339)
doc := MyDocument{Timestamp: ts, App: app, Func: fnc, Handler: hnd, Method: meth, Code: code, Bytes: bytes }
jsonString, _ := json.Marshal(doc)
uid := fmt.Sprintf("%v", uuid.New())
indexName := "gostash-"+time.Now().Format("2006.01.02")
request := esapi.IndexRequest{Index: indexName, DocumentID: uid, Body: strings.NewReader(string(jsonString))}
_, err := request.Do(context.Background(), client)
if err != nil {
fmt.Println(err)
}
}
func getToken() string {
if ttime.Before(time.Now()) {
resp := &http.Response{}
client_assertion := getClientAssertion()
keycloakUrl := "http://" + keycloakHost + ":" + keycloakPort + "/auth/realms/" + realmName + "/protocol/openid-connect/token"
fmt.Printf("Making token request to %s\n", keycloakUrl)
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
reqDuration.WithLabelValues("rapp-jwt-invoker", "getToken", resp.Request.URL.Path, resp.Request.Method,
resp.Status).Observe(v)
}))
defer timer.ObserveDuration()
var err error
if jwt.Refresh_token != "" {
resp, err = http.PostForm(keycloakUrl, url.Values{"client_assertion_type": {client_assertion_type},
"client_assertion": {client_assertion}, "grant_type": {"refresh_token"},
"refresh_token": {jwt.Refresh_token}, "client_id": {clientId}, "scope": {scope}})
} else {
resp, err = http.PostForm(keycloakUrl, url.Values{"client_assertion_type": {client_assertion_type},
"client_assertion": {client_assertion}, "grant_type": {"client_credentials"},
"client_id": {clientId}, "scope": {scope}})
}
if err != nil {
fmt.Println(err)
panic("Something wrong with the credentials or url ")
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
json.Unmarshal([]byte(body), &jwt)
reqBytes.WithLabelValues("rapp-jwt-invoker", "getToken", resp.Request.URL.Path, resp.Request.Method,
resp.Status).Observe(float64(resp.ContentLength))
addEsIndex("rapp-jwt-invoker", "getToken", resp.Request.URL.Path, resp.Request.Method,
resp.Status, resp.ContentLength)
ttime = time.Now()
ttime = ttime.Add(time.Second * time.Duration(jwt.Expires_in))
}
return jwt.Access_token
}
func getClientAssertion() string {
realm := "http://" + keycloakHost + ":" + keycloakPort + "/auth/realms/" + realmName
clientAssertion := generatejwt.CreateJWT("/certs/client.key", "/certs/client_pub.key", "", clientId, realm)
return clientAssertion
}
func MakeRequest(client *http.Client, prefix string, method string, ch chan string) {
var service = strings.Split(prefix, "/")[1]
var gatewayUrl = "http://" + gatewayHost + ":" + gatewayPort
var token = ""
var jsonValue []byte = []byte{}
var restUrl string = ""
if securityEnabled == "true" {
token = getToken()
} else {
useGateway = "N"
}
if strings.ToUpper(useGateway) != "Y" {
gatewayUrl = "http://" + service + "." + namespace + ":80"
prefix = ""
}
restUrl = gatewayUrl + prefix
resp := &http.Response{}
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
reqDuration.WithLabelValues("rapp-jwt-invoker", "MakeRequest", resp.Request.URL.Path, resp.Request.Method,
resp.Status).Observe(v)
}))
defer timer.ObserveDuration()
req, err := http.NewRequest(method, restUrl, bytes.NewBuffer(jsonValue))
if err != nil {
fmt.Printf("Got error %s", err.Error())
}
req.Header.Set("Content-type", "application/json")
req.Header.Set("Authorization", "Bearer "+token)
resp, err = client.Do(req)
if err != nil {
fmt.Printf("Got error %s", err.Error())
}
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
reqBytes.WithLabelValues("rapp-jwt-invoker", "MakeRequest", req.URL.Path, req.Method,
resp.Status).Observe(float64(resp.ContentLength))
addEsIndex("rapp-jwt-invoker", "MakeRequest", req.URL.Path, req.Method, resp.Status, resp.ContentLength)
respString := string(body[:])
if respString == "RBAC: access denied" {
respString += " for " + service + " " + strings.ToLower(method) + " request"
}
fmt.Printf("Received response for %s %s request - %s\n", service, strings.ToLower(method), respString)
ch <- prefix + "," + method
}
func health(res http.ResponseWriter, req *http.Request) {
if healthy {
res.WriteHeader(http.StatusOK)
res.Write([]byte("healthy"))
} else {
res.WriteHeader(http.StatusInternalServerError)
res.Write([]byte("unhealthy"))
}
}
func main() {
ttime = time.Now()
time.Sleep(3 * time.Second)
prometheus.Register(reqDuration)
prometheus.Register(reqBytes)
flag.StringVar(&gatewayHost, "gatewayHost", "istio-ingressgateway.istio-system", "Gateway Host")
flag.StringVar(&gatewayPort, "gatewayPort", "80", "Gateway Port")
flag.StringVar(&keycloakHost, "keycloakHost", "istio-ingressgateway.istio-system", "Keycloak Host")
flag.StringVar(&keycloakPort, "keycloakPort", "80", "Keycloak Port")
flag.StringVar(&elasticHost, "elasticHost", "istio-ingressgateway.istio-system", "Elasticsearch Host")
flag.StringVar(&elasticPort, "elasticPort", "443", "Elasticsearch Port")
flag.StringVar(&elasticAlias, "elasticAlias", "elasticsearch.est.tech", "Elasticsearch URL Alias")
flag.StringVar(&useGateway, "useGateway", "Y", "Connect to services through API gateway")
flag.StringVar(&securityEnabled, "securityEnabled", "true", "Security is required to use this application")
flag.StringVar(&realmName, "realm", "jwt", "Keycloak realm")
flag.StringVar(&clientId, "client", "jwtprovider-cli", "Keycloak client")
flag.StringVar(&role, "role", "provider-viewer", "Role granted to application")
flag.StringVar(&rapp, "rapp", "rapp-jwt-provider", "Name of rapp to invoke")
flag.StringVar(&methods, "methods", "GET", "Methods to access application")
flag.Parse()
healthHandler := http.HandlerFunc(health)
http.Handle("/health", healthHandler)
http.Handle("/metrics", promhttp.Handler())
client = connectToElasticsearch()
go func() {
http.ListenAndServe(":9000", nil)
}()
client := &http.Client{
Timeout: time.Second * 10,
}
ch := make(chan string)
var prefixArray []string = []string{"/" + rapp}
var methodArray []string = []string{methods}
for _, prefix := range prefixArray {
for _, method := range methodArray {
go MakeRequest(client, prefix, method, ch)
}
}
ioutil.WriteFile("init.txt", []byte("Initialization done."), 0644)
for r := range ch {
go func(resp string) {
time.Sleep(10 * time.Second)
elements := strings.Split(resp, ",")
prefix := elements[0]
method := elements[1]
MakeRequest(client, prefix, method, ch)
}(r)
}
}