A1 to send policy instance data via RMR
Enable A1 to send policy instance data via RMR to the xApps that have
subscribed to the message type.
Signed-off-by: naman.gupta <naman.gupta@samsung.com>
Change-Id: Ib5feab97ae8e538f792a13fafc5705c845b433a4
diff --git a/a1-go/go.mod b/a1-go/go.mod
index a189744..26b642f 100644
--- a/a1-go/go.mod
+++ b/a1-go/go.mod
@@ -26,6 +26,8 @@
replace gerrit.o-ran-sc.org/r/com/golog => gerrit.o-ran-sc.org/r/com/golog.git v0.0.2
+replace gerrit.o-ran-sc.org/r/ric-plt/xapp-frame => gerrit.o-ran-sc.org/r/ric-plt/xapp-frame.git v0.9.3
+
require (
gerrit.o-ran-sc.org/r/com/golog v0.0.2
gerrit.o-ran-sc.org/r/ric-plt/sdlgo v0.7.0
@@ -41,4 +43,5 @@
github.com/xeipuuv/gojsonschema v1.2.0
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b
k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b // indirect
+ gerrit.o-ran-sc.org/r/ric-plt/xapp-frame v0.0.0-00010101000000-000000000000
)
diff --git a/a1-go/pkg/restful/restful.go b/a1-go/pkg/restful/restful.go
index f34a5ad..ec63201 100644
--- a/a1-go/pkg/restful/restful.go
+++ b/a1-go/pkg/restful/restful.go
@@ -117,7 +117,7 @@
server := restapi.NewServer(r.api)
defer server.Shutdown()
- server.Port = 8080
+ server.Port = 10000
server.Host = "0.0.0.0"
if err := server.Serve(); err != nil {
log.Fatal(err.Error())
diff --git a/a1-go/pkg/resthooks/resthooks.go b/a1-go/pkg/resthooks/resthooks.go
index 45c3928..79dba93 100644
--- a/a1-go/pkg/resthooks/resthooks.go
+++ b/a1-go/pkg/resthooks/resthooks.go
@@ -31,6 +31,7 @@
"gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1"
"gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/models"
"gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
+ "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
"github.com/santhosh-tekuri/jsonschema/v5"
"gopkg.in/yaml.v2"
)
@@ -72,12 +73,13 @@
return err == invalidJsonSchema
}
func NewResthook() *Resthook {
- return createResthook(sdlgo.NewSyncStorage())
+ return createResthook(sdlgo.NewSyncStorage(), rmr.NewRMRSender()) // production wiring: SDL sync storage plus a new RMR sender. NOTE(review): the only import this change adds to resthooks.go is xapp-frame, not pkg/rmr - confirm rmr is imported here
}
-func createResthook(sdlInst iSdl) *Resthook {
+func createResthook(sdlInst iSdl, rmrSenderInst rmr.IRmrSender) *Resthook { // createResthook builds a Resthook from the injected storage and RMR sender.
return &Resthook{
- db: sdlInst,
+ db: sdlInst,
+ iRmrSenderInst: rmrSenderInst, // sender that CreatePolicyInstance calls via RmrSendToXapp
}
}
@@ -363,6 +365,13 @@
if iscreated {
a1.Logger.Debug("policy instance metadata created")
}
+ isSent := rh.iRmrSenderInst.RmrSendToXapp(httpBodyString)
+ if isSent {
+ a1.Logger.Debug("rmrSendToXapp : message sent")
+ } else {
+ a1.Logger.Debug("rmrSendToXapp : message not sent")
+ }
+
} else {
a1.Logger.Error("%+v", invalidJsonSchema)
return invalidJsonSchema
diff --git a/a1-go/pkg/resthooks/resthooks_test.go b/a1-go/pkg/resthooks/resthooks_test.go
index da062aa..4466927 100644
--- a/a1-go/pkg/resthooks/resthooks_test.go
+++ b/a1-go/pkg/resthooks/resthooks_test.go
@@ -33,8 +33,13 @@
"github.com/stretchr/testify/mock"
)
+type RmrSenderMock struct { // RmrSenderMock is a testify mock that provides the RmrSendToXapp method for tests.
+ mock.Mock // embedded so the mock's RmrSendToXapp can call MethodCalled (tests also call On on it)
+}
+
var rh *Resthook
var sdlInst *SdlMock
+var rmrSenderInst *RmrSenderMock
func TestMain(m *testing.M) {
sdlInst = new(SdlMock)
@@ -46,9 +51,9 @@
"a1.policy_type.20000",
"a1.policy_inst_metadata.1006001.qos",
}, nil)
-
+ RMRclient = new(RMRClientMock)
a1.Init()
- rh = createResthook(sdlInst)
+ rh = createResthook(sdlInst, RMRclient)
code := m.Run()
os.Exit(code)
}
@@ -139,6 +144,7 @@
a1.Logger.Debug("metadatainstancekey : %+v", metadatainstancekey)
metadatainstancearr := []interface{}{metadatainstancekey, string(metadata)}
sdlInst.On("Set", "A1m_ns", metadatainstancearr).Return(nil)
+ rmrSenderInst.On("RmrSendToXapp", "httpBodyString").Return(true)
errresp := rh.CreatePolicyInstance(policyTypeId, policyInstanceID, instancedata)
@@ -210,13 +216,14 @@
policySchemaString = `{"create_schema":{"$schema":"http://json-schema.org/draft-07/schema#","properties":{"additionalProperties":false,"blocking_rate":{"default":10,"description":"% Connections to block","maximum":1001,"minimum":1,"type":"number"},"enforce":{"default":"true","type":"boolean"},"window_length":{"default":1,"description":"Sliding window length (in minutes)","maximum":60,"minimum":1,"type":"integer"}},"type":"object"},"description":"various parameters to control admission of dual connection","name":"admission_control_policy_mine","policy_type_id":20001}`
key = a1PolicyPrefix + strconv.FormatInt((policytypeid), 10)
}
- a1.Logger.Error(" policy SchemaString %+v", policySchemaString)
+ a1.Logger.Debug(" policy SchemaString %+v", policySchemaString)
policyTypeSchema, _ := json.Marshal((policySchemaString))
- a1.Logger.Error(" policyTypeSchema %+v", string(policyTypeSchema))
+ a1.Logger.Debug(" policyTypeSchema %+v", string(policyTypeSchema))
- a1.Logger.Error(" key for policy type %+v", key)
+ a1.Logger.Debug(" key for policy type %+v", key)
mp := map[string]interface{}{key: string(policySchemaString)}
- a1.Logger.Error("Get Called and mp return %+v ", mp)
+ a1.Logger.Debug("Get Called and mp return %+v ", mp)
+
return mp, nil
}
@@ -233,3 +240,8 @@
args := s.MethodCalled("SetIfNotExists", ns, key, oldData, newData)
return args.Bool(0), args.Error(1)
}
+
+func (rmr *RmrSenderMock) RmrSendToXapp(httpBodyString string) bool { // RmrSendToXapp is the mocked send; nothing is transmitted.
+ args := rmr.MethodCalled("RmrSendToXapp", httpBodyString) // forwards the call and its argument to the embedded mock
+ return args.Bool(0) // returns the mock's first return argument as a bool
+}
diff --git a/a1-go/pkg/resthooks/types.go b/a1-go/pkg/resthooks/types.go
index 8302874..01f4258 100644
--- a/a1-go/pkg/resthooks/types.go
+++ b/a1-go/pkg/resthooks/types.go
@@ -20,10 +20,14 @@
*/
package resthooks
-type Resthook struct {
- db iSdl
-}
+import (
+ "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/rmr"
+)
+type Resthook struct { // Resthook holds the dependencies used by the REST hook handlers.
+ db iSdl // storage backend (sdlgo sync storage in NewResthook, a mock in tests)
+ iRmrSenderInst rmr.IRmrSender // sender used to pass policy instance data on over RMR
+}
type iSdl interface {
GetAll(string) ([]string, error)
SetIfNotExists(ns string, key string, data interface{}) (bool, error)
@@ -31,3 +35,7 @@
SetIf(ns string, key string, oldData, newData interface{}) (bool, error)
Set(ns string, pairs ...interface{}) error
}
+
+type iRMRClient interface { // iRMRClient describes a client with a SendMsg method. NOTE(review): not referenced elsewhere in this change - confirm it is needed
+ SendMsg(params *xapp.RMRParams) bool // NOTE(review): xapp is not in the import block this change adds to types.go - confirm this compiles
+}
diff --git a/a1-go/pkg/rmr/rmr.go b/a1-go/pkg/rmr/rmr.go
new file mode 100644
index 0000000..78adf46
--- /dev/null
+++ b/a1-go/pkg/rmr/rmr.go
@@ -0,0 +1,74 @@
+/*
+==================================================================================
+ Copyright (c) 2022 Samsung
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+ This source code is part of the near-RT RIC (RAN Intelligent Controller)
+ platform project (RICP).
+==================================================================================
+*/
+
+package rmr
+
+import (
+ "gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1"
+ "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+)
+
+const ( // values stamped on every message built by RmrSendToXapp
+ a1PolicyRequest = 20010 // assigned to RMRParams.Mtype (the RMR message type)
+ a1SourceName = "service-ricplt-a1mediator-http" // assigned to RMRParams.Src
+)
+
+type RmrSender struct { // RmrSender is the IRmrSender implementation returned by NewRMRSender.
+ rmrclient *xapp.RMRClient // xapp-frame RMR client whose SendMsg does the actual send
+}
+
+type IRmrSender interface { // IRmrSender is the send abstraction held by resthooks.Resthook.
+ RmrSendToXapp(httpBodyString string) bool // sends httpBodyString as the payload; the caller logs "message sent" when this returns true
+}
+
+func NewRMRSender() IRmrSender { // NewRMRSender creates an xapp-frame RMR client with fixed parameters and wraps it in an RmrSender.
+ RMRclient := xapp.NewRMRClientWithParams(&xapp.RMRClientParams{
+ StatDesc: "",
+ RmrData: xapp.PortData{
+ Name: "",
+ MaxSize: 65534, // NOTE(review): presumably the maximum message size in bytes - confirm against xapp-frame
+ ThreadType: 0,
+ LowLatency: false,
+ FastAck: false,
+ MaxRetryOnFailure: 1,
+ },
+ })
+ return &RmrSender{
+ rmrclient: RMRclient,
+ }
+}
+
+func (rmr *RmrSender) RmrSendToXapp(httpBodyString string) bool { // RmrSendToXapp wraps httpBodyString in an RMR message and returns the result of SendMsg.
+
+ params := &xapp.RMRParams{}
+ params.Mtype = a1PolicyRequest // message type 20010
+ params.SubId = -1 // NOTE(review): presumably means "no subscription id" - confirm against RMR/xapp-frame
+ params.Xid = ""
+ params.Meid = &xapp.RMRMeid{} // empty Meid struct
+ params.Src = a1SourceName
+ params.PayloadLen = len([]byte(httpBodyString)) // payload length in bytes
+ params.Payload = []byte(httpBodyString)
+ a1.Logger.Debug("MSG to XAPP: %s ", params.String())
+ a1.Logger.Debug("len payload %+v", len(params.Payload))
+ s := rmr.rmrclient.SendMsg(params) // bool result from the xapp-frame client, returned unchanged
+ a1.Logger.Debug("rmrSendToXapp: sending: %+v", s)
+ return s
+}