| /* |
| ================================================================================== |
| Copyright (c) 2020 AT&T Intellectual Property. |
| Copyright (c) 2020 Nokia |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| ================================================================================== |
| */ |
| package main |
| |
| import ( |
| "encoding/json" |
| "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" |
| "os" |
| "strconv" |
| "time" |
| ) |
| |
// Process-wide settings. main assigns all three from the environment before
// the receiver is started.
var (
	// delay is the number of seconds handlePolicyReq sleeps before sending
	// its response; values <= 0 mean no sleep (used for the delay receiver).
	delay int

	// handlerID is the name passed to the logger's MDC in Run (used for the
	// delay receiver too).
	handlerID string

	// doQuery makes Run start the sendQuery goroutine (used for the query
	// receiver).
	doQuery bool
)
| |
| type a1Receiver struct { |
| msgChan chan *xapp.RMRParams |
| appReady bool |
| rmrReady bool |
| } |
| |
// policyRequest is the JSON body that handlePolicyReq decodes from an
// incoming message payload.
type policyRequest struct {
	// Operation selects the reply status; handlePolicyReq recognizes
	// "CREATE" and "DELETE".
	Operation string `json:"operation"`
	// PolicyTypeID and PolicyInstanceID are echoed back in the response.
	PolicyTypeID     int    `json:"policy_type_id"`
	PolicyInstanceID string `json:"policy_instance_id"`
	// Pay holds the free-form "payload" value; it is decoded but not
	// otherwise used in this file.
	Pay interface{} `json:"payload"`
}
| |
// policyRequestResponse is the JSON body handlePolicyReq sends back for a
// policy request.
//
// The field order is significant: handlePolicyReq fills this struct with a
// positional (unkeyed) literal.
type policyRequestResponse struct {
	// PolicyTypeID and PolicyInstanceID are copied from the request.
	PolicyTypeID     int    `json:"policy_type_id"`
	PolicyInstanceID string `json:"policy_instance_id"`
	// HandlerID names the responder.
	HandlerID string `json:"handler_id"`
	// Status is "OK" for CREATE, "DELETED" for DELETE, and empty otherwise.
	Status string `json:"status"`
}
| |
// policyQuery is the JSON body sendQuery sends; it carries only the policy
// type being queried.
type policyQuery struct {
	PolicyTypeID int `json:"policy_type_id"`
}
| |
| func (e *a1Receiver) sendMsgRetry(params *xapp.RMRParams) { |
| // helper for rmr that handles retries and sleep |
| retries := 0 |
| for { // just keep trying until it works |
| if e.rmrReady { // we must wait for ready, else SendMsg will blow with a nullptr |
| if ok := xapp.Rmr.SendMsg(params); ok { |
| xapp.Logger.Info("Msg successfully sent after %d retries!", retries) |
| return |
| } |
| retries++ |
| //xapp.Logger.Info("Query failed to send...") |
| } else { |
| xapp.Logger.Info("rmr not ready...") |
| time.Sleep(time.Duration(1) * time.Second) |
| } |
| } |
| } |
| |
| func (e *a1Receiver) handlePolicyReq(msg *xapp.RMRParams) { |
| |
| // unmarshal the request |
| var dat policyRequest |
| if err := json.Unmarshal(msg.Payload, &dat); err != nil { |
| panic(err) |
| } |
| |
| var status string |
| switch dat.Operation { |
| case "CREATE": |
| status = "OK" |
| case "DELETE": |
| status = "DELETED" |
| } |
| |
| // form the response |
| res := &policyRequestResponse{ |
| dat.PolicyTypeID, |
| dat.PolicyInstanceID, |
| "test_receiver", |
| status, |
| } |
| |
| outgoing, err := json.Marshal(res) |
| if err != nil { |
| panic(err) |
| } |
| |
| /* |
| WARNING: |
| we want to use rts here. However, the current go xapp framework rts is broken. |
| */ |
| params := &xapp.RMRParams{ |
| Mtype: 20011, |
| Payload: outgoing, |
| } |
| |
| if delay > 0 { |
| xapp.Logger.Info("Xapp is sleeping...") |
| time.Sleep(time.Duration(delay) * time.Second) // so much work to replicate python's time.sleep(5)... |
| } |
| |
| e.sendMsgRetry(params) |
| |
| xapp.Logger.Info("Policy response sent!") |
| } |
| |
| func (e *a1Receiver) sendQuery() { |
| // form the query |
| res := &policyQuery{ |
| 1006001, |
| } |
| outgoing, err := json.Marshal(res) |
| if err != nil { |
| panic(err) |
| } |
| params := &xapp.RMRParams{ |
| Mtype: 20012, |
| Payload: outgoing, |
| } |
| |
| for { |
| /* We do this in a loop here, because even when the query first works, it could be the case that |
| a1 does not even have the type yet, or there are no instances yet. In this integration test, |
| we just keep pounding away so that eventually a1 returns the list this int test is looking for. |
| A real xapp would NOT call the query in a loop like this. |
| */ |
| e.sendMsgRetry(params) |
| xapp.Logger.Info("Query sent successfully") |
| time.Sleep(time.Duration(1) * time.Second) |
| } |
| } |
| |
| func (e *a1Receiver) messageLoop() { |
| for { |
| xapp.Logger.Info("Waiting for message..") |
| |
| msg := <-e.msgChan |
| |
| xapp.Logger.Info("Message received!") |
| defer xapp.Rmr.Free(msg.Mbuf) |
| |
| switch msg.Mtype { |
| case 20010: |
| e.handlePolicyReq(msg) |
| default: |
| panic("Unexpected message type!") |
| } |
| } |
| } |
| |
| // Consume: This named function is a required callback for e to use the xapp interface. it is called on all received rmr messages. |
| func (e *a1Receiver) Consume(rp *xapp.RMRParams) (err error) { |
| e.msgChan <- rp |
| return |
| } |
| |
| func (e *a1Receiver) Run() { |
| // Set MDC (read: name visible in the logs) |
| xapp.Logger.SetMdc(handlerID, "0.1.0") |
| |
| /* from reading the xapp frame code... |
| this SetReadyCB sets off a chain of events.. |
| it sets readycb and readycbparams at the module level in xapp.go |
| nothing happens yet.. |
| when the xapp is ran with` xapp.Run, this callback actually gets passed into the Rmr client which is not exposed in the xapp |
| Rmr.SetReadyCB(xappReadyCb, nil) |
| This "primes" the rmr client with it's own readycb, which is now set to this callback function |
| When the rmr client is ready, it invokes the callback |
| so basically, when rmr is ready, this function is invoked |
| I think the xapp frame code could have been greatly simplified by just passing this into the invocation of Run() and then just passing that into the rmr client init! |
| */ |
| xapp.SetReadyCB(func(d interface{}) { e.rmrReady = true }, true) |
| |
| // start message loop. We cannot wait for e.rmrReady here since that doesn't get populated until Run() runs. |
| go e.messageLoop() |
| |
| if doQuery { |
| // we are in the query tester; kick off a loop that does that until it works |
| go e.sendQuery() |
| } |
| |
| xapp.Run(e) |
| } |
| |
| func newA1Receiver(appReady, rmrReady bool) *a1Receiver { |
| return &a1Receiver{ |
| msgChan: make(chan *xapp.RMRParams), |
| rmrReady: rmrReady, |
| appReady: appReady, |
| } |
| } |
| |
| func main() { |
| |
| delay = 0 |
| if d, ok := os.LookupEnv("TEST_RCV_SEC_DELAY"); ok { |
| delay, _ = strconv.Atoi(d) |
| } |
| |
| handlerID = "test_receiver" |
| if hid, ok := os.LookupEnv("HANDLER_ID"); ok { |
| handlerID = hid |
| } |
| |
| doQuery = false |
| if _, ok := os.LookupEnv("DO_QUERY"); ok { |
| doQuery = true |
| } |
| |
| newA1Receiver(true, false).Run() |
| } |