| /* |
| ================================================================================== |
| Copyright (c) 2019 AT&T Intellectual Property. |
| Copyright (c) 2019 Nokia |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| ================================================================================== |
| */ |
| |
| package control |
| |
| import ( |
| "encoding/json" |
| "fmt" |
| "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_models" |
| "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" |
| "io/ioutil" |
| "net/http" |
| "os" |
| "strconv" |
| "strings" |
| "testing" |
| "time" |
| ) |
| |
| //----------------------------------------------------------------------------- |
| // |
| //----------------------------------------------------------------------------- |
| |
| type httpRtmgrMsg struct { |
| msg *rtmgr_models.XappSubscriptionData |
| w http.ResponseWriter |
| r *http.Request |
| } |
| |
| func (msg *httpRtmgrMsg) RetOk() { |
| msg.w.WriteHeader(200) |
| } |
| |
| type testingHttpRtmgrControl struct { |
| desc string |
| port string |
| useChannel bool |
| msgChan chan *httpRtmgrMsg |
| } |
| |
| func (hc *testingHttpRtmgrControl) UseChannel(flag bool) { |
| hc.useChannel = flag |
| } |
| |
| func (hc *testingHttpRtmgrControl) WaitReq(t *testing.T) *httpRtmgrMsg { |
| xapp.Logger.Info("(%s) handle_rtmgr_req", hc.desc) |
| select { |
| case msg := <-hc.msgChan: |
| return msg |
| case <-time.After(15 * time.Second): |
| testError(t, "(%s) Not Received RTMGR Subscription message within 15 secs", hc.desc) |
| return nil |
| } |
| return nil |
| } |
| |
| func (hc *testingHttpRtmgrControl) http_handler(w http.ResponseWriter, r *http.Request) { |
| var req rtmgr_models.XappSubscriptionData |
| err := json.NewDecoder(r.Body).Decode(&req) |
| if err != nil { |
| xapp.Logger.Error("%s", err.Error()) |
| } |
| xapp.Logger.Info("(%s) handling Address=%s Port=%d SubscriptionID=%d", hc.desc, *req.Address, *req.Port, *req.SubscriptionID) |
| msg := &httpRtmgrMsg{ |
| msg: &req, |
| w: w, |
| r: r, |
| } |
| if hc.useChannel { |
| hc.msgChan <- msg |
| } else { |
| msg.RetOk() |
| } |
| } |
| |
| func (hc *testingHttpRtmgrControl) run() { |
| http.HandleFunc("/", hc.http_handler) |
| http.ListenAndServe("localhost:"+hc.port, nil) |
| } |
| |
| func initTestingHttpRtmgrControl(desc string, port string) *testingHttpRtmgrControl { |
| hc := &testingHttpRtmgrControl{} |
| hc.desc = desc |
| hc.port = port |
| hc.useChannel = false |
| hc.msgChan = make(chan *httpRtmgrMsg) |
| return hc |
| } |
| |
| //----------------------------------------------------------------------------- |
| // |
| //----------------------------------------------------------------------------- |
| type testingRmrControl struct { |
| desc string |
| syncChan chan struct{} |
| } |
| |
| func (tc *testingRmrControl) ReadyCB(data interface{}) { |
| xapp.Logger.Info("testingRmrControl(%s) ReadyCB", tc.desc) |
| tc.syncChan <- struct{}{} |
| return |
| } |
| |
| func (tc *testingRmrControl) WaitCB() { |
| <-tc.syncChan |
| } |
| |
| func initTestingControl(desc string, rtfile string, port string) testingRmrControl { |
| tc := testingRmrControl{} |
| os.Setenv("RMR_SEED_RT", rtfile) |
| os.Setenv("RMR_SRC_ID", "localhost:"+port) |
| xapp.Logger.Info("Using rt file %s", os.Getenv("RMR_SEED_RT")) |
| xapp.Logger.Info("Using src id %s", os.Getenv("RMR_SRC_ID")) |
| tc.desc = strings.ToUpper(desc) |
| tc.syncChan = make(chan struct{}) |
| return tc |
| } |
| |
| //----------------------------------------------------------------------------- |
| // |
| //----------------------------------------------------------------------------- |
| type testingRmrStubControl struct { |
| testingRmrControl |
| rmrClientTest *xapp.RMRClient |
| active bool |
| } |
| |
| func (tc *testingRmrStubControl) RmrSend(params *RMRParams) (err error) { |
| // |
| //NOTE: Do this way until xapp-frame sending is improved |
| // |
| xapp.Logger.Info("(%s) RmrSend %s", tc.desc, params.String()) |
| status := false |
| i := 1 |
| for ; i <= 10 && status == false; i++ { |
| status = tc.rmrClientTest.SendMsg(params.RMRParams) |
| if status == false { |
| xapp.Logger.Info("(%s) RmrSend failed. Retry count %v, %s", tc.desc, i, params.String()) |
| time.Sleep(500 * time.Millisecond) |
| } |
| } |
| if status == false { |
| err = fmt.Errorf("(%s) RmrSend failed. Retry count %v, %s", tc.desc, i, params.String()) |
| xapp.Rmr.Free(params.Mbuf) |
| } |
| return |
| } |
| |
| func initTestingRmrControl(desc string, rtfile string, port string, stat string, consumer xapp.MessageConsumer) testingRmrStubControl { |
| tc := testingRmrStubControl{} |
| tc.active = false |
| tc.testingRmrControl = initTestingControl(desc, rtfile, port) |
| tc.rmrClientTest = xapp.NewRMRClientWithParams("tcp:"+port, 4096, 1, stat) |
| tc.rmrClientTest.SetReadyCB(tc.ReadyCB, nil) |
| go tc.rmrClientTest.Start(consumer) |
| tc.WaitCB() |
| return tc |
| } |
| |
| //----------------------------------------------------------------------------- |
| // |
| //----------------------------------------------------------------------------- |
| type testingMessageChannel struct { |
| rmrConChan chan *RMRParams |
| } |
| |
| func initTestingMessageChannel() testingMessageChannel { |
| mc := testingMessageChannel{} |
| mc.rmrConChan = make(chan *RMRParams) |
| return mc |
| } |
| |
| //----------------------------------------------------------------------------- |
| // |
| //----------------------------------------------------------------------------- |
| type xappTransaction struct { |
| tc *testingXappControl |
| xid string |
| meid *xapp.RMRMeid |
| } |
| |
| type testingXappControl struct { |
| testingRmrStubControl |
| testingMessageChannel |
| xid_seq uint64 |
| } |
| |
| func (tc *testingXappControl) newXid() string { |
| var xid string |
| xid = tc.desc + "_XID_" + strconv.FormatUint(uint64(tc.xid_seq), 10) |
| tc.xid_seq++ |
| return xid |
| } |
| |
| func (tc *testingXappControl) newXappTransaction(xid *string, ranname string) *xappTransaction { |
| trans := &xappTransaction{} |
| trans.tc = tc |
| if xid == nil { |
| trans.xid = tc.newXid() |
| } else { |
| trans.xid = *xid |
| } |
| trans.meid = &xapp.RMRMeid{RanName: ranname} |
| return trans |
| } |
| |
| func (tc *testingXappControl) Consume(params *xapp.RMRParams) (err error) { |
| xapp.Rmr.Free(params.Mbuf) |
| params.Mbuf = nil |
| msg := &RMRParams{params} |
| |
| if params.Mtype == 55555 { |
| xapp.Logger.Info("(%s) Testing message ignore %s", tc.desc, msg.String()) |
| tc.active = true |
| return |
| } |
| |
| if strings.Contains(msg.Xid, tc.desc) { |
| xapp.Logger.Info("(%s) Consume %s", tc.desc, msg.String()) |
| tc.rmrConChan <- msg |
| } else { |
| xapp.Logger.Info("(%s) Ignore %s", tc.desc, msg.String()) |
| } |
| return |
| } |
| |
| func createNewXappControl(desc string, rtfile string, port string, stat string) *testingXappControl { |
| xappCtrl := &testingXappControl{} |
| xappCtrl.testingRmrStubControl = initTestingRmrControl(desc, rtfile, port, stat, xappCtrl) |
| xappCtrl.testingMessageChannel = initTestingMessageChannel() |
| xappCtrl.xid_seq = 1 |
| return xappCtrl |
| } |
| |
| //----------------------------------------------------------------------------- |
| // |
| //----------------------------------------------------------------------------- |
| type testingE2termControl struct { |
| testingRmrStubControl |
| testingMessageChannel |
| } |
| |
| func (tc *testingE2termControl) Consume(params *xapp.RMRParams) (err error) { |
| xapp.Rmr.Free(params.Mbuf) |
| params.Mbuf = nil |
| msg := &RMRParams{params} |
| |
| if params.Mtype == 55555 { |
| xapp.Logger.Info("(%s) Testing message ignore %s", tc.desc, msg.String()) |
| tc.active = true |
| return |
| } |
| |
| xapp.Logger.Info("(%s) Consume %s", tc.desc, msg.String()) |
| tc.rmrConChan <- msg |
| return |
| } |
| |
| func createNewE2termControl(desc string, rtfile string, port string, stat string) *testingE2termControl { |
| e2termCtrl := &testingE2termControl{} |
| e2termCtrl.testingRmrStubControl = initTestingRmrControl(desc, rtfile, port, stat, e2termCtrl) |
| e2termCtrl.testingMessageChannel = initTestingMessageChannel() |
| return e2termCtrl |
| } |
| |
| //----------------------------------------------------------------------------- |
| // |
| //----------------------------------------------------------------------------- |
| type testingMainControl struct { |
| testingRmrControl |
| c *Control |
| } |
| |
| func createNewMainControl(desc string, rtfile string, port string) *testingMainControl { |
| mainCtrl = &testingMainControl{} |
| mainCtrl.testingRmrControl = initTestingControl(desc, rtfile, port) |
| mainCtrl.c = NewControl() |
| xapp.SetReadyCB(mainCtrl.ReadyCB, nil) |
| go xapp.RunWithParams(mainCtrl.c, false) |
| mainCtrl.WaitCB() |
| return mainCtrl |
| } |
| |
| //----------------------------------------------------------------------------- |
| // |
| //----------------------------------------------------------------------------- |
| |
| func testError(t *testing.T, pattern string, args ...interface{}) { |
| xapp.Logger.Error(fmt.Sprintf(pattern, args...)) |
| t.Errorf(fmt.Sprintf(pattern, args...)) |
| } |
| |
| func testLog(t *testing.T, pattern string, args ...interface{}) { |
| xapp.Logger.Info(fmt.Sprintf(pattern, args...)) |
| t.Logf(fmt.Sprintf(pattern, args...)) |
| } |
| |
| func testCreateTmpFile(str string) (string, error) { |
| file, err := ioutil.TempFile("/tmp", "*.rt") |
| if err != nil { |
| return "", err |
| } |
| _, err = file.WriteString(str) |
| if err != nil { |
| file.Close() |
| return "", err |
| } |
| return file.Name(), nil |
| } |
| |
| //----------------------------------------------------------------------------- |
| // |
| //----------------------------------------------------------------------------- |
| |
| var xappConn1 *testingXappControl |
| var xappConn2 *testingXappControl |
| var e2termConn *testingE2termControl |
| var mainCtrl *testingMainControl |
| var rtmgrHttp *testingHttpRtmgrControl |
| |
| func TestMain(m *testing.M) { |
| xapp.Logger.Info("TestMain start") |
| |
| //--------------------------------- |
| // |
| //--------------------------------- |
| rtmgrHttp = initTestingHttpRtmgrControl("RTMGRSTUB", "8989") |
| go rtmgrHttp.run() |
| |
| //--------------------------------- |
| // |
| //--------------------------------- |
| |
| // |
| //Cfg creation won't work like this as xapp-frame reads it during init. |
| // |
| /* |
| cfgstr:=`{ |
| "local": { |
| "host": ":8080" |
| }, |
| "logger": { |
| "level": 4 |
| }, |
| "rmr": { |
| "protPort": "tcp:14560", |
| "maxSize": 4096, |
| "numWorkers": 1, |
| "txMessages": ["RIC_SUB_REQ", "RIC_SUB_DEL_REQ"], |
| "rxMessages": ["RIC_SUB_RESP", "RIC_SUB_FAILURE", "RIC_SUB_DEL_RESP", "RIC_SUB_DEL_FAILURE", "RIC_INDICATION"] |
| }, |
| "db": { |
| "host": "localhost", |
| "port": 6379, |
| "namespaces": ["sdl", "rnib"] |
| }, |
| "rtmgr" : { |
| "HostAddr" : "localhost", |
| "port" : "8989", |
| "baseUrl" : "/" |
| } |
| ` |
| |
| cfgfilename,_ := testCreateTmpFile(cfgstr) |
| defer os.Remove(cfgfilename) |
| os.Setenv("CFG_FILE", cfgfilename) |
| */ |
| xapp.Logger.Info("Using cfg file %s", os.Getenv("CFG_FILE")) |
| |
| //--------------------------------- |
| // Static routetable for rmr |
| // |
| // NOTE: Routing table is configured so, that responses |
| // are duplicated to xapp1 and xapp2 instances. |
| // If XID is not matching xapp stub will just |
| // drop message. (Messages 12011, 12012, 12021, 12022) |
| // |
| // 14560 submgr |
| // 15560 e2term stub |
| // 13560 xapp1 stub |
| // 13660 xapp2 stub |
| // |
| //--------------------------------- |
| |
| allrt := `newrt|start |
| mse|12010|-1|localhost:14560 |
| mse|12010,localhost:14560|-1|localhost:15560 |
| mse|12011,localhost:15560|-1|localhost:14560 |
| mse|12012,localhost:15560|-1|localhost:14560 |
| mse|12011,localhost:14560|-1|localhost:13660;localhost:13560 |
| mse|12012,localhost:14560|-1|localhost:13660;localhost:13560 |
| mse|12020|-1|localhost:14560 |
| mse|12020,localhost:14560|-1|localhost:15560 |
| mse|12021,localhost:15560|-1|localhost:14560 |
| mse|12022,localhost:15560|-1|localhost:14560 |
| mse|12021,localhost:14560|-1|localhost:13660;localhost:13560 |
| mse|12022,localhost:14560|-1|localhost:13660;localhost:13560 |
| mse|55555|-1|localhost:13660;localhost:13560,localhost:15560 |
| newrt|end |
| ` |
| |
| //--------------------------------- |
| // |
| //--------------------------------- |
| xapp.Logger.Info("### submgr main run ###") |
| |
| subsrt := allrt |
| /* |
| subsrt := `newrt|start |
| mse|12010|-1|localhost:14560 |
| mse|12010,localhost:14560|-1|localhost:15560 |
| mse|12011,localhost:15560|-1|localhost:14560 |
| mse|12011|-1|localhost:13560;localhost:13660 |
| mse|12012,localhost:15560|-1|localhost:14560 |
| mse|12012|-1|localhost:13560;localhost:13660 |
| mse|12020|-1|localhost:14560 |
| mse|12020,localhost:14560|-1|localhost:15560 |
| mse|12021,localhost:15560|-1|localhost:14560 |
| mse|12021|-1|localhost:13560;localhost:13660 |
| mse|12022,localhost:15560|-1|localhost:14560 |
| mse|12022|-1|localhost:13560;localhost:13660 |
| newrt|end |
| ` |
| */ |
| |
| subrtfilename, _ := testCreateTmpFile(subsrt) |
| defer os.Remove(subrtfilename) |
| mainCtrl = createNewMainControl("main", subrtfilename, "14560") |
| |
| //--------------------------------- |
| // |
| //--------------------------------- |
| xapp.Logger.Info("### xapp1 rmr run ###") |
| |
| xapprt1 := allrt |
| /* |
| xapprt1 := `newrt|start |
| mse|12010|-1|localhost:14560 |
| mse|12011|-1|localhost:13560 |
| mse|12012|-1|localhost:13560 |
| mse|12020|-1|localhost:14560 |
| mse|12021|-1|localhost:13560 |
| mse|12022|-1|localhost:13560 |
| newrt|end |
| ` |
| */ |
| |
| xapprtfilename1, _ := testCreateTmpFile(xapprt1) |
| defer os.Remove(xapprtfilename1) |
| xappConn1 = createNewXappControl("xappstub1", xapprtfilename1, "13560", "RMRXAPP1STUB") |
| |
| //--------------------------------- |
| // |
| //--------------------------------- |
| |
| xapp.Logger.Info("### xapp2 rmr run ###") |
| |
| xapprt2 := allrt |
| /* |
| xapprt2 := `newrt|start |
| mse|12010|-1|localhost:14560 |
| mse|12011|-1|localhost:13660 |
| mse|12012|-1|localhost:13660 |
| mse|12020|-1|localhost:14560 |
| mse|12021|-1|localhost:13660 |
| mse|12022|-1|localhost:13660 |
| newrt|end |
| ` |
| */ |
| |
| xapprtfilename2, _ := testCreateTmpFile(xapprt2) |
| defer os.Remove(xapprtfilename2) |
| xappConn2 = createNewXappControl("xappstub2", xapprtfilename2, "13660", "RMRXAPP2STUB") |
| |
| //--------------------------------- |
| // |
| //--------------------------------- |
| xapp.Logger.Info("### e2term rmr run ###") |
| |
| e2termrt := allrt |
| /* |
| e2termrt := `newrt|start |
| mse|12010|-1|localhost:15560 |
| mse|12011|-1|localhost:14560 |
| mse|12012|-1|localhost:14560 |
| mse|12020|-1|localhost:15560 |
| mse|12021|-1|localhost:14560 |
| mse|12022|-1|localhost:14560 |
| newrt|end |
| ` |
| */ |
| |
| e2termrtfilename, _ := testCreateTmpFile(e2termrt) |
| defer os.Remove(e2termrtfilename) |
| e2termConn = createNewE2termControl("e2termstub", e2termrtfilename, "15560", "RMRE2TERMSTUB") |
| |
| //--------------------------------- |
| // Testing message sending |
| //--------------------------------- |
| var dummyBuf []byte = make([]byte, 100) |
| |
| params := &RMRParams{&xapp.RMRParams{}} |
| params.Mtype = 55555 |
| params.SubId = -1 |
| params.Payload = dummyBuf |
| params.PayloadLen = 100 |
| params.Meid = &xapp.RMRMeid{RanName: "NONEXISTINGRAN"} |
| params.Xid = "THISISTESTFORSTUBS" |
| params.Mbuf = nil |
| |
| status := false |
| i := 1 |
| for ; i <= 10 && status == false; i++ { |
| xapp.Rmr.Send(params.RMRParams, false) |
| if e2termConn.active == true && xappConn1.active == true && xappConn2.active == true { |
| status = true |
| break |
| } else { |
| xapp.Logger.Info("Sleep 0.5 secs and try routes again") |
| time.Sleep(500 * time.Millisecond) |
| } |
| } |
| |
| if status == false { |
| xapp.Logger.Error("Could not initialize routes") |
| os.Exit(1) |
| } |
| |
| //--------------------------------- |
| // |
| //--------------------------------- |
| code := m.Run() |
| os.Exit(code) |
| } |