RICPLT-2989 Submgr routing manager client code to support multiple endpoints

Change-Id: I0b4931090c06b4cf9a81b766a11162c8a6ebecf4
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
diff --git a/pkg/control/registry.go b/pkg/control/registry.go
index 5edc3eb..2750b78 100644
--- a/pkg/control/registry.go
+++ b/pkg/control/registry.go
@@ -24,6 +24,7 @@
 	"gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
 	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
 	"sync"
+	"time"
 )
 
 //-----------------------------------------------------------------------------
@@ -36,7 +37,6 @@
 	rtmgrClient *RtmgrClient
 }
 
-// This method should run as a constructor
 func (r *Registry) Initialize() {
 	r.register = make(map[uint16]*Subscription)
 	var i uint16
@@ -45,32 +45,125 @@
 	}
 }
 
-// Reserves and returns the next free sequence number
 func (r *Registry) AssignToSubscription(trans *Transaction, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
 	r.mutex.Lock()
 	defer r.mutex.Unlock()
-	if len(r.subIds) > 0 {
-		sequenceNumber := r.subIds[0]
-		r.subIds = r.subIds[1:]
-		if _, ok := r.register[sequenceNumber]; ok == false {
-			subs := &Subscription{
-				registry: r,
-				Seq:      sequenceNumber,
-				Meid:     trans.Meid,
-			}
-			err := subs.AddEndpoint(trans.GetEndpoint())
-			if err != nil {
-				return nil, err
-			}
-			subs.SubReqMsg = subReqMsg
 
-			r.register[sequenceNumber] = subs
-			xapp.Logger.Debug("Registry: Create %s", subs.String())
+	var sequenceNumber uint16
+
+	//
+	// Allocate subscription
+	//
+	if len(r.subIds) > 0 {
+		sequenceNumber = r.subIds[0]
+		r.subIds = r.subIds[1:]
+		if _, ok := r.register[sequenceNumber]; ok == true {
+			r.subIds = append(r.subIds, sequenceNumber)
+			return nil, fmt.Errorf("Registry: Failed to reserves subscription")
+		}
+	} else {
+		return nil, fmt.Errorf("Registry: Failed to reserves subscription no free ids")
+	}
+	subs := &Subscription{
+		registry: r,
+		Seq:      sequenceNumber,
+		Meid:     trans.Meid,
+	}
+
+	//
+	// Add the requesting endpoint to the subscription
+	//
+	subs.mutex.Lock()
+	defer subs.mutex.Unlock()
+
+	if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
+		r.subIds = append(r.subIds, sequenceNumber)
+		return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
+	}
+	epamount := subs.EpList.Size()
+
+	r.mutex.Unlock()
+	//
+	// Subscription route updates
+	//
+	var err error
+	if epamount == 1 {
+		subRouteAction := SubRouteInfo{CREATE, subs.EpList, subs.Seq}
+		err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+	} else {
+		subRouteAction := SubRouteInfo{UPDATE, subs.EpList, subs.Seq}
+		err = r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+	}
+	r.mutex.Lock()
+
+	if err != nil {
+		r.subIds = append(r.subIds, sequenceNumber)
+		return nil, err
+	}
+	subs.SubReqMsg = subReqMsg
+
+	r.register[sequenceNumber] = subs
+	xapp.Logger.Debug("Registry: Create %s", subs.String())
+	xapp.Logger.Debug("Registry: substable=%v", r.register)
+	return subs, nil
+}
+
+func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *Transaction, waitRouteClean time.Duration) error {
+	r.mutex.Lock()
+	defer r.mutex.Unlock()
+	subs.mutex.Lock()
+	defer subs.mutex.Unlock()
+
+	delStatus := subs.EpList.DelEndpoint(trans.GetEndpoint())
+	epamount := subs.EpList.Size()
+
+	//
+	// If the last endpoint was removed, delete the subscription from the register map
+	//
+	if epamount == 0 {
+		if _, ok := r.register[subs.Seq]; ok {
+			xapp.Logger.Debug("Registry: Delete %s", subs.String())
+			delete(r.register, subs.Seq)
 			xapp.Logger.Debug("Registry: substable=%v", r.register)
-			return subs, nil
 		}
 	}
-	return nil, fmt.Errorf("Registry: Failed to reserves subscription")
+	r.mutex.Unlock()
+
+	//
+	// Wait for waitRouteClean (if positive) before actually doing the route updates
+	//
+	if waitRouteClean > 0 {
+		subs.mutex.Unlock()
+		time.Sleep(waitRouteClean)
+		subs.mutex.Lock()
+	}
+
+	xapp.Logger.Info("Registry: Cleaning %s", subs.String())
+
+	//
+	// Subscription route updates
+	//
+	if delStatus {
+		if epamount == 0 {
+			tmpList := RmrEndpointList{}
+			tmpList.AddEndpoint(trans.GetEndpoint())
+			subRouteAction := SubRouteInfo{DELETE, tmpList, subs.Seq}
+			r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+		} else {
+			subRouteAction := SubRouteInfo{UPDATE, subs.EpList, subs.Seq}
+			r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+		}
+	}
+
+	r.mutex.Lock()
+	//
+	// If the last endpoint was removed, return the sequence number to the free id list
+	//
+	if epamount == 0 {
+		r.subIds = append(r.subIds, subs.Seq)
+	}
+
+	return nil
 }
 
 func (r *Registry) GetSubscription(sn uint16) *Subscription {
@@ -92,17 +185,3 @@
 	}
 	return nil, fmt.Errorf("No valid subscription found with ids %v", ids)
 }
-
-func (r *Registry) DelSubscription(sn uint16) bool {
-	r.mutex.Lock()
-	defer r.mutex.Unlock()
-	if _, ok := r.register[sn]; ok {
-		subs := r.register[sn]
-		xapp.Logger.Debug("Registry: Delete %s", subs.String())
-		r.subIds = append(r.subIds, sn)
-		delete(r.register, sn)
-		xapp.Logger.Debug("Registry: substable=%v", r.register)
-		return true
-	}
-	return false
-}