Use blocking get call w/ timeout to read msg queue
Add parameters to the queue get method in the xapp_frame loop that reads
messages so it waits for a message to arrive and occasionally checks for
the end-loop flag, instead of spinning the CPU at 100% while waiting.
Upgrade all to use the latest RMR, version 4.0.5.
Tweak the example xapps to emit their names in log messages.
Improve documentation especially the package overview shown at PyPI.
Issue-ID: RIC-354
Signed-off-by: Lott, Christopher (cl778h) <cl778h@att.com>
Change-Id: I08692e6ef60d199cb0b92c1c99740ae808b8885c
diff --git a/ricxappframe/xapp_frame.py b/ricxappframe/xapp_frame.py
index b81804e..4321852 100644
--- a/ricxappframe/xapp_frame.py
+++ b/ricxappframe/xapp_frame.py
@@ -19,6 +19,7 @@
Framework here means Xapp classes that can be subclassed
"""
+import queue
from threading import Thread
from ricxappframe import xapp_rmr
from ricxappframe.rmr import rmr
@@ -48,16 +49,20 @@
port to listen on
rmr_wait_for_ready: bool (optional)
- if this is True, then init waits until rmr is ready to send, which includes having a valid routing file.
- this can be set to False if the client only wants to *receive only*
+
+ if this is True, then init waits until rmr is ready to send, which
+ includes having a valid routing file. This can be set to
+ False if the client only wants to *receive only*.
use_fake_sdl: bool (optional)
- if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends.
- Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas running or any network at all
+ if this is True, it uses dbaas' "fake dict backend" instead
+ of Redis or other backends. Set this to true when developing
+ your xapp or during unit testing to completely avoid needing
+ a dbaas running or any network at all.
post_init: function (optional)
- runs this user provided function after the base xapp is initialized
- it's signature should be post_init(self)
+ runs this user provided function after the base xapp is
+ initialized; its signature should be post_init(self)
"""
# PUBLIC, can be used by xapps using self.(name):
self.logger = Logger(name=__name__)
@@ -77,9 +82,11 @@
def rmr_get_messages(self):
"""
- Returns a generator iterable over all items in the queue that have not yet been read by the client xapp.
- Each item is a tuple (S, sbuf) where S is a message summary dict and sbuf is the raw message.
- The caller MUST call rmr.rmr_free_msg(sbuf) when finished with each sbuf to prevent memory leaks!
+ Returns a generator iterable over all items in the queue that
+ have not yet been read by the client xapp. Each item is a tuple
+ (S, sbuf) where S is a message summary dict and sbuf is the raw
+ message. The caller MUST call rmr.rmr_free_msg(sbuf) when
+ finished with each sbuf to prevent memory leaks!
"""
while not self._rmr_loop.rcv_queue.empty():
(summary, sbuf) = self._rmr_loop.rcv_queue.get()
@@ -116,10 +123,10 @@
def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
"""
- Allows the xapp to return to sender, possibly adjusting the payload and message type before doing so
-
- This does NOT free the sbuf for the caller as the caller may wish to perform multiple rts per buffer.
- The client needs to free.
+ Allows the xapp to return to sender, possibly adjusting the
+ payload and message type before doing so. This does NOT free
+ the sbuf for the caller as the caller may wish to perform
+ multiple rts per buffer. The client needs to free.
Parameters
----------
@@ -130,7 +137,8 @@
new_mtype: int (optional)
New message type (replaces the received message)
retries: int (optional)
- Number of times to retry at the application level before excepting RMRFailure
+ Number of times to retry at the application level before
+ throwing exception RMRFailure
Returns
-------
@@ -147,9 +155,12 @@
def rmr_free(self, sbuf):
"""
- Free an rmr message buffer after use
+ Frees an rmr message buffer after use
- Note: this does not need to be a class method, self is not used. However if we break it out as a function we need a home for it.
+ Note: this does not need to be a class method, self is not
+ used. However if we break it out as a function we need a home
+ for it.
+
Parameters
----------
sbuf: ctypes c_void_p
@@ -158,8 +169,10 @@
rmr.rmr_free_msg(sbuf)
# SDL
- # NOTE, even though these are passthroughs, the seperate SDL wrapper is useful for other applications like A1.
- # Therefore, we don't embed that SDLWrapper functionality here so that it can be instantiated on it's own.
+ # NOTE, even though these are passthroughs, the seperate SDL wrapper
+ # is useful for other applications like A1. Therefore, we don't
+ # embed that SDLWrapper functionality here so that it can be
+ # instantiated on its own.
def sdl_set(self, ns, key, value, usemsgpack=True):
"""
@@ -244,10 +257,12 @@
def stop(self):
"""
- cleans up and stops the xapp rmr thread (currently)
- This is critical for unit testing as pytest will never return if the thread is running.
+ cleans up and stops the xapp rmr thread (currently). This is
+ critical for unit testing as pytest will never return if the
+ thread is running.
- TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop
+ TODO: can we register a ctrl-c handler so this gets called on
+ ctrl-c? Because currently two ctrl-c are needed to stop.
"""
self._rmr_loop.stop()
@@ -257,8 +272,10 @@
class RMRXapp(_BaseXapp):
"""
- Represents an xapp that is purely driven by rmr messages (i.e., when messages are received, the xapp does something
- When run is called, the xapp framework waits for rmr messages, and calls the client provided consume callback on every one
+ Represents an xapp that is purely driven by RMR messages; i.e., when
+ messages are received, the xapp does something. When run is called,
+ the xapp framework waits for rmr messages, and calls the
+ client-provided consume callback on every one.
"""
def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
@@ -266,15 +283,16 @@
Parameters
----------
default_handler: function
- a function with the signature (summary, sbuf) to be called when a message of type message_type is received
- summary: dict
- the rmr message summary
- sbuf: ctypes c_void_p
- Pointer to an rmr message buffer. The user must call free on this when done.
-
+ a function with the signature (summary, sbuf) to be called
+ when a message of type message_type is received.
+ summary: dict
+ the rmr message summary
+ sbuf: ctypes c_void_p
+ Pointer to an rmr message buffer. The user must call free on
+ this when done.
post_init: function (optional)
- optionally runs this function after the app initializes and before the run loop
- it's signature should be post_init(self)
+ optionally runs this function after the app initializes and
+ before the run loop; its signature should be post_init(self)
For the other parameters, see _BaseXapp
"""
@@ -292,7 +310,8 @@
# register a default healthcheck handler
# this default checks that rmr is working and SDL is working
- # the user can override this and register their own handler if they wish since the "last registered callback wins".
+ # the user can override this and register their own handler
+ # if they wish since the "last registered callback wins".
def handle_healthcheck(self, summary, sbuf):
ok = self.healthcheck()
payload = b"OK\n" if ok else b"ERROR [RMR or SDL is unhealthy]\n"
@@ -308,11 +327,12 @@
Parameters
----------
handler: function
- a function with the signature (summary, sbuf) to be called when a message of type message_type is received
- summary: dict
- the rmr message summary
- sbuf: ctypes c_void_p
- Pointer to an rmr message buffer. The user must call free on this when done.
+ a function with the signature (summary, sbuf) to be called
+ when a message of type message_type is received
+ summary: dict
+ the rmr message summary
+ sbuf: ctypes c_void_p
+ Pointer to an rmr message buffer. The user must call free on this when done.
message:type: int
the message type to look for
@@ -323,25 +343,30 @@
def run(self, thread=False):
"""
- This function should be called when the client xapp is ready to wait for their handlers to be called on received messages
+ This function should be called when the client xapp is ready to
+ wait for its handlers to be called on received messages.
Parameters
----------
thread: bool (optional)
- if thread is True, execution is returned to caller and the queue read loop is executed in a thread.
- The thread can be stopped using .stop()
- if False, execution is not returned and the framework loops
+ If True, a thread is started to run the queue read/dispatch loop
+ and execution is returned to caller; the thread can be stopped
+ by calling .stop(). If False (the default), execution is not
+ returned and the framework loops forever.
"""
def loop():
while self._keep_going:
- if not self._rmr_loop.rcv_queue.empty():
- (summary, sbuf) = self._rmr_loop.rcv_queue.get()
+ try:
+ (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=5)
# dispatch
func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None)
if not func:
func = self._default_handler
func(self, summary, sbuf)
+ except queue.Empty:
+ # the get timed out
+ pass
if thread:
Thread(target=loop).start()
@@ -350,16 +375,17 @@
def stop(self):
"""
- stops the rmr xapp completely.
+ Sets the flag to end the dispatch loop.
"""
super().stop()
- self.logger.debug("Stopping queue reading thread..")
+ self.logger.debug("Setting flag to end framework work loop.")
self._keep_going = False
class Xapp(_BaseXapp):
"""
- Represents an xapp where the client provides a generic function to call, which is mostly likely a loop-forever loop
+ Represents an xapp where the client provides a generic function to
+ call, which is mostly likely a loop-forever loop.
"""
def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
@@ -367,8 +393,8 @@
Parameters
----------
entrypoint: function
- this function is called when the xapp runs; this is the user code
- it's signature should be function(self)
+ this function is called when the xapp runs; this is the user code.
+ its signature should be function(self)
For the other parameters, see _BaseXapp
"""
@@ -378,8 +404,10 @@
def run(self):
"""
- This function should be called when the client xapp is ready to start their code
+ This function should be called when the client xapp is ready to
+ start their code.
"""
self._entrypoint(self)
- # there is no need for stop currently here (base has, and nothing special to do here)
+ # there is no need for stop currently here (base has, and nothing
+ # special to do here)