Upgraded to RMR 4.7.4 and some improvements

Signed-off-by: wahidw <abdulwahid.w@nokia.com>
Change-Id: Icbc120c35db37f68e6524bd998b127593b0e094a
diff --git a/Dockerfile b/Dockerfile
index adae21d..0d561f0 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -26,12 +26,12 @@
 FROM nexus3.o-ran-sc.org:10002/o-ran-sc/bldr-ubuntu18-c-go:1.9.0 as rtmgrbuild
 
 # Install RMr shared library
-ARG RMRVERSION=4.5.2
+ARG RMRVERSION=4.7.4
 RUN wget --content-disposition https://packagecloud.io/o-ran-sc/release/packages/debian/stretch/rmr_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr_${RMRVERSION}_amd64.deb && rm -rf rmr_${RMRVERSION}_amd64.deb
 # Install RMr development header files
 RUN wget --content-disposition https://packagecloud.io/o-ran-sc/release/packages/debian/stretch/rmr-dev_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr-dev_${RMRVERSION}_amd64.deb && rm -rf rmr-dev_${RMRVERSION}_amd64.deb
 
-ENV GOLANG_VERSION 1.12.1
+ENV GOLANG_VERSION 1.13.10
 RUN wget --quiet https://dl.google.com/go/go$GOLANG_VERSION.linux-amd64.tar.gz \
      && tar xvzf go$GOLANG_VERSION.linux-amd64.tar.gz -C /usr/local 
 ENV PATH="/usr/local/go/bin:${PATH}"
diff --git a/RELNOTES b/RELNOTES
index dc34000..bfc5a93 100644
--- a/RELNOTES
+++ b/RELNOTES
@@ -1,3 +1,6 @@
+### v0.7.6
+* Upgraded to RMR 4.7.4 and some improvements 
+
 ### v0.7.5
 * Open RMR connection in a a new thread
 
diff --git a/cmd/rtmgr.go b/cmd/rtmgr.go
index c8ad5c6..2d3d81e 100644
--- a/cmd/rtmgr.go
+++ b/cmd/rtmgr.go
@@ -49,6 +49,14 @@
 
 const SERVICENAME = "rtmgr"
 
+/*type RMRUpdateType int
+
+const (
+	XappType = iota
+	SubsType
+	E2Type
+)*/
+
 func SetupCloseHandler() {
 	c := make(chan os.Signal, 2)
 	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
diff --git a/container-tag.yaml b/container-tag.yaml
index 4381920..547f709 100644
--- a/container-tag.yaml
+++ b/container-tag.yaml
@@ -2,4 +2,4 @@
 # By default this file is in the docker build directory,
 # but the location can configured in the JJB template.
 ---
-tag: 0.7.5
+tag: 0.7.6
diff --git a/pkg/nbi/control.go b/pkg/nbi/control.go
index d88f5b0..2ed63d8 100644
--- a/pkg/nbi/control.go
+++ b/pkg/nbi/control.go
@@ -30,16 +30,19 @@
 	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
 	"net/http"
 	"os"
+	"routing-manager/pkg/models"
 	"routing-manager/pkg/rpe"
 	"routing-manager/pkg/rtmgr"
 	"routing-manager/pkg/sbi"
 	"routing-manager/pkg/sdl"
 	"strconv"
+	"strings"
 	"sync"
 	"time"
 )
 
 var m sync.Mutex
+var EndpointLock sync.Mutex
 
 var nbiEngine Engine
 var sbiEngine sbi.Engine
@@ -139,11 +142,15 @@
 		}
 	}
 
-	ep := sbiEngine.CheckEndpoint(string(params.Payload))
-	if ep == nil {
-		xapp.Logger.Error("Update Routing Table Request can't handle due to end point %s is not avail in complete ep list: ", string(params.Payload))
-		return
+	/* hack with WA only for mcxapp in near future */
+	if strings.Contains(msg.String(), "ricxapp") {
+		ep := sbiEngine.CheckEndpoint(string(params.Payload))
+		if ep == nil {
+			xapp.Logger.Error("Update Routing Table Request can't handle due to end point %s is not avail in complete ep list: ", string(params.Payload))
+			return
+		}
 	}
+
 	epstr, whid := sbiEngine.CreateEndpoint(msg.String())
 	if epstr == nil || whid < 0 {
 		xapp.Logger.Error("Wormhole Id creation failed %d for %s", whid, msg.String())
@@ -184,7 +191,19 @@
 	if err != nil {
 		return errors.New("Routing table cannot be published due to: " + err.Error())
 	}
+	EndpointLock.Lock()
 	sbiEngine.UpdateEndpoints(data)
+	EndpointLock.Unlock()
+
+	return nil
+}
+
+func sendPartialRoutesToAll(xappSubData *models.XappSubscriptionData, updatetype rtmgr.RMRUpdateType) (err error) {
+	policies := rpeEngine.GeneratePartialPolicies(rtmgr.Eps, xappSubData, updatetype)
+	err = sbiEngine.DistributeAll(policies)
+	if err != nil {
+		return errors.New("Routing table cannot be published due to: " + err.Error())
+	}
 
 	return nil
 }
@@ -224,9 +243,18 @@
 	/* used for rtmgr restart case to connect to Endpoints */
 	go updateEp()
 	time.Sleep(5 * time.Second)
+	sendRoutesToAll()
+	/* Sometimes first message  fails, retry after 5 sec */
+	time.Sleep(5 * time.Second)
+	sendRoutesToAll()
 
 	for {
-		sendRoutesToAll()
+		xapp.Logger.Debug("Periodic Routes value = %s", xapp.Config.GetString("periodicroutes"))
+		if xapp.Config.GetString("periodicroutes") == "enable" {
+			go updateEp()
+			time.Sleep(5 * time.Second)
+			sendRoutesToAll()
+		}
 
 		rtmgr.Rtmgr_ready = true
 		time.Sleep(INTERVAL * time.Second)
diff --git a/pkg/nbi/httprestful.go b/pkg/nbi/httprestful.go
index 5393dc9..60d65fb 100644
--- a/pkg/nbi/httprestful.go
+++ b/pkg/nbi/httprestful.go
@@ -146,6 +146,7 @@
 				m.Unlock()
 				updateEp()
 				return sendRoutesToAll()
+				//return sendPartialRoutesToAll(nil, rtmgr.XappType)
 			}
 		}
 
@@ -227,7 +228,7 @@
 	addSubscription(&rtmgr.Subs, data)
 	xapp.Logger.Debug("Endpoints: %v", rtmgr.Eps)
 	updateEp()
-	return sendRoutesToAll()
+	return sendPartialRoutesToAll(data, rtmgr.SubsType)
 }
 
 func subscriptionExists(data *models.XappSubscriptionData) bool {
@@ -295,6 +296,7 @@
 	err, IsDuplicate := validateE2tData(data)
 	if IsDuplicate == true {
 		updateEp()
+		//return sendPartialRoutesToAll(nil, rtmgr.E2Type)
 		return sendRoutesToAll()
 	}
 
@@ -309,6 +311,7 @@
 	sdlEngine.WriteNewE2TInstance(xapp.Config.GetString("rtfile"), e2data, meiddata)
 	m.Unlock()
 	updateEp()
+	//sendPartialRoutesToAll(nil, rtmgr.E2Type)
 	sendRoutesToAll()
 	time.Sleep(10 * time.Second)
 	for ep, value := range rtmgr.RMRConnStatus {
diff --git a/pkg/rpe/rmr.go b/pkg/rpe/rmr.go
index eb7d7ed..17c977b 100644
--- a/pkg/rpe/rmr.go
+++ b/pkg/rpe/rmr.go
@@ -32,6 +32,7 @@
 
 import (
 	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+	"routing-manager/pkg/models"
 	"routing-manager/pkg/rtmgr"
 	"strconv"
 	//"strings"
@@ -114,6 +115,49 @@
 	return &rawrt
 }
 
+/*
+Produces the raw route message consumable by RMR
+*/
+func (r *Rmr) generatePartialRMRPolicies(eps rtmgr.Endpoints, xappSubData *models.XappSubscriptionData, key string, updatetype rtmgr.RMRUpdateType) *[]string {
+	rawrt := []string{key + "updatert|start\n"}
+	rt := r.generatePartialRouteTable(eps, xappSubData, updatetype)
+	for _, rte := range *rt {
+		rawrte := key + "mse|" + rte.MessageType
+		for _, tx := range rte.TxList {
+			rawrte += "," + tx.Ip + ":" + strconv.Itoa(int(tx.Port))
+		}
+		rawrte += "|" + strconv.Itoa(int(rte.SubID)) + "|"
+		group := ""
+		for _, rxg := range rte.RxGroups {
+			member := ""
+			for _, rx := range rxg {
+				if member == "" {
+					member += rx.Ip + ":" + strconv.Itoa(int(rx.Port))
+				} else {
+					member += "," + rx.Ip + ":" + strconv.Itoa(int(rx.Port))
+				}
+			}
+			if group == "" {
+				group += member
+			} else {
+				group += ";" + member
+			}
+		}
+		rawrte += group
+
+		if rte.RouteType == "%meid" {
+			rawrte += group + rte.RouteType
+		}
+
+		rawrt = append(rawrt, rawrte+"\n")
+	}
+
+	rawrt = append(rawrt, key+"updatert|end\n")
+	//count := 0
+
+	xapp.Logger.Debug("rmr.GeneratePolicies returns: %v", rawrt)
+	return &rawrt
+}
 func (r *RmrPush) GeneratePolicies(eps rtmgr.Endpoints, rcs *rtmgr.RicComponents) *[]string {
 	xapp.Logger.Debug("Invoked rmr.GeneratePolicies, args: %v: ", eps)
 	return r.generateRMRPolicies(eps, rcs, "")
@@ -122,3 +166,8 @@
 func (r *RmrPush) GenerateRouteTable(eps rtmgr.Endpoints) *rtmgr.RouteTable {
 	return r.generateRouteTable(eps)
 }
+
+func (r *RmrPush) GeneratePartialPolicies(eps rtmgr.Endpoints, xappSubData *models.XappSubscriptionData, updatetype rtmgr.RMRUpdateType) *[]string {
+	xapp.Logger.Debug("Invoked rmr.GeneratePartialRMR, args: %v: ", eps)
+	return r.generatePartialRMRPolicies(eps, xappSubData, "", updatetype)
+}
diff --git a/pkg/rpe/rpe.go b/pkg/rpe/rpe.go
index 2c0423e..a11baf8 100644
--- a/pkg/rpe/rpe.go
+++ b/pkg/rpe/rpe.go
@@ -32,6 +32,7 @@
 import (
 	"errors"
 	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+	"routing-manager/pkg/models"
 	"routing-manager/pkg/rtmgr"
 	"routing-manager/pkg/sbi"
 	"runtime"
@@ -258,7 +259,27 @@
 	}
 }
 
-func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.Endpoint, e2ManEp *rtmgr.Endpoint, rsmEp *rtmgr.Endpoint, a1mediatorEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
+func (r *Rpe) generatePartialSubscriptionTable(xappSubData *models.XappSubscriptionData, subManEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
+	xapp.Logger.Debug("rpe.addSingleSubscriptionRoutes invoked")
+	xAppUuid := *xappSubData.Address + ":" + strconv.Itoa(int(*xappSubData.Port))
+	xapp.Logger.Debug("xApp UUID: %v", xAppUuid)
+	xAppEp := getEndpointByUuid(xAppUuid)
+	if xAppEp != nil {
+		//Subscription Manager -> xApp
+		r.addRoute("RIC_SUB_RESP", subManEp, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+		r.addRoute("RIC_SUB_FAILURE", subManEp, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+		r.addRoute("RIC_SUB_DEL_RESP", subManEp, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+		r.addRoute("RIC_SUB_DEL_FAILURE", subManEp, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+		//E2 Termination -> xApp
+		r.addRoute("RIC_INDICATION", nil, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+		r.addRoute("RIC_CONTROL_ACK", nil, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+		r.addRoute("RIC_CONTROL_FAILURE", nil, xAppEp, routeTable, *xappSubData.SubscriptionID, "")
+	} else {
+		xapp.Logger.Error("generateSubscriptionRoutes xAppEp is nil, xApp UUID: %v", xAppUuid)
+	}
+}
+
+func (r *Rpe) generatePlatformRoutes(e2TermEp []rtmgr.Endpoint, subManEp *rtmgr.Endpoint, e2ManEp *rtmgr.Endpoint, a1mediatorEp *rtmgr.Endpoint, routeTable *rtmgr.RouteTable) {
 	xapp.Logger.Debug("rpe.generatePlatformRoutes invoked")
 	//Platform Routes --- Subscription Routes
 	//Subscription Manager -> E2 Termination
@@ -274,8 +295,8 @@
 			sendEp = subManEp
 		case "E2MAN":
 			sendEp = e2ManEp
-		case "RSM":
-			sendEp = rsmEp
+		//case "RSM":,
+		//	sendEp = rsmEp
 		case "A1MEDIATOR":
 			sendEp = a1mediatorEp
 		}
@@ -286,8 +307,8 @@
 			Ep = e2ManEp
 		//case "UEMAN":
 		//	Ep = ueManEp
-		case "RSM":
-			Ep = rsmEp
+		//case "RSM":
+		//	Ep = rsmEp
 		case "A1MEDIATOR":
 			Ep = a1mediatorEp
 		}
@@ -303,6 +324,54 @@
 	}
 }
 
+func (r *Rpe) generatePartialRouteTable(endPointList rtmgr.Endpoints, xappSubData *models.XappSubscriptionData, updatetype rtmgr.RMRUpdateType) *rtmgr.RouteTable {
+	xapp.Logger.Debug("rpe.generatePartialRouteTable invoked")
+	xapp.Logger.Debug("Endpoint List:  %v", endPointList)
+	routeTable := &rtmgr.RouteTable{}
+	subManEp := getEndpointByName(&endPointList, "SUBMAN")
+	if subManEp == nil {
+		xapp.Logger.Error("Platform component not found: %v", "Subscription Manager")
+		xapp.Logger.Debug("Endpoints: %v", endPointList)
+	}
+	/*e2TermListEp := getEndpointListByName(&endPointList, "E2TERMINST")
+	if len(e2TermListEp) == 0 {
+		xapp.Logger.Error("Platform component not found: %v", "E2 Termination List")
+		xapp.Logger.Debug("Endpoints: %v", endPointList)
+	}
+	e2ManEp := getEndpointByName(&endPointList, "E2MAN")
+	if e2ManEp == nil {
+		xapp.Logger.Error("Platform component not found: %v", "E2 Manager")
+		xapp.Logger.Debug("Endpoints: %v", endPointList)
+	}*/
+
+	if xappSubData != nil && updatetype == rtmgr.SubsType {
+		xapp.Logger.Info("Updating partial subscription routes")
+		r.generatePartialSubscriptionTable(xappSubData, subManEp, routeTable)
+	}
+	/*if updatetype == rtmgr.XappType {
+		xapp.Logger.Info("Updating partial xapp routes")
+		for _, endPoint := range endPointList {
+			xapp.Logger.Debug("Endpoint: %v, xAppType: %v", endPoint.Name, endPoint.XAppType)
+			if endPoint.XAppType != sbi.PlatformType && (len(endPoint.TxMessages) > 0 || len(endPoint.RxMessages) > 0) {
+				r.generateXappRoutes(endPoint, subManEp, routeTable)
+				r.generateXappToXappRoutes(endPoint, endPointList, routeTable)
+			}
+		}
+	}
+	if updatetype == rtmgr.E2Type {
+		xapp.Logger.Info("Updating partial E2 routes")
+		if len(e2TermListEp) > 0 {
+			r.addRoute_rx_list("RIC_SCTP_CLEAR_ALL", e2ManEp, e2TermListEp, routeTable, -1, "")
+			r.addRoute_rx_list("E2_TERM_KEEP_ALIVE_REQ", e2ManEp, e2TermListEp, routeTable, -1, "")
+			r.addRoute_rx_list("RIC_E2_SETUP_RESP", e2ManEp, e2TermListEp, routeTable, -1, "")
+			r.addRoute_rx_list("RIC_E2_SETUP_FAILURE", e2ManEp, e2TermListEp, routeTable, -1, "")
+		}
+	}*/
+
+	return routeTable
+
+}
+
 func (r *Rpe) generateRouteTable(endPointList rtmgr.Endpoints) *rtmgr.RouteTable {
 	xapp.Logger.Debug("rpe.generateRouteTable invoked")
 	xapp.Logger.Debug("Endpoint List:  %v", endPointList)
@@ -322,11 +391,11 @@
 		xapp.Logger.Error("Platform component not found: %v", "E2 Manager")
 		xapp.Logger.Debug("Endpoints: %v", endPointList)
 	}
-	rsmEp := getEndpointByName(&endPointList, "RSM")
+	/*rsmEp := getEndpointByName(&endPointList, "RSM")
 	if rsmEp == nil {
 		xapp.Logger.Error("Platform component not found: %v", "Resource Status Manager")
 		xapp.Logger.Debug("Endpoints: %v", endPointList)
-	}
+	}*/
 	A1MediatorEp := getEndpointByName(&endPointList, "A1MEDIATOR")
 	if A1MediatorEp == nil {
 		xapp.Logger.Error("Platform component not found: %v", "A1Mediator")
@@ -338,7 +407,7 @@
 		xapp.Logger.Error("Platform component not found: %v", "E2 Termination List")
 		xapp.Logger.Debug("Endpoints: %v", endPointList)
 	}
-	r.generatePlatformRoutes(e2TermListEp, subManEp, e2ManEp, rsmEp, A1MediatorEp, routeTable)
+	r.generatePlatformRoutes(e2TermListEp, subManEp, e2ManEp, A1MediatorEp, routeTable)
 
 	for _, endPoint := range endPointList {
 		xapp.Logger.Debug("Endpoint: %v, xAppType: %v", endPoint.Name, endPoint.XAppType)
diff --git a/pkg/rpe/types.go b/pkg/rpe/types.go
index 4a5b9fc..2d466d3 100644
--- a/pkg/rpe/types.go
+++ b/pkg/rpe/types.go
@@ -29,7 +29,10 @@
 
 package rpe
 
-import "routing-manager/pkg/rtmgr"
+import (
+	"routing-manager/pkg/models"
+	"routing-manager/pkg/rtmgr"
+)
 
 //type generatePolicies func(rtmgr.Endpoints) *[]string
 //type generateRouteTable func(rtmgr.Endpoints) *rtmgr.RouteTable
@@ -45,4 +48,5 @@
 type Engine interface {
 	GeneratePolicies(rtmgr.Endpoints, *rtmgr.RicComponents) *[]string
 	GenerateRouteTable(rtmgr.Endpoints) *rtmgr.RouteTable
+	GeneratePartialPolicies(eps rtmgr.Endpoints, xappSubData *models.XappSubscriptionData, updatetype rtmgr.RMRUpdateType) *[]string
 }
diff --git a/pkg/rtmgr/types.go b/pkg/rtmgr/types.go
index 41e8cf0..b9b9604 100644
--- a/pkg/rtmgr/types.go
+++ b/pkg/rtmgr/types.go
@@ -28,6 +28,14 @@
 
 package rtmgr
 
+type RMRUpdateType int
+
+const (
+	XappType = iota
+	SubsType
+	E2Type
+)
+
 type XApps struct {
 	XAppList []XApp
 }
diff --git a/pkg/sbi/nngpush.go b/pkg/sbi/nngpush.go
index 16545cb..ec4d25a 100644
--- a/pkg/sbi/nngpush.go
+++ b/pkg/sbi/nngpush.go
@@ -87,7 +87,8 @@
 }
 
 func (c *RmrPush) AddEndpoint(ep *rtmgr.Endpoint) error {
-	count := addendpointct + 1
+	addendpointct = addendpointct + 1
+	count := addendpointct
 	xapp.Logger.Debug("Invoked sbi.AddEndpoint for %s with count = %d", ep.Ip, count)
 	endpoint := ep.Ip + ":" + strconv.Itoa(DefaultRmrPipelineSocketNumber)
 	ep.Whid = int(xapp.Rmr.Openwh(endpoint))
@@ -159,7 +160,6 @@
 	xapp.Logger.Debug("Push policy to endpoint: " + ep.Uuid)
 
 	ret := c.send_data(ep, policies, call_id)
-	xapp.Logger.Debug("return value is %v", ret)
 	conn.Lock()
 	rtmgr.RMRConnStatus[ep.Uuid] = ret
 	conn.Unlock()
diff --git a/pkg/sbi/types.go b/pkg/sbi/types.go
index b52dcfa..094a8ad 100644
--- a/pkg/sbi/types.go
+++ b/pkg/sbi/types.go
@@ -45,9 +45,9 @@
 	AddEndpoint(*rtmgr.Endpoint) error
 	DeleteEndpoint(*rtmgr.Endpoint) error
 	UpdateEndpoints(*rtmgr.RicComponents)
-	CreateEndpoint(string)(*string,int)
-	CheckEndpoint(string)*rtmgr.Endpoint
-	DistributeToEp(*[]string, string, int ) error
+	CheckEndpoint(string) *rtmgr.Endpoint
+	CreateEndpoint(string) (*string, int)
+	DistributeToEp(*[]string, string, int) error
 }
 
 /*type NngSocket interface {