Add configuration-change API
If a configuration file path is defined in an environment variable,
use the Linux kernel's inotify feature to define a watcher on that file.
Xapps that subclass RMRXapp can supply a configuration-change handler
that the framework invokes on write events by polling the watcher.
Xapps that subclass Xapp must invoke a method to poll the watcher.
Bump version to 1.3.0
Issue-ID: RIC-425
Signed-off-by: Lott, Christopher (cl778h) <cl778h@att.com>
Change-Id: I070b36bc7e5a9dcd66c08da0304f7bf9e6a794a1
diff --git a/ricxappframe/xapp_frame.py b/ricxappframe/xapp_frame.py
index 87ab27e..994e52b 100644
--- a/ricxappframe/xapp_frame.py
+++ b/ricxappframe/xapp_frame.py
@@ -15,54 +15,64 @@
# limitations under the License.
# ==================================================================================
"""
-Framework for python xapps
-Framework here means Xapp classes that can be subclassed
+This framework for Python Xapps provides classes that Xapp writers should
+instantiate and/or subclass depending on their needs.
"""
+import json
+import os
import queue
from threading import Thread
+import inotify_simple
+from mdclogpy import Logger
from ricxappframe import xapp_rmr
from ricxappframe.rmr import rmr
from ricxappframe.xapp_sdl import SDLWrapper
-from mdclogpy import Logger
-# constants
+# message-type constants
RIC_HEALTH_CHECK_REQ = 100
RIC_HEALTH_CHECK_RESP = 101
+# environment variable with path to configuration file
+CONFIG_FILE_ENV = "CONFIG_FILE"
# Private base class; not for direct client use
class _BaseXapp:
"""
- Base xapp; not for client use directly
+ This base class initializes RMR by starting a thread that checks for
+ incoming messages, and provisions an SDL object.
+
+ If environment variable CONFIG_FILE_ENV is defined, and that value is a
+ path to an existing file, a watcher is defined to monitor modifications
+ (writes) to that file using the Linux kernel's inotify feature, and the
+ configuration-change handler function is invoked. The watcher can be
+ polled by calling method config_check().
+
+ Parameters
+ ----------
+ rmr_port: int
+ port to listen on
+
+ rmr_wait_for_ready: bool (optional)
+ If this is True, then init waits until rmr is ready to send, which
+ includes having a valid routing file. This can be set to
+ False if the client only wants to *receive only*.
+
+ use_fake_sdl: bool (optional)
+ if this is True, it uses the dbaas "fake dict backend" instead
+ of Redis or other backends. Set this to true when developing
+ an xapp or during unit testing to eliminate the need for DBAAS.
+
+ post_init: function (optional)
+ Runs this user-provided function after the base xapp is
+ initialized; its signature should be post_init(self)
"""
def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
"""
- Init
-
- Parameters
- ----------
- rmr_port: int
- port to listen on
-
- rmr_wait_for_ready: bool (optional)
-
- if this is True, then init waits until rmr is ready to send, which
- includes having a valid routing file. This can be set to
- False if the client only wants to *receive only*.
-
- use_fake_sdl: bool (optional)
- if this is True, it uses dbaas' "fake dict backend" instead
- of Redis or other backends. Set this to true when developing
- your xapp or during unit testing to completely avoid needing
- a dbaas running or any network at all.
-
- post_init: function (optional)
- runs this user provided function after the base xapp is
- initialized; its signature should be post_init(self)
+ Documented in the class comment.
"""
# PUBLIC, can be used by xapps using self.(name):
self.logger = Logger(name=__name__)
@@ -74,6 +84,17 @@
# SDL
self._sdl = SDLWrapper(use_fake_sdl)
+ # Config
+ # The environment variable specifies the path to the Xapp config file
+ self._config_path = os.environ.get(CONFIG_FILE_ENV, None)
+ if self._config_path and os.path.isfile(self._config_path):
+ self._inotify = inotify_simple.INotify()
+ self._inotify.add_watch(self._config_path, inotify_simple.flags.MODIFY)
+ self.logger.debug("__init__: watching config file {}".format(self._config_path))
+ else:
+ self._inotify = None
+ self.logger.warning("__init__: NOT watching any config file")
+
# run the optionally provided user post init
if post_init:
post_init(self)
@@ -150,7 +171,7 @@
if sbuf.contents.state == 0:
return True
- self.logger.info("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
+ self.logger.warning("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
return False
def rmr_free(self, sbuf):
@@ -168,16 +189,12 @@
"""
rmr.rmr_free_msg(sbuf)
- # SDL
- # NOTE, even though these are passthroughs, the separate SDL wrapper
- # is useful for other applications like A1. Therefore, we don't
- # embed that SDLWrapper functionality here so that it can be
- # instantiated on its own.
+ # Convenience (pass-thru) function for invoking SDL.
def sdl_set(self, ns, key, value, usemsgpack=True):
"""
- Stores a key-value pair,
- optionally serializing the value to bytes using msgpack.
+ Stores a key-value pair to SDL, optionally serializing the value
+ to bytes using msgpack.
Parameters
----------
@@ -199,7 +216,7 @@
def sdl_get(self, ns, key, usemsgpack=True):
"""
- Gets the value for the specified namespace and key,
+ Gets the value for the specified namespace and key from SDL,
optionally deserializing stored bytes using msgpack.
Parameters
@@ -271,6 +288,32 @@
"""
return self._rmr_loop.healthcheck() and self._sdl.healthcheck()
+ # Convenience function for discovering config change events
+
+ def config_check(self, timeout=0):
+ """
+ Checks the watcher for configuration-file events. The watcher
+ prerequisites and event mask are documented in __init__().
+
+ Parameters
+ ----------
+ timeout: int (optional)
+ Number of seconds to wait for a configuration-file event, default 0.
+
+ Returns
+ -------
+ List of Events, possibly empty
+ An event is a tuple with objects wd, mask, cookie and name.
+ For example::
+
+ Event(wd=1, mask=1073742080, cookie=0, name='foo')
+
+ """
+ if not self._inotify:
+ return []
+ events = self._inotify.read(timeout=timeout)
+ return list(events)
+
def stop(self):
"""
cleans up and stops the xapp rmr thread (currently). This is
@@ -283,7 +326,8 @@
self._rmr_loop.stop()
-# Public Classes to subclass (these subclass _BaseXapp)
+# Public classes that Xapp writers should instantiate or subclass
+# to implement an Xapp.
class RMRXapp(_BaseXapp):
@@ -293,6 +337,11 @@
the xapp framework waits for RMR messages, and calls the appropriate
client-registered consume callback on each.
+ If environment variable CONFIG_FILE_ENV is defined, and that value is a
+ path to an existing file, the configuration-change handler is invoked at
+ startup and on each configuration-file write event. If no handler is
+ supplied, this class defines a default handler that logs each invocation.
+
Parameters
----------
default_handler: function
@@ -302,6 +351,12 @@
The RMR message summary, a dict of key-value pairs
default_handler argument sbuf: ctypes c_void_p
Pointer to an RMR message buffer. The user must call free on this when done.
+ config_handler: function (optional, default is documented above)
+ A function with the signature (json) to be called at startup and each time
+ a configuration-file change event is detected. The JSON object is read from
+ the configuration file, if the prerequisites are met.
+ config_handler argument json: dict
+ The contents of the configuration file, parsed as JSON.
rmr_port: integer (optional, default is 4562)
Initialize RMR to listen on this port
rmr_wait_for_ready: boolean (optional, default is True)
@@ -313,7 +368,7 @@
its signature should be post_init(self)
"""
- def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
+ def __init__(self, default_handler, config_handler=None, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
"""
Also see _BaseXapp
"""
@@ -324,6 +379,7 @@
# setup callbacks
self._default_handler = default_handler
+ self._config_handler = config_handler
self._dispatch = {}
# used for thread control
@@ -341,6 +397,19 @@
self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)
+ # define a default configuration-change handler if none was provided.
+ if not config_handler:
+ def handle_config_change(self, config):
+ self.logger.debug("xapp_frame: default config handler invoked")
+ self._config_handler = handle_config_change
+
+ # call the config handler at startup if prereqs were met
+ if self._inotify:
+ with open(self._config_path) as json_file:
+ data = json.load(json_file)
+ self.logger.debug("run: invoking config handler at start")
+ self._config_handler(self, data)
+
def register_callback(self, handler, message_type):
"""
registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
@@ -362,7 +431,7 @@
"""
self._dispatch[message_type] = handler
- def run(self, thread=False):
+ def run(self, thread=False, rmr_timeout=5, inotify_timeout=0):
"""
This function should be called when the reactive Xapp is ready to start.
After start, the Xapp's handlers will be called on received messages.
@@ -374,21 +443,41 @@
If True, a thread is started to run the queue read/dispatch loop
and execution is returned to caller; the thread can be stopped
by calling the .stop() method.
+
+ rmr_timeout: integer (optional, default is 5 seconds)
+ Length of time to wait for an RMR message to arrive.
+
+ inotify_timeout: integer (optional, default is 0 seconds)
+ Length of time to wait for an inotify event to arrive.
"""
def loop():
while self._keep_going:
+
+ # poll RMR
try:
- (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=5)
+ (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=rmr_timeout)
# dispatch
func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None)
if not func:
func = self._default_handler
+ self.logger.debug("run: invoking msg handler on type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
func(self, summary, sbuf)
except queue.Empty:
# the get timed out
pass
+ # poll configuration file watcher
+ try:
+ events = self.config_check(timeout=inotify_timeout)
+ for event in events:
+ with open(self._config_path) as json_file:
+ data = json.load(json_file)
+ self.logger.debug("run: invoking config handler on change event {}".format(event))
+ self._config_handler(self, data)
+ except Exception as error:
+ self.logger.error("run: configuration handler failed: {}".format(error))
+
if thread:
Thread(target=loop).start()
else:
@@ -405,8 +494,12 @@
class Xapp(_BaseXapp):
"""
- Represents a generic Xapp where the client provides a function for the framework to call,
- which usually contains a loop-forever construct.
+ Represents a generic Xapp where the client provides a single function
+ for the framework to call at startup time (instead of providing callback
+ functions by message type). The Xapp writer must implement and provide a
+ function with a loop-forever construct similar to the `run` function in
+ the `RMRXapp` class. That function should poll to retrieve RMR messages
+ and dispatch them appropriately, poll for configuration changes, etc.
Parameters
----------