blob: 4e5e63585f07e11a97384ec3a7143e14e3750cc5 [file] [log] [blame]
Tommy Carpenter21f659c2020-02-26 14:12:54 -05001"""
2Contains rmr functionality specific to the xapp
3The general rmr API is via "rmr"
4"""
5# ==================================================================================
6# Copyright (c) 2020 Nokia
7# Copyright (c) 2020 AT&T Intellectual Property.
8#
9# Licensed under the Apache License, Version 2.0 (the "License");
10# you may not use this file except in compliance with the License.
11# You may obtain a copy of the License at
12#
13# http://www.apache.org/licenses/LICENSE-2.0
14#
15# Unless required by applicable law or agreed to in writing, software
16# distributed under the License is distributed on an "AS IS" BASIS,
17# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18# See the License for the specific language governing permissions and
19# limitations under the License.
20# ==================================================================================
21
22
23import time
24import queue
25from threading import Thread
26from mdclogpy import Logger
27from rmr import rmr, helpers
28
29
30mdc_logger = Logger(name=__name__)
31
32
33class RmrLoop:
34 """
35 Class represents an rmr loop that constantly reads from rmr
36
37 Note, we use a queue here, and a thread, rather than the xapp frame just looping and calling consume, so that a possibly slow running consume function does not block the reading of new messages
38 """
39
40 def __init__(self, port, wait_for_ready=True):
41 """
42 sets up rmr, then launches a thread that reads and injects messages into a queue
43
44 Parameters
45 ----------
46 port: int
47 port to listen on
48
49 wait_for_ready: bool (optional)
50 if this is True, then this function hangs until rmr is ready to send, which includes having a valid routing file.
51 this can be set to False if the client only wants to *receive only*
52 """
53
54 # Public
55 # thread safe queue https://docs.python.org/3/library/queue.html
56 # We use a thread and a queue so that a long running consume callback function can never block reads.
57 # IE a consume implementation could take a long time and the ring size for rmr blows up here and messages are lost
58 self.rcv_queue = queue.Queue()
59
60 # rmr context; RMRFL_MTCALL puts RMR into a multithreaded mode, where a thread populates a ring of messages that receive calls read from
61 self.mrc = rmr.rmr_init(str(port).encode(), rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
62
63 if wait_for_ready:
64 mdc_logger.debug("Waiting for rmr to init on port {}..".format(port))
65 while rmr.rmr_ready(self.mrc) == 0:
66 time.sleep(0.1)
67
68 # Private
Tommy Carpenter99a0b482020-03-03 10:21:24 -050069 self._keep_going = True # used to tell this thread to stop it's work
70 self._last_ran = time.time() # used for healthcheck
71 self._loop_is_running = False # used in stop to know when it's safe to kill the mrc
Tommy Carpenter21f659c2020-02-26 14:12:54 -050072
73 # start the work loop
74 mdc_logger.debug("Starting loop thread")
75
76 def loop():
77 mdc_logger.debug("Work loop starting")
Tommy Carpenter99a0b482020-03-03 10:21:24 -050078 self._loop_is_running = True
Tommy Carpenter21f659c2020-02-26 14:12:54 -050079 while self._keep_going:
80
81 # read our mailbox
82 # TODO: take a flag as to whether RAW is needed or not
83 # RAW allows for RTS however the caller must free, and the caller may not need RTS.
84 # Currently after consuming, callers should do rmr.rmr_free_msg(sbuf)
85
86 for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc):
87 self.rcv_queue.put((msg, sbuf))
88
89 self._last_ran = time.time()
90
Tommy Carpenter99a0b482020-03-03 10:21:24 -050091 self._loop_is_running = False
92
Tommy Carpenter21f659c2020-02-26 14:12:54 -050093 self._thread = Thread(target=loop)
94 self._thread.start()
95
96 def stop(self):
97 """
98 sets a flag that will cleanly stop the thread
Tommy Carpenter21f659c2020-02-26 14:12:54 -050099 """
Tommy Carpenter99a0b482020-03-03 10:21:24 -0500100 mdc_logger.debug("Stopping rmr thread. Waiting for last iteration to finish..")
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500101 self._keep_going = False
Tommy Carpenter99a0b482020-03-03 10:21:24 -0500102 # wait until the current batch of messages is done, then kill the rmr connection
103 # note; I debated putting this in "loop" however if the while loop was still going setting mrc to close here would blow up any processing still currently happening
104 # probably more polite to at least finish the current batch and then close. So here we wait until the current batch is done, then we kill the mrc
105 while self._loop_is_running:
106 pass
107 mdc_logger.debug("Closing rmr connection")
108 rmr.rmr_close(self.mrc)
Tommy Carpenter21f659c2020-02-26 14:12:54 -0500109
110 def healthcheck(self, seconds=30):
111 """
112 returns a boolean representing whether the rmr loop is healthy, by checking two attributes:
113 1. is it running?,
114 2. is it stuck in a long (> seconds) loop?
115
116 Parameters
117 ----------
118 seconds: int (optional)
119 the rmr loop is determined healthy if it has completed in the last (seconds)
120 """
121 return self._thread.is_alive() and ((time.time() - self._last_ran) < seconds)