RICPLT-2985 Route update via registry/subscription entry
Change-Id: I5aa4b71290e5405b9bf995b8528e49e900375cbc
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
diff --git a/pkg/control/control.go b/pkg/control/control.go
index 5a38656..5dd8f05 100755
--- a/pkg/control/control.go
+++ b/pkg/control/control.go
@@ -79,6 +79,7 @@
}
func NewControl() *Control {
+
registry := new(Registry)
registry.Initialize(seedSN)
@@ -94,9 +95,14 @@
deleteHandle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
rtmgrClient := RtmgrClient{client, handle, deleteHandle}
+ rtmgrClientPtr := &rtmgrClient
+
+ //TODO: to make this better. Now it is just a hack.
+ registry.rtmgrClient = rtmgrClientPtr
+
return &Control{e2ap: new(E2ap),
registry: registry,
- rtmgrClient: &rtmgrClient,
+ rtmgrClient: rtmgrClientPtr,
tracker: tracker,
timerMap: timerMap,
msgCounter: 0,
@@ -163,10 +169,9 @@
return
}
- /* Reserve a sequence number and set it in the payload */
- subs := c.registry.ReserveSubscription(RmrEndpoint{*srcAddr, *srcPort}, params.Meid)
- if subs == nil {
- xapp.Logger.Error("SubReq: Failed to reserve sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid)
+ subs, err := c.registry.ReserveSubscription(RmrEndpoint{*srcAddr, *srcPort}, params.Meid)
+ if err != nil {
+ xapp.Logger.Error("SubReq: %s, Dropping this msg.", err.Error())
return
}
@@ -174,7 +179,7 @@
err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, subs.Seq)
if err != nil {
xapp.Logger.Error("SubReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload)
- c.registry.releaseSequenceNumber(subs.Seq)
+ c.registry.DelSubscription(subs.Seq)
return
}
@@ -184,18 +189,7 @@
_, err = c.tracker.TrackTransaction(subs, RmrEndpoint{*srcAddr, *srcPort}, params, responseReceived, forwardRespToXapp)
if err != nil {
xapp.Logger.Error("SubReq: %s, Dropping this msg.", err.Error())
- c.registry.releaseSequenceNumber(subs.Seq)
- return
- }
-
- // Update routing manager about the new subscription
- subRouteAction := subs.SubRouteInfo(CREATE)
- xapp.Logger.Info("SubReq: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
-
- err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
- if err != nil {
- xapp.Logger.Error("SubReq: Failed to update routing manager. Dropping this msg. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
- c.registry.releaseSequenceNumber(subs.Seq)
+ c.registry.DelSubscription(subs.Seq)
return
}
@@ -295,16 +289,9 @@
time.Sleep(3 * time.Second)
- xapp.Logger.Info("SubFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
- subRouteAction := subs.SubRouteInfo(DELETE)
- err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
- if err != nil {
- xapp.Logger.Error("SubFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
- }
-
xapp.Logger.Info("SubFail: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid)
transaction.Release()
- if !c.registry.releaseSequenceNumber(payloadSeqNum) {
+ if !c.registry.DelSubscription(payloadSeqNum) {
xapp.Logger.Error("SubFail: Failed to release sequency number. SubId: %v, Xid: %s", params.SubId, params.Xid)
}
return
@@ -500,16 +487,8 @@
time.Sleep(3 * time.Second)
}
- xapp.Logger.Info("SubDelResp: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
- subRouteAction := subs.SubRouteInfo(DELETE)
- err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
- if err != nil {
- xapp.Logger.Error("SubDelResp: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
- return
- }
-
xapp.Logger.Info("SubDelResp: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid)
- if !c.registry.releaseSequenceNumber(payloadSeqNum) {
+ if !c.registry.DelSubscription(payloadSeqNum) {
xapp.Logger.Error("SubDelResp: Failed to release sequency number. SubId: %v, Xid: %s", params.SubId, params.Xid)
return
}
@@ -574,17 +553,9 @@
time.Sleep(3 * time.Second)
}
- xapp.Logger.Info("SubDelFail: Starting routing manager update. SubId: %v, Xid: %s", params.SubId, params.Xid)
- subRouteAction := subs.SubRouteInfo(DELETE)
- err = c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
- if err != nil {
- xapp.Logger.Error("SubDelFail: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
- return
- }
-
xapp.Logger.Info("SubDelFail: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid)
transaction.Release()
- if !c.registry.releaseSequenceNumber(payloadSeqNum) {
+ if !c.registry.DelSubscription(payloadSeqNum) {
xapp.Logger.Error("SubDelFail: Failed to release sequency number. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid)
return
}
@@ -656,17 +627,9 @@
time.Sleep(3 * time.Second)
}
- xapp.Logger.Info("handleSubDelTimer: Starting routing manager update. SubId: %v, Xid: %s", subId, params.Xid)
- subRouteAction := subs.SubRouteInfo(DELETE)
- err := c.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
- if err != nil {
- xapp.Logger.Error("handleSubDelTimer: Failed to update routing manager. Err: %v, SubId: %v, Xid: %s", err, subId, params.Xid)
- return
- }
-
xapp.Logger.Info("handleSubDelTimer: Deleting transaction record. SubId: %v, Xid: %s", subId, params.Xid)
transaction.Release()
- if !c.registry.releaseSequenceNumber(subId) {
+ if !c.registry.DelSubscription(subId) {
xapp.Logger.Error("handleSubDelTimer: Failed to release sequency number. SubId: %v, Xid: %s", subId, params.Xid)
}
return
diff --git a/pkg/control/main_test.go b/pkg/control/main_test.go
index 69dadff..155f92c 100644
--- a/pkg/control/main_test.go
+++ b/pkg/control/main_test.go
@@ -117,9 +117,6 @@
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
type xappTransaction struct {
tc *testingXappControl
xid string
diff --git a/pkg/control/registry.go b/pkg/control/registry.go
index 1adb4f7..2c5bd8c 100644
--- a/pkg/control/registry.go
+++ b/pkg/control/registry.go
@@ -20,87 +20,19 @@
package control
import (
+ "fmt"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
- "strconv"
"sync"
)
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
-type Subscription struct {
- mutex sync.Mutex
- Seq uint16
- Active bool
- //
- Meid *xapp.RMRMeid
- RmrEndpoint // xapp endpoint
- Trans *Transaction
-}
-
-func (s *Subscription) String() string {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- return strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.RmrEndpoint.String() + "/" + s.Meid.RanName
-}
-
-func (s *Subscription) Confirmed() {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- s.Active = true
-}
-
-func (s *Subscription) UnConfirmed() {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- s.Active = false
-}
-
-func (s *Subscription) IsConfirmed() bool {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- return s.Active
-}
-
-func (s *Subscription) SetTransaction(trans *Transaction) bool {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- if s.Trans == nil {
- s.Trans = trans
- return true
- }
- return false
-}
-
-func (s *Subscription) UnSetTransaction(trans *Transaction) bool {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- if trans == nil || trans == s.Trans {
- s.Trans = nil
- return true
- }
- return false
-}
-
-func (s *Subscription) GetTransaction() *Transaction {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- return s.Trans
-}
-
-func (s *Subscription) SubRouteInfo(act Action) SubRouteInfo {
- s.mutex.Lock()
- defer s.mutex.Unlock()
- return SubRouteInfo{act, s.RmrEndpoint.Addr, s.RmrEndpoint.Port, s.Seq}
-}
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
type Registry struct {
- register map[uint16]*Subscription
- counter uint16
- mutex sync.Mutex
+ mutex sync.Mutex
+ register map[uint16]*Subscription
+ counter uint16
+ rtmgrClient *RtmgrClient
}
// This method should run as a constructor
@@ -110,7 +42,7 @@
}
// Reserves and returns the next free sequence number
-func (r *Registry) ReserveSubscription(endPoint RmrEndpoint, meid *xapp.RMRMeid) *Subscription {
+func (r *Registry) ReserveSubscription(endPoint RmrEndpoint, meid *xapp.RMRMeid) (*Subscription, error) {
// Check is current SequenceNumber valid
// Allocate next SequenceNumber value and retry N times
r.mutex.Lock()
@@ -125,17 +57,29 @@
r.counter++
}
if _, ok := r.register[sequenceNumber]; ok == false {
- r.register[sequenceNumber] = &Subscription{
+ subs := &Subscription{
Seq: sequenceNumber,
Active: false,
RmrEndpoint: endPoint,
Meid: meid,
Trans: nil,
}
- return r.register[sequenceNumber]
+ r.register[sequenceNumber] = subs
+
+ // Update routing
+ r.mutex.Unlock()
+ err := subs.UpdateRoute(CREATE, r.rtmgrClient)
+ r.mutex.Lock()
+ if err != nil {
+ if _, ok := r.register[sequenceNumber]; ok {
+ delete(r.register, sequenceNumber)
+ }
+ return nil, err
+ }
+ return subs, nil
}
}
- return nil
+ return nil, fmt.Errorf("Registry: Failed to reserves subcription. RmrEndpoint: %s, Meid: %s", endPoint, meid.RanName)
}
func (r *Registry) GetSubscription(sn uint16) *Subscription {
@@ -148,14 +92,21 @@
return nil
}
-//This function releases the given id as unused in the register
-func (r *Registry) releaseSequenceNumber(sn uint16) bool {
+func (r *Registry) DelSubscription(sn uint16) bool {
r.mutex.Lock()
defer r.mutex.Unlock()
if _, ok := r.register[sn]; ok {
+ subs := r.register[sn]
delete(r.register, sn)
+
+ // Update routing
+ r.mutex.Unlock()
+ err := subs.UpdateRoute(DELETE, r.rtmgrClient)
+ r.mutex.Lock()
+ if err != nil {
+ xapp.Logger.Error("Registry: Failed to del route. SubId: %d, RmrEndpoint: %s", subs.Seq, subs.RmrEndpoint)
+ }
return true
- } else {
- return false
}
+ return false
}
diff --git a/pkg/control/subscription.go b/pkg/control/subscription.go
new file mode 100644
index 0000000..9bbe3d4
--- /dev/null
+++ b/pkg/control/subscription.go
@@ -0,0 +1,109 @@
+/*
+==================================================================================
+ Copyright (c) 2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+
+package control
+
+import (
+ "fmt"
+ "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+ "strconv"
+ "sync"
+)
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type Subscription struct {
+ mutex sync.Mutex
+ Seq uint16
+ Active bool
+ //
+ Meid *xapp.RMRMeid
+ RmrEndpoint // xapp endpoint. Now only one xapp can have relation to single subscription. To be changed in merge
+ Trans *Transaction
+}
+
+func (s *Subscription) String() string {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ return strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.RmrEndpoint.String() + "/" + s.Meid.RanName
+}
+
+func (s *Subscription) Confirmed() {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ s.Active = true
+}
+
+func (s *Subscription) UnConfirmed() {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ s.Active = false
+}
+
+func (s *Subscription) IsConfirmed() bool {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ return s.Active
+}
+
+func (s *Subscription) SetTransaction(trans *Transaction) error {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+
+ subString := strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.RmrEndpoint.String() + "/" + s.Meid.RanName
+
+ if (s.RmrEndpoint.Addr != trans.RmrEndpoint.Addr) || (s.RmrEndpoint.Port != trans.RmrEndpoint.Port) {
+ return fmt.Errorf("Subscription: %s endpoint mismatch with trans: %s", subString, trans)
+ }
+ if s.Trans != nil {
+ return fmt.Errorf("Subscription: %s trans %s exist, can not register %s", subString, s.Trans, trans)
+ }
+ trans.Subs = s
+ s.Trans = trans
+ return nil
+}
+
+func (s *Subscription) UnSetTransaction(trans *Transaction) bool {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ if trans == nil || trans == s.Trans {
+ s.Trans = nil
+ return true
+ }
+ return false
+}
+
+func (s *Subscription) GetTransaction() *Transaction {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ return s.Trans
+}
+
+func (s *Subscription) UpdateRoute(act Action, rtmgrClient *RtmgrClient) error {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ xapp.Logger.Info("Subscription: Starting routing manager route add. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint)
+ subRouteAction := SubRouteInfo{act, s.RmrEndpoint.Addr, s.RmrEndpoint.Port, s.Seq}
+ err := rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
+ if err != nil {
+ return fmt.Errorf("Subscription: Failed to add route. SubId: %d, RmrEndpoint: %s", s.Seq, s.RmrEndpoint)
+ }
+ return nil
+}
diff --git a/pkg/control/tracker.go b/pkg/control/tracker.go
index 0e6941d..087b781 100644
--- a/pkg/control/tracker.go
+++ b/pkg/control/tracker.go
@@ -22,82 +22,15 @@
import (
"fmt"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
- "strconv"
"sync"
)
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
-type TransactionXappKey struct {
- RmrEndpoint
- Xid string // xapp xid in req
-}
-
-func (key *TransactionXappKey) String() string {
- return key.RmrEndpoint.String() + "/" + key.Xid
-}
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-type Transaction struct {
- tracker *Tracker // tracker instance
- Subs *Subscription
- RmrEndpoint RmrEndpoint
- Xid string // xapp xid in req
- OrigParams *xapp.RMRParams // request orginal params
- RespReceived bool
- ForwardRespToXapp bool
- mutex sync.Mutex
-}
-
-func (t *Transaction) String() string {
- t.mutex.Lock()
- defer t.mutex.Unlock()
- var subId string = "?"
- if t.Subs != nil {
- subId = strconv.FormatUint(uint64(t.Subs.Seq), 10)
- }
- return subId + "/" + t.RmrEndpoint.String() + "/" + t.Xid
-}
-
-func (t *Transaction) CheckResponseReceived() bool {
- t.mutex.Lock()
- defer t.mutex.Unlock()
- if t.RespReceived == false {
- t.RespReceived = true
- return false
- }
- return true
-}
-
-func (t *Transaction) RetryTransaction() {
- t.mutex.Lock()
- defer t.mutex.Unlock()
- t.RespReceived = false
-}
-
-func (t *Transaction) Release() {
- t.mutex.Lock()
- defer t.mutex.Unlock()
- if t.Subs != nil {
- t.Subs.UnSetTransaction(t)
- }
- if t.tracker != nil {
- xappkey := TransactionXappKey{t.RmrEndpoint, t.Xid}
- t.tracker.UnTrackTransaction(xappkey)
- }
- t.Subs = nil
- t.tracker = nil
-}
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
type Tracker struct {
- transactionXappTable map[TransactionXappKey]*Transaction
mutex sync.Mutex
+ transactionXappTable map[TransactionXappKey]*Transaction
}
func (t *Tracker) Init() {
@@ -125,18 +58,10 @@
return nil, err
}
- if subs.SetTransaction(trans) == false {
- othTrans := subs.GetTransaction()
- err := fmt.Errorf("Tracker: Subscription %s got already transaction ongoing: %s, transaction %s not created", subs, othTrans, trans)
+ err := subs.SetTransaction(trans)
+ if err != nil {
return nil, err
}
- trans.Subs = subs
- if (trans.Subs.RmrEndpoint.Addr != trans.RmrEndpoint.Addr) || (trans.Subs.RmrEndpoint.Port != trans.RmrEndpoint.Port) {
- err := fmt.Errorf("Tracker: Subscription endpoint %s mismatch with trans: %s", subs, trans)
- trans.Subs.UnSetTransaction(nil)
- return nil, err
- }
-
trans.tracker = t
t.transactionXappTable[xappkey] = trans
return trans, nil
diff --git a/pkg/control/transaction.go b/pkg/control/transaction.go
new file mode 100644
index 0000000..f686b44
--- /dev/null
+++ b/pkg/control/transaction.go
@@ -0,0 +1,92 @@
+/*
+==================================================================================
+ Copyright (c) 2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+
+package control
+
+import (
+ "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+ "strconv"
+ "sync"
+)
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type TransactionXappKey struct {
+ RmrEndpoint
+ Xid string // xapp xid in req
+}
+
+func (key *TransactionXappKey) String() string {
+ return key.RmrEndpoint.String() + "/" + key.Xid
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type Transaction struct {
+ mutex sync.Mutex
+ tracker *Tracker // tracker instance
+ Subs *Subscription
+ RmrEndpoint RmrEndpoint
+ Xid string // xapp xid in req
+ OrigParams *xapp.RMRParams // request orginal params
+ RespReceived bool
+ ForwardRespToXapp bool
+}
+
+func (t *Transaction) String() string {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+ var subId string = "?"
+ if t.Subs != nil {
+ subId = strconv.FormatUint(uint64(t.Subs.Seq), 10)
+ }
+ return subId + "/" + t.RmrEndpoint.String() + "/" + t.Xid
+}
+
+func (t *Transaction) CheckResponseReceived() bool {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+ if t.RespReceived == false {
+ t.RespReceived = true
+ return false
+ }
+ return true
+}
+
+func (t *Transaction) RetryTransaction() {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+ t.RespReceived = false
+}
+
+func (t *Transaction) Release() {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+ if t.Subs != nil {
+ t.Subs.UnSetTransaction(t)
+ }
+ if t.tracker != nil {
+ xappkey := TransactionXappKey{t.RmrEndpoint, t.Xid}
+ t.tracker.UnTrackTransaction(xappkey)
+ }
+ t.Subs = nil
+ t.tracker = nil
+}