RICPLT-2961 Drop retransmitted messages with same transaction id

Change-Id: Iaccff14cd9bbbf0b029a8153e9665209b68f65d3
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
diff --git a/pkg/control/control.go b/pkg/control/control.go
index b398daa..cb085de 100644
--- a/pkg/control/control.go
+++ b/pkg/control/control.go
@@ -164,26 +164,27 @@
 	err := c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, newSubId)
 	if err != nil {
 		xapp.Logger.Error("SubReq: Unable to set Sequence Number in Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+		c.registry.releaseSequenceNumber(newSubId)
 		return
 	}
 
 	srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
 	if err != nil {
 		xapp.Logger.Error("SubReq: Failed to update routing-manager. Dropping this msg. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+		c.registry.releaseSequenceNumber(newSubId)
 		return
 	}
 
 	/* Create transatcion records for every subscription request */
-	xactKey := TransactionKey{newSubId, CREATE}
-	xactValue := Transaction{*srcAddr, *srcPort, params}
-	err = c.tracker.TrackTransaction(xactKey, xactValue)
+	transaction, err := c.tracker.TrackTransaction(newSubId, CREATE, *srcAddr, *srcPort, params)
 	if err != nil {
 		xapp.Logger.Error("SubReq: Failed to create transaction record. Dropping this msg. Err: %v SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+		c.registry.releaseSequenceNumber(newSubId)
 		return
 	}
 
 	/* Update routing manager about the new subscription*/
-	subRouteAction := SubRouteInfo{CREATE, *srcAddr, *srcPort, newSubId}
+	subRouteAction := transaction.SubRouteInfo()
 	xapp.Logger.Info("SubReq: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
 
 	err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
@@ -225,13 +226,12 @@
 	c.timerMap.StopTimer("RIC_SUB_REQ", int(payloadSeqNum))
 
 	c.registry.setSubscriptionToConfirmed(payloadSeqNum)
-	var transaction Transaction
-	transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, CREATE)
+	transaction, err := c.tracker.RetriveTransaction(payloadSeqNum, CREATE)
 	if err != nil {
 		xapp.Logger.Error("SubResp: Failed to retrive transaction record. Dropping this msg. Err: %v, SubId: %v", err, params.SubId)
 		return
 	}
-	xapp.Logger.Info("SubResp: SubId: %v, from address: %v:%v. Retrieved old subId", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+	xapp.Logger.Info("SubResp: SubId: %v, from address: %v:%v. Retrieved old subId", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port)
 
 	params.SubId = int(payloadSeqNum)
 	params.Xid = transaction.OrigParams.Xid
@@ -242,7 +242,7 @@
 		xapp.Logger.Error("SubResp: Failed to send response to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
 	}
 
-	xapp.Logger.Info("SubResp: SubId: %v, from address: %v:%v. Deleting transaction record", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+	xapp.Logger.Info("SubResp: SubId: %v, from address: %v:%v. Deleting transaction record", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port)
 	transaction, err = c.tracker.completeTransaction(payloadSeqNum, CREATE)
 	if err != nil {
 		xapp.Logger.Error("SubResp: Failed to delete transaction record. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
@@ -265,13 +265,12 @@
 
 	c.timerMap.StopTimer("RIC_SUB_REQ", int(payloadSeqNum))
 
-	var transaction Transaction
-	transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, CREATE)
+	transaction, err := c.tracker.RetriveTransaction(payloadSeqNum, CREATE)
 	if err != nil {
 		xapp.Logger.Error("SubFail: Failed to retrive transaction record. Dropping this msg. Err: %v, SubId: %v", err, params.SubId)
 		return
 	}
-	xapp.Logger.Info("SubFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+	xapp.Logger.Info("SubFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port)
 
 	params.SubId = int(payloadSeqNum)
 	params.Xid = transaction.OrigParams.Xid
@@ -285,7 +284,7 @@
 	time.Sleep(3 * time.Second)
 
 	xapp.Logger.Info("SubFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
-	subRouteAction := SubRouteInfo{CREATE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
+	subRouteAction := transaction.SubRouteInfo()
 	err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
 	if err != nil {
 		xapp.Logger.Error("SubFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
@@ -348,10 +347,10 @@
 
 	time.Sleep(3 * time.Second)
 
-	xapp.Logger.Info("SendSubFail: SubId: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.XappInstanceAddress, transaction.XappPort)
+	xapp.Logger.Info("SendSubFail: SubId: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.Xappkey.Addr, transaction.Xappkey.Port)
 
 	xapp.Logger.Info("SubReqTimer: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
-	subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, subId}
+	subRouteAction := SubRouteInfo{DELETE, transaction.Xappkey.Addr, transaction.Xappkey.Port, subId}
 	err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
 	if err != nil {
 		xapp.Logger.Error("SendSubFail: Failed to update routing manager %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
@@ -409,7 +408,7 @@
 
 	if c.registry.IsValidSequenceNumber(payloadSeqNum) {
 		c.registry.deleteSubscription(payloadSeqNum)
-		err = c.trackDeleteTransaction(params, payloadSeqNum)
+		_, err = c.trackDeleteTransaction(params, payloadSeqNum)
 		if err != nil {
 			xapp.Logger.Error("SubDelReq: Failed to create transaction record. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
 			return
@@ -429,14 +428,12 @@
 	return
 }
 
-func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payloadSeqNum uint16) (err error) {
+func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payloadSeqNum uint16) (transaction *Transaction, err error) {
 	srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src)
 	if err != nil {
 		xapp.Logger.Error("SubDelReq: Failed to update routing-manager. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
 	}
-	xactKey := TransactionKey{payloadSeqNum, DELETE}
-	xactValue := Transaction{*srcAddr, *srcPort, params}
-	err = c.tracker.TrackTransaction(xactKey, xactValue)
+	transaction, err = c.tracker.TrackTransaction(payloadSeqNum, DELETE, *srcAddr, *srcPort, params)
 	return
 }
 
@@ -454,13 +451,12 @@
 
 	c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(payloadSeqNum))
 
-	var transaction Transaction
-	transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
+	transaction, err := c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
 	if err != nil {
 		xapp.Logger.Error("SubDelResp: Failed to retrive transaction record. Dropping this msg. Err: %v, SubId: %v", err, params.SubId)
 		return
 	}
-	xapp.Logger.Info("SubDelResp: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+	xapp.Logger.Info("SubDelResp: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port)
 
 	params.SubId = int(payloadSeqNum)
 	params.Xid = transaction.OrigParams.Xid
@@ -474,7 +470,7 @@
 	time.Sleep(3 * time.Second)
 
 	xapp.Logger.Info("SubDelResp: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
-	subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
+	subRouteAction := SubRouteInfo{DELETE, transaction.Xappkey.Addr, transaction.Xappkey.Port, payloadSeqNum}
 	err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
 	if err != nil {
 		xapp.Logger.Error("SubDelResp: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
@@ -509,13 +505,12 @@
 
 	c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(payloadSeqNum))
 
-	var transaction Transaction
-	transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
+	transaction, err := c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
 	if err != nil {
 		xapp.Logger.Error("SubDelFail: Failed to retrive transaction record. Dropping msg. Err %v, SubId: %v", err, params.SubId)
 		return
 	}
-	xapp.Logger.Info("SubDelFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+	xapp.Logger.Info("SubDelFail: SubId: %v, from address: %v:%v. Forwarding response to xApp", int(payloadSeqNum), transaction.Xappkey.Addr, transaction.Xappkey.Port)
 
 	params.SubId = int(payloadSeqNum)
 	params.Xid = transaction.OrigParams.Xid
@@ -529,7 +524,7 @@
 	time.Sleep(3 * time.Second)
 
 	xapp.Logger.Info("SubDelFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
-	subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
+	subRouteAction := SubRouteInfo{DELETE, transaction.Xappkey.Addr, transaction.Xappkey.Port, payloadSeqNum}
 	c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
 	if err != nil {
 		xapp.Logger.Error("SubDelFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
@@ -592,10 +587,10 @@
 
 	time.Sleep(3 * time.Second)
 
-	xapp.Logger.Info("SendSubDelFail: SubId: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.XappInstanceAddress, transaction.XappPort)
+	xapp.Logger.Info("SendSubDelFail: SubId: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.Xappkey.Addr, transaction.Xappkey.Port)
 
 	xapp.Logger.Info("SendSubDelFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
-	subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, subId}
+	subRouteAction := SubRouteInfo{DELETE, transaction.Xappkey.Addr, transaction.Xappkey.Port, subId}
 	err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
 	if err != nil {
 		xapp.Logger.Error("SendSubDelFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
diff --git a/pkg/control/main_test.go b/pkg/control/main_test.go
index f74339e..2185d53 100644
--- a/pkg/control/main_test.go
+++ b/pkg/control/main_test.go
@@ -98,6 +98,25 @@
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
+type testingMainControl struct {
+	testingControl
+	c *Control
+}
+
+func (mc *testingMainControl) wait_subs_clean(e2SubsId int, secs int) bool {
+	i := 1
+	for ; i <= secs*2; i++ {
+		if mc.c.registry.IsValidSequenceNumber(uint16(e2SubsId)) == false {
+			return true
+		}
+		time.Sleep(500 * time.Millisecond)
+	}
+	return false
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
 
 func testError(t *testing.T, pattern string, args ...interface{}) {
 	xapp.Logger.Error(fmt.Sprintf(pattern, args...))
@@ -123,6 +142,7 @@
 
 var xappConn *testingRmrControl
 var e2termConn *testingRmrControl
+var mainCtrl *testingMainControl
 
 func TestMain(m *testing.M) {
 	xapp.Logger.Info("TestMain start")
@@ -187,16 +207,17 @@
 	subrtfilename, _ := testCreateTmpFile(subsrt)
 	defer os.Remove(subrtfilename)
 	os.Setenv("RMR_SEED_RT", subrtfilename)
+	os.Setenv("RMR_SRC_ID", "localhost:14560")
 	xapp.Logger.Info("Using rt file %s", os.Getenv("RMR_SEED_RT"))
+	xapp.Logger.Info("Using src id  %s", os.Getenv("RMR_SRC_ID"))
 
-	mainCtrl := &testingControl{}
+	mainCtrl = &testingMainControl{}
 	mainCtrl.desc = "main"
 	mainCtrl.syncChan = make(chan struct{})
 
-	os.Setenv("RMR_SRC_ID", "localhost:14560")
-	c := NewControl()
+	mainCtrl.c = NewControl()
 	xapp.SetReadyCB(mainCtrl.ReadyCB, nil)
-	go xapp.RunWithParams(c, false)
+	go xapp.RunWithParams(mainCtrl.c, false)
 	<-mainCtrl.syncChan
 
 	//---------------------------------
diff --git a/pkg/control/messaging_test.go b/pkg/control/messaging_test.go
index 60a5dc0..7308563 100644
--- a/pkg/control/messaging_test.go
+++ b/pkg/control/messaging_test.go
@@ -34,9 +34,9 @@
 
 var e2asnpacker e2ap.E2APPackerIf = e2ap_wrapper.NewAsn1E2Packer()
 
+//-----------------------------------------------------------------------------
 //
-//
-//
+//-----------------------------------------------------------------------------
 func createSubsReq() *e2ap.E2APSubscriptionRequest {
 	req := &e2ap.E2APSubscriptionRequest{}
 
@@ -69,9 +69,9 @@
 	return req
 }
 
+//-----------------------------------------------------------------------------
 //
-//
-//
+//-----------------------------------------------------------------------------
 func createSubsResp(req *e2ap.E2APSubscriptionRequest) *e2ap.E2APSubscriptionResponse {
 
 	resp := &e2ap.E2APSubscriptionResponse{}
@@ -96,9 +96,9 @@
 	return resp
 }
 
+//-----------------------------------------------------------------------------
 //
-//
-//
+//-----------------------------------------------------------------------------
 func createSubsDelReq(e2SubsId uint32) *e2ap.E2APSubscriptionDeleteRequest {
 	req := &e2ap.E2APSubscriptionDeleteRequest{}
 	req.RequestId.Id = 1
@@ -107,9 +107,9 @@
 	return req
 }
 
+//-----------------------------------------------------------------------------
 //
-//
-//
+//-----------------------------------------------------------------------------
 func createSubsDelResp(req *e2ap.E2APSubscriptionDeleteRequest) *e2ap.E2APSubscriptionDeleteResponse {
 	resp := &e2ap.E2APSubscriptionDeleteResponse{}
 	resp.RequestId.Id = req.RequestId.Id
@@ -119,7 +119,303 @@
 }
 
 //-----------------------------------------------------------------------------
-// TestSubRequestSubResponseOk
+//
+//-----------------------------------------------------------------------------
+func handle_xapp_subs_req(t *testing.T) {
+	xapp.Logger.Info("handle_xapp_subs_req start")
+	e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest()
+
+	//---------------------------------
+	// xapp activity: Send Subs Req
+	//---------------------------------
+	//select {
+	//case <-time.After(1 * time.Second):
+	xapp.Logger.Info("(xappConn) Send Subs Req")
+	req := createSubsReq()
+	e2SubsReq.Set(req)
+	xapp.Logger.Debug("%s", e2SubsReq.String())
+	err, packedMsg := e2SubsReq.Pack(nil)
+	if err != nil {
+		testError(t, "(xappConn) pack NOK %s", err.Error())
+	}
+
+	params := &xapp.RMRParams{}
+	params.Mtype = xapp.RIC_SUB_REQ
+	params.SubId = -1
+	params.Payload = packedMsg.Buf
+	params.Meid = &xapp.RMRMeid{RanName: "RAN_NAME_1"}
+	params.Xid = "XID_1"
+	params.Mbuf = nil
+
+	snderr := xappConn.RmrSend(params)
+	if snderr != nil {
+		testError(t, "(xappConn) RMR SEND FAILED: %s", snderr.Error())
+	}
+	//}
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func handle_e2term_subs_req(t *testing.T) (*e2ap.E2APSubscriptionRequest, *xapp.RMRParams) {
+	xapp.Logger.Info("handle_e2term_subs_req start")
+	e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest()
+
+	//---------------------------------
+	// e2term activity: Recv Subs Req
+	//---------------------------------
+	select {
+	case msg := <-e2termConn.rmrConChan:
+		if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_REQ"] {
+			testError(t, "(e2termConn) Received non RIC_SUB_REQ message")
+		} else {
+			xapp.Logger.Info("(e2termConn) Recv Subs Req")
+			packedData := &packer.PackedData{}
+			packedData.Buf = msg.Payload
+			unpackerr := e2SubsReq.UnPack(packedData)
+			if unpackerr != nil {
+				testError(t, "(e2termConn) RIC_SUB_REQ unpack failed err: %s", unpackerr.Error())
+			}
+			geterr, req := e2SubsReq.Get()
+			if geterr != nil {
+				testError(t, "(e2termConn) RIC_SUB_REQ get failed err: %s", geterr.Error())
+			}
+			return req, msg
+		}
+	case <-time.After(15 * time.Second):
+		testError(t, "(e2termConn) Not Received RIC_SUB_REQ within 15 secs")
+	}
+	return nil, nil
+}
+
+func handle_e2term_subs_resp(t *testing.T, req *e2ap.E2APSubscriptionRequest, msg *xapp.RMRParams) {
+	xapp.Logger.Info("handle_e2term_subs_resp start")
+	e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse()
+
+	//---------------------------------
+	// e2term activity: Send Subs Resp
+	//---------------------------------
+	xapp.Logger.Info("(e2termConn) Send Subs Resp")
+	resp := createSubsResp(req)
+	e2SubsResp.Set(resp)
+	xapp.Logger.Debug("%s", e2SubsResp.String())
+	packerr, packedMsg := e2SubsResp.Pack(nil)
+	if packerr != nil {
+		testError(t, "(e2termConn) pack NOK %s", packerr.Error())
+	}
+
+	params := &xapp.RMRParams{}
+	params.Mtype = xapp.RIC_SUB_RESP
+	params.SubId = msg.SubId
+	params.Payload = packedMsg.Buf
+	params.Meid = msg.Meid
+	params.Xid = msg.Xid
+	params.Mbuf = nil
+
+	snderr := e2termConn.RmrSend(params)
+	if snderr != nil {
+		testError(t, "(e2termConn) RMR SEND FAILED: %s", snderr.Error())
+	}
+}
+
+func handle_e2term_subs_reqandresp(t *testing.T) {
+	req, msg := handle_e2term_subs_req(t)
+	handle_e2term_subs_resp(t, req, msg)
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func handle_xapp_subs_resp(t *testing.T) int {
+	xapp.Logger.Info("handle_xapp_subs_resp start")
+	e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse()
+	var e2SubsId int
+
+	//---------------------------------
+	// xapp activity: Recv Subs Resp
+	//---------------------------------
+	select {
+	case msg := <-xappConn.rmrConChan:
+		if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_RESP"] {
+			testError(t, "(xappConn) Received non RIC_SUB_RESP message")
+		} else {
+			xapp.Logger.Info("(xappConn) Recv Subs Resp")
+
+			packedData := &packer.PackedData{}
+			packedData.Buf = msg.Payload
+			e2SubsId = msg.SubId
+			unpackerr := e2SubsResp.UnPack(packedData)
+
+			if unpackerr != nil {
+				testError(t, "(xappConn) RIC_SUB_RESP unpack failed err: %s", unpackerr.Error())
+			}
+			geterr, _ := e2SubsResp.Get()
+			if geterr != nil {
+				testError(t, "(xappConn) RIC_SUB_RESP get failed err: %s", geterr.Error())
+			}
+
+		}
+	case <-time.After(15 * time.Second):
+		testError(t, "(xappConn) Not Received RIC_SUB_RESP within 15 secs")
+	}
+	return e2SubsId
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func handle_xapp_subs_del_req(t *testing.T, e2SubsId int) {
+	xapp.Logger.Info("handle_xapp_subs_del_req start")
+	e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest()
+
+	//---------------------------------
+	// xapp activity: Send Subs Del Req
+	//---------------------------------
+	//select {
+	//case <-time.After(1 * time.Second):
+	xapp.Logger.Info("(xappConn) Send Subs Del Req")
+	req := createSubsDelReq(uint32(e2SubsId))
+	e2SubsDelReq.Set(req)
+	xapp.Logger.Debug("%s", e2SubsDelReq.String())
+	err, packedMsg := e2SubsDelReq.Pack(nil)
+	if err != nil {
+		testError(t, "(xappConn) pack NOK %s", err.Error())
+	}
+
+	params := &xapp.RMRParams{}
+	params.Mtype = xapp.RIC_SUB_DEL_REQ
+	params.SubId = e2SubsId
+	params.Payload = packedMsg.Buf
+	params.Meid = &xapp.RMRMeid{RanName: "RAN_NAME_1"}
+	params.Xid = "XID_1"
+	params.Mbuf = nil
+
+	snderr := xappConn.RmrSend(params)
+	if snderr != nil {
+		testError(t, "(xappConn) RMR SEND FAILED: %s", snderr.Error())
+	}
+	//}
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func handle_e2term_subs_del_req(t *testing.T) (*e2ap.E2APSubscriptionDeleteRequest, *xapp.RMRParams) {
+	xapp.Logger.Info("handle_e2term_subs_del_req start")
+	e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest()
+
+	//---------------------------------
+	// e2term activity: Recv Subs Del Req
+	//---------------------------------
+	select {
+	case msg := <-e2termConn.rmrConChan:
+		if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_DEL_REQ"] {
+			testError(t, "(e2termConn) Received non RIC_SUB_DEL_REQ message")
+		} else {
+			xapp.Logger.Info("(e2termConn) Recv Subs Del Req")
+
+			packedData := &packer.PackedData{}
+			packedData.Buf = msg.Payload
+			unpackerr := e2SubsDelReq.UnPack(packedData)
+			if unpackerr != nil {
+				testError(t, "(e2termConn) RIC_SUB_DEL_REQ unpack failed err: %s", unpackerr.Error())
+			}
+			geterr, req := e2SubsDelReq.Get()
+			if geterr != nil {
+				testError(t, "(e2termConn) RIC_SUB_DEL_REQ get failed err: %s", geterr.Error())
+			}
+			return req, msg
+		}
+	case <-time.After(15 * time.Second):
+		testError(t, "(e2termConn) Not Received RIC_SUB_DEL_REQ within 15 secs")
+	}
+	return nil, nil
+}
+
+func handle_e2term_subs_del_resp(t *testing.T, req *e2ap.E2APSubscriptionDeleteRequest, msg *xapp.RMRParams) {
+	xapp.Logger.Info("handle_e2term_subs_del_resp start")
+	e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse()
+
+	//---------------------------------
+	// e2term activity: Send Subs Del Resp
+	//---------------------------------
+	xapp.Logger.Info("(e2termConn) Send Subs Del Resp")
+	resp := createSubsDelResp(req)
+	e2SubsDelResp.Set(resp)
+	xapp.Logger.Debug("%s", e2SubsDelResp.String())
+	packerr, packedMsg := e2SubsDelResp.Pack(nil)
+	if packerr != nil {
+		testError(t, "(e2termConn) pack NOK %s", packerr.Error())
+	}
+
+	params := &xapp.RMRParams{}
+	params.Mtype = xapp.RIC_SUB_DEL_RESP
+	params.SubId = msg.SubId
+	params.Payload = packedMsg.Buf
+	params.Meid = msg.Meid
+	params.Xid = msg.Xid
+	params.Mbuf = nil
+
+	snderr := e2termConn.RmrSend(params)
+	if snderr != nil {
+		testError(t, "(e2termConn) RMR SEND FAILED: %s", snderr.Error())
+	}
+
+}
+
+func handle_e2term_subs_del_reqandresp(t *testing.T) {
+	req, msg := handle_e2term_subs_del_req(t)
+	handle_e2term_subs_del_resp(t, req, msg)
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func handle_xapp_subs_del_resp(t *testing.T) {
+	xapp.Logger.Info("handle_xapp_subs_del_resp start")
+	e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse()
+
+	//---------------------------------
+	// xapp activity: Recv Subs Del Resp
+	//---------------------------------
+	select {
+	case msg := <-xappConn.rmrConChan:
+		if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_DEL_RESP"] {
+			testError(t, "(xappConn) Received non RIC_SUB_DEL_RESP message")
+		} else {
+			xapp.Logger.Info("(xappConn) Recv Subs Del Resp")
+
+			packedData := &packer.PackedData{}
+			packedData.Buf = msg.Payload
+			unpackerr := e2SubsDelResp.UnPack(packedData)
+			if unpackerr != nil {
+				testError(t, "(xappConn) RIC_SUB_DEL_RESP unpack failed err: %s", unpackerr.Error())
+			}
+			geterr, _ := e2SubsDelResp.Get()
+			if geterr != nil {
+				testError(t, "(xappConn) RIC_SUB_DEL_RESP get failed err: %s", geterr.Error())
+			}
+
+		}
+	case <-time.After(15 * time.Second):
+		testError(t, "(xappConn) Not Received RIC_SUB_DEL_RESP within 15 secs")
+	}
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func handle_wait_subs_clean(t *testing.T, e2SubsId int) bool {
+	xapp.Logger.Info("handle_wait_subs_clean start")
+	if mainCtrl.wait_subs_clean(e2SubsId, 10) == false {
+		testError(t, "(general) no clean within 10 secs")
+		return false
+	}
+	return true
+}
+
+//-----------------------------------------------------------------------------
+// TestSubReqAndSubDelOk
 //
 //   stub                          stub
 // +-------+     +---------+    +---------+
@@ -153,215 +449,114 @@
 //
 //-----------------------------------------------------------------------------
 func TestSubReqAndSubDelOk(t *testing.T) {
-
 	xapp.Logger.Info("TestSubReqAndSubDelOk start")
-	e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest()
-	e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse()
-	e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest()
-	e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse()
-	var e2SubsId int
 
-	//---------------------------------
-	// xapp activity: Send Subs Req
-	//---------------------------------
-	select {
-	case <-time.After(5 * time.Second):
-		xapp.Logger.Info("(xappConn) Send Subs Req")
-		req := createSubsReq()
-		e2SubsReq.Set(req)
-		xapp.Logger.Debug("%s", e2SubsReq.String())
-		err, packedMsg := e2SubsReq.Pack(nil)
-		if err != nil {
-			testError(t, "(xappConn) pack NOK %s", err.Error())
-		}
+	handle_xapp_subs_req(t)
+	handle_e2term_subs_reqandresp(t)
+	e2SubsId := handle_xapp_subs_resp(t)
 
-		params := &xapp.RMRParams{}
-		params.Mtype = xapp.RIC_SUB_REQ
-		params.SubId = -1
-		params.Payload = packedMsg.Buf
-		params.Meid = &xapp.RMRMeid{RanName: "RAN_NAME_1"}
-		params.Xid = "XID_1"
-		params.Mbuf = nil
+	handle_xapp_subs_del_req(t, e2SubsId)
+	handle_e2term_subs_del_reqandresp(t)
+	handle_xapp_subs_del_resp(t)
 
-		snderr := xappConn.RmrSend(params)
-		if snderr != nil {
-			testError(t, "(xappConn) RMR SEND FAILED: %s", snderr.Error())
-		}
-	}
+	//Wait that subs is cleaned
+	handle_wait_subs_clean(t, e2SubsId)
+}
 
-	//---------------------------------
-	// e2term activity: Recv Subs Req & Send Subs Resp
-	//---------------------------------
-	select {
-	case msg := <-e2termConn.rmrConChan:
-		if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_REQ"] {
-			testError(t, "(e2termConn) Received non RIC_SUB_REQ message")
-		} else {
-			xapp.Logger.Info("(e2termConn) Recv Subs Req & Send Subs Resp")
-			packedData := &packer.PackedData{}
-			packedData.Buf = msg.Payload
-			unpackerr := e2SubsReq.UnPack(packedData)
-			if unpackerr != nil {
-				testError(t, "(e2termConn) RIC_SUB_REQ unpack failed err: %s", unpackerr.Error())
-			}
-			geterr, req := e2SubsReq.Get()
-			if geterr != nil {
-				testError(t, "(e2termConn) RIC_SUB_REQ get failed err: %s", geterr.Error())
-			}
+//-----------------------------------------------------------------------------
+// TestSubReqRetransmission
+//
+//   stub                          stub
+// +-------+     +---------+    +---------+
+// | xapp  |     | submgr  |    | e2term  |
+// +-------+     +---------+    +---------+
+//     |              |              |
+//     |  SubReq      |              |
+//     |------------->|              |
+//     |              |              |
+//     |              | SubReq       |
+//     |              |------------->|
+//     |              |              |
+//     |  SubReq      |              |
+//     | (retrans)    |              |
+//     |------------->|              |
+//     |              |              |
+//     |              |      SubResp |
+//     |              |<-------------|
+//     |              |              |
+//     |      SubResp |              |
+//     |<-------------|              |
+//     |              |              |
+//     |         [SUBS DELETE]       |
+//     |              |              |
+//
+//-----------------------------------------------------------------------------
+func TestSubReqRetransmission(t *testing.T) {
+	xapp.Logger.Info("TestSubReqRetransmission start")
 
-			resp := createSubsResp(req)
-			e2SubsResp.Set(resp)
-			xapp.Logger.Debug("%s", e2SubsResp.String())
-			packerr, packedMsg := e2SubsResp.Pack(nil)
-			if packerr != nil {
-				testError(t, "(e2termConn) pack NOK %s", packerr.Error())
-			}
+	//Subs Create
+	handle_xapp_subs_req(t)
+	req, msg := handle_e2term_subs_req(t)
+	handle_xapp_subs_req(t)
 
-			params := &xapp.RMRParams{}
-			params.Mtype = xapp.RIC_SUB_RESP
-			params.SubId = msg.SubId
-			params.Payload = packedMsg.Buf
-			params.Meid = msg.Meid
-			params.Xid = msg.Xid
-			params.Mbuf = nil
+	handle_e2term_subs_resp(t, req, msg)
 
-			snderr := e2termConn.RmrSend(params)
-			if snderr != nil {
-				testError(t, "(e2termConn) RMR SEND FAILED: %s", snderr.Error())
-			}
+	e2SubsId := handle_xapp_subs_resp(t)
 
-		}
-	case <-time.After(15 * time.Second):
-		testError(t, "(e2termConn) Not Received RIC_SUB_REQ within 15 secs")
-	}
+	//Subs Delete
+	handle_xapp_subs_del_req(t, e2SubsId)
+	handle_e2term_subs_del_reqandresp(t)
+	handle_xapp_subs_del_resp(t)
 
-	//---------------------------------
-	// xapp activity: Recv Subs Resp
-	//---------------------------------
-	select {
-	case msg := <-xappConn.rmrConChan:
-		if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_RESP"] {
-			testError(t, "(xappConn) Received non RIC_SUB_RESP message")
-		} else {
-			xapp.Logger.Info("(xappConn) Recv Subs Resp")
+	//Wait that subs is cleaned
+	handle_wait_subs_clean(t, e2SubsId)
+}
 
-			packedData := &packer.PackedData{}
-			packedData.Buf = msg.Payload
-			e2SubsId = msg.SubId
-			unpackerr := e2SubsResp.UnPack(packedData)
+//-----------------------------------------------------------------------------
+// TestSubDelReqRetransmission
+//
+//   stub                          stub
+// +-------+     +---------+    +---------+
+// | xapp  |     | submgr  |    | e2term  |
+// +-------+     +---------+    +---------+
+//     |              |              |
+//     |         [SUBS CREATE]       |
+//     |              |              |
+//     |              |              |
+//     | SubDelReq    |              |
+//     |------------->|              |
+//     |              |              |
+//     |              | SubDelReq    |
+//     |              |------------->|
+//     |              |              |
+//     | SubDelReq    |              |
+//     |------------->|              |
+//     |              |              |
+//     |              |   SubDelResp |
+//     |              |<-------------|
+//     |              |              |
+//     |   SubDelResp |              |
+//     |<-------------|              |
+//
+//-----------------------------------------------------------------------------
+func TestSubDelReqRetransmission(t *testing.T) {
+	xapp.Logger.Info("TestSubDelReqRetransmission start")
 
-			if unpackerr != nil {
-				testError(t, "(xappConn) RIC_SUB_RESP unpack failed err: %s", unpackerr.Error())
-			}
-			geterr, _ := e2SubsResp.Get()
-			if geterr != nil {
-				testError(t, "(xappConn) RIC_SUB_RESP get failed err: %s", geterr.Error())
-			}
+	//Subs Create
+	handle_xapp_subs_req(t)
+	handle_e2term_subs_reqandresp(t)
+	e2SubsId := handle_xapp_subs_resp(t)
 
-		}
-	case <-time.After(15 * time.Second):
-		testError(t, "(xappConn) Not Received RIC_SUB_RESP within 15 secs")
-	}
+	//Subs Delete
+	handle_xapp_subs_del_req(t, e2SubsId)
+	req, msg := handle_e2term_subs_del_req(t)
 
-	//---------------------------------
-	// xapp activity: Send Subs Del Req
-	//---------------------------------
-	select {
-	case <-time.After(2 * time.Second):
-		xapp.Logger.Info("(xappConn) Send Subs Del Req")
-		req := createSubsDelReq(uint32(e2SubsId))
-		e2SubsDelReq.Set(req)
-		xapp.Logger.Debug("%s", e2SubsDelReq.String())
-		err, packedMsg := e2SubsDelReq.Pack(nil)
-		if err != nil {
-			testError(t, "(xappConn) pack NOK %s", err.Error())
-		}
+	<-time.After(2 * time.Second)
 
-		params := &xapp.RMRParams{}
-		params.Mtype = xapp.RIC_SUB_DEL_REQ
-		params.SubId = e2SubsId
-		params.Payload = packedMsg.Buf
-		params.Meid = &xapp.RMRMeid{RanName: "RAN_NAME_1"}
-		params.Xid = "XID_1"
-		params.Mbuf = nil
+	handle_xapp_subs_del_req(t, e2SubsId)
 
-		snderr := xappConn.RmrSend(params)
-		if snderr != nil {
-			testError(t, "(xappConn) RMR SEND FAILED: %s", snderr.Error())
-		}
-	}
+	handle_e2term_subs_del_resp(t, req, msg)
 
-	//---------------------------------
-	// e2term activity: Recv Subs Del Req & Send Subs Del Resp
-	//---------------------------------
-	select {
-	case msg := <-e2termConn.rmrConChan:
-		if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_DEL_REQ"] {
-			testError(t, "(e2termConn) Received non RIC_SUB_DEL_REQ message")
-		} else {
-			xapp.Logger.Info("(e2termConn) Recv Subs Del Req & Send Subs Del Resp")
-
-			packedData := &packer.PackedData{}
-			packedData.Buf = msg.Payload
-			unpackerr := e2SubsDelReq.UnPack(packedData)
-			if unpackerr != nil {
-				testError(t, "(e2termConn) RIC_SUB_DEL_REQ unpack failed err: %s", unpackerr.Error())
-			}
-			geterr, req := e2SubsDelReq.Get()
-			if geterr != nil {
-				testError(t, "(e2termConn) RIC_SUB_DEL_REQ get failed err: %s", geterr.Error())
-			}
-
-			resp := createSubsDelResp(req)
-			e2SubsDelResp.Set(resp)
-			xapp.Logger.Debug("%s", e2SubsDelResp.String())
-			packerr, packedMsg := e2SubsDelResp.Pack(nil)
-			if packerr != nil {
-				testError(t, "(e2termConn) pack NOK %s", packerr.Error())
-			}
-
-			params := &xapp.RMRParams{}
-			params.Mtype = xapp.RIC_SUB_DEL_RESP
-			params.SubId = msg.SubId
-			params.Payload = packedMsg.Buf
-			params.Meid = msg.Meid
-			params.Xid = msg.Xid
-			params.Mbuf = nil
-
-			snderr := e2termConn.RmrSend(params)
-			if snderr != nil {
-				testError(t, "(e2termConn) RMR SEND FAILED: %s", snderr.Error())
-			}
-
-		}
-	case <-time.After(15 * time.Second):
-		testError(t, "(e2termConn) Not Received RIC_SUB_DEL_REQ within 15 secs")
-	}
-
-	//---------------------------------
-	// xapp activity: Recv Subs Del Resp
-	//---------------------------------
-	select {
-	case msg := <-xappConn.rmrConChan:
-		if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_DEL_RESP"] {
-			testError(t, "(xappConn) Received non RIC_SUB_DEL_RESP message")
-		} else {
-			xapp.Logger.Info("(xappConn) Recv Subs Del Resp")
-
-			packedData := &packer.PackedData{}
-			packedData.Buf = msg.Payload
-			unpackerr := e2SubsDelResp.UnPack(packedData)
-			if unpackerr != nil {
-				testError(t, "(xappConn) RIC_SUB_DEL_RESP unpack failed err: %s", unpackerr.Error())
-			}
-			geterr, _ := e2SubsDelResp.Get()
-			if geterr != nil {
-				testError(t, "(xappConn) RIC_SUB_DEL_RESP get failed err: %s", geterr.Error())
-			}
-
-		}
-	case <-time.After(15 * time.Second):
-		testError(t, "(xappConn) Not Received RIC_SUB_DEL_RESP within 15 secs")
-	}
-
+	//Wait that subs is cleaned
+	handle_wait_subs_clean(t, e2SubsId)
 }
diff --git a/pkg/control/tracker.go b/pkg/control/tracker.go
index 1682ae7..65f816e 100644
--- a/pkg/control/tracker.go
+++ b/pkg/control/tracker.go
@@ -21,83 +21,101 @@
 
 import (
 	"fmt"
+	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
 	"sync"
 )
 
+type TransactionKey struct {
+	SubID     uint16 // subscription id / sequence number
+	TransType Action // action ongoing (CREATE/DELETE etc)
+}
+
+type TransactionXappKey struct {
+	Addr string // xapp addr
+	Port uint16 // xapp port
+	Xid  string // xapp xid in req
+}
+
+type Transaction struct {
+	tracker    *Tracker           // tracker instance
+	Key        TransactionKey     // action key
+	Xappkey    TransactionXappKey // transaction key
+	OrigParams *xapp.RMRParams    // request orginal params
+}
+
+func (t *Transaction) SubRouteInfo() SubRouteInfo {
+	return SubRouteInfo{t.Key.TransType, t.Xappkey.Addr, t.Xappkey.Port, t.Key.SubID}
+}
+
 /*
 Implements a record of ongoing transactions and helper functions to CRUD the records.
 */
 type Tracker struct {
-	transactionTable map[TransactionKey]Transaction
-	mutex            sync.Mutex
+	transactionTable     map[TransactionKey]*Transaction
+	transactionXappTable map[TransactionXappKey]*Transaction
+	mutex                sync.Mutex
 }
 
 func (t *Tracker) Init() {
-	t.transactionTable = make(map[TransactionKey]Transaction)
+	t.transactionTable = make(map[TransactionKey]*Transaction)
+	t.transactionXappTable = make(map[TransactionXappKey]*Transaction)
 }
 
 /*
 Checks if a tranascation with similar type has been ongoing. If not then creates one.
 Returns error if there is similar transatcion ongoing.
 */
-func (t *Tracker) TrackTransaction(key TransactionKey, xact Transaction) error {
+func (t *Tracker) TrackTransaction(subID uint16, act Action, addr string, port uint16, params *xapp.RMRParams) (*Transaction, error) {
+	key := TransactionKey{subID, act}
+	xappkey := TransactionXappKey{addr, port, params.Xid}
+	trans := &Transaction{t, key, xappkey, params}
 	t.mutex.Lock()
 	defer t.mutex.Unlock()
 	if _, ok := t.transactionTable[key]; ok {
 		// TODO: Implement merge related check here. If the key is same but the value is different.
-		err := fmt.Errorf("transaction tracker: Similar transaction with sub id %d and type %s is ongoing", key.SubID, key.transType)
-		return err
+		err := fmt.Errorf("transaction tracker: Similar transaction with sub id %d and type %s is ongoing", key.SubID, key.TransType)
+		return nil, err
 	}
-	t.transactionTable[key] = xact
-	return nil
+	if _, ok := t.transactionXappTable[xappkey]; ok {
+		// TODO: Implement merge related check here. If the key is same but the value is different.
+		err := fmt.Errorf("transaction tracker: Similar transaction with xapp key %v is ongoing", xappkey)
+		return nil, err
+	}
+	t.transactionTable[key] = trans
+	t.transactionXappTable[xappkey] = trans
+	return trans, nil
 }
 
 /*
 Retreives the transaction table entry for the given request.
 Returns error in case the transaction cannot be found.
 */
-func (t *Tracker) UpdateTransaction(SubID uint16, transType Action, xact Transaction) error {
-	key := TransactionKey{SubID, transType}
-	t.mutex.Lock()
-	defer t.mutex.Unlock()
-	if _, ok := t.transactionTable[key]; ok {
-		// TODO: Implement merge related check here. If the key is same but the value is different.
-		err := fmt.Errorf("transaction tracker: Similar transaction with sub id %d and type %v is ongoing", key.SubID, key.transType)
-		return err
-	}
-	t.transactionTable[key] = xact
-	return nil
-}
-
-/*
-Retreives the transaction table entry for the given request.
-Returns error in case the transaction cannot be found.
-*/
-func (t *Tracker) RetriveTransaction(subID uint16, act Action) (Transaction, error) {
+func (t *Tracker) RetriveTransaction(subID uint16, act Action) (*Transaction, error) {
 	key := TransactionKey{subID, act}
 	t.mutex.Lock()
 	defer t.mutex.Unlock()
-	var xact Transaction
-	if xact, ok := t.transactionTable[key]; ok {
-		return xact, nil
+	if trans, ok := t.transactionTable[key]; ok {
+		return trans, nil
 	}
 	err := fmt.Errorf("transaction record for Subscription ID %d and action %s does not exist", subID, act)
-	return xact, err
+	return nil, err
 }
 
 /*
 Deletes the transaction table entry for the given request and returns the deleted xapp's address and port for reference.
 Returns error in case the transaction cannot be found.
 */
-func (t *Tracker) completeTransaction(subID uint16, act Action) (Transaction, error) {
+func (t *Tracker) completeTransaction(subID uint16, act Action) (*Transaction, error) {
 	key := TransactionKey{subID, act}
-	var emptyTransaction Transaction
 	t.mutex.Lock()
 	defer t.mutex.Unlock()
-	if xact, ok := t.transactionTable[key]; ok {
+	if trans, ok1 := t.transactionTable[key]; ok1 {
+		if _, ok2 := t.transactionXappTable[trans.Xappkey]; ok2 {
+			delete(t.transactionXappTable, trans.Xappkey)
+		}
 		delete(t.transactionTable, key)
-		return xact, nil
+		return trans, nil
 	}
 	err := fmt.Errorf("transaction record for Subscription ID %d and action %s does not exist", subID, act)
-	return emptyTransaction, err
+	return nil, err
 }
diff --git a/pkg/control/types.go b/pkg/control/types.go
index 3a9587f..83312d8 100644
--- a/pkg/control/types.go
+++ b/pkg/control/types.go
@@ -19,10 +19,6 @@
 
 package control
 
-import (
-	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
-)
-
 type RmrDatagram struct {
 	MessageType    int
 	SubscriptionId uint16
@@ -37,14 +33,3 @@
 }
 
 type Action int
-
-type TransactionKey struct {
-	SubID     uint16
-	transType Action
-}
-
-type Transaction struct {
-	XappInstanceAddress string
-	XappPort            uint16
-	OrigParams          *xapp.RMRParams
-}