Submgr restart improvement
Change-Id: I02388dea48b826eac09aec0308d85bcdcf812a6f
Signed-off-by: Anssi Mannila <anssi.mannila@nokia.com>
diff --git a/pkg/control/control.go b/pkg/control/control.go
index 9f90252..7d180ad 100755
--- a/pkg/control/control.go
+++ b/pkg/control/control.go
@@ -21,14 +21,20 @@
import (
"fmt"
+ "net/http"
+ "os"
+ "strconv"
+ "strings"
+ "time"
+
"gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
httptransport "github.com/go-openapi/runtime/client"
"github.com/go-openapi/strfmt"
+ "github.com/gorilla/mux"
"github.com/spf13/viper"
- "time"
)
//-----------------------------------------------------------------------------
@@ -59,14 +65,17 @@
var e2tRecvMsgTimeout time.Duration
var e2tMaxSubReqTryCount uint64 // Initial try + retry
var e2tMaxSubDelReqTryCount uint64 // Initial try + retry
+var readSubsFromDb string
type Control struct {
*xapp.RMRClient
e2ap *E2ap
registry *Registry
tracker *Tracker
+ db Sdlnterface
//subscriber *xapp.Subscriber
- CntRecvMsg uint64
+ CntRecvMsg uint64
+ ResetTestFlag bool
}
type RMRMeid struct {
@@ -75,6 +84,9 @@
RanName string
}
+type SubmgrRestartTestEvent struct{}
+type SubmgrRestartUpEvent struct{}
+
func init() {
xapp.Logger.Info("SUBMGR")
viper.AutomaticEnv()
@@ -84,6 +96,54 @@
func NewControl() *Control {
+ ReadConfigParameters()
+ transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
+ rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
+
+ registry := new(Registry)
+ registry.Initialize()
+ registry.rtmgrClient = &rtmgrClient
+
+ tracker := new(Tracker)
+ tracker.Init()
+
+ //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
+
+ c := &Control{e2ap: new(E2ap),
+ registry: registry,
+ tracker: tracker,
+ db: CreateSdl(),
+ //subscriber: subscriber,
+ }
+
+ // Register REST handler for testing support
+ xapp.Resource.InjectRoute("/ric/v1/test/{testId}", c.TestRestHandler, "POST")
+
+ go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
+ //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
+
+ if readSubsFromDb == "false" {
+ return c
+ }
+
+ // Read subscriptions from db
+ xapp.Logger.Info("Reading subscriptions from db")
+ subIds, register, err := c.ReadAllSubscriptionsFromSdl()
+ if err != nil {
+ xapp.Logger.Error("%v", err)
+ } else {
+ c.registry.subIds = subIds
+ c.registry.register = register
+ c.HandleUncompletedSubscriptions(register)
+ }
+ return c
+}
+
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+func ReadConfigParameters() {
+
// viper.GetDuration returns nanoseconds
e2tSubReqTimeout = viper.GetDuration("controls.e2tSubReqTimeout_ms") * 1000000
if e2tSubReqTimeout == 0 {
@@ -111,26 +171,26 @@
}
xapp.Logger.Info("e2tMaxSubDelReqTryCount %v", e2tMaxSubDelReqTryCount)
- transport := httptransport.New(viper.GetString("rtmgr.HostAddr")+":"+viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
- rtmgrClient := RtmgrClient{rtClient: rtmgrclient.New(transport, strfmt.Default)}
-
- registry := new(Registry)
- registry.Initialize()
- registry.rtmgrClient = &rtmgrClient
-
- tracker := new(Tracker)
- tracker.Init()
-
- //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
-
- c := &Control{e2ap: new(E2ap),
- registry: registry,
- tracker: tracker,
- //subscriber: subscriber,
+ readSubsFromDb = viper.GetString("controls.readSubsFromDb")
+ if readSubsFromDb == "" {
+ readSubsFromDb = "true"
}
- go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler, c.SubscriptionDeleteHandler)
- //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
- return c
+ xapp.Logger.Info("readSubsFromDb %v", readSubsFromDb)
+}
+
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+func (c *Control) HandleUncompletedSubscriptions(register map[uint32]*Subscription) {
+
+ xapp.Logger.Debug("HandleUncompletedSubscriptions. len(register) = %v", len(register))
+ for subId, subs := range register {
+ if subs.SubRespRcvd == false {
+ subs.NoRespToXapp = true
+ xapp.Logger.Debug("SendSubscriptionDeleteReq. subId = %v", subId)
+ c.SendSubscriptionDeleteReq(subs)
+ }
+ }
}
func (c *Control) ReadyCB(data interface{}) {
@@ -164,14 +224,47 @@
return &models.SubscriptionResponse{}, fmt.Errorf("Subscription rest interface not implemented")
}
-func (c *Control) SubscriptionDeleteHandler(string) error {
- return fmt.Errorf("Subscription rest interface not implemented")
+func (c *Control) SubscriptionDeleteHandler(s string) error {
+ return nil
}
func (c *Control) QueryHandler() (models.SubscriptionList, error) {
return c.registry.QueryHandler()
}
+func (c *Control) TestRestHandler(w http.ResponseWriter, r *http.Request) {
+
+ xapp.Logger.Info("TestRestHandler() called")
+
+ pathParams := mux.Vars(r)
+ s := pathParams["testId"]
+
+ // This can be used to delete single subscription from db
+ if contains := strings.Contains(s, "deletesubid="); contains == true {
+ var splits = strings.Split(s, "=")
+ if subId, err := strconv.ParseInt(splits[1], 10, 64); err == nil {
+ xapp.Logger.Info("RemoveSubscriptionFromSdl() called. subId = %v", subId)
+ c.RemoveSubscriptionFromSdl(uint32(subId))
+ return
+ }
+ }
+
+ // This can be used to remove all subscriptions db from
+ if s == "emptydb" {
+ xapp.Logger.Info("RemoveAllSubscriptionsFromSdl() called")
+ c.RemoveAllSubscriptionsFromSdl()
+ return
+ }
+
+ // This is meant to cause submgr's restart in testing
+ if s == "restart" {
+ xapp.Logger.Info("os.Exit(1) called")
+ os.Exit(1)
+ }
+
+ xapp.Logger.Info("Unsupported rest command received %s", s)
+}
+
//-------------------------------------------------------------------
//
//-------------------------------------------------------------------
@@ -270,7 +363,7 @@
}
//TODO handle subscription toward e2term inside AssignToSubscription / hide handleSubscriptionCreate in it?
- subs, err := c.registry.AssignToSubscription(trans, subReqMsg)
+ subs, err := c.registry.AssignToSubscription(trans, subReqMsg, c.ResetTestFlag)
if err != nil {
xapp.Logger.Error("XAPP-SubReq: %s", idstring(err, trans))
return
@@ -281,7 +374,6 @@
//
go c.handleSubscriptionCreate(subs, trans)
event, _ := trans.WaitEvent(0) //blocked wait as timeout is handled in subs side
-
err = nil
if event != nil {
switch themsg := event.(type) {
@@ -344,7 +436,12 @@
xapp.Logger.Debug("XAPP-SubDelReq: Handling event %s ", idstring(nil, trans, subs))
- // Whatever is received send ok delete response
+ if subs.NoRespToXapp == true {
+ // Do no send delete responses to xapps due to submgr restart is deleting uncompleted subscriptions
+ return
+ }
+
+ // Whatever is received success, fail or timeout, send successful delete response
subDelRespMsg := &e2ap.E2APSubscriptionDeleteResponse{}
subDelRespMsg.RequestId = subs.GetReqId().RequestId
subDelRespMsg.FunctionId = subs.SubReqMsg.FunctionId
@@ -362,6 +459,7 @@
//-------------------------------------------------------------------
func (c *Control) handleSubscriptionCreate(subs *Subscription, parentTrans *TransactionXapp) {
+ var removeSubscriptionFromDb bool = false
trans := c.tracker.NewSubsTransaction(subs)
subs.WaitTransactionTurn(trans)
defer subs.ReleaseTransactionTurn(trans)
@@ -386,7 +484,9 @@
switch themsg := event.(type) {
case *e2ap.E2APSubscriptionResponse:
subRfMsg, valid = subs.SetCachedResponse(event, true)
+ subs.SubRespRcvd = true
case *e2ap.E2APSubscriptionFailure:
+ removeSubscriptionFromDb = true
subRfMsg, valid = subs.SetCachedResponse(event, false)
doRetry = true
for _, item := range themsg.ActionNotAdmittedList.Items {
@@ -397,13 +497,17 @@
}
xapp.Logger.Info("SUBS-SubReq: internal delete and possible retry due event(%s) retry(%t,%d/%d) %s", typeofSubsMessage(event), doRetry, retries, maxRetries, idstring(nil, trans, subs, parentTrans))
c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
+ case *SubmgrRestartTestEvent:
+ // This simulates that no response has been received and after restart subscriptions are restored from db
+ xapp.Logger.Debug("Test restart flag is active. Dropping this transaction to test restart case")
+ return
default:
xapp.Logger.Info("SUBS-SubReq: internal delete due event(%s) %s", typeofSubsMessage(event), idstring(nil, trans, subs, parentTrans))
+ removeSubscriptionFromDb = true
subRfMsg, valid = subs.SetCachedResponse(nil, false)
c.sendE2TSubscriptionDeleteRequest(subs, trans, parentTrans)
}
}
-
xapp.Logger.Debug("SUBS-SubReq: Handling (e2t response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
} else {
xapp.Logger.Debug("SUBS-SubReq: Handling (cached response %s) %s", typeofSubsMessage(subRfMsg), idstring(nil, trans, subs, parentTrans))
@@ -411,8 +515,10 @@
//Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
if valid == false {
- c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
+ c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
}
+
+ c.UpdateSubscriptionInDB(subs, removeSubscriptionFromDb)
parentTrans.SendEvent(subRfMsg, 0)
}
@@ -440,7 +546,8 @@
//Now RemoveFromSubscription in here to avoid race conditions (mostly concerns delete)
// If parallel deletes ongoing both might pass earlier sendE2TSubscriptionDeleteRequest(...) if
// RemoveFromSubscription locates in caller side (now in handleXAPPSubscriptionDeleteRequest(...))
- c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second)
+ c.registry.RemoveFromSubscription(subs, parentTrans, 5*time.Second, c)
+ c.registry.UpdateSubscriptionToDb(subs, c)
parentTrans.SendEvent(nil, 0)
}
@@ -460,12 +567,19 @@
return event
}
+ // Write uncompleted subscrition in db. If no response for subscrition it need to be re-processed (deleted) after restart
+ c.WriteSubscriptionToDb(subs)
for retries := uint64(0); retries < e2tMaxSubReqTryCount; retries++ {
desc := fmt.Sprintf("(retry %d)", retries)
c.rmrSendToE2T(desc, subs, trans)
- event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
- if timedOut {
- continue
+ if subs.DoNotWaitSubResp == false {
+ event, timedOut = trans.WaitEvent(e2tSubReqTimeout)
+ if timedOut {
+ continue
+ }
+ } else {
+ // Simulating case where subscrition request has been sent but response has not been received before restart
+ event = &SubmgrRestartTestEvent{}
}
break
}
@@ -644,3 +758,76 @@
return "Unknown"
}
}
+
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+func (c *Control) WriteSubscriptionToDb(subs *Subscription) {
+ xapp.Logger.Debug("WriteSubscriptionToDb() subId = %v", subs.ReqId.InstanceId)
+ err := c.WriteSubscriptionToSdl(subs.ReqId.InstanceId, subs)
+ if err != nil {
+ xapp.Logger.Error("%v", err)
+ }
+}
+
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+func (c *Control) UpdateSubscriptionInDB(subs *Subscription, removeSubscriptionFromDb bool) {
+
+ if removeSubscriptionFromDb == true {
+ // Subscription was written in db already when subscription request was sent to BTS, except for merged request
+ c.RemoveSubscriptionFromDb(subs)
+ } else {
+ // Update is needed for successful response and merge case here
+ if subs.RetryFromXapp == false {
+ c.WriteSubscriptionToDb(subs)
+ }
+ }
+ subs.RetryFromXapp = false
+}
+
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+func (c *Control) RemoveSubscriptionFromDb(subs *Subscription) {
+ xapp.Logger.Debug("RemoveSubscriptionFromDb() subId = %v", subs.ReqId.InstanceId)
+ err := c.RemoveSubscriptionFromSdl(subs.ReqId.InstanceId)
+ if err != nil {
+ xapp.Logger.Error("%v", err)
+ }
+}
+
+func (c *Control) SendSubscriptionDeleteReq(subs *Subscription) {
+
+ xapp.Logger.Debug("Sending subscription delete due to restart. subId = %v", subs.ReqId.InstanceId)
+
+ // Send delete for every endpoint in the subscription
+ subDelReqMsg := &e2ap.E2APSubscriptionDeleteRequest{}
+ subDelReqMsg.RequestId = subs.GetReqId().RequestId
+ subDelReqMsg.FunctionId = subs.SubReqMsg.FunctionId
+ mType, payload, err := c.e2ap.PackSubscriptionDeleteRequest(subDelReqMsg)
+ if err != nil {
+ xapp.Logger.Error("SendSubscriptionDeleteReq() %s", idstring(err))
+ return
+ }
+ for _, endPoint := range subs.EpList.Endpoints {
+ params := &xapp.RMRParams{}
+ params.Mtype = mType
+ params.SubId = int(subs.GetReqId().InstanceId)
+ params.Xid = ""
+ params.Meid = subs.Meid
+ params.Src = endPoint.String()
+ params.PayloadLen = len(payload.Buf)
+ params.Payload = payload.Buf
+ params.Mbuf = nil
+
+ if params == nil {
+ xapp.Logger.Error("SendSubscriptionDeleteReq() params == nil")
+ return
+ }
+
+ subs.DeleteFromDb = true
+ c.handleXAPPSubscriptionDeleteRequest(params)
+ }
+}
diff --git a/pkg/control/registry.go b/pkg/control/registry.go
index 7ad54c1..e07116c 100644
--- a/pkg/control/registry.go
+++ b/pkg/control/registry.go
@@ -42,8 +42,8 @@
func (r *Registry) Initialize() {
r.register = make(map[uint32]*Subscription)
var i uint32
- for i = 0; i < 65535; i++ {
- r.subIds = append(r.subIds, i+1)
+ for i = 1; i < 65535; i++ {
+ r.subIds = append(r.subIds, i)
}
}
@@ -60,7 +60,7 @@
return resp, nil
}
-func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
+func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) {
if len(r.subIds) > 0 {
subId := r.subIds[0]
r.subIds = r.subIds[1:]
@@ -69,19 +69,26 @@
return nil, fmt.Errorf("Registry: Failed to reserve subscription exists")
}
subs := &Subscription{
- registry: r,
- Meid: trans.Meid,
- SubReqMsg: subReqMsg,
- valid: true,
+ registry: r,
+ Meid: trans.Meid,
+ SubReqMsg: subReqMsg,
+ valid: true,
+ RetryFromXapp: false,
+ SubRespRcvd: false,
+ DeleteFromDb: false,
+ NoRespToXapp: false,
+ DoNotWaitSubResp: false,
}
subs.ReqId.Id = 123
subs.ReqId.InstanceId = subId
+ if resetTestFlag == true {
+ subs.DoNotWaitSubResp = true
+ }
if subs.EpList.AddEndpoint(trans.GetEndpoint()) == false {
r.subIds = append(r.subIds, subs.ReqId.InstanceId)
return nil, fmt.Errorf("Registry: Endpoint existing already in subscription")
}
-
return subs, nil
}
return nil, fmt.Errorf("Registry: Failed to reserve subscription no free ids")
@@ -121,7 +128,7 @@
return nil, false
}
-func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
+func (r *Registry) AssignToSubscription(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest, resetTestFlag bool) (*Subscription, error) {
var err error
var newAlloc bool
r.mutex.Lock()
@@ -151,15 +158,16 @@
subs, endPointFound := r.findExistingSubs(trans, subReqMsg)
if subs == nil {
- subs, err = r.allocateSubs(trans, subReqMsg)
+ subs, err = r.allocateSubs(trans, subReqMsg, resetTestFlag)
if err != nil {
return nil, err
}
newAlloc = true
} else if endPointFound == true {
// Requesting endpoint is already present in existing subscription. This can happen if xApp is restarted.
+ subs.RetryFromXapp = true
xapp.Logger.Debug("CREATE: subscription already exists. %s", subs.String())
- xapp.Logger.Debug("Registry: substable=%v", r.register)
+ //xapp.Logger.Debug("Registry: substable=%v", r.register)
return subs, nil
}
@@ -231,7 +239,7 @@
}
// TODO: Works with concurrent calls, but check if can be improved
-func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration) error {
+func (r *Registry) RemoveFromSubscription(subs *Subscription, trans *TransactionXapp, waitRouteClean time.Duration, c *Control) error {
r.mutex.Lock()
defer r.mutex.Unlock()
@@ -276,7 +284,6 @@
xapp.Logger.Debug("Registry: substable=%v", r.register)
}
r.subIds = append(r.subIds, subId)
-
} else if subs.EpList.Size() > 0 {
//
// Subscription route updates
@@ -284,12 +291,30 @@
subRouteAction := SubRouteInfo{subs.EpList, uint16(subId)}
r.rtmgrClient.SubscriptionRequestUpdate(subRouteAction)
}
-
}()
return nil
}
+func (r *Registry) UpdateSubscriptionToDb(subs *Subscription, c *Control) {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+ subs.mutex.Lock()
+ defer subs.mutex.Unlock()
+
+ epamount := subs.EpList.Size()
+ if epamount == 0 {
+ if _, ok := r.register[subs.ReqId.InstanceId]; ok {
+ // Not merged subscription is being deleted
+ c.RemoveSubscriptionFromDb(subs)
+
+ }
+ } else if subs.EpList.Size() > 0 {
+ // Endpoint of merged subscription is being deleted
+ c.WriteSubscriptionToDb(subs)
+ }
+}
+
func (r *Registry) GetSubscription(subId uint32) *Subscription {
r.mutex.Lock()
defer r.mutex.Unlock()
diff --git a/pkg/control/sdl.go b/pkg/control/sdl.go
new file mode 100644
index 0000000..3a083fb
--- /dev/null
+++ b/pkg/control/sdl.go
@@ -0,0 +1,229 @@
+/*
+==================================================================================
+ Copyright (c) 2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+
+package control
+
+import (
+ "encoding/json"
+ "fmt"
+ "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
+ sdl "gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
+ "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+ "strconv"
+)
+
+type SubscriptionInfo struct {
+ Valid bool
+ ReqId RequestId
+ Meid xapp.RMRMeid
+ EpList xapp.RmrEndpointList
+ SubReqMsg e2ap.E2APSubscriptionRequest
+ SubRespMsg e2ap.E2APSubscriptionResponse
+ SubFailMsg e2ap.E2APSubscriptionFailure
+ SubRespRcvd string
+}
+
+func CreateSdl() Sdlnterface {
+ return sdl.NewSdlInstance("submgr", sdl.NewDatabase())
+}
+
+func (c *Control) WriteSubscriptionToSdl(subId uint32, subs *Subscription) error {
+
+ var subscriptionInfo SubscriptionInfo
+ subscriptionInfo.Valid = subs.valid
+ subscriptionInfo.ReqId = subs.ReqId
+ subscriptionInfo.Meid = *subs.Meid
+ subscriptionInfo.EpList = subs.EpList
+ subscriptionInfo.SubReqMsg = *subs.SubReqMsg
+
+ if typeofSubsMessage(subs.SubRFMsg) == "SubResp" {
+ subscriptionInfo.SubRespRcvd = "SubResp"
+ subscriptionInfo.SubRespMsg = *subs.SubRFMsg.(*e2ap.E2APSubscriptionResponse)
+ } else if typeofSubsMessage(subs.SubRFMsg) == "SubFail" {
+ subscriptionInfo.SubRespRcvd = "SubFail"
+ subscriptionInfo.SubFailMsg = *subs.SubRFMsg.(*e2ap.E2APSubscriptionFailure)
+ } else {
+ subscriptionInfo.SubRespRcvd = ""
+ }
+
+ jsonData, err := json.Marshal(subscriptionInfo)
+ if err != nil {
+ return fmt.Errorf("SDL: WriteSubscriptionToSdl() json.Marshal error: %s", err.Error())
+ }
+
+ err = c.db.Set(strconv.FormatUint(uint64(subId), 10), jsonData)
+ if err != nil {
+ return fmt.Errorf("SDL: WriteSubscriptionToSdl(): %s", err.Error())
+ } else {
+ xapp.Logger.Debug("SDL: Subscription written in db. subId = %v", subId)
+ }
+ return nil
+}
+
+func (c *Control) ReadSubscriptionFromSdl(subId uint32) (*Subscription, error) {
+
+ // This function is now just for testing purpose
+ key := strconv.FormatUint(uint64(subId), 10)
+ retMap, err := c.db.Get([]string{key})
+ if err != nil {
+ return nil, fmt.Errorf("SDL: ReadSubscriptionFromSdl(): %s", err.Error())
+ } else {
+ xapp.Logger.Debug("SDL: Subscription read from db. subId = %v", subId)
+ }
+
+ subs := &Subscription{}
+ for _, iSubscriptionInfo := range retMap {
+
+ if iSubscriptionInfo == nil {
+ return nil, fmt.Errorf("SDL: ReadSubscriptionFromSdl() subscription not found. subId = %v\n", subId)
+ }
+
+ subscriptionInfo := &SubscriptionInfo{}
+ jsonSubscriptionInfo := iSubscriptionInfo.(string)
+
+ err := json.Unmarshal([]byte(jsonSubscriptionInfo), subscriptionInfo)
+ if err != nil {
+ return nil, fmt.Errorf("SDL: ReadSubscriptionFromSdl() json.unmarshal error: %s\n", err.Error())
+ }
+
+ subs = c.CreateSubscription(subscriptionInfo, &jsonSubscriptionInfo)
+ }
+ return subs, nil
+}
+
+func (c *Control) CreateSubscription(subscriptionInfo *SubscriptionInfo, jsonSubscriptionInfo *string) *Subscription {
+
+ subs := &Subscription{}
+ subs.registry = c.registry
+ subs.valid = subscriptionInfo.Valid
+ subs.ReqId = subscriptionInfo.ReqId
+ meid := xapp.RMRMeid{}
+ meid = subscriptionInfo.Meid
+ subs.Meid = &meid
+ subs.EpList = subscriptionInfo.EpList
+ subs.TheTrans = nil
+ subReq := e2ap.E2APSubscriptionRequest{}
+ subReq = subscriptionInfo.SubReqMsg
+ subs.SubReqMsg = &subReq
+
+ if subscriptionInfo.SubRespRcvd == "SubResp" {
+ subs.SubRespRcvd = true
+ subResp := e2ap.E2APSubscriptionResponse{}
+ subResp = subscriptionInfo.SubRespMsg
+ subs.SubRFMsg = &subResp
+ } else if subscriptionInfo.SubRespRcvd == "SubFail" {
+ subs.SubRespRcvd = false
+ subFail := e2ap.E2APSubscriptionFailure{}
+ subFail = subscriptionInfo.SubFailMsg
+ subs.SubRFMsg = &subFail
+ } else {
+ subs.SubRespRcvd = false
+ subs.SubRFMsg = nil
+ xapp.Logger.Debug("SDL: CreateSubscription() subscriptionInfo.SubRespRcvd == '', InstanceId=%v ", subscriptionInfo.ReqId.InstanceId)
+ }
+ return subs
+}
+
+func (c *Control) RemoveSubscriptionFromSdl(subId uint32) error {
+
+ key := strconv.FormatUint(uint64(subId), 10)
+ err := c.db.Remove([]string{key})
+ if err != nil {
+ return fmt.Errorf("SDL: RemoveSubscriptionfromSdl(): %s\n", err.Error())
+ } else {
+ xapp.Logger.Debug("SDL: Subscription removed from db. subId = %v", subId)
+ }
+ return nil
+}
+
+func (c *Control) ReadAllSubscriptionsFromSdl() ([]uint32, map[uint32]*Subscription, error) {
+
+ // Read all subscriptionInfos
+ var subIds []uint32
+ var i uint32
+ for i = 1; i < 65535; i++ {
+ subIds = append(subIds, i)
+ }
+
+ retMap := make(map[uint32]*Subscription)
+ // Get all keys
+ keys, err := c.db.GetAll()
+ if err != nil {
+ return nil, nil, fmt.Errorf("SDL: ReadAllSubscriptionsFromSdl(), GetAll(). Error while reading keys from DBAAS %s\n", err.Error())
+ }
+
+ if len(keys) == 0 {
+ return subIds, retMap, nil
+ }
+
+ // Get all subscriptionInfos
+ iSubscriptionMap, err := c.db.Get(keys)
+ if err != nil {
+ return nil, nil, fmt.Errorf("SDL: ReadAllSubscriptionsFromSdl(), Get(): Error while reading subscriptions from DBAAS %s\n", err.Error())
+ }
+
+ for _, iSubscriptionInfo := range iSubscriptionMap {
+
+ if iSubscriptionInfo == nil {
+ return nil, nil, fmt.Errorf("SDL: ReadAllSubscriptionsFromSdl() iSubscriptionInfo = nil\n")
+ }
+
+ subscriptionInfo := &SubscriptionInfo{}
+ jsonSubscriptionInfo := iSubscriptionInfo.(string)
+
+ err := json.Unmarshal([]byte(jsonSubscriptionInfo), subscriptionInfo)
+ if err != nil {
+ return nil, nil, fmt.Errorf("SDL: ReadAllSubscriptionsFromSdl() json.unmarshal error: %s\n", err.Error())
+ }
+
+ subs := c.CreateSubscription(subscriptionInfo, &jsonSubscriptionInfo)
+
+ if int(subscriptionInfo.ReqId.InstanceId) >= len(subIds) {
+ return nil, nil, fmt.Errorf("SDL: ReadAllSubscriptionsFromSdl() index is out of range. Index is %d with slice length %d", subscriptionInfo.ReqId.InstanceId, len(subIds))
+ }
+ retMap[subscriptionInfo.ReqId.InstanceId] = subs
+
+ // Remove subId from free subIds. Original slice is modified here!
+ subIds, err = removeNumber(subIds, subscriptionInfo.ReqId.InstanceId)
+ if err != nil {
+ return nil, nil, fmt.Errorf("SDL: ReadAllSubscriptionsFromSdl() error: %s\n", err.Error())
+ }
+ }
+ return subIds, retMap, nil
+}
+
+func removeNumber(s []uint32, removedNum uint32) ([]uint32, error) {
+ for i, num := range s {
+ if removedNum == uint32(num) {
+ s = append(s[:i], s[i+1:]...)
+ return s[:len(s)], nil
+ }
+ }
+ return nil, fmt.Errorf("SDL: To be removed number not in the slice. removedNum: %v", removedNum)
+}
+func (c *Control) RemoveAllSubscriptionsFromSdl() error {
+
+ err := c.db.RemoveAll()
+ if err != nil {
+ return fmt.Errorf("SDL: RemoveAllSubscriptionsFromSdl(): %s\n", err.Error())
+ } else {
+ xapp.Logger.Debug("SDL: All subscriptions removed from db")
+ }
+ return nil
+}
diff --git a/pkg/control/sdl_test.go b/pkg/control/sdl_test.go
new file mode 100644
index 0000000..9198e5c
--- /dev/null
+++ b/pkg/control/sdl_test.go
@@ -0,0 +1,459 @@
+/*
+==================================================================================
+ Copyright (c) 2019 AT&T Intellectual Property.
+ Copyright (c) 2019 Nokia
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+==================================================================================
+*/
+
+package control
+
+import (
+ "encoding/json"
+ "fmt"
+ "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
+ "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/teststube2ap"
+ "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+ "github.com/stretchr/testify/assert"
+ "reflect"
+ "strconv"
+ "testing"
+ "time"
+)
+
+const (
+ subsResponse = 1
+ subsFailure = 2
+ noResponse = 3
+)
+
+type Mock struct {
+ subsDB map[string]string // Store information as a string like real db does.
+ register map[uint32]*Subscription
+ subIds []uint32
+ lastAllocatedSubId uint32
+}
+
+var mock *Mock
+
+func CreateMock() *Mock {
+ fmt.Println("Test CreateMock()")
+ mock = new(Mock)
+ mock.ResetTestSettings()
+ return mock
+}
+
+func (m *Mock) ResetTestSettings() {
+ m.subsDB = make(map[string]string)
+ m.register = make(map[uint32]*Subscription)
+ var i uint32
+ for i = 1; i < 65535; i++ {
+ m.subIds = append(m.subIds, i)
+ }
+}
+
+func (m *Mock) AllocNextSubId() uint32 {
+ m.lastAllocatedSubId = m.subIds[0]
+ return m.lastAllocatedSubId
+}
+
+func TestWait(t *testing.T) {
+ // Wait to test settings to complete
+ <-time.After(1 * time.Second)
+}
+
+func GetSubscription(t *testing.T, e2SubId uint32, responseType int, srcEndPoint, ranName string, xId string) *Subscription {
+ t.Log("TEST: Getting subscription")
+
+ subs := &Subscription{}
+
+ // Create unpacked e2SubReqMsg
+ subReqParams := &teststube2ap.E2StubSubsReqParams{}
+ subReqParams.Init()
+
+ meid := xapp.RMRMeid{}
+ meid.RanName = ranName
+
+ params := &xapp.RMRParams{}
+ params.Src = srcEndPoint
+ params.Xid = xId
+ params.Meid = &meid
+
+ // Create xApp transaction
+ trans := mainCtrl.c.tracker.NewXappTransaction(xapp.NewRmrEndpoint(params.Src), params.Xid, subReqParams.Req.RequestId.InstanceId, params.Meid)
+ if trans == nil {
+ t.Errorf("TEST: %s", idstring(fmt.Errorf("transaction not created"), params))
+ return nil
+ }
+
+ // Allocate E2 instanceId/subId
+ subReqParams.Req.RequestId.InstanceId = e2SubId
+
+ subs.ReqId.Id = 123
+ subs.ReqId.InstanceId = subReqParams.Req.RequestId.InstanceId
+ subs.Meid = &meid
+ subs.EpList.AddEndpoint(trans.GetEndpoint())
+ subs.SubReqMsg = subReqParams.Req
+ // subs.SubRFMsg contains received/cached SubscriptionResponse or SubscriptionFailure, nil in no response received
+ if responseType == subsResponse {
+ subs.SubRFMsg = GetSubsResponse(t, subReqParams.Req)
+ subs.valid = true
+ } else if responseType == subsFailure {
+ subs.SubRFMsg = GetSubsFailure(t, subReqParams.Req)
+ subs.valid = false
+ } else if responseType == noResponse {
+ subs.SubRFMsg = nil
+ subs.valid = false
+ }
+ return subs
+}
+
+func GetSubsResponse(t *testing.T, req *e2ap.E2APSubscriptionRequest) *e2ap.E2APSubscriptionResponse {
+ t.Log("TEST: Getting ricSubscriptionResponse")
+
+ // Create e2SubRespMsg
+ resp := &e2ap.E2APSubscriptionResponse{}
+ resp.RequestId.Id = 123
+ resp.RequestId.InstanceId = req.RequestId.InstanceId
+ resp.FunctionId = req.FunctionId
+
+ resp.ActionAdmittedList.Items = make([]e2ap.ActionAdmittedItem, len(req.ActionSetups))
+ for index := int(0); index < len(req.ActionSetups); index++ {
+ resp.ActionAdmittedList.Items[index].ActionId = req.ActionSetups[index].ActionId
+ }
+
+ for index := uint64(0); index < 1; index++ {
+ item := e2ap.ActionNotAdmittedItem{}
+ item.ActionId = index
+ item.Cause.Content = 1
+ item.Cause.Value = 1
+ resp.ActionNotAdmittedList.Items = append(resp.ActionNotAdmittedList.Items, item)
+ }
+ return resp
+}
+
+func GetSubsFailure(t *testing.T, req *e2ap.E2APSubscriptionRequest) *e2ap.E2APSubscriptionFailure {
+ t.Log("TEST: Getting ricSubscriptionFailure")
+
+ fail := &e2ap.E2APSubscriptionFailure{}
+ fail.RequestId.Id = req.RequestId.Id
+ fail.RequestId.InstanceId = req.RequestId.InstanceId
+ fail.FunctionId = req.FunctionId
+ return fail
+}
+
+func PrintSubscriptionData(t *testing.T, subs *Subscription) {
+ t.Log("TEST: subscription data")
+ t.Logf("TEST: subs.mutex = %v", subs.mutex)
+ t.Logf("TEST: subs.ReqId.InstanceId = %v", subs.ReqId.InstanceId)
+ t.Logf("TEST: subs.ReqId.Id = %v", subs.ReqId.Id)
+ t.Logf("TEST: subs.EpList = %v", subs.EpList)
+ t.Logf("TEST: subs.Meid.RanName = %v", subs.Meid.RanName)
+ t.Logf("TEST: subs.SubReqMsg = %v", subs.SubReqMsg.String())
+ t.Logf("TEST: subs.valid = %v", subs.valid)
+
+ if subs.SubRFMsg != nil {
+ switch typeofSubsMessage(subs.SubRFMsg) {
+ case "SubResp":
+ t.Logf("TEST: subs.SubRFMsg == SubResp")
+ subResp := subs.SubRFMsg.(*e2ap.E2APSubscriptionResponse)
+ t.Logf("TEST: subResp = %+v", subResp)
+ case "SubFail":
+ t.Logf("TEST: subs.SubRFMsg == SubFail")
+ subFail := subs.SubRFMsg.(*e2ap.E2APSubscriptionFailure)
+ t.Logf("TEST: subFail = %+v", subFail)
+ }
+ } else {
+ t.Logf("TEST: subs.SubRFMsg == nil")
+ }
+}
+
+func TestWriteSubscriptionToSdl(t *testing.T) {
+ t.Log("TestWriteSubscriptionToSdl")
+
+ // Write one subscription
+ subId := mock.AllocNextSubId()
+ subs := GetSubscription(t, subId, subsResponse, "localhost:13560", "RAN_NAME_1", "123456")
+ PrintSubscriptionData(t, subs)
+ t.Logf("TEST: Writing subId = %v\n", subId)
+ err := mainCtrl.c.WriteSubscriptionToSdl(subId, subs)
+ if err != nil {
+ t.Errorf("TEST: %s", err.Error())
+ return
+ }
+}
+
+func TestReadSubscriptionFromSdl(t *testing.T) {
+ t.Log("TestReadSubscriptionFromSdl")
+
+ subId := mock.lastAllocatedSubId
+ t.Logf("Reading subId = %v\n", subId)
+ subs, err := mainCtrl.c.ReadSubscriptionFromSdl(subId)
+ if err != nil {
+ t.Errorf("TEST: %s", err.Error())
+ return
+ }
+ PrintSubscriptionData(t, subs)
+ assert.Equal(t, mock.register[subId].SubReqMsg, subs.SubReqMsg)
+}
+
+func TestRemoveSubscriptionFromSdl(t *testing.T) {
+ t.Log("TestRemoveSubscriptionFromSdl")
+
+ subId := mock.lastAllocatedSubId
+ err := mainCtrl.c.RemoveSubscriptionFromSdl(subId)
+ if err != nil {
+ t.Errorf("TEST: %s", err.Error())
+ return
+ }
+ delete(mock.register, subId)
+ mock.subIds = append(mock.subIds, subId)
+ t.Logf("TEST: subscription removed from db. subId = %v", subId)
+}
+
+func TestReadNotExistingSubscriptionFromSdl(t *testing.T) {
+ t.Log("TestReadNotExistingSubscriptionFromSdl")
+
+ var subId uint32 = 0
+ subs, err := mainCtrl.c.ReadSubscriptionFromSdl(subId)
+ if err != nil {
+ t.Logf("TEST: subscription not found from db. subId = %v", subId)
+ return
+ }
+ t.Errorf("TEST: subscription read from db. %v", subs.String())
+ PrintSubscriptionData(t, subs)
+}
+
+func TestReadNotExistingSubscriptionFromSdl2(t *testing.T) {
+ t.Log("TestReadNotExistingSubscriptionFromSdl")
+
+ var subId uint32 = 7
+ subs, err := mainCtrl.c.ReadSubscriptionFromSdl(subId)
+ if err != nil {
+ t.Logf("TEST: subscription not found from db. subId = %v", subId)
+ return
+ }
+ t.Errorf("TEST: subscription read from db. %v", subs.String())
+ PrintSubscriptionData(t, subs)
+}
+
+func TestRemoveNotExistingSubscriptionFromSdl(t *testing.T) {
+ t.Log("TestRemoveNotExistingSubscriptionFromSdl")
+
+ var subId uint32 = 0
+ err := mainCtrl.c.RemoveSubscriptionFromSdl(subId)
+ if err != nil {
+ t.Logf("TEST: %s", err.Error())
+ return
+ }
+ t.Logf("TEST: subscription removed from db. subId = %v", subId)
+}
+
+func TestWriteSubscriptionsToSdl(t *testing.T) {
+ t.Log("TestWriteSubscriptionsToSdl")
+
+ // Write 1st subscription
+ subId := mock.AllocNextSubId()
+ t.Logf("TEST: Writing subId = %v\n", subId)
+ subs := GetSubscription(t, subId, subsResponse, "localhost:13560", "RAN_NAME_1", "123456")
+ PrintSubscriptionData(t, subs)
+ err := mainCtrl.c.WriteSubscriptionToSdl(subId, subs)
+ if err != nil {
+ t.Errorf("TEST: %s", err.Error())
+ return
+ }
+ t.Logf("TEST: subscription written in db = %v", subs.String())
+
+ // Write 2nd subscription
+ subId = mock.AllocNextSubId()
+ t.Logf("TEST:Writing subId = %v\n", subId)
+ subs = GetSubscription(t, subId, subsFailure, "localhost:13560", "RAN_NAME_2", "123457")
+ PrintSubscriptionData(t, subs)
+ err = mainCtrl.c.WriteSubscriptionToSdl(subId, subs)
+ if err != nil {
+ t.Errorf("TEST: %s", err.Error())
+ return
+ }
+ t.Logf("TEST: subscription written in db = %v", subs.String())
+
+ // Write 3rd subscription
+ subId = mock.AllocNextSubId()
+ t.Logf("TEST:Writing subId = %v\n", subId)
+ subs = GetSubscription(t, subId, noResponse, "localhost:13560", "RAN_NAME_3", "123458")
+ PrintSubscriptionData(t, subs)
+ err = mainCtrl.c.WriteSubscriptionToSdl(subId, subs)
+ if err != nil {
+ t.Errorf("TEST: %s", err.Error())
+ return
+ }
+ t.Logf("TEST: subscription written in db = %v", subs.String())
+}
+
+func TestReadSubscriptionsFromSdl(t *testing.T) {
+ t.Log("TestReadSubscriptionsFromSdl")
+
+ // Subscription with subId 1 was added and and removed above. Then subscriptions with subIds 2, 3 and 4 was added
+ // Db subscriptions should now contain subIDs 2, 3 and 4
+ var subId uint32
+ for subId = 2; subId <= 4; subId++ {
+ subs, err := mainCtrl.c.ReadSubscriptionFromSdl(subId)
+ if err != nil {
+ t.Errorf("TEST: %s", err.Error())
+ return
+ }
+ PrintSubscriptionData(t, subs)
+ }
+}
+
+func TestReadAllSubscriptionsFromSdl(t *testing.T) {
+ t.Log("TestReadAllSubscriptionsFromSdl")
+
+ // This test cases simulates submgr restart. SubIds and subscriptions are restored from db
+ // after initializing mock.subIds and mock.register
+ // var err error
+ subIds, register, err := mainCtrl.c.ReadAllSubscriptionsFromSdl()
+ if err != nil {
+ t.Errorf("TEST: %s", err.Error())
+ return
+ }
+ // for _, subs := range mock.register {
+ for _, subs := range register {
+ PrintSubscriptionData(t, subs)
+ }
+ // SubIds slices before and after restart can't be directly compared as original slice is not stored
+ // in the db. SubId values 1, 2, 3, 4 are already removed from the beginning of subIds slice above
+ // so far. Next free subId is 5 in the beginning of mock.subIds slice. The db contains now however only
+ // 3 subscriptions with subIds 2, 3 and 4, so only subId values 2, 3, 4 are removed from the returned
+ // subIds slice and there next free value is 1
+ assert.Equal(t, uint32(0x1), subIds[0])
+}
+
+func TestRemoveAllSubscriptionsFromSdl(t *testing.T) {
+ t.Log("TestRemoveAllSubscriptionsFromSdl")
+
+ err := mainCtrl.c.RemoveAllSubscriptionsFromSdl()
+ if err != nil {
+ t.Errorf("TEST: %s", err.Error())
+ return
+ }
+ t.Log("TEST: All subscription removed from db")
+}
+
+func TestReadAllSubscriptionsFromSdl2(t *testing.T) {
+ t.Log("TestReadAllSubscriptionsFromSdl2")
+
+ // This test cases simulates submgr startup. SubIds and subscriptions are restored from empty db
+ // after initializing mock.subIds and mock.register
+ subIds, register, err := mainCtrl.c.ReadAllSubscriptionsFromSdl()
+ if err != nil {
+ t.Errorf("TEST: %s", err.Error())
+ return
+ }
+ for _, subs := range mock.register {
+ PrintSubscriptionData(t, subs)
+ }
+ assert.Equal(t, len(subIds), 65534)
+ assert.Equal(t, len(register), 0)
+}
+
+func (m *Mock) Set(pairs ...interface{}) error {
+ var key string
+ var val string
+
+ for _, v := range pairs {
+ reflectType := reflect.TypeOf(v)
+ switch reflectType.Kind() {
+ case reflect.Slice:
+ val = fmt.Sprintf("%s", v.([]uint8))
+ default:
+ switch v.(type) {
+ case string:
+ key = v.(string)
+ default:
+ return fmt.Errorf("Set() error: Unexpected type\n")
+ }
+ }
+ }
+
+ if key != "" {
+ m.subsDB[key] = val
+ subId := m.subIds[0]
+ subscriptionInfo := &SubscriptionInfo{}
+ err := json.Unmarshal([]byte(val), subscriptionInfo)
+ if err != nil {
+ return fmt.Errorf("Set() json.unmarshal error: %s\n", err.Error())
+ }
+
+ subs := mainCtrl.c.CreateSubscription(subscriptionInfo, &val)
+ m.register[subId] = subs
+ m.subIds = m.subIds[1:]
+ } else {
+ return fmt.Errorf("Set() error: key == ''\n")
+ }
+ return nil
+}
+
+func (m *Mock) Get(keys []string) (map[string]interface{}, error) {
+ retMap := make(map[string]interface{})
+ if len(keys) == 0 {
+ return nil, fmt.Errorf("Get() error: len(key) == 0\n")
+ }
+
+ for _, key := range keys {
+ if key != "" {
+ retMap[key] = m.subsDB[key]
+ } else {
+ return nil, fmt.Errorf("Get() error: key == ''\n")
+ }
+ }
+ return retMap, nil
+}
+
+func (m *Mock) GetAll() ([]string, error) {
+
+ keys := []string{}
+ for key, _ := range m.subsDB {
+ keys = append(keys, key)
+ }
+ return keys, nil
+}
+
+func (m *Mock) Remove(keys []string) error {
+ if len(keys) == 0 {
+ return fmt.Errorf("Remove() error: len(key) == 0\n")
+ }
+ subId64, err := strconv.ParseUint(keys[0], 10, 64)
+ if err != nil {
+ return fmt.Errorf("Remove() ParseUint() error: %s\n", err.Error())
+ }
+ subId := uint32(subId64)
+ delete(m.subsDB, keys[0])
+ delete(m.register, subId)
+ m.subIds = append(m.subIds, subId)
+ return nil
+}
+
+func (m *Mock) RemoveAll() error {
+ for key := range m.subsDB {
+ subId64, err := strconv.ParseUint(key, 10, 64)
+ if err != nil {
+ return fmt.Errorf("RemoveAll() ParseUint() error: %s\n", err.Error())
+ }
+ subId := uint32(subId64)
+ delete(m.subsDB, key)
+ delete(m.register, subId)
+ m.subIds = append(m.subIds, subId)
+ }
+ return nil
+}
diff --git a/pkg/control/subscription.go b/pkg/control/subscription.go
index 9342ccc..ec6f67a 100644
--- a/pkg/control/subscription.go
+++ b/pkg/control/subscription.go
@@ -22,6 +22,7 @@
import (
"gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+
//"reflect"
"sync"
)
@@ -30,16 +31,21 @@
//
//-----------------------------------------------------------------------------
type Subscription struct {
- mutex sync.Mutex // Lock
- valid bool // valid
- registry *Registry // Registry
- ReqId RequestId // ReqId (Requestor Id + Seq Nro a.k.a subsid)
- Meid *xapp.RMRMeid // Meid/ RanName
- EpList xapp.RmrEndpointList // Endpoints
- TransLock sync.Mutex // Lock transactions, only one executed per time for subs
- TheTrans TransactionIf // Ongoing transaction
- SubReqMsg *e2ap.E2APSubscriptionRequest // Subscription information
- SubRFMsg interface{} // Subscription information
+ mutex sync.Mutex // Lock
+ valid bool // valid
+ registry *Registry // Registry
+ ReqId RequestId // ReqId (Requestor Id + Seq Nro a.k.a subsid)
+ Meid *xapp.RMRMeid // Meid/ RanName
+ EpList xapp.RmrEndpointList // Endpoints
+ TransLock sync.Mutex // Lock transactions, only one executed per time for subs
+ TheTrans TransactionIf // Ongoing transaction
+ SubReqMsg *e2ap.E2APSubscriptionRequest // Subscription information
+ SubRFMsg interface{} // Subscription information
+ RetryFromXapp bool // Retry form xApp for subscription that already exist
+ SubRespRcvd bool // Subscription response received
+ DeleteFromDb bool // Delete subscription form db
+ NoRespToXapp bool // Send no response for subscription delete to xApp after restart
+ DoNotWaitSubResp bool // Test flag. Response is not waited for Subscription Request
}
func (s *Subscription) String() string {
diff --git a/pkg/control/types.go b/pkg/control/types.go
index 08bcda4..c8c09dc 100644
--- a/pkg/control/types.go
+++ b/pkg/control/types.go
@@ -33,3 +33,11 @@
func (rid *RequestId) String() string {
return "reqid(" + rid.RequestId.String() + ")"
}
+
+type Sdlnterface interface {
+ Set(pairs ...interface{}) error
+ Get(keys []string) (map[string]interface{}, error)
+ GetAll() ([]string, error)
+ Remove(keys []string) error
+ RemoveAll() error
+}
diff --git a/pkg/control/ut_ctrl_submgr_test.go b/pkg/control/ut_ctrl_submgr_test.go
index 85cbe8f..f253424 100644
--- a/pkg/control/ut_ctrl_submgr_test.go
+++ b/pkg/control/ut_ctrl_submgr_test.go
@@ -20,7 +20,9 @@
package control
import (
+ "fmt"
"gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/teststub"
+ "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
"testing"
"time"
@@ -38,6 +40,8 @@
mainCtrl = &testingSubmgrControl{}
mainCtrl.RmrControl.Init("SUBMGRCTL", srcId, rtgSvc)
mainCtrl.c = NewControl()
+ xapp.Logger.Debug("Replacing real db with test db")
+ mainCtrl.c.db = CreateMock() // This overrides real database for testing
xapp.SetReadyCB(mainCtrl.ReadyCB, nil)
go xapp.RunWithParams(mainCtrl.c, false)
mainCtrl.WaitCB()
@@ -45,6 +49,59 @@
return mainCtrl
}
+func (mc *testingSubmgrControl) SimulateRestart(t *testing.T) {
+ mc.TestLog(t, "Simulating submgr restart")
+ mainCtrl.c.registry.subIds = nil
+ // Initialize subIds slice and subscription map
+ mainCtrl.c.registry.Initialize()
+ // Read subIds and subscriptions from database
+ subIds, register, err := mainCtrl.c.ReadAllSubscriptionsFromSdl()
+ if err != nil {
+ mc.TestError(t, "%v", err)
+ } else {
+ mainCtrl.c.registry.register = nil
+ mainCtrl.c.registry.subIds = subIds
+ mainCtrl.c.registry.register = register
+
+ fmt.Println("register:")
+ for subId, subs := range register {
+ fmt.Println(" subId", subId)
+ fmt.Println(" subs.SubRespRcvd", subs.SubRespRcvd)
+ fmt.Printf(" subs %v\n", subs)
+ }
+
+ fmt.Println("mainCtrl.c.registry.register:")
+ for subId, subs := range mainCtrl.c.registry.register {
+ fmt.Println(" subId", subId)
+ fmt.Println(" subs.SubRespRcvd", subs.SubRespRcvd)
+ fmt.Printf(" subs %v\n", subs)
+ }
+ }
+ go mainCtrl.c.HandleUncompletedSubscriptions(mainCtrl.c.registry.register)
+}
+
+func (mc *testingSubmgrControl) SetResetTestFlag(t *testing.T, status bool) {
+ mc.TestLog(t, "ResetTestFlag set to %v", status)
+ mainCtrl.c.ResetTestFlag = status
+}
+
+func (mc *testingSubmgrControl) removeExistingSubscriptions(t *testing.T) {
+
+ mc.TestLog(t, "Removing existing subscriptions")
+ mainCtrl.c.RemoveAllSubscriptionsFromSdl()
+ mainCtrl.c.registry.subIds = nil
+ // Initialize subIds slice and subscription map
+ mainCtrl.c.registry.Initialize()
+}
+
+func PringSubscriptionQueryResult(resp models.SubscriptionList) {
+ for _, item := range resp {
+ fmt.Printf("item.SubscriptionID %v\n", item.SubscriptionID)
+ fmt.Printf("item.Meid %v\n", item.Meid)
+ fmt.Printf("item.Endpoint %v\n", item.Endpoint)
+ }
+}
+
func (mc *testingSubmgrControl) wait_registry_empty(t *testing.T, secs int) bool {
cnt := int(0)
i := 1
diff --git a/pkg/control/ut_messaging_test.go b/pkg/control/ut_messaging_test.go
index 6068d66..f559f91 100644
--- a/pkg/control/ut_messaging_test.go
+++ b/pkg/control/ut_messaging_test.go
@@ -1721,3 +1721,248 @@
e2termConn1.TestMsgChanEmpty(t)
mainCtrl.wait_registry_empty(t, 10)
}
+
+//-----------------------------------------------------------------------------
+// TestSubReqNokAndSubDelOkWithRestartInMiddle
+//
+// stub stub
+// +-------+ +---------+ +---------+
+// | xapp | | submgr | | e2term |
+// +-------+ +---------+ +---------+
+// | | |
+// | SubReq | |
+// |------------->| |
+// | | |
+// | | SubReq |
+// | |------------->|
+// | | |
+// | | SubResp |
+// | <----|
+// | |
+// | Submgr restart |
+// | |
+// | | |
+// | | SubDelReq |
+// | |------------->|
+// | | |
+// | | SubDelResp |
+// | |<-------------|
+// | | |
+//
+//-----------------------------------------------------------------------------
+
+func TestSubReqNokAndSubDelOkWithRestartInMiddle(t *testing.T) {
+ CaseBegin("TestSubReqNokAndSubDelOkWithRestartInMiddle")
+
+ // Remove possible existing subscrition
+ mainCtrl.removeExistingSubscriptions(t)
+
+ mainCtrl.SetResetTestFlag(t, true) // subs.DoNotWaitSubResp will be set TRUE for the subscription
+ xappConn1.SendSubsReq(t, nil, nil)
+ e2termConn1.RecvSubsReq(t)
+ mainCtrl.SetResetTestFlag(t, false)
+
+ resp, _ := xapp.Subscription.QuerySubscriptions()
+ assert.Equal(t, resp[0].Meid, "RAN_NAME_1")
+ assert.Equal(t, resp[0].Endpoint, []string{"localhost:13560"})
+ e2SubsId := uint32(resp[0].SubscriptionID)
+ t.Logf("e2SubsId = %v", e2SubsId)
+
+ mainCtrl.SimulateRestart(t) // This will trigger sending of SubDelReq
+
+ delreq, delmsg := e2termConn1.RecvSubsDelReq(t)
+ e2termConn1.SendSubsDelResp(t, delreq, delmsg)
+
+ // Wait that subs is cleaned
+ mainCtrl.wait_subs_clean(t, e2SubsId, 10)
+
+ xappConn1.TestMsgChanEmpty(t)
+ xappConn2.TestMsgChanEmpty(t)
+ e2termConn1.TestMsgChanEmpty(t)
+ mainCtrl.wait_registry_empty(t, 10)
+}
+
+//-----------------------------------------------------------------------------
+// TestSubReqAndSubDelOkWithRestartInMiddle
+//
+// stub stub
+// +-------+ +---------+ +---------+
+// | xapp | | submgr | | e2term |
+// +-------+ +---------+ +---------+
+// | | |
+// | SubReq | |
+// |------------->| |
+// | | |
+// | | SubReq |
+// | |------------->|
+// | | |
+// | | SubResp |
+// | |<-------------|
+// | | |
+// | SubResp | |
+// |<-------------| |
+// | | |
+// | |
+// | Submgr restart |
+// | |
+// | SubDelReq | |
+// |------------->| |
+// | | |
+// | | SubDelReq |
+// | |------------->|
+// | | |
+// | | SubDelResp |
+// | |<-------------|
+// | | |
+// | SubDelResp | |
+// |<-------------| |
+//
+//-----------------------------------------------------------------------------
+
+func TestSubReqAndSubDelOkWithRestartInMiddle(t *testing.T) {
+ CaseBegin("TestSubReqAndSubDelOkWithRestartInMiddle")
+
+ cretrans := xappConn1.SendSubsReq(t, nil, nil)
+ crereq, cremsg := e2termConn1.RecvSubsReq(t)
+ e2termConn1.SendSubsResp(t, crereq, cremsg)
+ e2SubsId := xappConn1.RecvSubsResp(t, cretrans)
+
+ // Check subscription
+ resp, _ := xapp.Subscription.QuerySubscriptions()
+ assert.Equal(t, resp[0].SubscriptionID, int64(e2SubsId))
+ assert.Equal(t, resp[0].Meid, "RAN_NAME_1")
+ assert.Equal(t, resp[0].Endpoint, []string{"localhost:13560"})
+
+ mainCtrl.SimulateRestart(t)
+
+ // Check that subscription is restored correctly after restart
+ resp, _ = xapp.Subscription.QuerySubscriptions()
+ assert.Equal(t, resp[0].SubscriptionID, int64(e2SubsId))
+ assert.Equal(t, resp[0].Meid, "RAN_NAME_1")
+ assert.Equal(t, resp[0].Endpoint, []string{"localhost:13560"})
+
+ deltrans := xappConn1.SendSubsDelReq(t, nil, e2SubsId)
+ delreq, delmsg := e2termConn1.RecvSubsDelReq(t)
+ e2termConn1.SendSubsDelResp(t, delreq, delmsg)
+ xappConn1.RecvSubsDelResp(t, deltrans)
+
+ //Wait that subs is cleaned
+ mainCtrl.wait_subs_clean(t, e2SubsId, 10)
+
+ xappConn1.TestMsgChanEmpty(t)
+ xappConn2.TestMsgChanEmpty(t)
+ e2termConn1.TestMsgChanEmpty(t)
+ mainCtrl.wait_registry_empty(t, 10)
+}
+
+//-----------------------------------------------------------------------------
+// TestSubReqAndSubDelOkSameActionWithRestartsInMiddle
+//
+// stub stub
+// +-------+ +-------+ +---------+ +---------+
+// | xapp2 | | xapp1 | | submgr | | e2term |
+// +-------+ +-------+ +---------+ +---------+
+// | | | |
+// | | | |
+// | | | |
+// | | SubReq1 | |
+// | |------------->| |
+// | | | |
+// | | | SubReq1 |
+// | | |------------->|
+// | | | SubResp1 |
+// | | |<-------------|
+// | | SubResp1 | |
+// | |<-------------| |
+// | | | |
+// | |
+// | submgr restart |
+// | |
+// | | | |
+// | | | |
+// | SubReq2 | |
+// |--------------------------->| |
+// | | | |
+// | SubResp2 | |
+// |<---------------------------| |
+// | | | |
+// | | SubDelReq 1 | |
+// | |------------->| |
+// | | | |
+// | | SubDelResp 1 | |
+// | |<-------------| |
+// | | | |
+// | | | |
+// | |
+// | submgr restart |
+// | |
+// | | | |
+// | SubDelReq 2 | |
+// |--------------------------->| |
+// | | | |
+// | | | SubDelReq 2 |
+// | | |------------->|
+// | | | |
+// | | | SubDelReq 2 |
+// | | |------------->|
+// | | | |
+// | SubDelResp 2 | |
+// |<---------------------------| |
+//
+//-----------------------------------------------------------------------------
+
+func TestSubReqAndSubDelOkSameActionWithRestartsInMiddle(t *testing.T) {
+ CaseBegin("TestSubReqAndSubDelOkSameActionWithRestartsInMiddle")
+
+ //Req1
+ rparams1 := &teststube2ap.E2StubSubsReqParams{}
+ rparams1.Init()
+ cretrans1 := xappConn1.SendSubsReq(t, rparams1, nil)
+ crereq1, cremsg1 := e2termConn1.RecvSubsReq(t)
+ e2termConn1.SendSubsResp(t, crereq1, cremsg1)
+ e2SubsId1 := xappConn1.RecvSubsResp(t, cretrans1)
+
+ //Req2
+ rparams2 := &teststube2ap.E2StubSubsReqParams{}
+ rparams2.Init()
+ cretrans2 := xappConn2.SendSubsReq(t, rparams2, nil)
+ e2SubsId2 := xappConn2.RecvSubsResp(t, cretrans2)
+
+ // Check subscription
+ resp, _ := xapp.Subscription.QuerySubscriptions() ////////////////////////////////
+ assert.Equal(t, resp[0].SubscriptionID, int64(e2SubsId1))
+ assert.Equal(t, resp[0].Meid, "RAN_NAME_1")
+ assert.Equal(t, resp[0].Endpoint, []string{"localhost:13560", "localhost:13660"})
+
+ mainCtrl.SimulateRestart(t)
+
+ // Check that subscription is restored correctly after restart
+ resp, _ = xapp.Subscription.QuerySubscriptions()
+ assert.Equal(t, resp[0].SubscriptionID, int64(e2SubsId1))
+ assert.Equal(t, resp[0].Meid, "RAN_NAME_1")
+ assert.Equal(t, resp[0].Endpoint, []string{"localhost:13560", "localhost:13660"})
+
+ //Del1
+ deltrans1 := xappConn1.SendSubsDelReq(t, nil, e2SubsId1)
+ xapp.Logger.Debug("xappConn1.RecvSubsDelResp")
+ xappConn1.RecvSubsDelResp(t, deltrans1)
+ xapp.Logger.Debug("xappConn1.RecvSubsDelResp received")
+
+ mainCtrl.SimulateRestart(t)
+ xapp.Logger.Debug("mainCtrl.SimulateRestart done")
+
+ //Del2
+ deltrans2 := xappConn2.SendSubsDelReq(t, nil, e2SubsId2)
+ delreq2, delmsg2 := e2termConn1.RecvSubsDelReq(t)
+
+ e2termConn1.SendSubsDelResp(t, delreq2, delmsg2)
+ xappConn2.RecvSubsDelResp(t, deltrans2)
+
+ //Wait that subs is cleaned
+ mainCtrl.wait_subs_clean(t, e2SubsId2, 10)
+
+ xappConn1.TestMsgChanEmpty(t)
+ xappConn2.TestMsgChanEmpty(t)
+ e2termConn1.TestMsgChanEmpty(t)
+ mainCtrl.wait_registry_empty(t, 10)
+}