Fixed integration and subscription routes related issues for R3
Change-Id: I8f89ad2524b6dbe33f088f1685d9b3d5820b7e54
Signed-off-by: prabhukaliswamy <prabhu.k@nokia.com>
diff --git a/RELNOTES b/RELNOTES
index 92fde07..2d57a92 100644
--- a/RELNOTES
+++ b/RELNOTES
@@ -1,3 +1,6 @@
+### v0.4.1
+* Fixed integration and subscription routes related issues for R3
+
### v0.3.9
* Added RSM platform component routes for message types RAN_CONNECTED, RAN_RESTARTED, RAN_RECONFIGURED, RIC_RES_STATUS_REQ,RIC_RES_STATUS_RESP and RIC_RES_STATUS_FAILURE
* xApp manager interface changes for Subscription Request API
diff --git a/cmd/rtmgr.go b/cmd/rtmgr.go
index 101c743..568b7ad 100644
--- a/cmd/rtmgr.go
+++ b/cmd/rtmgr.go
@@ -47,7 +47,7 @@
)
const SERVICENAME = "rtmgr"
-const INTERVAL time.Duration = 2
+const INTERVAL time.Duration = 60
var (
args map[string]*string
@@ -123,7 +123,6 @@
go serveSBI(triggerSBI, sbiEngine, sdlEngine, rpeEngine)
for {
- time.Sleep(INTERVAL * time.Second)
if *args["nbi"] == "httpGetter" {
data, err := nbiEngine.(*nbi.HttpGetter).FetchAllXApps(*args["xm-url"])
if err != nil {
@@ -134,6 +133,9 @@
}
triggerSBI <- true
+
+ time.Sleep(INTERVAL * time.Second)
+ rtmgr.Logger.Debug("Periodic loop timed out. Setting triggerSBI flag to distribute updated routes.")
}
}
diff --git a/container-tag.yaml b/container-tag.yaml
index dc6f324..3308c36 100644
--- a/container-tag.yaml
+++ b/container-tag.yaml
@@ -2,4 +2,4 @@
# By default this file is in the docker build directory,
# but the location can configured in the JJB template.
---
-tag: 0.4.0
+tag: 0.4.1
diff --git a/pkg/nbi/httprestful.go b/pkg/nbi/httprestful.go
index 9606288..8ceca1d 100644
--- a/pkg/nbi/httprestful.go
+++ b/pkg/nbi/httprestful.go
@@ -35,6 +35,8 @@
"encoding/json"
"errors"
"fmt"
+ "github.com/go-openapi/loads"
+ "github.com/go-openapi/runtime/middleware"
"net/url"
"os"
"routing-manager/pkg/models"
@@ -46,9 +48,6 @@
"routing-manager/pkg/sdl"
"strconv"
"time"
-
- "github.com/go-openapi/loads"
- "github.com/go-openapi/runtime/middleware"
)
//var myClient = &http.Client{Timeout: 1 * time.Second}
@@ -217,6 +216,8 @@
if err != nil {
return handle.NewProvideXappSubscriptionHandleBadRequest()
} else {
+ //Delay the reponse as add subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
+ time.Sleep(1 * time.Second)
return handle.NewGetHandlesOK()
}
})
@@ -226,6 +227,8 @@
if err != nil {
return handle.NewDeleteXappSubscriptionHandleNoContent()
} else {
+ //Delay the reponse as delete subscription channel needs to update sdl and then sbi sends updated routes to all endpoints
+ time.Sleep(1 * time.Second)
return handle.NewGetHandlesOK()
}
})
@@ -237,7 +240,7 @@
}
func httpGetXApps(xmurl string) (*[]rtmgr.XApp, error) {
- rtmgr.Logger.Info("Invoked httpgetter.fetchXappList: " + xmurl)
+ rtmgr.Logger.Info("Invoked httprestful.httpGetXApps: " + xmurl)
r, err := myClient.Get(xmurl)
if err != nil {
return nil, err
@@ -252,10 +255,10 @@
rtmgr.Logger.Warn("Json decode failed: " + err.Error())
}
rtmgr.Logger.Info("HTTP GET: OK")
- rtmgr.Logger.Debug("httpgetter.fetchXappList returns: %v", xapps)
+ rtmgr.Logger.Debug("httprestful.httpGetXApps returns: %v", xapps)
return &xapps, err
}
- rtmgr.Logger.Warn("httpgetter got an unexpected http status code: %v", r.StatusCode)
+ rtmgr.Logger.Warn("httprestful got an unexpected http status code: %v", r.StatusCode)
return nil, nil
}
@@ -315,8 +318,12 @@
if err != nil {
rtmgr.Logger.Error("cannot get data from rest api dute to: " + err.Error())
} else if data != nil {
- sdlEngine.WriteXApps(fileName, data)
- triggerSBI <- true
+ rtmgr.Logger.Debug("Fetching all xApps deployed in xApp Manager through GET operation.")
+ alldata, err1 := httpGetXApps(xmurl)
+ if alldata != nil && err1 == nil {
+ sdlEngine.WriteXApps(fileName, alldata)
+ triggerSBI <- true
+ }
}
}
}()
diff --git a/pkg/rpe/rpe.go b/pkg/rpe/rpe.go
index 9a36564..aa8e12f 100644
--- a/pkg/rpe/rpe.go
+++ b/pkg/rpe/rpe.go
@@ -33,8 +33,8 @@
"errors"
"routing-manager/pkg/rtmgr"
"routing-manager/pkg/sbi"
- "runtime"
"strconv"
+ "runtime"
)
var (
@@ -90,26 +90,26 @@
rxList := []rtmgr.EndpointList{[]rtmgr.Endpoint{*rx}}
messageId := rtmgr.MessageTypes[messageType]
route := rtmgr.RouteTableEntry{
- MessageType: messageId,
- TxList: txList,
- RxGroups: rxList,
- SubID: subId}
- *routeTable = append(*routeTable, route)
- rtmgr.Logger.Debug("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx.Uuid, rx.Uuid, subId)
- rtmgr.Logger.Trace("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx, rx, subId)
- } else {
- pc, _, _, ok := runtime.Caller(1)
- details := runtime.FuncForPC(pc)
- if ok && details != nil {
- rtmgr.Logger.Error("Route addition skipped: Either TX or RX endpoint not present. Caller function is %s", details.Name())
+ MessageType: messageId,
+ TxList: txList,
+ RxGroups: rxList,
+ SubID: subId}
+ *routeTable = append(*routeTable, route)
+ rtmgr.Logger.Debug("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx.Uuid, rx.Uuid, subId)
+ rtmgr.Logger.Trace("Route added: MessageTyp: %v, Tx: %v, Rx: %v, SubId: %v", messageId, tx, rx, subId)
+ } else {
+ pc,_,_,ok := runtime.Caller(1)
+ details := runtime.FuncForPC(pc)
+ if ok && details != nil {
+ rtmgr.Logger.Error("Route addition skipped: Either TX or RX endpoint not present. Caller function is %s", details.Name())
+ }
}
- }
}
func (r *Rpe) generateXappRoutes(xAppEp *rtmgr.Endpoint, e2TermEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
rtmgr.Logger.Debug("rpe.generateXappRoutes invoked")
rtmgr.Logger.Debug("Endpoint: %v, xAppType: %v", xAppEp.Name, xAppEp.XAppType)
- if xAppEp.XAppType != sbi.PlatformType && len(xAppEp.TxMessages) > 0 && len(xAppEp.RxMessages) > 0 {
+ if xAppEp.XAppType != sbi.PlatformType && ( len(xAppEp.TxMessages) > 0 || len(xAppEp.RxMessages) > 0 ) {
//xApp -> Subscription Manager
r.addRoute("RIC_SUB_REQ", xAppEp, subManEp, routeTable, -1)
r.addRoute("RIC_SUB_DEL_REQ", xAppEp, subManEp, routeTable, -1)
@@ -121,7 +121,7 @@
}
}
-func (r *Rpe) generateSubscriptionRoutes(e2TermEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
+func (r *Rpe) generateSubscriptionRoutes(selectedxAppEp *rtmgr.Endpoint, e2TermEp *rtmgr.Endpoint, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
rtmgr.Logger.Debug("rpe.addSubscriptionRoutes invoked")
subscriptionList := &rtmgr.Subs
for _, subscription := range *subscriptionList {
@@ -129,15 +129,18 @@
xAppUuid := subscription.Fqdn + ":" + strconv.Itoa(int(subscription.Port))
rtmgr.Logger.Debug("xApp UUID: %v", xAppUuid)
xAppEp := getEndpointByUuid(xAppUuid)
- //Subscription Manager -> xApp
- r.addRoute("RIC_SUB_RESP", subManEp, xAppEp, routeTable, subscription.SubID)
- r.addRoute("RIC_SUB_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID)
- r.addRoute("RIC_SUB_DEL_RESP", subManEp, xAppEp, routeTable, subscription.SubID)
- r.addRoute("RIC_SUB_DEL_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID)
- //E2 Termination -> xApp
- r.addRoute("RIC_INDICATION", e2TermEp, xAppEp, routeTable, subscription.SubID)
- r.addRoute("RIC_CONTROL_ACK", e2TermEp, xAppEp, routeTable, subscription.SubID)
- r.addRoute("RIC_CONTROL_FAILURE", e2TermEp, xAppEp, routeTable, subscription.SubID)
+ if xAppEp.Uuid == selectedxAppEp.Uuid {
+ rtmgr.Logger.Debug("xApp UUID is matched for selected xApp.UUID: %v and xApp.Name: %v", selectedxAppEp.Uuid, selectedxAppEp.Name)
+ //Subscription Manager -> xApp
+ r.addRoute("RIC_SUB_RESP", subManEp, xAppEp, routeTable, subscription.SubID)
+ r.addRoute("RIC_SUB_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID)
+ r.addRoute("RIC_SUB_DEL_RESP", subManEp, xAppEp, routeTable, subscription.SubID)
+ r.addRoute("RIC_SUB_DEL_FAILURE", subManEp, xAppEp, routeTable, subscription.SubID)
+ //E2 Termination -> xApp
+ r.addRoute("RIC_INDICATION", e2TermEp, xAppEp, routeTable, subscription.SubID)
+ r.addRoute("RIC_CONTROL_ACK", e2TermEp, xAppEp, routeTable, subscription.SubID)
+ r.addRoute("RIC_CONTROL_FAILURE", e2TermEp, xAppEp, routeTable, subscription.SubID)
+ }
}
}
@@ -227,9 +230,9 @@
for _, endPoint := range endPointList {
rtmgr.Logger.Debug("Endpoint: %v, xAppType: %v", endPoint.Name, endPoint.XAppType)
- if endPoint.XAppType != sbi.PlatformType && len(endPoint.TxMessages) > 0 && len(endPoint.RxMessages) > 0 {
+ if endPoint.XAppType != sbi.PlatformType && ( len(endPoint.TxMessages) > 0 || len(endPoint.RxMessages) > 0 ) {
r.generateXappRoutes(endPoint, e2TermEp, subManEp, routeTable)
- r.generateSubscriptionRoutes(e2TermEp, subManEp, routeTable)
+ r.generateSubscriptionRoutes(endPoint, e2TermEp, subManEp, routeTable)
}
}
return routeTable
diff --git a/pkg/sbi/nngpush.go b/pkg/sbi/nngpush.go
index 6e3e225..4f56753 100644
--- a/pkg/sbi/nngpush.go
+++ b/pkg/sbi/nngpush.go
@@ -30,12 +30,11 @@
import (
"errors"
- "routing-manager/pkg/rtmgr"
- "strconv"
-
"nanomsg.org/go/mangos/v2"
"nanomsg.org/go/mangos/v2/protocol/push"
_ "nanomsg.org/go/mangos/v2/transport/all"
+ "routing-manager/pkg/rtmgr"
+ "strconv"
)
type NngPush struct {
@@ -61,6 +60,7 @@
func pipeEventHandler(event mangos.PipeEvent, pipe mangos.Pipe) {
rtmgr.Logger.Debug("Invoked: pipeEventHandler()")
+ rtmgr.Logger.Debug("Received pipe event for " + pipe.Address() + " address")
for _, ep := range rtmgr.Eps {
uri := DefaultNngPipelineSocketPrefix + ep.Ip + ":" + strconv.Itoa(DefaultNngPipelineSocketNumber)
if uri == pipe.Address() {
diff --git a/pkg/sbi/sbi.go b/pkg/sbi/sbi.go
index 7d07160..2737cc1 100644
--- a/pkg/sbi/sbi.go
+++ b/pkg/sbi/sbi.go
@@ -64,6 +64,7 @@
}
func (s *Sbi) pruneEndpointList(sbi Engine) {
+ rtmgr.Logger.Debug("pruneEndpointList invoked.")
for _, ep := range rtmgr.Eps {
if !ep.Keepalive {
rtmgr.Logger.Debug("deleting %v", ep)