Xapp-frame, v0.8.1 Rest Subscription Creation /Query /Deletion

Convert RMR unittest cases to REST API based
* Fixed REST endpoint construction functionality to include correct port numbers in endpoint string
* Added a REST version of a testcase TestSubReqAndRouteUpdateNok
* Refactored ut_stub REST endpoint setting API and restored original port numbers.

Xapp-frame, v0.8.1 integration:
  * Added REST API based unit test cases

Change-Id: I0989939f4210692493de549380faa1f111d71435
Signed-off-by: Konstantinos Archangelof <konstantinos.archangelof@nokia.com>
diff --git a/pkg/control/control.go b/pkg/control/control.go
index 02110b6..aee1158 100755
--- a/pkg/control/control.go
+++ b/pkg/control/control.go
@@ -34,6 +34,7 @@
 	httptransport "github.com/go-openapi/runtime/client"
 	"github.com/go-openapi/strfmt"
 	"github.com/gorilla/mux"
+	"github.com/segmentio/ksuid"
 	"github.com/spf13/viper"
 )
 
@@ -119,7 +120,7 @@
 	xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
 	xapp.Resource.InjectRoute("/ric/v1/symptomdata", c.SymptomDataHandler, "GET")
 
-	go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
+	go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandlerCB)
 
 	if readSubsFromDb == "false" {
 		return c
@@ -213,29 +214,254 @@
 //
 //-------------------------------------------------------------------
 func (c *Control) SubscriptionHandler(params interface{}) (*models.SubscriptionResponse, error) {
-	/*
-	   switch p := params.(type) {
-	   case *models.ReportParams:
-	       trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
-	       if trans == nil {
-	             xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
-	             return
-	       }
-	       defer trans.Release()
-	   case *models.ControlParams:
-	   case *models.PolicyParams:
-	   }
-	*/
-	return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
+
+	restSubId := ksuid.New().String()
+	subResp := models.SubscriptionResponse{}
+	subResp.SubscriptionID = &restSubId
+	p := params.(*models.SubscriptionParams)
+
+	c.CntRecvMsg++
+
+	c.UpdateCounter(cSubReqFromXapp)
+
+	if p.ClientEndpoint == nil {
+		xapp.Logger.Error("ClientEndpoint == nil")
+		return nil, fmt.Errorf("")
+	}
+
+	_, xAppRmrEndpoint, err := ConstructEndpointAddresses(*p.ClientEndpoint)
+	if err != nil {
+		xapp.Logger.Error("%s", err.Error())
+		return nil, err
+	}
+
+	restSubscription, err := c.registry.CreateRESTSubscription(&restSubId, &xAppRmrEndpoint, p.Meid)
+	if err != nil {
+		xapp.Logger.Error("%s", err.Error())
+		return nil, err
+	}
+
+	subReqList := e2ap.SubscriptionRequestList{}
+	err = c.e2ap.FillSubscriptionReqMsgs(params, &subReqList, restSubscription)
+	if err != nil {
+		xapp.Logger.Error("%s", err.Error())
+		c.registry.DeleteRESTSubscription(&restSubId)
+		return nil, err
+	}
+
+	go c.processSubscriptionRequests(restSubscription, &subReqList, p.ClientEndpoint, p.Meid, &restSubId)
+
+	return &subResp, nil
+
 }
 
-func (c *Control) SubscriptionDeleteHandler(s string) error {
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+
+func (c *Control) processSubscriptionRequests(restSubscription *RESTSubscription, subReqList *e2ap.SubscriptionRequestList,
+	clientEndpoint *models.SubscriptionParamsClientEndpoint, meid *string, restSubId *string) {
+
+	xapp.Logger.Info("Subscription Request count=%v ", len(subReqList.E2APSubscriptionRequests))
+
+	_, xAppRmrEndpoint, err := ConstructEndpointAddresses(*clientEndpoint)
+	if err != nil {
+		xapp.Logger.Error("%s", err.Error())
+		return
+	}
+
+	var requestorID int64
+	var instanceId int64
+	for index := 0; index < len(subReqList.E2APSubscriptionRequests); index++ {
+		subReqMsg := subReqList.E2APSubscriptionRequests[index]
+
+		trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(xAppRmrEndpoint), *restSubId, subReqMsg.RequestId, &xapp.RMRMeid{RanName: *meid})
+		if trans == nil {
+			c.registry.DeleteRESTSubscription(restSubId)
+			xapp.Logger.Error("XAPP-SubReq transaction not created. RESTSubId=%s, EndPoint=%s, Meid=%s", *restSubId, xAppRmrEndpoint, *meid)
+			return
+		}
+
+		defer trans.Release()
+		xapp.Logger.Info("Handle SubscriptionRequest index=%v, %s", index, idstring(nil, trans))
+		subRespMsg, err := c.handleSubscriptionRequest(trans, &subReqMsg, meid, restSubId)
+		if err != nil {
+			// Send notification to xApp that prosessing of a Subscription Request has failed. Currently it is not possible
+			// to indicate error. Such possibility should be added. As a workaround requestorID and instanceId are set to zero value
+			requestorID = (int64)(0)
+			instanceId = (int64)(0)
+			resp := &models.SubscriptionResponse{
+				SubscriptionID: restSubId,
+				SubscriptionInstances: []*models.SubscriptionInstance{
+					&models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
+				},
+			}
+			// Mark REST subscription request processed.
+			restSubscription.SetProcessed()
+			xapp.Logger.Info("Sending unsuccessful REST notification to endpoint=%v:%v, InstanceId=%v, %s", clientEndpoint.Host, clientEndpoint.HTTPPort, instanceId, idstring(nil, trans))
+			xapp.Subscription.Notify(resp, *clientEndpoint)
+		} else {
+			xapp.Logger.Info("SubscriptionRequest index=%v processed successfully. endpoint=%v, InstanceId=%v, %s", index, *clientEndpoint, instanceId, idstring(nil, trans))
+
+			// Store successfully processed InstanceId for deletion
+			restSubscription.AddInstanceId(subRespMsg.RequestId.InstanceId)
+
+			// Send notification to xApp that a Subscription Request has been processed.
+			requestorID = (int64)(subRespMsg.RequestId.Id)
+			instanceId = (int64)(subRespMsg.RequestId.InstanceId)
+			resp := &models.SubscriptionResponse{
+				SubscriptionID: restSubId,
+				SubscriptionInstances: []*models.SubscriptionInstance{
+					&models.SubscriptionInstance{RequestorID: &requestorID, InstanceID: &instanceId},
+				},
+			}
+			// Mark REST subscription request processesd.
+			restSubscription.SetProcessed()
+			xapp.Logger.Info("Sending successful REST notification to endpoint=%v, InstanceId=%v, %s", *clientEndpoint, instanceId, idstring(nil, trans))
+			xapp.Subscription.Notify(resp, *clientEndpoint)
+		}
+		c.UpdateCounter(cSubRespToXapp)
+	}
+}
+
+//-------------------------------------------------------------------
+//
+//------------------------------------------------------------------
+func (c *Control) handleSubscriptionRequest(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, meid *string,
+	restSubId *string) (*e2ap.E2APSubscriptionResponse, error) {
+
+	err := c.tracker.Track(trans)
+	if err != nil {
+		err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
+		xapp.Logger.Error("%s", err.Error())
+		return nil, err
+	}
+
+	subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag, c)
+	if err != nil {
+		err = fmt.Errorf("XAPP-SubReq: %s", idstring(err, trans))
+		xapp.Logger.Error("%s", err.Error())
+		return nil, err
+	}
+
+	//
+	// Wake subs request
+	//
+	go c.handleSubscriptionCreate(subs, trans)
+	event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
+
+	err = nil
+	if event != nil {
+		switch themsg := event.(type) {
+		case *e2ap.E2APSubscriptionResponse:
+			trans.Release()
+			return themsg, nil
+		case *e2ap.E2APSubscriptionFailure:
+			err = fmt.Errorf("SubscriptionFailure received")
+			return nil, err
+		default:
+			break
+		}
+	}
+	err = fmt.Errorf("XAPP-SubReq: failed %s", idstring(err, trans, subs))
+	xapp.Logger.Error("%s", err.Error())
+	c.registry.RemoveFromSubscription(subs, trans, 5*time.Second, c)
+	return nil, err
+}
+
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+func (c *Control) SubscriptionDeleteHandlerCB(restSubId string) error {
+
+	c.CntRecvMsg++
+	c.UpdateCounter(cSubDelReqFromXapp)
+
+	xapp.Logger.Info("SubscriptionDeleteRequest from XAPP")
+
+	restSubscription, err := c.registry.GetRESTSubscription(restSubId)
+	if err != nil {
+		xapp.Logger.Error("%s", err.Error())
+		if restSubscription == nil {
+			// Subscription was not found
+			return nil
+		} else {
+			if restSubscription.SubReqOngoing == true {
+				err := fmt.Errorf("Handling of the REST Subscription Request still ongoing %s", restSubId)
+				xapp.Logger.Error("%s", err.Error())
+				return err
+			} else if restSubscription.SubDelReqOngoing == true {
+				// Previous request for same restSubId still ongoing
+				return nil
+			}
+		}
+	}
+
+	xAppRmrEndPoint := restSubscription.xAppRmrEndPoint
+	go func() {
+		for _, instanceId := range restSubscription.InstanceIds {
+			err := c.SubscriptionDeleteHandler(&restSubId, &xAppRmrEndPoint, &restSubscription.Meid, instanceId)
+			if err != nil {
+				xapp.Logger.Error("%s", err.Error())
+				//return err
+			}
+			xapp.Logger.Info("Deleteting instanceId = %v", instanceId)
+			restSubscription.DeleteInstanceId(instanceId)
+		}
+		c.registry.DeleteRESTSubscription(&restSubId)
+	}()
+
+	c.UpdateCounter(cSubDelRespToXapp)
+
 	return nil
 }
 
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+func (c *Control) SubscriptionDeleteHandler(restSubId *string, endPoint *string, meid *string, instanceId uint32) error {
+
+	trans := c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(*endPoint), *restSubId, e2ap.RequestId{0, 0}, &xapp.RMRMeid{RanName: *meid})
+	if trans == nil {
+		err := fmt.Errorf("XAPP-SubDelReq transaction not created. restSubId %s, endPoint %s, meid %s, instanceId %v", *restSubId, *endPoint, *meid, instanceId)
+		xapp.Logger.Error("%s", err.Error())
+	}
+	defer trans.Release()
+
+	err := c.tracker.Track(trans)
+	if err != nil {
+		err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
+		xapp.Logger.Error("%s", err.Error())
+		return &time.ParseError{}
+	}
+
+	subs, err := c.registry.GetSubscriptionFirstMatch([]uint32{instanceId})
+	if err != nil {
+		err := fmt.Errorf("XAPP-SubDelReq %s:", idstring(err, trans))
+		xapp.Logger.Error("%s", err.Error())
+		return err
+	}
+	//
+	// Wake subs delete
+	//
+	go c.handleSubscriptionDelete(subs, trans)
+	trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
+
+	xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
+
+	c.registry.RemoveFromSubscription(subs, trans, 5*time.Second, c)
+
+	return nil
+}
+
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
 func (c *Control) QueryHandler() (models.SubscriptionList, error) {
 	xapp.Logger.Info("QueryHandler() called")
 
+	c.CntRecvMsg++
+
 	return c.registry.QueryHandler()
 }
 
@@ -504,6 +730,7 @@
 			removeSubscriptionFromDb = true
 			subRfMsg, valid = subs.SetCachedResponse(event, false)
 			xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
+			c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
 		case *SubmgrRestartTestEvent:
 			// This simulates that no response has been received and after restart subscriptions are restored from db
 			xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")