blob: 87ab27e8e839430933e4cb1a9b5b993b333c3c7b [file] [log] [blame]
# ==================================================================================
# Copyright (c) 2020 Nokia
# Copyright (c) 2020 AT&T Intellectual Property.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
"""
Framework for python xapps
Framework here means Xapp classes that can be subclassed
"""
21
import queue
from threading import Thread

from mdclogpy import Logger

from ricxappframe import xapp_rmr
from ricxappframe.rmr import rmr
from ricxappframe.xapp_sdl import SDLWrapper
28
# constants
# RMR message types used by the default healthcheck handler: a message of
# type RIC_HEALTH_CHECK_REQ is answered with one of type RIC_HEALTH_CHECK_RESP.
RIC_HEALTH_CHECK_REQ = 100
RIC_HEALTH_CHECK_RESP = 101
32
Tommy Carpenter21f659c2020-02-26 14:12:54 -050033
# Private base class; not for direct client use
35
36
class _BaseXapp:
    """
    Base xapp; not for client use directly

    Bundles what every xapp flavor shares: a logger, the rmr receive
    loop, an SDL wrapper, and thin public helpers around each of them.
    """

    def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
        """
        Init

        Parameters
        ----------
        rmr_port: int
            port to listen on

        rmr_wait_for_ready: bool (optional)
            if this is True, then init waits until rmr is ready to send, which
            includes having a valid routing file. This can be set to
            False if the client only wants to *receive only*.

        use_fake_sdl: bool (optional)
            if this is True, it uses dbaas' "fake dict backend" instead
            of Redis or other backends. Set this to true when developing
            your xapp or during unit testing to completely avoid needing
            a dbaas running or any network at all.

        post_init: function (optional)
            runs this user provided function after the base xapp is
            initialized; its signature should be post_init(self)
        """
        # PUBLIC, can be used by xapps using self.(name):
        self.logger = Logger(name=__name__)

        # Start rmr rcv thread
        self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
        self._mrc = self._rmr_loop.mrc  # for convenience

        # SDL
        self._sdl = SDLWrapper(use_fake_sdl)

        # run the optionally provided user post init
        if post_init:
            post_init(self)

    # Public rmr methods

    def rmr_get_messages(self):
        """
        Returns a generator iterable over all items in the queue that
        have not yet been read by the client xapp. Each item is a tuple
        (S, sbuf) where S is a message summary dict and sbuf is the raw
        message. The caller MUST call rmr.rmr_free_msg(sbuf) when
        finished with each sbuf to prevent memory leaks!

        The generator never blocks: it finishes as soon as the queue is
        found to be empty.
        """
        rcv_queue = self._rmr_loop.rcv_queue
        while True:
            try:
                # A non-blocking get avoids the empty()/get() race: if another
                # consumer drains the queue between the two calls, a blocking
                # get() would hang forever.
                (summary, sbuf) = rcv_queue.get(block=False)
            except queue.Empty:
                return
            yield (summary, sbuf)

    def rmr_send(self, payload, mtype, retries=100):
        """
        Allocates a buffer, sets payload and mtype, and sends

        The buffer allocated here is always freed before returning,
        whether or not the send succeeded.

        Parameters
        ----------
        payload: bytes
            payload to set
        mtype: int
            message type
        retries: int (optional)
            Number of times to retry at the application level before giving up

        Returns
        -------
        bool
            whether or not the send worked after retries attempts
        """
        sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)

        sent = False
        for _ in range(retries):
            # rmr hands back the buffer to use from now on, so keep rebinding it
            sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
            if sbuf.contents.state == 0:
                sent = True
                break

        self.rmr_free(sbuf)
        return sent

    def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
        """
        Allows the xapp to return to sender, possibly adjusting the
        payload and message type before doing so. This does NOT free
        the sbuf for the caller as the caller may wish to perform
        multiple rts per buffer. The client needs to free.

        Parameters
        ----------
        sbuf: ctypes c_void_p
            Pointer to an rmr message buffer
        new_payload: bytes (optional)
            New payload to set
        new_mtype: int (optional)
            New message type (replaces the received message)
        retries: int (optional)
            Number of times to retry at the application level before
            giving up; a failure is logged and reported through the
            return value, no exception is raised here

        Returns
        -------
        bool
            whether or not the send worked after retries attempts
        """
        for _ in range(retries):
            sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
            if sbuf.contents.state == 0:
                return True

        self.logger.info("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
        return False

    def rmr_free(self, sbuf):
        """
        Frees an rmr message buffer after use

        Note: this does not need to be a class method, self is not
        used. However if we break it out as a function we need a home
        for it.

        Parameters
        ----------
        sbuf: ctypes c_void_p
            Pointer to an rmr message buffer
        """
        rmr.rmr_free_msg(sbuf)

    # SDL
    # NOTE, even though these are passthroughs, the separate SDL wrapper
    # is useful for other applications like A1. Therefore, we don't
    # embed that SDLWrapper functionality here so that it can be
    # instantiated on its own.

    def sdl_set(self, ns, key, value, usemsgpack=True):
        """
        Stores a key-value pair,
        optionally serializing the value to bytes using msgpack.

        Parameters
        ----------
        ns: string
            SDL namespace
        key: string
            SDL key
        value:
            Object or byte array to store. See the `usemsgpack` parameter.
        usemsgpack: boolean (optional, default is True)
            Determines whether the value is serialized using msgpack before storing.
            If usemsgpack is True, the msgpack function `packb` is invoked
            on the value to yield a byte array that is then sent to SDL.
            Stated differently, if usemsgpack is True, the value can be anything
            that is serializable by msgpack.
            If usemsgpack is False, the value must be bytes.
        """
        self._sdl.set(ns, key, value, usemsgpack)

    def sdl_get(self, ns, key, usemsgpack=True):
        """
        Gets the value for the specified namespace and key,
        optionally deserializing stored bytes using msgpack.

        Parameters
        ----------
        ns: string
            SDL namespace
        key: string
            SDL key
        usemsgpack: boolean (optional, default is True)
            If usemsgpack is True, the byte array stored by SDL is deserialized
            using msgpack to yield the original object that was stored.
            If usemsgpack is False, the byte array stored by SDL is returned
            without further processing.

        Returns
        -------
        Value
            See the usemsgpack parameter for an explanation of the returned value type.
            Answers None if the key is not found.
        """
        return self._sdl.get(ns, key, usemsgpack)

    def sdl_find_and_get(self, ns, prefix, usemsgpack=True):
        """
        Gets all key-value pairs in the specified namespace
        with keys that start with the specified prefix,
        optionally deserializing stored bytes using msgpack.

        Parameters
        ----------
        ns: string
            SDL namespace
        prefix: string
            the key prefix
        usemsgpack: boolean (optional, default is True)
            If usemsgpack is True, the byte array stored by SDL is deserialized
            using msgpack to yield the original value that was stored.
            If usemsgpack is False, the byte array stored by SDL is returned
            without further processing.

        Returns
        -------
        Dictionary of key-value pairs
            Each key has the specified prefix.
            The value object (its type) depends on the usemsgpack parameter,
            but is either a Python object or raw bytes as discussed above.
            Answers an empty dictionary if no keys matched the prefix.
        """
        return self._sdl.find_and_get(ns, prefix, usemsgpack)

    def sdl_delete(self, ns, key):
        """
        Deletes the key-value pair with the specified key in the specified namespace.

        Parameters
        ----------
        ns: string
            SDL namespace
        key: string
            SDL key
        """
        self._sdl.delete(ns, key)

    # Health

    def healthcheck(self):
        """
        Reports the combined health of this xapp's rmr loop and SDL wrapper.

        Returns
        -------
        A truthy value only when both the rmr loop's healthcheck and the
        SDL wrapper's healthcheck report healthy. The SDL check is not
        consulted when the rmr check already failed.
        """
        return self._rmr_loop.healthcheck() and self._sdl.healthcheck()

    def stop(self):
        """
        cleans up and stops the xapp rmr thread (currently). This is
        critical for unit testing as pytest will never return if the
        thread is running.

        TODO: can we register a ctrl-c handler so this gets called on
        ctrl-c? Because currently two ctrl-c are needed to stop.
        """
        self._rmr_loop.stop()
284
285
# Public Classes to subclass (these subclass _BaseXapp)
287
288
class RMRXapp(_BaseXapp):
    """
    Represents an Xapp that reacts only to RMR messages; i.e., when
    messages are received, the Xapp does something. When run is called,
    the xapp framework waits for RMR messages, and calls the appropriate
    client-registered consume callback on each.

    Parameters
    ----------
    default_handler: function
        A function with the signature (self, summary, sbuf) to be called
        when a message type is received for which no other handler is registered.
        default_handler argument self: RMRXapp
            This xapp instance
        default_handler argument summary: dict
            The RMR message summary, a dict of key-value pairs
        default_handler argument sbuf: ctypes c_void_p
            Pointer to an RMR message buffer. The user must call free on this when done.
    rmr_port: integer (optional, default is 4562)
        Initialize RMR to listen on this port
    rmr_wait_for_ready: boolean (optional, default is True)
        Wait for RMR to signal ready before starting the dispatch loop
    use_fake_sdl: boolean (optional, default is False)
        Use an in-memory store instead of the real SDL service
    post_init: function (optional, default None)
        Run this function after the app initializes and before the dispatch loop starts;
        its signature should be post_init(self)
    """

    def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
        """
        Also see _BaseXapp
        """
        # init base. post_init is deliberately NOT forwarded: the base class
        # would invoke it before the dispatch table below exists, so a
        # post_init that registers callbacks would fail. It is run at the
        # end of this method instead.
        super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)

        # setup callbacks
        self._default_handler = default_handler
        self._dispatch = {}

        # used for thread control
        self._keep_going = True

        # register a default healthcheck handler
        # this default checks that rmr is working and SDL is working
        # the user can override this and register their own handler
        # if they wish since the "last registered callback wins".
        def handle_healthcheck(self, summary, sbuf):
            ok = self.healthcheck()
            payload = b"OK\n" if ok else b"ERROR [RMR or SDL is unhealthy]\n"
            self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
            self.rmr_free(sbuf)

        self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)

        # run the optionally provided user post init now that the xapp is
        # fully set up (handlers registered here override the defaults above)
        if post_init:
            post_init(self)

    def register_callback(self, handler, message_type):
        """
        registers this xapp to call handler(self, summary, sbuf) when an rmr message is received of type message_type

        Parameters
        ----------
        handler: function
            a function with the signature (self, summary, sbuf) to be called
            when a message of type message_type is received
            self: RMRXapp
                this xapp instance
            summary: dict
                the rmr message summary
            sbuf: ctypes c_void_p
                Pointer to an rmr message buffer. The user must call free on this when done.

        message_type: int
            the message type to look for

        Note if this method is called multiple times for a single message type, the "last one wins".
        """
        self._dispatch[message_type] = handler

    def run(self, thread=False):
        """
        This function should be called when the reactive Xapp is ready to start.
        After start, the Xapp's handlers will be called on received messages.

        Parameters
        ----------
        thread: bool (optional, default is False)
            If False, execution is not returned and the framework loops forever.
            If True, a thread is started to run the queue read/dispatch loop
            and execution is returned to caller; the thread can be stopped
            by calling the .stop() method.
        """

        def loop():
            while self._keep_going:
                # Only the queue read is guarded: a queue.Empty raised from
                # inside a handler must not be mistaken for a read timeout.
                try:
                    (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=5)
                except queue.Empty:
                    # the get timed out; re-check the stop flag and wait again
                    continue

                # dispatch, falling back to the default handler when nothing
                # is registered for this message type
                handler = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE]) or self._default_handler
                handler(self, summary, sbuf)

        if thread:
            Thread(target=loop).start()
        else:
            loop()

    def stop(self):
        """
        Stops the base xapp (see _BaseXapp.stop) and sets the flag to end
        the dispatch loop. The loop only checks the flag between queue
        reads, which time out after 5 seconds, so it may take that long
        to actually exit.
        """
        super().stop()
        self.logger.debug("Setting flag to end framework work loop.")
        self._keep_going = False
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500404
405
class Xapp(_BaseXapp):
    """
    Represents a generic Xapp where the client provides a function for the framework to call,
    which usually contains a loop-forever construct.

    Parameters
    ----------
    entrypoint: function
        This function is called when the Xapp class's run method is invoked.
        The function signature must be just function(self)
    rmr_port: integer (optional, default is 4562)
        Initialize RMR to listen on this port
    rmr_wait_for_ready: boolean (optional, default is True)
        Wait for RMR to signal ready before starting the dispatch loop
    use_fake_sdl: boolean (optional, default is False)
        Use an in-memory store instead of the real SDL service
    """

    def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
        """
        Parameters
        ----------
        entrypoint: function
            Called with this xapp as its only argument when run is invoked.

        For the other parameters, see class _BaseXapp.
        """
        # init base
        super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
        self._entrypoint = entrypoint

    def run(self):
        """
        This function should be called when the general Xapp is ready to start.
        It calls the client-provided entrypoint with this xapp and returns
        only when that function returns.
        """
        self._entrypoint(self)
440
    # there is no need for stop currently here (base has, and nothing
    # special to do here)