Some cleaning and bug fixes
Change-Id: Ic7660b9ec386b152262ac8502cc1ebadca8c56b6
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
diff --git a/pkg/control/client.go b/pkg/control/client.go
index 73e2ee0..fdafcb6 100644
--- a/pkg/control/client.go
+++ b/pkg/control/client.go
@@ -34,13 +34,12 @@
//
//-----------------------------------------------------------------------------
type SubRouteInfo struct {
- Command Action
- EpList RmrEndpointList
- SubID uint16
+ EpList RmrEndpointList
+ SubID uint16
}
func (sri *SubRouteInfo) String() string {
- return "routeinfo(" + strconv.FormatUint(uint64(sri.SubID), 10) + "/" + sri.Command.String() + "/[" + sri.EpList.String() + "])"
+ return "routeinfo(" + strconv.FormatUint(uint64(sri.SubID), 10) + "/[" + sri.EpList.String() + "])"
}
//-----------------------------------------------------------------------------
@@ -50,39 +49,48 @@
rtClient *rtmgrclient.RoutingManager
}
-func (rc *RtmgrClient) SubscriptionRequestUpdate(subRouteAction SubRouteInfo) error {
+func (rc *RtmgrClient) SubscriptionRequestCreate(subRouteAction SubRouteInfo) error {
subID := int32(subRouteAction.SubID)
- xapp.Logger.Debug("%s ongoing", subRouteAction.String())
- var err error
- switch subRouteAction.Command {
- case CREATE:
- createData := rtmgr_models.XappSubscriptionData{&subRouteAction.EpList.Endpoints[0].Addr, &subRouteAction.EpList.Endpoints[0].Port, &subID}
- createHandle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
- createHandle.WithXappSubscriptionData(&createData)
- _, err = rc.rtClient.Handle.ProvideXappSubscriptionHandle(createHandle)
- case DELETE:
- deleteData := rtmgr_models.XappSubscriptionData{&subRouteAction.EpList.Endpoints[0].Addr, &subRouteAction.EpList.Endpoints[0].Port, &subID}
- deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
- deleteHandle.WithXappSubscriptionData(&deleteData)
- _, _, err = rc.rtClient.Handle.DeleteXappSubscriptionHandle(deleteHandle)
- case UPDATE:
- var updateData rtmgr_models.XappList
- for i := range subRouteAction.EpList.Endpoints {
- updateData = append(updateData, &rtmgr_models.XappElement{Address: &subRouteAction.EpList.Endpoints[i].Addr, Port: &subRouteAction.EpList.Endpoints[i].Port})
- }
- updateHandle := rtmgrhandle.NewUpdateXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
- updateHandle.WithSubscriptionID(subRouteAction.SubID)
- updateHandle.WithXappList(updateData)
- _, err = rc.rtClient.Handle.UpdateXappSubscriptionHandle(updateHandle)
-
- default:
- return fmt.Errorf("%s unknown", subRouteAction.String())
- }
-
+ xapp.Logger.Debug("CREATE %s ongoing", subRouteAction.String())
+ createData := rtmgr_models.XappSubscriptionData{&subRouteAction.EpList.Endpoints[0].Addr, &subRouteAction.EpList.Endpoints[0].Port, &subID}
+ createHandle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
+ createHandle.WithXappSubscriptionData(&createData)
+ _, err := rc.rtClient.Handle.ProvideXappSubscriptionHandle(createHandle)
if err != nil && !(strings.Contains(err.Error(), "status 200")) {
- return fmt.Errorf("%s failed with error: %s", subRouteAction.String(), err.Error())
+ return fmt.Errorf("CREATE %s failed with error: %s", subRouteAction.String(), err.Error())
}
- xapp.Logger.Debug("%s successful", subRouteAction.String())
+ xapp.Logger.Debug("CREATE %s successful", subRouteAction.String())
+ return nil
+}
+
+func (rc *RtmgrClient) SubscriptionRequestUpdate(subRouteAction SubRouteInfo) error {
+ xapp.Logger.Debug("UPDATE %s ongoing", subRouteAction.String())
+ var updateData rtmgr_models.XappList
+ for i := range subRouteAction.EpList.Endpoints {
+ updateData = append(updateData, &rtmgr_models.XappElement{Address: &subRouteAction.EpList.Endpoints[i].Addr, Port: &subRouteAction.EpList.Endpoints[i].Port})
+ }
+ updateHandle := rtmgrhandle.NewUpdateXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
+ updateHandle.WithSubscriptionID(subRouteAction.SubID)
+ updateHandle.WithXappList(updateData)
+ _, err := rc.rtClient.Handle.UpdateXappSubscriptionHandle(updateHandle)
+ if err != nil && !(strings.Contains(err.Error(), "status 200")) {
+ return fmt.Errorf("UPDATE %s failed with error: %s", subRouteAction.String(), err.Error())
+ }
+ xapp.Logger.Debug("UPDATE %s successful", subRouteAction.String())
return nil
}
+
+func (rc *RtmgrClient) SubscriptionRequestDelete(subRouteAction SubRouteInfo) error {
+ subID := int32(subRouteAction.SubID)
+ xapp.Logger.Debug("DELETE %s ongoing", subRouteAction.String())
+ deleteData := rtmgr_models.XappSubscriptionData{&subRouteAction.EpList.Endpoints[0].Addr, &subRouteAction.EpList.Endpoints[0].Port, &subID}
+ deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
+ deleteHandle.WithXappSubscriptionData(&deleteData)
+ _, _, err := rc.rtClient.Handle.DeleteXappSubscriptionHandle(deleteHandle)
+ if err != nil && !(strings.Contains(err.Error(), "status 200")) {
+ return fmt.Errorf("DELETE %s failed with error: %s", subRouteAction.String(), err.Error())
+ }
+ xapp.Logger.Debug("DELETE %s successful", subRouteAction.String())
+ return nil
+}
diff --git a/pkg/control/control.go b/pkg/control/control.go
index dee7e65..1d64f3c 100755
--- a/pkg/control/control.go
+++ b/pkg/control/control.go
@@ -57,13 +57,6 @@
RanName string
}
-const (
- CREATE Action = 0
- UPDATE Action = 1
- NONE Action = 2
- DELETE Action = 3
-)
-
func init() {
xapp.Logger.Info("SUBMGR")
viper.AutomaticEnv()
@@ -120,10 +113,10 @@
return
}
-func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction) (err error) {
+func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
params := &RMRParams{&xapp.RMRParams{}}
params.Mtype = trans.GetMtype()
- params.SubId = int(subs.GetSubId())
+ params.SubId = int(subs.GetReqId().Seq)
params.Xid = ""
params.Meid = subs.GetMeid()
params.Src = ""
@@ -131,13 +124,14 @@
params.Payload = trans.Payload.Buf
params.Mbuf = nil
- return c.rmrSendRaw(desc, params)
+ return c.rmrSendRaw("MSG to E2T:"+desc+":"+trans.String(), params)
}
-func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction) (err error) {
+func (c *Control) rmrSendToXapp(desc string, subs *Subscription, trans *TransactionXapp) (err error) {
+
params := &RMRParams{&xapp.RMRParams{}}
params.Mtype = trans.GetMtype()
- params.SubId = int(subs.GetSubId())
+ params.SubId = int(subs.GetReqId().Seq)
params.Xid = trans.GetXid()
params.Meid = trans.GetMeid()
params.Src = ""
@@ -145,7 +139,7 @@
params.Payload = trans.Payload.Buf
params.Mbuf = nil
- return c.rmrSendRaw(desc, params)
+ return c.rmrSendRaw("MSG to XAPP:"+desc+":"+trans.String(), params)
}
func (c *Control) Consume(params *xapp.RMRParams) (err error) {
@@ -153,6 +147,7 @@
params.Mbuf = nil
msg := &RMRParams{params}
c.msgCounter++
+
switch msg.Mtype {
case xapp.RICMessageTypes["RIC_SUB_REQ"]:
go c.handleXAPPSubscriptionRequest(msg)
@@ -172,15 +167,12 @@
return nil
}
-func idstring(trans fmt.Stringer, subs fmt.Stringer, err error) string {
+
+func idstring(err error, entries ...fmt.Stringer) string {
var retval string = ""
var filler string = ""
- if trans != nil {
- retval += filler + trans.String()
- filler = " "
- }
- if subs != nil {
- retval += filler + subs.String()
+ for _, entry := range entries {
+ retval += filler + entry.String()
filler = " "
}
if err != nil {
@@ -195,24 +187,30 @@
// handle from XAPP Subscription Request
//------------------------------------------------------------------
func (c *Control) handleXAPPSubscriptionRequest(params *RMRParams) {
- xapp.Logger.Info("XAPP-SubReq from xapp: %s", params.String())
+ xapp.Logger.Info("MSG from XAPP: %s", params.String())
subReqMsg, err := c.e2ap.UnpackSubscriptionRequest(params.Payload)
if err != nil {
- xapp.Logger.Error("XAPP-SubReq: %s", idstring(params, nil, err))
+ xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, params))
return
}
- trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid)
- if err != nil {
- xapp.Logger.Error("XAPP-SubReq: %s", idstring(params, nil, err))
+ trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, &RequestId{subReqMsg.RequestId}, params.Meid)
+ if trans == nil {
+ xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
return
}
defer trans.Release()
+ err = c.tracker.Track(trans)
+ if err != nil {
+ xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
+ return
+ }
+
subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
if err != nil {
- xapp.Logger.Error("XAPP-SubReq: %s", idstring(trans, nil, err))
+ xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
return
}
@@ -228,44 +226,50 @@
case *e2ap.E2APSubscriptionResponse:
trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionResponse(themsg)
if err == nil {
- c.rmrReplyToSender("XAPP-SubReq: SubResp to xapp", subs, trans)
+ c.rmrSendToXapp("", subs, trans)
return
}
case *e2ap.E2APSubscriptionFailure:
trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionFailure(themsg)
if err == nil {
- c.rmrReplyToSender("XAPP-SubReq: SubFail to xapp", subs, trans)
+ c.rmrSendToXapp("", subs, trans)
}
- return
default:
break
}
}
- xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(trans, subs, err))
+ xapp.Logger.Info("XAPP-SubReq: failed %s", idstring(err, trans, subs))
+ go c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
}
//-------------------------------------------------------------------
// handle from XAPP Subscription Delete Request
//------------------------------------------------------------------
func (c *Control) handleXAPPSubscriptionDeleteRequest(params *RMRParams) {
- xapp.Logger.Info("XAPP-SubDelReq from xapp: %s", params.String())
+ xapp.Logger.Info("MSG from XAPP: %s", params.String())
subDelReqMsg, err := c.e2ap.UnpackSubscriptionDeleteRequest(params.Payload)
if err != nil {
- xapp.Logger.Error("XAPP-SubDelReq %s", idstring(params, nil, err))
+ xapp.Logger.Error("XAPP-SubDelReq %s", idstring(err, params))
return
}
- trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src), params.Xid, params.Meid)
- if err != nil {
- xapp.Logger.Error("XAPP-SubDelReq %s", idstring(params, nil, err))
+ trans := c.tracker.NewXappTransaction(NewRmrEndpoint(params.Src), params.Xid, &RequestId{subDelReqMsg.RequestId}, params.Meid)
+ if trans == nil {
+ xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(fmt.Errorf("transaction not created"), params))
return
}
defer trans.Release()
- subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelReqMsg.RequestId.Seq), uint16(params.SubId)})
+ err = c.tracker.Track(trans)
if err != nil {
- xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(trans, nil, err))
+ xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
+ return
+ }
+
+ subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelReqMsg.RequestId.Seq})
+ if err != nil {
+ xapp.Logger.Error("XAPP-SubDelReq: %s", idstring(err, trans))
return
}
@@ -275,89 +279,65 @@
go c.handleSubscriptionDelete(subs, trans)
trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
+ xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
+
// Whatever is received send ok delete response
subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
- subDelRespMsg.RequestId.Id = subs.SubReqMsg.RequestId.Id
- subDelRespMsg.RequestId.Seq = uint32(subs.GetSubId())
+ subDelRespMsg.RequestId = subs.SubReqMsg.RequestId
subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteResponse(subDelRespMsg)
if err == nil {
- c.rmrReplyToSender("XAPP-SubDelReq: SubDelResp to xapp", subs, trans)
+ c.rmrSendToXapp("", subs, trans)
}
+
+ go c.registry.RemoveFromSubscription(subs, trans, 5*time.Second)
}
//-------------------------------------------------------------------
// SUBS CREATE Handling
//-------------------------------------------------------------------
-func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *Transaction) {
+func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
- trans := c.tracker.NewTransaction(subs.GetMeid())
+ trans := c.tracker.NewSubsTransaction(subs)
subs.WaitTransactionTurn(trans)
defer subs.ReleaseTransactionTurn(trans)
defer trans.Release()
- xapp.Logger.Debug("SUBS-SubReq: Handling %s parent %s", idstring(trans, subs, nil), parentTrans.String())
+ xapp.Logger.Debug("SUBS-SubReq: Handling %s ", idstring(nil, trans, subs, parentTrans))
- subs.mutex.Lock()
- if subs.SubRespMsg != nil {
- xapp.Logger.Debug("SUBS-SubReq: Handling (immediate resp response) %s parent %s", idstring(nil, subs, nil), parentTrans.String())
- parentTrans.SendEvent(subs.SubRespMsg, 0)
- subs.mutex.Unlock()
- return
- }
- if subs.SubFailMsg != nil {
- xapp.Logger.Debug("SUBS-SubReq: Handling (immediate fail response) %s parent %s", idstring(nil, subs, nil), parentTrans.String())
- parentTrans.SendEvent(subs.SubFailMsg, 0)
- subs.mutex.Unlock()
- go c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
- return
- }
- if subs.valid == false {
- xapp.Logger.Debug("SUBS-SubReq: Handling (immediate nil response) %s parent %s", idstring(nil, subs, nil), parentTrans.String())
- parentTrans.SendEvent(nil, 0)
- subs.mutex.Unlock()
- go c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
- return
- }
- subs.mutex.Unlock()
-
- event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
- switch themsg := event.(type) {
- case *e2ap.E2APSubscriptionResponse:
- subs.mutex.Lock()
- subs.SubRespMsg = themsg
- subs.mutex.Unlock()
- parentTrans.SendEvent(event, 0)
- return
- case *e2ap.E2APSubscriptionFailure:
- subs.mutex.Lock()
- subs.SubFailMsg = themsg
- subs.mutex.Unlock()
- parentTrans.SendEvent(event, 0)
- default:
- xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(trans, subs, nil))
- subs.mutex.Lock()
- subs.valid = false
- subs.mutex.Unlock()
- c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
- parentTrans.SendEvent(nil, 0)
+ subRfMsg, valid := subs.GetCachedResponse()
+ if subRfMsg == nil && valid == true {
+ event := c.sendE2TSubscriptionRequest(subs, trans, parentTrans)
+ switch event.(type) {
+ case *e2ap.E2APSubscriptionResponse:
+ subRfMsg, valid = subs.SetCachedResponse(event, true)
+ case *e2ap.E2APSubscriptionFailure:
+ subRfMsg, valid = subs.SetCachedResponse(event, false)
+ default:
+ xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
+ subRfMsg, valid = subs.SetCachedResponse(nil, false)
+ c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
+ }
+ xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
+ } else {
+ xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
}
- go c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
+ parentTrans.SendEvent(subRfMsg, 0)
}
//-------------------------------------------------------------------
// SUBS DELETE Handling
//-------------------------------------------------------------------
-func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *Transaction) {
+func (c *Control) handleSubscriptionDelete(subs *Subscription, parentTrans *TransactionXapp) {
- trans := c.tracker.NewTransaction(subs.GetMeid())
+ trans := c.tracker.NewSubsTransaction(subs)
subs.WaitTransactionTurn(trans)
defer subs.ReleaseTransactionTurn(trans)
defer trans.Release()
- xapp.Logger.Debug("SUBS-SubDelReq: Handling %s parent %s", idstring(trans, subs, nil), parentTrans.String())
+ xapp.Logger.Debug("SUBS-SubDelReq: Handling %s", idstring(nil, trans, subs, parentTrans))
subs.mutex.Lock()
if subs.valid && subs.EpList.HasEndpoint(parentTrans.GetEndpoint()) && subs.EpList.Size() == 1 {
@@ -369,41 +349,37 @@
}
subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
- subDelRespMsg.RequestId.Id = subs.SubReqMsg.RequestId.Id
- subDelRespMsg.RequestId.Seq = uint32(subs.GetSubId())
+ subDelRespMsg.RequestId = subs.SubReqMsg.RequestId
subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
parentTrans.SendEvent(subDelRespMsg, 0)
-
- go c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
}
//-------------------------------------------------------------------
// send to E2T Subscription Request
//-------------------------------------------------------------------
-func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *Transaction, parentTrans *Transaction) interface{} {
+func (c *Control) sendE2TSubscriptionRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
var err error
var event interface{} = nil
var timedOut bool = false
subReqMsg := subs.SubReqMsg
- subReqMsg.RequestId.Id = 123
- subReqMsg.RequestId.Seq = uint32(subs.GetSubId())
+ subReqMsg.RequestId = subs.GetReqId().RequestId
trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionRequest(subReqMsg)
if err != nil {
- xapp.Logger.Error("SUBS-SubReq: %s parent %s", idstring(trans, subs, err), parentTrans.String())
+ xapp.Logger.Error("SUBS-SubReq: %s", idstring(err, trans, subs, parentTrans))
return event
}
for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
- desc := fmt.Sprintf("SUBS-SubReq: SubReq to E2T (retry %d)", retries)
- c.rmrSend(desc, subs, trans)
+ desc := fmt.Sprintf("(retry %d)", retries)
+ c.rmrSendToE2T(desc, subs, trans)
event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
if timedOut {
continue
}
break
}
- xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s parent %s", typeofSubsMessage(event), idstring(trans, subs, nil), parentTrans.String())
+ xapp.Logger.Debug("SUBS-SubReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
return event
}
@@ -411,31 +387,30 @@
// send to E2T Subscription Delete Request
//-------------------------------------------------------------------
-func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *Transaction, parentTrans *Transaction) interface{} {
+func (c *Control) sendE2TSubscriptionDeleteRequest(subs *Subscription, trans *TransactionSubs, parentTrans *TransactionXapp) interface{} {
var err error
var event interface{}
var timedOut bool
subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
- subDelReqMsg.RequestId.Id = 123
- subDelReqMsg.RequestId.Seq = uint32(subs.GetSubId())
+ subDelReqMsg.RequestId = subs.GetReqId().RequestId
subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
trans.Mtype, trans.Payload, err = c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
if err != nil {
- xapp.Logger.Error("SUBS-SubDelReq: %s parent %s", idstring(trans, subs, err), parentTrans.String())
+ xapp.Logger.Error("SUBS-SubDelReq: %s", idstring(err, trans, subs, parentTrans))
return event
}
for retries := uint64(0); retries < e2tMaxSubDelReqTryCount; retries++ {
- desc := fmt.Sprintf("SUBS-SubDelReq: SubDelReq to E2T (retry %d)", retries)
- c.rmrSend(desc, subs, trans)
+ desc := fmt.Sprintf("(retry %d)", retries)
+ c.rmrSendToE2T(desc, subs, trans)
event, timedOut = trans.WaitEvent(e2tSubDelReqTime)
if timedOut {
continue
}
break
}
- xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s parent %s", typeofSubsMessage(event), idstring(trans, subs, nil), parentTrans.String())
+ xapp.Logger.Debug("SUBS-SubDelReq: Response handling event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
return event
}
@@ -443,27 +418,27 @@
// handle from E2T Subscription Reponse
//-------------------------------------------------------------------
func (c *Control) handleE2TSubscriptionResponse(params *RMRParams) {
- xapp.Logger.Info("MSG-SubResp from E2T: %s", params.String())
+ xapp.Logger.Info("MSG from E2T: %s", params.String())
subRespMsg, err := c.e2ap.UnpackSubscriptionResponse(params.Payload)
if err != nil {
- xapp.Logger.Error("MSG-SubResp %s", idstring(params, nil, err))
+ xapp.Logger.Error("MSG-SubResp %s", idstring(err, params))
return
}
- subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subRespMsg.RequestId.Seq), uint16(params.SubId)})
+ subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subRespMsg.RequestId.Seq})
if err != nil {
- xapp.Logger.Error("MSG-SubResp: %s", idstring(params, nil, err))
+ xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params))
return
}
trans := subs.GetTransaction()
if trans == nil {
err = fmt.Errorf("Ongoing transaction not found")
- xapp.Logger.Error("MSG-SubResp: %s", idstring(params, subs, err))
+ xapp.Logger.Error("MSG-SubResp: %s", idstring(err, params, subs))
return
}
sendOk, timedOut := trans.SendEvent(subRespMsg, e2tRecvMsgTimeout)
if sendOk == false {
err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
- xapp.Logger.Error("MSG-SubResp: %s", idstring(trans, subs, err))
+ xapp.Logger.Error("MSG-SubResp: %s", idstring(err, trans, subs))
}
return
}
@@ -472,27 +447,27 @@
// handle from E2T Subscription Failure
//-------------------------------------------------------------------
func (c *Control) handleE2TSubscriptionFailure(params *RMRParams) {
- xapp.Logger.Info("MSG-SubFail from E2T: %s", params.String())
+ xapp.Logger.Info("MSG from E2T: %s", params.String())
subFailMsg, err := c.e2ap.UnpackSubscriptionFailure(params.Payload)
if err != nil {
- xapp.Logger.Error("MSG-SubFail %s", idstring(params, nil, err))
+ xapp.Logger.Error("MSG-SubFail %s", idstring(err, params))
return
}
- subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subFailMsg.RequestId.Seq), uint16(params.SubId)})
+ subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subFailMsg.RequestId.Seq})
if err != nil {
- xapp.Logger.Error("MSG-SubFail: %s", idstring(params, nil, err))
+ xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params))
return
}
trans := subs.GetTransaction()
if trans == nil {
err = fmt.Errorf("Ongoing transaction not found")
- xapp.Logger.Error("MSG-SubFail: %s", idstring(params, subs, err))
+ xapp.Logger.Error("MSG-SubFail: %s", idstring(err, params, subs))
return
}
sendOk, timedOut := trans.SendEvent(subFailMsg, e2tRecvMsgTimeout)
if sendOk == false {
err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
- xapp.Logger.Error("MSG-SubFail: %s", idstring(trans, subs, err))
+ xapp.Logger.Error("MSG-SubFail: %s", idstring(err, trans, subs))
}
return
}
@@ -501,27 +476,27 @@
// handle from E2T Subscription Delete Response
//-------------------------------------------------------------------
func (c *Control) handleE2TSubscriptionDeleteResponse(params *RMRParams) (err error) {
- xapp.Logger.Info("SUBS-SubDelResp from E2T:%s", params.String())
+ xapp.Logger.Info("MSG from E2T: %s", params.String())
subDelRespMsg, err := c.e2ap.UnpackSubscriptionDeleteResponse(params.Payload)
if err != nil {
- xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, nil, err))
+ xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
return
}
- subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelRespMsg.RequestId.Seq), uint16(params.SubId)})
+ subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelRespMsg.RequestId.Seq})
if err != nil {
- xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, nil, err))
+ xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params))
return
}
trans := subs.GetTransaction()
if trans == nil {
err = fmt.Errorf("Ongoing transaction not found")
- xapp.Logger.Error("SUBS-SubDelResp: %s", idstring(params, subs, err))
+ xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, params, subs))
return
}
sendOk, timedOut := trans.SendEvent(subDelRespMsg, e2tRecvMsgTimeout)
if sendOk == false {
err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
- xapp.Logger.Error("MSG-SubDelResp: %s", idstring(trans, subs, err))
+ xapp.Logger.Error("MSG-SubDelResp: %s", idstring(err, trans, subs))
}
return
}
@@ -530,27 +505,27 @@
// handle from E2T Subscription Delete Failure
//-------------------------------------------------------------------
func (c *Control) handleE2TSubscriptionDeleteFailure(params *RMRParams) {
- xapp.Logger.Info("MSG-SubDelFail from E2T:%s", params.String())
+ xapp.Logger.Info("MSG from E2T: %s", params.String())
subDelFailMsg, err := c.e2ap.UnpackSubscriptionDeleteFailure(params.Payload)
if err != nil {
- xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, nil, err))
+ xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
return
}
- subs, err := c.registry.GetSubscriptionFirstMatch([]uint16{uint16(subDelFailMsg.RequestId.Seq), uint16(params.SubId)})
+ subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{subDelFailMsg.RequestId.Seq})
if err != nil {
- xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, nil, err))
+ xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params))
return
}
trans := subs.GetTransaction()
if trans == nil {
err = fmt.Errorf("Ongoing transaction not found")
- xapp.Logger.Error("MSG-SubDelFail: %s", idstring(params, subs, err))
+ xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, params, subs))
return
}
sendOk, timedOut := trans.SendEvent(subDelFailMsg, e2tRecvMsgTimeout)
if sendOk == false {
err = fmt.Errorf("Passing event to transaction failed: sendOk(%t) timedOut(%t)", sendOk, timedOut)
- xapp.Logger.Error("MSG-SubDelFail: %s", idstring(trans, subs, err))
+ xapp.Logger.Error("MSG-SubDelFail: %s", idstring(err, trans, subs))
}
return
}
diff --git a/pkg/control/registry.go b/pkg/control/registry.go
index 6abdcdb..e00062b 100644
--- a/pkg/control/registry.go
+++ b/pkg/control/registry.go
@@ -30,22 +30,23 @@
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
+
type Registry struct {
mutex sync.Mutex
- register map[uint16]*Subscription
- subIds []uint16
+ register map[uint32]*Subscription
+ subIds []uint32
rtmgrClient *RtmgrClient
}
func (r *Registry) Initialize() {
- r.register = make(map[uint16]*Subscription)
- var i uint16
+ r.register = make(map[uint32]*Subscription)
+ var i uint32
for i = 0; i < 65535; i++ {
r.subIds = append(r.subIds, i+1)
}
}
-func (r *Registry) allocateSubs(trans *Transaction, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
+func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
if len(r.subIds) > 0 {
sequenceNumber := r.subIds[0]
r.subIds = r.subIds[1:]
@@ -55,14 +56,15 @@
}
subs := &Subscription{
registry: r,
- Seq: sequenceNumber,
Meid: trans.Meid,
SubReqMsg: subReqMsg,
valid: true,
}
+ subs.ReqId.Id = 123
+ subs.ReqId.Seq = sequenceNumber
if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
- r.subIds = append(r.subIds, subs.Seq)
+ r.subIds = append(r.subIds, subs.ReqId.Seq)
return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
}
@@ -71,9 +73,10 @@
return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
}
-func (r *Registry) findExistingSubs(trans *Transaction, subReqMsg *e2ap.E2APSubscriptionRequest) *Subscription {
+func (r *Registry) findExistingSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) *Subscription {
+
for _, subs := range r.register {
- if subs.IsSame(trans, subReqMsg) {
+ if subs.IsMergeable(trans, subReqMsg) {
//
// check if there has been race conditions
@@ -84,6 +87,11 @@
subs.mutex.Unlock()
continue
}
+ // If size is zero, entry is to be deleted
+ if subs.EpList.Size() == 0 {
+ subs.mutex.Unlock()
+ continue
+ }
// try to add to endpointlist.
if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
subs.mutex.Unlock()
@@ -91,15 +99,14 @@
}
subs.mutex.Unlock()
- //Race collision during parallel incoming and deleted
- xapp.Logger.Debug("Registry: Identical subs found %s for %s", subs.String(), trans.String())
+ xapp.Logger.Debug("Registry: Mergeable subs found %s for %s", subs.String(), trans.String())
return subs
}
}
return nil
}
-func (r *Registry) AssignToSubscription(trans *Transaction, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
+func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
var err error
var newAlloc bool
r.mutex.Lock()
@@ -128,31 +135,31 @@
// Subscription route updates
//
if epamount == 1 {
- subRouteAction := SubRouteInfo{CREATE, subs.EpList, subs.Seq}
- err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+ subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.Seq)}
+ err = r.rtmgrClient.SubscriptionRequestCreate(subRouteAction)
} else {
- subRouteAction := SubRouteInfo{UPDATE, subs.EpList, subs.Seq}
+ subRouteAction := SubRouteInfo{subs.EpList, uint16(subs.ReqId.Seq)}
err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
}
r.mutex.Lock()
if err != nil {
if newAlloc {
- r.subIds = append(r.subIds, subs.Seq)
+ r.subIds = append(r.subIds, subs.ReqId.Seq)
}
return nil, err
}
if newAlloc {
- r.register[subs.Seq] = subs
+ r.register[subs.ReqId.Seq] = subs
}
- xapp.Logger.Debug("Registry: Create %s", subs.String())
+ xapp.Logger.Debug("CREATE %s", subs.String())
xapp.Logger.Debug("Registry: substable=%v", r.register)
return subs, nil
}
-// TODO: Needs better logic when there is concurrent calls
-func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *Transaction, waitRouteClean time.Duration) error {
+// TODO: Works with concurrent calls, but check if can be improved
+func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration) error {
r.mutex.Lock()
defer r.mutex.Unlock()
@@ -161,17 +168,12 @@
delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
epamount := subs.EpList.Size()
+ seqId := subs.ReqId.Seq
- //
- // If last endpoint remove from register map
- //
- if epamount == 0 {
- if _, ok := r.register[subs.Seq]; ok {
- xapp.Logger.Debug("Registry: Delete %s", subs.String())
- delete(r.register, subs.Seq)
- xapp.Logger.Debug("Registry: substable=%v", r.register)
- }
+ if delStatus == false {
+ return nil
}
+
r.mutex.Unlock()
//
@@ -183,35 +185,38 @@
subs.mutex.Lock()
}
- xapp.Logger.Info("Registry: Cleaning %s", subs.String())
+ xapp.Logger.Info("CLEAN %s", subs.String())
//
// Subscription route updates
//
- if delStatus {
- if epamount == 0 {
- tmpList := RmrEndpointList{}
- tmpList.AddEndpoint(trans.GetEndpoint())
- subRouteAction := SubRouteInfo{DELETE, tmpList, subs.Seq}
- r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
- } else {
- subRouteAction := SubRouteInfo{UPDATE, subs.EpList, subs.Seq}
- r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
- }
+ if epamount == 0 {
+ tmpList := RmrEndpointList{}
+ tmpList.AddEndpoint(trans.GetEndpoint())
+ subRouteAction := SubRouteInfo{tmpList, uint16(seqId)}
+ r.rtmgrClient.SubscriptionRequestDelete(subRouteAction)
+ } else if subs.EpList.Size() > 0 {
+ subRouteAction := SubRouteInfo{subs.EpList, uint16(seqId)}
+ r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
}
r.mutex.Lock()
//
- // If last endpoint free seq nro
+ // If last endpoint, release and free seqid
//
if epamount == 0 {
- r.subIds = append(r.subIds, subs.Seq)
+ if _, ok := r.register[seqId]; ok {
+ xapp.Logger.Debug("RELEASE %s", subs.String())
+ delete(r.register, seqId)
+ xapp.Logger.Debug("Registry: substable=%v", r.register)
+ }
+ r.subIds = append(r.subIds, seqId)
}
return nil
}
-func (r *Registry) GetSubscription(sn uint16) *Subscription {
+func (r *Registry) GetSubscription(sn uint32) *Subscription {
r.mutex.Lock()
defer r.mutex.Unlock()
if _, ok := r.register[sn]; ok {
@@ -220,7 +225,7 @@
return nil
}
-func (r *Registry) GetSubscriptionFirstMatch(ids []uint16) (*Subscription, error) {
+func (r *Registry) GetSubscriptionFirstMatch(ids []uint32) (*Subscription, error) {
r.mutex.Lock()
defer r.mutex.Unlock()
for _, id := range ids {
diff --git a/pkg/control/subscription.go b/pkg/control/subscription.go
index 5bfe2e1..c2b5283 100644
--- a/pkg/control/subscription.go
+++ b/pkg/control/subscription.go
@@ -22,7 +22,6 @@
import (
"gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
- "strconv"
"sync"
)
@@ -30,27 +29,40 @@
//
//-----------------------------------------------------------------------------
type Subscription struct {
- mutex sync.Mutex // Lock
- valid bool // valid
- registry *Registry // Registry
- Seq uint16 // SubsId
- Meid *xapp.RMRMeid // Meid/ RanName
- EpList RmrEndpointList // Endpoints
- TransLock sync.Mutex // Lock transactions, only one executed per time for subs
- TheTrans *Transaction // Ongoing transaction from xapp
- SubReqMsg *e2ap.E2APSubscriptionRequest // Subscription information
- SubRespMsg *e2ap.E2APSubscriptionResponse // Subscription information
- SubFailMsg *e2ap.E2APSubscriptionFailure // Subscription information
+ mutex sync.Mutex // Lock
+ valid bool // valid
+ registry *Registry // Registry
+ ReqId RequestId // ReqId (Requestor Id + Seq Nro a.k.a subsid)
+ Meid *xapp.RMRMeid // Meid/ RanName
+ EpList RmrEndpointList // Endpoints
+ TransLock sync.Mutex // Lock transactions, only one executed per time for subs
+ TheTrans TransactionIf // Ongoing transaction
+ SubReqMsg *e2ap.E2APSubscriptionRequest // Subscription information
+ SubRFMsg interface{} // Subscription information
}
func (s *Subscription) String() string {
- return "subs(" + strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.Meid.RanName + "/" + s.EpList.String() + ")"
+ return "subs(" + s.ReqId.String() + "/" + s.Meid.RanName + "/" + s.EpList.String() + ")"
}
-func (s *Subscription) GetSubId() uint16 {
+func (s *Subscription) GetCachedResponse() (interface{}, bool) {
s.mutex.Lock()
defer s.mutex.Unlock()
- return s.Seq
+ return s.SubRFMsg, s.valid
+}
+
+func (s *Subscription) SetCachedResponse(subRFMsg interface{}, valid bool) (interface{}, bool) {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ s.SubRFMsg = subRFMsg
+ s.valid = valid
+ return s.SubRFMsg, s.valid
+}
+
+func (s *Subscription) GetReqId() *RequestId {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ return &s.ReqId
}
func (s *Subscription) GetMeid() *xapp.RMRMeid {
@@ -62,30 +74,20 @@
return nil
}
-func (s *Subscription) IsTransactionReserved() bool {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- if s.TheTrans != nil {
- return true
- }
- return false
-
-}
-
-func (s *Subscription) GetTransaction() *Transaction {
+func (s *Subscription) GetTransaction() TransactionIf {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.TheTrans
}
-func (s *Subscription) WaitTransactionTurn(trans *Transaction) {
+func (s *Subscription) WaitTransactionTurn(trans TransactionIf) {
s.TransLock.Lock()
s.mutex.Lock()
s.TheTrans = trans
s.mutex.Unlock()
}
-func (s *Subscription) ReleaseTransactionTurn(trans *Transaction) {
+func (s *Subscription) ReleaseTransactionTurn(trans TransactionIf) {
s.mutex.Lock()
if trans != nil && trans == s.TheTrans {
s.TheTrans = nil
@@ -94,7 +96,7 @@
s.TransLock.Unlock()
}
-func (s *Subscription) IsSame(trans *Transaction, subReqMsg *e2ap.E2APSubscriptionRequest) bool {
+func (s *Subscription) IsMergeable(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) bool {
s.mutex.Lock()
defer s.mutex.Unlock()
@@ -110,15 +112,6 @@
return false
}
- if s.EpList.Size() == 0 {
- return false
- }
-
- //Somehow special case ... ?
- if s.EpList.HasEndpoint(trans.GetEndpoint()) == true {
- return false
- }
-
// EventTrigger check
if s.SubReqMsg.EventTriggerDefinition.InterfaceDirection != subReqMsg.EventTriggerDefinition.InterfaceDirection ||
s.SubReqMsg.EventTriggerDefinition.ProcedureCode != subReqMsg.EventTriggerDefinition.ProcedureCode ||
@@ -156,6 +149,10 @@
return false
}
+ if acts.ActionType != e2ap.E2AP_ActionTypeReport {
+ return false
+ }
+
if acts.ActionDefinition.Present != actt.ActionDefinition.Present ||
acts.ActionDefinition.StyleId != actt.ActionDefinition.StyleId ||
acts.ActionDefinition.ParamId != actt.ActionDefinition.ParamId {
diff --git a/pkg/control/tracker.go b/pkg/control/tracker.go
index c16a76a..2f54237 100644
--- a/pkg/control/tracker.go
+++ b/pkg/control/tracker.go
@@ -30,71 +30,81 @@
//-----------------------------------------------------------------------------
type Tracker struct {
mutex sync.Mutex
- transactionXappTable map[TransactionXappKey]*Transaction
+ transactionXappTable map[TransactionXappKey]*TransactionXapp
transSeq uint64
}
func (t *Tracker) Init() {
- t.transactionXappTable = make(map[TransactionXappKey]*Transaction)
+ t.transactionXappTable = make(map[TransactionXappKey]*TransactionXapp)
}
-func (t *Tracker) NewTransactionFromSkel(transSkel *Transaction) *Transaction {
+func (t *Tracker) initTransaction(transBase *Transaction) {
t.mutex.Lock()
defer t.mutex.Unlock()
- trans := transSkel
- if trans == nil {
- trans = &Transaction{}
- }
- trans.EventChan = make(chan interface{})
- trans.tracker = t
- trans.Seq = t.transSeq
+ transBase.EventChan = make(chan interface{})
+ transBase.tracker = t
+ transBase.Seq = t.transSeq
t.transSeq++
- xapp.Logger.Debug("Transaction: Create %s", trans.String())
+}
+
+func (t *Tracker) NewSubsTransaction(subs *Subscription) *TransactionSubs {
+ trans := &TransactionSubs{}
+ trans.Meid = subs.GetMeid()
+ rid := subs.GetReqId()
+ if rid != nil {
+ trans.ReqId = *rid
+ }
+ t.initTransaction(&trans.Transaction)
+ xapp.Logger.Debug("CREATE %s", trans.String())
return trans
}
-func (t *Tracker) NewTransaction(meid *xapp.RMRMeid) *Transaction {
- trans := &Transaction{}
- trans.Meid = meid
- trans = t.NewTransactionFromSkel(trans)
- return trans
-}
-
-func (t *Tracker) TrackTransaction(
+func (t *Tracker) NewXappTransaction(
endpoint *RmrEndpoint,
xid string,
- meid *xapp.RMRMeid) (*Transaction, error) {
+ reqId *RequestId,
+ meid *xapp.RMRMeid) *TransactionXapp {
- if endpoint == nil {
- err := fmt.Errorf("Tracker: No valid endpoint given")
- return nil, err
- }
-
- trans := &Transaction{}
+ trans := &TransactionXapp{}
trans.XappKey = &TransactionXappKey{*endpoint, xid}
trans.Meid = meid
- trans = t.NewTransactionFromSkel(trans)
+ if reqId != nil {
+ trans.ReqId = *reqId
+ }
+ t.initTransaction(&trans.Transaction)
+ xapp.Logger.Debug("CREATE %s", trans.String())
+ return trans
+}
+
+func (t *Tracker) Track(trans *TransactionXapp) error {
+
+ if trans.GetEndpoint() == nil {
+ err := fmt.Errorf("Tracker: No valid endpoint given in %s", trans.String())
+ return err
+ }
t.mutex.Lock()
defer t.mutex.Unlock()
- if othtrans, ok := t.transactionXappTable[*trans.XappKey]; ok {
- err := fmt.Errorf("Tracker: %s is ongoing, %s not created ", othtrans, trans)
- return nil, err
+ theKey := *trans.XappKey
+
+ if othtrans, ok := t.transactionXappTable[theKey]; ok {
+ err := fmt.Errorf("Tracker: %s is ongoing, not tracking %s", othtrans, trans)
+ return err
}
trans.tracker = t
- t.transactionXappTable[*trans.XappKey] = trans
- xapp.Logger.Debug("Tracker: Add %s", trans.String())
+ t.transactionXappTable[theKey] = trans
+ xapp.Logger.Debug("Tracker: Append %s", trans.String())
//xapp.Logger.Debug("Tracker: transtable=%v", t.transactionXappTable)
- return trans, nil
+ return nil
}
-func (t *Tracker) UnTrackTransaction(xappKey TransactionXappKey) (*Transaction, error) {
+func (t *Tracker) UnTrackTransaction(xappKey TransactionXappKey) (*TransactionXapp, error) {
t.mutex.Lock()
defer t.mutex.Unlock()
if trans, ok2 := t.transactionXappTable[xappKey]; ok2 {
- xapp.Logger.Debug("Tracker: Delete %s", trans.String())
+ xapp.Logger.Debug("Tracker: Remove %s", trans.String())
delete(t.transactionXappTable, xappKey)
//xapp.Logger.Debug("Tracker: transtable=%v", t.transactionXappTable)
return trans, nil
diff --git a/pkg/control/transaction.go b/pkg/control/transaction.go
index 735954e..b2b838b 100644
--- a/pkg/control/transaction.go
+++ b/pkg/control/transaction.go
@@ -30,18 +30,33 @@
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
+type TransactionIf interface {
+ String() string
+ Release()
+ SendEvent(interface{}, time.Duration) (bool, bool)
+ WaitEvent(time.Duration) (interface{}, bool)
+}
-type TransactionBase struct {
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+
+type Transaction struct {
mutex sync.Mutex //
- Seq uint64 //
+ Seq uint64 //transaction sequence
tracker *Tracker //tracker instance
Meid *xapp.RMRMeid //meid transaction related
+ ReqId RequestId //
Mtype int //Encoded message type to be send
Payload *packer.PackedData //Encoded message to be send
EventChan chan interface{}
}
-func (t *TransactionBase) SendEvent(event interface{}, waittime time.Duration) (bool, bool) {
+func (t *Transaction) String() string {
+ return "trans(" + strconv.FormatUint(uint64(t.Seq), 10) + "/" + t.Meid.RanName + "/" + t.ReqId.String() + ")"
+}
+
+func (t *Transaction) SendEvent(event interface{}, waittime time.Duration) (bool, bool) {
if waittime > 0 {
select {
case t.EventChan <- event:
@@ -55,7 +70,7 @@
return true, false
}
-func (t *TransactionBase) WaitEvent(waittime time.Duration) (interface{}, bool) {
+func (t *Transaction) WaitEvent(waittime time.Duration) (interface{}, bool) {
if waittime > 0 {
select {
case event := <-t.EventChan:
@@ -68,13 +83,19 @@
return event, false
}
-func (t *TransactionBase) GetMtype() int {
+func (t *Transaction) GetReqId() *RequestId {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+ return &t.ReqId
+}
+
+func (t *Transaction) GetMtype() int {
t.mutex.Lock()
defer t.mutex.Unlock()
return t.Mtype
}
-func (t *TransactionBase) GetMeid() *xapp.RMRMeid {
+func (t *Transaction) GetMeid() *xapp.RMRMeid {
t.mutex.Lock()
defer t.mutex.Unlock()
if t.Meid != nil {
@@ -83,7 +104,7 @@
return nil
}
-func (t *TransactionBase) GetPayload() *packer.PackedData {
+func (t *Transaction) GetPayload() *packer.PackedData {
t.mutex.Lock()
defer t.mutex.Unlock()
return t.Payload
@@ -92,6 +113,24 @@
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
+type TransactionSubs struct {
+ Transaction //
+}
+
+func (t *TransactionSubs) String() string {
+ return "transsubs(" + t.Transaction.String() + ")"
+}
+
+func (t *TransactionSubs) Release() {
+ t.mutex.Lock()
+ xapp.Logger.Debug("RELEASE %s", t.String())
+ t.tracker = nil
+ t.mutex.Unlock()
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
type TransactionXappKey struct {
RmrEndpoint
Xid string // xapp xid in req
@@ -104,20 +143,20 @@
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
-type Transaction struct {
- TransactionBase //
- XappKey *TransactionXappKey //
+type TransactionXapp struct {
+ Transaction //
+ XappKey *TransactionXappKey //
}
-func (t *Transaction) String() string {
+func (t *TransactionXapp) String() string {
var transkey string = "transkey(N/A)"
if t.XappKey != nil {
transkey = t.XappKey.String()
}
- return "trans(" + strconv.FormatUint(uint64(t.Seq), 10) + "/" + t.Meid.RanName + "/" + transkey + ")"
+ return "transxapp(" + t.Transaction.String() + "/" + transkey + ")"
}
-func (t *Transaction) GetEndpoint() *RmrEndpoint {
+func (t *TransactionXapp) GetEndpoint() *RmrEndpoint {
t.mutex.Lock()
defer t.mutex.Unlock()
if t.XappKey != nil {
@@ -126,7 +165,7 @@
return nil
}
-func (t *Transaction) GetXid() string {
+func (t *TransactionXapp) GetXid() string {
t.mutex.Lock()
defer t.mutex.Unlock()
if t.XappKey != nil {
@@ -135,7 +174,7 @@
return ""
}
-func (t *Transaction) GetSrc() string {
+func (t *TransactionXapp) GetSrc() string {
t.mutex.Lock()
defer t.mutex.Unlock()
if t.XappKey != nil {
@@ -144,9 +183,9 @@
return ""
}
-func (t *Transaction) Release() {
+func (t *TransactionXapp) Release() {
t.mutex.Lock()
- xapp.Logger.Debug("Transaction: Release %s", t.String())
+ xapp.Logger.Debug("RELEASE %s", t.String())
tracker := t.tracker
xappkey := t.XappKey
t.tracker = nil
diff --git a/pkg/control/types.go b/pkg/control/types.go
index 164e801..4d318e0 100644
--- a/pkg/control/types.go
+++ b/pkg/control/types.go
@@ -22,6 +22,7 @@
import (
"bytes"
"fmt"
+ "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
"strconv"
"strings"
@@ -30,10 +31,12 @@
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
-type RmrDatagram struct {
- MessageType int
- SubscriptionId uint16
- Payload []byte
+type RequestId struct {
+ e2ap.RequestId
+}
+
+func (rid *RequestId) String() string {
+ return "reqid(" + rid.RequestId.String() + ")"
}
//-----------------------------------------------------------------------------
@@ -45,7 +48,15 @@
}
func (endpoint RmrEndpoint) String() string {
- return endpoint.Get()
+ return endpoint.Addr + ":" + strconv.FormatUint(uint64(endpoint.Port), 10)
+}
+
+func (endpoint *RmrEndpoint) Equal(ep *RmrEndpoint) bool {
+ if (endpoint.Addr == ep.Addr) &&
+ (endpoint.Port == ep.Port) {
+ return true
+ }
+ return false
}
func (endpoint *RmrEndpoint) GetAddr() string {
@@ -56,10 +67,6 @@
return endpoint.Port
}
-func (endpoint *RmrEndpoint) Get() string {
- return endpoint.Addr + ":" + strconv.FormatUint(uint64(endpoint.Port), 10)
-}
-
func (endpoint *RmrEndpoint) Set(src string) bool {
elems := strings.Split(src, ":")
if len(elems) == 2 {
@@ -82,11 +89,10 @@
}
func (eplist *RmrEndpointList) String() string {
+ tmpList := eplist.Endpoints
valuesText := []string{}
- for i := range eplist.Endpoints {
- ep := eplist.Endpoints[i]
- text := ep.String()
- valuesText = append(valuesText, text)
+ for i := range tmpList {
+ valuesText = append(valuesText, tmpList[i].String())
}
return strings.Join(valuesText, ",")
}
@@ -97,7 +103,7 @@
func (eplist *RmrEndpointList) AddEndpoint(ep *RmrEndpoint) bool {
for i := range eplist.Endpoints {
- if (eplist.Endpoints[i].Addr == ep.Addr) && (eplist.Endpoints[i].Port == ep.Port) {
+ if eplist.Endpoints[i].Equal(ep) {
return false
}
}
@@ -107,7 +113,7 @@
func (eplist *RmrEndpointList) DelEndpoint(ep *RmrEndpoint) bool {
for i := range eplist.Endpoints {
- if (eplist.Endpoints[i].Addr == ep.Addr) && (eplist.Endpoints[i].Port == ep.Port) {
+ if eplist.Endpoints[i].Equal(ep) {
eplist.Endpoints[i] = eplist.Endpoints[len(eplist.Endpoints)-1]
eplist.Endpoints[len(eplist.Endpoints)-1] = RmrEndpoint{"", 0}
eplist.Endpoints = eplist.Endpoints[:len(eplist.Endpoints)-1]
@@ -120,7 +126,7 @@
func (eplist *RmrEndpointList) DelEndpoints(otheplist *RmrEndpointList) bool {
var retval bool = false
for i := range otheplist.Endpoints {
- if eplist.DelEndpoint(&eplist.Endpoints[i]) {
+ if eplist.DelEndpoint(&otheplist.Endpoints[i]) {
retval = true
}
}
@@ -129,7 +135,7 @@
func (eplist *RmrEndpointList) HasEndpoint(ep *RmrEndpoint) bool {
for i := range eplist.Endpoints {
- if (eplist.Endpoints[i].Addr == ep.Addr) && (eplist.Endpoints[i].Port == ep.Port) {
+ if eplist.Endpoints[i].Equal(ep) {
return true
}
}
@@ -145,25 +151,6 @@
}
//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-type Action int
-
-func (act Action) String() string {
- actions := [...]string{
- "CREATE",
- "UPDATE",
- "NONE",
- "DELETE",
- }
-
- if act < CREATE || act > DELETE {
- return "UNKNOWN"
- }
- return actions[act]
-}
-
-//-----------------------------------------------------------------------------
// To add own method for rmrparams
//-----------------------------------------------------------------------------
type RMRParams struct {
diff --git a/pkg/control/types_test.go b/pkg/control/types_test.go
index caeae7b..5e1cfac 100644
--- a/pkg/control/types_test.go
+++ b/pkg/control/types_test.go
@@ -39,7 +39,7 @@
testError(t, "Endpoint elems for value %s expected addr %s port %d got addr %s port %d", val, expect.GetAddr(), expect.GetPort(), res.GetAddr(), res.GetPort())
}
if expect.String() != res.String() {
- testError(t, "Endpoint string for value %s expected %s got %s", val, expect.String(), res.Get())
+ testError(t, "Endpoint string for value %s expected %s got %s", val, expect.String(), res.String())
}
}
@@ -52,20 +52,71 @@
testEp(t, "", nil)
}
-func TestAction(t *testing.T) {
+func TestRmrEndpointList(t *testing.T) {
+ epl := &RmrEndpointList{}
- testActionString := func(t *testing.T, val int, str string) {
- if Action(val).String() != str {
- testError(t, "String for value %d expected %s got %s", val, str, Action(val).String())
- }
+ // Simple add / has / delete
+ if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false {
+ testError(t, "RmrEndpointList: 8080 add failed")
+ }
+ if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == true {
+ testError(t, "RmrEndpointList: 8080 duplicate add success")
+ }
+ if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+ testError(t, "RmrEndpointList: 8081 add failed")
+ }
+ if epl.HasEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+ testError(t, "RmrEndpointList: 8081 has failed")
+ }
+ if epl.DelEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+ testError(t, "RmrEndpointList: 8081 del failed")
+ }
+ if epl.HasEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == true {
+ testError(t, "RmrEndpointList: 8081 has non existing success")
+ }
+ if epl.DelEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == true {
+ testError(t, "RmrEndpointList: 8081 del non existing success")
+ }
+ if epl.DelEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false {
+ testError(t, "RmrEndpointList: 8080 del failed")
}
- testActionString(t, 0, "CREATE")
- testActionString(t, 1, "UPDATE")
- testActionString(t, 2, "NONE")
- testActionString(t, 3, "DELETE")
- testActionString(t, 5, "UNKNOWN")
- testActionString(t, 6, "UNKNOWN")
- testActionString(t, 7, "UNKNOWN")
- testActionString(t, 10, "UNKNOWN")
+ // list delete
+ if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false {
+ testError(t, "RmrEndpointList: 8080 add failed")
+ }
+ if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+ testError(t, "RmrEndpointList: 8081 add failed")
+ }
+ if epl.AddEndpoint(NewRmrEndpoint("127.0.0.1:8082")) == false {
+ testError(t, "RmrEndpointList: 8082 add failed")
+ }
+
+ epl2 := &RmrEndpointList{}
+ if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:9080")) == false {
+ testError(t, "RmrEndpointList: othlist add 9080 failed")
+ }
+
+ if epl.DelEndpoints(epl2) == true {
+ testError(t, "RmrEndpointList: delete list not existing successs")
+ }
+
+ if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:8080")) == false {
+ testError(t, "RmrEndpointList: othlist add 8080 failed")
+ }
+ if epl.DelEndpoints(epl2) == false {
+ testError(t, "RmrEndpointList: delete list 8080,9080 failed")
+ }
+
+ if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:8081")) == false {
+ testError(t, "RmrEndpointList: othlist add 8081 failed")
+ }
+ if epl2.AddEndpoint(NewRmrEndpoint("127.0.0.1:8082")) == false {
+ testError(t, "RmrEndpointList: othlist add 8082 failed")
+ }
+
+ if epl.DelEndpoints(epl2) == false {
+ testError(t, "RmrEndpointList: delete list 8080,8081,8082,9080 failed")
+ }
+
}
diff --git a/pkg/control/ut_ctrl_submgr_test.go b/pkg/control/ut_ctrl_submgr_test.go
index 8403c93..16da422 100644
--- a/pkg/control/ut_ctrl_submgr_test.go
+++ b/pkg/control/ut_ctrl_submgr_test.go
@@ -57,11 +57,11 @@
return false
}
-func (mc *testingSubmgrControl) wait_subs_clean(t *testing.T, e2SubsId int, secs int) bool {
+func (mc *testingSubmgrControl) wait_subs_clean(t *testing.T, e2SubsId uint32, secs int) bool {
var subs *Subscription
i := 1
for ; i <= secs*2; i++ {
- subs = mc.c.registry.GetSubscription(uint16(e2SubsId))
+ subs = mc.c.registry.GetSubscription(e2SubsId)
if subs == nil {
return true
}
@@ -75,11 +75,11 @@
return false
}
-func (mc *testingSubmgrControl) wait_subs_trans_clean(t *testing.T, e2SubsId int, secs int) bool {
- var trans *Transaction
+func (mc *testingSubmgrControl) wait_subs_trans_clean(t *testing.T, e2SubsId uint32, secs int) bool {
+ var trans TransactionIf
i := 1
for ; i <= secs*2; i++ {
- subs := mc.c.registry.GetSubscription(uint16(e2SubsId))
+ subs := mc.c.registry.GetSubscription(e2SubsId)
if subs == nil {
return true
}
@@ -97,13 +97,13 @@
return false
}
-func (mc *testingSubmgrControl) get_subid(t *testing.T) uint16 {
+func (mc *testingSubmgrControl) get_subid(t *testing.T) uint32 {
mc.c.registry.mutex.Lock()
defer mc.c.registry.mutex.Unlock()
return mc.c.registry.subIds[0]
}
-func (mc *testingSubmgrControl) wait_subid_change(t *testing.T, origSubId uint16, secs int) (uint16, bool) {
+func (mc *testingSubmgrControl) wait_subid_change(t *testing.T, origSubId uint32, secs int) (uint32, bool) {
i := 1
for ; i <= secs*2; i++ {
mc.c.registry.mutex.Lock()
diff --git a/pkg/control/ut_messaging_test.go b/pkg/control/ut_messaging_test.go
index 40207b3..2e8bc0e 100644
--- a/pkg/control/ut_messaging_test.go
+++ b/pkg/control/ut_messaging_test.go
@@ -56,7 +56,7 @@
waiter.WaitResult(t)
//Wait that subs is cleaned
- mainCtrl.wait_subs_clean(t, int(newSubsId), 10)
+ mainCtrl.wait_subs_clean(t, newSubsId, 10)
xappConn1.TestMsgCnt(t)
xappConn2.TestMsgCnt(t)
@@ -585,7 +585,7 @@
e2termConn.handle_e2term_subs_del_resp(t, delreq, delmsg)
// Wait that subs is cleaned
- mainCtrl.wait_subs_clean(t, int(delreq.RequestId.Seq), 10)
+ mainCtrl.wait_subs_clean(t, delreq.RequestId.Seq, 10)
xappConn1.TestMsgCnt(t)
xappConn2.TestMsgCnt(t)
@@ -642,7 +642,7 @@
delreq, _ := e2termConn.handle_e2term_subs_del_req(t)
// Wait that subs is cleaned
- mainCtrl.wait_subs_clean(t, int(delreq.RequestId.Seq), 15)
+ mainCtrl.wait_subs_clean(t, delreq.RequestId.Seq, 15)
xappConn1.TestMsgCnt(t)
xappConn2.TestMsgCnt(t)
@@ -1156,7 +1156,7 @@
e2termConn.handle_e2term_subs_del_resp(t, delreq1, delmsg1)
//Wait that subs is cleaned
- mainCtrl.wait_subs_clean(t, int(delreq1.RequestId.Seq), 10)
+ mainCtrl.wait_subs_clean(t, delreq1.RequestId.Seq, 10)
xappConn1.TestMsgCnt(t)
xappConn2.TestMsgCnt(t)
diff --git a/pkg/control/ut_stub_e2term_test.go b/pkg/control/ut_stub_e2term_test.go
index 18a6264..273d159 100644
--- a/pkg/control/ut_stub_e2term_test.go
+++ b/pkg/control/ut_stub_e2term_test.go
@@ -132,7 +132,7 @@
}
e2SubsResp.Set(resp)
- xapp.Logger.Debug("%s", e2SubsResp.String())
+ xapp.Logger.Debug("(%s) %s", e2termConn.desc, e2SubsResp.String())
packerr, packedMsg := e2SubsResp.Pack(nil)
if packerr != nil {
testError(t, "(%s) pack NOK %s", e2termConn.desc, packerr.Error())
@@ -198,7 +198,7 @@
xapp.Logger.Info("(%s) Send Subs Fail", e2termConn.desc)
e2SubsFail.Set(fparams.fail)
- xapp.Logger.Debug("%s", e2SubsFail.String())
+ xapp.Logger.Debug("(%s) %s", e2termConn.desc, e2SubsFail.String())
packerr, packedMsg := e2SubsFail.Pack(nil)
if packerr != nil {
testError(t, "(%s) pack NOK %s", e2termConn.desc, packerr.Error())
@@ -276,7 +276,7 @@
resp.FunctionId = req.FunctionId
e2SubsDelResp.Set(resp)
- xapp.Logger.Debug("%s", e2SubsDelResp.String())
+ xapp.Logger.Debug("(%s) %s", e2termConn.desc, e2SubsDelResp.String())
packerr, packedMsg := e2SubsDelResp.Pack(nil)
if packerr != nil {
testError(t, "(%s) pack NOK %s", e2termConn.desc, packerr.Error())
@@ -316,7 +316,7 @@
resp.Cause.CauseVal = 4 // unspecified
e2SubsDelFail.Set(resp)
- xapp.Logger.Debug("%s", e2SubsDelFail.String())
+ xapp.Logger.Debug("(%s) %s", e2termConn.desc, e2SubsDelFail.String())
packerr, packedMsg := e2SubsDelFail.Pack(nil)
if packerr != nil {
testError(t, "(%s) pack NOK %s", e2termConn.desc, packerr.Error())
diff --git a/pkg/control/ut_stub_xapp_test.go b/pkg/control/ut_stub_xapp_test.go
index 331d14d..ac72b76 100644
--- a/pkg/control/ut_stub_xapp_test.go
+++ b/pkg/control/ut_stub_xapp_test.go
@@ -194,10 +194,10 @@
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
-func (xappConn *testingXappStub) handle_xapp_subs_resp(t *testing.T, trans *xappTransaction) int {
+func (xappConn *testingXappStub) handle_xapp_subs_resp(t *testing.T, trans *xappTransaction) uint32 {
xapp.Logger.Info("(%s) handle_xapp_subs_resp", xappConn.desc)
e2SubsResp := xapp_e2asnpacker.NewPackerSubscriptionResponse()
- var e2SubsId int
+ var e2SubsId uint32
//---------------------------------
// xapp activity: Recv Subs Resp
@@ -207,14 +207,18 @@
xappConn.DecMsgCnt()
if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_RESP"] {
testError(t, "(%s) Received RIC_SUB_RESP wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_RESP", xapp.RicMessageTypeToName[msg.Mtype])
- return -1
+ return 0
} else if msg.Xid != trans.xid {
testError(t, "(%s) Received RIC_SUB_RESP wrong xid expected %s got %s, error", xappConn.desc, trans.xid, msg.Xid)
- return -1
+ return 0
} else {
packedData := &packer.PackedData{}
packedData.Buf = msg.Payload
- e2SubsId = msg.SubId
+ if msg.SubId > 0 {
+ e2SubsId = uint32(msg.SubId)
+ } else {
+ e2SubsId = 0
+ }
unpackerr := e2SubsResp.UnPack(packedData)
if unpackerr != nil {
@@ -230,18 +234,18 @@
}
case <-time.After(15 * time.Second):
testError(t, "(%s) Not Received RIC_SUB_RESP within 15 secs", xappConn.desc)
- return -1
+ return 0
}
- return -1
+ return 0
}
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
-func (xappConn *testingXappStub) handle_xapp_subs_fail(t *testing.T, trans *xappTransaction) int {
+func (xappConn *testingXappStub) handle_xapp_subs_fail(t *testing.T, trans *xappTransaction) uint32 {
xapp.Logger.Info("(%s) handle_xapp_subs_fail", xappConn.desc)
e2SubsFail := xapp_e2asnpacker.NewPackerSubscriptionFailure()
- var e2SubsId int
+ var e2SubsId uint32
//-------------------------------
// xapp activity: Recv Subs Fail
@@ -251,14 +255,18 @@
xappConn.DecMsgCnt()
if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_FAILURE"] {
testError(t, "(%s) Received RIC_SUB_FAILURE wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_FAILURE", xapp.RicMessageTypeToName[msg.Mtype])
- return -1
+ return 0
} else if msg.Xid != trans.xid {
testError(t, "(%s) Received RIC_SUB_FAILURE wrong xid expected %s got %s, error", xappConn.desc, trans.xid, msg.Xid)
- return -1
+ return 0
} else {
packedData := &packer.PackedData{}
packedData.Buf = msg.Payload
- e2SubsId = msg.SubId
+ if msg.SubId > 0 {
+ e2SubsId = uint32(msg.SubId)
+ } else {
+ e2SubsId = 0
+ }
unpackerr := e2SubsFail.UnPack(packedData)
if unpackerr != nil {
@@ -274,15 +282,15 @@
}
case <-time.After(15 * time.Second):
testError(t, "(%s) Not Received RIC_SUB_FAILURE within 15 secs", xappConn.desc)
- return -1
+ return 0
}
- return -1
+ return 0
}
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
-func (xappConn *testingXappStub) handle_xapp_subs_del_req(t *testing.T, oldTrans *xappTransaction, e2SubsId int) *xappTransaction {
+func (xappConn *testingXappStub) handle_xapp_subs_del_req(t *testing.T, oldTrans *xappTransaction, e2SubsId uint32) *xappTransaction {
xapp.Logger.Info("(%s) handle_xapp_subs_del_req", xappConn.desc)
e2SubsDelReq := xapp_e2asnpacker.NewPackerSubscriptionDeleteRequest()
@@ -293,7 +301,7 @@
req := &e2ap.E2APSubscriptionDeleteRequest{}
req.RequestId.Id = 1
- req.RequestId.Seq = uint32(e2SubsId)
+ req.RequestId.Seq = e2SubsId
req.FunctionId = 1
e2SubsDelReq.Set(req)
@@ -311,7 +319,7 @@
params := &RMRParams{&xapp.RMRParams{}}
params.Mtype = xapp.RIC_SUB_DEL_REQ
- params.SubId = e2SubsId
+ params.SubId = int(e2SubsId)
params.Payload = packedMsg.Buf
params.Meid = trans.meid
params.Xid = trans.xid