Subscription manager v0.10.0
Contains v0.8.0-v0.10.0
Handle RICsubscriptionDeleteResponse message
Communicating RICsubscriptionDeleteResponse to routing manager
Updated transaction handling
Tracking Mbuf in transaction table
Change-Id: I0d4964b7bd717941a0e50ede3e9a878590079141
Signed-off-by: kalnagy <kalman.nagy@nokia.com>
diff --git a/pkg/control/client.go b/pkg/control/client.go
index 598c7ef..1c07ad4 100644
--- a/pkg/control/client.go
+++ b/pkg/control/client.go
@@ -32,6 +32,7 @@
type RtmgrClient struct {
rtClient *rtmgrclient.RoutingManager
xappHandleParams *rtmgrhandle.ProvideXappSubscriptionHandleParams
+ xappDeleteParams *rtmgrhandle.DeleteXappSubscriptionHandleParams
}
func (rc *RtmgrClient) SubscriptionRequestUpdate() error {
@@ -54,6 +55,15 @@
xapp.Logger.Info("Succesfully updated routing manager about the subscription: %d", subID)
return nil
}
+ case DELETE:
+ _, _, deleteErr := rc.rtClient.Handle.DeleteXappSubscriptionHandle(rc.xappDeleteParams.WithXappSubscriptionData(&xappSubReq))
+ if deleteErr != nil && !(strings.Contains(deleteErr.Error(), "status 200")) {
+ xapp.Logger.Error("Deleting subscription id = %d in routing manager, failed with error: %v", subID, deleteErr)
+ return deleteErr
+ } else {
+ xapp.Logger.Info("Succesfully deleted subscription: %d in routing manager.", subID)
+ return nil
+ }
default:
return nil
}
diff --git a/pkg/control/control.go b/pkg/control/control.go
index f6cd771..ec6419e 100644
--- a/pkg/control/control.go
+++ b/pkg/control/control.go
@@ -53,17 +53,6 @@
EnbID string
}
-type RMRParams struct {
- Mtype int
- Payload []byte
- PayloadLen int
- Meid *RMRMeid
- Xid string
- SubId int
- Src string
- Mbuf *C.rmr_mbuf_t
-}
-
var SEEDSN uint16
var SubscriptionReqChan = make(chan subRouteInfo, 10)
@@ -98,7 +87,8 @@
transport := httptransport.New(viper.GetString("rtmgr.HostAddr") + ":" + viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
client := rtmgrclient.New(transport, strfmt.Default)
handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
- rtmgrClient := RtmgrClient{client, handle}
+ delete_handle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
+ rtmgrClient := RtmgrClient{client, handle, delete_handle}
return Control{new(E2ap), registry, &rtmgrClient, tracker}
}
@@ -115,6 +105,8 @@
err = c.handleSubscriptionResponse(rp)
case C.RIC_SUB_DEL_REQ:
err = c.handleSubscriptionDeleteRequest(rp)
+ case C.RIC_SUB_DEL_RESP:
+ err = c.handleSubscriptionDeleteResponse(rp)
default:
err = errors.New("Message Type " + strconv.Itoa(rp.Mtype) + " is discarded")
}
@@ -128,6 +120,13 @@
return
}
+func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) {
+ if !xapp.Rmr.Send(params, true) {
+ err = errors.New("rmr.Send() failed")
+ }
+ return
+}
+
func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) {
payload_seq_num, err := c.e2ap.GetSubscriptionRequestSequenceNumber(params.Payload)
if err != nil {
@@ -153,7 +152,7 @@
/* Create transatcion records for every subscription request */
xact_key := Transaction_key{new_sub_id, CREATE}
- xact_value := Transaction{*src_addr, *src_port, params.Payload}
+ xact_value := Transaction{*src_addr, *src_port, params.Payload, params.Mbuf}
err = c.tracker.Track_transaction(xact_key, xact_value)
if err != nil {
xapp.Logger.Error("Failed to create a transaction record due to %v", err)
@@ -170,6 +169,7 @@
xapp.Logger.Info("Generated ID: %v. Forwarding to E2 Termination...", int(new_sub_id))
c.rmrSend(params)
+ xapp.Logger.Info("--- Debugging transaction table = %v", c.tracker.transaction_table)
return
}
@@ -186,7 +186,14 @@
}
c.registry.setSubscriptionToConfirmed(payload_seq_num)
xapp.Logger.Info("Subscription Response Registered. Forwarding to Requestor...")
- c.rmrSend(params)
+ transaction, err := c.tracker.complete_transaction(payload_seq_num, DELETE)
+ if err != nil {
+ xapp.Logger.Error("Failed to create a transaction record due to %v", err)
+ return
+ }
+ xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payload_seq_num), transaction.Xapp_instance_address, transaction.Xapp_port)
+ params.Mbuf = transaction.Mbuf
+ c.rmrReplyToSender(params)
return
}
@@ -221,8 +228,47 @@
xapp.Logger.Info("Subscription Delete Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
if c.registry.IsValidSequenceNumber(payload_seq_num) {
c.registry.deleteSubscription(payload_seq_num)
+ trackErr := c.trackDeleteTransaction(params, payload_seq_num)
+ if trackErr != nil {
+ xapp.Logger.Error("Failed to create a transaction record due to %v", err)
+ return trackErr
+ }
}
xapp.Logger.Info("Subscription ID: %v. Forwarding to E2 Termination...", int(payload_seq_num))
c.rmrSend(params)
return
}
+
+func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payload_seq_num uint16) (err error) {
+ src_addr, src_port, err := c.rtmgrClient.SplitSource(params.Src)
+ xact_key := Transaction_key{payload_seq_num, DELETE}
+ xact_value := Transaction{*src_addr, *src_port, params.Payload, params.Mbuf}
+ err = c.tracker.Track_transaction(xact_key, xact_value)
+ return
+}
+
+func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
+ payload_seq_num, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
+ if err != nil {
+ err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
+ return
+ }
+ var transaction , _= c.tracker.Retrive_transaction(payload_seq_num, DELETE)
+ sub_route_action := subRouteInfo{DELETE, transaction.Xapp_instance_address, transaction.Xapp_port, payload_seq_num }
+ go c.rtmgrClient.SubscriptionRequestUpdate()
+ SubscriptionReqChan <- sub_route_action
+
+ xapp.Logger.Info("Subscription Delete Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
+ if c.registry.releaseSequenceNumber(payload_seq_num) {
+ transaction, err = c.tracker.complete_transaction(payload_seq_num, DELETE)
+ if err != nil {
+ xapp.Logger.Error("Failed to create a transaction record due to %v", err)
+ return
+ }
+ xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payload_seq_num), transaction.Xapp_instance_address, transaction.Xapp_port)
+ //params.Src = xAddress + ":" + strconv.Itoa(int(xPort))
+ params.Mbuf = transaction.Mbuf
+ c.rmrReplyToSender(params)
+ }
+ return
+}
diff --git a/pkg/control/registry.go b/pkg/control/registry.go
index c349921..2b04f38 100644
--- a/pkg/control/registry.go
+++ b/pkg/control/registry.go
@@ -71,6 +71,11 @@
}
//This function releases the given id as unused in the register
-//func (r *Registry) releaseSequenceNumber(sn uint16) {
-// delete(r.register, sn)
-//}
\ No newline at end of file
+func (r *Registry) releaseSequenceNumber(sn uint16) bool {
+ if r.register[sn] {
+ return false
+ } else {
+ delete(r.register, sn)
+ return true
+ }
+}
\ No newline at end of file
diff --git a/pkg/control/tracker.go b/pkg/control/tracker.go
index fdbbeaf..e08f8db 100644
--- a/pkg/control/tracker.go
+++ b/pkg/control/tracker.go
@@ -21,7 +21,6 @@
import (
"fmt"
-// "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
)
/*
@@ -53,6 +52,21 @@
Retreives the transaction table entry for the given request.
Returns error in case the transaction cannot be found.
*/
+func (t *Tracker) Update_transaction(SubID uint16, trans_type Action, xact Transaction) error{
+ key := Transaction_key{SubID, trans_type}
+ if _, ok := t.transaction_table[key]; ok {
+ // TODO: Implement merge related check here. If the key is same but the value is different.
+ err := fmt.Errorf("Transaction tracker: Similar transaction with sub id %d and type %s is ongoing", key.SubID, key.trans_type )
+ return err
+ }
+ t.transaction_table[key] = xact
+ return nil
+}
+
+/*
+Retreives the transaction table entry for the given request.
+Returns error in case the transaction cannot be found.
+*/
func (t *Tracker) Retrive_transaction(subID uint16, act Action) (Transaction, error){
key := Transaction_key{subID, act}
var xact Transaction
@@ -67,14 +81,13 @@
Deletes the transaction table entry for the given request and returns the deleted xapp's address and port for reference.
Returns error in case the transaction cannot be found.
*/
-func (t *Tracker) complete_transaction(subID uint16, act Action) (string, uint16, error){
+func (t *Tracker) complete_transaction(subID uint16, act Action) (Transaction, error){
key := Transaction_key{subID, act}
- var empty_address string
- var empty_port uint16
+ var empty_transaction Transaction
if xact, ok := t.transaction_table[key]; ok {
delete(t.transaction_table, key)
- return xact.Xapp_instance_address, xact.Xapp_port, nil
+ return xact, nil
}
err := fmt.Errorf("Tranaction record for Subscription ID %d and action %s does not exist", subID, act)
- return empty_address, empty_port, err
+ return empty_transaction, err
}
diff --git a/pkg/control/types.go b/pkg/control/types.go
index 1a2c92f..d12233c 100644
--- a/pkg/control/types.go
+++ b/pkg/control/types.go
@@ -19,6 +19,10 @@
package control
+import (
+ "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+)
+
type RmrDatagram struct {
MessageType int
SubscriptionId uint16
@@ -44,4 +48,5 @@
Xapp_instance_address string
Xapp_port uint16
Ric_sub_req []byte
+ Mbuf *xapp.RMRMbuf
}