RICPLT-2965 Unittest to support multiple xappconns

Change-Id: I4a7afe59b62d4628afdf4e5434c143eb945ea5fc
Signed-off-by: Juha Hyttinen <juha.hyttinen@nokia.com>
diff --git a/pkg/control/main_test.go b/pkg/control/main_test.go
index 2185d53..1c69b01 100644
--- a/pkg/control/main_test.go
+++ b/pkg/control/main_test.go
@@ -20,12 +20,16 @@
 package control
 
 import (
+	"encoding/json"
 	"errors"
 	"fmt"
+	"gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_models"
 	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
 	"io/ioutil"
 	"net/http"
 	"os"
+	"strconv"
+	"strings"
 	"testing"
 	"time"
 )
@@ -44,19 +48,27 @@
 	return
 }
 
+func (tc *testingControl) WaitCB() {
+	<-tc.syncChan
+}
+
+func initTestingControl(desc string, rtfile string, port string) testingControl {
+	tc := testingControl{}
+	os.Setenv("RMR_SEED_RT", rtfile)
+	os.Setenv("RMR_SRC_ID", "localhost:"+port)
+	xapp.Logger.Info("Using rt file %s", os.Getenv("RMR_SEED_RT"))
+	xapp.Logger.Info("Using src id  %s", os.Getenv("RMR_SRC_ID"))
+	tc.desc = strings.ToUpper(desc)
+	tc.syncChan = make(chan struct{})
+	return tc
+}
+
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
 type testingRmrControl struct {
 	testingControl
 	rmrClientTest *xapp.RMRClient
-	rmrConChan    chan *xapp.RMRParams
-}
-
-func (tc *testingRmrControl) Consume(msg *xapp.RMRParams) (err error) {
-	xapp.Logger.Info("testingRmrControl(%s) Consume", tc.desc)
-	tc.rmrConChan <- msg
-	return
 }
 
 func (tc *testingRmrControl) RmrSend(params *xapp.RMRParams) (err error) {
@@ -79,20 +91,88 @@
 	return
 }
 
-func createNewRmrControl(desc string, rtfile string, port string, stat string) *testingRmrControl {
-	os.Setenv("RMR_SEED_RT", rtfile)
-	os.Setenv("RMR_SRC_ID", "localhost:"+port)
-	xapp.Logger.Info("Using rt file %s", os.Getenv("RMR_SEED_RT"))
-	xapp.Logger.Info("Using src id  %s", os.Getenv("RMR_SRC_ID"))
-	newConn := &testingRmrControl{}
-	newConn.desc = desc
-	newConn.syncChan = make(chan struct{})
-	newConn.rmrClientTest = xapp.NewRMRClientWithParams("tcp:"+port, 4096, 1, stat)
-	newConn.rmrConChan = make(chan *xapp.RMRParams)
-	newConn.rmrClientTest.SetReadyCB(newConn.ReadyCB, nil)
-	go newConn.rmrClientTest.Start(newConn)
-	<-newConn.syncChan
-	return newConn
+func initTestingRmrControl(desc string, rtfile string, port string, stat string, consumer xapp.MessageConsumer) testingRmrControl {
+	tc := testingRmrControl{}
+	tc.testingControl = initTestingControl(desc, rtfile, port)
+	tc.rmrClientTest = xapp.NewRMRClientWithParams("tcp:"+port, 4096, 1, stat)
+	tc.rmrClientTest.SetReadyCB(tc.ReadyCB, nil)
+	go tc.rmrClientTest.Start(consumer)
+	tc.WaitCB()
+	return tc
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type testingMessageChannel struct {
+	rmrConChan chan *xapp.RMRParams
+}
+
+func initTestingMessageChannel() testingMessageChannel {
+	mc := testingMessageChannel{}
+	mc.rmrConChan = make(chan *xapp.RMRParams)
+	return mc
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+
+type testingXappControl struct {
+	testingRmrControl
+	testingMessageChannel
+	meid    *xapp.RMRMeid
+	xid_seq uint64
+	xid     string
+}
+
+func (tc *testingXappControl) newXid() string {
+	tc.xid = tc.desc + "_XID_" + strconv.FormatUint(uint64(tc.xid_seq), 10)
+	tc.xid_seq++
+	return tc.xid
+}
+
+func (tc *testingXappControl) Consume(msg *xapp.RMRParams) (err error) {
+
+	//if msg.Xid == tc.xid {
+	if strings.Contains(msg.Xid, tc.desc) {
+		xapp.Logger.Info("(%s) Consume mtype=%s subid=%d xid=%s", tc.desc, xapp.RicMessageTypeToName[msg.Mtype], msg.SubId, msg.Xid)
+		tc.rmrConChan <- msg
+	} else {
+		xapp.Logger.Info("(%s) Ignore mtype=%s subid=%d xid=%s, Expected xid to contain %s", tc.desc, xapp.RicMessageTypeToName[msg.Mtype], msg.SubId, msg.Xid, tc.desc)
+	}
+	return
+}
+
+func createNewXappControl(desc string, rtfile string, port string, stat string, ranname string) *testingXappControl {
+	xappCtrl := &testingXappControl{}
+	xappCtrl.testingRmrControl = initTestingRmrControl(desc, rtfile, port, stat, xappCtrl)
+	xappCtrl.testingMessageChannel = initTestingMessageChannel()
+	xappCtrl.meid = &xapp.RMRMeid{RanName: ranname}
+	xappCtrl.xid_seq = 0
+	xappCtrl.newXid()
+	return xappCtrl
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+type testingE2termControl struct {
+	testingRmrControl
+	testingMessageChannel
+}
+
+func (tc *testingE2termControl) Consume(msg *xapp.RMRParams) (err error) {
+	xapp.Logger.Info("(%s) Consume mtype=%s subid=%d xid=%s", tc.desc, xapp.RicMessageTypeToName[msg.Mtype], msg.SubId, msg.Xid)
+	tc.rmrConChan <- msg
+	return
+}
+
+func createNewE2termControl(desc string, rtfile string, port string, stat string) *testingE2termControl {
+	e2termCtrl := &testingE2termControl{}
+	e2termCtrl.testingRmrControl = initTestingRmrControl(desc, rtfile, port, stat, e2termCtrl)
+	e2termCtrl.testingMessageChannel = initTestingMessageChannel()
+	return e2termCtrl
 }
 
 //-----------------------------------------------------------------------------
@@ -103,15 +183,14 @@
 	c *Control
 }
 
-func (mc *testingMainControl) wait_subs_clean(e2SubsId int, secs int) bool {
-	i := 1
-	for ; i <= secs*2; i++ {
-		if mc.c.registry.IsValidSequenceNumber(uint16(e2SubsId)) == false {
-			return true
-		}
-		time.Sleep(500 * time.Millisecond)
-	}
-	return false
+func createNewMainControl(desc string, rtfile string, port string) *testingMainControl {
+	mainCtrl = &testingMainControl{}
+	mainCtrl.testingControl = initTestingControl(desc, rtfile, port)
+	mainCtrl.c = NewControl()
+	xapp.SetReadyCB(mainCtrl.ReadyCB, nil)
+	go xapp.RunWithParams(mainCtrl.c, false)
+	mainCtrl.WaitCB()
+	return mainCtrl
 }
 
 //-----------------------------------------------------------------------------
@@ -123,6 +202,11 @@
 	t.Errorf(fmt.Sprintf(pattern, args...))
 }
 
+func testLog(t *testing.T, pattern string, args ...interface{}) {
+	xapp.Logger.Info(fmt.Sprintf(pattern, args...))
+	t.Logf(fmt.Sprintf(pattern, args...))
+}
+
 func testCreateTmpFile(str string) (string, error) {
 	file, err := ioutil.TempFile("/tmp", "*.rt")
 	if err != nil {
@@ -140,8 +224,9 @@
 //
 //-----------------------------------------------------------------------------
 
-var xappConn *testingRmrControl
-var e2termConn *testingRmrControl
+var xappConn1 *testingXappControl
+var xappConn2 *testingXappControl
+var e2termConn *testingE2termControl
 var mainCtrl *testingMainControl
 
 func TestMain(m *testing.M) {
@@ -184,7 +269,10 @@
 	xapp.Logger.Info("Using cfg file %s", os.Getenv("CFG_FILE"))
 
 	//---------------------------------
-	//
+	// NOTE: Routing table is configured so, that responses
+	//       are duplicated to xapp1 and xapp2 instances.
+	//       If XID is not matching xapp stub will just
+	//       drop message. (Messages 12011, 12012, 12021, 12022)
 	//---------------------------------
 	xapp.Logger.Info("### submgr main run ###")
 
@@ -192,40 +280,27 @@
 mse|12010|-1|localhost:14560
 mse|12010,localhost:14560|-1|localhost:15560
 mse|12011,localhost:15560|-1|localhost:14560
-mse|12011|-1|localhost:13560
+mse|12011|-1|localhost:13560;localhost:13660
 mse|12012,localhost:15560|-1|localhost:14560
-mse|12012|-1|localhost:13560
+mse|12012|-1|localhost:13560;localhost:13660
 mse|12020|-1|localhost:14560
 mse|12020,localhost:14560|-1|localhost:15560
 mse|12021,localhost:15560|-1|localhost:14560
-mse|12021|-1|localhost:13560
+mse|12021|-1|localhost:13560;localhost:13660
 mse|12022,localhost:15560|-1|localhost:14560
-mse|12022|-1|localhost:13560
+mse|12022|-1|localhost:13560;localhost:13660
 newrt|end
 `
-
 	subrtfilename, _ := testCreateTmpFile(subsrt)
 	defer os.Remove(subrtfilename)
-	os.Setenv("RMR_SEED_RT", subrtfilename)
-	os.Setenv("RMR_SRC_ID", "localhost:14560")
-	xapp.Logger.Info("Using rt file %s", os.Getenv("RMR_SEED_RT"))
-	xapp.Logger.Info("Using src id  %s", os.Getenv("RMR_SRC_ID"))
-
-	mainCtrl = &testingMainControl{}
-	mainCtrl.desc = "main"
-	mainCtrl.syncChan = make(chan struct{})
-
-	mainCtrl.c = NewControl()
-	xapp.SetReadyCB(mainCtrl.ReadyCB, nil)
-	go xapp.RunWithParams(mainCtrl.c, false)
-	<-mainCtrl.syncChan
+	mainCtrl = createNewMainControl("main", subrtfilename, "14560")
 
 	//---------------------------------
 	//
 	//---------------------------------
-	xapp.Logger.Info("### xapp rmr run ###")
+	xapp.Logger.Info("### xapp1 rmr run ###")
 
-	xapprt := `newrt|start
+	xapprt1 := `newrt|start
 mse|12010|-1|localhost:14560
 mse|12011|-1|localhost:13560
 mse|12012|-1|localhost:13560
@@ -235,9 +310,29 @@
 newrt|end
 `
 
-	xapprtfilename, _ := testCreateTmpFile(xapprt)
-	defer os.Remove(xapprtfilename)
-	xappConn = createNewRmrControl("xappConn", xapprtfilename, "13560", "RMRXAPPSTUB")
+	xapprtfilename1, _ := testCreateTmpFile(xapprt1)
+	defer os.Remove(xapprtfilename1)
+	xappConn1 = createNewXappControl("xappConn1", xapprtfilename1, "13560", "RMRXAPP1STUB", "RAN_NAME_1")
+
+	//---------------------------------
+	//
+	//---------------------------------
+
+	xapp.Logger.Info("### xapp2 rmr run ###")
+
+	xapprt2 := `newrt|start
+mse|12010|-1|localhost:14560
+mse|12011|-1|localhost:13660
+mse|12012|-1|localhost:13660
+mse|12020|-1|localhost:14560
+mse|12021|-1|localhost:13660
+mse|12022|-1|localhost:13660
+newrt|end
+`
+
+	xapprtfilename2, _ := testCreateTmpFile(xapprt2)
+	defer os.Remove(xapprtfilename2)
+	xappConn2 = createNewXappControl("xappConn2", xapprtfilename2, "13660", "RMRXAPP2STUB", "RAN_NAME_1")
 
 	//---------------------------------
 	//
@@ -256,13 +351,19 @@
 
 	e2termrtfilename, _ := testCreateTmpFile(e2termrt)
 	defer os.Remove(e2termrtfilename)
-	e2termConn = createNewRmrControl("e2termConn", e2termrtfilename, "15560", "RMRE2TERMSTUB")
+	e2termConn = createNewE2termControl("e2termConn", e2termrtfilename, "15560", "RMRE2TERMSTUB")
 
 	//---------------------------------
 	//
 	//---------------------------------
 	http_handler := func(w http.ResponseWriter, r *http.Request) {
-		xapp.Logger.Info("(http handler) handling")
+		var req rtmgr_models.XappSubscriptionData
+		err := json.NewDecoder(r.Body).Decode(&req)
+		if err != nil {
+			xapp.Logger.Error("%s", err.Error())
+		}
+		xapp.Logger.Info("(http handler) handling Address=%s Port=%d SubscriptionID=%d", *req.Address, *req.Port, *req.SubscriptionID)
+
 		w.WriteHeader(200)
 	}
 
diff --git a/pkg/control/messaging_test.go b/pkg/control/messaging_test.go
index 7308563..b8ed6e6 100644
--- a/pkg/control/messaging_test.go
+++ b/pkg/control/messaging_test.go
@@ -37,7 +37,29 @@
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-func createSubsReq() *e2ap.E2APSubscriptionRequest {
+type xappTransaction struct {
+	xappConn *testingXappControl
+	xid      string
+}
+
+func newXappTransaction(xappConn *testingXappControl) {
+	trans := &xappTransaction{}
+	trans.xappConn = xappConn
+	trans.xid = xappConn.newXid()
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func (xappConn *testingXappControl) handle_xapp_subs_req(t *testing.T, updseq bool) {
+	xapp.Logger.Info("handle_xapp_subs_req")
+	e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest()
+
+	//---------------------------------
+	// xapp activity: Send Subs Req
+	//---------------------------------
+	xapp.Logger.Info("(%s) Send Subs Req", xappConn.desc)
+
 	req := &e2ap.E2APSubscriptionRequest{}
 
 	req.RequestId.Id = 1
@@ -66,13 +88,187 @@
 	req.ActionSetups[0].SubsequentAction.Type = e2ap.E2AP_SubSeqActionTypeContinue
 	req.ActionSetups[0].SubsequentAction.TimetoWait = e2ap.E2AP_TimeToWaitZero
 
-	return req
+	e2SubsReq.Set(req)
+	xapp.Logger.Debug("%s", e2SubsReq.String())
+	err, packedMsg := e2SubsReq.Pack(nil)
+	if err != nil {
+		testError(t, "(%s) pack NOK %s", xappConn.desc, err.Error())
+	}
+
+	params := &xapp.RMRParams{}
+	params.Mtype = xapp.RIC_SUB_REQ
+	params.SubId = -1
+	params.Payload = packedMsg.Buf
+	params.Meid = xappConn.meid
+	if updseq {
+		xappConn.newXid()
+	}
+	params.Xid = xappConn.xid
+	params.Mbuf = nil
+
+	snderr := xappConn.RmrSend(params)
+	if snderr != nil {
+		testError(t, "(%s) RMR SEND FAILED: %s", xappConn.desc, snderr.Error())
+	}
 }
 
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-func createSubsResp(req *e2ap.E2APSubscriptionRequest) *e2ap.E2APSubscriptionResponse {
+func (xappConn *testingXappControl) handle_xapp_subs_resp(t *testing.T) int {
+	xapp.Logger.Info("handle_xapp_subs_resp")
+	e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse()
+	var e2SubsId int
+
+	//---------------------------------
+	// xapp activity: Recv Subs Resp
+	//---------------------------------
+	select {
+	case msg := <-xappConn.rmrConChan:
+		if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_RESP"] {
+			testError(t, "(%s) Received wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_RESP", xapp.RicMessageTypeToName[msg.Mtype])
+			return -1
+		} else {
+			packedData := &packer.PackedData{}
+			packedData.Buf = msg.Payload
+			e2SubsId = msg.SubId
+			unpackerr := e2SubsResp.UnPack(packedData)
+
+			if unpackerr != nil {
+				testError(t, "(%s) RIC_SUB_RESP unpack failed err: %s", xappConn.desc, unpackerr.Error())
+			}
+			geterr, resp := e2SubsResp.Get()
+			if geterr != nil {
+				testError(t, "(%s) RIC_SUB_RESP get failed err: %s", xappConn.desc, geterr.Error())
+			}
+
+			xapp.Logger.Info("(%s) Recv Subs Resp rmr: xid=%s subid=%d, asn: seqnro=%d", xappConn.desc, msg.Xid, msg.SubId, resp.RequestId.Seq)
+			return e2SubsId
+		}
+	case <-time.After(15 * time.Second):
+		testError(t, "(%s) Not Received RIC_SUB_RESP within 15 secs", xappConn.desc)
+		return -1
+	}
+	return -1
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func (xappConn *testingXappControl) handle_xapp_subs_del_req(t *testing.T, updseq bool, e2SubsId int) {
+	xapp.Logger.Info("handle_xapp_subs_del_req")
+	e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest()
+
+	//---------------------------------
+	// xapp activity: Send Subs Del Req
+	//---------------------------------
+	xapp.Logger.Info("(%s) Send Subs Del Req", xappConn.desc)
+
+	req := &e2ap.E2APSubscriptionDeleteRequest{}
+	req.RequestId.Id = 1
+	req.RequestId.Seq = uint32(e2SubsId)
+	req.FunctionId = 1
+
+	e2SubsDelReq.Set(req)
+	xapp.Logger.Debug("%s", e2SubsDelReq.String())
+	err, packedMsg := e2SubsDelReq.Pack(nil)
+	if err != nil {
+		testError(t, "(%s) pack NOK %s", xappConn.desc, err.Error())
+	}
+
+	params := &xapp.RMRParams{}
+	params.Mtype = xapp.RIC_SUB_DEL_REQ
+	params.SubId = e2SubsId
+	params.Payload = packedMsg.Buf
+	params.Meid = xappConn.meid
+	if updseq {
+		xappConn.newXid()
+	}
+	params.Xid = xappConn.xid
+	params.Mbuf = nil
+
+	snderr := xappConn.RmrSend(params)
+	if snderr != nil {
+		testError(t, "(%s) RMR SEND FAILED: %s", xappConn.desc, snderr.Error())
+	}
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func (xappConn *testingXappControl) handle_xapp_subs_del_resp(t *testing.T) {
+	xapp.Logger.Info("handle_xapp_subs_del_resp")
+	e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse()
+
+	//---------------------------------
+	// xapp activity: Recv Subs Del Resp
+	//---------------------------------
+	select {
+	case msg := <-xappConn.rmrConChan:
+		if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_DEL_RESP"] {
+			testError(t, "(%s) Received wrong mtype expected %s got %s, error", xappConn.desc, "RIC_SUB_DEL_RESP", xapp.RicMessageTypeToName[msg.Mtype])
+			return
+		} else {
+			packedData := &packer.PackedData{}
+			packedData.Buf = msg.Payload
+			unpackerr := e2SubsDelResp.UnPack(packedData)
+			if unpackerr != nil {
+				testError(t, "(%s) RIC_SUB_DEL_RESP unpack failed err: %s", xappConn.desc, unpackerr.Error())
+			}
+			geterr, resp := e2SubsDelResp.Get()
+			if geterr != nil {
+				testError(t, "(%s) RIC_SUB_DEL_RESP get failed err: %s", xappConn.desc, geterr.Error())
+			}
+			xapp.Logger.Info("(%s) Recv Subs Del Resp rmr: xid=%s subid=%d, asn: seqnro=%d", xappConn.desc, msg.Xid, msg.SubId, resp.RequestId.Seq)
+			return
+		}
+	case <-time.After(15 * time.Second):
+		testError(t, "(%s) Not Received RIC_SUB_DEL_RESP within 15 secs", xappConn.desc)
+	}
+}
+
+//-----------------------------------------------------------------------------
+//
+//-----------------------------------------------------------------------------
+func (e2termConn *testingE2termControl) handle_e2term_subs_req(t *testing.T) (*e2ap.E2APSubscriptionRequest, *xapp.RMRParams) {
+	xapp.Logger.Info("handle_e2term_subs_req")
+	e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest()
+
+	//---------------------------------
+	// e2term activity: Recv Subs Req
+	//---------------------------------
+	select {
+	case msg := <-e2termConn.rmrConChan:
+		if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_REQ"] {
+			testError(t, "(%s) Received wrong mtype expected %s got %s, error", e2termConn.desc, "RIC_SUB_REQ", xapp.RicMessageTypeToName[msg.Mtype])
+		} else {
+			xapp.Logger.Info("(%s) Recv Subs Req", e2termConn.desc)
+			packedData := &packer.PackedData{}
+			packedData.Buf = msg.Payload
+			unpackerr := e2SubsReq.UnPack(packedData)
+			if unpackerr != nil {
+				testError(t, "(%s) RIC_SUB_REQ unpack failed err: %s", e2termConn.desc, unpackerr.Error())
+			}
+			geterr, req := e2SubsReq.Get()
+			if geterr != nil {
+				testError(t, "(%s) RIC_SUB_REQ get failed err: %s", e2termConn.desc, geterr.Error())
+			}
+			return req, msg
+		}
+	case <-time.After(15 * time.Second):
+		testError(t, "(%s) Not Received RIC_SUB_REQ within 15 secs", e2termConn.desc)
+	}
+	return nil, nil
+}
+
+func (e2termConn *testingE2termControl) handle_e2term_subs_resp(t *testing.T, req *e2ap.E2APSubscriptionRequest, msg *xapp.RMRParams) {
+	xapp.Logger.Info("handle_e2term_subs_resp")
+	e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse()
+
+	//---------------------------------
+	// e2term activity: Send Subs Resp
+	//---------------------------------
+	xapp.Logger.Info("(%s) Send Subs Resp", e2termConn.desc)
 
 	resp := &e2ap.E2APSubscriptionResponse{}
 
@@ -93,115 +289,11 @@
 		resp.ActionNotAdmittedList.Items = append(resp.ActionNotAdmittedList.Items, item)
 	}
 
-	return resp
-}
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-func createSubsDelReq(e2SubsId uint32) *e2ap.E2APSubscriptionDeleteRequest {
-	req := &e2ap.E2APSubscriptionDeleteRequest{}
-	req.RequestId.Id = 1
-	req.RequestId.Seq = e2SubsId
-	req.FunctionId = 1
-	return req
-}
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-func createSubsDelResp(req *e2ap.E2APSubscriptionDeleteRequest) *e2ap.E2APSubscriptionDeleteResponse {
-	resp := &e2ap.E2APSubscriptionDeleteResponse{}
-	resp.RequestId.Id = req.RequestId.Id
-	resp.RequestId.Seq = req.RequestId.Seq
-	resp.FunctionId = req.FunctionId
-	return resp
-}
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-func handle_xapp_subs_req(t *testing.T) {
-	xapp.Logger.Info("handle_xapp_subs_req start")
-	e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest()
-
-	//---------------------------------
-	// xapp activity: Send Subs Req
-	//---------------------------------
-	//select {
-	//case <-time.After(1 * time.Second):
-	xapp.Logger.Info("(xappConn) Send Subs Req")
-	req := createSubsReq()
-	e2SubsReq.Set(req)
-	xapp.Logger.Debug("%s", e2SubsReq.String())
-	err, packedMsg := e2SubsReq.Pack(nil)
-	if err != nil {
-		testError(t, "(xappConn) pack NOK %s", err.Error())
-	}
-
-	params := &xapp.RMRParams{}
-	params.Mtype = xapp.RIC_SUB_REQ
-	params.SubId = -1
-	params.Payload = packedMsg.Buf
-	params.Meid = &xapp.RMRMeid{RanName: "RAN_NAME_1"}
-	params.Xid = "XID_1"
-	params.Mbuf = nil
-
-	snderr := xappConn.RmrSend(params)
-	if snderr != nil {
-		testError(t, "(xappConn) RMR SEND FAILED: %s", snderr.Error())
-	}
-	//}
-}
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-func handle_e2term_subs_req(t *testing.T) (*e2ap.E2APSubscriptionRequest, *xapp.RMRParams) {
-	xapp.Logger.Info("handle_e2term_subs_req start")
-	e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest()
-
-	//---------------------------------
-	// e2term activity: Recv Subs Req
-	//---------------------------------
-	select {
-	case msg := <-e2termConn.rmrConChan:
-		if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_REQ"] {
-			testError(t, "(e2termConn) Received non RIC_SUB_REQ message")
-		} else {
-			xapp.Logger.Info("(e2termConn) Recv Subs Req")
-			packedData := &packer.PackedData{}
-			packedData.Buf = msg.Payload
-			unpackerr := e2SubsReq.UnPack(packedData)
-			if unpackerr != nil {
-				testError(t, "(e2termConn) RIC_SUB_REQ unpack failed err: %s", unpackerr.Error())
-			}
-			geterr, req := e2SubsReq.Get()
-			if geterr != nil {
-				testError(t, "(e2termConn) RIC_SUB_REQ get failed err: %s", geterr.Error())
-			}
-			return req, msg
-		}
-	case <-time.After(15 * time.Second):
-		testError(t, "(e2termConn) Not Received RIC_SUB_REQ within 15 secs")
-	}
-	return nil, nil
-}
-
-func handle_e2term_subs_resp(t *testing.T, req *e2ap.E2APSubscriptionRequest, msg *xapp.RMRParams) {
-	xapp.Logger.Info("handle_e2term_subs_resp start")
-	e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse()
-
-	//---------------------------------
-	// e2term activity: Send Subs Resp
-	//---------------------------------
-	xapp.Logger.Info("(e2termConn) Send Subs Resp")
-	resp := createSubsResp(req)
 	e2SubsResp.Set(resp)
 	xapp.Logger.Debug("%s", e2SubsResp.String())
 	packerr, packedMsg := e2SubsResp.Pack(nil)
 	if packerr != nil {
-		testError(t, "(e2termConn) pack NOK %s", packerr.Error())
+		testError(t, "(%s) pack NOK %s", e2termConn.desc, packerr.Error())
 	}
 
 	params := &xapp.RMRParams{}
@@ -214,94 +306,15 @@
 
 	snderr := e2termConn.RmrSend(params)
 	if snderr != nil {
-		testError(t, "(e2termConn) RMR SEND FAILED: %s", snderr.Error())
+		testError(t, "(%s) RMR SEND FAILED: %s", e2termConn.desc, snderr.Error())
 	}
 }
 
-func handle_e2term_subs_reqandresp(t *testing.T) {
-	req, msg := handle_e2term_subs_req(t)
-	handle_e2term_subs_resp(t, req, msg)
-}
-
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-func handle_xapp_subs_resp(t *testing.T) int {
-	xapp.Logger.Info("handle_xapp_subs_resp start")
-	e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse()
-	var e2SubsId int
-
-	//---------------------------------
-	// xapp activity: Recv Subs Resp
-	//---------------------------------
-	select {
-	case msg := <-xappConn.rmrConChan:
-		if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_RESP"] {
-			testError(t, "(xappConn) Received non RIC_SUB_RESP message")
-		} else {
-			xapp.Logger.Info("(xappConn) Recv Subs Resp")
-
-			packedData := &packer.PackedData{}
-			packedData.Buf = msg.Payload
-			e2SubsId = msg.SubId
-			unpackerr := e2SubsResp.UnPack(packedData)
-
-			if unpackerr != nil {
-				testError(t, "(xappConn) RIC_SUB_RESP unpack failed err: %s", unpackerr.Error())
-			}
-			geterr, _ := e2SubsResp.Get()
-			if geterr != nil {
-				testError(t, "(xappConn) RIC_SUB_RESP get failed err: %s", geterr.Error())
-			}
-
-		}
-	case <-time.After(15 * time.Second):
-		testError(t, "(xappConn) Not Received RIC_SUB_RESP within 15 secs")
-	}
-	return e2SubsId
-}
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-func handle_xapp_subs_del_req(t *testing.T, e2SubsId int) {
-	xapp.Logger.Info("handle_xapp_subs_del_req start")
-	e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest()
-
-	//---------------------------------
-	// xapp activity: Send Subs Del Req
-	//---------------------------------
-	//select {
-	//case <-time.After(1 * time.Second):
-	xapp.Logger.Info("(xappConn) Send Subs Del Req")
-	req := createSubsDelReq(uint32(e2SubsId))
-	e2SubsDelReq.Set(req)
-	xapp.Logger.Debug("%s", e2SubsDelReq.String())
-	err, packedMsg := e2SubsDelReq.Pack(nil)
-	if err != nil {
-		testError(t, "(xappConn) pack NOK %s", err.Error())
-	}
-
-	params := &xapp.RMRParams{}
-	params.Mtype = xapp.RIC_SUB_DEL_REQ
-	params.SubId = e2SubsId
-	params.Payload = packedMsg.Buf
-	params.Meid = &xapp.RMRMeid{RanName: "RAN_NAME_1"}
-	params.Xid = "XID_1"
-	params.Mbuf = nil
-
-	snderr := xappConn.RmrSend(params)
-	if snderr != nil {
-		testError(t, "(xappConn) RMR SEND FAILED: %s", snderr.Error())
-	}
-	//}
-}
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-func handle_e2term_subs_del_req(t *testing.T) (*e2ap.E2APSubscriptionDeleteRequest, *xapp.RMRParams) {
-	xapp.Logger.Info("handle_e2term_subs_del_req start")
+func (e2termConn *testingE2termControl) handle_e2term_subs_del_req(t *testing.T) (*e2ap.E2APSubscriptionDeleteRequest, *xapp.RMRParams) {
+	xapp.Logger.Info("handle_e2term_subs_del_req")
 	e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest()
 
 	//---------------------------------
@@ -310,42 +323,54 @@
 	select {
 	case msg := <-e2termConn.rmrConChan:
 		if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_DEL_REQ"] {
-			testError(t, "(e2termConn) Received non RIC_SUB_DEL_REQ message")
+			testError(t, "(%s) Received wrong mtype expected %s got %s, error", e2termConn.desc, "RIC_SUB_DEL_REQ", xapp.RicMessageTypeToName[msg.Mtype])
 		} else {
-			xapp.Logger.Info("(e2termConn) Recv Subs Del Req")
+			xapp.Logger.Info("(%s) Recv Subs Del Req", e2termConn.desc)
 
 			packedData := &packer.PackedData{}
 			packedData.Buf = msg.Payload
 			unpackerr := e2SubsDelReq.UnPack(packedData)
 			if unpackerr != nil {
-				testError(t, "(e2termConn) RIC_SUB_DEL_REQ unpack failed err: %s", unpackerr.Error())
+				testError(t, "(%s) RIC_SUB_DEL_REQ unpack failed err: %s", e2termConn.desc, unpackerr.Error())
 			}
 			geterr, req := e2SubsDelReq.Get()
 			if geterr != nil {
-				testError(t, "(e2termConn) RIC_SUB_DEL_REQ get failed err: %s", geterr.Error())
+				testError(t, "(%s) RIC_SUB_DEL_REQ get failed err: %s", e2termConn.desc, geterr.Error())
 			}
 			return req, msg
 		}
 	case <-time.After(15 * time.Second):
-		testError(t, "(e2termConn) Not Received RIC_SUB_DEL_REQ within 15 secs")
+		testError(t, "(%s) Not Received RIC_SUB_DEL_REQ within 15 secs", e2termConn.desc)
 	}
 	return nil, nil
 }
 
-func handle_e2term_subs_del_resp(t *testing.T, req *e2ap.E2APSubscriptionDeleteRequest, msg *xapp.RMRParams) {
-	xapp.Logger.Info("handle_e2term_subs_del_resp start")
+func handle_e2term_recv_empty() bool {
+	if len(e2termConn.rmrConChan) > 0 {
+		return false
+	}
+	return true
+}
+
+func (e2termConn *testingE2termControl) handle_e2term_subs_del_resp(t *testing.T, req *e2ap.E2APSubscriptionDeleteRequest, msg *xapp.RMRParams) {
+	xapp.Logger.Info("handle_e2term_subs_del_resp")
 	e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse()
 
 	//---------------------------------
 	// e2term activity: Send Subs Del Resp
 	//---------------------------------
-	xapp.Logger.Info("(e2termConn) Send Subs Del Resp")
-	resp := createSubsDelResp(req)
+	xapp.Logger.Info("(%s) Send Subs Del Resp", e2termConn.desc)
+
+	resp := &e2ap.E2APSubscriptionDeleteResponse{}
+	resp.RequestId.Id = req.RequestId.Id
+	resp.RequestId.Seq = req.RequestId.Seq
+	resp.FunctionId = req.FunctionId
+
 	e2SubsDelResp.Set(resp)
 	xapp.Logger.Debug("%s", e2SubsDelResp.String())
 	packerr, packedMsg := e2SubsDelResp.Pack(nil)
 	if packerr != nil {
-		testError(t, "(e2termConn) pack NOK %s", packerr.Error())
+		testError(t, "(%s) pack NOK %s", e2termConn.desc, packerr.Error())
 	}
 
 	params := &xapp.RMRParams{}
@@ -358,60 +383,24 @@
 
 	snderr := e2termConn.RmrSend(params)
 	if snderr != nil {
-		testError(t, "(e2termConn) RMR SEND FAILED: %s", snderr.Error())
+		testError(t, "(%s) RMR SEND FAILED: %s", e2termConn.desc, snderr.Error())
 	}
 
 }
 
-func handle_e2term_subs_del_reqandresp(t *testing.T) {
-	req, msg := handle_e2term_subs_del_req(t)
-	handle_e2term_subs_del_resp(t, req, msg)
-}
-
 //-----------------------------------------------------------------------------
 //
 //-----------------------------------------------------------------------------
-func handle_xapp_subs_del_resp(t *testing.T) {
-	xapp.Logger.Info("handle_xapp_subs_del_resp start")
-	e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse()
-
-	//---------------------------------
-	// xapp activity: Recv Subs Del Resp
-	//---------------------------------
-	select {
-	case msg := <-xappConn.rmrConChan:
-		if msg.Mtype != xapp.RICMessageTypes["RIC_SUB_DEL_RESP"] {
-			testError(t, "(xappConn) Received non RIC_SUB_DEL_RESP message")
-		} else {
-			xapp.Logger.Info("(xappConn) Recv Subs Del Resp")
-
-			packedData := &packer.PackedData{}
-			packedData.Buf = msg.Payload
-			unpackerr := e2SubsDelResp.UnPack(packedData)
-			if unpackerr != nil {
-				testError(t, "(xappConn) RIC_SUB_DEL_RESP unpack failed err: %s", unpackerr.Error())
-			}
-			geterr, _ := e2SubsDelResp.Get()
-			if geterr != nil {
-				testError(t, "(xappConn) RIC_SUB_DEL_RESP get failed err: %s", geterr.Error())
-			}
-
+func (mc *testingMainControl) wait_subs_clean(t *testing.T, e2SubsId int, secs int) bool {
+	i := 1
+	for ; i <= secs*2; i++ {
+		if mc.c.registry.IsValidSequenceNumber(uint16(e2SubsId)) == false {
+			return true
 		}
-	case <-time.After(15 * time.Second):
-		testError(t, "(xappConn) Not Received RIC_SUB_DEL_RESP within 15 secs")
+		time.Sleep(500 * time.Millisecond)
 	}
-}
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-func handle_wait_subs_clean(t *testing.T, e2SubsId int) bool {
-	xapp.Logger.Info("handle_wait_subs_clean start")
-	if mainCtrl.wait_subs_clean(e2SubsId, 10) == false {
-		testError(t, "(general) no clean within 10 secs")
-		return false
-	}
-	return true
+	testError(t, "(general) no clean within %d secs", secs)
+	return false
 }
 
 //-----------------------------------------------------------------------------
@@ -449,18 +438,20 @@
 //
 //-----------------------------------------------------------------------------
 func TestSubReqAndSubDelOk(t *testing.T) {
-	xapp.Logger.Info("TestSubReqAndSubDelOk start")
+	xapp.Logger.Info("TestSubReqAndSubDelOk")
 
-	handle_xapp_subs_req(t)
-	handle_e2term_subs_reqandresp(t)
-	e2SubsId := handle_xapp_subs_resp(t)
+	xappConn1.handle_xapp_subs_req(t, true)
+	crereq, cremsg := e2termConn.handle_e2term_subs_req(t)
+	e2termConn.handle_e2term_subs_resp(t, crereq, cremsg)
+	e2SubsId := xappConn1.handle_xapp_subs_resp(t)
 
-	handle_xapp_subs_del_req(t, e2SubsId)
-	handle_e2term_subs_del_reqandresp(t)
-	handle_xapp_subs_del_resp(t)
+	xappConn1.handle_xapp_subs_del_req(t, true, e2SubsId)
+	delreq, delmsg := e2termConn.handle_e2term_subs_del_req(t)
+	e2termConn.handle_e2term_subs_del_resp(t, delreq, delmsg)
+	xappConn1.handle_xapp_subs_del_resp(t)
 
 	//Wait that subs is cleaned
-	handle_wait_subs_clean(t, e2SubsId)
+	mainCtrl.wait_subs_clean(t, e2SubsId, 10)
 }
 
 //-----------------------------------------------------------------------------
@@ -492,24 +483,25 @@
 //
 //-----------------------------------------------------------------------------
 func TestSubReqRetransmission(t *testing.T) {
-	xapp.Logger.Info("TestSubReqRetransmission start")
+	xapp.Logger.Info("TestSubReqRetransmission")
 
 	//Subs Create
-	handle_xapp_subs_req(t)
-	req, msg := handle_e2term_subs_req(t)
-	handle_xapp_subs_req(t)
+	xappConn1.handle_xapp_subs_req(t, true)
+	crereq, cremsg := e2termConn.handle_e2term_subs_req(t)
+	xappConn1.handle_xapp_subs_req(t, false)
 
-	handle_e2term_subs_resp(t, req, msg)
+	e2termConn.handle_e2term_subs_resp(t, crereq, cremsg)
 
-	e2SubsId := handle_xapp_subs_resp(t)
+	e2SubsId := xappConn1.handle_xapp_subs_resp(t)
 
 	//Subs Delete
-	handle_xapp_subs_del_req(t, e2SubsId)
-	handle_e2term_subs_del_reqandresp(t)
-	handle_xapp_subs_del_resp(t)
+	xappConn1.handle_xapp_subs_del_req(t, true, e2SubsId)
+	delreq, delmsg := e2termConn.handle_e2term_subs_del_req(t)
+	e2termConn.handle_e2term_subs_del_resp(t, delreq, delmsg)
+	xappConn1.handle_xapp_subs_del_resp(t)
 
 	//Wait that subs is cleaned
-	handle_wait_subs_clean(t, e2SubsId)
+	mainCtrl.wait_subs_clean(t, e2SubsId, 10)
 }
 
 //-----------------------------------------------------------------------------
@@ -540,23 +532,98 @@
 //
 //-----------------------------------------------------------------------------
 func TestSubDelReqRetransmission(t *testing.T) {
-	xapp.Logger.Info("TestSubDelReqRetransmission start")
+	xapp.Logger.Info("TestSubDelReqRetransmission")
 
 	//Subs Create
-	handle_xapp_subs_req(t)
-	handle_e2term_subs_reqandresp(t)
-	e2SubsId := handle_xapp_subs_resp(t)
+	xappConn1.handle_xapp_subs_req(t, true)
+	crereq, cremsg := e2termConn.handle_e2term_subs_req(t)
+	e2termConn.handle_e2term_subs_resp(t, crereq, cremsg)
+	e2SubsId := xappConn1.handle_xapp_subs_resp(t)
 
 	//Subs Delete
-	handle_xapp_subs_del_req(t, e2SubsId)
-	req, msg := handle_e2term_subs_del_req(t)
+	xappConn1.handle_xapp_subs_del_req(t, true, e2SubsId)
+	delreq, delmsg := e2termConn.handle_e2term_subs_del_req(t)
 
 	<-time.After(2 * time.Second)
+	xappConn1.handle_xapp_subs_del_req(t, false, e2SubsId)
 
-	handle_xapp_subs_del_req(t, e2SubsId)
-
-	handle_e2term_subs_del_resp(t, req, msg)
+	e2termConn.handle_e2term_subs_del_resp(t, delreq, delmsg)
+	xappConn1.handle_xapp_subs_del_resp(t)
 
 	//Wait that subs is cleaned
-	handle_wait_subs_clean(t, e2SubsId)
+	mainCtrl.wait_subs_clean(t, e2SubsId, 10)
+}
+
+//-----------------------------------------------------------------------------
+// TestSubReqAndSubDelOkTwoParallel
+//
+//   stub                          stub
+// +-------+     +---------+    +---------+
+// | xapp  |     | submgr  |    | e2term  |
+// +-------+     +---------+    +---------+
+//     |              |              |
+//     |              |              |
+//     |              |              |
+//     | SubReq1      |              |
+//     |------------->|              |
+//     |              |              |
+//     |              | SubReq1      |
+//     |              |------------->|
+//     |              |              |
+//     | SubReq2      |              |
+//     |------------->|              |
+//     |              |              |
+//     |              | SubReq2      |
+//     |              |------------->|
+//     |              |              |
+//     |              |    SubResp1  |
+//     |              |<-------------|
+//     |              |    SubResp2  |
+//     |              |<-------------|
+//     |              |              |
+//     |    SubResp1  |              |
+//     |<-------------|              |
+//     |    SubResp2  |              |
+//     |<-------------|              |
+//     |              |              |
+//     |        [SUBS 1 DELETE]      |
+//     |              |              |
+//     |        [SUBS 2 DELETE]      |
+//     |              |              |
+//
+//-----------------------------------------------------------------------------
+func TestSubReqAndSubDelOkTwoParallel(t *testing.T) {
+	xapp.Logger.Info("TestSubReqAndSubDelOkTwoParallel")
+
+	//Req1
+	xappConn1.handle_xapp_subs_req(t, true)
+	crereq1, cremsg1 := e2termConn.handle_e2term_subs_req(t)
+
+	//Req2
+	xappConn2.handle_xapp_subs_req(t, true)
+	crereq2, cremsg2 := e2termConn.handle_e2term_subs_req(t)
+
+	//Resp1
+	e2termConn.handle_e2term_subs_resp(t, crereq1, cremsg1)
+	e2SubsId1 := xappConn1.handle_xapp_subs_resp(t)
+
+	//Resp2
+	e2termConn.handle_e2term_subs_resp(t, crereq2, cremsg2)
+	e2SubsId2 := xappConn2.handle_xapp_subs_resp(t)
+
+	//Del1
+	xappConn1.handle_xapp_subs_del_req(t, true, e2SubsId1)
+	delreq1, delmsg1 := e2termConn.handle_e2term_subs_del_req(t)
+	e2termConn.handle_e2term_subs_del_resp(t, delreq1, delmsg1)
+	xappConn1.handle_xapp_subs_del_resp(t)
+	//Wait that subs is cleaned
+	mainCtrl.wait_subs_clean(t, e2SubsId1, 10)
+
+	//Del2
+	xappConn2.handle_xapp_subs_del_req(t, true, e2SubsId2)
+	delreq2, delmsg2 := e2termConn.handle_e2term_subs_del_req(t)
+	e2termConn.handle_e2term_subs_del_resp(t, delreq2, delmsg2)
+	xappConn2.handle_xapp_subs_del_resp(t)
+	//Wait that subs is cleaned
+	mainCtrl.wait_subs_clean(t, e2SubsId2, 10)
 }
diff --git a/pkg/control/registry.go b/pkg/control/registry.go
index 4f5c4fd..3ac15f1 100644
--- a/pkg/control/registry.go
+++ b/pkg/control/registry.go
@@ -24,15 +24,20 @@
 	"sync"
 )
 
+type Subscription struct {
+	Seq       uint16
+	Confirmed bool
+}
+
 type Registry struct {
-	register map[uint16]bool
+	register map[uint16]*Subscription
 	counter  uint16
 	mutex    sync.Mutex
 }
 
 // This method should run as a constructor
 func (r *Registry) Initialize(seedsn uint16) {
-	r.register = make(map[uint16]bool)
+	r.register = make(map[uint16]*Subscription)
 	r.counter = seedsn
 }
 
@@ -46,7 +51,7 @@
 		xapp.Logger.Error("Invalid SeqenceNumber sequenceNumber: %v", sequenceNumber)
 		return sequenceNumber, false
 	}
-	r.register[sequenceNumber] = false
+	r.register[sequenceNumber] = &Subscription{sequenceNumber, false}
 
 	// Allocate next SequenceNumber value
 	if r.counter == 65535 {
@@ -72,14 +77,14 @@
 func (r *Registry) setSubscriptionToConfirmed(sn uint16) {
 	r.mutex.Lock()
 	defer r.mutex.Unlock()
-	r.register[sn] = true
+	r.register[sn].Confirmed = true
 }
 
 //This function sets the given id as unused in the register
 func (r *Registry) deleteSubscription(sn uint16) {
 	r.mutex.Lock()
 	defer r.mutex.Unlock()
-	r.register[sn] = false
+	r.register[sn].Confirmed = false
 }
 
 //This function releases the given id as unused in the register