Upgraded to RMR 4.7.4 and some improvements
Signed-off-by: wahidw <abdulwahid.w@nokia.com>
Change-Id: Icbc120c35db37f68e6524bd998b127593b0e094a
diff --git a/Dockerfile b/Dockerfile
index adae21d..0d561f0 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -26,12 +26,12 @@
FROM nexus3.o-ran-sc.org:10002/o-ran-sc/bldr-ubuntu18-c-go:1.9.0 as rtmgrbuild
# Install RMr shared library
-ARG RMRVERSION=4.5.2
+ARG RMRVERSION=4.7.4
RUN wget --content-disposition https://packagecloud.io/o-ran-sc/release/packages/debian/stretch/rmr_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr_${RMRVERSION}_amd64.deb && rm -rf rmr_${RMRVERSION}_amd64.deb
# Install RMr development header files
RUN wget --content-disposition https://packagecloud.io/o-ran-sc/release/packages/debian/stretch/rmr-dev_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr-dev_${RMRVERSION}_amd64.deb && rm -rf rmr-dev_${RMRVERSION}_amd64.deb
-ENV GOLANG_VERSION 1.12.1
+ENV GOLANG_VERSION 1.13.10
RUN wget --quiet https://dl.google.com/go/go$GOLANG_VERSION.linux-amd64.tar.gz \
&& tar xvzf go$GOLANG_VERSION.linux-amd64.tar.gz -C /usr/local
ENV PATH="/usr/local/go/bin:${PATH}"
diff --git a/RELNOTES b/RELNOTES
index dc34000..bfc5a93 100644
--- a/RELNOTES
+++ b/RELNOTES
@@ -1,3 +1,6 @@
+### v0.7.6
+* Upgraded to RMR 4.7.4 and some improvements
+
### v0.7.5
* Open RMR connection in a a new thread
diff --git a/cmd/rtmgr.go b/cmd/rtmgr.go
index c8ad5c6..2d3d81e 100644
--- a/cmd/rtmgr.go
+++ b/cmd/rtmgr.go
@@ -49,6 +49,14 @@
const SERVICENAME = "rtmgr"
+/*type RMRUpdateType int
+
+const (
+ XappType = iota
+ SubsType
+ E2Type
+)*/
+
func SetupCloseHandler() {
c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
diff --git a/container-tag.yaml b/container-tag.yaml
index 4381920..547f709 100644
--- a/container-tag.yaml
+++ b/container-tag.yaml
@@ -2,4 +2,4 @@
# By default this file is in the docker build directory,
# but the location can configured in the JJB template.
---
-tag: 0.7.5
+tag: 0.7.6
diff --git a/pkg/nbi/control.go b/pkg/nbi/control.go
index d88f5b0..2ed63d8 100644
--- a/pkg/nbi/control.go
+++ b/pkg/nbi/control.go
@@ -30,16 +30,19 @@
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
"net/http"
"os"
+ "routing-manager/pkg/models"
"routing-manager/pkg/rpe"
"routing-manager/pkg/rtmgr"
"routing-manager/pkg/sbi"
"routing-manager/pkg/sdl"
"strconv"
+ "strings"
"sync"
"time"
)
var m sync.Mutex
+var EndpointLock sync.Mutex
var nbiEngine Engine
var sbiEngine sbi.Engine
@@ -139,11 +142,15 @@
}
}
- ep := sbiEngine.CheckEndpoint(string(params.Payload))
- if ep == nil {
- xapp.Logger.Error("Update Routing Table Request can't handle due to end point %s is not avail in complete ep list: ", string(params.Payload))
- return
+ /* hack with WA only for mcxapp in near future */
+ if strings.Contains(msg.String(), "ricxapp") {
+ ep := sbiEngine.CheckEndpoint(string(params.Payload))
+ if ep == nil {
+ xapp.Logger.Error("Update Routing Table Request can't handle due to end point %s is not avail in complete ep list: ", string(params.Payload))
+ return
+ }
}
+
epstr, whid := sbiEngine.CreateEndpoint(msg.String())
if epstr == nil || whid < 0 {
xapp.Logger.Error("Wormhole Id creation failed %d for %s", whid, msg.String())
@@ -184,7 +191,19 @@
if err != nil {
return errors.New("Routing table cannot be published due to: " + err.Error())
}
+ EndpointLock.Lock()
sbiEngine.UpdateEndpoints(data)
+ EndpointLock.Unlock()
+
+ return nil
+}
+
+func sendPartialRoutesToAll(xappSubData *models.XappSubscriptionData, updatetype rtmgr.RMRUpdateType) (err error) {
+ policies := rpeEngine.GeneratePartialPolicies(rtmgr.Eps, xappSubData, updatetype)
+ err = sbiEngine.DistributeAll(policies)
+ if err != nil {
+ return errors.New("Routing table cannot be published due to: " + err.Error())
+ }
return nil
}
@@ -224,9 +243,18 @@
/* used for rtmgr restart case to connect to Endpoints */
go updateEp()
time.Sleep(5 * time.Second)
+ sendRoutesToAll()
+ /* Sometimes first message fails, retry after 5 sec */
+ time.Sleep(5 * time.Second)
+ sendRoutesToAll()
for {
- sendRoutesToAll()
+ xapp.Logger.Debug("Periodic Routes value = %s", xapp.Config.GetString("periodicroutes"))
+ if xapp.Config.GetString("periodicroutes") == "enable" {
+ go updateEp()
+ time.Sleep(5 * time.Second)
+ sendRoutesToAll()
+ }
rtmgr.Rtmgr_ready = true
time.Sleep(INTERVAL * time.Second)
diff --git a/pkg/nbi/httprestful.go b/pkg/nbi/httprestful.go
index 5393dc9..60d65fb 100644
--- a/pkg/nbi/httprestful.go
+++ b/pkg/nbi/httprestful.go
@@ -146,6 +146,7 @@
m.Unlock()
updateEp()
return sendRoutesToAll()
+ //return sendPartialRoutesToAll(nil, rtmgr.XappType)
}
}
@@ -227,7 +228,7 @@
addSubscription(&rtmgr.Subs, data)
xapp.Logger.Debug("Endpoints: %v", rtmgr.Eps)
updateEp()
- return sendRoutesToAll()
+ return sendPartialRoutesToAll(data, rtmgr.SubsType)
}
func subscriptionExists(data *models.XappSubscriptionData) bool {
@@ -295,6 +296,7 @@
err, IsDuplicate := validateE2tData(data)
if IsDuplicate == true {
updateEp()
+ //return sendPartialRoutesToAll(nil, rtmgr.E2Type)
return sendRoutesToAll()
}
@@ -309,6 +311,7 @@
sdlEngine.WriteNewE2TInstance(xapp.Config.GetString("rtfile"), e2data, meiddata)
m.Unlock()
updateEp()
+ //sendPartialRoutesToAll(nil, rtmgr.E2Type)
sendRoutesToAll()
time.Sleep(10 * time.Second)
for ep, value := range rtmgr.RMRConnStatus {
diff --git a/pkg/rpe/rmr.go b/pkg/rpe/rmr.go
index eb7d7ed..17c977b 100644
--- a/pkg/rpe/rmr.go
+++ b/pkg/rpe/rmr.go
@@ -32,6 +32,7 @@
import (
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+ "routing-manager/pkg/models"
"routing-manager/pkg/rtmgr"
"strconv"
//"strings"
@@ -114,6 +115,49 @@
return &rawrt
}
+/*
+Produces the raw route message consumable by RMR
+*/
+func (r *Rmr) generatePartialRMRPolicies(eps rtmgr.Endpoints, xappSubData *models.XappSubscriptionData, key string, updatetype rtmgr.RMRUpdateType) *[]string {
+ rawrt := []string{key + "updatert|start\n"}
+ rt := r.generatePartialRouteTable(eps, xappSubData, updatetype)
+ for _, rte := range *rt {
+ rawrte := key + "mse|" + rte.MessageType
+ for _, tx := range rte.TxList {
+ rawrte += "," + tx.Ip + ":" + strconv.Itoa(int(tx.Port))
+ }
+ rawrte += "|" + strconv.Itoa(int(rte.SubID)) + "|"
+ group := ""
+ for _, rxg := range rte.RxGroups {
+ member := ""
+ for _, rx := range rxg {
+ if member == "" {
+ member += rx.Ip + ":" + strconv.Itoa(int(rx.Port))
+ } else {
+ member += "," + rx.Ip + ":" + strconv.Itoa(int(rx.Port))
+ }
+ }
+ if group == "" {
+ group += member
+ } else {
+ group += ";" + member
+ }
+ }
+ rawrte += group
+
+ if rte.RouteType == "%meid" {
+ rawrte += group + rte.RouteType
+ }
+
+ rawrt = append(rawrt, rawrte+"\n")
+ }
+
+ rawrt = append(rawrt, key+"updatert|end\n")
+ //count := 0
+
+ xapp.Logger.Debug("rmr.GeneratePolicies returns: %v", rawrt)
+ return &rawrt
+}
func (r *RmrPush) GeneratePolicies(eps rtmgr.Endpoints, rcs *rtmgr.RicComponents) *[]string {
xapp.Logger.Debug("Invoked rmr.GeneratePolicies, args: %v: ", eps)
return r.generateRMRPolicies(eps, rcs, "")
@@ -122,3 +166,8 @@
func (r *RmrPush) GenerateRouteTable(eps rtmgr.Endpoints) *rtmgr.RouteTable {
return r.generateRouteTable(eps)
}
+
+func (r *RmrPush) GeneratePartialPolicies(eps rtmgr.Endpoints, xappSubData *models.XappSubscriptionData, updatetype rtmgr.RMRUpdateType) *[]string {
+ xapp.Logger.Debug("Invoked rmr.GeneratePartialRMR, args: %v: ", eps)
+ return r.generatePartialRMRPolicies(eps, xappSubData, "", updatetype)
+}
diff --git a/pkg/rpe/rpe.go b/pkg/rpe/rpe.go
index 2c0423e..a11baf8 100644
--- a/pkg/rpe/rpe.go
+++ b/pkg/rpe/rpe.go
@@ -32,6 +32,7 @@
import (
"errors"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+ "routing-manager/pkg/models"
"routing-manager/pkg/rtmgr"
"routing-manager/pkg/sbi"
"runtime"
@@ -258,7 +259,27 @@
}
}
-func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.Endpoint, e2ManEp *rtmgr.Endpoint, rsmEp *rtmgr.Endpoint, a1mediatorEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
+func (r *Rpe) generatePartialSubscriptionTable(xappSubData *models.XappSubscriptionData, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
+ xapp.Logger.Debug("rpe.addSingleSubscriptionRoutes invoked")
+ xAppUuid := *xappSubData.Address + ":" + strconv.Itoa(int(*xappSubData.Port))
+ xapp.Logger.Debug("xApp UUID: %v", xAppUuid)
+ xAppEp := getEndpointByUuid(xAppUuid)
+ if xAppEp != nil {
+ //Subscription Manager -> xApp
+ r.addRoute("RIC_SUB_RESP", subManEp, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+ r.addRoute("RIC_SUB_FAILURE", subManEp, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+ r.addRoute("RIC_SUB_DEL_RESP", subManEp, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+ r.addRoute("RIC_SUB_DEL_FAILURE", subManEp, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+ //E2 Termination -> xApp
+ r.addRoute("RIC_INDICATION", nil, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+ r.addRoute("RIC_CONTROL_ACK", nil, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+ r.addRoute("RIC_CONTROL_FAILURE", nil, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+ } else {
+ xapp.Logger.Error("generateSubscriptionRoutes xAppEp is nil, xApp UUID: %v", xAppUuid)
+ }
+}
+
+func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.Endpoint, e2ManEp *rtmgr.Endpoint, a1mediatorEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
xapp.Logger.Debug("rpe.generatePlatformRoutes invoked")
//Platform Routes --- Subscription Routes
//Subscription Manager -> E2 Termination
@@ -274,8 +295,8 @@
sendEp = subManEp
case "E2MAN":
sendEp = e2ManEp
- case "RSM":
- sendEp = rsmEp
+ //case "RSM":,
+ // sendEp = rsmEp
case "A1MEDIATOR":
sendEp = a1mediatorEp
}
@@ -286,8 +307,8 @@
Ep = e2ManEp
//case "UEMAN":
// Ep = ueManEp
- case "RSM":
- Ep = rsmEp
+ //case "RSM":
+ // Ep = rsmEp
case "A1MEDIATOR":
Ep = a1mediatorEp
}
@@ -303,6 +324,54 @@
}
}
+func (r *Rpe) generatePartialRouteTable(endPointList rtmgr.Endpoints, xappSubData *models.XappSubscriptionData, updatetype rtmgr.RMRUpdateType) *rtmgr.RouteTable {
+ xapp.Logger.Debug("rpe.generatePartialRouteTable invoked")
+ xapp.Logger.Debug("Endpoint List: %v", endPointList)
+ routeTable := &rtmgr.RouteTable{}
+ subManEp := getEndpointByName(&endPointList, "SUBMAN")
+ if subManEp == nil {
+ xapp.Logger.Error("Platform component not found: %v", "Subscription Manager")
+ xapp.Logger.Debug("Endpoints: %v", endPointList)
+ }
+ /*e2TermListEp := getEndpointListByName(&endPointList, "E2TERMINST")
+ if len(e2TermListEp) == 0 {
+ xapp.Logger.Error("Platform component not found: %v", "E2 Termination List")
+ xapp.Logger.Debug("Endpoints: %v", endPointList)
+ }
+ e2ManEp := getEndpointByName(&endPointList, "E2MAN")
+ if e2ManEp == nil {
+ xapp.Logger.Error("Platform component not found: %v", "E2 Manager")
+ xapp.Logger.Debug("Endpoints: %v", endPointList)
+ }*/
+
+ if xappSubData != nil && updatetype == rtmgr.SubsType {
+ xapp.Logger.Info("Updating partial subscription routes")
+ r.generatePartialSubscriptionTable(xappSubData, subManEp, routeTable)
+ }
+ /*if updatetype == rtmgr.XappType {
+ xapp.Logger.Info("Updating partial xapp routes")
+ for _, endPoint := range endPointList {
+ xapp.Logger.Debug("Endpoint: %v, xAppType: %v", endPoint.Name, endPoint.XAppType)
+ if endPoint.XAppType != sbi.PlatformType && (len(endPoint.TxMessages) > 0 || len(endPoint.RxMessages) > 0) {
+ r.generateXappRoutes(endPoint, subManEp, routeTable)
+ r.generateXappToXappRoutes(endPoint, endPointList, routeTable)
+ }
+ }
+ }
+ if updatetype == rtmgr.E2Type {
+ xapp.Logger.Info("Updating partial E2 routes")
+ if len(e2TermListEp) > 0 {
+ r.addRoute_rx_list("RIC_SCTP_CLEAR_ALL", e2ManEp, e2TermListEp, routeTable, -1, "")
+ r.addRoute_rx_list("E2_TERM_KEEP_ALIVE_REQ", e2ManEp, e2TermListEp, routeTable, -1, "")
+ r.addRoute_rx_list("RIC_E2_SETUP_RESP", e2ManEp, e2TermListEp, routeTable, -1, "")
+ r.addRoute_rx_list("RIC_E2_SETUP_FAILURE", e2ManEp, e2TermListEp, routeTable, -1, "")
+ }
+ }*/
+
+ return routeTable
+
+}
+
func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable {
xapp.Logger.Debug("rpe.generateRouteTable invoked")
xapp.Logger.Debug("Endpoint List: %v", endPointList)
@@ -322,11 +391,11 @@
xapp.Logger.Error("Platform component not found: %v", "E2 Manager")
xapp.Logger.Debug("Endpoints: %v", endPointList)
}
- rsmEp := getEndpointByName(&endPointList, "RSM")
+ /*rsmEp := getEndpointByName(&endPointList, "RSM")
if rsmEp == nil {
xapp.Logger.Error("Platform component not found: %v", "Resource Status Manager")
xapp.Logger.Debug("Endpoints: %v", endPointList)
- }
+ }*/
A1MediatorEp := getEndpointByName(&endPointList, "A1MEDIATOR")
if A1MediatorEp == nil {
xapp.Logger.Error("Platform component not found: %v", "A1Mediator")
@@ -338,7 +407,7 @@
xapp.Logger.Error("Platform component not found: %v", "E2 Termination List")
xapp.Logger.Debug("Endpoints: %v", endPointList)
}
- r.generatePlatformRoutes(e2TermListEp, subManEp, e2ManEp, rsmEp, A1MediatorEp, routeTable)
+ r.generatePlatformRoutes(e2TermListEp, subManEp, e2ManEp, A1MediatorEp, routeTable)
for _, endPoint := range endPointList {
xapp.Logger.Debug("Endpoint: %v, xAppType: %v", endPoint.Name, endPoint.XAppType)
diff --git a/pkg/rpe/types.go b/pkg/rpe/types.go
index 4a5b9fc..2d466d3 100644
--- a/pkg/rpe/types.go
+++ b/pkg/rpe/types.go
@@ -29,7 +29,10 @@
package rpe
-import "routing-manager/pkg/rtmgr"
+import (
+ "routing-manager/pkg/models"
+ "routing-manager/pkg/rtmgr"
+)
//type generatePolicies func(rtmgr.Endpoints) *[]string
//type generateRouteTable func(rtmgr.Endpoints) *rtmgr.RouteTable
@@ -45,4 +48,5 @@
type Engine interface {
GeneratePolicies(rtmgr.Endpoints, *rtmgr.RicComponents) *[]string
GenerateRouteTable(rtmgr.Endpoints) *rtmgr.RouteTable
+ GeneratePartialPolicies(eps rtmgr.Endpoints, xappSubData *models.XappSubscriptionData, updatetype rtmgr.RMRUpdateType) *[]string
}
diff --git a/pkg/rtmgr/types.go b/pkg/rtmgr/types.go
index 41e8cf0..b9b9604 100644
--- a/pkg/rtmgr/types.go
+++ b/pkg/rtmgr/types.go
@@ -28,6 +28,14 @@
package rtmgr
+type RMRUpdateType int
+
+const (
+ XappType = iota
+ SubsType
+ E2Type
+)
+
type XApps struct {
XAppList []XApp
}
diff --git a/pkg/sbi/nngpush.go b/pkg/sbi/nngpush.go
index 16545cb..ec4d25a 100644
--- a/pkg/sbi/nngpush.go
+++ b/pkg/sbi/nngpush.go
@@ -87,7 +87,8 @@
}
func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
- count := addendpointct + 1
+ addendpointct = addendpointct + 1
+ count := addendpointct
xapp.Logger.Debug("Invoked sbi.AddEndpoint for %s with count = %d", ep.Ip, count)
endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
ep.Whid = int(xapp.Rmr.Openwh(endpoint))
@@ -159,7 +160,6 @@
xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
ret := c.send_data(ep, policies, call_id)
- xapp.Logger.Debug("return value is %v", ret)
conn.Lock()
rtmgr.RMRConnStatus[ep.Uuid] = ret
conn.Unlock()
diff --git a/pkg/sbi/types.go b/pkg/sbi/types.go
index b52dcfa..094a8ad 100644
--- a/pkg/sbi/types.go
+++ b/pkg/sbi/types.go
@@ -45,9 +45,9 @@
AddEndpoint(*rtmgr.Endpoint) error
DeleteEndpoint(*rtmgr.Endpoint) error
UpdateEndpoints(*rtmgr.RicComponents)
- CreateEndpoint(string)(*string,int)
- CheckEndpoint(string)*rtmgr.Endpoint
- DistributeToEp(*[]string, string, int ) error
+ CheckEndpoint(string) *rtmgr.Endpoint
+ CreateEndpoint(string) (*string, int)
+ DistributeToEp(*[]string, string, int) error
}
/*type NngSocket interface {