RICPLT-2961 Drop retransmitted messages with same transaction id
Change-Id: Iaccff14cd9bbbf0b029a8153e9665209b68f65d3
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
diff --git a/pkg/control/control.go b/pkg/control/control.go
index b398daa..cb085de 100644
--- a/pkg/control/control.go
+++ b/pkg/control/control.go
@@ -164,26 +164,27 @@
err := c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, newSubId)
if err != nil {
xapp.Logger.Error("SubReq: Unable to set Sequence Number in Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+ c.registry.releaseSequenceNumber(newSubId)
return
}
srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
if err != nil {
xapp.Logger.Error("SubReq: Failed to update routing-manager. Dropping this msg. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+ c.registry.releaseSequenceNumber(newSubId)
return
}
/* Create transatcion records for every subscription request */
- xactKey := TransactionKey{newSubId, CREATE}
- xactValue := Transaction{*srcAddr, *srcPort, params}
- err = c.tracker.TrackTransaction(xactKey, xactValue)
+ transaction, err := c.tracker.TrackTransaction(newSubId, CREATE, *srcAddr, *srcPort, params)
if err != nil {
xapp.Logger.Error("SubReq: Failed to create transaction record. Dropping this msg. Err: %v SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+ c.registry.releaseSequenceNumber(newSubId)
return
}
/* Update routing manager about the new subscription*/
- subRouteAction := SubRouteInfo{CREATE, *srcAddr, *srcPort, newSubId}
+ subRouteAction := transaction.SubRouteInfo()
xapp.Logger.Info("SubReq: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
@@ -225,13 +226,12 @@
c.timerMap.StopTimer("RIC_SUB_REQ", int(payloadSeqNum))
c.registry.setSubscriptionToConfirmed(payloadSeqNum)
- var transaction Transaction
- transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, CREATE)
+ transaction, err := c.tracker.RetriveTransaction(payloadSeqNum, CREATE)
if err != nil {
xapp.Logger.Error("SubResp: Failed to retrive transaction record. Dropping this msg. Err: %v, SubId: %v", err, params.SubId)
return
}
- xapp.Logger.Info("SubResp: SubId: %v, from address: %v:%v. Retrieved old subId", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+ xapp.Logger.Info("SubResp: SubId: %v, from address: %v:%v. Retrieved old subId", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port)
params.SubId = int(payloadSeqNum)
params.Xid = transaction.OrigParams.Xid
@@ -242,7 +242,7 @@
xapp.Logger.Error("SubResp: Failed to send response to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
}
- xapp.Logger.Info("SubResp: SubId: %v, from address: %v:%v. Deleting transaction record", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+ xapp.Logger.Info("SubResp: SubId: %v, from address: %v:%v. Deleting transaction record", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port)
transaction, err = c.tracker.completeTransaction(payloadSeqNum, CREATE)
if err != nil {
xapp.Logger.Error("SubResp: Failed to delete transaction record. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
@@ -265,13 +265,12 @@
c.timerMap.StopTimer("RIC_SUB_REQ", int(payloadSeqNum))
- var transaction Transaction
- transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, CREATE)
+ transaction, err := c.tracker.RetriveTransaction(payloadSeqNum, CREATE)
if err != nil {
xapp.Logger.Error("SubFail: Failed to retrive transaction record. Dropping this msg. Err: %v, SubId: %v", err, params.SubId)
return
}
- xapp.Logger.Info("SubFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+ xapp.Logger.Info("SubFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port)
params.SubId = int(payloadSeqNum)
params.Xid = transaction.OrigParams.Xid
@@ -285,7 +284,7 @@
time.Sleep(3 * time.Second)
xapp.Logger.Info("SubFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
- subRouteAction := SubRouteInfo{CREATE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
+ subRouteAction := transaction.SubRouteInfo()
err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
if err != nil {
xapp.Logger.Error("SubFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
@@ -348,10 +347,10 @@
time.Sleep(3 * time.Second)
- xapp.Logger.Info("SendSubFail: SubId: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.XappInstanceAddress, transaction.XappPort)
+ xapp.Logger.Info("SendSubFail: SubId: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.Xappkey.Addr, transaction.Xappkey.Port)
xapp.Logger.Info("SubReqTimer: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
- subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, subId}
+ subRouteAction := SubRouteInfo{DELETE, transaction.Xappkey.Addr, transaction.Xappkey.Port, subId}
err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
if err != nil {
xapp.Logger.Error("SendSubFail: Failed to update routing manager %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
@@ -409,7 +408,7 @@
if c.registry.IsValidSequenceNumber(payloadSeqNum) {
c.registry.deleteSubscription(payloadSeqNum)
- err = c.trackDeleteTransaction(params, payloadSeqNum)
+ _, err = c.trackDeleteTransaction(params, payloadSeqNum)
if err != nil {
xapp.Logger.Error("SubDelReq: Failed to create transaction record. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
return
@@ -429,14 +428,12 @@
return
}
-func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payloadSeqNum uint16) (err error) {
+func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payloadSeqNum uint16) (transaction *Transaction, err error) {
srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
if err != nil {
xapp.Logger.Error("SubDelReq: Failed to update routing-manager. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
}
- xactKey := TransactionKey{payloadSeqNum, DELETE}
- xactValue := Transaction{*srcAddr, *srcPort, params}
- err = c.tracker.TrackTransaction(xactKey, xactValue)
+ transaction, err = c.tracker.TrackTransaction(payloadSeqNum, DELETE, *srcAddr, *srcPort, params)
return
}
@@ -454,13 +451,12 @@
c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(payloadSeqNum))
- var transaction Transaction
- transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
+ transaction, err := c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
if err != nil {
xapp.Logger.Error("SubDelResp: Failed to retrive transaction record. Dropping this msg. Err: %v, SubId: %v", err, params.SubId)
return
}
- xapp.Logger.Info("SubDelResp: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+ xapp.Logger.Info("SubDelResp: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port)
params.SubId = int(payloadSeqNum)
params.Xid = transaction.OrigParams.Xid
@@ -474,7 +470,7 @@
time.Sleep(3 * time.Second)
xapp.Logger.Info("SubDelResp: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
- subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
+ subRouteAction := SubRouteInfo{DELETE, transaction.Xappkey.Addr, transaction.Xappkey.Port, payloadSeqNum}
err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
if err != nil {
xapp.Logger.Error("SubDelResp: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
@@ -509,13 +505,12 @@
c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(payloadSeqNum))
- var transaction Transaction
- transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
+ transaction, err := c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
if err != nil {
xapp.Logger.Error("SubDelFail: Failed to retrive transaction record. Dropping msg. Err %v, SubId: %v", err, params.SubId)
return
}
- xapp.Logger.Info("SubDelFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+ xapp.Logger.Info("SubDelFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port)
params.SubId = int(payloadSeqNum)
params.Xid = transaction.OrigParams.Xid
@@ -529,7 +524,7 @@
time.Sleep(3 * time.Second)
xapp.Logger.Info("SubDelFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
- subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
+ subRouteAction := SubRouteInfo{DELETE, transaction.Xappkey.Addr, transaction.Xappkey.Port, payloadSeqNum}
c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
if err != nil {
xapp.Logger.Error("SubDelFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
@@ -592,10 +587,10 @@
time.Sleep(3 * time.Second)
- xapp.Logger.Info("SendSubDelFail: SubId: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.XappInstanceAddress, transaction.XappPort)
+ xapp.Logger.Info("SendSubDelFail: SubId: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.Xappkey.Addr, transaction.Xappkey.Port)
xapp.Logger.Info("SendSubDelFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
- subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, subId}
+ subRouteAction := SubRouteInfo{DELETE, transaction.Xappkey.Addr, transaction.Xappkey.Port, subId}
err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
if err != nil {
xapp.Logger.Error("SendSubDelFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
diff --git a/pkg/control/main_test.go b/pkg/control/main_test.go
index f74339e..2185d53 100644
--- a/pkg/control/main_test.go
+++ b/pkg/control/main_test.go
@@ -98,6 +98,25 @@
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
+type testingMainControl struct {
+ testingControl
+ c *Control
+}
+
+func (mc *testingMainControl) wait_subs_clean(e2SubsId int, secs int) bool {
+ i := 1
+ for ; i <= secs*2; i++ {
+ if mc.c.registry.IsValidSequenceNumber(uint16(e2SubsId)) == false {
+ return true
+ }
+ time.Sleep(500 * time.Millisecond)
+ }
+ return false
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
func testError(t *testing.T, pattern string, args ...interface{}) {
xapp.Logger.Error(fmt.Sprintf(pattern, args...))
@@ -123,6 +142,7 @@
var xappConn *testingRmrControl
var e2termConn *testingRmrControl
+var mainCtrl *testingMainControl
func TestMain(m *testing.M) {
xapp.Logger.Info("TestMain start")
@@ -187,16 +207,17 @@
subrtfilename, _ := testCreateTmpFile(subsrt)
defer os.Remove(subrtfilename)
os.Setenv("RMR_SEED_RT", subrtfilename)
+ os.Setenv("RMR_SRC_ID", "localhost:14560")
xapp.Logger.Info("Using rt file %s", os.Getenv("RMR_SEED_RT"))
+ xapp.Logger.Info("Using src id %s", os.Getenv("RMR_SRC_ID"))
- mainCtrl := &testingControl{}
+ mainCtrl = &testingMainControl{}
mainCtrl.desc = "main"
mainCtrl.syncChan = make(chan struct{})
- os.Setenv("RMR_SRC_ID", "localhost:14560")
- c := NewControl()
+ mainCtrl.c = NewControl()
xapp.SetReadyCB(mainCtrl.ReadyCB, nil)
- go xapp.RunWithParams(c, false)
+ go xapp.RunWithParams(mainCtrl.c, false)
<-mainCtrl.syncChan
//---------------------------------
diff --git a/pkg/control/messaging_test.go b/pkg/control/messaging_test.go
index 60a5dc0..7308563 100644
--- a/pkg/control/messaging_test.go
+++ b/pkg/control/messaging_test.go
@@ -34,9 +34,9 @@
var e2asnpacker e2ap.E2APPackerIf = e2ap_wrapper.NewAsn1E2Packer()
+//-----------------------------------------------------------------------------
//
-//
-//
+//-----------------------------------------------------------------------------
func createSubsReq() *e2ap.E2APSubscriptionRequest {
req := &e2ap.E2APSubscriptionRequest{}
@@ -69,9 +69,9 @@
return req
}
+//-----------------------------------------------------------------------------
//
-//
-//
+//-----------------------------------------------------------------------------
func createSubsResp(req *e2ap.E2APSubscriptionRequest) *e2ap.E2APSubscriptionResponse {
resp := &e2ap.E2APSubscriptionResponse{}
@@ -96,9 +96,9 @@
return resp
}
+//-----------------------------------------------------------------------------
//
-//
-//
+//-----------------------------------------------------------------------------
func createSubsDelReq(e2SubsId uint32) *e2ap.E2APSubscriptionDeleteRequest {
req := &e2ap.E2APSubscriptionDeleteRequest{}
req.RequestId.Id = 1
@@ -107,9 +107,9 @@
return req
}
+//-----------------------------------------------------------------------------
//
-//
-//
+//-----------------------------------------------------------------------------
func createSubsDelResp(req *e2ap.E2APSubscriptionDeleteRequest) *e2ap.E2APSubscriptionDeleteResponse {
resp := &e2ap.E2APSubscriptionDeleteResponse{}
resp.RequestId.Id = req.RequestId.Id
@@ -119,7 +119,303 @@
}
//-----------------------------------------------------------------------------
-// TestSubRequestSubResponseOk
+//
+//-----------------------------------------------------------------------------
+func handle_xapp_subs_req(t *testing.T) {
+ xapp.Logger.Info("handle_xapp_subs_req start")
+ e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest()
+
+ //---------------------------------
+ // xapp activity: Send Subs Req
+ //---------------------------------
+ //select {
+ //case <-time.After(1 * time.Second):
+ xapp.Logger.Info("(xappConn) Send Subs Req")
+ req := createSubsReq()
+ e2SubsReq.Set(req)
+ xapp.Logger.Debug("%s", e2SubsReq.String())
+ err, packedMsg := e2SubsReq.Pack(nil)
+ if err != nil {
+ testError(t, "(xappConn) pack NOK %s", err.Error())
+ }
+
+ params := &xapp.RMRParams{}
+ params.Mtype = xapp.RIC_SUB_REQ
+ params.SubId = -1
+ params.Payload = packedMsg.Buf
+ params.Meid = &xapp.RMRMeid{RanName: "RAN_NAME_1"}
+ params.Xid = "XID_1"
+ params.Mbuf = nil
+
+ snderr := xappConn.RmrSend(params)
+ if snderr != nil {
+ testError(t, "(xappConn) RMR SEND FAILED: %s", snderr.Error())
+ }
+ //}
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func handle_e2term_subs_req(t *testing.T) (*e2ap.E2APSubscriptionRequest, *xapp.RMRParams) {
+ xapp.Logger.Info("handle_e2term_subs_req start")
+ e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest()
+
+ //---------------------------------
+ // e2term activity: Recv Subs Req
+ //---------------------------------
+ select {
+ case msg := <-e2termConn.rmrConChan:
+ if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_REQ"] {
+ testError(t, "(e2termConn) Received non RIC_SUB_REQ message")
+ } else {
+ xapp.Logger.Info("(e2termConn) Recv Subs Req")
+ packedData := &packer.PackedData{}
+ packedData.Buf = msg.Payload
+ unpackerr := e2SubsReq.UnPack(packedData)
+ if unpackerr != nil {
+ testError(t, "(e2termConn) RIC_SUB_REQ unpack failed err: %s", unpackerr.Error())
+ }
+ geterr, req := e2SubsReq.Get()
+ if geterr != nil {
+ testError(t, "(e2termConn) RIC_SUB_REQ get failed err: %s", geterr.Error())
+ }
+ return req, msg
+ }
+ case <-time.After(15 * time.Second):
+ testError(t, "(e2termConn) Not Received RIC_SUB_REQ within 15 secs")
+ }
+ return nil, nil
+}
+
+func handle_e2term_subs_resp(t *testing.T, req *e2ap.E2APSubscriptionRequest, msg *xapp.RMRParams) {
+ xapp.Logger.Info("handle_e2term_subs_resp start")
+ e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse()
+
+ //---------------------------------
+ // e2term activity: Send Subs Resp
+ //---------------------------------
+ xapp.Logger.Info("(e2termConn) Send Subs Resp")
+ resp := createSubsResp(req)
+ e2SubsResp.Set(resp)
+ xapp.Logger.Debug("%s", e2SubsResp.String())
+ packerr, packedMsg := e2SubsResp.Pack(nil)
+ if packerr != nil {
+ testError(t, "(e2termConn) pack NOK %s", packerr.Error())
+ }
+
+ params := &xapp.RMRParams{}
+ params.Mtype = xapp.RIC_SUB_RESP
+ params.SubId = msg.SubId
+ params.Payload = packedMsg.Buf
+ params.Meid = msg.Meid
+ params.Xid = msg.Xid
+ params.Mbuf = nil
+
+ snderr := e2termConn.RmrSend(params)
+ if snderr != nil {
+ testError(t, "(e2termConn) RMR SEND FAILED: %s", snderr.Error())
+ }
+}
+
+func handle_e2term_subs_reqandresp(t *testing.T) {
+ req, msg := handle_e2term_subs_req(t)
+ handle_e2term_subs_resp(t, req, msg)
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func handle_xapp_subs_resp(t *testing.T) int {
+ xapp.Logger.Info("handle_xapp_subs_resp start")
+ e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse()
+ var e2SubsId int
+
+ //---------------------------------
+ // xapp activity: Recv Subs Resp
+ //---------------------------------
+ select {
+ case msg := <-xappConn.rmrConChan:
+ if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_RESP"] {
+ testError(t, "(xappConn) Received non RIC_SUB_RESP message")
+ } else {
+ xapp.Logger.Info("(xappConn) Recv Subs Resp")
+
+ packedData := &packer.PackedData{}
+ packedData.Buf = msg.Payload
+ e2SubsId = msg.SubId
+ unpackerr := e2SubsResp.UnPack(packedData)
+
+ if unpackerr != nil {
+ testError(t, "(xappConn) RIC_SUB_RESP unpack failed err: %s", unpackerr.Error())
+ }
+ geterr, _ := e2SubsResp.Get()
+ if geterr != nil {
+ testError(t, "(xappConn) RIC_SUB_RESP get failed err: %s", geterr.Error())
+ }
+
+ }
+ case <-time.After(15 * time.Second):
+ testError(t, "(xappConn) Not Received RIC_SUB_RESP within 15 secs")
+ }
+ return e2SubsId
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func handle_xapp_subs_del_req(t *testing.T, e2SubsId int) {
+ xapp.Logger.Info("handle_xapp_subs_del_req start")
+ e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest()
+
+ //---------------------------------
+ // xapp activity: Send Subs Del Req
+ //---------------------------------
+ //select {
+ //case <-time.After(1 * time.Second):
+ xapp.Logger.Info("(xappConn) Send Subs Del Req")
+ req := createSubsDelReq(uint32(e2SubsId))
+ e2SubsDelReq.Set(req)
+ xapp.Logger.Debug("%s", e2SubsDelReq.String())
+ err, packedMsg := e2SubsDelReq.Pack(nil)
+ if err != nil {
+ testError(t, "(xappConn) pack NOK %s", err.Error())
+ }
+
+ params := &xapp.RMRParams{}
+ params.Mtype = xapp.RIC_SUB_DEL_REQ
+ params.SubId = e2SubsId
+ params.Payload = packedMsg.Buf
+ params.Meid = &xapp.RMRMeid{RanName: "RAN_NAME_1"}
+ params.Xid = "XID_1"
+ params.Mbuf = nil
+
+ snderr := xappConn.RmrSend(params)
+ if snderr != nil {
+ testError(t, "(xappConn) RMR SEND FAILED: %s", snderr.Error())
+ }
+ //}
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func handle_e2term_subs_del_req(t *testing.T) (*e2ap.E2APSubscriptionDeleteRequest, *xapp.RMRParams) {
+ xapp.Logger.Info("handle_e2term_subs_del_req start")
+ e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest()
+
+ //---------------------------------
+ // e2term activity: Recv Subs Del Req
+ //---------------------------------
+ select {
+ case msg := <-e2termConn.rmrConChan:
+ if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_DEL_REQ"] {
+ testError(t, "(e2termConn) Received non RIC_SUB_DEL_REQ message")
+ } else {
+ xapp.Logger.Info("(e2termConn) Recv Subs Del Req")
+
+ packedData := &packer.PackedData{}
+ packedData.Buf = msg.Payload
+ unpackerr := e2SubsDelReq.UnPack(packedData)
+ if unpackerr != nil {
+ testError(t, "(e2termConn) RIC_SUB_DEL_REQ unpack failed err: %s", unpackerr.Error())
+ }
+ geterr, req := e2SubsDelReq.Get()
+ if geterr != nil {
+ testError(t, "(e2termConn) RIC_SUB_DEL_REQ get failed err: %s", geterr.Error())
+ }
+ return req, msg
+ }
+ case <-time.After(15 * time.Second):
+ testError(t, "(e2termConn) Not Received RIC_SUB_DEL_REQ within 15 secs")
+ }
+ return nil, nil
+}
+
+func handle_e2term_subs_del_resp(t *testing.T, req *e2ap.E2APSubscriptionDeleteRequest, msg *xapp.RMRParams) {
+ xapp.Logger.Info("handle_e2term_subs_del_resp start")
+ e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse()
+
+ //---------------------------------
+ // e2term activity: Send Subs Del Resp
+ //---------------------------------
+ xapp.Logger.Info("(e2termConn) Send Subs Del Resp")
+ resp := createSubsDelResp(req)
+ e2SubsDelResp.Set(resp)
+ xapp.Logger.Debug("%s", e2SubsDelResp.String())
+ packerr, packedMsg := e2SubsDelResp.Pack(nil)
+ if packerr != nil {
+ testError(t, "(e2termConn) pack NOK %s", packerr.Error())
+ }
+
+ params := &xapp.RMRParams{}
+ params.Mtype = xapp.RIC_SUB_DEL_RESP
+ params.SubId = msg.SubId
+ params.Payload = packedMsg.Buf
+ params.Meid = msg.Meid
+ params.Xid = msg.Xid
+ params.Mbuf = nil
+
+ snderr := e2termConn.RmrSend(params)
+ if snderr != nil {
+ testError(t, "(e2termConn) RMR SEND FAILED: %s", snderr.Error())
+ }
+
+}
+
+func handle_e2term_subs_del_reqandresp(t *testing.T) {
+ req, msg := handle_e2term_subs_del_req(t)
+ handle_e2term_subs_del_resp(t, req, msg)
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func handle_xapp_subs_del_resp(t *testing.T) {
+ xapp.Logger.Info("handle_xapp_subs_del_resp start")
+ e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse()
+
+ //---------------------------------
+ // xapp activity: Recv Subs Del Resp
+ //---------------------------------
+ select {
+ case msg := <-xappConn.rmrConChan:
+ if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_DEL_RESP"] {
+ testError(t, "(xappConn) Received non RIC_SUB_DEL_RESP message")
+ } else {
+ xapp.Logger.Info("(xappConn) Recv Subs Del Resp")
+
+ packedData := &packer.PackedData{}
+ packedData.Buf = msg.Payload
+ unpackerr := e2SubsDelResp.UnPack(packedData)
+ if unpackerr != nil {
+ testError(t, "(xappConn) RIC_SUB_DEL_RESP unpack failed err: %s", unpackerr.Error())
+ }
+ geterr, _ := e2SubsDelResp.Get()
+ if geterr != nil {
+ testError(t, "(xappConn) RIC_SUB_DEL_RESP get failed err: %s", geterr.Error())
+ }
+
+ }
+ case <-time.After(15 * time.Second):
+ testError(t, "(xappConn) Not Received RIC_SUB_DEL_RESP within 15 secs")
+ }
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func handle_wait_subs_clean(t *testing.T, e2SubsId int) bool {
+ xapp.Logger.Info("handle_wait_subs_clean start")
+ if mainCtrl.wait_subs_clean(e2SubsId, 10) == false {
+ testError(t, "(general) no clean within 10 secs")
+ return false
+ }
+ return true
+}
+
+//-----------------------------------------------------------------------------
+// TestSubReqAndSubDelOk
//
// stub stub
// +-------+ +---------+ +---------+
@@ -153,215 +449,114 @@
//
//-----------------------------------------------------------------------------
func TestSubReqAndSubDelOk(t *testing.T) {
-
xapp.Logger.Info("TestSubReqAndSubDelOk start")
- e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest()
- e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse()
- e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest()
- e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse()
- var e2SubsId int
- //---------------------------------
- // xapp activity: Send Subs Req
- //---------------------------------
- select {
- case <-time.After(5 * time.Second):
- xapp.Logger.Info("(xappConn) Send Subs Req")
- req := createSubsReq()
- e2SubsReq.Set(req)
- xapp.Logger.Debug("%s", e2SubsReq.String())
- err, packedMsg := e2SubsReq.Pack(nil)
- if err != nil {
- testError(t, "(xappConn) pack NOK %s", err.Error())
- }
+ handle_xapp_subs_req(t)
+ handle_e2term_subs_reqandresp(t)
+ e2SubsId := handle_xapp_subs_resp(t)
- params := &xapp.RMRParams{}
- params.Mtype = xapp.RIC_SUB_REQ
- params.SubId = -1
- params.Payload = packedMsg.Buf
- params.Meid = &xapp.RMRMeid{RanName: "RAN_NAME_1"}
- params.Xid = "XID_1"
- params.Mbuf = nil
+ handle_xapp_subs_del_req(t, e2SubsId)
+ handle_e2term_subs_del_reqandresp(t)
+ handle_xapp_subs_del_resp(t)
- snderr := xappConn.RmrSend(params)
- if snderr != nil {
- testError(t, "(xappConn) RMR SEND FAILED: %s", snderr.Error())
- }
- }
+ //Wait that subs is cleaned
+ handle_wait_subs_clean(t, e2SubsId)
+}
- //---------------------------------
- // e2term activity: Recv Subs Req & Send Subs Resp
- //---------------------------------
- select {
- case msg := <-e2termConn.rmrConChan:
- if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_REQ"] {
- testError(t, "(e2termConn) Received non RIC_SUB_REQ message")
- } else {
- xapp.Logger.Info("(e2termConn) Recv Subs Req & Send Subs Resp")
- packedData := &packer.PackedData{}
- packedData.Buf = msg.Payload
- unpackerr := e2SubsReq.UnPack(packedData)
- if unpackerr != nil {
- testError(t, "(e2termConn) RIC_SUB_REQ unpack failed err: %s", unpackerr.Error())
- }
- geterr, req := e2SubsReq.Get()
- if geterr != nil {
- testError(t, "(e2termConn) RIC_SUB_REQ get failed err: %s", geterr.Error())
- }
+//-----------------------------------------------------------------------------
+// TestSubReqRetransmission
+//
+// stub stub
+// +-------+ +---------+ +---------+
+// | xapp | | submgr | | e2term |
+// +-------+ +---------+ +---------+
+// | | |
+// | SubReq | |
+// |------------->| |
+// | | |
+// | | SubReq |
+// | |------------->|
+// | | |
+// | SubReq | |
+// | (retrans) | |
+// |------------->| |
+// | | |
+// | | SubResp |
+// | |<-------------|
+// | | |
+// | SubResp | |
+// |<-------------| |
+// | | |
+// | [SUBS DELETE] |
+// | | |
+//
+//-----------------------------------------------------------------------------
+func TestSubReqRetransmission(t *testing.T) {
+ xapp.Logger.Info("TestSubReqRetransmission start")
- resp := createSubsResp(req)
- e2SubsResp.Set(resp)
- xapp.Logger.Debug("%s", e2SubsResp.String())
- packerr, packedMsg := e2SubsResp.Pack(nil)
- if packerr != nil {
- testError(t, "(e2termConn) pack NOK %s", packerr.Error())
- }
+ //Subs Create
+ handle_xapp_subs_req(t)
+ req, msg := handle_e2term_subs_req(t)
+ handle_xapp_subs_req(t)
- params := &xapp.RMRParams{}
- params.Mtype = xapp.RIC_SUB_RESP
- params.SubId = msg.SubId
- params.Payload = packedMsg.Buf
- params.Meid = msg.Meid
- params.Xid = msg.Xid
- params.Mbuf = nil
+ handle_e2term_subs_resp(t, req, msg)
- snderr := e2termConn.RmrSend(params)
- if snderr != nil {
- testError(t, "(e2termConn) RMR SEND FAILED: %s", snderr.Error())
- }
+ e2SubsId := handle_xapp_subs_resp(t)
- }
- case <-time.After(15 * time.Second):
- testError(t, "(e2termConn) Not Received RIC_SUB_REQ within 15 secs")
- }
+ //Subs Delete
+ handle_xapp_subs_del_req(t, e2SubsId)
+ handle_e2term_subs_del_reqandresp(t)
+ handle_xapp_subs_del_resp(t)
- //---------------------------------
- // xapp activity: Recv Subs Resp
- //---------------------------------
- select {
- case msg := <-xappConn.rmrConChan:
- if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_RESP"] {
- testError(t, "(xappConn) Received non RIC_SUB_RESP message")
- } else {
- xapp.Logger.Info("(xappConn) Recv Subs Resp")
+ //Wait that subs is cleaned
+ handle_wait_subs_clean(t, e2SubsId)
+}
- packedData := &packer.PackedData{}
- packedData.Buf = msg.Payload
- e2SubsId = msg.SubId
- unpackerr := e2SubsResp.UnPack(packedData)
+//-----------------------------------------------------------------------------
+// TestSubDelReqRetransmission
+//
+// stub stub
+// +-------+ +---------+ +---------+
+// | xapp | | submgr | | e2term |
+// +-------+ +---------+ +---------+
+// | | |
+// | [SUBS CREATE] |
+// | | |
+// | | |
+// | SubDelReq | |
+// |------------->| |
+// | | |
+// | | SubDelReq |
+// | |------------->|
+// | | |
+// | SubDelReq | |
+// |------------->| |
+// | | |
+// | | SubDelResp |
+// | |<-------------|
+// | | |
+// | SubDelResp | |
+// |<-------------| |
+//
+//-----------------------------------------------------------------------------
+func TestSubDelReqRetransmission(t *testing.T) {
+ xapp.Logger.Info("TestSubDelReqRetransmission start")
- if unpackerr != nil {
- testError(t, "(xappConn) RIC_SUB_RESP unpack failed err: %s", unpackerr.Error())
- }
- geterr, _ := e2SubsResp.Get()
- if geterr != nil {
- testError(t, "(xappConn) RIC_SUB_RESP get failed err: %s", geterr.Error())
- }
+ //Subs Create
+ handle_xapp_subs_req(t)
+ handle_e2term_subs_reqandresp(t)
+ e2SubsId := handle_xapp_subs_resp(t)
- }
- case <-time.After(15 * time.Second):
- testError(t, "(xappConn) Not Received RIC_SUB_RESP within 15 secs")
- }
+ //Subs Delete
+ handle_xapp_subs_del_req(t, e2SubsId)
+ req, msg := handle_e2term_subs_del_req(t)
- //---------------------------------
- // xapp activity: Send Subs Del Req
- //---------------------------------
- select {
- case <-time.After(2 * time.Second):
- xapp.Logger.Info("(xappConn) Send Subs Del Req")
- req := createSubsDelReq(uint32(e2SubsId))
- e2SubsDelReq.Set(req)
- xapp.Logger.Debug("%s", e2SubsDelReq.String())
- err, packedMsg := e2SubsDelReq.Pack(nil)
- if err != nil {
- testError(t, "(xappConn) pack NOK %s", err.Error())
- }
+ <-time.After(2 * time.Second)
- params := &xapp.RMRParams{}
- params.Mtype = xapp.RIC_SUB_DEL_REQ
- params.SubId = e2SubsId
- params.Payload = packedMsg.Buf
- params.Meid = &xapp.RMRMeid{RanName: "RAN_NAME_1"}
- params.Xid = "XID_1"
- params.Mbuf = nil
+ handle_xapp_subs_del_req(t, e2SubsId)
- snderr := xappConn.RmrSend(params)
- if snderr != nil {
- testError(t, "(xappConn) RMR SEND FAILED: %s", snderr.Error())
- }
- }
+ handle_e2term_subs_del_resp(t, req, msg)
- //---------------------------------
- // e2term activity: Recv Subs Del Req & Send Subs Del Resp
- //---------------------------------
- select {
- case msg := <-e2termConn.rmrConChan:
- if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_DEL_REQ"] {
- testError(t, "(e2termConn) Received non RIC_SUB_DEL_REQ message")
- } else {
- xapp.Logger.Info("(e2termConn) Recv Subs Del Req & Send Subs Del Resp")
-
- packedData := &packer.PackedData{}
- packedData.Buf = msg.Payload
- unpackerr := e2SubsDelReq.UnPack(packedData)
- if unpackerr != nil {
- testError(t, "(e2termConn) RIC_SUB_DEL_REQ unpack failed err: %s", unpackerr.Error())
- }
- geterr, req := e2SubsDelReq.Get()
- if geterr != nil {
- testError(t, "(e2termConn) RIC_SUB_DEL_REQ get failed err: %s", geterr.Error())
- }
-
- resp := createSubsDelResp(req)
- e2SubsDelResp.Set(resp)
- xapp.Logger.Debug("%s", e2SubsDelResp.String())
- packerr, packedMsg := e2SubsDelResp.Pack(nil)
- if packerr != nil {
- testError(t, "(e2termConn) pack NOK %s", packerr.Error())
- }
-
- params := &xapp.RMRParams{}
- params.Mtype = xapp.RIC_SUB_DEL_RESP
- params.SubId = msg.SubId
- params.Payload = packedMsg.Buf
- params.Meid = msg.Meid
- params.Xid = msg.Xid
- params.Mbuf = nil
-
- snderr := e2termConn.RmrSend(params)
- if snderr != nil {
- testError(t, "(e2termConn) RMR SEND FAILED: %s", snderr.Error())
- }
-
- }
- case <-time.After(15 * time.Second):
- testError(t, "(e2termConn) Not Received RIC_SUB_DEL_REQ within 15 secs")
- }
-
- //---------------------------------
- // xapp activity: Recv Subs Del Resp
- //---------------------------------
- select {
- case msg := <-xappConn.rmrConChan:
- if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_DEL_RESP"] {
- testError(t, "(xappConn) Received non RIC_SUB_DEL_RESP message")
- } else {
- xapp.Logger.Info("(xappConn) Recv Subs Del Resp")
-
- packedData := &packer.PackedData{}
- packedData.Buf = msg.Payload
- unpackerr := e2SubsDelResp.UnPack(packedData)
- if unpackerr != nil {
- testError(t, "(xappConn) RIC_SUB_DEL_RESP unpack failed err: %s", unpackerr.Error())
- }
- geterr, _ := e2SubsDelResp.Get()
- if geterr != nil {
- testError(t, "(xappConn) RIC_SUB_DEL_RESP get failed err: %s", geterr.Error())
- }
-
- }
- case <-time.After(15 * time.Second):
- testError(t, "(xappConn) Not Received RIC_SUB_DEL_RESP within 15 secs")
- }
-
+ //Wait that subs is cleaned
+ handle_wait_subs_clean(t, e2SubsId)
}
diff --git a/pkg/control/tracker.go b/pkg/control/tracker.go
index 1682ae7..65f816e 100644
--- a/pkg/control/tracker.go
+++ b/pkg/control/tracker.go
@@ -21,83 +21,101 @@
import (
"fmt"
+ "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
"sync"
)
+type TransactionKey struct {
+ SubID uint16 // subscription id / sequence number
+ TransType Action // action ongoing (CREATE/DELETE etc)
+}
+
+type TransactionXappKey struct {
+ Addr string // xapp addr
+ Port uint16 // xapp port
+ Xid string // xapp xid in req
+}
+
+type Transaction struct {
+ tracker *Tracker // tracker instance
+ Key TransactionKey // action key
+ Xappkey TransactionXappKey // transaction key
+ OrigParams *xapp.RMRParams // request orginal params
+}
+
+func (t *Transaction) SubRouteInfo() SubRouteInfo {
+ return SubRouteInfo{t.Key.TransType, t.Xappkey.Addr, t.Xappkey.Port, t.Key.SubID}
+}
+
/*
Implements a record of ongoing transactions and helper functions to CRUD the records.
*/
type Tracker struct {
- transactionTable map[TransactionKey]Transaction
- mutex sync.Mutex
+ transactionTable map[TransactionKey]*Transaction
+ transactionXappTable map[TransactionXappKey]*Transaction
+ mutex sync.Mutex
}
func (t *Tracker) Init() {
- t.transactionTable = make(map[TransactionKey]Transaction)
+ t.transactionTable = make(map[TransactionKey]*Transaction)
+ t.transactionXappTable = make(map[TransactionXappKey]*Transaction)
}
/*
Checks if a tranascation with similar type has been ongoing. If not then creates one.
Returns error if there is similar transatcion ongoing.
*/
-func (t *Tracker) TrackTransaction(key TransactionKey, xact Transaction) error {
+func (t *Tracker) TrackTransaction(subID uint16, act Action, addr string, port uint16, params *xapp.RMRParams) (*Transaction, error) {
+ key := TransactionKey{subID, act}
+ xappkey := TransactionXappKey{addr, port, params.Xid}
+ trans := &Transaction{t, key, xappkey, params}
t.mutex.Lock()
defer t.mutex.Unlock()
if _, ok := t.transactionTable[key]; ok {
// TODO: Implement merge related check here. If the key is same but the value is different.
- err := fmt.Errorf("transaction tracker: Similar transaction with sub id %d and type %s is ongoing", key.SubID, key.transType)
- return err
+ err := fmt.Errorf("transaction tracker: Similar transaction with sub id %d and type %s is ongoing", key.SubID, key.TransType)
+ return nil, err
}
- t.transactionTable[key] = xact
- return nil
+ if _, ok := t.transactionXappTable[xappkey]; ok {
+ // TODO: Implement merge related check here. If the key is same but the value is different.
+ err := fmt.Errorf("transaction tracker: Similar transaction with xapp key %v is ongoing", xappkey)
+ return nil, err
+ }
+ t.transactionTable[key] = trans
+ t.transactionXappTable[xappkey] = trans
+ return trans, nil
}
/*
Retreives the transaction table entry for the given request.
Returns error in case the transaction cannot be found.
*/
-func (t *Tracker) UpdateTransaction(SubID uint16, transType Action, xact Transaction) error {
- key := TransactionKey{SubID, transType}
- t.mutex.Lock()
- defer t.mutex.Unlock()
- if _, ok := t.transactionTable[key]; ok {
- // TODO: Implement merge related check here. If the key is same but the value is different.
- err := fmt.Errorf("transaction tracker: Similar transaction with sub id %d and type %v is ongoing", key.SubID, key.transType)
- return err
- }
- t.transactionTable[key] = xact
- return nil
-}
-
-/*
-Retreives the transaction table entry for the given request.
-Returns error in case the transaction cannot be found.
-*/
-func (t *Tracker) RetriveTransaction(subID uint16, act Action) (Transaction, error) {
+func (t *Tracker) RetriveTransaction(subID uint16, act Action) (*Transaction, error) {
key := TransactionKey{subID, act}
t.mutex.Lock()
defer t.mutex.Unlock()
- var xact Transaction
- if xact, ok := t.transactionTable[key]; ok {
- return xact, nil
+ if trans, ok := t.transactionTable[key]; ok {
+ return trans, nil
}
err := fmt.Errorf("transaction record for Subscription ID %d and action %s does not exist", subID, act)
- return xact, err
+ return nil, err
}
/*
Deletes the transaction table entry for the given request and returns the deleted xapp's address and port for reference.
Returns error in case the transaction cannot be found.
*/
-func (t *Tracker) completeTransaction(subID uint16, act Action) (Transaction, error) {
+func (t *Tracker) completeTransaction(subID uint16, act Action) (*Transaction, error) {
key := TransactionKey{subID, act}
- var emptyTransaction Transaction
t.mutex.Lock()
defer t.mutex.Unlock()
- if xact, ok := t.transactionTable[key]; ok {
+ if trans, ok1 := t.transactionTable[key]; ok1 {
+ if _, ok2 := t.transactionXappTable[trans.Xappkey]; ok2 {
+ delete(t.transactionXappTable, trans.Xappkey)
+ }
delete(t.transactionTable, key)
- return xact, nil
+ return trans, nil
}
err := fmt.Errorf("transaction record for Subscription ID %d and action %s does not exist", subID, act)
- return emptyTransaction, err
+ return nil, err
}
diff --git a/pkg/control/types.go b/pkg/control/types.go
index 3a9587f..83312d8 100644
--- a/pkg/control/types.go
+++ b/pkg/control/types.go
@@ -19,10 +19,6 @@
package control
-import (
- "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
-)
-
type RmrDatagram struct {
MessageType int
SubscriptionId uint16
@@ -37,14 +33,3 @@
}
type Action int
-
-type TransactionKey struct {
- SubID uint16
- transType Action
-}
-
-type Transaction struct {
- XappInstanceAddress string
- XappPort uint16
- OrigParams *xapp.RMRParams
-}