// blob: 9012094faef4f0f52ee8bef246d1a3f2797927a5 [file] [log] [blame]
/*
==================================================================================
Copyright (c) 2020 AT&T Intellectual Property.
Copyright (c) 2020 Nokia
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==================================================================================
*/
package main
import (
"encoding/json"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
"os"
"strconv"
"time"
)
// Process-wide settings, populated from the environment in main.
var (
	// delay is the number of seconds handlePolicyReq sleeps before replying
	// (TEST_RCV_SEC_DELAY); values <= 0 mean no sleep.
	delay int
	// handlerID is the name passed to the logger MDC in Run (HANDLER_ID).
	handlerID string
	// doQuery makes Run start the sendQuery loop (set when DO_QUERY is present).
	doQuery bool
)
// a1Receiver is the test receiver xapp. It is the value handed to xapp.Run
// and owns the channel that connects Consume to messageLoop.
type a1Receiver struct {
// msgChan carries each received message from Consume to messageLoop.
msgChan chan *xapp.RMRParams
// appReady is stored by newA1Receiver; nothing in this file reads it.
appReady bool
// rmrReady is flipped to true by the ready callback registered in Run and
// is checked by sendMsgRetry before every send attempt.
rmrReady bool
}
// policyRequest is the JSON document decoded from the payload of an incoming
// policy request message.
type policyRequest struct {
// Operation is compared against "CREATE" and "DELETE" in handlePolicyReq.
Operation string `json:"operation"`
// PolicyTypeID is echoed back in the response.
PolicyTypeID int `json:"policy_type_id"`
// PolicyInstanceID is echoed back in the response.
PolicyInstanceID string `json:"policy_instance_id"`
// Pay receives the "payload" value as arbitrary JSON; it is not inspected
// anywhere in this file.
Pay interface{} `json:"payload"`
}
// policyRequestResponse is the JSON document sent back in reply to a
// policyRequest.
type policyRequestResponse struct {
// PolicyTypeID is copied from the request.
PolicyTypeID int `json:"policy_type_id"`
// PolicyInstanceID is copied from the request.
PolicyInstanceID string `json:"policy_instance_id"`
// HandlerID names the handler that produced this response.
HandlerID string `json:"handler_id"`
// Status is the outcome string chosen by handlePolicyReq from the
// request's operation.
Status string `json:"status"`
}
// policyQuery is the JSON document sent by sendQuery; it names the policy
// type being queried.
type policyQuery struct {
// PolicyTypeID is the policy type the query is about.
PolicyTypeID int `json:"policy_type_id"`
}
// sendMsgRetry sends params over RMR and does not return until a send
// succeeds.
//
// While rmrReady is false it logs and sleeps one second between checks; the
// original author notes that calling SendMsg before RMR is ready crashes with
// a nil pointer. Once ready, a failed send is retried immediately, with no
// pause between attempts.
//
// NOTE(review): rmrReady is written by the ready callback registered in Run
// and read here without synchronisation — confirm whether these run on
// different goroutines.
func (e *a1Receiver) sendMsgRetry(params *xapp.RMRParams) {
	failed := 0
	for {
		if !e.rmrReady {
			xapp.Logger.Info("rmr not ready...")
			time.Sleep(time.Second)
			continue
		}
		if xapp.Rmr.SendMsg(params) {
			xapp.Logger.Info("Msg successfully sent after %d retries!", failed)
			return
		}
		// Send failed: count it and try again straight away.
		failed++
	}
}
// handlePolicyReq answers one policy request with a policy response
// (message type 20011).
//
// The request payload is decoded as a policyRequest. The response echoes the
// request's policy type and instance IDs and carries a status derived from
// the operation: "OK" for CREATE, "DELETED" for DELETE. Any other operation
// is logged and answered with an empty status. When the package-level delay
// is positive, the handler sleeps that many seconds before sending.
//
// Panics if the payload is not valid JSON or the response cannot be encoded.
func (e *a1Receiver) handlePolicyReq(msg *xapp.RMRParams) {
	var req policyRequest
	if err := json.Unmarshal(msg.Payload, &req); err != nil {
		panic(err)
	}

	var status string
	switch req.Operation {
	case "CREATE":
		status = "OK"
	case "DELETE":
		status = "DELETED"
	default:
		// Still reply (with the empty status this code has always sent), but
		// leave a trace so an unexpected operation is not silently accepted.
		xapp.Logger.Info("Unrecognised policy operation %q; replying with empty status", req.Operation)
	}

	// Keyed fields so the response does not depend on struct field order.
	// NOTE(review): the handler ID is hard-coded rather than taken from the
	// HANDLER_ID-configured handlerID variable — confirm this is intended.
	res := &policyRequestResponse{
		PolicyTypeID:     req.PolicyTypeID,
		PolicyInstanceID: req.PolicyInstanceID,
		HandlerID:        "test_receiver",
		Status:           status,
	}
	outgoing, err := json.Marshal(res)
	if err != nil {
		panic(err)
	}

	// Original author's warning: return-to-sender (rts) would be the right
	// call here, but the Go xapp framework's rts was broken when this was
	// written, so a fresh message is sent instead.
	params := &xapp.RMRParams{
		Mtype:   20011,
		Payload: outgoing,
	}

	if delay > 0 {
		xapp.Logger.Info("Xapp is sleeping...")
		time.Sleep(time.Duration(delay) * time.Second)
	}

	e.sendMsgRetry(params)
	xapp.Logger.Info("Policy response sent!")
}
// sendQuery sends a policy query (message type 20012) for policy type
// 1006001 once per second, forever. It never returns.
//
// Panics if the query cannot be encoded as JSON.
func (e *a1Receiver) sendQuery() {
	body, err := json.Marshal(&policyQuery{PolicyTypeID: 1006001})
	if err != nil {
		panic(err)
	}

	query := new(xapp.RMRParams)
	query.Mtype = 20012
	query.Payload = body

	// The query is repeated deliberately: per the original author, even after
	// a first successful send A1 may not have the type or any instances yet,
	// so this integration test keeps asking until A1 returns the list the
	// test is waiting for. A real xapp would NOT query in a loop like this.
	for {
		e.sendMsgRetry(query)
		xapp.Logger.Info("Query sent successfully")
		time.Sleep(time.Second)
	}
}
// messageLoop receives messages from msgChan forever and hands each one to
// dispatch. It does not return normally.
func (e *a1Receiver) messageLoop() {
	for {
		xapp.Logger.Info("Waiting for message..")
		msg := <-e.msgChan
		xapp.Logger.Info("Message received!")
		e.dispatch(msg)
	}
}

// dispatch processes a single received message and frees its RMR buffer when
// done.
//
// The free is deferred here, in a per-message function, so that it runs after
// every message. A defer placed directly in messageLoop's infinite loop would
// never fire, and the buffers would pile up for the life of the process.
//
// Panics on any message type other than 20010.
func (e *a1Receiver) dispatch(msg *xapp.RMRParams) {
	defer xapp.Rmr.Free(msg.Mbuf)
	switch msg.Mtype {
	case 20010:
		e.handlePolicyReq(msg)
	default:
		panic("Unexpected message type!")
	}
}
// Consume is the callback the xapp framework requires of the value passed to
// xapp.Run; per the original author it is invoked for every received RMR
// message. It forwards rp to messageLoop over msgChan, blocking until the
// channel accepts it, and always returns nil.
func (e *a1Receiver) Consume(rp *xapp.RMRParams) (err error) {
	e.msgChan <- rp
	return nil
}
// Run sets up logging and the RMR-ready callback, starts the background
// goroutines, and then hands control to the xapp framework via xapp.Run.
func (e *a1Receiver) Run() {
	// Set MDC (per the original note: the name visible in the logs).
	xapp.Logger.SetMdc(handlerID, "0.1.0")

	// Original author's reading of the xapp-frame code: SetReadyCB only
	// records the callback and its parameter at module level in xapp.go.
	// Nothing happens until xapp.Run, which passes the callback on to the
	// (unexposed) Rmr client; that client invokes it once RMR is ready. In
	// short: when RMR becomes ready, markReady runs. The author also remarks
	// that the frame would be simpler if the callback were passed to Run().
	markReady := func(_ interface{}) { e.rmrReady = true }
	xapp.SetReadyCB(markReady, true)

	// The message loop is started before RMR is ready: rmrReady cannot be
	// waited on here because it is only set after xapp.Run below has started.
	go e.messageLoop()

	// Query-tester mode: start the loop that keeps sending policy queries.
	if doQuery {
		go e.sendQuery()
	}

	xapp.Run(e)
}
// newA1Receiver returns a receiver with the given initial readiness flags and
// a freshly made, unbuffered message channel.
func newA1Receiver(appReady, rmrReady bool) *a1Receiver {
	rcv := new(a1Receiver)
	rcv.msgChan = make(chan *xapp.RMRParams)
	rcv.appReady = appReady
	rcv.rmrReady = rmrReady
	return rcv
}
// main configures the receiver from the environment and runs it.
//
// Environment variables:
//   - TEST_RCV_SEC_DELAY: whole seconds to wait before each policy response.
//     Defaults to 0; a value that is not an integer is reported on stderr
//     and treated as 0.
//   - HANDLER_ID: overrides the default handler ID "test_receiver".
//   - DO_QUERY: if present, with any value (even empty), enables the policy
//     query loop.
func main() {
	delay = 0
	if raw, ok := os.LookupEnv("TEST_RCV_SEC_DELAY"); ok {
		secs, err := strconv.Atoi(raw)
		if err != nil {
			// A malformed value used to be dropped silently. Keep running
			// with no delay, but say why. The write is best effort, so its
			// own result is deliberately ignored.
			_, _ = os.Stderr.WriteString("ignoring invalid TEST_RCV_SEC_DELAY: " + err.Error() + "\n")
			secs = 0
		}
		delay = secs
	}

	handlerID = "test_receiver"
	if hid, ok := os.LookupEnv("HANDLER_ID"); ok {
		handlerID = hid
	}

	// Only presence is checked; the variable's value is not read.
	_, doQuery = os.LookupEnv("DO_QUERY")

	newA1Receiver(true, false).Run()
}