Subscription manager v0.10.0

Contains v0.8.0-v0.10.0
Handle RICsubscriptionDeleteResponse message
Communicating RICsubscriptionDeleteResponse to routing manager
Updated transaction handling
Tracking Mbuf in transaction table

Change-Id: I0d4964b7bd717941a0e50ede3e9a878590079141
Signed-off-by: kalnagy <kalman.nagy@nokia.com>
diff --git a/pkg/control/client.go b/pkg/control/client.go
index 598c7ef..1c07ad4 100644
--- a/pkg/control/client.go
+++ b/pkg/control/client.go
@@ -32,6 +32,7 @@
 type RtmgrClient struct {
 	rtClient         *rtmgrclient.RoutingManager
 	xappHandleParams *rtmgrhandle.ProvideXappSubscriptionHandleParams
+	xappDeleteParams *rtmgrhandle.DeleteXappSubscriptionHandleParams
 }
 
 func (rc *RtmgrClient) SubscriptionRequestUpdate() error {
@@ -54,6 +55,15 @@
 			xapp.Logger.Info("Succesfully updated routing manager about the subscription: %d", subID)
 			return nil
 		}
+	case DELETE:
+		_, _, deleteErr := rc.rtClient.Handle.DeleteXappSubscriptionHandle(rc.xappDeleteParams.WithXappSubscriptionData(&xappSubReq))
+		if deleteErr != nil && !(strings.Contains(deleteErr.Error(), "status 200"))  {
+			xapp.Logger.Error("Deleting subscription id = %d  in routing manager, failed with error: %v", subID, deleteErr)
+			return deleteErr
+		} else {
+			xapp.Logger.Info("Succesfully deleted subscription: %d in routing manager.", subID)
+			return nil
+		}
 	default:
 		return nil
 	}
diff --git a/pkg/control/control.go b/pkg/control/control.go
index f6cd771..ec6419e 100644
--- a/pkg/control/control.go
+++ b/pkg/control/control.go
@@ -53,17 +53,6 @@
 	EnbID  string
 }
 
-type RMRParams struct {
-	Mtype           int
-	Payload         []byte
-	PayloadLen      int
-	Meid            *RMRMeid
-	Xid             string
-	SubId           int
-	Src             string
-	Mbuf            *C.rmr_mbuf_t
-}
-
 var SEEDSN uint16
 var SubscriptionReqChan = make(chan subRouteInfo, 10)
 
@@ -98,7 +87,8 @@
 	transport := httptransport.New(viper.GetString("rtmgr.HostAddr") + ":" + viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
 	client := rtmgrclient.New(transport, strfmt.Default)
 	handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
-	rtmgrClient := RtmgrClient{client, handle}
+	delete_handle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
+	rtmgrClient := RtmgrClient{client, handle, delete_handle}
 
 	return Control{new(E2ap), registry, &rtmgrClient, tracker}
 }
@@ -115,6 +105,8 @@
 		err = c.handleSubscriptionResponse(rp)
 	case C.RIC_SUB_DEL_REQ:
 		err = c.handleSubscriptionDeleteRequest(rp)
+	case C.RIC_SUB_DEL_RESP:
+		err = c.handleSubscriptionDeleteResponse(rp)
 	default:
 		err = errors.New("Message Type " + strconv.Itoa(rp.Mtype) + " is discarded")
 	}
@@ -128,6 +120,13 @@
 	return
 }
 
+func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) {
+	if !xapp.Rmr.Send(params, true) {
+		err = errors.New("rmr.Send() failed")
+	}
+	return
+}
+
 func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) {
 	payload_seq_num, err := c.e2ap.GetSubscriptionRequestSequenceNumber(params.Payload)
 	if err != nil {
@@ -153,7 +152,7 @@
 
 	/* Create transatcion records for every subscription request */
 	xact_key := Transaction_key{new_sub_id, CREATE}
-	xact_value := Transaction{*src_addr, *src_port, params.Payload}
+	xact_value := Transaction{*src_addr, *src_port, params.Payload, params.Mbuf}
 	err = c.tracker.Track_transaction(xact_key, xact_value)
 	if err != nil {
 		xapp.Logger.Error("Failed to create a transaction record due to %v", err)
@@ -170,6 +169,7 @@
 
 	xapp.Logger.Info("Generated ID: %v. Forwarding to E2 Termination...", int(new_sub_id))
 	c.rmrSend(params)
+	xapp.Logger.Info("--- Debugging transaction table = %v", c.tracker.transaction_table)
 	return
 }
 
@@ -186,7 +186,14 @@
 	}
 	c.registry.setSubscriptionToConfirmed(payload_seq_num)
 	xapp.Logger.Info("Subscription Response Registered. Forwarding to Requestor...")
-	c.rmrSend(params)
+	transaction, err := c.tracker.complete_transaction(payload_seq_num, DELETE)
+	if err != nil {
+		xapp.Logger.Error("Failed to create a transaction record due to %v", err)
+		return
+	}
+	xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payload_seq_num), transaction.Xapp_instance_address, transaction.Xapp_port)
+	params.Mbuf = transaction.Mbuf
+	c.rmrReplyToSender(params)
 	return
 }
 
@@ -221,8 +228,47 @@
 	xapp.Logger.Info("Subscription Delete Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
 	if c.registry.IsValidSequenceNumber(payload_seq_num) {
 		c.registry.deleteSubscription(payload_seq_num)
+		trackErr := c.trackDeleteTransaction(params, payload_seq_num)
+		if trackErr != nil {
+			xapp.Logger.Error("Failed to create a transaction record due to %v", err)
+			return trackErr
+		}
 	}
 	xapp.Logger.Info("Subscription ID: %v. Forwarding to E2 Termination...", int(payload_seq_num))
 	c.rmrSend(params)
 	return
 }
+
+func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payload_seq_num uint16) (err error) {
+	src_addr, src_port, err := c.rtmgrClient.SplitSource(params.Src)
+	xact_key := Transaction_key{payload_seq_num, DELETE}
+	xact_value := Transaction{*src_addr, *src_port, params.Payload, params.Mbuf}
+	err = c.tracker.Track_transaction(xact_key, xact_value)
+	return
+}
+
+func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
+	payload_seq_num, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
+	if err != nil {
+		err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
+		return
+	}
+    var transaction , _= c.tracker.Retrive_transaction(payload_seq_num, DELETE)
+	sub_route_action := subRouteInfo{DELETE, transaction.Xapp_instance_address, transaction.Xapp_port, payload_seq_num }
+	go c.rtmgrClient.SubscriptionRequestUpdate()
+	SubscriptionReqChan <- sub_route_action
+
+	xapp.Logger.Info("Subscription Delete Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
+	if c.registry.releaseSequenceNumber(payload_seq_num) {
+		transaction, err = c.tracker.complete_transaction(payload_seq_num, DELETE)
+		if err != nil {
+			xapp.Logger.Error("Failed to create a transaction record due to %v", err)
+			return
+		}
+		xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payload_seq_num), transaction.Xapp_instance_address, transaction.Xapp_port)
+		//params.Src = xAddress + ":" + strconv.Itoa(int(xPort))
+		params.Mbuf = transaction.Mbuf
+		c.rmrReplyToSender(params)
+	}
+	return
+}
diff --git a/pkg/control/registry.go b/pkg/control/registry.go
index c349921..2b04f38 100644
--- a/pkg/control/registry.go
+++ b/pkg/control/registry.go
@@ -71,6 +71,11 @@
 }
 
 //This function releases the given id as unused in the register
-//func (r *Registry) releaseSequenceNumber(sn uint16) {
-//	delete(r.register, sn)
-//}
\ No newline at end of file
+func (r *Registry) releaseSequenceNumber(sn uint16) bool {
+	if r.register[sn] {
+		return false
+	} else {
+		delete(r.register, sn)
+		return true
+	}
+}
\ No newline at end of file
diff --git a/pkg/control/tracker.go b/pkg/control/tracker.go
index fdbbeaf..e08f8db 100644
--- a/pkg/control/tracker.go
+++ b/pkg/control/tracker.go
@@ -21,7 +21,6 @@
 
 import (
 	"fmt"
-//	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
 )
 
 /*
@@ -53,6 +52,21 @@
 Retreives the transaction table entry for the given request.
 Returns error in case the transaction cannot be found.
 */
+func (t *Tracker) Update_transaction(SubID uint16, trans_type Action, xact Transaction) error{
+	key := Transaction_key{SubID, trans_type}
+	if _, ok := t.transaction_table[key]; ok {
+		// TODO: Implement merge related check here. If the key is same but the value is different.
+		err := fmt.Errorf("Transaction tracker: Similar transaction with sub id %d and type %s is ongoing", key.SubID, key.trans_type )
+		return err
+	}
+	t.transaction_table[key] = xact
+	return nil
+}
+
+/*
+Retreives the transaction table entry for the given request.
+Returns error in case the transaction cannot be found.
+*/
 func (t *Tracker) Retrive_transaction(subID uint16, act Action) (Transaction, error){
 	key := Transaction_key{subID, act}
 	var xact Transaction
@@ -67,14 +81,13 @@
 Deletes the transaction table entry for the given request and returns the deleted xapp's address and port for reference.
 Returns error in case the transaction cannot be found.
 */
-func (t *Tracker) complete_transaction(subID uint16, act Action) (string, uint16, error){
+func (t *Tracker) complete_transaction(subID uint16, act Action) (Transaction, error){
 	key := Transaction_key{subID, act}
-	var empty_address string
-	var empty_port uint16
+	var empty_transaction Transaction
 	if xact, ok := t.transaction_table[key]; ok {
 		delete(t.transaction_table, key)
-		return xact.Xapp_instance_address, xact.Xapp_port, nil
+		return xact, nil
 	}
 	err := fmt.Errorf("Tranaction record for Subscription ID %d and action %s does not exist", subID, act)
-	return empty_address, empty_port, err
+	return empty_transaction, err
 }
diff --git a/pkg/control/types.go b/pkg/control/types.go
index 1a2c92f..d12233c 100644
--- a/pkg/control/types.go
+++ b/pkg/control/types.go
@@ -19,6 +19,10 @@
 
 package control
 
+import (
+	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+)
+
 type RmrDatagram struct {
 	MessageType    int
 	SubscriptionId uint16
@@ -44,4 +48,5 @@
 	Xapp_instance_address string
 	Xapp_port             uint16
 	Ric_sub_req           []byte
+	Mbuf                  *xapp.RMRMbuf
 }