// blob: 508609849afb822e9ccf16166b1fca8448c5748b [file] [log] [blame]
/*
* Copyright (c) 2019 AT&T Intellectual Property.
* Copyright (c) 2018-2019 Nokia.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* This source code is part of the near-RT RIC (RAN Intelligent Controller)
* platform project (RICP).
*
*/
package main
import (
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"time"
mdcloggo "gerrit.o-ran-sc.org/r/com/golog.git"
)
// appmgrDomain is the host name used when building application manager
// URLs. VesMgr.Init assigns it from the VESMGR_APPMGRDOMAIN environment
// variable or, when that is unset, from a default derived from PLT_NAMESPACE.
var appmgrDomain string

const (
	// appmgrXAppConfigPath is the URL path queried for the xApp configuration.
	appmgrXAppConfigPath = "/ric/v1/config"
	// appmgrPort is the port appended to appmgrDomain in application manager URLs.
	appmgrPort = "8080"
)
// VesMgr contains runtime information of the vesmgr process.
// The fields are populated by Init.
type VesMgr struct {
// myIPAddress is the local address returned by getMyIP. It is used for the
// xApp notification callback URL and for the HTTP server listen address.
myIPAddress string
// chXAppSubscriptions is handed to subscribexAppNotifications and read by
// waitSubscriptionLoop to learn the outcome of the subscription attempt.
chXAppSubscriptions chan subscriptionNotification
// chXAppNotifications carries raw xApp notification payloads from the HTTP
// server to servRequest. Init creates it with a buffer of 10.
chXAppNotifications chan []byte
// chSupervision carries supervision requests; each request is itself a
// channel on which the main loop answers "OK".
chSupervision chan chan string
// chVesagent is passed to vesagent.run; killVespa and servRequest receive
// an error value from it (presumably the agent's exit status, confirm
// against the cmdRunner implementation).
chVesagent chan error
// vesagent is the runner created with makeRunner for the "ves-agent" command.
vesagent cmdRunner
// httpServer is the HTTP server started by Run with the notification path
// and the notification and supervision channels.
httpServer HTTPServer
}
// subscriptionNotification is the value delivered on
// VesMgr.chXAppSubscriptions.
type subscriptionNotification struct {
// subscribed presumably reports whether the subscription succeeded; it is
// not read in this file, so confirm against the sender.
subscribed bool
// err is checked by waitSubscriptionLoop; a non-nil value makes vesmgr exit.
err error
// subsID is presumably the identifier of the created subscription; it is
// not read in this file, so confirm against the sender.
subsID string
}
// logger is the package-wide logger; it is assigned in init.
var logger *mdcloggo.MdcLogger

// Version information, which is filled during compilation.
var (
	// Version is the version tag of the vesmgr container.
	Version string
	// Hash is the hash of the git commit used in building.
	Hash string
)

const (
	// vesmgrXappNotifPort is the port placed in the xApp notification
	// callback URL.
	vesmgrXappNotifPort = "8080"
	// vesmgrXappNotifPath is the path of the xApp notification callback; it
	// is also the path registered when the HTTP server is started.
	vesmgrXappNotifPath = "/vesmgr_xappnotif/"
	// timeoutPostXAppSubscriptions is handed to subscribexAppNotifications
	// (presumably a number of seconds, confirm against that function).
	timeoutPostXAppSubscriptions = 5
	// vespaConfigFile is the file into which the vespa configuration is written.
	vespaConfigFile = "/etc/ves-agent/ves-agent.yaml"
)
// init creates the package-wide logger under the name "vesmgr".
func init() {
	// The error result of InitLogger is discarded.
	mdcLogger, _ := mdcloggo.InitLogger("vesmgr")
	logger = mdcLogger
}
// getMyIP returns the first non-loopback IPv4 address found among the
// interface addresses of the host.
//
// An error is returned only when the interface addresses cannot be listed.
// When no suitable address exists the result is an empty string with a nil
// error, so callers must check for the empty string as well.
func getMyIP() (myIP string, retErr error) {
	ifaceAddrs, err := net.InterfaceAddrs()
	if err != nil {
		logger.Error("net.InterfaceAddrs failed: %s", err.Error())
		return "", err
	}

	for _, candidate := range ifaceAddrs {
		// Only IP networks that are not loopback are of interest.
		network, isIPNet := candidate.(*net.IPNet)
		if !isIPNet || network.IP.IsLoopback() {
			continue
		}
		// Skip addresses that have no IPv4 form.
		if network.IP.To4() == nil {
			continue
		}
		found := network.IP.String()
		logger.Info("My IP Address: %s", found)
		return found, nil
	}

	return "", nil
}
// createConf creates (or truncates) the file fname and lets
// createVespaConfig write the vespa configuration derived from xappMetrics
// into it. The process exits with status 1 when the file cannot be created.
func createConf(fname string, xappMetrics []byte) {
	confFile, createErr := os.Create(fname)
	if createErr != nil {
		logger.Error("Cannot create vespa conf file: %s", createErr.Error())
		os.Exit(1)
	}
	// The file stays open until the configuration has been written.
	defer confFile.Close()

	createVespaConfig(confFile, xappMetrics)
	logger.Info("Vespa config created")
}
// subscribeXAppNotifications starts a goroutine that subscribes vesmgr to
// xApp notifications at the application manager. The callback URL points at
// this process; the outcome is reported on chXAppSubscriptions.
func (vesmgr *VesMgr) subscribeXAppNotifications() {
	appmgrSubsURL := "http://" + appmgrDomain + ":" + appmgrPort + appmgrSubsPath
	callbackURL := "http://" + vesmgr.myIPAddress + ":" + vesmgrXappNotifPort + vesmgrXappNotifPath

	// The subscription runs in the background; this log line is written as
	// soon as the goroutine has been launched.
	go subscribexAppNotifications(callbackURL, vesmgr.chXAppSubscriptions, timeoutPostXAppSubscriptions, appmgrSubsURL)
	logger.Info("xApp notifications subscribed from %s", appmgrSubsURL)
}
// Init initializes the vesmgr: it resolves the local IP address and the
// application manager domain, creates the channels, prepares the HTTP
// server on listenPort and builds the ves-agent runner.
//
// It panics when no local IP address can be determined. The receiver is
// returned to allow call chaining.
func (vesmgr *VesMgr) Init(listenPort string) *VesMgr {
	logger.Info("vesmgrInit")
	logger.Info("version: %s (%s)", Version, Hash)

	localIP, ipErr := getMyIP()
	vesmgr.myIPAddress = localIP
	if ipErr != nil || localIP == "" {
		logger.Error("Cannot get myIPAddress: IP %s", vesmgr.myIPAddress)
		panic("Cannot get my IP address")
	}

	// An explicitly configured domain wins; otherwise fall back to the
	// cluster-local service name inside the platform namespace.
	if configured, found := os.LookupEnv("VESMGR_APPMGRDOMAIN"); found {
		appmgrDomain = configured
		logger.Info("Using appmgrdomain %s", appmgrDomain)
	} else {
		namespace := os.Getenv("PLT_NAMESPACE")
		if namespace == "" {
			namespace = "ricplt"
		}
		appmgrDomain = fmt.Sprintf("service-%s-appmgr-http.%s.svc.cluster.local", namespace, namespace)
		logger.Info("Using default appmgrdomain %s", appmgrDomain)
	}

	vesmgr.chXAppSubscriptions = make(chan subscriptionNotification)
	// Create notifications as buffered channel so that
	// xappmgr does not block if we are stuck somewhere
	vesmgr.chXAppNotifications = make(chan []byte, 10)
	vesmgr.chSupervision = make(chan chan string)
	vesmgr.chVesagent = make(chan error)

	vesmgr.httpServer = HTTPServer{}
	vesmgr.httpServer.init(vesmgr.myIPAddress + ":" + listenPort)

	// The ves-agent command line is taken from the environment.
	vesmgr.vesagent = makeRunner("ves-agent",
		"-i", os.Getenv("VESMGR_HB_INTERVAL"),
		"-m", os.Getenv("VESMGR_MEAS_INTERVAL"),
		"--Measurement.Prometheus.Address", os.Getenv("VESMGR_PROMETHEUS_ADDR"),
		"--AlertManager.Bind", os.Getenv("VESMGR_ALERTMANAGER_BIND_ADDR"),
		"--Debug")

	return vesmgr
}
// startVesagent runs the ves-agent runner, handing it chVesagent (the
// channel that killVespa and servRequest receive the agent's error from).
func (vesmgr *VesMgr) startVesagent() {
	agentCh := vesmgr.chVesagent
	vesmgr.vesagent.run(agentCh)
}
// killVespa asks the ves-agent runner to kill the process and then blocks
// until a value arrives on chVesagent.
//
// It returns the kill error when the kill itself fails (without waiting),
// otherwise the error value received from chVesagent.
func (vesmgr *VesMgr) killVespa() error {
	logger.Info("Killing vespa")
	if killErr := vesmgr.vesagent.kill(); killErr != nil {
		logger.Error("Cannot kill vespa: %s", killErr.Error())
		return killErr
	}
	// wait vespa exit
	exitErr := <-vesmgr.chVesagent
	return exitErr
}
// queryXAppsConfig fetches the xApp configuration with an HTTP GET on
// appmgrURL.
//
// timeout is multiplied by time.Second before it is installed as the client
// timeout, so callers pass a plain count of seconds (for example 10) rather
// than an already scaled duration.
//
// On success the response body is returned with a nil error. On any failure
// (request creation, transport error, non-200 status, body read error) the
// empty JSON object "{}" is returned together with the error.
func queryXAppsConfig(appmgrURL string, timeout time.Duration) ([]byte, error) {
	fallback := []byte("{}")
	logger.Info("query xAppConfig started, url %s", appmgrURL)

	request, err := http.NewRequest("GET", appmgrURL, nil)
	if err != nil {
		logger.Error("Failed to create a HTTP request: %s", err)
		return fallback, err
	}
	request.Header.Set("Content-Type", "application/json")

	httpClient := &http.Client{Timeout: time.Second * timeout}
	response, err := httpClient.Do(request)
	if err != nil {
		logger.Error("Query xApp config failed: %s", err)
		return fallback, err
	}
	defer response.Body.Close()

	// Anything but 200 OK is reported as an error carrying the status text.
	if response.StatusCode != http.StatusOK {
		logger.Error("Error from xApp config query: %s", response.Status)
		return fallback, errors.New(response.Status)
	}

	payload, err := ioutil.ReadAll(response.Body)
	if err != nil {
		logger.Error("Failed to read xApp config body: %s", err)
		return fallback, err
	}
	logger.Info("query xAppConfig completed")
	return payload, nil
}
func queryConf() (appConfig []byte, err error) {
for i := 0; i < 10; i++ {
appConfig, err = queryXAppsConfig("http://"+appmgrDomain+":"+appmgrPort+appmgrXAppConfigPath, 10*time.Second)
if len(appConfig) > 0 {
break
}
time.Sleep(5 * time.Second)
}
return appConfig, err
}
// emptyNotificationsChannel discards every notification currently queued on
// chXAppNotifications and returns as soon as the channel has nothing more to
// deliver. It never blocks.
func (vesmgr *VesMgr) emptyNotificationsChannel() {
	pending := vesmgr.chXAppNotifications
	for drained := false; !drained; {
		select {
		case <-pending:
			// The payload is irrelevant; only the fact of a change matters.
		default:
			drained = true
		}
	}
}
// servRequest handles exactly one event of the main loop:
//
//   - a supervision request is answered with "OK";
//   - an xApp notification triggers a fresh configuration query and, when
//     that succeeds, a restart of the ves-agent with the new configuration;
//   - a value on chVesagent means the ves-agent ended on its own, which
//     terminates vesmgr with exit status 1.
func (vesmgr *VesMgr) servRequest() {
	select {
	case supervision := <-vesmgr.chSupervision:
		logger.Info("vesmgr: supervision")
		supervision <- "OK"
	case xAppNotif := <-vesmgr.chXAppNotifications:
		logger.Info("vesmgr: xApp notification")
		// The payload comes from outside and must never be used as the
		// format string: a '%' in it would garble the log line.
		logger.Info("%s", string(xAppNotif))
		// Several queued notifications collapse into one reconfiguration.
		vesmgr.emptyNotificationsChannel()
		/*
		 * If xapp config query fails then we cannot create
		 * a new configuration and kill vespa.
		 * In that case we assume that
		 * the situation is fixed when the next
		 * xapp notif comes
		 */
		xappConfig, err := queryConf()
		if err == nil {
			// killVespa logs a failing kill itself; its return value is
			// otherwise the error reported for the killed process, so it
			// is not acted upon here.
			vesmgr.killVespa()
			createConf(vespaConfigFile, xappConfig)
			vesmgr.startVesagent()
		}
	case err := <-vesmgr.chVesagent:
		// err may be nil if the runner reports a clean exit that way;
		// formatting with %v avoids a nil dereference on err.Error() and
		// keeps the error text out of the format string.
		logger.Error("Vesagent exited: %v", err)
		os.Exit(1)
	}
}
// waitSubscriptionLoop blocks until the outcome of the xApp subscription
// arrives on chXAppSubscriptions, answering supervision requests with "OK"
// in the meantime. A failed subscription terminates the process with exit
// status 1; otherwise the function returns.
func (vesmgr *VesMgr) waitSubscriptionLoop() {
	for {
		select {
		case replyCh := <-vesmgr.chSupervision:
			logger.Info("vesmgr: supervision")
			replyCh <- "OK"
		case outcome := <-vesmgr.chXAppSubscriptions:
			if subsErr := outcome.err; subsErr != nil {
				logger.Error("Failed to make xApp subscriptions, vesmgr exiting: %s", subsErr)
				os.Exit(1)
			}
			return
		}
	}
}
// Run is the vesmgr process main loop. It starts the HTTP server, subscribes
// to xApp notifications and waits for the subscription result, writes the
// initial vespa configuration, starts the ves-agent and then serves requests
// forever. It does not return.
func (vesmgr *VesMgr) Run() {
	logger.Info("vesmgr main loop ready")

	vesmgr.httpServer.start(vesmgrXappNotifPath, vesmgr.chXAppNotifications, vesmgr.chSupervision)

	vesmgr.subscribeXAppNotifications()
	vesmgr.waitSubscriptionLoop()

	// The query error is not inspected here: the configuration file is
	// created from whatever queryConf hands back.
	initialConfig, _ := queryConf()
	createConf(vespaConfigFile, initialConfig)
	vesmgr.startVesagent()

	for {
		vesmgr.servRequest()
	}
}