# blob: e939b0e5a0ea57577f20bb62b78d170e248d5e98 [file] [log] [blame]
# ==================================================================================
# Copyright (c) 2020 Nokia
# Copyright (c) 2020 AT&T Intellectual Property.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
"""
This framework for Python Xapps provides classes that Xapp writers
should instantiate and/or subclass depending on their needs.
"""
import json
import os
import queue
import time
from threading import Thread
from typing import List, Set
import inotify_simple
from mdclogpy import Logger
from ricxappframe import xapp_rmr
from ricxappframe.constants import sdl_namespaces
import ricxappframe.entities.rnib.nodeb_info_pb2 as pb_nbi
import ricxappframe.entities.rnib.cell_pb2 as pb_cell
from ricxappframe.entities.rnib.nb_identity_pb2 import NbIdentity
from ricxappframe.entities.rnib.nodeb_info_pb2 import Node
from ricxappframe.rmr import rmr
from ricxappframe.util.constants import Constants
from ricxappframe.xapp_sdl import SDLWrapper
import requests
class _BaseXapp:
    """
    Common base class for the Xapp flavours defined in this module.

    Instantiating it starts the RMR receive loop, provisions an SDL
    object, optionally creates a config-file watcher and starts a
    background thread that tries to register the xapp. This private
    base class should not be instantiated by clients directly, but it
    defines many public methods that may be used by clients.

    If the environment variable named by ``Constants.CONFIG_FILE_ENV``
    is defined, and that variable contains a path to an existing file,
    a watcher is defined to monitor modifications (writes) to that file
    using the Linux kernel's inotify feature. The watcher must be polled
    by calling method config_check(). The same file is parsed as JSON
    and kept as the configuration data used for registration; without
    it no registration is attempted.

    Parameters
    ----------
    rmr_port: int (optional, default is 4562)
        Port on which the RMR library listens for incoming messages.
    rmr_wait_for_ready: bool (optional, default is True)
        If this is True, then init waits until RMR is ready to send,
        which includes having a valid routing file. This can be set
        to False if the client wants to *receive only*.
    use_fake_sdl: bool (optional, default is False)
        if this is True, it uses the DBaaS "fake dict backend" instead
        of Redis or other backends. Set this to True when developing
        an xapp or during unit testing to eliminate the need for DBaaS.
    post_init: function (optional, default is None)
        Runs this user-provided function at the end of the init method;
        its signature should be post_init(self)
    """

    def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
        """
        Documented in the class comment.
        """
        # PUBLIC, can be used by xapps using self.(name):
        self.logger = Logger(name=__name__)
        self._appthread = None

        # Start rmr rcv thread
        self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
        self._mrc = self._rmr_loop.mrc  # for convenience

        # SDL
        self.sdl = SDLWrapper(use_fake_sdl)

        # Config
        # The environment variable specifies the path to the Xapp config file
        self._config_path = os.environ.get(Constants.CONFIG_FILE_ENV, None)
        have_config_file = bool(self._config_path) and os.path.isfile(self._config_path)

        if have_config_file:
            self._inotify = inotify_simple.INotify()
            self._inotify.add_watch(self._config_path, inotify_simple.flags.MODIFY)
            self.logger.debug("__init__: watching config file {}".format(self._config_path))
        else:
            self._inotify = None
            self.logger.warning("__init__: NOT watching any config file")

        # used for thread control of Registration of Xapp
        self._keep_registration = True

        # configuration data for xapp registration and deregistration
        self._config_data = None
        if have_config_file:
            with open(self._config_path) as json_file:
                self._config_data = json.load(json_file)
        else:
            self._keep_registration = False
            self.logger.error("__init__: Cannot Read config file for xapp Registration")
            self._config_data = {}

        # Keep the Thread object itself: Thread.start() returns None, so
        # chaining .start() onto the constructor would lose the handle
        # that stop() needs for join().
        self._appthread = Thread(target=self.registerXapp)
        self._appthread.start()

        # run the optionally provided user post init
        if post_init:
            post_init(self)

    def get_service(self, host, service):
        """
        To find the url for connecting to the service

        The environment-variable name is built by formatting `service`
        with the upper-cased application namespace and host and then
        replacing every "-" with "_". The value of that variable is
        split on "//" and the part after the first "//" is returned.

        Parameters
        ----------
        host: string
            defines the hostname in the url
        service: string
            defines the servicename in the url; a format string with
            two positional placeholders (namespace, host)

        Returns
        -------
        string
            url for the service, or "" when the host or namespace is
            missing, the environment variable is not set, or its value
            contains no "//"
        """
        app_namespace = self._config_data.get("APP_NAMESPACE")
        if app_namespace is None:
            app_namespace = Constants.DEFAULT_XAPP_NS
        self.logger.debug("service : {} host : {},appnamespace : {}".format(service, host, app_namespace))

        if app_namespace is None or host is None:
            return ""

        svc = service.format(app_namespace.upper(), host.upper())
        urlkey = svc.replace("-", "_")
        raw_url = os.environ.get(urlkey)
        if raw_url is None:
            # An unset variable must not raise: this runs on the
            # registration thread, which would otherwise die silently.
            self.logger.warning("Service urlkey : {} is not set in the environment".format(urlkey))
            return ""

        url = raw_url.split("//")
        self.logger.debug("Service urlkey : {} and url: {}".format(urlkey, url))
        if len(url) > 1:
            return url[1]
        return ""

    def do_post(self, plt_namespace, url, msg):
        """
        registration of the xapp using the url and json msg

        Parameters
        ----------
        plt_namespace: string
            platform namespace where the xapp is running
        url: string
            url for xapp registration; a format string that receives
            the platform namespace twice
        msg: dict
            xapp details, sent as the JSON body of the request

        Returns
        -------
        bool
            True when the response status is 200 or 201; False for any
            other status, for missing arguments and for any request
            failure (which is logged)
        """
        if url is None:
            self.logger.error("url is empty ")
            return False
        if plt_namespace is None:
            self.logger.error("plt_namespace is empty")
            return False
        try:
            request_url = url.format(plt_namespace, plt_namespace)
            resp = requests.post(request_url, json=msg)
            self.logger.debug("Post to '{}' done, status : {}".format(request_url, resp.status_code))
            self.logger.debug("Response Text : {}".format(resp.text))
            return resp.status_code == 200 or resp.status_code == 201
        # Specific handlers come first: RequestException is the base class
        # of the others and would otherwise shadow them. Every failure
        # returns False so callers testing the result for truthiness do
        # not mistake an error object for success.
        except requests.exceptions.HTTPError as errh:
            self.logger.error("Http Error: {}".format(errh))
        except requests.exceptions.ConnectionError as errc:
            self.logger.error("Error Connecting: {}".format(errc))
        except requests.exceptions.Timeout as errt:
            self.logger.error("Timeout Error: {}".format(errt))
        except requests.exceptions.RequestException as err:
            self.logger.error("Error : {}".format(err))
        return False

    def register(self):
        """
        function to registers the xapp

        Resolves the HTTP and RMR endpoints through get_service() and
        posts the registration request built from the configuration
        data.

        Returns
        -------
        bool
            whether or not the xapp is registered
        """
        hostname = os.environ.get("HOSTNAME")
        xappname = self._config_data.get("name")
        xappversion = self._config_data.get("version")
        pltnamespace = os.environ.get("PLT_NAMESPACE")
        if pltnamespace is None:
            pltnamespace = Constants.DEFAULT_PLT_NS
        self.logger.debug("config details hostname : {} xappname: {} xappversion : {} pltnamespace : {}".format(
            hostname, xappname, xappversion, pltnamespace))

        http_endpoint = self.get_service(hostname, Constants.SERVICE_HTTP)
        rmr_endpoint = self.get_service(hostname, Constants.SERVICE_RMR)
        if http_endpoint == "" or rmr_endpoint == "":
            self.logger.error(
                "Couldn't resolve service endpoints: http_endpoint={} rmr_endpoint={}".format(http_endpoint,
                                                                                              rmr_endpoint))
            return False

        self.logger.debug(
            "config details hostname : {} xappname: {} xappversion : {} pltnamespace : {} http_endpoint : {} rmr_endpoint "
            ": {} configpath : {}".format(hostname, xappname, xappversion, pltnamespace, http_endpoint, rmr_endpoint,
                                          self._config_data.get("CONFIG_PATH")))

        request_string = {
            "appName": hostname,
            "appVersion": xappversion,
            "configPath": "",
            "appInstanceName": xappname,
            "httpEndpoint": http_endpoint,
            "rmrEndpoint": rmr_endpoint,
            "config": json.dumps(self._config_data)
        }
        self.logger.info("REQUEST STRING :{}".format(request_string))
        return self.do_post(pltnamespace, Constants.REGISTER_PATH, request_string)

    def registerXapp(self):
        """
        registers the xapp

        Target of the registration thread: makes up to five attempts,
        two seconds apart, and stops early once register() succeeds or
        the registration flag is cleared.
        """
        retries = 5
        while self._keep_registration and retries > 0:
            time.sleep(2)
            retries = retries - 1
            # checking for rmr/sdl/xapp health
            healthy = self.healthcheck()
            if not healthy:
                self.logger.warning(
                    "Application='{}' is not ready yet, waiting ...".format(self._config_data.get("name")))
                continue

            self.logger.debug("Application='{}' is now up and ready, continue with registration ...".format(
                self._config_data.get("name")))
            if self.register():
                self.logger.debug("Registration done, proceeding with startup ...")
                break

    def deregister(self):
        """
        Deregisters the xapp

        Returns
        -------
        bool
            True when the deregistration request was accepted; False
            when the xapp is unhealthy, has no configuration data or
            the request failed
        """
        healthy = self.healthcheck()
        if not healthy:
            self.logger.error("RMR or SDL or xapp == Not Healthy")
            return False
        if self._config_data is None:
            return False

        name = os.environ.get("HOSTNAME")
        xappname = self._config_data.get("name")
        pltnamespace = os.environ.get("PLT_NAMESPACE")
        if pltnamespace is None:
            pltnamespace = Constants.DEFAULT_PLT_NS
        request_string = {
            "appName": name,
            "appInstanceName": xappname,
        }
        return self.do_post(pltnamespace, Constants.DEREGISTER_PATH, request_string)

    def xapp_shutdown(self):
        """
        Deregisters the xapp while shutting down

        Blocks for ten seconds after sending the deregistration request.
        """
        self.deregister()
        self.logger.debug("Wait for xapp to get unregistered")
        time.sleep(10)

    # Public rmr methods

    def rmr_get_messages(self):
        """
        Returns a generator iterable over all items in the queue that
        have not yet been read by the client xapp. Each item is a tuple
        (S, sbuf) where S is a message summary dict and sbuf is the raw
        message. The caller MUST call rmr.rmr_free_msg(sbuf) when
        finished with each sbuf to prevent memory leaks!
        """
        rcv_queue = self._rmr_loop.rcv_queue
        while True:
            # get_nowait() instead of empty()+get(): the latter can block
            # forever if another consumer drains the queue in between.
            try:
                (summary, sbuf) = rcv_queue.get_nowait()
            except queue.Empty:
                return
            yield (summary, sbuf)

    def rmr_send(self, payload, mtype, retries=100):
        """
        Allocates a buffer, sets payload and mtype, and sends

        The buffer is freed before returning, whatever the outcome.

        Parameters
        ----------
        payload: bytes
            payload to set
        mtype: int
            message type
        retries: int (optional)
            Number of times to retry at the application level

        Returns
        -------
        bool
            whether or not the send worked after retries attempts
        """
        sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True,
                                 mtype=mtype)

        for _ in range(retries):
            sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
            if sbuf.contents.state == 0:
                self.rmr_free(sbuf)
                return True

        self.rmr_free(sbuf)
        return False

    def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
        """
        Allows the xapp to return to sender, possibly adjusting the
        payload and message type before doing so. This does NOT free
        the sbuf for the caller as the caller may wish to perform
        multiple rts per buffer. The client needs to free.

        Parameters
        ----------
        sbuf: ctypes c_void_p
            Pointer to an rmr message buffer
        new_payload: bytes (optional)
            New payload to set
        new_mtype: int (optional)
            New message type (replaces the received message)
        retries: int (optional, default 100)
            Number of times to retry at the application level

        Returns
        -------
        bool
            whether or not the send worked after retries attempts
        """
        for _ in range(retries):
            sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
            if sbuf.contents.state == 0:
                return True

        self.logger.warning("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
        return False

    def rmr_free(self, sbuf):
        """
        Frees an rmr message buffer after use

        Note: this does not need to be a class method, self is not
        used. However if we break it out as a function we need a home
        for it.

        Parameters
        ----------
        sbuf: ctypes c_void_p
            Pointer to an rmr message buffer
        """
        rmr.rmr_free_msg(sbuf)

    # Convenience (pass-thru) function for invoking SDL.

    def sdl_set(self, namespace, key, value, usemsgpack=True):
        """
        ** Deprecated: will be removed in a future release **

        Stores a key-value pair to SDL, optionally serializing the value
        to bytes using msgpack.

        Parameters
        ----------
        namespace: string
            SDL namespace
        key: string
            SDL key
        value:
            Object or byte array to store. See the `usemsgpack` parameter.
        usemsgpack: boolean (optional, default is True)
            Determines whether the value is serialized using msgpack before storing.
            If usemsgpack is True, the msgpack function `packb` is invoked
            on the value to yield a byte array that is then sent to SDL.
            Stated differently, if usemsgpack is True, the value can be anything
            that is serializable by msgpack.
            If usemsgpack is False, the value must be bytes.
        """
        self.sdl.set(namespace, key, value, usemsgpack)

    def sdl_get(self, namespace, key, usemsgpack=True):
        """
        ** Deprecated: will be removed in a future release **

        Gets the value for the specified namespace and key from SDL,
        optionally deserializing stored bytes using msgpack.

        Parameters
        ----------
        namespace: string
            SDL namespace
        key: string
            SDL key
        usemsgpack: boolean (optional, default is True)
            If usemsgpack is True, the byte array stored by SDL is deserialized
            using msgpack to yield the original object that was stored.
            If usemsgpack is False, the byte array stored by SDL is returned
            without further processing.

        Returns
        -------
        Value
            See the usemsgpack parameter for an explanation of the returned value type.
            Answers None if the key is not found.
        """
        return self.sdl.get(namespace, key, usemsgpack)

    def sdl_find_and_get(self, namespace, prefix, usemsgpack=True):
        """
        ** Deprecated: will be removed in a future release **

        Gets all key-value pairs in the specified namespace
        with keys that start with the specified prefix,
        optionally deserializing stored bytes using msgpack.

        Parameters
        ----------
        namespace: string
            SDL namespace
        prefix: string
            the key prefix
        usemsgpack: boolean (optional, default is True)
            If usemsgpack is True, the byte array stored by SDL is deserialized
            using msgpack to yield the original value that was stored.
            If usemsgpack is False, the byte array stored by SDL is returned
            without further processing.

        Returns
        -------
        Dictionary of key-value pairs
            Each key has the specified prefix.
            The value object (its type) depends on the usemsgpack parameter,
            but is either a Python object or raw bytes as discussed above.
            Answers an empty dictionary if no keys matched the prefix.
        """
        return self.sdl.find_and_get(namespace, prefix, usemsgpack)

    def sdl_delete(self, namespace, key):
        """
        ** Deprecated: will be removed in a future release **

        Deletes the key-value pair with the specified key in the specified namespace.

        Parameters
        ----------
        namespace: string
            SDL namespace
        key: string
            SDL key
        """
        self.sdl.delete(namespace, key)

    def _get_rnib_info(self, node_type):
        """
        Since the difference between get_list_gnb_ids and get_list_enb_ids is only node-type,
        this function extracted from the duplicated logic.

        Parameters
        ----------
        node_type: string
            Type of node. This is EnumDescriptor.
            Available node types
            - UNKNOWN
            - ENB
            - GNB

        Returns
        -------
            List: (NbIdentity)

        Raises
        ------
            SdlTypeError: If function's argument is of an inappropriate type.
            NotConnected: If SDL is not connected to the backend data storage.
            RejectedByBackend: If backend data storage rejects the request.
            BackendError: If the backend data storage fails to process the request.
        """
        nbid_strings: Set[bytes] = self.sdl.get_members(sdl_namespaces.E2_MANAGER, node_type, usemsgpack=False)
        ret: List[NbIdentity] = []
        for nbid_string in nbid_strings:
            nbid = NbIdentity()
            nbid.ParseFromString(nbid_string)
            ret.append(nbid)
        return ret

    def get_list_gnb_ids(self):
        """
        Retrieves the list of gNodeb identity entities

        gNodeb information is stored in SDL by E2Manager. Therefore, gNode information
        is stored in SDL's `e2Manager` namespace as protobuf serialized.

        Returns
        -------
            List: (NbIdentity)

        Raises
        ------
            SdlTypeError: If function's argument is of an inappropriate type.
            NotConnected: If SDL is not connected to the backend data storage.
            RejectedByBackend: If backend data storage rejects the request.
            BackendError: If the backend data storage fails to process the request.
        """
        return self._get_rnib_info(Node.Type.Name(Node.GNB))

    def get_list_enb_ids(self):
        """
        Retrieves the list of eNodeb identity entities

        eNodeb information is stored in SDL by E2Manager. Therefore, eNode information
        is stored in SDL's `e2Manager` namespace as protobuf serialized.

        Returns
        -------
            List: (NbIdentity)

        Raises
        ------
            SdlTypeError: If function's argument is of an inappropriate type.
            NotConnected: If SDL is not connected to the backend data storage.
            RejectedByBackend: If backend data storage rejects the request.
            BackendError: If the backend data storage fails to process the request.
        """
        return self._get_rnib_info(Node.Type.Name(Node.ENB))

    # The following RNIB methods mirror the go-lang based RNIB methods.
    # Method names are the same as in repository:
    # gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/rnib

    def GetNodeb(self, inventoryName):
        """
        Returns nodeb info
        In RNIB SDL key is defined following way: RAN:<inventoryName>

        Parameters
        ----------
        inventoryName: string

        Returns
        -------
            NodebInfo(), or None when the key is not found

        Raises
        ------
            SdlTypeError: If function's argument is of an inappropriate type.
            NotConnected: If SDL is not connected to the backend data storage.
            RejectedByBackend: If backend data storage rejects the request.
            BackendError: If the backend data storage fails to process the request.
        """
        nbid_string = self.sdl_get(sdl_namespaces.E2_MANAGER, 'RAN:' + inventoryName, usemsgpack=False)
        if nbid_string is not None:
            nbinfo = pb_nbi.NodebInfo()
            nbinfo.ParseFromString(nbid_string)
            return nbinfo
        return None

    def GetNodebByGlobalNbId(self, nodeType, plmnId, nbId):
        """
        Returns nodeb identity based on type, plmn id and node id
        In RNIB SDL key is defined following way: <nodeType>:<plmnId>:<nbId>

        Parameters
        ----------
        nodeType: string
        plmnId: string
        nbId: string

        Returns
        -------
            NbIdentity(), or None when the key is not found

        Raises
        ------
            SdlTypeError: If function's argument is of an inappropriate type.
            NotConnected: If SDL is not connected to the backend data storage.
            RejectedByBackend: If backend data storage rejects the request.
            BackendError: If the backend data storage fails to process the request.
        """
        nbid_string = self.sdl_get(sdl_namespaces.E2_MANAGER, nodeType + ':' + plmnId + ':' + nbId,
                                   usemsgpack=False)
        if nbid_string is not None:
            nbid = NbIdentity()
            nbid.ParseFromString(nbid_string)
            return nbid
        return None

    def GetCellList(self, inventoryName):
        """
        Returns nodeb served cell list from the saved node data
        In RNIB SDL key is defined following way: RAN:<inventoryName>

        Parameters
        ----------
        inventoryName: string

        Returns
        -------
            ServedCellInfo() in case of ENB
            ServedNRCell() in case of GNB
            None when the node is not found or is neither ENB nor GNB

        Raises
        ------
            SdlTypeError: If function's argument is of an inappropriate type.
            NotConnected: If SDL is not connected to the backend data storage.
            RejectedByBackend: If backend data storage rejects the request.
            BackendError: If the backend data storage fails to process the request.
        """
        nodeb = self.GetNodeb(inventoryName)
        if nodeb is not None:
            if nodeb.HasField('enb'):
                return nodeb.enb.served_cells
            elif nodeb.HasField('gnb'):
                return nodeb.gnb.served_nr_cells
        return None

    def GetCellById(self, cell_type, cell_id):
        """
        Returns cell info by cell type and id.
        In RNIB SDL keys are defined based on the cell type:
        ENB type CELL:<cell_id>
        GNB type NRCELL:<cell_id>

        Parameters
        ----------
        cell_type: string
            Name of a pb_cell.Cell.Type value; the names of LTE_CELL
            and NR_CELL are recognised
        cell_id: string

        Returns
        -------
            Cell(), or None when the cell type is not recognised or the
            key is not found

        Raises
        ------
            SdlTypeError: If function's argument is of an inappropriate type.
            NotConnected: If SDL is not connected to the backend data storage.
            RejectedByBackend: If backend data storage rejects the request.
            BackendError: If the backend data storage fails to process the request.
        """
        cellstr = None
        if cell_type == pb_cell.Cell.Type.Name(pb_cell.Cell.LTE_CELL):
            cellstr = 'CELL'
        elif cell_type == pb_cell.Cell.Type.Name(pb_cell.Cell.NR_CELL):
            cellstr = 'NRCELL'
        if cellstr is not None:
            cell_string = self.sdl_get(sdl_namespaces.E2_MANAGER, cellstr + ':' + cell_id, usemsgpack=False)
            if cell_string is not None:
                cell = pb_cell.Cell()
                cell.ParseFromString(cell_string)
                return cell
        return None

    def GetListNodebIds(self):
        """
        Returns both enb and gnb NbIdentity list (ENB entries first)

        Returns
        -------
            List: (NbIdentity)

        Raises
        ------
            SdlTypeError: If function's argument is of an inappropriate type.
            NotConnected: If SDL is not connected to the backend data storage.
            RejectedByBackend: If backend data storage rejects the request.
            BackendError: If the backend data storage fails to process the request.
        """
        nlist1 = self._get_rnib_info(Node.Type.Name(Node.ENB))
        nlist2 = self._get_rnib_info(Node.Type.Name(Node.GNB))
        nlist1.extend(nlist2)
        return nlist1

    def GetCell(self, inventoryName, pci):
        """
        Returns cell info using pci
        In RNIB SDL key is defined following way: PCI:<inventoryName>:<pci hex val>
        (the pci is written in lower-case hex, zero-padded to two digits)

        Parameters
        ----------
        inventoryName: string
        pci: int

        Returns
        -------
            Cell(), or None when the key is not found

        Raises
        ------
            SdlTypeError: If function's argument is of an inappropriate type.
            NotConnected: If SDL is not connected to the backend data storage.
            RejectedByBackend: If backend data storage rejects the request.
            BackendError: If the backend data storage fails to process the request.
        """
        cell_string = self.sdl_get(sdl_namespaces.E2_MANAGER, 'PCI:{0:s}:{1:02x}'.format(inventoryName, pci),
                                   usemsgpack=False)
        if cell_string is not None:
            cell = pb_cell.Cell()
            cell.ParseFromString(cell_string)
            return cell
        return None

    def GetRanFunctionDefinition(self, inventoryName, ran_function_oid):
        """
        Returns GNB ran function definition list based on the ran_function_oid
        In RNIB SDL key is defined following way: RAN:<inventoryName>

        Parameters
        ----------
        inventoryName: string
        ran_function_oid: int

        Returns
        -------
            array of ran_function_definition matching to ran_function_oid,
            or None when the node is not found or is not a GNB

        Raises
        ------
            SdlTypeError: If function's argument is of an inappropriate type.
            NotConnected: If SDL is not connected to the backend data storage.
            RejectedByBackend: If backend data storage rejects the request.
            BackendError: If the backend data storage fails to process the request.
        """
        nodeb = self.GetNodeb(inventoryName)
        if nodeb is not None:
            if nodeb.HasField('gnb') and nodeb.gnb.ran_functions is not None:
                return [
                    rf.ran_function_definition
                    for rf in nodeb.gnb.ran_functions
                    if rf.ran_function_oid == ran_function_oid
                ]
        return None

    def healthcheck(self):
        """
        Reports healthy only when both the RMR loop's healthcheck and
        the SDL healthcheck answer with a true value. The SDL check is
        skipped when the RMR check already failed.
        """
        return self._rmr_loop.healthcheck() and self.sdl.healthcheck()

    # Convenience function for discovering config change events

    def config_check(self, timeout=0):
        """
        Checks the watcher for configuration-file events. The watcher
        prerequisites and event mask are documented in __init__().

        Parameters
        ----------
        timeout: int (optional)
            Time to wait for a configuration-file event, default 0.
            The value is handed unchanged to inotify_simple's
            INotify.read(), which interprets it as milliseconds.

        Returns
        -------
        List of Events, possibly empty
            An event is a tuple with objects wd, mask, cookie and name.
            For example::

                Event(wd=1, mask=1073742080, cookie=0, name='foo')

        """
        if not self._inotify:
            return []
        events = self._inotify.read(timeout=timeout)
        return list(events)

    def stop(self):
        """
        cleans up and stops the xapp rmr thread (currently). This is
        critical for unit testing as pytest will never return if the
        thread is running.

        The registration thread is told to finish and is joined first,
        then the xapp is deregistered (see xapp_shutdown) and finally
        the RMR loop is stopped.

        TODO: can we register a ctrl-c handler so this gets called on
        ctrl-c? Because currently two ctrl-c are needed to stop.
        """
        # Clear the flag before joining so the registration loop exits at
        # its next iteration instead of running through all its retries.
        self._keep_registration = False
        if self._appthread is not None:
            self._appthread.join()

        self.xapp_shutdown()
        self._rmr_loop.stop()
# Public classes that Xapp writers should instantiate or subclass
# to implement an Xapp.
class RMRXapp(_BaseXapp):
    """
    Represents an Xapp that reacts only to RMR messages; i.e., the Xapp
    only performs an action when a message is received. Clients should
    invoke the run method, which has a loop that waits for RMR messages
    and calls the appropriate client-registered consume callback on each.

    If the base class created a config-file watcher (see _BaseXapp),
    this class polls that watcher to detect file-write events, and
    invokes a configuration-change handler on each event. The handler
    is also invoked at startup. If no handler function is supplied to
    the constructor, this class defines a default handler that only
    logs a message.

    Parameters
    ----------
    default_handler: function
        A function called as default_handler(xapp, summary, sbuf) when a
        message type is received for which no other handler is registered.
        default_handler argument xapp: RMRXapp
            This xapp instance
        default_handler argument summary: dict
            The RMR message summary, a dict of key-value pairs
        default_handler argument sbuf: ctypes c_void_p
            Pointer to an RMR message buffer. The user must call free on this when done.
    config_handler: function (optional, default is documented above)
        A function called as config_handler(xapp, json) at startup and each time
        a configuration-file change event is detected. The JSON object is read from
        the configuration file, if the prerequisites are met.
        config_handler argument xapp: RMRXapp
            This xapp instance
        config_handler argument json: dict
            The contents of the configuration file, parsed as JSON.
    rmr_port: integer (optional, default is 4562)
        Initialize RMR to listen on this port
    rmr_wait_for_ready: boolean (optional, default is True)
        Wait for RMR to signal ready before starting the dispatch loop
    use_fake_sdl: boolean (optional, default is False)
        Use an in-memory store instead of the real SDL service
    post_init: function (optional, default None)
        Run this function after the app initializes and before the dispatch loop starts;
        its signature should be post_init(self). It is executed by the base-class
        initializer, i.e. before this class has created its dispatch table, so it
        must not call register_callback().
    """

    def __init__(self, default_handler, config_handler=None, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False,
                 post_init=None):
        """
        Also see _BaseXapp
        """
        # init base
        super().__init__(
            rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
        )

        # setup callbacks
        self._default_handler = default_handler
        self._config_handler = config_handler
        self._dispatch = {}

        # used for thread control
        self._keep_going = True

        # register a default healthcheck handler
        # this default checks that rmr is working and SDL is working
        # the user can override this and register their own handler
        # if they wish since the "last registered callback wins".
        def handle_healthcheck(self, summary, sbuf):
            healthy = self.healthcheck()
            payload = b"OK\n" if healthy else b"ERROR [RMR or SDL is unhealthy]\n"
            self.rmr_rts(sbuf, new_payload=payload, new_mtype=Constants.RIC_HEALTH_CHECK_RESP)
            self.rmr_free(sbuf)

        self.register_callback(handle_healthcheck, Constants.RIC_HEALTH_CHECK_REQ)

        # define a default configuration-change handler if none was provided.
        if not config_handler:
            def handle_config_change(self, config):
                self.logger.debug("xapp_frame: default config handler invoked")

            self._config_handler = handle_config_change

        # call the config handler at startup if prereqs were met
        if self._inotify:
            with open(self._config_path) as json_file:
                data = json.load(json_file)
            self.logger.debug("run: invoking config handler at start")
            self._config_handler(self, data)

    def register_callback(self, handler, message_type):
        """
        registers this xapp to call handler(xapp, summary, sbuf) when an
        rmr message is received of type message_type

        Parameters
        ----------
        handler: function
            a function called as handler(xapp, summary, sbuf)
            when a message of type message_type is received
            xapp: RMRXapp
                this xapp instance
            summary: dict
                the rmr message summary
            sbuf: ctypes c_void_p
                Pointer to an rmr message buffer. The user must call free on this when done.
        message_type: int
            the message type to look for

        Note if this method is called multiple times for a single message type, the "last one wins".
        """
        self._dispatch[message_type] = handler

    def run(self, thread=False, rmr_timeout=5, inotify_timeout=0):
        """
        This function should be called when the reactive Xapp is ready to start.
        After start, the Xapp's handlers will be called on received messages.

        Parameters
        ----------
        thread: bool (optional, default is False)
            If False, execution is not returned and the framework loops forever.
            If True, a thread is started to run the queue read/dispatch loop
            and execution is returned to caller; the thread can be stopped
            by calling the .stop() method.
        rmr_timeout: integer (optional, default is 5 seconds)
            Length of time to wait for an RMR message to arrive.
        inotify_timeout: integer (optional, default is 0)
            Length of time to wait for an inotify event to arrive. Passed
            unchanged to config_check(), which hands it to inotify_simple's
            INotify.read(); that call interprets it as milliseconds.
        """

        def loop():
            while self._keep_going:
                # poll RMR
                try:
                    (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=rmr_timeout)
                except queue.Empty:
                    # the get timed out
                    pass
                else:
                    # Dispatch outside the try body so that a queue.Empty
                    # raised by a handler is not mistaken for a timeout.
                    func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None)
                    if not func:
                        func = self._default_handler
                    self.logger.debug("run: invoking msg handler on type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
                    func(self, summary, sbuf)

                # poll configuration file watcher
                try:
                    events = self.config_check(timeout=inotify_timeout)
                    for event in events:
                        with open(self._config_path) as json_file:
                            data = json.load(json_file)
                        self.logger.debug("run: invoking config handler on change event {}".format(event))
                        self._config_handler(self, data)
                except Exception as error:
                    self.logger.error("run: configuration handler failed: {}".format(error))

        if thread:
            Thread(target=loop).start()
        else:
            loop()

    def stop(self):
        """
        Runs the base-class stop() and then sets the flag to end the
        dispatch loop.
        """
        super().stop()
        self.logger.debug("Setting flag to end framework work loop.")
        self._keep_going = False
class Xapp(_BaseXapp):
    """
    A general-purpose Xapp driven by one client-supplied function.

    Rather than registering callbacks per message type, the Xapp writer
    hands over a single entrypoint that the framework calls from run().
    That function is expected to contain its own loop-forever construct,
    similar to the `run` function in the `RMRXapp` class: poll to
    retrieve RMR messages and dispatch them appropriately, poll for
    configuration changes, etc.

    Parameters
    ----------
    entrypoint: function
        Called when this class's run method is invoked.
        The function signature must be just function(self)
    rmr_port: integer (optional, default is 4562)
        Forwarded to _BaseXapp: port RMR listens on
    rmr_wait_for_ready: boolean (optional, default is True)
        Forwarded to _BaseXapp: wait for RMR to signal ready
    use_fake_sdl: boolean (optional, default is False)
        Forwarded to _BaseXapp: use an in-memory store instead of
        the real SDL service
    """

    def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
        """
        Initializes the base class and remembers the entrypoint.

        Parameters
        ----------
        For the other parameters, see class _BaseXapp.
        """
        base_options = {
            "rmr_port": rmr_port,
            "rmr_wait_for_ready": rmr_wait_for_ready,
            "use_fake_sdl": use_fake_sdl,
        }
        super().__init__(**base_options)
        self._entrypoint = entrypoint

    def run(self):
        """
        Call this when the general Xapp is ready to start; it invokes
        the entrypoint with this xapp as its only argument.
        """
        entry = self._entrypoint
        entry(self)
# there is no need for stop currently here (base has, and nothing
# special to do here)