A1 to send policy instance data via RMR

Enabling a1 to send policy instance data to xapps which subscribed to
the message type via RMR

Signed-off-by: naman.gupta <naman.gupta@samsung.com>
Change-Id: Ib5feab97ae8e538f792a13fafc5705c845b433a4
diff --git a/a1-go/go.mod b/a1-go/go.mod
index a189744..26b642f 100644
--- a/a1-go/go.mod
+++ b/a1-go/go.mod
@@ -26,6 +26,8 @@
 
 replace gerrit.o-ran-sc.org/r/com/golog => gerrit.o-ran-sc.org/r/com/golog.git v0.0.2
 
+replace gerrit.o-ran-sc.org/r/ric-plt/xapp-frame => gerrit.o-ran-sc.org/r/ric-plt/xapp-frame.git v0.9.3
+
 require (
 	gerrit.o-ran-sc.org/r/com/golog v0.0.2
 	gerrit.o-ran-sc.org/r/ric-plt/sdlgo v0.7.0
@@ -41,4 +43,5 @@
 	github.com/xeipuuv/gojsonschema v1.2.0
 	golang.org/x/net v0.0.0-20201110031124-69a78807bb2b
 	k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b // indirect
+	gerrit.o-ran-sc.org/r/ric-plt/xapp-frame v0.0.0-00010101000000-000000000000
 )
diff --git a/a1-go/pkg/restful/restful.go b/a1-go/pkg/restful/restful.go
index f34a5ad..ec63201 100644
--- a/a1-go/pkg/restful/restful.go
+++ b/a1-go/pkg/restful/restful.go
@@ -117,7 +117,7 @@
 
 	server := restapi.NewServer(r.api)
 	defer server.Shutdown()
-	server.Port = 8080
+	server.Port = 10000
 	server.Host = "0.0.0.0"
 	if err := server.Serve(); err != nil {
 		log.Fatal(err.Error())
diff --git a/a1-go/pkg/resthooks/resthooks.go b/a1-go/pkg/resthooks/resthooks.go
index 45c3928..79dba93 100644
--- a/a1-go/pkg/resthooks/resthooks.go
+++ b/a1-go/pkg/resthooks/resthooks.go
@@ -31,6 +31,7 @@
 	"gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1"
 	"gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/models"
 	"gerrit.o-ran-sc.org/r/ric-plt/sdlgo"
+	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
 	"github.com/santhosh-tekuri/jsonschema/v5"
 	"gopkg.in/yaml.v2"
 )
@@ -72,12 +73,13 @@
 	return err == invalidJsonSchema
 }
 func NewResthook() *Resthook {
-	return createResthook(sdlgo.NewSyncStorage())
+	return createResthook(sdlgo.NewSyncStorage(), rmr.NewRMRSender())
 }
 
-func createResthook(sdlInst iSdl) *Resthook {
+func createResthook(sdlInst iSdl, rmrSenderInst rmr.IRmrSender) *Resthook {
 	return &Resthook{
-		db: sdlInst,
+		db:             sdlInst,
+		iRmrSenderInst: rmrSenderInst,
 	}
 }
 
@@ -363,6 +365,13 @@
 		if iscreated {
 			a1.Logger.Debug("policy instance metadata created")
 		}
+		isSent := rh.iRmrSenderInst.RmrSendToXapp(httpBodyString)
+		if isSent {
+			a1.Logger.Debug("rmrSendToXapp : message sent")
+		} else {
+			a1.Logger.Debug("rmrSendToXapp : message not sent")
+		}
+
 	} else {
 		a1.Logger.Error("%+v", invalidJsonSchema)
 		return invalidJsonSchema
diff --git a/a1-go/pkg/resthooks/resthooks_test.go b/a1-go/pkg/resthooks/resthooks_test.go
index da062aa..4466927 100644
--- a/a1-go/pkg/resthooks/resthooks_test.go
+++ b/a1-go/pkg/resthooks/resthooks_test.go
@@ -33,8 +33,13 @@
 	"github.com/stretchr/testify/mock"
 )
 
+type RmrSenderMock struct {
+	mock.Mock
+}
+
 var rh *Resthook
 var sdlInst *SdlMock
+var rmrSenderInst *RmrSenderMock
 
 func TestMain(m *testing.M) {
 	sdlInst = new(SdlMock)
@@ -46,9 +51,9 @@
 		"a1.policy_type.20000",
 		"a1.policy_inst_metadata.1006001.qos",
 	}, nil)
-
+	RMRclient = new(RMRClientMock)
 	a1.Init()
-	rh = createResthook(sdlInst)
+	rh = createResthook(sdlInst, RMRclient)
 	code := m.Run()
 	os.Exit(code)
 }
@@ -139,6 +144,7 @@
 	a1.Logger.Debug("metadatainstancekey   : %+v", metadatainstancekey)
 	metadatainstancearr := []interface{}{metadatainstancekey, string(metadata)}
 	sdlInst.On("Set", "A1m_ns", metadatainstancearr).Return(nil)
+	rmrSenderInst.On("RmrSendToXapp", "httpBodyString").Return(true)
 
 	errresp := rh.CreatePolicyInstance(policyTypeId, policyInstanceID, instancedata)
 
@@ -210,13 +216,14 @@
 		policySchemaString = `{"create_schema":{"$schema":"http://json-schema.org/draft-07/schema#","properties":{"additionalProperties":false,"blocking_rate":{"default":10,"description":"% Connections to block","maximum":1001,"minimum":1,"type":"number"},"enforce":{"default":"true","type":"boolean"},"window_length":{"default":1,"description":"Sliding window length (in minutes)","maximum":60,"minimum":1,"type":"integer"}},"type":"object"},"description":"various parameters to control admission of dual connection","name":"admission_control_policy_mine","policy_type_id":20001}`
 		key = a1PolicyPrefix + strconv.FormatInt((policytypeid), 10)
 	}
-	a1.Logger.Error(" policy SchemaString %+v", policySchemaString)
+	a1.Logger.Debug(" policy SchemaString %+v", policySchemaString)
 	policyTypeSchema, _ := json.Marshal((policySchemaString))
-	a1.Logger.Error(" policyTypeSchema %+v", string(policyTypeSchema))
+	a1.Logger.Debug(" policyTypeSchema %+v", string(policyTypeSchema))
 
-	a1.Logger.Error(" key for policy type %+v", key)
+	a1.Logger.Debug(" key for policy type %+v", key)
 	mp := map[string]interface{}{key: string(policySchemaString)}
-	a1.Logger.Error("Get Called and mp return %+v ", mp)
+	a1.Logger.Debug("Get Called and mp return %+v ", mp)
+
 	return mp, nil
 }
 
@@ -233,3 +240,8 @@
 	args := s.MethodCalled("SetIfNotExists", ns, key, oldData, newData)
 	return args.Bool(0), args.Error(1)
 }
+
+func (rmr *RmrSenderMock) RmrSendToXapp(httpBodyString string) bool {
+	args := rmr.MethodCalled("RmrSendToXapp", httpBodyString)
+	return args.Bool(0)
+}
diff --git a/a1-go/pkg/resthooks/types.go b/a1-go/pkg/resthooks/types.go
index 8302874..01f4258 100644
--- a/a1-go/pkg/resthooks/types.go
+++ b/a1-go/pkg/resthooks/types.go
@@ -20,10 +20,14 @@
 */
 package resthooks
 
-type Resthook struct {
-	db iSdl
-}
+import (
+	"gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/rmr"
+)
 
+type Resthook struct {
+	db             iSdl
+	iRmrSenderInst rmr.IRmrSender
+}
 type iSdl interface {
 	GetAll(string) ([]string, error)
 	SetIfNotExists(ns string, key string, data interface{}) (bool, error)
@@ -31,3 +35,7 @@
 	SetIf(ns string, key string, oldData, newData interface{}) (bool, error)
 	Set(ns string, pairs ...interface{}) error
 }
+
+type iRMRClient interface {
+	SendMsg(params *xapp.RMRParams) bool
+}
diff --git a/a1-go/pkg/rmr/rmr.go b/a1-go/pkg/rmr/rmr.go
new file mode 100644
index 0000000..78adf46
--- /dev/null
+++ b/a1-go/pkg/rmr/rmr.go
@@ -0,0 +1,74 @@
+/*
+==================================================================================
+  Copyright (c) 2022 Samsung
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
+   This source code is part of the near-RT RIC (RAN Intelligent Controller)
+   platform project (RICP).
+==================================================================================
+*/
+
+package rmr
+
+import (
+	"gerrit.o-ran-sc.org/r/ric-plt/a1/pkg/a1"
+	"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+)
+
+const (
+	a1PolicyRequest = 20010
+	a1SourceName    = "service-ricplt-a1mediator-http"
+)
+
+type RmrSender struct {
+	rmrclient *xapp.RMRClient
+}
+
+type IRmrSender interface {
+	RmrSendToXapp(httpBodyString string) bool
+}
+
+func NewRMRSender() IRmrSender {
+	RMRclient := xapp.NewRMRClientWithParams(&xapp.RMRClientParams{
+		StatDesc: "",
+		RmrData: xapp.PortData{
+			Name:              "",
+			MaxSize:           65534,
+			ThreadType:        0,
+			LowLatency:        false,
+			FastAck:           false,
+			MaxRetryOnFailure: 1,
+		},
+	})
+	return &RmrSender{
+		rmrclient: RMRclient,
+	}
+}
+
+func (rmr *RmrSender) RmrSendToXapp(httpBodyString string) bool {
+
+	params := &xapp.RMRParams{}
+	params.Mtype = a1PolicyRequest
+	params.SubId = -1
+	params.Xid = ""
+	params.Meid = &xapp.RMRMeid{}
+	params.Src = a1SourceName
+	params.PayloadLen = len([]byte(httpBodyString))
+	params.Payload = []byte(httpBodyString)
+	a1.Logger.Debug("MSG to XAPP: %s ", params.String())
+	a1.Logger.Debug("len payload %+v", len(params.Payload))
+	s := rmr.rmrclient.SendMsg(params)
+	a1.Logger.Debug("rmrSendToXapp: sending: %+v", s)
+	return s
+}