| """ |
| Framework for python xapps |
| Framework here means Xapp classes that can be subclassed |
| """ |
| # ================================================================================== |
| # Copyright (c) 2020 Nokia |
| # Copyright (c) 2020 AT&T Intellectual Property. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # ================================================================================== |
| |
| |
| from ricxappframe import xapp_rmr |
| from ricxappframe.xapp_sdl import SDLWrapper |
| from rmr import rmr |
| from mdclogpy import Logger |
| |
| |
| mdc_logger = Logger(name=__name__) |
| |
| |
| # Private base class; not for direct client use |
| |
| |
class _BaseXapp:
    """
    Base xapp; not for client use directly.

    Bundles the rmr receive loop and the SDL wrapper, and exposes thin helper
    methods over both for the public xapp classes to inherit.
    """

    def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
        """
        Init

        Parameters
        ----------
        rmr_port: int
            port to listen on

        rmr_wait_for_ready: bool (optional)
            if this is True, then init waits until rmr is ready to send, which includes having a valid routing file.
            this can be set to False if the client only wants to *receive only*

        use_fake_sdl: bool (optional)
            if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends.
            Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas running or any network at all
        """

        # Start rmr rcv thread
        self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
        self._mrc = self._rmr_loop.mrc  # for convenience

        # SDL
        self._sdl = SDLWrapper(use_fake_sdl)

    def rmr_get_messages(self):
        """
        Returns a generator over all messages currently in the receive queue that have not yet been read by the client xapp.

        Each item is a (summary, sbuf) tuple exactly as taken from the queue.
        The generator finishes as soon as the queue reports empty.
        """
        while not self._rmr_loop.rcv_queue.empty():
            (summary, sbuf) = self._rmr_loop.rcv_queue.get()
            yield (summary, sbuf)

    def rmr_send(self, payload, mtype, retries=100):
        """
        Allocates a buffer, sets payload and mtype, and sends

        The allocated buffer is always freed before this method exits, whether
        the send succeeded, ran out of retries, or raised.

        Parameters
        ----------
        payload: bytes
            payload to set
        mtype: int
            message type
        retries: int (optional)
            Number of times to attempt the send at the application level before giving up

        Returns
        -------
        bool
            whether or not the send worked after retries attempts
        """
        sbuf = rmr.rmr_alloc_msg(
            vctx=self._mrc,
            size=len(payload),
            payload=payload,
            gen_transaction_id=True,
            mtype=mtype,
        )

        try:
            for _ in range(retries):
                # the send call hands back the buffer to use from now on
                sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
                if sbuf.contents.state == 0:
                    return True
            return False
        finally:
            # free on every exit path, including an exception from the send call,
            # so a failing send cannot leak the buffer
            self.rmr_free(sbuf)

    def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
        """
        Allows the xapp to return to sender, possibly adjusting the payload and message type before doing so

        This does NOT free the sbuf for the caller as the caller may wish to perform multiple rts per buffer.
        The client needs to free.

        Parameters
        ----------
        sbuf: ctypes c_void_p
            Pointer to an rmr message buffer
        new_payload: bytes (optional)
            New payload to set
        new_mtype: int (optional)
            New message type (replaces the received message)
        retries: int (optional)
            Number of times to attempt the rts at the application level before giving up

        Returns
        -------
        bool
            whether or not the send worked after retries attempts
        """
        # NOTE(review): the buffer returned by rmr_rts_msg is only rebound locally;
        # confirm the caller's original sbuf is still the right one to free afterwards.
        for _ in range(retries):
            sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
            if sbuf.contents.state == 0:
                return True

        return False

    def rmr_free(self, sbuf):
        """
        Free an rmr message buffer after use

        Note: this does not need to be a class method, self is not used. However if we break it out as a function we need a home for it.

        Parameters
        ----------
        sbuf: ctypes c_void_p
            Pointer to an rmr message buffer
        """
        rmr.rmr_free_msg(sbuf)

    # SDL
    # NOTE, even though these are passthroughs, the separate SDL wrapper is useful for other applications like A1.
    # Therefore, we don't embed that SDLWrapper functionality here so that it can be instantiated on its own.

    def sdl_set(self, ns, key, value, usemsgpack=True):
        """
        set a key

        Parameters
        ----------
        ns: string
            the sdl namespace
        key: string
            the sdl key
        value:
            if usemsgpack is True, value can be anything serializable by msgpack
            if usemsgpack is False, value must be bytes
        usemsgpack: boolean (optional)
            determines whether the value is serialized using msgpack
        """
        self._sdl.set(ns, key, value, usemsgpack)

    def sdl_get(self, ns, key, usemsgpack=True):
        """
        get a key

        Parameters
        ----------
        ns: string
            the sdl namespace
        key: string
            the sdl key
        usemsgpack: boolean (optional)
            if usemsgpack is True, the value is deserialized using msgpack
            if usemsgpack is False, the value is returned as raw bytes

        Returns
        -------
        None (if not exist) or see above; depends on usemsgpack
        """
        return self._sdl.get(ns, key, usemsgpack)

    def sdl_find_and_get(self, ns, prefix, usemsgpack=True):
        """
        get all k v pairs that start with prefix

        Parameters
        ----------
        ns: string
            the sdl namespace
        prefix: string
            the prefix
        usemsgpack: boolean (optional)
            if usemsgpack is True, the value returned is a dict where each value has been deserialized using msgpack
            if usemsgpack is False, the value returned is as a dict mapping keys to raw bytes

        Returns
        -------
        {} (if no keys match) or see above; depends on usemsgpack
        """
        return self._sdl.find_and_get(ns, prefix, usemsgpack)

    def sdl_delete(self, ns, key):
        """
        delete a key

        Parameters
        ----------
        ns: string
            the sdl namespace
        key: string
            the sdl key
        """
        self._sdl.delete(ns, key)

    # Health

    def healthcheck(self):
        """
        Combined health of the rmr loop and SDL.

        The rmr loop is asked first; SDL is only consulted when the rmr loop
        reports healthy.

        Returns
        -------
        truthy only when both the rmr loop healthcheck and the SDL healthcheck pass
        """
        return self._rmr_loop.healthcheck() and self._sdl.healthcheck()

    def stop(self):
        """
        cleans up and stops the xapp.
        Currently this only stops the rmr thread
        This is critical for unit testing as pytest will never return if the thread is running.

        TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop
        """
        self._rmr_loop.stop()
| |
| |
| # Public Classes to subclass (these subclass _BaseXapp) |
| |
| |
class RMRXapp(_BaseXapp):
    """
    Represents an xapp that is purely driven by rmr messages (i.e., when messages are received, the xapp does something).
    When run is called, the xapp framework waits for rmr messages, and calls the client provided consume callback on every one
    """

    def consume(self, summary, sbuf):
        """
        This function is to be implemented by the client and is called whenever a new rmr message is received.
        It is expected to take two parameters (besides self):

        Parameters
        ----------
        summary: dict
            the rmr message summary
        sbuf: ctypes c_void_p
            Pointer to an rmr message buffer. The user must call free on this when done.

        Raises
        ------
        NotImplementedError
            always, until a subclass overrides this method
        """
        raise NotImplementedError()

    def run(self):
        """
        This function should be called when the client xapp is ready to wait for consume to be called on received messages

        It never returns: it waits on the receive queue and hands every (summary, sbuf) pair to consume.
        The wait is a blocking get with a short timeout rather than a tight poll of empty(),
        so an idle xapp does not spin a CPU core.

        TODO: should we run this in a thread too? We can't currently call "stop" on rmr xapps at an arbitrary time because this doesn't return control
        Running the below in a thread probably makes the most sense.
        """
        # local import keeps this method self-contained
        import queue

        # NOTE(review): assumes rcv_queue follows the queue.Queue API
        # (get(block, timeout) raising queue.Empty) - confirm against xapp_rmr.RmrLoop
        while True:
            try:
                # the timeout only bounds each wait so the loop wakes up periodically
                (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=1)
            except queue.Empty:
                continue
            self.consume(summary, sbuf)
| |
| |
class Xapp(_BaseXapp):
    """
    General-purpose xapp: the client supplies one function holding its own logic,
    which is most likely a loop that runs forever.
    """

    def loop(self):
        """
        Client hook holding the xapp's main logic; subclasses must override it.

        Raises
        ------
        NotImplementedError
            always, in this base implementation
        """
        raise NotImplementedError()

    def run(self):
        """
        Call this once the client xapp is ready to start its loop.

        It only invokes loop and discards whatever loop returns; it exists so that
        every xapp class is started through the same run() entry point.
        """
        client_loop = self.loop
        client_loop()