blob: b81804e7dc230bb438939cf643e39328ae2de19a [file] [log] [blame]
Tommy Carpenter21f659c2020-02-26 14:12:54 -05001# ==================================================================================
2# Copyright (c) 2020 Nokia
3# Copyright (c) 2020 AT&T Intellectual Property.
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16# ==================================================================================
Lott, Christopher (cl778h)61270902020-05-06 09:23:55 -040017"""
18Framework for python xapps
19Framework here means Xapp classes that can be subclassed
20"""
21
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -040022from threading import Thread
Tommy Carpenter21f659c2020-02-26 14:12:54 -050023from ricxappframe import xapp_rmr
Tommy Carpenter3a6ac012020-04-06 14:42:57 -040024from ricxappframe.rmr import rmr
Lott, Christopher (cl778h)61270902020-05-06 09:23:55 -040025from ricxappframe.xapp_sdl import SDLWrapper
Tommy Carpenter21f659c2020-02-26 14:12:54 -050026from mdclogpy import Logger
27
Tommy Carpenter09894e32020-04-02 19:45:19 -040028# constants
29RIC_HEALTH_CHECK_REQ = 100
30RIC_HEALTH_CHECK_RESP = 101
31
Tommy Carpenter21f659c2020-02-26 14:12:54 -050032
Tommy Carpenter21f659c2020-02-26 14:12:54 -050033# Private base class; not for direct client use
34
35
36class _BaseXapp:
37 """
38 Base xapp; not for client use directly
39 """
40
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -040041 def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
Tommy Carpenter21f659c2020-02-26 14:12:54 -050042 """
43 Init
44
45 Parameters
46 ----------
47 rmr_port: int
48 port to listen on
49
50 rmr_wait_for_ready: bool (optional)
51 if this is True, then init waits until rmr is ready to send, which includes having a valid routing file.
52 this can be set to False if the client only wants to *receive only*
53
54 use_fake_sdl: bool (optional)
55 if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends.
56 Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas running or any network at all
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -040057
58 post_init: function (optional)
59 runs this user provided function after the base xapp is initialized
60 it's signature should be post_init(self)
Tommy Carpenter21f659c2020-02-26 14:12:54 -050061 """
Tommy Carpenter0f8305b2020-03-18 10:34:28 -040062 # PUBLIC, can be used by xapps using self.(name):
63 self.logger = Logger(name=__name__)
Tommy Carpenter21f659c2020-02-26 14:12:54 -050064
65 # Start rmr rcv thread
66 self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
67 self._mrc = self._rmr_loop.mrc # for convenience
68
69 # SDL
70 self._sdl = SDLWrapper(use_fake_sdl)
71
Tommy Carpenter99a0b482020-03-03 10:21:24 -050072 # run the optionally provided user post init
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -040073 if post_init:
74 post_init(self)
Tommy Carpenter99a0b482020-03-03 10:21:24 -050075
76 # Public rmr methods
77
Tommy Carpenter21f659c2020-02-26 14:12:54 -050078 def rmr_get_messages(self):
79 """
Lott, Christopher (cl778h)666e8312020-05-05 18:31:54 -040080 Returns a generator iterable over all items in the queue that have not yet been read by the client xapp.
81 Each item is a tuple (S, sbuf) where S is a message summary dict and sbuf is the raw message.
82 The caller MUST call rmr.rmr_free_msg(sbuf) when finished with each sbuf to prevent memory leaks!
Tommy Carpenter21f659c2020-02-26 14:12:54 -050083 """
84 while not self._rmr_loop.rcv_queue.empty():
85 (summary, sbuf) = self._rmr_loop.rcv_queue.get()
86 yield (summary, sbuf)
87
88 def rmr_send(self, payload, mtype, retries=100):
89 """
90 Allocates a buffer, sets payload and mtype, and sends
91
92 Parameters
93 ----------
94 payload: bytes
95 payload to set
96 mtype: int
97 message type
98 retries: int (optional)
99 Number of times to retry at the application level before excepting RMRFailure
100
101 Returns
102 -------
103 bool
104 whether or not the send worked after retries attempts
105 """
106 sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)
107
108 for _ in range(retries):
109 sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
110 if sbuf.contents.state == 0:
111 self.rmr_free(sbuf)
112 return True
113
114 self.rmr_free(sbuf)
115 return False
116
117 def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
118 """
119 Allows the xapp to return to sender, possibly adjusting the payload and message type before doing so
120
121 This does NOT free the sbuf for the caller as the caller may wish to perform multiple rts per buffer.
122 The client needs to free.
123
124 Parameters
125 ----------
126 sbuf: ctypes c_void_p
127 Pointer to an rmr message buffer
128 new_payload: bytes (optional)
129 New payload to set
130 new_mtype: int (optional)
131 New message type (replaces the received message)
132 retries: int (optional)
133 Number of times to retry at the application level before excepting RMRFailure
134
135 Returns
136 -------
137 bool
138 whether or not the send worked after retries attempts
139 """
140 for _ in range(retries):
141 sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
142 if sbuf.contents.state == 0:
143 return True
144
Tommy Carpenter09894e32020-04-02 19:45:19 -0400145 self.logger.info("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500146 return False
147
148 def rmr_free(self, sbuf):
149 """
150 Free an rmr message buffer after use
151
152 Note: this does not need to be a class method, self is not used. However if we break it out as a function we need a home for it.
153 Parameters
154 ----------
155 sbuf: ctypes c_void_p
156 Pointer to an rmr message buffer
157 """
158 rmr.rmr_free_msg(sbuf)
159
160 # SDL
161 # NOTE, even though these are passthroughs, the seperate SDL wrapper is useful for other applications like A1.
162 # Therefore, we don't embed that SDLWrapper functionality here so that it can be instantiated on it's own.
163
164 def sdl_set(self, ns, key, value, usemsgpack=True):
165 """
166 set a key
167
168 Parameters
169 ----------
170 ns: string
171 the sdl namespace
172 key: string
173 the sdl key
174 value:
175 if usemsgpack is True, value can be anything serializable by msgpack
176 if usemsgpack is False, value must be bytes
177 usemsgpack: boolean (optional)
178 determines whether the value is serialized using msgpack
179 """
180 self._sdl.set(ns, key, value, usemsgpack)
181
182 def sdl_get(self, ns, key, usemsgpack=True):
183 """
184 get a key
185
186 Parameters
187 ----------
188 ns: string
189 the sdl namespace
190 key: string
191 the sdl key
192 usemsgpack: boolean (optional)
193 if usemsgpack is True, the value is deserialized using msgpack
194 if usemsgpack is False, the value is returned as raw bytes
195
196 Returns
197 -------
198 None (if not exist) or see above; depends on usemsgpack
199 """
200 return self._sdl.get(ns, key, usemsgpack)
201
202 def sdl_find_and_get(self, ns, prefix, usemsgpack=True):
203 """
204 get all k v pairs that start with prefix
205
206 Parameters
207 ----------
208 ns: string
209 the sdl namespace
210 key: string
211 the sdl key
212 prefix: string
213 the prefix
214 usemsgpack: boolean (optional)
215 if usemsgpack is True, the value returned is a dict where each value has been deserialized using msgpack
216 if usemsgpack is False, the value returned is as a dict mapping keys to raw bytes
217
218 Returns
219 -------
220 {} (if no keys match) or see above; depends on usemsgpack
221 """
222 return self._sdl.find_and_get(ns, prefix, usemsgpack)
223
224 def sdl_delete(self, ns, key):
225 """
226 delete a key
227
228 Parameters
229 ----------
230 ns: string
231 the sdl namespace
232 key: string
233 the sdl key
234 """
235 self._sdl.delete(ns, key)
236
237 # Health
238
239 def healthcheck(self):
240 """
241 this needs to be understood how this is supposed to work
242 """
243 return self._rmr_loop.healthcheck() and self._sdl.healthcheck()
244
245 def stop(self):
246 """
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400247 cleans up and stops the xapp rmr thread (currently)
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500248 This is critical for unit testing as pytest will never return if the thread is running.
249
250 TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop
251 """
252 self._rmr_loop.stop()
253
254
255# Public Classes to subclass (these subclass _BaseXapp)
256
257
258class RMRXapp(_BaseXapp):
259 """
260 Represents an xapp that is purely driven by rmr messages (i.e., when messages are received, the xapp does something
261 When run is called, the xapp framework waits for rmr messages, and calls the client provided consume callback on every one
262 """
263
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400264 def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500265 """
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400266 Parameters
267 ----------
268 default_handler: function
269 a function with the signature (summary, sbuf) to be called when a message of type message_type is received
270 summary: dict
271 the rmr message summary
272 sbuf: ctypes c_void_p
273 Pointer to an rmr message buffer. The user must call free on this when done.
274
275 post_init: function (optional)
276 optionally runs this function after the app initializes and before the run loop
277 it's signature should be post_init(self)
278
279 For the other parameters, see _BaseXapp
280 """
281 # init base
282 super().__init__(
283 rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
284 )
285
286 # setup callbacks
287 self._default_handler = default_handler
288 self._dispatch = {}
289
290 # used for thread control
291 self._keep_going = True
292
Tommy Carpenter09894e32020-04-02 19:45:19 -0400293 # register a default healthcheck handler
294 # this default checks that rmr is working and SDL is working
295 # the user can override this and register their own handler if they wish since the "last registered callback wins".
296 def handle_healthcheck(self, summary, sbuf):
297 ok = self.healthcheck()
298 payload = b"OK\n" if ok else b"ERROR [RMR or SDL is unhealthy]\n"
299 self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
300 self.rmr_free(sbuf)
301
302 self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)
303
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400304 def register_callback(self, handler, message_type):
305 """
306 registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500307
308 Parameters
309 ----------
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400310 handler: function
311 a function with the signature (summary, sbuf) to be called when a message of type message_type is received
312 summary: dict
313 the rmr message summary
314 sbuf: ctypes c_void_p
315 Pointer to an rmr message buffer. The user must call free on this when done.
316
317 message:type: int
318 the message type to look for
319
320 Note if this method is called multiple times for a single message type, the "last one wins".
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500321 """
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400322 self._dispatch[message_type] = handler
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500323
Tommy Carpenter1c9ce6b2020-03-13 09:36:36 -0400324 def run(self, thread=False):
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500325 """
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400326 This function should be called when the client xapp is ready to wait for their handlers to be called on received messages
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500327
Tommy Carpenter1c9ce6b2020-03-13 09:36:36 -0400328 Parameters
329 ----------
330 thread: bool (optional)
331 if thread is True, execution is returned to caller and the queue read loop is executed in a thread.
332 The thread can be stopped using .stop()
333 if False, execution is not returned and the framework loops
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500334 """
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400335
336 def loop():
337 while self._keep_going:
338 if not self._rmr_loop.rcv_queue.empty():
339 (summary, sbuf) = self._rmr_loop.rcv_queue.get()
340 # dispatch
Lott, Christopher (cl778h)61270902020-05-06 09:23:55 -0400341 func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None)
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400342 if not func:
343 func = self._default_handler
344 func(self, summary, sbuf)
345
Tommy Carpenter1c9ce6b2020-03-13 09:36:36 -0400346 if thread:
347 Thread(target=loop).start()
348 else:
349 loop()
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400350
351 def stop(self):
352 """
353 stops the rmr xapp completely.
354 """
355 super().stop()
Tommy Carpenter0f8305b2020-03-18 10:34:28 -0400356 self.logger.debug("Stopping queue reading thread..")
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400357 self._keep_going = False
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500358
359
360class Xapp(_BaseXapp):
361 """
362 Represents an xapp where the client provides a generic function to call, which is mostly likely a loop-forever loop
363 """
364
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400365 def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500366 """
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400367 Parameters
368 ----------
369 entrypoint: function
370 this function is called when the xapp runs; this is the user code
371 it's signature should be function(self)
372
373 For the other parameters, see _BaseXapp
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500374 """
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400375 # init base
376 super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
377 self._entrypoint = entrypoint
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500378
379 def run(self):
380 """
Tommy Carpenter99a0b482020-03-03 10:21:24 -0500381 This function should be called when the client xapp is ready to start their code
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500382 """
Tommy Carpenterf9cd5cc2020-03-09 13:46:37 -0400383 self._entrypoint(self)
384
385 # there is no need for stop currently here (base has, and nothing special to do here)