Tommy Carpenter | 21f659c | 2020-02-26 14:12:54 -0500 | [diff] [blame] | 1 | # ================================================================================== |
| 2 | # Copyright (c) 2020 Nokia |
| 3 | # Copyright (c) 2020 AT&T Intellectual Property. |
| 4 | # |
| 5 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 6 | # you may not use this file except in compliance with the License. |
| 7 | # You may obtain a copy of the License at |
| 8 | # |
| 9 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 10 | # |
| 11 | # Unless required by applicable law or agreed to in writing, software |
| 12 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 13 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 14 | # See the License for the specific language governing permissions and |
| 15 | # limitations under the License. |
| 16 | # ================================================================================== |
Lott, Christopher (cl778h) | 6127090 | 2020-05-06 09:23:55 -0400 | [diff] [blame^] | 17 | """ |
| 18 | Framework for python xapps |
| 19 | Framework here means Xapp classes that can be subclassed |
| 20 | """ |
| 21 | |
Tommy Carpenter | f9cd5cc | 2020-03-09 13:46:37 -0400 | [diff] [blame] | 22 | from threading import Thread |
Tommy Carpenter | 21f659c | 2020-02-26 14:12:54 -0500 | [diff] [blame] | 23 | from ricxappframe import xapp_rmr |
Tommy Carpenter | 3a6ac01 | 2020-04-06 14:42:57 -0400 | [diff] [blame] | 24 | from ricxappframe.rmr import rmr |
Lott, Christopher (cl778h) | 6127090 | 2020-05-06 09:23:55 -0400 | [diff] [blame^] | 25 | from ricxappframe.xapp_sdl import SDLWrapper |
Tommy Carpenter | 21f659c | 2020-02-26 14:12:54 -0500 | [diff] [blame] | 26 | from mdclogpy import Logger |
| 27 | |
# constants
# rmr message type under which RMRXapp registers its default health check handler
RIC_HEALTH_CHECK_REQ = 100
# rmr message type that the default health check handler sets on the reply it returns to the sender
RIC_HEALTH_CHECK_RESP = 101
| 31 | |
Tommy Carpenter | 21f659c | 2020-02-26 14:12:54 -0500 | [diff] [blame] | 32 | |
Tommy Carpenter | 21f659c | 2020-02-26 14:12:54 -0500 | [diff] [blame] | 33 | # Private base class; not for direct client use |
| 34 | |
| 35 | |
| 36 | class _BaseXapp: |
| 37 | """ |
| 38 | Base xapp; not for client use directly |
| 39 | """ |
| 40 | |
Tommy Carpenter | f9cd5cc | 2020-03-09 13:46:37 -0400 | [diff] [blame] | 41 | def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None): |
Tommy Carpenter | 21f659c | 2020-02-26 14:12:54 -0500 | [diff] [blame] | 42 | """ |
| 43 | Init |
| 44 | |
| 45 | Parameters |
| 46 | ---------- |
| 47 | rmr_port: int |
| 48 | port to listen on |
| 49 | |
| 50 | rmr_wait_for_ready: bool (optional) |
| 51 | if this is True, then init waits until rmr is ready to send, which includes having a valid routing file. |
| 52 | this can be set to False if the client only wants to *receive only* |
| 53 | |
| 54 | use_fake_sdl: bool (optional) |
| 55 | if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends. |
| 56 | Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas running or any network at all |
Tommy Carpenter | f9cd5cc | 2020-03-09 13:46:37 -0400 | [diff] [blame] | 57 | |
| 58 | post_init: function (optional) |
| 59 | runs this user provided function after the base xapp is initialized |
| 60 | it's signature should be post_init(self) |
Tommy Carpenter | 21f659c | 2020-02-26 14:12:54 -0500 | [diff] [blame] | 61 | """ |
Tommy Carpenter | 0f8305b | 2020-03-18 10:34:28 -0400 | [diff] [blame] | 62 | # PUBLIC, can be used by xapps using self.(name): |
| 63 | self.logger = Logger(name=__name__) |
Tommy Carpenter | 21f659c | 2020-02-26 14:12:54 -0500 | [diff] [blame] | 64 | |
| 65 | # Start rmr rcv thread |
| 66 | self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready) |
| 67 | self._mrc = self._rmr_loop.mrc # for convenience |
| 68 | |
| 69 | # SDL |
| 70 | self._sdl = SDLWrapper(use_fake_sdl) |
| 71 | |
Tommy Carpenter | 99a0b48 | 2020-03-03 10:21:24 -0500 | [diff] [blame] | 72 | # run the optionally provided user post init |
Tommy Carpenter | f9cd5cc | 2020-03-09 13:46:37 -0400 | [diff] [blame] | 73 | if post_init: |
| 74 | post_init(self) |
Tommy Carpenter | 99a0b48 | 2020-03-03 10:21:24 -0500 | [diff] [blame] | 75 | |
| 76 | # Public rmr methods |
| 77 | |
Tommy Carpenter | 21f659c | 2020-02-26 14:12:54 -0500 | [diff] [blame] | 78 | def rmr_get_messages(self): |
| 79 | """ |
Lott, Christopher (cl778h) | 666e831 | 2020-05-05 18:31:54 -0400 | [diff] [blame] | 80 | Returns a generator iterable over all items in the queue that have not yet been read by the client xapp. |
| 81 | Each item is a tuple (S, sbuf) where S is a message summary dict and sbuf is the raw message. |
| 82 | The caller MUST call rmr.rmr_free_msg(sbuf) when finished with each sbuf to prevent memory leaks! |
Tommy Carpenter | 21f659c | 2020-02-26 14:12:54 -0500 | [diff] [blame] | 83 | """ |
| 84 | while not self._rmr_loop.rcv_queue.empty(): |
| 85 | (summary, sbuf) = self._rmr_loop.rcv_queue.get() |
| 86 | yield (summary, sbuf) |
| 87 | |
| 88 | def rmr_send(self, payload, mtype, retries=100): |
| 89 | """ |
| 90 | Allocates a buffer, sets payload and mtype, and sends |
| 91 | |
| 92 | Parameters |
| 93 | ---------- |
| 94 | payload: bytes |
| 95 | payload to set |
| 96 | mtype: int |
| 97 | message type |
| 98 | retries: int (optional) |
| 99 | Number of times to retry at the application level before excepting RMRFailure |
| 100 | |
| 101 | Returns |
| 102 | ------- |
| 103 | bool |
| 104 | whether or not the send worked after retries attempts |
| 105 | """ |
| 106 | sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype) |
| 107 | |
| 108 | for _ in range(retries): |
| 109 | sbuf = rmr.rmr_send_msg(self._mrc, sbuf) |
| 110 | if sbuf.contents.state == 0: |
| 111 | self.rmr_free(sbuf) |
| 112 | return True |
| 113 | |
| 114 | self.rmr_free(sbuf) |
| 115 | return False |
| 116 | |
| 117 | def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100): |
| 118 | """ |
| 119 | Allows the xapp to return to sender, possibly adjusting the payload and message type before doing so |
| 120 | |
| 121 | This does NOT free the sbuf for the caller as the caller may wish to perform multiple rts per buffer. |
| 122 | The client needs to free. |
| 123 | |
| 124 | Parameters |
| 125 | ---------- |
| 126 | sbuf: ctypes c_void_p |
| 127 | Pointer to an rmr message buffer |
| 128 | new_payload: bytes (optional) |
| 129 | New payload to set |
| 130 | new_mtype: int (optional) |
| 131 | New message type (replaces the received message) |
| 132 | retries: int (optional) |
| 133 | Number of times to retry at the application level before excepting RMRFailure |
| 134 | |
| 135 | Returns |
| 136 | ------- |
| 137 | bool |
| 138 | whether or not the send worked after retries attempts |
| 139 | """ |
| 140 | for _ in range(retries): |
| 141 | sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype) |
| 142 | if sbuf.contents.state == 0: |
| 143 | return True |
| 144 | |
Tommy Carpenter | 09894e3 | 2020-04-02 19:45:19 -0400 | [diff] [blame] | 145 | self.logger.info("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf))) |
Tommy Carpenter | 21f659c | 2020-02-26 14:12:54 -0500 | [diff] [blame] | 146 | return False |
| 147 | |
| 148 | def rmr_free(self, sbuf): |
| 149 | """ |
| 150 | Free an rmr message buffer after use |
| 151 | |
| 152 | Note: this does not need to be a class method, self is not used. However if we break it out as a function we need a home for it. |
| 153 | Parameters |
| 154 | ---------- |
| 155 | sbuf: ctypes c_void_p |
| 156 | Pointer to an rmr message buffer |
| 157 | """ |
| 158 | rmr.rmr_free_msg(sbuf) |
| 159 | |
| 160 | # SDL |
| 161 | # NOTE, even though these are passthroughs, the seperate SDL wrapper is useful for other applications like A1. |
| 162 | # Therefore, we don't embed that SDLWrapper functionality here so that it can be instantiated on it's own. |
| 163 | |
| 164 | def sdl_set(self, ns, key, value, usemsgpack=True): |
| 165 | """ |
| 166 | set a key |
| 167 | |
| 168 | Parameters |
| 169 | ---------- |
| 170 | ns: string |
| 171 | the sdl namespace |
| 172 | key: string |
| 173 | the sdl key |
| 174 | value: |
| 175 | if usemsgpack is True, value can be anything serializable by msgpack |
| 176 | if usemsgpack is False, value must be bytes |
| 177 | usemsgpack: boolean (optional) |
| 178 | determines whether the value is serialized using msgpack |
| 179 | """ |
| 180 | self._sdl.set(ns, key, value, usemsgpack) |
| 181 | |
| 182 | def sdl_get(self, ns, key, usemsgpack=True): |
| 183 | """ |
| 184 | get a key |
| 185 | |
| 186 | Parameters |
| 187 | ---------- |
| 188 | ns: string |
| 189 | the sdl namespace |
| 190 | key: string |
| 191 | the sdl key |
| 192 | usemsgpack: boolean (optional) |
| 193 | if usemsgpack is True, the value is deserialized using msgpack |
| 194 | if usemsgpack is False, the value is returned as raw bytes |
| 195 | |
| 196 | Returns |
| 197 | ------- |
| 198 | None (if not exist) or see above; depends on usemsgpack |
| 199 | """ |
| 200 | return self._sdl.get(ns, key, usemsgpack) |
| 201 | |
| 202 | def sdl_find_and_get(self, ns, prefix, usemsgpack=True): |
| 203 | """ |
| 204 | get all k v pairs that start with prefix |
| 205 | |
| 206 | Parameters |
| 207 | ---------- |
| 208 | ns: string |
| 209 | the sdl namespace |
| 210 | key: string |
| 211 | the sdl key |
| 212 | prefix: string |
| 213 | the prefix |
| 214 | usemsgpack: boolean (optional) |
| 215 | if usemsgpack is True, the value returned is a dict where each value has been deserialized using msgpack |
| 216 | if usemsgpack is False, the value returned is as a dict mapping keys to raw bytes |
| 217 | |
| 218 | Returns |
| 219 | ------- |
| 220 | {} (if no keys match) or see above; depends on usemsgpack |
| 221 | """ |
| 222 | return self._sdl.find_and_get(ns, prefix, usemsgpack) |
| 223 | |
| 224 | def sdl_delete(self, ns, key): |
| 225 | """ |
| 226 | delete a key |
| 227 | |
| 228 | Parameters |
| 229 | ---------- |
| 230 | ns: string |
| 231 | the sdl namespace |
| 232 | key: string |
| 233 | the sdl key |
| 234 | """ |
| 235 | self._sdl.delete(ns, key) |
| 236 | |
| 237 | # Health |
| 238 | |
| 239 | def healthcheck(self): |
| 240 | """ |
| 241 | this needs to be understood how this is supposed to work |
| 242 | """ |
| 243 | return self._rmr_loop.healthcheck() and self._sdl.healthcheck() |
| 244 | |
| 245 | def stop(self): |
| 246 | """ |
Tommy Carpenter | f9cd5cc | 2020-03-09 13:46:37 -0400 | [diff] [blame] | 247 | cleans up and stops the xapp rmr thread (currently) |
Tommy Carpenter | 21f659c | 2020-02-26 14:12:54 -0500 | [diff] [blame] | 248 | This is critical for unit testing as pytest will never return if the thread is running. |
| 249 | |
| 250 | TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop |
| 251 | """ |
| 252 | self._rmr_loop.stop() |
| 253 | |
| 254 | |
| 255 | # Public Classes to subclass (these subclass _BaseXapp) |
| 256 | |
| 257 | |
class RMRXapp(_BaseXapp):
    """
    Represents an xapp that is purely driven by rmr messages (i.e., when messages are received, the xapp does something).
    When run is called, the xapp framework waits for rmr messages and calls the handler registered for the
    message type (or the default handler when none is registered) on every one.
    """

    # seconds a blocking read of the receive queue waits before the stop flag is re-checked
    _RCV_POLL_TIMEOUT_SECS = 1

    def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
        """
        Parameters
        ----------
        default_handler: function
            called as default_handler(self, summary, sbuf) for every received message whose type
            has no handler registered via register_callback
            self: RMRXapp
                this xapp
            summary: dict
                the rmr message summary
            sbuf: ctypes c_void_p
                Pointer to an rmr message buffer. The user must call free on this when done.

        post_init: function (optional)
            optionally runs this function after the app initializes and before the run loop
            its signature should be post_init(self)

        For the other parameters, see _BaseXapp
        """
        # init base; post_init is deliberately NOT forwarded, because the base class would
        # invoke it before the dispatch table below exists and a hook that calls
        # register_callback would fail with AttributeError
        super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)

        # setup callbacks
        self._default_handler = default_handler
        self._dispatch = {}

        # used for thread control
        self._keep_going = True

        # register a default healthcheck handler
        # this default checks that rmr is working and SDL is working
        # the user can override this and register their own handler if they wish since the "last registered callback wins".
        def handle_healthcheck(self, summary, sbuf):
            ok = self.healthcheck()
            payload = b"OK\n" if ok else b"ERROR [RMR or SDL is unhealthy]\n"
            self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
            self.rmr_free(sbuf)

        self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)

        # run the optionally provided user hook last, on a fully initialized xapp
        if post_init:
            post_init(self)

    def register_callback(self, handler, message_type):
        """
        registers this xapp to call handler(self, summary, sbuf) when an rmr message is received of type message_type

        Parameters
        ----------
        handler: function
            called as handler(self, summary, sbuf) when a message of type message_type is received
            self: RMRXapp
                this xapp
            summary: dict
                the rmr message summary
            sbuf: ctypes c_void_p
                Pointer to an rmr message buffer. The user must call free on this when done.

        message_type: int
            the message type to look for

        Note if this method is called multiple times for a single message type, the "last one wins".
        """
        self._dispatch[message_type] = handler

    def run(self, thread=False):
        """
        This function should be called when the client xapp is ready to wait for their handlers to be called on received messages

        Parameters
        ----------
        thread: bool (optional)
            if thread is True, execution is returned to caller and the queue read loop is executed in a thread.
            The thread can be stopped using .stop()
            if False, execution is not returned and the framework loops
        """
        # local import keeps this block self-contained; assumes rcv_queue follows the
        # queue.Queue interface (it is already used via .empty()/.get() elsewhere in this file)
        from queue import Empty

        def loop():
            while self._keep_going:
                # block on the queue instead of spinning on empty(); the timeout bounds
                # how long a stop() request can go unnoticed while no traffic arrives
                try:
                    (summary, sbuf) = self._rmr_loop.rcv_queue.get(
                        block=True, timeout=self._RCV_POLL_TIMEOUT_SECS
                    )
                except Empty:
                    continue

                # dispatch; fall back to the default handler for unregistered types
                func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None) or self._default_handler
                func(self, summary, sbuf)

        if thread:
            Thread(target=loop).start()
        else:
            loop()

    def stop(self):
        """
        stops the rmr xapp completely: the base rmr thread first, then the queue reading loop.

        The queue reading loop notices the request within _RCV_POLL_TIMEOUT_SECS seconds at the latest.
        """
        super().stop()
        self.logger.debug("Stopping queue reading thread..")
        self._keep_going = False
Tommy Carpenter | 21f659c | 2020-02-26 14:12:54 -0500 | [diff] [blame] | 358 | |
| 359 | |
class Xapp(_BaseXapp):
    """
    An xapp whose behaviour is a single client-supplied function, which is most likely a loop-forever loop.
    """

    def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
        """
        Parameters
        ----------
        entrypoint: function
            the user code; it is called when the xapp runs
            its signature should be function(self)

        For the other parameters, see _BaseXapp
        """
        # init base (no post_init hook is passed on from here)
        super().__init__(
            rmr_port=rmr_port,
            rmr_wait_for_ready=rmr_wait_for_ready,
            use_fake_sdl=use_fake_sdl,
        )
        self._entrypoint = entrypoint

    def run(self):
        """
        Call this once the client xapp is ready to start its code; hands this xapp to the entrypoint.
        """
        user_code = self._entrypoint
        user_code(self)

    # no stop override is needed at the moment: the base class has one and there is nothing extra to clean up here