New submgr for testing. Tagged as ric-plt-submgr:r3-test-v2. Tested that submgr can receive and forward messages correctly.
Signed-off-by: Anssi Mannila <anssi.mannila@nokia.com>
Change-Id: Ib6a472c3205fbf2e2d9a030282b7071a12ee8b0d
diff --git a/pkg/control/client.go b/pkg/control/client.go
index 14e2455..d377d8e 100644
--- a/pkg/control/client.go
+++ b/pkg/control/client.go
@@ -35,14 +35,10 @@
xappDeleteParams *rtmgrhandle.DeleteXappSubscriptionHandleParams
}
-func (rc *RtmgrClient) SubscriptionRequestUpdate() error {
+// SubscriptionRequestUpdate processes one subscription route action for the routing manager.
+// The action is passed in by the caller instead of being read from SubscriptionReqChan.
+// It builds the XappSubscriptionData (address, port, subscription id) and switches on
+// subRouteAction.Command; the per-command handling lies outside the visible hunks.
+// An unrecognised command is logged and nil is returned.
+func (rc *RtmgrClient) SubscriptionRequestUpdate(subRouteAction SubRouteInfo) error {
 xapp.Logger.Debug("SubscriptionRequestUpdate() invoked")
- subRouteAction := <-SubscriptionReqChan
- // Routing manager handles subscription id as int32 to accomodate -1 and uint16 values
+ // Routing manager handles subscription id as int32 to accommodate -1 and uint16 values
 subID := int32(subRouteAction.SubID)
-
- xapp.Logger.Debug("Subscription action details received: ", subRouteAction)
-
+ xapp.Logger.Debug("Subscription action details received. subRouteAction.Command: %v, Address %s, Port %v, subID %v", int16(subRouteAction.Command), subRouteAction.Address, subRouteAction.Port, subID)
 xappSubReq := rtmgr_models.XappSubscriptionData{&subRouteAction.Address, &subRouteAction.Port, &subID}
 switch subRouteAction.Command {
@@ -65,6 +61,7 @@
return nil
}
 default:
+ // One verb per argument: previously "subID: %v" consumed the address and two arguments were left over.
+ xapp.Logger.Debug("Unknown subRouteAction.Command: %v, Address %s, Port %v, subID: %v", subRouteAction.Command, subRouteAction.Address, subRouteAction.Port, subID)
 return nil
 }
}
diff --git a/pkg/control/control.go b/pkg/control/control.go
index dc8c5e0..af629a1 100644
--- a/pkg/control/control.go
+++ b/pkg/control/control.go
@@ -32,23 +32,30 @@
"math/rand"
"strconv"
"time"
+ "sync"
)
+// rmrSendMutex is held by rmrSend for the whole of its send-and-retry loop.
+var rmrSendMutex = &sync.Mutex{}
+
+// subReqTime and SubDelReqTime are timer durations; in this patch they are only
+// referenced from commented-out StartTimer calls.
+var subReqTime time.Duration = 2 * time.Second
+var SubDelReqTime time.Duration = 2 * time.Second
+
+// Control holds the collaborators the message handlers work with: the E2AP codec,
+// the sequence-number registry, the routing manager client, the transaction tracker,
+// the channel the handlers' messages are read from, and the timer map.
type Control struct {
 e2ap *E2ap
 registry *Registry
 rtmgrClient *RtmgrClient
 tracker *Tracker
 rcChan chan *xapp.RMRParams
+ // timerMap is created and Init()-ed where Control is constructed; the
+ // StartTimer/StopTimer calls in the handlers are currently commented out.
+ timerMap *TimerMap
}
+// RMRMeid groups three string identifiers: PlmnID, EnbID and (added by this change) RanName.
+// NOTE(review): no use of this type is visible in this patch — confirm how RanName is populated.
type RMRMeid struct {
 PlmnID string
 EnbID string
+ RanName string
}
var seedSN uint16
-var SubscriptionReqChan = make(chan SubRouteInfo, 10)
const (
CREATE Action = 0
@@ -57,6 +64,7 @@
)
func init() {
+ xapp.Logger.Info("SUBMGR /ric-plt-submgr:r3-test-v2")
viper.AutomaticEnv()
viper.SetEnvPrefix("submgr")
viper.AllowEmptyEnv(true)
@@ -78,13 +86,16 @@
tracker := new(Tracker)
tracker.Init()
+ timerMap := new(TimerMap)
+ timerMap.Init()
+
transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
client := rtmgrclient.New(transport, strfmt.Default)
handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
rtmgrClient := RtmgrClient{client, handle, deleteHandle}
- return Control{new(E2ap), registry, &rtmgrClient, tracker, make(chan *xapp.RMRParams)}
+ return Control{new(E2ap), registry, &rtmgrClient, tracker, make(chan *xapp.RMRParams),timerMap}
}
func (c *Control) Run() {
@@ -98,16 +109,33 @@
}
func (c *Control) rmrSend(params *xapp.RMRParams) (err error) {
+ status := false
+ i := 1
+ rmrSendMutex.Lock()
+ for ; i <= 10 && status == false; i++ {
+ status = xapp.Rmr.Send(params, false)
+ if status == false {
+ xapp.Logger.Info("rmr.Send() failed. Retry count %v, Mtype: %v, SubId: %v, Xid %s",i, params.Mtype, params.SubId, params.Xid)
+ time.Sleep(500 * time.Millisecond)
+ }
+ }
+ if status == false {
+ err = errors.New("rmr.Send() failed")
+ xapp.Rmr.Free(params.Mbuf)
+ }
+ rmrSendMutex.Unlock()
+
+ /*
if !xapp.Rmr.Send(params, false) {
err = errors.New("rmr.Send() failed")
+ xapp.Rmr.Free(params.Mbuf)
}
+ */
return
}
func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) {
- if !xapp.Rmr.Send(params, true) {
- err = errors.New("rmr.Send() failed")
- }
+ c.rmrSend(params)
return
}
@@ -116,13 +144,15 @@
msg := <-c.rcChan
switch msg.Mtype {
case xapp.RICMessageTypes["RIC_SUB_REQ"]:
- c.handleSubscriptionRequest(msg)
+ go c.handleSubscriptionRequest(msg)
case xapp.RICMessageTypes["RIC_SUB_RESP"]:
- c.handleSubscriptionResponse(msg)
+ go c.handleSubscriptionResponse(msg)
+ case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
+ go c.handleSubscriptionFailure(msg)
case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
- c.handleSubscriptionDeleteRequest(msg)
+ go c.handleSubscriptionDeleteRequest(msg)
case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
- c.handleSubscriptionDeleteResponse(msg)
+ go c.handleSubscriptionDeleteResponse(msg)
default:
err := errors.New("Message Type " + strconv.Itoa(msg.Mtype) + " is discarded")
xapp.Logger.Error("Unknown message type: %v", err)
@@ -131,15 +161,16 @@
}
func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) {
- payloadSeqNum, err := c.e2ap.GetSubscriptionRequestSequenceNumber(params.Payload)
- if err != nil {
- err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
- return
- }
- xapp.Logger.Info("Subscription Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payloadSeqNum)
+ xapp.Logger.Info("Subscription Request Received from Src: %s, Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Src, params.Mtype, params.SubId, params.Xid, params.Meid)
+ xapp.Rmr.Free(params.Mbuf)
+ params.Mbuf = nil
/* Reserve a sequence number and set it in the payload */
- newSubId := c.registry.ReserveSequenceNumber()
+ newSubId, isIdValid := c.registry.ReserveSequenceNumber()
+ if isIdValid != true {
+ xapp.Logger.Info("Further processing of this SubscriptionRequest stopped. SubId: %v, Xid: %s",params.SubId, params.Xid)
+ return
+ }
_, err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, newSubId)
if err != nil {
@@ -164,39 +195,165 @@
/* Update routing manager about the new subscription*/
subRouteAction := SubRouteInfo{CREATE, *srcAddr, *srcPort, newSubId}
- go c.rtmgrClient.SubscriptionRequestUpdate()
- SubscriptionReqChan <- subRouteAction
+ xapp.Logger.Info("Starting routing manager update")
+ c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+
+ //time.Sleep(3 * time.Second)
// Setting new subscription ID in the RMR header
params.SubId = int(newSubId)
-
- xapp.Logger.Info("Generated ID: %v. Forwarding to E2 Termination...", int(newSubId))
- c.rmrSend(params)
+ xapp.Logger.Info("Forwarding Subscription Request to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v",params.Mtype, params.SubId, params.Xid, params.Meid)
+ err = c.rmrSend(params)
+ if err != nil {
+ xapp.Logger.Error("Failed to send request to E2T %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+ } /*else {
+ c.timerMap.StartTimer(newSubId, subReqTime, c.handleSubscriptionRequestTimer)
+ }*/
xapp.Logger.Debug("--- Debugging transaction table = %v", c.tracker.transactionTable)
return
}
func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) (err error) {
+ xapp.Logger.Info("Subscription Response Received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Xid, params.Meid)
+ xapp.Rmr.Free(params.Mbuf)
+ params.Mbuf = nil
+
payloadSeqNum, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload)
if err != nil {
err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
return
}
- xapp.Logger.Info("Subscription Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payloadSeqNum)
+
+ xapp.Logger.Info("Received payloadSeqNum: %v",payloadSeqNum)
if !c.registry.IsValidSequenceNumber(payloadSeqNum) {
err = errors.New("Unknown Subscription ID: " + strconv.Itoa(int(payloadSeqNum)) + " in Subscritpion Response. Message discarded.")
return
}
+
+// c.timerMap.StopTimer(payloadSeqNum)
+
c.registry.setSubscriptionToConfirmed(payloadSeqNum)
- xapp.Logger.Info("Subscription Response Registered. Forwarding to Requestor...")
- transaction, err := c.tracker.completeTransaction(payloadSeqNum, CREATE)
+ var transaction Transaction
+ transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, CREATE)
+ if err != nil {
+ xapp.Logger.Error("Failed to retrive transaction record. Err: %v", err)
+ xapp.Logger.Info("Further processing of this Subscription Response stopped. SubId: %v, Xid: %s",params.SubId, params.Xid)
+ return
+ }
+ xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Retrieved old subId...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+
+ params.SubId = int(payloadSeqNum)
+ params.Xid = transaction.OrigParams.Xid
+
+ xapp.Logger.Info("Forwarding Subscription Response to UEEC: Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
+ err = c.rmrReplyToSender(params)
+ if err != nil {
+ xapp.Logger.Error("Failed to send response to requestor %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+ }
+
+ xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Deleting transaction record", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+ transaction, err = c.tracker.completeTransaction(payloadSeqNum, CREATE)
if err != nil {
xapp.Logger.Error("Failed to delete a Subscription Request transaction record due to %v", err)
return
}
- xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
- params.Mbuf = transaction.OrigParams.Mbuf
- c.rmrReplyToSender(params)
+ return
+}
+
+// handleSubscriptionFailure processes a RIC_SUB_FAILURE message.
+// It frees the received RMR buffer, reads the sequence number from the payload,
+// looks up the matching CREATE transaction, restores SubId and the original Xid,
+// forwards the failure to the requestor, then asks the routing manager to DELETE
+// the route and, if the sequence number can be released, removes the transaction record.
+// A failed forward is logged only and does not stop the clean-up.
+func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) (err error) {
+ xapp.Logger.Info("Subscription Failure Received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid)
+ xapp.Rmr.Free(params.Mbuf)
+ params.Mbuf = nil
+
+ payloadSeqNum, err := c.e2ap.GetSubscriptionFailureSequenceNumber(params.Payload)
+ if err != nil {
+ err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
+ return
+ }
+ xapp.Logger.Info("Received payloadSeqNum: %v", payloadSeqNum)
+
+ // TODO: decide whether c.registry.IsValidSequenceNumber should be checked here,
+ // as handleSubscriptionResponse does before using the sequence number.
+
+// c.timerMap.StopTimer(payloadSeqNum)
+
+ var transaction Transaction
+ transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, CREATE)
+ if err != nil {
+ xapp.Logger.Error("Failed to retrive transaction record. Err %v", err)
+ xapp.Logger.Info("Further processing of this Subscription Failure stopped. SubId: %v, Xid: %s",params.SubId, params.Xid)
+ return
+ }
+ xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding response to requestor...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+
+ params.SubId = int(payloadSeqNum)
+ params.Xid = transaction.OrigParams.Xid
+
+ xapp.Logger.Info("Forwarding Subscription Failure to UEEC: Mtype: %v, SubId: %v, Xid: %v, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
+ err = c.rmrReplyToSender(params)
+ if err != nil {
+ xapp.Logger.Error("Failed to send response to requestor %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+ }
+
+ // NOTE(review): the reason for this fixed 3 s pause is not stated; presumably it lets the
+ // forwarded message go out before the route is removed below — confirm.
+ time.Sleep(3 * time.Second)
+
+ xapp.Logger.Info("Starting routing manager update")
+ subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
+ c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+
+ xapp.Logger.Info("Deleting trancaction record")
+ if c.registry.releaseSequenceNumber(payloadSeqNum) {
+ transaction, err = c.tracker.completeTransaction(payloadSeqNum, CREATE)
+ if err != nil {
+ xapp.Logger.Error("Failed to delete a Subscription Request transaction record due to %v", err)
+ return
+ }
+ }
+ return
+}
+
+// handleSubscriptionRequestTimer is the expiry callback intended for the subscription
+// request timer. At present it only logs the expiry; the code that would answer the
+// requestor with a failure and clean up is commented out below.
+// NOTE(review): the second disabled section refers to payloadSeqNum and transaction,
+// neither of which is declared in this function — it needs adapting before being enabled.
+func (c *Control) handleSubscriptionRequestTimer(subId uint16) {
+ xapp.Logger.Info("Subscription Request timer expired. SubId: %v",subId)
+/*
+ transaction, err := c.tracker.completeTransaction(subId, CREATE)
+ if err != nil {
+ xapp.Logger.Error("Failed to delete a Subscription Request transaction record due to %v", err)
+ return
+ }
+ xapp.Logger.Info("SubId: %v, Xid %v, Meid: %v",subId, transaction.OrigParams.Xid, transaction.OrigParams.Meid)
+
+ var params xapp.RMRParams
+ params.Mtype = 12012 //xapp.RICMessageTypes["RIC_SUB_FAILURE"]
+ params.SubId = int(subId)
+ params.Meid = transaction.OrigParams.Meid
+ params.Xid = transaction.OrigParams.Xid
+ payload := []byte("40C9408098000003EA7E00050000010016EA6300020021EA6E00808180EA6F000400000000EA6F000400010040EA6F000400020080EA6F0004000300C0EA6F000400040100EA6F000400050140EA6F000400060180EA6F0004000701C0EA6F000400080200EA6F000400090240EA6F0004000A0280EA6F0004000B02C0EA6F0004000C0300EA6F0004000D0340EA6F0004000E0380EA6F0004000F03C0")
+ params.PayloadLen = len(payload)
+ params.Payload = payload
+
+ xapp.Logger.Info("Forwarding Subscription Failure to UEEC: Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
+ err = c.rmrReplyToSender(&params)
+ if err != nil {
+ xapp.Logger.Error("Failed to send response to requestor %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+ }
+*/
+/*
+ time.Sleep(3 * time.Second)
+
+ xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.XappInstanceAddress, transaction.XappPort)
+
+ xapp.Logger.Info("Starting routing manager update")
+ subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
+ c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+
+ xapp.Logger.Info("Deleting trancaction record")
+ if c.registry.releaseSequenceNumber(payloadSeqNum) {
+ transaction, err = c.tracker.completeTransaction(payloadSeqNum, CREATE)
+ if err != nil {
+ xapp.Logger.Error("Failed to delete a Subscription Request transaction record due to %v", err)
+ return
+ }
+ }
+*/
 return
 }
@@ -223,12 +380,16 @@
}
func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) (err error) {
+ xapp.Logger.Info("Subscription Delete Request Received from Src: %s, Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Src, params.Mtype, params.SubId, params.Xid, params.Meid)
+ xapp.Rmr.Free(params.Mbuf)
+ params.Mbuf = nil
+
payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload)
if err != nil {
err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
return
}
- xapp.Logger.Info("Subscription Delete Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payloadSeqNum)
+ xapp.Logger.Info("Received payloadSeqNum: %v", payloadSeqNum)
if c.registry.IsValidSequenceNumber(payloadSeqNum) {
c.registry.deleteSubscription(payloadSeqNum)
trackErr := c.trackDeleteTransaction(params, payloadSeqNum)
@@ -237,8 +398,14 @@
return trackErr
}
}
- xapp.Logger.Info("Subscription ID: %v. Forwarding to E2 Termination...", int(payloadSeqNum))
+
+ xapp.Logger.Info("Forwarding Delete Subscription Request to E2T: Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
c.rmrSend(params)
+ if err != nil {
+ xapp.Logger.Error("Failed to send request to E2T %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+ } /*else {
+ c.timerMap.StartTimer(payloadSeqNum, SubDelReqTime, c.handleSubscriptionDeleteRequestTimer)
+ }*/
return
}
@@ -254,27 +421,146 @@
}
+// handleSubscriptionDeleteResponse processes a RIC_SUB_DEL_RESP message.
+// It frees the received RMR buffer, reads the sequence number from the payload,
+// looks up the matching DELETE transaction, restores SubId and the original Xid,
+// forwards the response to the requestor, then asks the routing manager to DELETE
+// the route and, if the sequence number can be released, removes the transaction record.
+// A failed forward is logged only and does not stop the clean-up.
func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
+ xapp.Logger.Info("Subscription Delete Response Received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid)
+ xapp.Rmr.Free(params.Mbuf)
+ params.Mbuf = nil
+
 payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
 if err != nil {
 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
 return
 }
- var transaction, _ = c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
- subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
- go c.rtmgrClient.SubscriptionRequestUpdate()
- SubscriptionReqChan <- subRouteAction
+ xapp.Logger.Info("Received payloadSeqNum: %v", payloadSeqNum)
- xapp.Logger.Info("Subscription Delete Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payloadSeqNum)
+ // TODO: decide whether c.registry.IsValidSequenceNumber should be checked here.
+// c.timerMap.StopTimer(payloadSeqNum)
+
+ var transaction Transaction
+ transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
+ if err != nil {
+ xapp.Logger.Error("Failed to retrive transaction record. Err %v", err)
+ xapp.Logger.Info("Further processing of this Subscription Delete Response stopped. SubId: %v, Xid: %s",params.SubId, params.Xid)
+ return
+ }
+ xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding response to requestor...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+
+ params.SubId = int(payloadSeqNum)
+ params.Xid = transaction.OrigParams.Xid
+ xapp.Logger.Info("Forwarding Subscription Delete Response to UEEC: Mtype: %v, SubId: %v, Xid: %v, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
+ err = c.rmrReplyToSender(params)
+ if err != nil {
+ xapp.Logger.Error("Failed to send response to requestor %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+// return
+ }
+
+ // NOTE(review): the reason for this fixed 3 s pause is not stated; presumably it lets the
+ // forwarded message go out before the route is removed below — confirm.
+ time.Sleep(3 * time.Second)
+
+ xapp.Logger.Info("Starting routing manager update")
+ subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
+ c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+
+ xapp.Logger.Info("Deleting trancaction record")
 if c.registry.releaseSequenceNumber(payloadSeqNum) {
 transaction, err = c.tracker.completeTransaction(payloadSeqNum, DELETE)
 if err != nil {
 xapp.Logger.Error("Failed to delete a Subscription Delete Request transaction record due to %v", err)
 return
 }
- xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
- //params.Src = xAddress + ":" + strconv.Itoa(int(xPort))
- params.Mbuf = transaction.OrigParams.Mbuf
- c.rmrReplyToSender(params)
 }
 return
}
+
+// handleSubscriptionDeleteFailure processes a subscription delete failure message.
+// It frees the received RMR buffer, reads the sequence number from the payload,
+// looks up the matching DELETE transaction, restores SubId and the original Xid,
+// forwards the failure to the requestor, then asks the routing manager to DELETE
+// the route and, if the sequence number can be released, removes the transaction record.
+// NOTE(review): none of the message-type cases visible in this patch calls this
+// handler — confirm that it is wired into the dispatch switch.
+func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) (err error) {
+ xapp.Logger.Info("Subscription Delete Failure Received from Src: %s, Mtype: %v, SubId: %v, Meid: %v",params.Src, params.Mtype, params.SubId, params.Meid)
+ xapp.Rmr.Free(params.Mbuf)
+ params.Mbuf = nil
+
+ payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteFailureSequenceNumber(params.Payload)
+ if err != nil {
+ err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
+ return
+ }
+ xapp.Logger.Info("Received payloadSeqNum: %v", payloadSeqNum)
+
+ // TODO: decide whether c.registry.IsValidSequenceNumber should be checked here.
+// c.timerMap.StopTimer(payloadSeqNum)
+
+ var transaction Transaction
+ transaction, err = c.tracker.RetriveTransaction(payloadSeqNum, DELETE)
+ if err != nil {
+ xapp.Logger.Error("Failed to retrive transaction record. Err %v", err)
+ xapp.Logger.Info("Further processing of this Subscription Delete Failure stopped. SubId: %v, Xid: %s",params.SubId, params.Xid)
+ return
+ }
+ xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding response to requestor...", int(payloadSeqNum), transaction.XappInstanceAddress, transaction.XappPort)
+
+ params.SubId = int(payloadSeqNum)
+ params.Xid = transaction.OrigParams.Xid
+ xapp.Logger.Info("Forwarding Subscription Delete Failure to UEEC: Mtype: %v, SubId: %v, Xid: %v, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
+ err = c.rmrReplyToSender(params)
+ if err != nil {
+ xapp.Logger.Error("Failed to send response to requestor %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+// return
+ }
+
+ // NOTE(review): the reason for this fixed 3 s pause is not stated; presumably it lets the
+ // forwarded message go out before the route is removed below — confirm.
+ time.Sleep(3 * time.Second)
+
+ xapp.Logger.Info("Starting routing manager update")
+ subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
+ c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+
+ xapp.Logger.Info("Deleting trancaction record")
+ if c.registry.releaseSequenceNumber(payloadSeqNum) {
+ transaction, err = c.tracker.completeTransaction(payloadSeqNum, DELETE)
+ if err != nil {
+ xapp.Logger.Error("Failed to delete a Subscription Delete Request transaction record due to %v", err)
+ return
+ }
+ }
+ return
+}
+
+// handleSubscriptionDeleteRequestTimer is the expiry callback intended for the subscription
+// delete request timer. At present it only logs the expiry; the code that would answer
+// the requestor with a failure and clean up is commented out below.
+// NOTE(review): the second disabled section refers to payloadSeqNum and transaction,
+// neither of which is declared in this function — it needs adapting before being enabled.
+func (c *Control) handleSubscriptionDeleteRequestTimer(subId uint16) {
+ xapp.Logger.Info("Subscription Delete Request timer expired. SubId: %v",subId)
+/*
+ transaction, err := c.tracker.completeTransaction(subId, DELETE)
+ if err != nil {
+ xapp.Logger.Error("Failed to delete a Subscription Delete Request transaction record due to %v", err)
+ return
+ }
+ xapp.Logger.Info("SubId: %v, Xid %v, Meid: %v",subId, transaction.OrigParams.Xid, transaction.OrigParams.Meid)
+
+ var params xapp.RMRParams
+ params.Mtype = 12022 //xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]
+ params.SubId = int(subId)
+ params.Meid = transaction.OrigParams.Meid
+ params.Xid = transaction.OrigParams.Xid
+ payload := []byte("40CA4018000003EA7E00050000010016EA6300020021EA74000200C0")
+ params.PayloadLen = len(payload)
+ params.Payload = payload
+
+ xapp.Logger.Info("Forwarding Subscription Delete Failure to UEEC: Mtype: %v, SubId: %v, Xid: %s, Meid: %v",params.Mtype, params.SubId, params.Xid, params.Meid)
+ err = c.rmrReplyToSender(&params)
+ if err != nil {
+ xapp.Logger.Error("Failed to send response to requestor %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid)
+ }
+*/
+/*
+ time.Sleep(3 * time.Second)
+ xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Deleting transaction record", int(subId), transaction.XappInstanceAddress, transaction.XappPort)
+
+ xapp.Logger.Info("Starting routing manager update")
+ subRouteAction := SubRouteInfo{DELETE, transaction.XappInstanceAddress, transaction.XappPort, payloadSeqNum}
+ c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+
+ xapp.Logger.Info("Deleting trancaction record")
+ if c.registry.releaseSequenceNumber(payloadSeqNum) {
+ transaction, err = c.tracker.completeTransaction(payloadSeqNum, DELETE)
+ if err != nil {
+ xapp.Logger.Error("Failed to delete a Subscription Delete Request transaction record due to %v", err)
+ return
+ }
+ }
+*/
+ return
+ }
diff --git a/pkg/control/e2ap.go b/pkg/control/e2ap.go
index 6f5edcb..f9580d9 100644
--- a/pkg/control/e2ap.go
+++ b/pkg/control/e2ap.go
@@ -117,3 +117,49 @@
newPayload = C.GoBytes(cptr, C.int(size))
return
}
+
+/* RICsubscriptionRequestFailure */
+
+// GetSubscriptionFailureSequenceNumber returns the sequence number that the C helper
+// e2ap_get_ric_subscription_failure_sequence_number extracts from payload.
+// An error is returned for an empty payload or when the C helper reports a negative value.
+func (c *E2ap) GetSubscriptionFailureSequenceNumber(payload []byte) (subId uint16, err error) {
+ // &payload[0] panics on an empty slice, so reject it before taking the address.
+ if len(payload) == 0 {
+ return 0, errors.New("e2ap wrapper is unable to get Subscription Failure Sequence Number due to empty payload")
+ }
+ cptr := unsafe.Pointer(&payload[0])
+ cret := C.e2ap_get_ric_subscription_failure_sequence_number(cptr, C.size_t(len(payload)))
+ if cret < 0 {
+ return 0, errors.New("e2ap wrapper is unable to get Subscirption Failure Sequence Number due to wrong or invalid payload")
+ }
+ subId = uint16(cret)
+ return
+}
+
+// SetSubscriptionFailureSequenceNumber passes payload and newSubscriptionid to the C helper
+// e2ap_set_ric_subscription_failure_sequence_number and returns a copy of the bytes at the
+// payload's address, with the length the helper reports.
+// An error and an empty slice are returned for an empty payload or a negative helper result.
+// This function is not used currently. Can be deleted if not needed
+func (c *E2ap) SetSubscriptionFailureSequenceNumber(payload []byte, newSubscriptionid uint16) (newPayload []byte, err error) {
+ // &payload[0] panics on an empty slice, so reject it before taking the address.
+ if len(payload) == 0 {
+ return make([]byte, 0), errors.New("e2ap wrapper is unable to set Subscription Failure Sequence Number due to empty payload")
+ }
+ cptr := unsafe.Pointer(&payload[0])
+ size := C.e2ap_set_ric_subscription_failure_sequence_number(cptr, C.size_t(len(payload)), C.long(newSubscriptionid))
+ if size < 0 {
+ return make([]byte, 0), errors.New("e2ap wrapper is unable to set Subscription Failure Sequence Number due to wrong or invalid payload")
+ }
+ newPayload = C.GoBytes(cptr, C.int(size))
+ return
+}
+
+/* RICsubscriptionDeleteFailure */
+
+// GetSubscriptionDeleteFailureSequenceNumber returns the sequence number that the C helper
+// e2ap_get_ric_subscription_delete_failure_sequence_number extracts from payload.
+// An error is returned for an empty payload or when the C helper reports a negative value.
+func (c *E2ap) GetSubscriptionDeleteFailureSequenceNumber(payload []byte) (subId uint16, err error) {
+ // &payload[0] panics on an empty slice, so reject it before taking the address.
+ if len(payload) == 0 {
+ return 0, errors.New("e2ap wrapper is unable to get Subscription Delete Failure Sequence Number due to empty payload")
+ }
+ cptr := unsafe.Pointer(&payload[0])
+ cret := C.e2ap_get_ric_subscription_delete_failure_sequence_number(cptr, C.size_t(len(payload)))
+ if cret < 0 {
+ return 0, errors.New("e2ap wrapper is unable to get Subscirption Delete Failure Sequence Number due to wrong or invalid payload")
+ }
+ subId = uint16(cret)
+ return
+}
+
+// SetSubscriptionDeleteFailureSequenceNumber passes payload and newSubscriptionid to the C
+// helper e2ap_set_ric_subscription_delete_failure_sequence_number and returns a copy of the
+// bytes at the payload's address, with the length the helper reports.
+// An error and an empty slice are returned for an empty payload or a negative helper result.
+// This function is not used currently. Can be deleted if not needed
+func (c *E2ap) SetSubscriptionDeleteFailureSequenceNumber(payload []byte, newSubscriptionid uint16) (newPayload []byte, err error) {
+ // &payload[0] panics on an empty slice, so reject it before taking the address.
+ if len(payload) == 0 {
+ return make([]byte, 0), errors.New("e2ap wrapper is unable to set Subscription Delete Failure Sequence Number due to empty payload")
+ }
+ cptr := unsafe.Pointer(&payload[0])
+ size := C.e2ap_set_ric_subscription_delete_failure_sequence_number(cptr, C.size_t(len(payload)), C.long(newSubscriptionid))
+ if size < 0 {
+ return make([]byte, 0), errors.New("e2ap wrapper is unable to set Subscription Delete Failure Sequence Number due to wrong or invalid payload")
+ }
+ newPayload = C.GoBytes(cptr, C.int(size))
+ return
+}
diff --git a/pkg/control/registry.go b/pkg/control/registry.go
index 6717612..98aa97e 100644
--- a/pkg/control/registry.go
+++ b/pkg/control/registry.go
@@ -19,7 +19,12 @@
package control
-import "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+import (
+ "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+ "sync"
+)
+
+var registryMutex = &sync.Mutex{}
type Registry struct {
register map[uint16]bool
@@ -33,16 +38,30 @@
}
// Reserves and returns the next free sequence number
-func (r *Registry) ReserveSequenceNumber() uint16 {
- if r.IsValidSequenceNumber(r.counter) { }
- sequenceNumber := r.counter
- r.register[sequenceNumber] = false
- r.shift()
- return sequenceNumber
+// The number is registered as reserved but not yet confirmed (false). The boolean result
+// is false only when every one of the 65536 sequence numbers is already registered.
+func (r *Registry) ReserveSequenceNumber() (uint16, bool) {
+ registryMutex.Lock()
+ defer registryMutex.Unlock()
+
+ // Walk the counter forward until an unregistered number is found. Numbers still in
+ // use are skipped instead of failing the request, so one long-lived subscription
+ // cannot block every reservation after the counter has wrapped around.
+ for tried := 0; tried <= 65535; tried++ {
+ candidate := r.counter
+ r.counter++ // uint16 arithmetic wraps from 65535 back to 0
+ if _, inUse := r.register[candidate]; inUse {
+ continue
+ }
+ r.register[candidate] = false
+ return candidate, true
+ }
+ xapp.Logger.Error("No free SequenceNumber available. All sequence numbers are in use")
+ return 0, false
 }
// This function checks the validity of the given subscription id
func (r *Registry) IsValidSequenceNumber(sn uint16) bool {
+ registryMutex.Lock()
+ defer registryMutex.Unlock()
xapp.Logger.Debug("Registry map: %v", r.register)
if _, ok := r.register[sn]; ok {
return true
@@ -52,27 +71,25 @@
// This function sets the give id as confirmed in the register
func (r *Registry) setSubscriptionToConfirmed(sn uint16) {
+ // Guard the register map with the package-level registryMutex.
+ registryMutex.Lock()
+ defer registryMutex.Unlock()
 // A true entry makes releaseSequenceNumber refuse to release sn.
 r.register[sn] = true
}
-func (r *Registry) shift() {
- if r.counter == 65535 {
- r.counter = 0
- } else {
- r.counter++
- }
-}
-
//This function sets the given id as unused in the register
func (r *Registry) deleteSubscription(sn uint16) {
+ // Guard the register map with the package-level registryMutex.
+ registryMutex.Lock()
+ defer registryMutex.Unlock()
 // The entry stays in the map with value false; releaseSequenceNumber removes it later.
 r.register[sn] = false
}
//This function releases the given id as unused in the register
func (r *Registry) releaseSequenceNumber(sn uint16) bool {
+ registryMutex.Lock()
+ defer registryMutex.Unlock()
if r.register[sn] {
return false
- } else {
+ } else {
delete(r.register, sn)
return true
}
diff --git a/pkg/control/timers.go b/pkg/control/timers.go
new file mode 100644
index 0000000..6f64c5b
--- /dev/null
+++ b/pkg/control/timers.go
@@ -0,0 +1,108 @@
+/*
+==================================================================================
+ Copyright (c) 2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+
+package control
+
+import (
+ "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+ "sync"
+ "time"
+)
+
+var timerMutex = &sync.Mutex{}
+
+// TimerInfo is the per-subscription record kept in TimerMap.
+type TimerInfo struct {
+ timerAddress *time.Timer // timer created by time.AfterFunc in StartTimer
+ timerFunctionAddress func() // callback run by timerFunctionExcecutor when the timer expires
+}
+
+// TimerMap keeps the timers started with StartTimer, keyed by subscription id.
+// Its methods lock the package-level timerMutex while accessing the map.
+type TimerMap struct {
+ timer map[uint16] TimerInfo
+}
+
+// Init allocates the timer map. It must be called once before any other method is
+// used, since the other methods read and write t.timer.
+func (t *TimerMap) Init() {
+ t.timer = make(map[uint16] TimerInfo)
+}
+
+// StartTimer arms a timer for subId that calls timerFunction(subId) after expireAfterTime.
+// A timer already stored under the same subId is stopped and replaced.
+// It returns false, without starting anything, when timerFunction is nil.
+func (t *TimerMap) StartTimer(subId uint16, expireAfterTime time.Duration, timerFunction func(subId uint16)) bool {
+ timerMutex.Lock()
+ defer timerMutex.Unlock()
+ if timerFunction == nil {
+ xapp.Logger.Error("StartTimer() timerFunc == nil")
+ return false
+ }
+
+ // Replace any timer that is already registered under this id.
+ if previous, found := t.timer[subId]; found {
+ xapp.Logger.Error("StartTimer() old timer found")
+ if previous.timerAddress != nil {
+ xapp.Logger.Error("StartTimer() deleting old timer")
+ previous.timerAddress.Stop()
+ }
+ delete(t.timer, subId)
+ }
+
+ // The timer itself only triggers timerFunctionExcecutor, which looks up and runs the
+ // stored callback; both are recorded together in the map.
+ pending := time.AfterFunc(expireAfterTime, func() { t.timerFunctionExcecutor(subId) })
+ onExpiry := func() { timerFunction(subId) }
+ t.timer[subId] = TimerInfo{timerAddress: pending, timerFunctionAddress: onExpiry}
+ return true
+}
+
+// StopTimer stops the timer stored for subId and removes it from the map.
+// It returns true when a timer was found and stopped, false when no entry exists
+// for subId or the stored entry has no timer.
+func (t *TimerMap) StopTimer(subId uint16) bool {
+ timerMutex.Lock()
+ defer timerMutex.Unlock()
+
+ entry, found := t.timer[subId]
+ if !found {
+ xapp.Logger.Info("StopTimer() Timer not found. May be expired or stopped already. subId: %v",subId)
+ return false
+ }
+ if entry.timerAddress == nil {
+ xapp.Logger.Error("StopTimer() timerAddress == nil")
+ return false
+ }
+ entry.timerAddress.Stop()
+ delete(t.timer, subId)
+ return true
+}
+
+// timerFunctionExcecutor runs when the timer for subId fires. It removes the entry
+// from the map and then calls the stored callback with timerMutex released.
+// If the entry is missing, or holds no callback, it only logs an error.
+func (t *TimerMap) timerFunctionExcecutor(subId uint16) {
+ timerMutex.Lock()
+ entry, found := t.timer[subId]
+ if !found {
+ xapp.Logger.Error("timerExcecutorFunc() Timer not anymore in map. subId: %v",subId)
+ timerMutex.Unlock()
+ return
+ }
+ if entry.timerFunctionAddress == nil {
+ xapp.Logger.Error("timerExcecutorFunc() timerFunctionAddress == nil")
+ timerMutex.Unlock()
+ return
+ }
+ // Keep a local reference to the callback, drop the map entry, and release the lock
+ // before invoking it.
+ callback := entry.timerFunctionAddress
+ delete(t.timer, subId)
+ timerMutex.Unlock()
+ callback()
+}
diff --git a/pkg/control/tracker.go b/pkg/control/tracker.go
index eddfbda..dfab96e 100644
--- a/pkg/control/tracker.go
+++ b/pkg/control/tracker.go
@@ -21,8 +21,11 @@
import (
"fmt"
+ "sync"
)
+var trackerMutex = &sync.Mutex{}
+
/*
Implements a record of ongoing transactions and helper functions to CRUD the records.
*/
@@ -39,6 +42,8 @@
Returns error if there is similar transatcion ongoing.
*/
func (t *Tracker) TrackTransaction(key TransactionKey, xact Transaction) error {
+ trackerMutex.Lock()
+ defer trackerMutex.Unlock()
if _, ok := t.transactionTable[key]; ok {
// TODO: Implement merge related check here. If the key is same but the value is different.
err := fmt.Errorf("transaction tracker: Similar transaction with sub id %d and type %s is ongoing", key.SubID, key.transType)
@@ -54,6 +59,8 @@
*/
func (t *Tracker) UpdateTransaction(SubID uint16, transType Action, xact Transaction) error {
key := TransactionKey{SubID, transType}
+ trackerMutex.Lock()
+ defer trackerMutex.Unlock()
if _, ok := t.transactionTable[key]; ok {
// TODO: Implement merge related check here. If the key is same but the value is different.
err := fmt.Errorf("transaction tracker: Similar transaction with sub id %d and type %v is ongoing", key.SubID, key.transType)
@@ -69,6 +76,8 @@
*/
func (t *Tracker) RetriveTransaction(subID uint16, act Action) (Transaction, error) {
key := TransactionKey{subID, act}
+ trackerMutex.Lock()
+ defer trackerMutex.Unlock()
var xact Transaction
if xact, ok := t.transactionTable[key]; ok {
return xact, nil
@@ -84,6 +93,8 @@
func (t *Tracker) completeTransaction(subID uint16, act Action) (Transaction, error) {
key := TransactionKey{subID, act}
var emptyTransaction Transaction
+ trackerMutex.Lock()
+ defer trackerMutex.Unlock()
if xact, ok := t.transactionTable[key]; ok {
delete(t.transactionTable, key)
return xact, nil