blob: 1f02285aa1e812cbde97191b6b84fdd1d4770f59 [file] [log] [blame]
/*
==================================================================================
Copyright (c) 2019 AT&T Intellectual Property.
Copyright (c) 2019 Nokia
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==================================================================================
*/
package xapp
import (
"bytes"
"encoding/json"
"fmt"
"github.com/spf13/viper"
"net/http"
"os"
"os/signal"
"strings"
"sync/atomic"
"syscall"
"time"
)
type ReadyCB func(interface{})
type ShutdownCB func()
var (
// XApp is an application instance
Rmr *RMRClient
Sdl *SDLClient
Rnib *RNIBClient
Resource *Router
Metric *Metrics
Logger *Log
Config Configurator
Subscription *Subscriber
Alarm *AlarmClient
readyCb ReadyCB
readyCbParams interface{}
shutdownCb ShutdownCB
shutdownFlag int32
shutdownCnt int32
)
func IsReady() bool {
return Rmr != nil && Rmr.IsReady() && Sdl != nil && Sdl.IsReady()
}
func SetReadyCB(cb ReadyCB, params interface{}) {
readyCb = cb
readyCbParams = params
}
func XappReadyCb(params interface{}) {
Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
if readyCb != nil {
readyCb(readyCbParams)
}
}
func SetShutdownCB(cb ShutdownCB) {
shutdownCb = cb
}
func XappShutdownCb() {
if err := doDeregister(); err != nil {
Logger.Info("xApp deregistration failed: %v, terminating ungracefully!", err)
} else {
Logger.Info("xApp deregistration successfull!")
}
if shutdownCb != nil {
shutdownCb()
}
}
func registerXapp() {
for {
time.Sleep(5 * time.Second)
if !IsHealthProbeReady() {
Logger.Info("xApp is not ready yet, waiting ...")
continue
}
Logger.Info("xApp is now up and ready, continue with registration ...")
if err := doRegister(); err == nil {
Logger.Info("xApp registration done, proceeding with startup ...")
break
}
}
}
func getService(host, service string) string {
appnamespace := os.Getenv("APP_NAMESPACE")
if appnamespace == "" {
appnamespace = DEFAULT_XAPP_NS
}
svc := fmt.Sprintf(service, strings.ToUpper(appnamespace), strings.ToUpper(host))
url := strings.Split(os.Getenv(strings.Replace(svc, "-", "_", -1)), "//")
if len(url) > 1 {
return url[1]
}
return ""
}
func getPltNamespace(envName, defVal string) string {
pltnamespace := os.Getenv("PLT_NAMESPACE")
if pltnamespace == "" {
pltnamespace = defVal
}
return pltnamespace
}
func doPost(pltNs, url string, msg []byte, status int) error {
resp, err := http.Post(fmt.Sprintf(url, pltNs, pltNs), "application/json", bytes.NewBuffer(msg))
if err != nil || resp == nil || resp.StatusCode != status {
Logger.Info("http.Post to '%s' failed with error: %v", fmt.Sprintf(url, pltNs, pltNs), err)
return err
}
Logger.Info("Post to '%s' done, status:%v", fmt.Sprintf(url, pltNs, pltNs), resp.Status)
return err
}
func doRegister() error {
host, _ := os.Hostname()
xappname := viper.GetString("name")
xappversion := viper.GetString("version")
pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
httpEp, rmrEp := getService(host, SERVICE_HTTP), getService(host, SERVICE_RMR)
if httpEp == "" || rmrEp == "" {
Logger.Warn("Couldn't resolve service endpoints: httpEp=%s rmrEp=%s", httpEp, rmrEp)
return nil
}
requestBody, err := json.Marshal(map[string]string{
"appName": host,
"httpEndpoint": httpEp,
"rmrEndpoint": rmrEp,
"appInstanceName": xappname,
"appVersion": xappversion,
"configPath": CONFIG_PATH,
})
if err != nil {
Logger.Error("json.Marshal failed with error: %v", err)
return err
}
return doPost(pltNs, REGISTER_PATH, requestBody, http.StatusCreated)
}
func doDeregister() error {
if !IsHealthProbeReady() {
return nil
}
name, _ := os.Hostname()
xappname := viper.GetString("name")
pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
requestBody, err := json.Marshal(map[string]string{
"appName": name,
"appInstanceName": xappname,
})
if err != nil {
Logger.Error("json.Marshal failed with error: %v", err)
return err
}
return doPost(pltNs, DEREGISTER_PATH, requestBody, http.StatusNoContent)
}
func InstallSignalHandler() {
//
// Signal handlers to really exit program.
// shutdownCb can hang until application has
// made all needed gracefull shutdown actions
// hardcoded limit for shutdown is 20 seconds
//
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
//signal handler function
go func() {
for range interrupt {
if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
// close function
go func() {
timeout := int(20)
sentry := make(chan struct{})
defer close(sentry)
// close callback
go func() {
XappShutdownCb()
sentry <- struct{}{}
}()
select {
case <-time.After(time.Duration(timeout) * time.Second):
Logger.Info("xapp-frame shutdown callback took more than %d seconds", timeout)
case <-sentry:
Logger.Info("xapp-frame shutdown callback handled within %d seconds", timeout)
}
os.Exit(0)
}()
} else {
newCnt := atomic.AddInt32(&shutdownCnt, 1)
Logger.Info("xapp-frame shutdown already ongoing. Forced exit counter %d/%d ", newCnt, 5)
if newCnt >= 5 {
Logger.Info("xapp-frame shutdown forced exit")
os.Exit(0)
}
continue
}
}
}()
}
func init() {
// Load xapp configuration
Logger = LoadConfig()
if viper.IsSet("controls.logger.level") {
Logger.SetLevel(viper.GetInt("controls.logger.level"))
} else {
Logger.SetLevel(viper.GetInt("logger.level"))
}
Logger.SetFormat(0)
Resource = NewRouter()
Config = Configurator{}
Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router)
Subscription = NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
Sdl = NewSDLClient(viper.GetString("controls.db.namespace"))
Rnib = NewRNIBClient()
InstallSignalHandler()
}
func RunWithParams(c MessageConsumer, sdlcheck bool) {
Rmr = NewRMRClient()
Rmr.SetReadyCB(XappReadyCb, nil)
host := fmt.Sprintf(":%d", GetPortData("http").Port)
go http.ListenAndServe(host, Resource.router)
Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
if sdlcheck {
Sdl.TestConnection()
}
go registerXapp()
Rmr.Start(c)
}
func Run(c MessageConsumer) {
RunWithParams(c, true)
}