| /* |
| ================================================================================== |
| Copyright (c) 2019 AT&T Intellectual Property. |
| Copyright (c) 2019 Nokia |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| ================================================================================== |
| */ |
| |
| package xapp |
| |
| import ( |
| "bytes" |
| "encoding/json" |
| "fmt" |
| "github.com/spf13/viper" |
| "io/ioutil" |
| "net/http" |
| "os" |
| "os/signal" |
| "strings" |
| "sync/atomic" |
| "syscall" |
| "time" |
| ) |
| |
type (
	// ReadyCB is the signature of a readiness callback; it receives the
	// opaque parameter value that was stored together with the callback.
	ReadyCB func(interface{})

	// ShutdownCB is the signature of the callback run by the signal handler
	// before the process exits.
	ShutdownCB func()
)
| |
| var ( |
| // XApp is an application instance |
| Rmr *RMRClient |
| Sdl *SDLClient |
| Rnib *RNIBClient |
| Resource *Router |
| Metric *Metrics |
| Logger *Log |
| Config Configurator |
| Subscription *Subscriber |
| Alarm *AlarmClient |
| readyCb ReadyCB |
| readyCbParams interface{} |
| shutdownCb ShutdownCB |
| shutdownFlag int32 |
| shutdownCnt int32 |
| ) |
| |
| func IsReady() bool { |
| return Rmr != nil && Rmr.IsReady() && Sdl != nil && Sdl.IsReady() |
| } |
| |
| func SetReadyCB(cb ReadyCB, params interface{}) { |
| readyCb = cb |
| readyCbParams = params |
| } |
| |
| func XappReadyCb(params interface{}) { |
| Alarm = NewAlarmClient(viper.GetString("moId"), viper.GetString("name")) |
| if readyCb != nil { |
| readyCb(readyCbParams) |
| } |
| } |
| |
| func xappShutdownCb() { |
| SendDeregistermsg() |
| Logger.Info("Wait for xapp to get unregistered") |
| time.Sleep(10 * time.Second) |
| } |
| |
| func registerxapp() { |
| var ( |
| retries int = 10 |
| ) |
| for retries > 0 { |
| name, _ := os.Hostname() |
| httpservicename := "SERVICE_RICXAPP_" + strings.ToUpper(name) + "_HTTP_PORT" |
| httpendpoint := os.Getenv(strings.Replace(httpservicename, "-", "_", -1)) |
| urlString := strings.Split(httpendpoint, "//") |
| // Added this check to make UT pass |
| if urlString[0] == "" { |
| return |
| } |
| resp, err := http.Get(fmt.Sprintf("http://%s/ric/v1/health/ready", urlString[1])) |
| retries -= 1 |
| time.Sleep(5 * time.Second) |
| if err != nil { |
| Logger.Error("Error in health check: %v", err) |
| } |
| if err == nil { |
| retries -= 10 |
| Logger.Info("Health Probe Success with resp.StatusCode is %v", resp.StatusCode) |
| if resp.StatusCode >= 200 && resp.StatusCode <= 299 { |
| go SendRegistermsg() |
| } |
| } else { |
| Logger.Info("Health Probe failed, retrying...") |
| } |
| } |
| } |
| |
| func SendRegistermsg() { |
| name, _ := os.Hostname() |
| xappname := viper.GetString("name") |
| xappversion := viper.GetString("version") |
| appnamespace := os.Getenv("APP_NAMESPACE") |
| if appnamespace == "" { |
| appnamespace = "ricxapp" |
| } |
| httpservicename := "SERVICE_" + strings.ToUpper(appnamespace) + "_" + strings.ToUpper(name) + "_HTTP_PORT" |
| rmrservicename := "SERVICE_" + strings.ToUpper(appnamespace) + "_" + strings.ToUpper(name) + "_RMR_PORT" |
| httpendpointstr := os.Getenv(strings.Replace(httpservicename, "-", "_", -1)) |
| rmrendpointstr := os.Getenv(strings.Replace(rmrservicename, "-", "_", -1)) |
| httpendpoint := strings.Split(httpendpointstr, "//") |
| rmrendpoint := strings.Split(rmrendpointstr, "//") |
| if httpendpoint[0] == "" || rmrendpoint[0] == "" { |
| return |
| } |
| |
| pltnamespace := os.Getenv("PLT_NAMESPACE") |
| if pltnamespace == "" { |
| pltnamespace = "ricplt" |
| } |
| |
| configpath := "/ric/v1/config" |
| |
| requestBody, err := json.Marshal(map[string]string{ |
| "appName": name, |
| "httpEndpoint": httpendpoint[1], |
| "rmrEndpoint": rmrendpoint[1], |
| "appInstanceName": xappname, |
| "appVersion": xappversion, |
| "configPath": configpath, |
| }) |
| |
| if err != nil { |
| Logger.Info("Error while compiling request to appmgr: %v", err) |
| } else { |
| url := fmt.Sprintf("http://service-%v-appmgr-http.%v:8080/ric/v1/register", pltnamespace, pltnamespace) |
| resp, err := http.Post(url, "application/json", bytes.NewBuffer(requestBody)) |
| Logger.Info(" Resp is %v", resp) |
| if err != nil { |
| Logger.Info("Error compiling request to appmgr: %v", err) |
| } |
| Logger.Info("Registering request sent. Response received is :%v", resp) |
| |
| if resp != nil { |
| body, err := ioutil.ReadAll(resp.Body) |
| Logger.Info("Post body is %v", resp.Body) |
| if err != nil { |
| Logger.Info("rsp: Error compiling request to appmgr: %v", string(body)) |
| } |
| defer resp.Body.Close() |
| } |
| } |
| } |
| |
| func SendDeregistermsg() { |
| |
| name, _ := os.Hostname() |
| xappname := viper.GetString("name") |
| |
| appnamespace := os.Getenv("APP_NAMESPACE") |
| if appnamespace == "" { |
| appnamespace = "ricxapp" |
| } |
| pltnamespace := os.Getenv("PLT_NAMESPACE") |
| if pltnamespace == "" { |
| pltnamespace = "ricplt" |
| } |
| |
| requestBody, err := json.Marshal(map[string]string{ |
| "appName": name, |
| "appInstanceName": xappname, |
| }) |
| |
| if err != nil { |
| Logger.Info("Error while compiling request to appmgr: %v", err) |
| } else { |
| url := fmt.Sprintf("http://service-%v-appmgr-http.%v:8080/ric/v1/deregister", pltnamespace, pltnamespace) |
| resp, err := http.Post(url, "application/json", bytes.NewBuffer(requestBody)) |
| Logger.Info(" Resp is %v", resp) |
| if err != nil { |
| Logger.Info("Error compiling request to appmgr: %v", err) |
| } |
| Logger.Info("Deregistering request sent. Response received is :%v", resp) |
| |
| if resp != nil { |
| body, err := ioutil.ReadAll(resp.Body) |
| Logger.Info("Post body is %v", resp.Body) |
| if err != nil { |
| Logger.Info("rsp: Error compiling request to appmgr: %v", string(body)) |
| } |
| defer resp.Body.Close() |
| } |
| } |
| } |
| |
| func SetShutdownCB(cb ShutdownCB) { |
| shutdownCb = cb |
| } |
| |
| func InstallSignalHandler() { |
| // |
| // Signal handlers to really exit program. |
| // shutdownCb can hang until application has |
| // made all needed gracefull shutdown actions |
| // hardcoded limit for shutdown is 20 seconds |
| // |
| interrupt := make(chan os.Signal, 1) |
| signal.Notify(interrupt, syscall.SIGINT, syscall.SIGTERM) |
| //signal handler function |
| go func() { |
| for range interrupt { |
| if atomic.CompareAndSwapInt32(&shutdownFlag, 0, 1) { |
| // close function |
| go func() { |
| timeout := int(20) |
| sentry := make(chan struct{}) |
| defer close(sentry) |
| |
| // close callback |
| go func() { |
| if shutdownCb != nil { |
| shutdownCb() |
| } |
| sentry <- struct{}{} |
| }() |
| select { |
| case <-time.After(time.Duration(timeout) * time.Second): |
| Logger.Info("xapp-frame shutdown callback took more than %d seconds", timeout) |
| case <-sentry: |
| Logger.Info("xapp-frame shutdown callback handled within %d seconds", timeout) |
| } |
| os.Exit(0) |
| }() |
| } else { |
| newCnt := atomic.AddInt32(&shutdownCnt, 1) |
| Logger.Info("xapp-frame shutdown already ongoing. Forced exit counter %d/%d ", newCnt, 5) |
| if newCnt >= 5 { |
| Logger.Info("xapp-frame shutdown forced exit") |
| os.Exit(0) |
| } |
| continue |
| } |
| |
| } |
| }() |
| } |
| |
| func init() { |
| // Load xapp configuration |
| Logger = LoadConfig() |
| |
| if viper.IsSet("controls.logger.level") { |
| Logger.SetLevel(viper.GetInt("controls.logger.level")) |
| } else { |
| Logger.SetLevel(viper.GetInt("logger.level")) |
| } |
| Logger.SetFormat(0) |
| |
| Resource = NewRouter() |
| Config = Configurator{} |
| Metric = NewMetrics(viper.GetString("metrics.url"), viper.GetString("metrics.namespace"), Resource.router) |
| Subscription = NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout")) |
| Sdl = NewSDLClient(viper.GetString("controls.db.namespace")) |
| Rnib = NewRNIBClient() |
| |
| InstallSignalHandler() |
| } |
| |
| func RunWithParams(c MessageConsumer, sdlcheck bool) { |
| Rmr = NewRMRClient() |
| Rmr.SetReadyCB(XappReadyCb, nil) |
| SetShutdownCB(xappShutdownCb) |
| host := fmt.Sprintf(":%d", GetPortData("http").Port) |
| go http.ListenAndServe(host, Resource.router) |
| Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host)) |
| if sdlcheck { |
| Sdl.TestConnection() |
| } |
| go registerxapp() |
| Rmr.Start(c) |
| } |
| |
| func Run(c MessageConsumer) { |
| RunWithParams(c, true) |
| } |