Publish routes only once to the endpoint that requests them. Periodic/event-based distribution is done only for the process with RMR control port 4561.
Change-Id: Ieae5db99d62e580a2bc8d9e5ae3a00963b0618a5
Signed-off-by: wahidw <abdulwahid.w@nokia.com>
diff --git a/pkg/nbi/control.go b/pkg/nbi/control.go
index e31379d..db6c405 100644
--- a/pkg/nbi/control.go
+++ b/pkg/nbi/control.go
@@ -144,14 +144,22 @@
return
}
- ep,whid := sbiEngine.CreateEndpoint(string(params.Payload),msg.String())
- if ep == nil || whid < 0 {
+ ep := sbiEngine.CheckEndpoint(string(params.Payload))
+ if ep == nil {
xapp.Logger.Error("Update Routing Table Request can't handle due to end point %s is not avail in complete ep list: ", string(params.Payload))
return
}
+ epstr,whid := sbiEngine.CreateEndpoint(msg.String())
+ if epstr == nil || whid < 0 {
+ xapp.Logger.Error("Wormhole Id creation failed %d for %s",whid,msg.String() )
+ return
+ }
+ /* Wait briefly so that the latest routes are sent.
+ The assumption is that the routes for this endpoint are built within this interval. */
+ time.Sleep(100 * time.Millisecond)
policies := rpeEngine.GeneratePolicies(rtmgr.Eps, data)
- err = sbiEngine.DistributeToEp(policies, *ep, whid)
+ err = sbiEngine.DistributeToEp(policies, *epstr, whid)
if err != nil {
xapp.Logger.Error("Routing table cannot be published due to: " + err.Error())
return
diff --git a/pkg/sbi/nngpush.go b/pkg/sbi/nngpush.go
index 1b0bed0..4b7f871 100644
--- a/pkg/sbi/nngpush.go
+++ b/pkg/sbi/nngpush.go
@@ -177,20 +177,21 @@
params.Timeout = 200
state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
routestatus := strings.Split(retstr," ")
- if state != C.RMR_OK && routestatus[0] == "OK" {
+ if state != C.RMR_OK && routestatus[0] != "OK" {
xapp.Logger.Error("Updating Routes to Endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
return false
} else {
xapp.Logger.Info("Update Routes to Endpoint: " + ep.Uuid + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
return true
}
-
- xapp.Logger.Error("Route Update to endpoint: " + ep.Uuid + " failed, call_id: " + strconv.Itoa(call_id) + " xapp.Rmr.SendCallMsg not called")
- return false
}
-func (c *RmrPush) CreateEndpoint(payload string,rmrsrc string)(ep *string,whid int) {
- return c.createEndpoint(payload,rmrsrc, c)
+func (c *RmrPush) CheckEndpoint(payload string)(ep *rtmgr.Endpoint) {
+ return c.checkEndpoint(payload)
+}
+
+func (c *RmrPush) CreateEndpoint(rmrsrc string)(ep *string,whid int) {
+ return c.createEndpoint(rmrsrc)
}
func (c *RmrPush) DistributeToEp(policies *[]string, ep string, whid int) error {
@@ -230,15 +231,12 @@
params.Timeout = 200
state, retstr = xapp.Rmr.SendCallMsg(params.RMRParams)
routestatus := strings.Split(retstr," ")
- if state != C.RMR_OK && routestatus[0] == "OK" {
+ if state != C.RMR_OK && routestatus[0] != "OK" {
xapp.Logger.Error("Updating Routes to Endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " for xapp.Rmr.SendCallMsg " + " Route Update Status: " + routestatus[0])
return false
} else {
- xapp.Logger.Info("Update Routes to Endpoint: " + ep + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ",whi_id: " + strconv.Itoa(whid) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
+ xapp.Logger.Info("Update Routes to Endpoint: " + ep + " successful, call_id: " + strconv.Itoa(call_id) + ", Payload length: " + strconv.Itoa(params.PayloadLen) + ",whid: " + strconv.Itoa(whid) + ", Route Update Status: " + routestatus[0] + "(# of Entries:" + strconv.Itoa(len(*policies)))
return true
}
-
- xapp.Logger.Error("Route Update to endpoint: " + ep + " failed, call_id: " + strconv.Itoa(call_id) + ",whi_id: " + strconv.Itoa(whid) + " xapp.Rmr.SendCallMsg not called")
- return false
}
diff --git a/pkg/sbi/nngpush_test.go b/pkg/sbi/nngpush_test.go
index 79821eb..aae9121 100644
--- a/pkg/sbi/nngpush_test.go
+++ b/pkg/sbi/nngpush_test.go
@@ -27,6 +27,7 @@
*/
package sbi
+
import (
//"errors"
"routing-manager/pkg/rtmgr"
@@ -117,6 +118,7 @@
var rmrpush = RmrPush{}
resetTestPushDataset(rmrpush, stub.ValidEndpoints)
+ rmrcallid = 200
err = rmrpush.DistributeAll(stub.ValidPolicies)
if err != nil {
t.Errorf("rmrpush.DistributeAll(policies) was incorrect, got: %v, want: %v.", err, "nil")
@@ -131,6 +133,7 @@
var rmrpush = RmrPush{}
resetTestPushDataset(rmrpush, stub.ValidEndpoints)
+ rmrdynamiccallid = 255
err = rmrpush.DistributeToEp(stub.ValidPolicies,"localhost:4561",100)
if err != nil {
t.Errorf("rmrpush.DistributetoEp(policies) was incorrect, got: %v, want: %v.", err, "nil")
@@ -148,11 +151,18 @@
}
}
+func TestCheckEndpoint(t *testing.T) {
+ var rmrpush = RmrPush{}
+ resetTestPushDataset(rmrpush, stub.ValidEndpoints1)
+ rmrpush.CheckEndpoint("192.168.0.1:0")
+ rmrpush.CheckEndpoint("10.2.2.1:0")
+ rmrpush.CheckEndpoint("localhost:0")
+}
+
func TestCreateEndpoint(t *testing.T) {
var rmrpush = RmrPush{}
resetTestPushDataset(rmrpush, stub.ValidEndpoints1)
- rmrpush.CreateEndpoint("192.168.0.1:0","Src=192.168.0.1:4561")
- rmrpush.CreateEndpoint("localhost:4560","Src=192.168.11.1:4444")
+ rmrpush.CreateEndpoint("Src=127.0.0.1:4561 hello")
}
/*
Initialize and send policies
@@ -163,3 +173,26 @@
policies := []string{"hello","welcome"}
rmrpush.send_data(rtmgr.Eps["localhost"],&policies,1)
}
+
+func TestString( t *testing.T) {
+ var params xapp.RMRParams
+ params.Payload = []byte("abcdefgh")
+ params.Meid = &xapp.RMRMeid{}
+ msg := RMRParams{&params}
+ msg.String()
+
+}
+
+func TestSenddata(t *testing.T) {
+ var rmrpush = RmrPush{}
+ ep := rtmgr.Endpoint{Whid:-1, Ip:"1.1.1.1"}
+ policies := []string{"mse|12345|-1|local.com"}
+ rmrpush.send_data(&ep, &policies,300)
+}
+
+func TestSendDynamicdata(t *testing.T) {
+ var rmrpush = RmrPush{}
+ ep := "1.1.1.1"
+ policies := []string{"mse|12345|-1|local.com"}
+ rmrpush.sendDynamicRoutes(ep,1, &policies,300)
+}
diff --git a/pkg/sbi/sbi.go b/pkg/sbi/sbi.go
index 21ff78e..8833987 100644
--- a/pkg/sbi/sbi.go
+++ b/pkg/sbi/sbi.go
@@ -31,9 +31,9 @@
import (
"errors"
- //"fmt"
+ "fmt"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
- //"net"
+ "net"
"routing-manager/pkg/rtmgr"
"strconv"
"strings"
@@ -123,16 +123,16 @@
rtmgr.Eps[uuid].Keepalive = true
} else {
ep := &rtmgr.Endpoint{
- Uuid: uuid,
- Name: pc.Name,
- XAppType: PlatformType,
- Ip: pc.Fqdn,
- Port: pc.Port,
+ Uuid: uuid,
+ Name: pc.Name,
+ XAppType: PlatformType,
+ Ip: pc.Fqdn,
+ Port: pc.Port,
//TxMessages: rtmgr.PLATFORMMESSAGETYPES[pc.Name]["tx"],
//RxMessages: rtmgr.PLATFORMMESSAGETYPES[pc.Name]["rx"],
- Socket: nil,
- IsReady: false,
- Keepalive: true,
+ Socket: nil,
+ IsReady: false,
+ Keepalive: true,
}
xapp.Logger.Debug("ep created: %v", ep)
if err := sbi.AddEndpoint(ep); err != nil {
@@ -155,16 +155,16 @@
rtmgr.Eps[uuid].Keepalive = true
} else {
ep := &rtmgr.Endpoint{
- Uuid: uuid,
- Name: e2t.Name,
- XAppType: PlatformType,
- Ip: ipaddress,
- Port: uint16(port),
+ Uuid: uuid,
+ Name: e2t.Name,
+ XAppType: PlatformType,
+ Ip: ipaddress,
+ Port: uint16(port),
//TxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["tx"],
//RxMessages: rtmgr.PLATFORMMESSAGETYPES[e2t.Name]["rx"],
- Socket: nil,
- IsReady: false,
- Keepalive: true,
+ Socket: nil,
+ IsReady: false,
+ Keepalive: true,
}
xapp.Logger.Debug("ep created: %v", ep)
if err := sbi.AddEndpoint(ep); err != nil {
@@ -176,40 +176,47 @@
}
}
-func (s *Sbi) createEndpoint(payload string,rmrsrc string, sbi Engine) (*string,int) {
- xapp.Logger.Debug("CreateEndPoint %v", payload)
-// stringSlice := strings.Split(payload, " ")
-// uuid := stringSlice[0]
-// xapp.Logger.Debug(">>> uuid %v", stringSlice[0])
+func (s *Sbi) checkEndpoint(payload string) *rtmgr.Endpoint {
+ /* Payload contains the endpoint in the form <IP or domain name>:Port,
+ where Port is the data port of the sender endpoint.
+ Eps contains the UUID in the form <IP or domain name>:Port,
+ where Port is the application (HTTP) port. */
-/* if _, ok := rtmgr.Eps[uuid]; ok {
- ep := rtmgr.Eps[uuid]
- return ep
- }*/
+ xapp.Logger.Debug("Invoked checkEndPoint %v", payload)
+ stringSlice := strings.Split(payload, " ")
+ uuid := stringSlice[0]
+ stringsubsplit := strings.Split(uuid, ":")
+ xapp.Logger.Debug(">>> uuid %v", stringSlice[0])
+ for _, ep := range rtmgr.Eps {
+ if strings.Contains(ep.Uuid, stringsubsplit[0]) == true {
+ endpoint := rtmgr.Eps[ep.Uuid]
+ return endpoint
+ }
+ }
/* incase the stored Endpoint list is in the form of IP:port*/
-/* stringsubsplit := strings.Split(uuid, ":")
addr, err := net.LookupIP(stringsubsplit[0])
if err == nil {
convertedUuid := fmt.Sprintf("%s:%s", addr[0], stringsubsplit[1])
xapp.Logger.Info(" IP:Port received is %s", convertedUuid)
- if _, ok := rtmgr.Eps[convertedUuid]; ok {
- ep := rtmgr.Eps[convertedUuid]
- return ep
+ IP := fmt.Sprintf("%s", addr[0])
+ for _, ep := range rtmgr.Eps {
+ res := strings.Contains(ep.Uuid, IP)
+ if res == true {
+ endpoint := rtmgr.Eps[ep.Uuid]
+ return endpoint
+ }
}
- }*/
+ }
+ return nil
+}
+func (s *Sbi) createEndpoint(rmrsrc string) (*string, int) {
/* Create a new mapping, this case is assumed for multiple process sending RMR request from a container */
- srcString := strings.Split(rmrsrc," ")
- srcStringSlice := strings.Split(srcString[0],"=")
+ srcString := strings.Split(rmrsrc, " ")
+ srcStringSlice := strings.Split(srcString[0], "=")
Whid := int(xapp.Rmr.Openwh(srcStringSlice[1]))
- xapp.Logger.Info("Wormhole Id created is %d for EndPoint %s",Whid,srcStringSlice[1])
- if Whid > 0 {
-// rtmgr.RmrEp[srcStringSlice[1]] = Whid
- xapp.Logger.Info("received %s and mapped to Whid = %d",srcStringSlice[1],Whid)
- return &srcStringSlice[1],Whid
- }
-
- return nil,Whid
- }
+ xapp.Logger.Info("Wormhole Id created is %d for EndPoint %s", Whid, srcStringSlice[1])
+ return &srcStringSlice[1], Whid
+}
diff --git a/pkg/sbi/types.go b/pkg/sbi/types.go
index e2f14ed..b52dcfa 100644
--- a/pkg/sbi/types.go
+++ b/pkg/sbi/types.go
@@ -45,7 +45,8 @@
AddEndpoint(*rtmgr.Endpoint) error
DeleteEndpoint(*rtmgr.Endpoint) error
UpdateEndpoints(*rtmgr.RicComponents)
- CreateEndpoint(string,string)(*string,int)
+ CreateEndpoint(string)(*string,int)
+ CheckEndpoint(string)*rtmgr.Endpoint
DistributeToEp(*[]string, string, int ) error
}
diff --git a/pkg/stub/stub.go b/pkg/stub/stub.go
index b3e5f3d..f5f3b72 100644
--- a/pkg/stub/stub.go
+++ b/pkg/stub/stub.go
@@ -71,6 +71,7 @@
{Uuid: "192.168.0.1:0", Name: "SUBMAN", XAppType: "app2", Ip: "", Port: 0, TxMessages: []string{"", ""}, RxMessages: []string{"", ""}, Socket: nil, IsReady: false, Keepalive: false},
{Uuid: "10.1.1.1:0", Name: "E2MAN", XAppType: "app3", Ip: "", Port: 0, TxMessages: []string{"", ""}, RxMessages: []string{"", ""}, Socket: nil, IsReady: true, Keepalive: false},
{Uuid: "10.2.2.1:0", Name: "UEMAN", XAppType: "app4", Ip: "", Port: 0, TxMessages: []string{"", ""}, RxMessages: []string{"", ""}, Policies: []int32{1, 2}, Socket: nil, IsReady: false, Keepalive: true},
+ {Uuid: "127.0.0.1:0", Name: "UEMAN1", XAppType: "app4", Ip: "", Port: 0, TxMessages: []string{"", ""}, RxMessages: []string{"", ""}, Policies: []int32{1, 2}, Socket: nil, IsReady: false, Keepalive: true},
}
var ValidSubscriptions = &[]rtmgr.Subscription{