Some code refactoring, etc.
Change-Id: I4d3e4a38fcdeab11fa846c3b7ec16d998fe29180
Signed-off-by: Mohamed Abukar <abukar.mohamed@nokia.com>
diff --git a/pkg/xapp/restapi.go b/pkg/xapp/restapi.go
index 46e81f6..e335c08 100755
--- a/pkg/xapp/restapi.go
+++ b/pkg/xapp/restapi.go
@@ -37,6 +37,10 @@
AppConfigURL = "/ric/v1/config"
)
+var (
+ healthReady bool
+)
+
type StatusCb func() bool
type Router struct {
@@ -97,7 +101,12 @@
return
}
+func IsHealthProbeReady() bool {
+ return healthReady
+}
+
func readyHandler(w http.ResponseWriter, r *http.Request) {
+ healthReady = true
respondWithJSON(w, http.StatusOK, nil)
}
diff --git a/pkg/xapp/types.go b/pkg/xapp/types.go
index b6fc49b..c9fe1ec 100755
--- a/pkg/xapp/types.go
+++ b/pkg/xapp/types.go
@@ -81,3 +81,14 @@
Policies []int
MaxRetryOnFailure int
}
+
+// @todo: read these from config or somewhere else
+const (
+ SERVICE_HTTP = "SERVICE_%s_%s_HTTP_PORT"
+ SERVICE_RMR = "SERVICE_%s_%s_RMR_PORT"
+ CONFIG_PATH = "/ric/v1/config"
+ REGISTER_PATH = "http://service-%s-appmgr-http.%s:8080/ric/v1/register"
+ DEREGISTER_PATH = "http://service-%s-appmgr-http.%s:8080/ric/v1/deregister"
+ DEFAULT_PLT_NS = "ricplt"
+ DEFAULT_XAPP_NS = "ricxapp"
+)
diff --git a/pkg/xapp/xapp.go b/pkg/xapp/xapp.go
index 2bf7799..1f02285 100755
--- a/pkg/xapp/xapp.go
+++ b/pkg/xapp/xapp.go
@@ -24,7 +24,6 @@
"encoding/json"
"fmt"
"github.com/spf13/viper"
- "io/ioutil"
"net/http"
"os"
"os/signal"
@@ -71,144 +70,123 @@
}
}
-func xappShutdownCb() {
- SendDeregistermsg()
- Logger.Info("Wait for xapp to get unregistered")
- time.Sleep(10 * time.Second)
-}
-
-func registerxapp() {
- var (
- retries int = 10
- )
- for retries > 0 {
- name, _ := os.Hostname()
- httpservicename := "SERVICE_RICXAPP_" + strings.ToUpper(name) + "_HTTP_PORT"
- httpendpoint := os.Getenv(strings.Replace(httpservicename, "-", "_", -1))
- urlString := strings.Split(httpendpoint, "//")
- // Added this check to make UT pass
- if urlString[0] == "" {
- return
- }
- resp, err := http.Get(fmt.Sprintf("http://%s/ric/v1/health/ready", urlString[1]))
- retries -= 1
- time.Sleep(5 * time.Second)
- if err != nil {
- Logger.Error("Error in health check: %v", err)
- }
- if err == nil {
- retries -= 10
- Logger.Info("Health Probe Success with resp.StatusCode is %v", resp.StatusCode)
- if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
- go SendRegistermsg()
- }
- } else {
- Logger.Info("Health Probe failed, retrying...")
- }
- }
-}
-
-func SendRegistermsg() {
- name, _ := os.Hostname()
- xappname := viper.GetString("name")
- xappversion := viper.GetString("version")
- appnamespace := os.Getenv("APP_NAMESPACE")
- if appnamespace == "" {
- appnamespace = "ricxapp"
- }
- httpservicename := "SERVICE_" + strings.ToUpper(appnamespace) + "_" + strings.ToUpper(name) + "_HTTP_PORT"
- rmrservicename := "SERVICE_" + strings.ToUpper(appnamespace) + "_" + strings.ToUpper(name) + "_RMR_PORT"
- httpendpointstr := os.Getenv(strings.Replace(httpservicename, "-", "_", -1))
- rmrendpointstr := os.Getenv(strings.Replace(rmrservicename, "-", "_", -1))
- httpendpoint := strings.Split(httpendpointstr, "//")
- rmrendpoint := strings.Split(rmrendpointstr, "//")
- if httpendpoint[0] == "" || rmrendpoint[0] == "" {
- return
- }
-
- pltnamespace := os.Getenv("PLT_NAMESPACE")
- if pltnamespace == "" {
- pltnamespace = "ricplt"
- }
-
- configpath := "/ric/v1/config"
-
- requestBody, err := json.Marshal(map[string]string{
- "appName": name,
- "httpEndpoint": httpendpoint[1],
- "rmrEndpoint": rmrendpoint[1],
- "appInstanceName": xappname,
- "appVersion": xappversion,
- "configPath": configpath,
- })
-
- if err != nil {
- Logger.Info("Error while compiling request to appmgr: %v", err)
- } else {
- url := fmt.Sprintf("http://service-%v-appmgr-http.%v:8080/ric/v1/register", pltnamespace, pltnamespace)
- resp, err := http.Post(url, "application/json", bytes.NewBuffer(requestBody))
- Logger.Info(" Resp is %v", resp)
- if err != nil {
- Logger.Info("Error compiling request to appmgr: %v", err)
- }
- Logger.Info("Registering request sent. Response received is :%v", resp)
-
- if resp != nil {
- body, err := ioutil.ReadAll(resp.Body)
- Logger.Info("Post body is %v", resp.Body)
- if err != nil {
- Logger.Info("rsp: Error compiling request to appmgr: %v", string(body))
- }
- defer resp.Body.Close()
- }
- }
-}
-
-func SendDeregistermsg() {
-
- name, _ := os.Hostname()
- xappname := viper.GetString("name")
-
- appnamespace := os.Getenv("APP_NAMESPACE")
- if appnamespace == "" {
- appnamespace = "ricxapp"
- }
- pltnamespace := os.Getenv("PLT_NAMESPACE")
- if pltnamespace == "" {
- pltnamespace = "ricplt"
- }
-
- requestBody, err := json.Marshal(map[string]string{
- "appName": name,
- "appInstanceName": xappname,
- })
-
- if err != nil {
- Logger.Info("Error while compiling request to appmgr: %v", err)
- } else {
- url := fmt.Sprintf("http://service-%v-appmgr-http.%v:8080/ric/v1/deregister", pltnamespace, pltnamespace)
- resp, err := http.Post(url, "application/json", bytes.NewBuffer(requestBody))
- Logger.Info(" Resp is %v", resp)
- if err != nil {
- Logger.Info("Error compiling request to appmgr: %v", err)
- }
- Logger.Info("Deregistering request sent. Response received is :%v", resp)
-
- if resp != nil {
- body, err := ioutil.ReadAll(resp.Body)
- Logger.Info("Post body is %v", resp.Body)
- if err != nil {
- Logger.Info("rsp: Error compiling request to appmgr: %v", string(body))
- }
- defer resp.Body.Close()
- }
- }
-}
-
func SetShutdownCB(cb ShutdownCB) {
shutdownCb = cb
}
+func XappShutdownCb() {
+ if err := doDeregister(); err != nil {
+ Logger.Info("xApp deregistration failed: %v, terminating ungracefully!", err)
+ } else {
+ Logger.Info("xApp deregistration successfull!")
+ }
+
+ if shutdownCb != nil {
+ shutdownCb()
+ }
+}
+
+func registerXapp() {
+ for {
+ time.Sleep(5 * time.Second)
+ if !IsHealthProbeReady() {
+ Logger.Info("xApp is not ready yet, waiting ...")
+ continue
+ }
+
+ Logger.Info("xApp is now up and ready, continue with registration ...")
+ if err := doRegister(); err == nil {
+ Logger.Info("xApp registration done, proceeding with startup ...")
+ break
+ }
+ }
+}
+
+func getService(host, service string) string {
+ appnamespace := os.Getenv("APP_NAMESPACE")
+ if appnamespace == "" {
+ appnamespace = DEFAULT_XAPP_NS
+ }
+
+ svc := fmt.Sprintf(service, strings.ToUpper(appnamespace), strings.ToUpper(host))
+ url := strings.Split(os.Getenv(strings.Replace(svc, "-", "_", -1)), "//")
+ if len(url) > 1 {
+ return url[1]
+ }
+ return ""
+}
+
+func getPltNamespace(envName, defVal string) string {
+ pltnamespace := os.Getenv("PLT_NAMESPACE")
+ if pltnamespace == "" {
+ pltnamespace = defVal
+ }
+
+ return pltnamespace
+}
+
+func doPost(pltNs, url string, msg []byte, status int) error {
+ resp, err := http.Post(fmt.Sprintf(url, pltNs, pltNs), "application/json", bytes.NewBuffer(msg))
+ if err != nil || resp == nil || resp.StatusCode != status {
+ Logger.Info("http.Post to '%s' failed with error: %v", fmt.Sprintf(url, pltNs, pltNs), err)
+ return err
+ }
+ Logger.Info("Post to '%s' done, status:%v", fmt.Sprintf(url, pltNs, pltNs), resp.Status)
+
+ return err
+}
+
+func doRegister() error {
+ host, _ := os.Hostname()
+ xappname := viper.GetString("name")
+ xappversion := viper.GetString("version")
+ pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
+
+ httpEp, rmrEp := getService(host, SERVICE_HTTP), getService(host, SERVICE_RMR)
+ if httpEp == "" || rmrEp == "" {
+ Logger.Warn("Couldn't resolve service endpoints: httpEp=%s rmrEp=%s", httpEp, rmrEp)
+ return nil
+ }
+
+ requestBody, err := json.Marshal(map[string]string{
+ "appName": host,
+ "httpEndpoint": httpEp,
+ "rmrEndpoint": rmrEp,
+ "appInstanceName": xappname,
+ "appVersion": xappversion,
+ "configPath": CONFIG_PATH,
+ })
+
+ if err != nil {
+ Logger.Error("json.Marshal failed with error: %v", err)
+ return err
+ }
+
+ return doPost(pltNs, REGISTER_PATH, requestBody, http.StatusCreated)
+}
+
+func doDeregister() error {
+ if !IsHealthProbeReady() {
+ return nil
+ }
+
+ name, _ := os.Hostname()
+ xappname := viper.GetString("name")
+ pltNs := getPltNamespace("PLT_NAMESPACE", DEFAULT_PLT_NS)
+
+ requestBody, err := json.Marshal(map[string]string{
+ "appName": name,
+ "appInstanceName": xappname,
+ })
+
+ if err != nil {
+ Logger.Error("json.Marshal failed with error: %v", err)
+ return err
+ }
+
+ return doPost(pltNs, DEREGISTER_PATH, requestBody, http.StatusNoContent)
+}
+
func InstallSignalHandler() {
//
// Signal handlers to really exit program.
@@ -230,9 +208,7 @@
// close callback
go func() {
- if shutdownCb != nil {
- shutdownCb()
- }
+ XappShutdownCb()
sentry <- struct{}{}
}()
select {
@@ -280,15 +256,18 @@
func RunWithParams(c MessageConsumer, sdlcheck bool) {
Rmr = NewRMRClient()
+
Rmr.SetReadyCB(XappReadyCb, nil)
- SetShutdownCB(xappShutdownCb)
+
host := fmt.Sprintf(":%d", GetPortData("http").Port)
go http.ListenAndServe(host, Resource.router)
Logger.Info(fmt.Sprintf("Xapp started, listening on: %s", host))
+
if sdlcheck {
Sdl.TestConnection()
}
- go registerxapp()
+ go registerXapp()
+
Rmr.Start(c)
}
diff --git a/pkg/xapp/xapp_test.go b/pkg/xapp/xapp_test.go
index 173b57f..388d079 100755
--- a/pkg/xapp/xapp_test.go
+++ b/pkg/xapp/xapp_test.go
@@ -430,14 +430,14 @@
executeRequest(req, handleFunc)
}
-func TestSendRegistermsg(t *testing.T) {
- Logger.Error("CASE: TestSendRegistermsg")
- SendRegistermsg()
+func TestRegisterXapp(t *testing.T) {
+ Logger.Error("CASE: TestRegisterXapp")
+ doRegister()
}
-func TestSendDeregistermsg(t *testing.T) {
- Logger.Error("CASE: TestSendDeregistermsg")
- SendDeregistermsg()
+func TestDeregisterXapp(t *testing.T) {
+ Logger.Error("CASE: TestDeregisterXapp")
+ doDeregister()
}
func TestMisc(t *testing.T) {