blob: 180c3ae9f91838b96b47246bab7c48d7337da77f [file] [log] [blame]
Tommy Carpenter21f659c2020-02-26 14:12:54 -05001"""
2Framework for python xapps
3Framework here means Xapp classes that can be subclassed
4"""
5# ==================================================================================
6# Copyright (c) 2020 Nokia
7# Copyright (c) 2020 AT&T Intellectual Property.
8#
9# Licensed under the Apache License, Version 2.0 (the "License");
10# you may not use this file except in compliance with the License.
11# You may obtain a copy of the License at
12#
13# http://www.apache.org/licenses/LICENSE-2.0
14#
15# Unless required by applicable law or agreed to in writing, software
16# distributed under the License is distributed on an "AS IS" BASIS,
17# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18# See the License for the specific language governing permissions and
19# limitations under the License.
20# ==================================================================================
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -040021from threading import Thread
Tommy Carpenter21f659c2020-02-26 14:12:54 -050022from ricxappframe import xapp_rmr
23from ricxappframe.xapp_sdl import SDLWrapper
24from rmr import rmr
25from mdclogpy import Logger
26
Tommy Carpenter09894e32020-04-02 19:45:19 -040027# constants
28RIC_HEALTH_CHECK_REQ = 100
29RIC_HEALTH_CHECK_RESP = 101
30
Tommy Carpenter21f659c2020-02-26 14:12:54 -050031
Tommy Carpenter21f659c2020-02-26 14:12:54 -050032# Private base class; not for direct client use
33
34
35class _BaseXapp:
36 """
37 Base xapp; not for client use directly
38 """
39
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -040040 def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
Tommy Carpenter21f659c2020-02-26 14:12:54 -050041 """
42 Init
43
44 Parameters
45 ----------
46 rmr_port: int
47 port to listen on
48
49 rmr_wait_for_ready: bool (optional)
50 if this is True, then init waits until rmr is ready to send, which includes having a valid routing file.
51 this can be set to False if the client only wants to *receive only*
52
53 use_fake_sdl: bool (optional)
54 if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends.
55 Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas running or any network at all
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -040056
57 post_init: function (optional)
58 runs this user provided function after the base xapp is initialized
59 it's signature should be post_init(self)
Tommy Carpenter21f659c2020-02-26 14:12:54 -050060 """
Tommy Carpenter0f8305b2020-03-18 10:34:28 -040061 # PUBLIC, can be used by xapps using self.(name):
62 self.logger = Logger(name=__name__)
Tommy Carpenter21f659c2020-02-26 14:12:54 -050063
64 # Start rmr rcv thread
65 self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
66 self._mrc = self._rmr_loop.mrc # for convenience
67
68 # SDL
69 self._sdl = SDLWrapper(use_fake_sdl)
70
Tommy Carpenter99a0b482020-03-03 10:21:24 -050071 # run the optionally provided user post init
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -040072 if post_init:
73 post_init(self)
Tommy Carpenter99a0b482020-03-03 10:21:24 -050074
75 # Public rmr methods
76
Tommy Carpenter21f659c2020-02-26 14:12:54 -050077 def rmr_get_messages(self):
78 """
79 returns a generator iterable over all current messages in the queue that have not yet been read by the client xapp
80 """
81 while not self._rmr_loop.rcv_queue.empty():
82 (summary, sbuf) = self._rmr_loop.rcv_queue.get()
83 yield (summary, sbuf)
84
85 def rmr_send(self, payload, mtype, retries=100):
86 """
87 Allocates a buffer, sets payload and mtype, and sends
88
89 Parameters
90 ----------
91 payload: bytes
92 payload to set
93 mtype: int
94 message type
95 retries: int (optional)
96 Number of times to retry at the application level before excepting RMRFailure
97
98 Returns
99 -------
100 bool
101 whether or not the send worked after retries attempts
102 """
103 sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)
104
105 for _ in range(retries):
106 sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
107 if sbuf.contents.state == 0:
108 self.rmr_free(sbuf)
109 return True
110
111 self.rmr_free(sbuf)
112 return False
113
114 def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
115 """
116 Allows the xapp to return to sender, possibly adjusting the payload and message type before doing so
117
118 This does NOT free the sbuf for the caller as the caller may wish to perform multiple rts per buffer.
119 The client needs to free.
120
121 Parameters
122 ----------
123 sbuf: ctypes c_void_p
124 Pointer to an rmr message buffer
125 new_payload: bytes (optional)
126 New payload to set
127 new_mtype: int (optional)
128 New message type (replaces the received message)
129 retries: int (optional)
130 Number of times to retry at the application level before excepting RMRFailure
131
132 Returns
133 -------
134 bool
135 whether or not the send worked after retries attempts
136 """
137 for _ in range(retries):
138 sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
139 if sbuf.contents.state == 0:
140 return True
141
Tommy Carpenter09894e32020-04-02 19:45:19 -0400142 self.logger.info("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500143 return False
144
145 def rmr_free(self, sbuf):
146 """
147 Free an rmr message buffer after use
148
149 Note: this does not need to be a class method, self is not used. However if we break it out as a function we need a home for it.
150 Parameters
151 ----------
152 sbuf: ctypes c_void_p
153 Pointer to an rmr message buffer
154 """
155 rmr.rmr_free_msg(sbuf)
156
157 # SDL
158 # NOTE, even though these are passthroughs, the seperate SDL wrapper is useful for other applications like A1.
159 # Therefore, we don't embed that SDLWrapper functionality here so that it can be instantiated on it's own.
160
161 def sdl_set(self, ns, key, value, usemsgpack=True):
162 """
163 set a key
164
165 Parameters
166 ----------
167 ns: string
168 the sdl namespace
169 key: string
170 the sdl key
171 value:
172 if usemsgpack is True, value can be anything serializable by msgpack
173 if usemsgpack is False, value must be bytes
174 usemsgpack: boolean (optional)
175 determines whether the value is serialized using msgpack
176 """
177 self._sdl.set(ns, key, value, usemsgpack)
178
179 def sdl_get(self, ns, key, usemsgpack=True):
180 """
181 get a key
182
183 Parameters
184 ----------
185 ns: string
186 the sdl namespace
187 key: string
188 the sdl key
189 usemsgpack: boolean (optional)
190 if usemsgpack is True, the value is deserialized using msgpack
191 if usemsgpack is False, the value is returned as raw bytes
192
193 Returns
194 -------
195 None (if not exist) or see above; depends on usemsgpack
196 """
197 return self._sdl.get(ns, key, usemsgpack)
198
199 def sdl_find_and_get(self, ns, prefix, usemsgpack=True):
200 """
201 get all k v pairs that start with prefix
202
203 Parameters
204 ----------
205 ns: string
206 the sdl namespace
207 key: string
208 the sdl key
209 prefix: string
210 the prefix
211 usemsgpack: boolean (optional)
212 if usemsgpack is True, the value returned is a dict where each value has been deserialized using msgpack
213 if usemsgpack is False, the value returned is as a dict mapping keys to raw bytes
214
215 Returns
216 -------
217 {} (if no keys match) or see above; depends on usemsgpack
218 """
219 return self._sdl.find_and_get(ns, prefix, usemsgpack)
220
221 def sdl_delete(self, ns, key):
222 """
223 delete a key
224
225 Parameters
226 ----------
227 ns: string
228 the sdl namespace
229 key: string
230 the sdl key
231 """
232 self._sdl.delete(ns, key)
233
234 # Health
235
236 def healthcheck(self):
237 """
238 this needs to be understood how this is supposed to work
239 """
240 return self._rmr_loop.healthcheck() and self._sdl.healthcheck()
241
242 def stop(self):
243 """
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400244 cleans up and stops the xapp rmr thread (currently)
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500245 This is critical for unit testing as pytest will never return if the thread is running.
246
247 TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop
248 """
249 self._rmr_loop.stop()
250
251
252# Public Classes to subclass (these subclass _BaseXapp)
253
254
255class RMRXapp(_BaseXapp):
256 """
257 Represents an xapp that is purely driven by rmr messages (i.e., when messages are received, the xapp does something
258 When run is called, the xapp framework waits for rmr messages, and calls the client provided consume callback on every one
259 """
260
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400261 def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500262 """
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400263 Parameters
264 ----------
265 default_handler: function
266 a function with the signature (summary, sbuf) to be called when a message of type message_type is received
267 summary: dict
268 the rmr message summary
269 sbuf: ctypes c_void_p
270 Pointer to an rmr message buffer. The user must call free on this when done.
271
272 post_init: function (optional)
273 optionally runs this function after the app initializes and before the run loop
274 it's signature should be post_init(self)
275
276 For the other parameters, see _BaseXapp
277 """
278 # init base
279 super().__init__(
280 rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
281 )
282
283 # setup callbacks
284 self._default_handler = default_handler
285 self._dispatch = {}
286
287 # used for thread control
288 self._keep_going = True
289
Tommy Carpenter09894e32020-04-02 19:45:19 -0400290 # register a default healthcheck handler
291 # this default checks that rmr is working and SDL is working
292 # the user can override this and register their own handler if they wish since the "last registered callback wins".
293 def handle_healthcheck(self, summary, sbuf):
294 ok = self.healthcheck()
295 payload = b"OK\n" if ok else b"ERROR [RMR or SDL is unhealthy]\n"
296 self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
297 self.rmr_free(sbuf)
298
299 self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)
300
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400301 def register_callback(self, handler, message_type):
302 """
303 registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500304
305 Parameters
306 ----------
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400307 handler: function
308 a function with the signature (summary, sbuf) to be called when a message of type message_type is received
309 summary: dict
310 the rmr message summary
311 sbuf: ctypes c_void_p
312 Pointer to an rmr message buffer. The user must call free on this when done.
313
314 message:type: int
315 the message type to look for
316
317 Note if this method is called multiple times for a single message type, the "last one wins".
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500318 """
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400319 self._dispatch[message_type] = handler
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500320
Tommy Carpenter1c9ce6b2020-03-13 09:36:36 -0400321 def run(self, thread=False):
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500322 """
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400323 This function should be called when the client xapp is ready to wait for their handlers to be called on received messages
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500324
Tommy Carpenter1c9ce6b2020-03-13 09:36:36 -0400325 Parameters
326 ----------
327 thread: bool (optional)
328 if thread is True, execution is returned to caller and the queue read loop is executed in a thread.
329 The thread can be stopped using .stop()
330 if False, execution is not returned and the framework loops
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500331 """
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400332
333 def loop():
334 while self._keep_going:
335 if not self._rmr_loop.rcv_queue.empty():
336 (summary, sbuf) = self._rmr_loop.rcv_queue.get()
337 # dispatch
338 func = self._dispatch.get(summary["message type"], None)
339 if not func:
340 func = self._default_handler
341 func(self, summary, sbuf)
342
Tommy Carpenter1c9ce6b2020-03-13 09:36:36 -0400343 if thread:
344 Thread(target=loop).start()
345 else:
346 loop()
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400347
348 def stop(self):
349 """
350 stops the rmr xapp completely.
351 """
352 super().stop()
Tommy Carpenter0f8305b2020-03-18 10:34:28 -0400353 self.logger.debug("Stopping queue reading thread..")
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400354 self._keep_going = False
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500355
356
357class Xapp(_BaseXapp):
358 """
359 Represents an xapp where the client provides a generic function to call, which is mostly likely a loop-forever loop
360 """
361
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400362 def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500363 """
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400364 Parameters
365 ----------
366 entrypoint: function
367 this function is called when the xapp runs; this is the user code
368 it's signature should be function(self)
369
370 For the other parameters, see _BaseXapp
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500371 """
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400372 # init base
373 super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
374 self._entrypoint = entrypoint
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500375
376 def run(self):
377 """
Tommy Carpenter99a0b482020-03-03 10:21:24 -0500378 This function should be called when the client xapp is ready to start their code
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500379 """
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400380 self._entrypoint(self)
381
382 # there is no need for stop currently here (base has, and nothing special to do here)