blob: 94a3a3e6d335c72a3320c7f54f1122410529bbc5 [file] [log] [blame]
"""
Framework for python xapps
Framework here means Xapp classes that can be subclassed
"""
# ==================================================================================
# Copyright (c) 2020 Nokia
# Copyright (c) 2020 AT&T Intellectual Property.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
from ricxappframe import xapp_rmr
from ricxappframe.xapp_sdl import SDLWrapper
from rmr import rmr
from mdclogpy import Logger
mdc_logger = Logger(name=__name__)
# Private base class; not for direct client use
class _BaseXapp:
"""
Base xapp; not for client use directly
"""
def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
"""
Init
Parameters
----------
rmr_port: int
port to listen on
rmr_wait_for_ready: bool (optional)
if this is True, then init waits until rmr is ready to send, which includes having a valid routing file.
this can be set to False if the client only wants to *receive only*
use_fake_sdl: bool (optional)
if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends.
Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas running or any network at all
"""
# Start rmr rcv thread
self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
self._mrc = self._rmr_loop.mrc # for convenience
# SDL
self._sdl = SDLWrapper(use_fake_sdl)
def rmr_get_messages(self):
"""
returns a generator iterable over all current messages in the queue that have not yet been read by the client xapp
"""
while not self._rmr_loop.rcv_queue.empty():
(summary, sbuf) = self._rmr_loop.rcv_queue.get()
yield (summary, sbuf)
def rmr_send(self, payload, mtype, retries=100):
"""
Allocates a buffer, sets payload and mtype, and sends
Parameters
----------
payload: bytes
payload to set
mtype: int
message type
retries: int (optional)
Number of times to retry at the application level before excepting RMRFailure
Returns
-------
bool
whether or not the send worked after retries attempts
"""
sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)
for _ in range(retries):
sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
if sbuf.contents.state == 0:
self.rmr_free(sbuf)
return True
self.rmr_free(sbuf)
return False
def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
"""
Allows the xapp to return to sender, possibly adjusting the payload and message type before doing so
This does NOT free the sbuf for the caller as the caller may wish to perform multiple rts per buffer.
The client needs to free.
Parameters
----------
sbuf: ctypes c_void_p
Pointer to an rmr message buffer
new_payload: bytes (optional)
New payload to set
new_mtype: int (optional)
New message type (replaces the received message)
retries: int (optional)
Number of times to retry at the application level before excepting RMRFailure
Returns
-------
bool
whether or not the send worked after retries attempts
"""
for _ in range(retries):
sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
if sbuf.contents.state == 0:
return True
return False
def rmr_free(self, sbuf):
"""
Free an rmr message buffer after use
Note: this does not need to be a class method, self is not used. However if we break it out as a function we need a home for it.
Parameters
----------
sbuf: ctypes c_void_p
Pointer to an rmr message buffer
"""
rmr.rmr_free_msg(sbuf)
# SDL
# NOTE, even though these are passthroughs, the seperate SDL wrapper is useful for other applications like A1.
# Therefore, we don't embed that SDLWrapper functionality here so that it can be instantiated on it's own.
def sdl_set(self, ns, key, value, usemsgpack=True):
"""
set a key
Parameters
----------
ns: string
the sdl namespace
key: string
the sdl key
value:
if usemsgpack is True, value can be anything serializable by msgpack
if usemsgpack is False, value must be bytes
usemsgpack: boolean (optional)
determines whether the value is serialized using msgpack
"""
self._sdl.set(ns, key, value, usemsgpack)
def sdl_get(self, ns, key, usemsgpack=True):
"""
get a key
Parameters
----------
ns: string
the sdl namespace
key: string
the sdl key
usemsgpack: boolean (optional)
if usemsgpack is True, the value is deserialized using msgpack
if usemsgpack is False, the value is returned as raw bytes
Returns
-------
None (if not exist) or see above; depends on usemsgpack
"""
return self._sdl.get(ns, key, usemsgpack)
def sdl_find_and_get(self, ns, prefix, usemsgpack=True):
"""
get all k v pairs that start with prefix
Parameters
----------
ns: string
the sdl namespace
key: string
the sdl key
prefix: string
the prefix
usemsgpack: boolean (optional)
if usemsgpack is True, the value returned is a dict where each value has been deserialized using msgpack
if usemsgpack is False, the value returned is as a dict mapping keys to raw bytes
Returns
-------
{} (if no keys match) or see above; depends on usemsgpack
"""
return self._sdl.find_and_get(ns, prefix, usemsgpack)
def sdl_delete(self, ns, key):
"""
delete a key
Parameters
----------
ns: string
the sdl namespace
key: string
the sdl key
"""
self._sdl.delete(ns, key)
# Health
def healthcheck(self):
"""
this needs to be understood how this is supposed to work
"""
return self._rmr_loop.healthcheck() and self._sdl.healthcheck()
def stop(self):
"""
cleans up and stops the xapp.
Currently this only stops the rmr thread
This is critical for unit testing as pytest will never return if the thread is running.
TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop
"""
self._rmr_loop.stop()
# Public Classes to subclass (these subclass _BaseXapp)
class RMRXapp(_BaseXapp):
"""
Represents an xapp that is purely driven by rmr messages (i.e., when messages are received, the xapp does something
When run is called, the xapp framework waits for rmr messages, and calls the client provided consume callback on every one
"""
def consume(self, summary, sbuf):
"""
This function is to be implemented by the client and is called whenever a new rmr message is received.
It is expected to take two parameters (besides self):
Parameters
----------
summary: dict
the rmr message summary
sbuf: ctypes c_void_p
Pointer to an rmr message buffer. The user must call free on this when done.
"""
raise NotImplementedError()
def run(self):
"""
This function should be called when the client xapp is ready to wait for consume to be called on received messages
TODO: should we run this in a thread too? We can't currently call "stop" on rmr xapps at an arbitrary time because this doesn't return control
Running the below in a thread probably makes the most sense.
"""
while True:
if not self._rmr_loop.rcv_queue.empty():
(summary, sbuf) = self._rmr_loop.rcv_queue.get()
self.consume(summary, sbuf)
class Xapp(_BaseXapp):
"""
Represents an xapp where the client provides a generic function to call, which is mostly likely a loop-forever loop
"""
def loop(self):
"""
This function is to be implemented by the client and is called
"""
raise NotImplementedError()
def run(self):
"""
This function should be called when the client xapp is ready to start their loop
This is simple and the client could just call self.loop(), however this gives a consistent interface as the other xapps
"""
self.loop()