Configure xApp metrics in xApp description

Vesmgr subscribes the xApp notifications from xAppMgr. When it
receives a notification, it requests the xApp config from xAppMgr,
creates the VESPA configuration according to it, and restarts
VESPA with the new configuration.

The xApp counters should be defined in the xApp descriptor.
Vesmgr reads the counter definitions from section config->metrics.
The following fields are required:

* name - Prometheus name of the counter
* objectName - Ves collector object name
* objectInstance - Ves collector object instance

If the same counter name is defined by several xApps,
vesmgr makes the configuration based on the first definition
and ignores the latter ones, if they are conflicting.
However, VESPA reports all counters to Ves collector regardless
of the xApp exposing them.

VESPA reads the ricComponentName from Prometheus label
"kubernetes_name".

There are no more hard-coded metrics configured in this version.

Change-Id: I746c6941cebe686165aed97e223b0ec0e9c7a679
Signed-off-by: Katri Turunen <katri.turunen@nokia.com>
diff --git a/cmd/vesmgr/vesmgr.go b/cmd/vesmgr/vesmgr.go
index 0867517..18ed793 100755
--- a/cmd/vesmgr/vesmgr.go
+++ b/cmd/vesmgr/vesmgr.go
@@ -18,55 +18,125 @@
 package main
 
 import (
+	"errors"
+	"io/ioutil"
+	"net"
+	"net/http"
 	"os"
 	"os/exec"
+	"time"
+
 	mdcloggo "gerrit.o-ran-sc.org/r/com/golog.git"
 )
 
+var appmgrDomain string
+
+const appmgrXAppConfigPath = "/ric/v1/config"
+const appmgrPort = "8080"
+
 type VesAgent struct {
-	Pid  int
-	name string
+	Pid     int
+	name    string
+	process *os.Process
+}
+
+type VesMgr struct {
+	myIPAddress  string
+	appmgrSubsId string
+}
+
+type subsChannel struct {
+	subscribed bool
+	err        error
 }
 
 var vesagent VesAgent
+var vesmgr VesMgr
 var logger *mdcloggo.MdcLogger
-var osExit = os.Exit
+
+const vesmgrXappNotifPort = "8080"
+const vesmgrXappNotifPath = "/vesmgr_xappnotif/"
+const timeoutPostXAppSubscriptions = 5
 
 func init() {
 	logger, _ = mdcloggo.InitLogger("vesmgr")
 }
 
-/* Function to initialize vesmgr */
-func vesmgrInit() {
-	vesagent.name = "ves-agent"
-	logger.MdcAdd("vesmgr", "0.0.1")
-	logger.Info("vesmgrInit")
+func getMyIP() (myIP string, retErr error) {
+	addrs, err := net.InterfaceAddrs()
+	if err != nil {
+		logger.Error("net.InterfaceAddrs failed: %s", err.Error())
+		return "", err
+	}
+	for _, addr := range addrs {
+		// check the address type and if it is not a loopback take it
+		if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
+			if ipnet.IP.To4() != nil {
+				logger.Info("My IP Address: %s", ipnet.IP.String())
+				return ipnet.IP.String(), nil
+			}
+		}
+	}
+	return "", nil
+}
 
-	/* Subscribe notifications from xAPP Mgr */
-	//subscribexAppNotifications()
-
-	// create configuration
+func createConf(xappMetrics []byte) {
 	f, err := os.Create("/etc/ves-agent/ves-agent.yaml")
 	if err != nil {
 		logger.Error("Cannot create vespa conf file: %s", err.Error())
-		return
+		os.Exit(1)
 	}
 	defer f.Close()
 
-	createVespaConfig(f)
-
-	/* Start ves-agent */
-	ch := startVesagent()
-
-	runVesmgr(ch)
+	createVespaConfig(f, xappMetrics)
+	logger.Info("Vespa config created")
 }
 
-func startVesagent() chan error {
-	/* Start ves-agent */
+func subscribeXAppNotifications(chSubscriptions chan subsChannel) {
+	xappNotifUrl := "http://" + vesmgr.myIPAddress + ":" + vesmgrXappNotifPort + vesmgrXappNotifPath
+	subsUrl := "http://" + appmgrDomain + ":" + appmgrPort + appmgrSubsPath
+	go subscribexAppNotifications(xappNotifUrl, chSubscriptions, timeoutPostXAppSubscriptions, subsUrl)
+	logger.Info("xApp notifications subscribed from %s", subsUrl)
+}
+
+func vesmgrInit() {
+	vesagent.name = "ves-agent"
+	logger.Info("vesmgrInit")
+
+	var err error
+	if vesmgr.myIPAddress, err = getMyIP(); err != nil || vesmgr.myIPAddress == "" {
+		logger.Error("Cannot get myIPAddress: IP %s, err %s", vesmgr.myIPAddress, err.Error())
+		return
+	}
+
+	var ok bool
+	appmgrDomain, ok = os.LookupEnv("VESMGR_APPMGRDOMAIN")
+	if ok {
+		logger.Info("Using appmgrdomain %s", appmgrDomain)
+	} else {
+		appmgrDomain = "service-ricplt-appmgr-http.ricplt.svc.cluster.local"
+		logger.Info("Using default appmgrdomain %s", appmgrDomain)
+	}
+	chXAppSubscriptions := make(chan subsChannel)
+	chXAppNotifications := make(chan []byte)
+	chSupervision := make(chan chan string)
+	chVesagent := make(chan error)
+
+	listener, err := net.Listen("tcp", vesmgr.myIPAddress+":"+vesmgrXappNotifPort)
+	startHttpServer(listener, vesmgrXappNotifPath, chXAppNotifications, chSupervision)
+
+	subscribeXAppNotifications(chXAppSubscriptions)
+
+	createConf([]byte{})
+	startVesagent(chVesagent)
+
+	runVesmgr(chVesagent, chSupervision, chXAppNotifications, chXAppSubscriptions)
+}
+
+func startVesagent(ch chan error) {
 	cmd := exec.Command(vesagent.name, "-i", os.Getenv("VESMGR_HB_INTERVAL"), "-m", os.Getenv("VESMGR_MEAS_INTERVAL"), "--Measurement.Prometheus.Address", os.Getenv("VESMGR_PROMETHEUS_ADDR"))
 	cmd.Stdout = os.Stdout
 	cmd.Stderr = os.Stderr
-	ch := make(chan error)
 	if err := cmd.Start(); err != nil {
 		logger.Error("vesmgr exiting, ves-agent start failed: %s", err)
 		go func() {
@@ -75,20 +145,108 @@
 	} else {
 		logger.Info("ves-agent started with pid %d", cmd.Process.Pid)
 		vesagent.Pid = cmd.Process.Pid
+		vesagent.process = cmd.Process
 		go func() {
 			// wait ves-agent exit and then post the error to the channel
 			err := cmd.Wait()
 			ch <- err
 		}()
 	}
-
-	return ch
 }
 
-func runVesmgr(ch chan error) {
+func killVespa(process *os.Process) {
+	logger.Info("Killing vespa")
+	err := process.Kill()
+	if err != nil {
+		logger.Error("Cannot kill vespa: %s", err.Error())
+	}
+}
+
+func queryXAppsStatus(appmgrUrl string, timeout time.Duration) ([]byte, error) {
+
+	logger.Info("query xAppStatus started, url %s", appmgrUrl)
+	req, err := http.NewRequest("GET", appmgrUrl, nil)
+	if err != nil {
+		logger.Error("Failed to create a HTTP request: %s", err)
+		return nil, err
+	}
+	req.Header.Set("Content-Type", "application/json")
+	client := &http.Client{}
+	client.Timeout = time.Second * timeout
+	resp, err := client.Do(req)
+	if err != nil {
+		logger.Error("Query xApp status failed: %s", err)
+		return nil, err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode == http.StatusOK {
+		body, err := ioutil.ReadAll(resp.Body)
+		if err != nil {
+			logger.Error("Failed to read xApp status body: %s", err)
+			return nil, err
+		}
+		logger.Info("query xAppStatus completed")
+		return body, nil
+	} else {
+		logger.Error("Error from xApp status query: %s", resp.Status)
+		return nil, errors.New(resp.Status)
+	}
+}
+
+type state int
+
+const (
+	normalState           state = iota
+	vespaTerminatingState state = iota
+)
+
+func queryConf() ([]byte, error) {
+	return queryXAppsStatus("http://"+appmgrDomain+":"+appmgrPort+appmgrXAppConfigPath,
+		10*time.Second)
+}
+
+func runVesmgr(chVesagent chan error, chSupervision chan chan string, chXAppNotifications chan []byte, chXAppSubscriptions chan subsChannel) {
+
+	logger.Info("vesmgr main loop ready")
+	mystate := normalState
+	var xappStatus []byte
 	for {
-		err := <-ch
-		logger.Error("Vesagent exited: " + err.Error())
-		osExit(1)
+		select {
+		case supervision := <-chSupervision:
+			logger.Info("vesmgr: supervision")
+			supervision <- "OK"
+		case xAppNotif := <-chXAppNotifications:
+			logger.Info("vesmgr: xApp notification")
+			logger.Info(string(xAppNotif))
+			/*
+			 * If xapp status query fails then we cannot create
+			 * a new configuration and kill vespa.
+			 * In that case we assume that
+			 * the situation is fixed when the next
+			 * xapp notif comes
+			 */
+			var err error
+			xappStatus, err = queryConf()
+			if err == nil {
+				killVespa(vesagent.process)
+				mystate = vespaTerminatingState
+			}
+		case err := <-chVesagent:
+			switch mystate {
+			case vespaTerminatingState:
+				logger.Info("Vesagent termination completed")
+				createConf(xappStatus)
+				startVesagent(chVesagent)
+				mystate = normalState
+			default:
+				logger.Error("Vesagent exited: " + err.Error())
+				os.Exit(1)
+			}
+		case isSubscribed := <-chXAppSubscriptions:
+			if isSubscribed.err != nil {
+				logger.Error("Failed to make xApp subscriptions, vesmgr exiting: %s", isSubscribed.err)
+				os.Exit(1)
+			}
+		}
 	}
 }