0.7.1 Version of submgr

Introducing Routing Manager integration from 0.6.0
Transaction handling for Subscription Requests
DeleteRequest Handling

Change-Id: I029ee51c16e00feb90f61277135b06e784657002
Signed-off-by: kalnagy <kalman.nagy@nokia.com>
diff --git a/pkg/control/client.go b/pkg/control/client.go
new file mode 100644
index 0000000..598c7ef
--- /dev/null
+++ b/pkg/control/client.go
@@ -0,0 +1,76 @@
+/*
+==================================================================================
+  Copyright (c) 2019 AT&T Intellectual Property.
+  Copyright (c) 2019 Nokia
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+package control
+
+import (
+	rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
+	rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
+	"gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_models"
+	"strings"
+	"strconv"
+	"errors"
+	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+)
+
+type RtmgrClient struct {
+	rtClient         *rtmgrclient.RoutingManager
+	xappHandleParams *rtmgrhandle.ProvideXappSubscriptionHandleParams
+}
+
+func (rc *RtmgrClient) SubscriptionRequestUpdate() error {
+	xapp.Logger.Debug("SubscriptionRequestUpdate() invoked")
+	subRouteAction := <-SubscriptionReqChan
+	// Routing manager handles subscription id as int32 to accomodate -1 and uint16 values
+	subID := int32(subRouteAction.SubID)
+
+	xapp.Logger.Debug("Subscription action details received: ", subRouteAction)
+
+	xappSubReq := rtmgr_models.XappSubscriptionData{&subRouteAction.Address, &subRouteAction.Port, &subID}
+
+	switch subRouteAction.Command {
+	case CREATE:
+		_, postErr := rc.rtClient.Handle.ProvideXappSubscriptionHandle(rc.xappHandleParams.WithXappSubscriptionData(&xappSubReq))
+		if postErr != nil && !(strings.Contains(postErr.Error(), "status 200"))  {
+			xapp.Logger.Error("Updating routing manager about subscription id = %d failed with error: %v", subID, postErr)
+			return postErr
+		} else {
+			xapp.Logger.Info("Succesfully updated routing manager about the subscription: %d", subID)
+			return nil
+		}
+	default:
+		return nil
+	}
+}
+
+func (rc *RtmgrClient) SplitSource(src string) (*string, *uint16, error) {
+	tcpSrc := strings.Split(src, ":")
+	if len(tcpSrc) != 2 {
+		err := errors.New("Unable to get the source details of the xapp. Check the source string received from the rmr.")
+		return nil, nil, err
+	}
+	srcAddr := tcpSrc[0]
+	xapp.Logger.Info("---Debugging Inside splitsource tcpsrc[0] = %s and tcpsrc[1]= %s ", tcpSrc[0], tcpSrc[1])
+	srcPort, err := strconv.ParseUint(tcpSrc[1], 10, 16)
+	if err != nil {
+		return nil, nil, err
+	}
+	srcPortInt := uint16(srcPort)
+	return &srcAddr, &srcPortInt, nil
+}
diff --git a/pkg/control/control.go b/pkg/control/control.go
index 9a5022f..27e3fbe 100644
--- a/pkg/control/control.go
+++ b/pkg/control/control.go
@@ -21,6 +21,7 @@
 
 /*
 #include <rmr/RIC_message_types.h>
+#include <rmr/rmr.h>
 
 #cgo CFLAGS: -I../
 #cgo LDFLAGS: -lrmr_nng -lnng
@@ -31,17 +32,46 @@
 	"errors"
 	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
 	"github.com/spf13/viper"
+	"github.com/go-openapi/strfmt"
+	httptransport "github.com/go-openapi/runtime/client"
+	rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
+	rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
 	"math/rand"
 	"strconv"
 	"time"
 )
 
 type Control struct {
-	e2ap     *E2ap
-	registry *Registry
+	e2ap        *E2ap
+	registry    *Registry
+	rtmgrClient *RtmgrClient
+	tracker     *Tracker
+}
+
+type RMRMeid struct {
+	PlmnID string
+	EnbID  string
+}
+
+type RMRParams struct {
+	Mtype           int
+	Payload         []byte
+	PayloadLen      int
+	Meid            *RMRMeid
+	Xid             string
+	SubId           int
+	Src             string
+	Mbuf            *C.rmr_mbuf_t
 }
 
 var SEEDSN uint16
+var SubscriptionReqChan = make(chan subRouteInfo, 10)
+
+const (
+	CREATE Action = 0
+	MERGE Action = 1
+	DELETE Action = 3
+)
 
 func init() {
 	viper.AutomaticEnv()
@@ -61,63 +91,135 @@
 func NewControl() Control {
 	registry := new(Registry)
 	registry.Initialize(SEEDSN)
-	return Control{new(E2ap), registry}
+
+	transport := httptransport.New(viper.GetString("rtmgr.HostAddr") + ":" + viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
+	client := rtmgrclient.New(transport, strfmt.Default)
+	handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
+	rtmgrClient := RtmgrClient{client, handle}
+
+	return Control{new(E2ap), registry, &rtmgrClient, new(Tracker)}
 }
 
 func (c *Control) Run() {
 	xapp.Run(c)
 }
 
-func (c *Control) Consume(mtype, sub_id int, len int, payload []byte) (err error) {
-	switch mtype {
+func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
+	switch rp.Mtype {
 	case C.RIC_SUB_REQ:
-		err = c.handleSubscriptionRequest(&RmrDatagram{mtype, uint16(sub_id), payload})
+		err = c.handleSubscriptionRequest(rp)
 	case C.RIC_SUB_RESP:
-		err = c.handleSubscriptionResponse(&RmrDatagram{mtype, uint16(sub_id), payload})
+		err = c.handleSubscriptionResponse(rp)
+	case C.RIC_SUB_DEL_REQ:
+		err = c.handleSubscriptionDeleteRequest(rp)
 	default:
-		err = errors.New("Message Type " + strconv.Itoa(mtype) + " is discarded")
+		err = errors.New("Message Type " + strconv.Itoa(rp.Mtype) + " is discarded")
 	}
 	return
 }
 
-func (c *Control) rmrSend(datagram *RmrDatagram) (err error) {
-	if !xapp.Rmr.Send(datagram.MessageType, int(datagram.SubscriptionId), len(datagram.Payload), datagram.Payload) {
+func (c *Control) rmrSend(params *xapp.RMRParams) (err error) {
+	if !xapp.Rmr.Send(params, false) {
 		err = errors.New("rmr.Send() failed")
 	}
 	return
 }
 
-func (c *Control) handleSubscriptionRequest(datagram *RmrDatagram) (err error) {
-	payload_seq_num, err := c.e2ap.GetSubscriptionRequestSequenceNumber(datagram.Payload)
+func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) {
+	payload_seq_num, err := c.e2ap.GetSubscriptionRequestSequenceNumber(params.Payload)
 	if err != nil {
 		err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
 		return
 	}
-	xapp.Logger.Info("Subscription Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", datagram.SubscriptionId, payload_seq_num)
+	xapp.Logger.Info("Subscription Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
+
+	/* Reserve a sequence number and set it in the payload */
 	new_sub_id := c.registry.ReserveSequenceNumber()
-	payload, err := c.e2ap.SetSubscriptionRequestSequenceNumber(datagram.Payload, new_sub_id)
+
+	_, err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, new_sub_id)
 	if err != nil {
 		err = errors.New("Unable to set Subscription Sequence Number in Payload due to: " + err.Error())
 		return
 	}
+
+	src_addr, src_port, err := c.rtmgrClient.SplitSource(params.Src)
+	if err != nil {
+		xapp.Logger.Error("Failed to update routing-manager about the subscription request with reason: %s", err)
+		return
+	}
+
+	/* Create transatcion records for every subscription request */
+	xact_key := Transaction_key{new_sub_id, CREATE}
+	xact_value := Transaction{*src_addr, *src_port, params.Payload}
+	err = c.tracker.Track_transaction(xact_key, &xact_value)
+	if err != nil {
+		xapp.Logger.Error("Failed to create a transaction record due to %v", err)
+		return
+	}
+
+	/* Update routing manager about the new subscription*/
+	sub_route_action := subRouteInfo{CREATE, *src_addr, *src_port, new_sub_id }
+	go c.rtmgrClient.SubscriptionRequestUpdate()
+	SubscriptionReqChan <- sub_route_action
+
+	// Setting new subscription ID in the RMR header
+	params.SubId = int(new_sub_id)
+
 	xapp.Logger.Info("Generated ID: %v. Forwarding to E2 Termination...", int(new_sub_id))
-	c.rmrSend(&RmrDatagram{C.RIC_SUB_REQ, new_sub_id, payload})
+	c.rmrSend(params)
 	return
 }
 
-func (c *Control) handleSubscriptionResponse(datagram *RmrDatagram) (err error) {
-	payload_seq_num, err := c.e2ap.GetSubscriptionResponseSequenceNumber(datagram.Payload)
+func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) (err error) {
+	payload_seq_num, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload)
 	if err != nil {
 		err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
 		return
 	}
-	xapp.Logger.Info("Subscription Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", datagram.SubscriptionId, payload_seq_num)
+	xapp.Logger.Info("Subscription Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
 	if !c.registry.IsValidSequenceNumber(payload_seq_num) {
 		err = errors.New("Unknown Subscription ID: " + strconv.Itoa(int(payload_seq_num)) + " in Subscritpion Response. Message discarded.")
 		return
 	}
 	c.registry.setSubscriptionToConfirmed(payload_seq_num)
 	xapp.Logger.Info("Subscription Response Registered. Forwarding to Requestor...")
-	c.rmrSend(&RmrDatagram{C.RIC_SUB_RESP, payload_seq_num, datagram.Payload})
+	c.rmrSend(params)
+	return
+}
+
+func (act Action) String() string {
+	actions := [...]string{
+		"CREATE",
+		"MERGE",
+		"DELETE",
+	}
+
+	if act < CREATE || act > DELETE {
+		return "Unknown"
+	}
+	return actions[act]
+}
+
+func (act Action) valid() bool {
+	switch act {
+	case CREATE, MERGE, DELETE:
+		return true
+	default:
+		return false
+	}
+}
+
+func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) (err error) {
+	payload_seq_num, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload)
+	if err != nil {
+		err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
+		return
+	}
+	xapp.Logger.Info("Subscription Delete Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
+	if c.registry.IsValidSequenceNumber(payload_seq_num) {
+		c.registry.deleteSubscription(payload_seq_num)
+	}
+	xapp.Logger.Info("Subscription ID: %v. Forwarding to E2 Termination...", int(payload_seq_num))
+	c.rmrSend(params)
 	return
 }
diff --git a/pkg/control/e2ap.go b/pkg/control/e2ap.go
index c6b5e28..9ecef4e 100644
--- a/pkg/control/e2ap.go
+++ b/pkg/control/e2ap.go
@@ -73,3 +73,47 @@
 	new_payload = C.GoBytes(cptr, C.int(size))
 	return
 }
+
+/* RICsubscriptionDeleteRequest */
+
+func (c *E2ap) GetSubscriptionDeleteRequestSequenceNumber(payload []byte) (sub_id uint16, err error) {
+	cptr := unsafe.Pointer(&payload[0])
+	cret := C.e2ap_get_ric_subscription_delete_request_sequence_number(cptr, C.size_t(len(payload)))
+	if cret < 0 {
+		return 0, errors.New("e2ap wrapper is unable to get Subscirption Delete Request Sequence Number due to wrong or invalid payload")
+	}
+	sub_id = uint16(cret)
+	return
+}
+
+func (c *E2ap) SetSubscriptionDeleteRequestSequenceNumber(payload []byte, newSubscriptionid uint16) (new_payload []byte, err error) {
+	cptr := unsafe.Pointer(&payload[0])
+	size := C.e2ap_set_ric_subscription_delete_request_sequence_number(cptr, C.size_t(len(payload)), C.long(newSubscriptionid))
+	if size < 0 {
+		return make([]byte, 0), errors.New("e2ap wrapper is unable to set Subscirption Request Sequence Number due to wrong or invalid payload")
+	}
+	new_payload = C.GoBytes(cptr, C.int(size))
+	return
+}
+
+/* RICsubscriptionDeleteResponse */
+
+func (c *E2ap) GetSubscriptionDeleteResponseSequenceNumber(payload []byte) (sub_id uint16, err error) {
+	cptr := unsafe.Pointer(&payload[0])
+	cret := C.e2ap_get_ric_subscription_delete_response_sequence_number(cptr, C.size_t(len(payload)))
+	if cret < 0 {
+		return 0, errors.New("e2ap wrapper is unable to get Subscirption Delete Response Sequence Number due to wrong or invalid payload")
+	}
+	sub_id = uint16(cret)
+	return
+}
+
+func (c *E2ap) SetSubscriptionDeleteResponseSequenceNumber(payload []byte, newSubscriptionid uint16) (new_payload []byte, err error) {
+	cptr := unsafe.Pointer(&payload[0])
+	size := C.e2ap_set_ric_subscription_delete_response_sequence_number(cptr, C.size_t(len(payload)), C.long(newSubscriptionid))
+	if size < 0 {
+		return make([]byte, 0), errors.New("e2ap wrapper is unable to set Subscirption Reponse Sequence Number due to wrong or invalid payload")
+	}
+	new_payload = C.GoBytes(cptr, C.int(size))
+	return
+}
diff --git a/pkg/control/registry.go b/pkg/control/registry.go
index d182b3e..c349921 100644
--- a/pkg/control/registry.go
+++ b/pkg/control/registry.go
@@ -34,6 +34,9 @@
 
 // Reserves and returns the next free sequence number
 func (r *Registry) ReserveSequenceNumber() uint16 {
+	if r.IsValidSequenceNumber(r.counter){
+
+	}
 	sequenceNumber := r.counter
 	r.register[sequenceNumber] = false
 	r.shift()
@@ -61,3 +64,13 @@
 		r.counter++
 	}
 }
+
+//This function sets the given id as unused in the register
+func (r *Registry) deleteSubscription(sn uint16) {
+	r.register[sn] = false
+}
+
+//This function releases the given id as unused in the register
+//func (r *Registry) releaseSequenceNumber(sn uint16) {
+//	delete(r.register, sn)
+//}
\ No newline at end of file
diff --git a/pkg/control/tracker.go b/pkg/control/tracker.go
new file mode 100644
index 0000000..bd062da
--- /dev/null
+++ b/pkg/control/tracker.go
@@ -0,0 +1,73 @@
+/*
+==================================================================================
+  Copyright (c) 2019 AT&T Intellectual Property.
+  Copyright (c) 2019 Nokia
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+package control
+
+import (
+	"fmt"
+//	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+)
+
+/*
+Implements a record of ongoing transactions and helper functions to CRUD the records.
+*/
+type Tracker struct {
+	transaction_table map[Transaction_key]*Transaction
+}
+
+/*
+Checks if a tranascation with similar type has been ongoing. If not then creates one.
+Returns error if there is similar transatcion ongoing.
+*/
+func (t *Tracker) Track_transaction(key Transaction_key, xact *Transaction) error{
+	if _, ok := t.transaction_table[key]; ok {
+		// TODO: Implement merge related check here. If the key is same but the value is different.
+		err := fmt.Errorf("Transaction tracker: Similar transaction with sub id %d and type %s is ongoing", key.SubID, key.trans_type )
+		return err
+	}
+	t.transaction_table[key] = xact
+	return nil
+}
+
+/*
+Retreives the transaction table entry for the given request.
+Returns error in case the transaction cannot be found.
+*/
+func (t *Tracker) Retrive_transaction(subID uint16, act Action) (*Transaction, error){
+	key := Transaction_key{subID, act}
+	if xact, ok := t.transaction_table[key]; ok {
+		return xact, nil
+	}
+	err := fmt.Errorf("Tranaction record for Subscription ID %d and action %s does not exist", subID, act)
+	return nil, err
+}
+
+/*
+Deletes the transaction table entry for the given request and returns the deleted xapp's address and port for reference.
+Returns error in case the transaction cannot be found.
+*/
+func (t *Tracker) complete_transaction(subID uint16, act Action) (*string, *uint16, error){
+	key := Transaction_key{subID, act}
+	if xact, ok := t.transaction_table[key]; ok {
+		delete(t.transaction_table, key)
+		return &(xact.Xapp_instance_address), &(xact.Xapp_port), nil
+	}
+	err := fmt.Errorf("Tranaction record for Subscription ID %d and action %s does not exist", subID, act)
+	return nil, nil, err
+}
diff --git a/pkg/control/types.go b/pkg/control/types.go
index 2a4e9d5..1a2c92f 100644
--- a/pkg/control/types.go
+++ b/pkg/control/types.go
@@ -24,3 +24,24 @@
 	SubscriptionId uint16
 	Payload        []byte
 }
+
+type subRouteInfo struct {
+	Command  Action
+	Address  string
+	Port     uint16
+	SubID    uint16
+}
+
+type Action int
+
+type Transaction_key struct {
+	SubID      uint16
+	trans_type Action
+}
+
+type Transaction struct {
+//	Xapp_address          string
+	Xapp_instance_address string
+	Xapp_port             uint16
+	Ric_sub_req           []byte
+}