xapp-frame v0.8.2 integration to submgr

Signed-off-by: Konstantinos Archangelof <konstantinos.archangelof@nokia.com>
Change-Id: Ief6d20c298b98b0e344ff011542f6e50492dc8ac
diff --git a/pkg/control/control.go b/pkg/control/control.go
index 0c2f917..325427b 100755
--- a/pkg/control/control.go
+++ b/pkg/control/control.go
@@ -233,9 +233,7 @@
 	c.CntRecvMsg++
 	c.UpdateCounter(cRestSubReqFromXapp)
 
-	restSubId := ksuid.New().String()
 	subResp := models.SubscriptionResponse{}
-	subResp.SubscriptionID = &restSubId
 	p := params.(*models.SubscriptionParams)
 
 	if c.LoggerLevel > 2 {
@@ -254,14 +252,28 @@
 		c.UpdateCounter(cRestSubFailToXapp)
 		return nil, err
 	}
+	var restSubId string
+	var restSubscription *RESTSubscription
+	if p.SubscriptionID == "" {
+		restSubId = ksuid.New().String()
+		restSubscription, err = c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
+		if err != nil {
+			xapp.Logger.Error("%s", err.Error())
+			c.UpdateCounter(cRestSubFailToXapp)
+			return nil, err
+		}
 
-	restSubscription, err := c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
-	if err != nil {
-		xapp.Logger.Error("%s", err.Error())
-		c.UpdateCounter(cRestSubFailToXapp)
-		return nil, err
+	} else {
+		restSubId = p.SubscriptionID
+		restSubscription, err = c.registry.GetRESTSubscription(restSubId, false)
+		if err != nil {
+			xapp.Logger.Error("%s", err.Error())
+			c.UpdateCounter(cRestSubFailToXapp)
+			return nil, err
+		}
 	}
 
+	subResp.SubscriptionID = &restSubId
 	subReqList := e2ap.SubscriptionRequestList{}
 	err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
 	if err != nil {
@@ -292,14 +304,12 @@
 		return
 	}
 
-	var requestorID int64
-	var instanceId int64
+	var xAppEventInstanceID int64
+	var e2EventInstanceID int64
 	for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
 		subReqMsg := subReqList.E2APSubscriptionRequests[index]
 
-		xid := *restSubId + "_" + strconv.FormatUint(uint64(subReqMsg.RequestId.InstanceId), 10)
-		trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), xid, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
-		//trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
+		trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
 		if trans == nil {
 			c.registry.DeleteRESTSubscription(restSubId)
 			xapp.Logger.Error("XAPP-SubReq transaction not created. RESTSubId=%s, EndPoint=%s, Meid=%s", *restSubId, xAppRmrEndpoint, *meid)
@@ -310,39 +320,47 @@
 		xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
 		subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
 		if err != nil {
-			// Send notification to xApp that prosessing of a Subscription Request has failed. Currently it is not possible
-			// to indicate error. Such possibility should be added. As a workaround requestorID and instanceId are set to zero value
-			requestorID = (int64)(0)
-			instanceId = (int64)(0)
+			// Send notification to xApp that processing of a Subscription Request has failed.
+			xAppEventInstanceID = (int64)(subReqMsg.RequestId.Id)
+			e2EventInstanceID = (int64)(0)
 			resp := &models.SubscriptionResponse{
 				SubscriptionID: restSubId,
 				SubscriptionInstances: []*models.SubscriptionInstance{
-					&models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
+					&models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
+						ErrorCause:          nil, //TODO: Suitable Error cause.
+						XappEventInstanceID: &xAppEventInstanceID},
 				},
 			}
 			// Mark REST subscription request processed.
 			restSubscription.SetProcessed()
-			xapp.Logger.Info("Sending unsuccessful REST notification to endpoint=%v:%v, InstanceId=%v, %s", clientEndpoint.Host, clientEndpoint.HTTPPort, instanceId, idstring(nil, trans))
+			xapp.Logger.Info("Sending unsuccessful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
+				clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
 			xapp.Subscription.Notify(resp, *clientEndpoint)
 			c.UpdateCounter(cRestSubFailNotifToXapp)
 		} else {
-			xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v, InstanceId=%v, %s", index, *clientEndpoint, instanceId, idstring(nil, trans))
+			xAppEventInstanceID = (int64)(subRespMsg.RequestId.Id)
+			e2EventInstanceID = (int64)(subRespMsg.RequestId.InstanceId)
+
+			xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
+				index, clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
 
 			// Store successfully processed InstanceId for deletion
-			restSubscription.AddInstanceId(subRespMsg.RequestId.InstanceId)
+			restSubscription.AddE2InstanceId(subRespMsg.RequestId.InstanceId)
+			restSubscription.AddXappIdToE2Id(xAppEventInstanceID, e2EventInstanceID)
 
 			// Send notification to xApp that a Subscription Request has been processed.
-			requestorID = (int64)(subRespMsg.RequestId.Id)
-			instanceId = (int64)(subRespMsg.RequestId.InstanceId)
 			resp := &models.SubscriptionResponse{
 				SubscriptionID: restSubId,
 				SubscriptionInstances: []*models.SubscriptionInstance{
-					&models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
+					&models.SubscriptionInstance{E2EventInstanceID: &e2EventInstanceID,
+						ErrorCause:          nil,
+						XappEventInstanceID: &xAppEventInstanceID},
 				},
 			}
 			// Mark REST subscription request processesd.
 			restSubscription.SetProcessed()
-			xapp.Logger.Info("Sending successful REST notification to endpoint=%v, InstanceId=%v, %s", *clientEndpoint, instanceId, idstring(nil, trans))
+			xapp.Logger.Info("Sending successful REST notification to endpoint=%v:%v, XappEventInstanceID=%v, E2EventInstanceID=%v, %s",
+				clientEndpoint.Host, *clientEndpoint.HTTPPort, xAppEventInstanceID, e2EventInstanceID, idstring(nil, trans))
 			xapp.Subscription.Notify(resp, *clientEndpoint)
 			c.UpdateCounter(cRestSubNotifToXapp)
 
@@ -405,7 +423,7 @@
 
 	xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
 
-	restSubscription, err := c.registry.GetRESTSubscription(restSubId)
+	restSubscription, err := c.registry.GetRESTSubscription(restSubId, true)
 	if err != nil {
 		xapp.Logger.Error("%s", err.Error())
 		if restSubscription == nil {
@@ -426,13 +444,15 @@
 	xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
 	go func() {
 		for _, instanceId := range restSubscription.InstanceIds {
-			err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
+			xAppEventInstanceID, err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
+
 			if err != nil {
 				xapp.Logger.Error("%s", err.Error())
 				//return err
 			}
 			xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
-			restSubscription.DeleteInstanceId(instanceId)
+			restSubscription.DeleteXappIdToE2Id(xAppEventInstanceID)
+			restSubscription.DeleteE2InstanceId(instanceId)
 		}
 		c.registry.DeleteRESTSubscription(&restSubId)
 	}()
@@ -445,29 +465,29 @@
 //-------------------------------------------------------------------
 //
 //-------------------------------------------------------------------
-func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) error {
+func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) (int64, error) {
 
-	xid := *restSubId + "_" + strconv.FormatUint(uint64(instanceId), 10)
-	trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), xid, e2ap.RequestId{0, 0}, &xapp.RMRMeid{RanName: *meid})
-	//trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{0, 0}, &xapp.RMRMeid{RanName: *meid})
+	var xAppEventInstanceID int64
+	subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
+	if err != nil {
+		xapp.Logger.Info("Subscription Delete Handler subscription for restSubId=%v, E2EventInstanceID=%v not found %s",
+			restSubId, instanceId, idstring(err, nil))
+		return xAppEventInstanceID, nil
+	}
+
+	xAppEventInstanceID = int64(subs.ReqId.Id)
+	trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{subs.ReqId.Id, 0}, &xapp.RMRMeid{RanName: *meid})
 	if trans == nil {
 		err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
 		xapp.Logger.Error("%s", err.Error())
 	}
 	defer trans.Release()
 
-	err := c.tracker.Track(trans)
+	err = c.tracker.Track(trans)
 	if err != nil {
 		err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
 		xapp.Logger.Error("%s", err.Error())
-		return &time.ParseError{}
-	}
-
-	subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
-	if err != nil {
-		err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
-		xapp.Logger.Error("%s", err.Error())
-		return err
+		return xAppEventInstanceID, &time.ParseError{}
 	}
 	//
 	// Wake subs delete
@@ -479,7 +499,7 @@
 
 	c.registry.RemoveFromSubscription(subs, trans, waitRouteCleanup_ms, c)
 
-	return nil
+	return xAppEventInstanceID, nil
 }
 
 //-------------------------------------------------------------------
@@ -1135,8 +1155,7 @@
 		} else {
 			fmt.Println("  RANFunctionID = nil")
 		}
-		fmt.Printf("  SubscriptionDetail.RequestorID = %v\n", *subscriptionDetail.RequestorID)
-		fmt.Printf("  SubscriptionDetail.InstanceID = %v\n", *subscriptionDetail.InstanceID)
+		fmt.Printf("  SubscriptionDetail.XappEventInstanceID = %v\n", *subscriptionDetail.XappEventInstanceID)
 		fmt.Printf("  SubscriptionDetail.EventTriggers.OctetString = %X\n", subscriptionDetail.EventTriggers.OctetString)
 
 		for _, actionToBeSetup := range subscriptionDetail.ActionToBeSetupList {
diff --git a/pkg/control/e2ap.go b/pkg/control/e2ap.go
index f17012b..54eba76 100644
--- a/pkg/control/e2ap.go
+++ b/pkg/control/e2ap.go
@@ -61,7 +61,8 @@
 		if p.RANFunctionID != nil {
 			subReqMsg.FunctionId = (e2ap.FunctionId)(*p.RANFunctionID)
 		}
-		subReqMsg.RequestId = e2ap.RequestId{uint32(*subscriptionDetail.RequestorID), uint32(*subscriptionDetail.InstanceID)}
+		e2EventInstanceID := restSubscription.GetE2IdFromXappIdToE2Id(*subscriptionDetail.XappEventInstanceID)
+		subReqMsg.RequestId = e2ap.RequestId{uint32(*subscriptionDetail.XappEventInstanceID), uint32(e2EventInstanceID)}
 
 		subReqMsg.EventTriggerDefinition.Data.Data = []byte(subscriptionDetail.EventTriggers.OctetString)
 		subReqMsg.EventTriggerDefinition.Data.Length = uint64(len(subscriptionDetail.EventTriggers.OctetString))
diff --git a/pkg/control/registry.go b/pkg/control/registry.go
index eae870d..416b415 100644
--- a/pkg/control/registry.go
+++ b/pkg/control/registry.go
@@ -37,22 +37,35 @@
 	xAppRmrEndPoint  string
 	Meid             string
 	InstanceIds      []uint32
+	xAppIdToE2Id     map[int64]int64
 	SubReqOngoing    bool
 	SubDelReqOngoing bool
 }
 
-func (r *RESTSubscription) AddInstanceId(instanceId uint32) {
+func (r *RESTSubscription) AddE2InstanceId(instanceId uint32) {
 	r.InstanceIds = append(r.InstanceIds, instanceId)
 }
 
+func (r *RESTSubscription) DeleteE2InstanceId(instanceId uint32) {
+	r.InstanceIds = r.InstanceIds[1:]
+}
+
+func (r *RESTSubscription) AddXappIdToE2Id(xAppEventInstanceID int64, e2EventInstanceID int64) {
+	r.xAppIdToE2Id[xAppEventInstanceID] = e2EventInstanceID
+}
+
+func (r *RESTSubscription) GetE2IdFromXappIdToE2Id(xAppEventInstanceID int64) int64 {
+	return r.xAppIdToE2Id[xAppEventInstanceID]
+}
+
+func (r *RESTSubscription) DeleteXappIdToE2Id(xAppEventInstanceID int64) {
+	delete(r.xAppIdToE2Id, xAppEventInstanceID)
+}
+
 func (r *RESTSubscription) SetProcessed() {
 	r.SubReqOngoing = false
 }
 
-func (r *RESTSubscription) DeleteInstanceId(instanceId uint32) {
-	r.InstanceIds = r.InstanceIds[1:]
-}
-
 type Registry struct {
 	mutex             sync.Mutex
 	register          map[uint32]*Subscription
@@ -80,6 +93,7 @@
 	newRestSubscription.SubReqOngoing = true
 	newRestSubscription.SubDelReqOngoing = false
 	r.restSubscriptions[*restSubId] = &newRestSubscription
+	newRestSubscription.xAppIdToE2Id = make(map[int64]int64)
 	xapp.Logger.Info("Registry: Created REST subscription successfully. restSubId=%v, subscriptionCount=%v, e2apSubscriptionCount=%v", *restSubId, len(r.restSubscriptions), len(r.register))
 	return &newRestSubscription, nil
 }
@@ -91,13 +105,15 @@
 	xapp.Logger.Info("Registry: Deleted REST subscription successfully. restSubId=%v, subscriptionCount=%v", *restSubId, len(r.restSubscriptions))
 }
 
-func (r *Registry) GetRESTSubscription(restSubId string) (*RESTSubscription, error) {
+func (r *Registry) GetRESTSubscription(restSubId string, IsDelReqOngoing bool) (*RESTSubscription, error) {
 	r.mutex.Lock()
 	defer r.mutex.Unlock()
 	if restSubscription, ok := r.restSubscriptions[restSubId]; ok {
 		// Subscription deletion is not allowed if prosessing subscription request in not ready
 		if restSubscription.SubDelReqOngoing == false && restSubscription.SubReqOngoing == false {
-			restSubscription.SubDelReqOngoing = true
+			if IsDelReqOngoing == true {
+				restSubscription.SubDelReqOngoing = true
+			}
 			r.restSubscriptions[restSubId] = restSubscription
 			return restSubscription, nil
 		} else {
@@ -140,7 +156,7 @@
 			NoRespToXapp:     false,
 			DoNotWaitSubResp: false,
 		}
-		subs.ReqId.Id = 123
+		subs.ReqId.Id = subReqMsg.RequestId.Id
 		subs.ReqId.InstanceId = subId
 		if resetTestFlag == true {
 			subs.DoNotWaitSubResp = true
diff --git a/pkg/control/tracker.go b/pkg/control/tracker.go
index d837ac8..df3d56e 100644
--- a/pkg/control/tracker.go
+++ b/pkg/control/tracker.go
@@ -63,7 +63,7 @@
 	meid *xapp.RMRMeid) *TransactionXapp {
 
 	trans := &TransactionXapp{}
-	trans.XappKey = &TransactionXappKey{*endpoint, xid}
+	trans.XappKey = &TransactionXappKey{requestId.Id, *endpoint, xid}
 	trans.Meid = meid
 	trans.RequestId = requestId
 	t.initTransaction(&trans.Transaction)
@@ -104,5 +104,5 @@
 		//xapp.Logger.Debug("Tracker: transtable=%v", t.transactionXappTable)
 		return trans, nil
 	}
-	return nil, fmt.Errorf("Tracker: No record %s", xappKey)
+	return nil, fmt.Errorf("Tracker: No record %v", xappKey)
 }
diff --git a/pkg/control/transaction.go b/pkg/control/transaction.go
index efb630c..f3d5c17 100644
--- a/pkg/control/transaction.go
+++ b/pkg/control/transaction.go
@@ -131,6 +131,7 @@
 //
 //-----------------------------------------------------------------------------
 type TransactionXappKey struct {
+	InstanceID uint32
 	xapp.RmrEndpoint
 	Xid string // xapp xid in req
 }
diff --git a/pkg/control/ut_messaging_test.go b/pkg/control/ut_messaging_test.go
index b3f105d..f95855e 100644
--- a/pkg/control/ut_messaging_test.go
+++ b/pkg/control/ut_messaging_test.go
@@ -20,6 +20,7 @@
 package control
 
 import (
+	"strings"
 	"testing"
 	"time"
 
@@ -3857,10 +3858,10 @@
 	restSubId, e2SubsId := createSubscription(t, xappConn1, e2termConn1, params)
 
 	// Policy change
-	instanceId := int64(e2SubsId)
 	// GetRESTSubsReqPolicyParams sets some coutners on tc side.
+
 	params = xappConn1.GetRESTSubsReqPolicyParams(subReqCount)
-	params.SubsReqParams.SubscriptionDetails[0].InstanceID = &instanceId
+	params.SetSubscriptionID(&restSubId)
 	params.SetTimeToWait("w200ms")
 	restSubId, e2SubsId = createSubscription(t, xappConn1, e2termConn1, params)
 
@@ -3876,6 +3877,90 @@
 }
 
 //-----------------------------------------------------------------------------
+// TestRESTSubReqPolicyChangeNOk
+// A policy update sent with a modified subscription ID must fail; the original subscription is then deleted normally.
+//   stub                             stub
+// +-------+        +---------+    +---------+
+// | xapp  |        | submgr  |    | e2term  |
+// +-------+        +---------+    +---------+
+//     |                 |              |
+//     | RESTSubReq      |              |
+//     |---------------->|              |
+//     |                 |              |
+//     |     RESTSubResp |              |
+//     |<----------------|              |
+//     |                 | SubReq       |
+//     |                 |------------->|
+//     |                 |              |
+//     |                 |      SubResp |
+//     |                 |<-------------|
+//     |                 |              |
+//     |       RESTNotif |              |
+//     |<----------------|              |
+//     |                 |              |
+//     | RESTSubReq      |              |
+//     |---------------->|              |
+//     |                 |              |
+//     |         RESTSubUpdateFail      |
+//     |                 |              |
+//     | RESTSubDelReq   |              |
+//     |---------------->|              |
+//     |                 |              |
+//     |                 | SubDelReq    |
+//     |                 |------------->|
+//     |                 |              |
+//     |                 |   SubDelResp |
+//     |                 |<-------------|
+//     |                 |              |
+//     |  RESTSubDelResp |              |
+//     |<----------------|              |
+//
+//-----------------------------------------------------------------------------
+func TestRESTSubReqPolicyChangeNOk(t *testing.T) {
+	CaseBegin("TestRESTSubReqPolicyChangeNOk")
+
+	mainCtrl.CounterValuesToBeVeriefied(t, CountersToBeAdded{
+		Counter{cRestSubReqFromXapp, 2},
+		Counter{cRestSubRespToXapp, 1},
+		Counter{cSubReqToE2, 1},
+		Counter{cSubRespFromE2, 1},
+		Counter{cRestSubNotifToXapp, 1},
+		Counter{cRestSubFailToXapp, 1},
+		Counter{cRestSubDelReqFromXapp, 1},
+		Counter{cSubDelReqToE2, 1},
+		Counter{cSubDelRespFromE2, 1},
+		Counter{cRestSubDelRespToXapp, 1},
+	})
+
+	const subReqCount int = 1
+
+	// Req: create the initial policy subscription.
+	params := xappConn1.GetRESTSubsReqPolicyParams(subReqCount)
+	restSubId, e2SubsId := createSubscription(t, xappConn1, e2termConn1, params)
+
+	// Policy change: resend the request with an upper-cased copy of the subscription ID (presumably unknown to submgr - TODO confirm).
+
+	params = xappConn1.GetRESTSubsReqPolicyParams(subReqCount)
+
+	restSubIdUpd := strings.ToUpper(restSubId)
+	params.SetSubscriptionID(&restSubIdUpd)
+	params.SetTimeToWait("w200ms")
+
+	restSubId2 := xappConn1.SendRESTSubsReq(t, params)
+	assert.Equal(t, restSubId2, "") // the failed update must not return a subscription ID
+
+	// Del: delete the original subscription using its unmodified ID.
+	xappConn1.SendRESTSubsDelReq(t, &restSubId)
+
+	delreq, delmsg := e2termConn1.RecvSubsDelReq(t)
+	e2termConn1.SendSubsDelResp(t, delreq, delmsg)
+
+	// Wait until the subscription has been cleaned up.
+	waitSubsCleanup(t, e2SubsId, 10)
+	mainCtrl.VerifyCounterValues(t)
+}
+
+//-----------------------------------------------------------------------------
 // TestRESTSubReqAndSubDelOkTwoE2termParallel
 //
 //   stub                             stub           stub