RICPLT-2988 Fix unit test timing issues in the retransmission cases
Also prepares for RICPLT-2571
Change-Id: Ie98aface81a308022ea4015d88aab449564f932f
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
diff --git a/pkg/control/control.go b/pkg/control/control.go
index d5a92b6..9ce34a0 100755
--- a/pkg/control/control.go
+++ b/pkg/control/control.go
@@ -44,6 +44,7 @@
tracker *Tracker
timerMap *TimerMap
rmrSendMutex sync.Mutex
+ msgCounter uint64
}
type RMRMeid struct {
@@ -98,6 +99,7 @@
rtmgrClient: &rtmgrClient,
tracker: tracker,
timerMap: timerMap,
+ msgCounter: 0,
}
}
@@ -130,6 +132,7 @@
}
func (c *Control) Consume(msg *xapp.RMRParams) (err error) {
+ c.msgCounter++
switch msg.Mtype {
case xapp.RICMessageTypes["RIC_SUB_REQ"]:
go c.handleSubscriptionRequest(msg)
@@ -155,35 +158,34 @@
params.Mbuf = nil
/* Reserve a sequence number and set it in the payload */
- newSubId, isIdValid := c.registry.ReserveSequenceNumber()
- if isIdValid != true {
+ subs := c.registry.ReserveSubscription()
+ if subs == nil {
xapp.Logger.Error("SubReq: Failed to reserve sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid)
- c.registry.releaseSequenceNumber(newSubId)
return
}
- params.SubId = int(newSubId)
- err := c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, newSubId)
+ params.SubId = int(subs.Seq)
+ err := c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, subs.Seq)
if err != nil {
xapp.Logger.Error("SubReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
- c.registry.releaseSequenceNumber(newSubId)
+ c.registry.releaseSequenceNumber(subs.Seq)
return
}
srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
if err != nil {
xapp.Logger.Error("SubReq: Failed to update routing-manager. Dropping this msg. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
- c.registry.releaseSequenceNumber(newSubId)
+ c.registry.releaseSequenceNumber(subs.Seq)
return
}
// Create transatcion record for every subscription request
var forwardRespToXapp bool = true
var responseReceived bool = false
- transaction, err := c.tracker.TrackTransaction(newSubId, CREATE, *srcAddr, *srcPort, params, responseReceived, forwardRespToXapp)
+ transaction, err := c.tracker.TrackTransaction(subs.Seq, CREATE, *srcAddr, *srcPort, params, responseReceived, forwardRespToXapp)
if err != nil {
xapp.Logger.Error("SubReq: Failed to create transaction record. Dropping this msg. Err: %v SubId: %v, Xid: %s", err, params.SubId, params.Xid)
- c.registry.releaseSequenceNumber(newSubId)
+ c.registry.releaseSequenceNumber(subs.Seq)
return
}
@@ -194,7 +196,7 @@
err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
if err != nil {
xapp.Logger.Error("SubReq: Failed to update routing manager. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
- c.registry.releaseSequenceNumber(newSubId)
+ c.registry.releaseSequenceNumber(subs.Seq)
return
}
@@ -204,7 +206,7 @@
if err != nil {
xapp.Logger.Error("SubReq: Failed to send request to E2T %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
}
- c.timerMap.StartTimer("RIC_SUB_REQ", int(newSubId), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
+ c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.Seq), subReqTime, FirstTry, c.handleSubscriptionRequestTimer)
xapp.Logger.Debug("SubReq: Debugging transaction table = %v", c.tracker.transactionTable)
return
}
@@ -421,13 +423,13 @@
xapp.Logger.Info("SubDelReq: Received payloadSeqNum: %v", payloadSeqNum)
if c.registry.IsValidSequenceNumber(payloadSeqNum) {
- c.registry.deleteSubscription(payloadSeqNum)
var forwardRespToXapp bool = true
_, err = c.trackDeleteTransaction(params, payloadSeqNum, forwardRespToXapp)
if err != nil {
xapp.Logger.Error("SubDelReq: Failed to create transaction record. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
return
}
+ c.registry.setSubscriptionToUnConfirmed(payloadSeqNum)
} else {
xapp.Logger.Error("SubDelReq: Not valid sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid)
return
diff --git a/pkg/control/main_test.go b/pkg/control/main_test.go
index 1c69b01..dc62225 100644
--- a/pkg/control/main_test.go
+++ b/pkg/control/main_test.go
@@ -117,24 +117,43 @@
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type xappTransaction struct { // per-request state passed between the xapp test-stub helpers
+ tc *testingXappControl // stub connection that created the transaction (set by newXappTransaction)
+ xid string // transaction id; sent as RMRParams.Xid and compared against response Xid
+ meid *xapp.RMRMeid // RAN identity; sent as RMRParams.Meid
+}
type testingXappControl struct {
testingRmrControl
testingMessageChannel
meid *xapp.RMRMeid
xid_seq uint64
- xid string
}
func (tc *testingXappControl) newXid() string {
- tc.xid = tc.desc + "_XID_" + strconv.FormatUint(uint64(tc.xid_seq), 10)
+ var xid string
+ xid = tc.desc + "_XID_" + strconv.FormatUint(uint64(tc.xid_seq), 10)
tc.xid_seq++
- return tc.xid
+ return xid
+}
+
+func (tc *testingXappControl) newXappTransaction(xid *string, ranname string) *xappTransaction { // builds a transaction bound to tc whose meid names RAN ranname
+ trans := &xappTransaction{}
+ trans.tc = tc
+ if xid == nil { // no xid supplied: take a fresh one from tc.newXid()
+ trans.xid = tc.newXid()
+ } else { // caller supplied an xid: copy it as-is
+ trans.xid = *xid
+ }
+ trans.meid = &xapp.RMRMeid{RanName: ranname}
+ return trans
}
func (tc *testingXappControl) Consume(msg *xapp.RMRParams) (err error) {
- //if msg.Xid == tc.xid {
if strings.Contains(msg.Xid, tc.desc) {
xapp.Logger.Info("(%s) Consume mtype=%s subid=%d xid=%s", tc.desc, xapp.RicMessageTypeToName[msg.Mtype], msg.SubId, msg.Xid)
tc.rmrConChan <- msg
@@ -149,8 +168,7 @@
xappCtrl.testingRmrControl = initTestingRmrControl(desc, rtfile, port, stat, xappCtrl)
xappCtrl.testingMessageChannel = initTestingMessageChannel()
xappCtrl.meid = &xapp.RMRMeid{RanName: ranname}
- xappCtrl.xid_seq = 0
- xappCtrl.newXid()
+ xappCtrl.xid_seq = 1
return xappCtrl
}
diff --git a/pkg/control/messaging_test.go b/pkg/control/messaging_test.go
index b8ed6e6..2d45ede 100644
--- a/pkg/control/messaging_test.go
+++ b/pkg/control/messaging_test.go
@@ -31,27 +31,12 @@
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
-
var e2asnpacker e2ap.E2APPackerIf = e2ap_wrapper.NewAsn1E2Packer()
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
-type xappTransaction struct {
- xappConn *testingXappControl
- xid string
-}
-
-func newXappTransaction(xappConn *testingXappControl) {
- trans := &xappTransaction{}
- trans.xappConn = xappConn
- trans.xid = xappConn.newXid()
-}
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-func (xappConn *testingXappControl) handle_xapp_subs_req(t *testing.T, updseq bool) {
+func (xappConn *testingXappControl) handle_xapp_subs_req(t *testing.T, oldTrans *xappTransaction) *xappTransaction {
xapp.Logger.Info("handle_xapp_subs_req")
e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest()
@@ -93,29 +78,34 @@
err, packedMsg := e2SubsReq.Pack(nil)
if err != nil {
testError(t, "(%s) pack NOK %s", xappConn.desc, err.Error())
+ return nil
+ }
+
+ var trans *xappTransaction = oldTrans
+ if trans == nil {
+ trans = xappConn.newXappTransaction(nil, "RAN_NAME_1")
}
params := &xapp.RMRParams{}
params.Mtype = xapp.RIC_SUB_REQ
params.SubId = -1
params.Payload = packedMsg.Buf
- params.Meid = xappConn.meid
- if updseq {
- xappConn.newXid()
- }
- params.Xid = xappConn.xid
+ params.Meid = trans.meid
+ params.Xid = trans.xid
params.Mbuf = nil
snderr := xappConn.RmrSend(params)
if snderr != nil {
testError(t, "(%s) RMR SEND FAILED: %s", xappConn.desc, snderr.Error())
+ return nil
}
+ return trans
}
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
-func (xappConn *testingXappControl) handle_xapp_subs_resp(t *testing.T) int {
+func (xappConn *testingXappControl) handle_xapp_subs_resp(t *testing.T, trans *xappTransaction) int {
xapp.Logger.Info("handle_xapp_subs_resp")
e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse()
var e2SubsId int
@@ -126,7 +116,10 @@
select {
case msg := <-xappConn.rmrConChan:
if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_RESP"] {
- testError(t, "(%s) Received wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_RESP", xapp.RicMessageTypeToName[msg.Mtype])
+ testError(t, "(%s) Received RIC_SUB_RESP wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_RESP", xapp.RicMessageTypeToName[msg.Mtype])
+ return -1
+ } else if msg.Xid != trans.xid {
+ testError(t, "(%s) Received RIC_SUB_RESP wrong xid expected %s got %s, error", xappConn.desc, trans.xid, msg.Xid)
return -1
} else {
packedData := &packer.PackedData{}
@@ -155,7 +148,7 @@
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
-func (xappConn *testingXappControl) handle_xapp_subs_del_req(t *testing.T, updseq bool, e2SubsId int) {
+func (xappConn *testingXappControl) handle_xapp_subs_del_req(t *testing.T, oldTrans *xappTransaction, e2SubsId int) *xappTransaction {
xapp.Logger.Info("handle_xapp_subs_del_req")
e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest()
@@ -174,29 +167,34 @@
err, packedMsg := e2SubsDelReq.Pack(nil)
if err != nil {
testError(t, "(%s) pack NOK %s", xappConn.desc, err.Error())
+ return nil
+ }
+
+ var trans *xappTransaction = oldTrans
+ if trans == nil {
+ trans = xappConn.newXappTransaction(nil, "RAN_NAME_1")
}
params := &xapp.RMRParams{}
params.Mtype = xapp.RIC_SUB_DEL_REQ
params.SubId = e2SubsId
params.Payload = packedMsg.Buf
- params.Meid = xappConn.meid
- if updseq {
- xappConn.newXid()
- }
- params.Xid = xappConn.xid
+ params.Meid = trans.meid
+ params.Xid = trans.xid
params.Mbuf = nil
snderr := xappConn.RmrSend(params)
if snderr != nil {
testError(t, "(%s) RMR SEND FAILED: %s", xappConn.desc, snderr.Error())
+ return nil
}
+ return trans
}
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
-func (xappConn *testingXappControl) handle_xapp_subs_del_resp(t *testing.T) {
+func (xappConn *testingXappControl) handle_xapp_subs_del_resp(t *testing.T, trans *xappTransaction) {
xapp.Logger.Info("handle_xapp_subs_del_resp")
e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse()
@@ -206,7 +204,10 @@
select {
case msg := <-xappConn.rmrConChan:
if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_DEL_RESP"] {
- testError(t, "(%s) Received wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_DEL_RESP", xapp.RicMessageTypeToName[msg.Mtype])
+ testError(t, "(%s) Received RIC_SUB_DEL_RESP wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_DEL_RESP", xapp.RicMessageTypeToName[msg.Mtype])
+ return
+ } else if msg.Xid != trans.xid {
+ testError(t, "(%s) Received RIC_SUB_DEL_RESP wrong xid expected %s got %s, error", xappConn.desc, trans.xid, msg.Xid)
return
} else {
packedData := &packer.PackedData{}
@@ -403,6 +404,44 @@
return false
}
+func (mc *testingMainControl) get_seqcnt(t *testing.T) uint16 { // returns the registry's current sequence counter; t is not used
+ mc.c.registry.mutex.Lock() // the counter is read while holding the registry mutex
+ defer mc.c.registry.mutex.Unlock()
+ return mc.c.registry.counter
+}
+
+func (mc *testingMainControl) wait_seqcnt_change(t *testing.T, orig uint16, secs int) (uint16, bool) { // polls until registry.counter differs from orig; returns (new value, true) or (0, false) on timeout
+ i := 1
+ for ; i <= secs*2; i++ { // one poll per 500ms sleep, so at most secs*2 attempts
+ mc.c.registry.mutex.Lock()
+ curr := mc.c.registry.counter
+ mc.c.registry.mutex.Unlock()
+ if curr != orig {
+ return curr, true // counter moved: report the value that was observed
+ }
+ time.Sleep(500 * time.Millisecond)
+ }
+ testError(t, "(general) no seq change within %d secs", secs) // all attempts used without seeing a change
+ return 0, false
+}
+
+func (mc *testingMainControl) get_msgcounter(t *testing.T) uint64 { // returns Control.msgCounter; t is not used. NOTE(review): read without lock or atomic while Consume does a plain msgCounter++ — confirm this is clean under -race
+ return mc.c.msgCounter
+}
+
+func (mc *testingMainControl) wait_msgcounter_change(t *testing.T, orig uint64, secs int) (uint64, bool) { // polls until Control.msgCounter differs from orig; returns (new value, true) or (0, false) on timeout
+ i := 1
+ for ; i <= secs*2; i++ { // one poll per 500ms sleep, so at most secs*2 attempts
+ curr := mc.c.msgCounter // NOTE(review): unsynchronized read — confirm against the increment in Consume
+ if curr != orig {
+ return curr, true // counter moved: report the value that was observed
+ }
+ time.Sleep(500 * time.Millisecond)
+ }
+ testError(t, "(general) no msg counter change within %d secs", secs) // all attempts used without seeing a change
+ return 0, false
+}
+
//-----------------------------------------------------------------------------
// TestSubReqAndSubDelOk
//
@@ -440,15 +479,15 @@
func TestSubReqAndSubDelOk(t *testing.T) {
xapp.Logger.Info("TestSubReqAndSubDelOk")
- xappConn1.handle_xapp_subs_req(t, true)
+ cretrans := xappConn1.handle_xapp_subs_req(t, nil)
crereq, cremsg := e2termConn.handle_e2term_subs_req(t)
e2termConn.handle_e2term_subs_resp(t, crereq, cremsg)
- e2SubsId := xappConn1.handle_xapp_subs_resp(t)
+ e2SubsId := xappConn1.handle_xapp_subs_resp(t, cretrans)
- xappConn1.handle_xapp_subs_del_req(t, true, e2SubsId)
+ deltrans := xappConn1.handle_xapp_subs_del_req(t, nil, e2SubsId)
delreq, delmsg := e2termConn.handle_e2term_subs_del_req(t)
e2termConn.handle_e2term_subs_del_resp(t, delreq, delmsg)
- xappConn1.handle_xapp_subs_del_resp(t)
+ xappConn1.handle_xapp_subs_del_resp(t, deltrans)
//Wait that subs is cleaned
mainCtrl.wait_subs_clean(t, e2SubsId, 10)
@@ -486,19 +525,21 @@
xapp.Logger.Info("TestSubReqRetransmission")
//Subs Create
- xappConn1.handle_xapp_subs_req(t, true)
+ cretrans := xappConn1.handle_xapp_subs_req(t, nil)
crereq, cremsg := e2termConn.handle_e2term_subs_req(t)
- xappConn1.handle_xapp_subs_req(t, false)
+
+ seqBef := mainCtrl.get_msgcounter(t)
+ xappConn1.handle_xapp_subs_req(t, cretrans) //Retransmitted SubReq
+ mainCtrl.wait_msgcounter_change(t, seqBef, 10)
e2termConn.handle_e2term_subs_resp(t, crereq, cremsg)
-
- e2SubsId := xappConn1.handle_xapp_subs_resp(t)
+ e2SubsId := xappConn1.handle_xapp_subs_resp(t, cretrans)
//Subs Delete
- xappConn1.handle_xapp_subs_del_req(t, true, e2SubsId)
+ deltrans := xappConn1.handle_xapp_subs_del_req(t, nil, e2SubsId)
delreq, delmsg := e2termConn.handle_e2term_subs_del_req(t)
e2termConn.handle_e2term_subs_del_resp(t, delreq, delmsg)
- xappConn1.handle_xapp_subs_del_resp(t)
+ xappConn1.handle_xapp_subs_del_resp(t, deltrans)
//Wait that subs is cleaned
mainCtrl.wait_subs_clean(t, e2SubsId, 10)
@@ -535,20 +576,21 @@
xapp.Logger.Info("TestSubDelReqRetransmission")
//Subs Create
- xappConn1.handle_xapp_subs_req(t, true)
+ cretrans := xappConn1.handle_xapp_subs_req(t, nil)
crereq, cremsg := e2termConn.handle_e2term_subs_req(t)
e2termConn.handle_e2term_subs_resp(t, crereq, cremsg)
- e2SubsId := xappConn1.handle_xapp_subs_resp(t)
+ e2SubsId := xappConn1.handle_xapp_subs_resp(t, cretrans)
//Subs Delete
- xappConn1.handle_xapp_subs_del_req(t, true, e2SubsId)
+ deltrans := xappConn1.handle_xapp_subs_del_req(t, nil, e2SubsId)
delreq, delmsg := e2termConn.handle_e2term_subs_del_req(t)
- <-time.After(2 * time.Second)
- xappConn1.handle_xapp_subs_del_req(t, false, e2SubsId)
+ seqBef := mainCtrl.get_msgcounter(t)
+ xappConn1.handle_xapp_subs_del_req(t, deltrans, e2SubsId) //Retransmitted SubDelReq
+ mainCtrl.wait_msgcounter_change(t, seqBef, 10)
e2termConn.handle_e2term_subs_del_resp(t, delreq, delmsg)
- xappConn1.handle_xapp_subs_del_resp(t)
+ xappConn1.handle_xapp_subs_del_resp(t, deltrans)
//Wait that subs is cleaned
mainCtrl.wait_subs_clean(t, e2SubsId, 10)
@@ -596,34 +638,111 @@
xapp.Logger.Info("TestSubReqAndSubDelOkTwoParallel")
//Req1
- xappConn1.handle_xapp_subs_req(t, true)
+ cretrans1 := xappConn1.handle_xapp_subs_req(t, nil)
crereq1, cremsg1 := e2termConn.handle_e2term_subs_req(t)
//Req2
- xappConn2.handle_xapp_subs_req(t, true)
+ cretrans2 := xappConn2.handle_xapp_subs_req(t, nil)
crereq2, cremsg2 := e2termConn.handle_e2term_subs_req(t)
//Resp1
e2termConn.handle_e2term_subs_resp(t, crereq1, cremsg1)
- e2SubsId1 := xappConn1.handle_xapp_subs_resp(t)
+ e2SubsId1 := xappConn1.handle_xapp_subs_resp(t, cretrans1)
//Resp2
e2termConn.handle_e2term_subs_resp(t, crereq2, cremsg2)
- e2SubsId2 := xappConn2.handle_xapp_subs_resp(t)
+ e2SubsId2 := xappConn2.handle_xapp_subs_resp(t, cretrans2)
//Del1
- xappConn1.handle_xapp_subs_del_req(t, true, e2SubsId1)
+ deltrans1 := xappConn1.handle_xapp_subs_del_req(t, nil, e2SubsId1)
delreq1, delmsg1 := e2termConn.handle_e2term_subs_del_req(t)
e2termConn.handle_e2term_subs_del_resp(t, delreq1, delmsg1)
- xappConn1.handle_xapp_subs_del_resp(t)
+ xappConn1.handle_xapp_subs_del_resp(t, deltrans1)
//Wait that subs is cleaned
mainCtrl.wait_subs_clean(t, e2SubsId1, 10)
//Del2
- xappConn2.handle_xapp_subs_del_req(t, true, e2SubsId2)
+ deltrans2 := xappConn2.handle_xapp_subs_del_req(t, nil, e2SubsId2)
delreq2, delmsg2 := e2termConn.handle_e2term_subs_del_req(t)
e2termConn.handle_e2term_subs_del_resp(t, delreq2, delmsg2)
- xappConn2.handle_xapp_subs_del_resp(t)
+ xappConn2.handle_xapp_subs_del_resp(t, deltrans2)
+ //Wait that subs is cleaned
+ mainCtrl.wait_subs_clean(t, e2SubsId2, 10)
+}
+
+//-----------------------------------------------------------------------------
+// TestSameSubsDiffRan
+// Same subscription to different RANs
+//
+// stub stub
+// +-------+ +---------+ +---------+
+// | xapp | | submgr | | e2term |
+// +-------+ +---------+ +---------+
+// | | |
+// | | |
+// | | |
+// | SubReq(r1) | |
+// |------------->| |
+// | | |
+// | | SubReq(r1) |
+// | |------------->|
+// | | |
+// | | SubResp(r1) |
+// | |<-------------|
+// | | |
+// | SubResp(r1) | |
+// |<-------------| |
+// | | |
+// | SubReq(r2) | |
+// |------------->| |
+// | | |
+// | | SubReq(r2) |
+// | |------------->|
+// | | |
+// | | SubResp(r2) |
+// | |<-------------|
+// | | |
+// | SubResp(r2) | |
+// |<-------------| |
+// | | |
+// | [SUBS r1 DELETE] |
+// | | |
+// | [SUBS r2 DELETE] |
+// | | |
+//
+//-----------------------------------------------------------------------------
+func TestSameSubsDiffRan(t *testing.T) { // two create/delete rounds from xappConn1, one with meid RAN_NAME_1 and one with RAN_NAME_2
+ xapp.Logger.Info("TestSameSubsDiffRan")
+
+ //Req1: create towards RAN_NAME_1 and wait for the response on the same transaction
+ cretrans1 := xappConn1.newXappTransaction(nil, "RAN_NAME_1")
+ xappConn1.handle_xapp_subs_req(t, cretrans1)
+ crereq1, cremsg1 := e2termConn.handle_e2term_subs_req(t)
+ e2termConn.handle_e2term_subs_resp(t, crereq1, cremsg1)
+ e2SubsId1 := xappConn1.handle_xapp_subs_resp(t, cretrans1)
+
+ //Req2: same flow with a new transaction towards RAN_NAME_2
+ cretrans2 := xappConn1.newXappTransaction(nil, "RAN_NAME_2")
+ xappConn1.handle_xapp_subs_req(t, cretrans2)
+ crereq2, cremsg2 := e2termConn.handle_e2term_subs_req(t)
+ e2termConn.handle_e2term_subs_resp(t, crereq2, cremsg2)
+ e2SubsId2 := xappConn1.handle_xapp_subs_resp(t, cretrans2)
+
+ //Del1: delete the first subscription using a fresh RAN_NAME_1 transaction
+ deltrans1 := xappConn1.newXappTransaction(nil, "RAN_NAME_1")
+ xappConn1.handle_xapp_subs_del_req(t, deltrans1, e2SubsId1)
+ delreq1, delmsg1 := e2termConn.handle_e2term_subs_del_req(t)
+ e2termConn.handle_e2term_subs_del_resp(t, delreq1, delmsg1)
+ xappConn1.handle_xapp_subs_del_resp(t, deltrans1)
+ //Wait that subs is cleaned
+ mainCtrl.wait_subs_clean(t, e2SubsId1, 10)
+
+ //Del2: delete the second subscription using a fresh RAN_NAME_2 transaction
+ deltrans2 := xappConn1.newXappTransaction(nil, "RAN_NAME_2")
+ xappConn1.handle_xapp_subs_del_req(t, deltrans2, e2SubsId2)
+ delreq2, delmsg2 := e2termConn.handle_e2term_subs_del_req(t)
+ e2termConn.handle_e2term_subs_del_resp(t, delreq2, delmsg2)
+ xappConn1.handle_xapp_subs_del_resp(t, deltrans2)
//Wait that subs is cleaned
mainCtrl.wait_subs_clean(t, e2SubsId2, 10)
}
diff --git a/pkg/control/registry.go b/pkg/control/registry.go
index 3ac15f1..3d70d63 100644
--- a/pkg/control/registry.go
+++ b/pkg/control/registry.go
@@ -25,8 +25,20 @@
)
type Subscription struct {
- Seq uint16
- Confirmed bool
+ Seq uint16
+ Active bool
+}
+
+func (s *Subscription) Confirmed() { // marks the subscription as confirmed by setting Active to true
+ s.Active = true
+}
+
+func (s *Subscription) UnConfirmed() { // marks the subscription as not confirmed by setting Active to false
+ s.Active = false
+}
+
+func (s *Subscription) IsConfirmed() bool { // reports whether the subscription is currently marked confirmed (the Active flag)
+ return s.Active
}
type Registry struct {
@@ -42,24 +54,37 @@
}
// Reserves and returns the next free sequence number
-func (r *Registry) ReserveSequenceNumber() (uint16, bool) {
+func (r *Registry) ReserveSubscription() *Subscription {
// Check is current SequenceNumber valid
+ // Allocate next SequenceNumber value and retry N times
r.mutex.Lock()
defer r.mutex.Unlock()
- sequenceNumber := r.counter
- if _, ok := r.register[sequenceNumber]; ok {
- xapp.Logger.Error("Invalid SeqenceNumber sequenceNumber: %v", sequenceNumber)
- return sequenceNumber, false
+ var subs *Subscription = nil
+ var retrytimes uint16 = 1000
+ for ; subs == nil && retrytimes > 0; retrytimes-- {
+ sequenceNumber := r.counter
+ if r.counter == 65535 {
+ r.counter = 0
+ } else {
+ r.counter++
+ }
+ if _, ok := r.register[sequenceNumber]; ok == false {
+ r.register[sequenceNumber] = &Subscription{sequenceNumber, false}
+ return r.register[sequenceNumber]
+ }
}
- r.register[sequenceNumber] = &Subscription{sequenceNumber, false}
+ return nil
+}
- // Allocate next SequenceNumber value
- if r.counter == 65535 {
- r.counter = 0
- } else {
- r.counter++
+// GetSubscription returns the subscription registered for the given sequence number, or nil if there is none
+func (r *Registry) GetSubscription(sn uint16) *Subscription {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+ xapp.Logger.Debug("Registry map: %v", r.register)
+ if _, ok := r.register[sn]; ok {
+ return r.register[sn]
}
- return sequenceNumber, true
+ return nil
}
// This function checks the validity of the given subscription id
@@ -77,14 +102,14 @@
func (r *Registry) setSubscriptionToConfirmed(sn uint16) {
r.mutex.Lock()
defer r.mutex.Unlock()
- r.register[sn].Confirmed = true
+ r.register[sn].Confirmed()
}
//This function sets the given id as unused in the register
-func (r *Registry) deleteSubscription(sn uint16) {
+func (r *Registry) setSubscriptionToUnConfirmed(sn uint16) {
r.mutex.Lock()
defer r.mutex.Unlock()
- r.register[sn].Confirmed = false
+ r.register[sn].UnConfirmed()
}
//This function releases the given id as unused in the register
diff --git a/pkg/control/tracker.go b/pkg/control/tracker.go
index 584b331..9287ea8 100644
--- a/pkg/control/tracker.go
+++ b/pkg/control/tracker.go
@@ -31,9 +31,8 @@
}
type TransactionXappKey struct {
- Addr string // xapp addr
- Port uint16 // xapp port
- Xid string // xapp xid in req
+ RmrEndpoint
+ Xid string // xapp xid in req
}
type Transaction struct {
@@ -46,7 +45,7 @@
}
func (t *Transaction) SubRouteInfo() SubRouteInfo {
- return SubRouteInfo{t.Key.TransType, t.Xappkey.Addr, t.Xappkey.Port, t.Key.SubID}
+ return SubRouteInfo{t.Key.TransType, t.Xappkey.RmrEndpoint.Addr, t.Xappkey.RmrEndpoint.Port, t.Key.SubID}
}
/*
@@ -69,7 +68,8 @@
*/
func (t *Tracker) TrackTransaction(subID uint16, act Action, addr string, port uint16, params *xapp.RMRParams, respReceived bool, forwardRespToXapp bool) (*Transaction, error) {
key := TransactionKey{subID, act}
- xappkey := TransactionXappKey{addr, port, params.Xid}
+ endpoint := RmrEndpoint{addr, port}
+ xappkey := TransactionXappKey{endpoint, params.Xid}
trans := &Transaction{t, key, xappkey, params, respReceived, forwardRespToXapp}
t.mutex.Lock()
defer t.mutex.Unlock()
diff --git a/pkg/control/types.go b/pkg/control/types.go
index 83312d8..febd41a 100644
--- a/pkg/control/types.go
+++ b/pkg/control/types.go
@@ -32,4 +32,8 @@
SubID uint16
}
+type RmrEndpoint struct { // address/port pair; embedded in TransactionXappKey
+ Addr string // xapp addr
+ Port uint16 // xapp port
+}
type Action int