Threading pt 1
* a1 now has a seperate, continuous polling thread
this will enable operations like database cleanup (based on ACKs) and external notifications in real time,
rather than when the API is invoked
* all rmr send and receive operations are now in this thread
* introduces a thread safe job queue between the two threads
* Not done yet: database cleanups in the thread
* Bump rmr python version
Change-Id: I29ca7843ffe7497c84891920f8aee332ac676591
Signed-off-by: Tommy Carpenter <tc677g@att.com>
diff --git a/a1/a1rmr.py b/a1/a1rmr.py
index 51b5694..d6114bf 100644
--- a/a1/a1rmr.py
+++ b/a1/a1rmr.py
@@ -15,40 +15,46 @@
# limitations under the License.
# ==================================================================================
import os
-import gevent
+import queue
+import time
+import json
from rmr import rmr, helpers
from a1 import get_module_logger
+from a1 import data
+from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
logger = get_module_logger(__name__)
RETRY_TIMES = int(os.environ.get("RMR_RETRY_TIMES", 4))
-MRC = None
+
+_SEND_QUEUE = queue.Queue() # thread safe queue https://docs.python.org/3/library/queue.html
-def init_rmr():
+def _init_rmr():
"""
- called from run; not called for unit tests
+ init an rmr context
+ This gets monkeypatched out for unit testing
"""
- global MRC
# rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
# internal ring of messages, and receive calls read from that
# currently the size is 2048 messages, so this is fine for the foreseeable future
- MRC = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
+ mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
- while rmr.rmr_ready(MRC) == 0:
- gevent.sleep(1)
- logger.debug("not yet ready")
+ while rmr.rmr_ready(mrc) == 0:
+ time.sleep(0.5)
+
+ return mrc
-def send(payload, message_type=0):
+def _send(mrc, payload, message_type=0):
"""
Sends a message up to RETRY_TIMES
If the message is sent successfully, it returns the transactionid
Does nothing otherwise
"""
- # we may be called many times in asynchronous loops, so for now, it is safer not to share buffers. We can investigate later whether this is really a problem.
- sbuf = rmr.rmr_alloc_msg(MRC, 4096)
+ # TODO: investigate moving this below and allocating the space based on the payload size
+ sbuf = rmr.rmr_alloc_msg(mrc, 4096)
payload = payload if isinstance(payload, bytes) else payload.encode("utf-8")
# retry RETRY_TIMES to send the message
@@ -63,7 +69,7 @@
transaction_id = pre_send_summary["transaction id"] # save the transactionid because we need it later
# send
- sbuf = rmr.rmr_send_msg(MRC, sbuf)
+ sbuf = rmr.rmr_send_msg(mrc, sbuf)
post_send_summary = rmr.message_summary(sbuf)
logger.debug("Post message send summary: %s", rmr.message_summary(sbuf))
@@ -80,8 +86,85 @@
return None
-def dequeue_all_waiting_messages(filter_type=[]):
+def _update_all_statuses(mrc):
"""
- dequeue all waiting rmr messages from rmr
+ get all waiting messages, and try to parse them as status updates
+ (currently, those are the only messages a1 should get, this may have to be revisited later)
"""
- return helpers.rmr_rcvall_msgs(MRC, filter_type)
+ for msg in helpers.rmr_rcvall_msgs(mrc, [21024]):
+ try:
+ pay = json.loads(msg["payload"])
+ data.set_status(pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"])
+ except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError):
+ logger.debug("Dropping malformed or non applicable message")
+ logger.debug(msg)
+
+
+# Public
+
+
+def queue_work(item):
+ """
+ push an item into the work queue
+ currently the only type of work is to send out messages
+ """
+ _SEND_QUEUE.put(item)
+
+
+class RmrLoop:
+ """
+ class represents an rmr loop meant to be called as a longstanding separate thread
+ """
+
+ def __init__(self, real_init=True):
+ self._rmr_is_ready = False
+ self._keep_going = True
+ self._real_init = real_init # useful for unit testing to turn off initialization
+
+ def rmr_is_ready(self):
+ """returns whether rmr has been initialized"""
+ return self._rmr_is_ready
+
+ def stop(self):
+ """sets a flag for the loop to end"""
+ self._keep_going = False
+
+ def loop(self):
+ """
+ This loop runs in an a1 thread forever, and has 3 jobs:
+ - send out any messages that have to go out (create instance, delete instance)
+ - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
+ - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
+ """
+
+ # get a context
+ mrc = None
+ logger.debug("Waiting for rmr to initialize...")
+ if self._real_init:
+ mrc = _init_rmr()
+ self._rmr_is_ready = True
+ logger.debug("Rmr is ready")
+
+ # loop forever
+ logger.debug("Work loop starting")
+ while self._keep_going:
+ """
+ We never raise an exception here. Log and keep moving
+ Bugs will eventually be caught be examining logs.
+ """
+ try:
+ # First, send out all messages waiting for us
+ while not _SEND_QUEUE.empty():
+ work_item = _SEND_QUEUE.get(block=False, timeout=None)
+ _send(mrc, payload=work_item["payload"], message_type=work_item["msg type"])
+
+ # Next, update all statuses waiting in a1s mailbox
+ _update_all_statuses(mrc)
+
+ # TODO: next body of work is to try to clean up the database for any updated statuses
+
+ except Exception as e:
+ logger.debug("Polling thread encountered an unexpected exception, but it will continue:")
+ logger.exception(e)
+
+ time.sleep(1)
diff --git a/a1/controller.py b/a1/controller.py
index b0ae31a..b3ce88e 100644
--- a/a1/controller.py
+++ b/a1/controller.py
@@ -35,11 +35,9 @@
"""
try:
return func()
- except (ValidationError, exceptions.PolicyTypeAlreadyExists, exceptions.CantDeleteNonEmptyType) as exc:
- logger.exception(exc)
+ except (ValidationError, exceptions.PolicyTypeAlreadyExists, exceptions.CantDeleteNonEmptyType):
return "", 400
- except (exceptions.PolicyTypeNotFound, exceptions.PolicyInstanceNotFound) as exc:
- logger.exception(exc)
+ except (exceptions.PolicyTypeNotFound, exceptions.PolicyInstanceNotFound):
return "", 404
except BaseException as exc:
# catch all, should never happen...
@@ -170,7 +168,7 @@
# send rmr (best effort)
body = _gen_body_to_handler("CREATE", policy_type_id, policy_instance_id, payload=instance)
- a1rmr.send(json.dumps(body), message_type=policy_type_id)
+ a1rmr.queue_work({"payload": json.dumps(body), "msg type": policy_type_id})
return "", 202
@@ -190,7 +188,7 @@
# send rmr (best effort)
body = _gen_body_to_handler("DELETE", policy_type_id, policy_instance_id)
- a1rmr.send(json.dumps(body), message_type=policy_type_id)
+ a1rmr.queue_work({"payload": json.dumps(body), "msg type": policy_type_id})
return "", 202
diff --git a/a1/data.py b/a1/data.py
index 631d2c6..135d853 100644
--- a/a1/data.py
+++ b/a1/data.py
@@ -23,11 +23,9 @@
For now, the database is in memory.
We use dict data structures (KV) with the expectation of having to move this into Redis
"""
-import json
import msgpack
from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound, PolicyTypeAlreadyExists, CantDeleteNonEmptyType
from a1 import get_module_logger
-from a1 import a1rmr
logger = get_module_logger(__name__)
@@ -132,36 +130,9 @@
def _clean_up_type(policy_type_id):
"""
- pop through a1s mailbox, updating a1s db of all policy statuses
for all instances of type, see if it can be deleted
"""
type_is_valid(policy_type_id)
- for msg in a1rmr.dequeue_all_waiting_messages([21024]):
- # try to parse the messages as responses. Drop those that are malformed
- pay = json.loads(msg["payload"])
- if "policy_type_id" in pay and "policy_instance_id" in pay and "handler_id" in pay and "status" in pay:
- # We don't use the parameters "policy_type_id, policy_instance" from above here,
- # because we are popping the whole mailbox, which might include other statuses
- pti = pay["policy_type_id"]
- pii = pay["policy_instance_id"]
-
- try:
- """
- can't raise an exception here e.g.:
- because this is called on many functions; just drop bad status messages.
- We def don't want bad messages that happen to hit a1s mailbox to blow up anything
-
- """
- type_is_valid(pti)
- instance_is_valid(pti, pii)
- SDL.set(_generate_handler_key(pti, pii, pay["handler_id"]), pay["status"])
- except (PolicyTypeNotFound, PolicyInstanceNotFound):
- pass
-
- else:
- logger.debug("Dropping message")
- logger.debug(pay)
-
for policy_instance_id in _get_instance_list(policy_type_id):
# see if we can delete
vector = _get_statuses(policy_type_id, policy_instance_id)
@@ -286,3 +257,16 @@
"""
_clean_up_type(policy_type_id)
return _get_instance_list(policy_type_id)
+
+
+# Statuses
+
+
+def set_status(policy_type_id, policy_instance_id, handler_id, status):
+ """
+ update the database status for a handler
+ called from a1's rmr thread
+ """
+ type_is_valid(policy_type_id)
+ instance_is_valid(policy_type_id, policy_instance_id)
+ SDL.set(_generate_handler_key(policy_type_id, policy_instance_id, handler_id), status)
diff --git a/a1/run.py b/a1/run.py
index 9eb59f5..67f63d6 100644
--- a/a1/run.py
+++ b/a1/run.py
@@ -1,3 +1,6 @@
+"""
+A1 entrypoint
+"""
# ==================================================================================
# Copyright (c) 2019 Nokia
# Copyright (c) 2018-2019 AT&T Intellectual Property.
@@ -14,18 +17,36 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
+import time
+from threading import Thread
from gevent.pywsgi import WSGIServer
from a1 import get_module_logger, app
-from a1.a1rmr import init_rmr
+from a1 import a1rmr
logger = get_module_logger(__name__)
+def start_rmr_thread(real_init=True):
+ """
+ Start a1s rmr thread
+ Also called during unit testing
+ """
+ rmr_loop = a1rmr.RmrLoop(real_init)
+ thread = Thread(target=rmr_loop.loop)
+ thread.start()
+ while not rmr_loop.rmr_is_ready():
+ time.sleep(0.5)
+ return rmr_loop # return the handle; useful during unit testing
+
+
def main():
"""Entrypoint"""
- logger.debug("Initializing rmr")
- init_rmr()
+ # start rmr thread
+ logger.debug("Initializing rmr thread. A1s webserver will not start until rmr initialization is complete.")
+ start_rmr_thread()
+
+ # start webserver
logger.debug("Starting gevent server")
http_server = WSGIServer(("", 10000), app)
http_server.serve_forever()
diff --git a/container-tag.yaml b/container-tag.yaml
index 4072098..efad040 100644
--- a/container-tag.yaml
+++ b/container-tag.yaml
@@ -1,4 +1,4 @@
# The Jenkins job uses this string for the tag in the image name
# for example nexus3.o-ran-sc.org:10004/my-image-name:my-tag
---
-tag: 1.0.1
+tag: 1.0.2
diff --git a/docs/release-notes.rst b/docs/release-notes.rst
index 2060a2a..a4af2b6 100644
--- a/docs/release-notes.rst
+++ b/docs/release-notes.rst
@@ -29,7 +29,20 @@
* Represents a resillent version of 1.0.0 that uses Redis for persistence
-[1.0.1]
+[1.0.2] - 10/17/2019
+
+::
+
+ * a1 now has a seperate, continuous polling thread
+ this will enable operations like database cleanup (based on ACKs) and external notifications in real time,
+ rather than when the API is invoked
+ * all rmr send and receive operations are now in this thread
+ * introduces a thread safe job queue between the two threads
+ * Not done yet: database cleanups in the thread
+ * Bump rmr python version
+ * Clean up some logging
+
+[1.0.1] - 10/15/2019
::
diff --git a/integration_tests/a1mediator/Chart.yaml b/integration_tests/a1mediator/Chart.yaml
index 792818d..58e1e1f 100644
--- a/integration_tests/a1mediator/Chart.yaml
+++ b/integration_tests/a1mediator/Chart.yaml
@@ -1,4 +1,4 @@
apiVersion: v1
description: A1 Helm chart for Kubernetes
name: a1mediator
-version: 1.0.1
+version: 1.0.2
diff --git a/setup.py b/setup.py
index 9c242be..9894fb8 100644
--- a/setup.py
+++ b/setup.py
@@ -18,13 +18,13 @@
setup(
name="a1",
- version="1.0.1",
+ version="1.0.2",
packages=find_packages(exclude=["tests.*", "tests"]),
author="Tommy Carpenter",
description="RIC A1 Mediator for policy/intent changes",
url="https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/a1",
entry_points={"console_scripts": ["run.py=a1.run:main"]},
# we require jsonschema, should be in that list, but connexion already requires a specific version of it
- install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "msgpack", "rmr>=0.13.2"],
+ install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "msgpack", "rmr>=0.13.3"],
package_data={"a1": ["openapi.yaml"]},
)
diff --git a/tests/conftest.py b/tests/conftest.py
index c39b962..73573d0 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,8 +1,49 @@
+"""
+pytest conftest
+"""
+# ==================================================================================
+# Copyright (c) 2019 Nokia
+# Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==================================================================================
+import tempfile
+import os
import pytest
+from a1 import app
+
+
+@pytest.fixture
+def client():
+ """
+ http://flask.pocoo.org/docs/1.0/testing/
+ """
+
+ db_fd, app.app.config["DATABASE"] = tempfile.mkstemp()
+ app.app.config["TESTING"] = True
+ cl = app.app.test_client()
+
+ yield cl
+
+ os.close(db_fd)
+ os.unlink(app.app.config["DATABASE"])
@pytest.fixture
def adm_type_good():
+ """
+ represents a good put for adm control type
+ """
return {
"name": "Admission Control",
"description": "various parameters to control admission of dual connection",
@@ -41,4 +82,7 @@
@pytest.fixture
def adm_instance_good():
+ """
+ represents a good put for adm control instance
+ """
return {"enforce": True, "window_length": 10, "blocking_rate": 20, "trigger_threshold": 10}
diff --git a/tests/test_controller.py b/tests/test_controller.py
index f4e0ae2..e5c3ac7 100644
--- a/tests/test_controller.py
+++ b/tests/test_controller.py
@@ -14,12 +14,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
-import tempfile
-import os
+import time
from rmr.rmr_mocks import rmr_mocks
-from a1 import app
-import pytest
+from a1 import run
ADM_CTRL = "admission_control_policy"
@@ -30,23 +28,8 @@
TEST_TYPE = "/a1-p/policytypes/20001"
-# http://flask.pocoo.org/docs/1.0/testing/
-@pytest.fixture
-def client():
- db_fd, app.app.config["DATABASE"] = tempfile.mkstemp()
- app.app.config["TESTING"] = True
- cl = app.app.test_client()
-
- yield cl
-
- os.close(db_fd)
- os.unlink(app.app.config["DATABASE"])
-
-
-def _fake_dequeue(_filter_type):
- """
- for monkeypatching a1rmnr.dequeue_all_messages with a good status
- """
+def _fake_dequeue(_mrc, _filter_type):
+ """for monkeypatching with a good status"""
fake_msg = {}
pay = b'{"policy_type_id": 20000, "policy_instance_id": "admission_control_policy", "handler_id": "test_receiver", "status": "OK"}'
fake_msg["payload"] = pay
@@ -54,17 +37,13 @@
return new_messages
-def _fake_dequeue_none(_filter_type):
- """
- for monkeypatching a1rmnr.dequeue_all_messages with no waiting messages
- """
+def _fake_dequeue_none(_mrc, _filter_type):
+ """for monkeypatching with no waiting messages"""
return []
-def _fake_dequeue_deleted(_filter_type):
- """
- for monkeypatching a1rmnr.dequeue_all_messages with a DELETED status
- """
+def _fake_dequeue_deleted(_mrc, _filter_type):
+ """for monkeypatching with a DELETED status"""
new_msgs = []
# insert some that don't exist to make sure nothing blows up
@@ -85,7 +64,8 @@
def _test_put_patch(monkeypatch):
rmr_mocks.patch_rmr(monkeypatch)
- monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(0)) # good sends for this whole batch
+ # assert that rmr bad states don't cause problems
+ monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(10))
# we need this because free expects a real sbuf
# TODO: move this into rmr_mocks
@@ -109,12 +89,24 @@
monkeypatch.setattr("rmr.rmr.generate_and_set_transaction_id", fake_set_transactionid)
+# Module level Hack
+
+
+RMR_THREAD = None
+
+
+def setup_module():
+ """module level setup"""
+ global RMR_THREAD
+ RMR_THREAD = run.start_rmr_thread(real_init=False)
+
+
# Actual Tests
def test_workflow_nothing_there_yet(client, monkeypatch, adm_type_good, adm_instance_good):
""" test policy put good"""
-
+ monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue_none)
# no type there yet
res = client.get(ADM_CTRL_TYPE)
assert res.status_code == 404
@@ -125,12 +117,15 @@
assert res.json == []
# instance 404 because type not there yet
- monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue_none)
res = client.get(ADM_CTRL_POLICIES)
assert res.status_code == 404
def test_workflow(client, monkeypatch, adm_type_good, adm_instance_good):
+ """
+ test a full A1 workflow
+ """
+ monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue_none)
# put the type
res = client.put(ADM_CTRL_TYPE, json=adm_type_good)
assert res.status_code == 201
@@ -148,7 +143,6 @@
assert res.json == [20000]
# instance 200 but empty list
- monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue_none)
res = client.get(ADM_CTRL_POLICIES)
assert res.status_code == 200
assert res.json == []
@@ -161,8 +155,6 @@
# create a good instance
_test_put_patch(monkeypatch)
- # assert that rmr bad states don't cause problems
- monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(10))
res = client.put(ADM_CTRL_INSTANCE, json=adm_instance_good)
assert res.status_code == 202
@@ -186,12 +178,13 @@
assert res.status_code == 200
assert res.get_data(as_text=True) == expected
- # try a status get but pretend we didn't get any ACKs yet to test NOT IN EFFECT
- monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue_none)
+ # try a status get but we didn't get any ACKs yet to test NOT IN EFFECT
+ time.sleep(1) # wait for the rmr thread
get_instance_good("NOT IN EFFECT")
# now pretend we did get a good ACK
- monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue)
+ monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue)
+ time.sleep(1) # wait for the rmr thread
get_instance_good("IN EFFECT")
# cant delete type until there are no instances
@@ -205,11 +198,13 @@
assert res.status_code == 202
# status after a delete, but there are no messages yet, should still return
- monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue)
+ monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue)
+ time.sleep(1) # wait for the rmr thread
get_instance_good("IN EFFECT")
# now pretend we deleted successfully
- monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue_deleted)
+ monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue_deleted)
+ time.sleep(1) # wait for the rmr thread
res = client.get(ADM_CTRL_INSTANCE_STATUS) # cant get status
assert res.status_code == 404
res = client.get(ADM_CTRL_INSTANCE) # cant get instance
@@ -253,7 +248,8 @@
assert res.status_code == 404
# get a non existent instance
- monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue)
+ monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue)
+ time.sleep(1)
res = client.get(ADM_CTRL_INSTANCE + "DARKNESS")
assert res.status_code == 404
@@ -278,3 +274,8 @@
"""
res = client.get("/a1-p/healthcheck")
assert res.status_code == 200
+
+
+def teardown_module():
+ """module teardown"""
+ RMR_THREAD.stop()
diff --git a/tox.ini b/tox.ini
index 70389b6..8a47c3c 100644
--- a/tox.ini
+++ b/tox.ini
@@ -29,6 +29,7 @@
# Note, before this will work, for the first time on that machine, run ./install_deps.sh
commands =
+# sometimes the -s flag is helpful; add -s after pytest; which streams the logs as they come in, rather than saving them all for the end of tests
pytest --junitxml xunit-results.xml --cov a1 --cov-report xml --cov-report term-missing --cov-report html --cov-fail-under=70
coverage xml -i