blob: 2bf77999bb9fed08e1e3de5bca1f133562bc22e6 [file] [log] [blame]
/*
==================================================================================
Copyright (c) 2019 AT&T Intellectual Property.
Copyright (c) 2019 Nokia
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==================================================================================
*/
package xapp
import (
"bytes"
"encoding/json"
"fmt"
"github.com/spf13/viper"
"io/ioutil"
"net/http"
"os"
"os/signal"
"strings"
"sync/atomic"
"syscall"
"time"
)
// Callback signatures accepted by the framework.
type (
	// ReadyCB is the signature of the callback registered through
	// SetReadyCB. It receives the opaque parameter value that was handed
	// over together with the callback.
	ReadyCB func(interface{})

	// ShutdownCB is the signature of the callback registered through
	// SetShutdownCB. It takes no arguments and returns nothing.
	ShutdownCB func()
)
// Exported framework handles. They are package-level singletons that the
// rest of the package and the application code access directly.
var (
	// Rmr is the RMR client; it is created in RunWithParams.
	Rmr *RMRClient
	// Sdl is the SDL client; it is created in init.
	Sdl *SDLClient
	// Rnib is the RNIB client; it is created in init.
	Rnib *RNIBClient
	// Resource is the HTTP router; it is created in init and served by
	// RunWithParams.
	Resource *Router
	// Metric is the metrics handle; it is created in init.
	Metric *Metrics
	// Logger is the package logger; it is the value returned by LoadConfig
	// in init.
	Logger *Log
	// Config is the configuration accessor; it is set in init.
	Config Configurator
	// Subscription is the subscription client; it is created in init.
	Subscription *Subscriber
	// Alarm is the alarm client; it is created in XappReadyCb.
	Alarm *AlarmClient
)

// Package-internal lifecycle state.
var (
	// readyCb and readyCbParams hold the callback and argument stored by
	// SetReadyCB; XappReadyCb invokes them.
	readyCb       ReadyCB
	readyCbParams interface{}
	// shutdownCb holds the callback stored by SetShutdownCB; the signal
	// handler invokes it.
	shutdownCb ShutdownCB
	// shutdownFlag is switched from 0 to 1 (atomically) by the signal
	// handler when the first termination signal starts the shutdown.
	shutdownFlag int32
	// shutdownCnt counts (atomically) the signals received while a
	// shutdown is already in progress.
	shutdownCnt int32
)
// IsReady reports whether both the RMR client and the SDL client exist and
// each of them reports itself as ready.
func IsReady() bool {
	// RMR is checked first; SDL is only consulted when RMR is ready.
	if Rmr == nil || !Rmr.IsReady() {
		return false
	}
	if Sdl == nil {
		return false
	}
	return Sdl.IsReady()
}
// SetReadyCB stores cb together with params. XappReadyCb later calls cb with
// exactly that params value. Calling SetReadyCB again replaces both.
func SetReadyCB(cb ReadyCB, params interface{}) {
	readyCb, readyCbParams = cb, params
}
func XappReadyCb(params interface{}) {
Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name"))
if readyCb != nil {
readyCb(readyCbParams)
}
}
// xappShutdownCb is the shutdown callback that RunWithParams installs. It
// sends the deregistration request and then blocks for a fixed ten seconds
// before returning.
func xappShutdownCb() {
	const deregisterWait = 10 * time.Second

	SendDeregistermsg()
	Logger.Info("Wait for xapp to get unregistered")
	time.Sleep(deregisterWait)
}
// registerxapp probes the xApp's own HTTP readiness endpoint and, once it
// answers with a 2xx status, starts SendRegistermsg in a new goroutine.
//
// The endpoint is taken from the environment variable
// SERVICE_<APP_NAMESPACE>_<HOSTNAME>_HTTP_PORT (upper-cased, '-' replaced by
// '_'; APP_NAMESPACE defaults to "ricxapp", the same rule SendRegistermsg
// applies). Its value is expected to have the form "<scheme>//<host:port>".
// When the variable is unset the function returns silently, which is what
// unit tests rely on; a value without "//" is reported and skipped instead of
// causing an index-out-of-range panic.
//
// The probe is attempted at most ten times with five seconds between
// attempts. Both transport errors and non-2xx answers count as "not ready
// yet" and are retried; registration is abandoned (with an error log) only
// after the last attempt fails.
func registerxapp() {
	const (
		maxProbes     = 10
		probeInterval = 5 * time.Second
	)

	// The hostname error is ignored on purpose: with an empty name the
	// lookup below simply uses an empty hostname component.
	name, _ := os.Hostname()
	appnamespace := os.Getenv("APP_NAMESPACE")
	if appnamespace == "" {
		appnamespace = "ricxapp"
	}
	httpservicename := "SERVICE_" + strings.ToUpper(appnamespace) + "_" + strings.ToUpper(name) + "_HTTP_PORT"
	httpendpoint := os.Getenv(strings.Replace(httpservicename, "-", "_", -1))

	urlString := strings.Split(httpendpoint, "//")
	// Added this check to make UT pass
	if urlString[0] == "" {
		return
	}
	if len(urlString) < 2 {
		Logger.Error("Malformed HTTP endpoint %q, xapp registration skipped", httpendpoint)
		return
	}
	healthURL := fmt.Sprintf("http://%s/ric/v1/health/ready", urlString[1])

	for attempt := 1; attempt <= maxProbes; attempt++ {
		resp, err := http.Get(healthURL)
		if err != nil {
			Logger.Error("Error in health check: %v", err)
			Logger.Info("Health Probe failed, retrying...")
		} else {
			status := resp.StatusCode
			// Only the status is needed; release the connection right away.
			resp.Body.Close()
			Logger.Info("Health Probe answered with resp.StatusCode %v", status)
			if status >= 200 && status <= 299 {
				go SendRegistermsg()
				return
			}
			Logger.Info("Health Probe reports not ready, retrying...")
		}

		// No point in waiting after the final attempt.
		if attempt < maxProbes {
			time.Sleep(probeInterval)
		}
	}
	Logger.Error("Health Probe did not succeed after %d attempts, xapp not registered", maxProbes)
}
// SendRegistermsg posts a registration request for this xApp to the
// application manager at
// http://service-<PLT_NAMESPACE>-appmgr-http.<PLT_NAMESPACE>:8080/ric/v1/register.
//
// The HTTP and RMR endpoints are read from the environment variables
// SERVICE_<APP_NAMESPACE>_<HOSTNAME>_HTTP_PORT and ..._RMR_PORT (upper-cased,
// '-' replaced by '_'); the part after "//" of each value is sent.
// APP_NAMESPACE defaults to "ricxapp" and PLT_NAMESPACE to "ricplt". The xApp
// name and version come from the "name" and "version" configuration values.
//
// The function returns silently when either endpoint variable is unset, and
// logs an error and returns when a value has no "//" separator, when the
// request cannot be sent, or when the response body cannot be read. Nothing
// is returned to the caller; the outcome is only visible in the log.
func SendRegistermsg() {
	// The hostname error is ignored on purpose; an empty name is sent as-is.
	name, _ := os.Hostname()
	xappname := viper.GetString("name")
	xappversion := viper.GetString("version")

	appnamespace := os.Getenv("APP_NAMESPACE")
	if appnamespace == "" {
		appnamespace = "ricxapp"
	}
	httpservicename := "SERVICE_" + strings.ToUpper(appnamespace) + "_" + strings.ToUpper(name) + "_HTTP_PORT"
	rmrservicename := "SERVICE_" + strings.ToUpper(appnamespace) + "_" + strings.ToUpper(name) + "_RMR_PORT"
	httpendpointstr := os.Getenv(strings.Replace(httpservicename, "-", "_", -1))
	rmrendpointstr := os.Getenv(strings.Replace(rmrservicename, "-", "_", -1))

	httpendpoint := strings.Split(httpendpointstr, "//")
	rmrendpoint := strings.Split(rmrendpointstr, "//")
	if httpendpoint[0] == "" || rmrendpoint[0] == "" {
		return
	}
	// Guard the [1] accesses below against values without a "//" separator.
	if len(httpendpoint) < 2 || len(rmrendpoint) < 2 {
		Logger.Error("Malformed service endpoint (http=%q, rmr=%q), registration skipped", httpendpointstr, rmrendpointstr)
		return
	}

	pltnamespace := os.Getenv("PLT_NAMESPACE")
	if pltnamespace == "" {
		pltnamespace = "ricplt"
	}

	configpath := "/ric/v1/config"
	requestBody, err := json.Marshal(map[string]string{
		"appName":         name,
		"httpEndpoint":    httpendpoint[1],
		"rmrEndpoint":     rmrendpoint[1],
		"appInstanceName": xappname,
		"appVersion":      xappversion,
		"configPath":      configpath,
	})
	if err != nil {
		Logger.Error("Error while compiling request to appmgr: %v", err)
		return
	}

	url := fmt.Sprintf("http://service-%v-appmgr-http.%v:8080/ric/v1/register", pltnamespace, pltnamespace)
	resp, err := http.Post(url, "application/json", bytes.NewBuffer(requestBody))
	if err != nil {
		Logger.Error("Error sending register request to appmgr: %v", err)
		return
	}
	defer resp.Body.Close()
	Logger.Info("Registering request sent. Response received is :%v", resp.Status)

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		Logger.Error("Error reading register response from appmgr: %v", err)
		return
	}
	Logger.Info("Post body is %v", string(body))
}
// SendDeregistermsg posts a deregistration request for this xApp to the
// application manager at
// http://service-<PLT_NAMESPACE>-appmgr-http.<PLT_NAMESPACE>:8080/ric/v1/deregister.
//
// The request carries the hostname as "appName" and the "name" configuration
// value as "appInstanceName". PLT_NAMESPACE defaults to "ricplt".
//
// Failures to build or send the request, or to read the response body, are
// logged as errors; nothing is returned to the caller.
func SendDeregistermsg() {
	// The hostname error is ignored on purpose; an empty name is sent as-is.
	name, _ := os.Hostname()
	xappname := viper.GetString("name")

	pltnamespace := os.Getenv("PLT_NAMESPACE")
	if pltnamespace == "" {
		pltnamespace = "ricplt"
	}

	requestBody, err := json.Marshal(map[string]string{
		"appName":         name,
		"appInstanceName": xappname,
	})
	if err != nil {
		Logger.Error("Error while compiling request to appmgr: %v", err)
		return
	}

	url := fmt.Sprintf("http://service-%v-appmgr-http.%v:8080/ric/v1/deregister", pltnamespace, pltnamespace)
	resp, err := http.Post(url, "application/json", bytes.NewBuffer(requestBody))
	if err != nil {
		Logger.Error("Error sending deregister request to appmgr: %v", err)
		return
	}
	defer resp.Body.Close()
	Logger.Info("Deregistering request sent. Response received is :%v", resp.Status)

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		Logger.Error("Error reading deregister response from appmgr: %v", err)
		return
	}
	Logger.Info("Post body is %v", string(body))
}
// SetShutdownCB stores cb as the shutdown callback. The signal handler set up
// by InstallSignalHandler calls the stored callback, if it is non-nil, when
// the first SIGINT or SIGTERM is received. A later call replaces the
// previously stored callback.
func SetShutdownCB(cb ShutdownCB) {
shutdownCb = cb
}
func InstallSignalHandler() {
//
// Signal handlers to really exit program.
// shutdownCb can hang until application has
// made all needed gracefull shutdown actions
// hardcoded limit for shutdown is 20 seconds
//
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM)
//signal handler function
go func() {
for range interrupt {
if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) {
// close function
go func() {
timeout := int(20)
sentry := make(chan struct{})
defer close(sentry)
// close callback
go func() {
if shutdownCb != nil {
shutdownCb()
}
sentry <- struct{}{}
}()
select {
case <-time.After(time.Duration(timeout) * time.Second):
Logger.Info("xapp-frame shutdown callback took more than %d seconds", timeout)
case <-sentry:
Logger.Info("xapp-frame shutdown callback handled within %d seconds", timeout)
}
os.Exit(0)
}()
} else {
newCnt := atomic.AddInt32(&shutdownCnt, 1)
Logger.Info("xapp-frame shutdown already ongoing. Forced exit counter %d/%d ", newCnt, 5)
if newCnt >= 5 {
Logger.Info("xapp-frame shutdown forced exit")
os.Exit(0)
}
continue
}
}
}()
}
// init builds the package-level framework objects when the package is
// loaded: the logger (from LoadConfig), the HTTP router, the configurator,
// metrics, the subscription client, the SDL and RNIB clients, and finally the
// signal handler.
func init() {
	// Load xapp configuration
	Logger = LoadConfig()

	// "controls.logger.level" takes precedence over "logger.level" when it
	// is present in the configuration.
	levelKey := "logger.level"
	if viper.IsSet("controls.logger.level") {
		levelKey = "controls.logger.level"
	}
	Logger.SetLevel(viper.GetInt(levelKey))
	Logger.SetFormat(0)

	// Resource must exist before Metric, which is given its router.
	Resource = NewRouter()
	Config = Configurator{}

	metricsURL := viper.GetString("metrics.url")
	metricsNamespace := viper.GetString("metrics.namespace")
	Metric = NewMetrics(metricsURL, metricsNamespace, Resource.router)

	subscriptionHost := viper.GetString("subscription.host")
	subscriptionTimeout := viper.GetInt("subscription.timeout")
	Subscription = NewSubscriber(subscriptionHost, subscriptionTimeout)

	Sdl = NewSDLClient(viper.GetString("controls.db.namespace"))
	Rnib = NewRNIBClient()

	InstallSignalHandler()
}
// RunWithParams starts the xApp.
//
// It creates the RMR client, installs XappReadyCb as its ready callback and
// xappShutdownCb as the shutdown callback, serves the package router over
// HTTP on the port configured for "http" in a background goroutine, optionally
// tests the SDL connection, starts the registration procedure in the
// background and finally hands the message consumer c to the RMR client.
//
// Parameters:
//   - c:        the message consumer passed to Rmr.Start.
//   - sdlcheck: when true, Sdl.TestConnection is called before registration
//     is started.
//
// If the HTTP server stops (for example because the port cannot be bound),
// the error is written to the log instead of being dropped.
func RunWithParams(c MessageConsumer, sdlcheck bool) {
	Rmr = NewRMRClient()
	Rmr.SetReadyCB(XappReadyCb, nil)
	SetShutdownCB(xappShutdownCb)

	host := fmt.Sprintf(":%d", GetPortData("http").Port)
	go func() {
		// ListenAndServe only returns on failure; make that visible.
		if err := http.ListenAndServe(host, Resource.router); err != nil {
			Logger.Error("HTTP server on %s stopped: %v", host, err)
		}
	}()
	Logger.Info("Xapp started, listening on: %s", host)

	if sdlcheck {
		Sdl.TestConnection()
	}
	go registerxapp()

	Rmr.Start(c)
}
// Run starts the xApp with the message consumer c. It is a shorthand for
// RunWithParams(c, true), i.e. the SDL connection test is enabled.
func Run(c MessageConsumer) {
RunWithParams(c, true)
}