RICPLT-2985 Route update via registry/subscription entry

Change-Id: I5aa4b71290e5405b9bf995b8528e49e900375cbc
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
diff --git a/pkg/control/control.go b/pkg/control/control.go
index 5a38656..5dd8f05 100755
--- a/pkg/control/control.go
+++ b/pkg/control/control.go
@@ -79,6 +79,7 @@
 }
 
 func NewControl() *Control {
+
 	registry := new(Registry)
 	registry.Initialize(seedSN)
 
@@ -94,9 +95,14 @@
 	deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
 	rtmgrClient := RtmgrClient{client, handle, deleteHandle}
 
+	rtmgrClientPtr := &rtmgrClient
+
+	//TODO: to make this better. Now it is just a hack.
+	registry.rtmgrClient = rtmgrClientPtr
+
 	return &Control{e2ap: new(E2ap),
 		registry:    registry,
-		rtmgrClient: &rtmgrClient,
+		rtmgrClient: rtmgrClientPtr,
 		tracker:     tracker,
 		timerMap:    timerMap,
 		msgCounter:  0,
@@ -163,10 +169,9 @@
 		return
 	}
 
-	/* Reserve a sequence number and set it in the payload */
-	subs := c.registry.ReserveSubscription(RmrEndpoint{*srcAddr, *srcPort}, params.Meid)
-	if subs == nil {
-		xapp.Logger.Error("SubReq: Failed to reserve sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid)
+	subs, err := c.registry.ReserveSubscription(RmrEndpoint{*srcAddr, *srcPort}, params.Meid)
+	if err != nil {
+		xapp.Logger.Error("SubReq: %s, Dropping this msg.", err.Error())
 		return
 	}
 
@@ -174,7 +179,7 @@
 	err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, subs.Seq)
 	if err != nil {
 		xapp.Logger.Error("SubReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
-		c.registry.releaseSequenceNumber(subs.Seq)
+		c.registry.DelSubscription(subs.Seq)
 		return
 	}
 
@@ -184,18 +189,7 @@
 	_, err = c.tracker.TrackTransaction(subs, RmrEndpoint{*srcAddr, *srcPort}, params, responseReceived, forwardRespToXapp)
 	if err != nil {
 		xapp.Logger.Error("SubReq: %s, Dropping this msg.", err.Error())
-		c.registry.releaseSequenceNumber(subs.Seq)
-		return
-	}
-
-	// Update routing manager about the new subscription
-	subRouteAction := subs.SubRouteInfo(CREATE)
-	xapp.Logger.Info("SubReq: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
-
-	err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
-	if err != nil {
-		xapp.Logger.Error("SubReq: Failed to update routing manager. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
-		c.registry.releaseSequenceNumber(subs.Seq)
+		c.registry.DelSubscription(subs.Seq)
 		return
 	}
 
@@ -295,16 +289,9 @@
 
 	time.Sleep(3 * time.Second)
 
-	xapp.Logger.Info("SubFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
-	subRouteAction := subs.SubRouteInfo(DELETE)
-	err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
-	if err != nil {
-		xapp.Logger.Error("SubFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
-	}
-
 	xapp.Logger.Info("SubFail: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid)
 	transaction.Release()
-	if !c.registry.releaseSequenceNumber(payloadSeqNum) {
+	if !c.registry.DelSubscription(payloadSeqNum) {
 		xapp.Logger.Error("SubFail: Failed to release sequency number. SubId: %v, Xid: %s", params.SubId, params.Xid)
 	}
 	return
@@ -500,16 +487,8 @@
 		time.Sleep(3 * time.Second)
 	}
 
-	xapp.Logger.Info("SubDelResp: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
-	subRouteAction := subs.SubRouteInfo(DELETE)
-	err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
-	if err != nil {
-		xapp.Logger.Error("SubDelResp: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
-		return
-	}
-
 	xapp.Logger.Info("SubDelResp: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid)
-	if !c.registry.releaseSequenceNumber(payloadSeqNum) {
+	if !c.registry.DelSubscription(payloadSeqNum) {
 		xapp.Logger.Error("SubDelResp: Failed to release sequency number. SubId: %v, Xid: %s", params.SubId, params.Xid)
 		return
 	}
@@ -574,17 +553,9 @@
 		time.Sleep(3 * time.Second)
 	}
 
-	xapp.Logger.Info("SubDelFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
-	subRouteAction := subs.SubRouteInfo(DELETE)
-	err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
-	if err != nil {
-		xapp.Logger.Error("SubDelFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
-		return
-	}
-
 	xapp.Logger.Info("SubDelFail: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid)
 	transaction.Release()
-	if !c.registry.releaseSequenceNumber(payloadSeqNum) {
+	if !c.registry.DelSubscription(payloadSeqNum) {
 		xapp.Logger.Error("SubDelFail: Failed to release sequency number. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
 		return
 	}
@@ -656,17 +627,9 @@
 		time.Sleep(3 * time.Second)
 	}
 
-	xapp.Logger.Info("handleSubDelTimer: Starting routing manager update. SubId: %v, Xid: %s", subId, params.Xid)
-	subRouteAction := subs.SubRouteInfo(DELETE)
-	err := c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
-	if err != nil {
-		xapp.Logger.Error("handleSubDelTimer: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, subId, params.Xid)
-		return
-	}
-
 	xapp.Logger.Info("handleSubDelTimer: Deleting transaction record. SubId: %v, Xid: %s", subId, params.Xid)
 	transaction.Release()
-	if !c.registry.releaseSequenceNumber(subId) {
+	if !c.registry.DelSubscription(subId) {
 		xapp.Logger.Error("handleSubDelTimer: Failed to release sequency number. SubId: %v, Xid: %s", subId, params.Xid)
 	}
 	return
diff --git a/pkg/control/main_test.go b/pkg/control/main_test.go
index 69dadff..155f92c 100644
--- a/pkg/control/main_test.go
+++ b/pkg/control/main_test.go
@@ -117,9 +117,6 @@
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
 type xappTransaction struct {
 	tc   *testingXappControl
 	xid  string
diff --git a/pkg/control/registry.go b/pkg/control/registry.go
index 1adb4f7..2c5bd8c 100644
--- a/pkg/control/registry.go
+++ b/pkg/control/registry.go
@@ -20,87 +20,19 @@
 package control
 
 import (
+	"fmt"
 	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
-	"strconv"
 	"sync"
 )
 
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-type Subscription struct {
-	mutex  sync.Mutex
-	Seq    uint16
-	Active bool
-	//
-	Meid        *xapp.RMRMeid
-	RmrEndpoint // xapp endpoint
-	Trans       *Transaction
-}
-
-func (s *Subscription) String() string {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-	return strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.RmrEndpoint.String() + "/" + s.Meid.RanName
-}
-
-func (s *Subscription) Confirmed() {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-	s.Active = true
-}
-
-func (s *Subscription) UnConfirmed() {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-	s.Active = false
-}
-
-func (s *Subscription) IsConfirmed() bool {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-	return s.Active
-}
-
-func (s *Subscription) SetTransaction(trans *Transaction) bool {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-	if s.Trans == nil {
-		s.Trans = trans
-		return true
-	}
-	return false
-}
-
-func (s *Subscription) UnSetTransaction(trans *Transaction) bool {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-	if trans == nil || trans == s.Trans {
-		s.Trans = nil
-		return true
-	}
-	return false
-}
-
-func (s *Subscription) GetTransaction() *Transaction {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-	return s.Trans
-}
-
-func (s *Subscription) SubRouteInfo(act Action) SubRouteInfo {
-	s.mutex.Lock()
-	defer s.mutex.Unlock()
-	return SubRouteInfo{act, s.RmrEndpoint.Addr, s.RmrEndpoint.Port, s.Seq}
-}
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
 type Registry struct {
-	register map[uint16]*Subscription
-	counter  uint16
-	mutex    sync.Mutex
+	mutex       sync.Mutex
+	register    map[uint16]*Subscription
+	counter     uint16
+	rtmgrClient *RtmgrClient
 }
 
 // This method should run as a constructor
@@ -110,7 +42,7 @@
 }
 
 // Reserves and returns the next free sequence number
-func (r *Registry) ReserveSubscription(endPoint RmrEndpoint, meid *xapp.RMRMeid) *Subscription {
+func (r *Registry) ReserveSubscription(endPoint RmrEndpoint, meid *xapp.RMRMeid) (*Subscription, error) {
 	// Check is current SequenceNumber valid
 	// Allocate next SequenceNumber value and retry N times
 	r.mutex.Lock()
@@ -125,17 +57,29 @@
 			r.counter++
 		}
 		if _, ok := r.register[sequenceNumber]; ok == false {
-			r.register[sequenceNumber] = &Subscription{
+			subs := &Subscription{
 				Seq:         sequenceNumber,
 				Active:      false,
 				RmrEndpoint: endPoint,
 				Meid:        meid,
 				Trans:       nil,
 			}
-			return r.register[sequenceNumber]
+			r.register[sequenceNumber] = subs
+
+			// Update routing
+			r.mutex.Unlock()
+			err := subs.UpdateRoute(CREATE, r.rtmgrClient)
+			r.mutex.Lock()
+			if err != nil {
+				if _, ok := r.register[sequenceNumber]; ok {
+					delete(r.register, sequenceNumber)
+				}
+				return nil, err
+			}
+			return subs, nil
 		}
 	}
-	return nil
+	return nil, fmt.Errorf("Registry: Failed to reserves subcription. RmrEndpoint: %s, Meid: %s", endPoint, meid.RanName)
 }
 
 func (r *Registry) GetSubscription(sn uint16) *Subscription {
@@ -148,14 +92,21 @@
 	return nil
 }
 
-//This function releases the given id as unused in the register
-func (r *Registry) releaseSequenceNumber(sn uint16) bool {
+func (r *Registry) DelSubscription(sn uint16) bool {
 	r.mutex.Lock()
 	defer r.mutex.Unlock()
 	if _, ok := r.register[sn]; ok {
+		subs := r.register[sn]
 		delete(r.register, sn)
+
+		// Update routing
+		r.mutex.Unlock()
+		err := subs.UpdateRoute(DELETE, r.rtmgrClient)
+		r.mutex.Lock()
+		if err != nil {
+			xapp.Logger.Error("Registry: Failed to del route. SubId: %d, RmrEndpoint: %s", subs.Seq, subs.RmrEndpoint)
+		}
 		return true
-	} else {
-		return false
 	}
+	return false
 }
diff --git a/pkg/control/subscription.go b/pkg/control/subscription.go
new file mode 100644
index 0000000..9bbe3d4
--- /dev/null
+++ b/pkg/control/subscription.go
@@ -0,0 +1,109 @@
+/*
+==================================================================================
+  Copyright (c) 2019 AT&T Intellectual Property.
+  Copyright (c) 2019 Nokia
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+package control
+
+import (
+	"fmt"
+	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+	"strconv"
+	"sync"
+)
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type Subscription struct {
+	mutex  sync.Mutex
+	Seq    uint16
+	Active bool
+	//
+	Meid        *xapp.RMRMeid
+	RmrEndpoint // xapp endpoint. Now only one xapp can have relation to single subscription. To be changed in merge
+	Trans       *Transaction
+}
+
+func (s *Subscription) String() string {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	return strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.RmrEndpoint.String() + "/" + s.Meid.RanName
+}
+
+func (s *Subscription) Confirmed() {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	s.Active = true
+}
+
+func (s *Subscription) UnConfirmed() {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	s.Active = false
+}
+
+func (s *Subscription) IsConfirmed() bool {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	return s.Active
+}
+
+func (s *Subscription) SetTransaction(trans *Transaction) error {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+
+	subString := strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.RmrEndpoint.String() + "/" + s.Meid.RanName
+
+	if (s.RmrEndpoint.Addr != trans.RmrEndpoint.Addr) || (s.RmrEndpoint.Port != trans.RmrEndpoint.Port) {
+		return fmt.Errorf("Subscription: %s endpoint mismatch with trans: %s", subString, trans)
+	}
+	if s.Trans != nil {
+		return fmt.Errorf("Subscription: %s trans %s exist, can not register %s", subString, s.Trans, trans)
+	}
+	trans.Subs = s
+	s.Trans = trans
+	return nil
+}
+
+func (s *Subscription) UnSetTransaction(trans *Transaction) bool {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	if trans == nil || trans == s.Trans {
+		s.Trans = nil
+		return true
+	}
+	return false
+}
+
+func (s *Subscription) GetTransaction() *Transaction {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	return s.Trans
+}
+
+func (s *Subscription) UpdateRoute(act Action, rtmgrClient *RtmgrClient) error {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+	xapp.Logger.Info("Subscription: Starting routing manager route add. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint)
+	subRouteAction := SubRouteInfo{act, s.RmrEndpoint.Addr, s.RmrEndpoint.Port, s.Seq}
+	err := rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+	if err != nil {
+		return fmt.Errorf("Subscription: Failed to add route. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint)
+	}
+	return nil
+}
diff --git a/pkg/control/tracker.go b/pkg/control/tracker.go
index 0e6941d..087b781 100644
--- a/pkg/control/tracker.go
+++ b/pkg/control/tracker.go
@@ -22,82 +22,15 @@
 import (
 	"fmt"
 	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
-	"strconv"
 	"sync"
 )
 
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-type TransactionXappKey struct {
-	RmrEndpoint
-	Xid string // xapp xid in req
-}
-
-func (key *TransactionXappKey) String() string {
-	return key.RmrEndpoint.String() + "/" + key.Xid
-}
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-type Transaction struct {
-	tracker           *Tracker // tracker instance
-	Subs              *Subscription
-	RmrEndpoint       RmrEndpoint
-	Xid               string          // xapp xid in req
-	OrigParams        *xapp.RMRParams // request orginal params
-	RespReceived      bool
-	ForwardRespToXapp bool
-	mutex             sync.Mutex
-}
-
-func (t *Transaction) String() string {
-	t.mutex.Lock()
-	defer t.mutex.Unlock()
-	var subId string = "?"
-	if t.Subs != nil {
-		subId = strconv.FormatUint(uint64(t.Subs.Seq), 10)
-	}
-	return subId + "/" + t.RmrEndpoint.String() + "/" + t.Xid
-}
-
-func (t *Transaction) CheckResponseReceived() bool {
-	t.mutex.Lock()
-	defer t.mutex.Unlock()
-	if t.RespReceived == false {
-		t.RespReceived = true
-		return false
-	}
-	return true
-}
-
-func (t *Transaction) RetryTransaction() {
-	t.mutex.Lock()
-	defer t.mutex.Unlock()
-	t.RespReceived = false
-}
-
-func (t *Transaction) Release() {
-	t.mutex.Lock()
-	defer t.mutex.Unlock()
-	if t.Subs != nil {
-		t.Subs.UnSetTransaction(t)
-	}
-	if t.tracker != nil {
-		xappkey := TransactionXappKey{t.RmrEndpoint, t.Xid}
-		t.tracker.UnTrackTransaction(xappkey)
-	}
-	t.Subs = nil
-	t.tracker = nil
-}
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
 type Tracker struct {
-	transactionXappTable map[TransactionXappKey]*Transaction
 	mutex                sync.Mutex
+	transactionXappTable map[TransactionXappKey]*Transaction
 }
 
 func (t *Tracker) Init() {
@@ -125,18 +58,10 @@
 		return nil, err
 	}
 
-	if subs.SetTransaction(trans) == false {
-		othTrans := subs.GetTransaction()
-		err := fmt.Errorf("Tracker: Subscription %s got already transaction ongoing: %s, transaction %s not created", subs, othTrans, trans)
+	err := subs.SetTransaction(trans)
+	if err != nil {
 		return nil, err
 	}
-	trans.Subs = subs
-	if (trans.Subs.RmrEndpoint.Addr != trans.RmrEndpoint.Addr) || (trans.Subs.RmrEndpoint.Port != trans.RmrEndpoint.Port) {
-		err := fmt.Errorf("Tracker: Subscription endpoint %s mismatch with trans: %s", subs, trans)
-		trans.Subs.UnSetTransaction(nil)
-		return nil, err
-	}
-
 	trans.tracker = t
 	t.transactionXappTable[xappkey] = trans
 	return trans, nil
diff --git a/pkg/control/transaction.go b/pkg/control/transaction.go
new file mode 100644
index 0000000..f686b44
--- /dev/null
+++ b/pkg/control/transaction.go
@@ -0,0 +1,92 @@
+/*
+==================================================================================
+  Copyright (c) 2019 AT&T Intellectual Property.
+  Copyright (c) 2019 Nokia
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+==================================================================================
+*/
+
+package control
+
+import (
+	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+	"strconv"
+	"sync"
+)
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type TransactionXappKey struct {
+	RmrEndpoint
+	Xid string // xapp xid in req
+}
+
+func (key *TransactionXappKey) String() string {
+	return key.RmrEndpoint.String() + "/" + key.Xid
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type Transaction struct {
+	mutex             sync.Mutex
+	tracker           *Tracker // tracker instance
+	Subs              *Subscription
+	RmrEndpoint       RmrEndpoint
+	Xid               string          // xapp xid in req
+	OrigParams        *xapp.RMRParams // request orginal params
+	RespReceived      bool
+	ForwardRespToXapp bool
+}
+
+func (t *Transaction) String() string {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+	var subId string = "?"
+	if t.Subs != nil {
+		subId = strconv.FormatUint(uint64(t.Subs.Seq), 10)
+	}
+	return subId + "/" + t.RmrEndpoint.String() + "/" + t.Xid
+}
+
+func (t *Transaction) CheckResponseReceived() bool {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+	if t.RespReceived == false {
+		t.RespReceived = true
+		return false
+	}
+	return true
+}
+
+func (t *Transaction) RetryTransaction() {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+	t.RespReceived = false
+}
+
+func (t *Transaction) Release() {
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+	if t.Subs != nil {
+		t.Subs.UnSetTransaction(t)
+	}
+	if t.tracker != nil {
+		xappkey := TransactionXappKey{t.RmrEndpoint, t.Xid}
+		t.tracker.UnTrackTransaction(xappkey)
+	}
+	t.Subs = nil
+	t.tracker = nil
+}