blob: 9012094faef4f0f52ee8bef246d1a3f2797927a5 [file] [log] [blame]
Tommy Carpenter78ba2732020-02-07 14:06:20 -05001/*
2==================================================================================
3 Copyright (c) 2020 AT&T Intellectual Property.
4 Copyright (c) 2020 Nokia
5
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
9
10 http://www.apache.org/licenses/LICENSE-2.0
11
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17==================================================================================
18*/
19package main
20
21import (
22 "encoding/json"
23 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
24 "os"
25 "strconv"
26 "time"
27)
28
29var delay int // used for the delay receiver
30var handlerID string // used for the delay receiver too
31var doQuery bool // used for the query receiver
32
33type a1Receiver struct {
34 msgChan chan *xapp.RMRParams
35 appReady bool
36 rmrReady bool
37}
38
39type policyRequest struct {
40 Operation string `json:"operation"`
41 PolicyTypeID int `json:"policy_type_id"`
42 PolicyInstanceID string `json:"policy_instance_id"`
43 Pay interface{} `json:"payload"`
44}
45
46type policyRequestResponse struct {
47 PolicyTypeID int `json:"policy_type_id"`
48 PolicyInstanceID string `json:"policy_instance_id"`
49 HandlerID string `json:"handler_id"`
50 Status string `json:"status"`
51}
52
53type policyQuery struct {
54 PolicyTypeID int `json:"policy_type_id"`
55}
56
57func (e *a1Receiver) sendMsgRetry(params *xapp.RMRParams) {
58 // helper for rmr that handles retries and sleep
59 retries := 0
60 for { // just keep trying until it works
61 if e.rmrReady { // we must wait for ready, else SendMsg will blow with a nullptr
62 if ok := xapp.Rmr.SendMsg(params); ok {
63 xapp.Logger.Info("Msg successfully sent after %d retries!", retries)
64 return
65 }
66 retries++
67 //xapp.Logger.Info("Query failed to send...")
68 } else {
69 xapp.Logger.Info("rmr not ready...")
70 time.Sleep(time.Duration(1) * time.Second)
71 }
72 }
73}
74
75func (e *a1Receiver) handlePolicyReq(msg *xapp.RMRParams) {
76
77 // unmarshal the request
78 var dat policyRequest
79 if err := json.Unmarshal(msg.Payload, &dat); err != nil {
80 panic(err)
81 }
82
83 var status string
84 switch dat.Operation {
85 case "CREATE":
86 status = "OK"
87 case "DELETE":
88 status = "DELETED"
89 }
90
91 // form the response
92 res := &policyRequestResponse{
93 dat.PolicyTypeID,
94 dat.PolicyInstanceID,
95 "test_receiver",
96 status,
97 }
98
99 outgoing, err := json.Marshal(res)
100 if err != nil {
101 panic(err)
102 }
103
104 /*
105 WARNING:
106 we want to use rts here. However, the current go xapp framework rts is broken.
107 */
108 params := &xapp.RMRParams{
109 Mtype: 20011,
110 Payload: outgoing,
111 }
112
113 if delay > 0 {
114 xapp.Logger.Info("Xapp is sleeping...")
115 time.Sleep(time.Duration(delay) * time.Second) // so much work to replicate python's time.sleep(5)...
116 }
117
118 e.sendMsgRetry(params)
119
120 xapp.Logger.Info("Policy response sent!")
121}
122
123func (e *a1Receiver) sendQuery() {
124 // form the query
125 res := &policyQuery{
126 1006001,
127 }
128 outgoing, err := json.Marshal(res)
129 if err != nil {
130 panic(err)
131 }
132 params := &xapp.RMRParams{
133 Mtype: 20012,
134 Payload: outgoing,
135 }
136
137 for {
138 /* We do this in a loop here, because even when the query first works, it could be the case that
139 a1 does not even have the type yet, or there are no instances yet. In this integration test,
140 we just keep pounding away so that eventually a1 returns the list this int test is looking for.
141 A real xapp would NOT call the query in a loop like this.
142 */
143 e.sendMsgRetry(params)
144 xapp.Logger.Info("Query sent successfully")
145 time.Sleep(time.Duration(1) * time.Second)
146 }
147}
148
149func (e *a1Receiver) messageLoop() {
150 for {
151 xapp.Logger.Info("Waiting for message..")
152
153 msg := <-e.msgChan
154
155 xapp.Logger.Info("Message received!")
156 defer xapp.Rmr.Free(msg.Mbuf)
157
158 switch msg.Mtype {
159 case 20010:
160 e.handlePolicyReq(msg)
161 default:
162 panic("Unexpected message type!")
163 }
164 }
165}
166
167// Consume: This named function is a required callback for e to use the xapp interface. it is called on all received rmr messages.
168func (e *a1Receiver) Consume(rp *xapp.RMRParams) (err error) {
169 e.msgChan <- rp
170 return
171}
172
173func (e *a1Receiver) Run() {
174 // Set MDC (read: name visible in the logs)
175 xapp.Logger.SetMdc(handlerID, "0.1.0")
176
177 /* from reading the xapp frame code...
178 this SetReadyCB sets off a chain of events..
179 it sets readycb and readycbparams at the module level in xapp.go
180 nothing happens yet..
181 when the xapp is ran with` xapp.Run, this callback actually gets passed into the Rmr client which is not exposed in the xapp
182 Rmr.SetReadyCB(xappReadyCb, nil)
183 This "primes" the rmr client with it's own readycb, which is now set to this callback function
184 When the rmr client is ready, it invokes the callback
185 so basically, when rmr is ready, this function is invoked
186 I think the xapp frame code could have been greatly simplified by just passing this into the invocation of Run() and then just passing that into the rmr client init!
187 */
188 xapp.SetReadyCB(func(d interface{}) { e.rmrReady = true }, true)
189
190 // start message loop. We cannot wait for e.rmrReady here since that doesn't get populated until Run() runs.
191 go e.messageLoop()
192
193 if doQuery {
194 // we are in the query tester; kick off a loop that does that until it works
195 go e.sendQuery()
196 }
197
198 xapp.Run(e)
199}
200
201func newA1Receiver(appReady, rmrReady bool) *a1Receiver {
202 return &a1Receiver{
203 msgChan: make(chan *xapp.RMRParams),
204 rmrReady: rmrReady,
205 appReady: appReady,
206 }
207}
208
209func main() {
210
211 delay = 0
212 if d, ok := os.LookupEnv("TEST_RCV_SEC_DELAY"); ok {
213 delay, _ = strconv.Atoi(d)
214 }
215
216 handlerID = "test_receiver"
217 if hid, ok := os.LookupEnv("HANDLER_ID"); ok {
218 handlerID = hid
219 }
220
221 doQuery = false
222 if _, ok := os.LookupEnv("DO_QUERY"); ok {
223 doQuery = true
224 }
225
226 newA1Receiver(true, false).Run()
227}