RICPLT-2979 SubReq go asn into use
Change-Id: I9a47ff45543fbb87172746449cc463bdf7d37ce8
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
diff --git a/pkg/control/control.go b/pkg/control/control.go
index 18eeb4c..cfe3dfb 100755
--- a/pkg/control/control.go
+++ b/pkg/control/control.go
@@ -21,7 +21,7 @@
import (
"fmt"
- //"gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/packer"
+ "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
@@ -144,7 +144,7 @@
params := &RMRParams{&xapp.RMRParams{}}
params.Mtype = trans.GetMtype()
params.SubId = int(subs.GetSubId())
- params.Xid = trans.GetXid()
+ params.Xid = ""
params.Meid = subs.GetMeid()
params.Src = ""
params.PayloadLen = payloadLen
@@ -159,7 +159,7 @@
params.Mtype = mType
params.SubId = int(subs.GetSubId())
params.Xid = trans.GetXid()
- params.Meid = subs.GetMeid()
+ params.Meid = trans.GetMeid()
params.Src = ""
params.PayloadLen = payloadLen
params.Payload = payload
@@ -171,11 +171,8 @@
func (c *Control) Consume(params *xapp.RMRParams) (err error) {
xapp.Rmr.Free(params.Mbuf)
params.Mbuf = nil
-
msg := &RMRParams{params}
-
c.msgCounter++
-
switch msg.Mtype {
case xapp.RICMessageTypes["RIC_SUB_REQ"]:
go c.handleSubscriptionRequest(msg)
@@ -192,88 +189,79 @@
default:
xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
}
+
return nil
}
func (c *Control) handleSubscriptionRequest(params *RMRParams) {
xapp.Logger.Info("SubReq from xapp: %s", params.String())
- srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
- if err != nil {
- xapp.Logger.Error("SubReq: Failed to update routing-manager. Dropping this msg. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
- return
- }
+ //
+ //
+ //
+ trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
+ params.Mtype,
+ params.Xid,
+ params.Meid,
+ false,
+ true)
- subs, err := c.registry.ReserveSubscription(RmrEndpoint{*srcAddr, *srcPort}, params.Meid)
if err != nil {
- xapp.Logger.Error("SubReq: %s, Dropping this msg.", err.Error())
+ xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), params.String())
return
}
//
- // WIP RICPLT-2979
- //
- /*
- e2SubReq := packerif.NewPackerSubscriptionRequest()
- packedData := &packer.PackedData{}
- packedData.Buf = params.Payload
- err = e2SubReq.UnPack(packedData)
- if err != nil {
- xapp.Logger.Error("SubReq: UnPack() failed: %s", err.Error())
- }
- getErr, subReq := e2SubReq.Get()
- if getErr != nil {
- xapp.Logger.Error("SubReq: Get() failed: %s", err.Error())
- }
-
-
- subReq.RequestId.Seq = uint32(subs.GetSubId())
-
- err = e2SubReq.Set(subReq)
- if err != nil {
- xapp.Logger.Error("SubReq: Set() failed: %s", err.Error())
- return
- }
- err, packedData = e2SubReq.Pack(nil)
- if err != nil {
- xapp.Logger.Error("SubReq: Pack() failed: %s", err.Error())
- return
- }
-
- params.PayloadLen = len(packedData.Buf)
- params.Payload = packedData.Buf
- */
//
//
- //
-
- params.SubId = int(subs.GetSubId())
- err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, subs.GetSubId())
+ trans.SubReqMsg, err = c.e2ap.UnpackSubscriptionRequest(params.Payload)
if err != nil {
- xapp.Logger.Error("SubReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, %s", err, params.String())
- c.registry.DelSubscription(subs.Seq)
+ xapp.Logger.Error("SubReq: %s Dropping this msg. %s", err.Error(), trans)
+ trans.Release()
return
}
- // Create transatcion record for every subscription request
- var forwardRespToXapp bool = true
- var responseReceived bool = false
- trans, err := c.tracker.TrackTransaction(RmrEndpoint{*srcAddr, *srcPort}, params, responseReceived, forwardRespToXapp)
+ //
+ //
+ //
+ subs, err := c.registry.ReserveSubscription(&trans.RmrEndpoint, trans.Meid)
if err != nil {
- xapp.Logger.Error("SubReq: %s, Dropping this msg.", err.Error())
- c.registry.DelSubscription(subs.Seq)
+ xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
+ trans.Release()
return
}
err = subs.SetTransaction(trans)
if err != nil {
- xapp.Logger.Error("SubReq: %s, Dropping this msg.", err.Error())
+ xapp.Logger.Error("SubReq: %s, Dropping this msg. %s", err.Error(), trans)
c.registry.DelSubscription(subs.Seq)
trans.Release()
return
}
- c.rmrSend("SubReq to E2T", subs, trans, params.Payload, params.PayloadLen)
+ trans.SubReqMsg.RequestId.Seq = uint32(subs.GetSubId())
+
+ //
+ // TODO: subscription create is in fact owned by subscription and not transaction.
+ // Transaction is toward xapp while Subscription is toward ran.
+ // In merge several xapps may wake transactions, while only one subscription
+ // toward ran occurs -> subscription owns subscription creation toward ran
+ //
+ // This is intermediate solution while improving message handling
+ //
+ packedData, err := c.e2ap.PackSubscriptionRequest(trans.SubReqMsg)
+ if err != nil {
+ xapp.Logger.Error("SubReq: %s for trans %s", err.Error(), trans)
+ c.registry.DelSubscription(subs.Seq)
+ trans.Release()
+ return
+ }
+
+ //Optimize and store packed message to be sent (for retransmission). Again owned by subscription?
+ trans.Payload = packedData.Buf
+ trans.PayloadLen = len(packedData.Buf)
+
+ c.rmrSend("SubReq to E2T", subs, trans, packedData.Buf, len(packedData.Buf))
c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.Seq), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
xapp.Logger.Debug("SubReq: Debugging trans table = %v", c.tracker.transactionXappTable)
@@ -384,100 +372,109 @@
}
if tryCount < maxSubReqTryCount {
- xapp.Logger.Info("SubReq timeout: Resending SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.OrigParams.Mtype, subs.GetSubId(), trans.GetXid(), subs.GetMeid())
+ xapp.Logger.Info("SubReq timeout: Resending SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.GetMtype(), subs.GetSubId(), trans.GetXid(), trans.GetMeid())
trans.RetryTransaction()
- c.rmrSend("SubReq(SubReq timer) to E2T", subs, trans, trans.OrigParams.Payload, trans.OrigParams.PayloadLen)
+ c.rmrSend("SubReq(SubReq timer) to E2T", subs, trans, trans.Payload, trans.PayloadLen)
tryCount++
c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionRequestTimer)
return
}
- var subDelReqPayload []byte
- subDelReqPayload, err := c.e2ap.PackSubscriptionDeleteRequest(trans.OrigParams.Payload, subs.GetSubId())
- if err != nil {
- xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err)
- return
- }
-
- // Cancel failed subscription
- params := &RMRParams{&xapp.RMRParams{}}
- params.Mtype = 12020 // RIC SUBSCRIPTION DELETE
- params.SubId = int(subs.GetSubId())
- params.Xid = trans.GetXid()
- params.Meid = subs.GetMeid()
- params.Src = trans.OrigParams.Src
- params.PayloadLen = len(subDelReqPayload)
- params.Payload = subDelReqPayload
- params.Mbuf = nil
-
// Delete CREATE transaction
trans.Release()
// Create DELETE transaction (internal and no messages toward xapp)
- var forwardRespToXapp bool = false
- var respReceived bool = false
- deltrans, err := c.tracker.TrackTransaction(trans.RmrEndpoint, params, respReceived, forwardRespToXapp)
+ deltrans, err := c.tracker.TrackTransaction(&trans.RmrEndpoint,
+ 12020, // RIC SUBSCRIPTION DELETE
+ trans.GetXid(),
+ trans.GetMeid(),
+ false,
+ false)
+
if err != nil {
xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
+ //TODO improve error handling. Important at least in merge
+ c.registry.DelSubscription(subs.GetSubId())
return
}
+ deltrans.SubDelReqMsg = &e2ap.E2APSubscriptionDeleteRequest{}
+ deltrans.SubDelReqMsg.RequestId.Id = trans.SubReqMsg.RequestId.Id
+ deltrans.SubDelReqMsg.RequestId.Seq = uint32(subs.GetSubId())
+ deltrans.SubDelReqMsg.FunctionId = trans.SubReqMsg.FunctionId
+ packedData, err := c.e2ap.PackSubscriptionDeleteRequest(deltrans.SubDelReqMsg)
+ if err != nil {
+ xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err)
+ //TODO improve error handling. Important at least in merge
+ deltrans.Release()
+ c.registry.DelSubscription(subs.GetSubId())
+ return
+ }
+ deltrans.PayloadLen = len(packedData.Buf)
+ deltrans.Payload = packedData.Buf
+
err = subs.SetTransaction(deltrans)
if err != nil {
xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error())
+ //TODO improve error handling. Important at least in merge
deltrans.Release()
return
}
- c.rmrSend("SubDelReq(SubReq timer) to E2T", subs, deltrans, deltrans.OrigParams.Payload, deltrans.OrigParams.PayloadLen)
+ c.rmrSend("SubDelReq(SubReq timer) to E2T", subs, deltrans, deltrans.Payload, deltrans.PayloadLen)
c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
return
}
func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) {
+ var subs *Subscription
+
xapp.Logger.Info("SubDelReq from xapp: %s", params.String())
- srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
+ trans, err := c.tracker.TrackTransaction(NewRmrEndpoint(params.Src),
+ params.Mtype,
+ params.Xid,
+ params.Meid,
+ false,
+ true)
+
if err != nil {
- xapp.Logger.Error("SubDelReq: Failed to update routing-manager. Dropping this msg. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+ xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), params.String())
return
}
payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload)
- if err != nil {
- xapp.Logger.Error("SubDelReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
- return
+ if err == nil {
+ subs = c.registry.GetSubscription(payloadSeqNum)
}
- xapp.Logger.Info("SubDelReq: Received payloadSeqNum: %v", payloadSeqNum)
+ if subs == nil && params.SubId > 0 {
+ subs = c.registry.GetSubscription(uint16(params.SubId))
+ }
- subs := c.registry.GetSubscription(payloadSeqNum)
if subs == nil {
- xapp.Logger.Error("SubDelReq: Not valid sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid)
+ xapp.Logger.Error("SubDelReq: Not valid subscription found payloadSeqNum: %d. Dropping this msg. %s", payloadSeqNum, trans)
+ trans.Release()
return
}
+ xapp.Logger.Info("SubDelReq: subscription found payloadSeqNum: %d. %s", payloadSeqNum, trans)
- var forwardRespToXapp bool = true
- var respReceived bool = false
- trans, err := c.tracker.TrackTransaction(RmrEndpoint{*srcAddr, *srcPort}, params, respReceived, forwardRespToXapp)
- if err != nil {
- xapp.Logger.Error("SubDelReq: %s, Dropping this msg.", err.Error())
- return
- }
+ trans.PayloadLen = params.PayloadLen
+ trans.Payload = params.Payload
err = subs.SetTransaction(trans)
if err != nil {
- xapp.Logger.Error("SubDelReq: %s, Dropping this msg.", err.Error())
+ xapp.Logger.Error("SubDelReq: %s, Dropping this msg. %s", err.Error(), trans)
trans.Release()
return
}
subs.UnConfirmed()
- c.rmrSend("SubDelReq to E2T", subs, trans, trans.OrigParams.Payload, trans.OrigParams.PayloadLen)
+ c.rmrSend("SubDelReq to E2T", subs, trans, trans.Payload, trans.PayloadLen)
c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer)
return
@@ -559,7 +556,7 @@
}
if trans.ForwardRespToXapp == true {
var subDelRespPayload []byte
- subDelRespPayload, err = c.e2ap.PackSubscriptionDeleteResponse(trans.OrigParams.Payload, subs.GetSubId())
+ subDelRespPayload, err = c.e2ap.PackSubscriptionDeleteResponseFromSubDelReq(trans.Payload, subs.GetSubId())
if err != nil {
xapp.Logger.Error("SubDelFail:Packing SubDelResp failed. Err: %v", err)
return
@@ -601,12 +598,12 @@
}
if tryCount < maxSubDelReqTryCount {
- xapp.Logger.Info("SubDelReq timeout: Resending SubDelReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.OrigParams.Mtype, subs.GetSubId(), trans.GetXid(), subs.GetMeid())
+ xapp.Logger.Info("SubDelReq timeout: Resending SubDelReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.GetMtype(), subs.GetSubId(), trans.GetXid(), trans.GetMeid())
// Set possible to handle new response for the subId
trans.RetryTransaction()
- c.rmrSend("SubDelReq(SubDelReq timer) to E2T", subs, trans, trans.OrigParams.Payload, trans.OrigParams.PayloadLen)
+ c.rmrSend("SubDelReq(SubDelReq timer) to E2T", subs, trans, trans.Payload, trans.PayloadLen)
tryCount++
c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer)
@@ -615,9 +612,9 @@
if trans.ForwardRespToXapp == true {
var subDelRespPayload []byte
- subDelRespPayload, err := c.e2ap.PackSubscriptionDeleteResponse(trans.OrigParams.Payload, subs.GetSubId())
+ subDelRespPayload, err := c.e2ap.PackSubscriptionDeleteResponseFromSubDelReq(trans.Payload, subs.GetSubId())
if err != nil {
- xapp.Logger.Error("SubDelReq timeout: Unable to pack payload. Dropping this this msg. Err: %v, SubId: %v, Xid: %s, Payload %x", err, subs.GetSubId(), trans.GetXid(), trans.OrigParams.Payload)
+ xapp.Logger.Error("SubDelReq timeout: Unable to pack payload. Dropping this this msg. Err: %v, SubId: %v, Xid: %s, Payload %x", err, subs.GetSubId(), trans.GetXid(), trans.Payload)
return
}
diff --git a/pkg/control/e2ap.go b/pkg/control/e2ap.go
index 084c7eb..5cabe6e 100644
--- a/pkg/control/e2ap.go
+++ b/pkg/control/e2ap.go
@@ -41,27 +41,6 @@
/* RICsubscriptionRequest */
-// Used by e2t test stub
-func (c *E2ap) GetSubscriptionRequestSequenceNumber(payload []byte) (subId uint16, err error) {
- cptr := unsafe.Pointer(&payload[0])
- cret := C.e2ap_get_ric_subscription_request_sequence_number(cptr, C.size_t(len(payload)))
- if cret < 0 {
- return 0, fmt.Errorf("e2ap wrapper is unable to get Subscirption Request Sequence Number due to wrong or invalid payload. Erroxappde: %v", cret)
- }
- subId = uint16(cret)
- return
-}
-
-// Used by submgr, xapp test stub
-func (c *E2ap) SetSubscriptionRequestSequenceNumber(payload []byte, newSubscriptionid uint16) (err error) {
- cptr := unsafe.Pointer(&payload[0])
- size := C.e2ap_set_ric_subscription_request_sequence_number(cptr, C.size_t(len(payload)), C.long(newSubscriptionid))
- if size < 0 {
- return fmt.Errorf("e2ap wrapper is unable to set Subscription Request Sequence Number due to wrong or invalid payload. Erroxappde: %v", size)
- }
- return
-}
-
// Used by submgr, xapp test stub
func (c *E2ap) GetSubscriptionResponseSequenceNumber(payload []byte) (subId uint16, err error) {
cptr := unsafe.Pointer(&payload[0])
@@ -176,61 +155,96 @@
}
// Used by submgr
-func (c *E2ap) PackSubscriptionDeleteResponse(payload []byte, newSubscriptionid uint16) (newPayload []byte, err error) {
- e2SubDelReq := packerif.NewPackerSubscriptionDeleteRequest()
- packedData := &packer.PackedData{}
- packedData.Buf = payload
- err = e2SubDelReq.UnPack(packedData)
+func (c *E2ap) PackSubscriptionDeleteResponseFromSubDelReq(payload []byte, newSubscriptionid uint16) (newPayload []byte, err error) {
+
+ subDelReq, err := c.UnpackSubscriptionDeleteRequest(payload)
if err != nil {
- return make([]byte, 0), fmt.Errorf("PackSubDelResp: UnPack() failed: %s", err.Error())
- }
- getErr, subDelReq := e2SubDelReq.Get()
- if getErr != nil {
- return make([]byte, 0), fmt.Errorf("PackSubDelResp: Get() failed: %s", getErr.Error())
+ return make([]byte, 0), fmt.Errorf("PackSubDelRespFromSubDelReq: SubDelReq unpack failed: %s", err.Error())
}
- e2SubDelResp := packerif.NewPackerSubscriptionDeleteResponse()
- subDelResp := e2ap.E2APSubscriptionDeleteResponse{}
+ subDelResp := &e2ap.E2APSubscriptionDeleteResponse{}
subDelResp.RequestId.Id = subDelReq.RequestId.Id
subDelResp.RequestId.Seq = uint32(newSubscriptionid)
subDelResp.FunctionId = subDelReq.FunctionId
- err = e2SubDelResp.Set(&subDelResp)
+
+ packedData, err := c.PackSubscriptionDeleteResponse(subDelResp)
if err != nil {
- return make([]byte, 0), fmt.Errorf("PackSubDelResp: Set() failed: %s", err.Error())
- }
- err, packedData = e2SubDelResp.Pack(nil)
- if err != nil {
- return make([]byte, 0), fmt.Errorf("PackSubDelResp: Pack() failed: %s", err.Error())
+ return make([]byte, 0), fmt.Errorf("PackSubDelRespFromSubDelReq: SubDelResp pack failed: %s", err.Error())
}
return packedData.Buf, nil
}
-// Used by submgr
-func (c *E2ap) PackSubscriptionDeleteRequest(payload []byte, newSubscriptionid uint16) (newPayload []byte, err error) {
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func (c *E2ap) UnpackSubscriptionRequest(payload []byte) (*e2ap.E2APSubscriptionRequest, error) {
e2SubReq := packerif.NewPackerSubscriptionRequest()
packedData := &packer.PackedData{}
packedData.Buf = payload
- err = e2SubReq.UnPack(packedData)
+ err := e2SubReq.UnPack(packedData)
if err != nil {
- return make([]byte, 0), fmt.Errorf("PackSubDelReq: UnPack() failed: %s", err.Error())
+ return nil, err
}
- getErr, subReq := e2SubReq.Get()
- if getErr != nil {
- return make([]byte, 0), fmt.Errorf("PackSubDelReq: Get() failed: %s", getErr.Error())
+ err, subReq := e2SubReq.Get()
+ if err != nil {
+ return nil, err
}
+ return subReq, nil
+}
- e2SubDel := packerif.NewPackerSubscriptionDeleteRequest()
- subDelReq := e2ap.E2APSubscriptionDeleteRequest{}
- subDelReq.RequestId.Id = subReq.RequestId.Id
- subDelReq.RequestId.Seq = uint32(newSubscriptionid)
- subDelReq.FunctionId = subReq.FunctionId
- err = e2SubDel.Set(&subDelReq)
+func (c *E2ap) PackSubscriptionRequest(req *e2ap.E2APSubscriptionRequest) (*packer.PackedData, error) {
+ e2SubReq := packerif.NewPackerSubscriptionRequest()
+ err := e2SubReq.Set(req)
if err != nil {
- return make([]byte, 0), fmt.Errorf("PackSubDelReq: Set() failed: %s", err.Error())
+ return nil, err
}
- err, packedData = e2SubDel.Pack(nil)
+ err, packedData := e2SubReq.Pack(nil)
if err != nil {
- return make([]byte, 0), fmt.Errorf("PackSubDelReq: Pack() failed: %s", err.Error())
+ return nil, err
}
- return packedData.Buf, nil
+ return packedData, nil
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func (c *E2ap) UnpackSubscriptionDeleteRequest(payload []byte) (*e2ap.E2APSubscriptionDeleteRequest, error) {
+ e2SubDelReq := packerif.NewPackerSubscriptionDeleteRequest()
+ packedData := &packer.PackedData{}
+ packedData.Buf = payload
+ err := e2SubDelReq.UnPack(packedData)
+ if err != nil {
+ return nil, err
+ }
+ err, subReq := e2SubDelReq.Get()
+ if err != nil {
+ return nil, err
+ }
+ return subReq, nil
+}
+
+func (c *E2ap) PackSubscriptionDeleteRequest(req *e2ap.E2APSubscriptionDeleteRequest) (*packer.PackedData, error) {
+ e2SubDelReq := packerif.NewPackerSubscriptionDeleteRequest()
+ err := e2SubDelReq.Set(req)
+ if err != nil {
+ return nil, err
+ }
+ err, packedData := e2SubDelReq.Pack(nil)
+ if err != nil {
+ return nil, err
+ }
+ return packedData, nil
+}
+
+func (c *E2ap) PackSubscriptionDeleteResponse(req *e2ap.E2APSubscriptionDeleteResponse) (*packer.PackedData, error) {
+ e2SubDelResp := packerif.NewPackerSubscriptionDeleteResponse()
+ err := e2SubDelResp.Set(req)
+ if err != nil {
+ return nil, err
+ }
+ err, packedData := e2SubDelResp.Pack(nil)
+ if err != nil {
+ return nil, err
+ }
+ return packedData, nil
}
diff --git a/pkg/control/main_test.go b/pkg/control/main_test.go
index dffff77..32ea09f 100644
--- a/pkg/control/main_test.go
+++ b/pkg/control/main_test.go
@@ -68,6 +68,7 @@
type testingRmrControl struct {
testingControl
rmrClientTest *xapp.RMRClient
+ active bool
}
func (tc *testingRmrControl) RmrSend(params *RMRParams) (err error) {
@@ -93,6 +94,7 @@
func initTestingRmrControl(desc string, rtfile string, port string, stat string, consumer xapp.MessageConsumer) testingRmrControl {
tc := testingRmrControl{}
+ tc.active = false
tc.testingControl = initTestingControl(desc, rtfile, port)
tc.rmrClientTest = xapp.NewRMRClientWithParams("tcp:"+port, 4096, 1, stat)
tc.rmrClientTest.SetReadyCB(tc.ReadyCB, nil)
@@ -153,6 +155,12 @@
params.Mbuf = nil
msg := &RMRParams{params}
+ if params.Mtype == 55555 {
+ xapp.Logger.Info("(%s) Testing message ignore %s", tc.desc, msg.String())
+ tc.active = true
+ return
+ }
+
if strings.Contains(msg.Xid, tc.desc) {
xapp.Logger.Info("(%s) Consume %s", tc.desc, msg.String())
tc.rmrConChan <- msg
@@ -182,6 +190,13 @@
xapp.Rmr.Free(params.Mbuf)
params.Mbuf = nil
msg := &RMRParams{params}
+
+ if params.Mtype == 55555 {
+ xapp.Logger.Info("(%s) Testing message ignore %s", tc.desc, msg.String())
+ tc.active = true
+ return
+ }
+
xapp.Logger.Info("(%s) Consume %s", tc.desc, msg.String())
tc.rmrConChan <- msg
return
@@ -338,6 +353,7 @@
mse|12022,localhost:15560|-1|localhost:14560
mse|12021,localhost:14560|-1|localhost:13660;localhost:13560
mse|12022,localhost:14560|-1|localhost:13660;localhost:13560
+mse|55555|-1|localhost:13660;localhost:13560,localhost:15560
newrt|end
`
@@ -437,10 +453,36 @@
e2termConn = createNewE2termControl("e2termstub", e2termrtfilename, "15560", "RMRE2TERMSTUB")
//---------------------------------
- // Stupid sleep to try improve robustness
- // due: http handler and rmr routes init delays
+ // Testing message sending
//---------------------------------
- <-time.After(2 * time.Second)
+ var dummyBuf []byte = make([]byte, 100)
+
+ params := &RMRParams{&xapp.RMRParams{}}
+ params.Mtype = 55555
+ params.SubId = -1
+ params.Payload = dummyBuf
+ params.PayloadLen = 100
+ params.Meid = &xapp.RMRMeid{RanName: "NONEXISTINGRAN"}
+ params.Xid = "THISISTESTFORSTUBS"
+ params.Mbuf = nil
+
+ status := false
+ i := 1
+ for ; i <= 10 && status == false; i++ {
+ xapp.Rmr.Send(params.RMRParams, false)
+ if e2termConn.active == true && xappConn1.active == true && xappConn2.active == true {
+ status = true
+ break
+ } else {
+ xapp.Logger.Info("Sleep 0.5 secs and try routes again")
+ time.Sleep(500 * time.Millisecond)
+ }
+ }
+
+ if status == false {
+ xapp.Logger.Error("Could not initialize routes")
+ os.Exit(1)
+ }
//---------------------------------
//
diff --git a/pkg/control/registry.go b/pkg/control/registry.go
index 2c5bd8c..0970a3a 100644
--- a/pkg/control/registry.go
+++ b/pkg/control/registry.go
@@ -42,7 +42,7 @@
}
// Reserves and returns the next free sequence number
-func (r *Registry) ReserveSubscription(endPoint RmrEndpoint, meid *xapp.RMRMeid) (*Subscription, error) {
+func (r *Registry) ReserveSubscription(endPoint *RmrEndpoint, meid *xapp.RMRMeid) (*Subscription, error) {
// Check is current SequenceNumber valid
// Allocate next SequenceNumber value and retry N times
r.mutex.Lock()
@@ -60,7 +60,7 @@
subs := &Subscription{
Seq: sequenceNumber,
Active: false,
- RmrEndpoint: endPoint,
+ RmrEndpoint: *endPoint,
Meid: meid,
Trans: nil,
}
diff --git a/pkg/control/tracker.go b/pkg/control/tracker.go
index 869e8ca..75127a7 100644
--- a/pkg/control/tracker.go
+++ b/pkg/control/tracker.go
@@ -21,6 +21,7 @@
import (
"fmt"
+ "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
"sync"
)
@@ -36,15 +37,26 @@
t.transactionXappTable = make(map[TransactionXappKey]*Transaction)
}
-func (t *Tracker) TrackTransaction(endpoint RmrEndpoint, params *RMRParams, respReceived bool, forwardRespToXapp bool) (*Transaction, error) {
+func (t *Tracker) TrackTransaction(
+ endpoint *RmrEndpoint,
+ mtype int,
+ xid string,
+ meid *xapp.RMRMeid,
+ respReceived bool,
+ forwardRespToXapp bool) (*Transaction, error) {
+
+ if endpoint == nil {
+ err := fmt.Errorf("Tracker: No valid endpoint given")
+ return nil, err
+ }
trans := &Transaction{
tracker: nil,
Subs: nil,
- RmrEndpoint: endpoint,
- Mtype: params.Mtype,
- Xid: params.Xid,
- OrigParams: params,
+ RmrEndpoint: *endpoint,
+ Mtype: mtype,
+ Xid: xid,
+ Meid: meid,
RespReceived: respReceived,
ForwardRespToXapp: forwardRespToXapp,
}
@@ -52,7 +64,7 @@
t.mutex.Lock()
defer t.mutex.Unlock()
- xappkey := TransactionXappKey{endpoint, params.Xid}
+ xappkey := TransactionXappKey{*endpoint, xid}
if _, ok := t.transactionXappTable[xappkey]; ok {
err := fmt.Errorf("Tracker: Similar transaction with xappkey %s is ongoing, transaction %s not created ", xappkey, trans)
return nil, err
diff --git a/pkg/control/transaction.go b/pkg/control/transaction.go
index 2f4acab..40c9e4d 100644
--- a/pkg/control/transaction.go
+++ b/pkg/control/transaction.go
@@ -20,6 +20,8 @@
package control
import (
+ "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
+ "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
"strconv"
"sync"
)
@@ -41,12 +43,16 @@
//-----------------------------------------------------------------------------
type Transaction struct {
mutex sync.Mutex
- tracker *Tracker // tracker instance
- Subs *Subscription
- RmrEndpoint RmrEndpoint
- Mtype int
- Xid string // xapp xid in req
- OrigParams *RMRParams // request orginal params
+ tracker *Tracker //tracker instance
+ Subs *Subscription //related subscription
+ RmrEndpoint RmrEndpoint //xapp endpoint
+ Mtype int //type of initiating message
+ Xid string //xapp xid in req
+ Meid *xapp.RMRMeid //meid transaction related
+ SubReqMsg *e2ap.E2APSubscriptionRequest //SubReq TODO: maybe own transactions per type
+ SubDelReqMsg *e2ap.E2APSubscriptionDeleteRequest //SubDelReq TODO: maybe own transactions per type
+ Payload []byte //packed message to optimize retransmissions
+ PayloadLen int //packed message len to optimize retransmissions
RespReceived bool
ForwardRespToXapp bool
}
@@ -73,6 +79,15 @@
return t.Mtype
}
+func (t *Transaction) GetMeid() *xapp.RMRMeid {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+ if t.Meid != nil {
+ return t.Meid
+ }
+ return nil
+}
+
func (t *Transaction) GetSrc() string {
t.mutex.Lock()
defer t.mutex.Unlock()