RICPLT-2910
libe2ap library updated and moved to 3rdparty directory
e2ap_wrapper library to be used from go:
- message wrappers (used now only in tests)
- function wrappers that manipulates message
Go wrappers for messages
Initial unittest implementation.
Change-Id: Ie77cc2ae90b83b12e5c738e6f570fe4661961da6
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
diff --git a/pkg/control/control.go b/pkg/control/control.go
index 34dd6a0..fc45f6f 100644
--- a/pkg/control/control.go
+++ b/pkg/control/control.go
@@ -30,25 +30,26 @@
"github.com/go-openapi/strfmt"
"github.com/spf13/viper"
"math/rand"
+ "sync"
"time"
- "sync"
)
var subReqTime time.Duration = 5 * time.Second
var SubDelReqTime time.Duration = 5 * time.Second
type Control struct {
- e2ap *E2ap
- registry *Registry
- rtmgrClient *RtmgrClient
- tracker *Tracker
- timerMap *TimerMap
- rmrSendMutex *sync.Mutex
+ e2ap *E2ap
+ registry *Registry
+ rtmgrClient *RtmgrClient
+ tracker *Tracker
+ timerMap *TimerMap
+ rmrSendMutex sync.Mutex
+ skipRouteUpdate bool // temp solution to skip routeupdate in unittests
}
type RMRMeid struct {
- PlmnID string
- EnbID string
+ PlmnID string
+ EnbID string
RanName string
}
@@ -77,7 +78,7 @@
xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", seedSN)
}
-func NewControl() Control {
+func NewControl() *Control {
registry := new(Registry)
registry.Initialize(seedSN)
@@ -87,15 +88,19 @@
timerMap := new(TimerMap)
timerMap.Init()
- rmrSendMutex := &sync.Mutex{}
-
transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
client := rtmgrclient.New(transport, strfmt.Default)
handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
rtmgrClient := RtmgrClient{client, handle, deleteHandle}
- return Control{new(E2ap), registry, &rtmgrClient, tracker, timerMap, rmrSendMutex}
+ return &Control{e2ap: new(E2ap),
+ registry: registry,
+ rtmgrClient: &rtmgrClient,
+ tracker: tracker,
+ timerMap: timerMap,
+ skipRouteUpdate: false,
+ }
}
func (c *Control) Run() {
@@ -105,12 +110,12 @@
func (c *Control) rmrSend(params *xapp.RMRParams) (err error) {
status := false
i := 1
- for ; i <= 10 && status == false; i++ {
+ for ; i <= 10 && status == false; i++ {
c.rmrSendMutex.Lock()
status = xapp.Rmr.Send(params, false)
c.rmrSendMutex.Unlock()
if status == false {
- xapp.Logger.Info("rmr.Send() failed. Retry count %v, Mtype: %v, SubId: %v, Xid %s",i, params.Mtype, params.SubId, params.Xid)
+ xapp.Logger.Info("rmr.Send() failed. Retry count %v, Mtype: %v, SubId: %v, Xid %s", i, params.Mtype, params.SubId, params.Xid)
time.Sleep(500 * time.Millisecond)
}
}
@@ -147,15 +152,15 @@
}
func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) {
- xapp.Logger.Info("SubReq received from Src: %s, Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Src, params.Mtype, params.SubId, params.Xid, params.Meid)
+ xapp.Logger.Info("SubReq received from Src: %s, Mtype: %v, SubId: %v, Xid: %s, Meid: %v", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid)
xapp.Rmr.Free(params.Mbuf)
params.Mbuf = nil
/* Reserve a sequence number and set it in the payload */
newSubId, isIdValid := c.registry.ReserveSequenceNumber()
if isIdValid != true {
- xapp.Logger.Error("SubReq: Failed to reserve sequence number. Dropping this msg. SubId: %v, Xid: %s",params.SubId, params.Xid)
- return
+ xapp.Logger.Error("SubReq: Failed to reserve sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid)
+ return
}
err := c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, newSubId)
@@ -182,15 +187,18 @@
/* Update routing manager about the new subscription*/
subRouteAction := SubRouteInfo{CREATE, *srcAddr, *srcPort, newSubId}
xapp.Logger.Info("SubReq: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
- err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
- if err != nil {
- xapp.Logger.Error("SubReq: Failed to update routing manager. Dropping this SubReq msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
- return
+
+ if c.skipRouteUpdate == false {
+ err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+ if err != nil {
+ xapp.Logger.Error("SubReq: Failed to update routing manager. Dropping this SubReq msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+ return
+ }
}
// Setting new subscription ID in the RMR header
params.SubId = int(newSubId)
- xapp.Logger.Info("Forwarding SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v",params.Mtype, params.SubId, params.Xid, params.Meid)
+ xapp.Logger.Info("Forwarding SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", params.Mtype, params.SubId, params.Xid, params.Meid)
err = c.rmrSend(params)
if err != nil {
xapp.Logger.Error("SubReq: Failed to send request to E2T %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
@@ -202,7 +210,7 @@
}
func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) {
- xapp.Logger.Info("SubResp received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid)
+ xapp.Logger.Info("SubResp received from Src: %s, Mtype: %v, SubId: %v, Meid: %v", params.Src, params.Mtype, params.SubId, params.Meid)
xapp.Rmr.Free(params.Mbuf)
params.Mbuf = nil
@@ -211,10 +219,10 @@
xapp.Logger.Error("SubResp: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v", err, params.SubId)
return
}
- xapp.Logger.Info("SubResp: Received payloadSeqNum: %v",payloadSeqNum)
+ xapp.Logger.Info("SubResp: Received payloadSeqNum: %v", payloadSeqNum)
if !c.registry.IsValidSequenceNumber(payloadSeqNum) {
- xapp.Logger.Error("SubResp: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
+ xapp.Logger.Error("SubResp: Unknown payloadSeqNum. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId)
return
}
@@ -224,15 +232,15 @@
var transaction Transaction
transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, CREATE)
if err != nil {
- xapp.Logger.Error("SubResp: Failed to retrive transaction record. Dropping this msg. Err: %V, SubId: %v", err, params.SubId)
+ xapp.Logger.Error("SubResp: Failed to retrive transaction record. Dropping this msg. Err: %v, SubId: %v", err, params.SubId)
return
}
xapp.Logger.Info("SubResp: SubId: %v, from address: %v:%v. Retrieved old subId", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
- params.SubId = int(payloadSeqNum)
- params.Xid = transaction.OrigParams.Xid
-
- xapp.Logger.Info("SubResp: Forwarding Subscription Response to xApp Mtype: %v, SubId: %v, Meid: %v",params.Mtype, params.SubId, params.Meid)
+ params.SubId = int(payloadSeqNum)
+ params.Xid = transaction.OrigParams.Xid
+
+ xapp.Logger.Info("SubResp: Forwarding Subscription Response to xApp Mtype: %v, SubId: %v, Meid: %v", params.Mtype, params.SubId, params.Meid)
err = c.rmrReplyToSender(params)
if err != nil {
xapp.Logger.Error("SubResp: Failed to send response to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
@@ -248,7 +256,7 @@
}
func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) {
- xapp.Logger.Info("SubFail received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid)
+ xapp.Logger.Info("SubFail received from Src: %s, Mtype: %v, SubId: %v, Meid: %v", params.Src, params.Mtype, params.SubId, params.Meid)
xapp.Rmr.Free(params.Mbuf)
params.Mbuf = nil
@@ -263,8 +271,8 @@
var transaction Transaction
transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, CREATE)
- if err != nil {
- xapp.Logger.Error("SubFail: Failed to retrive transaction record. Dropping this msg. Err: %v, SubId: %v: %s", err, params.SubId)
+ if err != nil {
+ xapp.Logger.Error("SubFail: Failed to retrive transaction record. Dropping this msg. Err: %v, SubId: %v", err, params.SubId)
return
}
xapp.Logger.Info("SubFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
@@ -272,7 +280,7 @@
params.SubId = int(payloadSeqNum)
params.Xid = transaction.OrigParams.Xid
- xapp.Logger.Info("Forwarding SubFail to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
+ xapp.Logger.Info("Forwarding SubFail to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid)
err = c.rmrReplyToSender(params)
if err != nil {
xapp.Logger.Error("Failed to send response to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
@@ -280,11 +288,13 @@
time.Sleep(3 * time.Second)
- xapp.Logger.Info("SubFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
- subRouteAction := SubRouteInfo{CREATE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
- err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
- if err != nil {
- xapp.Logger.Error("SubFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+ if c.skipRouteUpdate == false {
+ xapp.Logger.Info("SubFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
+ subRouteAction := SubRouteInfo{CREATE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
+ err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+ if err != nil {
+ xapp.Logger.Error("SubFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+ }
}
xapp.Logger.Info("SubFail: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid)
@@ -304,9 +314,9 @@
func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int) {
newSubId := uint16(nbrId)
xapp.Logger.Info("SubReq timer expired. newSubId: %v", newSubId)
-// var causeContent uint8 = 1 // just some random cause. To be checked later. Should be no respose or something
-// var causeVal uint8 = 1 // just some random val. To be checked later. Should be no respose or something
-// c.sendSubscriptionFailure(newSubId, causeContent, causeVal)
+ // var causeContent uint8 = 1 // just some random cause. To be checked later. Should be no respose or something
+ // var causeVal uint8 = 1 // just some random val. To be checked later. Should be no respose or something
+ // c.sendSubscriptionFailure(newSubId, causeContent, causeVal)
}
/*
@@ -324,13 +334,13 @@
params.SubId = int(subId)
params.Meid = transaction.OrigParams.Meid
params.Xid = transaction.OrigParams.Xid
-
+
// newPayload, packErr := c.e2ap.PackSubscriptionFailure(transaction.OrigParams.Payload, subId, causeContent, causeVal)
// if packErr != nil {
// xapp.Logger.Error("SendSubFail: PackSubscriptionFailure() due to %v", packErr)
// return
// }
-
+
newPayload := []byte("40CA4018000003EA7E00050000010016EA6300020021EA74000200C0") // Temporary solution
params.PayloadLen = len(newPayload)
@@ -392,7 +402,7 @@
}
func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) {
- xapp.Logger.Info("SubDelReq received from Src: %s, Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Src, params.Mtype, params.SubId, params.Xid, params.Meid)
+ xapp.Logger.Info("SubDelReq received from Src: %s, Mtype: %v, SubId: %v, Xid: %s, Meid: %v", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid)
xapp.Rmr.Free(params.Mbuf)
params.Mbuf = nil
@@ -414,8 +424,8 @@
xapp.Logger.Error("SubDelReq: Not valid sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid)
return
}
-
- xapp.Logger.Info("SubDelReq: Forwarding Request to E2T. Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
+
+ xapp.Logger.Info("SubDelReq: Forwarding Request to E2T. Mtype: %v, SubId: %v, Xid: %s, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid)
c.rmrSend(params)
if err != nil {
xapp.Logger.Error("SubDelReq: Failed to send request to E2T. Err %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
@@ -437,13 +447,13 @@
}
func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
- xapp.Logger.Info("SubDelResp received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid)
+ xapp.Logger.Info("SubDelResp received from Src: %s, Mtype: %v, SubId: %v, Meid: %v", params.Src, params.Mtype, params.SubId, params.Meid)
xapp.Rmr.Free(params.Mbuf)
params.Mbuf = nil
payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
if err != nil {
- xapp.Logger.Error("SubDelResp: Unable to get Sequence Number from Payload. Dropping this msg. Err: %, SubId: %v", err, params.SubId)
+ xapp.Logger.Error("SubDelResp: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v", err, params.SubId)
return
}
xapp.Logger.Info("SubDelResp: Received payloadSeqNum: %v", payloadSeqNum)
@@ -452,29 +462,31 @@
var transaction Transaction
transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
- if err != nil {
+ if err != nil {
xapp.Logger.Error("SubDelResp: Failed to retrive transaction record. Dropping this msg. Err: %v, SubId: %v", err, params.SubId)
return
}
xapp.Logger.Info("SubDelResp: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
- params.SubId = int(payloadSeqNum)
- params.Xid = transaction.OrigParams.Xid
- xapp.Logger.Info("Forwarding SubDelResp to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
+ params.SubId = int(payloadSeqNum)
+ params.Xid = transaction.OrigParams.Xid
+ xapp.Logger.Info("Forwarding SubDelResp to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid)
err = c.rmrReplyToSender(params)
if err != nil {
xapp.Logger.Error("SubDelResp: Failed to send response to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
-// return
+ // return
}
time.Sleep(3 * time.Second)
- xapp.Logger.Info("SubDelResp: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
- subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
- err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
- if err != nil {
- xapp.Logger.Error("SubDelResp: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
- return
+ if c.skipRouteUpdate == false {
+ xapp.Logger.Info("SubDelResp: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
+ subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
+ err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+ if err != nil {
+ xapp.Logger.Error("SubDelResp: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+ return
+ }
}
xapp.Logger.Info("SubDelResp: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid)
@@ -492,13 +504,13 @@
}
func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) {
- xapp.Logger.Info("SubDelFail received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid)
+ xapp.Logger.Info("SubDelFail received from Src: %s, Mtype: %v, SubId: %v, Meid: %v", params.Src, params.Mtype, params.SubId, params.Meid)
xapp.Rmr.Free(params.Mbuf)
params.Mbuf = nil
payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteFailureSequenceNumber(params.Payload)
if err != nil {
- xapp.Logger.Error("SubDelFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %, SubId: %v", err, params.SubId)
+ xapp.Logger.Error("SubDelFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v", err, params.SubId)
return
}
xapp.Logger.Info("SubDelFail: Received payloadSeqNum: %v", payloadSeqNum)
@@ -507,29 +519,31 @@
var transaction Transaction
transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
- if err != nil {
+ if err != nil {
xapp.Logger.Error("SubDelFail: Failed to retrive transaction record. Dropping msg. Err %v, SubId: %v", err, params.SubId)
return
}
xapp.Logger.Info("SubDelFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
- params.SubId = int(payloadSeqNum)
- params.Xid = transaction.OrigParams.Xid
- xapp.Logger.Info("Forwarding SubDelFail to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
+ params.SubId = int(payloadSeqNum)
+ params.Xid = transaction.OrigParams.Xid
+ xapp.Logger.Info("Forwarding SubDelFail to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid)
err = c.rmrReplyToSender(params)
if err != nil {
xapp.Logger.Error("Failed to send SubDelFail to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
-// return
+ // return
}
time.Sleep(3 * time.Second)
- xapp.Logger.Info("SubDelFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
- subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
- c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
- if err != nil {
- xapp.Logger.Error("SubDelFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
- return
+ if c.skipRouteUpdate == false {
+ xapp.Logger.Info("SubDelFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
+ subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
+ c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+ if err != nil {
+ xapp.Logger.Error("SubDelFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+ return
+ }
}
xapp.Logger.Info("SubDelFail: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid)
@@ -540,7 +554,7 @@
return
}
} else {
- xapp.Logger.Error("SubDelFail: Failed to release sequency number. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+ xapp.Logger.Error("SubDelFail: Failed to release sequency number. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
return
}
return
@@ -549,9 +563,9 @@
func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int) {
newSubId := uint16(nbrId)
xapp.Logger.Info("SubDelReq timer expired. newSubId: %v", newSubId)
-// var causeContent uint8 = 1 // just some random cause. To be checked later. Should be no respose or something
-// var causeVal uint8 = 1 // just some random val. To be checked later. Should be no respose or something
-// c.sendSubscriptionDeleteFailure(newSubId, causeContent, causeVal)
+ // var causeContent uint8 = 1 // just some random cause. To be checked later. Should be no respose or something
+ // var causeVal uint8 = 1 // just some random val. To be checked later. Should be no respose or something
+ // c.sendSubscriptionDeleteFailure(newSubId, causeContent, causeVal)
}
/*
@@ -568,7 +582,7 @@
params.SubId = int(subId)
params.Meid = transaction.OrigParams.Meid
params.Xid = transaction.OrigParams.Xid
-
+
// newPayload, packErr := c.e2ap.PackSubscriptionDeleteFailure(transaction.OrigParams.Payload, subId, causeContent, causeVal)
// if packErr != nil {
// xapp.Logger.Error("SendSubDelFail: PackSubscriptionDeleteFailure(). Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid))
@@ -585,9 +599,9 @@
if err != nil {
xapp.Logger.Error("SendSubDelFail: Failed to send response to xApp: Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
}
-
+
time.Sleep(3 * time.Second)
-
+
xapp.Logger.Info("SendSubDelFail: SubId: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.XappInstanceAddress, transaction.XappPort)
xapp.Logger.Info("SendSubDelFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
@@ -610,4 +624,4 @@
}
return
}
-*/
\ No newline at end of file
+*/
diff --git a/pkg/control/e2ap.go b/pkg/control/e2ap.go
index 8f9f7c9..209b13c 100644
--- a/pkg/control/e2ap.go
+++ b/pkg/control/e2ap.go
@@ -22,13 +22,13 @@
/*
#include <wrapper.h>
-#cgo LDFLAGS: -lwrapper
+#cgo LDFLAGS: -le2ap_wrapper -le2ap
*/
import "C"
import (
- "unsafe"
"fmt"
+ "unsafe"
)
type E2ap struct {
@@ -41,7 +41,7 @@
cptr := unsafe.Pointer(&payload[0])
cret := C.e2ap_get_ric_subscription_request_sequence_number(cptr, C.size_t(len(payload)))
if cret < 0 {
- return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Request Sequence Number due to wrong or invalid payload. ErrorCode: %v",cret)
+ return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Request Sequence Number due to wrong or invalid payload. ErrorCode: %v", cret)
}
subId = uint16(cret)
return
@@ -52,7 +52,7 @@
cptr := unsafe.Pointer(&payload[0])
size := C.e2ap_set_ric_subscription_request_sequence_number(cptr, C.size_t(len(payload)), C.long(newSubscriptionid))
if size < 0 {
- return fmt.Errorf("e2ap wrapper is unable to set Subscription Request Sequence Number due to wrong or invalid payload. ErrorCode: %v",size)
+ return fmt.Errorf("e2ap wrapper is unable to set Subscription Request Sequence Number due to wrong or invalid payload. ErrorCode: %v", size)
}
return
}
@@ -62,18 +62,18 @@
cptr := unsafe.Pointer(&payload[0])
cret := C.e2ap_get_ric_subscription_response_sequence_number(cptr, C.size_t(len(payload)))
if cret < 0 {
- return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Response Sequence Number due to wrong or invalid payload. ErrorCode: %v",cret)
+ return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Response Sequence Number due to wrong or invalid payload. ErrorCode: %v", cret)
}
subId = uint16(cret)
return
}
// Used by e2t test stub
-func (c *E2ap) SetSubscriptionResponseSequenceNumber(payload []byte, newSubscriptionid uint16) (err error) {
+func (c *E2ap) SetSubscriptionResponseSequenceNumber(payload []byte, newSubscriptionid uint16) (err error) {
cptr := unsafe.Pointer(&payload[0])
size := C.e2ap_set_ric_subscription_response_sequence_number(cptr, C.size_t(len(payload)), C.long(newSubscriptionid))
if size < 0 {
- return fmt.Errorf("e2ap wrapper is unable to set Subscription Response Sequence Number due to wrong or invalid payload. ErrorCode: %v",size)
+ return fmt.Errorf("e2ap wrapper is unable to set Subscription Response Sequence Number due to wrong or invalid payload. ErrorCode: %v", size)
}
return
}
@@ -85,18 +85,18 @@
cptr := unsafe.Pointer(&payload[0])
cret := C.e2ap_get_ric_subscription_delete_request_sequence_number(cptr, C.size_t(len(payload)))
if cret < 0 {
- return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Delete Request Sequence Number due to wrong or invalid payload. ErrorCode: %v",cret)
+ return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Delete Request Sequence Number due to wrong or invalid payload. ErrorCode: %v", cret)
}
subId = uint16(cret)
return
}
// Used by rco test stub
-func (c *E2ap) SetSubscriptionDeleteRequestSequenceNumber(payload []byte, newSubscriptionid uint16) (err error) {
+func (c *E2ap) SetSubscriptionDeleteRequestSequenceNumber(payload []byte, newSubscriptionid uint16) (err error) {
cptr := unsafe.Pointer(&payload[0])
size := C.e2ap_set_ric_subscription_delete_request_sequence_number(cptr, C.size_t(len(payload)), C.long(newSubscriptionid))
if size < 0 {
- return fmt.Errorf("e2ap wrapper is unable to set Subscription Delete Request Sequence Number due to wrong or invalid payload. ErrorCode: %v",size)
+ return fmt.Errorf("e2ap wrapper is unable to set Subscription Delete Request Sequence Number due to wrong or invalid payload. ErrorCode: %v", size)
}
return
}
@@ -108,18 +108,18 @@
cptr := unsafe.Pointer(&payload[0])
cret := C.e2ap_get_ric_subscription_delete_response_sequence_number(cptr, C.size_t(len(payload)))
if cret < 0 {
- return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Delete Response Sequence Number due to wrong or invalid payload. ErrorCode: %v",cret)
+ return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Delete Response Sequence Number due to wrong or invalid payload. ErrorCode: %v", cret)
}
subId = uint16(cret)
return
}
// Used by e2t test stub
-func (c *E2ap) SetSubscriptionDeleteResponseSequenceNumber(payload []byte, newSubscriptionid uint16) (err error) {
+func (c *E2ap) SetSubscriptionDeleteResponseSequenceNumber(payload []byte, newSubscriptionid uint16) (err error) {
cptr := unsafe.Pointer(&payload[0])
size := C.e2ap_set_ric_subscription_delete_response_sequence_number(cptr, C.size_t(len(payload)), C.long(newSubscriptionid))
if size < 0 {
- return fmt.Errorf("e2ap wrapper is unable to set Subscription Delete Response Sequence Number due to wrong or invalid payload. ErrorCode: %v",size)
+ return fmt.Errorf("e2ap wrapper is unable to set Subscription Delete Response Sequence Number due to wrong or invalid payload. ErrorCode: %v", size)
}
return
}
@@ -131,18 +131,18 @@
cptr := unsafe.Pointer(&payload[0])
cret := C.e2ap_get_ric_subscription_failure_sequence_number(cptr, C.size_t(len(payload)))
if cret < 0 {
- return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Failure Sequence Number due to wrong or invalid payload. ErrorCode: %v",cret)
+ return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Failure Sequence Number due to wrong or invalid payload. ErrorCode: %v", cret)
}
subId = uint16(cret)
return
}
// Used by submgr
-func (c *E2ap) SetSubscriptionFailureSequenceNumber(payload []byte, newSubscriptionid uint16) (err error) {
+func (c *E2ap) SetSubscriptionFailureSequenceNumber(payload []byte, newSubscriptionid uint16) (err error) {
cptr := unsafe.Pointer(&payload[0])
size := C.e2ap_set_ric_subscription_failure_sequence_number(cptr, C.size_t(len(payload)), C.long(newSubscriptionid))
if size < 0 {
- return fmt.Errorf("e2ap wrapper is unable to set Subscription Failure Sequence Number due to wrong or invalid payload. ErrorCode: %v",size)
+ return fmt.Errorf("e2ap wrapper is unable to set Subscription Failure Sequence Number due to wrong or invalid payload. ErrorCode: %v", size)
}
return
}
@@ -154,17 +154,18 @@
cptr := unsafe.Pointer(&payload[0])
cret := C.e2ap_get_ric_subscription_delete_failure_sequence_number(cptr, C.size_t(len(payload)))
if cret < 0 {
- return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Delete Failure Sequence Number due to wrong or invalid payload. ErrorCode: %v",cret)
+ return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Delete Failure Sequence Number due to wrong or invalid payload. ErrorCode: %v", cret)
}
subId = uint16(cret)
return
}
+
// Used by submgr
-func (c *E2ap) SetSubscriptionDeleteFailureSequenceNumber(payload []byte, newSubscriptionid uint16) (err error) {
+func (c *E2ap) SetSubscriptionDeleteFailureSequenceNumber(payload []byte, newSubscriptionid uint16) (err error) {
cptr := unsafe.Pointer(&payload[0])
size := C.e2ap_set_ric_subscription_delete_failure_sequence_number(cptr, C.size_t(len(payload)), C.long(newSubscriptionid))
if size < 0 {
- return fmt.Errorf("e2ap wrapper is unable to set Subscription Delete Failure Sequence Number due to wrong or invalid payload. ErrorCode: %v",size)
+ return fmt.Errorf("e2ap wrapper is unable to set Subscription Delete Failure Sequence Number due to wrong or invalid payload. ErrorCode: %v", size)
}
return
}
diff --git a/pkg/control/main_test.go b/pkg/control/main_test.go
new file mode 100644
index 0000000..3f1284c
--- /dev/null
+++ b/pkg/control/main_test.go
@@ -0,0 +1,237 @@
+/*
+==================================================================================
+ Copyright (c) 2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+
+package control
+
+import (
+ "errors"
+ "fmt"
+ "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+ "io/ioutil"
+ "os"
+ "testing"
+ "time"
+)
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type testingControl struct {
+ desc string
+ syncChan chan struct{}
+}
+
+func (tc *testingControl) ReadyCB(data interface{}) {
+ xapp.Logger.Info("testingControl(%s) ReadyCB", tc.desc)
+ tc.syncChan <- struct{}{}
+ return
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type testingRmrControl struct {
+ testingControl
+ rmrClientTest *xapp.RMRClient
+ rmrConChan chan *xapp.RMRParams
+}
+
+func (tc *testingRmrControl) Consume(msg *xapp.RMRParams) (err error) {
+ xapp.Logger.Info("testingRmrControl(%s) Consume", tc.desc)
+ tc.rmrConChan <- msg
+ return
+}
+
+func (tc *testingRmrControl) RmrSend(params *xapp.RMRParams) (err error) {
+ //
+ //NOTE: Do this way until xapp-frame sending is improved
+ //
+ status := false
+ i := 1
+ for ; i <= 10 && status == false; i++ {
+ status = tc.rmrClientTest.SendMsg(params)
+ if status == false {
+ xapp.Logger.Info("rmr.Send() failed. Retry count %v, Mtype: %v, SubId: %v, Xid %s", i, params.Mtype, params.SubId, params.Xid)
+ time.Sleep(500 * time.Millisecond)
+ }
+ }
+ if status == false {
+ err = errors.New("rmr.Send() failed")
+ tc.rmrClientTest.Free(params.Mbuf)
+ }
+ return
+}
+
+func createNewRmrControl(desc string, rtfile string, port string, stat string) *testingRmrControl {
+ os.Setenv("RMR_SEED_RT", rtfile)
+ os.Setenv("RMR_SRC_ID", "localhost:"+port)
+ xapp.Logger.Info("Using rt file %s", os.Getenv("RMR_SEED_RT"))
+ xapp.Logger.Info("Using src id %s", os.Getenv("RMR_SRC_ID"))
+ newConn := &testingRmrControl{}
+ newConn.desc = desc
+ newConn.syncChan = make(chan struct{})
+ newConn.rmrClientTest = xapp.NewRMRClientWithParams("tcp:"+port, 4096, 1, stat)
+ newConn.rmrConChan = make(chan *xapp.RMRParams)
+ newConn.rmrClientTest.SetReadyCB(newConn.ReadyCB, nil)
+ go newConn.rmrClientTest.Start(newConn)
+ <-newConn.syncChan
+ return newConn
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+
+func testError(t *testing.T, pattern string, args ...interface{}) {
+ xapp.Logger.Error(fmt.Sprintf(pattern, args...))
+ t.Errorf(fmt.Sprintf(pattern, args...))
+}
+
+func testCreateTmpFile(str string) (string, error) {
+ file, err := ioutil.TempFile("/tmp", "*.rt")
+ if err != nil {
+ return "", err
+ }
+ _, err = file.WriteString(str)
+ if err != nil {
+ file.Close()
+ return "", err
+ }
+ return file.Name(), nil
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+
+var xappConn *testingRmrControl
+var e2termConn *testingRmrControl
+
+func TestMain(m *testing.M) {
+ xapp.Logger.Info("TestMain start")
+
+ //
+ //Cfg creation won't work like this as xapp-frame reads it during init.
+ //
+ /*
+ cfgstr:=`{
+ "local": {
+ "host": ":8080"
+ },
+ "logger": {
+ "level": 4
+ },
+ "rmr": {
+ "protPort": "tcp:14560",
+ "maxSize": 4096,
+ "numWorkers": 1,
+ "txMessages": ["RIC_SUB_REQ", "RIC_SUB_DEL_REQ"],
+ "rxMessages": ["RIC_SUB_RESP", "RIC_SUB_FAILURE", "RIC_SUB_DEL_RESP", "RIC_SUB_DEL_FAILURE", "RIC_INDICATION"]
+ },
+ "db": {
+ "host": "localhost",
+ "port": 6379,
+ "namespaces": ["sdl", "rnib"]
+ }
+ }`
+
+ cfgfilename,_ := testCreateTmpFile(cfgstr)
+ defer os.Remove(cfgfilename)
+ os.Setenv("CFG_FILE", cfgfilename)
+ */
+ xapp.Logger.Info("Using cfg file %s", os.Getenv("CFG_FILE"))
+
+ //---------------------------------
+ //
+ //---------------------------------
+ xapp.Logger.Info("### submgr main run ###")
+
+ subsrt := `newrt|start
+mse|12010|-1|localhost:14560
+mse|12010,localhost:14560|-1|localhost:15560
+mse|12011,localhost:15560|-1|localhost:14560
+mse|12011|-1|localhost:13560
+mse|12012,localhost:15560|-1|localhost:14560
+mse|12012|-1|localhost:13560
+mse|12020|-1|localhost:14560
+mse|12020,localhost:14560|-1|localhost:15560
+mse|12021,localhost:15560|-1|localhost:14560
+mse|12021|-1|localhost:13560
+mse|12022,localhost:15560|-1|localhost:14560
+mse|12022|-1|localhost:13560
+newrt|end
+`
+
+ subrtfilename, _ := testCreateTmpFile(subsrt)
+ defer os.Remove(subrtfilename)
+ os.Setenv("RMR_SEED_RT", subrtfilename)
+ xapp.Logger.Info("Using rt file %s", os.Getenv("RMR_SEED_RT"))
+
+ mainCtrl := &testingControl{}
+ mainCtrl.desc = "main"
+ mainCtrl.syncChan = make(chan struct{})
+
+ os.Setenv("RMR_SRC_ID", "localhost:14560")
+ c := NewControl()
+ c.skipRouteUpdate = true
+ xapp.SetReadyCB(mainCtrl.ReadyCB, nil)
+ go xapp.RunWithParams(c, false)
+ <-mainCtrl.syncChan
+
+ //---------------------------------
+ //
+ //---------------------------------
+ xapp.Logger.Info("### xapp rmr run ###")
+
+ xapprt := `newrt|start
+mse|12010|-1|localhost:14560
+mse|12011|-1|localhost:13560
+mse|12012|-1|localhost:13560
+mse|12020|-1|localhost:14560
+mse|12021|-1|localhost:13560
+mse|12022|-1|localhost:13560
+newrt|end
+`
+
+ xapprtfilename, _ := testCreateTmpFile(xapprt)
+ defer os.Remove(xapprtfilename)
+ xappConn = createNewRmrControl("xappConn", xapprtfilename, "13560", "RMRXAPPSTUB")
+
+ //---------------------------------
+ //
+ //---------------------------------
+ xapp.Logger.Info("### e2term rmr run ###")
+
+ e2termrt := `newrt|start
+mse|12010|-1|localhost:15560
+mse|12011|-1|localhost:14560
+mse|12012|-1|localhost:14560
+mse|12020|-1|localhost:15560
+mse|12021|-1|localhost:14560
+mse|12022|-1|localhost:14560
+newrt|end
+`
+
+ e2termrtfilename, _ := testCreateTmpFile(e2termrt)
+ defer os.Remove(e2termrtfilename)
+ e2termConn = createNewRmrControl("e2termConn", e2termrtfilename, "15560", "RMRE2TERMSTUB")
+
+ code := m.Run()
+ os.Exit(code)
+}
diff --git a/pkg/control/messaging_test.go b/pkg/control/messaging_test.go
new file mode 100644
index 0000000..e5d426b
--- /dev/null
+++ b/pkg/control/messaging_test.go
@@ -0,0 +1,222 @@
+/*
+==================================================================================
+ Copyright (c) 2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+
+package control
+
+import (
+ "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
+ "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap_wrapper"
+ "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/packer"
+ "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+ "testing"
+ "time"
+)
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+
+var e2asnpacker e2ap.E2APPackerIf = e2ap_wrapper.NewAsn1E2Packer()
+
+func createSubsReq() *e2ap.E2APSubscriptionRequest {
+ req := &e2ap.E2APSubscriptionRequest{}
+
+ req.RequestId.Id = 1
+ req.RequestId.Seq = 22
+ req.FunctionId = 1
+
+ req.EventTriggerDefinition.InterfaceId.GlobalEnbId.Present = true
+ req.EventTriggerDefinition.InterfaceId.GlobalEnbId.PlmnIdentity.StringPut("310150")
+ req.EventTriggerDefinition.InterfaceId.GlobalEnbId.NodeId.Id = 123
+ req.EventTriggerDefinition.InterfaceId.GlobalEnbId.NodeId.Bits = e2ap.E2AP_ENBIDHomeBits28
+
+ // gnb -> enb outgoing
+ // enb -> gnb incoming
+ // X2 36423-f40.doc
+ req.EventTriggerDefinition.InterfaceDirection = e2ap.E2AP_InterfaceDirectionIncoming
+ req.EventTriggerDefinition.ProcedureCode = 5 //28 35
+ req.EventTriggerDefinition.TypeOfMessage = e2ap.E2AP_InitiatingMessage
+
+ req.ActionSetups = make([]e2ap.ActionToBeSetupItem, 1)
+ req.ActionSetups[0].ActionId = 0
+ req.ActionSetups[0].ActionType = e2ap.E2AP_ActionTypeReport
+ req.ActionSetups[0].ActionDefinition.Present = false
+ //req.ActionSetups[index].ActionDefinition.StyleId = 255
+ //req.ActionSetups[index].ActionDefinition.ParamId = 222
+ req.ActionSetups[0].SubsequentAction.Present = true
+ req.ActionSetups[0].SubsequentAction.Type = e2ap.E2AP_SubSeqActionTypeContinue
+ req.ActionSetups[0].SubsequentAction.TimetoWait = e2ap.E2AP_TimeToWaitZero
+
+ return req
+}
+
+func createSubsResp(req *e2ap.E2APSubscriptionRequest) *e2ap.E2APSubscriptionResponse {
+
+ resp := &e2ap.E2APSubscriptionResponse{}
+
+ resp.RequestId.Id = req.RequestId.Id
+ resp.RequestId.Seq = req.RequestId.Seq
+ resp.FunctionId = req.FunctionId
+
+ resp.ActionAdmittedList.Items = make([]e2ap.ActionAdmittedItem, len(req.ActionSetups))
+ for index := int(0); index < len(req.ActionSetups); index++ {
+ resp.ActionAdmittedList.Items[index].ActionId = req.ActionSetups[index].ActionId
+ }
+
+ for index := uint64(0); index < 1; index++ {
+ item := e2ap.ActionNotAdmittedItem{}
+ item.ActionId = index
+ item.Cause.Content = 1
+ item.Cause.CauseVal = 1
+ resp.ActionNotAdmittedList.Items = append(resp.ActionNotAdmittedList.Items, item)
+ }
+
+ return resp
+}
+
+//-----------------------------------------------------------------------------
+// TestSubRequestSubResponseOk
+//
+// +-------+ +---------+ +---------+
+// | xapp | | submgr | | e2term |
+// +-------+ +---------+ +---------+
+// | | |
+// | SubReq | |
+// |------------->| |
+// | | |
+// | | SubReq |
+// | |------------->|
+// | | |
+// | | SubResp |
+// | |<-------------|
+// | | |
+// | SubResp | |
+// |<-------------| |
+// | | |
+//
+//-----------------------------------------------------------------------------
+func TestSubRequestSubResponseOk(t *testing.T) {
+
+ xapp.Logger.Info("TestSimple start")
+ e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest()
+ e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse()
+
+ //---------------------------------
+ // xapp activity
+ //---------------------------------
+ select {
+ case <-time.After(5 * time.Second):
+ req := createSubsReq()
+ e2SubsReq.Set(req)
+ xapp.Logger.Debug("%s", e2SubsReq.String())
+ err, packedMsg := e2SubsReq.Pack(nil)
+ if err != nil {
+ testError(t, "(xappConn) pack NOK %s", err.Error())
+ } else {
+ xapp.Logger.Info("(xappConn) pack OK")
+ }
+
+ params := &xapp.RMRParams{}
+ params.Mtype = xapp.RIC_SUB_REQ
+ params.SubId = -1
+ params.Payload = packedMsg.Buf
+ params.Meid = &xapp.RMRMeid{RanName: "RAN_NAME_1"}
+ params.Xid = "XID_1"
+ params.Mbuf = nil
+
+ snderr := xappConn.RmrSend(params)
+ if snderr != nil {
+ testError(t, "(xappConn) RMR SEND FAILED: %s", snderr.Error())
+ }
+ }
+
+ //---------------------------------
+ // e2term activity
+ //---------------------------------
+ select {
+ case msg := <-e2termConn.rmrConChan:
+ if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_REQ"] {
+ testError(t, "(e2termConn) Received non RIC_SUB_REQ message")
+ } else {
+
+ packedData := &packer.PackedData{}
+ packedData.Buf = msg.Payload
+ unpackerr := e2SubsReq.UnPack(packedData)
+ if unpackerr != nil {
+ testError(t, "(e2termConn) RIC_SUB_REQ unpack failed err: %s", unpackerr.Error())
+ }
+ geterr, req := e2SubsReq.Get()
+ if geterr != nil {
+ testError(t, "(e2termConn) RIC_SUB_REQ get failed err: %s", geterr.Error())
+ }
+
+ resp := createSubsResp(req)
+ e2SubsResp.Set(resp)
+ xapp.Logger.Debug("%s", e2SubsResp.String())
+ packerr, packedMsg := e2SubsResp.Pack(nil)
+ if packerr != nil {
+ testError(t, "(e2termConn) pack NOK %s", packerr.Error())
+ } else {
+ xapp.Logger.Info("(e2termConn) pack OK")
+ }
+
+ params := &xapp.RMRParams{}
+ params.Mtype = xapp.RIC_SUB_RESP
+ params.SubId = msg.SubId
+ params.Payload = packedMsg.Buf
+ params.Meid = msg.Meid
+ params.Xid = msg.Xid
+ params.Mbuf = nil
+
+ snderr := e2termConn.RmrSend(params)
+ if snderr != nil {
+ testError(t, "(e2termConn) RMR SEND FAILED: %s", snderr.Error())
+ }
+
+ }
+ case <-time.After(15 * time.Second):
+ testError(t, "(e2termConn) Not Received RIC_SUB_REQ within 15 secs")
+ }
+
+ //---------------------------------
+ // xapp activity
+ //---------------------------------
+ select {
+ case msg := <-xappConn.rmrConChan:
+ if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_RESP"] {
+ testError(t, "(xappConn) Received non RIC_SUB_RESP message")
+ } else {
+
+ packedData := &packer.PackedData{}
+ packedData.Buf = msg.Payload
+ unpackerr := e2SubsResp.UnPack(packedData)
+ if unpackerr != nil {
+ testError(t, "(xappConn) RIC_SUB_RESP unpack failed err: %s", unpackerr.Error())
+ }
+ geterr, _ := e2SubsResp.Get()
+ if geterr != nil {
+ testError(t, "(xappConn) RIC_SUB_RESP get failed err: %s", geterr.Error())
+ }
+
+ }
+ case <-time.After(15 * time.Second):
+ testError(t, "(xappConn) Not Received RIC_SUB_RESP within 15 secs")
+ }
+
+}
diff --git a/pkg/control/registry.go b/pkg/control/registry.go
index 03e90ae..4f5c4fd 100644
--- a/pkg/control/registry.go
+++ b/pkg/control/registry.go
@@ -24,11 +24,10 @@
"sync"
)
-
type Registry struct {
register map[uint16]bool
counter uint16
- mutex sync.Mutex
+ mutex sync.Mutex
}
// This method should run as a constructor
@@ -44,7 +43,7 @@
defer r.mutex.Unlock()
sequenceNumber := r.counter
if _, ok := r.register[sequenceNumber]; ok {
- xapp.Logger.Error("Invalid SeqenceNumber sequenceNumber: %v",sequenceNumber)
+ xapp.Logger.Error("Invalid SeqenceNumber sequenceNumber: %v", sequenceNumber)
return sequenceNumber, false
}
r.register[sequenceNumber] = false
diff --git a/pkg/control/timer.go b/pkg/control/timer.go
index 5d48464..c8385a6 100644
--- a/pkg/control/timer.go
+++ b/pkg/control/timer.go
@@ -54,7 +54,7 @@
subId := 123
timerMap.StartTimer("RIC_SUB_REQ", int(subId), subReqTime, handleSubscriptionRequestTimer)
timerMap.StopTimer("RIC_SUB_REQ", int(subId))
-
+
2)
subReqTime := 2 * time.Second
strId := "1UHSUwNqxiVgUWXvC4zFaatpZFF"
@@ -82,29 +82,29 @@
)
type TimerKey struct {
- strId string
- nbrId int
+ strId string
+ nbrId int
}
type TimerInfo struct {
- timerAddress *time.Timer
+ timerAddress *time.Timer
timerFunctionAddress func()
}
type TimerMap struct {
- timer map[TimerKey] TimerInfo
+ timer map[TimerKey]TimerInfo
mutex sync.Mutex
}
// This method should run as a constructor
func (t *TimerMap) Init() {
- t.timer = make(map[TimerKey] TimerInfo)
+ t.timer = make(map[TimerKey]TimerInfo)
}
func (t *TimerMap) StartTimer(strId string, nbrId int, expireAfterTime time.Duration, timerFunction func(srtId string, nbrId int)) bool {
t.mutex.Lock()
defer t.mutex.Unlock()
- if (timerFunction == nil) {
+ if timerFunction == nil {
xapp.Logger.Error("StartTimer() timerFunc == nil\n")
return false
}
@@ -120,8 +120,8 @@
}
// Store in timerMap in-build Go "timer", timer function executor, and the function to be executed when the timer expires
- t.timer[timerKey] = TimerInfo{timerAddress: time.AfterFunc(expireAfterTime, func(){t.timerFunctionExecutor(strId,nbrId)}),
- timerFunctionAddress: func(){timerFunction(strId,nbrId)}}
+ t.timer[timerKey] = TimerInfo{timerAddress: time.AfterFunc(expireAfterTime, func() { t.timerFunctionExecutor(strId, nbrId) }),
+ timerFunctionAddress: func() { timerFunction(strId, nbrId) }}
return true
}
diff --git a/pkg/control/tracker.go b/pkg/control/tracker.go
index 9b984a3..1682ae7 100644
--- a/pkg/control/tracker.go
+++ b/pkg/control/tracker.go
@@ -29,7 +29,7 @@
*/
type Tracker struct {
transactionTable map[TransactionKey]Transaction
- mutex sync.Mutex
+ mutex sync.Mutex
}
func (t *Tracker) Init() {