New submgr for testing. Tagged as ric-plt-submgr:r3-test-v2. Tested that submgr can receive and forward messages correctly.

Signed-off-by: Anssi Mannila <anssi.mannila@nokia.com>
Change-Id: Ib6a472c3205fbf2e2d9a030282b7071a12ee8b0d
diff --git a/pkg/control/client.go b/pkg/control/client.go
index 14e2455..d377d8e 100644
--- a/pkg/control/client.go
+++ b/pkg/control/client.go
@@ -35,14 +35,10 @@
 	xappDeleteParams *rtmgrhandle.DeleteXappSubscriptionHandleParams
 }
 
-func (rc *RtmgrClient) SubscriptionRequestUpdate() error {
+func (rc *RtmgrClient) SubscriptionRequestUpdate(subRouteAction SubRouteInfo) error {
 	xapp.Logger.Debug("SubscriptionRequestUpdate() invoked")
-	subRouteAction := <-SubscriptionReqChan
-	// Routing manager handles subscription id as int32 to accomodate -1 and uint16 values
 	subID := int32(subRouteAction.SubID)
-
-	xapp.Logger.Debug("Subscription action details received: ", subRouteAction)
-
+	xapp.Logger.Debug("Subscription action details received. subRouteAction.Command: %v, Address %s, Port %v, subID %v", int16(subRouteAction.Command), subRouteAction.Address, subRouteAction.Port, subID)
 	xappSubReq := rtmgr_models.XappSubscriptionData{&subRouteAction.Address, &subRouteAction.Port, &subID}
 
 	switch subRouteAction.Command {
@@ -65,6 +61,7 @@
 			return nil
 		}
 	default:
+		xapp.Logger.Debug("Unknown subRouteAction.Command: %v, subID: %v", subRouteAction.Command, subRouteAction.Address, subRouteAction.Port, subID)
 		return nil
 	}
 }
diff --git a/pkg/control/control.go b/pkg/control/control.go
index dc8c5e0..af629a1 100644
--- a/pkg/control/control.go
+++ b/pkg/control/control.go
@@ -32,23 +32,30 @@
 	"math/rand"
 	"strconv"
 	"time"
+	"sync"	
 )
 
+var rmrSendMutex = &sync.Mutex{}
+
+var subReqTime time.Duration = 2 * time.Second
+var SubDelReqTime time.Duration = 2 * time.Second
+
 type Control struct {
 	e2ap        *E2ap
 	registry    *Registry
 	rtmgrClient *RtmgrClient
 	tracker     *Tracker
 	rcChan      chan *xapp.RMRParams
+	timerMap	*TimerMap
 }
 
 type RMRMeid struct {
 	PlmnID string
 	EnbID  string
+	RanName string
 }
 
 var seedSN uint16
-var SubscriptionReqChan = make(chan SubRouteInfo, 10)
 
 const (
 	CREATE Action = 0
@@ -57,6 +64,7 @@
 )
 
 func init() {
+	xapp.Logger.Info("SUBMGR /ric-plt-submgr:r3-test-v2")
 	viper.AutomaticEnv()
 	viper.SetEnvPrefix("submgr")
 	viper.AllowEmptyEnv(true)
@@ -78,13 +86,16 @@
 	tracker := new(Tracker)
 	tracker.Init()
 
+	timerMap := new(TimerMap)
+	timerMap.Init()
+
 	transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
 	client := rtmgrclient.New(transport, strfmt.Default)
 	handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
 	deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
 	rtmgrClient := RtmgrClient{client, handle, deleteHandle}
 
-	return Control{new(E2ap), registry, &rtmgrClient, tracker, make(chan *xapp.RMRParams)}
+	return Control{new(E2ap), registry, &rtmgrClient, tracker, make(chan *xapp.RMRParams),timerMap}
 }
 
 func (c *Control) Run() {
@@ -98,16 +109,33 @@
 }
 
 func (c *Control) rmrSend(params *xapp.RMRParams) (err error) {
+	status := false
+	i := 1
+	rmrSendMutex.Lock()
+	for ; i <= 10 && status == false; i++ { 
+		status = xapp.Rmr.Send(params, false)
+		if status == false {
+			xapp.Logger.Info("rmr.Send() failed. Retry count %v, Mtype: %v, SubId: %v, Xid %s",i, params.Mtype, params.SubId, params.Xid)
+			time.Sleep(500 * time.Millisecond)
+		}
+	}
+	if status == false {
+		err = errors.New("rmr.Send() failed")
+		xapp.Rmr.Free(params.Mbuf)
+	}
+	rmrSendMutex.Unlock()
+	
+	/*
 	if !xapp.Rmr.Send(params, false) {
 		err = errors.New("rmr.Send() failed")
+		xapp.Rmr.Free(params.Mbuf)
 	}
+	*/	
 	return
 }
 
 func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) {
-	if !xapp.Rmr.Send(params, true) {
-		err = errors.New("rmr.Send() failed")
-	}
+	c.rmrSend(params)
 	return
 }
 
@@ -116,13 +144,15 @@
 		msg := <-c.rcChan
 		switch msg.Mtype {
 		case xapp.RICMessageTypes["RIC_SUB_REQ"]:
-			c.handleSubscriptionRequest(msg)
+			go c.handleSubscriptionRequest(msg)
 		case xapp.RICMessageTypes["RIC_SUB_RESP"]:
-			c.handleSubscriptionResponse(msg)
+			go c.handleSubscriptionResponse(msg)
+		case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
+			go c.handleSubscriptionFailure(msg)
 		case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
-			c.handleSubscriptionDeleteRequest(msg)
+			go c.handleSubscriptionDeleteRequest(msg)
 		case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
-			c.handleSubscriptionDeleteResponse(msg)
+			go c.handleSubscriptionDeleteResponse(msg)
 		default:
 			err := errors.New("Message Type " + strconv.Itoa(msg.Mtype) + " is discarded")
 			xapp.Logger.Error("Unknown message type: %v", err)
@@ -131,15 +161,16 @@
 }
 
 func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) {
-	payloadSeqNum, err := c.e2ap.GetSubscriptionRequestSequenceNumber(params.Payload)
-	if err != nil {
-		err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
-		return
-	}
-	xapp.Logger.Info("Subscription Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payloadSeqNum)
+	xapp.Logger.Info("Subscription Request Received from Src: %s, Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Src, params.Mtype, params.SubId, params.Xid, params.Meid)
+	xapp.Rmr.Free(params.Mbuf)
+	params.Mbuf = nil
 
 	/* Reserve a sequence number and set it in the payload */
-	newSubId := c.registry.ReserveSequenceNumber()
+	newSubId, isIdValid := c.registry.ReserveSequenceNumber()
+	if isIdValid != true {
+		xapp.Logger.Info("Further processing of this SubscriptionRequest stopped. SubId: %v, Xid: %s",params.SubId, params.Xid)
+		return 
+	}
 
 	_, err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, newSubId)
 	if err != nil {
@@ -164,39 +195,165 @@
 
 	/* Update routing manager about the new subscription*/
 	subRouteAction := SubRouteInfo{CREATE, *srcAddr, *srcPort, newSubId}
-	go c.rtmgrClient.SubscriptionRequestUpdate()
-	SubscriptionReqChan <- subRouteAction
+	xapp.Logger.Info("Starting routing manager update")
+	c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+
+	//time.Sleep(3 * time.Second)
 
 	// Setting new subscription ID in the RMR header
 	params.SubId = int(newSubId)
-
-	xapp.Logger.Info("Generated ID: %v. Forwarding to E2 Termination...", int(newSubId))
-	c.rmrSend(params)
+	xapp.Logger.Info("Forwarding Subscription Request to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v",params.Mtype, params.SubId, params.Xid, params.Meid)
+	err = c.rmrSend(params)
+	if err != nil {
+		xapp.Logger.Error("Failed to send request to E2T %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+	} /*else {
+		c.timerMap.StartTimer(newSubId, subReqTime, c.handleSubscriptionRequestTimer)
+	}*/
 	xapp.Logger.Debug("--- Debugging transaction table = %v", c.tracker.transactionTable)
 	return
 }
 
 func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) (err error) {
+	xapp.Logger.Info("Subscription Response Received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Xid, params.Meid)
+	xapp.Rmr.Free(params.Mbuf)
+	params.Mbuf = nil
+
 	payloadSeqNum, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload)
 	if err != nil {
 		err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
 		return
 	}
-	xapp.Logger.Info("Subscription Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payloadSeqNum)
+
+	xapp.Logger.Info("Received payloadSeqNum: %v",payloadSeqNum)
 	if !c.registry.IsValidSequenceNumber(payloadSeqNum) {
 		err = errors.New("Unknown Subscription ID: " + strconv.Itoa(int(payloadSeqNum)) + " in Subscritpion Response. Message discarded.")
 		return
 	}
+
+//	c.timerMap.StopTimer(payloadSeqNum)
+
 	c.registry.setSubscriptionToConfirmed(payloadSeqNum)
-	xapp.Logger.Info("Subscription Response Registered. Forwarding to Requestor...")
-	transaction, err := c.tracker.completeTransaction(payloadSeqNum, CREATE)
+	var transaction Transaction
+	transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, CREATE)
+	if err != nil {
+		xapp.Logger.Error("Failed to retrive transaction record. Err: %v", err)
+		xapp.Logger.Info("Further processing of this Subscription Response stopped. SubId: %v, Xid: %s",params.SubId, params.Xid)
+		return
+	}
+	xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Retrieved old subId...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+
+    params.SubId = int(payloadSeqNum)
+    params.Xid = transaction.OrigParams.Xid
+	
+	xapp.Logger.Info("Forwarding Subscription Response to UEEC: Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
+	err = c.rmrReplyToSender(params)
+	if err != nil {
+		xapp.Logger.Error("Failed to send response to requestor %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+	}
+
+	xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Deleting transaction record", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+	transaction, err = c.tracker.completeTransaction(payloadSeqNum, CREATE)
 	if err != nil {
 		xapp.Logger.Error("Failed to delete a Subscription Request transaction record due to %v", err)
 		return
 	}
-	xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
-	params.Mbuf = transaction.OrigParams.Mbuf
-	c.rmrReplyToSender(params)
+	return
+}
+
+func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) (err error) {
+	xapp.Logger.Info("Subscription Failure Received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid)
+	xapp.Rmr.Free(params.Mbuf)
+	params.Mbuf = nil
+
+	payloadSeqNum, err := c.e2ap.GetSubscriptionFailureSequenceNumber(params.Payload)
+	if err != nil {
+		err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
+		return
+	}
+	xapp.Logger.Info("Received payloadSeqNum: %v", payloadSeqNum)
+
+	// should here be IsValidSequenceNumber check?
+
+//	c.timerMap.StopTimer(payloadSeqNum)
+
+	var transaction Transaction
+	transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, CREATE)
+	if  err != nil {
+		xapp.Logger.Error("Failed to retrive transaction record. Err %v", err)
+		xapp.Logger.Info("Further processing of this Subscription Failure stopped. SubId: %v, Xid: %s",params.SubId, params.Xid)
+		return
+	}
+	xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding response to requestor...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+
+	params.SubId = int(payloadSeqNum)
+	params.Xid = transaction.OrigParams.Xid
+
+	xapp.Logger.Info("Forwarding Subscription Failure to UEEC: Mtype: %v, SubId: %v, Xid: %v, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
+	err = c.rmrReplyToSender(params)
+	if err != nil {
+		xapp.Logger.Error("Failed to send response to requestor %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+	}
+
+	time.Sleep(3 * time.Second)
+
+	xapp.Logger.Info("Starting routing manager update")
+	subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
+	c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+
+	xapp.Logger.Info("Deleting trancaction record")
+	if c.registry.releaseSequenceNumber(payloadSeqNum) {
+		transaction, err = c.tracker.completeTransaction(payloadSeqNum, CREATE)
+		if err != nil {
+			xapp.Logger.Error("Failed to delete a Subscription Request transaction record due to %v", err)
+			return
+		}
+	}
+	return
+}
+
+func (c *Control) handleSubscriptionRequestTimer(subId uint16) {
+	xapp.Logger.Info("Subscription Request timer expired. SubId: %v",subId)
+/*	
+	transaction, err := c.tracker.completeTransaction(subId, CREATE)
+	if err != nil {
+		xapp.Logger.Error("Failed to delete a Subscription Request transaction record due to %v", err)
+		return
+	}
+	xapp.Logger.Info("SubId: %v, Xid %v, Meid: %v",subId, transaction.OrigParams.Xid, transaction.OrigParams.Meid)
+
+	var params xapp.RMRParams
+	params.Mtype = 12012 //xapp.RICMessageTypes["RIC_SUB_FAILURE"]
+	params.SubId = int(subId)
+	params.Meid = transaction.OrigParams.Meid
+	params.Xid = transaction.OrigParams.Xid
+	payload := []byte("40C9408098000003EA7E00050000010016EA6300020021EA6E00808180EA6F000400000000EA6F000400010040EA6F000400020080EA6F0004000300C0EA6F000400040100EA6F000400050140EA6F000400060180EA6F0004000701C0EA6F000400080200EA6F000400090240EA6F0004000A0280EA6F0004000B02C0EA6F0004000C0300EA6F0004000D0340EA6F0004000E0380EA6F0004000F03C0")
+	params.PayloadLen = len(payload)
+	params.Payload = payload
+
+	xapp.Logger.Info("Forwarding Subscription Failure to UEEC: Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
+	err = c.rmrReplyToSender(&params)
+	if err != nil {
+		xapp.Logger.Error("Failed to send response to requestor %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+	}
+*/
+/*
+	time.Sleep(3 * time.Second)
+
+	xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.XappInstanceAddress, transaction.XappPort)
+
+	xapp.Logger.Info("Starting routing manager update")
+	subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
+	c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+
+	xapp.Logger.Info("Deleting trancaction record")
+	if c.registry.releaseSequenceNumber(payloadSeqNum) {
+		transaction, err = c.tracker.completeTransaction(payloadSeqNum, CREATE)
+		if err != nil {
+			xapp.Logger.Error("Failed to delete a Subscription Request transaction record due to %v", err)
+			return
+		}
+	}
+*/
 	return
 }
 
@@ -223,12 +380,16 @@
 }
 
 func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) (err error) {
+	xapp.Logger.Info("Subscription Delete Request Received from Src: %s, Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Src, params.Mtype, params.SubId, params.Xid, params.Meid)
+	xapp.Rmr.Free(params.Mbuf)
+	params.Mbuf = nil
+
 	payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload)
 	if err != nil {
 		err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
 		return
 	}
-	xapp.Logger.Info("Subscription Delete Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payloadSeqNum)
+	xapp.Logger.Info("Received payloadSeqNum: %v", payloadSeqNum)
 	if c.registry.IsValidSequenceNumber(payloadSeqNum) {
 		c.registry.deleteSubscription(payloadSeqNum)
 		trackErr := c.trackDeleteTransaction(params, payloadSeqNum)
@@ -237,8 +398,14 @@
 			return trackErr
 		}
 	}
-	xapp.Logger.Info("Subscription ID: %v. Forwarding to E2 Termination...", int(payloadSeqNum))
+
+	xapp.Logger.Info("Forwarding Delete Subscription Request to E2T: Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
 	c.rmrSend(params)
+	if err != nil {
+		xapp.Logger.Error("Failed to send request to E2T %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+	} /*else {
+		c.timerMap.StartTimer(payloadSeqNum, SubDelReqTime, c.handleSubscriptionDeleteRequestTimer)
+	}*/
 	return
 }
 
@@ -254,27 +421,146 @@
 }
 
 func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
+	xapp.Logger.Info("Subscription Delete Response Received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid)
+	xapp.Rmr.Free(params.Mbuf)
+	params.Mbuf = nil
+
 	payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
 	if err != nil {
 		err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
 		return
 	}
-	var transaction, _ = c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
-	subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
-	go c.rtmgrClient.SubscriptionRequestUpdate()
-	SubscriptionReqChan <- subRouteAction
+	xapp.Logger.Info("Received payloadSeqNum: %v", payloadSeqNum)
 
-	xapp.Logger.Info("Subscription Delete Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payloadSeqNum)
+	// should here be IsValidSequenceNumber check?
+//	c.timerMap.StopTimer(payloadSeqNum)
+	
+	var transaction Transaction
+	transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
+	if  err != nil {
+		xapp.Logger.Error("Failed to retrive transaction record. Err %v", err)
+		xapp.Logger.Info("Further processing of this Subscription Delete Response stopped. SubId: %v, Xid: %s",params.SubId, params.Xid)
+		return
+	}
+	xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding response to requestor...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+
+    params.SubId = int(payloadSeqNum)
+    params.Xid = transaction.OrigParams.Xid
+	xapp.Logger.Info("Forwarding Subscription Delete Response to UEEC: Mtype: %v, SubId: %v, Xid: %v, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
+	err = c.rmrReplyToSender(params)
+	if err != nil {
+		xapp.Logger.Error("Failed to send response to requestor %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+//		return
+	}
+
+	time.Sleep(3 * time.Second)
+
+	xapp.Logger.Info("Starting routing manager update")
+	subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
+	c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+
+	xapp.Logger.Info("Deleting trancaction record")
 	if c.registry.releaseSequenceNumber(payloadSeqNum) {
 		transaction, err = c.tracker.completeTransaction(payloadSeqNum, DELETE)
 		if err != nil {
 			xapp.Logger.Error("Failed to delete a Subscription Delete Request transaction record due to %v", err)
 			return
 		}
-		xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
-		//params.Src = xAddress + ":" + strconv.Itoa(int(xPort))
-		params.Mbuf = transaction.OrigParams.Mbuf
-		c.rmrReplyToSender(params)
 	}
 	return
 }
+
+func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) (err error) {
+	xapp.Logger.Info("Subscription Delete Failure Received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid)
+	xapp.Rmr.Free(params.Mbuf)
+	params.Mbuf = nil
+
+	payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteFailureSequenceNumber(params.Payload)
+	if err != nil {
+		err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
+		return
+	}
+	xapp.Logger.Info("Received payloadSeqNum: %v", payloadSeqNum)
+
+	// should here be IsValidSequenceNumber check?
+//	c.timerMap.StopTimer(payloadSeqNum)
+
+	var transaction Transaction
+	transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
+	if  err != nil {
+		xapp.Logger.Error("Failed to retrive transaction record. Err %v", err)
+		xapp.Logger.Info("Further processing of this Subscription Delete Failure stopped. SubId: %v, Xid: %s",params.SubId, params.Xid)
+		return
+	}
+	xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding response to requestor...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+
+    params.SubId = int(payloadSeqNum)
+    params.Xid = transaction.OrigParams.Xid
+	xapp.Logger.Info("Forwarding Subscription Delete Failure to UEEC: Mtype: %v, SubId: %v, Xid: %v, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
+	err = c.rmrReplyToSender(params)
+	if err != nil {
+		xapp.Logger.Error("Failed to send response to requestor %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+//		return
+	}
+
+	time.Sleep(3 * time.Second)
+
+	xapp.Logger.Info("Starting routing manager update")
+	subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
+	c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+
+	xapp.Logger.Info("Deleting trancaction record")
+	if c.registry.releaseSequenceNumber(payloadSeqNum) {
+		transaction, err = c.tracker.completeTransaction(payloadSeqNum, DELETE)
+		if err != nil {
+			xapp.Logger.Error("Failed to delete a Subscription Delete Request transaction record due to %v", err)
+			return
+		}
+	}
+	return
+}
+
+func (c *Control) handleSubscriptionDeleteRequestTimer(subId uint16) {
+	xapp.Logger.Info("Subscription Delete Request timer expired. SubId: %v",subId)
+/*	
+	transaction, err := c.tracker.completeTransaction(subId, DELETE)
+	if err != nil {
+		xapp.Logger.Error("Failed to delete a Subscription Delete Request transaction record due to %v", err)
+		return
+	}
+	xapp.Logger.Info("SubId: %v, Xid %v, Meid: %v",subId, transaction.OrigParams.Xid, transaction.OrigParams.Meid)
+
+	var params xapp.RMRParams
+	params.Mtype = 12022 //xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]
+	params.SubId = int(subId)
+	params.Meid = transaction.OrigParams.Meid
+	params.Xid = transaction.OrigParams.Xid
+	payload := []byte("40CA4018000003EA7E00050000010016EA6300020021EA74000200C0")
+	params.PayloadLen = len(payload)
+	params.Payload = payload
+
+	xapp.Logger.Info("Forwarding Subscription Delete Failure to UEEC: Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
+	err = c.rmrReplyToSender(&params)
+	if err != nil {
+		xapp.Logger.Error("Failed to send response to requestor %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+	}
+*/	
+/*
+	time.Sleep(3 * time.Second)
+	xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.XappInstanceAddress, transaction.XappPort)
+
+	xapp.Logger.Info("Starting routing manager update")
+	subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
+	c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+
+	xapp.Logger.Info("Deleting trancaction record")
+	if c.registry.releaseSequenceNumber(payloadSeqNum) {
+		transaction, err = c.tracker.completeTransaction(payloadSeqNum, DELETE)
+		if err != nil {
+			xapp.Logger.Error("Failed to delete a Subscription Delete Request transaction record due to %v", err)
+			return
+		}
+	}
+*/
+	return
+	}
diff --git a/pkg/control/e2ap.go b/pkg/control/e2ap.go
index 6f5edcb..f9580d9 100644
--- a/pkg/control/e2ap.go
+++ b/pkg/control/e2ap.go
@@ -117,3 +117,49 @@
 	newPayload = C.GoBytes(cptr, C.int(size))
 	return
 }
+
+/* RICsubscriptionRequestFailure */
+
+func (c *E2ap) GetSubscriptionFailureSequenceNumber(payload []byte) (subId uint16, err error) {
+	cptr := unsafe.Pointer(&payload[0])
+	cret := C.e2ap_get_ric_subscription_failure_sequence_number(cptr, C.size_t(len(payload)))
+	if cret < 0 {
+		return 0, errors.New("e2ap wrapper is unable to get Subscirption Failure Sequence Number due to wrong or invalid payload")
+	}
+	subId = uint16(cret)
+	return
+}
+
+// This function is not used currently. Can be deleted if not needed
+func (c *E2ap) SetSubscriptionFailureSequenceNumber(payload []byte, newSubscriptionid uint16) (newPayload []byte, err error) {
+	cptr := unsafe.Pointer(&payload[0])
+	size := C.e2ap_set_ric_subscription_failure_sequence_number(cptr, C.size_t(len(payload)), C.long(newSubscriptionid))
+	if size < 0 {
+		return make([]byte, 0), errors.New("e2ap wrapper is unable to set Subscription Failure Sequence Number due to wrong or invalid payload")
+	}
+	newPayload = C.GoBytes(cptr, C.int(size))
+	return
+}
+
+/* RICsubscriptionDeleteFailure */
+
+func (c *E2ap) GetSubscriptionDeleteFailureSequenceNumber(payload []byte) (subId uint16, err error) {
+	cptr := unsafe.Pointer(&payload[0])
+	cret := C.e2ap_get_ric_subscription_delete_failure_sequence_number(cptr, C.size_t(len(payload)))
+	if cret < 0 {
+		return 0, errors.New("e2ap wrapper is unable to get Subscirption Delete Failure Sequence Number due to wrong or invalid payload")
+	}
+	subId = uint16(cret)
+	return
+}
+
+// This function is not used currently. Can be deleted if not needed
+func (c *E2ap) SetSubscriptionDeleteFailureSequenceNumber(payload []byte, newSubscriptionid uint16) (newPayload []byte, err error) {
+	cptr := unsafe.Pointer(&payload[0])
+	size := C.e2ap_set_ric_subscription_delete_failure_sequence_number(cptr, C.size_t(len(payload)), C.long(newSubscriptionid))
+	if size < 0 {
+		return make([]byte, 0), errors.New("e2ap wrapper is unable to set Subscription Delete Failure Sequence Number due to wrong or invalid payload")
+	}
+	newPayload = C.GoBytes(cptr, C.int(size))
+	return
+}
diff --git a/pkg/control/registry.go b/pkg/control/registry.go
index 6717612..98aa97e 100644
--- a/pkg/control/registry.go
+++ b/pkg/control/registry.go
@@ -19,7 +19,12 @@
 
 package control
 
-import "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+import (
+	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+	"sync"
+)
+
+var registryMutex = &sync.Mutex{}
 
 type Registry struct {
 	register map[uint16]bool
@@ -33,16 +38,30 @@
 }
 
 // Reserves and returns the next free sequence number
-func (r *Registry) ReserveSequenceNumber() uint16 {
-	if r.IsValidSequenceNumber(r.counter) {	}
+func (r *Registry) ReserveSequenceNumber() (uint16, bool) {
+	// Check is current SequenceNumber valid
+	registryMutex.Lock()
+	defer registryMutex.Unlock()
 	sequenceNumber := r.counter
+	if _, ok := r.register[sequenceNumber]; ok {
+		xapp.Logger.Error("Invalid SeqenceNumber sequenceNumber: %v",sequenceNumber)
+		return sequenceNumber, false
+	}
 	r.register[sequenceNumber] = false
-	r.shift()
-	return sequenceNumber
+
+	// Allocate next SequenceNumber value
+	if r.counter == 65535 {
+		r.counter = 0
+	} else {
+		r.counter++
+	}
+	return sequenceNumber, true
 }
 
 // This function checks the validity of the given subscription id
 func (r *Registry) IsValidSequenceNumber(sn uint16) bool {
+	registryMutex.Lock()
+	defer registryMutex.Unlock()
 	xapp.Logger.Debug("Registry map: %v", r.register)
 	if _, ok := r.register[sn]; ok {
 		return true
@@ -52,27 +71,25 @@
 
 // This function sets the give id as confirmed in the register
 func (r *Registry) setSubscriptionToConfirmed(sn uint16) {
+	registryMutex.Lock()
+	defer registryMutex.Unlock()
 	r.register[sn] = true
 }
 
-func (r *Registry) shift() {
-	if r.counter == 65535 {
-		r.counter = 0
-	} else {
-		r.counter++
-	}
-}
-
 //This function sets the given id as unused in the register
 func (r *Registry) deleteSubscription(sn uint16) {
+	registryMutex.Lock()
+	defer registryMutex.Unlock()
 	r.register[sn] = false
 }
 
 //This function releases the given id as unused in the register
 func (r *Registry) releaseSequenceNumber(sn uint16) bool {
+	registryMutex.Lock()
+	defer registryMutex.Unlock()
 	if r.register[sn] {
 		return false
-	} else {
+		} else {
 		delete(r.register, sn)
 		return true
 	}
diff --git a/pkg/control/timers.go b/pkg/control/timers.go
new file mode 100644
index 0000000..6f64c5b
--- /dev/null
+++ b/pkg/control/timers.go
@@ -0,0 +1,108 @@
+/*
+==================================================================================
+  Copyright (c) 2019 AT&T Intellectual Property.
+  Copyright (c) 2019 Nokia
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+package control
+
+import (
+	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+	"sync"
+	"time"
+)
+
+var timerMutex = &sync.Mutex{}
+
+type TimerInfo struct {
+	timerAddress *time.Timer	
+	timerFunctionAddress func()
+}
+
+type TimerMap struct {
+	timer map[uint16] TimerInfo
+}
+
+// This method should run as a constructor
+func (t *TimerMap) Init() {
+	t.timer = make(map[uint16] TimerInfo)
+}
+
+func (t *TimerMap) StartTimer(subId uint16, expireAfterTime time.Duration, timerFunction func(subId uint16)) bool {
+	timerMutex.Lock()
+	defer timerMutex.Unlock()
+	if (timerFunction == nil) {
+		xapp.Logger.Error("StartTimer() timerFunc == nil")
+		return false
+	}
+
+	// Stop timer if there is already timer running with the same id
+	if val, ok := t.timer[subId]; ok {
+		xapp.Logger.Error("StartTimer() old timer found")
+		if val.timerAddress != nil {
+			xapp.Logger.Error("StartTimer() deleting old timer")
+			val.timerAddress.Stop()
+		}
+		delete(t.timer, subId)
+	}
+
+	// Store timer + timer function excecutor function and the function to be excecuted when timer expires, in map
+	t.timer[subId] = TimerInfo{timerAddress: time.AfterFunc(expireAfterTime, func(){t.timerFunctionExcecutor(subId)}),
+							   timerFunctionAddress: func(){timerFunction(subId)}}
+	return true
+}
+
+func (t *TimerMap) StopTimer(subId uint16) bool {
+	timerMutex.Lock()
+	defer timerMutex.Unlock()
+	if val, ok := t.timer[subId]; ok {
+		if val.timerAddress != nil {
+			val.timerAddress.Stop()
+			delete(t.timer, subId)
+			return true
+		} else {
+			xapp.Logger.Error("StopTimer() timerAddress == nil")
+			return false
+		}
+	} else {
+		xapp.Logger.Info("StopTimer() Timer not found. May be expired or stopped already. subId: %v",subId)
+		return false
+	}
+}
+
+func (t *TimerMap) timerFunctionExcecutor(subId uint16) {
+	timerMutex.Lock()
+	if val, ok := t.timer[subId]; ok {
+		if val.timerFunctionAddress != nil {
+			// Take local copy of timer function address
+			f := val.timerFunctionAddress
+			// Delete timer instance from map
+			delete(t.timer, subId)
+			timerMutex.Unlock()
+			// Excecute the timer function
+			f()
+			return
+		} else {
+			xapp.Logger.Error("timerExcecutorFunc() timerFunctionAddress == nil")
+			timerMutex.Unlock()
+			return
+		}
+	} else {
+		xapp.Logger.Error("timerExcecutorFunc() Timer not anymore in map. subId: %v",subId)
+		timerMutex.Unlock()
+		return
+	}
+}
diff --git a/pkg/control/tracker.go b/pkg/control/tracker.go
index eddfbda..dfab96e 100644
--- a/pkg/control/tracker.go
+++ b/pkg/control/tracker.go
@@ -21,8 +21,11 @@
 
 import (
 	"fmt"
+	"sync"
 )
 
+var trackerMutex = &sync.Mutex{}
+
 /*
 Implements a record of ongoing transactions and helper functions to CRUD the records.
 */
@@ -39,6 +42,8 @@
 Returns error if there is similar transatcion ongoing.
 */
 func (t *Tracker) TrackTransaction(key TransactionKey, xact Transaction) error {
+	trackerMutex.Lock()
+	defer trackerMutex.Unlock()
 	if _, ok := t.transactionTable[key]; ok {
 		// TODO: Implement merge related check here. If the key is same but the value is different.
 		err := fmt.Errorf("transaction tracker: Similar transaction with sub id %d and type %s is ongoing", key.SubID, key.transType)
@@ -54,6 +59,8 @@
 */
 func (t *Tracker) UpdateTransaction(SubID uint16, transType Action, xact Transaction) error {
 	key := TransactionKey{SubID, transType}
+	trackerMutex.Lock()
+	defer trackerMutex.Unlock()
 	if _, ok := t.transactionTable[key]; ok {
 		// TODO: Implement merge related check here. If the key is same but the value is different.
 		err := fmt.Errorf("transaction tracker: Similar transaction with sub id %d and type %v is ongoing", key.SubID, key.transType)
@@ -69,6 +76,8 @@
 */
 func (t *Tracker) RetriveTransaction(subID uint16, act Action) (Transaction, error) {
 	key := TransactionKey{subID, act}
+	trackerMutex.Lock()
+	defer trackerMutex.Unlock()
 	var xact Transaction
 	if xact, ok := t.transactionTable[key]; ok {
 		return xact, nil
@@ -84,6 +93,8 @@
 func (t *Tracker) completeTransaction(subID uint16, act Action) (Transaction, error) {
 	key := TransactionKey{subID, act}
 	var emptyTransaction Transaction
+	trackerMutex.Lock()
+	defer trackerMutex.Unlock()
 	if xact, ok := t.transactionTable[key]; ok {
 		delete(t.transactionTable, key)
 		return xact, nil