RICPLT-2965 Unittest to support multiple xappconns
Change-Id: I4a7afe59b62d4628afdf4e5434c143eb945ea5fc
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
diff --git a/pkg/control/main_test.go b/pkg/control/main_test.go
index 2185d53..1c69b01 100644
--- a/pkg/control/main_test.go
+++ b/pkg/control/main_test.go
@@ -20,12 +20,16 @@
package control
import (
+ "encoding/json"
"errors"
"fmt"
+ "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_models"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
"io/ioutil"
"net/http"
"os"
+ "strconv"
+ "strings"
"testing"
"time"
)
@@ -44,19 +48,27 @@
return
}
+// WaitCB blocks until a receive on tc.syncChan completes.
+// NOTE(review): presumably ReadyCB is what signals this channel once the
+// underlying client is ready — confirm against ReadyCB.
+func (tc *testingControl) WaitCB() {
+ <-tc.syncChan
+}
+
+// initTestingControl points the RMR environment at the given seed routing
+// table file and source id ("localhost:"+port), logs both values, and returns
+// a testingControl whose description is the upper-cased desc and whose sync
+// channel is newly created.
+//
+// RMR_SEED_RT and RMR_SRC_ID are process-wide environment variables, so every
+// call overwrites what the previous call stored.
+func initTestingControl(desc string, rtfile string, port string) testingControl {
+	os.Setenv("RMR_SEED_RT", rtfile)
+	xapp.Logger.Info("Using rt file %s", os.Getenv("RMR_SEED_RT"))
+
+	os.Setenv("RMR_SRC_ID", "localhost:"+port)
+	xapp.Logger.Info("Using src id %s", os.Getenv("RMR_SRC_ID"))
+
+	var ctl testingControl
+	ctl.syncChan = make(chan struct{})
+	ctl.desc = strings.ToUpper(desc)
+	return ctl
+}
+
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
type testingRmrControl struct {
testingControl
rmrClientTest *xapp.RMRClient
- rmrConChan chan *xapp.RMRParams
-}
-
-func (tc *testingRmrControl) Consume(msg *xapp.RMRParams) (err error) {
- xapp.Logger.Info("testingRmrControl(%s) Consume", tc.desc)
- tc.rmrConChan <- msg
- return
}
func (tc *testingRmrControl) RmrSend(params *xapp.RMRParams) (err error) {
@@ -79,20 +91,88 @@
return
}
-func createNewRmrControl(desc string, rtfile string, port string, stat string) *testingRmrControl {
- os.Setenv("RMR_SEED_RT", rtfile)
- os.Setenv("RMR_SRC_ID", "localhost:"+port)
- xapp.Logger.Info("Using rt file %s", os.Getenv("RMR_SEED_RT"))
- xapp.Logger.Info("Using src id %s", os.Getenv("RMR_SRC_ID"))
- newConn := &testingRmrControl{}
- newConn.desc = desc
- newConn.syncChan = make(chan struct{})
- newConn.rmrClientTest = xapp.NewRMRClientWithParams("tcp:"+port, 4096, 1, stat)
- newConn.rmrConChan = make(chan *xapp.RMRParams)
- newConn.rmrClientTest.SetReadyCB(newConn.ReadyCB, nil)
- go newConn.rmrClientTest.Start(newConn)
- <-newConn.syncChan
- return newConn
+// initTestingRmrControl prepares the common test control state, creates an
+// RMR client listening on "tcp:"+port, starts it in its own goroutine with
+// consumer as the message consumer, and blocks in WaitCB before returning.
+//
+// The result is returned by value; the ready callback is bound to the local
+// tc, and the copy handed back to the caller carries the same syncChan value.
+//
+// NOTE(review): the client is already running when this function returns, so
+// consumer should be fully initialised before it is passed in — confirm with
+// callers.
+func initTestingRmrControl(desc string, rtfile string, port string, stat string, consumer xapp.MessageConsumer) testingRmrControl {
+ tc := testingRmrControl{}
+ tc.testingControl = initTestingControl(desc, rtfile, port)
+ // NOTE(review): 4096 and 1 are presumably the max message size and the
+ // number of receiver threads — TODO confirm against xapp-frame.
+ tc.rmrClientTest = xapp.NewRMRClientWithParams("tcp:"+port, 4096, 1, stat)
+ tc.rmrClientTest.SetReadyCB(tc.ReadyCB, nil)
+ go tc.rmrClientTest.Start(consumer)
+ tc.WaitCB()
+ return tc
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+// testingMessageChannel carries RMR messages from a stub's Consume callback
+// to the test helper that is waiting for them.
+type testingMessageChannel struct {
+ rmrConChan chan *xapp.RMRParams // written by Consume, read by the handle_* helpers
+}
+
+// initTestingMessageChannel returns a testingMessageChannel holding a new
+// unbuffered channel of RMR messages.
+func initTestingMessageChannel() testingMessageChannel {
+	return testingMessageChannel{rmrConChan: make(chan *xapp.RMRParams)}
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+
+// testingXappControl is an xapp stub: an RMR connection plus the channel on
+// which received messages are handed to the tests, together with the
+// identifiers the stub stamps on the messages it sends.
+type testingXappControl struct {
+ testingRmrControl
+ testingMessageChannel
+ meid *xapp.RMRMeid // managed entity id placed on outgoing requests
+ xid_seq uint64 // sequence number used for the next generated xid
+ xid string // transaction id most recently produced by newXid
+}
+
+// newXid generates the next transaction id in the form "<desc>_XID_<n>",
+// stores it as the stub's current xid and returns it. The sequence counter is
+// advanced so that the following call yields a different id.
+func (tc *testingXappControl) newXid() string {
+	seq := tc.xid_seq
+	tc.xid_seq = seq + 1
+	tc.xid = tc.desc + "_XID_" + strconv.FormatUint(seq, 10)
+	return tc.xid
+}
+
+// Consume is the message callback of the xapp stub. A message whose xid
+// contains this stub's description is logged and forwarded to rmrConChan;
+// any other message is logged as ignored and dropped. The returned error is
+// always nil.
+//
+// The match is a substring test on the description rather than an exact
+// comparison with the current xid.
+func (tc *testingXappControl) Consume(msg *xapp.RMRParams) (err error) {
+	mtypeName := xapp.RicMessageTypeToName[msg.Mtype]
+
+	if !strings.Contains(msg.Xid, tc.desc) {
+		xapp.Logger.Info("(%s) Ignore mtype=%s subid=%d xid=%s, Expected xid to contain %s", tc.desc, mtypeName, msg.SubId, msg.Xid, tc.desc)
+		return nil
+	}
+
+	xapp.Logger.Info("(%s) Consume mtype=%s subid=%d xid=%s", tc.desc, mtypeName, msg.SubId, msg.Xid)
+	// Blocks until a test helper receives the message.
+	tc.rmrConChan <- msg
+	return nil
+}
+
+// createNewXappControl builds an xapp stub, starts its RMR client with the
+// stub itself as message consumer, and returns it with its first transaction
+// id already generated.
+//
+// desc names the stub (and prefixes its xids), rtfile and port select the RMR
+// routing table and listen port, stat is passed through to the RMR client,
+// and ranname becomes the RAN name of the stub's meid.
+func createNewXappControl(desc string, rtfile string, port string, stat string, ranname string) *testingXappControl {
+	xappCtrl := &testingXappControl{}
+
+	// Set up everything Consume and the send helpers depend on before the
+	// RMR client is started: once it runs, Consume may be invoked, and a send
+	// on a nil rmrConChan would block forever.
+	xappCtrl.testingMessageChannel = initTestingMessageChannel()
+	xappCtrl.meid = &xapp.RMRMeid{RanName: ranname}
+
+	// NOTE(review): desc only becomes visible on xappCtrl once this
+	// assignment completes, i.e. after the client has started — confirm no
+	// message can arrive during start-up.
+	xappCtrl.testingRmrControl = initTestingRmrControl(desc, rtfile, port, stat, xappCtrl)
+
+	// xid_seq starts at its zero value; newXid needs desc, so it runs last.
+	xappCtrl.newXid()
+	return xappCtrl
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+// testingE2termControl is an e2term stub: an RMR connection plus the channel
+// on which every received message is handed to the tests.
+type testingE2termControl struct {
+ testingRmrControl
+ testingMessageChannel
+}
+
+// Consume is the message callback of the e2term stub. Every message is
+// logged and forwarded to rmrConChan without filtering; the returned error is
+// always nil.
+func (tc *testingE2termControl) Consume(msg *xapp.RMRParams) (err error) {
+	mtypeName := xapp.RicMessageTypeToName[msg.Mtype]
+	xapp.Logger.Info("(%s) Consume mtype=%s subid=%d xid=%s", tc.desc, mtypeName, msg.SubId, msg.Xid)
+	// Blocks until a test helper receives the message.
+	tc.rmrConChan <- msg
+	return nil
+}
+
+// createNewE2termControl builds an e2term stub and starts its RMR client with
+// the stub itself as message consumer.
+//
+// desc names the stub, rtfile and port select the RMR routing table and
+// listen port, and stat is passed through to the RMR client.
+func createNewE2termControl(desc string, rtfile string, port string, stat string) *testingE2termControl {
+	e2termCtrl := &testingE2termControl{}
+
+	// The message channel must exist before the RMR client is started: once
+	// it runs, Consume may be invoked, and a send on a nil rmrConChan would
+	// block forever.
+	e2termCtrl.testingMessageChannel = initTestingMessageChannel()
+	e2termCtrl.testingRmrControl = initTestingRmrControl(desc, rtfile, port, stat, e2termCtrl)
+	return e2termCtrl
+}
//-----------------------------------------------------------------------------
@@ -103,15 +183,14 @@
c *Control
}
-func (mc *testingMainControl) wait_subs_clean(e2SubsId int, secs int) bool {
- i := 1
- for ; i <= secs*2; i++ {
- if mc.c.registry.IsValidSequenceNumber(uint16(e2SubsId)) == false {
- return true
- }
- time.Sleep(500 * time.Millisecond)
- }
- return false
+// createNewMainControl creates the subscription manager control under test,
+// starts the xapp framework with it in a separate goroutine, blocks in WaitCB,
+// and returns the control wrapper.
+//
+// NOTE(review): this assigns the package-level mainCtrl directly instead of
+// using a local variable, so the global is already set before the function
+// returns — confirm whether anything relies on that.
+func createNewMainControl(desc string, rtfile string, port string) *testingMainControl {
+ mainCtrl = &testingMainControl{}
+ mainCtrl.testingControl = initTestingControl(desc, rtfile, port)
+ mainCtrl.c = NewControl()
+ xapp.SetReadyCB(mainCtrl.ReadyCB, nil)
+ go xapp.RunWithParams(mainCtrl.c, false)
+ mainCtrl.WaitCB()
+ return mainCtrl
+}
//-----------------------------------------------------------------------------
@@ -123,6 +202,11 @@
t.Errorf(fmt.Sprintf(pattern, args...))
}
+// testLog formats pattern with args and writes the resulting text to both the
+// xapp logger and the test log.
+func testLog(t *testing.T, pattern string, args ...interface{}) {
+	// Format exactly once and pass the result on as data. Handing the
+	// rendered text to another printf-style function as its format string
+	// would re-interpret any '%' it contains.
+	msg := fmt.Sprintf(pattern, args...)
+	xapp.Logger.Info("%s", msg)
+	t.Log(msg)
+}
+
func testCreateTmpFile(str string) (string, error) {
file, err := ioutil.TempFile("/tmp", "*.rt")
if err != nil {
@@ -140,8 +224,9 @@
//
//-----------------------------------------------------------------------------
-var xappConn *testingRmrControl
-var e2termConn *testingRmrControl
+var xappConn1 *testingXappControl
+var xappConn2 *testingXappControl
+var e2termConn *testingE2termControl
var mainCtrl *testingMainControl
func TestMain(m *testing.M) {
@@ -184,7 +269,10 @@
xapp.Logger.Info("Using cfg file %s", os.Getenv("CFG_FILE"))
//---------------------------------
- //
+ // NOTE: The routing table is configured so that responses
+ // are duplicated to both the xapp1 and xapp2 instances.
+ // If the XID does not match, the xapp stub just drops
+ // the message. (Messages 12011, 12012, 12021, 12022)
//---------------------------------
xapp.Logger.Info("### submgr main run ###")
@@ -192,40 +280,27 @@
mse|12010|-1|localhost:14560
mse|12010,localhost:14560|-1|localhost:15560
mse|12011,localhost:15560|-1|localhost:14560
-mse|12011|-1|localhost:13560
+mse|12011|-1|localhost:13560;localhost:13660
mse|12012,localhost:15560|-1|localhost:14560
-mse|12012|-1|localhost:13560
+mse|12012|-1|localhost:13560;localhost:13660
mse|12020|-1|localhost:14560
mse|12020,localhost:14560|-1|localhost:15560
mse|12021,localhost:15560|-1|localhost:14560
-mse|12021|-1|localhost:13560
+mse|12021|-1|localhost:13560;localhost:13660
mse|12022,localhost:15560|-1|localhost:14560
-mse|12022|-1|localhost:13560
+mse|12022|-1|localhost:13560;localhost:13660
newrt|end
`
-
subrtfilename, _ := testCreateTmpFile(subsrt)
defer os.Remove(subrtfilename)
- os.Setenv("RMR_SEED_RT", subrtfilename)
- os.Setenv("RMR_SRC_ID", "localhost:14560")
- xapp.Logger.Info("Using rt file %s", os.Getenv("RMR_SEED_RT"))
- xapp.Logger.Info("Using src id %s", os.Getenv("RMR_SRC_ID"))
-
- mainCtrl = &testingMainControl{}
- mainCtrl.desc = "main"
- mainCtrl.syncChan = make(chan struct{})
-
- mainCtrl.c = NewControl()
- xapp.SetReadyCB(mainCtrl.ReadyCB, nil)
- go xapp.RunWithParams(mainCtrl.c, false)
- <-mainCtrl.syncChan
+ mainCtrl = createNewMainControl("main", subrtfilename, "14560")
//---------------------------------
//
//---------------------------------
- xapp.Logger.Info("### xapp rmr run ###")
+ xapp.Logger.Info("### xapp1 rmr run ###")
- xapprt := `newrt|start
+ xapprt1 := `newrt|start
mse|12010|-1|localhost:14560
mse|12011|-1|localhost:13560
mse|12012|-1|localhost:13560
@@ -235,9 +310,29 @@
newrt|end
`
- xapprtfilename, _ := testCreateTmpFile(xapprt)
- defer os.Remove(xapprtfilename)
- xappConn = createNewRmrControl("xappConn", xapprtfilename, "13560", "RMRXAPPSTUB")
+ xapprtfilename1, _ := testCreateTmpFile(xapprt1)
+ defer os.Remove(xapprtfilename1)
+ xappConn1 = createNewXappControl("xappConn1", xapprtfilename1, "13560", "RMRXAPP1STUB", "RAN_NAME_1")
+
+ //---------------------------------
+ //
+ //---------------------------------
+
+ xapp.Logger.Info("### xapp2 rmr run ###")
+
+ xapprt2 := `newrt|start
+mse|12010|-1|localhost:14560
+mse|12011|-1|localhost:13660
+mse|12012|-1|localhost:13660
+mse|12020|-1|localhost:14560
+mse|12021|-1|localhost:13660
+mse|12022|-1|localhost:13660
+newrt|end
+`
+
+ xapprtfilename2, _ := testCreateTmpFile(xapprt2)
+ defer os.Remove(xapprtfilename2)
+ xappConn2 = createNewXappControl("xappConn2", xapprtfilename2, "13660", "RMRXAPP2STUB", "RAN_NAME_1")
//---------------------------------
//
@@ -256,13 +351,19 @@
e2termrtfilename, _ := testCreateTmpFile(e2termrt)
defer os.Remove(e2termrtfilename)
- e2termConn = createNewRmrControl("e2termConn", e2termrtfilename, "15560", "RMRE2TERMSTUB")
+ e2termConn = createNewE2termControl("e2termConn", e2termrtfilename, "15560", "RMRE2TERMSTUB")
//---------------------------------
//
//---------------------------------
http_handler := func(w http.ResponseWriter, r *http.Request) {
- xapp.Logger.Info("(http handler) handling")
+		// Decode the request body only to log what was asked for; the reply
+		// written afterwards does not depend on the outcome. The request
+		// fields are pointers, so they are dereferenced only after a
+		// successful decode and a nil check — a malformed or partial body
+		// must not panic the handler.
+		var req rtmgr_models.XappSubscriptionData
+		switch err := json.NewDecoder(r.Body).Decode(&req); {
+		case err != nil:
+			xapp.Logger.Error("%s", err.Error())
+		case req.Address == nil || req.Port == nil || req.SubscriptionID == nil:
+			xapp.Logger.Error("(http handler) request is missing Address, Port or SubscriptionID")
+		default:
+			xapp.Logger.Info("(http handler) handling Address=%s Port=%d SubscriptionID=%d", *req.Address, *req.Port, *req.SubscriptionID)
+		}
+
w.WriteHeader(200)
}
diff --git a/pkg/control/messaging_test.go b/pkg/control/messaging_test.go
index 7308563..b8ed6e6 100644
--- a/pkg/control/messaging_test.go
+++ b/pkg/control/messaging_test.go
@@ -37,7 +37,29 @@
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
-func createSubsReq() *e2ap.E2APSubscriptionRequest {
+// xappTransaction pairs an xapp stub with one transaction id generated for it.
+type xappTransaction struct {
+ xappConn *testingXappControl // stub the transaction belongs to
+ xid string // transaction id obtained from xappConn.newXid
+}
+
+// newXappTransaction allocates a fresh transaction id on xappConn and returns
+// a transaction that binds the stub to that id.
+//
+// The transaction is returned to the caller; without a result the value
+// would be built and immediately discarded. Calling newXappTransaction also
+// advances the stub's current xid as a side effect of newXid.
+func newXappTransaction(xappConn *testingXappControl) *xappTransaction {
+	return &xappTransaction{
+		xappConn: xappConn,
+		xid:      xappConn.newXid(),
+	}
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func (xappConn *testingXappControl) handle_xapp_subs_req(t *testing.T, updseq bool) {
+ xapp.Logger.Info("handle_xapp_subs_req")
+ e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest()
+
+ //---------------------------------
+ // xapp activity: Send Subs Req
+ //---------------------------------
+ xapp.Logger.Info("(%s) Send Subs Req", xappConn.desc)
+
req := &e2ap.E2APSubscriptionRequest{}
req.RequestId.Id = 1
@@ -66,13 +88,187 @@
req.ActionSetups[0].SubsequentAction.Type = e2ap.E2AP_SubSeqActionTypeContinue
req.ActionSetups[0].SubsequentAction.TimetoWait = e2ap.E2AP_TimeToWaitZero
- return req
+ e2SubsReq.Set(req)
+ xapp.Logger.Debug("%s", e2SubsReq.String())
+ err, packedMsg := e2SubsReq.Pack(nil)
+ if err != nil {
+ testError(t, "(%s) pack NOK %s", xappConn.desc, err.Error())
+ }
+
+ params := &xapp.RMRParams{}
+ params.Mtype = xapp.RIC_SUB_REQ
+ params.SubId = -1
+ params.Payload = packedMsg.Buf
+ params.Meid = xappConn.meid
+ if updseq {
+ xappConn.newXid()
+ }
+ params.Xid = xappConn.xid
+ params.Mbuf = nil
+
+ snderr := xappConn.RmrSend(params)
+ if snderr != nil {
+ testError(t, "(%s) RMR SEND FAILED: %s", xappConn.desc, snderr.Error())
+ }
}
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
-func createSubsResp(req *e2ap.E2APSubscriptionRequest) *e2ap.E2APSubscriptionResponse {
+// handle_xapp_subs_resp waits up to 15 seconds for a message on the xapp
+// stub's channel and checks that it is a RIC_SUB_RESP that can be unpacked.
+//
+// It returns the RMR subscription id of the received message, or -1 when no
+// message arrives in time or the message has the wrong type. Failures are
+// reported through testError. When unpacking or decoding fails the
+// subscription id from the RMR header is still returned, but the decoded
+// response is not touched.
+func (xappConn *testingXappControl) handle_xapp_subs_resp(t *testing.T) int {
+	xapp.Logger.Info("handle_xapp_subs_resp")
+	e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse()
+
+	//---------------------------------
+	// xapp activity: Recv Subs Resp
+	//---------------------------------
+	select {
+	case msg := <-xappConn.rmrConChan:
+		if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_RESP"] {
+			testError(t, "(%s) Received wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_RESP", xapp.RicMessageTypeToName[msg.Mtype])
+			return -1
+		}
+
+		packedData := &packer.PackedData{}
+		packedData.Buf = msg.Payload
+		if unpackerr := e2SubsResp.UnPack(packedData); unpackerr != nil {
+			testError(t, "(%s) RIC_SUB_RESP unpack failed err: %s", xappConn.desc, unpackerr.Error())
+			return msg.SubId
+		}
+
+		geterr, resp := e2SubsResp.Get()
+		if geterr != nil {
+			// resp cannot be trusted after a failed Get, so stop before it
+			// is dereferenced for the log line below.
+			testError(t, "(%s) RIC_SUB_RESP get failed err: %s", xappConn.desc, geterr.Error())
+			return msg.SubId
+		}
+
+		xapp.Logger.Info("(%s) Recv Subs Resp rmr: xid=%s subid=%d, asn: seqnro=%d", xappConn.desc, msg.Xid, msg.SubId, resp.RequestId.Seq)
+		return msg.SubId
+	case <-time.After(15 * time.Second):
+		testError(t, "(%s) Not Received RIC_SUB_RESP within 15 secs", xappConn.desc)
+		return -1
+	}
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+// handle_xapp_subs_del_req packs a subscription delete request for e2SubsId
+// and sends it from the xapp stub as RIC_SUB_DEL_REQ.
+//
+// When updseq is true a new transaction id is generated first; otherwise the
+// stub's current xid is reused. Pack and send failures are reported through
+// testError; nothing is sent when packing fails.
+func (xappConn *testingXappControl) handle_xapp_subs_del_req(t *testing.T, updseq bool, e2SubsId int) {
+	xapp.Logger.Info("handle_xapp_subs_del_req")
+	e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest()
+
+	//---------------------------------
+	// xapp activity: Send Subs Del Req
+	//---------------------------------
+	xapp.Logger.Info("(%s) Send Subs Del Req", xappConn.desc)
+
+	req := &e2ap.E2APSubscriptionDeleteRequest{}
+	req.RequestId.Id = 1
+	req.RequestId.Seq = uint32(e2SubsId)
+	req.FunctionId = 1
+
+	e2SubsDelReq.Set(req)
+	xapp.Logger.Debug("%s", e2SubsDelReq.String())
+	err, packedMsg := e2SubsDelReq.Pack(nil)
+	if err != nil {
+		// Without a packed payload there is nothing valid to send, and
+		// packedMsg must not be dereferenced.
+		testError(t, "(%s) pack NOK %s", xappConn.desc, err.Error())
+		return
+	}
+
+	if updseq {
+		xappConn.newXid()
+	}
+
+	params := &xapp.RMRParams{}
+	params.Mtype = xapp.RIC_SUB_DEL_REQ
+	params.SubId = e2SubsId
+	params.Payload = packedMsg.Buf
+	params.Meid = xappConn.meid
+	params.Xid = xappConn.xid
+	params.Mbuf = nil
+
+	if snderr := xappConn.RmrSend(params); snderr != nil {
+		testError(t, "(%s) RMR SEND FAILED: %s", xappConn.desc, snderr.Error())
+	}
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+// handle_xapp_subs_del_resp waits up to 15 seconds for a message on the xapp
+// stub's channel and checks that it is a RIC_SUB_DEL_RESP that can be
+// unpacked. A timeout, a wrong message type, or an unpack/decode failure is
+// reported through testError; the decoded response is only used when decoding
+// succeeded.
+func (xappConn *testingXappControl) handle_xapp_subs_del_resp(t *testing.T) {
+	xapp.Logger.Info("handle_xapp_subs_del_resp")
+	e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse()
+
+	//---------------------------------
+	// xapp activity: Recv Subs Del Resp
+	//---------------------------------
+	select {
+	case msg := <-xappConn.rmrConChan:
+		if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_DEL_RESP"] {
+			testError(t, "(%s) Received wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_DEL_RESP", xapp.RicMessageTypeToName[msg.Mtype])
+			return
+		}
+
+		packedData := &packer.PackedData{}
+		packedData.Buf = msg.Payload
+		if unpackerr := e2SubsDelResp.UnPack(packedData); unpackerr != nil {
+			testError(t, "(%s) RIC_SUB_DEL_RESP unpack failed err: %s", xappConn.desc, unpackerr.Error())
+			return
+		}
+
+		geterr, resp := e2SubsDelResp.Get()
+		if geterr != nil {
+			// resp cannot be trusted after a failed Get, so stop before it
+			// is dereferenced for the log line below.
+			testError(t, "(%s) RIC_SUB_DEL_RESP get failed err: %s", xappConn.desc, geterr.Error())
+			return
+		}
+
+		xapp.Logger.Info("(%s) Recv Subs Del Resp rmr: xid=%s subid=%d, asn: seqnro=%d", xappConn.desc, msg.Xid, msg.SubId, resp.RequestId.Seq)
+	case <-time.After(15 * time.Second):
+		testError(t, "(%s) Not Received RIC_SUB_DEL_RESP within 15 secs", xappConn.desc)
+	}
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+// handle_e2term_subs_req waits up to 15 seconds for a message on the e2term
+// stub's channel and expects it to be a RIC_SUB_REQ.
+//
+// On a RIC_SUB_REQ it returns whatever the packer's Get yields together with
+// the received RMR message; unpack and decode failures are reported through
+// testError but do not change what is returned. On a wrong message type or a
+// timeout it reports the failure and returns (nil, nil).
+func (e2termConn *testingE2termControl) handle_e2term_subs_req(t *testing.T) (*e2ap.E2APSubscriptionRequest, *xapp.RMRParams) {
+	xapp.Logger.Info("handle_e2term_subs_req")
+	e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest()
+
+	//---------------------------------
+	// e2term activity: Recv Subs Req
+	//---------------------------------
+	select {
+	case msg := <-e2termConn.rmrConChan:
+		if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_REQ"] {
+			testError(t, "(%s) Received wrong mtype expected %s got %s, error", e2termConn.desc, "RIC_SUB_REQ", xapp.RicMessageTypeToName[msg.Mtype])
+			return nil, nil
+		}
+
+		xapp.Logger.Info("(%s) Recv Subs Req", e2termConn.desc)
+		packedData := &packer.PackedData{}
+		packedData.Buf = msg.Payload
+		if unpackerr := e2SubsReq.UnPack(packedData); unpackerr != nil {
+			testError(t, "(%s) RIC_SUB_REQ unpack failed err: %s", e2termConn.desc, unpackerr.Error())
+		}
+		geterr, req := e2SubsReq.Get()
+		if geterr != nil {
+			testError(t, "(%s) RIC_SUB_REQ get failed err: %s", e2termConn.desc, geterr.Error())
+		}
+		return req, msg
+	case <-time.After(15 * time.Second):
+		testError(t, "(%s) Not Received RIC_SUB_REQ within 15 secs", e2termConn.desc)
+		return nil, nil
+	}
+}
+
+func (e2termConn *testingE2termControl) handle_e2term_subs_resp(t *testing.T, req *e2ap.E2APSubscriptionRequest, msg *xapp.RMRParams) {
+ xapp.Logger.Info("handle_e2term_subs_resp")
+ e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse()
+
+ //---------------------------------
+ // e2term activity: Send Subs Resp
+ //---------------------------------
+ xapp.Logger.Info("(%s) Send Subs Resp", e2termConn.desc)
resp := &e2ap.E2APSubscriptionResponse{}
@@ -93,115 +289,11 @@
resp.ActionNotAdmittedList.Items = append(resp.ActionNotAdmittedList.Items, item)
}
- return resp
-}
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-func createSubsDelReq(e2SubsId uint32) *e2ap.E2APSubscriptionDeleteRequest {
- req := &e2ap.E2APSubscriptionDeleteRequest{}
- req.RequestId.Id = 1
- req.RequestId.Seq = e2SubsId
- req.FunctionId = 1
- return req
-}
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-func createSubsDelResp(req *e2ap.E2APSubscriptionDeleteRequest) *e2ap.E2APSubscriptionDeleteResponse {
- resp := &e2ap.E2APSubscriptionDeleteResponse{}
- resp.RequestId.Id = req.RequestId.Id
- resp.RequestId.Seq = req.RequestId.Seq
- resp.FunctionId = req.FunctionId
- return resp
-}
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-func handle_xapp_subs_req(t *testing.T) {
- xapp.Logger.Info("handle_xapp_subs_req start")
- e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest()
-
- //---------------------------------
- // xapp activity: Send Subs Req
- //---------------------------------
- //select {
- //case <-time.After(1 * time.Second):
- xapp.Logger.Info("(xappConn) Send Subs Req")
- req := createSubsReq()
- e2SubsReq.Set(req)
- xapp.Logger.Debug("%s", e2SubsReq.String())
- err, packedMsg := e2SubsReq.Pack(nil)
- if err != nil {
- testError(t, "(xappConn) pack NOK %s", err.Error())
- }
-
- params := &xapp.RMRParams{}
- params.Mtype = xapp.RIC_SUB_REQ
- params.SubId = -1
- params.Payload = packedMsg.Buf
- params.Meid = &xapp.RMRMeid{RanName: "RAN_NAME_1"}
- params.Xid = "XID_1"
- params.Mbuf = nil
-
- snderr := xappConn.RmrSend(params)
- if snderr != nil {
- testError(t, "(xappConn) RMR SEND FAILED: %s", snderr.Error())
- }
- //}
-}
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-func handle_e2term_subs_req(t *testing.T) (*e2ap.E2APSubscriptionRequest, *xapp.RMRParams) {
- xapp.Logger.Info("handle_e2term_subs_req start")
- e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest()
-
- //---------------------------------
- // e2term activity: Recv Subs Req
- //---------------------------------
- select {
- case msg := <-e2termConn.rmrConChan:
- if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_REQ"] {
- testError(t, "(e2termConn) Received non RIC_SUB_REQ message")
- } else {
- xapp.Logger.Info("(e2termConn) Recv Subs Req")
- packedData := &packer.PackedData{}
- packedData.Buf = msg.Payload
- unpackerr := e2SubsReq.UnPack(packedData)
- if unpackerr != nil {
- testError(t, "(e2termConn) RIC_SUB_REQ unpack failed err: %s", unpackerr.Error())
- }
- geterr, req := e2SubsReq.Get()
- if geterr != nil {
- testError(t, "(e2termConn) RIC_SUB_REQ get failed err: %s", geterr.Error())
- }
- return req, msg
- }
- case <-time.After(15 * time.Second):
- testError(t, "(e2termConn) Not Received RIC_SUB_REQ within 15 secs")
- }
- return nil, nil
-}
-
-func handle_e2term_subs_resp(t *testing.T, req *e2ap.E2APSubscriptionRequest, msg *xapp.RMRParams) {
- xapp.Logger.Info("handle_e2term_subs_resp start")
- e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse()
-
- //---------------------------------
- // e2term activity: Send Subs Resp
- //---------------------------------
- xapp.Logger.Info("(e2termConn) Send Subs Resp")
- resp := createSubsResp(req)
e2SubsResp.Set(resp)
xapp.Logger.Debug("%s", e2SubsResp.String())
packerr, packedMsg := e2SubsResp.Pack(nil)
if packerr != nil {
- testError(t, "(e2termConn) pack NOK %s", packerr.Error())
+ testError(t, "(%s) pack NOK %s", e2termConn.desc, packerr.Error())
}
params := &xapp.RMRParams{}
@@ -214,94 +306,15 @@
snderr := e2termConn.RmrSend(params)
if snderr != nil {
- testError(t, "(e2termConn) RMR SEND FAILED: %s", snderr.Error())
+ testError(t, "(%s) RMR SEND FAILED: %s", e2termConn.desc, snderr.Error())
}
}
-func handle_e2term_subs_reqandresp(t *testing.T) {
- req, msg := handle_e2term_subs_req(t)
- handle_e2term_subs_resp(t, req, msg)
-}
-
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
-func handle_xapp_subs_resp(t *testing.T) int {
- xapp.Logger.Info("handle_xapp_subs_resp start")
- e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse()
- var e2SubsId int
-
- //---------------------------------
- // xapp activity: Recv Subs Resp
- //---------------------------------
- select {
- case msg := <-xappConn.rmrConChan:
- if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_RESP"] {
- testError(t, "(xappConn) Received non RIC_SUB_RESP message")
- } else {
- xapp.Logger.Info("(xappConn) Recv Subs Resp")
-
- packedData := &packer.PackedData{}
- packedData.Buf = msg.Payload
- e2SubsId = msg.SubId
- unpackerr := e2SubsResp.UnPack(packedData)
-
- if unpackerr != nil {
- testError(t, "(xappConn) RIC_SUB_RESP unpack failed err: %s", unpackerr.Error())
- }
- geterr, _ := e2SubsResp.Get()
- if geterr != nil {
- testError(t, "(xappConn) RIC_SUB_RESP get failed err: %s", geterr.Error())
- }
-
- }
- case <-time.After(15 * time.Second):
- testError(t, "(xappConn) Not Received RIC_SUB_RESP within 15 secs")
- }
- return e2SubsId
-}
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-func handle_xapp_subs_del_req(t *testing.T, e2SubsId int) {
- xapp.Logger.Info("handle_xapp_subs_del_req start")
- e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest()
-
- //---------------------------------
- // xapp activity: Send Subs Del Req
- //---------------------------------
- //select {
- //case <-time.After(1 * time.Second):
- xapp.Logger.Info("(xappConn) Send Subs Del Req")
- req := createSubsDelReq(uint32(e2SubsId))
- e2SubsDelReq.Set(req)
- xapp.Logger.Debug("%s", e2SubsDelReq.String())
- err, packedMsg := e2SubsDelReq.Pack(nil)
- if err != nil {
- testError(t, "(xappConn) pack NOK %s", err.Error())
- }
-
- params := &xapp.RMRParams{}
- params.Mtype = xapp.RIC_SUB_DEL_REQ
- params.SubId = e2SubsId
- params.Payload = packedMsg.Buf
- params.Meid = &xapp.RMRMeid{RanName: "RAN_NAME_1"}
- params.Xid = "XID_1"
- params.Mbuf = nil
-
- snderr := xappConn.RmrSend(params)
- if snderr != nil {
- testError(t, "(xappConn) RMR SEND FAILED: %s", snderr.Error())
- }
- //}
-}
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-func handle_e2term_subs_del_req(t *testing.T) (*e2ap.E2APSubscriptionDeleteRequest, *xapp.RMRParams) {
- xapp.Logger.Info("handle_e2term_subs_del_req start")
+func (e2termConn *testingE2termControl) handle_e2term_subs_del_req(t *testing.T) (*e2ap.E2APSubscriptionDeleteRequest, *xapp.RMRParams) {
+ xapp.Logger.Info("handle_e2term_subs_del_req")
e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest()
//---------------------------------
@@ -310,42 +323,54 @@
select {
case msg := <-e2termConn.rmrConChan:
if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_DEL_REQ"] {
- testError(t, "(e2termConn) Received non RIC_SUB_DEL_REQ message")
+ testError(t, "(%s) Received wrong mtype expected %s got %s, error", e2termConn.desc, "RIC_SUB_DEL_REQ", xapp.RicMessageTypeToName[msg.Mtype])
} else {
- xapp.Logger.Info("(e2termConn) Recv Subs Del Req")
+ xapp.Logger.Info("(%s) Recv Subs Del Req", e2termConn.desc)
packedData := &packer.PackedData{}
packedData.Buf = msg.Payload
unpackerr := e2SubsDelReq.UnPack(packedData)
if unpackerr != nil {
- testError(t, "(e2termConn) RIC_SUB_DEL_REQ unpack failed err: %s", unpackerr.Error())
+ testError(t, "(%s) RIC_SUB_DEL_REQ unpack failed err: %s", e2termConn.desc, unpackerr.Error())
}
geterr, req := e2SubsDelReq.Get()
if geterr != nil {
- testError(t, "(e2termConn) RIC_SUB_DEL_REQ get failed err: %s", geterr.Error())
+ testError(t, "(%s) RIC_SUB_DEL_REQ get failed err: %s", e2termConn.desc, geterr.Error())
}
return req, msg
}
case <-time.After(15 * time.Second):
- testError(t, "(e2termConn) Not Received RIC_SUB_DEL_REQ within 15 secs")
+ testError(t, "(%s) Not Received RIC_SUB_DEL_REQ within 15 secs", e2termConn.desc)
}
return nil, nil
}
-func handle_e2term_subs_del_resp(t *testing.T, req *e2ap.E2APSubscriptionDeleteRequest, msg *xapp.RMRParams) {
- xapp.Logger.Info("handle_e2term_subs_del_resp start")
+// handle_e2term_recv_empty reports whether the package-level e2term stub's
+// message channel currently holds no queued messages.
+//
+// NOTE(review): initTestingMessageChannel creates this channel unbuffered, and
+// len of an unbuffered channel is always 0, so this looks like it always
+// returns true — confirm whether the channel was meant to be buffered.
+func handle_e2term_recv_empty() bool {
+	return len(e2termConn.rmrConChan) == 0
+}
+
+func (e2termConn *testingE2termControl) handle_e2term_subs_del_resp(t *testing.T, req *e2ap.E2APSubscriptionDeleteRequest, msg *xapp.RMRParams) {
+ xapp.Logger.Info("handle_e2term_subs_del_resp")
e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse()
//---------------------------------
// e2term activity: Send Subs Del Resp
//---------------------------------
- xapp.Logger.Info("(e2termConn) Send Subs Del Resp")
- resp := createSubsDelResp(req)
+ xapp.Logger.Info("(%s) Send Subs Del Resp", e2termConn.desc)
+
+ resp := &e2ap.E2APSubscriptionDeleteResponse{}
+ resp.RequestId.Id = req.RequestId.Id
+ resp.RequestId.Seq = req.RequestId.Seq
+ resp.FunctionId = req.FunctionId
+
e2SubsDelResp.Set(resp)
xapp.Logger.Debug("%s", e2SubsDelResp.String())
packerr, packedMsg := e2SubsDelResp.Pack(nil)
if packerr != nil {
- testError(t, "(e2termConn) pack NOK %s", packerr.Error())
+ testError(t, "(%s) pack NOK %s", e2termConn.desc, packerr.Error())
}
params := &xapp.RMRParams{}
@@ -358,60 +383,24 @@
snderr := e2termConn.RmrSend(params)
if snderr != nil {
- testError(t, "(e2termConn) RMR SEND FAILED: %s", snderr.Error())
+ testError(t, "(%s) RMR SEND FAILED: %s", e2termConn.desc, snderr.Error())
}
}
-func handle_e2term_subs_del_reqandresp(t *testing.T) {
- req, msg := handle_e2term_subs_del_req(t)
- handle_e2term_subs_del_resp(t, req, msg)
-}
-
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
-func handle_xapp_subs_del_resp(t *testing.T) {
- xapp.Logger.Info("handle_xapp_subs_del_resp start")
- e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse()
-
- //---------------------------------
- // xapp activity: Recv Subs Del Resp
- //---------------------------------
- select {
- case msg := <-xappConn.rmrConChan:
- if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_DEL_RESP"] {
- testError(t, "(xappConn) Received non RIC_SUB_DEL_RESP message")
- } else {
- xapp.Logger.Info("(xappConn) Recv Subs Del Resp")
-
- packedData := &packer.PackedData{}
- packedData.Buf = msg.Payload
- unpackerr := e2SubsDelResp.UnPack(packedData)
- if unpackerr != nil {
- testError(t, "(xappConn) RIC_SUB_DEL_RESP unpack failed err: %s", unpackerr.Error())
- }
- geterr, _ := e2SubsDelResp.Get()
- if geterr != nil {
- testError(t, "(xappConn) RIC_SUB_DEL_RESP get failed err: %s", geterr.Error())
- }
-
+func (mc *testingMainControl) wait_subs_clean(t *testing.T, e2SubsId int, secs int) bool {
+ i := 1
+ for ; i <= secs*2; i++ {
+ if mc.c.registry.IsValidSequenceNumber(uint16(e2SubsId)) == false {
+ return true
}
- case <-time.After(15 * time.Second):
- testError(t, "(xappConn) Not Received RIC_SUB_DEL_RESP within 15 secs")
+ time.Sleep(500 * time.Millisecond)
}
-}
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-func handle_wait_subs_clean(t *testing.T, e2SubsId int) bool {
- xapp.Logger.Info("handle_wait_subs_clean start")
- if mainCtrl.wait_subs_clean(e2SubsId, 10) == false {
- testError(t, "(general) no clean within 10 secs")
- return false
- }
- return true
+ testError(t, "(general) no clean within %d secs", secs)
+ return false
}
//-----------------------------------------------------------------------------
@@ -449,18 +438,20 @@
//
//-----------------------------------------------------------------------------
func TestSubReqAndSubDelOk(t *testing.T) {
- xapp.Logger.Info("TestSubReqAndSubDelOk start")
+ xapp.Logger.Info("TestSubReqAndSubDelOk")
- handle_xapp_subs_req(t)
- handle_e2term_subs_reqandresp(t)
- e2SubsId := handle_xapp_subs_resp(t)
+ xappConn1.handle_xapp_subs_req(t, true)
+ crereq, cremsg := e2termConn.handle_e2term_subs_req(t)
+ e2termConn.handle_e2term_subs_resp(t, crereq, cremsg)
+ e2SubsId := xappConn1.handle_xapp_subs_resp(t)
- handle_xapp_subs_del_req(t, e2SubsId)
- handle_e2term_subs_del_reqandresp(t)
- handle_xapp_subs_del_resp(t)
+ xappConn1.handle_xapp_subs_del_req(t, true, e2SubsId)
+ delreq, delmsg := e2termConn.handle_e2term_subs_del_req(t)
+ e2termConn.handle_e2term_subs_del_resp(t, delreq, delmsg)
+ xappConn1.handle_xapp_subs_del_resp(t)
//Wait that subs is cleaned
- handle_wait_subs_clean(t, e2SubsId)
+ mainCtrl.wait_subs_clean(t, e2SubsId, 10)
}
//-----------------------------------------------------------------------------
@@ -492,24 +483,25 @@
//
//-----------------------------------------------------------------------------
func TestSubReqRetransmission(t *testing.T) {
- xapp.Logger.Info("TestSubReqRetransmission start")
+ xapp.Logger.Info("TestSubReqRetransmission")
//Subs Create
- handle_xapp_subs_req(t)
- req, msg := handle_e2term_subs_req(t)
- handle_xapp_subs_req(t)
+ xappConn1.handle_xapp_subs_req(t, true)
+ crereq, cremsg := e2termConn.handle_e2term_subs_req(t)
+ xappConn1.handle_xapp_subs_req(t, false)
- handle_e2term_subs_resp(t, req, msg)
+ e2termConn.handle_e2term_subs_resp(t, crereq, cremsg)
- e2SubsId := handle_xapp_subs_resp(t)
+ e2SubsId := xappConn1.handle_xapp_subs_resp(t)
//Subs Delete
- handle_xapp_subs_del_req(t, e2SubsId)
- handle_e2term_subs_del_reqandresp(t)
- handle_xapp_subs_del_resp(t)
+ xappConn1.handle_xapp_subs_del_req(t, true, e2SubsId)
+ delreq, delmsg := e2termConn.handle_e2term_subs_del_req(t)
+ e2termConn.handle_e2term_subs_del_resp(t, delreq, delmsg)
+ xappConn1.handle_xapp_subs_del_resp(t)
//Wait that subs is cleaned
- handle_wait_subs_clean(t, e2SubsId)
+ mainCtrl.wait_subs_clean(t, e2SubsId, 10)
}
//-----------------------------------------------------------------------------
@@ -540,23 +532,98 @@
//
//-----------------------------------------------------------------------------
func TestSubDelReqRetransmission(t *testing.T) {
- xapp.Logger.Info("TestSubDelReqRetransmission start")
+ xapp.Logger.Info("TestSubDelReqRetransmission")
//Subs Create
- handle_xapp_subs_req(t)
- handle_e2term_subs_reqandresp(t)
- e2SubsId := handle_xapp_subs_resp(t)
+ xappConn1.handle_xapp_subs_req(t, true)
+ crereq, cremsg := e2termConn.handle_e2term_subs_req(t)
+ e2termConn.handle_e2term_subs_resp(t, crereq, cremsg)
+ e2SubsId := xappConn1.handle_xapp_subs_resp(t)
//Subs Delete
- handle_xapp_subs_del_req(t, e2SubsId)
- req, msg := handle_e2term_subs_del_req(t)
+ xappConn1.handle_xapp_subs_del_req(t, true, e2SubsId)
+ delreq, delmsg := e2termConn.handle_e2term_subs_del_req(t)
<-time.After(2 * time.Second)
+ xappConn1.handle_xapp_subs_del_req(t, false, e2SubsId)
- handle_xapp_subs_del_req(t, e2SubsId)
-
- handle_e2term_subs_del_resp(t, req, msg)
+ e2termConn.handle_e2term_subs_del_resp(t, delreq, delmsg)
+ xappConn1.handle_xapp_subs_del_resp(t)
//Wait that subs is cleaned
- handle_wait_subs_clean(t, e2SubsId)
+ mainCtrl.wait_subs_clean(t, e2SubsId, 10)
+}
+
+//-----------------------------------------------------------------------------
+// TestSubReqAndSubDelOkTwoParallel
+//
+//   stub                          stub
+// +-------+     +---------+    +---------+
+// | xapp  |     | submgr  |    | e2term  |
+// +-------+     +---------+    +---------+
+//     |              |              |
+//     |              |              |
+//     |              |              |
+//     | SubReq1      |              |
+//     |------------->|              |
+//     |              |              |
+//     |              | SubReq1      |
+//     |              |------------->|
+//     |              |              |
+//     | SubReq2      |              |
+//     |------------->|              |
+//     |              |              |
+//     |              | SubReq2      |
+//     |              |------------->|
+//     |              |              |
+//     |              |     SubResp1 |
+//     |              |<-------------|
+//     |              |     SubResp2 |
+//     |              |<-------------|
+//     |              |              |
+//     |     SubResp1 |              |
+//     |<-------------|              |
+//     |     SubResp2 |              |
+//     |<-------------|              |
+//     |              |              |
+//     |       [SUBS 1 DELETE]       |
+//     |              |              |
+//     |       [SUBS 2 DELETE]       |
+//     |              |              |
+//
+//-----------------------------------------------------------------------------
+func TestSubReqAndSubDelOkTwoParallel(t *testing.T) {
+ xapp.Logger.Info("TestSubReqAndSubDelOkTwoParallel")
+
+ //Req1
+ xappConn1.handle_xapp_subs_req(t, true)
+ crereq1, cremsg1 := e2termConn.handle_e2term_subs_req(t)
+
+ //Req2
+ xappConn2.handle_xapp_subs_req(t, true)
+ crereq2, cremsg2 := e2termConn.handle_e2term_subs_req(t)
+
+ //Resp1
+ e2termConn.handle_e2term_subs_resp(t, crereq1, cremsg1)
+ e2SubsId1 := xappConn1.handle_xapp_subs_resp(t)
+
+ //Resp2
+ e2termConn.handle_e2term_subs_resp(t, crereq2, cremsg2)
+ e2SubsId2 := xappConn2.handle_xapp_subs_resp(t)
+
+ //Del1
+ xappConn1.handle_xapp_subs_del_req(t, true, e2SubsId1)
+ delreq1, delmsg1 := e2termConn.handle_e2term_subs_del_req(t)
+ e2termConn.handle_e2term_subs_del_resp(t, delreq1, delmsg1)
+ xappConn1.handle_xapp_subs_del_resp(t)
+ //Wait that subs is cleaned
+ mainCtrl.wait_subs_clean(t, e2SubsId1, 10)
+
+ //Del2
+ xappConn2.handle_xapp_subs_del_req(t, true, e2SubsId2)
+ delreq2, delmsg2 := e2termConn.handle_e2term_subs_del_req(t)
+ e2termConn.handle_e2term_subs_del_resp(t, delreq2, delmsg2)
+ xappConn2.handle_xapp_subs_del_resp(t)
+ //Wait that subs is cleaned
+ mainCtrl.wait_subs_clean(t, e2SubsId2, 10)
}
diff --git a/pkg/control/registry.go b/pkg/control/registry.go
index 4f5c4fd..3ac15f1 100644
--- a/pkg/control/registry.go
+++ b/pkg/control/registry.go
@@ -24,15 +24,23 @@
"sync"
)
+// Subscription is the registry entry kept for one allocated sequence number.
+// Seq repeats the key the entry is stored under; Confirmed is raised by
+// setSubscriptionToConfirmed and lowered again by deleteSubscription.
+type Subscription struct {
+ Seq uint16
+ Confirmed bool
+}
+
type Registry struct {
- register map[uint16]bool
+ register map[uint16]*Subscription
counter uint16
mutex sync.Mutex
}
// This method should run as a constructor
func (r *Registry) Initialize(seedsn uint16) {
- r.register = make(map[uint16]bool)
+ r.register = make(map[uint16]*Subscription)
r.counter = seedsn
}
@@ -46,7 +54,7 @@
xapp.Logger.Error("Invalid SeqenceNumber sequenceNumber: %v", sequenceNumber)
return sequenceNumber, false
}
- r.register[sequenceNumber] = false
+	r.register[sequenceNumber] = &Subscription{Seq: sequenceNumber, Confirmed: false}
// Allocate next SequenceNumber value
if r.counter == 65535 {
@@ -72,14 +80,22 @@
func (r *Registry) setSubscriptionToConfirmed(sn uint16) {
r.mutex.Lock()
defer r.mutex.Unlock()
- r.register[sn] = true
+	// Entries are pointers, so looking up an unknown sn yields nil;
+	// skip it rather than dereference nil and panic.
+	if sub, ok := r.register[sn]; ok {
+		sub.Confirmed = true
+	}
}
//This function sets the given id as unused in the register
func (r *Registry) deleteSubscription(sn uint16) {
r.mutex.Lock()
defer r.mutex.Unlock()
- r.register[sn] = false
+	// Entries are pointers, so looking up an unknown sn yields nil;
+	// skip it rather than dereference nil and panic.
+	if sub, ok := r.register[sn]; ok {
+		sub.Confirmed = false
+	}
}
//This function releases the given id as unused in the register